DZone
The Latest Big Data Topics

Amazon EMR Tutorial: Running a Hadoop MapReduce Job Using Custom JAR
See original post at https://muhammadkhojaye.blogspot.com/2012/04/how-to-run-amazon-elastic-mapreduce-job.html

Introduction

Amazon EMR is a web service for processing enormous amounts of data easily and efficiently. It uses a hosted Hadoop framework running on the web-scale infrastructure of Amazon EC2 and Amazon S3. Amazon EMR hides most of the cumbersome details of Hadoop: it takes care of provisioning Hadoop, running and terminating the job flow, moving data between Amazon EC2 and Amazon S3, and optimizing Hadoop. In this tutorial, we will develop a WordCount example in Java using Hadoop and then run the program on Amazon Elastic MapReduce.

Prerequisites

You must have valid AWS account credentials. You should also have a general familiarity with the Eclipse IDE before you begin, although any other IDE will work.

Step 1 – Develop MapReduce WordCount Java Program

In this section, we first develop a WordCount application. A WordCount program determines how many times each word appears in a set of files. In Eclipse (or whichever IDE you are using), create a simple Java project named "WordCount".
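Create a Java class named Map and override the map method as follows:

    import java.io.IOException;
    import java.util.StringTokenizer;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        // Emit (word, 1) for every whitespace-separated token in the line.
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                context.write(word, one);
            }
        }
    }

Create a Java class named Reduce and override the reduce method as shown below:

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        // Sum all the counts emitted for one word.
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

Create a Java class named WordCount and define the main method as below:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    public class WordCount {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = new Job(conf, "wordcount");
            job.setJarByClass(WordCount.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            job.setMapperClass(Map.class);
            job.setReducerClass(Reduce.class);
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            // args[0] is the input path, args[1] the output path.
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            job.waitForCompletion(true);
        }
    }

Export the WordCount program as a jar using Eclipse and save it to some location on disk. Make sure that you specify the Main Class (WordCount) when exporting the jar file. Our jar is ready!

Step 2 – Upload the WordCount JAR and Input Files to Amazon S3

Now we are going to upload the WordCount jar to Amazon S3.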
First, go to the following URL: https://console.aws.amazon.com/s3/home

Next, click “Create Bucket”, give your bucket a name, and click the “Create” button. Select your new S3 bucket in the left-hand pane. Upload the WordCount JAR and a sample input file for counting the words.

Step 3 – Running an Elastic MapReduce job

Now that the JAR is uploaded into S3, all we need to do is create a new job flow. Let's execute the steps below. (I encourage readers to check out the following link for details regarding each step: How to Create a Job Flow Using a Custom JAR.)

Sign in to the AWS Management Console and open the Amazon Elastic MapReduce console at https://console.aws.amazon.com/elasticmapreduce/

Click Create New Job Flow.

In the DEFINE JOB FLOW page, enter the following details:
  a) Job Flow Name = WordCountJob
  b) Select Run your own application
  c) Select Custom JAR in the drop-down list
  d) Click Continue

In the SPECIFY PARAMETERS page, enter values in the boxes using the following as a guide, and then click Continue.
  JAR Location = bucketName/jarFileLocation
  JAR Arguments = s3n://bucketName/inputFileLocation s3n://bucketName/outputpath

Please note that the output path must be unique each time we execute the job. Hadoop always creates a folder with the name specified here, and the job fails if that folder already exists.

After starting the job, just wait and monitor it as it runs through the Hadoop flow. You can also look for errors by using the Debug button. The job should complete within 10 to 15 minutes (depending on the size of the input). After the job completes, you can view the results in the S3 Browser panel. You can also download the files from S3 and analyze the outcome of the job.
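Before paying for a cluster run, the logic of the Map and Reduce classes from Step 1 can be checked locally. The block below is a minimal plain-Java sketch, not Hadoop code: the class name LocalWordCount and its methods are made up for illustration, and the map, shuffle, and reduce phases are simulated in memory. The output is rendered as word, tab, count, which is the default line layout written by TextOutputFormat, so it should resemble the part files you download from S3.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Hypothetical local stand-in for the tutorial's Hadoop job (no Hadoop needed).
class LocalWordCount {

    // Map phase: emit one (word, 1) pair per whitespace-separated token,
    // mirroring the StringTokenizer loop in the tutorial's Map class.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            pairs.add(Map.entry(tokenizer.nextToken(), 1));
        }
        return pairs;
    }

    // Shuffle and reduce collapsed into one step: group pairs by word and
    // sum their values. TreeMap keeps the words in sorted order.
    static Map<String, Integer> reduce(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            counts.merge(pair.getKey(), pair.getValue(), Integer::sum);
        }
        return counts;
    }

    // Run the whole simulated job over a list of input lines.
    static Map<String, Integer> count(List<String> lines) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String line : lines) {
            pairs.addAll(map(line));
        }
        return reduce(pairs);
    }

    // Render the result as key<TAB>value lines.
    static String format(Map<String, Integer> counts) {
        StringBuilder out = new StringBuilder();
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            out.append(entry.getKey()).append('\t').append(entry.getValue()).append('\n');
        }
        return out.toString();
    }

    public static void main(String[] args) {
        List<String> lines = List.of("the cat and the hat", "the end");
        System.out.print(format(count(lines)));
    }
}
```

Note that, like the tutorial's mapper, this splits on whitespace only, so "Hat" and "hat," would be counted as different words from "hat".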
Amazon Elastic MapReduce Resources
  • Amazon Elastic MapReduce Documentation, http://aws.amazon.com/documentation/elasticmapreduce/
  • Amazon Elastic MapReduce Getting Started Guide, http://docs.amazonwebservices.com/ElasticMapReduce/latest/GettingStartedGuide/
  • Amazon Elastic MapReduce Developer Guide, http://docs.amazonwebservices.com/ElasticMapReduce/latest/DeveloperGuide/
  • Apache Hadoop, http://hadoop.apache.org/
April 23, 2012
by Muhammad Ali Khojaye
· 59,007 Views
Hadoop Basics—Creating a MapReduce Program
The Map Reduce Framework works in two main phases to process the data, which are the "map" phase and the "reduce" phase.
March 18, 2012
by Carlo Scarioni
· 212,697 Views · 4 Likes
Circos: An Amazing Tool for Visualizing Big Data
storing massive amounts of data in a nosql data store is just one side of the big data equation. being able to visualize your data in such a way that you can easily gain deeper insights , is where things really start to get interesting. lately, i've been exploring various options for visualizing (directed) graphs, including circos . circos is an amazing software package that visualizes your data through a circular layout . although it's originally designed for displaying genomic data , it allows to create good-looking figures from data in any field. just transform your data set into a tabular format and you are ready to go. the figure below illustrates the core concept behind circos. the table's columns and rows are represented by segments around the circle. individual cells are shown as ribbons , which connect the corresponding row and column segments. the ribbons themselves are proportional in width to the value in the cell. when visualizing a directed graph , nodes are displayed as segments on the circle and the size of the ribbons is proportional to the value of some property of the relationships. the proportional size of the segments and ribbons with respect to the full data set allows you to easily identify the key data points within your table. in my case, i want to better understand the flow of visitors to and within the datablend site and blog; where do visitors come from (direct, referral, search, ...) and how do they navigate between pages. the rest of this article details how to 1) retrieve the raw visit information through the google analytics api, 2) persist this information as a graph in neo4j and 3) query and preprocess this data for visualization through circos. as always, the complete source code can be found on the datablend public github repository . 1. retrieving your google analytics data let's start by retrieving the raw google analytics data . 
the google analytics data api provides access to all dimensions and metrics that can be queried through the web application. in my case, i'm interested in retrieving the previous page path property for each page view. if a visitor enters through a page outside of the datablend website, the previous page path is marked as (entrance) . otherwise, it contains the internal path . we will use google's java data api to connect and retrieve this information. we are particularly interested in the pagepath , pagetitle , previouspagepath and medium dimensions, while our metric of choice is the number of pageviews . after setting the date range, the feed of entries that satisfy this criteria can be retrieved. for ease of use, we transform this data to a domain entity and filter/clean the data accordingly. if a visit originates from outside the datablend website, we store the specific medium (direct, referral, search, ...) as previous path. // authenticate analyticsservice = new analyticsservice(configuration.service); analyticsservice.setusercredentials(configuration.client_username, configuration.client_pass); // create query dataquery query = new dataquery(new url(configuration.data_url)); query.setids(configuration.table_id); query.setdimensions("ga:medium,ga:previouspagepath,ga:pagepath,ga:pagetitle"); query.setmetrics("ga:pageviews"); query.setstartdate(datestring); query.setenddate(datestring); // execute datafeed feed = analyticsservice.getfeed(createqueryurl(date), datafeed.class); // iterate and clean for (dataentry entry : feed.getentries()) { string pagepath = entry.stringvalueof("ga:pagepath"); string pagetitle = entry.stringvalueof("ga:pagetitle"); string previouspagepath = entry.stringvalueof("ga:previouspagepath"); string medium = entry.stringvalueof("ga:medium"); long views = entry.longvalueof("ga:pageviews"); // filter the data if (filter(pagepath) && filter(previouspagepath) && (!clean(previouspagepath).equals(clean(pagepath)))) { // check criteria are 
satisfied navigation navigation = new navigation(clean(previouspagepath), clean(pagepath), pagetitle, date, views); if (navigation.getsource().equals("(entrance)")) { // in case of an entrace, save its medium instead navigation.setsource(medium); } navigations.add(navigation); } } 2. storing navigational data as a directed graph in neo4j the set of site navigations can easily be stored as a directed graph in the neo4j graph database . nodes are site paths (or mediums), while relationships are the navigations themselves. we start by retrieving the navigations for a particular date range and retrieve (or lazily create) the nodes representing the source and target paths (or mediums). next we de-normalize the pageviews metric (for instance, 6 individual relationships will be created for 6 page-views). although this de-normalization step is not really required, i did so to make sure that the degree of my nodes is correct if i would perform other types of calculations. for each individual navigation relationship, we also store the date of visit . // retrieve navigations for a particular date list navigations = retrieval.getnavigations(date); // save them in the graph database transaction tx = graphdb.begintx(); // iterate and create for (navigation nav : navigations) { node source = getpath(nav.getsource()); node target = getpath(nav.gettarget()); if (!target.hasproperty("title")) { target.setproperty("title", nav.gettargettitle()); } for (long i = 0; i < nav.getamount(); i++) { // duplicate relationships relationship transition = source.createrelationshipto(target, relationships.navigation); transition.setproperty("date", date.gettime()); // save time as long } } // commit tx.success(); tx.finish(); 3. creating the circos tabular data format the circos tabular data format is quite easy to construct. it's basically a tab-delimited file with row and column headers. a cell is interpreted as a value that flows from the row entity to the column entity . 
we will use the neo4j cypher query language to retrieve the data of interest, namely all navigations that occurred within a certain time period . doing so allows us to create historical visualizations of our navigations and observe how visit flow behaviors are changing over time. // access the graph database graphdb = new embeddedgraphdatabase("var/analytics"); engine = new executionengine(graphdb); // execute the data range cypher query map params = new hashmap(); params.put("fromdate", from.gettime()); params.put("todate", to.gettime()); // execute the query executionresult result = engine.execute("start sourcepath=node:index(\"path:*\") " + "match sourcepath-[r]->targetpath " + "where r.date >= {fromdate} and r.date <= {todate} " + "return sourcepath,targetpath", params); next, we create the tab delimited file itself. we iterate through all entries (i.e. navigations) that match our cypher query and store them in a temporary list. afterwards, we start building the two-dimensional array by normalizing (i.e. summing) the number of navigations between the source and target paths. at the end, we filter this occurrence matrix on the minimal number of required navigations. this ensures that we will only create segments for paths that are relevant in the total population. as a final step, we print the occurrences matrix as a tab-delimited file. for each path, we will use a shorthand as the circos renderer seems to have problem with long string identifiers. 
    // retrieve the results
    Iterator<Map<String, Object>> it = result.javaIterator();
    List<Navigation> navigations = new ArrayList<Navigation>();
    Map<String, String> titles = new HashMap<String, String>();
    Set<String> paths = new HashSet<String>();
    // iterate the results
    while (it.hasNext()) {
        Map<String, Object> record = it.next();
        String source = (String) ((Node) record.get("sourcepath")).getProperty("path");
        String target = (String) ((Node) record.get("targetpath")).getProperty("path");
        String targetTitle = (String) ((Node) record.get("targetpath")).getProperty("title");
        // reuse the navigation object as temporary holder
        navigations.add(new Navigation(source, target, targetTitle, new Date(), 1));
        paths.add(source);
        paths.add(target);
        if (!titles.containsKey(target)) {
            titles.put(target, targetTitle);
        }
    }
    // retrieve the various paths
    List<String> pathIds = Arrays.asList(paths.toArray(new String[]{}));
    // create the matrix that holds the info
    int[][] occurences = new int[pathIds.size()][pathIds.size()];
    // iterate through all the navigations and update accordingly
    for (Navigation navigation : navigations) {
        int sourceIndex = pathIds.indexOf(navigation.getSource());
        int targetIndex = pathIds.indexOf(navigation.getTarget());
        occurences[sourceIndex][targetIndex] = occurences[sourceIndex][targetIndex] + 1;
    }
    // matrix built, filter on threshold
    for (int i = 0; i < occurences.length; i++) {
        for (int j = 0; j < occurences.length; j++) {
            if (occurences[i][j] < threshold) {
                occurences[i][j] = 0;
            }
        }
    }
    // print
    printCircosData(pathIds, titles, occurences);

the text below is a sample of the output generated by the printCircosData method. it first prints the legend (matching shorthands with actual paths). next it prints the tab-delimited circos table.
link0 - /?p=411/wp-admin - storing and querying rdf data in neo4j through sail - datablend link1 - /?p=1146 - visualizing rdf schema inferencing through neo4j, tinkerpop, sail and gephi - datablend link2 - /?p=164 - big data / concise articles - datablend link3 - referral - null link4 - /?p=1400 - the joy of algorithms and nosql revisited: the mongodb aggregation framework - datablend ... datal0l1l2l3l4... l000000 l100000 l200000 l3059400197 l400000 4. use the circos power although circos can be installed on your local computer, we will use its online version to create the visualization of our data. upload your tab-delimited file and just wait a few seconds before enjoying the beautiful rendering of your site's navigation information. with just a glimpse of an eye we can already see that the l3-segment (i.e. the referrals) is significantly larger (almost 6000 navigations) compared to the others segments. the outer 3 rings visualize the total amounts of navigations that are leaving and entering this particular path. in case of referrals, no navigations have this path as target (indicated by the empty middle ring). its total segment count (inner ring) is entirely build up out of navigations that have a referral as source. the l6-segment seems to be the path that attracts the most traffic (around 2500 navigations). this segment visualizes the navigation data related to my "the joy of algorithms and nosql: a mongodb example" -article. most of its traffic is received through referrals, while a decent amount is also generated through direct (l17-segment) and search (l27-segment) traffic. the l15-segment (my blog's main page) is the only path that receives an almost equal amount of incoming and outgoing traffic. with just a few tweaks to the circos input data, we can easily focus on particular types of navigation data. in the figure below, i made sure that referral and search navigations are visualized more prominently through the use of 2 separate colors. 5. 
conclusions in the era of big data, visualizations are becoming crucial as they enable us to mine our large data sets for certain patterns of interest. circos specializes in a very specific type of visualization, but does its job extremely well. i would be delighted to hear about other types of visualizations for directed graphs.
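The matrix-building step from section 3 can be tried out without Neo4j or Google Analytics. The following is a minimal sketch under stated assumptions: navigations are plain source/target string pairs, the class CircosTableSketch and all of its method names are hypothetical, and the header row starting with "data" and the l0, l1, ... shorthands are modeled on the sample output above rather than on the article's printCircosData method, whose source is not shown here.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, self-contained version of the occurrence-matrix step.
class CircosTableSketch {

    // Collect distinct paths in first-seen order so that row and column
    // indexes stay stable between runs.
    static List<String> collectPaths(List<String[]> navigations) {
        Set<String> paths = new LinkedHashSet<>();
        for (String[] nav : navigations) {
            paths.add(nav[0]); // source path (or medium)
            paths.add(nav[1]); // target path
        }
        return new ArrayList<>(paths);
    }

    // Cell [i][j] counts the navigations from path i (row) to path j (column).
    static int[][] buildMatrix(List<String> pathIds, List<String[]> navigations) {
        int[][] occurrences = new int[pathIds.size()][pathIds.size()];
        for (String[] nav : navigations) {
            int sourceIndex = pathIds.indexOf(nav[0]);
            int targetIndex = pathIds.indexOf(nav[1]);
            occurrences[sourceIndex][targetIndex]++;
        }
        return occurrences;
    }

    // Zero out every cell below the minimal number of required navigations.
    static void applyThreshold(int[][] matrix, int threshold) {
        for (int[] row : matrix) {
            for (int j = 0; j < row.length; j++) {
                if (row[j] < threshold) {
                    row[j] = 0;
                }
            }
        }
    }

    // Tab-delimited table with shorthand row and column headers.
    static String toTable(int[][] matrix) {
        StringBuilder sb = new StringBuilder("data");
        for (int j = 0; j < matrix.length; j++) {
            sb.append('\t').append('l').append(j);
        }
        sb.append('\n');
        for (int i = 0; i < matrix.length; i++) {
            sb.append('l').append(i);
            for (int j = 0; j < matrix.length; j++) {
                sb.append('\t').append(matrix[i][j]);
            }
            sb.append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        List<String[]> navigations = new ArrayList<>();
        navigations.add(new String[] {"referral", "/?p=1146"});
        navigations.add(new String[] {"referral", "/?p=1146"});
        navigations.add(new String[] {"/?p=1146", "/?p=164"});
        List<String> pathIds = collectPaths(navigations);
        int[][] matrix = buildMatrix(pathIds, navigations);
        applyThreshold(matrix, 2);
        System.out.print(toTable(matrix));
    }
}
```

One deliberate difference from the article's code: a LinkedHashSet is used instead of a HashSet, so the shorthand assigned to each path follows first-seen order rather than hash order.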
March 13, 2012
by Davy Suvee
· 36,340 Views · 2 Likes
The Dark Side of Big Data: Pseudo-Science & Fooled By Randomness
Over the last couple of months I have read up on volumes of Technical Analysis (“TA”) information, and I have back tested probably hundreds of automated trading strategies against massive amounts of data, both exchange intraday and tick data, as well as other sources. Some of these strategies have been massively profitable in back testing, others not so much. Some of the TA patterns I discarded before they even left the book, because they did not stand up to any sort of scientific scrutiny: they lacked a clear predictive thesis, were riddled with forward-looking bias (“Head and Shoulders patterns”), and in some cases were just plain bulls**t (“Elliott Wave Principle” comes to mind). The outcomes of my testing have made me think about the implications of large scale data analysis in general: it is very easy to get fooled by randomness. In many cases my testing results have been amazing, but I cannot come up with a plausible causal explanation as to why, and when I nudge the parameters ever so slightly, outcomes can look entirely different. Taking a step back from the data and looking at it in a larger perspective, I’m inclined to conclude that if data across multiple parameter variations looks like a random walk and lacks a plausible causal explanation, then it is a random walk. If I cannot say “X is caused by A and B”, I’m inclined to believe that the actual reason is “X is the result because A and B fit the historical data D, but may not do so in the future”. And herein lies the crux of the matter: how many data scientists are inclined to take a step back, rather than just assume that there is a pattern there? How many are prepared to do so if their livelihood is largely based on them finding patterns, rather than discarding them because they do not hold up to deeper scrutiny? I’d say very few.
My conclusion is that the age of Big Data will see a radical increase in pseudo-scientific “discoveries”, driven by an interest in announcing great new “patterns”. This pseudo-science will pervade academia, the public sector and the private sector alike. God knows I’ve seen a fair number of academic research papers already that simply do not hold up if you investigate their thesis more deeply. I suspect we will arrive at a point, much like with any new technology, where people tire of the claims made by “Big Data Scientists”, because at least half of what they say will have been proven to be hokey pseudo-science, produced in the pursuit of ever more outlandish claims in a game of one-upping the competition. Some of this will be driven by malice and self-interest, but I suspect that in equal parts it will be driven by ignorance and perverse incentives putting blinders on people in the business.
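The parameter-nudging effect described above can be reproduced on data that is random by construction. The sketch below is an illustration only, not one of the strategies I actually tested: the class RandomnessSketch, the coin-flip return series, and the simple momentum rule with a lookback parameter are all assumptions made up for the example. It searches many lookback values on the first half of a random walk, picks the one with the best in-sample result, and then evaluates that same value on the second half.

```java
import java.util.Random;

// Hypothetical demo: tuning a parameter on pure noise.
class RandomnessSketch {

    // Returns of a random walk: +1 or -1 with equal probability.
    static int[] randomReturns(int n, long seed) {
        Random rng = new Random(seed);
        int[] returns = new int[n];
        for (int i = 0; i < n; i++) {
            returns[i] = rng.nextBoolean() ? 1 : -1;
        }
        return returns;
    }

    // Profit of a toy momentum rule over returns[from, to): go long if the
    // sum of the previous `lookback` returns is positive, otherwise go short.
    static int pnl(int[] returns, int lookback, int from, int to) {
        int total = 0;
        for (int t = Math.max(from, lookback); t < to; t++) {
            int momentum = 0;
            for (int i = t - lookback; i < t; i++) {
                momentum += returns[i];
            }
            int position = momentum > 0 ? 1 : -1;
            total += position * returns[t];
        }
        return total;
    }

    // Pick the lookback with the highest profit on the given window.
    static int bestLookback(int[] returns, int maxLookback, int from, int to) {
        int best = 1;
        for (int k = 2; k <= maxLookback; k++) {
            if (pnl(returns, k, from, to) > pnl(returns, best, from, to)) {
                best = k;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        int[] returns = randomReturns(2000, 42L);
        int best = bestLookback(returns, 50, 0, 1000);
        System.out.println("best lookback in-sample: " + best);
        System.out.println("in-sample profit:        " + pnl(returns, best, 0, 1000));
        System.out.println("out-of-sample profit:    " + pnl(returns, best, 1000, 2000));
    }
}
```

Because the series has no structure, whatever in-sample profit the best lookback shows is an artifact of having tried 50 variants, and there is no reason for the out-of-sample figure to match it. Changing the seed or the lookback range changes which parameter "wins".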
March 9, 2012
by Wille Faler
· 13,959 Views
Big Data Chapter Excerpt: Implementing Schemas with Apache Thrift
This is an excerpt from the upcoming Manning book about Big Data.

Big Data: Principles and Best Practices of Scalable Realtime Data Systems
By Nathan Marz and Samuel E. Ritchie

Thrift is a widely used project that originated at Facebook. It can be used for making language-neutral RPC servers, but developers use it for its schema-creation capabilities. In this article based on chapter 2, author Nathan Marz discusses the workhorses of Thrift, the struct and union type definitions, and Thrift’s built-in mechanisms for evolving a schema over time.

Originally authored by Nathan Marz and Samuel E. Ritchie

Structs

The following code shows how to define a struct using the Thrift Interface Definition Language (IDL). Defining a struct is like defining a class in an object-oriented language: you specify all the data the object contains. The difference is that a Thrift struct only contains data and doesn't specify any extra behavior for the object. Fields in a struct can be:
  • Primitive types like strings, ints, longs, and doubles. In the Thrift IDL, these are referred to as string, i32, i64, and double, respectively.
  • Collections of other types. Thrift supports list, map, and set.
  • Another Thrift struct or union.

    struct Person {
      1: string twitter_username;
      2: string full_name;
      3: list<string> interests;
    }

The following code listing shows how to serialize a struct with Java. As you can see, we're using ArrayList, a native Java data structure, as part of the Person object.
    List<String> interests = new ArrayList<String>() {{
        add("hadoop");
        add("nosql");
    }};
    Person person = new Person("joesmith", "Joe Smith", interests);
    TSerializer serializer = new TSerializer();
    byte[] serialized = serializer.serialize(person);

Here's how to deserialize a Person object in Python. When the object is deserialized, it will be using native Python data structures for any collection types.

    person = Person()
    deserialize(person, serialized_bytes)

Fields in structs can be defined as being either required or optional. If a field is defined as required, then a value for that field must be provided or else Thrift will give an error upon serialization or deserialization. If a field is optional, the value will be null if not provided. You should always declare fields as being either required or optional. The following code listing shows how to define a struct containing required and optional fields.

    struct Tweet {
      1: required string text;
      2: required i64 id;
      3: required i64 timestamp;
      4: required Person person;
      5: optional i64 response_to_tweet_id;
    }

Unions

You can also define unions in Thrift. A union is a struct that must have exactly one field set. Unions are useful for representing polymorphic data. The following listing shows how to define a "PersonID" using a Thrift union that can be one of many different kinds of identifiers.

    union PersonID {
      1: string email;
      2: i64 facebook_id;
      3: i64 twitter_id;
    }

Evolving a schema

Thrift is designed so that schemas can be evolved over time. The key to evolving Thrift schemas is the numeric identifiers used for every field. Those ids are used to identify fields in their serialized form. When you want to change the schema but still be backward compatible with existing data, you must obey the following rules.

Fields may be renamed. This is because the serialized form of an object uses the field ids to identify fields, not the names.

Fields may be removed, but you must be sure never to reuse that field id.
When deserializing, Thrift will skip over any fields that don't match an id it's expecting. So the data for that field will just be ignored in the existing data. If you were to reuse that field id, Thrift will try to deserialize that old data into your new field which will lead to either invalid or incorrect data. Only optional fields can be added to existing structs. You can't add required fields because existing data won't have that field and will not be deserializable. Note that this point does not apply to unions since unions have no notion of required and optional fields. Summary In a relational database, the schema language is part of the database system and is integrated with how the database stores and processes that data. In the Big Data world, you use your own serialization framework that's separate from the storage and processing pieces. You get the flexibility to fine-tune this component to work exactly as needed to fit your data model. There are a few different open source serialization frameworks available, namely Thrift, Protocol Buffers, and Avro. We discussed our favorite, Apache Thrift, because it’s mature and supports most languages, but you could use any of these tools for defining a schema. Here are some other Manning titles you might be interested in: MongoDB in Action Kyle Banker RabbitMQ in Action Alvaro Videla and Jason J.W. Williams Hadoop in Action Chuck Lam Last updated: January 11, 2012
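The role of field ids in the evolution rules can be made concrete with a toy model. The sketch below is not Thrift and does not use Thrift's wire format or API: it assumes a serialized record is just a map from numeric field id to value, and a schema is a map from field id to field name. The class FieldIdSketch and the renamed field display_name are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of id-based field matching (an illustration, not Thrift itself).
class FieldIdSketch {

    // Read a record against a schema: fields are matched by id only, and
    // any id the schema does not know about is skipped.
    static Map<String, Object> deserialize(Map<Integer, Object> record,
                                           Map<Integer, String> schema) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<Integer, Object> field : record.entrySet()) {
            String name = schema.get(field.getKey());
            if (name != null) {
                result.put(name, field.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Data written with the original Person schema (ids 1, 2, 3).
        Map<Integer, Object> oldRecord = new LinkedHashMap<>();
        oldRecord.put(1, "joesmith");
        oldRecord.put(2, "Joe Smith");
        oldRecord.put(3, List.of("hadoop", "nosql"));

        // Evolved schema: field 2 renamed, field 3 removed.
        Map<Integer, String> evolved = new LinkedHashMap<>();
        evolved.put(1, "twitter_username");
        evolved.put(2, "display_name");
        System.out.println(deserialize(oldRecord, evolved));

        // Unsafe evolution: id 3 reused for an unrelated field, so the old
        // interests list shows up under the new name.
        Map<Integer, String> reused = new LinkedHashMap<>(evolved);
        reused.put(3, "age");
        System.out.println(deserialize(oldRecord, reused));
    }
}
```

In this model the rename is harmless because names never appear in the record, the removed field is silently skipped, and the reused id hands a list of interests to a field that expects an age, which is the invalid-data case the rules warn about.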
January 12, 2012
by Chris Smith
· 11,530 Views
Solr + Hadoop = Big Data Love
Bixo Labs shows how to use Solr as a NoSQL solution for big data Many people use the Hadoop open source project to process large data sets because it’s a great solution for scalable, reliable data processing workflows. Hadoop is by far the most popular system for handling big data, with companies using massive clusters to store and process petabytes of data on thousands of servers. Since it emerged from the Nutch open source web crawler project in 2006, Hadoop has grown in every way imaginable – users, developers, associated projects (aka the “Hadoop ecosystem”). Starting at roughly the same time, the Solr open source project has become the most widely used search solution on planet Earth. Solr wraps the API-level indexing and search functionality of Lucene with a RESTful API, GUI, and lots of useful administrative and data integration functionality. The interesting thing about combining these two open source projects is that you can use Hadoop to crunch the data, and then serve it up in Solr. And we’re not talking about just free-text search; Solr can be used as a key-value store (i.e. a NoSQL database) via its support for range queries. Even on a single server, Solr can easily handle many millions of records (“documents” in Lucene lingo). Even better, Solr now supports sharding and replication via the new, cutting-edge SolrCloud functionality. Background I started using Hadoop & Solr about five years ago, as key pieces of the Krugle code search startup I co-founded in 2005. Back then, Hadoop was still part of the Nutch web crawler we used to extract information about open source projects. And Solr was fresh out of the oven, having just been released as open source by CNET. At Bixo Labs we use Hadoop, Solr, Cascading, Mahout, and many other open source technologies to create custom data processing workflows. The web is a common source of our input data, which we crawl using the Bixo open source project. 
The Problem

During a web crawl, the state of the crawl is contained in something commonly called a “crawl DB”. For broad crawls, this has to be something that works with billions of records, since you need one entry for each known URL. Each “record” has the URL as the key, and contains important state information such as the time and result of the last request. For Hadoop-based crawlers such as Nutch and Bixo, the crawl DB is commonly kept in a set of flat files, where each file is a Hadoop “SequenceFile”. These are just packed arrays of serialized key/value objects. Sometimes we need to poke at this data, and here’s where the simple flat-file structure creates a problem. There’s no easy way to run queries against the data, but we can’t store it in a traditional database since billions of records + RDBMS == pain and suffering. Here is where scalable NoSQL solutions shine. For example, the Nutch project is currently re-factoring this crawl DB layer to allow plugging in HBase. Other options include Cassandra, MongoDB, CouchDB, etc. But for simple analytics and exploration on smaller datasets, a Solr-based solution works and is easier to configure. Plus you get useful and surprisingly fun functionality like facets, geospatial queries, range queries, free-form text search, and lots of other goodies for free.

Architecture

So what exactly would such a Hadoop + Solr system look like? As mentioned previously, in this example our input data comes from a Bixo web crawler’s CrawlDB, with one entry for each known URL. But the input data could just as easily be log files, or records from a traditional RDBMS, or the output of another data processing workflow. The key point is that we’re going to take a bunch of input data, (optionally) munge it into a more useful format, and then generate a Lucene index that we access via Solr.
Hadoop

For the uninitiated, Hadoop implements both a distributed file system (aka “HDFS”) and an execution layer that supports the map-reduce programming model. Typically data is loaded and transformed during the map phase, and then combined/saved during the reduce phase. In our example, the map phase reads in Hadoop compressed SequenceFiles that contain the state of our web crawl, and our reduce phase writes out Lucene indexes. The focus of this article isn’t on how to write Hadoop map-reduce jobs, but I did want to show you the code that implements the guts of the job. Note that it’s not typical Hadoop key/value manipulation code, which is painful to write, debug, and maintain. Instead we use Cascading, which is an open source workflow planning and data processing API that creates Hadoop jobs from shorter, more representative code. The snippet below reads SequenceFiles from HDFS, and pipes those records into a sink (output) that stores them using a LuceneScheme, which in turn saves records as Lucene documents in an index.

    Tap source = new Hfs(new SequenceFile(CRAWLDB_FIELDS), inputDir);
    Pipe urlPipe = new Pipe("crawldb urls");
    urlPipe = new Each(urlPipe, new ExtractDomain());
    Tap sink = new Hfs(new LuceneScheme(SOLR_FIELDS, STORE_SETTINGS, INDEX_SETTINGS, StandardAnalyzer.class, MAX_FIELD_LENGTH), outputDir, true);
    FlowConnector fc = new FlowConnector();
    fc.connect(source, sink, urlPipe).complete();

We defined CRAWLDB_FIELDS and SOLR_FIELDS to be the set of input and output data elements, using names like “url” and “status”. We take advantage of the Lucene Scheme that we’ve created for Cascading, which lets us easily map from Cascading’s view of the world (records with fields) to Lucene’s index (documents with fields). We don’t have a Cascading Scheme that directly supports Solr (wouldn’t that be handy?), but we can make do for now since we only need simple analysis for this example. We indexed all of the fields so that we can perform queries against them.
Only the status message contains normal English text, so that’s the only one we have to analyze (i.e., break the text up into terms using spaces and other token delimiters). In addition, the ExtractDomain operation pulls the domain from the URL field and builds a new Solr field containing just the domain. This will allow us to do queries against the domain of the URL as well as the complete URL. We could also have chosen to apply a custom analyzer to the URL to break it into several pieces (i.e., protocol, domain, port, path, query parameters) that could have been queried individually. Running the Hadoop Job For simplicity and pay-as-you-go, it’s hard to beat Amazon’s EC2 and Elastic Mapreduce offerings for running Hadoop jobs. You can easily spin up a cluster of 50 servers, run your job, save the results, and shut it down – all without needing to buy hardware or pay for IT support. There are many ways to create and configure a Hadoop cluster; for us, we’re very familiar with the (modified) EC2 Hadoop scripts that you can find in the Bixo distribution. Step-by-step instructions are available at http://openbixo.org/documentation/running-bixo-in-ec2/ The code for this article is available via GitHub, at http://github.com/bixolabs/hadoop2solr. The README displayed on that page contains step-by-step instructions for building and running the job. After the job is done, we’ll copy the resulting index out of the Hadoop distributed file system (HDFS) and onto the Hadoop cluster’s master server, then kill off the one slave we used. The Hadoop master is now ready to be configured as our Solr server. Solr On the Solr side of things, we need to create a schema that matches the index we’re generating. The key section of our schema.xml file is where we define the fields. These fields have a one-to-one correspondence with the SOLR_FIELDS we defined in our Hadoop workflow. 
They also need to use the same Lucene settings as what we defined in the static IndexWorkflow.java STORE_SETTINGS and INDEX_SETTINGS.

Once we have this defined, all that’s left is to set up a server that we can use. To keep it simple, we’ll use the single EC2 instance in Amazon’s cloud (m1.large) that we used as our master for the Hadoop job, and run the simple Solr search server that relies on embedded Jetty to provide the webapp container.

Similar to the Hadoop job, step-by-step instructions are in the README for the hadoop2solr project on GitHub. But in a nutshell, we’ll copy and unzip a Solr 1.4.1 setup on the EC2 server, do the same for our custom Solr configuration, create a symlink to the index, and then start it running with:

    % cd solr
    % java -Dsolr.solr.home=../solr-conf -Dsolr.data.dir=../solr-data -jar start.jar

Giving it a Try

Now comes the interesting part. Since we opened up the default Jetty port used by Solr (8983) on this EC2 instance, we can directly access Solr’s handy admin console by pointing our browser at http://:8983/solr/admin

From here we can run queries against Solr:

We can also use curl to talk to the server via HTTP requests:

    curl http://:8983/solr/select/?q=-status%3AFETCHED+and+-status%3AUNFETCHED

The response is XML by default. Below is an example of the response from the above request, where we found 2,546 matches in 94ms.

Now here’s what I find amazing. For an index of 82 million documents, running on a fairly wimpy box (EC2 m1.large = 2 virtual cores), the typical response time for a simple query like “status:FETCHED” is only 400 milliseconds, to find 9M documents. Even a complex query such as (status not FETCHED and not UNFETCHED) only takes six seconds.

Scaling

Obviously we could use beefier boxes. If we switched to something like m1.xlarge (15GB of memory, 4 virtual cores) then it’s likely we could handle upwards of 200M “records” in our Solr index and still get reasonable response times.
If we wanted to scale beyond a single box, there are a number of solutions. Even out of the box Solr supports sharding, where your HTTP request can specify multiple servers to use in parallel.

More recently, the Solr trunk has support for SolrCloud. This uses the ZooKeeper open source project to simplify coordination of multiple Solr servers.

Finally, the Katta open source project supports Lucene-level distributed search, with many of the features needed for production quality distributed search that have not yet been added to SolrCloud.

Summary

The combination of Hadoop and Solr makes it easy to crunch lots of data and then quickly serve up the results via a fast, flexible search & query API. Because Solr supports query-style requests, it’s suitable as a NoSQL replacement for traditional databases in many situations, especially when the size of the data exceeds what is reasonable with a typical RDBMS.

Solr has some limitations that you should be aware of, specifically:

· Updating the index works best as a batch operation. Individual records can be updated, but each commit (index update) generates a new Lucene segment, which will impact performance.
· Support for replication, fail-over, and other attributes that you’d want in a production-grade solution isn’t there yet in SolrCloud. If this matters to you, consider Katta instead.
· Many SQL queries can’t be easily mapped to Solr queries.

The code for this article is available via GitHub, at http://github.com/bixolabs/hadoop2solr. The README displayed on that page contains additional technical details.
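To make the sharding option from the Scaling section a little more concrete, here is a minimal Java sketch that builds a distributed-search request URL. It relies on Solr's `shards` request parameter (a comma-separated list of `host:port/path` entries); the class name, method name, and host names are hypothetical and are not part of the hadoop2solr project.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.List;

final class ShardedSolrQuery {

    // Builds a select URL that asks one Solr server (the coordinator) to
    // run the query against every listed shard and merge the results.
    static String buildUrl(String coordinator, List<String> shards, String query) {
        String encodedQuery = URLEncoder.encode(query, StandardCharsets.UTF_8);
        return "http://" + coordinator + "/select?shards="
                + String.join(",", shards) + "&q=" + encodedQuery;
    }

    public static void main(String[] args) {
        // Hypothetical host names, used only for illustration.
        List<String> shards = List.of(
                "solr1.example.com:8983/solr",
                "solr2.example.com:8983/solr");
        System.out.println(buildUrl(shards.get(0), shards, "status:FETCHED"));
    }
}
```

The resulting URL can be passed to curl in the same way as the single-server query shown earlier; each shard would need to hold its own slice of the index.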
April 4, 2011
by Ken Krugler
· 119,518 Views
Solve Foreign-key Problems in DBUnit Test Data
If you create small per-test datasets, as DBUnit advises, you’ll get intermittent build failures due to foreign-key violations. This post explains (1) why this happens, (2) why small per-test datasets are still a good idea, and (3) one simple way to get around the problem.

NB When I searched for solutions to this problem, I discovered that other kinds of foreign-key problem come up with DBUnit. Some people have circular dependencies in their relational database schemas, which stops DBUnit from loading the test data. If such is your case, I’m sorry to say that this post won’t help you with it, and your best option is probably to just take yourself outside and shoot yourself now. (Although some people seem to have chosen instead to disable foreign key checking during test runs.)

What causes the foreign-key violations

The cause of the problem is simple, and illustrated by a trivial example. Suppose you have two entity classes, HitchHiker and SpaceShip. The HitchHiker table has a foreign key that references SpaceShip. The test data for HitchHikerDaoTest contains lines from both tables, whereas the test data for SpaceShipDaoTest contains only lines from SpaceShip.

DBUnit’s default setup operation, CLEAN_INSERT, wipes data from every table occurring in the test dataset and then inserts the lines listed in that dataset. When SpaceShipDaoTest runs, DBUnit will start by deleting everything in the SpaceShip table. If any HitchHikers are currently riding in the SpaceShips that are about to be deleted, the database will object to their untimely eviction (I’m not sure whether the error message will read like Vogon poetry, though).

If you start from an empty database, and execute SpaceShipDaoTest and then HitchHikerDaoTest, you’ll be fine; but if you do it in the other order, your build will fail. It’s that second-worst kind of bug, the unpredictable kind, since you don’t (usually) specify the order in which tests run. After all, they’re supposed to be independent!
So you may well find that you have no problems for months on end, until one day you get an error running individual tests in a particular sequence, or Maven changes the order in which it runs your tests on the CI server, and BOOM!

Why you should still use small independent datasets

It’s tempting to circumvent the problem by using a single monolithic dataset for all your integration tests. I’ve tried this, and I advise against it. A big data file is hard to work with: you waste a lot of time scrolling around looking for the line you need, and it’s very hard to follow and understand foreign-key relations. Worse still: by modifying the data to make one test pass, you can easily accidentally break another one. The larger the dataset and the test suite become, the more fragile they get, and the more painstaking it becomes to modify them.

How to avoid the foreign-key problem with small independent datasets

One working but unsatisfactory solution would be to pad out every XML dataset with the list of all tables touched in the test suite. It’s unsatisfactory because the only way to add a table into a FlatXmlDataSet is to list a line of that table (a FlatXmlDataSet can’t contain empty tables), and there’s no justification for polluting the test data with lines from tables that are not part of the test.

The solution I found was to use a DTD to clean tables before tests. Every XML file has different contents, but they all reference a single DTD which lists all the tables involved in the test suite. The DTD is easy to generate from the database schema, and useful for auto-complete and catching typos in column names, so you should probably already be using one.
The code to exploit its contents is very simple:

    // Note: xmlFilename, dbUnitConnection and asFile(...) are not defined in this
    // snippet; they are assumed to be members of the enclosing test class.
    private IDataSet loadTestDataWithDtdTableList(String dtdFilename) throws IOException, DataSetException, SQLException {
        // Dataset built from the DTD: every table it lists, but with no rows
        Reader dtdReader = new FileReader(new ClassPathResource(dtdFilename).getFile());
        IDataSet dtdDataset = new FlatDtdDataSet(dtdReader);
        // Dataset built from the per-test XML file
        FlatXmlDataSetBuilder builder = new FlatXmlDataSetBuilder();
        builder.setMetaDataSet(new DatabaseDataSet(dbUnitConnection, false));
        IDataSet xmlDataset = builder.build(asFile(xmlFilename));
        // Combine the two: all tables are present, only the test's rows are inserted
        return new CompositeDataSet(dtdDataset, xmlDataset);
    }

How it works: DBUnit provides a facility to load a dataset from a DTD. This dataset contains all the tables listed in the DTD, but of course empty of data. The DTD dataset is then combined with a FlatXmlDataSet representing your test data. The graphic below illustrates the composite dataset that would be produced for the SpaceShip example.

If you have dictionary tables whose contents never change, you can and should leave them out of the DTD as well as out of the XML datasets, to improve test performance a little.

One further detail: you should close the FileReader after test setup. I couldn’t find a hook into the end of the test setup operation (short of writing my own DatabaseOperation), so I saved the reference as a member variable and hooked the close() call into the tear-down phase of the test.

NB For a more complete code example, see this Gist snippet of a base class for TestNG+Spring+DBUnit tests that adds the above-described DBUnit setup operation to Spring’s TestNG helper class.

Happy database testing!

From http://www.andrewspencer.net/2011/solve-foreign-key-problems-in-dbunit-test-data/
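The order dependence described above, and the reason the full table list removes it, can be reproduced with a toy in-memory model. The sketch below does not use DBUnit at all: the class, the table names, and the simplified foreign-key check are assumptions made purely for illustration, and the model hard-codes that the child table is wiped before the parent.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class CleanInsertModel {
    // Toy database: SPACE_SHIP ids, and HITCH_HIKER rows mapping
    // hiker id -> ship id (the foreign key).
    final Set<Integer> ships = new LinkedHashSet<>();
    final Map<Integer, Integer> hikers = new LinkedHashMap<>();

    // Mimics a clean insert: wipe every table named in the dataset,
    // then insert the dataset's rows.
    void cleanInsert(Set<String> tables, List<Integer> newShips, Map<Integer, Integer> newHikers) {
        if (tables.contains("HITCH_HIKER")) {
            hikers.clear(); // child table is wiped first
        }
        if (tables.contains("SPACE_SHIP")) {
            if (!hikers.isEmpty()) {
                throw new IllegalStateException("FK violation: hitchhikers still reference spaceships");
            }
            ships.clear();
        }
        ships.addAll(newShips);
        hikers.putAll(newHikers);
    }

    // Runs the HitchHiker setup and then the SpaceShip setup (the failing order).
    // Returns true if no foreign-key violation occurred.
    static boolean runBadOrder(boolean includeAllTables) {
        CleanInsertModel db = new CleanInsertModel();
        Set<String> allTables = Set.of("SPACE_SHIP", "HITCH_HIKER");
        try {
            // HitchHikerDaoTest: rows from both tables
            db.cleanInsert(allTables, List.of(1), Map.of(42, 1));
            // SpaceShipDaoTest: rows from SPACE_SHIP only
            Set<String> tables = includeAllTables ? allTables : Set.of("SPACE_SHIP");
            db.cleanInsert(tables, List.of(2), Map.of());
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("small dataset only:   " + runBadOrder(false));
        System.out.println("with full table list: " + runBadOrder(true));
    }
}
```

Passing `true` plays the role of the DTD-derived table list: the second setup wipes the hitchhikers as well, even though its own data contains no hitchhiker rows.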
February 16, 2011
by Andrew Spencer
· 27,845 Views
Compute Grids vs. Data Grids
In a nutshell, grid computing is a way to distribute your computations across multiple computers (nodes). However, even JMS does that, but JMS is not a grid computing product - it's a messaging protocol. To correctly classify grid computing products we have to split them into 2 categories: compute grids and data grids.

Compute Grid

Compute grids allow you to take a computation, optionally split it into multiple parts, and execute them on different grid nodes in parallel. The obvious benefit here is that your computation will perform faster as it now can use resources from all grid nodes in parallel. One of the most common design patterns for parallel execution is MapReduce. However, compute grids are useful even if you don't need to split your computation - they help you improve overall scalability and fault-tolerance of your system by offloading your computations onto most available nodes.

Some of the "must have" compute grid features are:

Automatic deployment - allows for automatic deployment of classes and resources onto the grid without any extra steps from the user. This feature alone provides one of the largest productivity boosts in distributed systems. Users usually are able to simply execute a task from one grid node and, as task execution penetrates the grid, all classes and resources are also automatically deployed.

Topology resolution - allows you to provision nodes based on any node characteristic or user-specific configuration. For example, you can decide to only include Linux nodes for execution, or to only include a certain group of nodes within a certain time window. You should also be able to choose all nodes with CPU loaded, say, under 50% that have more than 2GB of available heap memory.

Collision resolution - allows users to control which jobs get executed, which jobs get rejected, how many jobs can be executed in parallel, order of overall execution, etc.

Load balancing - allows you to properly balance your system load within the grid.
The range of load balancing policies usually varies between products. Some of the most common ones are round robin, random, or adaptive. More advanced vendors also provide affinity load balancing, where grid jobs always end up on the same node based on the job's affinity key. This policy works well with data grids described below.

Fail-over - grid jobs should automatically fail over onto other nodes in case of a node crash or some other job failure.

Checkpoints - long running jobs should be able to periodically store their intermediate state. This is useful for fail-overs, when a failed job should be able to pick up its execution from the latest checkpoint, rather than start from scratch.

Grid events - a querying mechanism for all grid events is essential. Any grid node should be able to query all events that happened on remote grid nodes during grid task execution.

Node metrics - a good compute grid solution should be able to provide dynamic grid metrics for all grid nodes. Metrics should include vital node statistics, from CPU load to average job execution time. This is especially useful for load balancing, when the system or user needs to pick the least loaded node for execution.

Pluggability - in order to blend into any environment a good compute grid should have well thought out pluggability points. For example, if running on top of JBoss, a compute grid should totally reuse JBoss communication and discovery protocols.

Data grid integration - it is important that compute grids are able to natively integrate with data grids, as quite often businesses will need both computational and data features working within the same application.

Some compute grid vendors:
- GridGain - professional open source
- JPPF - open source

Data Grid

Data grids allow you to distribute your data across the grid. Most of us are used to the term distributed cache rather than data grid (data grid does sound more savvy though).
The main goal of a data grid is to provide as much data as possible from memory on every grid node and to ensure data coherency. Some of the important data grid features include:

Data replication - all data is fully replicated to all nodes in the grid. This strategy consumes the most resources, however it is the most effective solution for read-mostly scenarios, as data is available everywhere for immediate access.

Data invalidation - in this scenario, nodes load data on demand. Whenever data changes on one of the nodes, the same data on all other nodes is purged (invalidated). This data will then be loaded on demand the next time it is accessed.

Distributed transactions - transactions are required to ensure data coherency. Cache updates must work just like database updates - whenever an update fails, the whole transaction must be rolled back. Most data grids support various transaction policies, such as read committed, write committed, serializable, etc.

Data backups - useful for fail-over. Some data grid products provide the ability to assign backup nodes for the data. This way whenever a node crashes, the data is immediately available from another node.

Data affinity/partitioning - data affinity allows you to split/partition your whole data set into multiple subsets and assign every subset to a grid node. In the purest form, data is not replicated between nodes at all; every node is only responsible for its own subset of data. However, various data grid products may provide different flavors of data affinity, such as replication only to backup nodes, for example. Data affinity is one of the more advanced features, and is not provided by every vendor. To my knowledge, according to product websites, out of commercial vendors Oracle Coherence and GemStone have it (there may be others). In the professional open source space you can take a look at the combination of GridGain with affinity load balancing and JBossCache.

Some data grid/cache vendors:
- Oracle Coherence - commercial
- GemStone - commercial
- GigaSpaces - commercial
- JBossCache - professional open source
- Ehcache - open source
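Affinity load balancing and data partitioning both rest on the same idea: a key deterministically selects a node. The following Java sketch shows one simple way to do that with a hash and a modulo. It is an illustration under stated assumptions, not the scheme used by any of the products listed above; the class name and node names are hypothetical.

```java
import java.util.List;

final class AffinityRouter {
    private final List<String> nodes;

    AffinityRouter(List<String> nodes) {
        this.nodes = List.copyOf(nodes);
    }

    // Maps an affinity key to one node. The same key always lands on the
    // same node as long as the node list does not change.
    String nodeFor(Object affinityKey) {
        // floorMod keeps the bucket non-negative even for negative hash codes
        int bucket = Math.floorMod(affinityKey.hashCode(), nodes.size());
        return nodes.get(bucket);
    }

    public static void main(String[] args) {
        AffinityRouter router = new AffinityRouter(List.of("node-a", "node-b", "node-c"));
        // A job and the data it needs share a key, so both are routed to the same node.
        System.out.println("job for customer-17  -> " + router.nodeFor("customer-17"));
        System.out.println("data for customer-17 -> " + router.nodeFor("customer-17"));
    }
}
```

A plain modulo reassigns most keys whenever a node joins or leaves, so this sketch only conveys the routing idea; it says nothing about how a real grid handles topology changes or backups.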
July 31, 2008
by Dmitriy Setrakyan
· 28,299 Views · 3 Likes
Understanding HBase and BigTable
The hardest part about learning HBase (the open source implementation of Google's BigTable) is just wrapping your mind around the concept of what it actually is. I find it rather unfortunate that these two great systems contain the words table and base in their names, which tend to cause confusion among RDBMS-indoctrinated individuals (like myself).

This article aims to describe these distributed data storage systems from a conceptual standpoint. After reading it, you should be better able to make an educated decision regarding when you might want to use HBase vs when you'd be better off with a "traditional" database.

It's all in the terminology

Fortunately, Google's BigTable Paper clearly explains what BigTable actually is. Here is the first sentence of the "Data Model" section:

    A Bigtable is a sparse, distributed, persistent multidimensional sorted map.

Note: At this juncture I like to give readers the opportunity to collect any brain matter which may have left their skulls upon reading that last line.

The BigTable paper continues, explaining that:

    The map is indexed by a row key, column key, and a timestamp; each value in the map is an uninterpreted array of bytes.

Along those lines, the HbaseArchitecture page of the Hadoop wiki posits that:

    HBase uses a data model very similar to that of Bigtable. Users store data rows in labelled tables. A data row has a sortable key and an arbitrary number of columns. The table is stored sparsely, so that rows in the same table can have crazily-varying columns, if the user likes.

Although all of that may seem rather cryptic, it makes sense once you break it down a word at a time. I like to discuss them in this sequence: map, persistent, distributed, sorted, multidimensional, and sparse.

Rather than trying to picture a complete system all at once, I find it easier to build up a mental framework piecemeal, to ease into it...

map

At its core, HBase/BigTable is a map.
Depending on your programming language background, you may be more familiar with the terms associative array (PHP), dictionary (Python), Hash (Ruby), or Object (JavaScript).

From the Wikipedia article, a map is "an abstract data type composed of a collection of keys and a collection of values, where each key is associated with one value."

Using JavaScript Object Notation, here's an example of a simple map where all the values are just strings:

    {
      "zzzzz" : "woot",
      "xyz" : "hello",
      "aaaab" : "world",
      "1" : "x",
      "aaaaa" : "y"
    }

persistent

Persistence merely means that the data you put in this special map "persists" after the program that created or accessed it is finished. This is no different in concept than any other kind of persistent storage such as a file on a filesystem. Moving along...

distributed

HBase and BigTable are built upon distributed filesystems so that the underlying file storage can be spread out among an array of independent machines.

HBase sits atop either Hadoop's Distributed File System (HDFS) or Amazon's Simple Storage Service (S3), while a BigTable makes use of the Google File System (GFS).

Data is replicated across a number of participating nodes in an analogous manner to how data is striped across discs in a RAID system.

For the purpose of this article, we don't really care which distributed filesystem implementation is being used. The important thing to understand is that it is distributed, which provides a layer of protection against, say, a node within the cluster failing.

sorted

Unlike most map implementations, in HBase/BigTable the key/value pairs are kept in strict alphabetical order. That is to say that the row for the key "aaaaa" should be right next to the row with key "aaaab" and very far from the row with key "zzzzz".
Continuing our JSON example, the sorted version looks like this:

    {
      "1" : "x",
      "aaaaa" : "y",
      "aaaab" : "world",
      "xyz" : "hello",
      "zzzzz" : "woot"
    }

Because these systems tend to be so huge and distributed, this sorting feature is actually very important. The spatial propinquity of rows with like keys ensures that when you must scan the table, the items of greatest interest to you are near each other.

This is important when choosing a row key convention. For example, consider a table whose keys are domain names. It makes the most sense to list them in reverse notation (so "com.jimbojw.www" rather than "www.jimbojw.com") so that rows about a subdomain will be near the parent domain row.

Continuing the domain example, the row for the domain "mail.jimbojw.com" would be right next to the row for "www.jimbojw.com" rather than say "mail.xyz.com" which would happen if the keys were regular domain notation.

It's important to note that the term "sorted" when applied to HBase/BigTable does not mean that "values" are sorted. There is no automatic indexing of anything other than the keys, just as it would be in a plain-old map implementation.

multidimensional

Up to this point, we haven't mentioned any concept of "columns", treating the "table" instead as a regular-old hash/map in concept. This is entirely intentional. The word "column" is another loaded word like "table" and "base" which carries the emotional baggage of years of RDBMS experience.

Instead, I find it easier to think about this like a multidimensional map - a map of maps if you will. Adding one dimension to our running JSON example gives us this:

    {
      "1" : { "A" : "x", "B" : "z" },
      "aaaaa" : { "A" : "y", "B" : "w" },
      "aaaab" : { "A" : "world", "B" : "ocean" },
      "xyz" : { "A" : "hello", "B" : "there" },
      "zzzzz" : { "A" : "woot", "B" : "1337" }
    }

In the above example, you'll notice now that each key points to a map with exactly two keys: "A" and "B".
From here forward, we'll refer to the top-level key/map pair as a "row". Also, in BigTable/HBase nomenclature, the "A" and "B" mappings would be called "Column Families".

A table's column families are specified when the table is created, and are difficult or impossible to modify later. It can also be expensive to add new column families, so it's a good idea to specify all the ones you'll need up front.

Fortunately, a column family may have any number of columns, denoted by a column "qualifier" or "label". Here's a subset of our JSON example again, this time with the column qualifier dimension built in:

    {
      // ...
      "aaaaa" : {
        "A" : { "foo" : "y", "bar" : "d" },
        "B" : { "" : "w" }
      },
      "aaaab" : {
        "A" : { "foo" : "world", "bar" : "domination" },
        "B" : { "" : "ocean" }
      },
      // ...
    }

Notice that in the two rows shown, the "A" column family has two columns: "foo" and "bar", and the "B" column family has just one column whose qualifier is the empty string ("").

When asking HBase/BigTable for data, you must provide the full column name in the form "family:qualifier". So for example, both rows in the above example have three columns: "A:foo", "A:bar" and "B:".

Note that although the column families are static, the columns themselves are not. Consider this expanded row:

    {
      // ...
      "zzzzz" : {
        "A" : { "catch_phrase" : "woot" }
      }
    }

In this case, the "zzzzz" row has exactly one column, "A:catch_phrase". Because each row may have any number of different columns, there's no built-in way to query for a list of all columns in all rows. To get that information, you'd have to do a full table scan. You can however query for a list of all column families since these are immutable (more-or-less).

The final dimension represented in HBase/BigTable is time. All data is versioned either using an integer timestamp (seconds since the epoch), or another integer of your choice. The client may specify the timestamp when inserting data.

Consider this updated example utilizing arbitrary integral timestamps:

    {
      // ...
      "aaaaa" : {
        "A" : {
          "foo" : { 15 : "y", 4 : "m" },
          "bar" : { 15 : "d" }
        },
        "B" : {
          "" : { 6 : "w", 3 : "o", 1 : "w" }
        }
      },
      // ...
    }

Each column family may have its own rules regarding how many versions of a given cell to keep (a cell is identified by its rowkey/column pair).

In most cases, applications will simply ask for a given cell's data, without specifying a timestamp. In that common case, HBase/BigTable will return the most recent version (the one with the highest timestamp) since it stores these in reverse chronological order.

If an application asks for a given row at a given timestamp, HBase will return cell data where the timestamp is less than or equal to the one provided.

Using our imaginary HBase table, querying for the row/column of "aaaaa"/"A:foo" will return "y" while querying for the row/column/timestamp of "aaaaa"/"A:foo"/10 will return "m". Querying for a row/column/timestamp of "aaaaa"/"A:foo"/2 will return a null result.

sparse

The last keyword is sparse. As already mentioned, a given row can have any number of columns in each column family, or none at all. The other type of sparseness is row-based gaps, which merely means that there may be gaps between keys.

This, of course, makes perfect sense if you've been thinking about HBase/BigTable in the map-based terms of this article rather than perceived similar concepts in RDBMSs.

And that's about it

Well, I hope that helps you understand conceptually what the HBase data model feels like.

As always, I look forward to your thoughts, comments and suggestions.
May 22, 2008
by Jim Wilson
· 84,435 Views · 5 Likes
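To make the "sorted, multidimensional map with versions" picture from "Understanding HBase and BigTable" concrete, here is a small in-memory Java sketch built from nested TreeMaps. It is only a mental model: the class and method names are hypothetical, it is not the HBase client API, and Java's string ordering stands in for the byte-wise key ordering a real table would use.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;

final class ToyTable {
    // row key -> "family:qualifier" -> timestamp (newest first) -> value
    private final NavigableMap<String, NavigableMap<String, NavigableMap<Long, String>>> rows =
            new TreeMap<>();

    void put(String row, String column, long timestamp, String value) {
        rows.computeIfAbsent(row, r -> new TreeMap<>())
            .computeIfAbsent(column, c -> new TreeMap<>(Comparator.<Long>reverseOrder()))
            .put(timestamp, value);
    }

    // Most recent version of a cell, or null if the cell does not exist.
    String get(String row, String column) {
        return get(row, column, Long.MAX_VALUE);
    }

    // Newest version whose timestamp is <= the given one, or null if none.
    String get(String row, String column, long timestamp) {
        NavigableMap<String, NavigableMap<Long, String>> columns = rows.get(row);
        if (columns == null) return null;
        NavigableMap<Long, String> versions = columns.get(column);
        if (versions == null) return null;
        // The map is ordered newest-first, so ceilingEntry finds the first
        // entry at or after 'timestamp' in that descending order.
        Map.Entry<Long, String> entry = versions.ceilingEntry(timestamp);
        return entry == null ? null : entry.getValue();
    }

    // Row keys in sorted order, as a scan would encounter them.
    Set<String> rowKeys() {
        return rows.keySet();
    }

    public static void main(String[] args) {
        ToyTable table = new ToyTable();
        table.put("zzzzz", "A:catch_phrase", 1, "woot");
        table.put("aaaaa", "A:foo", 15, "y");
        table.put("aaaaa", "A:foo", 4, "m");
        System.out.println(table.rowKeys());
        System.out.println(table.get("aaaaa", "A:foo"));
        System.out.println(table.get("aaaaa", "A:foo", 10));
        System.out.println(table.get("aaaaa", "A:foo", 2));
    }
}
```

The three lookups in `main` mirror the article's own queries against the "aaaaa"/"A:foo" cell: no timestamp, timestamp 10, and timestamp 2.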