DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Refcards Trend Reports
Events Video Library
Refcards
Trend Reports

Events

View Events Video Library

The Latest Containers Topics

article thumbnail
In-Memory Data Grids
Introduction The IT buzzword of 2012 is without a doubt Big Data. It’s new and here to stay, and for a good reason. Big data is data that exceeds the processing capacity of conventional database systems. Great examples are CERN with the Large Hadron Collider, whose experiments generate 25 petabytes of data annually, or Walmart, which handles more than one million customer transaction every hour. Problems These vast amounts of data leave us with two problems. Problem 1: To gain value from this data, one must choose an alternative way to process it. The value of big data to an organization falls into two categories: analytical use, and enabling new products. Big data analytics can reveal insights hidden previously by data too costly to process, such as peer influence among customers, revealed by analyzing shoppers’ transactions, social and geographical data. Being able to process every item of data in reasonable time removes the troublesome need for sampling and promotes an investigative approach to data, in contrast to the somewhat static nature of running predetermined reports. Problem 2: The data is too big, moves too fast, or doesn’t fit the strictures of your database architectures. Remember the CERN case where the LHC produces over 25 Petabytes of data annually? No “classic” database architecture or setup is capable of holding these amounts of data. Solutions Fortunately, both problems can be solved by implementing the correct infrastructure and rethinking data storage. There are two critical factors in Big Data environments: size and speed. We already discussed the vast amounts of data and desire to be able to access and process the data fast. The latter is the main differentiator from more traditional data warehouses. Just imagine what you can do when you can access all your data real-time. Enter big data. A common Big Data implementation is an in-memory data grid that lives in a distributed cluster, ensuring both speed, by storing data in-memory, and capacity by using scalability features provided by a cluster. As a bonus, availability is ensured by using a distributed cluster. As for the data storage, there are typically two kinds: in-memory databases and in-memory data grids. But first some background. It is not a new attempt to use main memory as a storage area instead of a disk. In our daily lives there are numerous examples of main memory databases (MMDB), as they perform much faster than disk-based databases. An every day example is a mobile phone. When you SMS or call someone most mobile service providers use MMDB to get the information on your contact as soon as possible. The same applies to your phone. When someone calls you, the caller details are looked up in the contacts application, usually providing a name and sometimes a picture. In memory data grids In Memory Data Grid (IMDG) is the same as MMDB in that it stores data in main memory, but it has a totally different architecture. The features of IMDG can be summarized as follows: Data is distributed and stored on multiple servers. Each server operates in the active mode. A data model is usually object-oriented (serialized) and non-relational. According to the necessity, you often need to add or reduce servers. No traditional database features such as tables. In other words, IMDG is designed to store data in main memory, ensure scalability and store an object itself. These days, there are many IMDG products, both commercial and open source. Some of the most commonly used products are: Hazelcast (http://www.hazelcast.com) JBoss Infinispan (http://www.jboss.org/infinispan) GridGain DataGrid (http://www.gridgain.com/features/in-memory-data-grid/) VMware Gemfire (http://www.vmware.com/nl/products/application-platform/vfabric-gemfire/overview.html) Oracle Coherence (http://www.oracle.com/technetwork/middleware/coherence/overview/index.html) Gigaspaces XAP (http://www.gigaspaces.com/datagrid) Terracotta Enterprise Suite (http://terracotta.org/products/enterprise-suite) Why Memory? The main reasons for using main memory for data storage are once again the two main themes of Big Data: speed and capacity. The processing performance of main memory is 800 times faster than an HDD and up to 40 times faster than an. Moreover, the latest x86 server supports main memory of hundreds of GB per server. It is said that the limit of a traditional processing database’s (OLTP) data capacity is approximately 1 TB and that the OLTP processing data capacity would not increment well. If servers using main memory of 1 TB or larger become more commonly used, you will be able to conduct operations with the entire data placed in main memory, at least in the field of OLTP. IMDG Architecture To use main memory as a storage area, two weak points should be overcome: Limited capacity: involves data that exceeds the maximum capacity of the main memory of the server Reliability: involves data loss in case of a (system) failure. IMDG overcomes the limit of capacity by ensuring horizontal scalability using a distributed architecture, and resolves the issue of reliability through a replication system as part of the grid (or a distributed cluster). Now let’s discuss how an IMDG actually works. First of all, it is important to understand that an IMDG is not the same as an in-memory database, also referred to as MMDB (main memory databases). Typical examples of MMDBs are Oracle TimesTen or Sap Hana. MMDBs are full database products that simply reside in memory. As a result of being a full-blown database, they also carry the weight and overhead of database management features. IMDG is different. No tables, indexes, triggers, stored procedures, process managers etc. Just plain storage. The data model used in IMDG is key-value pairs. A key-value pair is a list with only two parts: a key and a value. The key can be used for storing and retrieving the values in the list. A key can be compared to the index or primary key of a table in a database. Note that IMDG are closely tied to development environments such as Java as the key-value pairs are represented by the structures provided by such a programming environment. Most IMDGs are written in Java, and can only be used within other Java applications. Therefore, the values of key-value pairs can be anything supported by Java, ranging from simple data types such as a string or number, to complex objects. This overcomes the two important hurdles: as you can store complex Java objects as value, there’s no need to translate these objects into a relational datamodel (which is the case in more traditional applications using a database for storage). Furthermore, the seeming limitation of being able to store only one value per key, is actually no limitation at all. Large memory sizes Most of the products introduced above use Java as an implementation language. Java reserves and uses a part of the RAM (internal memory) for dynamic memory allocation. This reserved memory space is called the Java heap. All runtime objects created by a Java application are stored in heap. Using large amounts of data causes two problems. Size limitation: By default, the heap size is 128 MB, but for current business applications, this limit is reached easily. Once the heap is “full”, no new objects can be created and the Java application will show some nasty errors. Performance: It is possible to increase the size of the heap, but this introduces some new problems. When a heap reaches a size of more than 4 gigabytes, Java will have serious issues with memory managements, causing your application to slow down or even freeze. Java has a feature called Garbage Collector, which periodically scans the heap and checks each object if it is still valid and being used. If not, the garbage collector removes the object and defragments the newly available space. The problem is, the larger the heap size, the more work to do for the garbage collector, resulting in performance degradation. Imagine a large bank has a Java application that manages customers, accounts and transactions. We have seen that an IMDG allows the application to store and access all data very quickly by caching it in memory, instead of storing the data in relatively slow databases. Let’s assume the combined data has a size of 40 gigabytes. Storing it in heap is simply not possible, considering the performance penalties of Java’s memory management capabilities. The graph below illustrates the garbage collection pause time when placing cached data in heap: Terracotta’s BigMemory product has a method to overcome these limitations. The method is to use an off-heap memory (direct buffer). Data will not be stored in Java’s heap, but directly in the available internal memory (RAM). Since, this is not subject to Java’s garbage collector, there are no performance penalties. The differences on performance are significant, as can be seen in the graph below: Using off-heap storage has some major benefits: You can use all the available memory on your machine, not just the memory that is allocated to the heap (usually less that 512 Mb). This allows you to store more data in a in-memory data grid, greatly speeding up your application. The heap can be relieved by storing data in native memory, speeding up Java applications as less heap space has to be garbage collected. Clustering, fail over and high availability So far, we have seen IMDG features that are applicable to a single server. However, the real power of IMDG lies in it’s networking and clustering capabilities, providing features as data replication, data synchronization between clients, fail over and high availability. To achieve this, a cluster of servers (or server array) acts a backbone of the infrastructure. Applications (that still can have their own IMDG or off-heap cache) that are connected to the cluster can share, replicate and backup their data with either the cluster or other applications. The graph below depicts a typical setup using Terracotta's BigMemory: The caches on the application servers are usually referred to as “level 1” cache, while the data cache on the server array is referred to as “level 2” cache. There are many different scenarios possible for storing, clustering, synchronizing and replicating data. Covering all these topics goes far beyond the scope of this article. For more information, consult the technical documentation of the product of your choice. Conclusion Big Data brings us some new challenges. First of all, storing and accessing vast amounts of data makes us rethink traditional methods and technologies. Next, there’s the question what to do with all the available data. The potential value for marketing, financial and other businesses is huge. In order to facilitate Big Data, in-memory data grids are considered the best option. IMDGs with off-heap storage are even more powerful, allowing data centric enterprise application to overcome certain limits of the Java platform, such as memory and performance constraints. As the amount of data that (large) companies produce and store, grows exponentially, databases will hit a limit. Accessing your data without a performance penalty simply will not be possible. The answer to this is using an IMDG.
March 13, 2013
by Roy Prins
· 32,632 Views · 5 Likes
article thumbnail
Spring, JMS, Listener Adapters, and Containers
In order to receive JMS messages, Spring provides the concept of message listener containers. These are beans that can be tied to receive messages that arrive at certain destinations. This post will examine the different ways in which containers can be configured. A simple example is below where the DefaultMessageListenerContianer has been configured to watch one queue (the property jms.queue.name) and has a reference to a myMessageListener bean which implements the MessageListener interface (ie onMessage): This is all very well but means that the myMessageListener bean will have to handle the JMS Message object and process accordingly depending upon the type of javax.jms.Message and its payload. For example: if (message instanceof MapMessage) { // cast, get object, do something } An alternative is to use a MessageListenerAdapter. This class abstracts away the above processing and leaves your code to deal with just the message's payload. For example: The delegate is a reference to a myMessageReceiverDelegate bean which has one or more methods called processMessage. It does not need to implement the MessageListener interface. This method can be overload to handle different payload types. Spring behind the scenes will determine which gets called. For example: public void processMessage(final HashMap message) { // do something } public void processMessage(final String message) { // do something } For the given approach though, only one queue can be tied to the container. Another approach is to tie many listeners (therefore many queues) to the one container, The below Spring XML, using the jms namespace, shows how two listeners for different queues can be tied to one container: The myMessageReceiverDelegate bean is treated as an adapter delegate, therefore does not need to implement the MessageListener interface. Each listener can have a different delegate but for the above example, all messages arriving at the two queues are processed by the one receiver bean ie myMessageReceiverDelegate. If there is a need to check the message type and extract the payload, then the listener can use a class which implements the MessageListener interface (eg the myMessageListener bean used in the first example). The onMessage method will then be called when messages arrive at the specified destination:
February 28, 2013
by Geraint Jones
· 72,548 Views · 2 Likes
article thumbnail
Hazelcast Distributed Execution with Spring
The ExecutorService feature had come with Java 5 and is under the java.util.concurrent package. It extends the Executor interface and provides a thread pool functionality to execute asynchronous short tasks. Java Executor Service Types is suggested to look over basic ExecutorService implementation. Also ThreadPoolExecutor is a very useful implementation of ExecutorService ınterface. It extends AbstractExecutorService providing default implementations of ExecutorService execution methods. It provides improved performance when executing large numbers of asynchronous tasks and maintains basic statistics, such as the number of completed tasks. How to develop and monitor Thread Pool Services by using Spring is also suggested to investigate how to develop and monitor Thread Pool Services. So far, we have just talked Undistributed Executor Service implementation. Let us also investigate Distributed Executor Service. Hazelcast Distributed Executor Service feature is a distributed implementation of java.util.concurrent.ExecutorService. It allows to execute business logic in cluster. There are four alternative ways to realize it : 1) The logic can be executed on a specific cluster member which is chosen. 2) The logic can be executed on the member owning the key which is chosen. 3) The logic can be executed on the member Hazelcast will pick. 4) The logic can be executed on all or subset of the cluster members. This article shows how to develop Distributed Executor Service via Hazelcast and Spring. Used Technologies : JDK 1.7.0_09 Spring 3.1.3 Hazelcast 2.4 Maven 3.0.4 STEP 1 : CREATE MAVEN PROJECT A maven project is created as below. (It can be created by using Maven or IDE Plug-in). STEP 2 : LIBRARIES Firstly, Spring dependencies are added to Maven’ s pom.xml 3.1.3.RELEASE UTF-8 org.springframework spring-core ${spring.version} org.springframework spring-context ${spring.version} com.hazelcast hazelcast-all 2.4 log4j log4j 1.2.16 maven-compiler-plugin(Maven Plugin) is used to compile the project with JDK 1.7 org.apache.maven.plugins maven-compiler-plugin 3.0 1.7 1.7 maven-shade-plugin(Maven Plugin) can be used to create runnable-jar org.apache.maven.plugins maven-shade-plugin 2.0 package shade com.onlinetechvision.exe.Application META-INF/spring.handlers META-INF/spring.schemas STEP 3 : CREATE Customer BEAN A new Customer bean is created. This bean will be distributed between two node in OTV cluster. In the following sample, all defined properties(id, name and surname)’ types are String and standart java.io.Serializable interface has been implemented for serializing. If custom or third-party object types are used, com.hazelcast.nio.DataSerializable interface can be implemented for better serialization performance. package com.onlinetechvision.customer; import java.io.Serializable; /** * Customer Bean. * * @author onlinetechvision.com * @since 27 Nov 2012 * @version 1.0.0 * */ public class Customer implements Serializable { private static final long serialVersionUID = 1856862670651243395L; private String id; private String name; private String surname; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getSurname() { return surname; } public void setSurname(String surname) { this.surname = surname; } @Override public int hashCode() { final int prime = 31; int result = 1; result = prime * result + ((id == null) ? 0 : id.hashCode()); result = prime * result + ((name == null) ? 0 : name.hashCode()); result = prime * result + ((surname == null) ? 0 : surname.hashCode()); return result; } @Override public boolean equals(Object obj) { if (this == obj) return true; if (obj == null) return false; if (getClass() != obj.getClass()) return false; Customer other = (Customer) obj; if (id == null) { if (other.id != null) return false; } else if (!id.equals(other.id)) return false; if (name == null) { if (other.name != null) return false; } else if (!name.equals(other.name)) return false; if (surname == null) { if (other.surname != null) return false; } else if (!surname.equals(other.surname)) return false; return true; } @Override public String toString() { return "Customer [id=" + id + ", name=" + name + ", surname=" + surname + "]"; } } STEP 4 : CREATE ICacheService INTERFACE A new ICacheService Interface is created for service layer to expose cache functionality. package com.onlinetechvision.cache.srv; import com.hazelcast.core.IMap; import com.onlinetechvision.customer.Customer; /** * A new ICacheService Interface is created for service layer to expose cache functionality. * * @author onlinetechvision.com * @since 27 Nov 2012 * @version 1.0.0 * */ public interface ICacheService { /** * Adds Customer entries to cache * * @param String key * @param Customer customer * */ void addToCache(String key, Customer customer); /** * Deletes Customer entries from cache * * @param String key * */ void deleteFromCache(String key); /** * Gets Customer cache * * @return IMap Coherence named cache */ IMap getCache(); } STEP 5 : CREATE CacheService IMPLEMENTATION CacheService is implementation of ICacheService Interface. package com.onlinetechvision.cache.srv; import com.hazelcast.core.IMap; import com.onlinetechvision.customer.Customer; import com.onlinetechvision.test.listener.CustomerEntryListener; /** * CacheService Class is implementation of ICacheService Interface. * * @author onlinetechvision.com * @since 27 Nov 2012 * @version 1.0.0 * */ public class CacheService implements ICacheService { private IMap customerMap; /** * Constructor of CacheService * * @param IMap customerMap * */ @SuppressWarnings("unchecked") public CacheService(IMap customerMap) { setCustomerMap(customerMap); getCustomerMap().addEntryListener(new CustomerEntryListener(), true); } /** * Adds Customer entries to cache * * @param String key * @param Customer customer * */ @Override public void addToCache(String key, Customer customer) { getCustomerMap().put(key, customer); } /** * Deletes Customer entries from cache * * @param String key * */ @Override public void deleteFromCache(String key) { getCustomerMap().remove(key); } /** * Gets Customer cache * * @return IMap Coherence named cache */ @Override public IMap getCache() { return getCustomerMap(); } public IMap getCustomerMap() { return customerMap; } public void setCustomerMap(IMap customerMap) { this.customerMap = customerMap; } } STEP 6 : CREATE IDistributedExecutorService INTERFACE A new IDistributedExecutorService Interface is created for service layer to expose distributed execution functionality. package com.onlinetechvision.executor.srv; import java.util.Collection; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import com.hazelcast.core.Member; /** * A new IDistributedExecutorService Interface is created for service layer to expose distributed execution functionality. * * @author onlinetechvision.com * @since 27 Nov 2012 * @version 1.0.0 * */ public interface IDistributedExecutorService { /** * Executes the callable object on stated member * * @param Callable callable * @param Member member * @throws InterruptedException * @throws ExecutionException * */ String executeOnStatedMember(Callable callable, Member member) throws InterruptedException, ExecutionException; /** * Executes the callable object on member owning the key * * @param Callable callable * @param Object key * @throws InterruptedException * @throws ExecutionException * */ String executeOnTheMemberOwningTheKey(Callable callable, Object key) throws InterruptedException, ExecutionException; /** * Executes the callable object on any member * * @param Callable callable * @throws InterruptedException * @throws ExecutionException * */ String executeOnAnyMember(Callable callable) throws InterruptedException, ExecutionException; /** * Executes the callable object on all members * * @param Callable callable * @param Set all members * @throws InterruptedException * @throws ExecutionException * */ Collection executeOnMembers(Callable callable, Set members) throws InterruptedException, ExecutionException; } STEP 7 : CREATE DistributedExecutorService IMPLEMENTATION DistributedExecutorService is implementation of IDistributedExecutorService Interface. package com.onlinetechvision.executor.srv; import java.util.Collection; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import org.apache.log4j.Logger; import com.hazelcast.core.DistributedTask; import com.hazelcast.core.Member; import com.hazelcast.core.MultiTask; /** * DistributedExecutorService Class is implementation of IDistributedExecutorService Interface. * * @author onlinetechvision.com * @since 27 Nov 2012 * @version 1.0.0 * */ public class DistributedExecutorService implements IDistributedExecutorService { private static final Logger logger = Logger.getLogger(DistributedExecutorService.class); private ExecutorService hazelcastDistributedExecutorService; /** * Executes the callable object on stated member * * @param Callable callable * @param Member member * @throws InterruptedException * @throws ExecutionException * */ @SuppressWarnings("unchecked") public String executeOnStatedMember(Callable callable, Member member) throws InterruptedException, ExecutionException { logger.debug("Method executeOnStatedMember is called..."); ExecutorService executorService = getHazelcastDistributedExecutorService(); FutureTask task = (FutureTask) executorService.submit( new DistributedTask(callable, member)); String result = task.get(); logger.debug("Result of method executeOnStatedMember is : " + result); return result; } /** * Executes the callable object on member owning the key * * @param Callable callable * @param Object key * @throws InterruptedException * @throws ExecutionException * */ @SuppressWarnings("unchecked") public String executeOnTheMemberOwningTheKey(Callable callable, Object key) throws InterruptedException, ExecutionException { logger.debug("Method executeOnTheMemberOwningTheKey is called..."); ExecutorService executorService = getHazelcastDistributedExecutorService(); FutureTask task = (FutureTask) executorService.submit(new DistributedTask(callable, key)); String result = task.get(); logger.debug("Result of method executeOnTheMemberOwningTheKey is : " + result); return result; } /** * Executes the callable object on any member * * @param Callable callable * @throws InterruptedException * @throws ExecutionException * */ public String executeOnAnyMember(Callable callable) throws InterruptedException, ExecutionException { logger.debug("Method executeOnAnyMember is called..."); ExecutorService executorService = getHazelcastDistributedExecutorService(); Future task = executorService.submit(callable); String result = task.get(); logger.debug("Result of method executeOnAnyMember is : " + result); return result; } /** * Executes the callable object on all members * * @param Callable callable * @param Set all members * @throws InterruptedException * @throws ExecutionException * */ public Collection executeOnMembers(Callable callable, Set members) throws ExecutionException, InterruptedException { logger.debug("Method executeOnMembers is called..."); MultiTask task = new MultiTask(callable, members); ExecutorService executorService = getHazelcastDistributedExecutorService(); executorService.execute(task); Collection results = task.get(); logger.debug("Result of method executeOnMembers is : " + results.toString()); return results; } public ExecutorService getHazelcastDistributedExecutorService() { return hazelcastDistributedExecutorService; } public void setHazelcastDistributedExecutorService(ExecutorService hazelcastDistributedExecutorService) { this.hazelcastDistributedExecutorService = hazelcastDistributedExecutorService; } } STEP 8 : CREATE TestCallable CLASS TestCallable Class shows business logic to be executed. TestCallable task for first member of the cluster : package com.onlinetechvision.task; import java.io.Serializable; import java.util.concurrent.Callable; /** * TestCallable Class shows business logic to be executed. * * @author onlinetechvision.com * @since 27 Nov 2012 * @version 1.0.0 * */ public class TestCallable implements Callable, Serializable{ private static final long serialVersionUID = -1839169907337151877L; /** * Computes a result, or throws an exception if unable to do so. * * @return String computed result * @throws Exception if unable to compute a result */ public String call() throws Exception { return "First Member' s TestCallable Task is called..."; } } TestCallable task for second member of the cluster : package com.onlinetechvision.task; import java.io.Serializable; import java.util.concurrent.Callable; /** * TestCallable Class shows business logic to be executed. * * @author onlinetechvision.com * @since 27 Nov 2012 * @version 1.0.0 * */ public class TestCallable implements Callable, Serializable{ private static final long serialVersionUID = -1839169907337151877L; /** * Computes a result, or throws an exception if unable to do so. * * @return String computed result * @throws Exception if unable to compute a result */ public String call() throws Exception { return "Second Member' s TestCallable Task is called..."; } } STEP 9 : CREATE AnotherAvailableMemberNotFoundException CLASS AnotherAvailableMemberNotFoundException is thrown when another available member is not found. To avoid this exception, first node should be started before the second node. package com.onlinetechvision.exception; /** * AnotherAvailableMemberNotFoundException is thrown when another available member is not found. * To avoid this exception, first node should be started before the second node. * * @author onlinetechvision.com * @since 27 Nov 2012 * @version 1.0.0 * */ public class AnotherAvailableMemberNotFoundException extends Exception { private static final long serialVersionUID = -3954360266393077645L; /** * Constructor of AnotherAvailableMemberNotFoundException * * @param String Exception message * */ public AnotherAvailableMemberNotFoundException(String message) { super(message); } } STEP 10 : CREATE CustomerEntryListener CLASS CustomerEntryListener Class listens entry changes on named cache object. package com.onlinetechvision.test.listener; import com.hazelcast.core.EntryEvent; import com.hazelcast.core.EntryListener; /** * CustomerEntryListener Class listens entry changes on named cache object. * * @author onlinetechvision.com * @since 27 Nov 2012 * @version 1.0.0 * */ @SuppressWarnings("rawtypes") public class CustomerEntryListener implements EntryListener { /** * Invoked when an entry is added. * * @param EntryEvent * */ public void entryAdded(EntryEvent ee) { System.out.println("EntryAdded... Member : " + ee.getMember() + ", Key : "+ee.getKey()+", OldValue : "+ee.getOldValue()+", NewValue : "+ee.getValue()); } /** * Invoked when an entry is removed. * * @param EntryEvent * */ public void entryRemoved(EntryEvent ee) { System.out.println("EntryRemoved... Member : " + ee.getMember() + ", Key : "+ee.getKey()+", OldValue : "+ee.getOldValue()+", NewValue : "+ee.getValue()); } /** * Invoked when an entry is evicted. * * @param EntryEvent * */ public void entryEvicted(EntryEvent ee) { } /** * Invoked when an entry is updated. * * @param EntryEvent * */ public void entryUpdated(EntryEvent ee) { } } STEP 11 : CREATE Starter CLASS Starter Class loads Customers to cache and executes distributed tasks. Starter Class of first member of the cluster : package com.onlinetechvision.exe; import com.onlinetechvision.cache.srv.ICacheService; import com.onlinetechvision.customer.Customer; /** * Starter Class loads Customers to cache and executes distributed tasks. * * @author onlinetechvision.com * @since 27 Nov 2012 * @version 1.0.0 * */ public class Starter { private ICacheService cacheService; /** * Loads cache and executes the tasks * */ public void start() { loadCacheForFirstMember(); } /** * Loads Customers to cache * */ public void loadCacheForFirstMember() { Customer firstCustomer = new Customer(); firstCustomer.setId("1"); firstCustomer.setName("Jodie"); firstCustomer.setSurname("Foster"); Customer secondCustomer = new Customer(); secondCustomer.setId("2"); secondCustomer.setName("Kate"); secondCustomer.setSurname("Winslet"); getCacheService().addToCache(firstCustomer.getId(), firstCustomer); getCacheService().addToCache(secondCustomer.getId(), secondCustomer); } public ICacheService getCacheService() { return cacheService; } public void setCacheService(ICacheService cacheService) { this.cacheService = cacheService; } } Starter Class of second member of the cluster : package com.onlinetechvision.exe; import java.util.Set; import java.util.concurrent.ExecutionException; import com.hazelcast.core.Hazelcast; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.Member; import com.onlinetechvision.cache.srv.ICacheService; import com.onlinetechvision.customer.Customer; import com.onlinetechvision.exception.AnotherAvailableMemberNotFoundException; import com.onlinetechvision.executor.srv.IDistributedExecutorService; import com.onlinetechvision.task.TestCallable; /** * Starter Class loads Customers to cache and executes distributed tasks. * * @author onlinetechvision.com * @since 27 Nov 2012 * @version 1.0.0 * */ public class Starter { private String hazelcastInstanceName; private Hazelcast hazelcast; private IDistributedExecutorService distributedExecutorService; private ICacheService cacheService; /** * Loads cache and executes the tasks * */ public void start() { loadCache(); executeTasks(); } /** * Loads Customers to cache * */ public void loadCache() { Customer firstCustomer = new Customer(); firstCustomer.setId("3"); firstCustomer.setName("Bruce"); firstCustomer.setSurname("Willis"); Customer secondCustomer = new Customer(); secondCustomer.setId("4"); secondCustomer.setName("Colin"); secondCustomer.setSurname("Farrell"); getCacheService().addToCache(firstCustomer.getId(), firstCustomer); getCacheService().addToCache(secondCustomer.getId(), secondCustomer); } /** * Executes Tasks * */ public void executeTasks() { try { getDistributedExecutorService().executeOnStatedMember(new TestCallable(), getAnotherMember()); getDistributedExecutorService().executeOnTheMemberOwningTheKey(new TestCallable(), "3"); getDistributedExecutorService().executeOnAnyMember(new TestCallable()); getDistributedExecutorService().executeOnMembers(new TestCallable(), getAllMembers()); } catch (InterruptedException | ExecutionException | AnotherAvailableMemberNotFoundException e) { e.printStackTrace(); } } /** * Gets cluster members * * @return Set Set of Cluster Members * */ private Set getAllMembers() { Set members = getHazelcastLocalInstance().getCluster().getMembers(); return members; } /** * Gets an another member of cluster * * @return Member Another Member of Cluster * @throws AnotherAvailableMemberNotFoundException An Another Available Member can not found exception */ private Member getAnotherMember() throws AnotherAvailableMemberNotFoundException { Set members = getAllMembers(); for(Member member : members) { if(!member.localMember()) { return member; } } throw new AnotherAvailableMemberNotFoundException("No Other Available Member on the cluster. Please be aware that all members are active on the cluster"); } /** * Gets Hazelcast local instance * * @return HazelcastInstance Hazelcast local instance */ @SuppressWarnings("static-access") private HazelcastInstance getHazelcastLocalInstance() { HazelcastInstance instance = getHazelcast().getHazelcastInstanceByName(getHazelcastInstanceName()); return instance; } public String getHazelcastInstanceName() { return hazelcastInstanceName; } public void setHazelcastInstanceName(String hazelcastInstanceName) { this.hazelcastInstanceName = hazelcastInstanceName; } public Hazelcast getHazelcast() { return hazelcast; } public void setHazelcast(Hazelcast hazelcast) { this.hazelcast = hazelcast; } public IDistributedExecutorService getDistributedExecutorService() { return distributedExecutorService; } public void setDistributedExecutorService(IDistributedExecutorService distributedExecutorService) { this.distributedExecutorService = distributedExecutorService; } public ICacheService getCacheService() { return cacheService; } public void setCacheService(ICacheService cacheService) { this.cacheService = cacheService; } } STEP 12 : CREATE hazelcast-config.properties FILE hazelcast-config.properties file shows the properties of cluster members. First member properties : hz.instance.name = OTVInstance1 hz.group.name = dev hz.group.password = dev hz.management.center.enabled = true hz.management.center.url = http://localhost:8080/mancenter hz.network.port = 5701 hz.network.port.auto.increment = false hz.tcp.ip.enabled = true hz.members = 192.168.1.32 hz.executor.service.core.pool.size = 2 hz.executor.service.max.pool.size = 30 hz.executor.service.keep.alive.seconds = 30 hz.map.backup.count=2 hz.map.max.size=0 hz.map.eviction.percentage=30 hz.map.read.backup.data=true hz.map.cache.value=true hz.map.eviction.policy=NONE hz.map.merge.policy=hz.ADD_NEW_ENTRY Second member properties : hz.instance.name = OTVInstance2 hz.group.name = dev hz.group.password = dev hz.management.center.enabled = true hz.management.center.url = http://localhost:8080/mancenter hz.network.port = 5702 hz.network.port.auto.increment = false hz.tcp.ip.enabled = true hz.members = 192.168.1.32 hz.executor.service.core.pool.size = 2 hz.executor.service.max.pool.size = 30 hz.executor.service.keep.alive.seconds = 30 hz.map.backup.count=2 hz.map.max.size=0 hz.map.eviction.percentage=30 hz.map.read.backup.data=true hz.map.cache.value=true hz.map.eviction.policy=NONE hz.map.merge.policy=hz.ADD_NEW_ENTRY STEP 13 : CREATE applicationContext-hazelcast.xml Spring Hazelcast Configuration file, applicationContext-hazelcast.xml, is created and Hazelcast Distributed Executor Service and Hazelcast Instance are configured. ${hz.instance.name} ${hz.members} STEP 14 : CREATE applicationContext.xml Spring Configuration file, applicationContext.xml, is created. classpath:/hazelcast-config.properties STEP 15 : CREATE Application CLASS Application Class is created to run the application. ackage com.onlinetechvision.exe; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; /** * Application class starts the application * * @author onlinetechvision.com * @since 27 Nov 2012 * @version 1.0.0 * */ public class Application { /** * Starts the application * * @param String[] args * */ public static void main(String[] args) { ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml"); Starter starter = (Starter) context.getBean("starter"); starter.start(); } } STEP 16 : BUILD PROJECT After OTV_Spring_Hazelcast_DistributedExecution Project is built, OTV_Spring_Hazelcast_DistributedExecution-0.0.1-SNAPSHOT.jar will be created. Important Note : The Members of the cluster have got different configuration for Coherence so the project should be built separately for each member. STEP 17 : INTEGRATION with HAZELCAST MANAGEMENT CENTER Hazelcast Management Center enables to monitor and manage nodes in the cluster. Entity and backup counts which are owned by customerMap, can be seen via Map Memory Data Table. We have distributed 4 entries via customerMap as shown below : Sample keys and values can be seen via Map Browser : Added First Entry : Added Third Entry : hazelcastDistributedExecutorService details can be seen via Executors tab. We have executed 3 task on first member and 2 tasks on second member as shown below : STEP 18 : RUN PROJECT BY STARTING THE CLUSTER’ s MEMBER After created OTV_Spring_Hazelcast_DistributedExecution-0.0.1-SNAPSHOT.jar file is run at the cluster’ s members, the following console output logs will be shown : First member console output : Kas 25, 2012 4:07:20 PM com.hazelcast.impl.AddressPicker INFO: Interfaces is disabled, trying to pick one address from TCP-IP config addresses: [x.y.z.t] Kas 25, 2012 4:07:20 PM com.hazelcast.impl.AddressPicker INFO: Prefer IPv4 stack is true. Kas 25, 2012 4:07:20 PM com.hazelcast.impl.AddressPicker INFO: Picked Address[x.y.z.t]:5701, using socket ServerSocket[addr=/0:0:0:0:0:0:0:0,localport=5701], bind any local is true Kas 25, 2012 4:07:21 PM com.hazelcast.system INFO: [x.y.z.t]:5701 [dev] Hazelcast Community Edition 2.4 (20121017) starting at Address[x.y.z.t]:5701 Kas 25, 2012 4:07:21 PM com.hazelcast.system INFO: [x.y.z.t]:5701 [dev] Copyright (C) 2008-2012 Hazelcast.com Kas 25, 2012 4:07:21 PM com.hazelcast.impl.LifecycleServiceImpl INFO: [x.y.z.t]:5701 [dev] Address[x.y.z.t]:5701 is STARTING Kas 25, 2012 4:07:24 PM com.hazelcast.impl.TcpIpJoiner INFO: [x.y.z.t]:5701 [dev] --A new cluster is created and First Member joins the cluster. Members [1] { Member [x.y.z.t]:5701 this } Kas 25, 2012 4:07:24 PM com.hazelcast.impl.MulticastJoiner INFO: [x.y.z.t]:5701 [dev] Members [1] { Member [x.y.z.t]:5701 this } ... -- First member adds two new entries to the cache... EntryAdded... Member : Member [x.y.z.t]:5701 this, Key : 1, OldValue : null, NewValue : Customer [id=1, name=Jodie, surname=Foster] EntryAdded... Member : Member [x.y.z.t]:5701 this, Key : 2, OldValue : null, NewValue : Customer [id=2, name=Kate, surname=Winslet] ... --Second Member joins the cluster. Members [2] { Member [x.y.z.t]:5701 this Member [x.y.z.t]:5702 } ... -- Second member adds two new entries to the cache... EntryAdded... Member : Member [x.y.z.t]:5702, Key : 4, OldValue : null, NewValue : Customer [id=4, name=Colin, surname=Farrell] EntryAdded... Member : Member [x.y.z.t]:5702, Key : 3, OldValue : null, NewValue : Customer [id=3, name=Bruce, surname=Willis] Second member console output : Kas 25, 2012 4:07:48 PM com.hazelcast.impl.AddressPicker INFO: Interfaces is disabled, trying to pick one address from TCP-IP config addresses: [x.y.z.t] Kas 25, 2012 4:07:48 PM com.hazelcast.impl.AddressPicker INFO: Prefer IPv4 stack is true. Kas 25, 2012 4:07:48 PM com.hazelcast.impl.AddressPicker INFO: Picked Address[x.y.z.t]:5702, using socket ServerSocket[addr=/0:0:0:0:0:0:0:0,localport=5702], bind any local is true Kas 25, 2012 4:07:49 PM com.hazelcast.system INFO: [x.y.z.t]:5702 [dev] Hazelcast Community Edition 2.4 (20121017) starting at Address[x.y.z.t]:5702 Kas 25, 2012 4:07:49 PM com.hazelcast.system INFO: [x.y.z.t]:5702 [dev] Copyright (C) 2008-2012 Hazelcast.com Kas 25, 2012 4:07:49 PM com.hazelcast.impl.LifecycleServiceImpl INFO: [x.y.z.t]:5702 [dev] Address[x.y.z.t]:5702 is STARTING Kas 25, 2012 4:07:49 PM com.hazelcast.impl.Node INFO: [x.y.z.t]:5702 [dev] ** setting master address to Address[x.y.z.t]:5701 Kas 25, 2012 4:07:49 PM com.hazelcast.impl.MulticastJoiner INFO: [x.y.z.t]:5702 [dev] Connecting to master node: Address[x.y.z.t]:5701 Kas 25, 2012 4:07:49 PM com.hazelcast.nio.ConnectionManager INFO: [x.y.z.t]:5702 [dev] 55715 accepted socket connection from /x.y.z.t:5701 Kas 25, 2012 4:07:55 PM com.hazelcast.cluster.ClusterManager INFO: [x.y.z.t]:5702 [dev] --Second Member joins the cluster. Members [2] { Member [x.y.z.t]:5701 Member [x.y.z.t]:5702 this } Kas 25, 2012 4:07:56 PM com.hazelcast.impl.LifecycleServiceImpl INFO: [x.y.z.t]:5702 [dev] Address[x.y.z.t]:5702 is STARTED -- Second member adds two new entries to the cache... EntryAdded... Member : Member [x.y.z.t]:5702 this, Key : 3, OldValue : null, NewValue : Customer [id=3, name=Bruce, surname=Willis] EntryAdded... Member : Member [x.y.z.t]:5702 this, Key : 4, OldValue : null, NewValue : Customer [id=4, name=Colin, surname=Farrell] 25.11.2012 16:07:56 DEBUG (DistributedExecutorService.java:42) - Method executeOnStatedMember is called... 25.11.2012 16:07:56 DEBUG (DistributedExecutorService.java:46) - Result of method executeOnStatedMember is : First Member' s TestCallable Task is called... 25.11.2012 16:07:56 DEBUG (DistributedExecutorService.java:61) - Method executeOnTheMemberOwningTheKey is called... 25.11.2012 16:07:56 DEBUG (DistributedExecutorService.java:65) - Result of method executeOnTheMemberOwningTheKey is : First Member' s TestCallable Task is called... 25.11.2012 16:07:56 DEBUG (DistributedExecutorService.java:78) - Method executeOnAnyMember is called... 25.11.2012 16:07:57 DEBUG (DistributedExecutorService.java:82) - Result of method executeOnAnyMember is : Second Member' s TestCallable Task is called... 25.11.2012 16:07:57 DEBUG (DistributedExecutorService.java:96) - Method executeOnMembers is called... 25.11.2012 16:07:57 DEBUG (DistributedExecutorService.java:101) - Result of method executeOnMembers is : [First Member' s TestCallable Task is called..., Second Member' s TestCallable Task is called...] STEP 19 : DOWNLOAD https://github.com/erenavsarogullari/OTV_Spring_Hazelcast_DistributedExecution REFERENCES : Java ExecutorService Interface Hazelcast Distributed Executor Service
December 11, 2012
by Eren Avsarogullari
· 29,924 Views · 1 Like
article thumbnail
EasyNetQ Cluster Support
EasyNetQ, my super simple .NET API for RabbitMQ, now (from version 0.7.2.34) supports RabbitMQ clusters without any need to deploy a load balancer. Simply list the nodes of the cluster in the connection string ... var bus = RabbitHutch.CreateBus("host=ubuntu:5672,ubuntu:5673"); In this example I have set up a cluster on a single machine, 'ubuntu', with node 1 on port 5672 and node 2 on port 5673. When the CreateBus statement executes, EasyNetQ will attempt to connect to the first host listed (ubuntu:5672). If it fails to connect it will attempt to connect to the second host listed (ubuntu:5673). If neither node is available it will sit in a re-try loop attempting to connect to both servers every five seconds. It logs all this activity to the registered IEasyNetQLogger. You might see something like this if the first node was unavailable: DEBUG: Trying to connect ERROR: Failed to connect to Broker: 'ubuntu', Port: 5672 VHost: '/'. ExceptionMessage: 'None of the specified endpoints were reachable' DEBUG: OnConnected event fired INFO: Connected to RabbitMQ. Broker: 'ubuntu', Port: 5674, VHost: '/' If the node that EasyNetQ is connected to fails, EasyNetQ will attempt to connect to the next listed node. Once connected, it will re-declare all the exchanges and queues and re-start all the consumers. Here's an example log record showing one node failing then EasyNetQ connecting to the other node and recreating the subscribers: INFO: Disconnected from RabbitMQ Broker DEBUG: Trying to connect DEBUG: OnConnected event fired DEBUG: Re-creating subscribers INFO: Connected to RabbitMQ. Broker: 'ubuntu', Port: 5674, VHost: '/' You get automatic fail-over out of the box. That’s pretty cool. If you have multiple services using EasyNetQ to connect to a RabbitMQ cluster, they will all initially connect to the first listed node in their respective connection strings. For this reason the EasyNetQ cluster support is not really suitable for load balancing high throughput systems. I would recommend that you use a dedicated hardware or software load balancer instead, if that’s what you want.
October 14, 2012
by Mike Hadlow
· 6,851 Views
article thumbnail
New ActiveMQ failover and Clustering Goodies
For the last two weeks I’ve been working on some interesting use cases for the good ol’ failover transport. I finally have some time at my hands, so here’s a brief recap of what’s coming in 5.6 release in this area. First there’s a new feature, called Priority Backup. It’s described in details here, but in a nutshell it provides you with the mechanism of prioritizing your failover urls and keep your clients connected to them as soon as they are available. The most obvious use case for this is to keep your clients connected to the broker in local data center whenever you can. By doing this, you can both have better performances and stability of your clients, but also save on your bandwidth bills. Another improvement is coming for automatic broker cluster feature. Although this feature is not new, I spent some time hardening it and thought to share some more insight in how (and when) to use it in your projects. In search of high availability, people often default to master-slave architecture. This makes sense in most use cases, but if your flow is purely non-persistent you can probably come up with more optimal architecture. Instead of having one broker at the time handling all your load, and other one just waiting for it to fail, you’ll get more efficient system with some kind of active-active configuration where (possibly multiple) brokers share the load all the time. Ideally clients would be evenly distributed and would rebalance if anything changes. Brokers don’t need to share any messages as clients are distributed and messages are non-persistent so they will be lost if broker fails. So can you achieve this kind of architecture with ActiveMQ? Sure you do. That’s where automatic rebalance and clustering shines. First of all, brokers should be networked but only so they can exchange information on their availability. They shouldn’t exchange the messages (but of course can if your use case needs it). In 5.6 you do that with pure static networks, using configuration like So now imagine three brokers A,B and C forming a full mesh. In addition every broker uses rebalance options on their transport connectors All that is left for the client to do is connect to one of the brokers it knows like failover:(brokerA) and the broker will fill it with all information on other brokers in the cluster and whether it should reconnect to one of them or not. So having a large number of clients connecting like this, very soon they’ll rebalance over available brokers. You can stop one of the brokers in the cluster for updates and clients will rebalance over remaining ones. You can even add a new broker to the cluster and everything will get rebalanced without any need for you to touch your clients. So, basically in this way you have both load balancing and high availability for your non-persistent messages. Additionally, your clients are automatically updated with all information they need, and no manual intervention is needed. Although the basic support for clustering was there since 5.4, I did some more hardening and better rebalancing, so it’s coming in the Apache ActiveMQ 5.6 (and the next Fuse 5.5.1) release. Also, there are some more great stuff regarding broker clustering coming soon, so stay tuned and happy messaging.
September 10, 2012
by Dejan Bosanac
· 15,408 Views
article thumbnail
How to Write Better POJO Services
In Java, you can easily implement some business logic in Plain Old Java Object (POJO) classes, and then able to run them in a fancy server or framework without much hassle. There many server/frameworks, such as JBossAS, Spring or Camel etc, that would allow you to deploy POJO without even hardcoding to their API. Obviously you would get advance features if you willing to couple to their API specifics, but even if you do, you can keep these to minimal by encapsulating your own POJO and their API in a wrapper. By writing and designing your own application as simple POJO as possible, you will have the most flexible ways in choose a framework or server to deploy and run your application. One effective way to write your business logic in these environments is to use Service component. In this article I will share few things I learned in writing Services. What is a Service? The word Service is overly used today, and it could mean many things to different people. When I say Service, my definition is a software component that has minimal of life-cycles such as init, start, stop, and destroy. You may not need all these stages of life-cycles in every service you write, but you can simply ignore ones that don't apply. When writing large application that intended for long running such as a server component, definining these life-cycles and ensure they are excuted in proper order is crucial! I will be walking you through a Java demo project that I have prepared. It's very basic and it should run as stand-alone. The only dependency it has is the SLF4J logger. If you don't know how to use logger, then simply replace them with System.out.println. However I would strongly encourage you to learn how to use logger effectively during application development though. Also if you want to try out the Spring related demos, then obviously you would need their jars as well. Writing basic POJO service You can quickly define a contract of a Service with life-cycles as below in an interface. package servicedemo; public interface Service { void init(); void start(); void stop(); void destroy(); boolean isInited(); boolean isStarted(); } Developers are free to do what they want in their Service implementation, but you might want to give them an adapter class so that they don't have to re-write same basic logic on each Service. I would provide an abstract service like this: package servicedemo; import java.util.concurrent.atomic.*; import org.slf4j.*; public abstract class AbstractService implements Service { protected Logger logger = LoggerFactory.getLogger(getClass()); protected AtomicBoolean started = new AtomicBoolean(false); protected AtomicBoolean inited = new AtomicBoolean(false); public void init() { if (!inited.get()) { initService(); inited.set(true); logger.debug("{} initialized.", this); } } public void start() { // Init service if it has not done so. if (!inited.get()) { init(); } // Start service now. if (!started.get()) { startService(); started.set(true); logger.debug("{} started.", this); } } public void stop() { if (started.get()) { stopService(); started.set(false); logger.debug("{} stopped.", this); } } public void destroy() { // Stop service if it is still running. if (started.get()) { stop(); } // Destroy service now. if (inited.get()) { destroyService(); inited.set(false); logger.debug("{} destroyed.", this); } } public boolean isStarted() { return started.get(); } public boolean isInited() { return inited.get(); } @Override public String toString() { return getClass().getSimpleName() + "[id=" + System.identityHashCode(this) + "]"; } protected void initService() { } protected void startService() { } protected void stopService() { } protected void destroyService() { } } This abstract class provide the basic of most services needs. It has a logger and states to keep track of the life-cycles. It then delegate new sets of life-cycle methods so subclass can choose to override. Notice that the start() method is checking auto calling init() if it hasn't already done so. Same is done in destroy() method to the stop() method. This is important if we're to use it in a container that only have two stages life-cycles invocation. In this case, we can simply invoke start() and destroy() to match to our service's life-cycles. Some frameworks might go even further and create separate interfaces for each stage of the life-cycles, such as InitableService or StartableService etc. But I think that would be too much in a typical app. In most of the cases, you want something simple, so I like it just one interface. User may choose to ignore methods they don't want, or simply use an adaptor class. Before we end this section, I would throw in a silly Hello world service that can be used in our demo later. package servicedemo; public class HelloService extends AbstractService { public void initService() { logger.info(this + " inited."); } public void startService() { logger.info(this + " started."); } public void stopService() { logger.info(this + " stopped."); } public void destroyService() { logger.info(this + " destroyed."); } } Managing multiple POJO Services with a container Now we have the basic of Service definition defined, your development team may start writing business logic code! Before long, you will have a library of your own services to re-use. To be able group and control these services into an effetive way, we want also provide a container to manage them. The idea is that we typically want to control and manage multiple services with a container as a group in a higher level. Here is a simple implementation for you to get started: package servicedemo; import java.util.*; public class ServiceContainer extends AbstractService { private List services = new ArrayList(); public void setServices(List services) { this.services = services; } public void addService(Service service) { this.services.add(service); } public void initService() { logger.debug("Initializing " + this + " with " + services.size() + " services."); for (Service service : services) { logger.debug("Initializing " + service); service.init(); } logger.info(this + " inited."); } public void startService() { logger.debug("Starting " + this + " with " + services.size() + " services."); for (Service service : services) { logger.debug("Starting " + service); service.start(); } logger.info(this + " started."); } public void stopService() { int size = services.size(); logger.debug("Stopping " + this + " with " + size + " services in reverse order."); for (int i = size - 1; i >= 0; i--) { Service service = services.get(i); logger.debug("Stopping " + service); service.stop(); } logger.info(this + " stopped."); } public void destroyService() { int size = services.size(); logger.debug("Destroying " + this + " with " + size + " services in reverse order."); for (int i = size - 1; i >= 0; i--) { Service service = services.get(i); logger.debug("Destroying " + service); service.destroy(); } logger.info(this + " destroyed."); } } From above code, you will notice few important things: We extends the AbstractService, so a container is a service itself. We would invoke all service's life-cycles before moving to next. No services will start unless all others are inited. We should stop and destroy services in reverse order for most general use cases. The above container implementation is simple and run in synchronized fashion. This mean, you start container, then all services will start in order you added them. Stop should be same but in reverse order. I also hope you would able to see that there is plenty of room for you to improve this container as well. For example, you may add thread pool to control the execution of the services in asynchronized fashion. Running POJO Services Running services with a simple runner program. In the simplest form, we can run our POJO services on our own without any fancy server or frameworks. Java programs start its life from a static main method, so we surely can invoke init and start of our services in there. But we also need to address the stop and destroy life-cycles when user shuts down the program (usually by hitting CTRL+C.) For this, the Java has the java.lang.Runtime#addShutdownHook() facility. You can create a simple stand-alone server to bootstrap Service like this: package servicedemo; import org.slf4j.*; public class ServiceRunner { private static Logger logger = LoggerFactory.getLogger(ServiceRunner.class); public static void main(String[] args) { ServiceRunner main = new ServiceRunner(); main.run(args); } public void run(String[] args) { if (args.length < 1) throw new RuntimeException("Missing service class name as argument."); String serviceClassName = args[0]; try { logger.debug("Creating " + serviceClassName); Class serviceClass = Class.forName(serviceClassName); if (!Service.class.isAssignableFrom(serviceClass)) { throw new RuntimeException("Service class " + serviceClassName + " did not implements " + Service.class.getName()); } Object serviceObject = serviceClass.newInstance(); Service service = (Service)serviceObject; registerShutdownHook(service); logger.debug("Starting service " + service); service.init(); service.start(); logger.info(service + " started."); synchronized(this) { this.wait(); } } catch (Exception e) { throw new RuntimeException("Failed to create and run " + serviceClassName, e); } } private void registerShutdownHook(final Service service) { Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { logger.debug("Stopping service " + service); service.stop(); service.destroy(); logger.info(service + " stopped."); } }); } } With abover runner, you should able to run it with this command: $ java demo.ServiceRunner servicedemo.HelloService Look carefully, and you'll see that you have many options to run multiple services with above runner. Let me highlight couple: Improve above runner directly and make all args for each new service class name, instead of just first element. Or write a MultiLoaderService that will load multiple services you want. You may control argument passing using System Properties. Can you think of other ways to improve this runner? Running services with Spring The Spring framework is an IoC container, and it's well known to be easy to work POJO, and Spring lets you wire your application together. This would be a perfect fit to use in our POJO services. However, with all the features Spring brings, it missed a easy to use, out of box main program to bootstrap spring config xml context files. But with what we built so far, this is actually an easy thing to do. Let's write one of our POJO Service to bootstrap a spring context file. package servicedemo; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.support.FileSystemXmlApplicationContext; public class SpringService extends AbstractService { private ConfigurableApplicationContext springContext; public void startService() { String springConfig = System.getProperty("springContext", "spring.xml); springContext = new FileSystemXmlApplicationContext(springConfig); logger.info(this + " started."); } public void stopService() { springContext.close(); logger.info(this + " stopped."); } } With that simple SpringService you can run and load any spring xml file. For example try this: $ java -DspringContext=config/service-demo-spring.xml demo.ServiceRunner servicedemo.SpringService Inside the config/service-demo-spring.xml file, you can easily create our container that hosts one or more service in Spring beans. Notice that I only need to setup init-method and destroy-method once on the serviceContainer bean. You can then add one or more other service such as the helloService as much as you want. They will all be started, managed, and then shutdown when you close the Spring context. Note that Spring context container did not explicitly have the same life-cycles as our services. The Spring context will automatically instanciate all your dependency beans, and then invoke all beans who's init-method is set. All that is done inside the constructor of FileSystemXmlApplicationContext. No explicit init method is called from user. However at the end, during stop of the service, Spring provide the springContext#close() to clean things up. Again, they do not differentiate stop from destroy. Because of this, we must merge our init and start into Spring's init state, and then merge stop and destroy into Spring's close state. Recall our AbstractService#destory will auto invoke stop if it hasn't already done so. So this is trick that we need to understand in order to use Spring effectively. Running services with JEE app server In a corporate env, we usually do not have the freedom to run what we want as a stand-alone program. Instead they usually have some infrustructure and stricter standard technology stack in place already, such as using a JEE application server. In these situation, the most portable way to run POJO services is in a war web application. In a Servlet web application, you can write a class that implements javax.servlet.ServletContextListener and this will provide you the life-cycles hook via contextInitialized and contextDestroyed. In there, you can instanciate your ServiceContainer object and call start and destroy methods accordingly. Here is an example that you can explore: package servicedemo; import java.util.*; import javax.servlet.*; public class ServiceContainerListener implements ServletContextListener { private static Logger logger = LoggerFactory.getLogger(ServiceContainerListener.class); private ServiceContainer serviceContainer; public void contextInitialized(ServletContextEvent sce) { serviceContainer = new ServiceContainer(); List services = createServices(); serviceContainer.setServices(services); serviceContainer.start(); logger.info(serviceContainer + " started in web application."); } public void contextDestroyed(ServletContextEvent sce) { serviceContainer.destroy(); logger.info(serviceContainer + " destroyed in web application."); } private List createServices() { List result = new ArrayList(); // populate services here. return result; } } You may configure above in the WEB-INF/web.xml like this: servicedemo.ServiceContainerListener The demo provided a placeholder that you must add your services in code. But you can easily make that configurable using the web.xml for context parameters. If you were to use Spring inside a Servlet container, you may directly use their org.springframework.web.context.ContextLoaderListener class that does pretty much same as above, except they allow you to specify their xml configuration file using the contextConfigLocation context parameter. That's how a typical Spring MVC based application is configure. Once you have this setup, you can experiment our POJO service just as the Spring xml sample given above to test things out. You should see our service in action by your logger output. PS: Actually what we described here are simply related to Servlet web application, and not JEE specific. So you can use Tomcat server just fine as well. The importance of Service's life-cycles and it's real world usage All the information I presented here are not novelty, nor a killer design pattern. In fact they have been used in many popular open source projects. However, in my past experience at work, folks always manage to make these extremely complicated, and worse case is that they completely disregard the importance of life-cycles when writing services. It's true that not everything you going to write needs to be fitted into a service, but if you find the need, please do pay attention to them, and take good care that they do invoked properly. The last thing you want is to exit JVM without clean up in services that you allocated precious resources for. These would become more disastrous if you allow your application to be dynamically reloaded during deployment without exiting JVM, in which will lead to system resources leakage. The above Service practice has been put into use in the TimeMachine project. In fact, if you look at the timemachine.scheduler.service.SchedulerEngine, it would just be a container of many services running together. And that's how user can extend the scheduler functionalities as well, by writing a Service. You can load these services dynamically by a simple properties file.
September 4, 2012
by Zemian Deng
· 39,154 Views
article thumbnail
Everything You Need To Know About Couchbase Architecture
After receiving a lot of good feedback and comment on my last blog on MongoDb, I was encouraged to do another deep dive on another popular document oriented db; Couchbase. I have been a long-time fan CouchDb and has wrote a blog on it many years ago. After it merges with Membase, I am very excited to take a deep look into it again. Couchbase is the merge of two popular NOSQL technologies: Membase, which provides persistence, replication, sharding to the high performance memcached technology CouchDB, which pioneers the document oriented model based on JSON Like other NOSQL technologies, both Membase and CouchDB are built from the ground up on a highly distributed architecture, with data shard across machines in a cluster. Built around the Memcached protocol, Membase provides an easy migration to existing Memcached users who want to add persistence, sharding and fault resilience on their familiar Memcached model. On the other hand, CouchDB provides first class support for storing JSON documents as well as a simple RESTful API to access them. Underneath, CouchDB also has a highly tuned storage engine that is optimized for both update transaction as well as query processing. Taking the best of both technologies, Membase is well-positioned in the NOSQL marketplace. Programming model Couchbase provides client libraries for different programming languages such as Java / .NET / PHP / Ruby / C / Python / Node.js For read, Couchbase provides a key-based lookup mechanism where the client is expected to provide the key, and only the server hosting the data (with that key) will be contacted. Couchbase also provides a query mechanism to retrieve data where the client provides a query (for example, range based on some secondary key) as well as the view (basically the index). The query will be broadcasted to all servers in the cluster and the result will be merged and sent back to the client. For write, Couchbase provides a key-based update mechanism where the client sends in an updated document with the key (as doc id). When handling write request, the server will return to client’s write request as soon as the data is stored in RAM on the active server, which offers the lowest latency for write requests. Following is the core API that Couchbase offers. (in an abstract sense) # Get a document by key doc = get(key) # Modify a document, notice the whole document # need to be passed in set(key, doc) # Modify a document when no one has modified it # since my last read casVersion = doc.getCas() cas(key, casVersion, changedDoc) # Create a new document, with an expiration time # after which the document will be deleted addIfNotExist(key, doc, timeToLive) # Delete a document delete(key) # When the value is an integer, increment the integer increment(key) # When the value is an integer, decrement the integer decrement(key) # When the value is an opaque byte array, append more # data into existing value append(key, newData) # Query the data results = query(viewName, queryParameters) In Couchbase, document is the unit of manipulation. Currently Couchbase doesn't support server-side execution of custom logic. Couchbase server is basically a passive store and unlike other document oriented DB, Couchbase doesn't support field-level modification. In case of modifying documents, client need to retrieve documents by its key, do the modification locally and then send back the whole (modified) document back to the server. This design tradeoff network bandwidth (since more data will be transferred across the network) for CPU (now CPU load shift to client). Couchbase currently doesn't support bulk modification based on a condition matching. Modification happens only in a per document basis. (client will save the modified document one at a time). Transaction Model Similar to many NOSQL databases, Couchbase’s transaction model is primitive as compared to RDBMS. Atomicity is guaranteed at a single document and transactions that span update of multiple documents are unsupported. To provide necessary isolation for concurrent access, Couchbase provides a CAS (compare and swap) mechanism which works as follows … When the client retrieves a document, a CAS ID (equivalent to a revision number) is attached to it. While the client is manipulating the retrieved document locally, another client may modify this document. When this happens, the CAS ID of the document at the server will be incremented. Now, when the original client submits its modification to the server, it can attach the original CAS ID in its request. The server will verify this ID with the actual ID in the server. If they differ, the document has been updated in between and the server will not apply the update. The original client will re-read the document (which now has a newer ID) and re-submit its modification. Couchbase also provides a locking mechanism for clients to coordinate their access to documents. Clients can request a LOCK on the document it intends to modify, update the documents and then releases the LOCK. To prevent a deadlock situation, each LOCK grant has a timeout so it will automatically be released after a period of time. Deployment Architecture In a typical setting, a Couchbase DB resides in a server clusters involving multiple machines. Client library will connect to the appropriate servers to access the data. Each machine contains a number of daemon processes which provides data access as well as management functions. The data server, written in C/C++, is responsible to handle get/set/delete request from client. The Management server, written in Erlang, is responsible to handle the query traffic from client, as well as manage the configuration and communicate with other member nodes in the cluster. Virtual Buckets The basic unit of data storage in Couchbase DB is a JSON document (or primitive data type such as int and byte array) which is associated with a key. The overall key space is partitioned into 1024 logical storage unit called "virtual buckets" (or vBucket). vBucket are distributed across machines within the cluster via a map that is shared among servers in the cluster as well as the client library. High availability is achieved through data replication at the vBucket level. Currently Couchbase supports one active vBucket zero or more standby replicas hosted in other machines. Curremtly the standby server are idle and not serving any client request. In future version of Couchbase, the standby replica will be able to serve read request. Load balancing in Couchbase is achieved as follows: Keys are uniformly distributed based on the hash function When machines are added and removed in the cluster. The administrator can request a redistribution of vBucket so that data are evenly spread across physical machines. Management Server Management server performs the management function and co-ordinate the other nodes within the cluster. It includes the following monitoring and administration functions Heartbeat: A watchdog process periodically communicates with all member nodes within the same cluster to provide Couchbase Server health updates. Process monitor: This subsystem monitors execution of the local data manager, restarting failed processes as required and provide status information to the heartbeat module. Configuration manager: Each Couchbase Server node shares a cluster-wide configuration which contains the member nodes within the cluster, a vBucket map. The configuration manager pull this config from other member nodes at bootup time. Within a cluster, one node’s Management Server will be elected as the leader which performs the following cluster-wide management function Controls the distribution of vBuckets among other nodes and initiate vBucket migration Orchestrates the failover and update the configuration manager of member nodes If the leader node crashes, a new leader will be elected from surviving members in the cluster. When a machine in the cluster has crashed, the leader will detect that and notify member machines in the cluster that all vBuckets hosted in the crashed machine is dead. After getting this signal, machines hosting the corresponding vBucket replica will set the vBucket status as “active”. The vBucket/server map is updated and eventually propagated to the client lib. Notice that at this moment, the replication level of the vBucket will be reduced. Couchbase doesn’t automatically re-create new replicas which will cause data copying traffic. Administrator can issue a command to explicitly initiate a data rebalancing. The crashed machine, after reboot can rejoin the cluster. At this moment, all the data it stores previously will be completely discard and the machine will be treated as a brand new empty machine. As more machines are put into the cluster (for scaling out), vBucket should be redistributed to achieve a load balance. This is currently triggered by an explicit command from the administrator. Once receive the “rebalance” command, the leader will compute the new provisional map which has the balanced distribution of vBuckets and send this provisional map to all members of the cluster. To compute the vBucket map and migration plan, the leader attempts the following objectives: Evenly distribute the number of active vBuckets and replica vBuckets among member nodes. Place the active copy and each replicas in physically separated nodes. Spread the replica vBucket as wide as possible among other member nodes. Minimize the amount of data migration Orchestrate the steps of replica redistribution so no node or network will be overwhelmed by the replica migration. Once the vBucket maps is determined, the leader will pass the redistribution map to each member in the cluster and coordinate the steps of vBucket migration. The actual data transfer happens directly between the origination node to the destination node. Notice that since we have generally more vBuckets than machines. The workload of migration will be evenly distributed automatically. For example, when new machines are added into the clusters, all existing machines will migrate some portion of its vBucket to the new machines. There is no single bottleneck in the cluster. Throughput the migration and redistribution of vBucket among servers, the life cycle of a vBucket in a server will be in one of the following states “Active”: means the server is hosting the vBucket is ready to handle both read and write request “Replica”: means the server is hosting the a copy of the vBucket that may be slightly out of date but can take read request that can tolerate some degree of outdate. “Pending”: means the server is hosting a copy that is in a critical transitional state. The server cannot take either read or write request at this moment. “Dead”: means the server is no longer responsible for the vBucket and will not take either read or write request anymore. Data Server Data server implements the memcached APIs such as get, set, delete, append, prepend, etc. It contains the following key datastructure: One in-memory hashtable (key by doc id) for the corresponding vBucket hosted. The hashtable acts as both a metadata for all documents as well as a cache for the document content. Maintain the entry gives a quick way to detect whether the document exists on disk. To support async write, there is a checkpoint linkedlist per vBucket holding the doc id of modified documents that hasn't been flushed to disk or replicated to the replica. To handle a "GET" request Data server routes the request to the corresponding ep-engine responsible for the vBucket. The ep-engine will lookup the document id from the in-memory hastable. If the document content is found in cache (stored in the value of the hashtable), it will be returned. Otherwise, a background disk fetch task will be created and queued into the RO dispatcher queue. The RO dispatcher then reads the value from the underlying storage engine and populates the corresponding entry in the vbucket hash table. Finally, the notification thread notifies the disk fetch completion to the memcached pending connection, so that the memcached worker thread can revisit the engine to process a get request. To handle a "SET" request, a success response will be returned to the calling client once the updated document has been put into the in-memory hashtable with a write request put into the checkpoint buffer. Later on the Flusher thread will pickup the outstanding write request from each checkpoint buffer, lookup the corresponding document content from the hashtable and write it out to the storage engine. Of course, data can be lost if the server crashes before the data has been replicated to another server and/or persisted. If the client requires a high data availability across different crashes, it can issue a subsequent observe() call which blocks on the condition that the server persist data on disk, or the server has replicated the data to another server (and get its ACK). Overall speaking, the client has various options to tradeoff data integrity with throughput. Hashtable Management To synchronize accesses to a vbucket hash table, each incoming thread needs to acquire a lock before accessing a key region of the hash table. There are multiple locks per vbucket hash table, each of which is responsible for controlling exclusive accesses to a certain ket region on that hash table. The number of regions of a hash table can grow dynamically as more documents are inserted into the hash table. To control the memory size of the hashtable, Item pager thread will monitor the memory utilization of the hashtable. Once a high watermark is reached, it will initiate an eviction process to remove certain document content from the hashtable. Only entries that is not referenced by entries in the checkpoint buffer can be evicted because otherwise the outstanding update (which only exists in hashtable but not persisted) will be lost. After eviction, the entry of the document still remains in the hashtable; only the document content of the document will be removed from memory but the metadata is still there. The eviction process stops after reaching the low watermark. The high / low water mark is determined by the bucket memory quota. By default, the high water mark is set to 75% of bucket quota, while the low water mark is set to 60% of bucket quota. These water marks can be configurable at runtime. In CouchDb, every document is associated with an expiration time and will be deleted once it is expired. Expiry pager is responsible for tracking and removing expired document from both the hashtable as well as the storage engine (by scheduling a delete operation). Checkpoint Manager Checkpoint manager is responsible to recycle the checkpoint buffer, which holds the outstanding update request, consumed by the two downstream processes, Flusher and TAP replicator. When all the request in the checkpoint buffer has been processed, the checkpoint buffer will be deleted and a new one will be created. TAP Replicator TAP replicator is responsible to handle vBucket migration as well as vBucket replication from active server to replica server. It does this by propagating the latest modified document to the corresponding replica server. At the time a replica vBucket is established, the entire vBucket need to be copied from the active server to the empty destination replica server as follows The in-memory hashtable at the active server will be transferred to the replica server. Notice that during this period, some data may be updated and therefore the data set transfered to the replica can be inconsistent (some are the latest and some are outdated). Nevertheless, all updates happen after the start of transfer is tracked in the checkpoint buffer. Therefore, after the in-memory hashtable transferred is completed, the TAP replicator can pickup those updates from the checkpoint buffer. This ensures the latest versioned of changed documents are sent to the replica, and hence fix the inconsistency. However the hashtable cache doesn’t contain all the document content. Data also need to be read from the vBucket file and send to the replica. Notice that during this period, update of vBucket will happen in active server. However, since the file is appended only, subsequent data update won’t interfere the vBucket copying process. After the replica server has caught up, subsequent update at the active server will be available at its checkpoint buffer which will be pickup by the TAP replicator and send to the replica server. CouchDB Storage Structure Data server defines an interface where different storage structure can be plugged-in. Currently it supports both a SQLite DB as well as CouchDB. Here we describe the details of CouchDb, which provides a super high performance storage mechanism underneath the Couchbase technology. Under the CouchDB structure, there will be one file per vBucket. Data are written to this file in an append-only manner, which enables Couchbase to do mostly sequential writes for update, and provide the most optimized access patterns for disk I/O. This unique storage structure attributes to Couchbase’s fast on-disk performance for write-intensive applications. The following diagram illustrate the storage model and how it is modified by 3 batch updates (notice that since updates are asynchronous, it is perform by "Flusher" thread in batches). The Flusher thread works as follows: 1) Pick up all pending write request from the dirty queue and de-duplicate multiple update request to the same document. 2) Sort each request (by key) into corresponding vBucket and open the corresponding file 3) Append the following into the vBucket file (in the following contiguous sequence) All document contents in such write request batch. Each document will be written as [length, crc, content] one after one sequentially. The index that stores the mapping from document id to the document’s position on disk (called the BTree by-id) The index that stores the mapping from update sequence number to the document’s position on disk. (called the BTree by-seq) The by-id index plays an important role for looking up the document by its id. It is organized as a B-Tree where each node contains a key range. To lookup a document by id, we just need to start from the header (which is the end of the file), transfer to the root BTree node of the by-id index, and then further traverse to the leaf BTree node that contains the pointer to the actual document position on disk. During the write, the similar mechanism is used to trace back to the corresponding BTree node that contains the id of the modified documents. Notice that in the append-only model, update is not happening in-place, instead we located the existing location and copy it over by appending. In other words, the modified BTree node will be need to be copied over and modified and finally paste to the end of file, and then its parent need to be modified to point to the new location, which triggers the parents to be copied over and paste to the end of file. Same happens to its parents’ parent and eventually all the way to the root node of the BTree. The disk seek can be at the O(logN) complexity. The by-seq index is used to keep track of the update sequence of lived documents and is used for asynchronous catchup purposes. When a document is created, modified or deleted, a sequence number is added to the by-seq btree and the previous seq node will be deleted. Therefore, for cross-site replication, view index update and compaction, we can quickly locate all the lived documents in the order of their update sequence. When a vBucket replicator asks for the list of update since a particular time, it provides the last sequence number in previous update, the system will then scan through the by-seq BTree node to locate all the document that has sequence number larger than that, which effectively includes all the document that has been modified since the last replication. As time goes by, certain data becomes garbage (see the grey-out region above) and become unreachable in the file. Therefore, we need a garbage collection mechanism to clean up the garbage. To trigger this process, the by-id and by-seq B-Tree node will keep track of the data size of lived documents (those that is not garbage) under its substree. Therefore, by examining the root BTree node, we can determine the size of all lived documents within the vBucket. When the ratio of actual size and vBucket file size fall below a certain threshold, a compaction process will be triggered whose job is to open the vBucket file and copy the survived data to another file. Technically, the compaction process opens the file and read the by-seq BTree at the end of the file. It traces the Btree all the way to the leaf node and copy the corresponding document content to the new file. The compaction process happens while the vBucket is being updated. However, since the file is appended only, new changes are recorded after the BTree root that the compaction has opened, so subsequent data update won’t interfere with the compaction process. When the compaction is completed, the system need to copy over the data that was appended since the beginning of the compaction to the new file. View Index Structure Unlike most indexing structure which provide a pointer from the search attribute back to the document. The CouchDb index (called View Index) is better perceived as a denormalized table with arbitrary keys and values loosely associated to the document. Such denormalized table is defined by a user-provided map() and reduce() function. map = function(doc) { … emit(k1, v1) … emit(k2, v2) … } reduce = function(keys, values, isRereduce) { if (isRereduce) { // Do the re-reduce only on values (keys will be null) } else { // Do the reduce on keys and values } // result must be ready for input values to re-reduce return result } Whenever a document is created, updated, deleted, the corresponding map(doc) function will be invoked (in an asynchronous manner) to generate a set of key/value pairs. Such key/value will be stored in a B-Tree structure. All the key/values pairs of each B-Tree node will be passed into the reduce() function, which compute an aggregated value within that B-Tree node. Re-reduce also happens in non-leaf B-Tree nodes which further aggregate the aggregated value of child B-Tree nodes. The management server maintains the view index and persisted it to a separate file. Create a view index is perform by broadcast the index creation request to all machines in the cluster. The management process of each machine will read its active vBucket file and feed each surviving document to the Map function. The key/value pairs emitted by the Map function will be stored in a separated BTree index file. When writing out the BTree node, the reduce() function will be called with the list of all values in the tree node. Its return result represent a partially reduced value is attached to the BTree node. The view index will be updated incrementally as documents are subsequently getting into the system. Periodically, the management process will open the vBucket file and scan all documents since the last sequence number. For each changed document since the last sync, it invokes the corresponding map function to determine the corresponding key/value into the BTree node. The BTree node will be split if appropriate. Underlying, Couchbase use a back index to keep track of the document with the keys that it previously emitted. Later when the document is deleted, it can look up the back index to determine what those key are and remove them. In case the document is updated, the back index can also be examined; semantically a modification is equivalent to a delete followed by an insert. The following diagram illustrates how the view index file will be incrementally updated via the append-only mechanism. Query Processing Query in Couchbase is made against the view index. A query is composed of the view name, a start key and end key. If the reduce() function isn’t defined, the query result will be the list of values sorted by the keys within the key range. In case the reduce() function is defined, the query result will be a single aggregated value of all keys within the key range. If the view has no reduce() function defined, the query processing proceeds as follows: Client issue a query (with view, start/end key) to the management process of any server (unlike a key based lookup, there is no need to locate a specific server). The management process will broadcast the request to other management process on all servers (include itself) within the cluster. Each management process (after receiving the broadcast request) do a local search for value within the key range by traversing the BTree node of its view file, and start sending back the result (automatically sorted by the key) to the initial server. The initial server will merge the sorted result and stream them back to the client. However, if the view has reduce() function defined, the query processing will involve computing a single aggregated value as follows: Client issue a query (with view, start/end key) to the management process of any server (unlike a key based lookup, there is no need to locate a specific server). The management process will broadcast the request to other management process on all servers (include itself) within the cluster. Each management process do a local reduce for value within the key range by traversing the BTree node of its view file to compute the reduce value of the key range. If the key range span across a BTree node, the pre-computed of the sub-range can be used. This way, the reduce function can reuse a lot of partially reduced values and doesn’t need to recomputed every value of the key range from scratch. The original server will do a final re-reduce() in all the return value from each other servers, and then passed back the final reduced value to the client. To illustrate the re-reduce concept, lets say the query has its key range from A to F. Instead of calling reduce([A,B,C,D,E,F]), the system recognize the BTree node that contains [B,C,D] has been pre-reduced and the result P is stored in the BTree node, so it only need to call reduce(A,P,E,F). Update View Index as vBucket migrates Since the view index is synchronized with the vBuckets in the same server, when the vBucket has migrated to a different server, the view index is no longer correct; those key/value that belong to a migrated vBucket should be discarded and the reduce value cannot be used anymore. To keep track of the vBucket and key in the view index, each bTree node has a 1024-bitmask indicating all the vBuckets that is covered in the subtree (ie: it contains a key emitted from a document belonging to the vBucket). Such bit-mask is maintained whenever the bTree node is updated. At the server-level, a global bitmask is used to indicate all the vBuckets that this server is responsible for. In processing the query of the map-only view, before the key/value pair is returned, an extra check will be perform for each key/value pair to make sure its associated vBucket is what this server is responsible for. When processing the query of a view that has a reduce() function, we cannot use the pre-computed reduce value if the bTree node contains a vBucket that the server is not responsible for. In this case, the bTree node’s bit mask is compared with the global bit mask. In case if they are not aligned, then the reduce value need to be recomputed. Here is an example to illustrate this process Couchbase is one of the popular NOSQL technology built on a solid technology foundation designed for high performance. In this post, we have examined a number of such key features: Load balancing between servers inside a cluster that can grow and shrink according to workload conditions. Data migration can be used to re-achieve workload balance. Asynchronous write provides lowest possible latency to client as it returns once the data is store in memory. Append-only update model pushes most update transaction into sequential disk access, hence provide extremely high throughput for write intensive applications. Automatic compaction ensures the data lay out on disk are kept optimized all the time. Map function can be used to pre-compute view index to enable query access. Summary data can be pre-aggregated using the reduce function. Overall, this cut down the workload of query processing dramatically. For a review on NOSQL architecture in general and some theoretical foundation, I have wrote a NOSQL design pattern blog, as well as some fundamental difference between SQL and NOSQL. For other NOSQL technologies, please read my other blog on MongoDb, Cassandra and HBase, Memcached Special thanks to Damien Katz and Frank Weigel from Couchbase team who provide a lot of implementation details of Couchbase.
July 7, 2012
by Ricky Ho
· 84,698 Views · 5 Likes
article thumbnail
The Limited Usefulness of AsyncContext.start()
Some time ago I came across What's the purpose of AsyncContext.start(...) in Servlet 3.0? question. Quoting the Javadoc of aforementioned method: Causes the container to dispatch a thread, possibly from a managed thread pool, to run the specified Runnable. To remind all of you, AsyncContext is a standard way defined in Servlet 3.0 specification to handle HTTP requests asynchronously. Basically HTTP request is no longer tied to an HTTP thread, allowing us to handle it later, possibly using fewer threads. It turned out that the specification provides an API to handle asynchronous threads in a different thread pool out of the box. First we will see how this feature is completely broken and useless in Tomcat and Jetty - and then we will discuss why the usefulness of it is questionable in general. Our test servlet will simply sleep for given amount of time. This is a scalability killer in normal circumstances because even though sleeping servlet is not consuming CPU, but sleeping HTTP thread tied to that particular request consumes memory - and no other incoming request can use that thread. In our test setup I limited the number of HTTP worker threads to 10 which means only 10 concurrent requests are completely blocking the application (it is unresponsive from the outside) even though the application itself is almost completely idle. So clearly sleeping is an enemy of scalability. @WebServlet(urlPatterns = Array("/*")) class SlowServlet extends HttpServlet with Logging { protected override def doGet(req: HttpServletRequest, resp: HttpServletResponse) { logger.info("Request received") val sleepParam = Option(req.getParameter("sleep")) map {_.toLong} TimeUnit.MILLISECONDS.sleep(sleepParam getOrElse 10) logger.info("Request done") } } Benchmarking this code reveals that the average response times are close to sleep parameter as long as the number of concurrent connections is below the number of HTTP threads. Unsurprisingly the response times begin to grow the moment we exceed the HTTP threads count. Eleventh connection has to wait for any other request to finish and release worker thread. When the concurrency level exceeds 100, Tomcat begins to drop connections - too many clients are already queued. So what about the the fancy AsyncContext.start() method (do not confuse with ServletRequest.startAsync())? According to the JavaDoc I can submit any Runnable and the container will use some managed thread pool to handle it. This will help partially as I no longer block HTTP worker threads (but still another thread somewhere in the servlet container is used). Quickly switching to asynchronous servlet: @WebServlet(urlPatterns = Array("/*"), asyncSupported = true) class SlowServlet extends HttpServlet with Logging { protected override def doGet(req: HttpServletRequest, resp: HttpServletResponse) { logger.info("Request received") val asyncContext = req.startAsync() asyncContext.setTimeout(TimeUnit.MINUTES.toMillis(10)) asyncContext.start(new Runnable() { def run() { logger.info("Handling request") val sleepParam = Option(req.getParameter("sleep")) map {_.toLong} TimeUnit.MILLISECONDS.sleep(sleepParam getOrElse 10) logger.info("Request done") asyncContext.complete() } }) } } We are first enabling the asynchronous processing and then simply moving sleep() into a Runnable and hopefully a different thread pool, releasing the HTTP thread pool. Quick stress test reveals slightly unexpected results (here: response times vs. number of concurrent connections): Guess what, the response times are exactly the same as with no asynchronous support at all (!) After closer examination I discovered that when AsyncContext.start() is called Tomcat submits given task back to... HTTP worker thread pool, the same one that is used for all HTTP requests! This basically means that we have released one HTTP thread just to utilize another one milliseconds later (maybe even the same one). There is absolutely no benefit of calling AsyncContext.start() in Tomcat. I have no idea whether this is a bug or a feature. On one hand this is clearly not what the API designers intended. The servlet container was suppose to manage separate, independent thread pool so that HTTP worker thread pool is still usable. I mean, the whole point of asynchronous processing is to escape the HTTP pool. Tomcat pretends to delegate our work to another thread, while it still uses the original worker thread pool. So why I consider this to be a feature? Because Jetty is "broken" in exactly same way... No matter whether this works as designed or is only a poor API implementation, using AsyncContext.start() in Tomcat and Jetty is pointless and only unnecessarily complicates the code. It won't give you anything, the application works exactly the same under high load as if there was no asynchronous logic at all. But what about using this API feature on correct implementations like IBM WAS? It is better, but still the API as is doesn't give us much in terms of scalability. To explain again: the whole point of asynchronous processing is the ability to decouple HTTP request from an underlying thread, preferably by handling several connections using the same thread. AsyncContext.start() will run the provided Runnable in a separate thread pool. Your application is still responsive and can handle ordinary requests while long-running request that you decided to handle asynchronously are processed in a separate thread pool. It is better, unfortunately the thread pool and thread per connection idiom is still a bottle-neck. For the JVM it doesn't matter what type of threads are started - they still occupy memory. So we are no longer blocking HTTP worker threads, but our application is not more scalable in terms of concurrent long-running tasks we can support. In this simple and unrealistic example with sleeping servlet we can actually support thousand of concurrent (waiting) connections using Servlet 3.0 asynchronous support with only one extra thread - and without AsyncContext.start(). Do you know how? Hint: ScheduledExecutorService. Postscriptum: Scala goodness I almost forgot. Even though examples were written in Scala, I haven't used any cool language features yet. Here is one: implicit conversions. Make this available in your scope: implicit def blockToRunnable[T](block: => T) = new Runnable { def run() { block } } And suddenly you can use code block instead of instantiating Runnable manually and explicitly: asyncContext start { logger.info("Handling request") val sleepParam = Option(req.getParameter("sleep")) map { _.toLong} TimeUnit.MILLISECONDS.sleep(sleepParam getOrElse 10) logger.info("Request done") asyncContext.complete() } Sweet!
May 22, 2012
by Tomasz Nurkiewicz
· 17,534 Views · 1 Like
article thumbnail
How to Use Sigma.js with Neo4j
i’ve done a few posts recently using d3.js and now i want to show you how to use two other great javascript libraries to visualize your graphs. we’ll start with sigma.js and soon i’ll do another post with three.js . we’re going to create our graph and group our nodes into five clusters. you’ll notice later on that we’re going to give our clustered nodes colors using rgb values so we’ll be able to see them move around until they find their right place in our layout. we’ll be using two sigma.js plugins, the gefx (graph exchange xml format) parser and the forceatlas2 layout. you can see what a gefx file looks like below. notice it comes from gephi which is an interactive visualization and exploration platform, which runs on all major operating systems, is open source, and is free. ... ... in order to build this file, we will need to get the nodes and edges from the graph and create an xml file. get '/graph.xml' do @nodes = nodes @edges = edges builder :graph end we’ll use cypher to get our nodes and edges: def nodes neo = neography::rest.new cypher_query = " start node = node:nodes_index(type='user')" cypher_query << " return id(node), node" neo.execute_query(cypher_query)["data"].collect{|n| {"id" => n[0]}.merge(n[1]["data"])} end we need the node and relationship ids, so notice i’m using the id() function in both cases. def edges neo = neography::rest.new cypher_query = " start source = node:nodes_index(type='user')" cypher_query << " match source -[rel]-> target" cypher_query << " return id(rel), id(source), id(target)" neo.execute_query(cypher_query)["data"].collect{|n| {"id" => n[0], "source" => n[1], "target" => n[2]} } end so far we have seen graphs represented as json, and we’ve built these manually. today we’ll take advantage of the builder ruby gem to build our graph in xml. xml.instruct! :xml xml.gexf 'xmlns' => "http://www.gephi.org/gexf", 'xmlns:viz' => "http://www.gephi.org/gexf/viz" do xml.graph 'defaultedgetype' => "directed", 'idtype' => "string", 'type' => "static" do xml.nodes :count => @nodes.size do @nodes.each do |n| xml.node :id => n["id"], :label => n["name"] do xml.tag!("viz:size", :value => n["size"]) xml.tag!("viz:color", :b => n["b"], :g => n["g"], :r => n["r"]) xml.tag!("viz:position", :x => n["x"], :y => n["y"]) end end end xml.edges :count => @edges.size do @edges.each do |e| xml.edge:id => e["id"], :source => e["source"], :target => e["target"] end end end end you can get the code on github as usual and see it running live on heroku. you will want to see it live on heroku so you can see the nodes in random positions and then move to form clusters. use your mouse wheel to zoom in, and click and drag to move around. credit goes out to alexis jacomy and mathieu jacomy . you’ve seen me create numerous random graphs, but for completeness here is the code for this graph. notice how i create 5 clusters and for each node i assign half its relationships to other nodes in their cluster and half to random nodes? this is so the forceatlas2 layout plugin clusters our nodes neatly. def create_graph neo = neography::rest.new graph_exists = neo.get_node_properties(1) return if graph_exists && graph_exists['name'] names = 500.times.collect{|x| generate_text} clusters = 5.times.collect{|x| {:r => rand(256), :g => rand(256), :b => rand(256)} } commands = [] names.each_index do |n| cluster = clusters[n % clusters.size] commands << [:create_node, {:name => names[n], :size => 5.0 + rand(20.0), :r => cluster[:r], :g => cluster[:g], :b => cluster[:b], :x => rand(600) - 300, :y => rand(150) - 150 }] end names.each_index do |from| commands << [:add_node_to_index, "nodes_index", "type", "user", "{#{from}"] connected = [] # create clustered relationships members = 20.times.collect{|x| x * 10 + (from % clusters.size)} members.delete(from) rels = 3 rels.times do |x| to = members[x] connected << to commands << [:create_relationship, "follows", "{#{from}", "{#{to}"] unless to == from end # create random relationships rels = 3 rels.times do |x| to = rand(names.size) commands << [:create_relationship, "follows", "{#{from}", "{#{to}"] unless (to == from) || connected.include?(to) end end batch_result = neo.batch *commands end
April 12, 2012
by Max De Marzi
· 15,380 Views
article thumbnail
Bootstrapping CDI in several environments
i feel like writing some posts about cdi (contexts and dependency injection). so this is the first one of a series of x posts ( 0 javax.enterprise cdi-api 1.0 provided an empty beans.xml will do to enable cdi you must have a beans.xml file in your project (under the meta-inf or web-inf). that’s because cdi needs to identify the beans in your classpath (this is called bean discovery) and build its internal metamodel. with the beans.xml file cdi knows it has beans to discover. so, for all the following examples i’ll make it simple and will leave this file completely empty. java ee 6 containers let’s start with the easiest possible environment : java ee 6 containers . why is it the simplest ? well, because you don’t have to do anything : cdi is part of java ee 6 as well as the web profile 1.0 so you don’t need to manually bootstrap it. let’s see how to inject a cdi bean within an ejb 3.1 and a servlet 3.0 . ejb 3.1 since ejb 3.1 you can use the ejbcontainer api to get an in-memory embedded ejb container and you can easily unit test your ejbs. so let’s write an ejb and a test class. first let’s have a look at the code of the ejb. as you can see, with version 3.1 an ejb is just a pojo : no inheritance, no interface, just one @stateless annotation. it gets a reference of the hello bean buy using the @inject annotation and uses it in the saysomething() method. @stateless public class mainejb31 { @inject hello hello; public string saysomething() { return hello.sayhelloworld(); } } you can now package the mainejb31, hello and world classes with the empty beans.xml file into a jar, deploy it to glassfish 3.x , and it will work. but if you don’t want to bother deploying it to glassfish and just unit test it, this is what you need to do : public class mainejbtest { private static ejbcontainer ec; private static context ctx; @beforeclass public static void initcontainer() throws exception { map properties = new hashmap(); properties.put(ejbcontainer.modules, new file("target/classes")); ec = ejbcontainer.createejbcontainer(properties); ctx = ec.getcontext(); } @afterclass public static void closecontainer() throws exception { if (ec != null) ec.close(); } @test public void shoulddisplayhelloworld() throws exception { // looks up the ejb mainejb31 mainejb = (mainejb31) ctx.lookup("java:global/classes/mainejb!org.antoniogoncalves.cdi.helloworld.mainejb"); assertequals("should say hello world !!!", "hello world !!!", mainejb.saysomething()); } } in the code above the method initcontainer() initializes the ejbcontainer. the shoulddisplayhelloworld() looks up the ejb (using the new portable jndi name ), invokes it and makes sure the saysomething() method returns hello world !!!. green test. that was pretty easy too. servlet 3.0 servlet 3.0 is part of java ee 6, so again, there is no needed configuration to bootstrap cdi. let’s use the new @webservlet annotation and write a very simple one that injects a reference of hello and displays an html page with hello world !!!. this is what the servlet looks like : @webservlet(urlpatterns = "/mainservlet") public class mainservlet30 extends httpservlet { @inject hello hello; @override protected void service(httpservletrequest req, httpservletresponse resp) throws servletexception, ioexception { resp.setcontenttype("text/html"); printwriter out = resp.getwriter(); out.println(""); out.println(""); out.println(""); out.println(saysomething()); out.println(""); out.println(""); out.close(); } public string saysomething() { return hello.sayhelloworld(); } } thanks to the @webservlet i don’t need any web.xml (it’s optional in servlet 3.0) to map the mainservlet30 to the /mainservlet url. you can now package the mainservlet30, hello and world classes with the empty beans.xml and no web.xml into a war, deploy it to glassfish 3.x , go to http://localhost:8080/bootstrapping-servlet30-1.0/mainservlet and it will work. unfortunately servlet 3.0 doesn’t have an api for the container (such as ejbcontainer). there is no servletcontainer api that would let you use an embedded servlet container in a standard way and, why not, easily unit test it. application client container not many people know it, but java ee (or even older j2ee versions) comes with an application client container (acc). it’s like an ejb or servlet container but for plain pojos. for example you can develop a swing application (yes, i’m sure that some of you still use swing), run it into the acc and get some extra services given by the container (security, naming, certain annotations…). glassfish v3 has an acc that you can launch in a command line : appclient -jar . so i thought, great, i can use cdi with acc the same way i use it within ejb or servlet container, no need to bootstrap anything, it’s all out of the box. i was wrong . as per the cdi specification (section 12.1), cdi is not required to support application client bean archives. so the glassfish application client container doesn’t support it. i haven’t tried the jboss acc , maybe it works. other containers the beauty of cdi is that it doesn’t require java ee 6 . you can use cdi with simple pojos in a java se environment, as well as some servlet 2.5 containers. of course it’s not as easy to bootstrap because you need a bit of configuration. but it then works fine (not always but). java se 6 ok, so until now there was nothing to do to bootstrap cdi. it is already bundled with the ejb 3.1 and servlet 3.0 containers of java ee 6 (and web profile). so the idea here is to use cdi in a simple java se environment. coming back to our hello and world classes, we need a pojo with an entry point that will bootstrap cdi so we can use injection to get those classes. in standard java se when we say entry point , we think of a public static void main(string[] args) method. well, we need something similar… but different. weld is the reference implementation of cdi. that means it implements the specification, the standard apis (mostly found in javax.inject and javax.enterprise.context packages) but also some proprietary code (in org.jboss.weld package). bootstrapping cdi in java se is not specified so you will need to use specific weld features. you can do that in two different flavors: by observing the containerinitialized event or using the programatic bootstrap api consisting of the weld and weldcontainer classes. the following code uses the containerinitialized event. as you can see, it uses the @observes annotation that i’ll explain in a future post. but the idea is that this class is listening to the event and processes the code once the event is triggered. import org.jboss.weld.environment.se.events.containerinitialized; import javax.enterprise.event.observes; import javax.inject.inject; public class mainjavase6 { @inject hello hello; public void saysomething(@observes containerinitialized event) { system.out.println(hello.sayhelloworld()); } } but who trigers the containerinitialized event ? well, it’s the org.jboss.weld.environment.se.startmain class. i’m using maven so a nice trick is to use the exec-maven-plugin to run the startmain class. download the code , have a look at the pom.xml and give it a try. the other possibility is to programmatically bootstrap the weld container. this can be handy in unit testing. the code below initializes the weld container (with new weld().initialize()) and then looks for the hello class (using weld.instance().select(hello.class).get()). import org.jboss.weld.environment.se.weld; import org.jboss.weld.environment.se.weldcontainer; import org.junit.beforeclass; import org.junit.test; import static junit.framework.assert.assertequals; public class hellotest { @test public void shoulddisplayhelloworld() { weldcontainer weld = new weld().initialize(); hello hello = weld.instance().select(hello.class).get(); assertequals("should say hello world !!!", "hello world !!!", hello.sayhelloworld()); } } execute the test with mvn test and it should be green. as you can see, there is a bit more work using cdi in a java se environment, but it’s not that complicated. tomcat 6.x ok, and what about your legacy servlet 2.5 containers ? the first one that comes in mind is tomcat 6.x ( note that tomcat 7.x will implement servlet 3.0 but is still in beta version at the time of writing this post ). weld provides support for tomcat but you need to configure it a bit to make cdi work. first of all, this is a servlet 2.5, not a 3.0. so the code of the servlet is slightly different from the one seen before (no annotation allowed) and of course, you need your good old web.xml file : public class mainservlet25 extends httpservlet { @inject hello hello; @override protected void service(httpservletrequest req, httpservletresponse resp) throws servletexception, ioexception { resp.setcontenttype("text/html"); printwriter out = resp.getwriter(); out.println(""); out.println(""); out.println(""); out.println(saysomething()); out.println(""); out.println(""); out.close(); } public string saysomething() { return hello.sayhelloworld(); } } because we don’t have a @webservlet annotation in servlet 2.5, we need to declare and map it in the web.xml (using the servlet and servlet-mapping tags). then, you need to explicitly specify the servlet listener to boot weld and control its interaction with requests (org.jboss.weld.environment.servlet.listener). tomcat has a read-only jndi, so weld can’t automatically bind the beanmanager extension spi. to bind the beanmanager into jndi, you should populate meta-inf/context.xml and make the beanmanager available to your deployment by adding it to your web.xml: mainservlet25 org.antoniogoncalves.cdi.bootstrapping.servlet.mainservlet25 mainservlet25 /mainservlet org.jboss.weld.environment.servlet.listener beanmanager javax.enterprise.inject.spi.beanmanager the meta-inf/context.xml file is an optional file which contains a context for a single tomcat web application. this can be used to define certain behaviours for your application, jndi resources and other settings. package all the files (mainservlet25, hello, world, meta-inf/context.xml, beans.xml and web.xml) into a war and deploy it into tomcat 6.x. go to http://localhost:8080/bootstrapping-servlet25-tomcat-1.0/mainservlet and you will see your hello world page. jetty 6.x another famous servlet 2.5 containers is jetty 6.x (at codehaus) and jetty 7.x ( note that jetty 8.x will implement servlet 3.0 but it’s still in experimental stage at the time of writing this post ). if you look at the weld documentation, there is actually support for jetty 6.x and 7.x . the code is the same one as tomcat (because it’s a servlet 2.5 container), but the configuration changes. with jetty you need to add two files under web-inf : jetty-env.xml and jetty-web.xml : beanmanager javax.enterprise.inject.spi.beanmanager org.jboss.weld.resources.managerobjectfactory true package all the files (mainservlet25, hello, world, web-inf/jetty-env.xml, web-inf/jetty-web.xml, beans.xml and web.xml) into a war and deploy it into jetty 6.x. go to http://localhost:8080/bootstrapping-servlet25-jetty6/mainservlet and you will see your hello world page. there was a mistake in the weld documentation so i couldn’t make it work. i started a thread on the weld forum and thanks to dan allen , pete muir and all the weld team, this was fixed and i managed to make it work. simple as posting an email to the forum . thanks for your help guys. spring 3.x here is the tricky part. spring 3.x implements the jsr 330 : dependency injection for java , which means that @inject works out of the box. but i didn’t find a way to integrate cdi with spring 3.x . the weld documentation mentions that because of its extension points, “ integration with third-party frameworks such as spring (…) was envisaged by the designers of cdi “. i did find this blog that simulates cdi features by enabling spring ones. what i didn’t find is a clear statement or roadmap on springsource about supporting cdi or not in future releases. the last trace of this topic is a comment on a long tss flaming thread . at that time (16 december 2009), juergen huller said “ with respect to implementing cdi on top of spring (…) trying to hammer it into the semantic frame of another framework such as cdi would be an exercise that is certainly achievable (…) but ultimately pointless “. but if you have any fresh news about it, let me know. conclusion as i said, this post is not about explaining cdi, i’ll do that in future posts. i just wanted to focus on how to bootstrap it in several environments so you can try by yourself. as you saw, it’s much simpler to use cdi within an ejb 3.1 or servlet 3.0 container in java ee 6. i’ve used glassfish 3.x but it should also work with other java ee 6 or web profile containers such as jboss 6 or resin . when you don’t use java ee 6, there is a bit more work to do. depending on your environment or servlet container you need some configuration to bootstrap weld. by the way, i’ve used weld because it’s the reference implementation, the one bunddled with glassfish and jboss. but you could also use openwebbeans , another cdi implementation. download the code , give it a try, and give me some feedback. from http://agoncal.wordpress.com/2011/01/12/bootstrapping-cdi-in-several-environments/
April 28, 2011
by Antonio Goncalves
· 31,390 Views
article thumbnail
Clustering Tomcat Servers with High Availability and Disaster Fallback
There has been a lot of buzz lately on high-availability and clustering. Most developers don't care and why should they? These features should be transparent to the application architecture and not something of concern to the developers of that application. But knowledge never hurts, so I emerged myself into the world of load balancing, heartbeats and virtual IP addresses. And you know what? Next time we need a infrastructure like this, I can at least sit down with the guys from the infrastructure department and at least know what the hell they are talking about. So what exactly is a high-availability clustered infrastructure (HACI, as I'll call it from now on) ? In essence, it should be a zero-downtime infrastructure (or at least perceived as one by the end user, which means never ever returning a default browser 404 page), capable of horizontal scaling when the need for it arises and without a single point of failure. It's the SLA writer's dream. A basic HACI setup looks like this: The users enters through a virtual IP address, assigned to one of the two load balancers. Only one of the load-balancers is active (the active master, LB1), the other one is there in the event LB1 fails ((LB2, a passive slave). The two load balancers are redundant, ie. having the exact same configuration. The load balancers redirect all traffic to the real servers. This can be done through round-robin assignment or through other means like sticky sessions, where the same user is redirected to the same server each and every time within a session. Servers can be added at any moment and configured on the load balancers. Ideally, the load balancer configuration is aware of the hardware specification and balances the load accordingly, but that's beyond the scope of this article (it involves adding weights). If all servers balanced by the load balancer fail, a backup server should be used to redirect all traffic coming from the load balancer. This can be a very lightweight server, which purpose is only to provide a sensible error page to the user (something like 'Sorry, we are performing maintenance'). Again, perception and immediate feedback to the user is key. You don't want to show the user a plain 404 page. Off course, if the backup server goes down too, you're in trouble (off course, by that time, warning bells should have gone off on every level in the hierarchy). So how to achieve this with as little effort as possible? If you want to try this out, I suggest you start by installing a virtual machine like VirtualBox or VMWare. This way you can try out the configuration yourself. In this example, I'll be load-balancing 3 Tomcat servers using sticky sessions using 2 load balancers in active-passive mode. I'm assuming all 3 Tomcat servers share the same hardware configuration, so they are all able to handle the same amount of traffic each. I'm also throwing in a backup server, in case all 3 Tomcat servers go down (serving a custom 503 page kindly informing the user of a catastrophic failure, instead of dropping the standard 404 bomb). You want to start off by assigning IP addresses to the servers. This will make your life a bit easier. We'll need 7 addresses: 3 for the tomcat server, 1 for the backup server, 2 for the loadbalancers and 1 virtual IP address to be shared between the load balancers (and which will be the entry point for your users). So our assignment will be: Virtual IP 10.0.5.99 www.haci.local LB1 10.0.5.100 lb1.haci.local #MASTER LB2 10.0.5.101 lb2.haci.local #SLAVE WEB1 10.0.5.102 web1.haci.local WEB2 10.0.5.103 web2.haci.local WEB3 10.0.5.104 web3.haci.local BACKUP 10.0.5.105 backup.haci.local Setting up the web servers is easy. You just install Tomcat on each server and create a simple JSP file to be served to users (make a small change, like the background color, on each server to distinguish the servers). I won't be covering session replication between the Tomcat servers, as it'll take me too far. If you want, you can configure the appropriate session replication and storage (using multicast or JDBC for example). The backup server I'm using is a basic LAMP server that returns a simple 503 page on every request it gets. The 503 error code is important, because it reflects the current state of the system: currently unavailable. For the loadbalancers I'll be using 2 applications: HAProxy and keepalived. HAProxy is going to handle load balancing, while keepalived will handle the failover between the two load balancers. First, we're going to configure HAProxy for both LB1 and LB2. Installing HAProxy is quite easy on an ubuntu system. Just do a sudo apt-get install haproxy and you're off. After the install, backup the current HAProxy config and start editing away. cp /etc/haproxy.cfg /etc/haproxy.cfg_orig cat /dev/null > /etc/haproxy.cfg vi /etc/haproxy.cfg The content of the config to reflect our setup should become something like this (same config on LB1 and LB2): global log 127.0.0.1 local0 log 127.0.0.1 local1 notice #log loghost local0 info maxconn 4096 #debug #quiet user haproxy group haproxy defaults log global mode http option httplog option dontlognull retries 3 redispatch maxconn 2000 contimeout 5000 clitimeout 50000 srvtimeout 50000 frontend http-in bind 10.0.5.99:80 default_backend servers backend servers mode http stats enable stats auth someuser:somepassword balance roundrobin cookie JSESSIONID prefix option httpclose option forwardfor option httpchk HEAD /check.txt HTTP/1.0 server web1 10.0.5.102:80 cookie haci_web1 check server web2 10.0.5.103:80 cookie haci_web2 check server web3 10.0.5.104:80 cookie haci_web3 check server webbackup 10.0.5.105:80 backup After this, enable HAProxy on both LB1 and LB2 by editing /etc/defaults/haproxy # Set ENABLED to 1 if you want the init script to start haproxy. ENABLED=1 # Add extra flags here. #EXTRAOPTS="-de -m 16" So far for the HAProxy configuration. We can't start it up yet, as LB1 and LB2 aren't listening yet on the virtual IP address. Next we'll configure the failover of the loadbalancers using keepalived. Installing it on Ubuntu is as easy as it was for HAProxy: sudo apt-get install keepalived. But its configuration is slightly different on both load balancers. First, we need to configure the both servers to be able to listen to the shared IP address. Add the following line to /etc/sysctl.conf: net.ipv4.ip_nonlocal_bind=1 And run sysctl -p Now, we configure keepalived so that LB1 is configured as the main load balancer and binds to the shared IP address, while LB2 is on standby, ready to take over whenever LB1 goes down. The configuration for LB1 looks like this (edit /etc/keepalived/keepalived.conf): vrrp_script chk_haproxy { # Requires keepalived-1.1.13 script "killall -0 haproxy" # cheaper than pidof interval 2 # check every 2 seconds weight 2 # add 2 points of prio if OK } vrrp_instance VI_1 { interface eth0 state MASTER virtual_router_id 51 priority 101 # 101 on master, 100 on backup virtual_ipaddress { 10.0.5.99 } track_script { chk_haproxy } } Start up keepalived and check whether it is listening to the virtual IP address. /etc/init.d/keepalived start ip addr sh eth0 It should return something like this, indicating it is listening to the virtual IP address 2: eth0: mtu 1500 qdisc pfifo_fast qlen 1000 link/ether 00:0c:29:a5:5b:93 brd ff:ff:ff:ff:ff:ff inet 10.0.5.100/24 brd 10.0.5.255 scope global eth0 inet 10.0.5.99/32 scope global eth0 inet6 fe80::20c:29ff:fea5:5b93/64 scope link valid_lft forever preferred_lft forever Next, we configure LB2. The configuration is almost the same, exception for the priority. vrrp_script chk_haproxy { # Requires keepalived-1.1.13 script "killall -0 haproxy" # cheaper than pidof interval 2 # check every 2 seconds weight 2 # add 2 points of prio if OK } vrrp_instance VI_1 { interface eth0 state MASTER virtual_router_id 51 priority 100 # 101 on master, 100 on backup virtual_ipaddress { 10.0.5.99 } track_script { chk_haproxy } } Start up keepalived and check the network interface. /etc/init.d/keepalived start ip addr sh eth0 It should return something like this, indicating it is not listening to the virtual IP address 2: eth0: mtu 1500 qdisc pfifo_fast qlen 1000 link/ether 00:0c:29:a5:5b:93 brd ff:ff:ff:ff:ff:ff inet 10.0.5.101/24 brd 10.0.5.255 scope global eth0 inet6 fe80::20c:29ff:fea5:5b93/64 scope link valid_lft forever preferred_lft forever Now, start up HAProxy on both LB1 and LB2. /etc/init.d/haproxy start Now you can issue requests to 10.0.5.99 (or www.haci.local), which will go to LB1, which in turn will load-balance the request to either WEB1, WEB2 and WEB3. You can test the load balancing by turning off WEB1 (or the server you're currently on). You can also the backup server by turning all main webservers (WEB1, WEB2 and WEB3). And you can test the loadbalancer failover by turning off LB1. At that point LB2 will kick in and act as the master, loadbalancing all requests. When you turn LB1 back on, it'll take over the master role once again. HAProxy allows you to add extra servers very easily, reloading the configuration without breaking existing sessions. See the HAProxy documentation for more info or on ServerFault. (http://serverfault.com/questions/165883/is-there-a-way-to-add-more-backend-server-to-haproxy-without-restarting-haproxy). Cheap and effective. While most enterprise shops have hardware load balancers, which also have these possibilities and more, if you're on a tight budget or need to simulate a HACI environment for development purposes (a lesson here: always simulate your production environment when you're testing during development), this might be the sane option. To finish, I'll quickly explain how to set up the backup server (a simple LAMP server). Create a vhost configuration on the apache for www.haci.local or any other domain pointing to the virtual IP address and set up mod_rewrite for it: RewriteEngine On RewriteCond %{REQUEST_URI} !\.(css|gif|ico|jpg|js|png|swf|txt)$ [NC] RewriteConf %{REQUEST_URI} !/503.php RewriteRule .* /503.php [L] Then create the 503.php file and add this to the top of it: Sorry, our servers are currently undergoing maintenance. Please check back with us in a while. Thank you for your patience. You can decorate the 503.php file any way you like. You can even use CSS, JavaScript and image files in the php file. Now, back to my IDE. I'm getting withdrawal symptoms.
March 11, 2011
by Lieven Doclo
· 57,985 Views
article thumbnail
Apache Solr: Get Started, Get Excited!
we've all seen them on various websites. crappy search utilities. they are a constant reminder that search is not something you should take lightly when building a website or application. search is not just google's game anymore. when a java library called lucene was introduced into the apache ecosystem, and then solr was built on top of that, open source developers began to wield some serious power when it came to customizing search features. in this article you'll be introduced to apache solr and a wealth of applications that have been built with it. the content is divided as follows: introduction setup solr applications summary 1. introduction apache solr is an open source search server. it is based on the full text search engine called apache lucene . so basically solr is an http wrapper around an inverted index provided by lucene. an inverted index could be seen as a list of words where each word-entry links to the documents it is contained in. that way getting all documents for the search query "dzone" is a simple 'get' operation. one advantage of solr in enterprise projects is that you don't need any java code, although java itself has to be installed. if you are unsure when to use solr and when lucene, these answers could help. if you need to build your solr index from websites, you should take a look into the open source crawler called apache nutch before creating your own solution. to be convinced that solr is actually used in a lot of enterprise projects, take a look at this amazing list of public projects powered by solr . if you encounter problems then the mailing list or stackoverflow will help you. to make the introduction complete i would like to mention my personal link list and the resources page which lists books, articles and more interesting material. 2. setup solr 2.1. installation as the very first step, you should follow the official tutorial which covers the basic aspects of any search use case: indexing - get the data of any form into solr. examples: json, xml, csv and sql-database. this step creates the inverted index - i.e. it links every term to its documents. querying - ask solr to return the most relevant documents for the users' query to follow the official tutorial you'll have to download java and the latest version of solr here . more information about installation is available at the official description . next you'll have to decide which web server you choose for solr. in the official tutorial, jetty is used, but you can also use tomcat. when you choose tomcat be sure you are setting the utf-8 encoding in the server.xml . i would also research the different versions of solr, which can be quite confusing for beginners: the current stable version is 1.4.1. use this if you need a stable search and don't need one of the latest features. the next stable version of solr will be 3.x the versions 1.5 and 2.x will be skipped in order to reach the same versioning as lucene. version 4.x is the latest development branch. solr 4.x handles advanced features like language detection via tika, spatial search , results grouping (group by field / collapsing), a new "user-facing" query parser ( edismax handler ), near real time indexing, huge fuzzy search performance improvements, sql join-a like feature and more. 2.2. indexing if you've followed the official tutorial you have pushed some xml files into the solr index. this process is called indexing or feeding. there are a lot more possibilities to get data into solr: using the data import handler (dih) is a really powerful language neutral option. it allows you to read from a sql database, from csv, xml files, rss feeds, emails, etc. without any java knowledge. dih handles full-imports and delta-imports. this is necessary when only a small amount of documents were added, updated or deleted. the http interface is used from the post tool, which you have already used in the official tutorial to index xml files. client libraries in different languages also exist. (e.g. for java (solrj) or python ). before indexing you'll have to decide which data fields should be searchable and how the fields should get indexed. for example, when you have a field with html in it, then you can strip irrelevant characters , tokenize the text into 'searchable terms', lower case the terms and finally stem the terms . in contrast, if you would have a field with text in it that should not be interpreted (e.g. urls) you shouldn't tokenize it and use the default field type string. please refer to the official documentation about field and field type definitions in the schema.xml file. when designing an index keep in mind the advice from mauricio : "the document is what you will search for. " for example, if you have tweets and you want to search for similar users, you'll need to setup a user index - created from the tweets. then every document is a user. if you want to search for tweets, then setup a tweet index; then every document is a tweet. of course, you can setup both indices with the multi index options of solr. please also note that there is a project called solr cell which lets you extract the relevant information out of several different document types with the help of tika. 2.3. querying for debugging it is very convenient to use the http interface with a browser to query solr and get back xml. use firefox and the xml will be displayed nicely: you can also use the velocity contribution , a cross-browser tool, which will be covered in more detail in the section about 'search application prototyping' . to query the index you can use the dismax handler or standard query handler . you can filter and sort the results: q=superman&fq=type:book&sort=price asc you can also do a lot more ; one other concept is boosting. in solr you can boost while indexing and while querying. to prefer the terms in the title write: q=title:superman^2 subject:superman when using the dismax request handler write: q=superman&qf=title^2 subject check out all the various query options like fuzzy search , spellcheck query input , facets , collapsing and suffix query support . 3. applications now i will list some interesting use cases for solr - in no particular order. to see how powerful and flexible this open source search server is. 3.1. drupal integration the drupal integration can be seen as generic use case to integrate solr into php projects. for the php integration you have the choice to either use the http interface for querying and retrieving xml or json. or to use the php solr client library . here is a screenshot of a typical faceted search in drupal : for more information about faceted search look into the wiki of solr . more php projects which integrates solr: open source typo3- solr module magento enterprise - solr module . the open source integration is out dated. oxid - solr module . no open source integration available. 3.2. hathi trust the hathi trust project is a nice example that proves solr's ability to search big digital libraries. to quote directly from the article : "... the index for our one million book index is over 200 gigabytes ... so we expect to end up with a two terabyte index for 10 million books" other examples for libraries: vufind - aims to replace opac internet archive national library of australia 3.3. auto suggestions mainly, there are two approaches to implement auto-suggestions (also called auto-completion) with solr: via facets or via ngramfilterfactory . to push it to the extreme you can use a lucene index entirely in ram. this approach is used in a large music shop in germany. live examples for auto suggestions: kaufda.de 3.4. spatial search applications when mentioning spatial search, people have geographical based applications in mind. with solr, this ordinary use case is attainable . some examples for this are : city search - city guides yellow pages kaufda.de spatial search can be useful in many different ways : for bioinformatics, fingerprints search, facial search, etc. (getting the fingerprint of a document is important for duplicate detection). the simplest approach is implemented in jetwick to reduce duplicate tweets, but this yields a performance of o(n) where n is the number of queried terms. this is okay for 10 or less terms, but it can get even better at o(1)! the idea is to use a special hash set to get all similar documents. this technique is called local sensitive hashing . read this nice paper about 'near similarity search and plagiarism analysis' for more information. 3.5. duckduckgo duckduckgo is made with open source and its "zero click" information is done with the help of solr using the dismax query handler: the index for that feature contains 18m documents and has a size of ~12gb. for this case had to tune solr: " i have two requirements that differ a bit from most sites with respect to solr: i generally only show one result, with sometimes a couple below if you click on them. therefore, it was really important that the first result is what people expected. false positives are really bad in 0-click, so i needed a way to not show anything if a match wasn't too relevant. i got around these by a) tweaking dismax and schema and b) adding my own relevancy filter on top that would re-order and not show anything in various situations. " all the rest is done with tuned open source products. to quote gabriel again: "the main results are a hybrid of a lot of things, including external apis, e.g. bing, wolframalpha, yahoo, my own indexes and negative indexes (spam removal), etc. there are a bunch of different types of data i'm working with. " check out the other cool features such as privacy or bang searches . 3.6. clustering support with carrot2 carrot2 is one of the "contributed plugins" of solr. with carrot2 you can support clustering : " clustering is the assignment of a set of observations into subsets (called clusters) so that observations in the same cluster are similar in some sense. " see some research papers regarding clustering here . here is one visual example when applying clustering on the search "pannous" - our company : 3.7. near real time search solr isn't real time yet, but you can tune solr to the point where it becomes near real time, which means that the time ('real time latency') that a document takes to be searchable after it gets indexed is less than 60 seconds even if you need to update frequently. to make this work, you can setup two indices. one write-only index "w" for the indexer and one read-only index "r" for your application. index r refers to the same data directory of w, which has to be defined in the solrconfig.xml of r via: /pathto/indexw/data/ to make sure your users and the r index see the indexed documents of w, you have to trigger an empty commit every 60 seconds: wget -q http://localhost:port/solr/update?stream.body=%3ccommit/%3e -o /dev/null everytime such a commit is triggered a new searcher without any cache entries is created. this can harm performance for visitors hitting the empty cache directly after this commit, but you can fill the cache with static searches with the help of the newsearcher entry in your solrconfig.xml. additionally, the autowarmcount property needs to be tuned, which fills the cache with a newsearcher from old entries. also, take a look at the article 'scaling lucene and solr' , where experts explain in detail what to do with large indices (=> 'sharding') and what to do for high query volume (=> 'replicating'). 3.8. loggly = full text search in logs feeding log files into solr and searching them at near real-time shows that solr can handle massive amounts of data and queries the data quickly. i've setup a simple project where i'm doing similar things , but loggly has done a lot more to make the same task real-time and distributed. you'll need to keep the write index as small as possible otherwise commit time will increase too great. loggly creates a new solr index every 5 minutes and includes this when searching using the distributed capabilities of solr ! they are merging the cores to keep the number of indices small, but this is not as simple as it sounds. watch this video to get some details about their work. 3.9. solandra = solr + cassandra solandra combines solr and the distributed database cassandra , which was created by facebook for its inbox search and then open sourced. at the moment solandra is not intended for production use. there are still some bugs and the distributed limitations of solr apply to solandra too. tthe developers are working very hard to make solandra better. jetwick can now run via solandra just by changing the solrconfig.xml. solandra also has the advantages of being real-time (no optimize, no commit!) and distributed without any major setup involved. the same is true for solr cloud. 3.10. category browsing via facets solr provides facets , which make it easy to show the user some useful filter options like those shown in the "drupal integration" example. like i described earlier , it is even possible to browse through a deep category tree. the main advantage here is that the categories depend on the query. this way the user can further filter the search results with this category tree provided by you. here is an example where this feature is implemented for one of the biggest second hand stores in germany. a click on 'schauspieler' shows its sub-items: other shops: game-change 3.11. jetwick - open twitter search you may have noticed that twitter is using lucene under the hood . twitter has a very extreme use case: over 1,000 tweets per second, over 12,000 queries per second, but the real-time latency is under 10 seconds! however, the relevancy at that volume is often not that good in my opinion. twitter search often contains a lot of duplicates and noise. reducing this was one reason i created jetwick in my spare time. i'm mentioning jetwick here because it makes extreme use of facets which provides all the filters to the user. facets are used for the rss-alike feature (saved searches), the various filters like language and retweet-count on the left, and to get trending terms and links on the right: to make jetwick more scalable i'll need to decide which of the following distribution options to choose: use solr cloud with zookeeper use solandra move from solr to elasticsearch which is also based on apache lucene other examples with a lot of facets: cnet reviews - product reviews. electronics reviews, computer reviews & more. shopper.com - compare prices and shop for computers, cell phones, digital cameras & more. zappos - shoes and clothing. manta.com - find companies. connect with customers. 3.12. plaxo - online address management plaxo.com , which is now owned by comcast, hosts web addresses for more than 40 million people and offers smart search through the addresses - with the help of solr. plaxo is trying to get the latest 'social' information of your contacts through blog posts, tweets, etc. plaxo also tries to reduce duplicates . 3.13. replace fast or google search several users report that they have migrated from a commercial search solution like fast or google search appliance (gsa) to solr (or lucene). the reasons for that migration are different: fast drops linux support and google can make integration problems. the main reason for me is that solr isn't a black box —you can tweak the source code, maintain old versions and fix your bugs more quickly! 3.14. search application prototyping with the help of the already integrated velocity plugin and the data import handler it is possible to create an application prototype for your search within a few hours. the next version of solr makes the use of velocity easier. the gui is available via http://localhost:port/solr/browse if you are a ruby on rails user, you can take a look into flare. to learn more about search application prototyping, check out this video introduction and take a look at these slides. 3.15. solr as a whitelist imagine you are the new google and you have a lot of different types of data to display e.g. 'news', 'video', 'music', 'maps', 'shopping' and much more. some of those types can only be retrieved from some legacy systems and you only want to show the most appropriated types based on your business logic . e.g. a query which contains 'new york' should result in the selection of results from 'maps', but 'new yorker' should prefer results from the 'shopping' type. with solr you can set up such a whitelist-index that will help to decide which type is more important for the search query. for example if you get more or more relevant results for the 'shopping' type then you should prefer results from this type. without the whitelist-index - i.e. having all data in separate indices or systems, would make it nearly impossible to compare the relevancy. the whitelist-index can be used as illustrated in the next steps. 1. query the whitelist-index, 2. decide which data types to display, 3. query the sub-systems and 4. display results from the selected types only. 3.16. future solr is also useful for scientific applications, such as a dna search systems. i believe solr can also be used for completely different alphabets so that you can query nucleotide sequences - instead of words - to get the matching genes and determine which organism the sequence occurs in, something similar to blast . another idea you could harness would be to build a very personalized search. every user can drag and drop their websites of choice and query them afterwards. for example, often i only need stackoverflow, some wikis and some mailing lists with the expected results, but normal web search engines (google, bing, etc.) give me results that are too cluttered. my final idea for a future solr-based app could be a lucene/solr implementation of desktop search. solr's facets would be especially handy to quickly filter different sources (files, folders, bookmarks, man pages, ...). it would be a great way to wade through those extra messy desktops. 4. summary the next time you think about a problem, think about solr! even if you don't know java and even if you know nothing about search: solr should be in your toolbox. solr doesn't only offer professional full text search, it could also add valuable features to your application. some of them i covered in this article, but i'm sure there are still some exciting possibilities waiting for you!
January 25, 2011
by Peter Karussell
· 147,278 Views
article thumbnail
JMS Clustering by Example
It's amazing how the JBoss Team put together an easy way to do JMS Clustering, out of the box!!. I'll start with an easy example, creating a Queue named "MyClusteredQueue". In this example I'm using JBoss AS 5.1. and two computers connected on the same network, with these IP's: - Computer A: 192.168.0.143 - Computer B: 192.168.0.210 So, here are the steps: 1) Install the JBoss on both computers. We are going to use the "all" configuration for both computers. 2) We create our Queue on both servers. Go to $JBOSS_HOME/server/all/deploy/messaging/ and edit the destinations-service.xml file. Add the MyClusteredQueue before the last server tag. It looks like this: jboss.messaging:service=ServerPeer jboss.messaging:service=PostOffice true This is how you add a Queue to the JBoss, and the people how are familiar with this, the only new thing is to add the attribute "Clustered". This step must be set on both computers. At the end of the article you can find the files. 3) Write the MDB to consume the messages, and deploy it on the two computers. (I'm using an EJB 3 - MDB style). import java.net.InetAddress; import javax.ejb.ActivationConfigProperty; import javax.ejb.MessageDriven; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.ObjectMessage; import org.apache.log4j.Logger; /** * @author felipeg * */ @MessageDriven(activationConfig = { @ActivationConfigProperty(propertyName="destinationType", propertyValue="javax.jms.Queue"), @ActivationConfigProperty(propertyName="destination", propertyValue="queue/MyClusteredQueue") }) public class JMSClusterClientHandler implements MessageListener { Logger log = Logger.getLogger(JMSClusterClientHandler.class); @Override public void onMessage(Message message) { try{ if (message instanceof ObjectMessage) { InetAddress addr = InetAddress.getLocalHost(); log.info("########## Processing Host: " + addr.getHostName() + " ##########" ); ObjectMessage objMessage = (ObjectMessage) message; Object obj = objMessage.getObject(); log.info("Object received:" + obj.toString()); } } catch (Exception e) { e.printStackTrace(); } } } 4) Start the jboss with the following options: Computer A: $ cd $JBOSS_HOME/bin $ ./run.sh -c all -b 192.168.0.143 -Djboss.messaging.ServerPeerID=1 Computer B: $ cd $JBOSS_HOME/bin $ ./run.sh -c all -b 192.168.0.210 -Djboss.messaging.ServerPeerID=2 It is necesary to give an ID to each server and this is accomplished with this directive: -Djboss.messaging.ServerPeerID When you start the jboss on computer A, you should see the logs (server.log) telling you that there is one node ready and listening, and once you start the jboss on computer B, on the log will appear the two nodes, the two IP's ready to consume messages. 5) Now it's time to send a Message to the Queue. To accomplish this it's necessary to change the connection factory to "ClusteredConnectionFactory" (JMSDispatcher.java - See the code below). Also on the jndi.properties (if you are using the default InitialContext) file it's necessary to add the two computers ip's separated by comma to the java.naming.provider.url property. (In my case a create a Properties variable and I set all the necessary properties, JMSDispatcher.java - see the code below). java.naming.provider.url=192.168.0.143:1099,192.168.0.210:1099 The client that I wrote is a web application, that consist in one index.jsp page, which contains a form that prompts you for the name of the queue, the type of messaging (Queue or Topic), the server ip and port, how many times it will send the message and the actual message to be sent; also the web application has a Servlet (JMSClusteredClient.java - see code below) that receives the postback and helper class (JMSDispatcher.java - see code below) that sends the message to the jboss servers. You can to deploy it in any computer. In my case I deployed it on the Computer A. And you can access it through this URL: http://192.168.0.143:8080/JMSWeb/ (just modify the IP where the client war was deployed). If you notice (on the index.jsp - code below) I've already put some default values that reflects the name of the Queue, and the IP's of my two computers. Now, If you increment the number of times that the message will be sent (maybe a 10) and fill out the message box, and click "Send" you should see on the two servers some of the messages being consumed by the MDB. Here are the Files to create the client: index.jsp JMS Clustered - Test Client Server: QueueTopic Times:Message: Servlet: JMSClusteredClient.java public class JMSClusteredClient extends HttpServlet { private static final long serialVersionUID = 1L; /** * @see HttpServlet#service(HttpServletRequest request, HttpServletResponse response) */ protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { PrintWriter out = response.getWriter(); String topicqueue = request.getParameter("topicqueue"); String message = request.getParameter("message"); String server = request.getParameter("server"); String messageType = request.getParameter("messageType"); String times = request.getParameter("times"); int intTimes = Integer.parseInt(times); JMSDispatcher dispatcher = new JMSDispatcher(); dispatcher.setTopicQueueName(topicqueue); dispatcher.setServer(server); dispatcher.setMessageType(messageType); try { for(int count =1; count <= intTimes;count++){ dispatcher.sendMessage( count + " of " + times + " " + message); } out.println("Message [" + message + "] sent successfully to [" + topic + "] to the [" + server + "] server " + times + " times."); } catch (JMSException e) { e.printStackTrace(); out.println("Error:" + e.getMessage()); } catch (NamingException e) { out.println("Error:" + e.getMessage()); e.printStackTrace(); } finally{ out.close(); } } } A utility to send the messages: JMSDispatcher.java public class JMSDispatcher { /** * */ private static final long serialVersionUID = 7105145023422143880L; private static Logger log = Logger.getLogger(JMSDispatcher.class); private final String CONNECTION_FACTORY_CLUSTERED = "ClusteredConnectionFactory"; private final String CONNECTION_FACTORY = "ConnectionFactory"; private final String TOPIC = "TOPIC"; private final String QUEUE = "QUEUE"; private String topicQueueName; private String server; private String messageType; public void setTopicQueueName(String value){ this.topicQueueName = value; } public void setServer(String value){ this.server = value; } public void setMessageType(String value){ this.messageType = value; } public void sendMessage(Object objectMessage) throws JMSException, NamingException{ log.debug("##### Setting up a Queue/Topic Message: #####"); if (TOPIC.equals(messageType)){ sendTopicMessage(objectMessage); } else if (QUEUE.equals(messageType)){ sendQueueMessage(objectMessage); } log.debug("##### Publishing Message: Done #####"); } private void sendQueueMessage(Object objectMessage) throws JMSException, NamingException{ try{ InitialContext initialContext = getInitialContext(); QueueConnectionFactory qcf = (QueueConnectionFactory) initialContext.lookup(CONNECTION_FACTORY_CLUSTERED); QueueConnection queueConn = qcf.createQueueConnection(); Queue queue = (Queue) initialContext.lookup(topicQueueName); QueueSession queueSession = queueConn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); queueConn.start(); QueueSender send = queueSession.createSender(queue); ObjectMessage om = queueSession.createObjectMessage((Serializable)objectMessage); setMessageProperties(om); log.debug("##### Publishing Message to a Queue: " + queueName + "#####"); send.send(om); send.close(); queueConn.stop(); queueSession.close(); queueConn.close(); }catch(MessageFormatException ex){ log.error("##### The MESSAGE is not Serializable ####"); throw ex; }catch(MessageNotWriteableException ex){ log.error("##### The MESSAGE is not Readable ####"); throw ex; }catch(JMSException ex){ log.error("##### JMS provider fails to set the object due to some internal error. ####"); throw ex; } } private void sendTopicMessage(Object objectMessage) throws JMSException, NamingException{ try{ InitialContext initialContext = getInitialContext(); TopicConnectionFactory tcf = (TopicConnectionFactory)initialContext.lookup(CONNECTION_FACTORY_CLUSTERED); TopicConnection topicConn = tcf.createTopicConnection(); Topic topic = (Topic) initialContext.lookup(topicQueueName); TopicSession topicSession = topicConn.createTopicSession(false,TopicSession.AUTO_ACKNOWLEDGE); topicConn.start(); TopicPublisher send = topicSession.createPublisher(topic); ObjectMessage om = topicSession.createObjectMessage(); om.setObject((Serializable)objectMessage); setMessageProperties(om); log.debug("##### Publishing Message to a Topic: " + topicName + "#####"); send.publish(om); send.close(); topicConn.stop(); topicSession.close(); topicConn.close(); }catch(MessageFormatException ex){ log.error("##### The MESSAGE is not Serializable ####"); throw ex; }catch(MessageNotWriteableException ex){ log.error("##### The MESSAGE is not Readable ####"); throw ex; }catch(JMSException ex){ log.error("##### JMS provider fails to set the object due to some internal error. ####"); throw ex; } } private InitialContext getInitialContext() throws NamingException{ Properties jboss = new Properties(); jboss.put("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory"); jboss.put("java.naming.factory.url.pkgs", "org.jboss.naming:org.jnp.interfaces"); jboss.put("java.naming.provider.url", server); return new InitialContext(jboss); } } And the web.xml JMSWeb index.jsp JMSClusteredClient JMSClusteredClient com.blogspot.felipeg48.jms.web.JMSClusteredClient JMSClusteredClient /JMSClusteredClient Happy Clustering!!
May 26, 2010
by Felipe Gutierrez
· 16,728 Views
article thumbnail
Running Hazelcast on a 100 Node Amazon EC2 Cluster
The purpose of this article is to give you the details of our 100 node cluster demo. This demo is recorded and you can watch the 5 minute screencast Hazelcast is an open source clustering and highly scalable data distribution platform for Java. JVMs that are running Hazelcast will dynamically cluster and allow you to easily share and partition your application data across the cluster. Hazelcast is a peer-to-peer solution (there is no master node, every node is a peer) so there is no single point of failure. Communication among cluster members is always TCP/IP with Java NIO beauty. The default configuration comes with 1 backup so if a node fails, no data will be lost (you can specify the backup count). It is as simple as using java.util.{Map, Queue, Set, List}. Just add the hazelcast.jar into your classpath and start coding. When you download the Hazelcast, you will find a test.sh under bin directory. The test.sh runs an application which randomly makes 40% get, 40% put and 20% remove on a distributed map. In this demo the same test application will be used to see how it performs on 100 node cluster. Amazon EC2 and S3 An easy to use and scalable cloud environment was needed for demo so we decided to use Amazon EC2 for server instances (nodes) and S3 service to store demo application zip and configuration files. With its newly announced Java SDK, it is very simple to start/stop server instances and upload files to S3 programatically. Hazelcast AMI & Launcher The challenge here is that we are running an application on 100 nodes and dealing with each and every server in the cluster is a huge task. We don't want to ssh into every server and manually start the application. This part is automated by creating a special server image (AMI). The AMI contains Java Runtime and a launcher application we developed, which will download the demo application from Amazon S3, unzip it, and run the hazelcast/bin/test.sh in it. The Launcher is actually so generic that it can run any application; it doesn't care/know what test.sh contains. Deployer Deployment of the demo application is also automated so that we don't need to login into AWS Management Console and manually start instances. Deployer instantiates any number of Amazon EC2 servers with any AMI and also uploads the demo application zip file to S3. So the idea here is that, the Deployer will store the application into S3 and launch 100 EC2 instances with our image. The Launcher on each instance will download the application from S3 and run it. Demo Details. The smallest EC2 instances (m1.small) are used to run the demo. These are the virtual instances with CPU about 1.0 GHz. Also keep in mind that EC2 platform suffers from considerable amount of network latency. That's why we increased the thread count to 250 in our application. The following steps performed during the demo Download hazelcast-1.8.3.zip from www.hazelcast.com. Unzip the file and move the monitoring war file into tomcat6/webapps directory. Edit the test.sh under the bin directory: Add -Xmx1G -Xms1G Add -Dhazelcast.initial.wait.seconds=100 to make the cluster evenly partition on start so that migration can be avoided for better performance. Add t250 as an argument to the application to set thread count to 250. Remember the latency issue. Run the Deployer from IDE. Check from EC2 Management Console if 100 servers started. Start tomcat. Copy the public DNS name of one of the servers to connect to from monitoring tool. Go to http://localhost:8080/hazelcast-monitor-1.8.3/ (Hazelcast Monitoring Tool). Paste the address and connect to the cluster. Enjoy! Results You should always look for programatic ways of launching applications on the cloud. With these tools we were able to deploy and run the demo application on 100 servers in minutes. The entire Hazelcast cluster was making over 400,000 operations per second on the smallest EC2 instances. In our next demo we will experiment Hazelcast on large data set and even bigger cluster. Watch the screencast
April 16, 2010
by Fuad Malikov
· 62,669 Views · 1 Like
article thumbnail
Hibernate Performance Tuning
Hibernate is a powerful, high performance object/relational persistence and query service. Hibernate lets you develop persistent classes following object-oriented idiom - including association, inheritance, polymorphism, composition, and collections. Hibernate allows you to express queries in its own portable SQL extension (HQL), as well as in native SQL, or with an object-oriented Criteria and Example API. Quintessential to using any ORM framework like hibernate is to know how to leverage the various performance tuning methods supported by the framework. In this volume Wings Jiang discusses three performance tuning strategies for hibernate: SQL Optimization Session Management Data Caching SQL Optimization When using Hibernate in your application, you already have been coding HQL (Hibernate Query Language) somewhere. For example, “from User user where user.name = ‘John’”. If issuing your SQL statement like this, Hibernate cannot use the SQL cache implemented by database because name of the user, in most scenarios, is extremely distinct. On the contrary, while using placeholder to achieve this, like “from User user where user.name =?” will be cached by the Database to fulfill the performance improvement. You can also set some Hibernate properties to improve performance, such as setting the number of records retrieved while fetching records via configuring property hibernate.jdbc.fetch_size, setting the batch size when committing the batch processing via configuring property hibernate.jdbc.batch_size and switching off the SQL output via setting property hibernate.show_sql to false in product environments. In addition, the performance tuning of your target Database is also significant, like SQL clauses tuning, reasonable indexes, delicate table structures, data partitions etc. Session Management Undoubtedly, Session is the pith of Hibernate. It manages the Database related attributes, such as JDBC connections, data entities’ states. Managing the Session efficiently is the key to getting high performance in enterprise applications. One of the many commonly used and equally elegant approaches to session management in hibernate is to use ThreadLocal. Threadlocal will create a local copy of session for every thread. Thus synchronization problems are averted, when objects are put in the Threadlocal, . To understand how ThreadLocal variables are used in Java, refer to Sun Java Documentation at http://java.sun.com/j2se/1.5.0/docs/api/java/lang/ThreadLocal.html Data Caching Before accomplishing any data caching, it is essential to set the property hibernate.cache.user_query_cache = true. There are three kinds of commonly used Caching Strategies in Hibernate: Using cache based on Session level (aka Transaction layer level cache). This is also called first-level cache. Using cache based on SessionFactory level (Application layer level cache). This is also called second-level cache. Using cluster cache which is employed in distributed application (in different JVMs). In fact, some techniques, like loading data by id, lazy initialization which betokens loading appropriate data in proper time rather than obtaining a titanic number of useless records, which are fairly useless in the subsequent operations are consummated via data caching. First Level Cache (aka Transaction layer level cache) Fetching an object from database always has a cost associated with it. This can be offset by storing the entities in hibernate session. Next time the entities are required, they are fetched from the session, rather than fetching from the database. To clear an object from the session use: session.evict(object). To clear all the objects from the session use session.clear(). Second Level Cache (aka Application layer level cache) In this approach, if an object is not found in session, it is searched for in the session factory before querying the database for the object. If an object is indeed fetched from database, the selected data should be put in session cache. This would improve the performance when the object is required next time. To remove an entity from session factory use the various overloaded implementations of evict() method of SessionFactory. In fact, Hibernate lets you tailor your own caching implementation by specifying the name of a class that implements org.hibernate.cache.CacheProvider using the property hibernate.cache.provider_class. But it is recommended to employ a few built-in integrations with open source cache providers (listed below). Cache Type Cluster Safe Query Cache Supported Hashtable Memory NO YES EHCache Memory, Disk NO YES OSCache Memory, Disk NO YES SwarmCache Clustered YES (clustered invalidation) NO JBoss TreeCache Clustered YES (replication) YES Terracota Clustered YES YES In order to use second level caching, developers have to append some configurations in hibernate.cfg.xml (for example, using EHCache here). net.sf.ehcache.hibernate.Provider In addition, developers also need to create a cache specific configuration file (Example: ehcache.xml for EHCache). (1) diskStore : Sets the path to the directory where cache .data files are created. The following properties are translated: a.user.home - User's home directory b.user.dir - User's current working directory c.java.io.tmpdir (Default temp file path) maxElementsInMemory : Sets the maximum number of objects that will be created in memory. eternal : Sets whether elements are eternal. If eternal, timeouts are ignored and the element is never expired. timeToIdleSeconds : Sets the time to idle for an element before it expires. Is only used if the element is not eternal. Idle time is now - last accessed time. timeToLiveSeconds : Sets the time to live for an element before it expires. Is only used if the element is not eternal. TTL is now - creation time overflowToDisk : Sets whether elements can overflow to disk when the in-memory cache has reached the maxInMemory limit. Finally the cache concurrency strategy has to be specified in mapping files. For example, the following code fragment shows how to configure your cache strategy. … … Cache Concurrency Strategies There are four kinds of built-in cache concurrency strategies provided by Hibernate. Chosing a right concurrency strategy for your hibernate implementation is the key to cache performance optimization. Besides to ensure data consistency and transaction integrity it is indispensable to master these strategies. read-only If your application needs to read but never modify instances of a persistent class, a read-only cache may be used. This is the simplest and best performing strategy. It's even perfectly safe for use in a cluster. nonstrict-read-write If the application only occasionally needs to update data (For example, if it is extremely unlikely that two transactions would try to update the same item simultaneously) and strict transaction isolation is not required, a nonstrict-read-write cache might be appropriate. read-write If the application needs to update data, a read-write cache might be appropriate. This cache strategy should never be used if serializable transaction isolation level is required. transactional If the application seldom needs to update data and at the same time, application also needs to avoid “dirty read” and “repeatable read”, this kind of concurrency strategy can be employed. The transactional cache strategy provides support for fully transactional cache providers such as JBoss TreeCache. The following table lists cache concurrency strategy supported by various cache providers. Cache Read-only Nonstrict-read-write Read-write Transactional Hashtable YES YES YES N/A EHCache YES YES YES N/A OSCache YES YES YES N/A SwarmCache YES YES N/A N/A JBoss TreeCache YES N/A N/A YES Cluster Cache (in different JVMs) Hibernate also supports cluster caching in disparate JVMs. At present, both SwarmCache and JBoss TreeCache support cluster caching across multiple JVMs. In some situations, especially at the level of enterprise, certain application has to support the concurrency accessing of thousands of users, at that time, cluster cache can help you because the cluster can provide failover and load balancing which improve the performance of application. Points to Note When employing one of the four cache strategies above, pay close attention to the following situation: Data cached almost immutable If data you want to cache is almost constant, you can use data caching which can improve the performance of the application. On the contrary, if the caching data are quiet volatile, Hibernate have to maintain and update the caching over time which extremely leads to performance hit. Data sizes in reasonable range If the size of data you is caching is massive, Hibernate will occupy the most memories of system, which causes the long waiting time of the whole application. Low frequency of data updating If data you are caching needs to be modified frequently, Hibernate have to take an array of time to update and modify the data in caching, which impacts the performance of the application as well. High frequency of data querying If data you are caching is steady, which means that most of the operations are querying, searching, no updating and modifying, making the most use of caching will be affording huge performance improvement. None crucial data Because of existing some incongruities when keeping the data in caching, so if the data you are caching is fairly crucial, do not use caching. By contrast, if the data in caching is insignificant, just use it without any vacillation. Summary Actually, after employing SQL Optimization, Session Management, Data Caching, we will obtain great battalions of performance gains, which make applications achieve acceptable waiting time for the final customers. External Links for Further Study http://www.hibernate.org/hib_docs/reference/en/html/performance.html http://blogs.jboss.com/blog/acoliver/2006/01/23/Hibernate_EJB3_Tuning.txt About Author I am Wings Jiang from BCM China. I have mainly focused on J2EE technologies in recent years and worked in several projects involving Struts/Tapestry, Spring, Hibernate, WebLogic, Websphere, Oracle, DB2 etc. I have experience in design and code of several Java applications. Hibernate performance is one of the areas I pay close heed to in my current working.
June 10, 2009
by Ming Jiang
· 141,805 Views · 4 Likes
  • Previous
  • ...
  • 128
  • 129
  • 130
  • 131
  • 132
  • RSS
  • X
  • Facebook

ABOUT US

  • About DZone
  • Support and feedback
  • Community research

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Core Program
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 3343 Perimeter Hill Drive
  • Suite 215
  • Nashville, TN 37211
  • [email protected]

Let's be friends:

  • RSS
  • X
  • Facebook
×