DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Refcards Trend Reports
Events Video Library
Refcards
Trend Reports

Events

View Events Video Library

The Latest Big Data Topics

article thumbnail
Avro's Built-In Sorting
avro has a little-known gem of a feature which allows you to control which fields in an avro record are used for partitioning , sorting and grouping in mapreduce. the following figure gives a quick refresher as to what these terms mean. oh, and don’t take the placement of the “sorting” literally - sorting actually occurs on both the map and reduce side - but it’s always performed in the context of a specific partition (i.e. for a specific reducer). by default all the fields in an avro map output key are used for partitioning, sorting and grouping in mapreduce. let’s walk through an example and see how this works. you’ll begin with a simple schema github source : {"type": "record", "name": "com.alexholmes.avro.weathernoignore", "doc": "a weather reading.", "fields": [ {"name": "station", "type": "string"}, {"name": "time", "type": "long"}, {"name": "temp", "type": "int"}, {"name": "counter", "type": "int", "default": 0} ] } we’re going to see what happens when we run this code against a small sample data set, which we’ll generate using avro code github source : file input = tmpfolder.newfile("input.txt"); avrofiles.createfile(input, weathernoignore.schema$, arrays.aslist( weathernoignore.newbuilder().setstation("sfo").settime(1).settemp(3).build(), weathernoignore.newbuilder().setstation("iad").settime(1).settemp(1).build(), weathernoignore.newbuilder().setstation("sfo").settime(2).settemp(1).build(), weathernoignore.newbuilder().setstation("sfo").settime(1).settemp(2).build(), weathernoignore.newbuilder().setstation("sfo").settime(1).settemp(1).build() ).toarray()); to understand how avro is partitioning, sorting and grouping the data, we’ll write an identity mapper and reducer, with a small enhancement to the reducer to increment the counter field for each record we see in an individual reducer instance github source : package com.alexholmes.avro.sort.basic; import com.alexholmes.avro.weathernoignore; import org.apache.avro.mapred.avrokey; import org.apache.avro.mapred.avrovalue; import org.apache.avro.mapreduce.avrojob; import org.apache.avro.mapreduce.avrokeyinputformat; import org.apache.avro.mapreduce.avrokeyoutputformat; import org.apache.hadoop.fs.path; import org.apache.hadoop.io.nullwritable; import org.apache.hadoop.mapreduce.job; import org.apache.hadoop.mapreduce.mapper; import org.apache.hadoop.mapreduce.reducer; import org.apache.hadoop.mapreduce.lib.input.fileinputformat; import org.apache.hadoop.mapreduce.lib.output.fileoutputformat; import java.io.ioexception; public class avrosort { private static class sortmapper extends mapper, nullwritable, avrokey, avrovalue> { @override protected void map(avrokey key, nullwritable value, context context) throws ioexception, interruptedexception { context.write(key, new avrovalue(key.datum())); } } private static class sortreducer extends reducer, avrovalue, avrokey, nullwritable> { @override protected void reduce(avrokey key, iterable> values, context context) throws ioexception, interruptedexception { int counter = 1; for (avrovalue weathernoignore : values) { weathernoignore.datum().setcounter(counter++); context.write(new avrokey(weathernoignore.datum()), nullwritable.get()); } } } public boolean runmapreduce(final job job, path inputpath, path outputpath) throws exception { fileinputformat.setinputpaths(job, inputpath); job.setinputformatclass(avrokeyinputformat.class); avrojob.setinputkeyschema(job, weathernoignore.schema$); job.setmapperclass(sortmapper.class); avrojob.setmapoutputkeyschema(job, weathernoignore.schema$); avrojob.setmapoutputvalueschema(job, weathernoignore.schema$); job.setreducerclass(sortreducer.class); avrojob.setoutputkeyschema(job, weathernoignore.schema$); job.setoutputformatclass(avrokeyoutputformat.class); fileoutputformat.setoutputpath(job, outputpath); return job.waitforcompletion(true); } } if you look at the output of the job below, you’ll see that the output is sorted across all the fields, and that the sorting is in field ordinal order. what this means is that when mapreduce is sorting these records, it compares the station field first, then the time field second, and so on according to the ordering of the fields in the avro schema. this is pretty much what you’d expect if you write your own complex writable type, and your comparator compared all the fields in order. {"station": "iad", "time": 1, "temp": 1, "counter": 1} {"station": "sfo", "time": 1, "temp": 1, "counter": 1} {"station": "sfo", "time": 1, "temp": 2, "counter": 1} {"station": "sfo", "time": 1, "temp": 3, "counter": 1} {"station": "sfo", "time": 2, "temp": 1, "counter": 1} oh, and before we move on notice that the value for the counter field is always 1 , meaning that each reducer was only fed a single key/vaue pair, which makes sense since our identity mapper only emitted a single value for each key, the keys are unique, and the mapreduce partitioner, sorter and grouper were using all the fields in the record. excluding fields for sorting avro gives us the ability to indicate that specific fields should be ignored when performing ordering functions. in mapreduce these fields are ignored for sorting/partitioning and grouping in mapreduce, which basically means that we have the ability to perform secondary sorting. let’s examine the following schema github source : {"type": "record", "name": "com.alexholmes.avro.weather", "doc": "a weather reading.", "fields": [ {"name": "station", "type": "string"}, {"name": "time", "type": "long"}, {"name": "temp", "type": "int", "order": "ignore"}, {"name": "counter", "type": "int", "order": "ignore", "default": 0} ] } it’s pretty much identical to the first schema, the only difference being that the last two fields are flagged as being “ignored” for sorting/partitioning/grouping. let’s run the same (other than modified to work with the different schema) mapreduce code github source as above against this new schema and examine the outputs. {"station": "iad", "time": 1, "temp": 1, "counter": 1} {"station": "sfo", "time": 1, "temp": 3, "counter": 1} {"station": "sfo", "time": 1, "temp": 2, "counter": 2} {"station": "sfo", "time": 1, "temp": 1, "counter": 3} {"station": "sfo", "time": 2, "temp": 1, "counter": 1} there are a couple of notable differences between this output, and the output from the previous schema which didn’t have any ignored fields. first, it’s clear that the temp field isn’t being used in the sorting, which makes sense since we specified that it should be ignored in the schema. however, more interestingly, note the value of the counter field. all records that had identical station and time values went to the same reducer invocation, evidenced by the increasing value of counter . this is essentially secondary sort! now, all of this greatness isn’t without some limitations: you can’t support two mapreduce jobs that use the same avro key, but have different sorting/partitioning/grouping requirements. although it’s conceivable that you could create a new instance of the avro schema and set the ignored flags for these fields yourself. the partitioner, sorter and grouping functions in mapreduce all work off of the same fields (i.e. they all ignore fields that set as ignored in the schema). this means that your options for secondary sorting are limited. for example, you wouldn’t be able to partition all stations to the same reducer, and then group by station and time. ordering uses a field’s ordinal position to determine its order within the overall set of fields to be ordered. in other words, in a two-field record, the first field is always compared before the second. there’s no way to change this behavior other than flipping the order of the fields in the record. having said all of that - the “ignoring fields” feature for sorting is pretty awesome, and something that will no doubt come in handy in my future mapreduce work.
May 29, 2013
by Alex Holmes
· 8,090 Views
article thumbnail
Bucketing, Multiplexing and Combining in Hadoop - Part 1
this is the first blog post in a series which looks at some data organization patterns in mapreduce. we’ll look at how to bucket output across multiple files in a single task, how to multiplex data across multiple files, and also how to coalesce data. these are all common patterns that are useful to have in your mapreduce toolkit. we’ll kick things off with a look at bucketing data outputs in your map or reduce tasks. by default when using a fileoutputformat-derived outputformat (such as textoutputformat), all the outputs for a reduce task (or a map task in a map-only job) are written to a single file in hdfs. imagine a situation where you have user activity logs being streamed into hdfs, and you want to write a mapreduce job to better organize the incoming data. as an example a large organization with multiple products may want to bucket the logs based on the product. to do this you’ll need the ability to write to multiple output files in a single task. let’s take a look at how we can make that happen. multipleoutputformat there are a few ways you can achieve your goal, and the first option we’ll look at is the multipleoutputformat class in hadoop. this is an abstract class that lets you do the following: define the output path for each and every key/value output record being emitted by a task. incorporate the input paths into the output directory for map-only jobs. redefine the key and value that are used to write to the underlying recordwriter . this is useful in situations where you want to remove data from the outputs as it duplicates data in the filename. for each output path, define the recordwriter that should be used to write the outputs. ok enough with the words - let’s look at some data and code. first up is the simple data we’ll use in our example - imagine you work at a fruit market with locations in multiple cities, and you have a purchase transaction stream which contains the store location along with the fruit that was purchased. cupertino apple sunnyvale banana cupertino pear to help bucket your data for future analysis, you want to bin each record into city-specific files. for the simple data set above you don’t want to filter, project or transform your data, just bucket it out, so a simple identity map-only job will do the job. to force more than one mapper, we’ll write the data to two separate files. $ tab="$(printf '\t')" $ hdfs -put - file1.txt << eof cupertino${tab}apple sunnyvale${tab}banana eof $ hdfs -put - file2.txt << eof cupertino${tab}pear eof here’s the code which will let you write city-specific output files. import org.apache.commons.lang.stringutils; import org.apache.hadoop.conf.configuration; import org.apache.hadoop.conf.configured; import org.apache.hadoop.fs.filesystem; import org.apache.hadoop.fs.path; import org.apache.hadoop.io.text; import org.apache.hadoop.mapred.*; import org.apache.hadoop.mapred.lib.identitymapper; import org.apache.hadoop.mapred.lib.multipletextoutputformat; import org.apache.hadoop.util.progressable; import org.apache.hadoop.util.tool; import org.apache.hadoop.util.toolrunner; import java.io.ioexception; import java.util.arrays; /** * an example of how to use {@link org.apache.hadoop.mapred.lib.multipleoutputformat}. */ public class mofexample extends configured implements tool { /** * create output files based on the output record's key name. */ static class keybasedmultipletextoutputformat extends multipletextoutputformat { @override protected string generatefilenameforkeyvalue(text key, text value, string name) { return key.tostring() + "/" + name; } } /** * the main job driver. */ public int run(final string[] args) throws exception { string csvinputs = stringutils.join(arrays.copyofrange(args, 0, args.length - 1), ","); path outputdir = new path(args[args.length - 1]); jobconf jobconf = new jobconf(super.getconf()); jobconf.setjarbyclass(mofexample.class); jobconf.setnumreducetasks(0); jobconf.setmapperclass(identitymapper.class); jobconf.setinputformat(keyvaluetextinputformat.class); jobconf.setoutputformat(keybasedmultipletextoutputformat.class); fileinputformat.setinputpaths(jobconf, csvinputs); fileoutputformat.setoutputpath(jobconf, outputdir); return jobclient.runjob(jobconf).issuccessful() ? 0 : 1; } /** * main entry point for the utility. * * @param args arguments * @throws exception when something goes wrong */ public static void main(final string[] args) throws exception { int res = toolrunner.run(new configuration(), new mofexample(), args); system.exit(res); } } run this code and you’ll see the following files in hdfs, where /output is the job output directory: $ hadoop fs -lsr /output /output/cupertino/part-00000 /output/cupertino/part-00001 /output/sunnyvale/part-00000 if you look at the output files you’ll see that the files contain the correct buckets. $ hadoop fs -lsr /output/cupertino/* cupertino apple cupertino pear $ hadoop fs -lsr /output/sunnyvale/* sunnyvale banana awesome, you have your data bucketed by store. now that we have everything working, let’s look at what we did to get there. we had to do two things to get this working: extend multipletextoutputformat this is where the magic happened - let’s look at that class again. static class keybasedmultipletextoutputformat extends multipletextoutputformat { @override protected string generatefilenameforkeyvalue(text key, text value, string name) { return key.tostring() + "/" + name; } } you are working with text, which is why you extended multipletextoutputformat , a class that in turn extends multipleoutputformat . multipletextoutputformat is a simple class which instructs the multipleoutputformat to use textoutputformat as the underlying output format for writing out the records. if you were to use multipleoutputformat as-is it behaves as if you were using the regular textoutputformat , which is to say that it’ll only write to a single output file. to write data to multiple files you had to extend it, as with the example above. the generatefilenameforkeyvalue method allows you to return the output path for an input record. the third argument, name , is the original fileoutputformat -created filename, which is in the form “part-nnnnn”, where “nnnnn” is the task index, to ensure uniqueness. to avoid file collisions, it’s a good idea to make sure your generated output paths are unique, and leveraging the original output file is certainly a good way of doing this. in our example we’re using the key as the directory name, and then writing to the original fileoutputformat filename within that directory. specify the outputformat the next step was easy - specify that this output format should be used for your job: jobconf.setoutputformat(keybasedmultipletextoutputformat.class); earlier we also mentioned that you can use the input path as part of the output path, which we will look at next. using the input filename as part of the output filename in map-only jobs what if we wanted to keep the input filename as part of the output filename? this only works for map-only jobs, and can be accomplished by overriding the getinputfilebasedoutputfilename method. let’s look at the following code to understand how this method fits into the overall sequence of actions that the multipleoutputformat class performs: public void write(k key, v value) throws ioexception { // get the file name based on the key string keybasedpath = generatefilenameforkeyvalue(key, value, myname); // get the file name based on the input file name string finalpath = getinputfilebasedoutputfilename(myjob, keybasedpath); // get the actual key k actualkey = generateactualkey(key, value); v actualvalue = generateactualvalue(key, value); recordwriter rw = this.recordwriters.get(finalpath); if (rw == null) { // if we don't have the record writer yet for the final path, create // one // and add it to the cache rw = getbaserecordwriter(myfs, myjob, finalpath, myprogressable); this.recordwriters.put(finalpath, rw); } rw.write(actualkey, actualvalue); }; the getinputfilebasedoutputfilename method is called with the output of generatefilenameforkeyvalue , which contains our already-customized output file. our new keybasedmultipletextoutputformat can now be updated to override getinputfilebasedoutputfilename and append the original input filename to the output filename: static class keybasedmultipletextoutputformat extends multipletextoutputformat { @override protected string generatefilenameforkeyvalue(object key, object value, string name) { return key.tostring() + "/" + name; } @override protected string getinputfilebasedoutputfilename(jobconf job, string name) { string infilename = new path(job.get("map.input.file")).getname(); return name + "-" + infilename; } if you run with your modified outputformat class you’ll see the following files in hdfs, confirming that the input filenames are now concatenated to the end of each output file. $ hadoop fs -lsr /output /output/cupertino/part-00000-file1.txt /output/cupertino/part-00001-file2.txt /output/sunnyvale/part-00000-file1.txt the implementation of getinputfilebasedoutputfilename in multipleoutputformat doesn’t do anything interesting by default, but if you set the value of the mapred.outputformat.numoftrailinglegs configurable to an integer greater than 0, then the getinputfilebasedoutputfilename will use part of the input path as the output path. let’s see what happens when we set the value to 1: jobconf.setint("mapred.outputformat.numoftrailinglegs", 1); the output files in hdfs now exactly mirror the input files used for the job: $ hadoop fs -lsr /output /output/file1.txt /output/file2.txt if we set mapred.outputformat.numoftrailinglegs to 2, and our input files exist in the /inputs directory, then our output directory looks like this: $ hadoop fs -lsr /output /output/input/file1.txt /output/input/file2.txt basically as you keep incrementing mapred.outputformat.numoftrailinglegs , then multipleoutputformat will continue to go up the parent directories of the input file and use them in the output path. modifying the output key and value it’s very possible that the actual key and value you want to emit are different from those that were used to determine the output file. in our example, we took the output key and wrote to a directory using the key name. if you do that keeping the key in the output file may be redundant. how would we modify the output record so that the key isn’t written? multipleoutputformat has your back with the generateactualkey method. class keybasedmultipletextoutputformat extends multipletextoutputformat { @override protected string generatefilenameforkeyvalue(text key, text value, string name) { return key.tostring() + "/" + name; } @override protected text generateactualkey(text key, text value) { return null; } } the returned value from this method replaces the key that’s supplied to the underlying recordwriter , so if you return null as in the above example, no key will be written to the file. $ hadoop fs -lsr /output/cupertino/* apple pear $ hadoop fs -lsr /output/sunnyvale/* banana you can achieve the same result for the output value by overriding the generateactualvalue method. changing the recordwriter in our final step we’ll look at how you can leverage multiple recordwriter classes for different output files. this is accomplished by overriding the getrecordwriter method. in the example below we’re leveraging the same textoutputformat for all the files, but it gives you a sense of what can be accomplished. static class keybasedmultipletextoutputformat extends multipletextoutputformat { @override protected string generatefilenameforkeyvalue(text key, text value, string name) { return key.tostring() + "/" + name; } @override public recordwriter getrecordwriter(filesystem fs, jobconf job, string name, progressable prog) throws ioexception { if (name.startswith("apple")) { return new textoutputformat().getrecordwriter(fs, job, name, prog); } else if (name.startswith("banana")) { return new textoutputformat().getrecordwriter(fs, job, name, prog); } return super.getrecordwriter(fs, job, name, prog); } } conclusion when using multipleoutputformat , give some thought to the number of distinct files that each reducer will create. it would be prudent to plan your bucketing so that you have a relatively small number of files. in this post we extended multipletextoutputformat , which is a simple extension of multipleoutputformat that supports text outputs. multiplesequencefileoutputformat also exists to support sequencefiles in a similar fashion. so what are the shortcomings with the multipleoutputformat class? if you have a job that uses both map and reduce phases, then multipleoutputformat can’t be used in the map-side to write outputs. of course, multipleoutputformat works fine in map-only jobs. all recordwriter classes must support exactly the same output record types. for example, you wouldn’t be able to support a recordwriter that emitted for one output file, and have another recordwriter that emitted . multipleoutputformat exists in the mapred package, so it won’t work with a job that requires use of the mapreduce package. all is not lost if you bump into either one of these issues, as you’ll discover in the next blog post.
May 20, 2013
by Alex Holmes
· 6,266 Views
article thumbnail
Hebrew Search with ElasticSearch
Hebrew search is not an easy task, and HebMorph is a project I started several years ago to address that problem. After a certain period of inactivity I'm back actively working on it. I'm also happy to say there are already several live systems using it to enable Hebrew searches in their applications. This post is a short step-by-step guide on how to use HebMorph in an ElasticSearch installation. There are quite a few configuration options and things to consider when enabling Hebrew search, most are in the realm of performance vs relevance trade-offs, but I'll talk about those in a separate post. 0. What exactly is HebMorph HebMorph is a project a bit wider than just providing a Hebrew search plugin for ElasticSearch, but for the purpose of this post let us treat it in that narrow aspect. HebMorph has 3 main parts - the hspell dictionary files, the hebmorph-core package which is a wrapper around the dictionary files with important bits that allow for locating words even if they weren't written exactly as they appear in the dictionary, and the hebmorph-lucene package which contains various tools for processing streams of text into Lucene tokens - the searchable parts. To enable Hebrew search from ElasticSearch we are going to need to use the Hebrew analyzer class HebMorph provides to analyze incoming Hebrew texts. That is done by providing ElasticSearch with the HebMorph packages and then telling it to use the Hebrew analyzer on text fields as needed. 1. Get HebMorph and hspell At the moment you will have to compile HebMorph from sources yourself using Maven. In the future we might upload it to a centralized repository, but since we still actively working on a lot of stuff there it is still a bit too early for that. Probably the easiest way to get HebMorph is to do git clone from the main repository. The repository is located at https://github.com/synhershko/HebMorph and includes the latest hspell files already under /hspell-data-files. If you are new to git GitHub offers great tutorials for getting started with it, and they also enable you to download the entire source tree as a zip or a tarball. Once you have the sources, run mvn package or mvn install to create 2 jars - hebmorph-core and hebmorph-lucene. Those 2 packages are required before moving on to the next step. 2. Create an ElasticSearch plugin In this step we will create a new plugin which we will use in the next step to create the Hebrew analyzers in. If you already have a plugin you wish to use, skip to the next step. ElasticSearch plugins are compiled Java packages you simply drop to the plugins folder of your ElasticSearch installation and it gets detected automatically by the ElasticSearch instance once it is initialized. If you are new to this, you might want to read up a bit on that in the official ElasticSearch documentation. Here is a great guide to start with: http://jfarrell.github.io/ The gist of this is having a Java project with a es-plugin.properties file embedded as a resource and pointing to class that tells ElasticSearch what classes to load as plugins, and their plugin type. In the next section we will use this to add our own Analyzer implementation which makes use of HebMorph's capabilities. 3. Creating an Hebrew Analyzer HebMorph already comes with MorphAnalyzer - an Analyzer implementation which takes care of Hebrew-aware tokenization, lemmatization and whatnot. Because it is highly configurable, personally I prefer re-implementing it in the ElasticSearch plugin so it is easier to change the configurations in code. In case you wondered, I'm not planning in supporting external configurations for this as it is too subtle and you should really know what you are doing there. Don't forget to add dependencies to hebmorph-core and hebmorph-lucene to your project. My common Analyzer setup for Hebrew search looks like this: public abstract class HebrewAnalyzer extends ReusableAnalyzerBase { protected enum AnalyzerType { INDEXING, QUERY, EXACT } private static final DictRadix prefixesTree = LingInfo.buildPrefixTree(false); private static DictRadix dictRadix; private final StreamLemmatizer lemmatizer; private final LemmaFilterBase lemmaFilter; protected final Version matchVersion; protected final AnalyzerType analyzerType; protected final char originalTermSuffix = '$'; static { try { dictRadix = Loader.loadDictionaryFromHSpellData(new File(resourcesPath + "hspell-data-files"), true); } catch (IOException e) { // TODO log } } protected HebrewAnalyzer(final AnalyzerType analyzerType) throws IOException { this.matchVersion = matchVersion; this.analyzerType = analyzerType; lemmatizer = new StreamLemmatizer(null, dictRadix, prefixesTree, null); lemmaFilter = new BasicLemmaFilter(); } @Override protected TokenStreamComponents createComponents(final String fieldName, final Reader reader) { // on query - if marked as keyword don't keep origin, else only lemmatized (don't suffix) // if word termintates with $ will output word$, else will output all lemmas or word$ if OOV if (analyzerType == AnalyzerType.QUERY) { final StreamLemmasFilter src = new StreamLemmasFilter(reader, lemmatizer, null, lemmaFilter); src.setAlwaysSaveMarkedOriginal(true); src.setSuffixForExactMatch(originalTermSuffix); TokenStream tok = new SuffixKeywordFilter(src, '$'); return new TokenStreamComponents(src, tok); } if (analyzerType == AnalyzerType.EXACT) { // on exact - we don't care about suffixes at all, we always output original word with suffix only final HebrewTokenizer src = new HebrewTokenizer(reader, prefixesTree, null); TokenStream tok = new NiqqudFilter(src); tok = new LowerCaseFilter(matchVersion, tok); tok = new AlwaysAddSuffixFilter(tok, '$', false); return new TokenStreamComponents(src, tok); } // on indexing we should always keep both the stem and marked original word // will ignore $ && will always output all lemmas + origin word$ // basically, if analyzerType == AnalyzerType.INDEXING) final StreamLemmasFilter src = new StreamLemmasFilter(reader, lemmatizer, null, lemmaFilter); src.setAlwaysSaveMarkedOriginal(true); TokenStream tok = new SuffixKeywordFilter(src, '$'); return new TokenStreamComponents(src, tok); } public static class HebrewIndexingAnalyzer extends HebrewAnalyzer { public HebrewIndexingAnalyzer() throws IOException { super(AnalyzerType.INDEXING); } } public static class HebrewQueryAnalyzer extends HebrewAnalyzer { public HebrewQueryAnalyzer() throws IOException { super(AnalyzerType.QUERY); } } public static class HebrewExactAnalyzer extends HebrewAnalyzer { public HebrewExactAnalyzer() throws IOException { super(AnalyzerType.EXACT); } } } You may notice how I created 3 separate analyzers - one for indexing, one for querying and the last for exact querying. I'll be talking more about this in future posts, but the idea is to be able to provide flexibility on querying while still allow for correct indexing. Configuring the analyzers to be picked up from ElasticSearch is rather easy now. First, you need to wrap each analyzer in a "provider", like so: public class HebrewQueryAnalyzerProvider extends AbstractIndexAnalyzerProvider { private final HebrewAnalyzer.HebrewQueryAnalyzer hebrewAnalyzer; @Inject public HebrewQueryAnalyzerProvider(Index index, @IndexSettings Settings indexSettings, Environment env, @Assisted String name, @Assisted Settings settings) throws IOException { super(index, indexSettings, name, settings); hebrewAnalyzer = new HebrewAnalyzer.HebrewQueryAnalyzer(); } @Override public HebrewAnalyzer.HebrewQueryAnalyzer get() { return hebrewAnalyzer; } } After you've created such providers for all types of analyzers, create an AnalysisBinderProcessor like this (or update your existing one with definitions for the Hebrew analyzers): public class MyAnalysisBinderProcessor extends AnalysisModule.AnalysisBinderProcessor { private final static HashMap> languageAnalyzers = new HashMap<>(); static { languageAnalyzers.put("hebrew", HebrewIndexingAnalyzerProvider.class); languageAnalyzers.put("hebrew_query", HebrewQueryAnalyzerProvider.class); languageAnalyzers.put("hebrew_exact", HebrewExactAnalyzerProvider.class); } public static boolean analyzerExists(final String analyzerName) { return languageAnalyzers.containsKey(analyzerName); } @Override public void processAnalyzers(final AnalyzersBindings analyzersBindings) { for (Map.Entry> entry : languageAnalyzers.entrySet()) { analyzersBindings.processAnalyzer(entry.getKey(), entry.getValue()); } } } Don't forget to update your Plugin class to catch the AnalysisBinderProcessor - it should look something like this (plus any other stuff you want to add there): public class MyPlugin extends AbstractPlugin { @Override public String name() { return "my-plugin"; } @Override public String description() { return "Implements custom actions required by me"; } @Override public void processModule(Module module) { if (module instanceof AnalysisModule) { ((AnalysisModule)module).addProcessor(new MyAnalysisBinderProcessor()); } } } 4. Using the Hebrew analyzers Compile the ElasticSearch plugin and drop it along with its dependencies in a folder under the /plugins folder of ElasticSearch. You now have 3 new types of analyzers at your disposal: "hebrew", "hebrew_query" and "hebrew_exact". For indexing, you want to use the "hebrew" analyzer. In your mapping, you can define a certain field or an entire set of fields to use that specific analyzer by setting the analyzer for that field. You can also leave the analyzer configuration blank, and specify the analyzer to use for those fields with unspecified analyzer using the _analyzer field in the index request. See more about both here and here. The "hebrew" analyzer will expand each term to all recognized lemmas; in case the word wasn't recognized it will try to tolerate spelling errors or missing Yud/Vav - most of the time it will be successful (with some rate of false positives, which the lemma-filters should remove to some degree). Some words will still remain unrecognized and thus will be indexed as-is. When querying using a QueryString query you can specify what analyzer to use - use the "hebrew_query" or "hebrew_exact" analyzer. The former will perform lemma expansion similar to the indexing analyzer, and the latter will avoid that and allow you to perform exact matches (useful when searching for names or exact phrases). I pretty much ignored a lot of the complexity involved in fine tuning searches for Hebrew, and many very cool things HebMorph allows you to do with Hebrew search for the sake of focus. I will revisit them in a later blog post. 5. Administration The hspell dictionary files are looked up by a physical location on disk - you will need to provide a path they are saved at. Since dictionaries update, it is sometimes easier to update them that way in a distributed environment like the one I'm working with. It may be desirable to have them compiled within the same jar file as the code itself - I'll be happy to accept a pull request to do that. The code above is working with ElasticSearch 0.90 GA and Lucene 4.2.1. I also had it running on earlier versions of both technologies, but may had to make a few minor changes. I assume the samples would break on future versions and I'll probably don't have much time going back and keeping it up to date, but bear in mind most of the time the changes are minor and easy to understand and make by yourself. Both HebMorph and the hspell dictionary are released under the AGPL3. For any questions on licensing, feel free to contact me.
May 6, 2013
by Itamar Syn-hershko
· 7,030 Views
article thumbnail
Debugging “Wrong FS expected: file:///” exception from HDFS
I just spent some time putting together some basic Java code to read some data from HDFS. Pretty basic stuff. No map reduce involved. Pretty boilerplate code like the stuff from this popular tutorial on the topic. No matter what, I kept hitting my head on this error: Exception in thread “main” java.lang.IllegalArgumentException: Wrong FS: hdfs://localhost:9000/user/hadoop/DOUG_SVD/out.txt, expected: file:/// If you checkout the tutorial above, what’s supposed to be happening is that an instance of Hadoop’s Configuration should encounter a fs.default.name property, in one of the config files its given. The Configuration should realize that this property has a value of hdfs://localhost:9000. When you use the Configuration to create a Hadoop FileSystem instance, it should happily read this property from Configuration and process paths from HDFS. That’s a long way of saying these three lines of Java code: // pickup config files off classpath Configuration conf = new Configuration() // explicitely add other config files conf.addResource("/home/hadoop/conf/core-site.xml"); // create a FileSystem object needed to load file resources FileSystem fs = FileSystem.get(conf); // load files and stuff below! Well… My Hadoop config files (core-site.xml) appear setup correctly. It appears to be in my CLASSPATH. I’m even trying to explicitly add the resource. Basically I’ve followed all the troubleshooting tips you’re supposed to follow when you encounter this exception. But I’m STILL getting this exception. Head meet wall. This has to be something stupid. Troubleshooting Hadoop’s Configuration & FileSystem Objects Well before I reveal my dumb mistake in the above code, it turns out there’s some helpful functions to help debug these kind of problems: As Configuration is just a bunch of key/value pairs from a set of resources, its useful to know what resources it thinks it loaded and what properties it thinks it loaded from those files. getRaw() — return the raw value for a configuration item (like conf.getRaw("fs.default.name")) toString() — Configuration‘s toString shows the resources loaded You can similarly checkout FileSystem‘s helpful toString method. It nicely lays out where it thinks its pointing (native vs HDFS vs S3 etc). So if you similarly are looking for a stupid mistake like I was, pepper your code with printouts of these bits of info. They will at least point you in a new direction to search for your dumb mistake. Drumroll Please Turns out I missed the crucial step of passing a Path object not a String to addResource. They appear to do slightly different things. Adding a String adds a resource relative to the classpath. Adding a Path is used to add a resource at an absolute location and does not consider the classpath. So to explicitly load the correct config file, the code above gets turned into (drumroll please): // pickup config files off classpath Configuration conf = new Configuration() // explicitely add other config files // PASS A PATH NOT A STRING! conf.addResource(new Path("/home/hadoop/conf/core-site.xml")); FileSystem fs = FileSystem.get(conf); // load files and stuff below! Then Tada! everything magically works! Hopefully these tips can save you the next time you encounter these kinds of problems.
March 27, 2013
by Doug Turnbull
· 17,944 Views
article thumbnail
In-Memory Data Grids
Introduction The IT buzzword of 2012 is without a doubt Big Data. It’s new and here to stay, and for a good reason. Big data is data that exceeds the processing capacity of conventional database systems. Great examples are CERN with the Large Hadron Collider, whose experiments generate 25 petabytes of data annually, or Walmart, which handles more than one million customer transaction every hour. Problems These vast amounts of data leave us with two problems. Problem 1: To gain value from this data, one must choose an alternative way to process it. The value of big data to an organization falls into two categories: analytical use, and enabling new products. Big data analytics can reveal insights hidden previously by data too costly to process, such as peer influence among customers, revealed by analyzing shoppers’ transactions, social and geographical data. Being able to process every item of data in reasonable time removes the troublesome need for sampling and promotes an investigative approach to data, in contrast to the somewhat static nature of running predetermined reports. Problem 2: The data is too big, moves too fast, or doesn’t fit the strictures of your database architectures. Remember the CERN case where the LHC produces over 25 Petabytes of data annually? No “classic” database architecture or setup is capable of holding these amounts of data. Solutions Fortunately, both problems can be solved by implementing the correct infrastructure and rethinking data storage. There are two critical factors in Big Data environments: size and speed. We already discussed the vast amounts of data and desire to be able to access and process the data fast. The latter is the main differentiator from more traditional data warehouses. Just imagine what you can do when you can access all your data real-time. Enter big data. A common Big Data implementation is an in-memory data grid that lives in a distributed cluster, ensuring both speed, by storing data in-memory, and capacity by using scalability features provided by a cluster. As a bonus, availability is ensured by using a distributed cluster. As for the data storage, there are typically two kinds: in-memory databases and in-memory data grids. But first some background. It is not a new attempt to use main memory as a storage area instead of a disk. In our daily lives there are numerous examples of main memory databases (MMDB), as they perform much faster than disk-based databases. An every day example is a mobile phone. When you SMS or call someone most mobile service providers use MMDB to get the information on your contact as soon as possible. The same applies to your phone. When someone calls you, the caller details are looked up in the contacts application, usually providing a name and sometimes a picture. In memory data grids In Memory Data Grid (IMDG) is the same as MMDB in that it stores data in main memory, but it has a totally different architecture. The features of IMDG can be summarized as follows: Data is distributed and stored on multiple servers. Each server operates in the active mode. A data model is usually object-oriented (serialized) and non-relational. According to the necessity, you often need to add or reduce servers. No traditional database features such as tables. In other words, IMDG is designed to store data in main memory, ensure scalability and store an object itself. These days, there are many IMDG products, both commercial and open source. Some of the most commonly used products are: Hazelcast (http://www.hazelcast.com) JBoss Infinispan (http://www.jboss.org/infinispan) GridGain DataGrid (http://www.gridgain.com/features/in-memory-data-grid/) VMware Gemfire (http://www.vmware.com/nl/products/application-platform/vfabric-gemfire/overview.html) Oracle Coherence (http://www.oracle.com/technetwork/middleware/coherence/overview/index.html) Gigaspaces XAP (http://www.gigaspaces.com/datagrid) Terracotta Enterprise Suite (http://terracotta.org/products/enterprise-suite) Why Memory? The main reasons for using main memory for data storage are once again the two main themes of Big Data: speed and capacity. The processing performance of main memory is 800 times faster than an HDD and up to 40 times faster than an. Moreover, the latest x86 server supports main memory of hundreds of GB per server. It is said that the limit of a traditional processing database’s (OLTP) data capacity is approximately 1 TB and that the OLTP processing data capacity would not increment well. If servers using main memory of 1 TB or larger become more commonly used, you will be able to conduct operations with the entire data placed in main memory, at least in the field of OLTP. IMDG Architecture To use main memory as a storage area, two weak points should be overcome: Limited capacity: involves data that exceeds the maximum capacity of the main memory of the server Reliability: involves data loss in case of a (system) failure. IMDG overcomes the limit of capacity by ensuring horizontal scalability using a distributed architecture, and resolves the issue of reliability through a replication system as part of the grid (or a distributed cluster). Now let’s discuss how an IMDG actually works. First of all, it is important to understand that an IMDG is not the same as an in-memory database, also referred to as MMDB (main memory databases). Typical examples of MMDBs are Oracle TimesTen or Sap Hana. MMDBs are full database products that simply reside in memory. As a result of being a full-blown database, they also carry the weight and overhead of database management features. IMDG is different. No tables, indexes, triggers, stored procedures, process managers etc. Just plain storage. The data model used in IMDG is key-value pairs. A key-value pair is a list with only two parts: a key and a value. The key can be used for storing and retrieving the values in the list. A key can be compared to the index or primary key of a table in a database. Note that IMDG are closely tied to development environments such as Java as the key-value pairs are represented by the structures provided by such a programming environment. Most IMDGs are written in Java, and can only be used within other Java applications. Therefore, the values of key-value pairs can be anything supported by Java, ranging from simple data types such as a string or number, to complex objects. This overcomes the two important hurdles: as you can store complex Java objects as value, there’s no need to translate these objects into a relational datamodel (which is the case in more traditional applications using a database for storage). Furthermore, the seeming limitation of being able to store only one value per key, is actually no limitation at all. Large memory sizes Most of the products introduced above use Java as an implementation language. Java reserves and uses a part of the RAM (internal memory) for dynamic memory allocation. This reserved memory space is called the Java heap. All runtime objects created by a Java application are stored in heap. Using large amounts of data causes two problems. Size limitation: By default, the heap size is 128 MB, but for current business applications, this limit is reached easily. Once the heap is “full”, no new objects can be created and the Java application will show some nasty errors. Performance: It is possible to increase the size of the heap, but this introduces some new problems. When a heap reaches a size of more than 4 gigabytes, Java will have serious issues with memory managements, causing your application to slow down or even freeze. Java has a feature called Garbage Collector, which periodically scans the heap and checks each object if it is still valid and being used. If not, the garbage collector removes the object and defragments the newly available space. The problem is, the larger the heap size, the more work to do for the garbage collector, resulting in performance degradation. Imagine a large bank has a Java application that manages customers, accounts and transactions. We have seen that an IMDG allows the application to store and access all data very quickly by caching it in memory, instead of storing the data in relatively slow databases. Let’s assume the combined data has a size of 40 gigabytes. Storing it in heap is simply not possible, considering the performance penalties of Java’s memory management capabilities. The graph below illustrates the garbage collection pause time when placing cached data in heap: Terracotta’s BigMemory product has a method to overcome these limitations. The method is to use an off-heap memory (direct buffer). Data will not be stored in Java’s heap, but directly in the available internal memory (RAM). Since, this is not subject to Java’s garbage collector, there are no performance penalties. The differences on performance are significant, as can be seen in the graph below: Using off-heap storage has some major benefits: You can use all the available memory on your machine, not just the memory that is allocated to the heap (usually less that 512 Mb). This allows you to store more data in a in-memory data grid, greatly speeding up your application. The heap can be relieved by storing data in native memory, speeding up Java applications as less heap space has to be garbage collected. Clustering, fail over and high availability So far, we have seen IMDG features that are applicable to a single server. However, the real power of IMDG lies in it’s networking and clustering capabilities, providing features as data replication, data synchronization between clients, fail over and high availability. To achieve this, a cluster of servers (or server array) acts a backbone of the infrastructure. Applications (that still can have their own IMDG or off-heap cache) that are connected to the cluster can share, replicate and backup their data with either the cluster or other applications. The graph below depicts a typical setup using Terracotta's BigMemory: The caches on the application servers are usually referred to as “level 1” cache, while the data cache on the server array is referred to as “level 2” cache. There are many different scenarios possible for storing, clustering, synchronizing and replicating data. Covering all these topics goes far beyond the scope of this article. For more information, consult the technical documentation of the product of your choice. Conclusion Big Data brings us some new challenges. First of all, storing and accessing vast amounts of data makes us rethink traditional methods and technologies. Next, there’s the question what to do with all the available data. The potential value for marketing, financial and other businesses is huge. In order to facilitate Big Data, in-memory data grids are considered the best option. IMDGs with off-heap storage are even more powerful, allowing data centric enterprise application to overcome certain limits of the Java platform, such as memory and performance constraints. As the amount of data that (large) companies produce and store, grows exponentially, databases will hit a limit. Accessing your data without a performance penalty simply will not be possible. The answer to this is using an IMDG.
March 13, 2013
by Roy Prins
· 32,622 Views · 5 Likes
article thumbnail
Database Concepts for a java Dev: Database Normalization
In this part, I will be briefing about different types of Database Normalizations using a sample data model. What is Database Normalization? Normalization is the process of efficiently organizing data in the database. Primary Goal of Normalization? Eliminating redundant data & ensuring meaningful data dependencies. Types of Normalization The following are the three most common normal forms in the database normalization process First Normal Form (1NF) Second Normal Form (2NF) Third Normal Form (3NF) Sample Data Model for Demonstration The following data model will be used to demonstrate all the three normal forms First Normal Form (1NF) First Normal Form (1NF) sets the very basic rules for an organized database: Create separate set of tables for each group of related data and identify each row with a unique columns [primary key] or set of columns [composite key] Eliminate duplicate columns from the table The following data model depicts the tables after 1NF rules are applied - Second Normal Form (2NF) Second Normal Form (2NF) further addresses the concept of removing duplicate data: Meet all the requirements of the first normal form Remove subsets of data that apply to multiple rows of a table and place them in separate tables Create relationships between these new tables and their predecessors through the use of foreign keys So basically the objective of the Second Normal Form is to take that is only partly dependent on the primary key and enter that data into another table. The following data model depicts the tables after 2NF rules are applied. Data from EMPLOYEE_TABLE is split into 2 tables – EMPLOYEE_TABLE and EMPLOYEE_HR_TABLE. Similarly data from CUSTOMER_TABLE is moved to CUSTOMER_TABLE and CUSTOMER_ORDER table Third Normal Form (3NF) Third normal form (3NF) goes one large step further: Meet all the requirements of the second normal form. Remove columns that are not dependent upon the primary key. The following data model depicts the tables after 3NF rules are applied. Further state and country details are moved to their own tables because they are not dependent on the primary key. Advantages of Normalizing the Database There are several advantages of normalization - Data can be stored as small atomic pieces Saves space Increases speed Reduces data anomalies Easy maintenance Other parts of this series include: Part 1 – ACID Properties Part 2 – Keys Part 4 – Database Transactions [coming soon] Part 5 – Indexes [coming soon]
March 13, 2013
by Jagadeesh Motamarri
· 10,905 Views · 1 Like
article thumbnail
Using the Libjars Option with Hadoop
When working with MapReduce one of the challenges that is encountered early-on is determining how to make your third-part JAR’s available to the map and reduce tasks. One common approach is to create a fat jar, which is a JAR that contains your classes as well as your third-party classes (see this Cloudera blog post for more details). A more elegant solution is to take advantage of the libjars option in the hadoop jar command, also mentioned in the Cloudera post at a high level. Here I’ll go into detail on the three steps required to make this work. Add libjars to the options It can be confusing to know exactly where to put libjars when running the hadoop jar command. The following example shows the correct position of this option: $ export LIBJARS=/path/jar1,/path/jar2 $ hadoop jar my-example.jar com.example.MyTool -libjars ${LIBJARS} -mytoolopt value It’s worth noting in the above example that the JAR’s supplied as the value of the libjar option are comma-separated, and not separated by your O.S. path delimiter (which is how a Java classpath is delimited). You may think that you’re done, but often times this step alone may not be enough - read on for more details! Make sure your code is using GenericOptionsParser The Java class that’s being supplied to the hadoop jar command should use the GenericOptionsParser class to parse the options being supplied on the CLI. The easiest way to do that is demonstrated with the following code, which leverages the ToolRunner class to parse-out the options: public static void main(final String[] args) throws Exception { Configuration conf = new Configuration(); int res = ToolRunner.run(conf, new com.example.MyTool(), args); System.exit(res); } t is crucial that the configuration object being passed into the ToolRunner.run method is the same one that you’re using when setting-up your job. To guarantee this, your class should use the getConf() method defined in Configurable (and implemented in Configured) to access the configuration: public class SmallFilesMapReduce extends Configured implements Tool { public final int run(final String[] args) throws Exception { Job job = new Job(super.getConf()); ... job.waitForCompletion(true); return ...; } f you don’t leverage the Configuration object supplied to the ToolRunner.run method in your MapReduce driver code, then your job won’t be correctly configured and your third-party JAR’s won’t be copied to the Distributed Cache or loaded in the remote task JVM’s. It’s the ToolRunner.run method (actually it delegates the command parsing to GenericOptionsParser) which actually parses-out the libjars argument, and adds to the Configuration object a value for the tmpjarproperty. So a quick way to make sure that this step is working is to look at the job file for your MapReduce job (there’s a link when viewing the job details from the JobTracker), and make sure that the tmpjar configuration name exists with a value identical to the path that you specified in your command. You can also use the command-line to search for the libjars configuration in HDFS $ hadoop fs -cat /_logs/history/*.xml | grep tmpjars Use HADOOP_CLASSPATH to make your third-party JAR’s available on the client-side So far the first two steps tackled what you needed to do to to make your third-party JAR’s available to the remote map and reduce task JVM’s. But what hasn’t been covered so far is making these same JAR’s available to the client JVM, which is the JVM that’s created when you run the hadoop jar command. For this to happen, you should set the HADOOP_CLASSPATH environment variable to contain the O.S. path-delimited list of third-party JAR’s. Let’s extend the commands in the first step above with the addition of setting the HADOOP_CLASSPATH environment variable: $ export LIBJARS=/path/jar1,/path/jar2 $ export HADOOP_CLASSPATH=/path/jar1:/path/jar2 $ hadoop jar my-example.jar com.example.MyTool -libjars ${LIBJARS} -mytoolopt value Note that value for HADOOP_CLASSPATH uses a Unix path delimiter of :, so modify accordingly for your platform. And if you don’t like the copy-paste above you could modify that line to substitute the commas for semi-colons: $ export HADOOP_CLASSPATH=`echo ${LIBJARS} | sed s/,/:/g`
February 26, 2013
by Alex Holmes
· 22,493 Views
article thumbnail
Building an Online-Recommendation Engine with MongoDB
once upon a time there was a munich pizza baker who developed a technique to beam pizza out of bright sunshine. he can produce more than a thousand pizzas per second and needs a channel to sell this amount of pizza and decides to build an online shop. mario’s initial idea is to sell pizzas, but now he is thinking about introduction of new product lines like beverages, salads and pasta. before we take a look to the validation of mario´s idea, lets take a short look at the existing online shop. mario’s online shop is based on mongodb , apache wicket and spring . mongodb is a document-oriented nosql-database . mongodb stores records not in tables as a relational database but in bson documents, which is a binary version of json (java script object notation) and very similar to the object structure in mario’s application. the usage of mongodb makes his development easier and deployment faster. the figure shows a json document which is very similar to a java object: a json document property with the according value corresponds to the java object property with the appropriate value. you can add or remove properties in your java object and this will automatically change your database schema. so there is no need to put your java object model into a relational schema via hibernate. mario also decided to build his online shop only with open-source technologies like apache wicket and spring. wicket is a very common lightweight component-based web application framework and it is closely patterned after stateful gui frameworks such as javafx . the spring framework is an open source application framework and inversion of control container for the java platform and does not impose any specific programming model. spring has become popular in the java community as an alternative to, replacement for, or even addition to the enterprise javabean (ejb) model. because of this architecture mario is able to deploy its application in a lightweight application server like tomcat or jetty . this figure shows the system landscape of mario. mario has two major system on the lefthand site there is his online shop and on the righthand site there is ‘pas’ a famous billing system. in the middle is hadoop that connects both systems together. in the business world an application normally does not stand alone. in most cases an application must communicate with others. the lean architecture of marios online shop enables him to connect the billing system ‘pas’ to his online shop. spring for apache hadoop provides this integration between the two systems online shop and ‘pas’. hadoop supports data-intensive distributed applications and implements a computational paradigm named mapreduce, where the computation is divided into many small fragments, each of them may be executed or re-executed on any node in the cluster of commodity hardware. mario uses hadoop as an etl layer that enables him to transfer gigabytes of order information into the billing system. in this case hadoop makes it possible for a financial controller to verify if all orders were billed correctly. in addition to the online shop feature mario has a real-time sales dashboard that enables him to track his sales in real time. the dashboard displays daily and monthly sales statistics for each pizza and contains a map with the geographical overview of customer activity and competitor locations. here is a walkthrough of the shop : now lets talk about mario’s incredible new idea : mario wants to sell even more pizza! and other products as well. mario decides to use lean startup methods in order to test the possible introduction of new product lines and plans an experiment to validate his new idea using a scientific approach and pure facts instead of hunches. mario´s core assumption is that customers wants to buy other products than pizza – drinks, salads and pasta. furthermore he is worried about pricing. mario contacts all customers to complete a survey and provides an incentive for the participation, a free pizza to every customer who responds to the survey. the result of the survey validated mario’s assumption – customers want to buy beverages, salads and pasta. but he also found out that his customers are willing to pay higher prices for high-quality products and that they simply love his easy shopping flow. currently a pizza order can be completed with three clicks only, so there is new riskiest assumption to validate: will a more complex shopping flow affect his sales? the figures shows a validation board. a validation board is a deceptively simple tool for testing out product ideas. furthermore a validation board tracks pivots which follows from customer feedback. mario decides to introduce beverages, salads and pasta product lines and thinks about a possibility, how he can handle the extension of the product line without destroying the easy shopping flow. that’s why mario thinks a recommendation engine is the right way for him. panels for recommendations can be integrated in the online shop without changing the shopping flow. mario hired a statistician to help him implement a recommender system for his online shop for better cross-selling. he also defined new measurement points to validate his new idea . therefore he tracks the conversion rate of orders as well as cross-selling rates and every event in the online shop is already tracked in realtime. so mario can very easily perform further experiments in order to verify more assumptions. follow the blog to see how the story continues or come to mongodb usergroup meetup in munich , february 20, 2013 or mongodb days in berlin , february 26, 2013 to get a live presentation. our talk sheds light on how to build an online recommendation engine based on mongodb and apache mahout. we’ll show which recommenders must be built to reach mario’s goal and how these can be integrated in mario’s shop infrastructure.
February 17, 2013
by Comsysto Gmbh
· 8,433 Views
article thumbnail
Using awk and Friends with Hadoop
imagine you have a csv file that you want to manipulate. here’s a sample file we can play with: lopez,charlie,2002,11,21 parker,ward,1995,04,08 henderson,russell,2007,10,01 our goal is to transform this into the following form by combining the last three columns: lopez,charlie,20021121 parker,ward,19950408 henderson,russell,20071001 in linux this would take all of two seconds (excuse the awkward awk command): shell$ awk -f"," '{ print $1","$2","$3$4$5 }' people.txt what if you wanted to quickly do the same in hdfs - and let’s assume you want to write the results back to hdfs. one approach would be to use the hdfs cli to stream the inputs into awk, and stream the awk output back into hdfs. you could do this with the hdfs cat and put - options (note that adding a hyphen after put instructs the put command to stream data from standard input to hdfs): shell$ hadoop fs -cat people.txt | awk -f"," '{ print $1","$2","$3$4$5 }' | hadoop fs -put - people-coalesed.txt btw, if your input and output files are lzop-compressed then this command would work: shell$ hadoop fs -cat people.txt.lzo | lzop -dc | awk -f"," '{ print $1","$2","$3$4$5 }' | \ lzop -c | hadoop fs -put - people-coalesed.txt.lzo this is great if your file isn’t too large, but if it’s multiple gigabytes in length then you probably want to harness the power of mapreduce to get this done in a jiffy! the words “in a jiffy” and “mapreduce” aren’t commonly used together, so what do we do? well you could crack open pig or hive and write some custom user-defined functions, but this means you end up in java which we want to avoid. hadoop streaming comes to the rescue in these situations. let’s first create our awk script which will be executed: shell$ cat people.awk #!/bin/awk -f begin { fs = "," } { print $1","$2","$3$4$5 } in linux, if you make this awk script executable, you could execute is as follows: shell$ ./people.awk people.txt in mapreduce-land we don’t need to join data in this particular example, so we don’t need to run any reducers. call your awk script from mappers via hadoop streaming with this command: shell$ hadoop_home=/usr/lib/hadoop shell$ ${hadoop_home}/bin/hadoop \ jar ${hadoop_home}/contrib/streaming/*.jar \ -d mapreduce.job.reduces=0 \ -d mapred.reduce.tasks=0 \ -input people.txt \ -output people-coalesed \ -mapper people.awk \ -file people.awk a few options in the hadoop streaming command are worth examining: finally - to get lzo into the picture you need to add -inputformat , -d mapred.output.compress and -d mapred.output.compression.codec arguments: shell$ hadoop_home=/usr/lib/hadoop shell$ ${hadoop_home}/bin/hadoop \ jar ${hadoop_home}/contrib/streaming/*.jar \ -d mapreduce.job.reduces=0 \ -d mapred.reduce.tasks=0 \ -d mapred.output.compress=true \ -d stream.map.input.ignorekey=true \ -d mapred.output.compression.codec=com.hadoop.compression.lzo.lzopcodec \ -inputformat com.hadoop.mapred.deprecatedlzotextinputformat \ -input people.txt.lzo \ -output people-coalesed \ -mapper people.awk \ -file people.awk
February 14, 2013
by Alex Holmes
· 13,120 Views · 1 Like
article thumbnail
Sorting Text Files with MapReduce
in my last post i wrote about sorting files in linux. decently large files (in the tens of gb’s) can be sorted fairly quickly using that approach. but what if your files are already in hdfs, or ar hundreds of gb’s in size or larger? in this case it makes sense to use mapreduce and leverage your cluster resources to sort your data in parallel. mapreduce should be thought of as a ubiquitous sorting tool, since by design it sorts all the map output records (using the map output keys), so that all the records that reach a single reducer are sorted. the diagram below shows the internals of how the shuffle phase works in mapreduce. given that mapreduce already performs sorting between the map and reduce phases, then sorting files can be accomplished with an identity function (one where the inputs to the map and reduce phases are emitted directly). this is in fact what the sort example that is bundled with hadoop does. you can look at the how the example code works by examining the org.apache.hadoop.examples.sort class. to use this example code to sort text files in hadoop, you would use it as follows: shell$ export hadoop_home=/usr/lib/hadoop shell$ $hadoop_home/bin/hadoop jar $hadoop_home/hadoop-examples.jar sort \ -informat org.apache.hadoop.mapred.keyvaluetextinputformat \ -outformat org.apache.hadoop.mapred.textoutputformat \ -outkey org.apache.hadoop.io.text \ -outvalue org.apache.hadoop.io.text \ /hdfs/path/to/input \ /hdfs/path/to/output this works well, but it doesn’t offer some of the features that i commonly rely upon in linux’s sort, such as sorting on a specific column, and case-insensitive sorts. linux-esque sorting in mapreduce i’ve started a new github repo called hadoop-utils , where i plan to roll useful helper classes and utilities. the first one is a flexible hadoop sort. the same hadoop example sort can be accomplished with the hadoop-utils sort as follows: shell$ $hadoop_home/bin/hadoop jar hadoop-utils--jar-with-dependencies.jar \ com.alexholmes.hadooputils.sort.sort \ /hdfs/path/to/input \ /hdfs/path/to/output to bring sorting in mapreduce closer to the linux sort, the --key and --field-separator options can be used to specify one or more columns that should be used for sorting, as well as a custom separator (whitespace is the default). for example, imagine you had a file in hdfs called /input/300names.txt which contained first and last names: shell$ hadoop fs -cat 300names.txt | head -n 5 roy franklin mario gardner willis romero max wilkerson latoya larson to sort on the last name you would run: shell$ $hadoop_home/bin/hadoop jar hadoop-utils--jar-with-dependencies.jar \ com.alexholmes.hadooputils.sort.sort \ --key 2 \ /input/300names.txt \ /hdfs/path/to/output the syntax of --key is pos1[,pos2] , where the first position (pos1) is required, and the second position (pos2) is optional - if it’s omitted then pos1 through the rest of the line is used for sorting. just like the linux sort, --key is 1-based, so --key 2 in the above example will sort on the second column in the file. lzop integration another trick that this sort utility has is its tight integration with lzop, a useful compression codec that works well with large files in mapreduce (see chapter 5 of hadoop in practice for more details on lzop). it can work with lzop input files that span multiple splits, and can also lzop-compress outputs, and even create lzop index files. you would do this with the codec and lzop-index options: shell$ $hadoop_home/bin/hadoop jar hadoop-utils--jar-with-dependencies.jar \ com.alexholmes.hadooputils.sort.sort \ --key 2 \ --codec com.hadoop.compression.lzo.lzopcodec \ --map-codec com.hadoop.compression.lzo.lzocodec \ --lzop-index \ /hdfs/path/to/input \ /hdfs/path/to/output multiple reducers and total ordering if your sort job runs with multiple reducers (either because mapreduce.job.reduces in mapred-site.xml has been set to a number larger than 1, or because you’ve used the -r option to specify the number of reducers on the command-line), then by default hadoop will use the hashpartitioner to distribute records across the reducers. use of the hashpartitioner means that you can’t concatenate your output files to create a single sorted output file. to do this you’ll need total ordering , which is supported by both the hadoop example sort and the hadoop-utils sort - the hadoop-utils sort enables this with the --total-order option. shell$ $hadoop_home/bin/hadoop jar hadoop-utils--jar-with-dependencies.jar \ com.alexholmes.hadooputils.sort.sort \ --total-order 0.1 10000 10 \ /hdfs/path/to/input \ /hdfs/path/to/output the syntax is for this option is unintuitive so let’s look at what each field means. more details on total ordering can be seen in chapter 4 of hadoop in practice . more details for details on how to download and run the hadoop-utils sort take a look at the cli guide in the github project page .
January 26, 2013
by Alex Holmes
· 15,434 Views
article thumbnail
Chunk Oriented Processing in Spring Batch
Big Data Sets’ Processing is one of the most important problem in the software world. Spring Batch is a lightweight and robust batch framework to process the data sets. Spring Batch Framework offers ‘TaskletStep Oriented’ and ‘Chunk Oriented’ processing style. In this article, Chunk Oriented Processing Model is explained. Also, TaskletStep Oriented Processing in Spring Batch Article is definitely suggested to investigate how to develop TaskletStep Oriented Processing in Spring Batch. Chunk Oriented Processing Feature has come with Spring Batch v2.0. It refers to reading the data one at a time, and creating ‘chunks’ that will be written out, within a transaction boundary. One item is read from an ItemReader, handed to an ItemProcessor, and written. Once the number of items read equals the commit interval, the entire chunk is written out via the ItemWriter, and then the transaction is committed. Basically, this feature should be used if at least one data item’ s reading and writing is required. Otherwise, TaskletStep Oriented processing can be used if the data item’ s only reading or writing is required. Chunk Oriented Processing model exposes three important interface as ItemReader, ItemProcessor and ItemWriter via org.springframework.batch.item package. ItemReader : This interface is used for providing the data. It reads the data which will be processed. ItemProcessor : This interface is used for item transformation. It processes input object and transforms to output object. ItemWriter : This interface is used for generic output operations. It writes the datas which are transformed by ItemProcessor. For example, the datas can be written to database, memory or outputstream (etc). In this sample application, we will write to database. Let us take a look how to develop Chunk Oriented Processing Model. Used Technologies : JDK 1.7.0_09 Spring 3.1.3 Spring Batch 2.1.9 Hibernate 4.1.8 Tomcat JDBC 7.0.27 MySQL 5.5.8 MySQL Connector 5.1.17 Maven 3.0.4 Step 1 : Create Maven Project A maven project is created as below. (It can be created by using Maven or IDE Plug-in). Step 2: Libraries A new USER Table is created by executing below script: CREATE TABLE ONLINETECHVISION.USER ( id int(11) NOT NULL AUTO_INCREMENT, name varchar(45) NOT NULL, surname varchar(45) NOT NULL, PRIMARY KEY (`id`) ); Step 3: Libraries Firstly, dependencies are added to Maven’ s pom.xml. 3.1.3.RELEASE 2.1.9.RELEASE org.springframework spring-core ${spring.version} org.springframework spring-context ${spring.version} org.springframework spring-tx ${spring.version} org.springframework spring-orm ${spring.version} org.springframework.batch spring-batch-core ${spring-batch.version} org.hibernate hibernate-core 4.1.8.Final org.apache.tomcat tomcat-jdbc 7.0.27 mysql mysql-connector-java 5.1.17 log4j log4j 1.2.16 maven-compiler-plugin(Maven Plugin) is used to compile the project with JDK 1.7 org.apache.maven.plugins maven-compiler-plugin 3.0 1.7 1.7 The following Maven plugin can be used to create runnable-jar, org.apache.maven.plugins maven-shade-plugin 2.0 package shade 1.7 1.7 com.onlinetechvision.exe.Application META-INF/spring.handlers META-INF/spring.schemas Step 4 : Create User Entity User Entity is created. This entity will be stored after processing. package com.onlinetechvision.user; import javax.persistence.Column; import javax.persistence.Entity; import javax.persistence.GeneratedValue; import javax.persistence.GenerationType; import javax.persistence.Id; import javax.persistence.Table; /** * User Entity * * @author onlinetechvision.com * @since 10 Dec 2012 * @version 1.0.0 * */ @Entity @Table(name="USER") public class User { private int id; private String name; private String surname; @Id @GeneratedValue(strategy=GenerationType.AUTO) @Column(name="ID", unique = true, nullable = false) public int getId() { return id; } public void setId(int id) { this.id = id; } @Column(name="NAME", unique = true, nullable = false) public String getName() { return name; } public void setName(String name) { this.name = name; } @Column(name="SURNAME", unique = true, nullable = false) public String getSurname() { return surname; } public void setSurname(String surname) { this.surname = surname; } @Override public String toString() { StringBuffer strBuff = new StringBuffer(); strBuff.append("id : ").append(getId()); strBuff.append(", name : ").append(getName()); strBuff.append(", surname : ").append(getSurname()); return strBuff.toString(); } } Step 5 : Create IUserDAO Interface IUserDAO Interface is created to expose data access functionality. package com.onlinetechvision.user.dao; import java.util.List; import com.onlinetechvision.user.User; /** * User DAO Interface * * @author onlinetechvision.com * @since 10 Dec 2012 * @version 1.0.0 * */ public interface IUserDAO { /** * Adds User * * @param User user */ void addUser(User user); /** * Gets User List * */ List getUsers(); } Step 6 : Create UserDAO IMPL UserDAO Class is created by implementing IUserDAO Interface. package com.onlinetechvision.user.dao; import java.util.List; import org.hibernate.SessionFactory; import com.onlinetechvision.user.User; /** * User DAO * * @author onlinetechvision.com * @since 10 Dec 2012 * @version 1.0.0 * */ public class UserDAO implements IUserDAO { private SessionFactory sessionFactory; /** * Gets Hibernate Session Factory * * @return SessionFactory - Hibernate Session Factory */ public SessionFactory getSessionFactory() { return sessionFactory; } /** * Sets Hibernate Session Factory * * @param SessionFactory - Hibernate Session Factory */ public void setSessionFactory(SessionFactory sessionFactory) { this.sessionFactory = sessionFactory; } /** * Adds User * * @param User user */ @Override public void addUser(User user) { getSessionFactory().getCurrentSession().save(user); } /** * Gets User List * * @return List - User list */ @SuppressWarnings({ "unchecked" }) @Override public List getUsers() { List list = getSessionFactory().getCurrentSession().createQuery("from User").list(); return list; } } Step 7 : Create IUserService Interface IUserService Interface is created for service layer. package com.onlinetechvision.user.service; import java.util.List; import com.onlinetechvision.user.User; /** * * User Service Interface * * @author onlinetechvision.com * @since 10 Dec 2012 * @version 1.0.0 * */ public interface IUserService { /** * Adds User * * @param User user */ void addUser(User user); /** * Gets User List * * @return List - User list */ List getUsers(); } Step 8 : Create UserService IMPL UserService Class is created by implementing IUserService Interface. package com.onlinetechvision.user.service; import java.util.List; import org.springframework.transaction.annotation.Transactional; import com.onlinetechvision.user.User; import com.onlinetechvision.user.dao.IUserDAO; /** * * User Service * * @author onlinetechvision.com * @since 10 Dec 2012 * @version 1.0.0 * */ @Transactional(readOnly = true) public class UserService implements IUserService { IUserDAO userDAO; /** * Adds User * * @param User user */ @Transactional(readOnly = false) @Override public void addUser(User user) { getUserDAO().addUser(user); } /** * Gets User List * */ @Override public List getUsers() { return getUserDAO().getUsers(); } public IUserDAO getUserDAO() { return userDAO; } public void setUserDAO(IUserDAO userDAO) { this.userDAO = userDAO; } } Step 9 : Create TestReader IMPL TestReader Class is created by implementing ItemReader Interface. This class is called in order to read items. When read method returns null, reading operation is completed. The following steps explains with details how to be executed firstJob. The commit-interval value of firstjob is 2 and the following steps are executed : 1) firstTestReader is called to read first item(firstname_0, firstsurname_0) 2) firstTestReader is called again to read second item(firstname_1, firstsurname_1) 3) testProcessor is called to process first item(FIRSTNAME_0, FIRSTSURNAME_0) 4) testProcessor is called to process second item(FIRSTNAME_1, FIRSTSURNAME_1) 5) testWriter is called to write first item(FIRSTNAME_0, FIRSTSURNAME_0) to database 6) testWriter is called to write second item(FIRSTNAME_1, FIRSTSURNAME_1) to database 7) first and second items are committed and the transaction is closed. 8) firstTestReader is called to read third item(firstname_2, firstsurname_2) 9) maxIndex value of firstTestReader is 3. read method returns null and item reading operation is completed. 10) testProcessor is called to process third item(FIRSTNAME_2, FIRSTSURNAME_2) 11) testWriter is called to write first item(FIRSTNAME_2, FIRSTSURNAME_2) to database 12) third item is committed and the transaction is closed. firstStep is completed with COMPLETED status and secondStep is started. secondJob and thirdJob are executed in the same way. package com.onlinetechvision.item; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.NonTransientResourceException; import org.springframework.batch.item.ParseException; import org.springframework.batch.item.UnexpectedInputException; import com.onlinetechvision.user.User; /** * TestReader Class is created to read items which will be processed * * @author onlinetechvision.com * @since 10 Dec 2012 * @version 1.0.0 * */ public class TestReader implements ItemReader { private int index; private int maxIndex; private String namePrefix; private String surnamePrefix; /** * Reads items one by one * * @return User * * @throws Exception * @throws UnexpectedInputException * @throws ParseException * @throws NonTransientResourceException * */ @Override public User read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException { User user = new User(); user.setName(getNamePrefix() + "_" + index); user.setSurname(getSurnamePrefix() + "_" + index); if(index > getMaxIndex()) { return null; } incrementIndex(); return user; } /** * Increments index which defines read-count * * @return int * */ private int incrementIndex() { return index++; } public int getMaxIndex() { return maxIndex; } public void setMaxIndex(int maxIndex) { this.maxIndex = maxIndex; } public String getNamePrefix() { return namePrefix; } public void setNamePrefix(String namePrefix) { this.namePrefix = namePrefix; } public String getSurnamePrefix() { return surnamePrefix; } public void setSurnamePrefix(String surnamePrefix) { this.surnamePrefix = surnamePrefix; } } Step 10 : Create FailedCaseTestReader IMPL FailedCaseTestReader Class is created in order to simulate the failed job status. In this sample application, when thirdJob is processed at fifthStep, failedCaseTestReader is called and exception is thrown so its status will be FAILED. package com.onlinetechvision.item; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.NonTransientResourceException; import org.springframework.batch.item.ParseException; import org.springframework.batch.item.UnexpectedInputException; import com.onlinetechvision.user.User; /** * FailedCaseTestReader Class is created in order to simulate the failed job status. * * @author onlinetechvision.com * @since 10 Dec 2012 * @version 1.0.0 * */ public class FailedCaseTestReader implements ItemReader { private int index; private int maxIndex; private String namePrefix; private String surnamePrefix; /** * Reads items one by one * * @return User * * @throws Exception * @throws UnexpectedInputException * @throws ParseException * @throws NonTransientResourceException * */ @Override public User read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException { User user = new User(); user.setName(getNamePrefix() + "_" + index); user.setSurname(getSurnamePrefix() + "_" + index); if(index >= getMaxIndex()) { throw new Exception("Unexpected Error!"); } incrementIndex(); return user; } /** * Increments index which defines read-count * * @return int * */ private int incrementIndex() { return index++; } public int getMaxIndex() { return maxIndex; } public void setMaxIndex(int maxIndex) { this.maxIndex = maxIndex; } public String getNamePrefix() { return namePrefix; } public void setNamePrefix(String namePrefix) { this.namePrefix = namePrefix; } public String getSurnamePrefix() { return surnamePrefix; } public void setSurnamePrefix(String surnamePrefix) { this.surnamePrefix = surnamePrefix; } } Step 11 : Create TestProcessor IMPL TestProcessor Class is created by implementing ItemProcessor Interface. This class is called to process items. User item is received from TestReader, processed and returned to TestWriter. package com.onlinetechvision.item; import java.util.Locale; import org.springframework.batch.item.ItemProcessor; import com.onlinetechvision.user.User; /** * TestProcessor Class is created to process items. * * @author onlinetechvision.com * @since 10 Dec 2012 * @version 1.0.0 * */ public class TestProcessor implements ItemProcessor { /** * Processes items one by one * * @param User user * @return User * @throws Exception * */ @Override public User process(User user) throws Exception { user.setName(user.getName().toUpperCase(Locale.ENGLISH)); user.setSurname(user.getSurname().toUpperCase(Locale.ENGLISH)); return user; } } Step 12 : Create TestWriter IMPL TestWriter Class is created by implementing ItemWriter Interface. This class is called to write items to DB, memory etc… package com.onlinetechvision.item; import java.util.List; import org.springframework.batch.item.ItemWriter; import com.onlinetechvision.user.User; import com.onlinetechvision.user.service.IUserService; /** * TestWriter Class is created to write items to DB, memory etc... * * @author onlinetechvision.com * @since 10 Dec 2012 * @version 1.0.0 * */ public class TestWriter implements ItemWriter { private IUserService userService; /** * Writes items via list * * @throws Exception * */ @Override public void write(List userList) throws Exception { for(User user : userList) { getUserService().addUser(user); } System.out.println("User List : " + getUserService().getUsers()); } public IUserService getUserService() { return userService; } public void setUserService(IUserService userService) { this.userService = userService; } } Step 13 : Create FailedStepTasklet Class FailedStepTasklet is created by implementing Tasklet Interface. It illustrates business logic in failed step. package com.onlinetechvision.tasklet; import org.apache.log4j.Logger; import org.springframework.batch.core.StepContribution; import org.springframework.batch.core.scope.context.ChunkContext; import org.springframework.batch.core.step.tasklet.Tasklet; import org.springframework.batch.repeat.RepeatStatus; /** * FailedStepTasklet Class illustrates a failed job. * * @author onlinetechvision.com * @since 10 Dec 2012 * @version 1.0.0 * */ public class FailedStepTasklet implements Tasklet { private static final Logger logger = Logger.getLogger(FailedStepTasklet.class); private String taskResult; /** * Executes FailedStepTasklet * * @param StepContribution stepContribution * @param ChunkContext chunkContext * @return RepeatStatus * @throws Exception * */ public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { logger.debug("Task Result : " + getTaskResult()); throw new Exception("Error occurred!"); } public String getTaskResult() { return taskResult; } public void setTaskResult(String taskResult) { this.taskResult = taskResult; } } Step 14 : Create BatchProcessStarter Class BatchProcessStarter Class is created to launch the jobs. Also, it logs their execution results. package com.onlinetechvision.spring.batch; import org.apache.log4j.Logger; import org.springframework.batch.core.Job; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobParametersBuilder; import org.springframework.batch.core.JobParametersInvalidException; import org.springframework.batch.core.launch.JobLauncher; import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException; import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.repository.JobRestartException; /** * BatchProcessStarter Class launches the jobs and logs their execution results. * * @author onlinetechvision.com * @since 10 Dec 2012 * @version 1.0.0 * */ public class BatchProcessStarter { private static final Logger logger = Logger.getLogger(BatchProcessStarter.class); private Job firstJob; private Job secondJob; private Job thirdJob; private JobLauncher jobLauncher; private JobRepository jobRepository; /** * Starts the jobs and logs their execution results. * */ public void start() { JobExecution jobExecution = null; JobParametersBuilder builder = new JobParametersBuilder(); try { getJobLauncher().run(getFirstJob(), builder.toJobParameters()); jobExecution = getJobRepository().getLastJobExecution(getFirstJob().getName(), builder.toJobParameters()); logger.debug(jobExecution.toString()); getJobLauncher().run(getSecondJob(), builder.toJobParameters()); jobExecution = getJobRepository().getLastJobExecution(getSecondJob().getName(), builder.toJobParameters()); logger.debug(jobExecution.toString()); getJobLauncher().run(getThirdJob(), builder.toJobParameters()); jobExecution = getJobRepository().getLastJobExecution(getThirdJob().getName(), builder.toJobParameters()); logger.debug(jobExecution.toString()); } catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException | JobParametersInvalidException e) { logger.error(e); } } public Job getFirstJob() { return firstJob; } public void setFirstJob(Job firstJob) { this.firstJob = firstJob; } public Job getSecondJob() { return secondJob; } public void setSecondJob(Job secondJob) { this.secondJob = secondJob; } public Job getThirdJob() { return thirdJob; } public void setThirdJob(Job thirdJob) { this.thirdJob = thirdJob; } public JobLauncher getJobLauncher() { return jobLauncher; } public void setJobLauncher(JobLauncher jobLauncher) { this.jobLauncher = jobLauncher; } public JobRepository getJobRepository() { return jobRepository; } public void setJobRepository(JobRepository jobRepository) { this.jobRepository = jobRepository; } } Step 15 : Create dataContext.xml jdbc.properties, is created. It defines data-source informations and is read via dataContext.xml jdbc.db.driverClassName=com.mysql.jdbc.Driver jdbc.db.url=jdbc:mysql://localhost:3306/onlinetechvision jdbc.db.username=root jdbc.db.password=root jdbc.db.initialSize=10 jdbc.db.minIdle=3 jdbc.db.maxIdle=10 jdbc.db.maxActive=10 jdbc.db.testWhileIdle=true jdbc.db.testOnBorrow=true jdbc.db.testOnReturn=true jdbc.db.initSQL=SELECT 1 FROM DUAL jdbc.db.validationQuery=SELECT 1 FROM DUAL jdbc.db.timeBetweenEvictionRunsMillis=30000 Step 16 : Create dataContext.xml Spring Configuration file, dataContext.xml, is created. It covers dataSource, sessionFactory and transactionManager definitions. com.onlinetechvision.user.User org.hibernate.dialect.MySQLDialect true Step 17 : Create jobContext.xml Spring Configuration file, jobContext.xml, is created. It covers jobRepository, jobLauncher, item reader, item processor, item writer, tasklet and job definitions. Step 18 : Create applicationContext.xml Spring Configuration file, applicationContext.xml, is created. It covers bean definitions. Step 19 : Create Application Class Application Class is created to run the application. package com.onlinetechvision.exe; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import com.onlinetechvision.spring.batch.BatchProcessStarter; /** * Application Class starts the application. * * @author onlinetechvision.com * @since 10 Dec 2012 * @version 1.0.0 * */ public class Application { /** * Starts the application * * @param String[] args * */ public static void main(String[] args) { ApplicationContext appContext = new ClassPathXmlApplicationContext("applicationContext.xml"); BatchProcessStarter batchProcessStarter = (BatchProcessStarter)appContext.getBean("batchProcessStarter"); batchProcessStarter.start(); } } Step 20 : Build Project After OTV_SpringBatch_Chunk_Oriented_Processing Project is built, OTV_SpringBatch_Chunk_Oriented_Processing-0.0.1-SNAPSHOT.jar will be created. STEP 21 : RUN PROJECT After created OTV_SpringBatch_Chunk_Oriented_Processing-0.0.1-SNAPSHOT.jar file is run, the following database and console output logs will be shown : Database screenshot : First Job’ s console output : 16.12.2012 19:30:41 INFO (SimpleJobLauncher.java:118) - Job: [FlowJob: [name=firstJob]] launched with the following parameters: [{}] 16.12.2012 19:30:41 DEBUG (AbstractJob.java:278) - Job execution starting: JobExecution: id=0, version=0, startTime=null, endTime=null, lastUpdated=Sun Dec 16 19:30:41 GMT 2012, status=STARTING, exitStatus=exitCode=UNKNOWN;exitDescription=, job=[JobInstance: id=0, version=0, JobParameters=[{}], Job=[firstJob]] User List : [id : 181, name : FIRSTNAME_0, surname : FIRSTSURNAME_0, id : 182, name : FIRSTNAME_1, surname : FIRSTSURNAME_1, id : 183, name : FIRSTNAME_2, surname : FIRSTSURNAME_2, id : 184, name : SECONDNAME_0, surname : SECONDSURNAME_0, id : 185, name : SECONDNAME_1, surname : SECONDSURNAME_1, id : 186, name : SECONDNAME_2, surname : SECONDSURNAME_2] 16.12.2012 19:30:42 DEBUG (BatchProcessStarter.java:43) - JobExecution: id=0, version=2, startTime=Sun Dec 16 19:30:41 GMT 2012, endTime=Sun Dec 16 19:30:42 GMT 2012, lastUpdated=Sun Dec 16 19:30:42 GMT 2012, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=0, version=0, JobParameters=[{}], Job=[firstJob]] Second Job’ s console output : 16.12.2012 19:30:42 INFO (SimpleJobLauncher.java:118) - Job: [FlowJob: [name=secondJob]] launched with the following parameters: [{}] 16.12.2012 19:30:42 DEBUG (AbstractJob.java:278) - Job execution starting: JobExecution: id=1, version=0, startTime=null, endTime=null, lastUpdated=Sun Dec 16 19:30:42 GMT 2012, status=STARTING, exitStatus=exitCode=UNKNOWN;exitDescription=, job=[JobInstance: id=1, version=0, JobParameters=[{}], Job=[secondJob]] User List : [id : 181, name : FIRSTNAME_0, surname : FIRSTSURNAME_0, id : 182, name : FIRSTNAME_1, surname : FIRSTSURNAME_1, id : 183, name : FIRSTNAME_2, surname : FIRSTSURNAME_2, id : 184, name : SECONDNAME_0, surname : SECONDSURNAME_0, id : 185, name : SECONDNAME_1, surname : SECONDSURNAME_1, id : 186, name : SECONDNAME_2, surname : SECONDSURNAME_2, id : 187, name : THIRDNAME_0, surname : THIRDSURNAME_0, id : 188, name : THIRDNAME_1, surname : THIRDSURNAME_1, id : 189, name : THIRDNAME_2, surname : THIRDSURNAME_2, id : 190, name : THIRDNAME_3, surname : THIRDSURNAME_3, id : 191, name : FOURTHNAME_0, surname : FOURTHSURNAME_0, id : 192, name : FOURTHNAME_1, surname : FOURTHSURNAME_1, id : 193, name : FOURTHNAME_2, surname : FOURTHSURNAME_2, id : 194, name : FOURTHNAME_3, surname : FOURTHSURNAME_3] 16.12.2012 19:30:42 DEBUG (BatchProcessStarter.java:47) - JobExecution: id=1, version=2, startTime=Sun Dec 16 19:30:42 GMT 2012, endTime=Sun Dec 16 19:30:42 GMT 2012, lastUpdated=Sun Dec 16 19:30:42 GMT 2012, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=1, version=0, JobParameters=[{}], Job=[secondJob]] Third Job’ s console output : 16.12.2012 19:30:42 INFO (SimpleJobLauncher.java:118) - Job: [FlowJob: [name=thirdJob]] launched with the following parameters: [{}] 16.12.2012 19:30:42 DEBUG (AbstractJob.java:278) - Job execution starting: JobExecution: id=2, version=0, startTime=null, endTime=null, lastUpdated=Sun Dec 16 19:30:42 GMT 2012, status=STARTING, exitStatus=exitCode=UNKNOWN;exitDescription=, job=[JobInstance: id=2, version=0, JobParameters=[{}], Job=[thirdJob]] 16.12.2012 19:30:42 DEBUG (TransactionTemplate.java:159) - Initiating transaction rollback on application exception org.springframework.batch.repeat.RepeatException: Exception in batch process; nested exception is java.lang.Exception: Unexpected Error! ... 16.12.2012 19:30:43 DEBUG (BatchProcessStarter.java:51) - JobExecution: id=2, version=2, startTime=Sun Dec 16 19:30:42 GMT 2012, endTime=Sun Dec 16 19:30:43 GMT 2012, lastUpdated=Sun Dec 16 19:30:43 GMT 2012, status=FAILED, exitStatus=exitCode=FAILED;exitDescription=, job=[JobInstance: id=2, version=0, JobParameters=[{}], Job=[thirdJob]] Step 22 : Download https://github.com/erenavsarogullari/OTV_SpringBatch_Chunk_Oriented_Processing REFERENCES : Chunk Oriented Processing in Spring Batch
January 3, 2013
by Eren Avsarogullari
· 153,068 Views · 7 Likes
article thumbnail
C++ benchmark – std::vector VS std::list
a updated version of this article is available: c++ benchmark – std::vector vs std::list vs std::deque in c++, the two most used data structures are the std::vector and the std::list. in this article, we will compare the performance in practice of these two data structures on several different workloads. in this article, when i talk about a list it is the std::list implementation and vector refers to the std::vector implementation. it is generally said that a list should be used when random insert and remove will be performed (performed in o(1) versus o(n) for a vector). if we look only at the complexity, search in both data structures should be roughly equivalent, complexity being in o(n). when random insert/replace operations are performed on a vector, all the subsequent data needs to be moved so each element will be copied. that is why the size of the data type is an important factor when comparing those two data structures. however, in practice, there is a huge difference in the usage of the memory caches. all the data in a vector is contiguous where the std::list allocates memory separately for each element. how does that change the results in practice ? keep in mind that all the tests performed are made on vector and list even if other data structures could be better suited to the given workload. in the graphs and in the text, n is used to refer to the number of elements of the collection. all the tests performed have been performed on an intel core i7 q 820 @ 1.73ghz. the code has been compiled in 64 bits with gcc 4.7.2 with -02 and -march=native. the code has been compiled with c++11 support (-std=c++11). fill the first test is to fill the data structures by adding elements to the back of the container. two variations of vector are used, vector_pre being a std::vector with the size passed in parameters to the constructor, resulting in only one allocation of memory. fill (8 bytes)vector_prevectorlist100010000100000100000003006009001,200milliseconds fill (1024 bytes)vector_prevectorlist100010000100000100000006,00012,00018,00024,000milliseconds all data structures are impacted the same way when the data size increases because there will be more memory to allocate. the vector_pre is clearly the winner of this test, being one order of magnitude faster than a list and about twice as fast as a vector without pre-allocation. the results are directly linked to the allocations that have to be performed, allocation being slow. whatever the data size is, push_back to a vector will always be faster than to a list. this is logical because vector allocates more memory than necessary and so does not need to allocate memory for each element. but this test is not very interesting, generally building the data structure is not critical. what is critical is the operations that are performed on the data structure. that will be tested in the coming sections. random find the first operation is that is tested is the search. the container is filled with all the numbers in [0, n] and shuffled. then, each number in [0,n] is searched in the container with std::find that performs a simple linear search. yes, vector is represented in the graph, its line is the same as the x line ! performing a linear search in a vector is several orders of magnitude faster than in a list . the only reason is the usage of the cache line. when a data is accessed, the data is fetched from the main memory to the cache. not only the accessed data is accessed, but a whole cacheline is fetched. as the elements in a vector are contiguous, when you access an element, the next element is automatically in the cache. as the main memory is orders of magnitude slower than the cache, this makes a huge difference. in the list case, the processor spends its whole time waiting for data being fetched from memory to the cache. if we augment the size of the data type to 1kb, the results remain the same, but slower: random insert in the case of random insert, in theory, the list should be much faster, its insert operation being in o(1) versus o(n) for a vector. the container is filled with all the numbers in [0, n] and shuffled. then, 1000 random values are inserted at a random position in the container. the random position is found by linear search. in both cases, the complexity of the search is o(n), the only difference comes from the insert that follow the search. when, the vector should be slower than the list, it is almost an order of magnitude faster. again, this is because finding the position in a list is much slower than copying a lot of small elements. if we increase the size: the two lines are getting closer, but vector is still faster. increase it to 1kb: this time, list outperforms vector by an order of magnitude ! the performance of random insert in a list are not impacted much by the size of the data type, where vector suffers a lot when big sizes are used. we can also see that list doesn’t seem to care about the size of the collection. it is because the size of the collection only impact the search and not the insertion and as few search are performed, it does not change the results a lot. if the iterator was already known (no need for linear search), it would be faster to insert into a list than into the vector. random remove in theory, random remove is the same case than random insert. now that we’ve seen the results with random insert, we could expect the same behavior for random remove. the container is filled with all the numbers in [0, n] and shuffled. then, 1000 random values are removed from a random position in the container. again, vector is several times faster and looks to scale better. again, this is because it is very cheap to copy small elements. let’s increase it directly to 1kb element. the two lines have been reversed ! the behavior of random remove is the same as the behavior of random insert, for the same reasons. push front the next operation that we will compare is inserting elements in front of the collection. this is the worst case for vector, because after each insertion, all the previously inserted will be moved and copied. for a list, it does not make a difference compared to pushing to the back. the results are crystal-clear and as expected. vector is very bad at inserting elements to the front. this does not need further explanations. there is no need to change the data size, it will only make vector much slower. sort the next operation that is tested is the performance of sorting a vector or a list. for a vector std::sort is used and for a list the member function sort is used. we can see that sorting a list is several times slower. it comes from the poor usage of the cache. if we increase the size of the element to 1kb: this time the list is faster than the vector. it is not very clear on the graph, but the values for the list are almost the same as for the previous results. that is because std::list::sort() does not perform any copy, only pointers to the elements are changed. on the other hand, swapping two elements in a vector involves at least three copies, so the cost of sorting will increase as the cost of copying increases. number crunching finally, we can also test a number crunching operation. here, random elements are inserted into the container that is kept sorted. it means, that the position where the element has to be inserted is first searched by iterating through elements and the inserted. as we talk about number crunching, only 8 bytes elements are tested. we can clearly see that vector is more than an order of magnitude faster than list and this will only be more as the size of the collection increase. this is because traversing the list is much more expensive than copying the elements of the vector. conclusion to conclude, we can get some facts about each data structure: std::vector is insanely faster than std::list to find an element std::vector always performs faster than std::list with very small data std::vector is always faster to push elements at the back than std::list std::list handles large elements very well, especially for sorting or inserting in the front these are the simple conclusions on usage of each data structure: for number crunching : use std::vector for linear search : use std::vector for random insert/remove : use std::list (if data size very small (< 64b on my computer), use std::vector) for big data size : use std::list (not if intended for searching) if you have the time, in practice, the best way to decide is always to benchmark both versions, or even to try other data structures. i hope that you found this article interesting. if you have any comment or have an idea about another workload that you would like to test, don’t hesitate to post a comment if you have a question on results, don’t hesitate as well. the code source of the benchmark is available online: https://github.com/wichtounet/articles/blob/master/src/vector_list/bench.cpp
December 6, 2012
by Baptiste Wicht
· 45,065 Views
article thumbnail
Enterprise-ready Tool Support for Apache Camel
apache camel is my favorite integration framework on the java platform due to great dsls, a huge community, and so many different components. camel is used by many developers from different companies all over the world. however, most guys are not aware that some really cool and – more important – enterprise-ready tooling is available for camel, too. many people ask me about camel tooling when i do talks at conferences. this is the reason for this short blog post about camel tooling. [fyi: i work for talend (one of the vendors).] ide support camel consists of a set of normal java libraries and is therefore usable with any java ide (such as eclipse, netbeans or intellij idea) or even a classic text editor. programming dsls are available for java, groovy, and scala. even a kotlin dsl is in the works, thanks to camel’s founder james strachan. all familiar ide features such as code completion or javadoc view are available for these dsls. in the spring xml dsl, the eclipse-based springsource tool suite (sts) should be emphasized, which provides the best support for the spring framework and xml configurations. camel-specific tooling besides classical ide support, further products are available to provide additional functionality. integration problems can be modeled with the help of enterprise integration patterns (eip, http://www.eaipatterns.com/). eips are implemented by camel. visual designers are available to help modeling integration problems with these eips. these tools even generate the corresponding source code automatically. ideally, developers do not have to write any source code by hand. camel tooling is offered by talend with talend esb (http://de.talend.com/products/esb) and jboss, formerly fusesource, with fuse ide (http://fusesource.com/products/fuse-ide). both companies also provide full-time committers for the apache camel project. let’s take a short look at these two products in the following. open studio for talend esb talend esb is an eclipse-based integration platform within the talend unified platform. the familiar “look and feel” and the intuitive use of eclipse remain. the esb is open source and freely available. the paid enterprise version offers additional features and support. the esb can be used independently or in combination with other parts of the talend unified platform, such as BPM, big data, or master data management. the great benefit is that everything can be done within one suite using the same gui and concepts, based on eclipse. the entire talend unified platform is based on the “zero-coding” approach. this way, a very efficient implementation of integration problems is possible using the eips and components. routes are modeled and configured with intuitive tool support, all source code is generated. of course, custom integration logic can still be written and included, for example, pojos, spring beans, scripts in different languages, or own camel components. plenty of other components besides camel’s ones are available for talend esb – for example connectors to alfresco, jasper, sap, salesforce, or host systems. figure 1: visual designer of talend’s esb fuse ide the fuse ide is an eclipse plugin, which is installed from the eclipse update site. the visual designer (see figure 2) generates camel routes as xml code using the spring xml dsl. the generated code is editable vice-versa, i.e. the developer can change the source code. the graphical model applies changes automatically. fuse ide is intuitive to use for creating camel routes. fusesource offers some other products, which can be used in combination with fuse ide – such as management console or fuse mq for messaging. under fusesource, fuse ide was a proprietary product. however, fusesource was recently taken over by redhat (http://www.redhat.com/about/news/press-archive/2012/6/red-hat-to-acquire-fusesource) and now belongs to the jboss division. in the new roadmap, the fuse ide is still included. it will probably be integrated into the jboss enterprise soa platform and become “open sourced”. the integration of fusesource will take at least a few more months time to complete (http://www.redhat.com/promo/jboss_integration_week/). jboss now “owns” three esb products (jboss esb, switchyard and fuse esb). probably, these will be merged into one product in the end (switchyard is also based on camel). nevertheless, the fusesource products will also be supported for some time – primarily in order to satisfy existing customers (my guess). figure 2: visual designer of fuse ide (jboss, former fusesource) enterprise-ready tooling is already available for apache camel! the bottom line is that enterprise-ready tooling is already available for apache camel. it is great to see different companies working on tooling for apache camel. the winner definitely is apache camel… and there is no loser! talend esb and fuse ide are two different approaches for different kinds of projects. if you like the „zero-coding“ approach, then take a closer look at talend’s esb. it is really easy and efficient to realize integration projects without writing source code – nevertheless, there is enough flexibility for customization and adding own source code. the combination with bpm, mdm or big data (based on hadoop) is also supported within the unified platform using the same open source and „zero-coding“ concepts. if you „insist“ on writing and refactoring all source code by yourself within the text editor of an ide, then take a look at fuse ide. your best would be to try out both and see which one fits best into your next enterprise integration project. if you know any other cool camel tooling (no matter if it is enterprise-ready or not), or if you have any other feedback, please write a comment. thank you. best regards, kai wähner (twitter: @kaiwaehner) content from my blog: http://www.kai-waehner.de/blog/2012/11/23/enterprise-ready-tool-support-for-apache-camel/
November 26, 2012
by Kai Wähner DZone Core CORE
· 15,517 Views
article thumbnail
Enabling JMX Monitoring for Hadoop & Hive
Hadoop’s NameNode and JobTracker expose interesting metrics and statistics over the JMX. Hive seems not to expose anything intersting but it still might be useful to monitor its JVM or do simpler profiling/sampling on it. Let’s see how to enable JMX and how to access it securely, over SSH. Background: We run NameNode, JobTracker and Hive on the same server. Monitoring og TaskTrackers and DataNodes isn’t that interesting but still might be useful to have. Configuration /etc/hadoop/hadoop-env.sh diff --git a/etc/hadoop/hadoop-env.sh b/etc/hadoop/hadoop-env.sh index 69a13b1..e8ca596 100644 --- a/etc/hadoop/hadoop-env.sh +++ b/etc/hadoop/hadoop-env.sh @@ -14,7 +14,8 @@ export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-"/etc/hadoop"} #export HADOOP_NAMENODE_INIT_HEAPSIZE="" # Extra Java runtime options. Empty by default. -export HADOOP_OPTS="-Djava.net.preferIPv4Stack=true $HADOOP_CLIENT_OPTS" +# Added $HIVE_OPTS that is set by hive-env.sh when starting hiveserver +export HADOOP_OPTS="-Djava.net.preferIPv4Stack=true $HADOOP_CLIENT_OPTS $HIVE_OPTS" # Command specific options appended to HADOOP_OPTS when specified export HADOOP_NAMENODE_OPTS="-Dhadoop.security.logger=INFO,DRFAS -Dhdfs.audit.logger=INFO,DRFAAUDIT $HADOOP_NAMENODE_OPTS" @@ -43,3 +44,16 @@ export HADOOP_SECURE_DN_PID_DIR=/var/run/hadoop # A string representing this instance of hadoop. $USER by default. export HADOOP_IDENT_STRING=$USER + +### JMX settings +export JMX_OPTS=" -Dcom.sun.management.jmxremote.authenticate=false \ + -Dcom.sun.management.jmxremote.ssl=false \ + -Dcom.sun.management.jmxremote.port" +# -Dcom.sun.management.jmxremote.password.file=$HADOOP_HOME/conf/jmxremote.password \ +# -Dcom.sun.management.jmxremote.access.file=$HADOOP_HOME/conf/jmxremote.access" +export HADOOP_NAMENODE_OPTS="$JMX_OPTS=8006 $HADOOP_NAMENODE_OPTS" +export HADOOP_SECONDARYNAMENODE_OPTS="$HADOOP_SECONDARYNAMENODE_OPTS" +export HADOOP_DATANODE_OPTS="$JMX_OPTS=8006 $HADOOP_DATANODE_OPTS" +export HADOOP_BALANCER_OPTS="$HADOOP_BALANCER_OPTS" +export HADOOP_JOBTRACKER_OPTS="$JMX_OPTS=8007 $HADOOP_JOBTRACKER_OPTS" +export HADOOP_TASKTRACKER_OPTS="$JMX_OPTS=8007 $HADOOP_TASKTRACKER_OPTS" The JMX setting is used for Hadoop’s daemons while the HIVE_OPTS was added for Hive. /conf/hive-env.sh Enable JMX when running the Hive thrift server (we don’t want it when running the command-line client etc. since it’s pointless and we wouldn’t need to make sure that each of them has a unique port): if [ "$SERVICE" = "hiveserver" ]; then JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=8008" export HIVE_OPTS="$HIVE_OPTS $JMX_OPTS" fi Pitfalls When you start Hive server via hive –service hiveserver then it actually executes “hadoop jar …” so to be able to pass options from hive-env.sh to the JVM we had to add $HIVE_OPTS in hadoop-env.sh. (I haven’t found a cleaner way to do it.) Effects When we now start Hive or any of the Hadoop daemons, they will expose their metrics at their respective ports (NameNode – 8006, JobTracker – 8007, Hive – 8008). (If you are running DataNode and/or TaskTracker on the same machine then you’ll need to change their ports to be unique.) Secure Connection Over SSH Read the post VisualVM: Monitoring Remote JVM Over SSH (JMX Or Not) to find out how to connect securely to the JMX ports over ssh, f.ex. with VisualVM (spolier: ssh -D 9696 hostname; use proxy at localhost:9696).
September 25, 2012
by Jakub Holý
· 15,100 Views
article thumbnail
The Difference Between 'Hadoop DFS' and 'Hadoop FS'
While exploring HDFS, I came across these two syntaxes for querying HDFS: > hadoop dfs > hadoop fs Initally I couldn't differentiate between the two, and kept wondering why we have two different syntaxes for a common purpose. I found a number of people online with the same question -- their thoughts are below: Per Chris's explanation: it seems like there's no difference between the two syntaxes. If we look at the definitions of the two commands (hadoop fs and hadoop dfs) in $HADOOP_HOME/bin/hadoop ... elif [ "$COMMAND" = "datanode" ] ; then CLASS='org.apache.hadoop.hdfs.server.datanode.DataNode' HADOOP_OPTS="$HADOOP_OPTS $HADOOP_DATANODE_OPTS" elif [ "$COMMAND" = "fs" ] ; then CLASS=org.apache.hadoop.fs.FsShell HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS" elif [ "$COMMAND" = "dfs" ] ; then CLASS=org.apache.hadoop.fs.FsShell HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS" elif [ "$COMMAND" = "dfsadmin" ] ; then CLASS=org.apache.hadoop.hdfs.tools.DFSAdmin HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS" ... That was his reasoning. Unconvinced, I kept looking for a more persuasive answer, and these excerpts made more sense to me: FS relates to a generic file system which can point to any file systems like local, HDFS etc. But dfs is very specific to HDFS. So when we use FS it can perform operation with from/to local or hadoop distributed file system to destination. But specifying DFS operation relates to HDFS. Below are two excerpts from the Hadoop documentation that describe these two as different shells. FS Shell The FileSystem (FS) shell is invoked by bin/hadoop fs. All the FS shell commands take path URIs as arguments. The URI format is scheme://autority/path. For HDFS the scheme is hdfs, and for the local filesystem the scheme is file. The scheme and authority are optional. If not specified, the default scheme specified in the configuration is used. An HDFS file or directory such as /parent/child can be specified as hdfs://namenodehost/parent/child or simply as /parent/child (given that your configuration is set to point to hdfs://namenodehost). Most of the commands in FS shell behave like corresponding Unix commands. DFShell The HDFS shell is invoked by bin/hadoop dfs. All the HDFS shell commands take path URIs as arguments. The URI format is scheme://autority/path. For HDFS the scheme is hdfs, and for the local filesystem the scheme is file. The scheme and authority are optional. If not specified, the default scheme specified in the configuration is used. An HDFS file or directory such as /parent/child can be specified as hdfs://namenode:namenodeport/parent/child or simply as /parent/child (given that your configuration is set to point to namenode:namenodeport). Most of the commands in HDFS shell behave like corresponding Unix commands. So, based on the above, we can conclude that it all depends on the scheme configuration. When using these two commands with absolute URI (i.e. scheme://a/b) the behavior shall be identical. Only it's the default configured scheme value for file and hdfs for fs and dfs respectively, which is the cause for difference in behavior.
September 14, 2012
by Abhishek Jain
· 45,307 Views
article thumbnail
Your First Hadoop MapReduce Job
Hadoop MapReduce is a YARN-based system for parallel processing of large data sets. In this article, learn to quickly start writing the simplest MapReduce job.
September 12, 2012
by Amresh Singh
· 19,614 Views
article thumbnail
Hadoop Hive Web Interface
I’ve been playing with Hive recently and liking what I’ve found. In theory at least it provides a very nice, simple way of getting into analysing large data sets. To make it even easier to show other people what you’re up to Hive has a nascent web interface with a little documentation on the wiki On the one hand it’s rather simple at this point, but that should be easily enought to prettify given a bit of time. The bigger problem was getting it working in the first place. What follows worked for me using the latest cloudera packages on debian testing. I’m assuming you already have Hive and Hadoop installed, the basic packages worked fine for me here. Next up you’ll need the JDK (not just the JRE) as their is some compilation that will go on the first time you run the web interface. apt-get install ant sun-java6-jdk Next up I had to modify the installed /etc/hive/conf/hive-site.xml file as follows: I changed this: hive.metastore.uris file:///var/lib/hivevar/metastore/metadb/ Comma separated list of URIs of metastore servers. The first server that can be connected to will be used. To this. Note the hivevar path doesn’t exist so I’m not sure if this was a typo in the source. hive.metastore.uris file:///var/lib/hive/var/metastore/metadb/ Comma separated list of URIs of metastore servers. The first server that can be connected to will be used. I also change the following section regarding the metastore name: javax.jdo.option.ConnectionURL jdbc:derby:;databaseName=/var/lib/hive/metastore/${user.name}_db;create=true JDBC connect string for a JDBC metastore To this, with a fixed name. When using the above confirguration the file was actually called ${user.name} rather than my username being subsituted in. Elsewhere this seems to work fine. javax.jdo.option.ConnectionURL jdbc:derby:;databaseName=/var/lib/hive/metastore/metastore_db;create=true JDBC connect string for a JDBC metastore I’m not convinced the above two changes are needed but have left them here just in case. The main tricky part is making sure a load of environment variables are correctly set. The following worked for me: export ANT_LIB=/usr/share/ant/lib export HIVE_HOME=/usr/lib/hive export HADOOP_HOME=/usr/lib/hadoop export PATH=$PATH:$HADOOP_HOME/bin export JAVA_HOME=/usr/lib/jvm/java-6-sun All being well that should allow you to run the hive command with the web interface like so: hive --service hwi That should bring up a webserver on port 9999 where you should see something similar to the screenshot above.
July 25, 2012
by Gareth Rushgrove
· 16,783 Views · 1 Like
article thumbnail
Spring Data - Apache Hadoop
Spring for Apache Hadoop is a Spring project to support writing applications that can benefit of the integration of Spring Framework and Hadoop. This post describes how to use Spring Data Apache Hadoop in an Amazon EC2 environment using the “Hello World” equivalent of Hadoop programming – a Wordcount application. 1./ Launch an Amazon Web Services EC2 instance. - Navigate to AWS EC2 Console (“https://console.aws.amazon.com/ec2/home”): - Select Launch Instance then Classic Wizzard and click on Continue. My test environment was a “Basic Amazon Linux AMI 2011.09″ 32-bit., Instant type: Micro (t1.micro , 613 MB), Security group quick-start-1 that enables ssh to be used for login. Select your existing key pair (or create a new one). Obviously you can select another AMI and instance types depending on your favourite flavour. (Should you vote for Windows 2008 based instance, you also need to have cygwin installed as an additional Hadoop prerequisite beside Java JDK and ssh, see “Install Apache Hadoop” section) 2./ Download Apache Hadoop - as of writing this article, 1.0.0 is the latest stable version of Apache Hadoop, that is what was used for testing purposes. I downloaded hadoop-1.0.0.tar.gz and copied it into /home/ec2-user directory using pscp command from my PC running Windows: c:\downloads>pscp -i mykey.ppk hadoop-1.0.0.tar.gz [email protected]:/home/ec2-user (the computer name above – ec2-ipaddress-region-compute.amazonaws.com – can be found on AWS EC2 console, Instance Description, public DNS field) 3./ Install Apache Hadoop: As prerequisites, you need to have Java JDK 1.6 and ssh installed, see Apache Single-Node Setup Guide. (ssh is automatically installed with Basic Amazon AMI). Then install hadoop itself: $ cd ~ # change directory to ec2-user home (/home/ec2-user) $ tar xvzf hadoop-1.0.0.tar.gz $ ln -s hadoop-1.0.0 hadoop $ cd hadoop/conf $ vi hadoop-env.sh # edit as below export JAVA_HOME=/opt/jdk1.6.0_29 $ vi core-site.xml # edit as below – this defines the namenode to be running on localhost and listeing to port 9000. fs.default.name hdfs://localhost:9000 $ vi hdsf-site.xml # edit as below this defines that file system replicate is 1 (in production environment it is supposed to be 3 by default) dfs.replication 1 $ vi mapred-site.xml # edit as below – this defines the jobtracker to be running on localhost and listeing to port 9001. mapred.job.tracker localhost:9001 $ cd ~/hadoop $ bin/hadoop namenode -format $ bin/start-all.sh At this stage all hadoop jobs are running in pseudo distributed mode, you can verify it by running: $ ps -ef | grep java You should see 5 java processes: namenode, secondarynamenode, datanode, jobtracker and tasktracker. 4./ Install Spring Data Hadoop Download Spring Data Hadoop package from SpringSource community download site. As of writing this article, the latest stable version is spring-data-hadoop-1.0.0.M1.zip. $ cd ~ $ tar xzvf spring-data-hadoop-1.0.0.M1.zip $ ln -s spring-data-hadoop-1.0.0.M1 spring-data-hadoop 5./ Build and Run Spring Data Hadoop Wordcount example $ cd spring-data-hadoop/spring-data-hadoop-1.0.0.M1/samples/wordcount Spring Data Hadoop is using gradle as build tool. Check build.grandle build file. The original version packaged in the tar.gz file does not compile, it complains about thrift, version 0.2.0 and jdo2-api, version2.3-ec. Add datanucleus.org maven repository to the build.gradle file to support jdo2-api (http://www.datanucleus.org/downloads/maven2/) . Unfortunatelly, there seems to be no maven repo for thrift 0.2.0 . You should download thrift 0.2.0.jar and thrift.0.2.0.pom file e.g. from this repo: “http://people.apache.org/~rawson/repo“ and then add it to local maven repo. $ mvn install:install-file -DgroupId=org.apache.thrift -DartifactId=thrift -Dversion=0.2.0 -Dfile=thrift-0.2.0.jar -Dpackaging=jar $ vi build.grandle # modify the build file to refer to datanucleus maven repo for jdo2-api and the local repo for thrift repositories { // Public Spring artefacts mavenCentral() maven { url “http://repo.springsource.org/libs-release” } maven { url “http://repo.springsource.org/libs-milestone” } maven { url “http://repo.springsource.org/libs-snapshot” } maven { url “http://www.datanucleus.org/downloads/maven2/” } maven { url “file:///home/ec2-user/.m2/repository” } } I also modified the META-INF/spring/context.xml file in order to run hadoop file system commands manually: $ cd /home/ec2-user/spring-data-hadoop/spring-data-hadoop-1.0.0.M1/samples/wordcount/src/main/resources $vi META-INF/spring/context.xml # remove clean-script and also the dependency on it for JobRunner. xmlns=”http://www.springframework.org/schema/beans” xmlns:xsi=”http://www.w3.org/2001/XMLSchema-instance” xmlns:context=”http://www.springframework.org/schema/context” xmlns:hdp=”http://www.springframework.org/schema/hadoop” xmlns:p=”http://www.springframework.org/schema/p” xsi:schemaLocation=”http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/hadoop http://www.springframework.org/schema/hadoop/spring-hadoop.xsd”> fs.default.name=${hd.fs} Copy the sample file – nietzsche-chapter-1.txt – to Hadoop file system (/user/ec2-user-/input directory) $ cd src/main/resources/data $ hadoop fs -mkdir /user/ec2-user/input $ hadoop fs -put nietzsche-chapter-1.txt /user/ec2-user/input/data $ cd ../../../.. # go back to samples/wordcount directory $ ../gradlew Verify the result: $ hadoop fs -cat /user/ec2-user/output/part-r-00000 | more “AWAY 1 “BY 1 “Beyond 1 “By 2 “Cheers 1 “DE 1 “Everywhere 1 “FROM” 1 “Flatterers 1 “Freedom 1
July 19, 2012
by Istvan Szegedi
· 11,893 Views
article thumbnail
My Experience Moving Data from MySQL to Cassandra
I had a relational database, that I wanted to migrate to cassandra. Cassandra's sstableloader provides option to load the existing data from flat files to a cassandra ring. Hence this can be used as a way to migrate data in relational databases to cassandra, as most relational databases let us export the data into flat files. sqoop gives the option to do this effectively. Interestingly, DataStax Enterprise provides everything we want in the big data space as a package. This includes, cassandra, hadoop, hive, pig, sqoop, and mahout, which comes handy in this case. Under the resources directory, you may find the cassandra, dse, hadoop, hive, log4j-appender, mahout, pig, solr, sqoop, and tomcat specific configurations. For example, from resources/hadoop/bin, you may format the hadoop name node using ./hadoop namenode -format as usual. * Download and extract DataStax Enterprise binary archive (dse-2.1-bin.tar.gz). * Follow the documentation, which is also available as a PDF. * Migrating a relational database to cassandra is documented and is also blogged. * Before starting DataStax, make sure that the JAVA_HOME is set. This also can be set directly on conf/hadoop-env.sh. * Include the connector to the relational database into a location reachable by sqoop. I put mysql-connector-java-5.1.12-bin.jar under resources/sqoop. * Set the environment $ bin/dse-env.sh * Start DataStax Enterprise, as an Analytics node. $ sudo bin/dse cassandra -t where cassandra starts the Cassandra process plus CassandraFS and the -t option starts the Hadoop JobTracker and TaskTracker processes. if you start without the -t flag, the below exception will be thrown during the further operations that are discussed below. No jobtracker found Unable to run : jobtracker not found Hence do not miss the -t flag. * Start cassandra cli to view the cassandra keyrings and you will be able to view the data in cassandra, once you migrate using sqoop as given below. $ bin/cassandra-cli -host localhost -port 9160 Confirm that it is connected to the test cluster that is created on the port 9160, by the below from the CLI. [default@unknown] describe cluster; Cluster Information: Snitch: com.datastax.bdp.snitch.DseDelegateSnitch Partitioner: org.apache.cassandra.dht.RandomPartitioner Schema versions: f5a19a50-b616-11e1-0000-45b29245ddff: [127.0.1.1] If you have missed mentioning the host/port (starting the cli by just bin/cassandra-cli) or given it wrong, you will get the response as "Not connected to a cassandra instance." $ bin/dse sqoop import --connect jdbc:mysql://127.0.0.1:3306/shopping_cart_db --username root --password root --table Category --split-by categoryName --cassandra-keyspace shopping_cart_db --cassandra-column-family Category_cf --cassandra-row-key categoryName --cassandra-thrift-host localhost --cassandra-create-schema Above command will now migrate the table "Category" in the shopping_cart_db with the primary key categoryName, into a cassandra keyspace named shopping_cart, with the cassandra row key categoryName. You may use the --direct mysql specific option, which is faster. In my above command, I have everything runs on localhost. +--------------+-------------+------+-----+---------+-------+ | Field | Type | Null | Key | Default | Extra | +--------------+-------------+------+-----+---------+-------+ | categoryName | varchar(50) | NO | PRI | NULL | | | description | text | YES | | NULL | | | image | blob | YES | | NULL | | +--------------+-------------+------+-----+---------+-------+ This also creates the respective java class (Category.java), inside the working directory. To import all the tables in the database, instead of a single table. $ bin/dse sqoop import-all-tables -m 1 --connect jdbc:mysql://127.0.0.1:3306/shopping_cart_db --username root --password root --cassandra-thrift-host localhost --cassandra-create-schema --direct Here "-m 1" tag ensures a sequential import. If not specified, the below exception will be thrown. ERROR tool.ImportAllTablesTool: Error during import: No primary key could be found for table Category. Please specify one with --split-by or perform a sequential import with '-m 1'. To check whether the keyspace is created, [default@unknown] show keyspaces; ................ Keyspace: shopping_cart_db: Replication Strategy: org.apache.cassandra.locator.SimpleStrategy Durable Writes: true Options: [replication_factor:1] Column Families: ColumnFamily: Category_cf Key Validation Class: org.apache.cassandra.db.marshal.UTF8Type Default column value validator: org.apache.cassandra.db.marshal.UTF8Type Columns sorted by: org.apache.cassandra.db.marshal.UTF8Type Row cache size / save period in seconds / keys to save : 0.0/0/all Row Cache Provider: org.apache.cassandra.cache.SerializingCacheProvider Key cache size / save period in seconds: 200000.0/14400 GC grace seconds: 864000 Compaction min/max thresholds: 4/32 Read repair chance: 1.0 Replicate on write: true Bloom Filter FP chance: default Built indexes: [] Compaction Strategy: org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy ............. [default@unknown] describe shopping_cart_db; Keyspace: shopping_cart_db: Replication Strategy: org.apache.cassandra.locator.SimpleStrategy Durable Writes: true Options: [replication_factor:1] Column Families: ColumnFamily: Category_cf Key Validation Class: org.apache.cassandra.db.marshal.UTF8Type Default column value validator: org.apache.cassandra.db.marshal.UTF8Type Columns sorted by: org.apache.cassandra.db.marshal.UTF8Type Row cache size / save period in seconds / keys to save : 0.0/0/all Row Cache Provider: org.apache.cassandra.cache.SerializingCacheProvider Key cache size / save period in seconds: 200000.0/14400 GC grace seconds: 864000 Compaction min/max thresholds: 4/32 Read repair chance: 1.0 Replicate on write: true Bloom Filter FP chance: default Built indexes: [] Compaction Strategy: org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy You may also use hive to view the databases created in cassandra, in an sql-like manner. * Start Hive $ bin/dse hive hive> show databases; OK default shopping_cart_db When the entire database is imported as above, separate java classes will be created for each of the tables. $ bin/dse sqoop import-all-tables -m 1 --connect jdbc:mysql://127.0.0.1:3306/shopping_cart_db --username root --password root --cassandra-thrift-host localhost --cassandra-create-schema --direct 12/06/15 15:42:11 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead. 12/06/15 15:42:11 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset. 12/06/15 15:42:11 INFO tool.CodeGenTool: Beginning code generation 12/06/15 15:42:11 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `Category` AS t LIMIT 1 12/06/15 15:42:11 INFO orm.CompilationManager: HADOOP_HOME is /home/pradeeban/programs/dse-2.1/resources/hadoop/bin/.. Note: /tmp/sqoop-pradeeban/compile/926ddf787c73be06c4e2ad1f8fc522f1/Category.java uses or overrides a deprecated API. Note: Recompile with -Xlint:deprecation for details. 12/06/15 15:42:13 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-pradeeban/compile/926ddf787c73be06c4e2ad1f8fc522f1/Category.jar 12/06/15 15:42:13 INFO manager.DirectMySQLManager: Beginning mysqldump fast path import 12/06/15 15:42:13 INFO mapreduce.ImportJobBase: Beginning import of Category 12/06/15 15:42:14 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 12/06/15 15:42:15 INFO mapred.JobClient: Running job: job_201206151241_0007 12/06/15 15:42:16 INFO mapred.JobClient: map 0% reduce 0% 12/06/15 15:42:25 INFO mapred.JobClient: map 100% reduce 0% 12/06/15 15:42:25 INFO mapred.JobClient: Job complete: job_201206151241_0007 12/06/15 15:42:25 INFO mapred.JobClient: Counters: 18 12/06/15 15:42:25 INFO mapred.JobClient: Job Counters 12/06/15 15:42:25 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=6480 12/06/15 15:42:25 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0 12/06/15 15:42:25 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0 12/06/15 15:42:25 INFO mapred.JobClient: Launched map tasks=1 12/06/15 15:42:25 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=0 12/06/15 15:42:25 INFO mapred.JobClient: File Output Format Counters 12/06/15 15:42:25 INFO mapred.JobClient: Bytes Written=2848 12/06/15 15:42:25 INFO mapred.JobClient: FileSystemCounters 12/06/15 15:42:25 INFO mapred.JobClient: FILE_BYTES_WRITTEN=21419 12/06/15 15:42:25 INFO mapred.JobClient: CFS_BYTES_WRITTEN=2848 12/06/15 15:42:25 INFO mapred.JobClient: CFS_BYTES_READ=87 12/06/15 15:42:25 INFO mapred.JobClient: File Input Format Counters 12/06/15 15:42:25 INFO mapred.JobClient: Bytes Read=0 12/06/15 15:42:25 INFO mapred.JobClient: Map-Reduce Framework 12/06/15 15:42:25 INFO mapred.JobClient: Map input records=1 12/06/15 15:42:25 INFO mapred.JobClient: Physical memory (bytes) snapshot=119435264 12/06/15 15:42:25 INFO mapred.JobClient: Spilled Records=0 12/06/15 15:42:25 INFO mapred.JobClient: CPU time spent (ms)=630 12/06/15 15:42:25 INFO mapred.JobClient: Total committed heap usage (bytes)=121241600 12/06/15 15:42:25 INFO mapred.JobClient: Virtual memory (bytes) snapshot=2085318656 12/06/15 15:42:25 INFO mapred.JobClient: Map output records=36 12/06/15 15:42:25 INFO mapred.JobClient: SPLIT_RAW_BYTES=87 12/06/15 15:42:25 INFO mapreduce.ImportJobBase: Transferred 0 bytes in 11.4492 seconds (0 bytes/sec) 12/06/15 15:42:25 INFO mapreduce.ImportJobBase: Retrieved 36 records. 12/06/15 15:42:25 INFO tool.CodeGenTool: Beginning code generation 12/06/15 15:42:25 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `Customer` AS t LIMIT 1 12/06/15 15:42:25 INFO orm.CompilationManager: HADOOP_HOME is /home/pradeeban/programs/dse-2.1/resources/hadoop/bin/.. Note: /tmp/sqoop-pradeeban/compile/926ddf787c73be06c4e2ad1f8fc522f1/Customer.java uses or overrides a deprecated API. Note: Recompile with -Xlint:deprecation for details. 12/06/15 15:42:25 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-pradeeban/compile/926ddf787c73be06c4e2ad1f8fc522f1/Customer.jar 12/06/15 15:42:26 INFO manager.DirectMySQLManager: Beginning mysqldump fast path import 12/06/15 15:42:26 INFO mapreduce.ImportJobBase: Beginning import of Customer 12/06/15 15:42:26 INFO mapred.JobClient: Running job: job_201206151241_0008 12/06/15 15:42:27 INFO mapred.JobClient: map 0% reduce 0% 12/06/15 15:42:35 INFO mapred.JobClient: map 100% reduce 0% 12/06/15 15:42:35 INFO mapred.JobClient: Job complete: job_201206151241_0008 12/06/15 15:42:35 INFO mapred.JobClient: Counters: 17 12/06/15 15:42:35 INFO mapred.JobClient: Job Counters 12/06/15 15:42:35 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=6009 12/06/15 15:42:35 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0 12/06/15 15:42:35 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0 12/06/15 15:42:35 INFO mapred.JobClient: Launched map tasks=1 12/06/15 15:42:35 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=0 12/06/15 15:42:35 INFO mapred.JobClient: File Output Format Counters 12/06/15 15:42:35 INFO mapred.JobClient: Bytes Written=0 12/06/15 15:42:35 INFO mapred.JobClient: FileSystemCounters 12/06/15 15:42:35 INFO mapred.JobClient: FILE_BYTES_WRITTEN=21489 12/06/15 15:42:35 INFO mapred.JobClient: CFS_BYTES_READ=87 12/06/15 15:42:35 INFO mapred.JobClient: File Input Format Counters 12/06/15 15:42:35 INFO mapred.JobClient: Bytes Read=0 12/06/15 15:42:35 INFO mapred.JobClient: Map-Reduce Framework 12/06/15 15:42:35 INFO mapred.JobClient: Map input records=1 12/06/15 15:42:35 INFO mapred.JobClient: Physical memory (bytes) snapshot=164855808 12/06/15 15:42:35 INFO mapred.JobClient: Spilled Records=0 12/06/15 15:42:35 INFO mapred.JobClient: CPU time spent (ms)=510 12/06/15 15:42:35 INFO mapred.JobClient: Total committed heap usage (bytes)=121241600 12/06/15 15:42:35 INFO mapred.JobClient: Virtual memory (bytes) snapshot=2082869248 12/06/15 15:42:35 INFO mapred.JobClient: Map output records=0 12/06/15 15:42:35 INFO mapred.JobClient: SPLIT_RAW_BYTES=87 12/06/15 15:42:35 INFO mapreduce.ImportJobBase: Transferred 0 bytes in 9.3143 seconds (0 bytes/sec) 12/06/15 15:42:35 INFO mapreduce.ImportJobBase: Retrieved 0 records. 12/06/15 15:42:35 INFO tool.CodeGenTool: Beginning code generation 12/06/15 15:42:35 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `OrderEntry` AS t LIMIT 1 12/06/15 15:42:35 INFO orm.CompilationManager: HADOOP_HOME is /home/pradeeban/programs/dse-2.1/resources/hadoop/bin/.. Note: /tmp/sqoop-pradeeban/compile/926ddf787c73be06c4e2ad1f8fc522f1/OrderEntry.java uses or overrides a deprecated API. Note: Recompile with -Xlint:deprecation for details. 12/06/15 15:42:35 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-pradeeban/compile/926ddf787c73be06c4e2ad1f8fc522f1/OrderEntry.jar 12/06/15 15:42:36 INFO manager.DirectMySQLManager: Beginning mysqldump fast path import 12/06/15 15:42:36 INFO mapreduce.ImportJobBase: Beginning import of OrderEntry 12/06/15 15:42:36 INFO mapred.JobClient: Running job: job_201206151241_0009 12/06/15 15:42:37 INFO mapred.JobClient: map 0% reduce 0% 12/06/15 15:42:45 INFO mapred.JobClient: map 100% reduce 0% 12/06/15 15:42:45 INFO mapred.JobClient: Job complete: job_201206151241_0009 12/06/15 15:42:45 INFO mapred.JobClient: Counters: 17 12/06/15 15:42:45 INFO mapred.JobClient: Job Counters 12/06/15 15:42:45 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=6381 12/06/15 15:42:45 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0 12/06/15 15:42:45 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0 12/06/15 15:42:45 INFO mapred.JobClient: Launched map tasks=1 12/06/15 15:42:45 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=0 12/06/15 15:42:45 INFO mapred.JobClient: File Output Format Counters 12/06/15 15:42:45 INFO mapred.JobClient: Bytes Written=0 12/06/15 15:42:45 INFO mapred.JobClient: FileSystemCounters 12/06/15 15:42:45 INFO mapred.JobClient: FILE_BYTES_WRITTEN=21569 12/06/15 15:42:45 INFO mapred.JobClient: CFS_BYTES_READ=87 12/06/15 15:42:45 INFO mapred.JobClient: File Input Format Counters 12/06/15 15:42:45 INFO mapred.JobClient: Bytes Read=0 12/06/15 15:42:45 INFO mapred.JobClient: Map-Reduce Framework 12/06/15 15:42:45 INFO mapred.JobClient: Map input records=1 12/06/15 15:42:45 INFO mapred.JobClient: Physical memory (bytes) snapshot=137252864 12/06/15 15:42:45 INFO mapred.JobClient: Spilled Records=0 12/06/15 15:42:45 INFO mapred.JobClient: CPU time spent (ms)=520 12/06/15 15:42:45 INFO mapred.JobClient: Total committed heap usage (bytes)=121241600 12/06/15 15:42:45 INFO mapred.JobClient: Virtual memory (bytes) snapshot=2014703616 12/06/15 15:42:45 INFO mapred.JobClient: Map output records=0 12/06/15 15:42:45 INFO mapred.JobClient: SPLIT_RAW_BYTES=87 12/06/15 15:42:45 INFO mapreduce.ImportJobBase: Transferred 0 bytes in 9.2859 seconds (0 bytes/sec) 12/06/15 15:42:45 INFO mapreduce.ImportJobBase: Retrieved 0 records. 12/06/15 15:42:45 INFO tool.CodeGenTool: Beginning code generation 12/06/15 15:42:45 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `OrderItem` AS t LIMIT 1 12/06/15 15:42:45 INFO orm.CompilationManager: HADOOP_HOME is /home/pradeeban/programs/dse-2.1/resources/hadoop/bin/.. Note: /tmp/sqoop-pradeeban/compile/926ddf787c73be06c4e2ad1f8fc522f1/OrderItem.java uses or overrides a deprecated API. Note: Recompile with -Xlint:deprecation for details. 12/06/15 15:42:45 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-pradeeban/compile/926ddf787c73be06c4e2ad1f8fc522f1/OrderItem.jar 12/06/15 15:42:46 WARN manager.CatalogQueryManager: The table OrderItem contains a multi-column primary key. Sqoop will default to the column orderNumber only for this job. 12/06/15 15:42:46 INFO manager.DirectMySQLManager: Beginning mysqldump fast path import 12/06/15 15:42:46 INFO mapreduce.ImportJobBase: Beginning import of OrderItem 12/06/15 15:42:46 INFO mapred.JobClient: Running job: job_201206151241_0010 12/06/15 15:42:47 INFO mapred.JobClient: map 0% reduce 0% 12/06/15 15:42:55 INFO mapred.JobClient: map 100% reduce 0% 12/06/15 15:42:55 INFO mapred.JobClient: Job complete: job_201206151241_0010 12/06/15 15:42:55 INFO mapred.JobClient: Counters: 17 12/06/15 15:42:55 INFO mapred.JobClient: Job Counters 12/06/15 15:42:55 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=5949 12/06/15 15:42:55 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0 12/06/15 15:42:55 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0 12/06/15 15:42:55 INFO mapred.JobClient: Launched map tasks=1 12/06/15 15:42:55 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=0 12/06/15 15:42:55 INFO mapred.JobClient: File Output Format Counters 12/06/15 15:42:55 INFO mapred.JobClient: Bytes Written=0 12/06/15 15:42:55 INFO mapred.JobClient: FileSystemCounters 12/06/15 15:42:55 INFO mapred.JobClient: FILE_BYTES_WRITTEN=21524 12/06/15 15:42:55 INFO mapred.JobClient: CFS_BYTES_READ=87 12/06/15 15:42:55 INFO mapred.JobClient: File Input Format Counters 12/06/15 15:42:55 INFO mapred.JobClient: Bytes Read=0 12/06/15 15:42:55 INFO mapred.JobClient: Map-Reduce Framework 12/06/15 15:42:55 INFO mapred.JobClient: Map input records=1 12/06/15 15:42:55 INFO mapred.JobClient: Physical memory (bytes) snapshot=116674560 12/06/15 15:42:55 INFO mapred.JobClient: Spilled Records=0 12/06/15 15:42:55 INFO mapred.JobClient: CPU time spent (ms)=590 12/06/15 15:42:55 INFO mapred.JobClient: Total committed heap usage (bytes)=121241600 12/06/15 15:42:55 INFO mapred.JobClient: Virtual memory (bytes) snapshot=2014703616 12/06/15 15:42:55 INFO mapred.JobClient: Map output records=0 12/06/15 15:42:55 INFO mapred.JobClient: SPLIT_RAW_BYTES=87 12/06/15 15:42:55 INFO mapreduce.ImportJobBase: Transferred 0 bytes in 9.2539 seconds (0 bytes/sec) 12/06/15 15:42:55 INFO mapreduce.ImportJobBase: Retrieved 0 records. 12/06/15 15:42:55 INFO tool.CodeGenTool: Beginning code generation 12/06/15 15:42:55 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `Payment` AS t LIMIT 1 12/06/15 15:42:55 INFO orm.CompilationManager: HADOOP_HOME is /home/pradeeban/programs/dse-2.1/resources/hadoop/bin/.. Note: /tmp/sqoop-pradeeban/compile/926ddf787c73be06c4e2ad1f8fc522f1/Payment.java uses or overrides a deprecated API. Note: Recompile with -Xlint:deprecation for details. 12/06/15 15:42:55 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-pradeeban/compile/926ddf787c73be06c4e2ad1f8fc522f1/Payment.jar 12/06/15 15:42:56 WARN manager.CatalogQueryManager: The table Payment contains a multi-column primary key. Sqoop will default to the column orderNumber only for this job. 12/06/15 15:42:56 INFO manager.DirectMySQLManager: Beginning mysqldump fast path import 12/06/15 15:42:56 INFO mapreduce.ImportJobBase: Beginning import of Payment 12/06/15 15:42:56 INFO mapred.JobClient: Running job: job_201206151241_0011 12/06/15 15:42:57 INFO mapred.JobClient: map 0% reduce 0% 12/06/15 15:43:05 INFO mapred.JobClient: map 100% reduce 0% 12/06/15 15:43:05 INFO mapred.JobClient: Job complete: job_201206151241_0011 12/06/15 15:43:05 INFO mapred.JobClient: Counters: 17 12/06/15 15:43:05 INFO mapred.JobClient: Job Counters 12/06/15 15:43:05 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=5914 12/06/15 15:43:05 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0 12/06/15 15:43:05 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0 12/06/15 15:43:05 INFO mapred.JobClient: Launched map tasks=1 12/06/15 15:43:05 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=0 12/06/15 15:43:05 INFO mapred.JobClient: File Output Format Counters 12/06/15 15:43:05 INFO mapred.JobClient: Bytes Written=0 12/06/15 15:43:05 INFO mapred.JobClient: FileSystemCounters 12/06/15 15:43:05 INFO mapred.JobClient: FILE_BYTES_WRITTEN=21518 12/06/15 15:43:05 INFO mapred.JobClient: CFS_BYTES_READ=87 12/06/15 15:43:05 INFO mapred.JobClient: File Input Format Counters 12/06/15 15:43:05 INFO mapred.JobClient: Bytes Read=0 12/06/15 15:43:05 INFO mapred.JobClient: Map-Reduce Framework 12/06/15 15:43:05 INFO mapred.JobClient: Map input records=1 12/06/15 15:43:05 INFO mapred.JobClient: Physical memory (bytes) snapshot=137998336 12/06/15 15:43:05 INFO mapred.JobClient: Spilled Records=0 12/06/15 15:43:05 INFO mapred.JobClient: CPU time spent (ms)=520 12/06/15 15:43:05 INFO mapred.JobClient: Total committed heap usage (bytes)=121241600 12/06/15 15:43:05 INFO mapred.JobClient: Virtual memory (bytes) snapshot=2082865152 12/06/15 15:43:05 INFO mapred.JobClient: Map output records=0 12/06/15 15:43:05 INFO mapred.JobClient: SPLIT_RAW_BYTES=87 12/06/15 15:43:05 INFO mapreduce.ImportJobBase: Transferred 0 bytes in 9.2642 seconds (0 bytes/sec) 12/06/15 15:43:05 INFO mapreduce.ImportJobBase: Retrieved 0 records. 12/06/15 15:43:05 INFO tool.CodeGenTool: Beginning code generation 12/06/15 15:43:05 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `Product` AS t LIMIT 1 12/06/15 15:43:06 INFO orm.CompilationManager: HADOOP_HOME is /home/pradeeban/programs/dse-2.1/resources/hadoop/bin/.. Note: /tmp/sqoop-pradeeban/compile/926ddf787c73be06c4e2ad1f8fc522f1/Product.java uses or overrides a deprecated API. Note: Recompile with -Xlint:deprecation for details. 12/06/15 15:43:06 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-pradeeban/compile/926ddf787c73be06c4e2ad1f8fc522f1/Product.jar 12/06/15 15:43:06 INFO manager.DirectMySQLManager: Beginning mysqldump fast path import 12/06/15 15:43:06 INFO mapreduce.ImportJobBase: Beginning import of Product 12/06/15 15:43:07 INFO mapred.JobClient: Running job: job_201206151241_0012 12/06/15 15:43:08 INFO mapred.JobClient: map 0% reduce 0% 12/06/15 15:43:16 INFO mapred.JobClient: map 100% reduce 0% 12/06/15 15:43:16 INFO mapred.JobClient: Job complete: job_201206151241_0012 12/06/15 15:43:16 INFO mapred.JobClient: Counters: 18 12/06/15 15:43:16 INFO mapred.JobClient: Job Counters 12/06/15 15:43:16 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=5961 12/06/15 15:43:16 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0 12/06/15 15:43:16 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0 12/06/15 15:43:16 INFO mapred.JobClient: Launched map tasks=1 12/06/15 15:43:16 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=0 12/06/15 15:43:16 INFO mapred.JobClient: File Output Format Counters 12/06/15 15:43:16 INFO mapred.JobClient: Bytes Written=248262 12/06/15 15:43:16 INFO mapred.JobClient: FileSystemCounters 12/06/15 15:43:16 INFO mapred.JobClient: FILE_BYTES_WRITTEN=21527 12/06/15 15:43:16 INFO mapred.JobClient: CFS_BYTES_WRITTEN=248262 12/06/15 15:43:16 INFO mapred.JobClient: CFS_BYTES_READ=87 12/06/15 15:43:16 INFO mapred.JobClient: File Input Format Counters 12/06/15 15:43:16 INFO mapred.JobClient: Bytes Read=0 12/06/15 15:43:16 INFO mapred.JobClient: Map-Reduce Framework 12/06/15 15:43:16 INFO mapred.JobClient: Map input records=1 12/06/15 15:43:16 INFO mapred.JobClient: Physical memory (bytes) snapshot=144871424 12/06/15 15:43:16 INFO mapred.JobClient: Spilled Records=0 12/06/15 15:43:16 INFO mapred.JobClient: CPU time spent (ms)=1030 12/06/15 15:43:16 INFO mapred.JobClient: Total committed heap usage (bytes)=121241600 12/06/15 15:43:16 INFO mapred.JobClient: Virtual memory (bytes) snapshot=2085318656 12/06/15 15:43:16 INFO mapred.JobClient: Map output records=300 12/06/15 15:43:16 INFO mapred.JobClient: SPLIT_RAW_BYTES=87 12/06/15 15:43:16 INFO mapreduce.ImportJobBase: Transferred 0 bytes in 9.2613 seconds (0 bytes/sec) 12/06/15 15:43:16 INFO mapreduce.ImportJobBase: Retrieved 300 records. I found DataStax an interesting project to explore more. I have blogged on the issues that I faced on this as a learner, and how easily can they be fixed - Issues that you may encounter during the migration to Cassandra using DataStax/Sqoop and the fixes.
July 16, 2012
by Pradeeban Kathiravelu
· 20,411 Views · 2 Likes
article thumbnail
Introduction to Apache Bigtop, for Packaging and Testing Hadoop
Ah!! The name is everywhere, carried with the wind. Apache Hadoop!! The BIG DATA crunching platform! We all know how alien it can be at start too! Phew!! :o Its my personal experience, nearly 11 months before, I was trying to install HBase, I faced few issues! The problem was version compatibility. Ex: "HBase some x.version" with "Hadoop some y.version". This is a real issue because you will never know which package of what version blends well with the other, unless, someone has tested it. This testing again depends on the environment where they have set up and could be another issue. There was a pressing demand for the management of distributions and then comes an open source project which attempts to create a fully integrated and tested Big Data management distribution, "Apache Bigtop". Goals of Apache Bigtop: -Packaging -Deployment -Integration Testing of all the sub-projects of Hadoop. This project aims at system as a whole, than the individual project. I love the way Doug Cutting quoted in the Keynote, back then, wherein he expressed the similarity between Hadoop and Linux kernel,and the corresponding similarity between the big stack of Hadoop ( Hive, Hbase, Pig, Avro, etc.) and the fully operational operating systems with its distributions (RedHat, Ubuntu, Fedora, Debian etc.). This is an awesome analogy! :) Life is made easy with Bigtop: Bigtop Hadoop distribution artifacts won't make you feel that you live in an alien world! After installing, you will get a chance to blend a Hadoop cluster in any mode, with the sub-projects of it. Its all for you to garnish next! :) Setup Of Bigtop and Installing Hadoop: It's time to welcome all your packages home. [I also mean /home/..] ;) I've tested on Ubuntu 11.04 and here goes a quick and easy installation process. Step 1: Installing the GNU Privacy Guard key, a key management system to access all public key directories. wget -O- http://www.apache.org/dist/incubator/bigtop/bigtop-0.3.0-incubating/repos/GPG-KEY-bigtop | sudo apt-key add - Step 2: Get the repo file from the link http://www.apache.org/dist/incubator/bigtop/bigtop-0.3.0-incubating/repos/ubuntu/bigtop.list sudo wget -O /etc/apt/sources.list.d/bigtop.listhttp://www.apache.org/dist/incubator/bigtop/bigtop-0.3.0-incubating/repos/ubuntu/bigtop.list sudo gedit /etc/apt/sources.list.d/bigtop.list uncomment the mirror link near by. The first link worked for me. deb http://apache.01link.hk/incubator/bigtop/stable/repos/ubuntu/ bigtop contrib Step 3: Updating the apt cache sudo apt-get update Step 4: Checking in the artifacts sudo apt-cache search hadoop Image: Search in the apt cache Step 5: Set your JAVA_HOME export JAVA_HOME=path_to_your_Java export $JAVA_HOME in ~/.bashrc Step 6: Installing the complete Hadoop stack sudo apt-get install hadoop\* Image: (above) Running Hadoop: Step 1: Formatting the namendoe sudo -u hdfs hadoop namenode -format Image : Formatting the namenode Step 2: Starting the Namenode, Datanode, Jobtracker, Tasktracker of Hadoop for i in hadoop-namenode hadoop-datanode hadoop-jobtracker hadoop-tasktracker ; do sudo service $i start ; done Now, the cluster is up and running. Image : Start all the services Step 3: Creating a new directory in hdfs sudo -u hdfs hadoop fs -mkdir /user/bigtop bigtop is the directory name in the user $USER sudo -u hdfs hadoop fs -chown $USER /user/bigtop Image : Create a directory in HDFS Step 4: List the directories in file system hadoop fs -lsr / Image : HDFS directories Step 5: Running a sample pi example hadoop jar /usr/lib/hadoop/hadoop-examples.jar pi 10 1000 Image : Running a sample program Job Completed! Enjoy with your cluster! :) We shall see what more blending could be done with Hadoop (with Hive, Hbase, etc.) in the next post! Until then, Happy Learning!! :):)
July 9, 2012
by Swathi Venkatachala
· 10,929 Views
  • Previous
  • ...
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • Next
  • RSS
  • X
  • Facebook

ABOUT US

  • About DZone
  • Support and feedback
  • Community research

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Core Program
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 3343 Perimeter Hill Drive
  • Suite 215
  • Nashville, TN 37211
  • [email protected]

Let's be friends:

  • RSS
  • X
  • Facebook
×