Spring Boot With Apache Ignite: Fail-Fast Distributed MapReduce Closures

Learn about doing distributed compute jobs that do data computations or external service calls using Apache Ignite distributed closures.

In this article, we cover a use case for Apache Ignite: running distributed compute jobs that perform data computations or external service calls through Ignite distributed closures. The jobs are executed in a MapReduce fashion and fail fast if a computation fails or returns an unexpected result.

[Figure: mapReduce]

The following numbers match up with the numbers in the above figure:

  1. The main node submits a collection of Ignite callables together with the custom fail-fast reducer (explained in more detail later).
  2. The jobs are distributed for execution among the server nodes of the current cluster topology that belong to the selected cluster group. This uses the distributed, parallel MapReduce execution of the Ignite compute grid, either synchronously or in an asynchronous, non-blocking way.
  3. Each job returns its result or error to the fail-fast reducer. After receiving each result, the reducer decides whether to keep collecting results before reducing them into the final aggregated response, or to fail fast as soon as one job fails or returns an unexpected result.
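
The three steps above can be sketched without a cluster. The following is a minimal single-JVM illustration, not Ignite code: a thread pool stands in for the server nodes, and the hypothetical `collect` method only mirrors the idea of a reducer that returns `true` to keep collecting and `false` to stop and reduce. The class and method names are assumptions made for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Single-JVM illustration of fail-fast collection; a thread pool stands in for the cluster. */
final class FailFastSketch {

    /** Results gathered before the reducer decided to stop. */
    private final List<Boolean> collected = new ArrayList<>();

    /** Returns true to keep collecting, false to stop and reduce right away. */
    boolean collect(boolean jobSucceeded) {
        collected.add(jobSucceeded);
        return jobSucceeded;
    }

    /** Aggregated status: success only if no collected result is a failure. */
    boolean reduce() {
        return !collected.contains(false);
    }

    /** Submits one job per outcome, then feeds results to the reducer until it says stop. */
    static boolean runAll(boolean... outcomes) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, outcomes.length));
        try {
            ExecutorCompletionService<Boolean> completion = new ExecutorCompletionService<>(pool);
            for (boolean outcome : outcomes) {
                completion.submit(() -> outcome);   // each job simply reports its outcome
            }
            FailFastSketch reducer = new FailFastSketch();
            for (int i = 0; i < outcomes.length; i++) {
                // take() yields results in completion order, like results arriving from nodes
                if (!reducer.collect(completion.take().get())) {
                    break;                          // fail fast: ignore the remaining jobs
                }
            }
            return reducer.reduce();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("job execution failed", e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("one failure   -> success = " + runAll(true, false, true));
        System.out.println("all succeeded -> success = " + runAll(true, true, true));
    }
}
```

Whatever order the jobs finish in, the failed result is always collected before the loop stops, so the aggregated status is `false` in the first case and `true` in the second.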

So, how is it implemented?

The fail-fast Ignite compute grid reducer:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.ignite.lang.IgniteReducer;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

/**
 * A fail-fast reducer that decides, after each job result, whether to keep waiting for the remaining jobs
 * or to stop collecting and reduce immediately because a job failed.
 */
@Component
@Scope("prototype")
public class FailFastReducer implements IgniteReducer<ServiceResponse, MapReduceResponse> {

    private final Map<String, ServiceResponse> responseMap = new ConcurrentHashMap<>();

    // set when a job returns no response at all, so that failure still shows up in the reduced status
    private volatile boolean missingResponse;

    /**
     * @param serviceCallResponse the job response
     * @return true to keep collecting results, false to stop collecting and reduce now
     */
    @Override
    public boolean collect(ServiceResponse serviceCallResponse) {
        if (serviceCallResponse == null) {
            missingResponse = true;
            return false;
        }
        // record the response whether it succeeded or failed, so the caller can see which job failed
        responseMap.put(serviceCallResponse.getServiceOrigin(), serviceCallResponse);
        return serviceCallResponse.isSuccess();
    }

    /**
     * @return the final reduced response containing the collected job responses and the global status
     */
    @Override
    public MapReduceResponse reduce() {
        return MapReduceResponse.builder().success(checkStatus()).reducedResponses(responseMap).build();
    }

    /**
     * @return the global status: true only if every collected job response succeeded and none was missing
     */
    public boolean checkStatus() {
        return !missingResponse && responseMap.values().stream().allMatch(ServiceResponse::isSuccess);
    }

}

A generic Ignite compute utility that triggers MapReduce tasks either synchronously or in an asynchronous, non-blocking way:

import java.util.Collection;
import java.util.function.Consumer;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.IgniteException;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteReducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Generic utility class for MapReduce calls.
 */
@Component
public class DataGridCompute {

    @Autowired
    private Ignite ignite;

    /**
     * A generic asynchronous MapReduce call inside the Ignite compute grid.
     *
     * @param jobs the list of jobs to be distributed to the data grid nodes from the master node
     * @param igniteReducer the Ignite reducer that holds the collection and reduction logic
     * @param callback the callback to be invoked upon receiving the final reduced response
     * @param <R> generic response type of the jobs
     * @param <E> generic type of the reduced response
     * @throws IgniteException if the jobs cannot be submitted
     */
    public <R, E> void executeMapReduceFailFast(Collection<IgniteCallable<R>> jobs, IgniteReducer<R, E> igniteReducer, Consumer<E> callback) throws IgniteException {
        // select the cluster group that should run the jobs; here, all server (non-client) nodes
        IgniteCompute igniteCompute = ignite.compute(ignite.cluster().forPredicate(clusterNode -> !clusterNode.isClient()));
        // execute the jobs in MapReduce fashion and pass the custom reducer as well
        IgniteFuture<E> future = igniteCompute.callAsync(jobs, igniteReducer);
        // listen asynchronously for the reduced result, then invoke the callback with it
        future.listen(result -> callback.accept(result.get()));
    }

    /**
     * A generic synchronous MapReduce call inside the Ignite compute grid.
     *
     * @param jobs the list of jobs to be distributed to the data grid nodes from the master node
     * @param igniteReducer the Ignite reducer that holds the collection and reduction logic
     * @param <R> generic response type of the jobs
     * @param <E> generic type of the reduced response
     * @return the reduced response
     * @throws IgniteException if the execution fails
     */
    public <R, E> E executeMapReduceFailFastSync(Collection<IgniteCallable<R>> jobs, IgniteReducer<R, E> igniteReducer) throws IgniteException {
        // select the cluster group that should run the jobs; here, all server (non-client) nodes
        IgniteCompute igniteCompute = ignite.compute(ignite.cluster().forPredicate(clusterNode -> !clusterNode.isClient()));
        // execute the jobs in MapReduce fashion and pass the custom reducer as well
        return igniteCompute.call(jobs, igniteReducer);
    }
}
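
The two methods above differ only in how the caller receives the reduced result: one blocks and returns it, the other returns immediately and hands it to a callback later. As a rough illustration of that difference, here is a plain-Java sketch in which `CompletableFuture` stands in for the Ignite future. It does not call Ignite, and the names `executeSync`, `executeAsync`, and `demoAsync` are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Contrasts a blocking call with a callback-based one; CompletableFuture stands in for the grid. */
final class SyncVsAsyncSketch {

    /** Blocking variant: the caller waits and receives the reduced value directly. */
    static <E> E executeSync(Supplier<E> mapReduce) {
        return mapReduce.get();
    }

    /** Non-blocking variant: returns at once; the callback receives the reduced value later. */
    static <E> CompletableFuture<Void> executeAsync(Supplier<E> mapReduce, Consumer<E> callback) {
        return CompletableFuture.supplyAsync(mapReduce).thenAccept(callback);
    }

    /** Runs the async variant and joins only so the demo can report what the callback saw. */
    static String demoAsync() {
        AtomicReference<String> seen = new AtomicReference<>();
        executeAsync(() -> "reduced", seen::set).join();
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("sync result:  " + executeSync(() -> "reduced"));
        System.out.println("async result: " + demoAsync());
    }
}
```

In real code the caller of the asynchronous variant would normally not join on the future; the `join()` here exists only to make the demo deterministic.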

The custom aggregated reducer response class:

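import java.io.Serializable;
import java.util.Map;

import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;

/**
 * The generic reduced response that contains all collected single job responses.
 */
@Builder
@Getter
@ToString
@EqualsAndHashCode
public class MapReduceResponse implements Serializable {
    // job responses keyed by their service origin
    private Map<String, ServiceResponse> reducedResponses;
    // global status: true only if all collected jobs succeeded
    private boolean success;
}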

The single task response class:

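import java.io.Serializable;

import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

/**
 * @param <T> the service call response type
 */
@Getter
@Setter
@ToString
@EqualsAndHashCode
@Builder
public class ServiceResponse<T> implements Serializable {
    private T response;
    private boolean success;
    // identifies the job or service that produced this response
    private String serviceOrigin;
}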

Example service for calling the Ignite compute grid with distributed closures. We will use the synchronous method to test the execution:

import java.util.List;

import org.apache.ignite.lang.IgniteCallable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Sample service showing how to run MapReduce jobs in parallel with the fail-fast reducer.
 */
@Service
public class ComputeService {

    private static final Logger logger = LoggerFactory.getLogger(ComputeService.class);
    private final DataGridCompute dataGridCompute;
    // the reducer is prototype-scoped and keeps state, so a new instance is requested for every call
    private final ObjectProvider<FailFastReducer> failFastReducerProvider;

    @Autowired
    public ComputeService(DataGridCompute dataGridCompute, ObjectProvider<FailFastReducer> failFastReducerProvider) {
        this.dataGridCompute = dataGridCompute;
        this.failFastReducerProvider = failFastReducerProvider;
    }

    /**
     * Calls the Ignite compute grid with a list of jobs, in parallel and asynchronously.
     */
    public void validateWithAllServicesInParallelAsync(List<IgniteCallable<ServiceResponse>> jobs) {
        // execute the jobs in parallel with the fail-fast reducer, then just log the final aggregated response
        dataGridCompute.executeMapReduceFailFast(jobs, failFastReducerProvider.getObject(),
                mapReduceResponse -> logger.debug(mapReduceResponse.toString()));
    }

    /**
     * Calls the Ignite compute grid with a list of jobs, in parallel and synchronously.
     */
    public MapReduceResponse validateWithAllServicesInParallelSync(List<IgniteCallable<ServiceResponse>> jobs) {
        // execute the jobs in parallel with the fail-fast reducer and return the final aggregated response
        return dataGridCompute.executeMapReduceFailFastSync(jobs, failFastReducerProvider.getObject());
    }
}
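
One point worth checking in a service like this: the reducer keeps the collected responses in an instance field, so a reducer instance that is reused between calls still holds the entries from the earlier call. The following is a minimal plain-Java sketch of that effect. It uses a hypothetical `StatefulReducer` rather than the real `FailFastReducer`, and a `Supplier` stands in for whatever mechanism hands out a fresh instance.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/** Shows why a reducer that accumulates state should not be shared between runs. */
final class ReducerReuseSketch {

    /** Hypothetical stand-in for a reducer that stores responses per job origin. */
    static final class StatefulReducer {
        private final Map<String, Boolean> responses = new ConcurrentHashMap<>();

        boolean collect(String origin, boolean success) {
            responses.put(origin, success);
            return success;
        }

        boolean reduce() {
            return !responses.containsValue(false);
        }
    }

    /** Two runs share one reducer: the failure recorded in run one leaks into run two. */
    static boolean secondRunWithSharedReducer() {
        StatefulReducer shared = new StatefulReducer();
        shared.collect("job2", false);   // first run: job2 fails
        shared.collect("job1", true);    // second run: only job1 runs, and it succeeds
        return shared.reduce();          // still false because of the stale job2 entry
    }

    /** Each run asks the factory for a new reducer, so the second run starts clean. */
    static boolean secondRunWithFreshReducer(Supplier<StatefulReducer> factory) {
        factory.get().collect("job2", false);   // first run, on its own instance
        StatefulReducer second = factory.get();
        second.collect("job1", true);           // second run sees only its own results
        return second.reduce();
    }

    public static void main(String[] args) {
        System.out.println("shared reducer: " + secondRunWithSharedReducer());
        System.out.println("fresh reducer:  " + secondRunWithFreshReducer(StatefulReducer::new));
    }
}
```

With the shared instance the second run reports `false` even though its only job succeeded; with a fresh instance per run it reports `true`.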

Unit test for fail-fast and successful cases using Spring Boot integration testing:

    @Test
    public void testMapReducedJobsWithFailFastSync() {
        // example Ignite jobs: the first succeeds, the second fails, the third succeeds;
        // the reducer fails fast as soon as it collects the failed job
        IgniteCallable<ServiceResponse> validationServiceJob1 = () -> ServiceResponse.<String>builder()
                .response("Job 1 is valid").serviceOrigin("job1").success(true).build();
        IgniteCallable<ServiceResponse> validationServiceJob2 = () -> ServiceResponse.<String>builder()
                .response("Job 2 is failed").serviceOrigin("job2").success(false).build();
        IgniteCallable<ServiceResponse> validationServiceJob3 = () -> ServiceResponse.<String>builder()
                .response("Job 3 is valid").serviceOrigin("job3").success(true).build();

        final MapReduceResponse mapReduceResponse = computeService.validateWithAllServicesInParallelSync(
                Arrays.asList(validationServiceJob1, validationServiceJob2, validationServiceJob3)
        );
        boolean status = true;
        for (ServiceResponse serviceResponse : mapReduceResponse.getReducedResponses().values()) {
            status = status && serviceResponse.isSuccess();
        }
        // make sure the aggregated status is failed
        assertFalse(status);
        assertFalse(mapReduceResponse.isSuccess());
    }

    @Test
    public void testMapReducedJobsWithFailFastSyncFirstAllSuccess() {
        // example Ignite jobs: all succeed, so the reducer collects every response and returns successfully
        IgniteCallable<ServiceResponse> validationServiceJob1 = () -> ServiceResponse.<String>builder()
                .serviceOrigin("job1").response("Job 1 is valid").success(true).build();
        IgniteCallable<ServiceResponse> validationServiceJob2 = () -> ServiceResponse.<String>builder()
                .serviceOrigin("job2").response("Job 2 is valid").success(true).build();
        IgniteCallable<ServiceResponse> validationServiceJob3 = () -> ServiceResponse.<String>builder()
                .serviceOrigin("job3").response("Job 3 is valid").success(true).build();

        final MapReduceResponse mapReduceResponse = computeService.validateWithAllServicesInParallelSync(
                Arrays.asList(validationServiceJob1, validationServiceJob2, validationServiceJob3)
        );
        boolean status = true;
        for (ServiceResponse serviceResponse : mapReduceResponse.getReducedResponses().values()) {
            status = status && serviceResponse.isSuccess();
        }
        // make sure the aggregated status is success
        assertTrue(status);
        assertTrue(mapReduceResponse.isSuccess());
    }

Topics:
apache ignite, nosql, in-memory computing, in-memory data grid, fail-fast, mapreduce, tutorial, big data

Published at DZone with permission of

Opinions expressed by DZone contributors are their own.
