DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Please enter at least three characters to search
Refcards Trend Reports
Events Video Library
Refcards
Trend Reports

Events

View Events Video Library

Zones

Culture and Methodologies Agile Career Development Methodologies Team Management
Data Engineering AI/ML Big Data Data Databases IoT
Software Design and Architecture Cloud Architecture Containers Integration Microservices Performance Security
Coding Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks
Culture and Methodologies
Agile Career Development Methodologies Team Management
Data Engineering
AI/ML Big Data Data Databases IoT
Software Design and Architecture
Cloud Architecture Containers Integration Microservices Performance Security
Coding
Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance
Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks

Because the DevOps movement has redefined engineering responsibilities, SREs now have to become stewards of observability strategy.

Apache Cassandra combines the benefits of major NoSQL databases to support data management needs not covered by traditional RDBMS vendors.

The software you build is only as secure as the code that powers it. Learn how malicious code creeps into your software supply chain.

Generative AI has transformed nearly every industry. How can you leverage GenAI to improve your productivity and efficiency?

Related

  • Mastering Concurrency: An In-Depth Guide to Java's ExecutorService
  • How To Create Asynchronous and Retryable Methods With Failover Support
  • External Task Client Implementation in Camunda With Spring Boot Application
  • Spring Beans With Auto-Generated Implementations: How-To

Trending

  • Proactive Security in Distributed Systems: A Developer’s Approach
  • How To Build Resilient Microservices Using Circuit Breakers and Retries: A Developer’s Guide To Surviving
  • How to Use AWS Aurora Database for a Retail Point of Sale (POS) Transaction System
  • Introduction to Retrieval Augmented Generation (RAG)
  1. DZone
  2. Software Design and Architecture
  3. Cloud Architecture
  4. Hazelcast Distributed Execution with Spring

Hazelcast Distributed Execution with Spring

By 
Eren Avsarogullari user avatar
Eren Avsarogullari
·
Dec. 11, 12 · Interview
Likes (1)
Comment
Save
Tweet
Share
29.6K Views

Join the DZone community and get the full member experience.

Join For Free

The ExecutorService feature had come with Java 5 and is under the java.util.concurrent package. It extends the Executor interface and provides a thread pool functionality to execute asynchronous short tasks. Java Executor Service Types is suggested to look over basic ExecutorService implementation.

Also ThreadPoolExecutor is a very useful implementation of ExecutorService ınterface. It extends AbstractExecutorService providing default implementations of ExecutorService execution methods. It provides improved performance when executing large numbers of asynchronous tasks and maintains basic statistics, such as the number of completed tasks. How to develop and monitor Thread Pool Services by using Spring is also suggested to investigate how to develop and monitor Thread Pool Services.

So far, we have just talked Undistributed Executor Service implementation. Let us also investigate Distributed Executor Service.

Hazelcast Distributed Executor Service feature is a distributed implementation of java.util.concurrent.ExecutorService. It allows to execute business logic in cluster. There are four alternative ways to realize it :

1) The logic can be executed on a specific cluster member which is chosen.
2) The logic can be executed on the member owning the key which is chosen.
3) The logic can be executed on the member Hazelcast will pick.
4) The logic can be executed on all or subset of the cluster members.

This article shows how to develop Distributed Executor Service via Hazelcast and Spring.

Used Technologies :

JDK 1.7.0_09
Spring 3.1.3
Hazelcast 2.4
Maven 3.0.4

STEP 1 : CREATE MAVEN PROJECT

A maven project is created as below. (It can be created by using Maven or IDE Plug-in).

STEP 2 : LIBRARIES

Firstly, Spring dependencies are added to Maven’ s pom.xml

<properties>
    <spring.version>3.1.3.RELEASE</spring.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
 
<dependencies>
    <!-- Spring 3 dependencies -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-core</artifactId>
        <version>${spring.version}</version>
    </dependency>
 
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>${spring.version}</version>
    </dependency>
 
    <!-- Hazelcast library -->
    <dependency>
        <groupId>com.hazelcast</groupId>
        <artifactId>hazelcast-all</artifactId>
        <version>2.4</version>
    </dependency>
 
    <!-- Log4j library -->
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.16</version>
    </dependency>
</dependencies>

maven-compiler-plugin(Maven Plugin) is used to compile the project with JDK 1.7

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>3.0</version>
    <configuration>
      <source>1.7</source>
      <target>1.7</target>
    </configuration>
</plugin>

maven-shade-plugin(Maven Plugin) can be used to create runnable-jar

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>2.0</version>
 
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer
                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>com.onlinetechvision.exe.Application</mainClass>
                    </transformer>
                    <transformer
                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                        <resource>META-INF/spring.handlers</resource>
                    </transformer>
                    <transformer
                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                        <resource>META-INF/spring.schemas</resource>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>


STEP 3 : CREATE Customer BEAN

