The Latest Data Engineering Topics

Why Run Your Microservices on a PaaS
[This article by Chris Haddad comes to you from the DZone Guide to Cloud Development - 2015 Edition. The guide includes in-depth articles from industry experts, best solutions for PaaS, iPaaS, IaaS, and MBaaS, and more.]

Microservices can be understood from two angles. First, the differential: teams that take a microservice design approach divide business solutions into distinct, full-stack business services owned by autonomous teams. Second, the integral: microservice-based applications weave multiple atomic microservices into holistic user experiences.

Unfortunately, traditional application delivery models and traditional middleware infrastructure do not address microservice-specific demands for on-demand provisioning, dynamic composition, and service level management. On the other hand, the Platform-as-a-Service (PaaS) model addresses these demands perfectly. Running microservices on a PaaS fabric decreases solution fragility, reduces operational burden, and enhances developer productivity.

To understand why, we’ll first review how microservices separate concerns from both business and object-oriented design perspectives. Second, we’ll consider how microservice-based design can complicate deployment as applications scale dynamically. Third, we’ll focus on how a PaaS environment helps to solve many of the problems both addressed and introduced by microservice-based architectures; in other words, why PaaS and microservices are a match made in heaven.

Microservices: Separating Concerns By Business Solution

A microservice approach decomposes monolithic applications according to the single responsibility principle. In a microservice solution, each microservice interface delivers discrete business capabilities (e.g. customer profile, product catalogue, inventory, order, billing, fulfillment) within a well-defined, bounded context.
The atomic microservice interfaces reside on separate and distinct full-stack application platforms that contain separate database storage, integration flows, and web application hosting. By separating concerns onto separate full-stack platforms and not sharing database instances or web application hosts across services, every team is free to choose different runtime languages and frameworks for its own microservice. Every team is also free to evolve its data schemas, application frameworks, and business logic without impacting other teams.

Because microservices are a relatively new design approach, many development teams may have the misconception that creating a microservice-based solution requires simply deploying small web services in containers. But this doesn’t cut quite deep enough. The correct approach is to evolve your monolithic design by applying service-oriented principles (i.e. encapsulation, loose coupling, separation of concerns) in conjunction with domain-driven design techniques and dynamic runtime application composition.

For example, in a typical ecommerce scenario, a development team applies the bounded context pattern and the single responsibility principle to refactor a monolithic application into units distinguished by business capability (see Figure 2). By creating a user experience from loosely coupled services instead of tightly coupled native-language business objects, teams have more independence to develop, evolve, and deploy each business capability separately.

The microservice design approach works best for (a) greenfield projects or (b) modernization efforts where teams focus on refactoring monolithic application assets.

The Microservice Execution Trap

Although a microservice approach decouples development dependencies and speeds up development iterations, microservices also create a challenging environment for high-performance scaling and reliable runtime execution.
More complex, loosely coupled, and dynamic environments distribute business capabilities over the entire network. Even a task as simple as responding to a single web application page request may spread out across several microservice instances residing on a distributed network topology. Martin Fowler and Stefan Tilkov (both microservice proponents) warn teams that successfully implementing a microservice approach requires choosing platforms that decrease solution fragility and reduce operational burdens.

What Platform-as-a-Service Offers

Platform-as-a-Service environments reduce microservice operational burdens when infrastructure-as-code and declarative policies are used to eliminate all manual actions and increase runtime quality of service (i.e. reliability, availability, scalability, and performance). The appropriate PaaS environment will automatically deploy, provision, and link full-stack microservices.

In a microservice architecture, teams want to rapidly release new versions and perform A/B testing across versions. When teams define instance dependencies, scaling properties, and security policies as PaaS metadata or code scripts, the runtime fabric can reduce manual effort and increase release confidence. With a DevOps-friendly PaaS, the team can experiment with new service versions and safely roll back to a prior stable release if a problem arises.

Because microservices are full-stack silos that can be composed of multiple server instances (e.g. web server, database, load balancer, integration server), a PaaS can reduce deployment complexity by automatically spinning up and linking all instances. Linking may require discovering instance locations, dynamically initializing network routes, and auto-configuring connection strings based on service version or tenant.

A traditional application will compose business functions and user experience by statically linking class files and shared object libraries.
In contrast, microservice-based applications use service composition to connect available microservice endpoints and realize a fully functional application. While many microservice proponents promote microservice-based interactions by “smart endpoints and dumb pipes,” effective service composition requires smart infrastructure building blocks to bootstrap and maintain connections between services and consumers. The right PaaS solves these problems.

Infrastructure building blocks will register service endpoint locations, associate metadata and policies, connect clients, circuit break around failures, correlate inter-service calls, and load balance traffic. A microservice-friendly PaaS will provide service registries, metadata services, discovery services, and service virtualization gateways. In the pipe, circuit breakers will automatically route traffic on failover or overload.

Smart endpoint code will dynamically connect with microservices based on discovery service responses and negotiated quality of service parameters. Rather than being hard-coded to a specific service hostname and URI, endpoint code will query for microservice location based on security assurances, performance guarantees, traffic load, service version, client tenancy, or business domain.

When services are unavailable or underperform, smart endpoints will follow the tolerant reader pattern and gracefully degrade the experience or proactively recover. A few recovery options include reading from local caches or circuit tripping to backup service endpoints. In conjunction with smart endpoint actions, a smart PaaS will spin up new microservice endpoints and full-stack instances based on service level management metrics.

By following microservice architecture best practices, teams create anti-fragile applications that not only withstand a shock, but also improve performance and quality of service when stressed or experiencing failures.
To drive this non-intuitive behavior, the underlying platform environment must be ready to scale, repair, and reconnect services. PaaS service level management components will create more resilient and anti-fragile microservices by monitoring performance, elastically provisioning instances, and dynamically re-routing traffic.

Scaling an anti-fragile microservice is more difficult than scaling a web application. The PaaS should distribute microservice instances across multiple availability zones and dynamically adjust traffic to reduce latency and response time. Because transient microservice instances will rapidly start, stop, and change location, the service management layer must be completely automated and integrated with routing services.

A PaaS environment will deliver the service level management, dynamic service composition, circuit breakers, and on-demand provisioning functions required to overcome the complexity inherent within a distributed microservice-based application architecture. Running microservices on a PaaS fabric will decrease solution fragility, reduce operational burden, and enhance developer productivity. If you are pursuing a microservice design approach, make sure you choose a microservice-friendly PaaS.
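The circuit-breaking and graceful-degradation behavior described in this article can be illustrated with a small sketch. This is not taken from any particular PaaS; the class name `CircuitBreaker`, its parameters `max_failures` and `reset_after`, and the idea of passing the primary call and the fallback as plain callables are all assumptions made for illustration. It only shows the core idea: after repeated failures the endpoint stops calling the primary service for a while and serves a fallback (for example, a local cache or a backup endpoint) instead.

```python
import time


class CircuitBreaker:
    """Hypothetical sketch: route calls to a fallback after repeated failures."""

    def __init__(self, max_failures=3, reset_after=30.0, clock=time.monotonic):
        self.max_failures = max_failures  # consecutive failures before tripping
        self.reset_after = reset_after    # seconds to stay open before retrying
        self.clock = clock                # injectable clock, handy for testing
        self.failures = 0
        self.opened_at = None             # None means the breaker is closed

    def is_open(self):
        if self.opened_at is None:
            return False
        if self.clock() - self.opened_at >= self.reset_after:
            # Cool-down elapsed: close the breaker and let calls through again.
            self.opened_at = None
            self.failures = 0
            return False
        return True

    def call(self, primary, fallback):
        """Call primary(); on failure, or while open, return fallback()."""
        if self.is_open():
            return fallback()
        try:
            result = primary()
        except Exception:
            self.failures += 1
            if self.failures >= self.max_failures:
                self.opened_at = self.clock()  # trip the breaker
            return fallback()
        self.failures = 0  # a success resets the failure count
        return result
```

A caller would wrap each remote request, for example `breaker.call(fetch_from_service, read_from_cache)`, where both function names are placeholders. A production breaker would usually also distinguish error types and limit the number of trial calls after the cool-down.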
May 5, 2015
by Chris Haddad
· 12,075 Views · 2 Likes
A Look at Nanomsg and Scalability Protocols (Why ZeroMQ Shouldn’t Be Your First Choice)
Earlier this month, I explored ZeroMQ and how it proves to be a promising solution for building fast, high-throughput, and scalable distributed systems. Despite lending itself quite well to these types of problems, ZeroMQ is not without its flaws. Its creators have attempted to rectify many of these shortcomings through spiritual successors Crossroads I/O and nanomsg.

The now-defunct Crossroads I/O is a proper fork of ZeroMQ with the true intention being to build a viable commercial ecosystem around it. Nanomsg, however, is a reimagining of ZeroMQ, a complete rewrite in C. It builds upon ZeroMQ’s rock-solid performance characteristics while providing several vital improvements, both internal and external. It also attempts to address many of the strange behaviors that ZeroMQ can often exhibit. Today, I’ll take a look at what differentiates nanomsg from its predecessor and implement a use case for it in the form of service discovery.

Nanomsg vs. ZeroMQ

A common gripe people have with ZeroMQ is that it doesn’t provide an API for new transport protocols, which essentially limits you to TCP, PGM, IPC, and ITC. Nanomsg addresses this problem by providing a pluggable interface for transports and messaging protocols. This means support for new transports (e.g. WebSockets) and new messaging patterns beyond the standard set of PUB/SUB, REQ/REP, etc.

Nanomsg is also fully POSIX-compliant, giving it a cleaner API and better compatibility. No longer are sockets represented as void pointers and tied to a context; simply initialize a new socket and begin using it in one step. With ZeroMQ, the context internally acts as a storage mechanism for global state and, to the user, as a pool of I/O threads. This concept has been completely removed from nanomsg.
In addition to POSIX compliance, nanomsg is hoping to be interoperable at the API and protocol levels, which would allow it to be a drop-in replacement for, or otherwise interoperate with, ZeroMQ and other libraries which implement ZMTP/1.0 and ZMTP/2.0. It has yet to reach full parity, however.

ZeroMQ has a fundamental flaw in its architecture. Its sockets are not thread-safe. In and of itself, this is not problematic and, in fact, is beneficial in some cases. By isolating each object in its own thread, the need for semaphores and mutexes is removed. Threads don’t touch each other and, instead, concurrency is achieved with message passing. This pattern works well for objects managed by worker threads but breaks down when objects are managed in user threads. If the thread is executing another task, the object is blocked. Nanomsg does away with the one-to-one relationship between objects and threads. Rather than relying on message passing, interactions are modeled as sets of state machines. Consequently, nanomsg sockets are thread-safe.

Nanomsg has a number of other internal optimizations aimed at improving memory and CPU efficiency. ZeroMQ uses a simple trie structure to store and match PUB/SUB subscriptions, which performs nicely for sub-10,000 subscriptions but quickly becomes unreasonable for anything beyond that number. Nanomsg uses a space-optimized trie called a radix tree to store subscriptions. Unlike its predecessor, the library also offers a true zero-copy API which greatly improves performance by allowing memory to be copied from machine to machine while completely bypassing the CPU.

ZeroMQ implements load balancing using a round-robin algorithm. While it provides equal distribution of work, it has its limitations. Suppose you have two datacenters, one in New York and one in London, and each site hosts instances of “foo” services. Ideally, a request made for foo from New York shouldn’t get routed to the London datacenter and vice versa.
Unfortunately, with ZeroMQ’s round-robin balancing, this is entirely possible. One of the new user-facing features that nanomsg offers is priority routing for outbound traffic. We can avoid this latency problem by assigning priority one to foo services hosted in New York for applications also hosted there. Priority two is then assigned to foo services hosted in London, giving us a failover in the event that foos in New York are unavailable.

Additionally, nanomsg offers a command-line tool for interfacing with the system called nanocat. This tool lets you send and receive data via nanomsg sockets, which is useful for debugging and health checks.

Scalability Protocols

Perhaps most interesting is nanomsg’s philosophical departure from ZeroMQ. Instead of acting as a generic networking library, nanomsg intends to provide the “Lego bricks” for building scalable and performant distributed systems by implementing what it refers to as “scalability protocols.” These scalability protocols are communication patterns which are an abstraction on top of the network stack’s transport layer. The protocols are fully separated from each other such that each can embody a well-defined distributed algorithm. The intention, as stated by nanomsg’s author Martin Sustrik, is to have the protocol specifications standardized through the IETF.

Nanomsg currently defines six different scalability protocols: PAIR, REQREP, PIPELINE, BUS, PUBSUB, and SURVEY.

PAIR (Bidirectional Communication)

PAIR implements simple one-to-one, bidirectional communication between two endpoints. Two nodes can send messages back and forth to each other.

REQREP (Client Requests, Server Replies)

The REQREP protocol defines a pattern for building stateless services to process user requests. A client sends a request, the server receives the request, does some processing, and returns a response.

PIPELINE (One-Way Dataflow)

PIPELINE provides unidirectional dataflow which is useful for creating load-balanced processing pipelines.
A producer node submits work that is distributed among consumer nodes.

BUS (Many-to-Many Communication)

BUS allows messages sent from each peer to be delivered to every other peer in the group.

PUBSUB (Topic Broadcasting)

PUBSUB allows publishers to multicast messages to zero or more subscribers. Subscribers, which can connect to multiple publishers, can subscribe to specific topics, allowing them to receive only messages that are relevant to them.

SURVEY (Ask Group a Question)

The last scalability protocol, and the one I will examine further by implementing a use case, is SURVEY. The SURVEY pattern is similar to PUBSUB in that a message from one node is broadcast to the entire group, but it differs in that each node in the group responds to the message. This opens up a wide variety of applications because it allows you to quickly and easily query the state of a large number of systems in one go. The survey respondents must respond within a time window configured by the surveyor.

Implementing Service Discovery

As I pointed out, the SURVEY protocol has a lot of interesting applications. For example: What data do you have for this record? What price will you offer for this item? Who can handle this request?

To continue exploring it, I will implement a basic service-discovery pattern. Service discovery is a pretty simple question that’s well-suited for SURVEY: what services are out there? Our solution will work by periodically submitting the question. As services spin up, they will connect with our service discovery system so they can identify themselves. We can tweak parameters like how often we survey the group to ensure we have an accurate list of services and how long services have to respond.
This is great because 1) the discovery system doesn’t need to be aware of what services there are (it just blindly submits the survey) and 2) when a service spins up, it will be discovered and if it dies, it will be “undiscovered.”

Here is the ServiceDiscovery class:

    from collections import defaultdict
    import random

    from nanomsg import NanoMsgAPIError
    from nanomsg import Socket
    from nanomsg import SURVEYOR
    from nanomsg import SURVEYOR_DEADLINE


    class ServiceDiscovery(object):

        def __init__(self, port, deadline=5000):
            self.socket = Socket(SURVEYOR)
            self.port = port
            self.deadline = deadline  # milliseconds
            self.services = defaultdict(set)

        def bind(self):
            self.socket.bind('tcp://*:%s' % self.port)
            self.socket.set_int_option(SURVEYOR, SURVEYOR_DEADLINE, self.deadline)

        def discover(self):
            if not self.socket.is_open():
                return self.services

            # Start from an empty map so dead services are "undiscovered"
            self.services = defaultdict(set)
            self.socket.send('service query')

            # Collect responses until the survey deadline is reached
            while True:
                try:
                    response = self.socket.recv()
                except NanoMsgAPIError:
                    break

                service, address = response.split('|')
                self.services[service].add(address)

            return self.services

        def resolve(self, service):
            providers = self.services[service]
            if not providers:
                return None
            return random.choice(tuple(providers))

        def close(self):
            self.socket.close()

The discover method submits the survey and then collects the responses. Notice we construct a SURVEYOR socket and set the SURVEYOR_DEADLINE option on it. This deadline is the number of milliseconds from when a survey is submitted to when a response must be received; adjust it accordingly based on your network topology. Once the survey deadline has been reached, a NanoMsgAPIError is raised and we break the loop. The resolve method will take the name of a service and randomly select an available provider from our discovered services.

We can then wrap ServiceDiscovery with a daemon that will periodically run discover.
    import os
    import time

    from service_discovery import ServiceDiscovery

    DEFAULT_PORT = 5555
    DEFAULT_DEADLINE = 5000
    DEFAULT_INTERVAL = 2000


    def start_discovery(port, deadline, interval):
        discovery = ServiceDiscovery(port, deadline=deadline)
        discovery.bind()

        print 'Starting service discovery [port: %s, deadline: %s, interval: %s]' \
            % (port, deadline, interval)

        while True:
            print discovery.discover()
            # interval is in milliseconds; divide by a float so that
            # sub-second intervals are not truncated to zero
            time.sleep(interval / 1000.0)


    if __name__ == '__main__':
        port = int(os.environ.get('PORT', DEFAULT_PORT))
        deadline = int(os.environ.get('DEADLINE', DEFAULT_DEADLINE))
        interval = int(os.environ.get('INTERVAL', DEFAULT_INTERVAL))

        start_discovery(port, deadline, interval)

The discovery parameters are configured through environment variables which I inject into a Docker container.

Services must connect to the discovery system when they start up. When they receive a survey, they should respond by identifying what service they provide and where the service is located. One such service might look like the following:

    import os
    from threading import Thread

    from nanomsg import REP
    from nanomsg import RESPONDENT
    from nanomsg import Socket

    DEFAULT_DISCOVERY_HOST = 'localhost'
    DEFAULT_DISCOVERY_PORT = 5555
    DEFAULT_SERVICE_NAME = 'foo'
    DEFAULT_SERVICE_PROTOCOL = 'tcp'
    DEFAULT_SERVICE_HOST = 'localhost'
    DEFAULT_SERVICE_PORT = 9000


    def register_service(service_name, service_address, discovery_host,
                         discovery_port):
        socket = Socket(RESPONDENT)
        socket.connect('tcp://%s:%s' % (discovery_host, discovery_port))

        print 'Starting service registration [service: %s %s, discovery: %s:%s]' \
            % (service_name, service_address, discovery_host, discovery_port)

        # Answer every survey with "<service name>|<service address>"
        while True:
            message = socket.recv()
            if message == 'service query':
                socket.send('%s|%s' % (service_name, service_address))


    def start_service(service_name, service_protocol, service_port):
        socket = Socket(REP)
        socket.bind('%s://*:%s' % (service_protocol, service_port))

        print 'Starting service %s' % service_name

        while True:
            request = socket.recv()
            print 'Request: %s' % request
            socket.send('The answer is 42')


    if __name__ == '__main__':
        discovery_host = os.environ.get('DISCOVERY_HOST', DEFAULT_DISCOVERY_HOST)
        discovery_port = os.environ.get('DISCOVERY_PORT', DEFAULT_DISCOVERY_PORT)
        service_name = os.environ.get('SERVICE_NAME', DEFAULT_SERVICE_NAME)
        service_host = os.environ.get('SERVICE_HOST', DEFAULT_SERVICE_HOST)
        service_port = os.environ.get('SERVICE_PORT', DEFAULT_SERVICE_PORT)
        service_protocol = os.environ.get('SERVICE_PROTOCOL',
                                          DEFAULT_SERVICE_PROTOCOL)

        service_address = '%s://%s:%s' % (service_protocol, service_host,
                                          service_port)

        # Run registration in a background thread and serve requests in the main one
        Thread(target=register_service, args=(service_name, service_address,
                                              discovery_host,
                                              discovery_port)).start()

        start_service(service_name, service_protocol, service_port)

Once again, we configure parameters through environment variables set on a container. Note that we connect to the discovery system with a RESPONDENT socket which then responds to service queries with the service name and address. The service itself uses a REP socket that simply responds to any requests with “The answer is 42,” but it could take any number of forms such as HTTP, raw socket, etc.

The full code for this example, including Dockerfiles, can be found on GitHub.

Nanomsg or ZeroMQ?

Based on all the improvements that nanomsg makes on top of ZeroMQ, you might be wondering why you would use the latter at all. Nanomsg is still relatively young. Although it has numerous language bindings, it hasn’t reached the maturity of ZeroMQ, which has a thriving development community. ZeroMQ has extensive documentation and other resources to help developers make use of the library, while nanomsg has very little. Doing a quick Google search will give you an idea of the difference (about 500,000 results for ZeroMQ to nanomsg’s 13,500).

That said, nanomsg’s improvements and, in particular, its scalability protocols make it very appealing.
A lot of the strange behaviors that ZeroMQ exposes have been resolved completely or at least mitigated. It’s actively being developed and is quickly gaining more and more traction. Technically, nanomsg has been in beta since March, but it’s starting to look production-ready if it’s not there already.
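The priority routing idea described earlier in this article (New York endpoints first, London as failover) can be sketched in plain Python. This is only an illustration of the selection logic, not nanomsg's implementation or API: the function name `pick_endpoint`, the `(priority, address)` pairs, and the `is_available` health-check callable are all assumptions introduced here.

```python
import random


def pick_endpoint(endpoints, is_available, rng=random):
    """Pick an available endpoint from the best (lowest-numbered) priority tier.

    endpoints: list of (priority, address) pairs; 1 is the highest priority.
    is_available: callable(address) -> bool, a stand-in for a health check.
    Returns None when no endpoint in any tier is available.
    """
    # Group addresses by priority.
    tiers = {}
    for priority, address in endpoints:
        tiers.setdefault(priority, []).append(address)

    # Walk tiers from highest priority (1) downward and balance
    # randomly among the live endpoints of the first usable tier.
    for priority in sorted(tiers):
        live = [address for address in tiers[priority] if is_available(address)]
        if live:
            return rng.choice(live)
    return None
```

For an application hosted in New York, the endpoint list might be `[(1, 'tcp://ny-1:9000'), (1, 'tcp://ny-2:9000'), (2, 'tcp://london-1:9000')]` (hypothetical addresses): London is only chosen when neither New York endpoint passes the health check.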
May 4, 2015
by Tyler Treat
· 16,034 Views · 1 Like
How To: Neo4j Data Import - Minimal Example
The easiest way to import data from relational or legacy systems, like plain CSV files without headers, into Neo4j with Cypher and a graph model.
April 30, 2015
by Michael Hunger
· 7,765 Views
Introduction to Probabilistic Data Structures
When processing large data sets, we often want to do some simple checks, such as the number of unique items, the most frequent items, and whether some items exist in the data set. The common approach is to use some kind of deterministic data structure like HashSet or Hashtable for such purposes. But when the data set we are dealing with becomes very large, such data structures are simply not feasible because the data is too big to fit in memory. It becomes even more difficult for streaming applications, which typically require data to be processed in one pass with incremental updates.

Probabilistic data structures are a group of data structures that are extremely useful for big data and streaming applications. Generally speaking, these data structures use hash functions to randomize and compactly represent a set of items. Collisions are ignored, but errors can be kept under a certain threshold. Compared with error-free approaches, these algorithms use much less memory and have constant query time. They usually support union and intersection operations and therefore can be easily parallelized.

This article will introduce three commonly used probabilistic data structures: Bloom filter, HyperLogLog, and Count-Min sketch.

Membership Query - Bloom filter

A Bloom filter is a bit array of m bits initialized to 0. To add an element, feed it to k hash functions to get k array positions and set the bits at these positions to 1. To query an element, feed it to the k hash functions to obtain k array positions. If any of the bits at these positions is 0, then the element is definitely not in the set. If the bits are all 1, then the element might be in the set. A Bloom filter with a 1% false positive rate requires only 9.6 bits per element regardless of the size of the elements.

For example, suppose we have inserted x, y, and z into the Bloom filter with k = 3 hash functions, as in the figure. Each of these three elements has three bits set to 1 in the bit array.
When we look up w in the set, the Bloom filter will tell us that it is not in the set, because one of its bits is not set to 1.

A Bloom filter has the following properties:

  • False positives are possible when the queried positions are already set to 1, but false negatives are impossible.
  • Query time is O(k).
  • Union and intersection of Bloom filters with the same size and hash functions can be implemented with bitwise OR and AND operations.
  • Elements cannot be removed from the set.

A Bloom filter requires the following inputs:

  • m: size of the bit array
  • n: estimated number of insertions
  • p: false positive probability

The optimum number of hash functions k can be determined using the formula:

$$k = \frac{m}{n} \ln 2$$

Given the false positive probability p and the estimated number of insertions n, the length of the bit array can be calculated as:

$$m = -\frac{n \ln p}{(\ln 2)^2}$$

The hash functions used for a Bloom filter should generally be faster than cryptographic hash algorithms, with good distribution and collision resistance. Commonly used hash functions for Bloom filters include Murmur hash, the FNV series of hashes, and Jenkins hashes. Murmur hash is the fastest among them. MurmurHash3 is used by the Google Guava library's Bloom filter implementation.

Cardinality - HyperLogLog

HyperLogLog is a streaming algorithm used for estimating the number of distinct elements (the cardinality) of very large data sets. A HyperLogLog counter can count one billion distinct items with an accuracy of 2% using only 1.5 KB of memory. It is based on the bit pattern observation that, for a stream of randomly distributed numbers, if the largest number of leading 0 bits seen in any number x is k, the cardinality of the stream is very likely close to $2^k$.

For each element $s_i$ in the stream, the hash function $h(s_i)$ transforms $s_i$ into a string of random bits (each 0 or 1 with probability 1/2). The probability P of the bit patterns is:

    0xxxx... → P = 1/2
    01xxx... → P = 1/4
    001xx... → P = 1/8

The intuition is that when we see the prefix $0^k 1$..., it's likely that there are $n \geq 2^{k+1}$ different strings.
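The bit-pattern intuition can be tried out with a single counter. The sketch below is an illustration under assumptions, not the HyperLogLog algorithm itself: the helper names are made up, a truncated SHA-1 stands in for the hash function only for convenience, and the estimate is taken as $2^p$ where p is the length of the longest observed prefix of the form $0^k 1$ (that is, k + 1). A single counter like this has very high variance, so the result is only a rough power-of-two guess.

```python
import hashlib


def leading_zeros(value, bits=32):
    """Number of leading 0 bits in a `bits`-wide unsigned integer."""
    return bits - value.bit_length()


def hash32(item):
    """32-bit hash of a string (truncated SHA-1, chosen only for convenience)."""
    digest = hashlib.sha1(item.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big")


def rough_cardinality(items):
    """Single-counter estimate 2**p, p = longest '0...01' prefix length seen."""
    longest_prefix = 0
    for item in items:
        # Prefix 0^k 1 has length k + 1, where k is the number of leading zeros.
        prefix_length = leading_zeros(hash32(item)) + 1
        longest_prefix = max(longest_prefix, prefix_length)
    return 2 ** longest_prefix if longest_prefix else 0
```

Because only the maximum is kept, feeding the same element many times does not change the estimate, which is what makes the approach suitable for counting distinct items in a stream.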
By keeping track of the prefixes $0^k 1$... that have appeared in the data stream, we can estimate the cardinality to be $2^p$, where p is the length of the largest prefix.

Because the variance is very high when using a single counter, the data is split into m sub-streams using the first few bits of the hash in order to get a better estimate. The counters are maintained in m registers, each of which has a memory space that is a multiple of 4 bytes. If the standard deviation for each sub-stream is σ, then the standard deviation for the averaged value is only $\sigma/\sqrt{m}$. This is called stochastic averaging.

For instance, for m = 4, the elements are split into m streams using the first 2 bits (00, 01, 10, 11), which are then discarded. Each register stores the rest of the hash bits that contain the largest $0^k 1$ prefix. The values in the m registers are then averaged to obtain the cardinality estimate.

The HyperLogLog algorithm uses the harmonic mean to normalize the result. The algorithm also makes adjustments for small and very large values. The resulting error is equal to $1.04/\sqrt{m}$.

Each of the m registers uses at most $\log_2 \log_2 n + O(1)$ bits when cardinalities ≤ n need to be estimated.

The union of two HyperLogLog counters can be calculated by first taking the maximum value of the two counters for each of the m registers, and then calculating the estimated cardinality.

Frequency - Count-Min Sketch

Count-Min sketch is a probabilistic sub-linear space streaming algorithm. It is somewhat similar to a Bloom filter. The main difference is that a Bloom filter represents a set as a bitmap, while a Count-Min sketch represents a multi-set and keeps a frequency distribution summary.

The basic data structure is a two-dimensional d x w array of counters with d pairwise independent hash functions $h_1 \ldots h_d$ of range w. Given parameters (ε, δ), set $w = \lceil e/\varepsilon \rceil$ and $d = \lceil \ln(1/\delta) \rceil$. ε is the accuracy we want to have and δ is the certainty with which we reach the accuracy. The two-dimensional array consists of wd counts.
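As a quick check of the sizing rule, the two formulas can be evaluated directly. This is a minimal sketch; the function name `count_min_dimensions` is an assumption, and δ is passed as the failure probability (so a certainty of 0.9 corresponds to δ = 0.1).

```python
import math


def count_min_dimensions(epsilon, delta):
    """Return (w, d) with w = ceil(e / epsilon) and d = ceil(ln(1 / delta))."""
    w = math.ceil(math.e / epsilon)        # counters per row
    d = math.ceil(math.log(1.0 / delta))   # number of rows / hash functions
    return w, d


if __name__ == "__main__":
    for epsilon, delta in [(0.1, 0.1), (0.01, 0.01), (0.001, 0.001)]:
        w, d = count_min_dimensions(epsilon, delta)
        print("epsilon=%s delta=%s -> w=%d d=%d counters=%d"
              % (epsilon, delta, w, d, w * d))
```

Note that the width depends only on ε and the depth only on δ, so tightening the certainty adds rows while tightening the accuracy widens every row.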
To increment the counts, calculate the hash positions with the d hash functions and update the counts at those positions. The estimate of the count for an item is the minimum value of the counts at the array positions determined by the d hash functions.

The space used by a Count-Min sketch is the array of w*d counters. By choosing appropriate values for d and w, very small error and high probability can be achieved.

Example Count-Min sketch sizes for different error and probability combinations:

    ε        1 - δ    w       d    wd
    0.1      0.9      28      3    84
    0.1      0.99     28      5    140
    0.1      0.999    28      7    196
    0.01     0.9      272     3    816
    0.01     0.99     272     5    1360
    0.01     0.999    272     7    1904
    0.001    0.999    2719    7    19033

Count-Min sketch has the following properties:

  • Union can be performed by a cell-wise ADD operation
  • O(d) query time
  • Better accuracy for higher frequency items (heavy hitters)
  • Can only cause over-counting but not under-counting

Count-Min sketch can be used for querying a single item count or "heavy hitters," which can be obtained by keeping a heap structure of all the counts.

Summary

Probabilistic data structures have many applications in modern web and data applications where the data arrives in a streaming fashion and needs to be processed on the fly using limited memory. Bloom filter, HyperLogLog, and Count-Min sketch are the most commonly used probabilistic data structures. There is a lot of research on various streaming algorithms, synopsis data structures, and optimization techniques that is worth investigating and studying.

If you haven't tried these data structures, you will be amazed how powerful they can be once you start using them. It may be a little bit intimidating to understand the concept initially, but the implementation is actually quite simple. Google Guava has a Bloom filter implementation using murmur hash. Clearspring's Java library stream-lib and Twitter's Scala library Algebird have implementations of all three data structures and other useful data structures that you can play with.
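To give a feel for how little code the basic operations need, here is a minimal sketch of a Bloom filter and a Count-Min sketch. It is written under several assumptions and is not how Guava, stream-lib, or Algebird implement them: the class and helper names are made up, positions are derived by salting SHA-256 with an index (a convenient stand-in rather than a fast or formally pairwise-independent hash family), and the Bloom filter stores one byte per bit for readability. The Bloom filter is sized with the standard formulas $m = -n \ln p / (\ln 2)^2$ and $k = (m/n) \ln 2$.

```python
import hashlib
import math


def _positions(item, count, modulus):
    """Derive `count` array positions for `item` by salting SHA-256 with an index."""
    positions = []
    for i in range(count):
        digest = hashlib.sha256(("%d:%s" % (i, item)).encode("utf-8")).digest()
        positions.append(int.from_bytes(digest[:8], "big") % modulus)
    return positions


class BloomFilter:
    def __init__(self, n, p):
        # n: expected insertions, p: target false positive probability
        self.m = max(1, math.ceil(-n * math.log(p) / (math.log(2) ** 2)))
        self.k = max(1, round(self.m / n * math.log(2)))
        self.bits = bytearray(self.m)  # one byte per bit, for readability only

    def add(self, item):
        for pos in _positions(item, self.k, self.m):
            self.bits[pos] = 1

    def might_contain(self, item):
        # False means "definitely not present"; True means "possibly present".
        return all(self.bits[pos] for pos in _positions(item, self.k, self.m))


class CountMinSketch:
    def __init__(self, w, d):
        self.w, self.d = w, d
        self.rows = [[0] * w for _ in range(d)]  # d rows of w counters

    def add(self, item, count=1):
        for row, pos in zip(self.rows, _positions(item, self.d, self.w)):
            row[pos] += count

    def estimate(self, item):
        # The minimum over the d rows; it can over-count but never under-count.
        return min(row[pos]
                   for row, pos in zip(self.rows, _positions(item, self.d, self.w)))
```

For example, `BloomFilter(1000, 0.01)` allocates roughly 9.6 bits per expected element with 7 hash functions, and `CountMinSketch(28, 3)` corresponds to ε = 0.1 with a certainty of 0.9.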
I have included the links below.

Links

http://bigsnarf.wordpress.com/2013/02/08/probabilistic-data-structures-for-data-analytics/
http://en.wikipedia.org/wiki/Bloom_filter
http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf
http://static.googleusercontent.com/media/research.google.com/en/us/pubs/archive/40671.pdf
http://research.neustar.biz/2012/10/25/sketch-of-the-day-hyperloglog-cornerstone-of-a-big-data-infrastructure/
http://dimacs.rutgers.edu/~graham/pubs/papers/cm-full.pdf
http://www.moneyscience.com/pg/blog/ThePracticalQuant/read/438348/realtime-analytics-hokusai-adds-a-temporal-component-to-countmin-sketch
http://people.cs.umass.edu/~mcgregor/711S12/sketches1.pdf
https://github.com/addthis/stream-lib
https://github.com/twitter/algebird
April 30, 2015
by Yin Niu
· 35,678 Views · 6 Likes
How to Create Multi-Column PDF Document inside .NET Applications
This technical tip shows how .NET developers can create a multi-column PDF document using Aspose.Pdf for .NET. In magazines and newspapers, news is mostly displayed in multiple columns on a single page, unlike books, where text paragraphs are mostly printed across the whole page from left to right. Many document processing applications like Microsoft Word and Adobe Acrobat Writer allow users to create multiple columns on a single page and then add data to them. Aspose.Pdf for .NET also offers the feature to create multiple columns inside the pages of PDF documents. To create a multi-column PDF file, we can use the Aspose.Pdf.FloatingBox class: its ColumnInfo.ColumnCount property specifies the number of columns inside the FloatingBox, and the ColumnInfo.ColumnSpacing and ColumnInfo.ColumnWidths properties specify the spacing between columns and the column widths respectively. Please note that FloatingBox is an element of the Document Object Model and it can have absolute positioning, as opposed to the relative positioning of other elements (e.g. Text, Graph, Image). Column spacing means the space between the columns, and the default spacing is 1.25cm. If the column width is not specified, Aspose.Pdf for .NET calculates the width of each column automatically according to the page size and column spacing. The code example below demonstrates the creation of two columns with Graph objects (Line); these are added to the paragraphs collection of a FloatingBox, which is then added to the paragraphs collection of a Page instance.
//C# Code Sample
using Aspose.Pdf;
using Aspose.Pdf.Text;

Document doc = new Document();
// specify the left margin info for the PDF file
doc.PageInfo.Margin.Left = 40;
// specify the right margin info for the PDF file
doc.PageInfo.Margin.Right = 40;
Page page = doc.Pages.Add();
Aspose.Pdf.Drawing.Graph graph1 = new Aspose.Pdf.Drawing.Graph(500, 2);
// add the graph to the paragraphs collection of the page
page.Paragraphs.Add(graph1);
// specify the coordinates for the line
float[] posArr = new float[] { 1, 2, 500, 2 };
Aspose.Pdf.Drawing.Line l1 = new Aspose.Pdf.Drawing.Line(posArr);
graph1.Shapes.Add(l1);
// create a string variable with text containing HTML tags
string s = "" + " How to Steer Clear of money scams " + "";
// create a text paragraph containing the HTML text
HtmlFragment heading_text = new HtmlFragment(s);
page.Paragraphs.Add(heading_text);
Aspose.Pdf.FloatingBox box = new Aspose.Pdf.FloatingBox();
// use two columns inside the floating box
box.ColumnInfo.ColumnCount = 2;
// set the spacing between the columns and the column widths
box.ColumnInfo.ColumnSpacing = "5";
box.ColumnInfo.ColumnWidths = "105 105";
TextFragment text1 = new TextFragment("By A Googler (The Official Google Blog)");
text1.TextState.FontSize = 8;
text1.TextState.LineSpacing = 2;
box.Paragraphs.Add(text1);
text1.TextState.FontSize = 10;
text1.TextState.FontStyle = FontStyles.Italic;
// create a graph object to draw a line
Aspose.Pdf.Drawing.Graph graph2 = new Aspose.Pdf.Drawing.Graph(50, 10);
// specify the coordinates for the line
float[] posArr2 = new float[] { 1, 10, 100, 10 };
Aspose.Pdf.Drawing.Line l2 = new Aspose.Pdf.Drawing.Line(posArr2);
graph2.Shapes.Add(l2);
// add the graph to the paragraphs collection of the floating box
box.Paragraphs.Add(graph2);
TextFragment text2 = new TextFragment(@"Sed augue tortor, sodales id, luctus et, pulvinar ut, eros. Suspendisse vel dolor. Sed quam. Curabitur ut massa vitae eros euismod aliquam. Pellentesque sit amet elit. Vestibulum interdum pellentesque augue. Cras mollis arcu sit amet purus. Donec augue. Nam mollis tortor a elit. Nulla viverra nisl vel mauris. Vivamus sapien. nascetur ridiculus mus. Nam justo lorem, aliquam luctus, sodales et, semper sed, enim Nam justo lorem, aliquam luctus, sodales et,nAenean posuere ante ut neque. Morbi sollicitudin congue felis. Praesent turpis diam, iaculis sed, pharetra non, mollis ac, mauris. Phasellus nisi ipsum, pretium vitae, tempor sed, molestie eu, dui. Duis lacus purus, tristique ut, iaculis cursus, tincidunt vitae, risus. Sed commodo. *** sociis natoque penatibus et magnis dis parturient montes, nascetur ridiculus mus. Nam justo lorem, aliquam luctus, sodales et, semper sed, enim Nam justo lorem, aliquam luctus, sodales et, semper sed, enim Nam justo lorem, aliquam luctus, sodales et, semper sed, enim nAenean posuere ante ut neque. Morbi sollicitudin congue felis. Praesent turpis diam, iaculis sed, pharetra non, mollis ac, mauris. Phasellus nisi ipsum, pretium vitae, tempor sed, molestie eu, dui. Duis lacus purus, tristique ut, iaculis cursus, tincidunt vitae, risus. Sed commodo. *** sociis natoque penatibus et magnis dis parturient montes, nascetur ridiculus mus. Sed urna. . Duis convallis ultrices nisi. Maecenas non ligula. Nunc nibh est, tincidunt in, placerat sit amet, vestibulum a, nulla. Praesent porttitor turpis eleifend ante. Morbi sodales.nAenean posuere ante ut neque. Morbi sollicitudin congue felis. Praesent turpis diam, iaculis sed, pharetra non, mollis ac, mauris. Phasellus nisi ipsum, pretium vitae, tempor sed, molestie eu, dui. Duis lacus purus, tristique ut, iaculis cursus, tincidunt vitae, risus. Sed commodo. *** sociis natoque penatibus et magnis dis parturient montes, nascetur ridiculus mus. Sed urna. . Duis convallis ultrices nisi. Maecenas non ligula. Nunc nibh est, tincidunt in, placerat sit amet, vestibulum a, nulla. Praesent porttitor turpis eleifend ante. Morbi sodales.");
box.Paragraphs.Add(text2);
page.Paragraphs.Add(box);
string outFile = "c:/pdftest/Muli-Column.pdf";
// save the PDF
doc.Save(outFile);

'VB.NET Code Sample
Imports Aspose.Pdf

Dim doc As Document = New Document()
' specify the left margin info for the PDF file
doc.PageInfo.Margin.Left = 40
' specify the right margin info for the PDF file
doc.PageInfo.Margin.Right = 40
Dim page As Page = doc.Pages.Add()
Dim graph1 As Aspose.Pdf.Drawing.Graph = New Aspose.Pdf.Drawing.Graph(500, 2)
' add the graph to the paragraphs collection of the page
page.Paragraphs.Add(graph1)
' specify the coordinates for the line
Dim posArr() As Single = {1, 2, 500, 2}
Dim l1 As Aspose.Pdf.Drawing.Line = New Aspose.Pdf.Drawing.Line(posArr)
graph1.Shapes.Add(l1)
' create a string variable with text containing HTML tags
Dim s As String = " How to Steer Clear of money scams "
' create a text paragraph containing the HTML text
Dim heading_text As HtmlFragment = New HtmlFragment(s)
page.Paragraphs.Add(heading_text)
Dim box As Aspose.Pdf.FloatingBox = New Aspose.Pdf.FloatingBox()
' use two columns inside the floating box
box.ColumnInfo.ColumnCount = 2
' set the spacing between the columns and the column widths
box.ColumnInfo.ColumnSpacing = "5"
box.ColumnInfo.ColumnWidths = "105 105"
Dim text1 As Aspose.Pdf.Text.TextFragment = New Aspose.Pdf.Text.TextFragment("By A Googler (The Official Google Blog)")
text1.TextState.FontSize = 8
text1.TextState.LineSpacing = 2
box.Paragraphs.Add(text1)
text1.TextState.FontSize = 10
text1.TextState.FontStyle = Aspose.Pdf.Text.FontStyles.Italic
' create a graph object to draw a line
Dim graph2 As Aspose.Pdf.Drawing.Graph = New Aspose.Pdf.Drawing.Graph(50, 10)
' specify the coordinates for the line
Dim posArr2() As Single = {1, 10, 100, 10}
Dim l2 As Aspose.Pdf.Drawing.Line = New Aspose.Pdf.Drawing.Line(posArr2)
graph2.Shapes.Add(l2)
' add the graph to the paragraphs collection of the floating box
box.Paragraphs.Add(graph2)
Dim text2 As Aspose.Pdf.Text.TextFragment = New Aspose.Pdf.Text.TextFragment("Sed augue tortor, sodales id, luctus et, pulvinar ut, eros. Suspendisse vel dolor. Sed quam. Curabitur ut massa vitae eros euismod aliquam. Pellentesque sit amet elit. Vestibulum interdum pellentesque augue. Cras mollis arcu sit amet purus. Donec augue. Nam mollis tortor a elit. Nulla viverra nisl vel mauris. Vivamus sapien. nascetur ridiculus mus. Nam justo lorem, aliquam luctus, sodales et, semper sed, enim Nam justo lorem, aliquam luctus, sodales et,nAenean posuere ante ut neque. Morbi sollicitudin congue felis. Praesent turpis diam, iaculis sed, pharetra non, mollis ac, mauris. Phasellus nisi ipsum, pretium vitae, tempor sed, molestie eu, dui. Duis lacus purus, tristique ut, iaculis cursus, tincidunt vitae, risus. Sed commodo. *** sociis natoque penatibus et magnis dis parturient montes, nascetur ridiculus mus. Nam justo lorem, aliquam luctus, sodales et, semper sed, enim Nam justo lorem, aliquam luctus, sodales et, semper sed, enim Nam justo lorem, aliquam luctus, sodales et, semper sed, enim nAenean posuere ante ut neque. Morbi sollicitudin congue felis. Praesent turpis diam, iaculis sed, pharetra non, mollis ac, mauris. Phasellus nisi ipsum, pretium vitae, tempor sed, molestie eu, dui. Duis lacus purus, tristique ut, iaculis cursus, tincidunt vitae, risus. Sed commodo. *** sociis natoque penatibus et magnis dis parturient montes, nascetur ridiculus mus. Sed urna. . Duis convallis ultrices nisi. Maecenas non ligula. Nunc nibh est, tincidunt in, placerat sit amet, vestibulum a, nulla. Praesent porttitor turpis eleifend ante. Morbi sodales.nAenean posuere ante ut neque. Morbi sollicitudin congue felis. Praesent turpis diam, iaculis sed, pharetra non, mollis ac, mauris. Phasellus nisi ipsum, pretium vitae, tempor sed, molestie eu, dui. Duis lacus purus, tristique ut, iaculis cursus, tincidunt vitae, risus. Sed commodo. *** sociis natoque penatibus et magnis dis parturient montes, nascetur ridiculus mus. Sed urna. . Duis convallis ultrices nisi. Maecenas non ligula. Nunc nibh est, tincidunt in, placerat sit amet, vestibulum a, nulla. Praesent porttitor turpis eleifend ante. Morbi sodales.")
box.Paragraphs.Add(text2)
page.Paragraphs.Add(box)
Dim outFile As String = "c:/pdftest/Muli-Column.pdf"
' save the PDF
doc.Save(outFile)
April 29, 2015
by David Zondray
· 1,442 Views
Diagnosing SST Errors with Percona XtraDB Cluster for MySQL
[This article was written by Stephane Combaudon]

State Snapshot Transfer (SST) is used in Percona XtraDB Cluster (PXC) when a new node joins the cluster or to resync a failed node if Incremental State Transfer (IST) is no longer available. SST is triggered automatically but there is no magic: If it is not configured properly, it will not work and new nodes will never be able to join the cluster. Let’s have a look at a few classic issues.

Port for SST is not open

The donor and the joiner communicate on port 4444, and if the port is closed on one side, SST will always fail. You will see in the error log of the donor that SST is started:

[...]
141223 16:08:48 [Note] WSREP: Node 2 (node1) requested state transfer from '*any*'. Selected 0 (node3)(SYNCED) as donor.
141223 16:08:48 [Note] WSREP: Shifting SYNCED -> DONOR/DESYNCED (TO: 6)
141223 16:08:48 [Note] WSREP: wsrep_notify_cmd is not defined, skipping notification.
141223 16:08:48 [Note] WSREP: Running: 'wsrep_sst_xtrabackup-v2 --role 'donor' --address '192.168.234.101:4444/xtrabackup_sst' --auth 'sstuser:s3cret' --socket '/var/lib/mysql/mysql.sock' --datadir '/var/lib/mysql/' --defaults-file '/etc/my.cnf' --gtid '04c085a1-89ca-11e4-b1b6-6b692803109b:6''
[...]

But then nothing happens, and some time later you will see a bunch of errors:

[...]
2014/12/23 16:09:52 socat[2965] E connect(3, AF=2 192.168.234.101:4444, 16): Connection timed out
WSREP_SST: [ERROR] Error while getting data from donor node: exit codes: 0 1 (20141223 16:09:52.057)
WSREP_SST: [ERROR] Cleanup after exit with status:32 (20141223 16:09:52.064)
WSREP_SST: [INFO] Cleaning up temporary directories (20141223 16:09:52.068)
141223 16:09:52 [ERROR] WSREP: Failed to read from: wsrep_sst_xtrabackup-v2 --role 'donor' --address '192.168.234.101:4444/xtrabackup_sst' --auth 'sstuser:s3cret' --socket '/var/lib/mysql/mysql.sock' --datadir '/var/lib/mysql/' --defaults-file '/etc/my.cnf' --gtid '04c085a1-89ca-11e4-b1b6-6b692803109b:6'
[...]

On the joiner side, you will see a similar sequence: SST is started, then hangs and is finally aborted:

[...]
141223 16:08:48 [Note] WSREP: Shifting PRIMARY -> JOINER (TO: 6)
141223 16:08:48 [Note] WSREP: Requesting state transfer: success, donor: 0
141223 16:08:49 [Note] WSREP: (f9560d0d, 'tcp://0.0.0.0:4567') turning message relay requesting off
141223 16:09:52 [Warning] WSREP: 0 (node3): State transfer to 2 (node1) failed: -32 (Broken pipe)
141223 16:09:52 [ERROR] WSREP: gcs/src/gcs_group.cpp:long int gcs_group_handle_join_msg(gcs_group_t*, const gcs_recv_msg_t*)():717: Will never receive state. Need to abort.

The solution is of course to make sure that the ports are open on both sides.

SST is not correctly configured

Sometimes you will see an error like this on the donor:

141223 21:03:15 [Note] WSREP: Running: 'wsrep_sst_xtrabackup-v2 --role 'donor' --address '192.168.234.102:4444/xtrabackup_sst' --auth 'sstuser:s3cretzzz' --socket '/var/lib/mysql/mysql.sock' --datadir '/var/lib/mysql/' --defaults-file '/etc/my.cnf' --gtid 'e63f38f2-8ae6-11e4-a383-46557c71f368:0''
[...]
WSREP_SST: [ERROR] innobackupex finished with error: 1. Check /var/lib/mysql//innobackup.backup.log (20141223 21:03:26.973)

And if you look at innobackup.backup.log:

41223 21:03:26 innobackupex: Connecting to MySQL server with DSN 'dbi:mysql:;mysql_read_default_file=/etc/my.cnf;mysql_read_default_group=xtrabackup;mysql_socket=/var/lib/mysql/mysql.sock' as 'sstuser' (using password: YES).
innobackupex: got a fatal error with the following stacktrace: at /usr//bin/innobackupex line 2995
main::mysql_connect('abort_on_error', 1) called at /usr//bin/innobackupex line 1530
innobackupex: Error: Failed to connect to MySQL server: DBI connect(';mysql_read_default_file=/etc/my.cnf;mysql_read_default_group=xtrabackup;mysql_socket=/var/lib/mysql/mysql.sock','sstuser',...) failed: Access denied for user 'sstuser'@'localhost' (using password: YES) at /usr//bin/innobackupex line 2979

What happened? The default SST method is xtrabackup-v2 and for it to work, you need to specify a username/password in the my.cnf file:

[mysqld]
wsrep_sst_auth=sstuser:s3cret

And you also need to create the corresponding MySQL user:

mysql> GRANT RELOAD, LOCK TABLES, REPLICATION CLIENT ON *.* TO 'sstuser'@'localhost' IDENTIFIED BY 's3cret';

So you should check that the user has been correctly created in MySQL and that wsrep_sst_auth is correctly set.

Galera versions do not match

Here is another set of errors you may see in the error log of the donor:

141223 21:14:27 [Warning] WSREP: unserialize error invalid flags 2: 71 (Protocol error) at gcomm/src/gcomm/datagram.hpp:unserialize():101
141223 21:14:30 [Warning] WSREP: unserialize error invalid flags 2: 71 (Protocol error) at gcomm/src/gcomm/datagram.hpp:unserialize():101
141223 21:14:33 [Warning] WSREP: unserialize error invalid flags 2: 71 (Protocol error) at gcomm/src/gcomm/datagram.hpp:unserialize():101

Here the issue is that you try to connect a node using Galera 2.x and a node running Galera 3.x. This can happen if you try to use a PXC 5.5 node and a PXC 5.6 node. The right solution is probably to understand why you ended up with such inconsistent versions and make sure all nodes are using the same Percona XtraDB Cluster version and Galera version. But if you know what you are doing, you can also instruct the node using Galera 3.x that it will communicate with Galera 2.x nodes by specifying in the my.cnf file:

[mysqld]
wsrep_provider_options="socket.checksum=1"

Conclusion

SST errors can have multiple reasons for occurring, and the best way to diagnose the issue is to have a look at the error log of the donor and the joiner. Galera is in general quite verbose so you can follow the progress of SST on both nodes and see where it fails. Then it is mostly about being able to interpret the error messages.
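For the first issue above, a quick way to confirm whether a TCP connection to the SST port can be established at all is a small connectivity probe. The sketch below is a generic check written for this note, not part of PXC or Galera; the function name and the default timeout are assumptions, and the address is simply the one that appears in the logs above. A closed result only says that the connection could not be made from where the script runs; it does not tell you whether a firewall is in the way or whether nothing happens to be listening at that moment.

```python
import socket


def port_is_open(host, port=4444, timeout=3.0):
    """Return True if a TCP connection to host:port can be established."""
    try:
        # create_connection raises OSError (including timeouts and
        # "connection refused") when the connection cannot be made.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # Address taken from the log excerpts above; replace with your own node.
    host = "192.168.234.101"
    state = "open" if port_is_open(host) else "closed or unreachable"
    print(f"{host}:4444 is {state}")
```

Running the probe from the donor towards the joiner, and then in the other direction, covers the "both sides" part of the advice.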
April 27, 2015
by Peter Zaitsev
· 11,853 Views
Fixed Width Sortable Tables Row with jQueryUI
When you use the jQuery UI sortable function on a table, it collapses the width of the row you're dragging, which can lead to a strange user experience. In this tutorial we are going to see how you can use a helper function to restore dragged rows to their original width. Have a look at the demo to see the difference.

Demo

jQuery Sortable is part of the jQuery UI library which can be found below.

jQuery Sortable

To give a table sortable rows, all you have to do is apply the sortable method to the parent element of the rows, which normally would be the table itself or, ideally, the table body.

Film                       Date   Rating
The Shawshank Redemption   1994   9.2

Then you can make the table body rows sortable by using the following jQuery code.

$('table tbody').sortable();

One of the options you can use on the sortable method is the helper property, where you can define a function that returns the element to display while dragging. Therefore we simply need to create a function that fixes the width of each cell in the dragged row at its current value, as in the function below.

$('table tbody').sortable({
    helper: fixWidthHelper
}).disableSelection();

function fixWidthHelper(e, ui) {
    // Pin every cell of the dragged row to its current width
    ui.children().each(function() {
        $(this).width($(this).width());
    });
    return ui;
}
April 27, 2015
by Paul Underwood
· 19,454 Views
Increasing Slow Query Performance with the Parallel Query Execution
[This article was written by Alexander Rubin] Scaling up MySQL (using more powerful hardware) has always been a hot topic. Originally MySQL did not scale well with multiple CPUs; there were times when InnoDB performed worse with more CPU cores than with fewer. MySQL 5.6 scales significantly better; however, there is still one big limitation: a single SQL query will use only one CPU core (no parallelism). Here is what I mean by that: let’s say we have a complex query which needs to scan millions of rows and may need to create a temporary table; in this case MySQL will not be able to scan the table in multiple threads (even with partitioning), so the single query will not be faster on the more powerful server. On the contrary, a server with more but slower CPUs will show worse performance than a server with fewer (but faster) CPUs. To address this issue we can use parallel query execution. Vadim wrote about the PHP asynchronous calls for MySQL. Another way to increase parallelism is to use a “sharding” approach, for example with Shard Query. I’ve decided to test parallel (asynchronous) query execution with a relatively large table: I’ve used the US Flights Ontime performance database, which was originally used by Vadim in the old post Analyzing air traffic performance. Let’s see how this can help us increase the performance of complex query reports.
Parallel Query Example

To illustrate the parallel query execution with MySQL I’ve created the following table:

CREATE TABLE `ontime` (
  `YearD` year(4) NOT NULL,
  `Quarter` tinyint(4) DEFAULT NULL,
  `MonthD` tinyint(4) DEFAULT NULL,
  `DayofMonth` tinyint(4) DEFAULT NULL,
  `DayOfWeek` tinyint(4) DEFAULT NULL,
  `FlightDate` date DEFAULT NULL,
  `UniqueCarrier` char(7) DEFAULT NULL,
  `AirlineID` int(11) DEFAULT NULL,
  `Carrier` char(2) DEFAULT NULL,
  `TailNum` varchar(50) DEFAULT NULL,
  `FlightNum` varchar(10) DEFAULT NULL,
  `OriginAirportID` int(11) DEFAULT NULL,
  `OriginAirportSeqID` int(11) DEFAULT NULL,
  `OriginCityMarketID` int(11) DEFAULT NULL,
  `Origin` char(5) DEFAULT NULL,
  `OriginCityName` varchar(100) DEFAULT NULL,
  `OriginState` char(2) DEFAULT NULL,
  `OriginStateFips` varchar(10) DEFAULT NULL,
  `OriginStateName` varchar(100) DEFAULT NULL,
  `OriginWac` int(11) DEFAULT NULL,
  `DestAirportID` int(11) DEFAULT NULL,
  `DestAirportSeqID` int(11) DEFAULT NULL,
  `DestCityMarketID` int(11) DEFAULT NULL,
  `Dest` char(5) DEFAULT NULL,
  -- ... (removed number of fields)
  `id` int(11) NOT NULL AUTO_INCREMENT,
  PRIMARY KEY (`id`),
  KEY `YearD` (`YearD`),
  KEY `Carrier` (`Carrier`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

And loaded 26 years of data into it. The table is 56G with ~152M rows.

Software: Percona 5.6.15-63.0.
Hardware: Supermicro; X8DTG-D; 48G of RAM; 24xIntel(R) Xeon(R) CPU L5639 @ 2.13GHz, 1xSSD drive (250G)

So we have 24 relatively slow CPUs.

Simple query

Now we can run some queries. The first query is very simple: find all flights per year (in the US):

select yeard, count(*) from ontime group by yeard

As we have the index on YearD, the query will use the index:

mysql> explain select yeard, count(*) from ontime group by yeard\G
*************************** 1. row ***************************
           id: 1
  select_type: SIMPLE
        table: ontime
         type: index
possible_keys: YearD,comb1
          key: YearD
      key_len: 1
          ref: NULL
         rows: 148046200
        Extra: Using index
1 row in set (0.00 sec)

The query is simple; however, it will have to scan 150M rows. Here are the results of the query (cached):

mysql> select yeard, count(*) from ontime group by yeard;
+-------+----------+
| yeard | count(*) |
+-------+----------+
|  1988 |  5202096 |
|  1989 |  5041200 |
|  1990 |  5270893 |
|  1991 |  5076925 |
|  1992 |  5092157 |
|  1993 |  5070501 |
|  1994 |  5180048 |
|  1995 |  5327435 |
|  1996 |  5351983 |
|  1997 |  5411843 |
|  1998 |  5384721 |
|  1999 |  5527884 |
|  2000 |  5683047 |
|  2001 |  5967780 |
|  2002 |  5271359 |
|  2003 |  6488540 |
|  2004 |  7129270 |
|  2005 |  7140596 |
|  2006 |  7141922 |
|  2007 |  7455458 |
|  2008 |  7009726 |
|  2009 |  6450285 |
|  2010 |  6450117 |
|  2011 |  6085281 |
|  2012 |  6096762 |
|  2013 |  5349447 |
+-------+----------+
26 rows in set (54.10 sec)

The query took 54 seconds and utilized only 1 CPU core. However, this query is perfect for running in parallel. We can run 26 parallel queries, each of which will count its own year.
I’ve used the following shell script to run the queries in the background:

#!/bin/bash
date
for y in {1988..2013}
do
  sql="select yeard, count(*) from ontime where yeard=$y"
  mysql -vvv ontime -e "$sql" &>par_sql1/$y.log &
done
wait
date

Here are the results:

par_sql1/1988.log:1 row in set (3.70 sec)
par_sql1/1989.log:1 row in set (4.08 sec)
par_sql1/1990.log:1 row in set (4.59 sec)
par_sql1/1991.log:1 row in set (4.26 sec)
par_sql1/1992.log:1 row in set (4.54 sec)
par_sql1/1993.log:1 row in set (2.78 sec)
par_sql1/1994.log:1 row in set (3.41 sec)
par_sql1/1995.log:1 row in set (4.87 sec)
par_sql1/1996.log:1 row in set (4.41 sec)
par_sql1/1997.log:1 row in set (3.69 sec)
par_sql1/1998.log:1 row in set (3.56 sec)
par_sql1/1999.log:1 row in set (4.47 sec)
par_sql1/2000.log:1 row in set (4.71 sec)
par_sql1/2001.log:1 row in set (4.81 sec)
par_sql1/2002.log:1 row in set (4.19 sec)
par_sql1/2003.log:1 row in set (4.04 sec)
par_sql1/2004.log:1 row in set (5.12 sec)
par_sql1/2005.log:1 row in set (5.10 sec)
par_sql1/2006.log:1 row in set (4.93 sec)
par_sql1/2007.log:1 row in set (5.29 sec)
par_sql1/2008.log:1 row in set (5.59 sec)
par_sql1/2009.log:1 row in set (4.44 sec)
par_sql1/2010.log:1 row in set (4.91 sec)
par_sql1/2011.log:1 row in set (5.08 sec)
par_sql1/2012.log:1 row in set (4.85 sec)
par_sql1/2013.log:1 row in set (4.56 sec)

Complex Query

Now we can try a more complex query. Let’s imagine we want to find out which airlines have the most delays for flights inside the continental US during business days from 1988 to 2009 (I was trying to come up with a complex query with multiple conditions in the where clause).
select min(yeard), max(yeard), Carrier, count(*) as cnt,
       sum(ArrDelayMinutes>30) as flights_delayed,
       round(sum(ArrDelayMinutes>30)/count(*),2) as rate
FROM ontime
WHERE DayOfWeek not in (6,7)
  and OriginState not in ('AK', 'HI', 'PR', 'VI')
  and DestState not in ('AK', 'HI', 'PR', 'VI')
  and flightdate < '2010-01-01'
GROUP by carrier
HAVING cnt > 100000 and max(yeard) > 1990
ORDER by rate DESC

As the query has “group by” and “order by” plus multiple ranges in the where clause it will have to create a temporary table:

           id: 1
  select_type: SIMPLE
        table: ontime
         type: index
possible_keys: comb1
          key: comb1
      key_len: 9
          ref: NULL
         rows: 148046200
        Extra: Using where; Using temporary; Using filesort

(for this query I’ve created the combined index: KEY comb1 (Carrier,YearD,ArrDelayMinutes) to increase performance)

The query runs in ~15 minutes:

+------------+------------+---------+----------+-----------------+------+
| min(yeard) | max(yeard) | Carrier | cnt      | flights_delayed | rate |
+------------+------------+---------+----------+-----------------+------+
|       2003 |       2009 | EV      |  1454777 |          237698 | 0.16 |
|       2006 |       2009 | XE      |  1016010 |          152431 | 0.15 |
|       2006 |       2009 | YV      |   740608 |          110389 | 0.15 |
|       2003 |       2009 | B6      |   683874 |          103677 | 0.15 |
|       2003 |       2009 | FL      |  1082489 |          158748 | 0.15 |
|       2003 |       2005 | DH      |   501056 |           69833 | 0.14 |
|       2001 |       2009 | MQ      |  3238137 |          448037 | 0.14 |
|       2003 |       2006 | RU      |  1007248 |          126733 | 0.13 |
|       2004 |       2009 | OH      |  1195868 |          160071 | 0.13 |
|       2003 |       2006 | TZ      |   136735 |           16496 | 0.12 |
|       1988 |       2009 | UA      |  9593284 |         1197053 | 0.12 |
|       1988 |       2009 | AA      | 10600509 |         1185343 | 0.11 |
|       1988 |       2001 | TW      |  2659963 |          280741 | 0.11 |
|       1988 |       2009 | CO      |  6029149 |          673863 | 0.11 |
|       2007 |       2009 | 9E      |   577244 |           59440 | 0.10 |
|       1988 |       2009 | DL      | 11869471 |         1156267 | 0.10 |
|       1988 |       2009 | NW      |  7601727 |          725460 | 0.10 |
|       1988 |       2009 | AS      |  1506003 |          146920 | 0.10 |
|       2003 |       2009 | OO      |  2654259 |          257069 | 0.10 |
|       1988 |       2009 | US      | 10276941 |          991016 | 0.10 |
|       1988 |       1991 | PA      |   206841 |           19465 | 0.09 |
|       1988 |       2005 | HP      |  2607603 |          235675 | 0.09 |
|       1988 |       2009 | WN      | 12722174 |         1107840 | 0.09 |
|       2005 |       2009 | F9      |   307569 |           28679 | 0.09 |
+------------+------------+---------+----------+-----------------+------+
24 rows in set (15 min 56.40 sec)

Now we can split this query and run the 31 queries (=31 distinct airlines in this table) in parallel. I have used the following script:

date
for c in '9E' 'AA' 'AL' 'AQ' 'AS' 'B6' 'CO' 'DH' 'DL' 'EA' 'EV' 'F9' 'FL' 'HA' 'HP' 'ML' 'MQ' 'NW' 'OH' 'OO' 'PA' 'PI' 'PS' 'RU' 'TW' 'TZ' 'UA' 'US' 'WN' 'XE' 'YV'
do
  sql=" select min(yeard), max(yeard), Carrier, count(*) as cnt, sum(ArrDelayMinutes>30) as flights_delayed, round(sum(ArrDelayMinutes>30)/count(*),2) as rate FROM ontime WHERE DayOfWeek not in (6,7) and OriginState not in ('AK', 'HI', 'PR', 'VI') and DestState not in ('AK', 'HI', 'PR', 'VI') and flightdate < '2010-01-01' and carrier = '$c'"
  mysql -uroot -vvv ontime -e "$sql" &>par_sql_complex/$c.log &
done
wait
date

In this case we will also avoid creating a temporary table (as we have an index which starts with carrier).
Results: total time is 5 min 47 seconds (3x faster)

Start: 15:41:02 EST 2013
End: 15:46:49 EST 2013

Per query statistics:

par_sql_complex/9E.log:1 row in set (44.47 sec)
par_sql_complex/AA.log:1 row in set (5 min 41.13 sec)
par_sql_complex/AL.log:1 row in set (15.81 sec)
par_sql_complex/AQ.log:1 row in set (14.52 sec)
par_sql_complex/AS.log:1 row in set (2 min 43.01 sec)
par_sql_complex/B6.log:1 row in set (1 min 26.06 sec)
par_sql_complex/CO.log:1 row in set (3 min 58.07 sec)
par_sql_complex/DH.log:1 row in set (31.30 sec)
par_sql_complex/DL.log:1 row in set (5 min 47.07 sec)
par_sql_complex/EA.log:1 row in set (28.58 sec)
par_sql_complex/EV.log:1 row in set (2 min 6.87 sec)
par_sql_complex/F9.log:1 row in set (46.18 sec)
par_sql_complex/FL.log:1 row in set (1 min 30.83 sec)
par_sql_complex/HA.log:1 row in set (39.42 sec)
par_sql_complex/HP.log:1 row in set (2 min 45.57 sec)
par_sql_complex/ML.log:1 row in set (4.64 sec)
par_sql_complex/MQ.log:1 row in set (2 min 22.55 sec)
par_sql_complex/NW.log:1 row in set (4 min 26.67 sec)
par_sql_complex/OH.log:1 row in set (1 min 9.67 sec)
par_sql_complex/OO.log:1 row in set (2 min 14.97 sec)
par_sql_complex/PA.log:1 row in set (17.62 sec)
par_sql_complex/PI.log:1 row in set (14.52 sec)
par_sql_complex/PS.log:1 row in set (3.46 sec)
par_sql_complex/RU.log:1 row in set (40.14 sec)
par_sql_complex/TW.log:1 row in set (2 min 32.32 sec)
par_sql_complex/TZ.log:1 row in set (14.16 sec)
par_sql_complex/UA.log:1 row in set (4 min 55.18 sec)
par_sql_complex/US.log:1 row in set (4 min 38.08 sec)
par_sql_complex/WN.log:1 row in set (4 min 56.12 sec)
par_sql_complex/XE.log:1 row in set (24.21 sec)
par_sql_complex/YV.log:1 row in set (20.82 sec)

As we can see, there are large airlines (like AA, UA, US, DL, etc.) which took most of the time. In this case the load is not distributed as evenly as in the previous example; however, by running the query in parallel we got a 3x better response time on this server.
CPU utilization:

Cpu3  : 22.0%us, 1.2%sy, 0.0%ni, 74.4%id, 2.4%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu4  : 16.0%us, 0.0%sy, 0.0%ni, 84.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu5  : 39.0%us, 1.2%sy, 0.0%ni, 56.1%id, 3.7%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu6  : 33.3%us, 0.0%sy, 0.0%ni, 51.9%id, 13.6%wa, 0.0%hi, 1.2%si, 0.0%st
Cpu7  : 33.3%us, 1.2%sy, 0.0%ni, 48.8%id, 16.7%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu8  : 24.7%us, 0.0%sy, 0.0%ni, 60.5%id, 14.8%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu9  : 24.4%us, 0.0%sy, 0.0%ni, 56.1%id, 19.5%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu10 : 40.7%us, 0.0%sy, 0.0%ni, 56.8%id, 2.5%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu11 : 19.5%us, 1.2%sy, 0.0%ni, 65.9%id, 12.2%wa, 0.0%hi, 1.2%si, 0.0%st
Cpu12 : 40.2%us, 1.2%sy, 0.0%ni, 56.1%id, 2.4%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu13 : 82.7%us, 0.0%sy, 0.0%ni, 17.3%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu14 : 55.4%us, 0.0%sy, 0.0%ni, 43.4%id, 1.2%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu15 : 86.6%us, 0.0%sy, 0.0%ni, 13.4%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu16 : 61.0%us, 1.2%sy, 0.0%ni, 37.8%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu17 : 29.3%us, 1.2%sy, 0.0%ni, 69.5%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu18 : 18.8%us, 0.0%sy, 0.0%ni, 52.5%id, 28.8%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu19 : 14.3%us, 1.2%sy, 0.0%ni, 57.1%id, 27.4%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu20 : 12.3%us, 0.0%sy, 0.0%ni, 59.3%id, 28.4%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu21 : 10.7%us, 0.0%sy, 0.0%ni, 76.2%id, 11.9%wa, 0.0%hi, 1.2%si, 0.0%st
Cpu22 : 0.0%us, 0.0%sy, 0.0%ni,100.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu23 : 10.8%us, 2.4%sy, 0.0%ni, 71.1%id, 15.7%wa, 0.0%hi, 0.0%si, 0.0%st

Note that in case of “order by” we will need to manually sort the results, however, sorting 10-100 rows will be fast.

Conclusion

Splitting a complex report into multiple queries and running it in parallel (asynchronously) can increase performance (3x to 10x in the above example) and will better utilize modern hardware. It is also possible to split the queries between multiple MySQL servers (i.e. MySQL slave servers) to further increase scalability (will require more coding).
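Once the report is split per carrier, the GROUP BY, HAVING and ORDER BY steps of the original query no longer run inside MySQL and have to be applied to the combined rows. The sketch below shows one way this could look in Python rather than in the shell script used in the article. The helper names (carrier_sql, run_split_report, finish_report) are hypothetical, and run_query stands in for whatever function executes SQL through your MySQL driver and returns a list of row tuples; each call should use its own connection.

```python
from concurrent.futures import ThreadPoolExecutor


def carrier_sql(carrier):
    """Per-carrier version of the report query (no GROUP BY / HAVING / ORDER BY)."""
    return (
        "select min(yeard), max(yeard), Carrier, count(*) as cnt, "
        "sum(ArrDelayMinutes>30) as flights_delayed, "
        "round(sum(ArrDelayMinutes>30)/count(*),2) as rate "
        "FROM ontime WHERE DayOfWeek not in (6,7) "
        "and OriginState not in ('AK', 'HI', 'PR', 'VI') "
        "and DestState not in ('AK', 'HI', 'PR', 'VI') "
        "and flightdate < '2010-01-01' "
        f"and carrier = '{carrier}'"
    )


def run_split_report(keys, make_sql, run_query, max_workers=8):
    """Run one query per key in parallel and collect all returned rows."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        per_key_rows = pool.map(lambda key: run_query(make_sql(key)), keys)
        return [row for rows in per_key_rows for row in rows]


def finish_report(rows):
    """Apply the HAVING filter and ORDER BY of the original query client-side."""
    # Row layout: (min_year, max_year, carrier, cnt, flights_delayed, rate).
    # A carrier with no matching rows yields cnt = 0 and NULLs elsewhere;
    # the cnt test runs first, so None is never compared with a number.
    kept = [r for r in rows if r[3] > 100000 and r[1] > 1990]
    return sorted(kept, key=lambda r: r[5], reverse=True)
```

With a real run_query, finish_report(run_split_report(carriers, carrier_sql, run_query)) would return the filtered rows sorted by rate, as the single large query does. Sorting a few dozen rows in the client is cheap, which is the point made in the note on "order by" above.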
April 25, 2015
by Peter Zaitsev
· 13,185 Views
Agrona Event Counters
Efficient open source event counters from the Agrona library. Agrona The Agrona library is an open source Java library of utility code. Unlike libraries such as Google Guava or Apache Commons which are general purpose Java utility libraries, Agrona is targeted at providing high performance code. It initially consists of code from the open source Aeron messaging library. Event Counters One of the features of the Agrona library is the event counters framework. One of the design goals of Aeron was to be easy to monitor. We wanted to make sure that people could easily check up on what Aeron is doing with services such as Nagios, internally written monitoring software or just from the commandline. Writing integrations with many services is a herculean task in and of itself, but we definitely wanted to be able to expose an API. We also didn't want to incorporate large 3rd party external dependencies, any allocation heavy code or things we couldn't control the performance of. This meant that we were going to have to write our own event counters rather than using something like the Coda Hale metrics framework. Our requirements for monitoring were very simple though. Update or increment the counter's value. Read or write to/from the counter value from a different thread or process. No garbage creation after initial setup. Labels should be associated with each counter value for readability's sake. Design Threadsafe updates of a long value is a very simple operation and already supported in Java through the AtomicLong class. The problem with using an AtomicLong as your event counter is that an external program, running in a different process, can't read from the value from your Java heap. Consequently Agrona's event counters are allocated in an off-heap buffer. This can be placed on a memory-mapped file which means it can be shared between two different processes. In order to give our counters names we store a table of name and counter id entries on another buffer. 
This can be placed on the same memory-mapped file for convenience.

API

The CountersManager is responsible for allocating event counters. It needs to be instantiated with the buffers upon which to store the event counters and their labels. Here is an example of how to instantiate a counter from the counters manager.

    AtomicCounter conductorProxyFails = countersManager.newCounter("Failed offers to DriverConductorProxy");

You can also iterate over the current counter names and their ids. Here is some code that uses that to print a table of the event counter values:

    countersManager.forEach((id, label) -> {
        final int offset = CountersManager.counterOffset(id);
        final long value = valuesBuffer.getLongVolatile(offset);
        System.out.format("%3d: %,20d - %s\n", id, value, label);
    });

Each instance of AtomicCounter represents one event counter. Here are some examples of using the atomic counter in code.

    // Increment the counter in a thread-safe manner
    conductorProxyFails.increment();
    // Increment the counter if you're only writing from a single thread
    conductorProxyFails.orderedIncrement();
    // Atomically add 5 to the counter value
    conductorProxyFails.add(5);
    // Reset the counter
    conductorProxyFails.set(0);

Conclusions

I've just gone through a few simple examples of how to use the event counters from Agrona, which hopefully you've found useful. This isn't the only code in Agrona though: there are utilities for agents and for executing timing events, as well as collections such as queues, ring buffers and hash maps. We're also expanding the library, which is already on Maven Central. Currently documentation is a little bit thin on the ground, but contributions are always welcome.

Thanks to Martin Thompson and Chris West for feedback on this blog post.
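To make the design idea above more concrete, here is a minimal sketch of a counter kept in a memory-mapped file rather than on the Java heap. It uses only the JDK (Java 9 or later) and is not Agrona code: the class name MappedCounterSketch, its methods, and the fixed 8-bytes-per-counter layout are assumptions made for illustration. Agrona's real buffer layout and label table differ.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical stand-in for the idea behind off-heap counters; not Agrona's implementation. */
public class MappedCounterSketch {
    // Gives atomic access to 8-byte slots of a ByteBuffer (available since Java 9).
    private static final VarHandle LONG_SLOT =
        MethodHandles.byteBufferViewVarHandle(long[].class, ByteOrder.nativeOrder());

    private final ByteBuffer values;

    private MappedCounterSketch(ByteBuffer values) {
        this.values = values;
    }

    /** Maps room for counterCount long counters onto the given file. */
    public static MappedCounterSketch map(Path file, int counterCount) {
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            // The mapping stays valid after the channel is closed.
            return new MappedCounterSketch(
                channel.map(FileChannel.MapMode.READ_WRITE, 0, (long) counterCount * Long.BYTES));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Atomically adds one to the counter and returns the new value. No allocation happens here. */
    public long increment(int id) {
        return (long) LONG_SLOT.getAndAdd(values, id * Long.BYTES, 1L) + 1;
    }

    /** Volatile read of the counter, as a monitoring reader would do. */
    public long get(int id) {
        return (long) LONG_SLOT.getVolatile(values, id * Long.BYTES);
    }

    /** Writes through one mapping and reads through a second one, much as a second process would. */
    public static long demo() {
        try {
            Path file = Files.createTempFile("counters", ".dat");
            file.toFile().deleteOnExit();
            MappedCounterSketch writer = map(file, 4);
            MappedCounterSketch reader = map(file, 4);
            writer.increment(2);
            writer.increment(2);
            return reader.get(2);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The demo method maps the same file twice inside one JVM to imitate a monitoring tool reading the counters file from outside the application. In a real setup the reader would be a separate process that maps the same path.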
April 23, 2015
by Richard Warburton
· 5,848 Views
article thumbnail
AutoCompleteTextBox in C# Windows Form Application
In this article, we will learn how to create an AutoComplete TextBox in a C# Windows Forms application. In my previous article, we learned how to search records in a DataGridView using C#.

Let's begin.

Create a new Windows Forms application. Drop a Label and a TextBox control from the Toolbox onto the form, and name the TextBox txt_StateName. Now go to the code-behind file (.cs) and add the following code:

    using System;
    using System.Windows.Forms;

    namespace AutoCompleteTextBoxDemo
    {
        public partial class Form1 : Form
        {
            public Form1()
            {
                InitializeComponent();
            }

            //AutoCompleteData Method
            private void autoCompleteData()
            {
                //Set AutoCompleteSource property of txt_StateName as CustomSource
                txt_StateName.AutoCompleteSource = AutoCompleteSource.CustomSource;
                //Set AutoCompleteMode property of txt_StateName as SuggestAppend. SuggestAppend applies both Suggest and Append
                txt_StateName.AutoCompleteMode = AutoCompleteMode.SuggestAppend;
                txt_StateName.AutoCompleteCustomSource.AddRange(new string[] { "Maharashtra", "Andhra Pradesh", "Assam", "Punjab", "Arunachal Pradesh", "Bihar", "Goa", "Gujarat", "Haryana" });
            }

            private void Form1_Load(object sender, EventArgs e)
            {
                autoCompleteData();
            }
        }
    }

In the preceding code, we set the AutoCompleteSource, AutoCompleteMode and AutoCompleteCustomSource properties of the TextBox named txt_StateName so that it automatically completes the input string.

AutoComplete TextBox using a Database:

In this example, we will suggest and append the data in the TextBox (txt_StateName) from a database. For demonstration, I have created a database named Sample. Add a table, tbl_State, whose State column holds the state names.
Add the following lines of code:

    using System;
    using System.Windows.Forms;
    using System.Data.SqlClient;

    namespace AutoCompleteTextBoxDemo
    {
        public partial class Form2 : Form
        {
            public Form2()
            {
                InitializeComponent();
            }

            private void autoCompleteData()
            {
                //AutoCompleteStringCollection contains a collection of strings to use for the auto-complete feature on certain Windows Forms controls.
                AutoCompleteStringCollection autoCompleteCollection = new AutoCompleteStringCollection();

                //The using blocks close the connection, command and reader even if the query fails.
                using (SqlConnection con = new SqlConnection("Data Source=.;Initial Catalog=Sample;Integrated Security=true;"))
                using (SqlCommand com = new SqlCommand("Select State from tbl_State", con))
                {
                    con.Open();
                    using (SqlDataReader rdr = com.ExecuteReader())
                    {
                        while (rdr.Read())
                        {
                            autoCompleteCollection.Add(rdr.GetString(0));
                        }
                    }
                }

                //Set AutoCompleteSource property of txt_StateName as CustomSource
                txt_StateName.AutoCompleteSource = AutoCompleteSource.CustomSource;
                //Set AutoCompleteMode property of txt_StateName as SuggestAppend. SuggestAppend applies both Suggest and Append
                txt_StateName.AutoCompleteMode = AutoCompleteMode.SuggestAppend;
                txt_StateName.AutoCompleteCustomSource = autoCompleteCollection;
            }

            //Form2_Load Event
            private void Form2_Load(object sender, EventArgs e)
            {
                autoCompleteData();
            }
        }
    }

Hope you like it. Thanks.
April 23, 2015
by Anoop Kumar Sharma
· 7,581 Views
article thumbnail
How to Work with Merged Cells in Word Documents Table inside Android Apps
This technical tip shows how developers can work with merged cells in Word documents inside Android applications. Several cells in a table can be merged together into a single cell. This is useful when rows require a title or large blocks of text which span across the width of the table. This can only be achieved by merging cells in the table into a single cell. Aspose.Words supports merged cells when working with all input formats, including when importing HTML content.

In Aspose.Words, merged cells are represented by CellFormat.HorizontalMerge and CellFormat.VerticalMerge. The CellFormat.HorizontalMerge property describes whether the cell is part of a horizontal merge of cells. Likewise, the CellFormat.VerticalMerge property describes whether the cell is part of a vertical merge of cells. The values of these properties define the merge behavior of cells:

  • The first cell in a sequence of merged cells has CellMerge.First (CellMerge.FIRST in the Java API).
  • Any subsequent merged cells have CellMerge.Previous.
  • A cell which is not merged has CellMerge.None.

Sometimes when you load an existing document, cells in a table will appear merged. However, these can in fact be one long cell. Microsoft Word is known to export merged cells in this way at times. This can cause confusion when attempting to work with individual cells. There appears to be no particular pattern as to when this happens.

//Checking if a Cell is Merged
// Prints the horizontal and vertical merge type of a cell.
public void checkCellsMerged() throws Exception {
    Document doc = new Document(getMyDir() + "Table.MergedCells.doc");
    // Retrieve the first table in the document.
Table table = (Table)doc.getChild(NodeType.TABLE, 0, true); for (Row row : table.getRows()) { for (Cell cell : row.getCells()) { System.out.println(printCellMergeType(cell)); } } } public String printCellMergeType(Cell cell) { boolean isHorizontallyMerged = cell.getCellFormat().getHorizontalMerge() != CellMerge.NONE; boolean isVerticallyMerged = cell.getCellFormat().getVerticalMerge() != CellMerge.NONE; String cellLocation = MessageFormat.format("R{0}, C{1}", cell.getParentRow().getParentTable().indexOf(cell.getParentRow()) + 1, cell.getParentRow().indexOf(cell) + 1); if (isHorizontallyMerged && isVerticallyMerged) return MessageFormat.format("The cell at {0} is both horizontally and vertically merged", cellLocation); else if (isHorizontallyMerged) return MessageFormat.format("The cell at {0} is horizontally merged.", cellLocation); else if (isVerticallyMerged) return MessageFormat.format("The cell at {0} is vertically merged", cellLocation); else return MessageFormat.format("The cell at {0} is not merged", cellLocation); } //Merging Cells in a Table //Creates a table with two rows with cells in the first row horizontally merged. Document doc = new Document(); DocumentBuilder builder = new DocumentBuilder(doc); builder.insertCell(); builder.getCellFormat().setHorizontalMerge(CellMerge.FIRST); builder.write("Text in merged cells."); builder.insertCell(); // This cell is merged to the previous and should be empty. builder.getCellFormat().setHorizontalMerge(CellMerge.PREVIOUS); builder.endRow(); builder.insertCell(); builder.getCellFormat().setHorizontalMerge(CellMerge.NONE); builder.write("Text in one cell."); builder.insertCell(); builder.write("Text in another cell."); builder.endRow(); builder.endTable(); //Example: Merging Cells Vertically //Creates a table with two columns with cells merged vertically in the first column. 
Document doc = new Document(); DocumentBuilder builder = new DocumentBuilder(doc); builder.insertCell(); builder.getCellFormat().setVerticalMerge(CellMerge.FIRST); builder.write("Text in merged cells."); builder.insertCell(); builder.getCellFormat().setVerticalMerge(CellMerge.NONE); builder.write("Text in one cell"); builder.endRow(); builder.insertCell(); // This cell is vertically merged to the cell above and should be empty. builder.getCellFormat().setVerticalMerge(CellMerge.PREVIOUS); builder.insertCell(); builder.getCellFormat().setVerticalMerge(CellMerge.NONE); builder.write("Text in another cell"); builder.endRow(); builder.endTable(); //Merging all Cells in a Range //A method which merges all cells of a table in the specified range of cells /** * Merges the range of cells found between the two specified cells both horizontally and vertically. Can span over multiple rows. */ public static void mergeCells(Cell startCell, Cell endCell) { Table parentTable = startCell.getParentRow().getParentTable(); // Find the row and cell indices for the start and end cell. Point startCellPos = new Point(startCell.getParentRow().indexOf(startCell), parentTable.indexOf(startCell.getParentRow())); Point endCellPos = new Point(endCell.getParentRow().indexOf(endCell), parentTable.indexOf(endCell.getParentRow())); // Create the range of cells to be merged based off these indices. Inverse each index if the end cell if before the start cell. Rectangle mergeRange = new Rectangle(Math.min(startCellPos.x, endCellPos.x), Math.min(startCellPos.y, endCellPos.y), Math.abs(endCellPos.x - startCellPos.x) + 1, Math.abs(endCellPos.y - startCellPos.y) + 1); for (Row row : parentTable.getRows()) { for(Cell cell : row.getCells()) { Point currentPos = new Point(row.indexOf(cell), parentTable.indexOf(row)); // Check if the current cell is inside our merge range then merge it. 
if (mergeRange.contains(currentPos)) { if (currentPos.x == mergeRange.x) cell.getCellFormat().setHorizontalMerge(CellMerge.FIRST); else cell.getCellFormat().setHorizontalMerge(CellMerge.PREVIOUS); if (currentPos.y == mergeRange.y) cell.getCellFormat().setVerticalMerge(CellMerge.FIRST); else cell.getCellFormat().setVerticalMerge(CellMerge.PREVIOUS); } } } } //Merging Cells between Two Cells // Merges the range of cells between the two specified cells // We want to merge the range of cells found inbetween these two cells. Cell cellStartRange = table.getRows().get(2).getCells().get(2); Cell cellEndRange = table.getRows().get(3).getCells().get(3); // Merge all the cells between the two specified cells into one. mergeCells(cellStartRange, cellEndRange);
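To see how the three merge states combine within a row, the following sketch works out how wide each visible cell ends up once the flags are applied. It is a plain-Java model only: the Merge enum and the spans method are hypothetical names invented for this illustration and are not part of Aspose.Words.

```java
import java.util.ArrayList;
import java.util.List;

public class MergeSpanSketch {
    /** Hypothetical mirror of the three merge states described in the article; not the Aspose.Words type. */
    public enum Merge { NONE, FIRST, PREVIOUS }

    /**
     * Returns the width, in cells, of each visible cell in a row.
     * A PREVIOUS flag widens the cell before it; NONE and FIRST each start a new visible cell.
     * A PREVIOUS flag with nothing before it is treated as its own cell.
     */
    public static List<Integer> spans(List<Merge> row) {
        List<Integer> spans = new ArrayList<>();
        for (Merge flag : row) {
            if (flag == Merge.PREVIOUS && !spans.isEmpty()) {
                int last = spans.size() - 1;
                spans.set(last, spans.get(last) + 1);
            } else {
                spans.add(1);
            }
        }
        return spans;
    }
}
```

For a row flagged FIRST, PREVIOUS, NONE this yields two visible cells, the first of which is two cells wide. That matches the first row built in the horizontal merge example.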
April 22, 2015
by David Zondray
· 6,384 Views
article thumbnail
Why Elasticsearch is Suitable for Application Log Analytics
Handling Application Logs

Enterprise application development using Web technologies has been around for a long time. In recent years we have seen a sharp increase in the deployment of such applications. This is partly due to the proliferation of ecommerce sites, social media sites and sites supporting mobile applications, as well as the desire of enterprises to have their applications available 24x7. In most cases, such applications cater to heavy load and are deployed on cloud infrastructure. Monitoring deployed applications is becoming a crucial task, as deployed applications are bound to fail, however robust the techniques used during development.

Whenever an application fails, the most common resolution method starts by examining the application log. If the application has implemented logging properly, the logs can reveal the cause of the failure. Log files are usually examined with tools like vi, less, more, tail or grep. Another method is to download the file to a Windows system and view it in an editor like Notepad++. Engineers usually scan the log information for clues that point to the reasons for failure. Once the cause of failure is identified, suitable action is taken to restore the application and/or service.

The Key to Application Log Analytics

This process of logging onto a remote system and viewing logs is tedious. Additionally, many of the tools do little to make the task of issue identification simpler. Even when using tools like grep (if we know the pattern), we still need to view the logs in order to go through other information that has been logged, such as the log information that precedes the failure point.
While it has always been possible to develop applications to parse application logs, the recent renewed interest in application log analytics is due to the acceptance of NoSQL-like technologies and the availability of standard tools to parse application logs. Though relational databases (RDBMS) have for many years provided the facility to store structured data, they are not well suited to handling log data, because in many cases the structure of the logged information is not the same across the file. This does not fit well in the rigidly defined world of an RDBMS. In comparison, NoSQL allows document flexibility, and documents with different schemas can be stored in the same database / index / store.

The ability to convert log data into a well-defined structure, as well as the ability to search, are key to implementing a modern log analytics solution. In this document, we cover how Elasticsearch meets these needs. Elasticsearch can store documents, giving us the benefit of structured storage without the overheads of a database system.

The Suitability of Elasticsearch

In the following subsections, we share our views as to why Elasticsearch is a suitable data store for an application log analytics solution. Elasticsearch is part of a popular trio of tools, commonly known as ELK. Of these, E stands for Elasticsearch, the document store; L stands for Logstash, the log parser; and K stands for Kibana, the visualization tool.

Storing Documents

Logstash can be used to parse plain text data into structured text. Once data has some structure, it becomes easy to find information by enabling search on it. While parsing application logs is not a challenge, the challenge has been in storing the data and enabling search on it. Most prior solutions have used an RDBMS for storage, but the varying structure and textual nature of application logs make it difficult to use an RDBMS table structure to store the data. RDBMSs are not geared toward ‘search’.
They are geared for maintaining a ‘single value of truth’ for the data, defining relations between the data, ensuring their consistency and so on. Search is also not a strong point for RDBMSs as they mainly rely on exact matches for values, while Elasticsearch supports exact matches as well as partial matches. It also supports document scoring, which attaches a confidence factor to the documents located.

Elasticsearch supports documents in JSON format and uses the NoSQL philosophy for document storage. This has the advantage of allowing a flexible schema for the data. Unlike an RDBMS, Elasticsearch is a search engine at heart and hence is built for search.

Though Elasticsearch uses NoSQL for storing documents, it does not provide robust methods to update stored data. Not supporting updates is a serious disadvantage in most cases. In the case of application logs, however, it actually works in favour of Elasticsearch, because updates are not really required. Application logs are generated from a debugging perspective: the aim is to have data handy for debugging in the event of an application crash or incorrect execution. They usually record important events from application execution and provide additional information to allow application developers to identify the reasons for failure. Additionally, existing information in application logs is rarely, if ever, updated. New information is continually being written to the logs, with no need to refer to old information. This plays to the strength of Elasticsearch, which is able to ingest and index new information very quickly.

Search

One of the easiest ways of locating information in large volumes of logs is to perform a search. Elasticsearch is not only well suited to handling search, it also supports huge volumes of data, using distributed computing (implemented using shards).
While Kibana is one of the commonly used tools to display and visualize information stored in Elasticsearch, it is more suited to displaying standard charts like bar charts, column charts and pie charts. If the features provided by Kibana are not enough, we can always use Elasticsearch’s REST API support and its Query DSL (Domain-Specific Language) to search for the required information. The Query DSL and the result of the query are in JSON format. Though this format makes it easy for applications to parse and process, users would need a friendly user interface to interact with the data.

Handling Voluminous Data

Elasticsearch supports distributed search out of the box, using the concept of ‘shards’. A shard is a single Lucene instance and is managed by Elasticsearch. Two types of shards, namely ‘primary shards’ and ‘replica shards’, are supported. By default, a document is first indexed on the primary shard and then on the replica shards. The number of primary shards can be specified to cater to the expected volume. By default, Elasticsearch creates five shards for an index. But once the number of primary shards is decided, it cannot be changed. Replica shards are copies of the primary shard. They are used to handle failover and to increase performance.

While performance across voluminous data can be handled by sharding, it is important to note that the number of primary shards, once set for an index, cannot be changed. Thus, the sharding strategy for the data has to be decided in advance, after an assessment of the data and an estimation of its growth. In the case of application logs, the sharding strategy can be based on the application name, the business unit ID, the application ID or the application’s geolocation, just to name a few.

Analytics

By storing data in a structure, analytics can be enabled on the data. Not only can applications perform a simple search, it is also possible to restrict the search to specific terms or to a specified time period.
Structured storage also makes it easier to develop reports with well-defined visualizations, which in turn makes it easy to understand the current state of applications. It is also possible to perform various analytics operations, like time series analysis using the timestamp and identification of patterns in the data using machine learning techniques (assuming we have the right kind of data in the logs). Though Elasticsearch does not provide built-in support for analytics, applications can benefit from its fast search capability and also from its ability to handle voluminous data sets.

In Closing

One of the main hurdles for application logs has been the ability to search for information in the huge volume of data. By parsing application log files using Logstash, we can convert a flat file into structured data. Structured data, once stored in Elasticsearch, is easier to search. Visualizations and business logic for generating alerts and tickets are easier to develop on structured data. Elasticsearch, which stores and searches documents and can scale over huge volumes of data, is a good candidate for inclusion in an application log analytics solution.
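The earlier point that the number of primary shards cannot be changed is easier to see with a small model. Elasticsearch picks a shard by hashing a routing value (the document ID by default) and taking it modulo the number of primary shards, so changing that number would send existing documents to different shards. The sketch below only illustrates that arithmetic: ShardRoutingSketch and its methods are hypothetical names, and String.hashCode() is a stand-in for the hash function Elasticsearch really uses.

```java
public class ShardRoutingSketch {
    /**
     * Simplified routing: hash of the routing key modulo the number of primary shards.
     * String.hashCode() is an assumption standing in for the real hash function.
     */
    public static int shardFor(String routingKey, int primaryShards) {
        return Math.floorMod(routingKey.hashCode(), primaryShards);
    }

    /** Counts how many keys would land on a different shard if the shard count changed. */
    public static int countMoved(String[] routingKeys, int shardsBefore, int shardsAfter) {
        int moved = 0;
        for (String key : routingKeys) {
            if (shardFor(key, shardsBefore) != shardFor(key, shardsAfter)) {
                moved++;
            }
        }
        return moved;
    }
}
```

With the routing keys suggested in the article (application name, business unit ID and so on), all log entries that share a key land on the same shard, which is why the choice of key and shard count is worth making carefully up front.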
April 22, 2015
by Bipin Patwardhan
· 11,705 Views · 2 Likes
article thumbnail
On Neo4j Indexes, Match and Merge
Neo4j uses schema indexes, constraints, merges, matches, and a variety of other indexing features to help with your NoSQL needs.
April 20, 2015
by Michael Hunger
· 11,819 Views
article thumbnail
Internet of Things MQTT Quality of Service Levels
At Red Hat's Virtual Event, Building Data-driven Solutions for the Internet of Things, Kenneth Peeples spoke about connecting to the IoT with the MQTT protocol.
April 20, 2015
by Kenneth Peeples
· 12,282 Views
article thumbnail
Profiling MySQL Queries from Performance Schema
[This article was written by Jarvin Real]

When optimizing queries and investigating performance issues, MySQL comes with built-in support for profiling queries, aka SET profiling=1; . This is already awesome and simple to use, so why the PERFORMANCE_SCHEMA alternative?

  • Profiling will be removed soon (it is already deprecated in MySQL 5.6 and 5.7).
  • The built-in profiling capability can only be enabled per session. This means that you cannot capture profiling information for queries running from other connections.

If you are using Percona Server, the profiling option for log_slow_verbosity is a nice alternative. Unfortunately, not everyone is using Percona Server.

Now, for a quick demo: I execute a simple query and profile it below. Note that all of these commands are executed from a single session to my test instance.

mysql> SHOW PROFILES;
+----------+------------+----------------------------------------+
| Query_ID | Duration   | Query                                  |
+----------+------------+----------------------------------------+
|        1 | 0.00011150 | SELECT * FROM sysbench.sbtest1 LIMIT 1 |
+----------+------------+----------------------------------------+
1 row in set, 1 warning (0.00 sec)

mysql> SHOW PROFILE SOURCE FOR QUERY 1;
+----------------------+----------+-----------------------+------------------+-------------+
| Status               | Duration | Source_function       | Source_file      | Source_line |
+----------------------+----------+-----------------------+------------------+-------------+
| starting             | 0.000017 | NULL                  | NULL             |        NULL |
| checking permissions | 0.000003 | check_access          | sql_parse.cc     |        5797 |
| Opening tables       | 0.000021 | open_tables           | sql_base.cc      |        5156 |
| init                 | 0.000009 | mysql_prepare_select  | sql_select.cc    |        1050 |
| System lock          | 0.000005 | mysql_lock_tables     | lock.cc          |         306 |
| optimizing           | 0.000002 | optimize              | sql_optimizer.cc |         138 |
| statistics           | 0.000006 | optimize              | sql_optimizer.cc |         381 |
| preparing            | 0.000005 | optimize              | sql_optimizer.cc |         504 |
| executing            | 0.000001 | exec                  | sql_executor.cc  |         110 |
| Sending data         | 0.000025 | exec                  | sql_executor.cc  |         190 |
| end                  | 0.000002 | mysql_execute_select  | sql_select.cc    |        1105 |
| query end            | 0.000003 | mysql_execute_command | sql_parse.cc     |        5465 |
| closing tables       | 0.000004 | mysql_execute_command | sql_parse.cc     |        5544 |
| freeing items        | 0.000005 | mysql_parse           | sql_parse.cc     |        6969 |
| cleaning up          | 0.000006 | dispatch_command      | sql_parse.cc     |        1874 |
+----------------------+----------+-----------------------+------------------+-------------+
15 rows in set, 1 warning (0.00 sec)

To demonstrate how we can achieve the same with Performance Schema, we first identify our current connection id. In the real world, you might want to get the connection/processlist id of the thread you want to watch, e.g. from SHOW PROCESSLIST .

mysql> SELECT THREAD_ID INTO @my_thread_id
    -> FROM threads WHERE PROCESSLIST_ID = CONNECTION_ID();
Query OK, 1 row affected (0.00 sec)

Next, we identify the bounding EVENT_IDs for the statement stages. We look for the statement we want to profile using the query below against the events_statements_history_long table. Your LIMIT clause may vary depending on how many queries the server is getting.

mysql> SELECT THREAD_ID, EVENT_ID, END_EVENT_ID, SQL_TEXT, NESTING_EVENT_ID
    -> FROM events_statements_history_long
    -> WHERE THREAD_ID = @my_thread_id
    -> AND EVENT_NAME = 'statement/sql/select'
    -> ORDER BY EVENT_ID DESC LIMIT 3\G
*************************** 1. row ***************************
       THREAD_ID: 13848
        EVENT_ID: 419
    END_EVENT_ID: 434
        SQL_TEXT: SELECT THREAD_ID INTO @my_thread_id FROM threads WHERE PROCESSLIST_ID = CONNECTION_ID()
NESTING_EVENT_ID: NULL
*************************** 2. row ***************************
       THREAD_ID: 13848
        EVENT_ID: 374
    END_EVENT_ID: 392
        SQL_TEXT: SELECT * FROM sysbench.sbtest1 LIMIT 1
NESTING_EVENT_ID: NULL
*************************** 3. row ***************************
       THREAD_ID: 13848
        EVENT_ID: 353
    END_EVENT_ID: 364
        SQL_TEXT: select @@version_comment limit 1
NESTING_EVENT_ID: NULL
3 rows in set (0.02 sec)

From the results above, we are mostly interested in the EVENT_ID and END_EVENT_ID values from the second row. These give us the stage events of this particular query from the events_stages_history_long table.

mysql> SELECT EVENT_NAME, SOURCE, (TIMER_END-TIMER_START)/1000000000 as 'DURATION (ms)'
    -> FROM events_stages_history_long
    -> WHERE THREAD_ID = @my_thread_id AND EVENT_ID BETWEEN 374 AND 392;
+--------------------------------+----------------------+---------------+
| EVENT_NAME                     | SOURCE               | DURATION (ms) |
+--------------------------------+----------------------+---------------+
| stage/sql/init                 | mysqld.cc:998        |        0.0214 |
| stage/sql/checking permissions | sql_parse.cc:5797    |        0.0023 |
| stage/sql/Opening tables       | sql_base.cc:5156     |        0.0205 |
| stage/sql/init                 | sql_select.cc:1050   |        0.0089 |
| stage/sql/System lock          | lock.cc:306          |        0.0047 |
| stage/sql/optimizing           | sql_optimizer.cc:138 |        0.0016 |
| stage/sql/statistics           | sql_optimizer.cc:381 |        0.0058 |
| stage/sql/preparing            | sql_optimizer.cc:504 |        0.0044 |
| stage/sql/executing            | sql_executor.cc:110  |        0.0008 |
| stage/sql/Sending data         | sql_executor.cc:190  |        0.0251 |
| stage/sql/end                  | sql_select.cc:1105   |        0.0017 |
| stage/sql/query end            | sql_parse.cc:5465    |        0.0031 |
| stage/sql/closing tables       | sql_parse.cc:5544    |        0.0037 |
| stage/sql/freeing items        | sql_parse.cc:6969    |        0.0056 |
| stage/sql/cleaning up          | sql_parse.cc:1874    |        0.0006 |
+--------------------------------+----------------------+---------------+
15 rows in set (0.01 sec)

As you can see, the results are pretty close: not exactly the same, but close. SHOW PROFILE shows Duration in seconds, while the results above are in milliseconds.

Some limitations to this method though:

  • As we’ve seen, it takes a few hoops to dish out the information we need. Because we have to manually identify the statement we want to profile, this procedure may not be easy to port into tools like the sys schema or pstop.
  • It is only possible if Performance Schema is enabled (it is enabled by default since MySQL 5.6.6, yay!).
  • It does not cover all the metrics of native profiling, e.g. CONTEXT SWITCHES, BLOCK IO, SWAPS.
  • Depending on how busy the server you are running the tests on is, the history tables may be too small, so you either have to increase their size (e.g. the performance_schema_events_stages_history_long_size variable) or lose the history too early. Using ps_history might help in this case, though with a little modification to the queries.
  • The resulting duration per event may vary. I would think this may be due to the additional overhead as described on the performance_timers table. In any case, we hope to get this cleared up when this bug is fixed.
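Because the two outputs use different units, comparing them takes a little arithmetic. Performance Schema timer columns such as TIMER_START and TIMER_END are expressed in picoseconds, which is why the query above divides by 1000000000 to get milliseconds. The following is a small sketch of that conversion. The class and method names are made up for illustration, and the sample value is the "Sending data" stage from the tables above.

```java
public class TimerUnitsSketch {
    // 1 millisecond = 1,000,000,000 picoseconds
    private static final double PICOS_PER_MILLI = 1_000_000_000d;

    /** Converts a Performance Schema timer difference (picoseconds) to milliseconds. */
    public static double picosToMillis(long picos) {
        return picos / PICOS_PER_MILLI;
    }

    /** Converts milliseconds to seconds, the unit SHOW PROFILE reports. */
    public static double millisToSeconds(double millis) {
        return millis / 1000d;
    }

    public static void main(String[] args) {
        // "Sending data" took 0.0251 ms according to Performance Schema,
        // i.e. a timer difference of 25,100,000 picoseconds.
        double millis = picosToMillis(25_100_000L);
        double seconds = millisToSeconds(millis);
        // SHOW PROFILE reported 0.000025 s for the same stage.
        System.out.printf("%.4f ms = %.7f s%n", millis, seconds);
    }
}
```

Seen in the same unit, the two measurements for that stage differ only in the last reported digit.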
April 18, 2015
by Peter Zaitsev
· 8,084 Views
article thumbnail
Using Apache Kafka for Integration and Data Processing Pipelines with Spring
Written by Josh Long on the Spring blog.

Applications generate more data than ever before, and a huge part of the challenge, before the data can even be analyzed, is accommodating the load in the first place. Apache’s Kafka meets this challenge. It was originally designed by LinkedIn and subsequently open-sourced in 2011. The project aims to provide a unified, high-throughput, low-latency platform for handling real-time data feeds. The design is heavily influenced by transaction logs. It is a messaging system, similar to traditional messaging systems like RabbitMQ, ActiveMQ and MQSeries, but it’s ideal for log aggregation, persistent messaging, and fast (_hundreds_ of megabytes per second!) reads and writes, and it can accommodate numerous clients. Naturally, this makes it perfect for cloud-scale architectures!

Kafka powers many large production systems. LinkedIn uses it for activity data and operational metrics to power the LinkedIn news feed and LinkedIn Today, as well as offline analytics going into Hadoop. Twitter uses it as part of their stream-processing infrastructure. Kafka powers online-to-online and online-to-offline messaging at Foursquare. It is used to integrate Foursquare monitoring and production systems with Hadoop-based offline infrastructures. Square uses Kafka as a bus to move all system events through Square’s various data centers. This includes metrics, logs, custom events, and so on. On the consumer side, it outputs into Splunk, Graphite, or Esper-like real-time alerting. Netflix uses it for 300-600bn messages per day. It’s also used by Airbnb, Mozilla, Goldman Sachs, Tumblr, Yahoo, PayPal, Coursera, Urban Airship, Hotels.com, and a seemingly endless list of other big-web stars. Clearly, it’s earning its keep in some powerful systems!

Installing Apache Kafka

There are many different ways to get Apache Kafka installed. If you’re on OS X, and you’re using Homebrew, it can be as simple as brew install kafka . You can also download the latest distribution from Apache. I downloaded kafka_2.10-0.8.2.1.tgz and unzipped it. Within it you’ll find a distribution of Apache ZooKeeper as well as Kafka, so nothing else is required. I installed Apache Kafka in my $HOME directory, under another directory, bin , then I created an environment variable, KAFKA_HOME , that points to $HOME/bin/kafka .

Start Apache ZooKeeper first, specifying where the configuration properties file it requires is:

    $KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties

The Apache Kafka distribution comes with default configuration files for both ZooKeeper and Kafka, which makes getting started easy. In more advanced use cases you will need to customize these files.

Then start Apache Kafka. It too requires a configuration file, like this:

    $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties

The server.properties file contains, among other things, default values for where to connect to Apache ZooKeeper ( zookeeper.connect ), how much data should be sent across sockets, how many partitions there are by default, and the broker ID ( broker.id , which must be unique across a cluster).

There are other scripts in the same directory that can be used to send and receive dummy data, very handy in establishing that everything’s up and running!

Now that Apache Kafka is up and running, let’s look at working with Apache Kafka from our application.

Some High Level Concepts

A Kafka broker cluster consists of one or more servers where each may have one or more broker processes running. Apache Kafka is designed to be highly available; there are no master nodes. All nodes are interchangeable. Data is replicated from one node to another to ensure that it is still available in the event of a failure.

In Kafka, a topic is a category, similar to a JMS destination or both an AMQP exchange and queue. Topics are partitioned, and the choice of which of a topic’s partitions a message should be sent to is made by the message producer. Each message in the partition is assigned a unique sequenced ID, its offset . More partitions allow greater parallelism for consumption, but this will also result in more files across the brokers.

Producers send messages to Apache Kafka broker topics and specify the partition to use for every message they produce. Message production may be synchronous or asynchronous. Producers also specify what sort of replication guarantees they want.

Consumers listen for messages on topics and process the feed of published messages. As you’d expect if you’ve used other messaging systems, this is usually (and usefully!) asynchronous.

Like Spring XD and numerous other distributed systems, Apache Kafka uses Apache ZooKeeper to coordinate cluster information. Apache ZooKeeper provides a shared hierarchical namespace (called znodes ) that nodes can share to understand cluster topology and availability (yet another reason that Spring Cloud has forthcoming support for it).

ZooKeeper is very present in your interactions with Apache Kafka. Apache Kafka has, for example, two different APIs for acting as a consumer. The higher level API is simpler to get started with and it handles all the nuances of handling partitioning and so on. It will need a reference to a ZooKeeper instance to keep the coordination state.

Let’s now turn to using Apache Kafka with Spring.

Using Apache Kafka with Spring Integration

The recently released 1.1 version of the Spring Integration adapter for Apache Kafka is very powerful, and provides inbound adapters for working with both the lower level Apache Kafka API as well as the higher level API.

The adapter, currently, is XML-configuration first, though work is already underway on a Spring Integration Java configuration DSL for the adapter and milestones are available. We’ll look at both here, now.
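Before wiring anything up with Spring, it may help to see the topic, partition and offset concepts described above as a tiny in-memory model. This is only a sketch and makes no claim about Kafka's client API: PartitionedTopicSketch and its methods are hypothetical names, and choosing a partition by key hash modulo the partition count is just one simple strategy a producer could use.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy in-memory model of a partitioned topic; not the Kafka API. */
public class PartitionedTopicSketch {
    // Each partition is an append-only list; an entry's index is its offset.
    private final List<List<String>> partitions = new ArrayList<>();

    public PartitionedTopicSketch(int partitionCount) {
        for (int i = 0; i < partitionCount; i++) {
            partitions.add(new ArrayList<>());
        }
    }

    /** Producer-side choice of partition, here a hash of the message key. */
    public int partitionFor(String key) {
        return Math.floorMod(key.hashCode(), partitions.size());
    }

    /** Appends the payload to the chosen partition and returns its offset there. */
    public long send(String key, String payload) {
        List<String> log = partitions.get(partitionFor(key));
        log.add(payload);
        return log.size() - 1;
    }

    /** Reads the message stored at the given offset of a partition. */
    public String read(int partition, long offset) {
        return partitions.get(partition).get((int) offset);
    }
}
```

Offsets are only meaningful within a single partition, so messages sent with the same key keep their relative order while different partitions can be consumed in parallel.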
To make all these examples work, I added the libs-milestone-local Maven repository and used the following dependencies:

org.apache.kafka:kafka_2.10:0.8.1.1
org.springframework.boot:spring-boot-starter-integration:1.2.3.RELEASE
org.springframework.boot:spring-boot-starter:1.2.3.RELEASE
org.springframework.integration:spring-integration-kafka:1.1.1.RELEASE
org.springframework.integration:spring-integration-java-dsl:1.1.0.M1

Using the Spring Integration Apache Kafka Adapter with the Spring Integration XML DSL

First, let's look at how to use the Spring Integration outbound adapter to send Message instances from a Spring Integration flow to an external Apache Kafka instance. The example is fairly straightforward: a Spring Integration channel named inputToKafka acts as a conduit that forwards Message instances to the outbound adapter, kafkaOutboundChannelAdapter. The adapter itself can take its configuration from the defaults specified in the kafka:producer-context element or from the adapter-local configuration overrides. There may be one or many configurations in a given kafka:producer-context element.

Here's the Java code from a Spring Boot application that triggers message sends through the outbound adapter by sending messages into the incoming inputToKafka MessageChannel.
package xml;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.DependsOn;
import org.springframework.context.annotation.ImportResource;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;

@SpringBootApplication
@EnableIntegration
@ImportResource("/xml/outbound-kafka-integration.xml")
public class DemoApplication {

    private Log log = LogFactory.getLog(getClass());

    // Sends 1,000 messages into the channel once the outbound adapter is ready.
    @Bean
    @DependsOn("kafkaOutboundChannelAdapter")
    CommandLineRunner kickOff(@Qualifier("inputToKafka") MessageChannel in) {
        return args -> {
            for (int i = 0; i < 1000; i++) {
                in.send(new GenericMessage<>("#" + i));
                log.info("sending message #" + i);
            }
        };
    }

    public static void main(String args[]) {
        SpringApplication.run(DemoApplication.class, args);
    }
}

Using the New Apache Kafka Spring Integration Java Configuration DSL

Shortly after the Spring Integration 1.1 release, Spring Integration rockstar Artem Bilan got to work on adding a Spring Integration Java Configuration DSL analog and the result is a thing of beauty! It's not yet GA (you need to add the libs-milestone repository for now), but I encourage you to try it out and kick the tires. It's working well for me and the Spring Integration team are always keen on getting early feedback whenever possible!

Here's an example that demonstrates both sending messages and consuming them from two different IntegrationFlows. The producer is similar to the XML example above. New in this example is the polling consumer.
It is batch-centric, and will pull down all the messages it sees at a fixed interval. In our code, the message received will be a Map that contains as its keys the topic and as its value another Map with the partition ID and the batch (in this case, of 10 records) of records read. There is a MessageListenerContainer-based alternative that processes messages as they come.

package jc;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.SourcePollingChannelAdapterSpec;
import org.springframework.integration.dsl.kafka.Kafka;
import org.springframework.integration.dsl.kafka.KafkaHighLevelConsumerMessageSourceSpec;
import org.springframework.integration.dsl.kafka.KafkaProducerMessageHandlerSpec;
import org.springframework.integration.dsl.support.Consumer;
import org.springframework.integration.kafka.support.ZookeeperConnect;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;

/**
 * Demonstrates using the Spring Integration Apache Kafka Java Configuration DSL.
 * Thanks to Spring Integration ninja Artem Bilan
 * for getting the Java Configuration DSL working so quickly!
 *
 * @author Josh Long
 */
@EnableIntegration
@SpringBootApplication
public class DemoApplication {

    public static final String TEST_TOPIC_ID = "event-stream";

    @Component
    public static class KafkaConfig {

        @Value("${kafka.topic:" + TEST_TOPIC_ID + "}")
        private String topic;

        @Value("${kafka.address:localhost:9092}")
        private String brokerAddress;

        @Value("${zookeeper.address:localhost:2181}")
        private String zookeeperAddress;

        KafkaConfig() {
        }

        public KafkaConfig(String t, String b, String zk) {
            this.topic = t;
            this.brokerAddress = b;
            this.zookeeperAddress = zk;
        }

        public String getTopic() {
            return topic;
        }

        public String getBrokerAddress() {
            return brokerAddress;
        }

        public String getZookeeperAddress() {
            return zookeeperAddress;
        }
    }

    @Configuration
    public static class ProducerConfiguration {

        @Autowired
        private KafkaConfig kafkaConfig;

        private static final String OUTBOUND_ID = "outbound";

        private Log log = LogFactory.getLog(getClass());

        @Bean
        @DependsOn(OUTBOUND_ID)
        CommandLineRunner kickOff(
                @Qualifier(OUTBOUND_ID + ".input") MessageChannel in) {
            return args -> {
                for (int i = 0; i < 1000; i++) {
                    in.send(new GenericMessage<>("#" + i));
                    log.info("sending message #" + i);
                }
            };
        }

        @Bean(name = OUTBOUND_ID)
        IntegrationFlow producer() {

            log.info("starting producer flow..");

            return flowDefinition -> {

                // Batch size, value encoding, and sync/async behaviour for the producer.
                Consumer<KafkaProducerMessageHandlerSpec.ProducerMetadataSpec> spec =
                        (KafkaProducerMessageHandlerSpec.ProducerMetadataSpec metadata) ->
                                metadata.async(true)
                                        .batchNumMessages(10)
                                        .valueClassType(String.class)
                                        .valueEncoder(String::getBytes);

                // The outbound adapter itself, keyed by the message's sequence number.
                KafkaProducerMessageHandlerSpec messageHandlerSpec =
                        Kafka.outboundChannelAdapter(
                                props -> props.put("queue.buffering.max.ms", "15000"))
                                .messageKey(m -> m.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
                                .addProducer(this.kafkaConfig.getTopic(),
                                        this.kafkaConfig.getBrokerAddress(), spec);

                flowDefinition
                        .handle(messageHandlerSpec);
            };
        }
    }

    @Configuration
    public static class
ConsumerConfiguration {

        @Autowired
        private KafkaConfig kafkaConfig;

        private Log log = LogFactory.getLog(getClass());

        @Bean
        IntegrationFlow consumer() {

            log.info("starting consumer..");

            // High-level consumer: coordinates through ZooKeeper, reads batches of 10.
            KafkaHighLevelConsumerMessageSourceSpec messageSourceSpec = Kafka.inboundChannelAdapter(
                    new ZookeeperConnect(this.kafkaConfig.getZookeeperAddress()))
                    .consumerProperties(props ->
                            props.put("auto.offset.reset", "smallest")
                                    .put("auto.commit.interval.ms", "100"))
                    .addConsumer("myGroup", metadata -> metadata.consumerTimeout(100)
                            .topicStreamMap(m -> m.put(this.kafkaConfig.getTopic(), 1))
                            .maxMessages(10)
                            .valueDecoder(String::new));

            // Poll the message source every 100 ms.
            Consumer<SourcePollingChannelAdapterSpec> endpointConfigurer = e -> e.poller(p -> p.fixedDelay(100));

            return IntegrationFlows
                    .from(messageSourceSpec, endpointConfigurer)
                    .<Map<String, List<String>>>handle((payload, headers) -> {
                        payload.entrySet().forEach(e -> log.info(e.getKey() + '=' + e.getValue()));
                        return null;
                    })
                    .get();
        }
    }

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}

The example makes heavy use of Java 8 lambdas.

The producer spends a bit of time establishing how many messages will be sent in a single send operation, how keys and values are encoded (Kafka only knows about byte[] arrays, after all) and whether messages should be sent synchronously or asynchronously. In the next line, we configure the outbound adapter itself and then define an IntegrationFlow such that all messages get sent out via the Kafka outbound adapter.

The consumer spends a bit of time establishing which ZooKeeper instance to connect to, how many messages to receive (10) in a batch, etc. Once the message batches are received, they're handed to the handle method, where I've passed in a lambda that'll enumerate the payload's body and print it out. Nothing fancy.

Using Apache Kafka with Spring XD

Apache Kafka is a message bus and it can be very powerful when used as an integration bus.
However, it really comes into its own because it's fast enough and scalable enough that it can be used to route big data through processing pipelines. And if you're doing data processing, you really want Spring XD! Spring XD makes it dead simple to use Apache Kafka (as the support is built on the Apache Kafka Spring Integration adapter!) in complex stream-processing pipelines. Apache Kafka is exposed as a Spring XD source, where data comes from, and a sink, where data goes to.

Spring XD exposes a super convenient DSL for creating bash-like pipes-and-filter flows. Spring XD is a centralized runtime that manages, scales, and monitors data processing jobs. It builds on top of Spring Integration, Spring Batch, Spring Data and Spring for Hadoop to be a one-stop data-processing shop. Spring XD jobs read data from sources, run them through processing components that may count, filter, enrich or transform the data, and then write them to sinks.

Spring Integration and Spring XD ninja Marius Bogoevici, who did a lot of the recent work in the Spring Integration and Spring XD implementation of Apache Kafka, put together a really nice example demonstrating how to get a complete Spring XD and Kafka flow working. The README walks you through getting Apache Kafka, Spring XD and the requisite topics all set up. The essence, however, is when you use the Spring XD shell and the shell DSL to compose a stream. Spring XD components are named components that are pre-configured but have lots of parameters that you can override with --.. arguments via the XD shell and DSL. (That DSL, by the way, is written by the amazing Andy Clement of Spring Expression Language fame!) Here's an example that configures a stream to read data from an Apache Kafka source and then write the message to a component called log, which is a sink. log, in this case, could be syslogd, Splunk, HDFS, etc.
xd> stream create kafka-source-test --definition "kafka --zkconnect=localhost:2181 --topic=event-stream | log" --deploy

And that's it! Naturally, this is just a taste of Spring XD, but hopefully you'll agree the possibilities are tantalizing.

Deploying a Kafka Server with Lattice and Docker

It's easy to get an example Kafka installation all set up using Lattice, a distributed runtime that supports, among other container formats, the very popular Docker image format. There's a Docker image provided by Spotify that sets up a collocated ZooKeeper and Kafka image. You can easily deploy this to a Lattice cluster, as follows:

ltc create --run-as-root m-kafka spotify/kafka

From there, you can easily scale the Apache Kafka instances and even more easily still consume Apache Kafka from your cloud-based services.

Next Steps

You can find the code for this blog on my GitHub account.

We've only scratched the surface!

If you want to learn more (and why wouldn't you?), then be sure to check out Marius Bogoevici and Dr. Mark Pollack's upcoming webinar on reactive data pipelines using Spring XD and Apache Kafka, where they'll demonstrate how easy it can be to use RxJava, Spring XD and Apache Kafka!
April 18, 2015
by Pieter Humphrey
· 29,111 Views
Meet Molly, the virtual nurse
Telemedicine is a topic that I've touched on numerous times on this blog over the past year or so, with a number of platforms emerging to offer patients the opportunity to consult with a medical professional from anywhere in the world. Most of these platforms are clinically based, such as the Babylon service, and therefore provide you with access to doctors when you have something wrong with you. There are, however, also a number that take a more preventative approach and seek to keep you healthier in the first place. For instance, last autumn I wrote about the Vida platform, which provides coaching and support to ensure you stay out of the emergency room altogether. Of course, all of these services require a human healthcare professional at the other end of your video call to answer your queries. Sense.ly are taking another approach by offering an AI-based nurse, called Molly, who aims to provide help and support in those periods between appointments with real-life professionals. The service is aimed specifically at patients with common medical conditions such as diabetes or heart failure. The patient signs up to the site either directly or via their GP, and the platform then draws up a personalized care plan for them based upon both their medical records and their individual needs. The patient then follows this prescribed plan (hopefully), with regular check-ins with the virtual nurse via their smartphone or computer to help their progress. Doctors can also access information via the site to see how their patient is getting on, whilst the system will alert them automatically if worrying symptoms begin to emerge. I've written previously about the important role the appearance of avatars plays in how we engage with them, so Molly has been designed with a friendly face and a softly spoken voice.
She interacts with patients using voice recognition technology and can ask relatively simple questions of the patient whilst guiding them through exercise plans and collecting medical data from them. This data can then be analyzed by a doctor (although AI analysis seems inevitable), whilst the patient can also use Molly as a sort of health PA and book appointments with their doctor through her. With voice recognition technology growing at an incredible pace and the AI grunt of services such as Watson increasingly capable of making complex decisions, this seems the beginning of an inevitable trend towards more automated healthcare.
April 15, 2015
by Adi Gaskell
· 2,098 Views
Monitoring rsyslog’s Performance with impstats and Elasticsearch
If you're using rsyslog for processing lots of logs (and, as we've shown before, rsyslog is good at processing lots of logs), you're probably interested in monitoring it. To do that, you can use impstats, whose name comes from "input module for process stats". impstats produces information like:

– input stats, like how many events went through each input
– queue stats, like the maximum size of a queue
– action (output or message modification) stats, like how many events were forwarded by each action
– general stats, like CPU time or memory usage

In this post, we'll show you how to send those stats to Elasticsearch (or Logsene, our log analytics service, which is essentially hosted ELK and exposes the Elasticsearch API), where you can explore them with a nice UI, like Kibana. For example, you can get the number of logs going through each input/output per hour.

More precisely, we'll look at:

– the useful options around impstats
– how to use those stats and what they're about
– how to ship stats to Elasticsearch/Logsene by using rsyslog's Elasticsearch output
– how to do this shipping in a fast and reliable way. This will apply to most rsyslog use cases, not only impstats

Configuring impstats

Before starting, make sure you have a recent version of rsyslog. You can find the latest version (8.9.0 at the time of this writing), as well as packages for various distributions here. Many distributions still ship ancient versions like 5.x, which probably have impstats, but some of the features (like Elasticsearch output) may not be available.

Once you're there, load the impstats module at the beginning of your config:

module(
  load="impstats"
  interval="10"             # how often to generate stats
  resetCounters="on"        # to get deltas (e.g. # of messages submitted in the last 10 seconds)
  log.file="/tmp/stats"     # file to write those stats to
  log.syslog="off"          # don't send stats through the normal processing pipeline.
More on that in a bit

At this point, if you restart rsyslog, you should see stats about all the inputs, queues, actions, as well as overall resource usage. For example, the stats below come from an rsyslog that takes messages over TCP and sends them over to Elasticsearch in Logstash-like format:

Thu Apr 9 16:45:36 2015: omelasticsearch: origin=omelasticsearch submitted=11000 failed.http=0 failed.httprequests=0 failed.es=0
Thu Apr 9 16:45:36 2015: send-to-es: origin=core.action processed=10405 failed=0 suspended=0 suspended.duration=0 resumed=0
Thu Apr 9 16:45:36 2015: imtcp(13514): origin=imtcp submitted=6618
Thu Apr 9 16:45:36 2015: resource-usage: origin=impstats utime=2109000 stime=2415000 maxrss=53236 minflt=12559 majflt=1 inblock=8 oublock=0 nvcsw=164893 nivcsw=384355
Thu Apr 9 16:45:36 2015: main Q: origin=core.queue size=65095 enqueued=7149 full=0 discarded.full=0 discarded.nf=0 maxqsize=70000

Wait. What are these stats? Here's my take on each line:

1. omelasticsearch (output module to Elasticsearch) sent 11K logs to ES in the last 10 seconds. There were no connectivity errors, nor any errors thrown by Elasticsearch (like you would get if you tried to index a string in an integer field)
2. the "send-to-es" action (which uses omelasticsearch) took a bit less than 11K logs from the main queue to send them to omelasticsearch. I assume the rest of them were sent before this 10 second window. Not terribly useful, but if there was a connectivity issue with Elasticsearch, you'd see how long this action was suspended
3. the TCP input received 6.6K logs in the last 10 seconds
4. rsyslog used ~2 seconds of user CPU time (utime=2109000 microseconds) and ~2.4s of system time (stime=2415000 microseconds). It used 53MB of RAM at most. You can see what all these abbreviations mean by looking at getrusage's man page
5.
the default (main) queue currently buffers 65K messages from the inputs (though it went as high as 70K in the last 10 seconds), 7K of which were enqueued in the last 10 seconds

Shipping Stats to Elasticsearch/Logsene

Now that we have these stats, let's centralize them to Elasticsearch. If you're using rsyslog to push to Elasticsearch, it's better to use another cluster or Logsene for stats. Otherwise, when Elasticsearch is in trouble, you won't be able to see stats which might explain why you're having trouble in the first place. Either way, we need four things:

– produce those stats in JSON, so we can parse them easily
– define a template for what the JSON documents that we send to Elasticsearch will look like
– parse the JSON stats
– send those documents to Logsene/Elasticsearch using the defined template

Here's the relevant config snippet for sending to Logsene:

module(
  load="impstats"
  interval="10"
  resetCounters="on"
  format="cee"              # produce JSON stats
)

module(load="mmjsonparse")
module(load="omelasticsearch")

# template for building the JSON documents that will land in Logsene/Elasticsearch
template(name="stats" type="list") {
  constant(value="{")
  property(name="timereported" dateFormat="rfc3339" format="jsonf" outname="@timestamp")  # the timestamp
  constant(value=",")
  property(name="hostname" format="jsonf" outname="host")  # the host generating stats
  constant(value=",\"source\":\"impstats\",")  # we'll hardcode "impstats" as a source
  property(name="$!all-json" position.from="2")  # finally, we'll add all metrics
}

action(
  name="parse_impstats"     # parse the
  type="mmjsonparse"        # JSON stats
)

action(
  name="impstats_to_es"     # name actions so you can see them in impstats messages (instead of the default action 1, 2, etc)
  type="omelasticsearch"
  server="logsene-receiver.sematext.com"  # host and port for Logsene/Elasticsearch
  serverport="80"           # set serverport="443" and add usehttps="on" for using HTTPS instead of plain HTTP
  template="stats"          # use the template defined earlier
  searchIndex="LOGSENE_APP_TOKEN_GOES_HERE"
  searchType="impstats"     # we'll use a separate mapping type for stats
  bulkmode="on"             # use Elasticsearch's bulk API
  action.resumeretrycount="-1"  # retry indefinitely on failure
)

That's basically all you need to send stats to Logsene/Elasticsearch: load the impstats, mmjsonparse and omelasticsearch modules, define the template, parse stats and send them over. Note that while impstats comes bundled with most rsyslog packages, you need to install the rsyslog-mmjsonparse and rsyslog-elasticsearch packages to get the other two plugins.

Using a separate ruleset and configuring its queue

Before wrapping up, let's address two potential issues. First is that, by default, impstats will send stats events to the main queue (all input modules do that by default). This will mix stats with other logs, which has a couple of disadvantages:

– you need to add a conditional to make sure only impstats events go to the impstats-specific destination
– if rsyslog is queueing lots of messages in the main queue, stats can land in Elasticsearch with a delay

To avoid these problems, you can bind impstats to a separate ruleset. Let's call it "monitoring". rsyslog will then process them separately from the main queue, which is associated with the default ruleset. You can have as many rulesets as you want, and they're typically used to separate different types of logs, for example to process local logs and remote logs independently.

Like the default ruleset, which has the main queue, any ruleset can have its own queue (also, each action, no matter the ruleset it's in, can have its own queue – more info on that here, here and here). Why am I talking about queues? Because if stats are important, you want to make sure you are able to queue them, instead of losing them if Elasticsearch becomes unavailable for a while.

By default, the default ruleset comes with an in-memory queue of 10K or so messages.
Additional rulesets have no queue by default, but you can add one by specifying queue options (you can find the complete list here). While in-memory queues are fast, they are typically small and you'd lose their contents if you have to shut down or restart rsyslog.

The following config snippet adds a disk-assisted queue to the "monitoring" ruleset. A disk-assisted queue will normally be as fast as an in-memory queue, and will spill logs to disk in a performance-friendly way if it runs out of space. You can also make rsyslog save all logs to disk when you shut it down or restart it.

module(
  load="impstats"
  interval="10"
  resetCounters="on"
  format="cee"
  ruleset="monitoring"      # send stats to the monitoring ruleset
)

# add here modules and template from the previous snippet

ruleset(
  name="monitoring"         # the ruleset that impstats is bound to above
  queue.type="FixedArray"   # we'll have a fixed memory queue for this ruleset
  queue.highwatermark="50000"   # at least until it contains 50K stats messages
  queue.spoolDirectory="/var/run/rsyslog/queues"  # at which point, start writing in-memory messages to disk
  queue.filename="stats_ruleset"
  queue.lowwatermark="20000"    # until the memory queue goes back to 20K, at which point the memory queue is used again
  queue.maxdiskspace="100m"     # the queue will be full when it occupies 100MB of space on disk
  queue.size="5000000"          # this is the total queue size (shouldn't be reachable)
  queue.dequeuebatchsize="1000" # how many messages from the queue to process at once (also determines how many messages will be in an ES Bulk)
  queue.saveonshutdown="on"     # save queue contents to disk at shutdown
){
  # add here actions from the previous snippet
}

Summary

This was quite a long post, so let me summarize the features of rsyslog we touched on:

– impstats is an input module that can generate stats about rsyslog's inputs, queues and actions, as well as general process statistics
– you normally want to write them to a file in a human-readable format for
development/debugging or local performance tests
– for production, it's best to write them in JSON, parse them in a separate ruleset and send them to Logsene/Elasticsearch, where you can search and graph them
– you can use disk-assisted queues to increase the capacity of an in-memory queue without losing performance under normal conditions. A disk-assisted queue can also save logs to disk on shutdown to make sure important stats are not lost

If you find working with logs and/or Elasticsearch exciting, that's what we do in lots of our products, consulting and support engagements. So if you want to join us, we're hiring worldwide.
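For local debugging, the human-readable stats lines shown earlier in this post (for example the "main Q" line) can be picked apart with a few lines of Java. This is a minimal sketch that assumes the "timestamp: name: key=value ..." layout seen in those sample lines; it is not based on a formal format specification, and the class name `ImpstatsLineSketch` and its methods are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Parses one plain-text impstats line; the layout is inferred from the sample output only. */
public class ImpstatsLineSketch {

    /** Splits "timestamp: name: k=v k=v" into timestamp, name, and the counter text. */
    private static String[] parts(String line) {
        // ": " (colon + space) does not occur inside the time "16:45:36", so three parts suffice.
        String[] parts = line.split(": ", 3);
        if (parts.length != 3) {
            throw new IllegalArgumentException("unexpected stats line: " + line);
        }
        return parts;
    }

    /** Returns the stats object name, e.g. "main Q" or "imtcp(13514)". */
    public static String name(String line) {
        return parts(line)[1];
    }

    /** Returns the key=value counters in the order they appear on the line. */
    public static Map<String, String> counters(String line) {
        Map<String, String> counters = new LinkedHashMap<>();
        for (String pair : parts(line)[2].trim().split("\\s+")) {
            String[] kv = pair.split("=", 2);
            if (kv.length == 2) {
                counters.put(kv[0], kv[1]);
            }
        }
        return counters;
    }

    public static void main(String[] args) {
        String line = "Thu Apr 9 16:45:36 2015: main Q: origin=core.queue size=65095 "
                + "enqueued=7149 full=0 discarded.full=0 discarded.nf=0 maxqsize=70000";
        System.out.println(name(line) + " -> " + counters(line));
    }
}
```

Values are kept as strings because some of them, such as origin, are not numeric; callers can convert the counters they care about with Long.parseLong.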
April 14, 2015
by Radu Gheorghe
· 8,155 Views
Using Multiple Grok Statements to Parse a Java Stack Trace
Parse your Java stack trace log information with the Logstash tool.
April 14, 2015
by Bipin Patwardhan
· 77,980 Views · 6 Likes
Mockito & DBUnit: Implementing a Mocking Structure Focused and Independent to Automated Tests on Java
In this post, we will do a hands-on with Mockito and DBUnit, two libraries from Java's open source ecosystem that can help us make our JUnit tests more focused and independent. But why is mocking so important in our unit tests?

Focusing the tests

Let's imagine a Java back-end application with a tiered architecture. This application could have 2 tiers:

  • The service tier, which holds the business rules and acts as an interface for the front-end;
  • The entity tier, which holds the logic responsible for making calls to a database, using technologies like JDBC or JPA.

Of course, in an architecture of this kind, the tiers depend on each other as follows:

Service >>> Entity

In this kind of architecture, the most common way of building our automated tests is to create JUnit test classes that test each tier independently, so that a test run reflects only the correctness of the tier we want to test. However, if we simply create the classes without any mocking, we will run into problems like the following:

  • In the JUnit tests of our service tier, for example, a problem in the entity tier will also make our tests fail, because the error from the entity tier will reverberate across the tiers;
  • If different teams are working on the same system, with one team responsible for building the service tier and another for building the entity tier, one team will depend on the other before the tests can be written.

To resolve such issues, we can mock the entity tier in the service tier's unit test classes, so that those tests are independent and focused on the service tier, where they belong.

Independence

One tier that is especially important for the independence of our JUnit test classes is the entity tier.
Since in our example this tier is focused on connecting to a database and running SQL commands on it, it breaks our independence goal, because we need a database in order to run our tests. Not only that, if a test breaks any structure that is used by the subsequent tests, all of them will also fail. This is where our other library, DBUnit, comes in. With DBUnit, we can use embedded databases, such as HSQLDB, to make our database exclusive to the running of our tests. So, without further delay, let's begin our hands-on!

Hands-on

For this lab, we will create a basic CRUD for a Client entity. The structure will follow the simple example we talked about previously, with the DAO (entity) and Service tiers. We will use DBUnit and JUnit to test the DAO tier, and Mockito with JUnit to test the Service tier. First, let's create a Maven project, without any archetype, and include the following dependencies (groupId:artifactId:version) in pom.xml:

. . .
junit:junit:4.12
org.dbunit:dbunit:2.5.0
org.mockito:mockito-all:1.10.19
org.hibernate:hibernate-entitymanager:4.3.8.Final
org.hsqldb:hsqldb:2.3.2
org.springframework:spring-core:4.1.4.RELEASE
org.springframework:spring-context:4.1.5.RELEASE
org.springframework:spring-test:4.1.5.RELEASE
org.springframework:spring-tx:4.1.5.RELEASE
org.springframework:spring-orm:4.1.5.RELEASE
. . .

In the previous snippet, we included not only the Mockito, DBUnit and JUnit libraries, but also Hibernate to implement the persistence layer and Spring 4 for the IoC container and transaction management. We also included the Spring Test library, which has some features that we will use later in this lab. Finally, to simplify the setup and remove the need to install a database to run the code, we will use HSQLDB as our database.
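Before moving on, the idea from "Focusing the tests" can be illustrated without any library at all. The sketch below uses a hand-written stand-in for the entity tier rather than Mockito, so it shows only the principle; `ClientStore`, `GreetingService` and `stubWith` are hypothetical names that do not appear in the lab's code.

```java
import java.util.HashMap;
import java.util.Map;

/** Hand-rolled test double for the entity tier; illustrates the principle, not Mockito's API. */
public class StubbedTierSketch {

    /** Hypothetical contract of the entity tier. */
    public interface ClientStore {
        String findName(long id);
    }

    /** Hypothetical service tier that depends only on the contract above. */
    public static class GreetingService {
        private final ClientStore store;

        public GreetingService(ClientStore store) {
            this.store = store;
        }

        public String greet(long id) {
            String name = store.findName(id);
            return name == null ? "Hello, stranger" : "Hello, " + name;
        }
    }

    /** Builds a stand-in entity tier that answers from canned data, with no database involved. */
    public static ClientStore stubWith(Map<Long, String> canned) {
        return id -> canned.get(id);
    }

    public static void main(String[] args) {
        Map<Long, String> canned = new HashMap<>();
        canned.put(1L, "Alexandre");
        GreetingService service = new GreetingService(stubWith(canned));
        System.out.println(service.greet(1));
        System.out.println(service.greet(2));
    }
}
```

Because the service only sees the `ClientStore` contract, a failure in the real entity tier cannot make this check fail, which is the focus that a mocking library provides with far less hand-written code.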
Our lab will have the following structure:

  • One class will represent the application itself, as a standalone class, where we will consume the tiers, as a real application would;
  • Another 2 classes, each one with JUnit tests, will test each tier independently.

First, we define a persistence unit, where we set the name of the unit and the properties that make Hibernate create the table for us and populate it with some initial rows. The code of the persistence.xml can be seen below:

com.alexandreesl.handson.model.Client

And the initial data to populate the table can be seen below:

insert into Client(id,name,sex, phone) values (1,'Alexandre Eleuterio Santos Lourenco','M','22323456');
insert into Client(id,name,sex, phone) values (2,'Lucebiane Santos Lourenco','F','22323876');
insert into Client(id,name,sex, phone) values (3,'Maria Odete dos Santos Lourenco','F','22309456');
insert into Client(id,name,sex, phone) values (4,'Eleuterio da Silva Lourenco','M','22323956');
insert into Client(id,name,sex, phone) values (5,'Ana Carolina Fernandes do Sim','F','22123456');

To keep the post from becoming burdensome, we will not discuss the project structure during the lab, but just show the final structure at the end. The code can be found in a GitHub repository, linked at the end of the post. With the persistence unit defined, we can start coding!
First, we create the entity class:

package com.alexandreesl.handson.model;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;

@Table(name = "Client")
@Entity
public class Client {

    @Id
    private long id;

    @Column(name = "name", nullable = false, length = 50)
    private String name;

    @Column(name = "sex", nullable = false)
    private String sex;

    @Column(name = "phone", nullable = false)
    private long phone;

    public long getId() { return id; }

    public void setId(long id) { this.id = id; }

    public String getName() { return name; }

    public void setName(String name) { this.name = name; }

    public String getSex() { return sex; }

    public void setSex(String sex) { this.sex = sex; }

    public long getPhone() { return phone; }

    public void setPhone(long phone) { this.phone = phone; }
}

In order to create the persistence-related beans that enable Hibernate and the transaction manager, alongside the rest of the beans necessary for the application, we use a Java-based Spring configuration class.
The code of the class can be seen below:

package com.alexandreesl.handson.core;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.DriverManagerDataSource;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.orm.jpa.vendor.Database;
import org.springframework.orm.jpa.vendor.HibernateJpaDialect;
import org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter;
import org.springframework.transaction.annotation.EnableTransactionManagement;

@Configuration
@EnableTransactionManagement
@ComponentScan({ "com.alexandreesl.handson.dao", "com.alexandreesl.handson.service" })
public class AppConfiguration {

    // In-memory HSQLDB, so no database installation is needed.
    @Bean
    public DriverManagerDataSource dataSource() {
        DriverManagerDataSource dataSource = new DriverManagerDataSource();
        dataSource.setDriverClassName("org.hsqldb.jdbcDriver");
        dataSource.setUrl("jdbc:hsqldb:mem://standalone");
        dataSource.setUsername("sa");
        dataSource.setPassword("");
        return dataSource;
    }

    @Bean
    public JpaTransactionManager transactionManager() {
        JpaTransactionManager transactionManager = new JpaTransactionManager();
        transactionManager.setEntityManagerFactory(entityManagerFactory()
                .getNativeEntityManagerFactory());
        transactionManager.setDataSource(dataSource());
        transactionManager.setJpaDialect(jpaDialect());
        return transactionManager;
    }

    @Bean
    public HibernateJpaDialect jpaDialect() {
        return new HibernateJpaDialect();
    }

    @Bean
    public HibernateJpaVendorAdapter jpaVendorAdapter() {
        HibernateJpaVendorAdapter jpaVendor = new HibernateJpaVendorAdapter();
        jpaVendor.setDatabase(Database.HSQL);
        jpaVendor.setDatabasePlatform("org.hibernate.dialect.HSQLDialect");
        return jpaVendor;
    }

    @Bean
    public LocalContainerEntityManagerFactoryBean entityManagerFactory() {
LocalContainerEntityManagerFactoryBean entityManagerFactory = new LocalContainerEntityManagerFactoryBean(); entityManagerFactory .setPersistenceXmlLocation("classpath:META-INF/persistence.xml"); entityManagerFactory.setPersistenceUnitName("persistence"); entityManagerFactory.setDataSource(dataSource()); entityManagerFactory.setJpaVendorAdapter(jpaVendorAdapter()); entityManagerFactory.setJpaDialect(jpaDialect()); return entityManagerFactory; } }
Finally, we create the classes that represent the tiers themselves. This is the DAO class:
package com.alexandreesl.handson.dao; import javax.persistence.EntityManager; import javax.persistence.PersistenceContext; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import com.alexandreesl.handson.model.Client; @Component public class ClientDAO { @PersistenceContext private EntityManager entityManager; @Transactional(readOnly = true) public Client find(long id) { return entityManager.find(Client.class, id); } @Transactional public void create(Client client) { entityManager.persist(client); } @Transactional public void update(Client client) { entityManager.merge(client); } @Transactional public void delete(Client client) { entityManager.remove(client); } }
And this is the service class:
package com.alexandreesl.handson.service; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import com.alexandreesl.handson.dao.ClientDAO; import com.alexandreesl.handson.model.Client; @Component public class ClientService { @Autowired private ClientDAO clientDAO; public ClientDAO getClientDAO() { return clientDAO; } public void setClientDAO(ClientDAO clientDAO) { this.clientDAO = clientDAO; } public Client find(long id) { return clientDAO.find(id); } public void create(Client client) { clientDAO.create(client); } public void update(Client client) { clientDAO.update(client); } public void delete(Client client) { 
clientDAO.delete(client); } }
The reader may notice that we created a getter and a setter for the DAO on the service class. Spring does not need them for the injection, but they make it easier to replace the real DAO with a Mockito mock in the test class. Finally, we code the class we talked about previously, the one that consumes the tiers:
package com.alexandreesl.handson.core; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import com.alexandreesl.handson.model.Client; import com.alexandreesl.handson.service.ClientService; public class App { public static void main(String[] args) { ApplicationContext context = new AnnotationConfigApplicationContext( AppConfiguration.class); ClientService service = (ClientService) context .getBean(ClientService.class); System.out.println(service.find(1).getName()); System.out.println(service.find(3).getName()); System.out.println(service.find(5).getName()); Client client = new Client(); client.setId(6); client.setName("Celina do Sim"); client.setPhone(44657688); client.setSex("F"); service.create(client); System.out.println(service.find(6).getName()); System.exit(0); } }
If we run the class, we can see that the console prints all the clients we searched for and that Hibernate is initialized properly, showing that our implementation works:
Mar 28, 2015 1:09:22 PM org.springframework.context.annotation.AnnotationConfigApplicationContext prepareRefresh INFO: Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@6433a2: startup date [Sat Mar 28 13:09:22 BRT 2015]; root of context hierarchy Mar 28, 2015 1:09:22 PM org.springframework.jdbc.datasource.DriverManagerDataSource setDriverClassName INFO: Loaded JDBC driver: org.hsqldb.jdbcDriver Mar 28, 2015 1:09:22 PM org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean createNativeEntityManagerFactory INFO: Building JPA container 
EntityManagerFactory for persistence unit 'persistence' Mar 28, 2015 1:09:22 PM org.hibernate.jpa.internal.util.LogHelper logPersistenceUnitInformation INFO: HHH000204: Processing PersistenceUnitInfo [ name: persistence ...] Mar 28, 2015 1:09:22 PM org.hibernate.Version logVersion INFO: HHH000412: Hibernate Core {4.3.8.Final} Mar 28, 2015 1:09:22 PM org.hibernate.cfg.Environment INFO: HHH000206: hibernate.properties not found Mar 28, 2015 1:09:22 PM org.hibernate.cfg.Environment buildBytecodeProvider INFO: HHH000021: Bytecode provider name : javassist Mar 28, 2015 1:09:22 PM org.hibernate.annotations.common.reflection.java.JavaReflectionManager INFO: HCANN000001: Hibernate Commons Annotations {4.0.5.Final} Mar 28, 2015 1:09:23 PM org.hibernate.dialect.Dialect INFO: HHH000400: Using dialect: org.hibernate.dialect.HSQLDialect Mar 28, 2015 1:09:23 PM org.hibernate.hql.internal.ast.ASTQueryTranslatorFactory INFO: HHH000397: Using ASTQueryTranslatorFactory Mar 28, 2015 1:09:23 PM org.hibernate.tool.hbm2ddl.SchemaExport execute INFO: HHH000227: Running hbm2ddl schema export Mar 28, 2015 1:09:23 PM org.hibernate.tool.hbm2ddl.SchemaExport execute INFO: HHH000230: Schema export complete Alexandre Eleuterio Santos Lourenco Maria Odete dos Santos Lourenco Ana Carolina Fernandes do Sim Celina do Sim
Now, let's move on to the tests themselves. For the DBUnit tests, we create a base class that provides the basic DB operations all of our JUnit tests rely on. The setup happens in the @PostConstruct method, which is fired after Spring has made all of its injections. That is the reason we could not use the @BeforeClass annotation: we need Spring to instantiate and inject the EntityManagerFactory first. In that method we use DBUnit's DatabaseConnection class to connect to our database and populate the table from a data set, built from an XML structure that represents the data used in the tests. 
The table is populated by the DatabaseOperation class, which we use with the CLEAN_INSERT operation: it first deletes all rows from the table and then inserts the data from the data set. Finally, we use one of JUnit's lifecycle annotations, @After, which marks a method to be called after every test case. In our scenario, that method calls clear() on the EntityManager, which detaches the managed entities and so forces Hibernate to query the database again in every test case. This eliminates possible problems between our test cases caused by cached entities (the persistence context, that is, Hibernate's first-level cache) holding data that differs from what is in the DB. The code for the base class is the following:
package com.alexandreesl.handson.dao.test; import java.io.InputStream; import java.sql.SQLException; import javax.annotation.PostConstruct; import javax.persistence.EntityManager; import javax.persistence.EntityManagerFactory; import javax.persistence.PersistenceUnit; import org.dbunit.DatabaseUnitException; import org.dbunit.database.DatabaseConfig; import org.dbunit.database.DatabaseConnection; import org.dbunit.database.IDatabaseConnection; import org.dbunit.dataset.IDataSet; import org.dbunit.dataset.xml.FlatXmlDataSetBuilder; import org.dbunit.ext.hsqldb.HsqldbDataTypeFactory; import org.dbunit.operation.DatabaseOperation; import org.hibernate.HibernateException; import org.hibernate.internal.SessionImpl; import org.junit.After; public class BaseDBUnitSetup { private static IDatabaseConnection connection; private static IDataSet dataset; @PersistenceUnit public EntityManagerFactory entityManagerFactory; private EntityManager entityManager; @PostConstruct public void init() throws HibernateException, DatabaseUnitException, SQLException { entityManager = entityManagerFactory.createEntityManager(); connection = new DatabaseConnection( ((SessionImpl) (entityManager.getDelegate())).connection()); connection.getConfig().setProperty( DatabaseConfig.PROPERTY_DATATYPE_FACTORY, new 
HsqldbDataTypeFactory()); FlatXmlDataSetBuilder flatXmlDataSetBuilder = new FlatXmlDataSetBuilder(); InputStream dataSet = Thread.currentThread().getContextClassLoader() .getResourceAsStream("test-data.xml"); dataset = flatXmlDataSetBuilder.build(dataSet); DatabaseOperation.CLEAN_INSERT.execute(connection, dataset); } @After public void afterTests() { entityManager.clear(); } } The xml structure used on the test cases is the following: And the code of our test class of the DAO tier is the following: package com.alexandreesl.handson.dao.test; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import org.springframework.test.context.transaction.TransactionConfiguration; import org.springframework.transaction.annotation.Transactional; import com.alexandreesl.handson.core.test.AppTestConfiguration; import com.alexandreesl.handson.dao.ClientDAO; import com.alexandreesl.handson.model.Client; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes = AppTestConfiguration.class) @TransactionConfiguration(defaultRollback = true) public class ClientDAOTest extends BaseDBUnitSetup { @Autowired private ClientDAO clientDAO; @Test public void testFind() { Client client = clientDAO.find(1); assertNotNull(client); client = clientDAO.find(2); assertNotNull(client); client = clientDAO.find(3); assertNull(client); client = clientDAO.find(4); assertNull(client); client = clientDAO.find(5); assertNull(client); } @Test @Transactional public void testInsert() { Client client = new Client(); client.setId(3); client.setName("Celina do Sim"); client.setPhone(44657688); client.setSex("F"); clientDAO.create(client); } @Test @Transactional public void testUpdate() { Client client = 
clientDAO.find(1); client.setPhone(12345678); clientDAO.update(client); } @Test @Transactional public void testRemove() { Client client = clientDAO.find(1); clientDAO.delete(client); } }
The code is largely self-explanatory, so we will focus on the annotations at the class level. The @RunWith(SpringJUnit4ClassRunner.class) annotation changes the JUnit runner that executes our test cases, replacing it with one made by Spring that enables support for the IoC container and Spring's annotations. The @TransactionConfiguration(defaultRollback = true) annotation comes from Spring's test library and changes the behavior of the @Transactional annotation, making the transactions roll back after execution instead of committing. That ensures that our test cases won't change the contents of the DB, so one test case won't break the ones that run after it.
The reader may notice that we switched to another configuration class, exclusive to the test cases. It declares essentially the same beans as the original configuration class and only changes the data source bean to point to a different DB than the previous one, showing that we can change the database of our tests without breaking the code. 
In a real-world scenario, the configuration class of the application would point to a relational database such as Oracle or DB2, and the test cases would use an embedded database such as HSQLDB, which is what we are using in this case:
package com.alexandreesl.handson.core.test; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.jdbc.datasource.DriverManagerDataSource; import org.springframework.orm.jpa.JpaTransactionManager; import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean; import org.springframework.orm.jpa.vendor.Database; import org.springframework.orm.jpa.vendor.HibernateJpaDialect; import org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter; import org.springframework.transaction.annotation.EnableTransactionManagement; @Configuration @EnableTransactionManagement @ComponentScan({ "com.alexandreesl.handson.dao", "com.alexandreesl.handson.service" }) public class AppTestConfiguration { @Bean public DriverManagerDataSource dataSource() { DriverManagerDataSource dataSource = new DriverManagerDataSource(); dataSource.setDriverClassName("org.hsqldb.jdbcDriver"); dataSource.setUrl("jdbc:hsqldb:mem://standalone-test"); dataSource.setUsername("sa"); dataSource.setPassword(""); return dataSource; } @Bean public JpaTransactionManager transactionManager() { JpaTransactionManager transactionManager = new JpaTransactionManager(); transactionManager.setEntityManagerFactory(entityManagerFactory() .getNativeEntityManagerFactory()); transactionManager.setDataSource(dataSource()); transactionManager.setJpaDialect(jpaDialect()); return transactionManager; } @Bean public HibernateJpaDialect jpaDialect() { return new HibernateJpaDialect(); } @Bean public HibernateJpaVendorAdapter jpaVendorAdapter() { HibernateJpaVendorAdapter jpaVendor = new HibernateJpaVendorAdapter(); 
jpaVendor.setDatabase(Database.HSQL); jpaVendor.setDatabasePlatform("org.hibernate.dialect.HSQLDialect"); return jpaVendor; } @Bean public LocalContainerEntityManagerFactoryBean entityManagerFactory() { LocalContainerEntityManagerFactoryBean entityManagerFactory = new LocalContainerEntityManagerFactoryBean(); entityManagerFactory .setPersistenceXmlLocation("classpath:META-INF/persistence.xml"); entityManagerFactory.setPersistenceUnitName("persistence"); entityManagerFactory.setDataSource(dataSource()); entityManagerFactory.setJpaVendorAdapter(jpaVendorAdapter()); entityManagerFactory.setJpaDialect(jpaDialect()); return entityManagerFactory; } }
If we run the test class, we can see that all the test cases pass. If we look at the console, we can also see that transactions were created and rolled back, respecting our configuration:
. . . Mar 28, 2015 2:29:55 PM org.springframework.test.context.transaction.TransactionContext startTransaction INFO: Began transaction (1) for test context [DefaultTestContext@644abb8f testClass = ClientDAOTest, testInstance = com.alexandreesl.handson.dao.test.ClientDAOTest@1a411233, testMethod = testInsert@ClientDAOTest, testException = [null], mergedContextConfiguration = [MergedContextConfiguration@70325d20 testClass = ClientDAOTest, locations = '{}', classes = '{class com.alexandreesl.handson.core.test.AppTestConfiguration}', contextInitializerClasses = '[]', activeProfiles = '{}', propertySourceLocations = '{}', propertySourceProperties = '{}', contextLoader = 'org.springframework.test.context.support.DelegatingSmartContextLoader', parent = [null]]]; transaction manager [org.springframework.orm.jpa.JpaTransactionManager@7c2327fa]; rollback [true] Mar 28, 2015 2:29:55 PM org.springframework.test.context.transaction.TransactionContext endTransaction INFO: Rolled back transaction for test context [DefaultTestContext@644abb8f testClass = ClientDAOTest, testInstance = 
com.alexandreesl.handson.dao.test.ClientDAOTest@1a411233, testMethod = testInsert@ClientDAOTest, testException = [null], mergedContextConfiguration = [MergedContextConfiguration@70325d20 testClass = ClientDAOTest, locations = '{}', classes = '{class com.alexandreesl.handson.core.test.AppTestConfiguration}', contextInitializerClasses = '[]', activeProfiles = '{}', propertySourceLocations = '{}', propertySourceProperties = '{}', contextLoader = 'org.springframework.test.context.support.DelegatingSmartContextLoader', parent = [null]]]. Mar 28, 2015 2:29:55 PM org.springframework.test.context.transaction.TransactionContext startTransaction INFO: Began transaction (1) for test context [DefaultTestContext@644abb8f testClass = ClientDAOTest, testInstance = com.alexandreesl.handson.dao.test.ClientDAOTest@2adddc06, testMethod = testRemove@ClientDAOTest, testException = [null], mergedContextConfiguration = [MergedContextConfiguration@70325d20 testClass = ClientDAOTest, locations = '{}', classes = '{class com.alexandreesl.handson.core.test.AppTestConfiguration}', contextInitializerClasses = '[]', activeProfiles = '{}', propertySourceLocations = '{}', propertySourceProperties = '{}', contextLoader = 'org.springframework.test.context.support.DelegatingSmartContextLoader', parent = [null]]]; transaction manager [org.springframework.orm.jpa.JpaTransactionManager@7c2327fa]; rollback [true] Mar 28, 2015 2:29:55 PM org.springframework.test.context.transaction.TransactionContext endTransaction INFO: Rolled back transaction for test context [DefaultTestContext@644abb8f testClass = ClientDAOTest, testInstance = com.alexandreesl.handson.dao.test.ClientDAOTest@2adddc06, testMethod = testRemove@ClientDAOTest, testException = [null], mergedContextConfiguration = [MergedContextConfiguration@70325d20 testClass = ClientDAOTest, locations = '{}', classes = '{class com.alexandreesl.handson.core.test.AppTestConfiguration}', contextInitializerClasses = '[]', activeProfiles = '{}', 
propertySourceLocations = '{}', propertySourceProperties = '{}', contextLoader = 'org.springframework.test.context.support.DelegatingSmartContextLoader', parent = [null]]]. Mar 28, 2015 2:29:55 PM org.springframework.test.context.transaction.TransactionContext startTransaction INFO: Began transaction (1) for test context [DefaultTestContext@644abb8f testClass = ClientDAOTest, testInstance = com.alexandreesl.handson.dao.test.ClientDAOTest@4905c46b, testMethod = testUpdate@ClientDAOTest, testException = [null], mergedContextConfiguration = [MergedContextConfiguration@70325d20 testClass = ClientDAOTest, locations = '{}', classes = '{class com.alexandreesl.handson.core.test.AppTestConfiguration}', contextInitializerClasses = '[]', activeProfiles = '{}', propertySourceLocations = '{}', propertySourceProperties = '{}', contextLoader = 'org.springframework.test.context.support.DelegatingSmartContextLoader', parent = [null]]]; transaction manager [org.springframework.orm.jpa.JpaTransactionManager@7c2327fa]; rollback [true] Mar 28, 2015 2:29:55 PM org.springframework.test.context.transaction.TransactionContext endTransaction INFO: Rolled back transaction for test context [DefaultTestContext@644abb8f testClass = ClientDAOTest, testInstance = com.alexandreesl.handson.dao.test.ClientDAOTest@4905c46b, testMethod = testUpdate@ClientDAOTest, testException = [null], mergedContextConfiguration = [MergedContextConfiguration@70325d20 testClass = ClientDAOTest, locations = '{}', classes = '{class com.alexandreesl.handson.core.test.AppTestConfiguration}', contextInitializerClasses = '[]', activeProfiles = '{}', propertySourceLocations = '{}', propertySourceProperties = '{}', contextLoader = 'org.springframework.test.context.support.DelegatingSmartContextLoader', parent = [null]]]. Now let's move on to the Service tests, with the help of Mockito. 
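Before looking at the Mockito version, the underlying idea of replacing the real DAO through the setter on the service can be sketched in plain Java with a hand-written stub. This is only an illustration under stated assumptions, not the article's code: the nested Client, ClientDAO and ClientService classes are simplified stand-ins without JPA or Spring, and StubClientDAO and StubDemo are hypothetical names introduced here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; everything nested here is a simplified stand-in
// for the article's classes, with no JPA, Spring or Mockito involved.
class StubDemo {

    static class Client {
        private long id;
        private String name;
        long getId() { return id; }
        void setId(long id) { this.id = id; }
        String getName() { return name; }
        void setName(String name) { this.name = name; }
    }

    // Stand-in for the real DAO: it fails loudly to show it is never called.
    static class ClientDAO {
        Client find(long id) {
            throw new IllegalStateException("real DAO needs a database");
        }
        void create(Client client) {
            throw new IllegalStateException("real DAO needs a database");
        }
    }

    // Same shape as the article's service: a setter lets tests swap the DAO.
    static class ClientService {
        private ClientDAO clientDAO;
        void setClientDAO(ClientDAO clientDAO) { this.clientDAO = clientDAO; }
        Client find(long id) { return clientDAO.find(id); }
        void create(Client client) { clientDAO.create(client); }
    }

    // Hand-written stub: returns a canned client and records create() calls.
    static class StubClientDAO extends ClientDAO {
        final List<Client> created = new ArrayList<>();

        @Override
        Client find(long id) {
            Client client = new Client();
            client.setId(id);
            client.setName("Mocked client!");
            return client;
        }

        @Override
        void create(Client client) {
            created.add(client);
        }
    }

    public static void main(String[] args) {
        ClientService service = new ClientService();
        StubClientDAO stub = new StubClientDAO();
        service.setClientDAO(stub); // the setter is the seam used by the test

        System.out.println(service.find(42).getName()); // prints "Mocked client!"
        service.create(new Client());
        System.out.println(stub.created.size()); // prints 1
    }
}
```

A mocking library removes the need to write a stub class like this by hand for every collaborator, but the seam it relies on is the same setter.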
The class to test the Service tier is very simple, as we can see below:
package com.alexandreesl.handson.service.test; import static org.junit.Assert.assertEquals; import org.junit.BeforeClass; import org.junit.Test; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import com.alexandreesl.handson.dao.ClientDAO; import com.alexandreesl.handson.model.Client; import com.alexandreesl.handson.service.ClientService; public class ClientServiceTest { private static ClientDAO clientDAO; private static ClientService clientService; @BeforeClass public static void beforeClass() { clientService = new ClientService(); clientDAO = Mockito.mock(ClientDAO.class); clientService.setClientDAO(clientDAO); Client client = new Client(); client.setId(0); client.setName("Mocked client!"); client.setPhone(11111111); client.setSex("M"); Mockito.when(clientDAO.find(Mockito.anyLong())).thenReturn(client); Mockito.doThrow(new RuntimeException("error on client!")) .when(clientDAO).delete((Client) Mockito.any()); Mockito.doNothing().when(clientDAO).create((Client) Mockito.any()); Mockito.doAnswer(new Answer
April 14, 2015
by Alexandre Lourenco
· 21,791 Views · 2 Likes