DZone
The Latest Big Data Topics

Ecosystem of Hadoop Animal Zoo
Hadoop is best known for MapReduce and its distributed file system (HDFS). More recently, other productivity tools built on top of these have grown into a complete Hadoop ecosystem. Most of the projects are hosted by the Apache Software Foundation. The Hadoop ecosystem projects are listed below.

Hadoop Common
A set of components and interfaces for distributed file systems and I/O (serialization, Java RPC, persistent data structures).
http://hadoop.apache.org/

HDFS
A distributed file system that runs on large clusters of commodity hardware. The Hadoop Distributed File System (HDFS) was renamed from NDFS. It is a scalable data store for structured, semi-structured and unstructured data.
http://hadoop.apache.org/docs/r2.3.0/hadoop-project-dist/hadoop-hdfs/hdfsuserguide.html
http://wiki.apache.org/hadoop/hdfs

MapReduce
MapReduce is the distributed, parallel programming model for Hadoop, inspired by Google's MapReduce research paper. Hadoop includes an implementation of this programming model. MapReduce has two phases, not surprisingly map and reduce. To be precise, there is another phase between them called sort and shuffle. The JobTracker, often hosted on the same master machine as the NameNode, coordinates the work on the other cluster nodes. MapReduce programs can be written in Java. If you prefer SQL or other non-Java languages, you are still in luck: you can use a utility called Hadoop Streaming.
http://wiki.apache.org/hadoop/hadoopmapreduce

Hadoop Streaming
A utility that lets you write MapReduce code in many languages, such as C, Perl, Python, C++ and Bash. Examples include a Python mapper and an AWK reducer.
http://hadoop.apache.org/docs/r1.2.1/streaming.html

Avro
A serialization system for efficient, cross-language RPC and persistent data storage. Avro is a framework for performing remote procedure calls and data serialization. In the context of Hadoop, it can be used to pass data from one program or language to another, e.g. from C to Pig.
It is particularly suited for use with scripting languages such as Pig, because data is always stored with its schema in Avro.
http://avro.apache.org/

Apache Thrift
Apache Thrift allows you to define data types and service interfaces in a simple definition file. Taking that file as input, the compiler generates code that can be used to build RPC clients and servers that communicate seamlessly across programming languages. Instead of writing a load of boilerplate code to serialize and transport your objects and invoke remote methods, you can get right down to business.
http://thrift.apache.org/

Hive and Hue
If you like SQL, you will be delighted to hear that you can write SQL and have Hive convert it to a MapReduce job. You do not, however, get a full ANSI SQL environment. Hue gives you a browser-based graphical interface for your Hive work. Hue features a file browser for HDFS, a job browser for MapReduce/YARN, an HBase browser, and query editors for Hive, Pig, Cloudera Impala and Sqoop2. It also ships with an Oozie application for creating and monitoring workflows, a ZooKeeper browser and an SDK.

Pig
A high-level data flow language and execution environment for MapReduce coding. The Pig language is called Pig Latin. You may find the naming conventions somewhat unconventional, but you get incredible price-performance and high availability.
https://pig.apache.org/

Jaql
Jaql is a functional, declarative programming language designed especially for working with large volumes of structured, semi-structured and unstructured data. As its name implies, a primary use of Jaql is to handle data stored as JSON documents, but Jaql can work on various types of data. For example, it can support XML, comma-separated values (CSV) data and flat files. A "SQL within Jaql" capability lets programmers work with structured SQL data while employing a JSON data model that is less restrictive than its Structured Query Language counterparts.
  1. Jaql in Google Code
  2. What is Jaql? by IBM

Sqoop
Sqoop provides bi-directional data transfer between Hadoop (HDFS) and your favorite relational database. For example, you might be storing your application data in a relational store such as Oracle. If you now want to scale your application with Hadoop, you can migrate the Oracle data to HDFS using Sqoop.
http://sqoop.apache.org/

Oozie
Manages Hadoop workflows. It does not replace your scheduler or BPM tooling, but it does provide if-then-else branching and control for Hadoop jobs.
https://oozie.apache.org/

ZooKeeper
A distributed, highly available coordination service. ZooKeeper provides primitives such as distributed locks that can be used for building highly scalable applications. It is used to manage synchronization across the cluster.
http://zookeeper.apache.org/

HBase
Based on Google's Bigtable, HBase "is an open-source, distributed, versioned, column-oriented store" that sits on top of HDFS. It is a highly scalable key-value store that works much like a persistent hash map (Python developers can think of a dictionary). It is not a conventional relational database but a distributed, column-oriented one. HBase uses HDFS for its underlying storage. It supports both batch-style computations using MapReduce and point queries for random reads.
https://hbase.apache.org/

Cassandra
A column-oriented NoSQL data store that offers scalability and high availability without compromising on performance. It is a good platform for commodity hardware and cloud infrastructure. Cassandra's data model offers the convenience of column indexes with the performance of log-structured updates, strong support for denormalization and materialized views, and powerful built-in caching.
http://cassandra.apache.org/

Flume
A real-time loader for streaming your data into Hadoop. It stores data in HDFS and HBase. Flume "channels" data between "sources" and "sinks", and its data harvesting can be either scheduled or event-driven.
Possible sources for Flume include Avro, files and system logs, and possible sinks include HDFS and HBase.
http://flume.apache.org/

Mahout
Machine learning for Hadoop, used for predictive analytics and other advanced analysis. There are currently four main groups of algorithms in Mahout:
  • Recommendations, a.k.a. collaborative filtering
  • Classification, a.k.a. categorization
  • Clustering
  • Frequent item set mining, a.k.a. parallel frequent pattern mining
Mahout is not simply a collection of pre-existing algorithms. Many machine learning algorithms are intrinsically non-scalable: given the types of operations they perform, they cannot be executed as a set of parallel processes. The algorithms in the Mahout library belong to the subset that can be executed in a distributed fashion.
http://en.wikipedia.org/wiki/list_of_machine_learning_algorithms
https://www.coursera.org/course/machlearning
https://mahout.apache.org/

FUSE
Makes HDFS look like a regular file system, so that you can use ls, rm, cd and similar commands directly on HDFS data.

Whirr
Apache Whirr is a set of libraries for running cloud services. Whirr provides:
  • A cloud-neutral way to run services. You don't have to worry about the idiosyncrasies of each provider.
  • A common service API. The details of provisioning are particular to the service.
  • Smart defaults for services. You can get a properly configured system running quickly, while still being able to override settings as needed.
You can also use Whirr as a command-line tool for deploying clusters.
https://whirr.apache.org/

Giraph
An open source graph processing API, similar to Google's Pregel.
https://giraph.apache.org/

Chukwa
Chukwa, an Apache incubator project, is a data collection and analysis system built on top of HDFS and MapReduce. Tailored for collecting logs and other data from distributed monitoring systems, Chukwa provides a workflow that allows for incremental data collection, processing and storage in Hadoop.
It is included in the Apache Hadoop distribution as an independent module.
https://chukwa.apache.org/

Drill
Apache Drill, an Apache incubator project, is an open-source software framework that supports data-intensive distributed applications for interactive analysis of large-scale datasets. Drill is the open source version of Google's Dremel system, which Google offers as a hosted service called Google BigQuery. One explicitly stated design goal is for Drill to scale to 10,000 servers or more and to process petabytes of data and trillions of records in seconds.
http://incubator.apache.org/drill/

Impala (Cloudera)
Released by Cloudera, Impala is an open-source project which, like Apache Drill, was inspired by Google's paper on Dremel. The purpose of both is to facilitate real-time querying of data in HDFS or HBase. Impala uses an SQL-like language that, though similar to HiveQL, is currently more limited. Because Impala relies on the Hive metastore, Hive must be installed on a cluster in order for Impala to work. The secret behind Impala's speed is that it "circumvents map reduce to directly access the data through a specialized distributed query engine that is very similar to those found in commercial parallel rdbmss." (Source: Cloudera)
http://www.cloudera.com/content/cloudera/en/products-and-services/cdh/impala.html
http://training.cloudera.com/elearning/impala/
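Returning to the MapReduce entry above: the map, sort-and-shuffle and reduce phases can be imitated in plain Java to see how they fit together. This is only an in-memory sketch of the programming model, not the Hadoop API. The class name WordCountPhases and its methods are hypothetical and chosen for illustration.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// In-memory imitation of the three MapReduce phases (not the Hadoop API).
final class WordCountPhases {

    // Map phase: emit a (word, 1) pair for every word in one input line.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : line.toLowerCase().split("\\W+")) {
            if (!word.isEmpty()) {
                pairs.add(new AbstractMap.SimpleEntry<>(word, 1));
            }
        }
        return pairs;
    }

    // Sort and shuffle phase: group all values by key, with keys in sorted order.
    static SortedMap<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        SortedMap<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return grouped;
    }

    // Reduce phase: collapse the values of one key into a single result.
    static int reduce(String key, List<Integer> values) {
        int sum = 0;
        for (int value : values) {
            sum += value;
        }
        return sum;
    }

    // Runs the three phases in order over a small list of lines.
    static SortedMap<String, Integer> run(List<String> lines) {
        List<Map.Entry<String, Integer>> mapped = new ArrayList<>();
        for (String line : lines) {
            mapped.addAll(map(line));
        }
        SortedMap<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> group : shuffle(mapped).entrySet()) {
            result.put(group.getKey(), reduce(group.getKey(), group.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("Hadoop stores data", "hadoop processes data")));
    }
}
```

In a real cluster the map and reduce calls run on different machines and the shuffle moves data across the network; here everything happens in one process so the data flow is easy to follow.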
June 3, 2015
by Umashankar Ankuri
· 23,894 Views · 3 Likes
Make Your IoT Gateway WiFi-Aware Using Camel and Kura
A common scenario for mobile IoT gateways is to cache collected data locally in device storage and then synchronize it with the data center.
May 9, 2015
by Henryk Konsek
· 8,096 Views
Introduction to Probabilistic Data Structures
When processing large data sets, we often want to do some simple checks, such as counting the unique items, finding the most frequent items, or testing whether an item exists in the data set. The common approach is to use a deterministic data structure such as a HashSet or Hashtable. But when the data set becomes very large, such data structures are simply not feasible because the data is too big to fit in memory. It becomes even more difficult for streaming applications, which typically require data to be processed in one pass with incremental updates.

Probabilistic data structures are a group of data structures that are extremely useful for big data and streaming applications. Generally speaking, these data structures use hash functions to randomize and compactly represent a set of items. Collisions are ignored, but the resulting errors can be kept under a chosen threshold. Compared with error-free approaches, these algorithms use much less memory and have constant query time. They usually support union and intersection operations and can therefore be parallelized easily.

This article introduces three commonly used probabilistic data structures: Bloom filter, HyperLogLog, and Count-Min sketch.

Membership Query - Bloom filter

A Bloom filter is a bit array of m bits, all initialized to 0. To add an element, feed it to k hash functions to get k array positions and set the bits at these positions to 1. To query an element, feed it to the same k hash functions to obtain k array positions. If any of the bits at these positions is 0, then the element is definitely not in the set. If the bits are all 1, then the element might be in the set. A Bloom filter with a 1% false positive rate requires only 9.6 bits per element, regardless of the size of the elements.

For example, suppose we have inserted x, y and z into a Bloom filter with k=3 hash functions. Each of these three elements has three bits set to 1 in the bit array.
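The add and query operations can be sketched in Java as follows. This is a minimal illustration, not a library implementation: the class name BloomFilterSketch is hypothetical, the sizing uses the standard formulas for m and k, and the k positions are derived from one 64-bit mix of hashCode() by double hashing (an assumed shortcut in place of k independent hash functions).

```java
import java.util.BitSet;

// Minimal Bloom filter sketch; sizing and hashing choices are illustrative assumptions.
final class BloomFilterSketch {
    private final BitSet bits;
    private final int m; // number of bits in the array
    private final int k; // number of hash positions per element

    BloomFilterSketch(int expectedInsertions, double falsePositiveRate) {
        double ln2 = Math.log(2);
        // m = -n * ln(p) / (ln 2)^2, rounded up
        this.m = (int) Math.ceil(-expectedInsertions * Math.log(falsePositiveRate) / (ln2 * ln2));
        // k = (m / n) * ln 2, rounded to the nearest integer and at least 1
        this.k = Math.max(1, (int) Math.round((double) m / expectedInsertions * ln2));
        this.bits = new BitSet(m);
    }

    // 64-bit finalizer (SplitMix64 style) used to spread hashCode() bits.
    private static long mix64(long z) {
        z += 0x9E3779B97F4A7C15L;
        z = (z ^ (z >>> 30)) * 0xBF58476D1CE4E5B9L;
        z = (z ^ (z >>> 27)) * 0x94D049BB133111EBL;
        return z ^ (z >>> 31);
    }

    // i-th position via double hashing: (h1 + i * h2) mod m.
    private int position(String item, int i) {
        long h = mix64(item.hashCode());
        int h1 = (int) h;
        int h2 = (int) (h >>> 32);
        return Math.floorMod(h1 + i * h2, m);
    }

    // Set the k bits that belong to the item.
    void add(String item) {
        for (int i = 0; i < k; i++) {
            bits.set(position(item, i));
        }
    }

    // false means "definitely not in the set"; true means "might be in the set".
    boolean mightContain(String item) {
        for (int i = 0; i < k; i++) {
            if (!bits.get(position(item, i))) {
                return false;
            }
        }
        return true;
    }

    int bitCount() { return m; }

    int hashCount() { return k; }

    public static void main(String[] args) {
        BloomFilterSketch filter = new BloomFilterSketch(1000, 0.01);
        filter.add("x");
        filter.add("y");
        filter.add("z");
        System.out.println("x: " + filter.mightContain("x"));
        System.out.println("w: " + filter.mightContain("w"));
    }
}
```

Inserted elements always report true, because their bits can never be cleared. An element that was never inserted usually reports false, but may report true when other insertions happen to have set all of its k bits.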
When we look up w in the set, one of its bits is not set to 1, so the Bloom filter tells us that w is not in the set.

A Bloom filter has the following properties:
  • False positives are possible when the queried positions have already been set to 1 by other elements, but false negatives are impossible.
  • Query time is O(k).
  • Union and intersection of Bloom filters with the same size and hash functions can be implemented with bitwise OR and AND operations.
  • An element cannot be removed from the set.

A Bloom filter requires the following inputs:
  • m: size of the bit array
  • n: estimated number of insertions
  • p: false positive probability

The optimum number of hash functions k can be determined using the formula:

$k = \frac{m}{n} \ln 2$

Given the false positive probability p and the estimated number of insertions n, the length of the bit array can be calculated as:

$m = -\frac{n \ln p}{(\ln 2)^2}$

The hash functions used for a Bloom filter should be fast, generally faster than cryptographic hash algorithms, while still offering good distribution and collision resistance. Commonly used hash functions include Murmur hash, the FNV series of hashes and the Jenkins hashes. Murmur hash is the fastest among them. MurmurHash3 is used by the Bloom filter implementation in Google's Guava library.

Cardinality - HyperLogLog

HyperLogLog is a streaming algorithm used for estimating the number of distinct elements (the cardinality) of very large data sets. A HyperLogLog counter can count one billion distinct items with an accuracy of 2% using only 1.5 KB of memory. It is based on the bit-pattern observation that, for a stream of randomly distributed numbers, if the largest number of leading 0 bits seen in any number is k, the cardinality of the stream is very likely close to 2^k.

For each element s_i in the stream, the hash function h(s_i) transforms s_i into a string of random bits (each 0 or 1 with probability 1/2). The probabilities P of the bit patterns are:
  • 0xxxx... → P = 1/2
  • 01xxx... → P = 1/4
  • 001xx... → P = 1/8

The intuition is that when we see the prefix 0^k 1..., it is likely that there are n ≥ 2^(k+1) different strings.
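The leading-zero observation can be tried out with a single counter before any of the refinements HyperLogLog adds. The sketch below is an illustration of the intuition only, not HyperLogLog itself: the class name LeadingZeroEstimator is hypothetical, and a SplitMix64-style mixing function stands in for the hash function h.

```java
// Single-counter illustration of the leading-zero intuition (not full HyperLogLog).
final class LeadingZeroEstimator {
    // Length of the longest 0^k 1 prefix seen so far (k zeros plus the 1 bit).
    private int longestPrefix = 0;

    // 64-bit mixing function standing in for the hash h(s_i).
    static long mix64(long z) {
        z += 0x9E3779B97F4A7C15L;
        z = (z ^ (z >>> 30)) * 0xBF58476D1CE4E5B9L;
        z = (z ^ (z >>> 27)) * 0x94D049BB133111EBL;
        return z ^ (z >>> 31);
    }

    // Hash the item and remember the longest 0^k 1 prefix among all hashes.
    void add(long item) {
        long hash = mix64(item);
        int prefixLength = Long.numberOfLeadingZeros(hash) + 1;
        // Cap the length so that 1L << longestPrefix stays a positive long.
        longestPrefix = Math.max(longestPrefix, Math.min(prefixLength, 62));
    }

    // Estimate 2^p, where p is the length of the longest prefix (0 if nothing was added).
    long estimate() {
        return longestPrefix == 0 ? 0 : 1L << longestPrefix;
    }

    public static void main(String[] args) {
        LeadingZeroEstimator estimator = new LeadingZeroEstimator();
        for (long i = 0; i < 100_000; i++) {
            estimator.add(i);
        }
        System.out.println("true cardinality: 100000, estimate: " + estimator.estimate());
    }
}
```

Two things are worth noticing when running it: duplicates never change the estimate, since the same item always hashes to the same prefix, and the estimate is always a power of two, which is one reason a single counter is too coarse on its own.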
By keeping track of the prefixes 0^k 1... that have appeared in the data stream, we can estimate the cardinality to be 2^p, where p is the length of the largest prefix.

Because the variance is very high when using a single counter, the data is split into m sub-streams using the first few bits of the hash in order to get a better estimate. The counters are maintained in m registers, each of which has memory space that is a multiple of 4 bytes. If the standard deviation for each sub-stream is σ, then the standard deviation of the averaged value is only σ/√m. This is called stochastic averaging.

For instance, for m=4, the elements are split into m streams using the first 2 bits of the hash (00, 01, 10, 11), which are then discarded. Each register keeps track of the largest 0^k 1 prefix found in the remaining hash bits. The values in the m registers are then averaged to obtain the cardinality estimate.

The HyperLogLog algorithm uses the harmonic mean to normalize the result. The algorithm also makes adjustments for small and very large values. The resulting relative error is about 1.04/√m.

Each of the m registers uses at most log2(log2 n) + O(1) bits when cardinalities ≤ n need to be estimated.

The union of two HyperLogLog counters can be calculated by first taking, for each of the m registers, the maximum of the two counters' values, and then calculating the estimated cardinality.

Frequency - Count-Min Sketch

Count-Min sketch is a probabilistic, sub-linear space streaming algorithm. It is somewhat similar to a Bloom filter. The main difference is that a Bloom filter represents a set as a bitmap, while a Count-Min sketch represents a multi-set and keeps a summary of its frequency distribution.

The basic data structure is a two-dimensional d x w array of counters with d pairwise independent hash functions h1 ... hd of range w. Given parameters (ε, δ), set w = ⌈e/ε⌉ and d = ⌈ln(1/δ)⌉. ε is the error bound we want, and 1 - δ is the certainty with which we stay within it. The two-dimensional array consists of w·d counters.
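A small Java sketch can make the structure concrete, including the update rule and the minimum-based estimate. It is an illustration under stated assumptions rather than a reference implementation: the class name CountMinSketch and its methods are hypothetical, items are reduced to integers through hashCode(), and the d hash functions are drawn from the family ((a·x + b) mod p) mod w with the prime p = 2^31 - 1.

```java
import java.util.Random;

// Minimal Count-Min sketch; the hash family and seeding are illustrative assumptions.
final class CountMinSketch {
    private static final long PRIME = 2147483647L; // 2^31 - 1
    private final int width;  // w = ceil(e / epsilon)
    private final int depth;  // d = ceil(ln(1 / delta))
    private final long[][] counts;
    private final long[] a;
    private final long[] b;

    CountMinSketch(double epsilon, double delta, long seed) {
        this.width = (int) Math.ceil(Math.E / epsilon);
        this.depth = (int) Math.ceil(Math.log(1.0 / delta));
        this.counts = new long[depth][width];
        this.a = new long[depth];
        this.b = new long[depth];
        Random random = new Random(seed);
        for (int row = 0; row < depth; row++) {
            a[row] = 1 + random.nextInt((int) PRIME - 1); // 1 .. p-1
            b[row] = random.nextInt((int) PRIME);         // 0 .. p-1
        }
    }

    // Column for the item in the given row: ((a * x + b) mod p) mod w.
    private int column(int row, String item) {
        long x = Math.floorMod((long) item.hashCode(), PRIME);
        return (int) (((a[row] * x + b[row]) % PRIME) % width);
    }

    // Add `count` occurrences of the item: one counter per row is incremented.
    void add(String item, long count) {
        for (int row = 0; row < depth; row++) {
            counts[row][column(row, item)] += count;
        }
    }

    // Estimated frequency: the minimum of the item's counters across all rows.
    long estimate(String item) {
        long min = Long.MAX_VALUE;
        for (int row = 0; row < depth; row++) {
            min = Math.min(min, counts[row][column(row, item)]);
        }
        return min;
    }

    int width() { return width; }

    int depth() { return depth; }

    public static void main(String[] args) {
        CountMinSketch sketch = new CountMinSketch(0.01, 0.001, 42L);
        sketch.add("error", 3);
        sketch.add("warn", 1);
        System.out.println(sketch.width() + " x " + sketch.depth()
                + ", estimate(error) = " + sketch.estimate("error"));
    }
}
```

Because counters are only ever incremented and other items can only add to a shared cell, the estimate can exceed the true count but never fall below it. Taking the minimum across rows picks the counter least affected by collisions.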
To increment the counts, calculate the hash positions with the d hash functions and update the counts at those positions. The estimate of the count for an item is the minimum of the counts at the array positions determined by the d hash functions.

The space used by a Count-Min sketch is the array of w·d counters. By choosing appropriate values for d and w, a very small error and a high probability can be achieved.

Example Count-Min sketch sizes for different error and probability combinations:

ε        1 - δ    w       d    w·d
0.1      0.9      28      3    84
0.1      0.99     28      5    140
0.1      0.999    28      7    196
0.01     0.9      272     3    816
0.01     0.99     272     5    1360
0.01     0.999    272     7    1904
0.001    0.999    2719    7    19033

A Count-Min sketch has the following properties:
  • Union can be performed by a cell-wise ADD operation
  • O(d) query time
  • Better accuracy for higher-frequency items (heavy hitters)
  • Can only cause over-counting, never under-counting

A Count-Min sketch can be used to query the count of a single item or to find "heavy hitters", which can be obtained by keeping a heap structure of all the counts.

Summary

Probabilistic data structures have many applications in modern web and data applications, where the data arrives in a streaming fashion and needs to be processed on the fly using limited memory. Bloom filter, HyperLogLog, and Count-Min sketch are the most commonly used probabilistic data structures. There is a lot of research on various streaming algorithms, synopsis data structures and optimization techniques that is worth investigating and studying.

If you haven't tried these data structures, you will be amazed at how powerful they can be once you start using them. The concepts may be a little intimidating at first, but the implementation is actually quite simple. Google Guava has a Bloom filter implementation using Murmur hash. Clearspring's Java library stream-lib and Twitter's Scala library Algebird have implementations of all three data structures, along with other useful data structures that you can play with.
I have included the links below.

Links
http://bigsnarf.wordpress.com/2013/02/08/probabilistic-data-structures-for-data-analytics/
http://en.wikipedia.org/wiki/Bloom_filter
http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf
http://static.googleusercontent.com/media/research.google.com/en/us/pubs/archive/40671.pdf
http://research.neustar.biz/2012/10/25/sketch-of-the-day-hyperloglog-cornerstone-of-a-big-data-infrastructure/
http://dimacs.rutgers.edu/~graham/pubs/papers/cm-full.pdf
http://www.moneyscience.com/pg/blog/ThePracticalQuant/read/438348/realtime-analytics-hokusai-adds-a-temporal-component-to-countmin-sketch
http://people.cs.umass.edu/~mcgregor/711S12/sketches1.pdf
https://github.com/addthis/stream-lib
https://github.com/twitter/algebird
April 30, 2015
by Yin Niu
· 35,683 Views · 6 Likes
Why Elasticsearch is Suitable for Application Log Analytics
Handling Application Logs

Enterprise application development using Web technologies has been around for a long time. In recent years we have seen a sharp increase in the deployment of such applications. This is partly due to the proliferation of ecommerce sites, social media sites and sites that support mobile applications, as well as the desire of enterprises to have their applications available 24x7. In most cases, such applications cater to huge loads and are deployed on cloud infrastructure. Monitoring deployed applications is becoming a crucial task, as deployed applications are bound to fail, no matter how robust the techniques used during development.

Whenever an application fails, the most common resolution method starts with examining the application log. If the application has implemented logging properly, the logs can reveal the cause of the failure. Log files are usually examined with tools like vi, less, more, tail or grep. Another method is to download the file to a Windows system and view it in an editor like Notepad++. Engineers scan the log information for clues that point to the reasons for failure. Once the cause of failure is identified, suitable action is taken to restore the application and/or service.

The Key to Application Log Analytics

This process of logging onto a remote system and viewing logs is tedious. Additionally, many of the tools do little to make the task of issue identification any simpler. Even when using tools like grep (if we know the pattern), we still need to read through the logs to see other information that has been logged, such as the entries that precede the failure point.
While it has always been possible to develop applications to parse application logs, the renewed interest in application log analytics is due to the acceptance of NoSQL-like technologies and the availability of standard tools for parsing application logs. Though relational databases (RDBMS) have for many years provided the facility to store structured data, they are not well suited to handling log data because, in many cases, the structure of the logged information is not the same across the file. This does not fit well in the rigidly defined world of an RDBMS. In comparison, NoSQL allows document flexibility, and documents with different schemas can be stored in the same database / index / store.

The ability to convert log data into a well-defined structure, as well as the ability to search, are key to implementing a modern log analytics solution. In this document, we cover why Elasticsearch is a good fit. Elasticsearch can store documents, giving us the benefit of structured storage without the overheads of a database system.

The Suitability of Elasticsearch

In the following subsections, we share our views on why Elasticsearch is a suitable data store for an application log analytics solution. Elasticsearch is part of a popular trio of tools, commonly known as ELK. Of these, L stands for Logstash, the log parser; E stands for Elasticsearch, the document store; and K stands for Kibana, the visualization tool.

Storing Documents

Logstash can be used to parse plain text data into structured text. Once data has some structure, it becomes easy to find information by enabling search on it. While parsing application logs is not a challenge, the challenge has been in storing the data and enabling search on it. Most prior solutions have used an RDBMS for storage, but the varying structure and textual nature of application logs make it difficult to store the data in an RDBMS table structure. RDBMSs are not geared toward 'search'.
They are geared for maintaining a 'single value of truth' for the data, defining relations between the data, ensuring their consistency and so on. Search is also not a strong point of RDBMSs, as they use exact matches for values, while Elasticsearch supports exact matches as well as partial matches. It also supports document scoring, which attaches a confidence factor to the documents located.

Elasticsearch supports documents in JSON format and uses the NoSQL philosophy for document storage. This has the advantage of allowing a flexible schema for the data. Unlike an RDBMS, Elasticsearch is a search engine at heart and is built for that purpose. Though Elasticsearch uses NoSQL for storing documents, it does not provide robust methods to update stored data. Not supporting updates is a serious disadvantage in most cases. In the case of application logs, however, it actually works in favour of Elasticsearch, because machine logs do not really require updates.

Application logs are generated from a debugging perspective: they keep data handy for debugging in the event of an application crash or incorrect execution. They usually record important events from application execution and provide additional information that allows application developers to identify the reasons for failure. Additionally, existing information in application logs is rarely, if ever, updated. New information is continually being written to the logs, with no need to refer to old information. This plays to the strength of Elasticsearch, which is able to ingest and index new information very quickly.

Search

One of the easiest ways of locating information in large volumes of logs is to perform a search. Elasticsearch is not only well suited to search, it also supports huge volumes of data using distributed computing (implemented using shards).
While Kibana is one of the commonly used tools to display and visualize information stored in Elasticsearch, it is more suited to displaying standard charts such as bar charts, column charts and pie charts. If the features provided by Kibana are not enough, we can always use Elasticsearch's REST API and its Query DSL (Domain-Specific Language) to search for the required information. The Query DSL and the result of the query are both in JSON format. Though this format is easy for applications to parse and process, users would need a friendly user interface to interact with the data.

Handling Voluminous Data

Elasticsearch supports distributed search out of the box, using the concept of 'shards'. A shard is a single Lucene instance and is managed by Elasticsearch. Two types of shards are supported: 'primary shards' and 'replica shards'. By default, a document is first indexed on the primary shard and then on the replica shards. The number of primary shards can be specified to cater to the expected volume. At the time of writing, Elasticsearch creates five shards for an index by default. Once the number of primary shards is decided, however, it cannot be changed. Replica shards are copies of the primary shards. They are used to handle fail-over and to increase performance.

While performance across voluminous data can be handled by sharding, it is important to note that the shards, once created for an index, cannot be changed. Thus, the sharding strategy for the data has to be decided in advance, after an assessment of the data and an estimation of its growth. In the case of application logs, the sharding strategy can be based on the application name, the business unit ID, the application ID or the application's geolocation, to name a few.

Analytics

By storing data in a structure, analytics can be enabled on the data. Not only can applications perform a simple search, it is also possible to restrict the search to specific terms or to a specified time period.
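Restricting a search to a specific term and a time period can be sketched as a Query DSL request body. The Java helper below only builds the JSON text and does not talk to a cluster. It assumes the bool/filter form of the Query DSL used by more recent Elasticsearch versions (older releases expressed filters differently), and the field names level and @timestamp are assumptions about how the logs were parsed, not something the article specifies. The class and method names are hypothetical.

```java
// Builds a Query DSL request body as JSON text; no Elasticsearch client is involved.
final class LogQuerySketch {

    // Returns a query that matches one log level within a time window.
    // Field names "level" and "@timestamp" are assumed; inputs are not JSON-escaped.
    static String levelBetween(String level, String from, String to) {
        return String.format(
                "{%n"
              + "  \"query\": {%n"
              + "    \"bool\": {%n"
              + "      \"filter\": [%n"
              + "        { \"term\": { \"level\": \"%s\" } },%n"
              + "        { \"range\": { \"@timestamp\": { \"gte\": \"%s\", \"lte\": \"%s\" } } }%n"
              + "      ]%n"
              + "    }%n"
              + "  }%n"
              + "}",
                level, from, to);
    }

    public static void main(String[] args) {
        System.out.println(levelBetween("ERROR", "2015-04-01T00:00:00Z", "2015-04-22T00:00:00Z"));
    }
}
```

The resulting text would be sent as the body of a search request against the index that holds the parsed logs. A real application would normally use a JSON library or an Elasticsearch client rather than string formatting, so that values are escaped properly.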
Structured storage also makes it easier to develop reports with well-defined visualizations, which in turn make it easy to understand the current state of applications. It is also possible to perform various analytics operations, such as time series analysis using the timestamp, and identification of patterns in the data using machine learning techniques (assuming we have the right kind of data in the logs). Though Elasticsearch does not provide built-in support for analytics, applications can benefit from its fast search capability and from its ability to handle voluminous data sets.

In Closing

One of the main hurdles with application logs has been searching for information in a huge volume of data. By parsing application log files using Logstash, we can convert a flat file into structured data. Structured data, once stored in Elasticsearch, is easier to search. Visualizations and the business logic for generating alerts and tickets are easier to develop on structured data. Elasticsearch, which stores and searches documents and can scale over huge volumes of data, is a good candidate for inclusion in an application log analytics solution.
April 22, 2015
by Bipin Patwardhan
· 11,712 Views · 2 Likes
Internet of Things MQTT Quality of Service Levels
At Red Hat's Virtual Event, Building Data-driven Solutions for the Internet of Things, Kenneth Peeples spoke about connecting to the IoT with the MQTT protocol.
April 20, 2015
by Kenneth Peeples
· 12,289 Views
Using Apache Kafka for Integration and Data Processing Pipelines with Spring
Written by Josh Long on the Spring blog.

Applications generate more data than ever before, and a huge part of the challenge, before the data can even be analyzed, is accommodating the load in the first place. Apache Kafka meets this challenge. It was originally designed by LinkedIn and subsequently open-sourced in 2011. The project aims to provide a unified, high-throughput, low-latency platform for handling real-time data feeds. The design is heavily influenced by transaction logs. It is a messaging system, similar to traditional messaging systems like RabbitMQ, ActiveMQ and MQSeries, but it is ideal for log aggregation, persistent messaging and fast (hundreds of megabytes per second!) reads and writes, and it can accommodate numerous clients. Naturally, this makes it perfect for cloud-scale architectures!

Kafka powers many large production systems. LinkedIn uses it for activity data and operational metrics to power the LinkedIn news feed and LinkedIn Today, as well as offline analytics going into Hadoop. Twitter uses it as part of their stream-processing infrastructure. Kafka powers online-to-online and online-to-offline messaging at Foursquare, where it is used to integrate Foursquare monitoring and production systems with Hadoop-based offline infrastructures. Square uses Kafka as a bus to move all system events through Square's various data centers. This includes metrics, logs, custom events, and so on. On the consumer side, it outputs into Splunk, Graphite, or Esper-like real-time alerting. Netflix uses it for 300-600bn messages per day. It is also used by Airbnb, Mozilla, Goldman Sachs, Tumblr, Yahoo, PayPal, Coursera, Urban Airship, Hotels.com, and a seemingly endless list of other big-web stars. Clearly, it is earning its keep in some powerful systems!

Installing Apache Kafka

There are many different ways to get Apache Kafka installed. If you're on OS X and you're using Homebrew, it can be as simple as brew install kafka.
You can also download the latest distribution from Apache. I downloaded kafka_2.10-0.8.2.1.tgz and unzipped it. Inside, you'll find a distribution of Apache ZooKeeper as well as Kafka, so nothing else is required. I installed Apache Kafka in my $HOME directory, under another directory, bin, and then created an environment variable, KAFKA_HOME, that points to $HOME/bin/kafka.

Start Apache ZooKeeper first, specifying the location of the configuration properties file it requires:

    $KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties

The Apache Kafka distribution comes with default configuration files for both ZooKeeper and Kafka, which makes getting started easy. In more advanced use cases you will need to customize these files.

Then start Apache Kafka. It too requires a configuration file, like this:

    $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties

The server.properties file contains, among other things, default values for where to connect to Apache ZooKeeper (zookeeper.connect), how much data should be sent across sockets, how many partitions there are by default, and the broker ID (broker.id, which must be unique across a cluster).

There are other scripts in the same directory that can be used to send and receive dummy data, which is very handy for establishing that everything is up and running!

Now that Apache Kafka is up and running, let's look at working with it from our application.

Some High-Level Concepts

A Kafka broker cluster consists of one or more servers, each of which may have one or more broker processes running. Apache Kafka is designed to be highly available; there are no master nodes. All nodes are interchangeable. Data is replicated from one node to another to ensure that it is still available in the event of a failure.

In Kafka, a topic is a category, similar to a JMS destination or to both an AMQP exchange and queue.
Topics are partitioned, and the choice of which of a topic's partitions a message should be sent to is made by the message producer. Each message in a partition is assigned a unique, sequential ID, its offset. More partitions allow greater parallelism for consumption, but they also result in more files across the brokers.

Producers send messages to Apache Kafka broker topics and specify the partition to use for every message they produce. Message production may be synchronous or asynchronous. Producers also specify what sort of replication guarantees they want.

Consumers listen for messages on topics and process the feed of published messages. As you'd expect if you've used other messaging systems, this is usually (and usefully!) asynchronous.

Like Spring XD and numerous other distributed systems, Apache Kafka uses Apache ZooKeeper to coordinate cluster information. Apache ZooKeeper provides a shared hierarchical namespace (whose nodes are called znodes) that cluster nodes can share to understand cluster topology and availability (yet another reason that Spring Cloud has forthcoming support for it).

ZooKeeper is very present in your interactions with Apache Kafka. Apache Kafka has, for example, two different APIs for acting as a consumer. The higher-level API is simpler to get started with, and it handles all the nuances of partitioning and so on. It needs a reference to a ZooKeeper instance to keep the coordination state.

Let's now turn to using Apache Kafka with Spring.

Using Apache Kafka with Spring Integration

The recently released 1.1 version of the Spring Integration adapter for Apache Kafka is very powerful, and provides inbound adapters for working with both the lower-level Apache Kafka API and the higher-level API.

The adapter is currently XML-configuration first, though work is already underway on a Spring Integration Java configuration DSL for the adapter, and milestones are available. We'll look at both here.
to make all these examples work, i added the libs-milestone-local maven repository and used the following dependencies:

org.apache.kafka:kafka_2.10:0.8.1.1
org.springframework.boot:spring-boot-starter-integration:1.2.3.release
org.springframework.boot:spring-boot-starter:1.2.3.release
org.springframework.integration:spring-integration-kafka:1.1.1.release
org.springframework.integration:spring-integration-java-dsl:1.1.0.m1

using the spring integration apache kafka adapter with the spring integration xml dsl

first, let’s look at how to use the spring integration outbound adapter to send message instances from a spring integration flow to an external apache kafka instance. the example is fairly straightforward: a spring integration channel named inputtokafka acts as a conduit that forwards messages to the outbound adapter, kafkaoutboundchanneladapter. the adapter itself can take its configuration from the defaults specified in the kafka:producer-context element or from the adapter-local configuration overrides. there may be one or many configurations in a given kafka:producer-context element.

here’s the java code from a spring boot application to trigger message sends using the outbound adapter by sending messages into the incoming inputtokafka messagechannel.
package xml; import org.apache.commons.logging.log; import org.apache.commons.logging.logfactory; import org.springframework.beans.factory.annotation.qualifier; import org.springframework.boot.commandlinerunner; import org.springframework.boot.springapplication; import org.springframework.boot.autoconfigure.springbootapplication; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.dependson; import org.springframework.context.annotation.importresource; import org.springframework.integration.config.enableintegration; import org.springframework.messaging.messagechannel; import org.springframework.messaging.support.genericmessage; @springbootapplication @enableintegration @importresource("/xml/outbound-kafka-integration.xml") public class demoapplication { private log log = logfactory.getlog(getclass()); @bean @dependson("kafkaoutboundchanneladapter") commandlinerunner kickoff(@qualifier("inputtokafka") messagechannel in) { return args -> { for (int i = 0; i < 1000; i++) { in.send(new genericmessage<>("#" + i)); log.info("sending message #" + i); } }; } public static void main(string args[]) { springapplication.run(demoapplication.class, args); } } using the new apache kafka spring integration java configuration dsl shortly after the spring integration 1.1 release, spring integration rockstar artem bilan got to work on adding a spring integration java configuration dsl analog and the result is a thing of beauty! it’s not yet ga (you need to add the libs-milestone repository for now), but i encourage you to try it out and kick the tires. it’s working well for me and the spring integration team are always keen on getting early feedback whenever possible! here’s an example that demonstrates both sending messages and consuming them from two different integrationflow s. the producer is similar to the example xml above. new in this example is the polling consumer. 
it is batch-centric, and will pull down all the messages it sees at a fixed interval. in our code, the message received will be a map that contains as its keys the topic and as its value another map with the partition id and the batch (in this case, of 10 records), of records read. there is a messagelistenercontainer -based alternative that processes messages as they come. package jc; import org.apache.commons.logging.log; import org.apache.commons.logging.logfactory; import org.springframework.beans.factory.annotation.autowired; import org.springframework.beans.factory.annotation.qualifier; import org.springframework.beans.factory.annotation.value; import org.springframework.boot.commandlinerunner; import org.springframework.boot.springapplication; import org.springframework.boot.autoconfigure.springbootapplication; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; import org.springframework.context.annotation.dependson; import org.springframework.integration.integrationmessageheaderaccessor; import org.springframework.integration.config.enableintegration; import org.springframework.integration.dsl.integrationflow; import org.springframework.integration.dsl.integrationflows; import org.springframework.integration.dsl.sourcepollingchanneladapterspec; import org.springframework.integration.dsl.kafka.kafka; import org.springframework.integration.dsl.kafka.kafkahighlevelconsumermessagesourcespec; import org.springframework.integration.dsl.kafka.kafkaproducermessagehandlerspec; import org.springframework.integration.dsl.support.consumer; import org.springframework.integration.kafka.support.zookeeperconnect; import org.springframework.messaging.messagechannel; import org.springframework.messaging.support.genericmessage; import org.springframework.stereotype.component; import java.util.list; import java.util.map; /** * demonstrates using the spring integration apache kafka java configuration dsl. 
* thanks to spring integration ninja artem bilan * for getting the java configuration dsl working so quickly! * * @author josh long */ @enableintegration @springbootapplication public class demoapplication { public static final string test_topic_id = "event-stream"; @component public static class kafkaconfig { @value("${kafka.topic:" + test_topic_id + "}") private string topic; @value("${kafka.address:localhost:9092}") private string brokeraddress; @value("${zookeeper.address:localhost:2181}") private string zookeeperaddress; kafkaconfig() { } public kafkaconfig(string t, string b, string zk) { this.topic = t; this.brokeraddress = b; this.zookeeperaddress = zk; } public string gettopic() { return topic; } public string getbrokeraddress() { return brokeraddress; } public string getzookeeperaddress() { return zookeeperaddress; } } @configuration public static class producerconfiguration { @autowired private kafkaconfig kafkaconfig; private static final string outbound_id = "outbound"; private log log = logfactory.getlog(getclass()); @bean @dependson(outbound_id) commandlinerunner kickoff( @qualifier(outbound_id + ".input") messagechannel in) { return args -> { for (int i = 0; i < 1000; i++) { in.send(new genericmessage<>("#" + i)); log.info("sending message #" + i); } }; } @bean(name = outbound_id) integrationflow producer() { log.info("starting producer flow.."); return flowdefinition -> { consumer spec = (kafkaproducermessagehandlerspec.producermetadataspec metadata)-> metadata.async(true) .batchnummessages(10) .valueclasstype(string.class) .valueencoder(string::getbytes); kafkaproducermessagehandlerspec messagehandlerspec = kafka.outboundchanneladapter( props -> props.put("queue.buffering.max.ms", "15000")) .messagekey(m -> m.getheaders().get(integrationmessageheaderaccessor.sequence_number)) .addproducer(this.kafkaconfig.gettopic(), this.kafkaconfig.getbrokeraddress(), spec); flowdefinition .handle(messagehandlerspec); }; } } @configuration public static class 
consumerconfiguration { @autowired private kafkaconfig kafkaconfig; private log log = logfactory.getlog(getclass()); @bean integrationflow consumer() { log.info("starting consumer.."); kafkahighlevelconsumermessagesourcespec messagesourcespec = kafka.inboundchanneladapter( new zookeeperconnect(this.kafkaconfig.getzookeeperaddress())) .consumerproperties(props -> props.put("auto.offset.reset", "smallest") .put("auto.commit.interval.ms", "100")) .addconsumer("mygroup", metadata -> metadata.consumertimeout(100) .topicstreammap(m -> m.put(this.kafkaconfig.gettopic(), 1)) .maxmessages(10) .valuedecoder(string::new)); consumer endpointconfigurer = e -> e.poller(p -> p.fixeddelay(100)); return integrationflows .from(messagesourcespec, endpointconfigurer) .>>handle((payload, headers) -> { payload.entryset().foreach(e -> log.info(e.getkey() + '=' + e.getvalue())); return null; }) .get(); } } public static void main(string[] args) { springapplication.run(demoapplication.class, args); } } the example makes heavy use of java 8 lambdas. the producer spends a bit of time establishing how many messages will be sent in a single send operation, how keys and values are encoded (kafka only knows about byte[] arrays, after all) and whether messages should be sent synchronously or asynchronously. in the next line, we configure the outbound adapter itself and then define an integrationflow such that all messages get sent out via the kafka outbound adapter. the consumer spends a bit of time establishing which zookeeper instance to connect to, how many messages to receive (10) in a batch, etc. once the message batches are recieved, they’re handed to the handle method where i’ve passed in a lambda that’ll enumerate the payload’s body and print it out. nothing fancy. using apache kafka with spring xd apache kafka is a message bus and it can be very powerful when used as an integration bus. 
however, it really comes into its own because it’s fast enough and scalable enough that it can be used to route big data through processing pipelines. and if you’re doing data processing, you really want spring xd! spring xd makes it dead simple to use apache kafka (as the support is built on the apache kafka spring integration adapter!) in complex stream-processing pipelines. apache kafka is exposed as a spring xd source (where data comes from) and a sink (where data goes to).

spring xd exposes a super convenient dsl for creating bash-like pipes-and-filter flows. spring xd is a centralized runtime that manages, scales, and monitors data processing jobs. it builds on top of spring integration, spring batch, spring data and spring for hadoop to be a one-stop data-processing shop. spring xd jobs read data from sources, run them through processing components that may count, filter, enrich or transform the data, and then write them to sinks.

spring integration and spring xd ninja marius bogoevici, who did a lot of the recent work in the spring integration and spring xd implementation of apache kafka, put together a really nice example demonstrating how to get a full working spring xd and kafka flow working. the readme walks you through getting apache kafka, spring xd and the requisite topics all set up. the essence, however, is when you use the spring xd shell and the shell dsl to compose a stream. spring xd components are named components that are pre-configured but have lots of parameters that you can override with --.. arguments via the xd shell and dsl. (that dsl, by the way, is written by the amazing andy clement of spring expression language fame!) here’s an example that configures a stream to read data from an apache kafka source and then write the message to a component called log, which is a sink. log, in this case, could be syslogd, splunk, hdfs, etc.
xd> stream create kafka-source-test --definition "kafka --zkconnect=localhost:2181 --topic=event-stream | log" --deploy

and that’s it! naturally, this is just a taste of spring xd, but hopefully you’ll agree the possibilities are tantalizing.

deploying a kafka server with lattice and docker

it’s easy to get an example kafka installation all set up using lattice, a distributed runtime that supports, among other container formats, the very popular docker image format. there’s a docker image provided by spotify that sets up a collocated zookeeper and kafka image. you can easily deploy this to a lattice cluster, as follows:

ltc create --run-as-root m-kafka spotify/kafka

from there, you can easily scale the apache kafka instances and even more easily still consume apache kafka from your cloud-based services.

next steps

you can find the code for this blog on my github account. we’ve only scratched the surface! if you want to learn more (and why wouldn’t you?), then be sure to check out marius bogoevici and dr. mark pollack’s upcoming webinar on reactive data-pipelines using spring xd and apache kafka, where they’ll demonstrate how easy it can be to use rxjava, spring xd and apache kafka!
April 18, 2015
by Pieter Humphrey
· 29,115 Views
Monitoring rsyslog’s Performance with impstats and Elasticsearch
If you’re using rsyslog for processing lots of logs (and, as we’ve shown before, rsyslog is good at processing lots of logs), you’re probably interested in monitoring it. To do that, you can use impstats, whose name comes from "input module for process stats". impstats produces information like:
– input stats, like how many events went through each input
– queue stats, like the maximum size of a queue
– action (output or message modification) stats, like how many events were forwarded by each action
– general stats, like CPU time or memory usage

In this post, we’ll show you how to send those stats to Elasticsearch (or Logsene, our log analytics service, which is essentially hosted ELK and exposes the Elasticsearch API), where you can explore them with a nice UI, like Kibana. For example, you can get the number of logs going through each input/output per hour.

More precisely, we’ll look at:
– the useful options around impstats
– how to use those stats and what they’re about
– how to ship stats to Elasticsearch/Logsene by using rsyslog’s Elasticsearch output
– how to do this shipping in a fast and reliable way. This will apply to most rsyslog use-cases, not only impstats

Configuring impstats

Before starting, make sure you have a recent version of rsyslog. You can find the latest version (8.9.0 at the time of this writing), as well as packages for various distributions here. Many distributions still ship ancient versions like 5.x, which probably have impstats, but some of the features (like Elasticsearch output) may not be available. Once you’re there, load the impstats module at the beginning of your config:

    module(
      load="impstats"
      interval="10"            # how often to generate stats
      resetCounters="on"       # to get deltas (e.g. # of messages submitted in the last 10 seconds)
      log.file="/tmp/stats"    # file to write those stats to
      log.syslog="off"         # don't send stats through the normal processing pipeline. More on that in a bit
    )

At this point, if you restart rsyslog, you should see stats about all the inputs, queues and actions, as well as overall resource usage. For example, the stats below come from an rsyslog that takes messages over TCP and sends them over to Elasticsearch in Logstash-like format:

    Thu Apr 9 16:45:36 2015: omelasticsearch: origin=omelasticsearch submitted=11000 failed.http=0 failed.httprequests=0 failed.es=0
    Thu Apr 9 16:45:36 2015: send-to-es: origin=core.action processed=10405 failed=0 suspended=0 suspended.duration=0 resumed=0
    Thu Apr 9 16:45:36 2015: imtcp(13514): origin=imtcp submitted=6618
    Thu Apr 9 16:45:36 2015: resource-usage: origin=impstats utime=2109000 stime=2415000 maxrss=53236 minflt=12559 majflt=1 inblock=8 oublock=0 nvcsw=164893 nivcsw=384355
    Thu Apr 9 16:45:36 2015: main Q: origin=core.queue size=65095 enqueued=7149 full=0 discarded.full=0 discarded.nf=0 maxqsize=70000

Wait. What are these stats? Here’s my take on each line:
1. omelasticsearch (output module to Elasticsearch) sent 11K logs to ES in the last 10 seconds. There were no connectivity errors, nor any errors thrown by Elasticsearch (like you would get if you tried to index a string in an integer field)
2. the “send-to-es” action (which uses omelasticsearch) took a bit less than 11K logs from the main queue to send them to omelasticsearch. I assume the rest of them were sent before this 10 second window. Not terribly useful, but if there was a connectivity issue with Elasticsearch, you’d see how long this action was suspended
3. the TCP input received 6.6K logs in the last 10 seconds
4. rsyslog used ~2 seconds of user CPU time (utime=2109000 microseconds) and ~2.5s of system time. It used 53MB of RAM at most. You can see what all these abbreviations mean by looking at getrusage’s man page
5. the default (main) queue currently buffers 65K messages from the inputs (though it went as high as 70K in the last 10 seconds), 7K of which were taken in the last 10 seconds

Shipping Stats to Elasticsearch/Logsene

Now that we have these stats, let’s centralize them to Elasticsearch. If you’re using rsyslog to push to Elasticsearch, it’s better to use another cluster or Logsene for stats. Otherwise, when Elasticsearch is in trouble, you won’t be able to see stats which might explain why you’re having trouble in the first place. Either way, we need four things:
– produce those stats in JSON, so we can parse them easily
– define a template for what the JSON documents that we send to Elasticsearch will look like
– parse the JSON stats
– send those documents to Logsene/Elasticsearch using the defined template

Here’s the relevant config snippet for sending to Logsene:

    module(
      load="impstats"
      interval="10"
      resetCounters="on"
      format="cee"             # produce JSON stats
    )
    module(load="mmjsonparse")
    module(load="omelasticsearch")

    # template for building the JSON documents that will land in Logsene/Elasticsearch
    template(name="stats" type="list") {
      constant(value="{")
      property(name="timereported" dateFormat="rfc3339" format="jsonf" outname="@timestamp")  # the timestamp
      constant(value=",")
      property(name="hostname" format="jsonf" outname="host")  # the host generating stats
      constant(value=",\"source\":\"impstats\",")  # we'll hardcode "impstats" as a source
      property(name="$!all-json" position.from="2")  # finally, we'll add all metrics
    }

    action(
      name="parse_impstats"    # parse the
      type="mmjsonparse"       # JSON stats
    )

    action(
      name="impstats_to_es"    # name actions so you can see them in impstats messages (instead of the default action 1, 2, etc)
      type="omelasticsearch"
      server="logsene-receiver.sematext.com"  # host and port for Logsene/Elasticsearch
      serverport="80"          # set serverport="443" and add usehttps="on" for using HTTPS instead of plain HTTP
      template="stats"         # use the template defined earlier
      searchIndex="LOGSENE_APP_TOKEN_GOES_HERE"
      searchType="impstats"    # we'll use a separate mapping type for stats
      bulkmode="on"            # use Elasticsearch's bulk API
      action.resumeretrycount="-1"  # retry indefinitely on failure
    )

That’s basically all you need to send stats to Logsene/Elasticsearch: load the impstats, mmjsonparse and omelasticsearch modules, define the template, parse stats and send them over. Note that while impstats comes bundled with most rsyslog packages, you need to install the rsyslog-mmjsonparse and rsyslog-elasticsearch packages to get the other two plugins.

Using a separate ruleset and configuring its queue

Before wrapping up, let’s address two potential issues. The first is that, by default, impstats will send stats events to the main queue (all input modules do that by default). This will mix stats with other logs, which has a couple of disadvantages:
– you need to add a conditional to make sure only impstats events go to the impstats-specific destination
– if rsyslog is queueing lots of messages in the main queue, stats can land in Elasticsearch with a delay

To avoid these problems, you can bind impstats to a separate ruleset. Let’s call it “monitoring”. rsyslog will then process stats separately from the main queue, which is associated with the default ruleset. You can have as many rulesets as you want, and they’re typically used to separate different types of logs, for example to process local logs and remote logs independently. Like the default ruleset, which has the main queue, any ruleset can have its own queue (also, each action, no matter the ruleset it’s in, can have its own queue).

Why am I talking about queues? Because if stats are important, you want to make sure you are able to queue them, instead of losing them if Elasticsearch becomes unavailable for a while. By default, the default ruleset comes with an in-memory queue of 10K or so messages.
Additional rulesets have no queue by default, but you can add one by specifying queue options (you can find the complete list here). While in-memory queues are fast, they are typically small and you’d lose their contents if you had to shut down or restart rsyslog. The following config snippet adds a disk-assisted queue to the “monitoring” ruleset. A disk-assisted queue will normally be as fast as an in-memory queue, and will spill logs to disk in a performance-friendly way if it runs out of space in memory. You can also make rsyslog save all logs to disk when you shut it down or restart it.

    module(
      load="impstats"
      interval="10"
      resetCounters="on"
      format="cee"
      ruleset="monitoring"     # send stats to the monitoring ruleset
    )

    # add here modules and template from the previous snippet

    ruleset(
      name="monitoring"        # the monitoring ruleset defined earlier
      queue.type="FixedArray"  # we'll have a fixed memory queue for this ruleset
      queue.highwatermark="50000"    # at least until it contains 50K stats messages
      queue.spoolDirectory="/var/run/rsyslog/queues"  # at which point, start writing in-memory messages to disk
      queue.filename="stats_ruleset"
      queue.lowwatermark="20000"     # until the memory queue goes back to 20K, at which point the memory queue is used again
      queue.maxdiskspace="100m"      # the queue will be full when it occupies 100MB of space on disk
      queue.size="5000000"           # this is the total queue size (shouldn't be reachable)
      queue.dequeuebatchsize="1000"  # how many messages from the queue to process at once (also determines how many messages will be in an ES Bulk)
      queue.saveonshutdown="on"      # save queue contents to disk at shutdown
    ){
      # add here actions from the previous snippet
    }

Summary

This was quite a long post, so let me summarize the features of rsyslog we touched on:
– impstats is an input module that can generate stats about rsyslog’s inputs, queues and actions, as well as general process statistics
– you normally want to write them to a file in a human-readable format for development/debugging or local performance tests
– for production, it’s best to write them in JSON, parse them in a separate ruleset and send them to Logsene/Elasticsearch, where you can search and graph them
– you can use disk-assisted queues to increase the capacity of an in-memory queue without losing performance under normal conditions. They can also save logs to disk on shutdown to make sure important stats are not lost

If you find working with logs and/or Elasticsearch exciting, that’s what we do in lots of our products, consulting and support engagements. So if you want to join us, we’re hiring worldwide.
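As a rough illustration of how the human-readable stats lines shown earlier in this post are laid out, the sketch below splits one such line into a counter name and its key=value pairs. It is written in Python, the function name parse_impstats_line is hypothetical, and the timestamp pattern is an assumption based only on the sample output above; it is not part of rsyslog.

```python
import re

# Layout assumed from the sample lines: "<timestamp>: <name>: key=value key=value ..."
# The timestamp pattern is inferred from those samples only.
LINE_RE = re.compile(
    r"^(?P<ts>\w{3} \w{3} +\d+ \d\d:\d\d:\d\d \d{4}): (?P<name>.+?): (?P<body>.*)$"
)

def parse_impstats_line(line):
    """Return a dict with timestamp, name and counters, or None if the line does not match."""
    m = LINE_RE.match(line.strip())
    if m is None:
        return None
    counters = {}
    for pair in m.group("body").split():
        key, _, value = pair.partition("=")
        # Numeric counters become ints; anything else (e.g. origin) stays a string.
        counters[key] = int(value) if value.isdigit() else value
    return {"timestamp": m.group("ts"), "name": m.group("name"), "counters": counters}

sample = ("Thu Apr 9 16:45:36 2015: main Q: origin=core.queue size=65095 "
          "enqueued=7149 full=0 discarded.full=0 discarded.nf=0 maxqsize=70000")
parsed = parse_impstats_line(sample)
print(parsed["name"], parsed["counters"]["size"])
```

For anything beyond local debugging, the JSON (format="cee") output described above is likely the more robust thing to parse.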
April 14, 2015
by Radu Gheorghe
· 8,155 Views
Using Multiple Grok Statements to Parse a Java Stack Trace
Parse your Java stack trace log information with the Logstash tool.
April 14, 2015
by Bipin Patwardhan
· 77,982 Views · 6 Likes
The State of the Storage Engine
This article by Baron Schwartz comes to you from the DZone Guide to Database and Persistence Management.
March 16, 2015
by B Jones
· 16,577 Views · 1 Like
Big Data Processing in Spark
In the traditional 3-tier architecture, data processing is performed by the application server, while the data itself is stored in the database server. The application server and the database server are typically two different machines. The processing cycle therefore proceeds as follows:
1. The application server sends a query to the database server to retrieve the necessary data.
2. The application server performs processing on the received data.
3. The application server saves the changed data to the database server.

In the traditional data processing paradigm, we move data to the code.

Then the big data phenomenon arrived. Because the data volume is huge, it cannot be held by a single database server. Big data is typically partitioned and stored across many physical DB server machines. At the same time, application servers need to be added to increase the processing power. However, as we increase the number of app servers and DB servers for storing and processing big data, more data needs to be transferred back and forth across the network during the processing cycle, up to the point where the network becomes a major bottleneck.

Moving Code to Data

To overcome the network bottleneck, we need a new computing paradigm. Instead of moving data to the code, we move the code to the data and perform the processing where the data is stored. Notice the change in the program structure:
– The program execution starts at a driver, which orchestrates the execution happening remotely across many worker servers within a cluster.
– Data is no longer transferred to the driver program. The driver program holds a data reference in its variable rather than the data itself. The data reference is basically an ID used to locate the corresponding data residing in the database server.
– Code is shipped from the program to the database server, where the execution happens, and data is modified at the database server without leaving the server machine.
– Finally, the program requests a save of the modified data. Since the modified data resides in the database server, no data transfer happens over the network.

By moving the code to the data, the volume of data transferred over the network is significantly reduced. This is an important paradigm shift for big data processing. In the following section, I will use Apache Spark to illustrate how this big data processing paradigm is implemented.

RDD

A Resilient Distributed Dataset (RDD) is how Spark implements the data reference concept. An RDD is a logical reference to a dataset which is partitioned across many server machines in the cluster. To make a clear distinction between the data reference and the data itself, a Spark program is organized as a sequence of execution steps, each of which is either a "transformation" or an "action".

Programming Model

A typical program is organized as follows:
1. From an environment variable "context", create some initial data reference RDD objects.
2. Transform the initial RDD objects to create more RDD objects. A transformation is expressed in terms of functional programming, where a code block is shipped from the driver program to multiple remote worker servers, each of which holds a partition of the RDD. A variable appearing inside the code block can be either an item of the RDD or a local variable inside the driver program, which gets serialized over to the worker machine. After the code (and the copy of the serialized variables) is received by the remote worker server, it is executed there by feeding it the items of the RDD residing in that partition. Notice that the result of a transformation is a brand new RDD (the original RDD is not mutated).
3. Finally, the RDD object (the data reference) needs to be materialized. This is achieved through an "action", which dumps the RDD into storage or returns its data to the driver program.
Here is a word count example:

    # Get the initial RDD from the context
    file = spark.textFile("hdfs://...")

    # Three consecutive transformations of the RDD
    # (the parentheses let the chained calls span several lines)
    counts = (file.flatMap(lambda line: line.split(" "))
                  .map(lambda word: (word, 1))
                  .reduceByKey(lambda a, b: a + b))

    # Materialize the RDD using an action
    counts.saveAsTextFile("hdfs://...")

When the driver program starts its execution, it builds up a graph where the nodes are RDDs and the edges are transformation steps. However, no execution happens in the cluster until an action is encountered. At that point, the driver program ships the execution graph as well as the code block to the cluster, where every worker server gets a copy.

The execution graph is a DAG:
– Each DAG is an atomic unit of execution.
– Each source node (no incoming edge) is an external data source or driver memory.
– Each intermediate node is an RDD.
– Each sink node (no outgoing edge) is an external data source or driver memory.
– A green edge connecting to an RDD represents a transformation. A red edge connecting to a sink node represents an action.

Data Shuffling

Although we ship the code to the worker servers where the data processing happens, data movement cannot be completely eliminated. For example, if the processing requires data residing in different partitions to be grouped first, then we need to shuffle data among worker servers. Spark carefully distinguishes two types of "transformation" operations:
– A "narrow transformation" is one where the processing logic depends only on data that already resides in the partition, so data shuffling is unnecessary. Examples of narrow transformations include filter(), sample(), map(), flatMap(), etc.
– A "wide transformation" is one where the processing logic depends on data residing in multiple partitions, so data shuffling is needed to bring that data together in one place. Examples of wide transformations include groupByKey(), reduceByKey(), etc.
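To make the narrow/wide distinction more concrete, here is a small pure-Python simulation of the word count across two partitions. It does not use Spark at all: the partition contents, the function names and the toy partitioner (sum of character codes modulo the partition count) are assumptions chosen for illustration, not how Spark itself routes keys.

```python
from collections import defaultdict

# Two hypothetical partitions of a text dataset, standing in for data held by two workers.
partitions = [
    ["a b a", "c a"],
    ["b c", "c c a"],
]

def narrow_stage(lines):
    """flatMap + map: each partition is processed on its own, no data leaves it."""
    return [(word, 1) for line in lines for word in line.split(" ")]

def shuffle(mapped_partitions, num_partitions):
    """Wide step: route every (key, value) pair to the partition chosen by its key."""
    buckets = [defaultdict(list) for _ in range(num_partitions)]
    for part in mapped_partitions:
        for key, value in part:
            # Deterministic toy partitioner (an assumption, not Spark's own hash).
            target = sum(map(ord, key)) % num_partitions
            buckets[target][key].append(value)
    return buckets

def reduce_by_key(bucket):
    """After the shuffle, all values of a key sit in one partition and can be summed locally."""
    return {key: sum(values) for key, values in bucket.items()}

mapped = [narrow_stage(p) for p in partitions]   # narrow: no data movement
shuffled = shuffle(mapped, num_partitions=2)     # wide: data crosses partitions
counts = [reduce_by_key(b) for b in shuffled]

merged = {}
for part in counts:
    merged.update(part)
print(merged)
```

The only point at which records cross partition boundaries is the shuffle function, which is the cost that the rest of this article tries to minimize.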
Joining two RDDs can also affect the amount of data being shuffled. Spark provides two ways to join data. In a shuffle join implementation, data from the two RDDs with the same key is redistributed to the same partition. In other words, the items of each RDD are shuffled across worker servers. Besides the shuffle join, Spark provides an alternative called the broadcast join. In this case, one of the RDDs is broadcast and copied over to every partition. Imagine a situation where one of the RDDs is significantly smaller than the other: a broadcast join then reduces the network traffic, because only the small RDD needs to be copied to all worker servers while the large RDD doesn't need to be shuffled at all.

In some cases, transformations can be re-ordered to reduce the amount of data shuffling. Below is an example of a JOIN between two huge RDDs followed by a filter. Plan1 is a naive implementation which follows the given order. It first joins the two huge RDDs and then applies the filter to the join result. This ends up causing a big data shuffle because the two RDDs are huge, even though the result after filtering is small. Plan2 offers a smarter way by using the "push-down-predicate" technique, where we first apply the filter to both RDDs before joining them. Since the filtering significantly reduces the number of items in each RDD, the join processing is much cheaper.

Execution Planning

As explained above, data shuffling incurs the most significant cost in the overall data processing flow. Spark provides a mechanism that generates an execution plan from the DAG that minimizes the amount of data shuffling:
– Analyze the DAG to determine the order of transformations. Notice that we start from the action (terminal node) and trace back to all dependent RDDs.
– To minimize data shuffling, we group the narrow transformations together in a "stage", where all transformation tasks can be performed within the partition and no data shuffling is needed. The transformations become tasks that are chained together within a stage.
– A wide transformation sits at the boundary of two stages, which requires data to be shuffled to a different worker server. When a stage finishes its execution, it persists the data into different files (one per partition) on the local disks. Worker nodes of the subsequent stage then pick up these files, and this is where data shuffling happens.

Below is an example of how the execution planning turns the DAG into an execution plan involving stages and tasks.

Reliability and Fault Resiliency

Since the DAG defines deterministic transformation steps between different partitions of data within each RDD, fault recovery is very straightforward. Whenever a worker server crashes during the execution of a stage, another worker server can simply re-execute the stage from the beginning by pulling the input data from its parent stage, which has its output data stored in local files. In case the result of the parent stage is not accessible (e.g. the worker server lost the file), the parent stage needs to be re-executed as well. Think of this as a lineage of transformation steps, where the failure of any step triggers a restart of execution from the previous step.

Since the DAG itself is an atomic unit of execution, all the RDD values are forgotten after the DAG finishes its execution. Therefore, after the driver program finishes an action (which executes a DAG to completion), all the RDD values are forgotten, and if the program accesses the RDD again in a subsequent statement, the RDD needs to be recomputed from the RDDs it depends on. To reduce this repetitive processing, Spark provides a caching mechanism to remember RDDs in worker server memory (or on local disk). Once the execution planner finds that an RDD is already cached in memory, it uses the RDD right away without tracing back to its parent RDDs. This way, we prune the DAG once we reach an RDD that is in the cache.
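The lazy evaluation and caching behaviour described above can be sketched with a toy class in plain Python. This is not Spark's implementation: LazyDataset, its methods and the compute_count field are hypothetical names introduced only to show how an action walks back through the lineage and how a cached node prunes that walk.

```python
class LazyDataset:
    """Toy stand-in for an RDD: it records how to compute itself from its parent."""

    def __init__(self, source=None, parent=None, fn=None):
        self.source = source      # concrete data, only for a root node
        self.parent = parent      # lineage: the dataset this one derives from
        self.fn = fn              # per-item transformation
        self.use_cache = False
        self.cached = None        # filled in after the first computation if cache() was called
        self.compute_count = 0    # how many times this node was actually computed

    def map(self, fn):
        # Transformation: nothing runs yet, the lineage is merely extended.
        return LazyDataset(parent=self, fn=fn)

    def cache(self):
        self.use_cache = True
        return self

    def collect(self):
        # Action: trace back through the lineage, stopping at a cached node.
        if self.cached is not None:
            return self.cached
        self.compute_count += 1
        if self.parent is None:
            data = list(self.source)
        else:
            data = [self.fn(x) for x in self.parent.collect()]
        if self.use_cache:
            self.cached = data
        return data

base = LazyDataset(source=[1, 2, 3])
squares = base.map(lambda x: x * x).cache()
plus_one = squares.map(lambda x: x + 1)
doubled = squares.map(lambda x: x * 2)

first = plus_one.collect()    # computes base and squares, then caches squares
second = doubled.collect()    # reuses the cached squares, base is not touched again
print(first, second, squares.compute_count, base.compute_count)
```

Without the cache() call, the second action would recompute squares and base, which corresponds to the recomputation from parent RDDs described above.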
Overall, Apache Spark provides a powerful framework for big data processing. Thanks to a caching mechanism that holds previous computation results in memory, Spark outperforms Hadoop significantly, because it does not need to persist all the data to disk for each round of parallel processing. Although it is still very new, I think Spark will take off as the mainstream approach to processing big data.
March 13, 2015
by Ricky Ho
· 23,502 Views · 2 Likes
Using MongoDB with Hadoop & Spark: Part 2 - Hive Example
Originally written by Matt Kalan

Welcome to part two of our three-part series on MongoDB and Hadoop. In part one, we introduced Hadoop and how to set it up. In this post, we'll look at a Hive example.

  • Introduction & Setup of Hadoop and MongoDB
  • Hive Example
  • Spark Example & Key Takeaways

For more detail on the use case, see the first paragraph of part 1.

Summary

  • Use case: aggregating 1-minute intervals of stock prices into 5-minute intervals
  • Input: 1-minute stock price intervals in a MongoDB database
  • Simple analysis performed in: Hive, Spark
  • Output: 5-minute stock price intervals in Hadoop

Hive Example

I ran the following example from the Hive command line (simply typing the command "hive" with no parameters), not Cloudera's Hue editor, as that would have needed additional installation steps. I immediately noticed the criticism people have of Hive: everything is compiled into MapReduce jobs, which takes considerable time. I ran most things with just 20 records to make the queries run quickly.

The following statement creates a table definition in Hive that matches the structure of the data in MongoDB. MongoDB has a dynamic schema for variable data shapes, but Hive and SQL need a schema definition.

    CREATE EXTERNAL TABLE minute_bars (
      id STRUCT<oid:STRING, bsontype:INT>,
      Symbol STRING,
      Timestamp STRING,
      Day INT,
      Open DOUBLE,
      High DOUBLE,
      Low DOUBLE,
      Close DOUBLE,
      Volume INT
    )
    STORED BY 'com.mongodb.hadoop.hive.MongoStorageHandler'
    WITH SERDEPROPERTIES('mongo.columns.mapping'='{"id":"_id", "Symbol":"Symbol", "Timestamp":"Timestamp", "Day":"Day", "Open":"Open", "High":"High", "Low":"Low", "Close":"Close", "Volume":"Volume"}')
    TBLPROPERTIES('mongo.uri'='mongodb://localhost:27017/marketdata.minbars');

Recent changes in the Apache Hive repo make the mappings necessary even if you are keeping the field names the same. This should be changed in the MongoDB Hadoop Connector soon, if not already by the time you read this.
Then I ran the following command to create a Hive table for the 5-minute bars:

    CREATE TABLE five_minute_bars (
      id STRUCT<oid:STRING, bsontype:INT>,
      Symbol STRING,
      Timestamp STRING,
      Open DOUBLE,
      High DOUBLE,
      Low DOUBLE,
      Close DOUBLE
    );

The insert statement uses SQL windowing functions to group five 1-minute periods and determine the OHLC (open, high, low, close) for the 5 minutes. There are definitely other ways to do this, but here is one I figured out. Grouping in SQL is a little different from grouping in the MongoDB aggregation framework (in which you can pull the first and last of a group easily), so it took me a little while to remember how to do it with a subquery.

The subquery takes each group of five 1-minute records/documents, sorts them by time, and computes the open, high, low, and close price up to that record in each 5-minute period. The outer WHERE clause then selects the last 1-minute bar in that period (because that row in the subquery has the correct OHLC information for its 5-minute period). I definitely welcome queries that are easier to understand, but you can also run the subquery by itself to see what it's doing.
    INSERT INTO TABLE five_minute_bars
    SELECT m.id, m.Symbol, m.OpenTime as Timestamp, m.Open, m.High, m.Low, m.Close
    FROM (
      SELECT
        id,
        Symbol,
        FIRST_VALUE(Timestamp) OVER (
          PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
          ORDER BY Timestamp) as OpenTime,
        LAST_VALUE(Timestamp) OVER (
          PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
          ORDER BY Timestamp) as CloseTime,
        FIRST_VALUE(Open) OVER (
          PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
          ORDER BY Timestamp) as Open,
        MAX(High) OVER (
          PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
          ORDER BY Timestamp) as High,
        MIN(Low) OVER (
          PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
          ORDER BY Timestamp) as Low,
        LAST_VALUE(Close) OVER (
          PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
          ORDER BY Timestamp) as Close
      FROM minute_bars) as m
    WHERE unix_timestamp(m.CloseTime, 'yyyy-MM-dd HH:mm') - unix_timestamp(m.OpenTime, 'yyyy-MM-dd HH:mm') = 60*4;

I can definitely see the benefit of being able to use SQL to access data in MongoDB and optionally in other databases and file formats, all with the same commands, while the mapping differences are handled in the table declarations. The downside is that the latency is quite high, but that can be partly offset by the ability to scale horizontally across many nodes. I think this is the appeal of Hive for most people: they can scale to very large data volumes using traditional SQL, and latency is not a primary concern.

Post #3 in this blog series shows similar examples using Spark.

To learn more, watch our video on MongoDB and Hadoop. We will take a deep dive into the MongoDB Connector for Hadoop and how it can be applied to enable new business insights.
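For readers who find the windowed query hard to follow, the same 5-minute aggregation can be sketched in plain Python. This is an illustration only, not part of the original post: the function name `five_minute_bars`, the sample symbol and the prices are made up, the data is assumed to hold a single symbol (the query above does not partition by symbol either), and buckets are found by truncating the minute rather than dividing epoch seconds.

```python
from datetime import datetime

FMT = "%Y-%m-%d %H:%M"  # mirrors the 'yyyy-MM-dd HH:mm' pattern used in the query


def five_minute_bars(minute_bars):
    """Aggregate 1-minute bars (dicts) for one symbol into 5-minute OHLC bars."""
    buckets = {}
    for bar in minute_bars:
        ts = datetime.strptime(bar["Timestamp"], FMT)
        # Round the minute down to a multiple of 5 to find the bucket start.
        start = ts.replace(minute=ts.minute - ts.minute % 5)
        buckets.setdefault(start, []).append((ts, bar))

    result = []
    for start in sorted(buckets):
        rows = sorted(buckets[start], key=lambda pair: pair[0])
        first_ts, first = rows[0]
        last_ts, last = rows[-1]
        # Same completeness check as the WHERE clause:
        # close time minus open time must be exactly 4 minutes.
        if (last_ts - first_ts).total_seconds() != 4 * 60:
            continue
        result.append({
            "Symbol": first["Symbol"],
            "Timestamp": first["Timestamp"],
            "Open": first["Open"],
            "High": max(bar["High"] for _, bar in rows),
            "Low": min(bar["Low"] for _, bar in rows),
            "Close": last["Close"],
        })
    return result


# Eleven made-up bars: two complete 5-minute periods plus one stray minute.
sample = [
    {"Symbol": "XYZ", "Timestamp": "2015-03-02 09:%02d" % (30 + i),
     "Open": 100 + i, "High": 102 + i, "Low": 98 + i, "Close": 101 + i}
    for i in range(11)
]
bars = five_minute_bars(sample)
```

The stray bar at 09:40 forms an incomplete period and is dropped, just as the outer WHERE clause drops periods whose open and close times are not four minutes apart.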
March 2, 2015
by Francesca Krihely
· 10,885 Views
Streaming Big Data: Storm, Spark and Samza
There are a number of distributed computation systems that can process Big Data in real time or near-real time. This article starts with a short description of three Apache frameworks and attempts to provide a quick, high-level overview of some of their similarities and differences.

Apache Storm

In Storm, you design a graph of real-time computation called a topology and feed it to the cluster, where the master node distributes the code among worker nodes that execute it. In a topology, data is passed around between spouts, which emit data streams as immutable sets of key-value pairs called tuples, and bolts, which transform those streams (count, filter etc.). Bolts themselves can optionally emit data to other bolts down the processing pipeline.

Apache Spark

Spark Streaming (an extension of the core Spark API) doesn't process events one at a time like Storm. Instead, it slices the stream into small batches by time interval before processing them. The Spark abstraction for a continuous stream of data is called a DStream (for Discretized Stream). A DStream is a sequence of RDDs (Resilient Distributed Datasets), one per micro-batch. RDDs are distributed collections that can be operated on in parallel by arbitrary functions and by transformations over a sliding window of data (windowed computations).

Apache Samza

Samza's approach to streaming is to process messages as they are received, one at a time. Samza's stream primitive is not a tuple or a DStream, but a message. Streams are divided into partitions, and each partition is an ordered sequence of read-only messages, each with a unique ID (offset). The system also supports batching, i.e. consuming several messages from the same stream partition in sequence. Samza's execution and streaming modules are both pluggable, although Samza typically relies on Hadoop's YARN (Yet Another Resource Negotiator) and Apache Kafka.
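The difference between one-at-a-time processing and the micro-batching described for Spark Streaming can be sketched without any framework. The snippet below is a plain-Python illustration, not the Spark Streaming API; the `micro_batches` helper and the sample events are hypothetical.

```python
from collections import defaultdict


def micro_batches(events, batch_seconds):
    """Group (timestamp, value) events into consecutive fixed-length batches."""
    batches = defaultdict(list)
    for ts, value in events:
        # Every event whose timestamp falls in the same interval shares a batch.
        batches[int(ts // batch_seconds)].append(value)
    # Note: intervals without any events are simply skipped in this sketch.
    return [batches[key] for key in sorted(batches)]


events = [(0.2, "a"), (0.9, "b"), (1.1, "c"), (2.5, "d"), (2.7, "e")]

# One-at-a-time style: the handler runs once per event.
seen_one_by_one = [value.upper() for _, value in events]

# Micro-batch style: the handler runs once per time slice.
batches = micro_batches(events, batch_seconds=1)
counts_per_batch = [len(batch) for batch in batches]
```

The trade-off shows up directly: a one-second batch interval means an event may wait up to a second before it is processed, in exchange for handling each slice as an ordinary batch computation.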
Common Ground

All three real-time computation systems are open-source, low-latency, distributed, scalable and fault-tolerant. They all allow you to run your stream processing code through parallel tasks distributed across a cluster of computing machines with fail-over capabilities. They also provide simple APIs to abstract the complexity of the underlying implementations. The three frameworks use different vocabularies for similar concepts:

Comparison Matrix

A few of the differences are summarized in the table below:

There are three general categories of delivery patterns:

  • At-most-once: messages may be lost. This is usually the least desirable outcome.
  • At-least-once: messages may be redelivered (no loss, but duplicates). This is good enough for many use cases.
  • Exactly-once: each message is delivered once and only once (no loss, no duplicates). This is a desirable feature although difficult to guarantee in all cases.

Another aspect is state management. There are different strategies to store state. Spark Streaming writes data into the distributed file system (e.g. HDFS). Samza uses an embedded key-value store. With Storm, you'll have to either roll your own state management at your application layer, or use a higher-level abstraction called Trident.

Use Cases

All three frameworks are particularly well-suited to efficiently process continuous, massive amounts of real-time data. So which one to use? There are no hard rules, at most a few general guidelines.

If you want a high-speed event processing system that allows for incremental computations, Storm would be fine for that. If you further need to run distributed computations on demand, while the client is waiting synchronously for the results, you'll have Distributed RPC (DRPC) out-of-the-box. Last but not least, because Storm uses Apache Thrift, you can write topologies in any programming language.
If you need state persistence and/or exactly-once delivery though, you should look at the higher-level Trident API, which also offers micro-batching. A few companies using Storm: Twitter, Yahoo!, Spotify, The Weather Channel...

Speaking of micro-batching, if you must have stateful computations and exactly-once delivery, and don't mind a higher latency, you could consider Spark Streaming, especially if you also plan for graph operations, machine learning or SQL access. The Apache Spark stack lets you combine several libraries with streaming (Spark SQL, MLlib, GraphX) and provides a convenient unifying programming model. In particular, streaming algorithms (e.g. streaming k-means) allow Spark to facilitate decisions in real time. A few companies using Spark: Amazon, Yahoo!, NASA JPL, eBay Inc., Baidu...

If you have a large amount of state to work with (e.g. many gigabytes per partition), Samza co-locates storage and processing on the same machines, allowing you to work efficiently with state that won't fit in memory. The framework also offers flexibility with its pluggable API: its default execution, messaging and storage engines can each be replaced with your choice of alternatives. Moreover, if you have a number of data processing stages from different teams with different codebases, Samza's fine-grained jobs would be particularly well-suited, since they can be added or removed with minimal ripple effects. A few companies using Samza: LinkedIn, Intuit, Metamarkets, Quantiply, Fortscale...

Conclusion

We only scratched the surface of The Three Apaches. We didn't cover a number of other features and more subtle differences between these frameworks. Also, it's important to keep in mind the limits of the above comparisons, as these systems are constantly evolving.
February 28, 2015
by Tony Siciliani
· 32,732 Views · 5 Likes
How-To: Setup Development Environment for Hadoop MapReduce
This post is intended for folks who are looking for a quick start on developing a basic Hadoop MapReduce application. We will see how to set up a basic MR application for WordCount using Java, Maven and Eclipse, and how to run it in local mode, which makes debugging easy at an early stage. This post assumes that JDK 1.6+ is already installed, that Eclipse has the Maven plugin set up, and that downloads from the default Maven repository are not restricted.

Problem Statement: count the occurrences of each word appearing in an input file using MapReduce.

Step 1 : Adding Dependency

Create a Maven project in Eclipse and use the following in your pom.xml.

    <project xmlns="http://maven.apache.org/POM/4.0.0">
      <modelVersion>4.0.0</modelVersion>
      <groupId>com.saurzcode.hadoop</groupId>
      <artifactId>MapReduce</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <packaging>jar</packaging>
      <dependencies>
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-client</artifactId>
          <version>2.2.0</version>
        </dependency>
      </dependencies>
    </project>

Upon saving, Maven should download all the dependencies required to run a basic Hadoop MapReduce program.

Step 2 : Mapper Program

The map step involves tokenizing the file, traversing the words, and emitting a count of one for each word that is found. Our mapper class should extend the Mapper class and override its map method. With the text input format used here, each call to this method receives one line of the file in the value parameter, and the context parameter is used to emit word instances. In a real-world clustered setup, this code runs on multiple nodes, and its output is consumed by a set of reducers for further processing.
    import java.io.IOException;
    import java.util.StringTokenizer;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> {

        private final IntWritable ONE = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split the line on whitespace and emit (word, 1) for every token.
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                context.write(word, ONE);
            }
        }
    }

Step 3 : Reducer Program

Our reducer extends the Reducer class and implements the logic to sum up each occurrence of a word token received from the mappers. Output from the reducers goes to the output folder as a text file (by default, or as configured in the driver program through the output format) named part-r-00000, along with a _SUCCESS file.

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        public void reduce(Text text, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Add up all the counts emitted for this word.
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(text, new IntWritable(sum));
        }
    }

Step 4 : Driver Program

Our driver program configures the job by supplying the map and reduce programs we just wrote, along with the various input and output parameters.
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    public class WordCount {

        public static void main(String[] args)
                throws IOException, InterruptedException, ClassNotFoundException {
            Path inputPath = new Path(args[0]);
            Path outputDir = new Path(args[1]);

            // Create configuration
            Configuration conf = new Configuration(true);

            // Create job (Job.getInstance replaces the deprecated Job constructor)
            Job job = Job.getInstance(conf, "WordCount");
            job.setJarByClass(WordCountMapper.class);

            // Setup MapReduce
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WordCountReducer.class);
            job.setNumReduceTasks(1);

            // Specify key / value
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            // Input
            FileInputFormat.addInputPath(job, inputPath);
            job.setInputFormatClass(TextInputFormat.class);

            // Output
            FileOutputFormat.setOutputPath(job, outputDir);
            job.setOutputFormatClass(TextOutputFormat.class);

            // Delete output if exists
            FileSystem hdfs = FileSystem.get(conf);
            if (hdfs.exists(outputDir))
                hdfs.delete(outputDir, true);

            // Execute job
            int code = job.waitForCompletion(true) ? 0 : 1;
            System.exit(code);
        }
    }

That's it! We are all set to execute our first MapReduce program in Eclipse in local mode. Let's assume there is an input text file called input.txt in the folder input which contains the following text:

    foo bar is foo count count foo for saurzcode

Expected output (with a single reducer, the keys are written in sorted order):

    bar 1
    count 2
    foo 3
    for 1
    is 1
    saurzcode 1

Let's run this program in Eclipse as a Java Application. We need to pass the paths of the input file and the output folder to the program as arguments. Note that a MapReduce job fails if its output folder already exists, which is why the driver above deletes it before submitting the job.

    java com.saurzcode.mapreduce.WordCount input/input.txt output

If the program runs successfully, emitting a set of log lines while it executes the mappers and reducers, we should see an output folder with the following files:

    output/
        _SUCCESS
        part-r-00000
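To see what the mapper, the shuffle and the reducer each contribute, the same word count can be simulated in a few lines of plain Python. This is a sketch for intuition only and does not use Hadoop; the function names `map_phase`, `shuffle`, `reduce_phase` and `word_count` are made up here.

```python
from collections import defaultdict


def map_phase(line):
    """Mapper: emit (word, 1) for every whitespace-separated token."""
    for word in line.split():
        yield word, 1


def shuffle(pairs):
    """Shuffle and sort: group all values by key, with keys in sorted order."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(sorted(grouped.items()))


def reduce_phase(word, counts):
    """Reducer: sum the counts collected for one word."""
    return word, sum(counts)


def word_count(lines):
    pairs = [pair for line in lines for pair in map_phase(line)]
    return [reduce_phase(word, counts) for word, counts in shuffle(pairs).items()]


result = word_count(["foo bar is foo count count foo for saurzcode"])
for word, total in result:
    print(word, total)
```

Run against the sample line from this post, the simulation produces the same counts as the Hadoop job, in the same sorted key order that a single reducer would write.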
January 30, 2015
by Saurabh Chhajed
· 13,080 Views · 1 Like
Lambda Architecture for Big Data
An increasing number of systems are being built to handle the Volume, Velocity and Variety of Big Data, and hopefully help gain new insights and make better business decisions. Here, we will look at ways to deal with Big Data's Volume and Velocity simultaneously, within a single architecture solution.

Volume + Velocity

Apache Hadoop provides both reliable storage (HDFS) and a processing system (MapReduce) for large data sets across clusters of computers. MapReduce is a batch query processor that is targeted at long-running background processes. Hadoop can handle Volume. But to handle Velocity, we need real-time processing tools that can compensate for the high latency of batch systems, and serve the most recent data continuously, as new data arrives and older data is progressively integrated into the batch framework. Therefore we need both batch and real-time to run in parallel, and add a real-time computational system (e.g. Apache Storm) to our batch framework. This architectural combination of batch and real-time computation is referred to as a Lambda Architecture (λ).

Generic Lambda

λ has three layers:

  • The Batch Layer manages the master data and precomputes the batch views
  • The Speed Layer serves recent data only and increments the real-time views
  • The Serving Layer is responsible for indexing and exposing the views so that they can be queried.

The three layers are outlined in the below diagram along with a sample choice of technology stacks:

Incoming data is dispatched to both Batch and Speed layers for processing. At the other end, queries are answered by merging both batch and real-time views. Note that real-time views are transient by nature and their data is discarded (making room for newer data) once propagated through the Batch and Serving layers. Most of the complexity is pushed onto the much smaller Speed layer where the results are only temporary, a process known as "complexity isolation".
We are indeed isolating the complexity of concurrent data updates in a layer that is regularly purged and kept small in size.

λ is technology agnostic. The data pipeline is broken down into layers with clear demarcation of responsibilities, and at each layer, we can choose from a number of technologies. The Speed layer for instance could use either Apache Storm, or Apache Spark Streaming, or Spring "XD" (eXtreme Data) etc.

How do we recover from mistakes in λ? Basically, we recompute the views. If that takes too long, we just revert to the previous, non-corrupted versions of our data. We can do that because of data immutability in the master dataset: data is never updated, only appended to (time-based ordering). The system is therefore Human Fault-Tolerant: if we write bad data, we can just remove that data altogether and recompute.

Unified Lambda

The downside of λ is its inherent complexity. Keeping two already complex distributed systems in sync is quite an implementation and maintenance challenge. People have started to look for simpler alternatives that would bring just about the same benefits and handle the full problem set. There are basically three approaches:

1) Adopt a pure streaming approach, and use a flexible framework such as Apache Samza to provide some type of batch processing. Although its distributed streaming layer is pluggable, Samza typically relies on Apache Kafka. Samza's streams are replayable, ordered partitions. Samza can be configured for batching, i.e. consuming several messages from the same stream partition in sequence.

2) Take the opposite approach, and choose a flexible Batch framework that would also allow micro-batches, small enough to be close to real-time, with Apache Spark/Spark Streaming or Storm's Trident.
Spark Streaming is essentially a sequence of small batch processes that can reach latency as low as one second. Trident is a high-level abstraction on top of Storm that can process streams as small batches as well as do batch aggregation.

3) Use a technology stack already combining batch and real-time, such as Spring "XD", Summingbird or Lambdoop. Summingbird ("Streaming MapReduce") is a hybrid system where both batch and real-time workflows can be run at the same time and the results merged automatically. The Speed layer runs on Storm and the Batch layer on Hadoop. Lambdoop (Lambda-Hadoop, with HBase, Storm and Redis) also combines batch and real-time by offering a single API for both processing paradigms.

The integrated approach (unified λ) seeks to handle Big Data's Volume and Velocity by featuring a hybrid computation model, where both batch and real-time data processing are combined transparently. And with a unified framework, there would be only one system to learn, and one system to maintain.
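The query-time merge of batch and real-time views described under Generic Lambda can be sketched in a few lines of plain Python. This is a deliberately simplified, single-process illustration: the names `ingest`, `run_batch` and `query` are hypothetical, the views are simple event counts, and the batch run is treated as instantaneous, which sidesteps the problem of events that arrive while a real batch job is still running.

```python
from collections import Counter

master_dataset = []         # immutable, append-only log of events
batch_view = Counter()      # recomputed from scratch by the batch layer
realtime_view = Counter()   # incremented by the speed layer


def ingest(event):
    """Dispatch an incoming event to both the batch and the speed layer."""
    master_dataset.append(event)
    realtime_view[event] += 1


def run_batch():
    """Recompute the batch view from the master dataset, then purge the speed layer."""
    global batch_view
    batch_view = Counter(master_dataset)
    realtime_view.clear()


def query(key):
    """Serving layer: answer by merging the batch and real-time views."""
    return batch_view[key] + realtime_view[key]


for event in ["page_a", "page_a", "page_b"]:
    ingest(event)
before_batch = query("page_a")   # answered from the real-time view only

run_batch()
ingest("page_a")
after_batch = query("page_a")    # batch view plus one recent event
```

Because the master dataset is only ever appended to, recovering from a bad batch view in this sketch is just a matter of calling `run_batch` again.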
January 17, 2015
by Tony Siciliani
· 40,473 Views · 6 Likes
Multiple Table Insert Using a Single POST - Common Coding Examples
This is part of a series of blogs from Espresso Logic's Application Engineering team on common coding tasks and how they can be accomplished more efficiently using Espresso Logic. The purpose of these blogs is twofold:

  • Provide code examples of how the Espresso system can be used to solve certain use cases
  • Provide code samples that you can use within your programs while developing apps

This blog deals with a design pattern all web and mobile transaction applications have to deal with: how to create a multiple table self registration using a single POST. This is both a performance issue, reducing the number of calls to and from the server, and a demonstration of how Espresso Logic's advanced design handles complex transactions without coding. In the prior blog we dealt with rules used to validate credit cards.

Self Registration Design Pattern

User self registration is a standard design pattern for some web and mobile applications. This becomes a bit more challenging when the database model uses multiple tables. Using Espresso Logic, you can create a multiple table insert using a single POST. The back-end server can be run in the cloud or on-premise to connect to your database and quickly expose RESTful endpoints for each of your SQL tables, views, and stored procedures. Creating a compound nested document (called a Resource) allows us to design an API that joins related tables into a single REST endpoint. Developers can design a mobile front-end using drag-and-drop tools that bind each of the text fields to the fields of the nested JSON document. Using a single POST, this data is sent to the Espresso Logic REST server, which handles the details.

Self Registration Account Setup

Data Model

The model below shows the tables that hold each of the relevant parts of the self-registration entry. Each of these tables (e.g. Password, PersonPhone, Address, EmailAddress) has an auto increment primary key and a foreign key relationship to the Person table to support the one-to-many relationship cardinality. So how can we create a single POST to insert into multiple tables and propagate the primary key? Connecting to any SQL database, Espresso Logic will instantly create REST API definitions for every table, view, and stored procedure.

Multiple table data model

Create new Resource

Next, we use the Espresso Logic Design Studio to select one or more tables from a point-and-click interface to create this new compound document endpoint (see Resource documentation), which is a combination of each of the child tables. The 'Join' for each child is completed automatically using the existing relationships defined in the schema. Attributes that are not needed for the POST can be excluded, including the primary key (auto increment) and the foreign parent key PersonID in each child table (these are handled by the server).

Multiple Table Resource Setup

POST JSON Sample Data

The REST Lab in Espresso Logic Design Studio can be used to test this new resource. We enter all the values of the JSON self registration (shown below) and use the POST command.
    {
      "Title": "Mr",
      "FirstName": "Test",
      "MiddleName": "M",
      "LastName": "Record5",
      "Phone": [
        { "PhoneNumber": "(407) 555-1216", "PhoneNumberTypeID": 1 }
      ],
      "Address": [
        { "AddressTypeID": 2, "AddressLine1": "555 Main St", "AddressLine2": "Apt 6", "City": "Maitland", "StateProvinceID": 15, "PostalCode": "32751" },
        { "AddressTypeID": 1, "AddressLine1": "555 Main St", "AddressLine2": "6", "City": "Maitland", "StateProvinceID": 15, "PostalCode": "32751" }
      ],
      "Email": [
        { "EmailAddress": "[email protected]" }
      ],
      "Password": [
        { "PasswordHash": " password6", "Username": "user" }
      ]
    }

How it works

The Espresso Logic REST Server takes this JSON and performs the necessary validations, derivations, and event processing (using Reactive Logic Programming). It then inserts the Person first (returning the primary key) and propagates this key down to each of the related children in a single transaction. All the logic, validations, and events must succeed or the entire transaction is rolled back. This is not an easy trick to perform for any REST API, so do not try this at home without a deep understanding of SQL, transactions, the data model and the relationships between parent and child. Espresso Logic does this out-of-the-box with no code required.

Live Browser

Espresso Logic offers Live Browser, which is an instant HTML5/Angular view of your data using the active schema. This lets us view the one-to-many relationships of the self registration process. Try it yourself and see the power of Espresso Logic. View the samples on GitHub. The Espresso Logic 'Multiple Table insert using a Single POST' examples are part of the extended demo.

Summary

Mobile and web developers want a single, simple REST endpoint to populate and update user information. The server should be able to handle the complexity and hide the underlying data model. Using the Espresso Logic back-end server eliminates all the code required to connect to the database, create a nested document endpoint, and propagate the primary key to all the related child tables. Live Browser gives you an instant view to see and test your results, again with no code. This is the fastest way to build and deliver mobile solutions on the market today.
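To make the "How it works" step concrete, here is roughly what a server has to do by hand when a nested registration document arrives: insert the parent, read back its generated key, insert the children with that key, and roll everything back if any step fails. The sketch uses Python's built-in sqlite3 module and a cut-down, hypothetical schema (three tables, a handful of columns, a made-up `register` function and email address); it is not Espresso Logic's implementation.

```python
import sqlite3


def register(conn, doc):
    """Insert a Person and its child rows atomically; return the new PersonID."""
    with conn:  # commits on success, rolls back if any statement raises
        cur = conn.execute(
            "INSERT INTO Person (FirstName, LastName) VALUES (?, ?)",
            (doc["FirstName"], doc["LastName"]),
        )
        person_id = cur.lastrowid  # generated primary key of the parent row
        for phone in doc.get("Phone", []):
            conn.execute(
                "INSERT INTO PersonPhone (PersonID, PhoneNumber) VALUES (?, ?)",
                (person_id, phone["PhoneNumber"]),
            )
        for email in doc.get("Email", []):
            conn.execute(
                "INSERT INTO EmailAddress (PersonID, EmailAddress) VALUES (?, ?)",
                (person_id, email["EmailAddress"]),
            )
    return person_id


conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE Person (
    PersonID INTEGER PRIMARY KEY AUTOINCREMENT,
    FirstName TEXT NOT NULL,
    LastName TEXT NOT NULL);
CREATE TABLE PersonPhone (
    PhoneID INTEGER PRIMARY KEY AUTOINCREMENT,
    PersonID INTEGER NOT NULL REFERENCES Person(PersonID),
    PhoneNumber TEXT NOT NULL);
CREATE TABLE EmailAddress (
    EmailID INTEGER PRIMARY KEY AUTOINCREMENT,
    PersonID INTEGER NOT NULL REFERENCES Person(PersonID),
    EmailAddress TEXT NOT NULL);
""")

good = {
    "FirstName": "Test",
    "LastName": "Record5",
    "Phone": [{"PhoneNumber": "(407) 555-1216"}],
    "Email": [{"EmailAddress": "test@example.com"}],  # made-up address
}
person_id = register(conn, good)

# A document with an invalid child row: the parent insert must not survive.
bad = dict(good, Phone=[{"PhoneNumber": None}])
try:
    register(conn, bad)
except sqlite3.IntegrityError:
    pass  # the Person row inserted for `bad` was rolled back as well

person_count = conn.execute("SELECT COUNT(*) FROM Person").fetchone()[0]
phone_count = conn.execute("SELECT COUNT(*) FROM PersonPhone").fetchone()[0]
```

Even in this reduced form the parent-first ordering, key propagation and rollback handling have to be written out explicitly, which is the boilerplate the article says the server takes over.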
December 30, 2014
by Val Huber (DZone Core)
· 19,020 Views · 1 Like
Learn R: How to Extract Rows and Columns From Data Frame
This article presents a set of commands in the R programming language that can be used to extract rows and columns from a given data frame.
December 8, 2014
by Ajitesh Kumar
· 1,105,178 Views · 5 Likes
How Does Elasticsearch Real-time Search?
Real-time search capability is undoubtedly one of the most important features of Elasticsearch. Today we'll look closely at how Elasticsearch provides real-time search.

Real time

First of all, we need to explain the concept of real time. In general, a real-time system is one in which the delay between data going in and results coming out is small. Data is processed as it arrives rather than being accumulated first. Elasticsearch is well known for its real-time search: by default, a record added to it becomes searchable within about one second.

How?

Disks can become an I/O bottleneck at the data persistence step, and the mechanisms used to prevent data loss add further latency. Elasticsearch works around this by using the file-system cache that sits between itself and the disk, so that a new document can be searched in real time. A new segment is written to the file-system cache first and only later flushed to disk. This lightweight process of writing and opening a new segment is called a refresh in Elasticsearch. By default, every shard is refreshed automatically once per second. This is how Elasticsearch supports real-time search.

Test time

The discussion of the shard refresh interval above may bring the following questions to mind:

  • What happens when a new document is searched for less than one second after it was indexed?
  • Can documents be made searchable without waiting for the refresh period managed by Elasticsearch?

Short answers: Elasticsearch does not return the document, and yes. Let's make this clear with a simple example.

    hakdogan$ curl -XPUT localhost:9200/kodcucom/document/1 -d'{
    > "title": "Document A"
    > }'

We sent a document to Elasticsearch. The index name is kodcucom, the type is document, and the id is 1.
The title field is the only field in the document, and its value is "Document A". Let's fetch this document from Elasticsearch.

    hakdogan$ curl -XGET localhost:9200/kodcucom/document/1?pretty
    {
      "_index" : "kodcucom",
      "_type" : "document",
      "_id" : "1",
      "_version" : 1,
      "found" : true,
      "_source":{ "title": "Document A" }
    }

As expected, the document was returned to us. What happens if the time between indexing a document and searching for it is shorter than the default shard refresh interval? Let's see.

    hakdogan$ curl -XPUT localhost:9200/kodcucom/document/2 -d'{"title": "Document B"}'; curl -XGET localhost:9200/kodcucom/_search?pretty
    {"_index":"kodcucom","_type":"document","_id":"2","_version":1,"created":true}{
      "took" : 38,
      "timed_out" : false,
      "_shards" : { "total" : 5, "successful" : 5, "failed" : 0 },
      "hits" : {
        "total" : 1,
        "max_score" : 1.0,
        "hits" : [
          { "_index" : "kodcucom", "_type" : "document", "_id" : "1", "_score" : 1.0, "_source":{ "title": "Document A" } }
        ]
      }
    }

As can be seen, Elasticsearch returned only the previous document when the index and search requests were sent back to back. So how can we get the new document right away? Let's see.
    hakdogan$ curl -XPUT localhost:9200/kodcucom/document/3 -d'{"title": "Document C"}'; curl -XGET localhost:9200/kodcucom/_refresh; curl -XGET localhost:9200/kodcucom/_search?pretty
    {"_index":"kodcucom","_type":"document","_id":"3","_version":1,"created":true}{"_shards":{"total":10,"successful":5,"failed":0}}{
      "took" : 3,
      "timed_out" : false,
      "_shards" : { "total" : 5, "successful" : 5, "failed" : 0 },
      "hits" : {
        "total" : 3,
        "max_score" : 1.0,
        "hits" : [
          { "_index" : "kodcucom", "_type" : "document", "_id" : "1", "_score" : 1.0, "_source":{ "title": "Document A" } },
          { "_index" : "kodcucom", "_type" : "document", "_id" : "2", "_score" : 1.0, "_source":{"title": "Document B"} },
          { "_index" : "kodcucom", "_type" : "document", "_id" : "3", "_score" : 1.0, "_source":{"title": "Document C"} }
        ]
      }
    }

In this command, we perform a refresh operation on the kodcucom index before the search request. This time, the new document was returned to us.

The automatic refresh interval can be changed in two ways:

  • By setting the index.refresh_interval parameter in the configuration file, which applies to all indices in the cluster.
  • On a per-index basis, by updating the index settings.

In addition, you can turn off automatic refresh altogether (by setting the interval to -1).

An important point to keep in mind about the refresh interval is that the refresh operation is costly in terms of system resources. If you want to change the automatic refresh interval, take this into account. Lengthening the interval enables faster indexing, but new documents and changes to existing documents will not appear in searches until the interval has elapsed.
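The behaviour seen in the three curl sessions can be summarised with a toy model: newly indexed documents sit in a buffer and only become visible to search after a refresh. The class below is a made-up illustration in plain Python, not Elasticsearch code, and it leaves out segments, shards and the one-second timer.

```python
class ToyIndex:
    """Minimal model of an index whose documents need a refresh to be searchable."""

    def __init__(self):
        self._buffer = []       # indexed, but not yet visible to search
        self._searchable = []   # visible to search

    def index(self, doc):
        self._buffer.append(doc)

    def refresh(self):
        # Make everything indexed so far visible, like an explicit _refresh call.
        self._searchable.extend(self._buffer)
        self._buffer.clear()

    def search(self):
        return list(self._searchable)


idx = ToyIndex()
idx.index({"title": "Document A"})
idx.refresh()                       # stands in for the automatic refresh

idx.index({"title": "Document B"})
hits_before_refresh = idx.search()  # Document B is not visible yet

idx.refresh()
hits_after_refresh = idx.search()   # both documents are visible
```

A longer refresh interval corresponds to calling `refresh` less often: indexing does less work per document, but the window in which new documents are invisible to search grows.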
November 25, 2014
by Hüseyin Akdoğan (DZone Core)
· 17,436 Views
NewTypes Aren't As Cool As You Think
My last post talked about what's wrong with type classes (in general, but also specifically in Haskell). This post generated some great feedback on Reddit, including some valid criticism that I didn't explain why I hated on newtypes so much. I took some of that feedback and incorporated it into a revised version of the post, but I have even more to say about "newtypes," so I decided to write another blog post.

What's in a Newtype

Newtypes are a feature of Haskell that let you define a new type in terms of an existing type. In the following example, I create a newtype for Email, which "holds" a String.

    newtype Email = Email String

They are similar to type synonyms (type Email = String), except that type synonyms don't create new types, they just allow you to refer to existing types by other names. Every newtype can be easily translated into a data declaration. In fact, only the keyword changes:

    data Email = Email String

There's a slight semantic difference between the two, but for purposes of this blog post, any criticism I have against newtypes applies equally to similar constructs modeled with type or data.

The Promise of Newtypes

Newtypes are used to provide and select between alternate implementations of type classes for some base types. I think that's a hack (albeit a necessary one), but I've already talked about this so I won't belabor it here.

The other promise of newtypes is that we can use them to make our code more type safe. Instead of passing around String as an email, for example, we can create a super lightweight "wrapper" around String called Email, and make it an error to use a String wherever an Email is expected. This practice isn't restricted to Haskell. Even in Java, it's considered good coding practice to wrap primitives with classes whose names denote the meaning of the wrapper (Email, SSN, Address, etc.). There's a part of this promise that's certainly true.
If I have to define a function accepting four parameters, and three of them are strings, but one of those strings denotes an email, then I have two choices:

1. Model the email parameter with a String. In this case, I may accidentally use the email where I intended to use one of the other two string parameters, or I may use one of the other two string parameters where I intended to use the email. Considering just these choices, there are four ways my program may go wrong if I use the wrong name in the wrong position.

2. Model the email parameter with a newtype. In this case, I cannot use the email where I intended to use the other two string parameters, because the compiler will stop me. Similarly, I cannot use the other two string parameters where I intended to use the email, for the same reason. Looking at just these choices, there are 0 ways my program may go wrong.

Thus, newtypes, like all good FP practices, reduce the number of ways my program can go wrong. Unfortunately, in my opinion, they don’t go nearly far enough.

False Security

For most intents and purposes, newtypes are isomorphic to the single value they hold. In my preceding example, given a String, I can get an email (Email "foo"). Given an Email, I can also get a String, e.g. by pattern matching on the Email constructor. Stated differently, and also approximately because I’m ignoring bottom: the String and Email types are isomorphic; they contain the same inhabitants, for any useful definition of “same”. The only substantive difference between the preceding String and Email is the name of the data constructor (call Email an AbergrackleFoozyWatzit, and what has changed?). Hence, my previous criticism of newtypes as “programming by name”.

By themselves, newtypes don’t really reduce the number of ways my program can go wrong. They just make it a bit harder to go wrong. But any newtype is isomorphic to the value it holds, and it’s trivial to convert between the two.
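As a minimal sketch of the trade-off described above, here is a four-parameter function where only the email has its own type. The names register and unEmail and the sample values are illustrative assumptions, not taken from the original post.

```haskell
-- Illustrative names only: register, unEmail and the sample values
-- are assumptions made for this sketch.
newtype Email = Email String

-- Unwrapping is a one-line pattern match: this is the trivial
-- conversion back to String that the post calls an isomorphism.
unEmail :: Email -> String
unEmail (Email s) = s

-- Four parameters, three of them string-like; only the email is wrapped.
register :: Int -> String -> String -> Email -> String
register userId firstName lastName (Email addr) =
  show userId ++ ": " ++ firstName ++ " " ++ lastName ++ " <" ++ addr ++ ">"

-- Type checks: the email is in the email position.
ok :: String
ok = register 1 "Ada" "Lovelace" (Email "ada@example.com")

-- Rejected by the type checker if uncommented, because the last
-- argument is a String rather than an Email:
-- bad = register 1 "Ada" "ada@example.com" "Lovelace"

-- Nothing stops a nonsensical String from being wrapped, though.
notReallyAnEmail :: Email
notReallyAnEmail = Email "definitely not an email"
```

Loading this into GHCi and evaluating ok shows the well-typed call working, while notReallyAnEmail shows the gap: the wrapper changes the name, not the set of values that can be constructed.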
In fact, if my code doesn’t need to convert between the two (either directly or indirectly), then it’s better off generic. That is, if I never need to convert an Email to a String, or a String to an Email, then I should really write the code generically to work with any value (even if that means making data structures or functions more polymorphic). Parametricity provides a massive reduction in the number of ways my program can go wrong. Newtypes, on the other hand, just make it a bit harder to go wrong, by adding one layer of indirection.

In this example, as with many newtypes, I’ve created a bad isomorphism. The domain model of an email is not isomorphic to the data model of a string. But by using a newtype, I have implicitly declared that they are isomorphic. Calling a string an email may make me feel better, because of the different name, but fundamentally, with a newtype, it’s still a string, and I’m only ever one more step away from going wrong.

In my experience, too many newtypes create an isomorphism between things that, properly modeled, are not isomorphic. Fortunately, there’s a well-worn workaround that lets us get more mileage out of newtypes.

Smart Constructors

If I define Email in a module, I can make its data constructor private, and export a helper function to construct an Email. Such helper functions are called smart constructors. They can be used to break the natural isomorphisms created by newtyping. An example is shown below:

    newtype Email = MkEmail String

    mkEmail :: String -> Maybe Email
    mkEmail s = ...

In this example, I create a smart constructor which does not promise that it can turn every string into an email. It promises only that it might be able to turn a string into an email, by returning a Maybe Email. With the smart constructor approach, I’ve modeled the fact that while every email has a string representation, not every string has an email representation.
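The post leaves the body of mkEmail elided. One possible way to fill it in, purely as a sketch, is shown here. The validity rule (exactly one '@' with non-empty text on both sides) and the emailToString helper are assumptions made for illustration; real address validation is considerably stricter.

```haskell
-- In a real project this would live in its own module whose export
-- list hides the MkEmail constructor, for example:
--   module Email (Email, mkEmail, emailToString) where

newtype Email = MkEmail String

-- Hypothetical, deliberately crude check: exactly one '@', with
-- non-empty text before and after it.
mkEmail :: String -> Maybe Email
mkEmail s =
  case break (== '@') s of
    (_ : _, '@' : domain)
      | not (null domain) && '@' `notElem` domain -> Just (MkEmail s)
    _ -> Nothing

-- Every Email still has a String representation.
emailToString :: Email -> String
emailToString (MkEmail s) = s
```

With the constructor hidden, the only way for client code to obtain an Email is through mkEmail, so the conversion from String is partial while the conversion back to String stays total.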
Going back to my earlier example of passing same-typed parameters to a function, if I use a smart constructor, then while I can still use an email anywhere a string is expected (by converting), I can’t use a string anywhere an email is expected. (Well, ignoring the fromJust abomination!)

Smart Constructors, Dumb Data

Smart constructors take us one step closer toward modeling data in a type safe fashion. Unfortunately, I still don’t think it’s far enough. With smart constructors, our data model is fundamentally underconstrained, so we patch that up by restricting who can create the data. That’s putting a band-aid on the real problem. Why not just solve the root issue, viz., that our data model is underconstrained?

Dumb Constructors, Smart Data

The best solution to a great many newtype problems, I believe, is creating a data model where there is a true isomorphism between the entity modeled by our data and the values passed to the data constructor. That is, creating a data model such that there exist no regions in our data’s state space that correspond to invalid states.

Email is a simple example, because there are well-defined specifications for what constitutes a valid email address, which can be translated into data declarations in straightforward, if tedious, fashion. (To some extent, it’s a failure of most languages I know that such specifications cannot be easily translated into code without tedious boilerplate!)

When our data declaration precisely fits our data model specification, there’s no need for smart constructors, and no need for newtypes. There are far fewer ways that code can go wrong, and because our domain model is captured precisely by our data model, we can transform that data model in ways that make semantic sense (e.g. transforming just the name part of an email, since we’re now in the realm of structured data).

Summary

As I’ve explained in this blog post, I don’t really hate newtypes.
I think they’re very useful, and I do use them, because they make it more difficult for my programs to go wrong. Ultimately, however, I think a lot of problems solved with newtypes (modeling coordinates, positions, emails, etc.) are better solved by more precise data modeling. That is, by making our programs stop lying about isomorphisms. Precision may be tedious due to limitations of the languages we work in, but honestly, what’s more tedious than debugging broken code?
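To make the "smart data" idea concrete, here is a simplified sketch of an email modeled as structured data rather than as a wrapped String. The field names, the render and mapLocalPart helpers, and the sample value are all assumptions made for illustration. The model is also far from a full address grammar: the String fields can still hold characters that a real specification would forbid, so a faithful version would constrain them further.

```haskell
import Data.Char (toLower)
import Data.List (intercalate)
import Data.List.NonEmpty (NonEmpty (..))
import qualified Data.List.NonEmpty as NE

-- Simplified, hypothetical model. The non-empty list rules out one
-- invalid state by construction: a domain with no labels at all.
data Email = Email
  { localPart    :: String
  , domainLabels :: NonEmpty String -- e.g. "example" :| ["com"]
  }

-- Rendering is total: every structured value has a string form.
render :: Email -> String
render (Email l ds) = l ++ "@" ++ intercalate "." (NE.toList ds)

-- Structured data lets us transform one part without string surgery.
mapLocalPart :: (String -> String) -> Email -> Email
mapLocalPart f e = e { localPart = f (localPart e) }

-- Sample value and a transformation of just the name part.
example :: Email
example = Email "Ada.Lovelace" ("example" :| ["com"])

lowered :: String
lowered = render (mapLocalPart (map toLower) example)
-- lowered == "ada.lovelace@example.com"
```

The design choice here is to move constraints out of a validating function and into the shape of the type, so that operations such as mapLocalPart work on meaningful parts instead of raw characters.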
November 20, 2014
by John De Goes
· 10,752 Views
The No Fluff Introduction to Big Data
Big data traditionally has referred to a collection of data too massive to be handled efficiently by traditional database tools and methods. This original definition has expanded over the years to identify tools (big data tools) that tackle extremely large datasets (NoSQL databases, MapReduce, Hadoop, NewSQL, etc.), and to describe the industry challenge posed by having data harvesting abilities that far outstrip the ability to process, interpret, and act on that data. Technologists knew that those huge batches of user data and other data types were full of insights that could be extracted by analyzing the data in large aggregates. They just didn’t have any cheap, simple technology for organizing and querying these large batches of raw, unstructured data.

The term quickly became a buzzword for every sort of data processing product’s marketing team. Big data became a catchall term for anything that handled non-trivial sizes of data. Sean Owen, a data scientist at Cloudera, has suggested that big data is a stage where individual data points are irrelevant and only aggregate analysis matters [1]. But this is true for a 400-person survey as well, and most people wouldn’t consider that very big. The key part missing from that definition is the transformation of unstructured data batches into structured datasets. It doesn’t matter if the database is relational or non-relational. Big data is not defined by a number of terabytes; it’s rooted in the push to discover hidden insights in data that companies used to disregard or throw away.

Due to the obstacles presented by large-scale data management, the goal for developers and data scientists is twofold: first, systems must be created to handle large-scale data, and second, business intelligence and insights should be acquired from analysis of the data. Acquiring the tools and methods to meet these goals is a major focus in the data science industry, but it’s a landscape where needs and goals are still shifting.

What Are the Characteristics of Big Data?

Tech companies are constantly amassing data from an almost endless variety of digital sources: everything from email addresses to digital images, MP3s, social media communication, server traffic logs, purchase history, and demographics. And it’s not just the data itself, but data about the data (metadata). It is a barrage of information on every level. What is it that makes this mountain of data big data? One of the most helpful models for understanding the nature of big data is “the three Vs”: volume, velocity, and variety.

Data Volume

Volume is the sheer size of the data being collected. There was a point in not-so-distant history where managing gigabytes of data was considered a serious task. Now we have web giants like Google and Facebook handling petabytes of information about users’ digital activities. The size of the data is often seen as the first challenge of characterizing big data storage, but even beyond that is the capability of programs to provide architecture that can not only store but query these massive datasets. One of the most popular models for big data architecture comes from Google’s MapReduce concept, which was the basis for Apache Hadoop, a popular data management solution.

Data Velocity

Velocity is a problem that flows naturally from the volume characteristics of big data. Data velocity is the speed at which data is flowing into a business’s infrastructure and the ability of software solutions to receive and ingest that data quickly. Certain types of high-velocity data, such as streaming data, need to be moved into storage and processed on the fly. This is often referred to as complex event processing (CEP). The ability to intercept and analyze data that has a lifespan of milliseconds is widely sought after.
This kind of quick-fire data processing has long been the cornerstone of digital financial transactions, but it is also being used to track live consumer behavior or to bring instant updates to social media feeds.

Data Variety

Variety refers to the source and type of data that is being collected. This data could be anything from raw image data to sensor readings, audio recordings, social media communication, and metadata. The challenge of data variety is being able to take raw, unstructured data and organize it so that an application can use it. This kind of structure can be achieved through architectural models that traditionally favor relational databases, but there is often a need to tidy up this data before it will even be useful to store in a raw form. Sometimes a better option is to use a schema-less, non-relational database.

How Do You Manage Big Data?

The three Vs are a great model for getting an initial understanding of what makes big data a challenge for businesses. However, big data is not just about the data itself, but the way that it is handled. A popular way of thinking about these challenges is to look at how a business stores, processes, and accesses its data.

· Store: Can you store the vast amounts of data being collected?
· Process: Can you organize, clean, and analyze the data collected?
· Access: Can you search and query this data in an organized manner?

The store, process, and access model is useful for two reasons: it reminds businesses that big data is largely about managing data, and it demonstrates the problem of scale within big data management. “Big” is relative. The data batches that challenge some companies could be moved through a single Google datacenter in under a minute. The only question a company needs to ask itself is how it will store and access increasingly massive amounts of data for its particular use case. There are several high-level approaches that companies have turned to in the last few years.

The Traditional Approach

The traditional method for handling most data is to use relational databases. Data warehouses are then used to integrate and analyze data from many sources. These databases are structured according to the concept of “early structure binding”: essentially, the database has predetermined “questions” that can be asked based on a schema. Relational databases are highly functional, and the goal with this type of data processing is for the database to be fully transactional. Although relational databases are the most common persistence type by a large margin (see Key Findings, pp. 4-5), a growing number of use cases are not well suited to a relational schema. Relational architectures tend to have difficulty when dealing with the velocity and variety of big data, since their structure is very rigid. When you perform functions such as JOIN on many large data sets, the volume can be a problem as well. Instead, businesses are looking to non-relational databases, or a mixture of both types, to meet data demand.

The Newer Approach - MapReduce, Hadoop, and NoSQL Databases

In the early 2000s, web giant Google introduced two helpful technologies: Google File System (GFS) and MapReduce. Both were new and unique approaches to the growing problem of big data, but MapReduce was chief among them, especially when it comes to its role as a major influencer of later solution models. MapReduce is a programming paradigm that allows for low-cost data analysis and clustered scale-out processing. MapReduce became the primary architectural influence for the next big thing in big data: the creation of the big data management infrastructure known as Hadoop. Hadoop’s open source ecosystem and ease of use for handling large-scale data processing operations have secured a large part of the big data marketplace. Besides Hadoop, a host of non-relational (NoSQL) databases emerged around 2009 to meet a different set of demands for processing big data.
Whereas Hadoop is used for its massive scalability and parallel processing, NoSQL databases are especially useful for handling data stored within large multi-structured datasets. This kind of discrete data handling is not traditionally seen as a strong point of relational databases, but it’s also not the same kind of data operations that Hadoop is running. The solution for many businesses ends up being a combination of these approaches to data management.

Finding Hidden Data Insights

Once you get beyond storage and management, you still have the enormous task of creating actionable business intelligence (BI) from the datasets you’ve collected. This problem of processing and analyzing data is maybe one of the trickiest in the data management lifecycle. The best options for data analytics will favor an approach that is predictive and adaptable to changing data streams. The thing is, there are so many types of analytic models and different ways of providing infrastructure for this process. Your analytics solution should scale, but to what degree? Scalability can be an enormous pain in your analytical neck, due to the problem of decreasing performance returns when scaling out an algorithm.

Ultimately, analytics tools rely on a great deal of reasoning and analysis to extract data patterns and data insights, but this capacity means nothing for a business if it can’t then create actionable intelligence. Part of this problem is that many businesses have the infrastructure to accommodate big data, but they aren’t asking questions about what problems they’re going to solve with the data. Implementing a big data-ready infrastructure before knowing what questions you want to ask is like putting the cart before the horse. But even if we do know the questions we want to ask, data analysis can always reveal many correlations with no clear causes.
As organizations get better at processing and analyzing big data, the next major hurdle will be pinpointing the causes behind the trends by asking the right questions and embracing the complexity of our answers.

[1] http://www.quora.com/what-is-big-data

2014 Guide to Big Data

This guide explores the meaning of big data, how businesses use it, and uncovers new tools and techniques for the future of big data. This guide includes:

· Detailed profiles on 43 big data vendor solutions
· In-depth articles written by industry experts
· Results from our survey of 850 IT professionals
· "Finding the Database for Your Use Case"
September 25, 2014
by Benjamin Ball
· 10,660 Views · 1 Like
The Near Future of IoT
[This article was written by Sean Lorenz.]

Pundits within the technology sphere have been calling 2014 the year of the Internet of Things (IoT). The potential market revenue is forecast in the trillions, and it’s a Fortune 500 land grab with major companies moving quickly to stake their claims [1]. If this sounds a bit like pages from an American Wild West history book, a frontier analogy isn’t too far off. This is an exciting turning point in technology that, thanks to plummeting sensor costs and advances in wireless communication and chip size reduction, will soon make today’s futuristic IoT concepts seem humorous in retrospect. While it’s difficult to see where the market is going, given the exponential rate of change in IoT technology, I have noticed several key trends emerging. As a fellow IoT prospector on the frontier, this is my account of the most evident trends as well as some educated predictions for the future.

Trends

1. Business Value Over Technology Focus

As with any promising new technology still in its infancy, the true innovation stems from tech-savvy researchers and tinkerers who build fascinating devices that sometimes have no consumer base (I’m looking at you, robotics market). We have all heard about the smart toothbrushes and smart egg trays coming to market and thought: “Interesting! I wouldn’t buy one, but… sure!” Perhaps the biggest trend is a shift from thinking, “let’s build it because we can” to “what business problem are we solving here?” IoT developers are getting wise to this mentality and building user-focused MVPs (Minimum Viable Products) that will begin hitting the market in late 2014 and early 2015.

2. Keeping It Real

At my company, Xively, we often get asked what the real use cases for the IoT are.
Many times our customers walk in the door with a vague idea of how connecting their product or service to the Internet would be potentially interesting, but need a little help with seeing how an IoT-enabled product can transform their business, both internally and externally. The reason for this is that most of the exciting, transformative elements happen under the hood. Right now, the true “wow” moments in the industry are far from sexy: energy savings in enterprise complexes, CRM & ERP integration, service and support, supply chain efficiencies, product part failure and alert, and so on… you get the idea. Smart homes that respond to our every whim are really great ideas, but these products aren’t integral to our lives yet. Large manufacturing companies and enterprises are using the Internet of Things to manage internal operations and efficiency while also engaging their customers more fully with new IoT data sources aggregated in existing services like Salesforce1 or SAP.

3. Publish-Subscribe

The IoT protocol wars are heating up, but allegiances aside, publish-subscribe messaging is what the bulk of implemented models use for connecting devices to the cloud. Pub-sub protocols such as MQTT and AMQP, as well as CoAP (a request/response protocol with an observe extension), are attractive for connected product development thanks to their ease of scalability and many-to-one/one-to-many possibilities. Given the massive variance of the IoT market, there is bound to be more than one protocol that wins in the end; yet before we get to that point, there are plenty of bugs and vulnerabilities to patch across all of the thriving, open IoT protocols out there.

4. Security Panic!

Hacked refrigerators, big box stores, and security cameras… oh my! There has been no shortage of concern for privacy, security, and compliance in the Internet of Things space. Like any news story, some of this attention is warranted and some overblown.
Just like your pre-IoT old-fashioned Internet, creating specific application keys and advanced permissioning systems for hardware connecting to the cloud is essential. The number of nodes at the edge connecting to services across the Internet will be far larger than anything we see now, but IoT platforms are already addressing these complex device lifecycle management issues that are crucial for protecting personal and enterprise information in a connected world.

Near-Term Predictions

Now let’s hop in the DeLorean and look into the future. Rather than focus on five, ten, or twenty years into the future, let’s focus only on the next few years. Why? As I mentioned in the beginning, the IoT landscape changes on a day-to-day basis, so even a prediction looking forward six months from now can be unreliable. This list contains no self-driving cars or sentient AIs. Instead, it makes some pretty sure bets for what to expect over the horizon.

1. A Household Name

Usually the second question after “what’s your name?” at a dinner party is the inevitable “so what do you do?” Mentioning the Internet of Things to non-techies still draws blank stares and looks of confusion. Those looks are justified given the not-so-great marketing name of IoT and the myriad definitions trying to explain what it actually is. Whether it’s called the Internet of Things, Internet of Everything, or just the good ol’ Internet, the concept of connecting any and everything to the Internet will begin to make sense for everyday consumers.

2. Consumers Slow to Adopt

Many IoT products are still just toys in many people’s minds. Startups are building products that address problems which most consumers don’t see as a problem yet. This isn’t to say the consumer IoT market will evaporate. It just means we need to get smarter about what customers actually want from smart devices. Today’s wearable products remind me of the Newton, Apple’s infamous PDA. The problem wasn’t the idea, but rather the timing.
The Apple Newton seemed clunky, not very powerful, and low on the usability scale. Years later, the iPhone and iPad came along with a set of features and a form factor that customers were looking for. The same feels true of wearables right now: they may need a few more years to incubate before the general public gives two thumbs up. Other consumer IoT markets such as the smart home or driverless cars seem to be in the same situation as the wearables market, but this is changing quickly with major players like Apple and Google moving into these arenas. For example, in the home automation space, frameworks like Apple HomeKit will be essential for unifying disparate protocols and clouds into one application that can handle various products’ data, automating much of the technology and pushing it into the background. I am sure there is a brilliant developer learning Swift and building the first killer smart home app as we speak.

3. Analytics and Automation

This prediction probably comes as no surprise, but it is worth stating. Most companies willing to foray into the IoT unknown are, for now, happy with connecting their devices to an external application or cloud service. Having a place to send the data is usually the first step in constructing an IoT system. So what do you do with all this data once you have it? Reporting tools for IoT are just starting to become available, but this is just the tip of the iceberg. The real magic lies in the ability to use exploratory and predictive algorithms to make actionable intelligence a reality. These insights are beneficial to both businesses understanding their customers and to the customers themselves. One could imagine closing the feedback loop between sensor, cloud, and actuator by adding some beautiful supervised machine learning code into the cloud platform at some point in the chain.
There are currently a handful of analytics startups focusing on IoT specifically, but this market is about to explode from both platform and application perspectives.

4. IoT Startups Galore

For any developers out there interested in the IoT with a real customer pain that needs solving, now is the time to get coding and building that pitch deck. With hardware back in vogue, venture capital funding of IoT-centric companies is on the rise [2]. Having been to a number of IoT events, I can say the enthusiasm among VC and angel investors is palpable. There’s a definite need for developers with great, connected product and service ideas; so, if you haven’t already, I strongly suggest putting on your favorite prospecting gear and exploring the untamed wild west of the Internet of Things.

[1] https://internetofeverything.cisco.com/sites/default/files/docs/en/ioe_public_sector_vas_white%20paper_121913final.pdf
[2] http://www.cbinsights.com/blog/internet-of-things-investing-snapshot

2014 Guide to Internet of Things

The 2014 Guide to Internet of Things covers 39 different IoT SDKs, developer programs, and hardware options, plus:

· Key findings from our survey of over 2,000 developers
· "How to IoT Your Life: The Complete Shopping List"
· "The Scale of IoT" infographic
· Glossary of common IoT terms
· Four in-depth articles from industry experts
August 28, 2014
by Benjamin Ball
· 10,519 Views