A new Customer bean is created. This bean will be distributed between two node in OTV cluster. In the following sample, all defined properties(id, name and surname)’ types are String and standart java.io.Serializable interface has been implemented for serializing. If custom or third-party object types are used, com.hazelcast.nio.DataSerializable interface can be implemented for better serialization performance.

package com.onlinetechvision.customer;
 
import java.io.Serializable;
 
/**
 * Customer Bean.
 *
 * @author onlinetechvision.com
 * @since 27 Nov 2012
 * @version 1.0.0
 *
 */
public class Customer implements Serializable {
 
    private static final long serialVersionUID = 1856862670651243395L;
 
    private String id;
    private String name;
    private String surname;
 
    public String getId() {
        return id;
    }
 
    public void setId(String id) {
        this.id = id;
    }
 
    public String getName() {
        return name;
    }
 
    public void setName(String name) {
        this.name = name;
    }
 
    public String getSurname() {
        return surname;
    }
 
    public void setSurname(String surname) {
        this.surname = surname;
    }
 
    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((id == null) ? 0 : id.hashCode());
        result = prime * result + ((name == null) ? 0 : name.hashCode());
        result = prime * result + ((surname == null) ? 0 : surname.hashCode());
        return result;
    }
 
    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        Customer other = (Customer) obj;
        if (id == null) {
            if (other.id != null)
                return false;
        } else if (!id.equals(other.id))
            return false;
        if (name == null) {
            if (other.name != null)
                return false;
        } else if (!name.equals(other.name))
            return false;
        if (surname == null) {
            if (other.surname != null)
                return false;
        } else if (!surname.equals(other.surname))
            return false;
        return true;
    }
 
    @Override
    public String toString() {
        return "Customer [id=" + id + ", name=" + name + ", surname=" + surname + "]";
    }
 
}

STEP 4 : CREATE ICacheService INTERFACE

A new ICacheService Interface is created for service layer to expose cache functionality.

package com.onlinetechvision.cache.srv;
 
import com.hazelcast.core.IMap;
import com.onlinetechvision.customer.Customer;
 
/**
 * A new ICacheService Interface is created for service layer to expose cache functionality.
 *
 * @author onlinetechvision.com
 * @since 27 Nov 2012
 * @version 1.0.0
 *
 */
public interface ICacheService {
 
    /**
     * Adds Customer entries to cache
     *
     * @param String key
     * @param Customer customer
     *
     */
    void addToCache(String key, Customer customer);
 
    /**
     * Deletes Customer entries from cache
     *
     * @param String key
     *
     */
    void deleteFromCache(String key);
 
    /**
     * Gets Customer cache
     *
     * @return IMap Coherence named cache
     */
    IMap<String, Customer> getCache();
}

STEP 5 : CREATE CacheService IMPLEMENTATION

CacheService is implementation of ICacheService Interface.

package com.onlinetechvision.cache.srv;
 
import com.hazelcast.core.IMap;
import com.onlinetechvision.customer.Customer;
import com.onlinetechvision.test.listener.CustomerEntryListener;
 
/**
 * CacheService Class is implementation of ICacheService Interface.
 *
 * @author onlinetechvision.com
 * @since 27 Nov 2012
 * @version 1.0.0
 *
 */
public class CacheService implements ICacheService {
 
    private IMap<String, Customer> customerMap;
 
    /**
     * Constructor of CacheService
     *
     * @param IMap customerMap
     *
     */
    @SuppressWarnings("unchecked")
    public CacheService(IMap<String, Customer> customerMap) {
        setCustomerMap(customerMap);
        getCustomerMap().addEntryListener(new CustomerEntryListener(), true);
    }
 
    /**
     * Adds Customer entries to cache
     *
     * @param String key
     * @param Customer customer
     *
     */
    @Override
    public void addToCache(String key, Customer customer) {
        getCustomerMap().put(key, customer);
    }
 
    /**
     * Deletes Customer entries from cache
     *
     * @param String key
     *
     */
    @Override
    public void deleteFromCache(String key) {
        getCustomerMap().remove(key);
    }
 
    /**
     * Gets Customer cache
     *
     * @return IMap Coherence named cache
     */
    @Override
    public IMap<String, Customer> getCache() {
        return getCustomerMap();
    }
 
    public IMap<String, Customer> getCustomerMap() {
        return customerMap;
    }
 
    public void setCustomerMap(IMap<String, Customer> customerMap) {
        this.customerMap = customerMap;
    }
 
}

STEP 6 : CREATE IDistributedExecutorService INTERFACE

A new IDistributedExecutorService Interface is created for service layer to expose distributed execution functionality.

package com.onlinetechvision.executor.srv;
 
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
 
import com.hazelcast.core.Member;
 
/**
 * A new IDistributedExecutorService Interface is created for service layer to expose distributed execution functionality.
 *
 * @author onlinetechvision.com
 * @since 27 Nov 2012
 * @version 1.0.0
 *
 */
