Apache Spark Word Count: Data Analytics With a Publicly Available Dataset
Let's take things up a notch and check out how quickly we can perform word counts on a much larger dataset: the Yelp dataset.
In a previous post, I discussed my findings for developing a simple word count app in Java and running it against the text of a typical novel, such as Alice in Wonderland.
Here's a summary of the findings so far:
- 2012 MacBook Pro with a 2.4GHz i7 and SanDisk SSD — it completed in 100ms
- 2015 MacBook Pro with a 2.2GHz i7 and stock SSD — it completed in 82ms
- Ubuntu 16.04 server running under VMware ESXi on my HP DL380 Gen7 rack server configured with 2 x 2.4 GHz vCPUs, 6GB RAM — it completed in 250ms
- The slower time can probably be accounted for by slower data access from the WD Black 7200rpm HDDs in my server, which is likely an I/O bottleneck compared to loading the same data file from SSD
Now, let's take it up a notch and throw some larger datasets into the mix using the Yelp dataset: taking the reviews.json file exported as a CSV file, and running the same app on the same Ubuntu server in a VM on the DL380, performing the same word count against a 2.8GB file. With the same Java word count app, this took 434,886ms to complete, or just over seven minutes. OK, now we've got something large enough to play with as a benchmark:
kev@esxi-ubuntu-mongodb1 : ~ $ java -jar wordcount.jar ./data/yelp/reviewtext.csv
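The standalone app comes from the earlier post and its source isn't reproduced here. As a rough sketch only, assuming it streams the file line by line and splits on whitespace the same way the Spark version later in this post does, its core might look something like this (the class and method names are hypothetical, not taken from the original app):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

// Hypothetical stand-in for the standalone word count app; not the original source.
class SimpleWordCount {

    // Counts whitespace-separated tokens across all lines of a sequential stream.
    static Map<String, Long> countWords(Stream<String> lines) {
        Map<String, Long> counts = new HashMap<>();
        lines.forEach(line -> {
            for (String word : line.split("\\s+")) {
                counts.merge(word, 1L, Long::sum);
            }
        });
        return counts;
    }

    public static void main(String[] args) throws IOException {
        long start = System.currentTimeMillis();
        // Files.lines reads lazily, so the whole file is never held in memory.
        try (Stream<String> lines = Files.lines(Paths.get(args[0]))) {
            Map<String, Long> counts = countWords(lines);
            System.out.println(counts.size() + " distinct words in "
                    + (System.currentTimeMillis() - start) + "ms");
        }
    }
}
```

Everything here runs on a single thread, which is the baseline the Spark runs are compared against.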
Incidentally, if you're interested in what the word counts from the Yelp reviews.json data file look like, here are the first few counts in descending order, before we get to some terms you'd expect in Yelp reviews, like "good" and "food":
the : 22169071
and : 18025282
I : 13845506
a : 13416437
to : 12952150
was : 9340074
of : 7811688
: 7600402
is : 6513076
for : 6207595
in : 5895711
it : 5604281
The : 4702963
that : 4446918
with : 4353165
my : 4188709
but : 3659431
on : 3642918
you : 3570824
have : 3311139
this : 3270666
had : 3015103
they : 3001066
not : 2829568
were : 2814656
are : 2660714
we : 2639049
at : 2624837
so : 2378603
place : 2358191
be : 2276537
good : 2232567
food : 2215236
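The blank entry with 7,600,402 occurrences is probably an empty-string token rather than a real word. That is an assumption, based on how Java's String.split behaves and on the standalone app tokenizing lines the same way as the Spark code in this post: a line that is empty, or that starts with whitespace, yields an empty string as its first token. A small sketch (class and method names are hypothetical) shows the behavior, along with one way such tokens could be filtered out:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Illustrative only: shows where an empty "word" can come from.
class SplitTokens {

    // Same tokenization as line.split("\\s+"), returned as a list.
    static List<String> rawTokens(String line) {
        return Arrays.asList(line.split("\\s+"));
    }

    // Variant that drops empty tokens so they are not counted as words.
    static List<String> nonEmptyTokens(String line) {
        return Arrays.stream(line.split("\\s+"))
                .filter(token -> !token.isEmpty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(rawTokens("  great food"));      // includes a leading empty token
        System.out.println(rawTokens(""));                  // a single empty token
        System.out.println(nonEmptyTokens("  great food")); // only the real words
    }
}
```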
Now, let's look at rewriting the analysis using Apache Spark. The equivalent code using the Spark API for loading the dataset and performing the word count turned out to be like this (although if you search for "Apache Spark word count," there are many different ways you could use the available APIs to implement a word count):
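// Read the file as an RDD with one element per line
JavaRDD<String> lines = spark.read().textFile(filepath).javaRDD();
// Split each line on whitespace, pair each word with 1, then sum the 1s per word
JavaPairRDD<String, Integer> words = lines
    .flatMap(line -> Arrays.asList(line.split("\\s+")).iterator())
    .mapToPair(word -> new Tuple2<>(word, 1))
    .reduceByKey((x, y) -> x + y);
// Bring all (word, count) pairs back to the driver
List<Tuple2<String, Integer>> results = words.collect();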
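The pairs returned by collect() are not sorted by count, so producing a listing like the one shown earlier needs a sort on the driver. A minimal sketch of that step, using Map.Entry as a stand-in for Tuple2 so that it runs without Spark on the classpath (the class and method names are hypothetical):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustrative only: Map.Entry stands in for scala.Tuple2 here.
class TopWords {

    // Returns a new list ordered by count, highest first; the input is left untouched.
    static List<Map.Entry<String, Integer>> sortByCountDescending(
            List<Map.Entry<String, Integer>> results) {
        List<Map.Entry<String, Integer>> sorted = new ArrayList<>(results);
        sorted.sort(Map.Entry.<String, Integer>comparingByValue().reversed());
        return sorted;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> results = new ArrayList<>();
        results.add(new SimpleEntry<>("food", 2215236));
        results.add(new SimpleEntry<>("the", 22169071));
        results.add(new SimpleEntry<>("good", 2232567));
        for (Map.Entry<String, Integer> entry : sortByCountDescending(results)) {
            System.out.println(entry.getKey() + " : " + entry.getValue());
        }
    }
}
```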
Submitting the job to run on a standalone Spark node (an Ubuntu Server 16.04 VM on ESXi) with 1 core (--master local[1]):
./bin/spark-submit --class "kh.textanalysis.spark.SparkWordCount" \
--master local[1] ../spark-word-count-0.0.1-SNAPSHOT.jar \
../data/yelp/reviewtext.csv
The job completes in 336,326ms (or approx. 5.6 minutes). At this point, this is with minimal understanding of how best to approach or structure an effective Spark job, but we've already made an improvement, and this first test is with a single Spark local node and no additional worker nodes in the cluster.
Next, with a standalone local node and 2 cores (--master local[2]): 170,261ms, or 2.8 minutes. Now we're talking.
Let's try deploying the master node, and then add some worker nodes.
17/11/07 18:43:33 INFO StandaloneAppClient$ClientEndpoint: Connecting to master spark://192.168.1.82:7077…
17/11/07 18:43:33 WARN StandaloneAppClient$ClientEndpoint: Failed to connect to master 192.168.1.82:7077
My first guess is that the default 7077 port is not open, so:
sudo ufw allow 7077
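One way to test a guess like this from the machine submitting the job is to attempt a plain TCP connection to the master's port. This sketch uses only the JDK; the class name, method name, and timeout are illustrative choices, and a successful connection only shows that the port is reachable, not that the Spark master is healthy:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Illustrative helper, not part of Spark.
class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within the timeout.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Refused, blocked by a firewall, unknown host, or timed out.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults match the master address and port used in this post.
        String host = args.length > 0 ? args[0] : "192.168.1.82";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 7077;
        System.out.println(host + ":" + port + " reachable: "
                + isReachable(host, port, 2000));
    }
}
```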
Retrying, the job now submits and starts up, but logs this warning:
17/11/07 18:47:28 INFO TaskSchedulerImpl: Adding task set 0.0 with 22 tasks
17/11/07 18:47:44 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
Taking a look at the master node web UI on 8080:
True, we have the master started, but no workers yet — so let's start up 1 slave node first (another Ubuntu Server 16.04 VM on ESXi):
$ ./start-slave.sh -m 1G spark://192.168.1.82:7077
starting org.apache.spark.deploy.worker.Worker, logging to /home/kev/spark-2.2.0-bin-hadoop2.7/logs/spark-kev-org.apache.spark.deploy.worker.Worker-1-ubuntuspk1.out
Now, we've got one worker up and ready:
Submitting the same job again to the master node, there's a FileNotFoundException because the worker can't find the file we're attempting to process:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, 192.168.1.89, executor 0): java.io.FileNotFoundException: File file:/home/kev/data/yelp/reviewtext.csv does not exist
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
I was curious how the data file would be shared with the slave nodes, and this answers my question. I guess it does not (or, the way I've implemented my job so far assumes the data file is local). Clearly, I have some more work to do in this area to work out how to share the data file between the nodes. In the meantime, I'm just going to copy the same file across each of the worker nodes to get something up and running. I'm guessing a better way would be to mount a shared drive between each of the workers, but I'll come back to this later.
With the data file copied to my worker VMs and everything restarted, we've now got the job running on one slave, one core, 1GB:
Here's an interesting view of the processing steps in my job:
Taking a quick look at the HP server ILO — the CPUs are barely breaking a sweat and fan speeds are still low:
The Spark dashboard shows completion times, so now we're at 5.3 minutes:
Let's add an extra vCPU to the first worker node (I need to reconfigure my VM in ESXi and then restart). The console's showing an additional stopped worker from a second slave VM that I'll start up for the last test. First, two vCPUs, starting up with -c 2 for two cores:
./start-slave.sh -c 2 -m 1G spark://192.168.1.82:7077
166,773ms, or 2.8 minutes. Looking good!
Now, with the additional second slave node, also started with two vCPUs, we have two worker slave nodes (four vCPUs total). Checking fan speeds in the middle of the run, the server is warming up and the fans are running a little faster, but nothing crazy so far:
102977ms, or 1.7 minutes!
Curious how far we can go... let's try two worker slave nodes, four vCPUs each, and bump up the available RAM to 3GB. Reconfigure my VMs, and off we go:
./start-slave.sh -c 4 -m 3G spark://192.168.1.82:7077
81998ms, or 1.4 minutes!
Still pretty good, although the performance gains from doubling the cores per worker and adding more RAM seem to be leveling off, and we're no longer seeing the same magnitude of improvement. At this point, it might be more interesting to add additional slave worker nodes. I've been creating my ESXi VMs by hand up until this point, so maybe this is the time to look into some automation for spinning up multiple copies of the same VM.
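Let's summarize the results so far, comparing the standalone Java 8 app run on the same HP server as Spark with the various Spark configurations:
- Standalone Java 8 app: 434,886ms
- Spark, local[1]: 336,326ms
- Spark, local[2]: 170,261ms
- Spark, one worker, one core, 1GB: about 5.3 minutes (from the Spark dashboard)
- Spark, one worker, two cores, 1GB: 166,773ms
- Spark, two workers, two cores each: 102,977ms
- Spark, two workers, four cores each, 3GB: 81,998ms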
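To put the timings quoted in this post on a common scale, a short sketch can compute each run's speedup relative to the standalone Java app. The millisecond values are the ones reported above; the class and method names are hypothetical, and the one-worker, one-core run is left out because only its 5.3 minute dashboard figure is given:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: compares the run times reported in this post.
class Speedups {

    // Standalone Java app on the 2.8GB file.
    static final long BASELINE_MS = 434_886L;

    // How many times faster a run is than the baseline.
    static double speedup(long baselineMs, long runMs) {
        return (double) baselineMs / runMs;
    }

    // Spark timings in the order they were measured.
    static Map<String, Long> timings() {
        Map<String, Long> timings = new LinkedHashMap<>();
        timings.put("local[1]", 336_326L);
        timings.put("local[2]", 170_261L);
        timings.put("1 worker x 2 cores", 166_773L);
        timings.put("2 workers x 2 cores", 102_977L);
        timings.put("2 workers x 4 cores", 81_998L);
        return timings;
    }

    public static void main(String[] args) {
        timings().forEach((name, ms) ->
                System.out.printf("%-20s %7d ms  %.1f min  %.2fx%n",
                        name, ms, ms / 60_000.0, speedup(BASELINE_MS, ms)));
    }
}
```

On these numbers, going from two workers with two cores each to two workers with four cores each improves the run time by only about 1.26x (102,977ms to 81,998ms), compared with roughly 1.6x for the step from one two-core worker to two, which is consistent with the leveling off described above.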
I've got ample resources left on my HP DL380 G7 rack server to run a few more VMs equally sized to what I have running, so maybe if I can work out an easy way to template the VMs I have so far, I'll spin up some additional VMs as worker nodes and see what's the fastest processing time I can get with the hardware I have. More updates to come later.
Published at DZone with permission of Kevin Hooke, DZone MVB. See the original article here.