Figure1 Spark & Python from http://static1.squarespace.com/static/538cea80e4b00f1fad490c1b/t/54b6b348e4b0a14bf3ec8b98/1421259603601/fast_data_apps.jpg
PREFACE
The goals of this article are to get hands-on experience running MapReduce and gain a deeper understanding of the MapReduce paradigm; to become more familiar with Apache Spark and get hands-on experience running Spark on a local installation; and to learn how to apply the MapReduce paradigm to Spark by implementing a few problems/algorithms in Spark.
3.1 Quick Background
Apache Hadoop is an open-source MapReduce framework that also implements its own Hadoop Distributed File System (HDFS); both were inspired by Google's papers on MapReduce and the Google File System. As a programming interface, Hadoop provides MapReduce Java APIs, but it also sits inside a large ecosystem (see Figure3), in which many other companies develop extensions, tools, and higher-level programming platforms on top of Apache Hadoop.
For example, through Apache Pig [1], developed by Yahoo, you can drive MapReduce jobs with a scripting language whose syntax resembles the SQL used by RDBMS systems, and write user-defined functions in languages such as Python, JavaScript, Ruby, or Groovy.
Figure2 Apache Hadoop from https://i2.wp.com/hadoop.nl/hadoopelephant_rgb.png
Figure3 Hadoop ecosystems from http://techblog.baghel.com/media/1/20130616-HadoopEcosystem.JPG
Despite its popularity, Hadoop has several disadvantages: for example, it is a poor fit for small datasets, and it inherits certain vulnerabilities from being written in Java. So when it comes to big data, Hadoop may not be the only answer. Apache Spark, originally developed in the AMPLab at UC Berkeley, is another choice: a fast, general engine for large-scale data processing that can run on top of HDFS and provides Java, Scala, and Python APIs for database, machine learning, and graph workloads.
3.2 Setup Hadoop and Spark
Of course, environment setup can be troublesome and time-consuming depending on your OS. The fastest way to get going is to use Amazon AWS EC2, but that is also the most costly option. For practice purposes, installing both frameworks locally is the most economical route. Keep in mind, however, that both frameworks are designed to run on clusters and process data at large scale, so in standalone mode you should not expect the performance of a distributed system; they can be much slower than ordinary local software at processing the same dataset.
Since the focus here is on practicing with actual code, and to save space, I only list the setup links below for each local OS.
- Hadoop on Mac, Linux, and Windows: http://wiki.apache.org/hadoop/Running_Hadoop_On_OS_X_10.564-bit(Single-Node_Cluster) and https://wiki.apache.org/hadoop/Hadoop2OnWindows
- Spark on Mac, Linux, and Windows: you can refer to the Stack Overflow thread http://stackoverflow.com/questions/25481325/how-to-set-up-spark-on-windows and https://shellzero.wordpress.com/2015/07/24/how-to-install-apache-spark-on-mac-os-x-yosemite/
- Spark and Hadoop on Amazon EC2: https://wiki.apache.org/hadoop/AmazonEC2 and http://spark.apache.org/docs/latest/ec2-scripts.html
3.3 Run Spark on the command line
Running a Spark Python file (xxx.py) is similar to running an ordinary Python file with python xxx.py; you just run the following command:
$ spark-submit xxx.py # Runs the Spark file xxx.py
If your Spark file takes arguments (much like the Spark files provided here), the command is similar; you simply append whatever arguments you need:
$ spark-submit xxx.py arg1 arg2 # Runs the Spark file xxx.py and passes in arg1 and arg2 to xxx.py
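Inside the script itself, those arguments arrive exactly as they would for an ordinary Python program, via sys.argv. Here is a minimal sketch (the file name and argument names are just placeholders, not files provided with this article):

# args_example.py -- a minimal sketch of a Spark script that reads its arguments
import sys
from pyspark import SparkContext

if __name__ == "__main__":
    input_path, output_path = sys.argv[1], sys.argv[2]   # arg1 and arg2 from spark-submit
    sc = SparkContext(appName="ArgsExample")
    lines = sc.textFile(input_path)                      # load the input as an RDD of lines
    lines.saveAsTextFile(output_path)                    # write it back out to show the round trip
    sc.stop()

You would run it with spark-submit args_example.py <input> <output>.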
Spark also includes a neat interpreter that runs on Python 2.7.3 and lets you try out your Spark commands interactively. The command is as follows:
$ pyspark # Runs the Spark interpreter.
If you want to preload some files (say a.py, b.py, c.py), you can run the following command:
$ pyspark --py-files a.py,b.py,c.py # Runs the Spark interpreter; you can now import things from a, b, and c
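Once the prompt comes up, the interpreter already has a SparkContext available as sc, so you can experiment with a small RDD right away. A quick sketch of a session:

>>> nums = sc.parallelize([1, 2, 3, 4, 5])       # build an RDD from a Python list
>>> squares = nums.map(lambda x: x * x)          # a transformation: nothing runs yet
>>> squares.collect()                            # an action: this triggers the computation
[1, 4, 9, 16, 25]
>>> squares.reduce(lambda a, b: a + b)           # another action: sum of the squares
55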
3.4 Practices
Ex1 – Generating the dataset
The fundamental reason to harness a WSC's (warehouse-scale computer's) power through the MapReduce framework is that we may have a huge workload that would take days, months, or even years to finish on a single machine. So before writing our own map and reduce functions, we need a dataset.
In these examples we'll be working heavily with textual data, and some pre-generated datasets are provided, but it's always more fun to use a dataset that you find interesting.
So in this section I'll walk through, step by step, how to generate a dataset from Project Gutenberg (a library of public-domain literary works).
I've put the code and data used in the following examples on my GitHub; feel free to download them from WSC_MapReduce-Spark.
- Head over to Project Gutenberg, pick a work of your choosing, and download the “Plain Text UTF-8” version into your working directory. In this case, I picked the EBook of Little Dorrit by Charles Dickens, shown in Figure4.
Figure4 EBook of Little Dorrit
- Open the file you downloaded in your favorite text editor and insert “—END.OF.DOCUMENT—” (without the quotes) by itself on a new line wherever you want Hadoop to split the input file into separate (key, value) pairs. The importer we’re using will assign an arbitrary key (like “doc_xyz”) to each piece, and the value will be the contents of the input file between two consecutive markers. (If hand-editing a long book gets tedious, see the small helper script after this step list.)
Figure5 insert “—END.OF.DOCUMENT—” in EBook
- The importer is actually a MapReduce program that converts TXT files into a Hadoop sequence file (the .seq extension). Sequence files are NOT human-readable. You can take a look at Importer.java if you want, but the implementation details aren’t important for this section.
- We need to build the importer JAR first with the following commands in the console (in my folder I have WordCount.java, pg963.txt, DocWordCount.java, Importer.java, and a folder data/; note that the paths to hadoop-core.jar and commons-cli.jar depend on your local installation):
mkdir -p classes
javac -g -deprecation -d classes -cp /paul/hadoop/hadoop-core.jar:/paul/hadoop/lib/commons-cli.jar *.java
jar cf wc.jar -C classes .
- Now you can generate your input file like so:
hadoop jar wc.jar Importer YOUR_FILE_FROM_STEP_2.txt
- Your generated .seq file can now be found in the convertedOut directory
Figure6 convertedOut directory
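As mentioned in the editing step above, if splitting a long book by hand gets tedious, a small helper script along the following lines can insert the marker for you. This is only an optional sketch: the every-50-paragraphs split is arbitrary, the file names are placeholders, and you should make sure the MARKER string matches exactly what Importer.java splits on.

# split_book.py -- optional helper that inserts the document marker every N paragraphs
MARKER = "---END.OF.DOCUMENT---"   # must match the exact marker string Importer.java expects

with open("pg963.txt") as f:                       # the Plain Text UTF-8 file from Project Gutenberg
    paragraphs = f.read().split("\n\n")            # treat blank lines as paragraph breaks

with open("pg963_split.txt", "w") as out:
    for i, paragraph in enumerate(paragraphs, start=1):
        out.write(paragraph + "\n\n")
        if i % 50 == 0:                            # start a new "document" every 50 paragraphs
            out.write(MARKER + "\n")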
Ex2 – Running Word Count
Here we use the commonly seen word count example. Open WordCount.java and you will see code adapted from the Hadoop wiki at http://wiki.apache.org/hadoop/WordCount.
What the code basically does is define a mapper class, WordCountMap, and a reducer class, SumReduce; create a job object to represent the word count job and tell Hadoop where to locate the code that must be shipped if the job is run across a cluster; and then set all the required parameters before starting, such as the datatypes of the keys and values output by the maps and reduces, and which mapper, combiner, and reducer to use. You can see the comments in the code snippet below for more detail.
Code snippet of WordCount.java:
Now we can see the result by compiling and packaging the .java source files into a .jar and then running it on our desired input (we use the pre-built data/billOfRights.txt.seq as an example, but you can replace it with your own .seq file).
hadoop jar wc.jar WordCount data/billOfRights.txt.seq wc-out-wordcount-small
This will run WordCount over billOfRights.txt.seq. Your output should be visible in wc-out-wordcount-small/part-r-00000. If we had used multiple reducers, the output would be split across files named part-r-[id.num], where reducer “id.num” writes to the corresponding file. The input key-value pair for your map tasks is a document identifier and the actual document text.
Figure7 the result of word count over billOfRights.txt.seq in part-r-00000.
To simplify the compilation process, a Makefile is provided, so you can instead just run the following make command for the same result as above.
$ make wordcount-small
Next, try your code on the larger input file complete-works-mark-twain.txt.seq. In general, Hadoop requires that the output directory not exist when a MapReduce job is executed; however, our Makefile takes care of this by removing the old output directory.
Ex3 – Document Word Count
Now it's your turn to play with MapReduce. Open DocWordCount.java. Notice that it currently contains the same code as the WordCount.java you just compiled and ran, only with modified class names. Modify it to count the number of documents containing each word, rather than the number of times each word occurs in the input. For example, the word “Affery” appears in six documents, even though it occurs 66 times across all documents, so your output should include lines like the following:
Academy 1
Affairs 2
Affery 6
After 10
Again 3
Code snippet of DocWordCount.java:
You can test DocWordCount using either of the following (for our two data sets):
$ make docwordcount-small # Output in wc-out-docwordcount-small/
OR
$ make docwordcount-medium # Output in wc-out-docwordcount-medium/
Ex4 – Working with Spark
Now that you have gained some familiarity with the MapReduce paradigm, we will shift gears into Spark and investigate how to do what we did in the previous exercises there. I have provided a complete wordcount.py to get you a bit more familiar with how Spark works. To help you understand the code, I have added some comments, but feel free to check out the transformations and actions documentation on the Spark website for a more detailed explanation of the methods Spark offers.
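As a rough picture of what such a script looks like, here is a sketch of a Spark word count over the (document id, document text) sequence files we generated earlier; the provided wordcount.py may differ in details such as how it loads its input and formats its output.

# word count sketch -- counts how many times each word occurs across all documents
import sys
from pyspark import SparkContext

if __name__ == "__main__":
    sc = SparkContext(appName="WordCountSketch")
    docs = sc.sequenceFile(sys.argv[1])                       # RDD of (doc id, doc text) pairs
    counts = (docs.flatMap(lambda kv: kv[1].split())          # drop the doc id, keep the words
                  .map(lambda word: (word, 1))                # one (word, 1) pair per occurrence
                  .reduceByKey(lambda a, b: a + b))           # sum the counts for each word
    counts.sortByKey().saveAsTextFile(sys.argv[2])            # write sorted results to the output dir
    sc.stop()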
To get you started on implementing DocWordCount.java in Spark, we have provided a skeleton file docwordcount.py.
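One possible approach is to count each word at most once per document by deduplicating (word, document id) pairs. This is only a sketch, with sc, input_path, and output_path standing in for the SparkContext and the script's arguments; the provided skeleton may organize things differently.

# docwordcount sketch -- counts the number of documents containing each word
docs = sc.sequenceFile(input_path)                                      # (doc id, doc text) pairs
doc_counts = (docs.flatMap(lambda kv: [(word, kv[0]) for word in kv[1].split()])
                  .distinct()                                           # keep each (word, doc id) pair once
                  .map(lambda pair: (pair[0], 1))                       # one vote per containing document
                  .reduceByKey(lambda a, b: a + b))                     # number of documents per word
doc_counts.sortByKey().saveAsTextFile(output_path)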
To test your docwordcount.py, you can run either of the following two commands:
$ make sparkdwc-small # Output in spark-wc-out-docwordcount-small/
OR
$ make sparkdwc-medium # Output in spark-wc-out-docwordcount-medium/
Ex5 – Building an Index with Spark
Open index.py. Notice that the code is similar to docwordcount.py. Modify it to output every word and a list of locations (the document identifier followed by the word index of EACH occurrence of that word in that document). Make sure your word indices start at zero. Your output should have lines that look like the following:
(word1 document1-id, word# word# …)
(word1 document2-id, word# word# …)
…
(word2 document1-id, word# word# …)
(word2 document3-id, word# word# …)
…
Notice that there will be a line of output for EACH document in which a given word appears, and EACH word and document pair should have only ONE list of indices. Remember that you also need to keep track of the document ID.
For this exercise, you may not need all the functions we have provided. If a function is not used, feel free to remove the method that is trying to call it. Make sure your output for this exercise is sorted as well (just like in the previous exercise).
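One way to approach it, again only a sketch with sc, input_path, and output_path standing in for the SparkContext and the script's arguments, is to key every word occurrence by (word, document id) and then gather the positions; turning the result into the exact output format shown above is left to you.

# index sketch -- for each (word, doc id) pair, collect the zero-based positions of the word
def word_positions(kv):
    doc_id, text = kv
    # emit ((word, doc id), position) for every word in the document, positions starting at zero
    return [((word, doc_id), position) for position, word in enumerate(text.split())]

index = (sc.sequenceFile(input_path)
           .flatMap(word_positions)
           .groupByKey()                                   # gather all positions for each (word, doc id)
           .map(lambda kv: (kv[0], sorted(kv[1])))         # one sorted list of indices per pair
           .sortByKey())                                   # sort by (word, doc id) as required
index.saveAsTextFile(output_path)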
You can test index.py by using either of the following commands (for our two datasets):
$ make index-small # Output in spark-wc-out-index-small/
OR
$ make index-medium # Output in spark-wc-out-index-medium/
The output from running make index-medium will be a large file. In order to more easily look at its contents, you can use the commands cat, head, more, and grep:
$ head -25 OUTPUTFILE # view the first 25 lines of output
$ cat OUTPUTFILE | more # scroll through output one screen at a time (use Space)
$ cat OUTPUTFILE | grep the # output only lines containing 'the' (case-sensitive)
3.5 Solutions
I've put all the solutions to the examples above in the folder named solutions here.
Let me know if you have any further questions.