public interface IDistributedExecutorService {
 
    /**
     * Executes the callable object on stated member
     *
     * @param Callable callable
     * @param Member member
     * @throws InterruptedException
     * @throws ExecutionException
     *
     */
    String executeOnStatedMember(Callable<String> callable, Member member) throws InterruptedException, ExecutionException;
 
    /**
     * Executes the callable object on member owning the key
     *
     * @param Callable callable
     * @param Object key
     * @throws InterruptedException
     * @throws ExecutionException
     *
     */
    String executeOnTheMemberOwningTheKey(Callable<String> callable, Object key) throws InterruptedException, ExecutionException;
 
    /**
     * Executes the callable object on any member
     *
     * @param Callable callable
     * @throws InterruptedException
     * @throws ExecutionException
     *
     */
    String executeOnAnyMember(Callable<String> callable) throws InterruptedException, ExecutionException;
 
    /**
     * Executes the callable object on all members
     *
     * @param Callable callable
     * @param Set all members
     * @throws InterruptedException
     * @throws ExecutionException
     *
     */
    Collection<String> executeOnMembers(Callable<String> callable, Set<Member> members) throws InterruptedException, ExecutionException;
}

STEP 7 : CREATE DistributedExecutorService IMPLEMENTATION

DistributedExecutorService is implementation of IDistributedExecutorService Interface.

package com.onlinetechvision.executor.srv;
 
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
 
import org.apache.log4j.Logger;
 
import com.hazelcast.core.DistributedTask;
import com.hazelcast.core.Member;
import com.hazelcast.core.MultiTask;
 
/**
 * DistributedExecutorService Class is implementation of IDistributedExecutorService Interface.
 *
 * @author onlinetechvision.com
 * @since 27 Nov 2012
 * @version 1.0.0
 *
 */
public class DistributedExecutorService implements IDistributedExecutorService {
 
    private static final Logger logger = Logger.getLogger(DistributedExecutorService.class);
 
    private ExecutorService hazelcastDistributedExecutorService;
 
    /**
     * Executes the callable object on stated member
     *
     * @param Callable callable
     * @param Member member
     * @throws InterruptedException
     * @throws ExecutionException
     *
     */
    @SuppressWarnings("unchecked")
    public String executeOnStatedMember(Callable<String> callable, Member member) throws InterruptedException, ExecutionException {
        logger.debug("Method executeOnStatedMember is called...");
        ExecutorService executorService = getHazelcastDistributedExecutorService();
        FutureTask<String> task = (FutureTask<String>) executorService.submit( new DistributedTask<String>(callable, member));
        String result = task.get();
        logger.debug("Result of method executeOnStatedMember is : " + result);
        return result;
    }
 
    /**
     * Executes the callable object on member owning the key
     *
     * @param Callable callable
     * @param Object key
     * @throws InterruptedException
     * @throws ExecutionException
     *
     */
    @SuppressWarnings("unchecked")
    public String executeOnTheMemberOwningTheKey(Callable<String> callable, Object key) throws InterruptedException, ExecutionException {
        logger.debug("Method executeOnTheMemberOwningTheKey is called...");
        ExecutorService executorService = getHazelcastDistributedExecutorService();
        FutureTask<String> task = (FutureTask<String>) executorService.submit(new DistributedTask<String>(callable, key));
        String result = task.get();
        logger.debug("Result of method executeOnTheMemberOwningTheKey is : " + result);
        return result;
    }
 
    /**
     * Executes the callable object on any member
     *
     * @param Callable callable
     * @throws InterruptedException
     * @throws ExecutionException
     *
     */
    public String executeOnAnyMember(Callable<String> callable) throws InterruptedException, ExecutionException {
        logger.debug("Method executeOnAnyMember is called...");
        ExecutorService executorService = getHazelcastDistributedExecutorService();
        Future<String> task = executorService.submit(callable);
        String result = task.get();
        logger.debug("Result of method executeOnAnyMember is : " + result);
        return result;
    }
 
    /**
     * Executes the callable object on all members
     *
     * @param Callable callable
     * @param Set all members
     * @throws InterruptedException
     * @throws ExecutionException
     *
     */
    public Collection<String> executeOnMembers(Callable<String> callable, Set<Member> members) throws ExecutionException, InterruptedException {
        logger.debug("Method executeOnMembers is called...");
        MultiTask<String> task = new MultiTask<String>(callable, members);
        ExecutorService executorService = getHazelcastDistributedExecutorService();
        executorService.execute(task);
        Collection<String> results = task.get();
        logger.debug("Result of method executeOnMembers is : " + results.toString());
        return results;
    }
 
    public ExecutorService getHazelcastDistributedExecutorService() {
        return hazelcastDistributedExecutorService;
    }
 
    public void setHazelcastDistributedExecutorService(ExecutorService hazelcastDistributedExecutorService) {
        this.hazelcastDistributedExecutorService = hazelcastDistributedExecutorService;
    }
 
}

