
Spring Batch Goodies With MongoDB

Explore a tutorial that explains what needs to be done to get Spring Batch goodies, such as job and step execution metadata, stored in MongoDB.

By Paweł Gawędzki · Updated Nov. 13, 2018 · Tutorial

In this article, I am going to explain what needs to be done in order to get Spring Batch goodies along with MongoDB. I assume that the reader has basic knowledge of Spring Boot, Spring Batch, MongoDB, and Java 8.

Let's start!

Motivation

In my previous article, I showed how to implement an application that replicates one database into a local MongoDB instance using Spring Batch. Now, I want to show which interfaces need to be implemented in order to have all the details regarding the batch itself. Spring Batch exposes all this information out of the box, but I could not find a default mechanism to store it in a MongoDB database. After some research, I found this repository, which I used as my starting point.

Preconditions

System requirements:

    • Java 8
    • Maven 3.5.2
    • Spring Boot 1.5.10
    • MongoDB 3.4

Here are the dependencies defined in pom.xml:

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>

This is a Spring Boot application with the Spring Batch and MongoDB starters.
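With these starters on the classpath, Spring Boot auto-configures a MongoTemplate from standard connection properties. A minimal application.properties sketch is shown below; the host, port, and database name are placeholders for illustration, not values from this project:

```properties
# Assumed local MongoDB instance; adjust to your environment.
spring.data.mongodb.uri=mongodb://localhost:27017/batchdb
```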

Implementation and Explanation

Let's see the batch configuration in class MainBatchConfigurer:

@Configuration
public class MainBatchConfigurer implements BatchConfigurer {

    @Autowired
    private ExecutionContextDao mongoExecutionContextDao;
    @Autowired
    private JobExecutionDao mongoJobExecutionDao;
    @Autowired
    private JobInstanceDao mongoJobInstanceDao;
    @Autowired
    private StepExecutionDao mongoStepExecutionDao;

    @Override
    public JobRepository getJobRepository() {
        return new SimpleJobRepository(
          mongoJobInstanceDao, 
          mongoJobExecutionDao, 
          mongoStepExecutionDao, 
          mongoExecutionContextDao
        );
    }

    @Override
    public PlatformTransactionManager getTransactionManager() {
        return new ResourcelessTransactionManager();
    }

    @Override
    public SimpleJobLauncher getJobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(getJobRepository());
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }

    @Override
    public JobExplorer getJobExplorer() {
        return new SimpleJobExplorer(
          mongoJobInstanceDao, 
          mongoJobExecutionDao, 
          mongoStepExecutionDao, 
          mongoExecutionContextDao
        );
    }
}

The configuration is based on the documentation that I found on the Spring website. MainBatchConfigurer is a Spring configuration class that implements the BatchConfigurer interface from the Spring Batch package. The Spring documentation says:

The core interface for this configuration is the BatchConfigurer. The default implementation provides the beans ... and requires a DataSource as a bean within the context to be provided. This data source will be used by the JobRepository.

BatchConfigurer requires the following definitions:

    • job repository
    • transaction manager
    • job launcher
    • job explorer

These definitions are mandatory, but we do not have to think about how to implement them; Spring provides default implementations for each. What is important here is that the job repository and job explorer need implementations of the ExecutionContextDao, JobExecutionDao, JobInstanceDao, and StepExecutionDao interfaces. Also, when SimpleJobLauncher is created, it is important to call afterPropertiesSet only after the job launcher is fully configured, because SimpleJobLauncher requires a fully defined job repository. As the Spring documentation says:

"...the JobRepository is used for basic CRUD operations of the various persisted domain objects within Spring Batch, such as JobExecution and StepExecution. It is required by many of the major framework features, such as the JobLauncher, Job, and Step."

In this particular case, I do not need any kind of transaction handling, so I'm using ResourcelessTransactionManager. If you need real transaction handling, a proper implementation should be chosen instead. When it comes to JobExplorer, the documentation says:

"... JobExplorer is a read-only version of the JobRepository..."

Now, let's see the interfaces that have to be implemented in order to store batch details in the database. First, I want to show ExecutionContextDao. It contains the following methods:

ExecutionContext getExecutionContext(JobExecution jobExecution);

ExecutionContext getExecutionContext(StepExecution stepExecution);

void saveExecutionContext(final JobExecution jobExecution);

void saveExecutionContext(final StepExecution stepExecution);

void saveExecutionContexts(final Collection<StepExecution> stepExecutions);

void updateExecutionContext(final JobExecution jobExecution);

void updateExecutionContext(final StepExecution stepExecution);

No magic here; the interface clearly shows what has to be done. So, let's take a look at the implementation.

@Repository
public class MongoExecutionContextDao extends AbstractMongoDao implements ExecutionContextDao {

    @PostConstruct
    public void init() {
        super.init();
        getCollection().createIndex(
                BasicDBObjectBuilder.start()
                        .add(STEP_EXECUTION_ID_KEY, 1)
                        .add(JOB_EXECUTION_ID_KEY, 1)
                        .get()
        );
    }

    @Override
    public ExecutionContext getExecutionContext(JobExecution jobExecution) {
        return getExecutionContext(JOB_EXECUTION_ID_KEY, jobExecution.getId());
    }

    @Override
    public ExecutionContext getExecutionContext(StepExecution stepExecution) {
        return getExecutionContext(STEP_EXECUTION_ID_KEY, stepExecution.getId());
    }

    @Override
    public void saveExecutionContext(JobExecution jobExecution) {
        saveOrUpdateExecutionContext(
          JOB_EXECUTION_ID_KEY, jobExecution.getId(), jobExecution.getExecutionContext()
        );
    }

    @Override
    public void saveExecutionContext(StepExecution stepExecution) {
        saveOrUpdateExecutionContext(
          STEP_EXECUTION_ID_KEY, stepExecution.getId(), stepExecution.getExecutionContext()
        );
    }

    @Override
    public void updateExecutionContext(JobExecution jobExecution) {
        saveOrUpdateExecutionContext(
          JOB_EXECUTION_ID_KEY, jobExecution.getId(), jobExecution.getExecutionContext()
        );
    }

    @Override
    public void updateExecutionContext(StepExecution stepExecution) {
        saveOrUpdateExecutionContext(
          STEP_EXECUTION_ID_KEY, stepExecution.getId(), stepExecution.getExecutionContext()
        );
    }

    private void saveOrUpdateExecutionContext(String executionIdKey, 
                                              Long executionId, 
                                              ExecutionContext executionContext) {
        Assert.notNull(executionId, "ExecutionId must not be null.");
        Assert.notNull(executionContext, "The ExecutionContext must not be null.");

        DBObject dbObject = new BasicDBObject(executionIdKey, executionId);
        for (Map.Entry<String, Object> entry : executionContext.entrySet()) {
            Object value = entry.getValue();
            String key = entry.getKey();
            dbObject.put(key.replaceAll(DOT_STRING, DOT_ESCAPE_STRING), value);
            if (value instanceof BigDecimal || value instanceof BigInteger) {
                dbObject.put(key + TYPE_SUFFIX, value.getClass().getName());
            }
        }
        getCollection().update(
          new BasicDBObject(executionIdKey, executionId), dbObject, true, false
        );
    }

    @SuppressWarnings({"unchecked"})
    private ExecutionContext getExecutionContext(String executionIdKey, Long executionId) {
        Assert.notNull(executionId, "ExecutionId must not be null.");
        DBObject result = getCollection().findOne(
          new BasicDBObject(executionIdKey, executionId)
        );
        ExecutionContext executionContext = new ExecutionContext();
        if (result != null) {
            result.removeField(executionIdKey);
            removeSystemFields(result);
            for (String key : result.keySet()) {
                Object value = result.get(key);
                String type = (String) result.get(key + TYPE_SUFFIX);
                if (type != null && Number.class.isAssignableFrom(value.getClass())) {
                    try {
                        value = NumberUtils.convertNumberToTargetClass(
                              (Number) value, 
                              (Class<? extends Number>) Class.forName(type)
                        );
                    } catch (Exception e) {
                        logger.warn("Failed to convert {} to {}", key, type);
                    }
                }
                //Mongo db does not allow key name with "." character.
                executionContext.put(
                        key.replaceAll(DOT_ESCAPE_STRING, DOT_STRING), value
                );
            }
        }
        return executionContext;
    }

    @Override
    protected DBCollection getCollection() {
        return mongoTemplate.getCollection(ExecutionContext.class.getSimpleName());
    }

    @Override
    public void saveExecutionContexts(Collection<StepExecution> stepExecutions) {
        Assert.notNull(stepExecutions, "Attempt to save a null collection of step executions");
        for (StepExecution stepExecution : stepExecutions) {
            saveExecutionContext(stepExecution);
            saveExecutionContext(stepExecution.getJobExecution());
        }
    }
}

What needs to be done here is to implement all the methods that save the ExecutionContext based on a JobExecution or StepExecution; this has to be done manually. To save an ExecutionContext as a MongoDB document, the private method saveOrUpdateExecutionContext was implemented. It extracts the parameters from the ExecutionContext, puts them into a MongoDB document, and stores that document in MongoDB. The other method worth mentioning here is the private getExecutionContext. This method does exactly the opposite of saveOrUpdateExecutionContext: it gets the raw document from the collection named "ExecutionContext" (see getCollection) and converts it into an ExecutionContext object.
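One detail worth isolating is the key escaping: MongoDB field names cannot contain the "." character, so context keys are escaped before saving and unescaped on read. A minimal, self-contained sketch of this round trip is below; the concrete escape token ("{dot}") is an assumption for illustration, since the real DAO defines its own DOT_STRING and DOT_ESCAPE_STRING constants:

```java
// Sketch of the dot-escaping round trip used when persisting ExecutionContext
// keys. The "{dot}" token is an assumed placeholder, not the project's value.
public class DotEscapeDemo {

    static final String DOT_STRING = "\\.";          // regex matching a literal dot
    static final String DOT_ESCAPE_STRING = "{dot}"; // assumed replacement token

    // Applied before writing a context key as a MongoDB field name.
    static String escape(String key) {
        return key.replaceAll(DOT_STRING, DOT_ESCAPE_STRING);
    }

    // Applied after reading the field name back; literal replace, no regex.
    static String unescape(String key) {
        return key.replace(DOT_ESCAPE_STRING, ".");
    }

    public static void main(String[] args) {
        String original = "batch.step.readCount";
        String stored = escape(original);
        System.out.println(stored);            // batch{dot}step{dot}readCount
        System.out.println(unescape(stored));  // batch.step.readCount
    }
}
```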

Now, let us take a closer look at the JobInstanceDao interface. It contains the following methods:

JobInstance createJobInstance(String jobName, JobParameters jobParameters);

@Nullable
JobInstance getJobInstance(String jobName, JobParameters jobParameters);

@Nullable
JobInstance getJobInstance(@Nullable Long instanceId);

@Nullable
JobInstance getJobInstance(JobExecution jobExecution);

List<JobInstance> getJobInstances(String jobName, int start, int count);

List<String> getJobNames();

List<JobInstance> findJobInstancesByName(String jobName, int start, int count);

int getJobInstanceCount(@Nullable String jobName) throws NoSuchJobException;

The above methods manage JobInstance on a database level. Below is the implementation:

@Repository
public class MongoJobInstanceDao extends AbstractMongoDao implements JobInstanceDao {

    @PostConstruct
    @Override
    public void init() {
        super.init();
        getCollection().createIndex(jobInstanceIdObj(1L));
    }

    @Override
    public JobInstance createJobInstance(String jobName, final JobParameters jobParameters) {
        Assert.notNull(jobName, "Job name must not be null.");
        Assert.notNull(jobParameters, "JobParameters must not be null.");
        Assert.state(getJobInstance(jobName, jobParameters) == null, "JobInstance must not already exist");

        Long jobId = getNextId(JobInstance.class.getSimpleName(), mongoTemplate);

        JobInstance jobInstance = new JobInstance(jobId, jobName);

        jobInstance.incrementVersion();

        Map<String, JobParameter> jobParams = jobParameters.getParameters();
        Map<String, Object> paramMap = new HashMap<>(jobParams.size());
        for (Map.Entry<String, JobParameter> entry : jobParams.entrySet()) {
            paramMap.put(
              entry.getKey().replaceAll(DOT_STRING, DOT_ESCAPE_STRING), 
              entry.getValue().getValue()
            );
        }
        getCollection().save(
                BasicDBObjectBuilder.start()
                        .add(JOB_INSTANCE_ID_KEY, jobId)
                        .add(JOB_NAME_KEY, jobName)
                        .add(JOB_KEY_KEY, createJobKey(jobParameters))
                        .add(VERSION_KEY, jobInstance.getVersion())
                        .add(JOB_PARAMETERS_KEY, new BasicDBObject(paramMap))
                        .get()
        );
        return jobInstance;
    }

    @Override
    public JobInstance getJobInstance(String jobName, JobParameters jobParameters) {
        Assert.notNull(jobName, "Job name must not be null.");
        Assert.notNull(jobParameters, "JobParameters must not be null.");

        String jobKey = createJobKey(jobParameters);

        return mapJobInstance(getCollection().findOne(
                BasicDBObjectBuilder.start()
                        .add(JOB_NAME_KEY, jobName)
                        .add(JOB_KEY_KEY, jobKey).get()), jobParameters);
    }

    @Override
    public JobInstance getJobInstance(Long instanceId) {
        return mapJobInstance(getCollection().findOne(jobInstanceIdObj(instanceId)));
    }

    @Override
    public JobInstance getJobInstance(JobExecution jobExecution) {
        DBObject instanceId = mongoTemplate.getCollection(JobExecution.class.getSimpleName())
          .findOne(jobExecutionIdObj(jobExecution.getId()), jobInstanceIdObj(1L));
        removeSystemFields(instanceId);
        return mapJobInstance(getCollection().findOne(instanceId));
    }

    @Override
    public List<JobInstance> getJobInstances(String jobName, int start, int count) {
        return mapJobInstances(
          getCollection()
          .find(new BasicDBObject(JOB_NAME_KEY, jobName))
          .sort(jobInstanceIdObj(-1L))
          .skip(start)
          .limit(count)
        );
    }

    @Override
    @SuppressWarnings({"unchecked"})
    public List<String> getJobNames() {
        List results = getCollection().distinct(JOB_NAME_KEY);
        Collections.sort(results);
        return results;
    }

    private String createJobKey(JobParameters jobParameters) {

        Map<String, JobParameter> props = jobParameters.getParameters();
        StringBuilder stringBuilder = new StringBuilder();
        List<String> keys = new ArrayList<>(props.keySet());
        Collections.sort(keys);
        for (String key : keys) {
            stringBuilder.append(key).append("=").append(props.get(key).toString()).append(";");
        }

        MessageDigest digest;
        try {
            digest = MessageDigest.getInstance("MD5");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 algorithm not available.  Fatal (should be in the JDK).");
        }

        try {
            byte[] bytes = digest.digest(stringBuilder.toString().getBytes("UTF-8"));
            return String.format("%032x", new BigInteger(1, bytes));
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 encoding not available. Fatal (should be in the JDK).");
        }
    }

    @Override
    protected DBCollection getCollection() {
        return mongoTemplate.getCollection(JobInstance.class.getSimpleName());
    }

    private List<JobInstance> mapJobInstances(DBCursor dbCursor) {
        List<JobInstance> results = new ArrayList<>();
        while (dbCursor.hasNext()) {
            results.add(mapJobInstance(dbCursor.next()));
        }
        return results;
    }

    private JobInstance mapJobInstance(DBObject dbObject) {
        return mapJobInstance(dbObject, null);
    }

    private JobInstance mapJobInstance(DBObject dbObject, JobParameters jobParameters) {
        JobInstance jobInstance = null;
        if (dbObject != null) {
            Long id = (Long) dbObject.get(JOB_INSTANCE_ID_KEY);
            if (jobParameters == null) {
                jobParameters = getJobParameters(id, mongoTemplate);
            }

            jobInstance = new JobInstance(id, (String) dbObject.get(JOB_NAME_KEY)); // should always be at version=0 because they never get updated
            jobInstance.incrementVersion();
        }
        return jobInstance;
    }

    @Override
    public List<JobInstance> findJobInstancesByName(String jobName, int start, int count) {
        List<JobInstance> result = new ArrayList<>();
        List<JobInstance> jobInstances = mapJobInstances(
                getCollection()
                        .find(new BasicDBObject(JOB_NAME_KEY, jobName))
                        .sort(jobInstanceIdObj(-1L))
        );
        for (JobInstance instanceEntry : jobInstances) {
            String key = instanceEntry.getJobName();
            String curJobName = key.substring(0, key.lastIndexOf("|"));

            if (curJobName.equals(jobName)) {
                result.add(instanceEntry);
            }
        }
        return result;
    }

    @Override
    public int getJobInstanceCount(String jobName) throws NoSuchJobException {
        int count = 0;
        List<JobInstance> jobInstances = mapJobInstances(
                getCollection()
                        .find(new BasicDBObject(JOB_NAME_KEY, jobName))
                        .sort(jobInstanceIdObj(-1L))
        );
        for (JobInstance instanceEntry : jobInstances) {
            String key = instanceEntry.getJobName();
            String curJobName = key.substring(0, key.lastIndexOf("|"));

            if (curJobName.equals(jobName)) {
                count++;
            }
        }

        if (count == 0) {
            throw new NoSuchJobException(String.format("No job instances for job name %s were found", jobName));
        } else {
            return count;
        }
    }

}

What all the above methods do is create a single JobInstance as a document, store that document in a collection named "JobInstance," get a single JobInstance from that collection, and map raw documents into JobInstance objects. There is also a method that creates a job key based on the job parameters. The outcome from this class is a MongoDB collection, "JobInstance," full of documents with job details.
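The job key deserves a short illustration. The idea in createJobKey is that the same parameter set should always map to the same 32-character key, regardless of insertion order: parameter names are sorted, concatenated as "key=value;" pairs, and MD5-hashed. A standalone sketch (using plain String values instead of Spring Batch's JobParameter, purely for illustration):

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

// Simplified version of the job-key scheme: sorting the keys makes the hash
// independent of the order in which parameters were added.
public class JobKeyDemo {

    static String createJobKey(Map<String, String> params) {
        StringBuilder sb = new StringBuilder();
        // TreeMap iterates keys in sorted order.
        for (Map.Entry<String, String> e : new TreeMap<>(params).entrySet()) {
            sb.append(e.getKey()).append("=").append(e.getValue()).append(";");
        }
        try {
            byte[] bytes = MessageDigest.getInstance("MD5")
                    .digest(sb.toString().getBytes(StandardCharsets.UTF_8));
            return String.format("%032x", new BigInteger(1, bytes));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 should always be available in the JDK", e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> a = new java.util.LinkedHashMap<>();
        a.put("run.date", "2018-11-13");
        a.put("chunk.size", "100");

        Map<String, String> b = new java.util.LinkedHashMap<>();
        b.put("chunk.size", "100");
        b.put("run.date", "2018-11-13");

        // Same parameters, different insertion order -> same job key.
        System.out.println(createJobKey(a).equals(createJobKey(b))); // true
    }
}
```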

The next interface that I would like to briefly describe is JobExecutionDao. This interface contains the following methods:

void saveJobExecution(JobExecution jobExecution);

void updateJobExecution(JobExecution jobExecution);

List<JobExecution> findJobExecutions(JobInstance jobInstance);

@Nullable
JobExecution getLastJobExecution(JobInstance jobInstance);

Set<JobExecution> findRunningJobExecutions(String jobName);

@Nullable
JobExecution getJobExecution(Long executionId);

void synchronizeStatus(JobExecution jobExecution);

The method names are self-explanatory; only synchronizeStatus may be a bit mysterious. The idea behind this method is:

Because it may be possible that the status of a JobExecution is updated while running, the following method will synchronize only the status and version fields.

So, knowing that, let's have a look at the implementation of this interface for the MongoDB database:

@Repository
public class MongoJobExecutionDao extends AbstractMongoDao implements JobExecutionDao {

    @PostConstruct
    public void init() {
        super.init();
        getCollection().createIndex(
                BasicDBObjectBuilder.start()
                        .add(JOB_EXECUTION_ID_KEY, 1)
                        .add(JOB_INSTANCE_ID_KEY, 1)
                        .get()
        );
    }

    public void saveJobExecution(JobExecution jobExecution) {
        validateJobExecution(jobExecution);
        jobExecution.incrementVersion();
        Long id = getNextId(JobExecution.class.getSimpleName(), mongoTemplate);
        save(jobExecution, id);
    }

    private void save(JobExecution jobExecution, Long id) {
        jobExecution.setId(id);
        DBObject object = toDbObjectWithoutVersion(jobExecution);
        object.put(VERSION_KEY, jobExecution.getVersion());
        getCollection().save(object);
    }

    private DBObject toDbObjectWithoutVersion(JobExecution jobExecution) {
        return BasicDBObjectBuilder.start()
                .add(JOB_EXECUTION_ID_KEY, jobExecution.getId())
                .add(JOB_INSTANCE_ID_KEY, jobExecution.getJobId())
                .add(START_TIME_KEY, jobExecution.getStartTime())
                .add(END_TIME_KEY, jobExecution.getEndTime())
                .add(STATUS_KEY, jobExecution.getStatus().toString())
                .add(EXIT_CODE_KEY, jobExecution.getExitStatus().getExitCode())
                .add(EXIT_MESSAGE_KEY, jobExecution.getExitStatus().getExitDescription())
                .add(CREATE_TIME_KEY, jobExecution.getCreateTime())
                .add(LAST_UPDATED_KEY, jobExecution.getLastUpdated()).get();
    }

    private void validateJobExecution(JobExecution jobExecution) {
        Assert.notNull(jobExecution, "JobExecution cannot be null.");
        Assert.notNull(jobExecution.getJobId(), "JobExecution Job-Id cannot be null.");
        Assert.notNull(jobExecution.getStatus(), "JobExecution status cannot be null.");
        Assert.notNull(jobExecution.getCreateTime(), "JobExecution create time cannot be null");
    }

    public synchronized void updateJobExecution(JobExecution jobExecution) {
        validateJobExecution(jobExecution);

        Long jobExecutionId = jobExecution.getId();
        Assert.notNull(jobExecutionId, "JobExecution ID cannot be null. JobExecution must be saved before it can be updated");
        Assert.notNull(jobExecution.getVersion(), "JobExecution version cannot be null. JobExecution must be saved before it can be updated");

        Integer version = jobExecution.getVersion() + 1;

        if (getCollection().findOne(jobExecutionIdObj(jobExecutionId)) == null) {
            throw new NoSuchObjectException(String.format("Invalid JobExecution, ID %s not found.", jobExecutionId));
        }

        DBObject object = toDbObjectWithoutVersion(jobExecution);
        object.put(VERSION_KEY, version);
        WriteResult update = getCollection().update(
                BasicDBObjectBuilder.start()
                        .add(JOB_EXECUTION_ID_KEY, jobExecutionId)
                        .add(VERSION_KEY, jobExecution.getVersion())
                        .get(),
                object
        );

        jobExecution.incrementVersion();
    }

    public List<JobExecution> findJobExecutions(JobInstance jobInstance) {
        Assert.notNull(jobInstance, "Job cannot be null.");
        Long id = jobInstance.getId();
        Assert.notNull(id, "Job Id cannot be null.");
        DBCursor dbCursor = getCollection()
          .find(jobInstanceIdObj(id))
          .sort(new BasicDBObject(JOB_EXECUTION_ID_KEY, -1));
        List<JobExecution> result = new ArrayList<>();
        while (dbCursor.hasNext()) {
            DBObject dbObject = dbCursor.next();
            result.add(mapJobExecution(jobInstance, dbObject));
        }
        return result;
    }

    public JobExecution getLastJobExecution(JobInstance jobInstance) {
        Long id = jobInstance.getId();

        DBCursor dbCursor = getCollection()
          .find(jobInstanceIdObj(id))
          .sort(new BasicDBObject(CREATE_TIME_KEY, -1))
          .limit(1);
        if (!dbCursor.hasNext()) {
            return null;
        } else {
            DBObject singleResult = dbCursor.next();
            if (dbCursor.hasNext()) {
                throw new IllegalStateException("There must be at most one latest job execution");
            }
            return mapJobExecution(jobInstance, singleResult);
        }
    }

    public Set<JobExecution> findRunningJobExecutions(String jobName) {
        DBCursor instancesCursor = mongoTemplate.getCollection(JobInstance.class.getSimpleName())
                .find(new BasicDBObject(JOB_NAME_KEY, jobName), jobInstanceIdObj(1L));
        List<Long> ids = new ArrayList<>();
        while (instancesCursor.hasNext()) {
            ids.add((Long) instancesCursor.next().get(JOB_INSTANCE_ID_KEY));
        }

        DBCursor dbCursor = getCollection().find(
                BasicDBObjectBuilder
                        .start()
                        .add(JOB_INSTANCE_ID_KEY, new BasicDBObject("$in", ids.toArray()))
                        .add(END_TIME_KEY, null).get()).sort(
                jobExecutionIdObj(-1L));
        Set<JobExecution> result = new HashSet<>();
        while (dbCursor.hasNext()) {
            result.add(mapJobExecution(dbCursor.next()));
        }
        return result;
    }

    public JobExecution getJobExecution(Long executionId) {
        return mapJobExecution(getCollection().findOne(jobExecutionIdObj(executionId)));
    }

    public void synchronizeStatus(JobExecution jobExecution) {
        Long id = jobExecution.getId();
        DBObject jobExecutionObject = getCollection().findOne(jobExecutionIdObj(id));
        int currentVersion = jobExecutionObject != null ? ((Integer) jobExecutionObject.get(VERSION_KEY)) : 0;
        if (currentVersion != jobExecution.getVersion()) {
            if (jobExecutionObject == null) {
                save(jobExecution, id);
                jobExecutionObject = getCollection().findOne(jobExecutionIdObj(id));
            }
            String status = (String) jobExecutionObject.get(STATUS_KEY);
            jobExecution.upgradeStatus(BatchStatus.valueOf(status));
            jobExecution.setVersion(currentVersion);
        }
    }

    @Override
    protected DBCollection getCollection() {
        return mongoTemplate.getCollection(JobExecution.class.getSimpleName());
    }

    private JobExecution mapJobExecution(DBObject dbObject) {
        return mapJobExecution(null, dbObject);
    }

    private JobExecution mapJobExecution(JobInstance jobInstance, DBObject dbObject) {
        if (dbObject == null) {
            return null;
        }
        Long id = (Long) dbObject.get(JOB_EXECUTION_ID_KEY);
        JobExecution jobExecution;

        if (jobInstance == null) {
            jobExecution = new JobExecution(id);
        } else {
            JobParameters jobParameters = getJobParameters(jobInstance.getId(), mongoTemplate);
            jobExecution = new JobExecution(jobInstance, id, jobParameters, null);
        }
        jobExecution.setStartTime((Date) dbObject.get(START_TIME_KEY));
        jobExecution.setEndTime((Date) dbObject.get(END_TIME_KEY));
        jobExecution.setStatus(BatchStatus.valueOf((String) dbObject.get(STATUS_KEY)));
        jobExecution.setExitStatus(new ExitStatus(((String) dbObject.get(EXIT_CODE_KEY)), (String) dbObject.get(EXIT_MESSAGE_KEY)));
        jobExecution.setCreateTime((Date) dbObject.get(CREATE_TIME_KEY));
        jobExecution.setLastUpdated((Date) dbObject.get(LAST_UPDATED_KEY));
        jobExecution.setVersion((Integer) dbObject.get(VERSION_KEY));

        return jobExecution;
    }
}

Similarly to the previous implementations, MongoJobExecutionDao performs CRUD operations (except delete) on the JobExecution object. The above methods convert a JobExecution to a MongoDB document and store it in the database, and also convert raw MongoDB documents back into JobExecution objects when read. The outcome of this implementation of JobExecutionDao is a MongoDB collection named "JobExecution," full of documents containing job execution details.
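Note the optimistic-locking pattern in updateJobExecution: the update filter matches both the execution id and the version the caller last read, so a concurrent writer that already bumped the version causes the update to match nothing instead of silently overwriting newer state. A pure-Java sketch of that semantics, with an in-memory map standing in for the MongoDB collection (the map and method names are illustrative, not from the project):

```java
import java.util.HashMap;
import java.util.Map;

// Simulates the version-checked update filter from updateJobExecution:
// the write only succeeds when the stored version equals the version the
// caller read, and a successful write bumps the version by one.
public class OptimisticUpdateDemo {

    // id -> current version; stands in for the "JobExecution" collection.
    static final Map<Long, Integer> versions = new HashMap<>();

    static boolean updateIfVersionMatches(long id, int expectedVersion) {
        Integer current = versions.get(id);
        if (current == null || current != expectedVersion) {
            return false; // filter matched no document: stale or missing
        }
        versions.put(id, expectedVersion + 1); // write with bumped version
        return true;
    }

    public static void main(String[] args) {
        versions.put(1L, 0);
        System.out.println(updateIfVersionMatches(1L, 0)); // true
        System.out.println(updateIfVersionMatches(1L, 0)); // false: version is now 1
    }
}
```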

Last but not least, let's look at interface StepExecutionDao. It contains the following methods:

void saveStepExecution(StepExecution stepExecution);

void saveStepExecutions(Collection<StepExecution> stepExecutions);

void updateStepExecution(StepExecution stepExecution);

@Nullable
StepExecution getStepExecution(JobExecution jobExecution, Long stepExecutionId);

void addStepExecutions(JobExecution jobExecution);

As you can see, it also covers CRUD operations (without delete). Let's have a closer look at the implementation of this interface.

@Repository
public class MongoStepExecutionDao extends AbstractMongoDao implements StepExecutionDao {

    @PostConstruct
    public void init() {
        super.init();
        getCollection().createIndex(
          BasicDBObjectBuilder.start()
          .add(STEP_EXECUTION_ID_KEY, 1)
          .add(JOB_EXECUTION_ID_KEY, 1)
          .get()
        );
    }

    public void saveStepExecution(StepExecution stepExecution) {
        Assert.isNull(stepExecution.getId(),
                "to-be-saved (not updated) StepExecution can't already have an id assigned");
        Assert.isNull(stepExecution.getVersion(),
                "to-be-saved (not updated) StepExecution can't already have a version assigned");

        validateStepExecution(stepExecution);

        stepExecution.setId(getNextId(StepExecution.class.getSimpleName(), mongoTemplate));
        stepExecution.incrementVersion(); // should be 0 now
        DBObject object = toDbObjectWithoutVersion(stepExecution);
        object.put(VERSION_KEY, stepExecution.getVersion());
        getCollection().save(object);
    }

    private DBObject toDbObjectWithoutVersion(StepExecution stepExecution) {
        return start()
                .add(STEP_EXECUTION_ID_KEY, stepExecution.getId())
                .add(STEP_NAME_KEY, stepExecution.getStepName())
                .add(JOB_EXECUTION_ID_KEY, stepExecution.getJobExecutionId())
                .add(START_TIME_KEY, stepExecution.getStartTime())
                .add(END_TIME_KEY, stepExecution.getEndTime())
                .add(STATUS_KEY, stepExecution.getStatus().toString())
                .add(COMMIT_COUNT_KEY, stepExecution.getCommitCount())
                .add(READ_COUNT_KEY, stepExecution.getReadCount())
                .add(FILTER_COUT_KEY, stepExecution.getFilterCount())
                .add(WRITE_COUNT_KEY, stepExecution.getWriteCount())
                .add(EXIT_CODE_KEY, stepExecution.getExitStatus().getExitCode())
                .add(EXIT_MESSAGE_KEY, stepExecution.getExitStatus().getExitDescription())
                .add(READ_SKIP_COUNT_KEY, stepExecution.getReadSkipCount())
                .add(WRITE_SKIP_COUNT_KEY, stepExecution.getWriteSkipCount())
                .add(PROCESS_SKIP_COUT_KEY, stepExecution.getProcessSkipCount())
                .add(ROLLBACK_COUNT_KEY, stepExecution.getRollbackCount())
                .add(LAST_UPDATED_KEY, stepExecution.getLastUpdated()).get();
    }

    public synchronized void updateStepExecution(StepExecution stepExecution) {
        Integer currentVersion = stepExecution.getVersion();
        Integer newVersion = currentVersion + 1;
        DBObject object = toDbObjectWithoutVersion(stepExecution);
        object.put(VERSION_KEY, newVersion);
        getCollection().update(
          start()
          .add(STEP_EXECUTION_ID_KEY, stepExecution.getId())
          .add(VERSION_KEY, currentVersion).get(),
                object
        );

        stepExecution.incrementVersion();
    }

    static BasicDBObject stepExecutionIdObj(Long id) {
        return new BasicDBObject(STEP_EXECUTION_ID_KEY, id);
    }

    public StepExecution getStepExecution(JobExecution jobExecution, Long stepExecutionId) {
        return mapStepExecution(getCollection().findOne(BasicDBObjectBuilder.start()
                .add(STEP_EXECUTION_ID_KEY, stepExecutionId)
                .add(JOB_EXECUTION_ID_KEY, jobExecution.getId()).get()), jobExecution);
    }

    private StepExecution mapStepExecution(DBObject object, JobExecution jobExecution) {
        if (object == null) {
            return null;
        }
        StepExecution stepExecution = new StepExecution(
          (String) object.get(STEP_NAME_KEY), 
          jobExecution, 
          (Long) object.get(STEP_EXECUTION_ID_KEY)
        );
        stepExecution.setStartTime((Date) object.get(START_TIME_KEY));
        stepExecution.setEndTime((Date) object.get(END_TIME_KEY));
        stepExecution.setStatus(BatchStatus.valueOf((String) object.get(STATUS_KEY)));
        stepExecution.setCommitCount((Integer) object.get(COMMIT_COUNT_KEY));
        stepExecution.setReadCount((Integer) object.get(READ_COUNT_KEY));
        stepExecution.setFilterCount((Integer) object.get(FILTER_COUT_KEY));
        stepExecution.setWriteCount((Integer) object.get(WRITE_COUNT_KEY));
        stepExecution.setExitStatus(new ExitStatus(
            (String) object.get(EXIT_CODE_KEY), 
            (String) object.get(EXIT_MESSAGE_KEY)
        ));
        stepExecution.setReadSkipCount((Integer) object.get(READ_SKIP_COUNT_KEY));
        stepExecution.setWriteSkipCount((Integer) object.get(WRITE_SKIP_COUNT_KEY));
        stepExecution.setProcessSkipCount((Integer) object.get(PROCESS_SKIP_COUT_KEY));
        stepExecution.setRollbackCount((Integer) object.get(ROLLBACK_COUNT_KEY));
        stepExecution.setLastUpdated((Date) object.get(LAST_UPDATED_KEY));
        stepExecution.setVersion((Integer) object.get(VERSION_KEY));
        return stepExecution;
    }

    public void addStepExecutions(JobExecution jobExecution) {
        DBCursor stepsCursor = getCollection()
            .find(jobExecutionIdObj(jobExecution.getId()))
            .sort(stepExecutionIdObj(1L));
        while (stepsCursor.hasNext()) {
            DBObject stepObject = stepsCursor.next();
            // mapping re-creates each StepExecution against the given jobExecution
            mapStepExecution(stepObject, jobExecution);
        }
    }

    @Override
    protected DBCollection getCollection() {
        return mongoTemplate.getCollection(StepExecution.class.getSimpleName());
    }

    private void validateStepExecution(StepExecution stepExecution) {
        Assert.notNull(stepExecution, "StepExecution cannot be null.");
        Assert.notNull(stepExecution.getStepName(), "StepExecution step name cannot be null.");
        Assert.notNull(stepExecution.getStartTime(), "StepExecution start time cannot be null.");
        Assert.notNull(stepExecution.getStatus(), "StepExecution status cannot be null.");
    }

    @Override
    public void saveStepExecutions(Collection<StepExecution> stepExecutions) {
        Assert.notNull(stepExecutions, "Attempt to save a null collection of step executions");
        for (StepExecution stepExecution: stepExecutions) {
            saveStepExecution(stepExecution);
        }
    }
}

Nothing new here. This implementation converts a StepExecution object to a MongoDB document when saving, and maps the raw document back to a StepExecution object when reading from the database. The outcome of this implementation is a MongoDB collection named "StepExecution," whose documents contain the step execution details.
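The updateStepExecution method above deserves a second look: its update filter matches on both the step execution id and the *current* version, so a concurrent writer that already bumped the version causes this write to match nothing instead of silently overwriting newer data. Here is a minimal in-memory sketch of that optimistic-locking idea (an illustration only, not the actual DAO code; the map-based store and class name are made up for the example):

```java
import java.util.HashMap;
import java.util.Map;

// In-memory sketch of the optimistic-locking pattern used by
// updateStepExecution: the "filter" checks both id and expected version,
// mirroring the MongoDB query {stepExecutionId: id, version: currentVersion}.
public class OptimisticUpdateSketch {
    static final Map<Long, Map<String, Object>> store = new HashMap<>();

    static boolean update(long id, int expectedVersion, Map<String, Object> newDoc) {
        Map<String, Object> current = store.get(id);
        if (current == null || !Integer.valueOf(expectedVersion).equals(current.get("version"))) {
            return false; // a concurrent writer got there first: filter matches nothing
        }
        newDoc.put("version", expectedVersion + 1); // bump version on success
        store.put(id, newDoc);
        return true;
    }

    public static void main(String[] args) {
        Map<String, Object> doc = new HashMap<>();
        doc.put("version", 0);
        store.put(1L, doc);

        System.out.println(update(1L, 0, new HashMap<>())); // matches version 0 -> true
        System.out.println(update(1L, 0, new HashMap<>())); // stale version -> false
    }
}
```

Note that the real updateStepExecution never inspects the write result, so a lost update due to a stale version would go unnoticed; a production implementation would typically check the matched count and raise an optimistic-locking failure when it is zero.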

There is one class left to show. All of the above implementations extend the abstract class AbstractMongoDao, so let's look at it:

abstract class AbstractMongoDao {
    static final String VERSION_KEY = "version";
    static final String START_TIME_KEY = "startTime";
    static final String END_TIME_KEY = "endTime";
    static final String EXIT_CODE_KEY = "exitCode";
    static final String EXIT_MESSAGE_KEY = "exitMessage";
    static final String LAST_UPDATED_KEY = "lastUpdated";
    static final String STATUS_KEY = "status";
    private static final String SEQUENCES_COLLECTION_NAME = "Sequences";
    private static final String ID_KEY = "_id";
    private static final String NS_KEY = "_ns";
    static final String DOT_ESCAPE_STRING = "\\{dot}";
    static final String DOT_STRING = "\\.";

    // Job Constants
    static final String JOB_NAME_KEY = "jobName";
    static final String JOB_INSTANCE_ID_KEY = "jobInstanceId";
    static final String JOB_KEY_KEY = "jobKey";
    static final String JOB_PARAMETERS_KEY = "jobParameters";

    // Job Execution Constants
    static final String JOB_EXECUTION_ID_KEY = "jobExecutionId";
    static final String CREATE_TIME_KEY = "createTime";

    // Job Execution Contexts Constants
    static final String STEP_EXECUTION_ID_KEY = "stepExecutionId";
    static final String TYPE_SUFFIX = "_TYPE";

    // Step Execution Constants
    static final String STEP_NAME_KEY = "stepName";
    static final String COMMIT_COUNT_KEY = "commitCount";
    static final String READ_COUNT_KEY = "readCount";
    static final String FILTER_COUT_KEY = "filterCout";
    static final String WRITE_COUNT_KEY = "writeCount";
    static final String READ_SKIP_COUNT_KEY = "readSkipCount";
    static final String WRITE_SKIP_COUNT_KEY = "writeSkipCount";
    static final String PROCESS_SKIP_COUT_KEY = "processSkipCout";
    static final String ROLLBACK_COUNT_KEY = "rollbackCount";

    protected Logger logger;

    @Autowired
    protected MongoTemplate mongoTemplate;

    protected abstract DBCollection getCollection();

    protected void init() {
        logger = LoggerFactory.getLogger(this.getClass());
    }

    Long getNextId(String name, MongoTemplate mongoTemplate) {
        DBCollection collection = mongoTemplate.getDb()
          .getCollection(SEQUENCES_COLLECTION_NAME);
        BasicDBObject sequence = new BasicDBObject("name", name);
        collection.update(
              sequence, 
              new BasicDBObject("$inc", new BasicDBObject("value", 1L)), 
              true, 
              false
        );
        return (Long) collection.findOne(sequence).get("value");
    }

    void removeSystemFields(DBObject dbObject) {
        dbObject.removeField(ID_KEY);
        dbObject.removeField(NS_KEY);
    }

    BasicDBObject jobInstanceIdObj(Long id) {
        return new BasicDBObject(MongoJobInstanceDao.JOB_INSTANCE_ID_KEY, id);
    }

    BasicDBObject jobExecutionIdObj(Long id) {
        return new BasicDBObject(JOB_EXECUTION_ID_KEY, id);
    }

    @SuppressWarnings({"unchecked"})
    JobParameters getJobParameters(Long jobInstanceId, MongoTemplate mongoTemplate) {
        DBObject jobParamObj = mongoTemplate
                .getCollection(JobInstance.class.getSimpleName())
                .findOne(new BasicDBObject(jobInstanceIdObj(jobInstanceId)));

        if (jobParamObj != null && 
            jobParamObj.get(MongoJobInstanceDao.JOB_PARAMETERS_KEY) != null) {

            Map<String, ?> jobParamsMap = 
              (Map<String, ?>) jobParamObj.get(MongoJobInstanceDao.JOB_PARAMETERS_KEY);

            Map<String, JobParameter> map = new HashMap<>(jobParamsMap.size());
            for (Map.Entry<String, ?> entry : jobParamsMap.entrySet()) {
                Object param = entry.getValue();
                String key = entry.getKey().replaceAll(DOT_ESCAPE_STRING, DOT_STRING);
                if (param instanceof String) {
                    map.put(key, new JobParameter((String) param));
                } else if (param instanceof Long) {
                    map.put(key, new JobParameter((Long) param));
                } else if (param instanceof Double) {
                    map.put(key, new JobParameter((Double) param));
                } else if (param instanceof Date) {
                    map.put(key, new JobParameter((Date) param));
                } else {
                    map.put(key, null);
                }
            }
            return new JobParameters(map);
        }
        return null;
    }
}

This abstract class contains the constants and helper methods shared by its subclasses. The constants are mostly the document field names used by the corresponding DAO implementations; they serve as the keys under which the data is stored. MongoTemplate is autowired here as well, and a few utility methods help with processing the data.
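The getNextId method replaces the database sequences that Spring Batch's JDBC DAOs rely on: it does an upsert with $inc on a per-entity counter document in the "Sequences" collection, so the counter is created on first use and incremented atomically afterwards. The same idea can be sketched with an in-memory counter map (illustration only; the class name and map-based store are made up for the example):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// In-memory analogue of getNextId's upsert-with-$inc pattern: each named
// counter is created on first use (like upsert=true) and incremented
// atomically (like {$inc: {value: 1}}).
public class SequenceSketch {
    private static final ConcurrentHashMap<String, AtomicLong> sequences = new ConcurrentHashMap<>();

    static long getNextId(String name) {
        return sequences.computeIfAbsent(name, k -> new AtomicLong(0)).incrementAndGet();
    }

    public static void main(String[] args) {
        System.out.println(getNextId("StepExecution")); // created on first use
        System.out.println(getNextId("StepExecution")); // incremented
        System.out.println(getNextId("JobExecution"));  // independent counter
    }
}
```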
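The DOT_ESCAPE_STRING/DOT_STRING pair exists because MongoDB field names may not contain dots, while job parameter keys often do (e.g. "input.file.name"). Keys are therefore stored with each dot replaced by "{dot}" and restored on read, which is what getJobParameters does with replaceAll. A small runnable demonstration of that round trip (the escape helper is my reconstruction of the write side, which lives in MongoJobInstanceDao and is not shown above):

```java
// Round trip for job-parameter keys: '.' is illegal in MongoDB field names,
// so it is stored as "{dot}" and converted back when reading.
public class DotEscapeSketch {
    static final String DOT_ESCAPE_STRING = "\\{dot}"; // regex matching literal "{dot}"
    static final String DOT_STRING = "\\.";            // regex matching literal "."

    static String escape(String key) {   // presumed write-side conversion
        return key.replaceAll(DOT_STRING, DOT_ESCAPE_STRING);
    }

    static String unescape(String key) { // read-side conversion, as in getJobParameters
        return key.replaceAll(DOT_ESCAPE_STRING, DOT_STRING);
    }

    public static void main(String[] args) {
        String escaped = escape("input.file.name");
        System.out.println(escaped);
        System.out.println(unescape(escaped));
    }
}
```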

The connection to the MongoDB database is defined in the MongoDBConfig class.

@Configuration
public class MongoDBConfig extends AbstractMongoConfiguration {

    private static Logger logger = LoggerFactory.getLogger(MongoDBConfig.class);

    @Value("${spring.data.mongodb.database}")
    private String database;

    @Value("${spring.data.mongodb.host}")
    private String host;

    @Override
    protected String getDatabaseName() {
        return database;
    }

    @Override
    public MongoClient mongo() {
        MongoClientOptions.Builder mongoClientOptionsBuilder = MongoClientOptions.builder()
                .writeConcern(WriteConcern.ACKNOWLEDGED)
                .socketKeepAlive(true);
        return new MongoClient(new MongoClientURI(host, mongoClientOptionsBuilder));
    }
}

And that is it!

Summary

In this tutorial, I showed you how I used Spring Batch with MongoDB and how Spring Batch metadata can be saved to and read from MongoDB. The full code is available on GitHub. The code and this article were built based on the spring-batch and springbatch-mongoDao repositories. I hope this tutorial was helpful.

Spring Framework MongoDB Spring Batch

Opinions expressed by DZone contributors are their own.


