DZone
The Latest Big Data Topics

How to Migrate from MySQL to MongoDB
In the last week I was working on a key project to migrate a BI platform from MySQL to MongoDB. We chose that database due to its support and scalability.
April 14, 2014
by Moshe Kaplan
· 115,867 Views · 6 Likes
Running Hadoop MapReduce Application from Eclipse Kepler
It is very important to learn Hadoop by practice. One of the first hurdles is writing your first MapReduce application and debugging it in your favorite IDE, Eclipse. Do we need any Eclipse plugins? No, we do not; we can do Hadoop development without MapReduce plugins. This tutorial shows how to set up Eclipse and run your MapReduce project and job right from the IDE. Before you read further, you should have set up a Hadoop single-node cluster on your machine. You can download the Eclipse project from GitHub.

Use case: we will explore the weather data to find the maximum temperature, following Chapter 2 of Tom White's book Hadoop: The Definitive Guide (3rd edition), and run it using ToolRunner.

I am using Linux Mint 15 on a VirtualBox VM instance. In addition, you should have a Hadoop single-node cluster (MRv1; I am using 1.2.1) installed and running. If you have not done so, I strongly recommend you do it from here. Download the Eclipse IDE; as of this writing, the latest version of Eclipse is Kepler.

1. Create a new Java project

2. Add dependency JARs

Right-click the project, open Properties, and select Java Build Path. Add all JARs from $HADOOP_HOME/lib and $HADOOP_HOME (where the Hadoop core and tools JARs live).

3. Create the mapper

    package com.letsdobigdata;

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class MaxTemperatureMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {

        private static final int MISSING = 9999;

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String year = line.substring(15, 19);
            int airTemperature;
            if (line.charAt(87) == '+') {
                // parseInt doesn't like leading plus signs
                airTemperature = Integer.parseInt(line.substring(88, 92));
            } else {
                airTemperature = Integer.parseInt(line.substring(87, 92));
            }
            String quality = line.substring(92, 93);
            // Emit only valid readings with an acceptable quality code
            if (airTemperature != MISSING && quality.matches("[01459]")) {
                context.write(new Text(year), new IntWritable(airTemperature));
            }
        }
    }

4. Create the reducer

    package com.letsdobigdata;

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class MaxTemperatureReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int maxValue = Integer.MIN_VALUE;
            for (IntWritable value : values) {
                maxValue = Math.max(maxValue, value.get());
            }
            context.write(key, new IntWritable(maxValue));
        }
    }

5. Create the driver for the MapReduce job

The MapReduce job is executed by the useful Hadoop utility class ToolRunner.

    package com.letsdobigdata;

    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    /* This class is responsible for running the MapReduce job. */
    public class MaxTemperatureDriver extends Configured implements Tool {

        public int run(String[] args) throws Exception {
            if (args.length != 2) {
                System.err.println("Usage: MaxTemperatureDriver <input path> <output path>");
                return -1;
            }

            // Use the configuration that ToolRunner has already populated
            Job job = new Job(getConf());
            job.setJarByClass(MaxTemperatureDriver.class);
            job.setJobName("Max Temperature");

            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            job.setMapperClass(MaxTemperatureMapper.class);
            job.setReducerClass(MaxTemperatureReducer.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            // Wait for the job once and report its status to the caller
            boolean success = job.waitForCompletion(true);
            return success ? 0 : 1;
        }

        public static void main(String[] args) throws Exception {
            MaxTemperatureDriver driver = new MaxTemperatureDriver();
            int exitCode = ToolRunner.run(driver, args);
            System.exit(exitCode);
        }
    }

6. Supply input and output

We need to supply the input file that will be used during the map phase; the final output will be generated in the output directory by the reduce task. Edit the run configuration and supply the command line arguments. sample.txt resides in the project root. Your project explorer should contain the following: [image]

7. MapReduce job execution

8. Final output

If you managed to come this far, once the job is complete it will create the output directory with _SUCCESS and part-r-nnnnn files. Double-click the part file to view it in the Eclipse editor. We supplied 5 rows of weather data (downloaded from NCDC weather) and wanted to find the maximum temperature in a given year from the input file, so the output contains 2 rows with the maximum temperature (in tenths of a degree centigrade) for each supplied year:

    1949 111 (11.1 C)
    1950 22 (2.2 C)

Make sure you delete the output directory before running your application again, or you will get an error from Hadoop saying the directory already exists. Happy Hadooping!
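To see what the mapper and reducer compute without starting Hadoop at all, the same parsing and max logic can be sketched in plain Java. This is only an illustration under stated assumptions: the class name MaxTemperatureSketch and the fakeRecord helper are hypothetical and not part of the tutorial's project, and fakeRecord fills in only the fixed-width fields the mapper reads (year at columns 15 to 19, signed temperature at 87 to 92, quality code at 92).

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of the mapper and reducer logic; no Hadoop classes involved.
final class MaxTemperatureSketch {
    private static final int MISSING = 9999;

    // Mirrors the mapper: returns {year, temperature}, or null when the reading is unusable.
    static int[] parse(String line) {
        int year = Integer.parseInt(line.substring(15, 19));
        // Skip a leading '+' exactly as the mapper does.
        int start = line.charAt(87) == '+' ? 88 : 87;
        int temperature = Integer.parseInt(line.substring(start, 92));
        String quality = line.substring(92, 93);
        if (temperature == MISSING || !quality.matches("[01459]")) {
            return null;
        }
        return new int[] {year, temperature};
    }

    // Mirrors the reducer: keeps the maximum temperature seen for each year.
    static Map<Integer, Integer> maxByYear(Iterable<String> lines) {
        Map<Integer, Integer> max = new HashMap<>();
        for (String line : lines) {
            int[] reading = parse(line);
            if (reading != null) {
                max.merge(reading[0], reading[1], Math::max);
            }
        }
        return max;
    }

    // Hypothetical helper: a zero-padded 93-character line holding only the fields parse() reads.
    static String fakeRecord(int year, String signedTemp, char quality) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 93; i++) {
            sb.append('0');
        }
        sb.replace(15, 19, String.valueOf(year)); // four-digit year
        sb.replace(87, 92, signedTemp);           // five characters, e.g. "+0111" or "-0011"
        sb.setCharAt(92, quality);
        return sb.toString();
    }
}
```

Feeding it five synthetic readings for 1949 and 1950 gives one maximum per year, which is the shape of result the job writes to its output directory.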
February 21, 2014
by Hardik Pandya
· 144,634 Views · 2 Likes
Big Data Search, Part 6: Sorting Randomness
As it turns out, doing work on big data sets is quite hard. To start with, you need to get the data, and it is… well, big. So that takes a while. Instead, I decided to test my theory on the following scenario. Given 4 GB of random numbers, let us find how many times we have the number 1. Because I wanted to ensure a consistent answer, I wrote:

    public static IEnumerable<uint> RandomNumbers()
    {
        const long count = 1024 * 1024 * 1024L * 1;
        var random = new MyRand();
        for (long i = 0; i < count; i++)
        {
            // Every 1024th value is forced to be 1
            if (i % 1024 == 0)
            {
                yield return 1;
                continue;
            }
            // All other values are random, but never 1
            var result = random.NextUInt();
            while (result == 1)
            {
                result = random.NextUInt();
            }
            yield return result;
        }
    }

    /// <summary>
    /// Based on Marsaglia, George. (2003). Xorshift RNGs.
    /// http://www.jstatsoft.org/v08/i14/paper
    /// </summary>
    public class MyRand
    {
        const uint Y = 842502087, Z = 3579807591, W = 273326509;
        uint _x, _y, _z, _w;

        public MyRand()
        {
            _y = Y;
            _z = Z;
            _w = W;
            _x = 1337;
        }

        public uint NextUInt()
        {
            uint t = _x ^ (_x << 11);
            _x = _y; _y = _z; _z = _w;
            return _w = (_w ^ (_w >> 19)) ^ (t ^ (t >> 8));
        }
    }

I am using a custom Rand function because it is significantly faster than System.Random. This generates 4 GB of random numbers and also ensures that we get exactly 1,048,576 instances of 1. Generating this in an empty loop takes about 30 seconds on my machine.

For fun, I ran the external sort routine in 32-bit mode, with a buffer of 256 MB. It is currently processing things, but I expect it to take a while. Because the buffer is 256 MB in size, we flush it every 128 MB (while we still have half the buffer free to do more work). The interesting thing is that even though we generate random numbers, sorting and then compressing the values resulted in about a 60% compression rate.

The problem is that for this particular case, I am not sure if that is a good thing. Because the values are random, we need to select a pretty high degree of compression just to get a good compression rate.
And because of that, a significant amount of time is spent just compressing the data. I am pretty sure that for a real-world scenario it would be better, but that is something that we'll probably need to test. Not compressing the data in the random test is a huge help.

Next, external sort is pretty dependent on the performance of… sort, of course. And sort isn't that fast. In this scenario, we are sorting arrays of about 26 million items, and that takes time. Implementing parallel sort cut this down to less than a minute per batch of 26 million.

That lets us complete the entire process, but then it stalls at the merge. The reason is that we push all the values through a heap, and there are 1 billion of them. The heap never exceeds 40 items, but that is still 1 billion * O(log 40), or about 5.4 billion comparisons that we have to do, and we do them sequentially, which takes time. I tried thinking about ways to parallelize this, but I am not sure how that can be done. We have 40 sorted files, and we want to merge all of them.

Obviously we can merge each set of 10 files in parallel, then merge the resulting 4, but the cost we have now is the actual sorting cost, not I/O. I am not sure how to approach this. For what it is worth, you can find the code for this here.
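The heap-based merge described above can be sketched in a few lines. This is a minimal illustration in Java rather than the author's C# code, and it works on in-memory lists instead of files; the names KWayMergeSketch and Head are made up for the example. The heap holds at most one entry per sorted run (40 in the scenario above), so every output value costs one poll and at most one add.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

final class KWayMergeSketch {
    // One heap entry per input run: the run's current head plus the iterator behind it.
    private static final class Head {
        final long value;
        final Iterator<Long> rest;

        Head(long value, Iterator<Long> rest) {
            this.value = value;
            this.rest = rest;
        }
    }

    // Merges already-sorted runs into one sorted list.
    static List<Long> merge(List<List<Long>> sortedRuns) {
        PriorityQueue<Head> heap =
                new PriorityQueue<>((a, b) -> Long.compare(a.value, b.value));
        // Seed the heap with the first value of every non-empty run.
        for (List<Long> run : sortedRuns) {
            Iterator<Long> it = run.iterator();
            if (it.hasNext()) {
                heap.add(new Head(it.next(), it));
            }
        }
        List<Long> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            // Take the smallest head, then refill from the run it came from.
            Head smallest = heap.poll();
            out.add(smallest.value);
            if (smallest.rest.hasNext()) {
                heap.add(new Head(smallest.rest.next(), smallest.rest));
            }
        }
        return out;
    }
}
```

Merging the runs 1,4,6,8 and 1,2,6,7 this way yields 1,1,2,4,6,6,7,8. The loop is inherently sequential, which is the bottleneck the post describes: each output value depends on the heap state left by the previous one.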
February 5, 2014
by Oren Eini
· 9,048 Views
Big Data Search, Part 5: Sorting Optimizations
I mentioned several times that the entire point of the exercise was to just see how this works, not to actually do anything production worthy. But it is interesting to see how we could do better here. In no particular order, I think that there are at least several things that we could do to significantly improve the time it takes to sort.

Right now we have defined 2 indexes on top of a 1 GB file, and it took under 1 minute to complete. That gives us a runtime of about 10 days over a 15 TB file.

Well, one of the reasons for this performance is that we execute this in a serial fashion, that is, one index after another. But we have two completely isolated indexes, so there is no reason why we can't parallelize the work between them.

For that matter, we are buffering in memory up to a certain point, then we sort, then we buffer some more, etc. That is pretty inefficient. We can push the actual sorting to a different thread, and continue parsing and adding to a new buffer while the previous one is being sorted.

We wrote to intermediary files, but we wrote to those using plain file I/O. It is usually a lot more costly to write to disk than to compress and then write to disk. We are writing sorted data, so it is probably going to compress pretty well.

Those are the things that pop to mind. Can you think of additional options?
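The idea of handing full buffers to another thread while parsing continues can be sketched as follows. This is an assumption-laden Java illustration, not the author's implementation: BackgroundSortSketch is a hypothetical name, the input is an in-memory array standing in for parsed rows, and a single worker thread does the sorting.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class BackgroundSortSketch {
    // Fills fixed-size buffers on the calling thread and sorts each full buffer on a worker.
    static List<long[]> sortedRuns(long[] input, int bufferSize) {
        ExecutorService sorter = Executors.newSingleThreadExecutor();
        try {
            List<Future<long[]>> pending = new ArrayList<>();
            for (int from = 0; from < input.length; from += bufferSize) {
                int to = Math.min(from + bufferSize, input.length);
                // Stand-in for "keep parsing into a fresh buffer".
                long[] buffer = Arrays.copyOfRange(input, from, to);
                // The sort runs on the worker while the loop goes on filling the next buffer.
                pending.add(sorter.submit(() -> {
                    Arrays.sort(buffer);
                    return buffer;
                }));
            }
            List<long[]> runs = new ArrayList<>();
            for (Future<long[]> future : pending) {
                runs.add(future.get()); // blocks until that run is sorted
            }
            return runs;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("sorting failed", e);
        } finally {
            sorter.shutdown();
        }
    }
}
```

In a real external sort each finished run would be written to a temporary file rather than kept in memory, and the number of in-flight buffers would need a cap so parsing cannot outrun sorting indefinitely.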
January 27, 2014
by Oren Eini
· 7,863 Views
Big Data Search, Part 4: The Index Format is Horrible
I have completed my own exercise, and while I wanted to try it with a "few allocations" rule, it is interesting to see just how far out there the code is. This isn't something that you can really use for anything except as a basis to see how badly you are doing.

Let us start with the index format. It is just a CSV file with the value and the position in the original file. That means that any search we want to do on the file is actually a binary search, as discussed in the previous post. But doing a binary search like that is an absolute killer for performance.

Let us consider our 15 TB data set. In my tests, a 1 GB file with 4.2 million rows produced a roughly 80 MB index. Assuming the same is true for the larger file, that gives us a 1.2 TB index file.

In my small index, we have to do 24 seeks to get to the right position in the file. And as you should know, disk seeks are expensive. They are on the order of 10 ms or so. So the cost of actually searching the index is close to a quarter of a second. Now, to be fair, there are going to be a lot of caching opportunities here, but probably not that many if we have a lot of queries to deal with.

Of course, the fun thing about this is that even with a 1.2 TB file, we are still talking about fewer than 40 seeks (the beauty of O(log N) in action), but that is still pretty expensive. Even worse, this is what happens when we are running a single query at a time. What do you think will happen if we are actually running this with multiple threads generating queries? Now we will have a lot of (effectively random) seeks, which would be a big performance sink. This is especially true if we consider that any storage solution big enough to store the data is going to be composed of an aggregate of HDD disks. Sure, we get multiple spindles, so we get better performance overall, but still…

Obviously, there are multiple solutions for this issue.
B+Trees solve the problem by packing multiple keys into a single page, so instead of doing O(log2 N) seeks, you are usually doing O(log36 N) or O(log100 N). With those fan-outs, we will have 6 to 8 seeks to do to get to our data. Much better than the 40 seeks required using plain binary search. It would actually be better than that in the common case, since the first few levels of the tree are likely to reside in memory (and probably in L1, if we are speaking about that).

However, given that we are storing sorted strings here, one must give some attention to Sorted String Tables. The way those work, you have the sorted strings in the file, and the footer contains two important bits of information. The first is the bloom filter, which allows you to quickly rule out missing values, but the more important factor is that it also contains the positions of (by default) every 16th entry in the file. This means that in our 15 TB data file (with 64.5 billion entries), we will use about 15 GB just to store pointers to the different locations in the index file (which will be about 1.2 TB). Note that the actual numbers will probably differ. Because SST (when talking about SST I am talking specifically about the leveldb implementation) utilizes many forms of compression, it is actually likely that the file size will be smaller (although, since the "value" we use is just a byte position in the data file, we won't benefit from compression there). Key compression is probably a lot more important here.

However, note that this is a pretty poor way of doing things. Sure, the actual data format is better, in the sense that we don't store as much, but in terms of the number of operations required? Not so much. We still need to do a binary search over the entire file. In particular, the leveldb implementation utilizes memory mapped files. What this ends up doing is relying on the OS to keep the midway points in the file in RAM, so we don't have to do so much seeking.
Without that, the cost of actually seeking every time would make SSTs impractical. In fact, you would pretty much have to introduce another layer on top of this, but at that point, you are basically doing trees, and a binary tree is a better friend here.

This leads to an interesting question. SST is probably so popular inside Google because they deal with a lot of data, and the file format is very friendly to compression of various kinds. It is also a pretty simple format. That makes it much nicer to work with. On the other hand, a B+Tree implementation is a lot more complex, and it would probably be several orders of magnitude more complex if it had to try to do the same compression tricks that SSTs do. Another factor that is probably as important is that, as I understand it, SSTs are usually used for actual sequential access (map/reduce stuff) and not necessarily for the random reads that are done in leveldb.

It is interesting to think about this in this fashion, at least, even if I don't know what I'll be doing with it.
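The seek counts quoted above can be checked with a small calculation. The sketch below is only a worst-case estimate under a simplifying assumption: every probe costs one seek and divides the remaining range by the fan-out (2 for plain binary search, 36 or 100 for the B+Tree pages mentioned). The class and method names are invented for the example.

```java
final class SeekEstimateSketch {
    // Probes needed to narrow `entries` candidates down to one when each probe
    // splits the remaining range `fanOut` ways (2 = plain binary search).
    static int probes(long entries, int fanOut) {
        int count = 0;
        long remaining = entries;
        while (remaining > 1) {
            remaining = (remaining + fanOut - 1) / fanOut; // ceiling division
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        long small = 4_200_000L;      // rows in the 1 GB test file
        long large = 64_500_000_000L; // entries estimated for the 15 TB file
        System.out.println("binary search, 1 GB file:  " + probes(small, 2));
        System.out.println("binary search, 15 TB file: " + probes(large, 2));
        System.out.println("fan-out 36, 15 TB file:    " + probes(large, 36));
        System.out.println("fan-out 100, 15 TB file:   " + probes(large, 100));
    }
}
```

Under these assumptions the 15 TB index needs 36 probes with binary search but only 7 with a fan-out of 36 and 6 with a fan-out of 100, in line with the "fewer than 40" and "6 to 8" figures in the text. The small index comes out at 23 probes, close to the 24 seeks measured; at roughly 10 ms per seek that is the quarter of a second mentioned above.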
January 24, 2014
by Oren Eini
· 12,094 Views
How to Set Up a Multi-Node Hadoop Cluster on Amazon EC2, Part 1
Learn how to set up a four node Hadoop cluster using AWS EC2, PuTTy(gen), and WinSCP.
January 23, 2014
by Hardik Pandya
· 135,885 Views · 3 Likes
Big Data Search, Part 3: Binary Search of Textual Data
The index I created for the exercise is just a text file, sorted by the indexed key. When a human is doing the search, that makes it very easy to work with. It is much easier than trying to work with a binary file, and it also helps debugging.

However, it does make running a binary search on the data a bit harder, mostly because there isn't a nice way to say "give me the #th line". Instead, I wrote the following:

    public void SetPositionToLineAt(long position)
    {
        // now we need to go back until we either get to the start of the file
        // or find a \n character
        const int bufferSize = 128;
        _buffer.Capacity = Math.Max(bufferSize, _buffer.Capacity);

        var charCount = _encoding.GetMaxCharCount(bufferSize);
        if (charCount > _charBuf.Length)
            _charBuf = new char[Utils.NearestPowerOfTwo(charCount)];

        while (true)
        {
            _input.Position = position - (position < bufferSize ? 0 : bufferSize);
            var read = ReadToBuffer(bufferSize);
            var buffer = _buffer.GetBuffer();
            var chars = _encoding.GetChars(buffer, 0, read, _charBuf, 0);
            for (int i = chars - 1; i >= 0; i--)
            {
                if (_charBuf[i] == '\n')
                {
                    _input.Position = position - (bufferSize - i) + 1;
                    return;
                }
            }
            position -= bufferSize;
            if (position < 0)
            {
                _input.Position = 0;
                return;
            }
        }
    }

This code starts at an arbitrary byte position and goes backward until it finds the newline character '\n'. This gives me the ability to go to a rough location and get line-oriented input. Once I have that, the rest is pretty easy. Here is the binary search:

    while (lo <= hi)
    {
        position = (lo + hi) / 2;
        _reader.SetPositionToLineAt(position);

        bool? result;
        do
        {
            result = _reader.ReadOneLine();
        } while (result == null); // skip empty lines

        if (result == false)
            yield break; // couldn't find anything

        var entry = _reader.Current.Values[0];
        match = Utils.CompareArraySegments(expectedIndexEntry, entry);

        if (match == 0)
        {
            break;
        }
        if (match > 0)
            lo = position + _reader.Current.Values.Sum(x => x.Count) + 1;
        else
            hi = position - 1;
    }

    if (match != 0)
    {
        // no match
        yield break;
    }

The idea is that this positions us at the location in the index that has an entry with a value equal to what we are searching for. We then write the following to actually get the data from the actual data file:

    // we have a match, now we need to return all the matches
    _reader.SetPositionToLineAt(position);

    while (true)
    {
        bool? result;
        do
        {
            result = _reader.ReadOneLine();
        } while (result == null); // skip empty lines

        if (result == false)
            yield break; // end of file

        var entry = _reader.Current.Values[0];
        match = Utils.CompareArraySegments(expectedIndexEntry, entry);
        if (match != 0)
            yield break; // out of the valid range we need

        _buffer.SetLength(0);
        _data.Position = Utils.ToInt64(_reader.Current.Values[1]);

        while (true)
        {
            var b = _data.ReadByte();
            if (b == -1)
                break;
            if (b == '\n')
            {
                break;
            }
            _buffer.WriteByte((byte)b);
        }

        yield return _encoding.GetString(_buffer.GetBuffer(), 0, (int)_buffer.Length);
    }

As you can see, we are moving forward in the index file, reading one line at a time. Then we take the second value, the position of the relevant line in the data file, and read that. We continue to do so as long as the indexed value is the same. Pretty simple, all told. But it comes with its own set of problems. I'll discuss those in my next post.
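The same technique, probing an arbitrary byte offset and backing up to the start of the line before comparing, can be shown in a compact, self-contained form. The sketch below is a Java approximation, not a port of the code above: it searches a byte array instead of a stream, assumes each line is "key,position" with keys sorted in String.compareTo order, and uses invented names throughout.

```java
import java.nio.charset.StandardCharsets;

final class LineBinarySearchSketch {
    // Walks backward from `position` to the first byte of the line containing it.
    static int lineStart(byte[] data, int position) {
        int p = Math.min(position, data.length - 1);
        while (p > 0 && data[p - 1] != '\n') {
            p--;
        }
        return p;
    }

    // Index of the '\n' that ends the line starting at `start` (or data.length at end of input).
    static int lineEnd(byte[] data, int start) {
        int p = start;
        while (p < data.length && data[p] != '\n') {
            p++;
        }
        return p;
    }

    // Byte offset of a line whose key (text before the first comma) equals `key`, or -1.
    static int find(byte[] data, String key) {
        int lo = 0;
        int hi = data.length - 1;
        while (lo <= hi) {
            int mid = lo + (hi - lo) / 2;
            int start = lineStart(data, mid);
            int end = lineEnd(data, start);
            String line = new String(data, start, end - start, StandardCharsets.UTF_8);
            int comma = line.indexOf(',');
            String lineKey = comma < 0 ? line : line.substring(0, comma);
            int cmp = key.compareTo(lineKey);
            if (cmp == 0) {
                return start;
            }
            if (cmp > 0) {
                lo = end + 1;   // continue after this line
            } else {
                hi = start - 1; // continue before this line
            }
        }
        return -1;
    }
}
```

Note that with duplicate keys this returns the offset of some matching line, not necessarily the first one, so a caller that wants every match would still have to step back to the first equal entry before reading forward.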
January 22, 2014
by Oren Eini
· 5,877 Views
Big Data Search, Part 2: Setting Up
The interesting thing about this problem is that I was very careful in how I phrased things. I said what I wanted to happen, but didn't specify what needs to be done. That was quite intentional. For that matter, the fact that I am posting about what is going to be our acceptance criteria is also intentional. The idea is to have a non-trivial task, but something that should be very well understood and easy to research. It also means that the candidate needs to be able to write some non-trivial code. And I can tell a lot about a dev from such a project. At the same time, this is a very self-contained scenario. The idea is that this is something that you can do in a short amount of time.

The reason that this is an interesting exercise is that this is actually at least two totally different but related problems. First, in a 15 TB file, we obviously cannot rely on just scanning the entire file. That means that we have to have an index. And that means we have to build it. Interestingly enough, an index being a sorted structure, that means that we have to solve the problem of sorting more data than can fit in main memory. The second problem is probably easier, since it is just an implementation of external sort, and there are plenty of algorithms around to handle that.

Note that I am not really interested in actual efficiencies for this particular scenario. I care about being able to see the code, see that it works, etc. My solution, for example, is a single-threaded system that makes no attempt at parallelism or I/O optimizations. It clocks in at over 1 GB / minute and the memory consumption is under 150 MB. Queries for a unique value return the result in 0.0004 seconds. Queries that returned 153K results completed in about 2 seconds. When increasing the used memory to about 650 MB, there isn't really any difference in performance, which surprised me a bit. Then again, the entire code is probably highly inefficient. But that is good enough for now.
The process is kicked off with indexing:

    var options = new DirectoryExternalStorageOptions("/path/to/index/files");
    var input = File.OpenRead(@"/path/to/data/Crimes_-_2001_to_present.csv");
    var sorter = new ExternalSorter(input, options, new int[]
    {
        1, // case number
        4, // ICHR
    });

    sorter.Sort();

I am actually using the Chicago crime data for this. This is a 1 GB file that I downloaded from the Chicago city portal in CSV format. This is what the data looks like: [image]

The ExternalSorter will read and parse the file, and start reading it into a buffer. When it gets to a certain size (about 64 MB of source data, usually), it will sort the values in memory and output them into temporary files. Those files look like this: [image]

Initially, I tried to do that with binary data, but it turns out that that was too complex to be easy, and writing this in a human-readable format made it much easier to work with. The format is pretty simple: you have the value on the left, and on the right you have the start position of the row for this value.

We generate about 17 such temporary files for the 1 GB file, one temporary file for each 64 MB of the original file. This lets us keep our actual memory consumption very low, but for larger data sets we'll probably want to do the sort every 1 GB or maybe more. Our test machine has 16 GB of RAM, so doing a sort and outputting a temporary file every 8 GB can be a good way to handle things. But that is beside the point.

The end result is that we have multiple sorted files, but they aren't sequential. In other words, in file #1 we have values 1,4,6,8 and in file #2 we have 1,2,6,7. We need to merge all of them together. Luckily, this is easy enough to do. We basically have a heap that we feed entries from the files into. And that pretty much takes care of this. See merge sort if you want more details about this.
The end result of merging all of those files is… another file, just like them, that contains all of the data sorted. Then it is time to handle the other issue: actually searching the data.

We can do that using simple binary search, with the caveat that because this is a text file, and there are no fixed-size records or pages, it is actually a bit hard to figure out where to start reading. In effect, what I am doing is to select an arbitrary byte position, then walk backward until I find a '\n'. Once I have found the newline character, I can read the full line, check the value, and decide where I need to look next. Assuming that I actually found my value, I can now go to the byte position of the value in the original file and read the original line, giving it to the user.

Assuming an indexing rate of 1 GB / minute, a 15 TB file would take about 10 days to index. There are ways around that as well, but I'll touch on them in my next post.

What all of this did was bring home just how much we usually don't have to worry about such things. But I consider this research time well spent; we'll be using this in the future.
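The temporary-file format described above (the indexed value on the left, the byte position of its row on the right, sorted by value) can be illustrated with a short sketch. This is a Java approximation under simplifying assumptions, not the ExternalSorter itself: the whole chunk is a string in memory, rows end in '\n', fields are split on commas with no support for quoted commas, and the names IndexRunSketch, Entry and buildRun are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class IndexRunSketch {
    // One index entry: the indexed value and the byte offset of its row in the data file.
    static final class Entry {
        final String value;
        final long position;

        Entry(String value, long position) {
            this.value = value;
            this.position = position;
        }

        @Override
        public String toString() {
            return value + "," + position;
        }
    }

    // Builds one sorted run for the given column from a chunk of CSV text.
    static List<Entry> buildRun(String csv, int column) {
        List<Entry> entries = new ArrayList<>();
        long offset = 0;
        for (String line : csv.split("\n")) {
            if (!line.isEmpty()) {
                String[] fields = line.split(",", -1); // naive split: no quoted commas
                entries.add(new Entry(fields[column], offset));
            }
            // Advance by the encoded length of the row plus one byte for '\n'.
            offset += line.getBytes(StandardCharsets.UTF_8).length + 1;
        }
        entries.sort(Comparator.comparing((Entry e) -> e.value)
                .thenComparingLong(e -> e.position));
        return entries;
    }
}
```

For three rows keyed on column 1, the run lists the keys in sorted order, each followed by the offset where its row starts in the original data, which is exactly what the later binary search needs in order to jump back into the data file.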
January 21, 2014
by Oren Eini
· 3,444 Views
Top Posts of 2013: Google's Big Data Papers
I’ll review Google’s most important Big Data publications and discuss where they are (as far as they’ve disclosed).
December 30, 2013
by Mikio Braun
· 117,023 Views
Logging, Processing and Monitoring Data using Talend, ElasticSearch, Logstash and Kibana
Your mission-critical projects need complex event processing, realtime management and monitoring. Talend 5.4 (released in December 2013, https://www.talend.com) offers a great new feature: Talend Event Logging. It allows logging, processing and monitoring of all technical events and business data. In this article, I will focus on how to process, filter and monitor business data. You can find more details about monitoring technical events and logs (e.g. OSGi events of the ESB container) in Talend's documentation (www.help.talend.com). This new feature is very powerful, but also extendable to fit custom requirements. You can solve and monitor much more complex scenarios than the one I describe here.

Talend Event Logging with Logstash, ElasticSearch and Kibana

First, let's take a look at the components / projects which are integrated and extended into Talend's products for implementing this new feature.

logstash (http://logstash.net) is a tool for managing events and logs. You can use it to collect different (distributed) logs, parse them, and store them for later use. Speaking of searching, logstash comes with a simple but fine web interface for searching and drilling into all of your logs. It uses ElasticSearch under the hood. So, you can easily query through all your logs for specific errors or business analytics (e.g. searching for all lines matching a unique order id). In addition to the pure collection of events, the Event Logging feature supports custom processing (e.g. custom filtering, customer data enrichment/reduction), aggregation, signing and also server side custom pre- and post-processing of events, e.g. to send them to an intrusion detection system or to any other kind of potential higher level log processing / management system.

Kibana (http://www.elasticsearch.org/overview/kibana) is a browser based analytics and search interface to logstash and other timestamped data sets stored in ElasticSearch.
Kibana strives to be easy to get started with, while also being flexible and powerful, just like logstash and ElasticSearch. The main difference to logstash is a much more powerful HTML5 based web interface. You can

  • use multiple concurrent search inputs
  • highlight to drill down bar charts
  • create line charts, stacked, unstacked, filled or unfilled, with or without points
  • create pie and donut charts that compare top terms or the results of multiple queries
  • create custom dashboards with multiple charts
  • and much more…

Therefore, logstash, Elasticsearch and Kibana are a perfect combination. You can use Kibana to analyze and monitor your data as you do with logstash; however, Kibana's web interface is much more powerful and comfortable than that of logstash.

There is a great book about logstash: "The logstash book", for just 9.99 USD. I can really recommend this book for getting started: http://www.logstashbook.com/. For Elasticsearch, you can find several books on Amazon. Unfortunately, Kibana has no good and extensive documentation yet. I heard from its developers that this topic is to be addressed in Q1 2014.

Integration of Event Logging and Monitoring into Talend's Unified Platform

Talend 5.4 has integrated logstash and Kibana into its Unified Platform. Talend Administration Center (TAC) is Talend's central web application for management and monitoring. It got a new logging view: [image]

Here, you can use the very flexible and powerful realtime search capabilities of Elasticsearch. Some dashboards are available by default. You can also create your own custom dashboards easily within this site thanks to Kibana. Many panel types are available, such as pie, histogram, table, hits or trends. You can analyze every technical event or business data down to the message level: [image]

By default, you see fields such as message, source, timestamp, type, and others. Of course, you can also add custom fields suitable for your business case.
This way, you have a central monitoring capability which allows analyzing data on distributed clusters easily. Under the hood, this data comes from logstash. Many alternative inputs are available for logstash, such as log4j input or tcp input. For business data, I often use file input (http://logstash.net/docs/1.2.2/inputs/file) to analyze files such as CSV. Adding new inputs is very simple. You just have to add an input to the logstash configuration file of your logstash server. As I mentioned already, all this is integrated into Talend's Unified Platform: [image]

In this example, there are three log4j inputs and one file input. I use the file input to analyze text files in a specific directory. In this case, the output is an embedded Elasticsearch instance. In production, you should use an external Elasticsearch cluster, of course. Processing such as filtering can also be configured in this file. Thus, you can process, analyze and monitor all your different inputs within one central monitoring application thanks to Kibana.

Building Talend Integration Jobs, Routes and Web Services

As mentioned before, you can analyze almost all data with logstash, Elasticsearch and Kibana. It does not matter if your input is technical events from a container (e.g. OSGi events) or any business data such as CSV files, log4j logs, or something else. Talend implicitly supports technical events which are created by the ESB container, by MDM, etc. However, you can also easily add your custom business data from your Talend jobs (integration perspective), SOAP / REST Web Services (integration perspective) or Talend routes (mediation perspective): [image]

This is just one example (part of Talend's DI demos which are included in every DI installation). The job generates some random data and stores it to a CSV file. You just have to add the configured file or directory of tFileOutputDelimited to your logstash configuration using file input (with wild cards for more complex scenarios). That's it.
You can now monitor and analyze the business data in realtime. This example showed a Talend DI Job (i.e. ETL job). However, you can also monitor your business data from SOAP / REST Web Services or Talend Routes the same way.

Conclusion

Your mission-critical projects need management and monitoring. Today, this is possible not just with complex and expensive tools of large vendors, but also with Talend's Unified Platform products such as Talend ESB or Talend MDM. Under the hood, Talend integrates and extends widely used open source products: logstash, Elasticsearch and Kibana. Have fun with Talend 5.4's new event logging and monitoring features…

Best regards,
Kai Wähner (@KaiWaehner)

CONTENT FROM MY BLOG: http://www.kai-waehner.de/blog/2013/12/17/realtime-event-logging-complex-event-processing-cep-and-monitoring-with-talends-unified-platform-5-4-di-esb-dq-mdm-bpm-using-elasticsearch-logstash-and-kibana/
December 19, 2013
by Kai Wähner
· 31,468 Views
Handling Big Data with HBase Part 4: The Java API
Editor's note: Be sure to check out part 2 as well.

This is the fourth of an introductory series of blogs on Apache HBase. In the third part, we saw a high level view of HBase architecture. In this part, we'll use the HBase Java API to create tables, insert new data, and retrieve data by row key. We'll also see how to set up a basic table scan which restricts the columns retrieved and also uses a filter to page the results.

Having just learned about HBase high-level architecture, now let's look at the Java client API since it is the way your applications interact with HBase. As mentioned earlier you can also interact with HBase via several flavors of RPC technologies like Apache Thrift plus a REST gateway, but we're going to concentrate on the native Java API. The client APIs provide both DDL (data definition language) and DML (data manipulation language) semantics very much like what you find in SQL for relational databases.

Suppose we are going to store information about people in HBase, and we want to start by creating a new table. The following listing shows how to create a new table using the HBaseAdmin class.

    Configuration conf = HBaseConfiguration.create();
    HBaseAdmin admin = new HBaseAdmin(conf);
    HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf("people"));
    tableDescriptor.addFamily(new HColumnDescriptor("personal"));
    tableDescriptor.addFamily(new HColumnDescriptor("contactinfo"));
    tableDescriptor.addFamily(new HColumnDescriptor("creditcard"));
    admin.createTable(tableDescriptor);

The people table defined in the preceding listing contains three column families: personal, contactinfo, and creditcard. To create a table you create an HTableDescriptor and add one or more column families by adding HColumnDescriptor objects. You then call createTable to create the table. Now we have a table, so let's add some data.
The next listing shows how to use the Put class to insert data on John Doe, specifically his name and email address (omitting proper error handling for brevity).

    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "people");
    // The row key identifies the row; each add() takes family, qualifier, value
    Put put = new Put(Bytes.toBytes("doe-john-m-12345"));
    put.add(Bytes.toBytes("personal"), Bytes.toBytes("givenName"), Bytes.toBytes("John"));
    put.add(Bytes.toBytes("personal"), Bytes.toBytes("mi"), Bytes.toBytes("M"));
    put.add(Bytes.toBytes("personal"), Bytes.toBytes("surname"), Bytes.toBytes("Doe"));
    put.add(Bytes.toBytes("contactinfo"), Bytes.toBytes("email"), Bytes.toBytes("[email protected]"));
    table.put(put);
    table.flushCommits();
    table.close();

In the above listing we instantiate a Put, providing the unique row key to the constructor. We then add values, each of which must include the column family, column qualifier, and the value, all as byte arrays. As you probably noticed, the HBase API's utility Bytes class is used a lot; it provides methods to convert to and from byte[] for primitive types and strings. (Adding a static import for the toBytes() method would cut out a lot of boilerplate code.) We then put the data into the table, flush the commits to ensure locally buffered changes take effect, and finally close the table.

Updating data is also done via the Put class, in exactly the same manner as shown in the prior listing. Unlike relational databases, in which updates must update entire rows even if only one column changed, if you only need to update a single column then that's all you specify in the Put, and HBase will only update that column. There is also a checkAndPut operation, which is essentially a form of optimistic concurrency control: the operation will only put the new data if the current values are what the client says they should be.

Retrieving the row we just created is accomplished using the Get class, as shown in the next listing.
(From this point forward, listings will omit the boilerplate code to create a configuration, instantiate the HTable, and the flush and close calls.)

    Get get = new Get(Bytes.toBytes("doe-john-m-12345"));
    get.addFamily(Bytes.toBytes("personal"));
    get.setMaxVersions(3);
    Result result = table.get(get);

The code in the previous listing instantiates a Get, supplying the row key we want to find. Next we use addFamily to instruct HBase that we only need data from the personal column family, which also cuts down the amount of work HBase must do when reading information from disk. We also specify that we'd like up to three versions of each column in our result, perhaps so we can list historical values of each column. Finally, calling get returns a Result instance which can then be used to inspect all the column values returned.

In many cases you need to find more than one row. HBase lets you do this by scanning rows, as we saw in the second part, which used a scan in an HBase shell session. The corresponding class is the Scan class. You can specify various options, such as the start and ending row key to scan, which columns and column families to include, and the maximum versions to retrieve. You can also add filters, which allow you to implement custom filtering logic to further restrict which rows and columns are returned. A common use case for filters is pagination. For example, we might want to scan through all people whose last name is Smith one page (e.g. 25 people) at a time. The next listing shows how to perform a basic scan.

    Scan scan = new Scan(Bytes.toBytes("smith-"));
    scan.addColumn(Bytes.toBytes("personal"), Bytes.toBytes("givenName"));
    scan.addColumn(Bytes.toBytes("contactinfo"), Bytes.toBytes("email"));
    scan.setFilter(new PageFilter(25));
    ResultScanner scanner = table.getScanner(scan);
    for (Result result : scanner) {
        // ...
    }

In the above listing we create a new Scan that starts from the row key smith- and we then use addColumn to restrict the columns returned (thus reducing the amount of disk transfer HBase must perform) to personal:givenName and contactinfo:email. A PageFilter is set on the scan to limit the number of rows returned to 25. (Because filters are applied separately on each region server, a scan that spans several regions can return somewhat more rows than the page size. An alternative to using the page filter would be to specify a stop row key when constructing the Scan.) We then get a ResultScanner for the Scan just created, and loop through the results performing whatever actions are necessary. Since the only method in HBase to retrieve multiple rows of data is scanning by sorted row keys, how you design the row key values is very important. We'll come back to this topic later.

You can also delete data in HBase using the Delete class. Analogous to the Put class, it can delete all columns in a row (thus deleting the row itself), delete column families, delete columns, or some combination of those.

Connection Handling

In the above examples not much attention was paid to connection handling and RPCs (remote procedure calls). HBase provides the HConnection class, which offers functionality similar to connection pool classes for sharing connections; for example, you use the getTable() method to get a reference to an HTable instance. There is also an HConnectionManager class, which is how you get instances of HConnection. Similar to avoiding network round trips in web applications, effectively managing the number of RPCs and the amount of data returned is important when using HBase, and something to consider when writing HBase applications.

Conclusion to Part 4

In this part we used the HBase Java API to create a people table, insert a new person, and find the newly inserted person's information. We also used the Scan class to scan the people table for people with last name "Smith", and showed how to restrict the data retrieved and finally how to use a filter to limit the number of results.
In the next part, we'll learn how to deal with the absence of SQL and relations when modeling schemas in HBase.

References

  • HBase web site, http://hbase.apache.org/
  • HBase wiki, http://wiki.apache.org/hadoop/Hbase
  • HBase Reference Guide, http://hbase.apache.org/book/book.html
  • HBase: The Definitive Guide, http://bit.ly/hbase-definitive-guide
  • Google Bigtable Paper, http://labs.google.com/papers/bigtable.html
  • Hadoop web site, http://hadoop.apache.org/
  • Hadoop: The Definitive Guide, http://bit.ly/hadoop-definitive-guide
  • Fallacies of Distributed Computing, http://en.wikipedia.org/wiki/Fallacies_of_Distributed_Computing
  • HBase lightning talk slides, http://www.slideshare.net/scottleber/hbase-lightningtalk
  • Sample code, https://github.com/sleberknight/basic-hbase-examples
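The remark in this part that scanning by sorted row keys is the only way to retrieve multiple rows can be illustrated without a cluster. The following is a minimal in-memory sketch, not the HBase API: the class RowKeyScanSketch and its methods are hypothetical, a TreeMap stands in for HBase's sorted storage, and plain String ordering stands in for HBase's byte-wise key comparison. Unlike the scan listing in the article, the sketch also stops when keys leave the prefix, which is an added assumption about what a "Smith" page should contain.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical in-memory model of a paged prefix scan over sorted row keys.
final class RowKeyScanSketch {

    // Rows kept sorted by key, loosely mirroring how HBase orders rows.
    private final NavigableMap<String, String> rows = new TreeMap<>();

    void put(String rowKey, String email) {
        rows.put(rowKey, email);
    }

    // Returns up to pageSize row keys that start with prefix,
    // beginning at startRow (inclusive).
    List<String> scanPage(String startRow, String prefix, int pageSize) {
        List<String> page = new ArrayList<>();
        for (String key : rows.tailMap(startRow, true).keySet()) {
            if (!key.startsWith(prefix) || page.size() == pageSize) {
                break; // left the prefix range, or the page is full
            }
            page.add(key);
        }
        return page;
    }

    public static void main(String[] args) {
        RowKeyScanSketch table = new RowKeyScanSketch();
        table.put("doe-john-m-12345", "john@example.org");
        table.put("smith-alice-a-00001", "alice@example.org");
        table.put("smith-bob-b-00002", "bob@example.org");
        table.put("smith-carol-c-00003", "carol@example.org");
        table.put("smythe-dan-d-00004", "dan@example.org");

        // First page starts at the prefix itself.
        List<String> first = table.scanPage("smith-", "smith-", 2);
        System.out.println(first);

        // Next page starts just after the last key already seen.
        String next = first.get(first.size() - 1) + "\0";
        System.out.println(table.scanPage(next, "smith-", 2));
    }
}
```

Because the surname leads the key, all Smith rows sit next to each other and a page is a short contiguous read. A key that began with, say, a random id would scatter them across the table, which is why row key design matters so much for scans.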
December 18, 2013
by Scott Leberknight
· 56,783 Views · 3 Likes
Database vs. Data Science
One thing that Big Data certainly made happen is that it brought the database/infrastructure community and the data analysis/statistics/machine learning communities closer together. As always, each community had its own set of models, methods, and ideas about how to structure and interpret the world. You can still tell these differences when looking at current Big Data projects, and I think it’s important to be aware of the distinctions in order to better understand the relationships between different projects.

Because, let’s face it, every project claims to re-invent Big Data. With Hadoop and MapReduce being something like the founding fathers of Big Data, other projects have since appeared. Most notably, there are stream processing projects like Twitter’s Storm, which move from batch-oriented processing to event-based processing, which is more suited for real-time, low-latency processing. Spark is yet something different, a bit like Hadoop, but it puts greater emphasis on iterative algorithms and in-memory processing to achieve that landmark “100x faster than Hadoop” every current project seems to need to sport. Twitter’s Summingbird project tries to bridge the gap between MapReduce and stream processing by providing us with a high-level set of operators which can then run either on MapReduce or on Storm.

However, both Spark and Summingbird leave me sort of flat, because you can see that they come from a database background, which means that there will still be a considerable gap to serious machine learning.

So, what exactly is the difference? In the end, it’s the difference between relational and linear algebra. In the database world, you model relationships between objects, which you encode in tables, with foreign keys to link up entries between different tables.
Probably the most important insight of the database world was to develop a query language, a declarative description of what you want to extract from your database, leaving the optimization of the query and the exact details of how to perform them efficiently to the database guys.

The machine learning community, on the other hand, has its roots in linear algebra and probability theory. Objects are usually encoded as a feature vector, that is, a list of numbers describing different properties of an object. Data is often collected in matrices where each row corresponds to an object and each column to a feature, not much unlike a table in a database.

However, the operations you perform in order to do data analysis are quite different from the database world. Take something as basic as linear regression: you try to learn a linear function $f(x)=\sum_{i=1}^{d} w_i x_i$ in a $d$-dimensional space (that is, where your objects are described by a $d$-dimensional vector) given $n$ examples $X_i$ and $Y_i$, where $X_i$ are the features describing your objects and $Y_i$ is the real number you attach to $X_i$. One way to “learn” $w$ is to tune it such that the quadratic error on the training examples is minimal. The solution can be written in closed form as

$$w=(XX^T)^{-1}XY$$

where $X$ is the matrix built from the $X_i$ (putting the $X_i$ in the columns of $X$), and $Y$ is the vector of outputs $Y_i$.

In order to solve this, you need to solve the linear equation $(XX^T)w=XY$, which can be done by one of a large number of algorithms, starting with Gaussian elimination, which you’ve probably learned in your undergrad studies, or the conjugate gradient algorithm, or by first computing a Cholesky decomposition.

All of these algorithms have in common that they are iterative. They go through a number of operations, for example $O(d^3)$ for the Gaussian elimination case. They also need to store intermediate results.
Gaussian elimination and Cholesky decomposition have rather elementary operations acting on individual entries, while the conjugate gradient algorithm performs a matrix-vector multiplication in each iteration.

Most importantly, these algorithms can only be expressed very badly in SQL! It’s certainly not impossible, but you’d need to store your data in very different ways than you would in idiomatic database usage.

So, it’s not about whether or not your framework can support iterative algorithms without significant latency; it’s about understanding that joins, group bys, and count() won’t get you far, and that you need scalar products, matrix-vector and matrix-matrix multiplications instead. You don’t need indices for most ML algorithms, except maybe for being able to quickly find the k-nearest neighbors, because most algorithms tend to either take in the whole data set in each iteration or otherwise stream the whole set past some model which is iteratively updated, as in stochastic gradient descent. I’m not sure projects like Spark or Stratosphere have fully grasped the significance of this yet.

Database infrastructure-inspired Big Data has its place when it comes to extracting and preprocessing data, but eventually you move from database land to machine learning land, which invariably means linear algebra land (or probability theory land, which often also reduces to linear-algebra-like computations). What often happens today is that you either painstakingly have to break down your linear algebra into MapReduce jobs, or you actively look for algorithms which fit the database view better.

I think we’re still at the beginning of what is possible. Or, to be a bit more aggressive, claims that existing (infrastructure, database, parallelism inspired) frameworks provide you with sophisticated data analytics are greatly exaggerated.
They take care of a very important problem by giving you a reliable infrastructure to scale your data analysis code, but there’s still a lot of work that needs to be done on your side. High-level DSLs like Apache Hive or Pig are a first step in this direction but still too rooted in the database world, IMHO.

In summary, one should be aware of the difference between a framework which is mostly concerned with scaling and a tool which actually provides some piece of data analysis. And even if it comes with basic database-like analytics mechanisms, there is still a long way to go to do some serious data science.

That’s why we also think that streamdrill occupies an interesting spot: it is a bit of infrastructure, allowing you to process a serious amount of event data, but it also provides valuable analysis based on algorithms you wouldn’t want to implement yourself, even if you had some Big Data framework like Hadoop at hand. That’s an interesting direction I would also like to see more of in the future.

Note: Just saw that Spark has a logistic regression example on their landing page. Well, doing matrix operations explicitly via map() on collections doesn’t count in my view ;)
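To make the linear regression discussion above concrete, here is a minimal sketch in plain Java of solving the normal equations (XX^T)w = XY by Gaussian elimination. It is only an illustration under assumptions: the class and method names are hypothetical, the data is held in dense in-memory arrays, and partial pivoting is added for numerical stability even though the article does not mention it.

```java
// Hypothetical sketch: least-squares fit via the normal equations.
final class NormalEquationsSketch {

    // Solves A w = b by Gaussian elimination with partial pivoting.
    static double[] solve(double[][] a, double[] b) {
        int d = b.length;
        // Build the augmented matrix [A | b] so the inputs stay untouched.
        double[][] m = new double[d][d + 1];
        for (int i = 0; i < d; i++) {
            System.arraycopy(a[i], 0, m[i], 0, d);
            m[i][d] = b[i];
        }
        // Forward elimination: O(d^3) elementary operations on entries.
        for (int col = 0; col < d; col++) {
            int pivot = col;
            for (int r = col + 1; r < d; r++) {
                if (Math.abs(m[r][col]) > Math.abs(m[pivot][col])) {
                    pivot = r;
                }
            }
            if (Math.abs(m[pivot][col]) < 1e-12) {
                throw new ArithmeticException("matrix is singular");
            }
            double[] tmp = m[col];
            m[col] = m[pivot];
            m[pivot] = tmp;
            for (int r = col + 1; r < d; r++) {
                double factor = m[r][col] / m[col][col];
                for (int c = col; c <= d; c++) {
                    m[r][c] -= factor * m[col][c];
                }
            }
        }
        // Back substitution.
        double[] w = new double[d];
        for (int i = d - 1; i >= 0; i--) {
            double sum = m[i][d];
            for (int c = i + 1; c < d; c++) {
                sum -= m[i][c] * w[c];
            }
            w[i] = sum / m[i][i];
        }
        return w;
    }

    // examples[j] is the feature vector X_j; y[j] is its target Y_j.
    static double[] fit(double[][] examples, double[] y) {
        int d = examples[0].length;
        double[][] xxt = new double[d][d]; // XX^T = sum_j X_j X_j^T
        double[] xy = new double[d];       // XY   = sum_j X_j Y_j
        for (int j = 0; j < examples.length; j++) {
            for (int r = 0; r < d; r++) {
                xy[r] += examples[j][r] * y[j];
                for (int c = 0; c < d; c++) {
                    xxt[r][c] += examples[j][r] * examples[j][c];
                }
            }
        }
        return solve(xxt, xy);
    }

    public static void main(String[] args) {
        // Points on y = 1 + 2x; the constant first feature models the intercept.
        double[][] examples = {{1, 1}, {1, 2}, {1, 3}, {1, 4}};
        double[] y = {3, 5, 7, 9};
        double[] w = fit(examples, y);
        System.out.println("w0=" + w[0] + " w1=" + w[1]);
    }
}
```

Nothing in the sketch resembles a join or a group-by: the work is sums of outer products followed by row operations on a small dense matrix, which is the kind of computation the article argues maps poorly onto SQL.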
October 18, 2013
by Mikio Braun
· 11,382 Views · 1 Like
ElasticSearch: Java API
ElasticSearch provides a Java API that executes all operations asynchronously through a client object.
September 30, 2013
by Hüseyin Akdoğan, DZone Core
· 137,552 Views · 4 Likes
Introduction to ElasticSearch
Learn about ElasticSearch, an open source tool developed with Java. It is a Lucene-based, scalable, full-text search engine, and a data analysis tool.
September 17, 2013
by Hüseyin Akdoğan, DZone Core
· 12,113 Views · 5 Likes
Introducing the Spring YARN framework for Developing Apache Hadoop YARN Applications
Originally posted on the SpringSource blog by Janne Valkealahti

We're super excited to let the cat out of the bag and release support for writing YARN based applications as part of the Spring for Apache Hadoop 2.0 M1 release. In this blog post I’ll introduce you to YARN, what you can do with it, and how Spring simplifies the development of YARN based applications.

If you have been following the Hadoop community over the past year or two, you’ve probably seen a lot of discussions around YARN and the next version of Hadoop's MapReduce, called MapReduce v2. YARN (Yet Another Resource Negotiator) is a component of the MapReduce project created to overcome some performance issues in Hadoop's original design. The fundamental idea of MapReduce v2 is to split the functionalities of the JobTracker, Resource Management and Job Scheduling/Monitoring, into separate daemons. The idea is to have a global Resource Manager (RM) and a per-application Application Master (AM). A generic diagram of YARN component dependencies can be found in the YARN architecture documentation.

MapReduce Version 2 is an application running on top of YARN. It is also possible to write similar custom YARN based applications which have nothing to do with MapReduce; they are simply running as YARN applications. However, writing a custom YARN based application is difficult. The YARN APIs are low-level infrastructure APIs and not developer APIs. Take a look at the documentation for writing a YARN application to get an idea of what is involved.

Starting with the 2.0 version, Spring for Apache Hadoop introduces the Spring YARN sub-project to provide support for building Spring based YARN applications. This support for YARN steps in by trying to make development easier. “Spring handles the infrastructure so you can focus on your application” applies to writing Hadoop applications as well as other types of Java applications. Spring’s YARN support also makes it easier to test your YARN application.
With Spring’s YARN support, you're going to use all the familiar concepts of the Spring Framework itself, including configuration and, generally speaking, what you can do in your application. At a high level, Spring YARN provides three different components, YarnClient, YarnAppmaster and YarnContainer, which together can be called a Spring YARN Application. We provide default implementations for all components while still giving the end user an option to customize as much as he or she wants. Let's take a quick look at a very simplistic Spring YARN application which runs some custom code in a Hadoop cluster.

The YarnClient is used to communicate with YARN's Resource Manager. This provides management actions like submitting new application instances, listing applications and killing running applications. When submitting applications from the YarnClient, the main concerns relate to how the Application Master is configured and launched. Both the YarnAppmaster and YarnContainer share the same common launch context config logic, so you'll see a lot of similarities in YarnClient and YarnAppmaster configuration. Similar to how the YarnClient will define the launch context for the YarnAppmaster, the YarnAppmaster defines the launch context for the YarnContainer. The launch context defines the commands to start the container, localized files, command line parameters, environment variables and resource limits (memory, CPU).

The YarnContainer is a worker that does the heavy lifting of what a YARN application will actually do. The YarnAppmaster communicates with the YARN Resource Manager and starts and stops YarnContainers accordingly.

You can create a Spring application that launches an ApplicationMaster by using the YARN XML namespace to define a Spring Application Context. Context configuration for YarnClient defines the launch context for YarnAppmaster. This includes resources and libraries needed by YarnAppmaster and its environment settings. An example of this is shown below.
Note: Future releases will provide a Java based API for configuration, similar to what is done in Spring Security 3.2.

The purpose of YarnAppmaster is to control the instance of a running application. The YarnAppmaster is responsible for controlling the lifecycle of all its YarnContainers, of the whole running application after the application is submitted, and of itself.

The example above is defining a context configuration for the YarnAppmaster. Similar to what we saw in the YarnClient configuration, we define local resources for the YarnContainer and its environment. The classpath setting picks up Hadoop jars as well as your own application jars in default locations; change the setting if you want to use non-default directories. Also within the YarnAppmaster we define components handling the container allocation and bootstrapping. The allocator component interacts with the YARN Resource Manager, handling the resource scheduling. The runner component is responsible for bootstrapping the allocated containers.

The above example defines a simple YarnContainer context configuration. To implement the functionality of the container, you implement the interface YarnContainer. The YarnContainer interface is similar to Java’s Runnable interface: it has a run() method, as well as two additional methods related to getting environment and command line information.

Below is a simple hello world application that will be run inside of a YARN container:

    import java.util.Map;
    import java.util.Properties;
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.springframework.yarn.container.YarnContainer;

    public class MyCustomYarnContainer implements YarnContainer {

        private static final Log log = LogFactory.getLog(MyCustomYarnContainer.class);

        // Called by the framework to do the container's actual work
        @Override
        public void run() {
            log.info("Hello from MyCustomYarnContainer");
        }

        // Environment and command line information; unused in this example
        @Override
        public void setEnvironment(Map<String, String> environment) {}

        @Override
        public void setParameters(Properties parameters) {}
    }

We just showed the configuration of a Spring YARN Application and the core application logic, so what remains is how to bootstrap the application to run inside the Hadoop cluster.
The utility class CommandLineClientRunner provides this functionality. You can use CommandLineClientRunner either manually from a command line or from your own code.

    # java -cp <classpath> org.springframework.yarn.client.CommandLineClientRunner application-context.xml yarnClient -submit

A Spring YARN Application is packaged into a jar file which can then be transferred into HDFS with the rest of the dependencies. A YarnClient can transfer all needed libraries into HDFS during the application submit process, but generally speaking it is more advisable to do this manually in order to avoid unnecessary network I/O. Your application won't change until a new version is created, so it can be copied into HDFS prior to the first application submit. You can, for example, use Hadoop's hdfs dfs -copyFromLocal command.

Below you can see an example of a typical project setup.

    src/main/java/org/example/MyCustomYarnContainer.java
    src/main/resources/application-context.xml
    src/main/resources/appmaster-context.xml
    src/main/resources/container-context.xml

As a wild guess, we'll make a bet that you have now figured out that you are not actually configuring YARN; instead you are configuring Spring Application Contexts for all three components, YarnClient, YarnAppmaster and YarnContainer.

We have just scratched the surface of what we can do with Spring YARN. While we’re preparing more blog posts, go ahead and check the existing samples on GitHub. To see the concepts described in this blog post in practice, look at the multi-context example in our samples repository.

Future blog posts will cover topics like unit testing and more advanced YARN application development.
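The post describes the container contract as a run() method plus two setters for environment and command line information. The sketch below shows one way a container might use those inputs. It deliberately does not depend on Spring YARN: ContainerContract is a hypothetical local stand-in that mirrors the three methods so the example compiles on its own, and the assumption that both setters are called before run(), as well as the names HOSTNAME and greeting.name, are illustrative rather than taken from the framework.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical stand-in mirroring the three methods described in the post.
interface ContainerContract {
    void run();
    void setEnvironment(Map<String, String> environment);
    void setParameters(Properties parameters);
}

final class GreetingContainer implements ContainerContract {

    private Map<String, String> environment = new HashMap<>();
    private Properties parameters = new Properties();
    private String lastMessage = "";

    @Override
    public void setEnvironment(Map<String, String> environment) {
        this.environment = new HashMap<>(environment); // defensive copy
    }

    @Override
    public void setParameters(Properties parameters) {
        this.parameters = parameters;
    }

    @Override
    public void run() {
        // Fall back to defaults when a value was not supplied.
        String name = parameters.getProperty("greeting.name", "world");
        String host = environment.getOrDefault("HOSTNAME", "unknown-host");
        lastMessage = "Hello " + name + " from " + host;
        System.out.println(lastMessage);
    }

    String lastMessage() {
        return lastMessage;
    }

    public static void main(String[] args) {
        // Simulates what a launcher would do: inject inputs, then run.
        GreetingContainer container = new GreetingContainer();
        Map<String, String> env = new HashMap<>();
        env.put("HOSTNAME", "node-1");
        Properties params = new Properties();
        params.setProperty("greeting.name", "YARN");
        container.setEnvironment(env);
        container.setParameters(params);
        container.run();
    }
}
```

Keeping the inputs behind setters means the container logic can be exercised in a plain unit test, without a cluster, by injecting a map and a Properties object by hand.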
September 11, 2013
by Pieter Humphrey
· 13,616 Views
OpenStack Savanna: Fast Hadoop Cluster Provisioning on OpenStack
Introduction

OpenStack is one of the most popular open source cloud computing projects for providing an infrastructure as a service solution. Its key components are Compute (Nova), Networking (Neutron, formerly known as Quantum), Storage (object and block storage, Swift and Cinder, respectively), OpenStack Dashboard (Horizon), Identity Service (Keystone) and Image Service (Glance). There are other official incubated projects like Metering (Ceilometer) and Orchestration and Service Definition (Heat).

Savanna is a Hadoop as a service for OpenStack introduced by Mirantis. It is still in an early phase (version 0.2 was released in summer 2013) and, according to its roadmap, version 1.0 is targeted for official OpenStack incubation. In principle, Heat could also be used for Hadoop cluster provisioning, but Savanna is especially tuned for providing Hadoop-specific API functionality while Heat is meant to be used for generic purposes.

Savanna Architecture

Savanna is integrated with the core OpenStack components such as Keystone, Nova, Glance, Swift and Horizon. It has a REST API that supports the Hadoop cluster provisioning steps. The Savanna API is implemented as a WSGI server that, by default, listens on port 8386. In addition, Savanna can also be integrated with Horizon, the OpenStack Dashboard, to create a Hadoop cluster from the management console. Savanna also comes with a Vanilla plugin that deploys a Hadoop cluster image. The standard out-of-the-box Vanilla plugin supports Hadoop version 1.1.2.

Installing Savanna

The simplest option to try out Savanna is to use DevStack in a virtual machine. I was using an Ubuntu 12.04 virtual instance in my tests.
In that environment we need to execute the following commands to install DevStack and the Savanna API:

    $ sudo apt-get install git-core
    $ git clone https://github.com/openstack-dev/devstack.git
    $ cd devstack
    $ vi localrc   # edit localrc
    ADMIN_PASSWORD=nova
    MYSQL_PASSWORD=nova
    RABBIT_PASSWORD=nova
    SERVICE_PASSWORD=$ADMIN_PASSWORD
    SERVICE_TOKEN=nova

    # Enable Swift
    ENABLED_SERVICES+=,swift
    SWIFT_HASH=66a3d6b56c1f479c8b4e70ab5c2000f5
    SWIFT_REPLICAS=1
    SWIFT_DATA_DIR=$DEST/data

    # Force checkout prerequisites
    # FORCE_PREREQ=1

    # Keystone is now configured by default to use PKI as the token format, which produces huge tokens.
    # Set UUID as the Keystone token format, which is much shorter and easier to work with.
    KEYSTONE_TOKEN_FORMAT=UUID

    # Change the FLOATING_RANGE to whatever IPs the VM is working in.
    # In NAT mode it is the subnet VMware Fusion provides, in bridged mode it is your local network.
    FLOATING_RANGE=192.168.55.224/27

    # Enable auto assignment of floating IPs. By default Savanna expects this setting to be enabled.
    EXTRA_OPTS=(auto_assign_floating_ip=True)

    # Enable logging
    SCREEN_LOGDIR=$DEST/logs/screen

    $ ./stack.sh   # this will take a while to execute
    $ sudo apt-get install python-setuptools python-virtualenv python-dev
    $ virtualenv savanna-venv
    $ savanna-venv/bin/pip install savanna
    $ mkdir savanna-venv/etc
    $ cp savanna-venv/share/savanna/savanna.conf.sample savanna-venv/etc/savanna.conf
    # To start the Savanna API:
    $ savanna-venv/bin/python savanna-venv/bin/savanna-api --config-file savanna-venv/etc/savanna.conf

To install the Savanna UI integrated with Horizon, we need to run the following commands:

    $ sudo pip install savanna-dashboard
    $ cd /opt/stack/horizon/openstack-dashboard
    $ vi settings.py
    HORIZON_CONFIG = {
        'dashboards': ('nova', 'syspanel', 'settings', 'savanna'),
    INSTALLED_APPS = (
        'savannadashboard',
        ....
    $ cd /opt/stack/horizon/openstack-dashboard/local
    $ vi local_settings.py
    SAVANNA_URL = 'http://localhost:8386/v1.0'
    $ sudo service apache2 restart

Provisioning a Hadoop Cluster

As a first step, we need to configure the Keystone-related environment variables to get the authentication token:

    ubuntu@ip-10-59-33-68:~$ vi .bashrc
    $ export OS_AUTH_URL=http://127.0.0.1:5000/v2.0/
    $ export OS_TENANT_NAME=admin
    $ export OS_USERNAME=admin
    $ export OS_PASSWORD=nova
    ubuntu@ip-10-59-33-68:~$ source .bashrc
    ubuntu@ip-10-59-33-68:~$
    ubuntu@ip-10-59-33-68:~$ env | grep OS
    OS_PASSWORD=nova
    OS_AUTH_URL=http://127.0.0.1:5000/v2.0/
    OS_USERNAME=admin
    OS_TENANT_NAME=admin
    ubuntu@ip-10-59-33-68:~$ keystone token-get
    +-----------+----------------------------------+
    | Property  | Value                            |
    +-----------+----------------------------------+
    | expires   | 2013-08-09T20:31:12Z             |
    | id        | bdb582c836e3474f979c5aa8f844c000 |
    | tenant_id | 2f46e214984f4990b9c39d9c6222f572 |
    | user_id   | 077311b0a8304c8e86dc0dc168a67091 |
    +-----------+----------------------------------+
    $ export auth_token="bdb582c836e3474f979c5aa8f844c000"
    $ export tenant_id="2f46e214984f4990b9c39d9c6222f572"

Then we need to create the Glance image that we want to use for our Hadoop cluster.
In our example we have used Mirantis's vanilla image, but we can also build our own image:

    $ wget http://savanna-files.mirantis.com/savanna-0.2-vanilla-1.1.2-ubuntu-12.10.qcow2
    $ glance image-create --name=savanna-0.2-vanilla-hadoop-ubuntu.qcow2 --disk-format=qcow2 --container-format=bare < ./savanna-0.2-vanilla-1.1.2-ubuntu-12.10.qcow2
    ubuntu@ip-10-59-33-68:~/devstack$ glance image-list
    +--------------------------------------+-----------------------------------------+-------------+------------------+-----------+--------+
    | ID                                   | Name                                    | Disk Format | Container Format | Size      | Status |
    +--------------------------------------+-----------------------------------------+-------------+------------------+-----------+--------+
    | d0d64f5c-9c15-4e7b-ad4c-13859eafa7b8 | cirros-0.3.1-x86_64-uec                 | ami         | ami              | 25165824  | active |
    | fee679ee-e0c0-447e-8ebd-028050b54af9 | cirros-0.3.1-x86_64-uec-kernel          | aki         | aki              | 4955792   | active |
    | 1e52089b-930a-4dfc-b707-89b568d92e7e | cirros-0.3.1-x86_64-uec-ramdisk         | ari         | ari              | 3714968   | active |
    | d28051e2-9ddd-45f0-9edc-8923db46fdf9 | savanna-0.2-vanilla-hadoop-ubuntu.qcow2 | qcow2       | bare             | 551699456 | active |
    +--------------------------------------+-----------------------------------------+-------------+------------------+-----------+--------+
    $ export image_id=d28051e2-9ddd-45f0-9edc-8923db46fdf9

Then we installed HTTPie, an open source HTTP client that can be used to send REST requests to the Savanna API:

    $ sudo pip install httpie

From now on we will use HTTPie to send Savanna commands.
We need to register the image with Savanna:

    $ export savanna_url="http://localhost:8386/v1.0/$tenant_id"
    $ http POST $savanna_url/images/$image_id x-auth-token:$auth_token username=ubuntu
    HTTP/1.1 202 Accepted
    Content-Length: 411
    Content-Type: application/json
    Date: Thu, 08 Aug 2013 21:28:07 GMT

    {
        "image": {
            "OS-EXT-IMG-SIZE:size": 551699456,
            "created": "2013-08-08T21:05:55Z",
            "description": "None",
            "id": "d28051e2-9ddd-45f0-9edc-8923db46fdf9",
            "metadata": {
                "_savanna_description": "None",
                "_savanna_username": "ubuntu"
            },
            "minDisk": 0,
            "minRam": 0,
            "name": "savanna-0.2-vanilla-hadoop-ubuntu.qcow2",
            "progress": 100,
            "status": "ACTIVE",
            "tags": [],
            "updated": "2013-08-08T21:28:07Z",
            "username": "ubuntu"
        }
    }

    $ http $savanna_url/images/$image_id/tag x-auth-token:$auth_token tags:='["vanilla", "1.1.2", "ubuntu"]'
    HTTP/1.1 202 Accepted
    Content-Length: 532
    Content-Type: application/json
    Date: Thu, 08 Aug 2013 21:29:25 GMT

    {
        "image": {
            "OS-EXT-IMG-SIZE:size": 551699456,
            "created": "2013-08-08T21:05:55Z",
            "description": "None",
            "id": "d28051e2-9ddd-45f0-9edc-8923db46fdf9",
            "metadata": {
                "_savanna_description": "None",
                "_savanna_tag_1.1.2": "True",
                "_savanna_tag_ubuntu": "True",
                "_savanna_tag_vanilla": "True",
                "_savanna_username": "ubuntu"
            },
            "minDisk": 0,
            "minRam": 0,
            "name": "savanna-0.2-vanilla-hadoop-ubuntu.qcow2",
            "progress": 100,
            "status": "ACTIVE",
            "tags": [ "vanilla", "ubuntu", "1.1.2" ],
            "updated": "2013-08-08T21:29:25Z",
            "username": "ubuntu"
        }
    }

Then we need to create the node group templates (JSON files) that will be sent to Savanna. There is one template for the master nodes (namenode, jobtracker) and another template for the worker nodes (datanode, tasktracker). The Hadoop version is 1.1.2.
    $ vi ng_master_template_create.json
    {
        "name": "test-master-tmpl",
        "flavor_id": "2",
        "plugin_name": "vanilla",
        "hadoop_version": "1.1.2",
        "node_processes": ["jobtracker", "namenode"]
    }

    $ vi ng_worker_template_create.json
    {
        "name": "test-worker-tmpl",
        "flavor_id": "2",
        "plugin_name": "vanilla",
        "hadoop_version": "1.1.2",
        "node_processes": ["tasktracker", "datanode"]
    }

    $ http $savanna_url/node-group-templates x-auth-token:$auth_token < ng_master_template_create.json
    HTTP/1.1 202 Accepted
    Content-Length: 387
    Content-Type: application/json
    Date: Thu, 08 Aug 2013 21:58:00 GMT

    {
        "node_group_template": {
            "created": "2013-08-08T21:58:00",
            "flavor_id": "2",
            "hadoop_version": "1.1.2",
            "id": "b3a79c88-b6fb-43d2-9a56-310218c66f7c",
            "name": "test-master-tmpl",
            "node_configs": {},
            "node_processes": [ "jobtracker", "namenode" ],
            "plugin_name": "vanilla",
            "updated": "2013-08-08T21:58:00",
            "volume_mount_prefix": "/volumes/disk",
            "volumes_per_node": 0,
            "volumes_size": 10
        }
    }

    $ http $savanna_url/node-group-templates x-auth-token:$auth_token < ng_worker_template_create.json
    HTTP/1.1 202 Accepted
    Content-Length: 388
    Content-Type: application/json
    Date: Thu, 08 Aug 2013 21:59:41 GMT

    {
        "node_group_template": {
            "created": "2013-08-08T21:59:41",
            "flavor_id": "2",
            "hadoop_version": "1.1.2",
            "id": "773b2cfb-1e05-46f4-923f-13edc7d6aac6",
            "name": "test-worker-tmpl",
            "node_configs": {},
            "node_processes": [ "tasktracker", "datanode" ],
            "plugin_name": "vanilla",
            "updated": "2013-08-08T21:59:41",
            "volume_mount_prefix": "/volumes/disk",
            "volumes_per_node": 0,
            "volumes_size": 10
        }
    }

The next step is to define the cluster template:

    $ vi cluster_template_create.json
    {
        "name": "demo-cluster-template",
        "plugin_name": "vanilla",
        "hadoop_version": "1.1.2",
        "node_groups": [
            { "name": "master", "node_group_template_id": "b3a79c88-b6fb-43d2-9a56-310218c66f7c", "count": 1 },
            { "name": "workers", "node_group_template_id": "773b2cfb-1e05-46f4-923f-13edc7d6aac6", "count": 2 }
        ]
    }

    $ http $savanna_url/cluster-templates x-auth-token:$auth_token < cluster_template_create.json
    HTTP/1.1 202 Accepted
    Content-Length: 815
    Content-Type: application/json
    Date: Fri, 09 Aug 2013 07:04:24 GMT

    {
        "cluster_template": {
            "anti_affinity": [],
            "cluster_configs": {},
            "created": "2013-08-09T07:04:24",
            "hadoop_version": "1.1.2",
            "id": "64c4117b-acee-4da7-937b-cb964f0471a9",
            "name": "demo-cluster-template",
            "node_groups": [
                {
                    "count": 1,
                    "flavor_id": "2",
                    "name": "master",
                    "node_configs": {},
                    "node_group_template_id": "b3a79c88-b6fb-43d2-9a56-310218c66f7c",
                    "node_processes": [ "jobtracker", "namenode" ],
                    "volume_mount_prefix": "/volumes/disk",
                    "volumes_per_node": 0,
                    "volumes_size": 10
                },
                {
                    "count": 2,
                    "flavor_id": "2",
                    "name": "workers",
                    "node_configs": {},
                    "node_group_template_id": "773b2cfb-1e05-46f4-923f-13edc7d6aac6",
                    "node_processes": [ "tasktracker", "datanode" ],
                    "volume_mount_prefix": "/volumes/disk",
                    "volumes_per_node": 0,
                    "volumes_size": 10
                }
            ],
            "plugin_name": "vanilla",
            "updated": "2013-08-09T07:04:24"
        }
    }

Now we are ready to create the Hadoop cluster:

    $ vi cluster_create.json
    {
        "name": "cluster-1",
        "plugin_name": "vanilla",
        "hadoop_version": "1.1.2",
        "cluster_template_id": "64c4117b-acee-4da7-937b-cb964f0471a9",
        "user_keypair_id": "savanna",
        "default_image_id": "d28051e2-9ddd-45f0-9edc-8923db46fdf9"
    }

    $ http $savanna_url/clusters x-auth-token:$auth_token < cluster_create.json
    HTTP/1.1 202 Accepted
    Content-Length: 1153
    Content-Type: application/json
    Date: Fri, 09 Aug 2013 07:28:14 GMT

    {
        "cluster": {
            "anti_affinity": [],
            "cluster_configs": {},
            "cluster_template_id": "64c4117b-acee-4da7-937b-cb964f0471a9",
            "created": "2013-08-09T07:28:14",
            "default_image_id": "d28051e2-9ddd-45f0-9edc-8923db46fdf9",
            "hadoop_version": "1.1.2",
            "id": "d919f1db-522f-45ab-aadd-c078ba3bb4e3",
            "info": {},
            "name": "cluster-1",
            "node_groups": [
                {
                    "count": 1,
                    "created": "2013-08-09T07:28:14",
                    "flavor_id": "2",
                    "instances": [],
                    "name": "master",
                    "node_configs": {},
                    "node_group_template_id": "b3a79c88-b6fb-43d2-9a56-310218c66f7c",
                    "node_processes": [ "jobtracker", "namenode" ],
                    "updated": "2013-08-09T07:28:14",
                    "volume_mount_prefix": "/volumes/disk",
                    "volumes_per_node": 0,
                    "volumes_size": 10
                },
                {
                    "count": 2,
                    "created": "2013-08-09T07:28:14",
                    "flavor_id": "2",
                    "instances": [],
                    "name": "workers",
                    "node_configs": {},
                    "node_group_template_id": "773b2cfb-1e05-46f4-923f-13edc7d6aac6",
                    "node_processes": [ "tasktracker", "datanode" ],
                    "updated": "2013-08-09T07:28:14",
                    "volume_mount_prefix": "/volumes/disk",
                    "volumes_per_node": 0,
                    "volumes_size": 10
                }
            ],
            "plugin_name": "vanilla",
            "status": "Validating",
            "updated": "2013-08-09T07:28:14",
            "user_keypair_id": "savanna"
        }
    }

After a while we can run the nova command to check whether the instances have been created and are running:

    $ nova list
    +--------------------------------------+-----------------------+--------+------------+-------------+----------------------------------+
    | ID                                   | Name                  | Status | Task State | Power State | Networks                         |
    +--------------------------------------+-----------------------+--------+------------+-------------+----------------------------------+
    | 1a9f43bf-cddb-4556-877b-cc993730da88 | cluster-1-master-001  | ACTIVE | None       | Running     | private=10.0.0.2, 192.168.55.227 |
    | bb55f881-1f96-4669-a94a-58cbf4d88f39 | cluster-1-workers-001 | ACTIVE | None       | Running     | private=10.0.0.3, 192.168.55.226 |
    | 012a24e2-fa33-49f3-b051-9ee2690864df | cluster-1-workers-002 | ACTIVE | None       | Running     | private=10.0.0.4, 192.168.55.225 |
    +--------------------------------------+-----------------------+--------+------------+-------------+----------------------------------+

Now we can log in to the Hadoop master instance and run the required Hadoop commands:

    $ ssh -i savanna.pem [email protected]
    $ sudo chmod 777 /usr/share/hadoop
    $ sudo su
hadoop $ cd /usr/share/hadoop $ hadoop jar hadoop-examples-1.1.2.jar pi 10 100
savanna ui via horizon
in order to create nodegroup templates, cluster templates and the cluster itself, we used a command line tool, httpie, to send rest api calls. the same functionality is also available via horizon, the standard openstack dashboard. the steps are the same: first we need to register the image with savanna, then create the nodegroup templates, after that create the cluster template, and finally create the cluster.
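The cluster template request mostly wires the two node group template ids into a node_groups array. As a minimal sketch, the request body could also be assembled in code before it is sent to $savanna_url/cluster-templates. The class and method names here are hypothetical; only the JSON field names and the ids come from the requests shown in the article, and the values are assumed to contain no characters that need JSON escaping.

```java
public class ClusterTemplateRequest {

    // Values taken from the article's requests.
    static final String PLUGIN_NAME = "vanilla";
    static final String HADOOP_VERSION = "1.1.2";

    /** Renders one entry of the "node_groups" array (no JSON escaping is done). */
    public static String nodeGroup(String name, String templateId, int count) {
        return "{\"name\": \"" + name + "\", "
            + "\"node_group_template_id\": \"" + templateId + "\", "
            + "\"count\": " + count + "}";
    }

    /** Renders a cluster template body with one master and N workers. */
    public static String clusterTemplate(String name, String masterTemplateId,
                                         String workerTemplateId, int workers) {
        return "{\"name\": \"" + name + "\", "
            + "\"plugin_name\": \"" + PLUGIN_NAME + "\", "
            + "\"hadoop_version\": \"" + HADOOP_VERSION + "\", "
            + "\"node_groups\": ["
            + nodeGroup("master", masterTemplateId, 1) + ", "
            + nodeGroup("workers", workerTemplateId, workers)
            + "]}";
    }

    public static void main(String[] args) {
        // Ids are the ones returned by the node group template calls in the article.
        System.out.println(clusterTemplate(
            "demo-cluster-template",
            "b3a79c88-b6fb-43d2-9a56-310218c66f7c",
            "773b2cfb-1e05-46f4-923f-13edc7d6aac6",
            2));
    }
}
```

The printed JSON can be saved to cluster_template_create.json and sent with httpie exactly as before; generating it avoids copying the template ids by hand.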
August 20, 2013
by Istvan Szegedi
· 9,417 Views
What Is NoSQL?
Dan McCreary and Ann Kelly, authors of 'Making Sense of NoSQL,' discuss the business drivers and motivations that make NoSQL so popular to organizations today.
August 1, 2013
by Eric Gregory
· 21,236 Views · 4 Likes
Integration of Amazon Redshift Data Warehouse with Talend Data Integration
In this blog post, I will show you how to "ETL" all kinds of data to Amazon’s cloud data warehouse Redshift with Talend’s big data components. Let’s begin with a short introduction to Amazon Redshift (copied from the website): "Amazon Redshift is [part of Amazon Web Services (AWS) and] a fast and powerful, fully managed, petabyte-scale data warehouse service in the cloud. With a few clicks in the AWS Management Console, customers can launch a Redshift cluster, starting with a few hundred gigabytes and scaling to a petabyte or more, for under $1,000 per terabyte per year. Traditional data warehouses require significant time and resource to administer, especially for large datasets. In addition, the financial cost associated with building, maintaining, and growing self-managed, on-premise data warehouses is very high. Amazon Redshift not only significantly lowers the cost of a data warehouse, but also makes it easy to analyze large amounts of data very quickly." Sounds interesting! And indeed, we already see companies using Talend’s Redshift connectors. From Talend’s perspective, it is not much more than just another database. If you have ever used a Talend connector, you can integrate with Redshift within minutes. In the next sections, I will describe all necessary steps and give some hints regarding configuration issues and performance improvements. Be aware: You need Talend Open Studio for Data Integration (Apache License, open source) or any Talend Enterprise Edition / Platform which contains the Cloud components to see and use the Amazon Redshift connectors. The open source edition offers all connectors and functionality needed to integrate with Amazon Redshift. However, the Enterprise versions offer some more features (e.g. versioning), comfort (e.g. wizards) and commercial support.
Setup Amazon Redshift
Setup of Amazon Redshift is very easy. Just follow Amazon’s getting started guide: http://docs.aws.amazon.com/redshift/latest/gsg/welcome.html.
Like every other AWS guide, it is very easy to understand and use. Be aware that you only have to do steps 1, 2 and 3 of the getting started guide to use it with Talend. Some hints:
- Step 1 ("before you begin"): Just sign up. Client tools and drivers are not necessary because they are already installed within Talend Studio.
- Step 2 ("launch a cluster"): Yes, please start your cluster!
- Step 3 ("authorize access"): If you are not sure what to do here, select Connection Type = CIDR/IP. Find out your IP address (http://whatismyipaddress.com) and enter it with "/32" at the end. Example: "192.168.1.1/32"
Now you can connect to Amazon Redshift from Talend Studio on your local computer. Step 4 (connect) and step 5 (create table, data, queries) are not necessary; this will be done from Talend Studio. Of course, you should not forget to delete your cluster (step 7) when you are done. Otherwise, you will pay for every hour, even if you do not access your DWH.
Connect to Amazon Redshift from Talend Studio
Create a new connection to the Amazon Redshift database as you would with any other relational database. The easiest way is to use the "DB Connection Wizard" in metadata. Just enter your connection information and check that it works. You get all the configuration information from the Amazon Web Console. The connection string looks something like this: "jdbc:paraccel://talend-demo-cluster.cp8t6c5.eu-west-1.redshift.amazonaws.com:5439/dev" Next, right click on the created connection and select "retrieve schema". "public" is the default schema which you (have to) use. Now you are ready to use this connection within Talend Jobs to write to Amazon Redshift and read from it.
Create Talend Jobs (Write, Read, Delete)
Amazon Redshift components work like any other Talend (relational) database components. Look at www.help.talend.com for more information if you have not used them before (or just try them out, they are very self-explanatory).
You just have to drag&drop your connection from metadata. Afterwards, you can easily write data (tRedshiftOutput), read data (tRedshiftInput), or run any other queries such as delete or copy (tRedshiftRow). In the following job, I start by deleting all content in the Amazon Redshift table. Then, I read data from a MySQL table and insert it into an Amazon Redshift table. The table is created automatically (as I have configured it this way). After this subjob is finished, I read the data again and store it in a CSV file (which is also created automatically). Of course, this is not a business use case, but it shows how to use the different Amazon Redshift components.
Query Data from Amazon Redshift
You can connect to Amazon Redshift directly from Talend Studio to explore and query the data of the DWH. Thus, no other database tool is required. Just right click on your Amazon Redshift connection in metadata and select "edit queries". Here you can define, execute and save SQL queries.
Improve Performance
Write performance of Amazon Redshift is relatively low compared to "classical" relational databases (in your data center) as you have to upload all data into the cloud. Different alternatives exist to improve performance:
- Bulk inserts: "Extended insert" (in advanced settings) improves performance a lot, but still not to hyperspeed. Also, as it is bulk, you can only do inserts! It is not compatible with "rejects" or "updates".
- AWS S3 and COPY command: S3 is Amazon’s "simple storage service", a key-value store (also called NoSQL today) for storing very large objects. You can use Amazon Redshift’s COPY command (http://docs.aws.amazon.com/redshift/latest/dg/r_COPY.html) to transfer data from S3 to Amazon Redshift with good performance. However, you still have to copy the data to S3 first, so the same "cloud problem" applies here. The COPY command can be used with tRedshiftRow, so this is no problem at all from Talend’s perspective.
To transfer data to S3, you can either use the Talend S3 components from Talendforge, Talend’s open source community (http://www.talendforge.org/exchange), or use camel-s3, an Apache Camel component which is included in Talend ESB. The latter is an option if you use Talend Data Services, which combines Talend DI and Talend ESB in its unified platform.
Summary
You need not be a cloud or DWH expert, or an expert developer, to integrate with Amazon’s cloud data warehouse Redshift. It is very easy with Talend’s integration solutions. Just drag&drop, configure, do some graphical mappings / transformations (if necessary), that’s it. Code is generated. Job runs. You can integrate Amazon Redshift almost as simply as any other relational database. Just be aware of some cloud-specific security and performance issues. With Talend, you can easily "ETL" all data from different sources to Redshift and store it there for under $1,000 per terabyte per year, even with the open source version!
Best regards, Kai Wähner (Contact and feedback via @KaiWaehner, www.kai-waehner.de, LinkedIn / Xing)
This is content from my blog: http://www.kai-waehner.de/blog/2013/06/26/integration-of-amazon-redshift-cloud-data-warehouse-aws-saas-dwh-with-talend-data-integration-di-big-data-bd-enterprise-service-bus-esb/
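The S3 route described in the performance section comes down to running one COPY statement, for example from a tRedshiftRow component. As a rough sketch, the statement text could be assembled like this. The class name, table name, bucket path and key values are hypothetical placeholders, the table name is assumed to be a trusted identifier, and the CREDENTIALS and DELIMITER clauses follow the COPY documentation linked in the article; check that page for the options your file format needs.

```java
public class RedshiftCopyStatement {

    /** Wraps a value in single quotes, doubling any embedded single quotes. */
    public static String quote(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    /** Builds a COPY statement that loads a comma-delimited file from S3. */
    public static String copyFromS3(String table, String s3Path,
                                    String accessKeyId, String secretKey) {
        // Redshift expects both keys in one semicolon-separated string.
        String credentials = "aws_access_key_id=" + accessKeyId
            + ";aws_secret_access_key=" + secretKey;
        return "COPY " + table
            + " FROM " + quote(s3Path)
            + " CREDENTIALS " + quote(credentials)
            + " DELIMITER ','";
    }

    public static void main(String[] args) {
        // Placeholder values only; never hard-code real keys in a job.
        System.out.println(copyFromS3(
            "customers",
            "s3://my-bucket/customers.csv",
            "ACCESS_KEY_PLACEHOLDER",
            "SECRET_KEY_PLACEHOLDER"));
    }
}
```

The resulting string is what would go into the query field of the component; the upload to S3 itself still has to happen beforehand, as noted above.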
June 27, 2013
by Kai Wähner (DZone Core)
· 20,447 Views · 1 Like
Searchable Documents? Yes You Can. Another Reason to Choose AsciiDoc
Elasticsearch is a flexible and powerful open source, distributed real-time search and analytics engine for the cloud, based on Apache Lucene, which provides full text search capabilities. It is document oriented and schema free. Asciidoctor is a pure Ruby processor for converting AsciiDoc source files and strings into HTML 5, DocBook 4.5 and other formats. Apart from the Ruby part of Asciidoctor, there is the Asciidoctor-java-integration project, which lets us call Asciidoctor functions from Java without noticing that Ruby code is being executed. In this post we are going to see how we can use Elasticsearch over AsciiDoc documents to make them searchable by their header information or by their content. Let's add the required dependencies (Maven coordinates): junit:junit:4.11 (test scope), com.googlecode.lambdaj:lambdaj:2.3.3, org.elasticsearch:elasticsearch:0.90.1 and org.asciidoctor:asciidoctor-java-integration:0.1.3. The Lambdaj library is used to convert AsciiDoc files to json documents. Now we can start an Elasticsearch instance, which in our case is going to be an embedded instance. node = nodeBuilder().local(true).node(); The next step is to parse the AsciiDoc document header, read its content and convert them into a json document. An example of a json document stored in Elasticsearch can be: { "title":"Asciidoctor Maven plugin 0.1.2 released!", "authors":[ { "author":"Jason Porter", "email":"[email protected]" } ], "version":null, "content":"= Asciidoctor Maven plugin 0.1.2 released!.....", "tags":[ "release", "plugin" ] } And for converting an AsciiDoc file to a json document we are going to use the XContentBuilder class, which is provided by the Elasticsearch Java API to create json documents programmatically. 
package com.lordofthejars.asciidoctor; import static org.elasticsearch.common.xcontent.XContentFactory.*; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.util.List; import org.asciidoctor.Asciidoctor; import org.asciidoctor.Author; import org.asciidoctor.DocumentHeader; import org.asciidoctor.internal.IOUtils; import org.elasticsearch.common.xcontent.XContentBuilder; import ch.lambdaj.function.convert.Converter; public class AsciidoctorFileJsonConverter implements Converter<File, XContentBuilder> { private Asciidoctor asciidoctor; public AsciidoctorFileJsonConverter() { this.asciidoctor = Asciidoctor.Factory.create(); } public XContentBuilder convert(File asciidoctor) { DocumentHeader documentHeader = this.asciidoctor.readDocumentHeader(asciidoctor); XContentBuilder jsonContent = null; try { jsonContent = jsonBuilder() .startObject() .field("title", documentHeader.getDocumentTitle()) .startArray("authors"); Author mainAuthor = documentHeader.getAuthor(); jsonContent.startObject() .field("author", mainAuthor.getFullName()) .field("email", mainAuthor.getEmail()) .endObject(); List<Author> authors = documentHeader.getAuthors(); for (Author author : authors) { jsonContent.startObject() .field("author", author.getFullName()) .field("email", author.getEmail()) .endObject(); } jsonContent.endArray() .field("version", documentHeader.getRevisionInfo().getNumber()) .field("content", readContent(asciidoctor)) .array("tags", parseTags((String)documentHeader.getAttributes().get("tags"))) .endObject(); } catch (IOException e) { throw new IllegalArgumentException(e); } return jsonContent; } private String[] parseTags(String tags) { tags = tags.substring(1, tags.length()-1); return tags.split(", "); } private String readContent(File content) throws FileNotFoundException { return IOUtils.readFull(new FileInputStream(content)); } } Basically we are building the json document by calling startObject methods to start a new object, 
field method to add new fields, and startArray to start an array. Then this builder will be used to render the equivalent object in json format. Notice that we are using the readDocumentHeader method from the Asciidoctor class, which returns header attributes from an AsciiDoc file without reading and rendering the whole document. And finally the content field is set with all the document content. And now we are ready to start indexing documents. Note that the populateData method receives a Client object as a parameter. This object is from the Elasticsearch Java API and represents a connection to the Elasticsearch database. import static ch.lambdaj.Lambda.convert; //.... private void populateData(Client client) throws IOException { List<File> asciidoctorFiles = new ArrayList<File>() {{ add(new File("target/test-classes/java_release.adoc")); add(new File("target/test-classes/maven_release.adoc")); }}; List<XContentBuilder> jsonDocuments = convertAsciidoctorFilesToJson(asciidoctorFiles); for (int i=0; i < jsonDocuments.size(); i++) { client.prepareIndex("docs", "asciidoctor", Integer.toString(i)).setSource(jsonDocuments.get(i)).execute().actionGet(); } client.admin().indices().refresh(new RefreshRequest("docs")).actionGet(); } private List<XContentBuilder> convertAsciidoctorFilesToJson(List<File> asciidoctorFiles) { return convert(asciidoctorFiles, new AsciidoctorFileJsonConverter()); } It is important to note that the first part of the algorithm converts all our AsciiDoc files (in our case two) to XContentBuilder instances by using the previous converter class and the convert method of the Lambdaj project. If you want, you can take a look at both documents used in this example at https://github.com/asciidoctor/asciidoctor.github.com/blob/develop/news/asciidoctor-java-integration-0-1-3-released.adoc and https://github.com/asciidoctor/asciidoctor.github.com/blob/develop/news/asciidoctor-maven-plugin-0-1-2-released.adoc. The next part is inserting the documents into one index. 
This is done by using the prepareIndex method, which requires an index name (docs), an index type (asciidoctor), and the id of the document being inserted. Then we call the setSource method, which transforms the XContentBuilder object to json, and finally, by calling execute().actionGet(), the data is sent to the database. The final step, which refreshes the index by calling the refresh method, is only required because we are using an embedded instance of Elasticsearch (in production this part should not be required). After that point we can start querying Elasticsearch to retrieve information from our AsciiDoc documents. Let's start with a very simple example, which returns all documents inserted: SearchResponse response = client.prepareSearch().execute().actionGet(); Next we are going to search for all documents that have been written by Alex Soto, which in our case is one. import static org.elasticsearch.index.query.QueryBuilders.matchQuery; //.... QueryBuilder matchQuery = matchQuery("author", "Alex Soto"); or, alternatively: QueryBuilder matchQuery = matchQuery("author", "Alexander Soto"); Note that I am searching the author field for the string Alex Soto, which returns only one document. The other document is written by Jason. It is interesting that if you search for Alexander Soto, the same document is returned. This is not because Elasticsearch treats Alex and Alexander as similar names: a match query analyzes the text into terms and, by default, returns documents that contain any of them, so the shared term Soto is enough. More queries: how about finding documents written by someone who is called Alex, but not Soto? import static org.elasticsearch.index.query.QueryBuilders.fieldQuery; //.... QueryBuilder matchQuery = fieldQuery("author", "+Alex -Soto"); And of course no results are returned in this case. Note that in this case we are using a field query instead of a match query, and we use the + and - symbols to include and exclude words. You can also find all documents which contain the word released in the title. import static org.elasticsearch.index.query.QueryBuilders.matchQuery; //.... 
QueryBuilder matchQuery = matchQuery("title", "released"); And finally let's find all documents that talk about the 0.1.2 release; in this case only one document talks about it, the other one talks about 0.1.3. QueryBuilder matchQuery = matchQuery("content", "0.1.2"); Now we only have to send the query to the Elasticsearch database, which is done by using the prepareSearch method. SearchResponse response = client.prepareSearch("docs") .setTypes("asciidoctor") .setQuery(matchQuery) .execute() .actionGet(); SearchHits hits = response.getHits(); for (SearchHit searchHit : hits) { System.out.println(searchHit.getSource().get("content")); } Note that in this case we are printing the AsciiDoc content to the console, but you could use the asciidoctor.render(String content, Options options) method to render the content into the required format. So in this post we have seen how to index documents using Elasticsearch, how to get some important information from AsciiDoc files using the Asciidoctor-java-integration project, and finally how to execute some queries against the inserted documents. Of course there are more kinds of queries in Elasticsearch, but the intent of this post wasn't to explore all the possibilities of Elasticsearch. Also, as a corollary, note how important it is to use the AsciiDoc format for writing your documents. Without much effort you can build a search engine for your documentation. On the other hand, imagine all the code that would be required to implement the same using any proprietary binary format like Microsoft Word. So we have shown another reason to use AsciiDoc instead of other formats.
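A match query analyzes the query text into terms and, by default, combines them with OR, which is why a search for Alexander Soto also finds the document whose author is Alex Soto: the term soto is shared. The sketch below is a toy model of that behaviour in plain Java, not Elasticsearch code. The class name, the method names and the simplified tokenizer (lowercase, split on anything that is not a letter or digit) are assumptions made for illustration.

```java
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

public class MatchQueryModel {

    /** Simplified analyzer: lowercase, then split on non-alphanumeric characters. */
    public static Set<String> analyze(String text) {
        Set<String> terms = new HashSet<>();
        for (String term : text.toLowerCase(Locale.ROOT).split("[^\\p{L}\\p{N}]+")) {
            if (!term.isEmpty()) {
                terms.add(term);
            }
        }
        return terms;
    }

    /** OR semantics: the field matches if it shares at least one term with the query. */
    public static boolean matches(String query, String fieldValue) {
        Set<String> shared = analyze(query);
        shared.retainAll(analyze(fieldValue));
        return !shared.isEmpty();
    }

    public static void main(String[] args) {
        // The author names are the ones used in the article.
        System.out.println(matches("Alex Soto", "Alex Soto"));
        System.out.println(matches("Alexander Soto", "Alex Soto"));
        System.out.println(matches("Alexander Soto", "Jason Porter"));
    }
}
```

In this model alex and alexander are different terms, so a query for Alexander alone would not match an author stored as Alex Soto; only the shared surname produces the hit.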
June 10, 2013
by Alex Soto
· 4,803 Views
Hadoop REST API - WebHDFS
Hadoop provides a native Java API to support file system operations.
June 3, 2013
by Istvan Szegedi
· 57,441 Views · 5 Likes