STEP 8 : CREATE TestCallable CLASS

TestCallable Class shows business logic to be executed.

TestCallable task for first member of the cluster :

package com.onlinetechvision.task;
 
import java.io.Serializable;
import java.util.concurrent.Callable;
 
/**
 * TestCallable Class shows business logic to be executed.
 *
 * @author onlinetechvision.com
 * @since 27 Nov 2012
 * @version 1.0.0
 *
 */
public class TestCallable implements Callable<String>, Serializable{
 
    private static final long serialVersionUID = -1839169907337151877L;
 
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return String computed result
     * @throws Exception if unable to compute a result
     */
    public String call() throws Exception {
        return "First Member' s TestCallable Task is called...";
    }
 
}

TestCallable task for second member of the cluster :

package com.onlinetechvision.task;
 
import java.io.Serializable;
import java.util.concurrent.Callable;
 
/**
 * TestCallable Class shows business logic to be executed.
 *
 * @author onlinetechvision.com
 * @since 27 Nov 2012
 * @version 1.0.0
 *
 */
public class TestCallable implements Callable<String>, Serializable{
 
    private static final long serialVersionUID = -1839169907337151877L;
 
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return String computed result
     * @throws Exception if unable to compute a result
     */
    public String call() throws Exception {
        return "Second Member' s TestCallable Task is called...";
    }
 
}


STEP 9 : CREATE AnotherAvailableMemberNotFoundException CLASS

AnotherAvailableMemberNotFoundException is thrown when another available member is not found. To avoid this exception, first node should be started before the second node.

package com.onlinetechvision.exception;
 
/**
 * AnotherAvailableMemberNotFoundException is thrown when another available member is not found.
 * To avoid this exception, first node should be started before the second node.
 *
 * @author onlinetechvision.com
 * @since 27 Nov 2012
 * @version 1.0.0
 *
 */
public class AnotherAvailableMemberNotFoundException extends Exception {
 
    private static final long serialVersionUID = -3954360266393077645L;
 
    /**
     * Constructor of AnotherAvailableMemberNotFoundException
     *
     * @param  String Exception message
     *
     */
    public AnotherAvailableMemberNotFoundException(String message) {
        super(message);
    }
 
}

STEP 10 : CREATE CustomerEntryListener CLASS

CustomerEntryListener Class listens entry changes on named cache object.

package com.onlinetechvision.test.listener;
 
import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.EntryListener;
 
/**
 * CustomerEntryListener Class listens entry changes on named cache object.
 *
 * @author onlinetechvision.com
 * @since 27 Nov 2012
 * @version 1.0.0
 *
 */
@SuppressWarnings("rawtypes")
public class CustomerEntryListener implements EntryListener {
 
    /**
     * Invoked when an entry is added.
     *
     * @param EntryEvent
     *
     */
    public void entryAdded(EntryEvent ee) {
        System.out.println("EntryAdded... Member : " + ee.getMember() + ", Key : "+ee.getKey()+", OldValue : "+ee.getOldValue()+", NewValue : "+ee.getValue());
    }
 
    /**
     * Invoked when an entry is removed.
     *
     * @param EntryEvent
     *
     */
    public void entryRemoved(EntryEvent ee) {
        System.out.println("EntryRemoved... Member : " + ee.getMember() + ", Key : "+ee.getKey()+", OldValue : "+ee.getOldValue()+", NewValue : "+ee.getValue());
    }
 
    /**
     * Invoked when an entry is evicted.
     *
     * @param EntryEvent
     *
     */
    public void entryEvicted(EntryEvent ee) {
 
    }  
 
    /**
     * Invoked when an entry is updated.
     *
     * @param EntryEvent
     *
     */
    public void entryUpdated(EntryEvent ee) {
 
    }
 
}

STEP 11 : CREATE Starter CLASS

Starter Class loads Customers to cache and executes distributed tasks.

Starter Class of first member of the cluster :

package com.onlinetechvision.exe;
 
import com.onlinetechvision.cache.srv.ICacheService;
import com.onlinetechvision.customer.Customer;
 
/**
 * Starter Class loads Customers to cache and executes distributed tasks.
 *
 * @author onlinetechvision.com
 * @since 27 Nov 2012
 * @version 1.0.0
 *
 */
public class Starter {
 
    private ICacheService cacheService;
 
    /**
     * Loads cache and executes the tasks
     *
     */
    public void start() {
        loadCacheForFirstMember();
    }
 
    /**
     * Loads Customers to cache
     *
     */
    public void loadCacheForFirstMember() {
        Customer firstCustomer = new Customer();
        firstCustomer.setId("1");
        firstCustomer.setName("Jodie");
        firstCustomer.setSurname("Foster");
 
        Customer secondCustomer = new Customer();
        secondCustomer.setId("2");
        secondCustomer.setName("Kate");
        secondCustomer.setSurname("Winslet");
 
        getCacheService().addToCache(firstCustomer.getId(), firstCustomer);
        getCacheService().addToCache(secondCustomer.getId(), secondCustomer);
    }
 
