
How to Get Hadoop Data Into a Python Model

Walk through the process of integrating Hadoop and Python by moving Hadoop data into a Python program with MRJob, a library that lets us write MapReduce jobs in Python.

Hadoop is an open-source software framework for distributed storage and distributed processing of very large datasets. All the modules in Hadoop are designed with an assumption that hardware failures should be automatically handled by the framework.

At the core of Apache Hadoop is a storage component known as the Hadoop Distributed File System (HDFS) and a processing component called MapReduce. Hadoop splits files into large blocks and distributes them across the nodes in a cluster. Because those blocks can then be processed in parallel, often on the nodes that already store them, no single node has to handle an entire large file, which significantly reduces processing time.
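As a rough illustration of the splitting step, the plain-Python sketch below (not Hadoop code) computes the byte ranges a file would be divided into. The 128 MiB block size is an assumption based on the default `dfs.blocksize` in Hadoop 2.x and later; clusters and individual files can use a different value.

```python
# Assumed block size: 128 MiB, the default dfs.blocksize in Hadoop 2.x and
# later. Clusters (and individual files) can be configured differently.
DEFAULT_BLOCK_SIZE = 128 * 1024 * 1024  # bytes


def split_into_blocks(file_size: int, block_size: int = DEFAULT_BLOCK_SIZE):
    """Return (offset, length) pairs that cover a file of file_size bytes."""
    if file_size < 0 or block_size <= 0:
        raise ValueError("need file_size >= 0 and block_size > 0")
    blocks = []
    for offset in range(0, file_size, block_size):
        # The final block may be shorter than block_size.
        blocks.append((offset, min(block_size, file_size - offset)))
    return blocks


blocks = split_into_blocks(1024 ** 3 + 1)  # 1 GiB plus one byte
print(len(blocks))   # 9: eight full blocks and a 1-byte tail
print(blocks[-1])    # (1073741824, 1)
```

Each block is also replicated to several nodes (three copies by default), which is part of how HDFS copes with the hardware failures mentioned above.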

In this article, we’ll walk through the process of integrating Hadoop and Python by moving Hadoop data into a Python program.

HDFS and YARN

Let’s start by defining the terms.

HDFS

The Hadoop Distributed File System (HDFS) is a distributed, scalable, and portable file system written in Java for the Hadoop framework. It is the storage layer that the rest of Hadoop builds on.
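Since the goal is to get Hadoop data into Python, one simple route is to stream a file out of HDFS with `hdfs dfs -cat` and consume it line by line. The sketch below assumes the `hdfs` command-line client is installed, on PATH, and configured for the cluster; the helper names `hdfs_cat_cmd` and `stream_lines` and the example path are hypothetical.

```python
import subprocess
from typing import Iterator, List


def hdfs_cat_cmd(path: str) -> List[str]:
    # Assumes the `hdfs` command-line client is installed, on PATH, and
    # configured to reach the cluster.
    return ["hdfs", "dfs", "-cat", path]


def stream_lines(cmd: List[str]) -> Iterator[str]:
    """Yield a command's stdout line by line instead of reading it all at once."""
    with subprocess.Popen(cmd, stdout=subprocess.PIPE, text=True) as proc:
        for line in proc.stdout:
            yield line.rstrip("\n")
    # Leaving the with-block waits for the process, so returncode is set.
    if proc.returncode != 0:
        raise subprocess.CalledProcessError(proc.returncode, cmd)


# Hypothetical usage; the HDFS path is a placeholder:
# rows = [line.split(",") for line in stream_lines(hdfs_cat_cmd("/path/to/data.csv"))]
```

Streaming keeps memory use flat for large files; for data that fits comfortably in memory, the rows could then be handed to whatever Python model you are building.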

YARN

YARN is a resource-management platform responsible for managing computing resources in clusters and using them to schedule users' applications. The fundamental idea of YARN is to split the functions of resource management and job scheduling/monitoring into separate daemons: a global ResourceManager (RM) and a per-application ApplicationMaster (AM).

The ResourceManager and the NodeManager form the data computation framework. The ResourceManager is the ultimate authority that arbitrates resources among all the applications in the system. The NodeManager is the per-machine framework agent responsible for containers: it monitors their resource usage (CPU, memory, disk, network) and reports it to the ResourceManager/Scheduler.
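A toy model can make this division of labor concrete. Everything below is hypothetical (the class names, the shape of the heartbeat, and the first-fit policy) and is not YARN's API; real YARN schedulers such as the Capacity Scheduler and Fair Scheduler also account for CPU, queues, and priorities.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple


@dataclass
class NodeReport:
    """Hypothetical heartbeat from a NodeManager: its free memory in MB."""
    node: str
    free_mb: int


@dataclass
class ToyResourceManager:
    """Toy first-fit allocator; an illustration, not YARN's real scheduler."""
    free_by_node: Dict[str, int] = field(default_factory=dict)
    grants: List[Tuple[str, str, int]] = field(default_factory=list)

    def heartbeat(self, report: NodeReport) -> None:
        # NodeManagers report what their machine has available.
        self.free_by_node[report.node] = report.free_mb

    def allocate(self, app: str, container_mb: int) -> Optional[str]:
        # The RM arbitrates between applications: grant the container on the
        # first node with enough free memory, or refuse if none has room.
        for node, free in self.free_by_node.items():
            if free >= container_mb:
                self.free_by_node[node] = free - container_mb
                self.grants.append((app, node, container_mb))
                return node
        return None


rm = ToyResourceManager()
rm.heartbeat(NodeReport("node1", 2048))
rm.heartbeat(NodeReport("node2", 4096))
print(rm.allocate("app1", 3072))  # node2
print(rm.allocate("app2", 1024))  # node1
print(rm.allocate("app3", 4096))  # None
```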

The YARN resource manager also offers a web interface.

MRJob and Real-World Applications

Hadoop Streaming lets us write MapReduce tasks in any language, including Python, as programs that read records from standard input and write results to standard output. It is simple in principle, but it is not very user-friendly when things fail and we have to debug our code. In addition, if we wanted to join two different datasets, it would be complicated to handle both with a single mapper.
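For contrast, here is roughly what a word count looks like under Hadoop Streaming. This is a minimal sketch: the `map`/`reduce` command-line switch and the file name `wordcount.py` are hypothetical conventions so that one file can serve both roles. Note how the reducer has to parse raw text lines and find key boundaries on its own.

```python
import re
import sys
from itertools import groupby
from typing import Iterable, Iterator

WORD_RE = re.compile(r"[\w']+")


def mapper(lines: Iterable[str]) -> Iterator[str]:
    # Streaming mappers emit plain text: one "key<TAB>value" line per record.
    for line in lines:
        for word in WORD_RE.findall(line):
            yield f"{word.lower()}\t1"


def reducer(lines: Iterable[str]) -> Iterator[str]:
    # Hadoop sorts mapper output by key, but the reducer still receives raw
    # lines and has to parse them and detect where each key's run ends.
    pairs = (line.rstrip("\n").split("\t", 1) for line in lines)
    for word, group in groupby(pairs, key=lambda kv: kv[0]):
        yield f"{word}\t{sum(int(count) for _, count in group)}"


if __name__ == "__main__" and len(sys.argv) > 1:
    # Hypothetical convention: `python wordcount.py map` or `... reduce`.
    stage = {"map": mapper, "reduce": reducer}[sys.argv[1]]
    for record in stage(sys.stdin):
        print(record)
```

Locally, `cat input.txt | python wordcount.py map | sort | python wordcount.py reduce` mimics the pipeline; on a cluster, the two commands would be passed to the streaming jar through its `-mapper` and `-reducer` options (the jar's path depends on the installation).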

MRJob is a library written and maintained by Yelp that allows us to write MapReduce jobs in Python. It has extensive documentation, and it can run your jobs locally, without a Hadoop cluster, which makes testing much easier.

Here is word count, the classic example program for demonstrating MapReduce logic, rewritten using MRJob:

"""The classic MapReduce job which counts the frequency of words."""
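import re

from mrjob.job import MRJob

# A "word" is a run of letters, digits, underscores, or apostrophes.
WORD_RE = re.compile(r"[\w']+")


class MRWordFreqCount(MRJob):

    def mapper(self, _, line):
        # The input key is unused; emit (word, 1) for every word in the line.
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    def reducer(self, word, counts):
        # counts iterates over every 1 emitted for this word.
        yield (word, sum(counts))


if __name__ == '__main__':
    MRWordFreqCount.run()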
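To see what the framework does with those two methods, the sketch below runs the same mapper and reducer logic in plain Python, with no mrjob or Hadoop involved. The `shuffle` function is a simplified, hypothetical stand-in for Hadoop's shuffle-and-sort phase, which on a real cluster also moves the grouped data between machines.

```python
import re
from collections import defaultdict
from typing import Dict, Iterable, Iterator, List, Tuple

WORD_RE = re.compile(r"[\w']+")


def map_phase(lines: Iterable[str]) -> Iterator[Tuple[str, int]]:
    # Same logic as MRWordFreqCount.mapper: emit (word, 1) per word.
    for line in lines:
        for word in WORD_RE.findall(line):
            yield word.lower(), 1


def shuffle(pairs: Iterable[Tuple[str, int]]) -> Dict[str, List[int]]:
    # Simplified stand-in for Hadoop's shuffle: gather every value emitted
    # for a key into one list and hand the keys over in sorted order.
    groups: Dict[str, List[int]] = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(sorted(groups.items()))


def reduce_phase(groups: Dict[str, List[int]]) -> Iterator[Tuple[str, int]]:
    # Same logic as MRWordFreqCount.reducer: sum the 1s for each word.
    for word, counts in groups.items():
        yield word, sum(counts)


def run_word_count(lines: Iterable[str]) -> Dict[str, int]:
    return dict(reduce_phase(shuffle(map_phase(lines))))


print(run_word_count(["It's a test", "a TEST again"]))
# {'a': 2, 'again': 1, "it's": 1, 'test': 2}
```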

What if we wanted to perform a calculation that involves multiple steps? For example, what if we wanted to count the words in documents stored in our database and then find the most common word being used? This would involve the following steps:

  • Map each line of text to (word, 1) pairs.
  • Combine the pairs using the word as key (optional).
  • Reduce the pairs using the word as key.
  • Find the word with the maximum count.
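Before writing the job, the four steps can be traced in plain Python. In this sketch each inner list stands for one input split handled by a separate mapper (an assumption for illustration), and `collections.Counter` plays the role of both the optional combiner and the counting reducer.

```python
import re
from collections import Counter
from typing import Iterable, List, Tuple

WORD_RE = re.compile(r"[\w']+")


def map_and_combine(split: Iterable[str]) -> Counter:
    # Map each line to (word, 1) and pre-sum the pairs locally, which is
    # what the optional combiner does on the machine that ran the mapper.
    return Counter(word.lower() for line in split for word in WORD_RE.findall(line))


def most_common_word(splits: List[List[str]]) -> Tuple[int, str]:
    # Reduce: merge the partial counts from every split, keyed by word.
    totals: Counter = Counter()
    for split in splits:
        totals.update(map_and_combine(split))
    # Find the maximum: compare (count, word) pairs so the count decides first.
    return max((count, word) for word, count in totals.items())


splits = [["the cat sat"], ["the dog", "a cat and the hat"]]
print(most_common_word(splits))  # (3, 'the')
```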

Here is that logic expressed as a two-step MRJob job, using MRStep to chain the steps:

import re

from mrjob.job import MRJob
from mrjob.step import MRStep

WORD_RE = re.compile(r"[\w']+")


class MRMostUsedWord(MRJob):

    def steps(self):
        # Step 1 counts words; step 2 picks the most frequent one.
        return [
            MRStep(mapper=self.mapper_get_words,
                   combiner=self.combiner_count_words,
                   reducer=self.reducer_count_words),
            MRStep(reducer=self.reducer_find_max_word)
        ]

    def mapper_get_words(self, _, line):
        # yield each word in the line
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    def combiner_count_words(self, word, counts):
        # optimization: pre-sum this mapper's counts before the shuffle
        yield (word, sum(counts))

    def reducer_count_words(self, word, counts):
        # send all (num_occurrences, word) pairs to the same reducer.
        # num_occurrences is so we can easily use Python's max() function.
        yield None, (sum(counts), word)

    # discard the key; it is just None
    def reducer_find_max_word(self, _, word_count_pairs):
        # each item of word_count_pairs is (count, word),
        # so yielding one results in key=count, value=word
        yield max(word_count_pairs)


if __name__ == '__main__':
    MRMostUsedWord.run()
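The final reducer relies on Python comparing tuples element by element: `max()` looks at the count first and only consults the word when two counts tie, so a tie goes to the alphabetically last word. The plain-Python sketch below (the function names are hypothetical) shows that behavior, plus one way to prefer the alphabetically first word instead if that were wanted.

```python
from typing import Iterable, Tuple


def max_word(pairs: Iterable[Tuple[int, str]]) -> Tuple[int, str]:
    # Same comparison the final reducer relies on: count first, then word.
    return max(pairs)


def max_word_alphabetical_ties(pairs: Iterable[Tuple[int, str]]) -> Tuple[int, str]:
    # Highest count wins; among equal counts, the alphabetically first word wins.
    return min(pairs, key=lambda p: (-p[0], p[1]))


pairs = [(3, "apple"), (5, "pear"), (5, "fig")]
print(max_word(pairs))                    # (5, 'pear')
print(max_word_alphabetical_ties(pairs))  # (5, 'fig')
```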

Seeing how powerful MRJob can be, let’s write a class that returns the top 15 most frequent words in our text:

import re
from heapq import nlargest

from mrjob.job import MRJob
from mrjob.step import MRStep

WORD_RE = re.compile(r"[\w']+")


class MRMostUsedWords(MRJob):

    def mapper_get_words(self, _, line):
        # yield each word in the line
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    def combiner_count_words(self, word, counts):
        # pre-sum this mapper's counts before the shuffle
        yield (word, sum(counts))

    def reducer_count_words(self, word, counts):
        # send all (num_occurrences, word) pairs to the same reducer.
        # putting num_occurrences first lets nlargest() rank by count.
        yield None, (sum(counts), word)

    # discard the key; it is just None
    def reducer_find_top_15_word(self, _, word_count_pairs):
        # each item of word_count_pairs is (count, word),
        # so each yielded item results in key=count, value=word
        for val in nlargest(15, word_count_pairs):
            yield val

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_words,
                   combiner=self.combiner_count_words,
                   reducer=self.reducer_count_words),
            MRStep(reducer=self.reducer_find_top_15_word)
        ]


if __name__ == '__main__':
    MRMostUsedWords.run()
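`heapq.nlargest` fits the final step because it only needs to track the current top 15, not build a fully sorted copy of every word. The hand-written `top_k` below is a hypothetical illustration of that bounded-heap idea, not the standard library's own implementation: it keeps a min-heap of at most k pairs and swaps out the smallest whenever a larger pair arrives.

```python
import heapq
from typing import Iterable, List, Tuple


def top_k(pairs: Iterable[Tuple[int, str]], k: int) -> List[Tuple[int, str]]:
    """Return the k largest (count, word) pairs, largest first."""
    if k <= 0:
        return []
    heap: List[Tuple[int, str]] = []  # min-heap holding at most k pairs
    for pair in pairs:
        if len(heap) < k:
            heapq.heappush(heap, pair)
        elif pair > heap[0]:
            # pair beats the smallest of the current top k: swap it in.
            heapq.heapreplace(heap, pair)
    return sorted(heap, reverse=True)


pairs = [(1, "a"), (4, "b"), (2, "c"), (4, "a"), (3, "d")]
print(top_k(pairs, 3))           # [(4, 'b'), (4, 'a'), (3, 'd')]
print(heapq.nlargest(3, pairs))  # same result
```

Because every pair is emitted under the key `None`, all of them still flow through a single reducer in the second step; the heap only bounds how much of that stream the reducer holds in memory at once.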

Hadoop and MRJob are versatile enough to help you answer many questions in your development environment. There are other Python libraries for writing MapReduce jobs, but MRJob's documentation, logging, and ability to run the same Python code locally or on a cluster without converting it make it a strong option. The MRJob documentation is a good place to look for inspiration on adding the two to your data science arsenal.

Published at DZone with permission of
