Run Berkeley Spark’s PySpark using Docker in a couple minutes

For those of you interested in running the BDAS Spark stack in a virtualized cluster really quickly, the fastest way is to use Linux Containers controlled by Docker. Using Andre Schumacher’s fantastic Berkeley Spark on Docker scripts and tutorial, you can get yourself a virtual cluster of whatever size you’d like in a couple of minutes!

However, the tutorial is Scala-centric, and you will be dropped straight into a Scala shell. I am primarily interested in using Python for analysis and data science tasks, so a couple more steps are needed.

Follow Andre’s tutorial, and start up a Spark 0.8.0 cluster on top of Docker as you normally would. Here I am starting up a 6-worker cluster:

user@aliens:~/Documents/docker-scripts⟫ sudo deploy/deploy.sh -i amplab/spark:0.8.0 -w 6 -c
*** Starting Spark 0.8.0 ***
starting nameserver container
started nameserver container: 5093b46c4df527528cae0194a8b2849a258e314dc2e0b847c67950776b5715df
DNS host->IP file mapped: /tmp/dnsdir_10034/0hosts
NAMESERVER_IP: 172.17.0.18
waiting for nameserver to come up
starting master container
started master container: 4d431889af3c7176fa1a9ffee850c6658840a307e86ad6fbf2691e54fe8fb792
MASTER_IP: 172.17.0.19

...lots more output

We are interested in the part of the output that tells us how to SSH into the master node…

***********************************************************************
start shell via:            sudo /home/user/Documents/docker-scripts/deploy/start_shell.sh -i amplab/spark-shell:0.8.0 -n 5093b46c4df527528cae0194a8b2849a258e314dc2e0b847c67950776b5715df

visit Spark WebUI at:       http://172.17.0.19:8080/
visit Hadoop Namenode at:   http://172.17.0.19:50070
ssh into master via:        ssh -i /home/user/Documents/docker-scripts/deploy/../apache-hadoop-hdfs-precise/files/id_rsa -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no root@172.17.0.19

/data mapped:

kill master via:           sudo docker kill 4d431889af3c7176fa1a9ffee850c6658840a307e86ad6fbf2691e54fe8fb792
***********************************************************************

You can see in the second block of output above that you can SSH into the master using the ‘ssh -i …’ command. However, there is a permissions problem with the id_rsa file, and SSH will not let you into the master node.

Fix this with:

chmod 0600 docker-scripts/apache-hadoop-hdfs-precise/files/id_rsa

Great, now we can enter the master node with this command:

ssh -i docker-scripts/apache-hadoop-hdfs-precise/files/id_rsa -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no root@172.17.0.19

Of course, use the IP address that Docker generated for you (the MASTER_IP in the output above).

From inside the master node, we want to use Python 2.7 to do our work, so find the ‘pyspark’ script inside /opt/spark-VERSION:

/opt/spark-0.8.0/pyspark

We have a huge problem!

root@master:/opt/spark-0.8.0# ./pyspark
Python 2.7.3 (default, Apr 20 2012, 22:39:59)
[GCC 4.6.3] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Traceback (most recent call last):
  File "/opt/spark-0.8.0/python/pyspark/shell.py", line 25, in 
    import pyspark
  File "/opt/spark-0.8.0/python/pyspark/__init__.py", line 41, in 
    from pyspark.context import SparkContext
  File "/opt/spark-0.8.0/python/pyspark/context.py", line 21, in 
    from threading import Lock
ImportError: No module named threading
>>> sc
Traceback (most recent call last):
  File "", line 1, in 
NameError: name 'sc' is not defined
>>>

Fix Number two:

Get out of the Python / PySpark Shell (Ctrl-D), and install python2.7:

sudo apt-get install python2.7

Great! Now run ‘pyspark’ again, and you should see it working perfectly:

root@master:/opt/spark-0.8.0# ./pyspark
Python 2.7.3 (default, Apr 20 2012, 22:39:59)
[GCC 4.6.3] on linux2
...
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 0.8.0
      /_/

Using Python version 2.7.3 (default, Apr 20 2012 22:39:59)
Spark context avaiable as sc.
>>>

>>> sc

>>>
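One extra tip: if the shell still picks up the wrong interpreter for some reason, you can point it at the one you just installed explicitly. PYSPARK_PYTHON is the environment variable the pyspark launcher reads; the path below is an assumption about where apt-get put Python 2.7 on this image.

export PYSPARK_PYTHON=/usr/bin/python2.7
/opt/spark-0.8.0/pyspark
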

There you go! Python 2.7 on Spark using Docker, isn’t it lovely?

Aris

Spark Summit 2013 in San Francisco!

I will be heading over to Spark Summit 2013 to learn all about the latest in Berkeley’s Spark Platform – BDAS. Not only does it have a better name than Hadoop, but it’s built for a lot more flexibility than Map-Reduce (even given Hadoop YARN…yuck) — and the Python API is pretty slick. I have enjoyed using Spark so far, so I look forward to meeting all the community folks in the next couple of days.

Aris Vlasakakis

Automagically GZip a file remotely before downloading it with SCP!

I don’t know about you, but since I don’t live in South Korea, I do not have near-infinite bandwidth. If you have a file up on a Unix server that you want to download with SCP to your local machine, it sure would be awesome if you could automatically compress the file on the remote server before downloading it, saving you space on your local disk and time on the transfer. Rsync won’t help you here, because its delta transfer only kicks in when you already have an older copy of the file on your local drive to refresh.

But wait, there’s more! What about wrapping it up into a Bash function so you can call it easily?

This is another use of GNU Parallel, here doing the heavy lifting of compressing the file remotely, downloading the newly compressed file (using rsync and passwordless SSH), and then deleting the compressed file on the remote server (so no junk is left behind).

# remote_gzip <host> <path to file on host>
# Compress the file on the remote host, pull the .gz back here, then clean up the remote copy.
function remote_gzip {
    parallel -S "$1" --cleanup --return {/}.gz "gzip --best {} -c > {/}.gz" ::: "$2"
}

You can put this Bash function into your .bashrc or something similar, where it will always be with you. Make sure to source the file afterwards to get the function into your Bash shell, as shown below.
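For example (a quick sanity check; ‘type’ just confirms the function is now defined in your shell):

. ~/.bashrc
type remote_gzip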

So if you have a server named remote.com and a file at location /var/logs/bigfile.log, call it like this:

remote_gzip remote.com /var/logs/bigfile.log

In your current working directory you will have bigfile.log.gz.
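If you want to double-check the transfer, a quick integrity test of the downloaded archive looks like this (gzip -t just verifies the file; zcat lets you peek at the contents):

gzip -t bigfile.log.gz && echo "archive OK"
zcat bigfile.log.gz | head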

Enjoy!

-Aris

Use multiple CPU Cores with your Linux commands — awk, sed, bzip2, grep, wc, etc.

Here’s a common problem: you want to add up a very large list (hundreds of megabytes), or grep through it, or do some other operation that is embarrassingly parallel. Data scientists, I am talking to you. You probably have four cores or more, but our tried and true tools like grep, bzip2, wc, awk, sed and so forth are single-threaded and will only use one CPU core. To paraphrase Cartman, “How do I reach these cores?” Let’s use all of the CPU cores on our Linux box by using GNU Parallel and doing a little in-machine map-reduce magic with the little-known --pipe parameter (otherwise known as --spreadstdin). Your pleasure is proportional to the number of CPUs, I promise.

BZIP2

So, bzip2 is better compression than gzip, but it’s so slow! Put down the razor, we have the technology to solve this. Instead of this:

cat bigfile.bin | bzip2 --best > compressedfile.bz2

Do this:

cat bigfile.bin | parallel --pipe --recend '' -k bzip2 --best > compressedfile.bz2
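To convince yourself, you can time both forms on the same input (a rough benchmark sketch; bigfile.bin is whatever large file you have handy, and the output names are just placeholders):

time bzip2 --best -c bigfile.bin > single.bz2
time parallel --pipe --recend '' -k bzip2 --best < bigfile.bin > multi.bz2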

Especially with bzip2, GNU Parallel is dramatically faster on multi-core machines. Give it a whirl and you will be sold.

GREP

If you have an enormous text file, rather than this:

grep pattern bigfile.txt

do this:

cat bigfile.txt | parallel  --pipe grep 'pattern'

or this:

cat bigfile.txt | parallel --block 10M --pipe grep 'pattern'

The second command shows you using --block with 10 MB of data from your file; you might play with this parameter to find out how many lines of input you want to hand each CPU core at a time. I gave a previous example of how to use grep with a large number of files, rather than just a single large file.

AWK

Here’s an example of using awk to add up the numbers in a very large file. Rather than this:

cat rands20M.txt | awk '{s+=$1} END {print s}'

do this!

cat rands20M.txt | parallel --pipe awk \'{s+=\$1} END {print s}\' | awk '{s+=$1} END {print s}'
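(If you want a file of numbers to experiment with, something like this will do; the filename just matches the one used above, and any large file of integers works:)

seq 20000000 | shuf > rands20M.txt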

This is more involved: the --pipe option in parallel spreads the input out over multiple chunks for the first awk call, giving a bunch of sub-totals. Those sub-totals go through the second pipe into the identical awk call, which gives the final total. The first awk call has three backslashes in it due to the need to escape the quoting for GNU Parallel.

WC

Want to create a super-parallel count of lines in a file? Instead of this:

wc -l bigfile.txt

Do this:

cat bigfile.txt | parallel  --pipe wc -l | awk '{s+=$1} END {print s}'

This is pretty neat: what is happening here is that during the parallel call, we are ‘mapping’ a bunch of calls to wc -l, generating sub-totals, and finally adding them up with the final pipe into awk.

SED

Feel like using sed to do a huge number of replacements in a huge file? Instead of this:

sed s^old^new^g bigfile.txt

Do this:

cat bigfile.txt | parallel --pipe sed s^old^new^g

…and then pipe it into your favorite file to store the output.
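For example, to write the result straight to a new file (the output name here is just a placeholder):

cat bigfile.txt | parallel --pipe sed s^old^new^g > bigfile_new.txt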

Enjoy!

–Aris

Lustre file system hangs server during really fast I/O?

If you are a proud user of the Lustre file system and you find yourself pushing many thousands of very small files very quickly, you can choke Lustre, as of version 2.4.1. It can hang so badly that you may not be able to kill the process that is writing to Lustre and causing the problem.

Yes, that means that

kill -9 <PID> 

does not work at all, nor does any other signal you can imagine.

Previously, I had to reboot the server to kill the damn process that ate my server. It turns out that there is a very simple solution to keep your sanity…

sudo umount -f <Lustre mount point>  

Just forcibly unmount the Lustre mount point, and that will cause your run-amok process to crash and commit hara-kiri, thereby stopping it. This is far better than rebooting the server!
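(If you do not remember where Lustre is mounted, listing mounts by type will show you; this is plain mount syntax, nothing Lustre-specific:)

mount -t lustre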

Using GNU Parallel to roll-your-own Map Reduce!

Map-Reduce is a popular paradigm for algorithms to process big data in parallel, but its use on multi-core servers and clustered servers has primarily been the purview of frameworks like Hadoop.

Well, guess what: rather than mucking around with big nasty Java systems like Hadoop, you can actually run Map-Reduce-style calculations using GNU Parallel. Yes, you can use all the cores on your server as well as *other* servers nearby (usually in your cluster).
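As a taste of the multi-server case, GNU Parallel’s -S flag takes a list of SSH logins to spread work over (a hedged sketch: server1 and server2 are placeholder hostnames that need passwordless SSH set up, ‘:’ means “also use the local machine”, and bigfile.txt is whatever input you have):

cat bigfile.txt | parallel -S server1,server2,: --pipe wc -l | awk '{s+=$1} END {print s}'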

This is particularly handy when you want to operate on big files or a lot of files, and you know your tool (like grep) is not designed to use all your cores.

Here’s how to apply Map-Reduce in a regime like this. Imagine you have a directory with thousands of log files, and you want to search through all of them, pull out every instance of an integer, and then print the mean of those numbers. You have a multi-core box, and you get anxious when you look at htop and see all those lonely cores looking for work. You get the idea.

So, assume your logs are in the directory /var/log/ 

sudo find /var/log/ -type f | sudo parallel -j0 egrep -I -i -o '[[:digit:]]+' {} | awk '{s+=$1} END {print s/NR}'

…and you just mapped and reduced the shit out of those files, using all of your cores on your local machine.

Here’s the pipeline:

  • sudo find /var/log/  -type f

This is basically collecting the list of file names you want to process.

  • Mapping step (number of mappers equal to the number of cores):
  • sudo parallel -j0 egrep -I -i -o '[[:digit:]]+' {}

This is GNU Parallel, turning each core into a map task, like Hadoop! It takes all of your input files and uses as many cores as possible (the -j0 flag). The ‘egrep’ call is just scraping out all instances of integers…you could do whatever you want here.

  • Reducer step (one reducer in this case)
  • awk '{s+=$1} END {print s/NR}'

Once the mappers above all emit their integers, this reducer task takes the whole list and computes the mean (the sum divided by the number of lines, NR).

This really works like multi-core magic.
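As an aside, if you want a plain sum instead of the mean, just drop the /NR in the reducer:

sudo find /var/log/ -type f | sudo parallel -j0 egrep -I -i -o '[[:digit:]]+' {} | awk '{s+=$1} END {print s}'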