    public ICacheService getCacheService() {
        return cacheService;
    }
 
    public void setCacheService(ICacheService cacheService) {
        this.cacheService = cacheService;
    }
 
}


Starter Class of second member of the cluster :

package com.onlinetechvision.exe;
 
import java.util.Set;
import java.util.concurrent.ExecutionException;
 
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.Member;
import com.onlinetechvision.cache.srv.ICacheService;
import com.onlinetechvision.customer.Customer;
import com.onlinetechvision.exception.AnotherAvailableMemberNotFoundException;
import com.onlinetechvision.executor.srv.IDistributedExecutorService;
import com.onlinetechvision.task.TestCallable;
 
/**
 * Starter Class loads Customers to cache and executes distributed tasks.
 *
 * @author onlinetechvision.com
 * @since 27 Nov 2012
 * @version 1.0.0
 *
 */
public class Starter {
 
    private String hazelcastInstanceName;
    private Hazelcast hazelcast;
    private IDistributedExecutorService distributedExecutorService;
    private ICacheService cacheService;
 
    /**
     * Loads cache and executes the tasks
     *
     */
    public void start() {
        loadCache();
        executeTasks();
    }
 
    /**
     * Loads Customers to cache
     *
     */
    public void loadCache() {
        Customer firstCustomer = new Customer();
        firstCustomer.setId("3");
        firstCustomer.setName("Bruce");
        firstCustomer.setSurname("Willis");
 
        Customer secondCustomer = new Customer();
        secondCustomer.setId("4");
        secondCustomer.setName("Colin");
        secondCustomer.setSurname("Farrell");
 
        getCacheService().addToCache(firstCustomer.getId(), firstCustomer);
        getCacheService().addToCache(secondCustomer.getId(), secondCustomer);
    }
 
    /**
     * Executes Tasks
     *
     */
    public void executeTasks() {
        try {
            getDistributedExecutorService().executeOnStatedMember(new TestCallable(), getAnotherMember());
            getDistributedExecutorService().executeOnTheMemberOwningTheKey(new TestCallable(), "3");
            getDistributedExecutorService().executeOnAnyMember(new TestCallable());
            getDistributedExecutorService().executeOnMembers(new TestCallable(), getAllMembers());
        } catch (InterruptedException | ExecutionException | AnotherAvailableMemberNotFoundException e) {
            e.printStackTrace();
        }
    }
 
    /**
     * Gets cluster members
     *
     * @return Set<Member> Set of Cluster Members
     *
     */
    private Set<Member> getAllMembers() {
        Set<Member> members = getHazelcastLocalInstance().getCluster().getMembers();
 
        return members;
    }
 
    /**
     * Gets an another member of cluster
     *
     * @return Member Another Member of Cluster
     * @throws AnotherAvailableMemberNotFoundException An Another Available Member can not found exception
     */
    private Member getAnotherMember() throws AnotherAvailableMemberNotFoundException {
        Set<Member> members = getAllMembers();
        for(Member member : members) {
            if(!member.localMember()) {
                return member;
            }
        }
 
        throw new AnotherAvailableMemberNotFoundException("No Other Available Member on the cluster. Please be aware that all members are active on the cluster");
    }
 
    /**
     * Gets Hazelcast local instance
     *
     * @return HazelcastInstance Hazelcast local instance
     */
    @SuppressWarnings("static-access")
    private HazelcastInstance getHazelcastLocalInstance() {
        HazelcastInstance instance = getHazelcast().getHazelcastInstanceByName(getHazelcastInstanceName());
        return instance;
    }
 
    public String getHazelcastInstanceName() {
        return hazelcastInstanceName;
    }
 
    public void setHazelcastInstanceName(String hazelcastInstanceName) {
        this.hazelcastInstanceName = hazelcastInstanceName;
    }
 
    public Hazelcast getHazelcast() {
        return hazelcast;
    }
 
    public void setHazelcast(Hazelcast hazelcast) {
        this.hazelcast = hazelcast;
    }
 
    public IDistributedExecutorService getDistributedExecutorService() {
        return distributedExecutorService;
    }
 
    public void setDistributedExecutorService(IDistributedExecutorService distributedExecutorService) {
        this.distributedExecutorService = distributedExecutorService;
    }
 
    public ICacheService getCacheService() {
        return cacheService;
    }
 
    public void setCacheService(ICacheService cacheService) {
        this.cacheService = cacheService;
    }
 
}


STEP 12 : CREATE hazelcast-config.properties FILE

hazelcast-config.properties file shows the properties of cluster members.

First member properties :

hz.instance.name = OTVInstance1
 
hz.group.name = dev
hz.group.password = dev
 
hz.management.center.enabled = true
hz.management.center.url = http://localhost:8080/mancenter
 
hz.network.port = 5701
hz.network.port.auto.increment = false
 
hz.tcp.ip.enabled = true
 
hz.members = 192.168.1.32
 
hz.executor.service.core.pool.size = 2
hz.executor.service.max.pool.size = 30
hz.executor.service.keep.alive.seconds = 30
 
hz.map.backup.count=2
hz.map.max.size=0
hz.map.eviction.percentage=30
hz.map.read.backup.data=true
hz.map.cache.value=true
hz.map.eviction.policy=NONE
hz.map.merge.policy=hz.ADD_NEW_ENTRY

Second member properties :

hz.instance.name = OTVInstance2
 
hz.group.name = dev
hz.group.password = dev
 
hz.management.center.enabled = true
hz.management.center.url = http://localhost:8080/mancenter
 
hz.network.port = 5702
hz.network.port.auto.increment = false
 
hz.tcp.ip.enabled = true
 
hz.members = 192.168.1.32
 
hz.executor.service.core.pool.size = 2
hz.executor.service.max.pool.size = 30
hz.executor.service.keep.alive.seconds = 30
 
hz.map.backup.count=2
hz.map.max.size=0
hz.map.eviction.percentage=30
hz.map.read.backup.data=true
hz.map.cache.value=true
hz.map.eviction.policy=NONE
hz.map.merge.policy=hz.ADD_NEW_ENTRY


STEP 13 : CREATE applicationContext-hazelcast.xml

Spring Hazelcast Configuration file, applicationContext-hazelcast.xml, is created and Hazelcast Distributed Executor Service and Hazelcast Instance are configured.

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:hz="http://www.hazelcast.com/schema/spring"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
 
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
 
http://www.hazelcast.com/schema/spring
 
http://www.hazelcast.com/schema/spring/hazelcast-spring-2.4.xsd">
 
    <hz:map id="customerMap" name="customerMap" instance-ref="instance"/>
 
    <!-- Hazelcast Distributed Executor Service definition -->
    <hz:executorService id="hazelcastDistributedExecutorService" instance-ref="instance" name="hazelcastDistributedExecutorService" />
 
    <!-- Hazelcast Instance configuration -->
    <hz:hazelcast id="instance">
        <hz:config>
 
            <!-- Hazelcast Instance Name -->
            <hz:instance-name>${hz.instance.name}</hz:instance-name>
 
        <!-- Hazelcast Group Name and Password -->
        <hz:group name="${hz.group.name}" password="${hz.group.password}"/>
 
                <!-- Hazelcast Management Center URL -->
            <hz:management-center  enabled="${hz.management.center.enabled}" url="${hz.management.center.url}"/>
 
            <!-- Hazelcast Tcp based network configuration -->
            <hz:network port="${hz.network.port}" port-auto-increment="${hz.network.port.auto.increment}">
                <hz:join>
                    <hz:tcp-ip enabled="${hz.tcp.ip.enabled}">
                        <hz:members>${hz.members}</hz:members>
                    </hz:tcp-ip>
                </hz:join>
            </hz:network>
 
            <!-- Hazelcast Distributed Executor Service configuration -->
            <hz:executor-service name="executorService"
                                 core-pool-size="${hz.executor.service.core.pool.size}"
                                 max-pool-size="${hz.executor.service.max.pool.size}"
                                 keep-alive-seconds="${hz.executor.service.keep.alive.seconds}"/>
 
            <!-- Hazelcast Distributed Map configuration -->
            <hz:map name="map"
                backup-count="${hz.map.backup.count}"
                max-size="${hz.map.max.size}"
                eviction-percentage="${hz.map.eviction.percentage}"
                read-backup-data="${hz.map.read.backup.data}"
                cache-value="${hz.map.cache.value}"
                eviction-policy="${hz.map.eviction.policy}"
                merge-policy="${hz.map.merge.policy}"  />
 
        </hz:config>
 
    </hz:hazelcast>  
 
</beans>

STEP 14 : CREATE applicationContext.xml

Spring Configuration file, applicationContext.xml, is created.

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:hz="http://www.hazelcast.com/schema/spring"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
 
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">
 
    <import resource="classpath:applicationContext-hazelcast.xml" />
 
    <!-- Beans Declaration -->
    <bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <list>
                <value>classpath:/hazelcast-config.properties</value>
            </list>
        </property>
    </bean>
 
    <bean id="cacheService" class="com.onlinetechvision.cache.srv.CacheService">
        <constructor-arg ref="customerMap"/>
    </bean>
 
    <bean id="distributedExecutorService" class="com.onlinetechvision.executor.srv.DistributedExecutorService">
        <property name="hazelcastDistributedExecutorService" ref="hazelcastDistributedExecutorService" />
    </bean>
 
    <bean id="hazelcast" class="com.hazelcast.core.Hazelcast"/>
 
    <bean id="starter" class="com.onlinetechvision.exe.Starter">
        <property name="hazelcastInstanceName" value="${hz.instance.name}" />
        <property name="hazelcast" ref="hazelcast" />
        <property name="distributedExecutorService" ref="distributedExecutorService" />
        <property name="cacheService" ref="cacheService" />
    </bean>
</beans>

STEP 15 : CREATE Application CLASS

Application Class is created to run the application.

ackage com.onlinetechvision.exe;
 
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
 
/**
 * Application class starts the application
 *
 * @author onlinetechvision.com
 * @since 27 Nov 2012
 * @version 1.0.0
 *
 */
public class Application {
 
    /**
     * Starts the application
     *
     * @param  String[] args
     *
     */
    public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
        Starter starter = (Starter) context.getBean("starter");
        starter.start();
    }
 
}

STEP 16 : BUILD PROJECT

After OTV_Spring_Hazelcast_DistributedExecution Project is built, OTV_Spring_Hazelcast_DistributedExecution-0.0.1-SNAPSHOT.jar will be created.
Important Note : The Members of the cluster have got different configuration for Coherence so the project should be built separately for each member.

STEP 17 : INTEGRATION with HAZELCAST MANAGEMENT CENTER

Hazelcast Management Center enables to monitor and manage nodes in the cluster.

Entity and backup counts which are owned by customerMap, can be seen via Map Memory Data Table. We have distributed 4 entries via customerMap as shown below :

Sample keys and values can be seen via Map Browser :

Added First Entry :

Added Third Entry :

hazelcastDistributedExecutorService details can be seen via Executors tab. We have executed 3 task on first member and 2 tasks on second member as shown below :

STEP 18 : RUN PROJECT BY STARTING THE CLUSTER’ s MEMBER

After created OTV_Spring_Hazelcast_DistributedExecution-0.0.1-SNAPSHOT.jar file is run at the cluster’ s members, the following console output logs will be shown :

First member console output :

Kas 25, 2012 4:07:20 PM com.hazelcast.impl.AddressPicker
INFO: Interfaces is disabled, trying to pick one address from TCP-IP config addresses: [x.y.z.t]
Kas 25, 2012 4:07:20 PM com.hazelcast.impl.AddressPicker
INFO: Prefer IPv4 stack is true.
Kas 25, 2012 4:07:20 PM com.hazelcast.impl.AddressPicker
INFO: Picked Address[x.y.z.t]:5701, using socket ServerSocket[addr=/0:0:0:0:0:0:0:0,localport=5701], bind any local is true
Kas 25, 2012 4:07:21 PM com.hazelcast.system
INFO: [x.y.z.t]:5701 [dev] Hazelcast Community Edition 2.4 (20121017) starting at Address[x.y.z.t]:5701
Kas 25, 2012 4:07:21 PM com.hazelcast.system
INFO: [x.y.z.t]:5701 [dev] Copyright (C) 2008-2012 Hazelcast.com
Kas 25, 2012 4:07:21 PM com.hazelcast.impl.LifecycleServiceImpl
INFO: [x.y.z.t]:5701 [dev] Address[x.y.z.t]:5701 is STARTING
Kas 25, 2012 4:07:24 PM com.hazelcast.impl.TcpIpJoiner
INFO: [x.y.z.t]:5701 [dev]
--A new cluster is created and First Member joins the cluster.
Members [1] {
    Member [x.y.z.t]:5701 this
}
 
Kas 25, 2012 4:07:24 PM com.hazelcast.impl.MulticastJoiner
INFO: [x.y.z.t]:5701 [dev]
 
Members [1] {
    Member [x.y.z.t]:5701 this
}
 
...
-- First member adds two new entries to the cache...
EntryAdded... Member : Member [x.y.z.t]:5701 this, Key : 1, OldValue : null, NewValue : Customer [id=1, name=Jodie, surname=Foster]
EntryAdded... Member : Member [x.y.z.t]:5701 this, Key : 2, OldValue : null, NewValue : Customer [id=2, name=Kate, surname=Winslet]
 
...
--Second Member joins the cluster.
Members [2] {
    Member [x.y.z.t]:5701 this
    Member [x.y.z.t]:5702
}
 
...
-- Second member adds two new entries to the cache...
EntryAdded... Member : Member [x.y.z.t]:5702, Key : 4, OldValue : null, NewValue : Customer [id=4, name=Colin, surname=Farrell]
EntryAdded... Member : Member [x.y.z.t]:5702, Key : 3, OldValue : null, NewValue : Customer [id=3, name=Bruce, surname=Willis]


Second member console output :

Kas 25, 2012 4:07:48 PM com.hazelcast.impl.AddressPicker
INFO: Interfaces is disabled, trying to pick one address from TCP-IP config addresses: [x.y.z.t]
Kas 25, 2012 4:07:48 PM com.hazelcast.impl.AddressPicker
INFO: Prefer IPv4 stack is true.
Kas 25, 2012 4:07:48 PM com.hazelcast.impl.AddressPicker
INFO: Picked Address[x.y.z.t]:5702, using socket ServerSocket[addr=/0:0:0:0:0:0:0:0,localport=5702], bind any local is true
Kas 25, 2012 4:07:49 PM com.hazelcast.system
INFO: [x.y.z.t]:5702 [dev] Hazelcast Community Edition 2.4 (20121017) starting at Address[x.y.z.t]:5702
Kas 25, 2012 4:07:49 PM com.hazelcast.system
INFO: [x.y.z.t]:5702 [dev] Copyright (C) 2008-2012 Hazelcast.com
Kas 25, 2012 4:07:49 PM com.hazelcast.impl.LifecycleServiceImpl
INFO: [x.y.z.t]:5702 [dev] Address[x.y.z.t]:5702 is STARTING
Kas 25, 2012 4:07:49 PM com.hazelcast.impl.Node
INFO: [x.y.z.t]:5702 [dev] ** setting master address to Address[x.y.z.t]:5701
Kas 25, 2012 4:07:49 PM com.hazelcast.impl.MulticastJoiner
INFO: [x.y.z.t]:5702 [dev] Connecting to master node: Address[x.y.z.t]:5701
Kas 25, 2012 4:07:49 PM com.hazelcast.nio.ConnectionManager
INFO: [x.y.z.t]:5702 [dev] 55715 accepted socket connection from /x.y.z.t:5701
Kas 25, 2012 4:07:55 PM com.hazelcast.cluster.ClusterManager
INFO: [x.y.z.t]:5702 [dev]
--Second Member joins the cluster.
Members [2] {
    Member [x.y.z.t]:5701
    Member [x.y.z.t]:5702 this
}
 
Kas 25, 2012 4:07:56 PM com.hazelcast.impl.LifecycleServiceImpl
INFO: [x.y.z.t]:5702 [dev] Address[x.y.z.t]:5702 is STARTED
-- Second member adds two new entries to the cache...
EntryAdded... Member : Member [x.y.z.t]:5702 this, Key : 3, OldValue : null, NewValue : Customer [id=3, name=Bruce, surname=Willis]
EntryAdded... Member : Member [x.y.z.t]:5702 this, Key : 4, OldValue : null, NewValue : Customer [id=4, name=Colin, surname=Farrell]
 
25.11.2012 16:07:56 DEBUG (DistributedExecutorService.java:42) - Method executeOnStatedMember is called...
25.11.2012 16:07:56 DEBUG (DistributedExecutorService.java:46) - Result of method executeOnStatedMember is : First Member' s TestCallable Task is called...
 
25.11.2012 16:07:56 DEBUG (DistributedExecutorService.java:61) - Method executeOnTheMemberOwningTheKey is called...
25.11.2012 16:07:56 DEBUG (DistributedExecutorService.java:65) - Result of method executeOnTheMemberOwningTheKey is : First Member' s TestCallable Task is called...
 
25.11.2012 16:07:56 DEBUG (DistributedExecutorService.java:78) - Method executeOnAnyMember is called...
25.11.2012 16:07:57 DEBUG (DistributedExecutorService.java:82) - Result of method executeOnAnyMember is : Second Member' s TestCallable Task is called...
 
25.11.2012 16:07:57 DEBUG (DistributedExecutorService.java:96) - Method executeOnMembers is called...
25.11.2012 16:07:57 DEBUG (DistributedExecutorService.java:101) - Result of method executeOnMembers is : [First Member' s TestCallable Task is called..., Second Member' s TestCallable Task is called...]


STEP 19 : DOWNLOAD

https://github.com/erenavsarogullari/OTV_Spring_Hazelcast_DistributedExecution

REFERENCES :

Java ExecutorService Interface
Hazelcast Distributed Executor Service












Hazelcast Spring Framework cluster Execution (computing) Interface (computing) Implementation Executor (software) Thread pool Task (computing)

Published at DZone with permission of Eren Avsarogullari, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.

Related

  • Mastering Concurrency: An In-Depth Guide to Java's ExecutorService
  • How To Create Asynchronous and Retryable Methods With Failover Support
  • External Task Client Implementation in Camunda With Spring Boot Application
  • Spring Beans With Auto-Generated Implementations: How-To

Partner Resources

×

Comments
Oops! Something Went Wrong

The likes didn't load as expected. Please refresh the page and try again.

ABOUT US

  • About DZone
  • Support and feedback
  • Community research
  • Sitemap

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Core Program
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 3343 Perimeter Hill Drive
  • Suite 100
  • Nashville, TN 37211
  • support@dzone.com

Let's be friends:

Likes
There are no likes...yet! 👀
Be the first to like this post!
It looks like you're not logged in.
Sign in to see who liked this post!