Camel Clustered File Ingestion With JDBC and Spring
Reading files with Apache Camel is not so easy when you need to deploy your route to multiple servers. Thankfully, Camel has the concept of an idempotent consumer.
Reading a file is a common use for Apache Camel. Whether the file kicks off a larger route or you simply need to store its content, being able to read each file only once is important. This is easy when your route is deployed on a single server, but what about when you deploy it to multiple servers? Thankfully, Camel has the concept of an idempotent consumer.
A useful way to implement this is with an idempotent repository. The repository keeps track of which file is being read and prevents another server from reading the same file, so it works as a lock on the file. Once the file has been read, you can either remove its row from the database, which allows a file with the same name to be ingested later, or keep the row, which prevents any later file with the same name from being ingested.
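To make the lock-and-commit behaviour concrete, here is a minimal in-memory model. It is not Camel's implementation: a shared concurrent set stands in for the database table, `tryClaim` plays the role of inserting a row, and `commit` either deletes the row or keeps it. The class and method names are hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory stand-in for an idempotent repository used as a file lock. */
public class FileClaimModel {
    // Shared by all "servers"; in the real setup this role is played by the database table.
    private final Set<String> claimed = ConcurrentHashMap.newKeySet();
    private final boolean removeOnCommit;

    public FileClaimModel(boolean removeOnCommit) {
        this.removeOnCommit = removeOnCommit;
    }

    /** Returns true only for the first caller that claims this file name. */
    public boolean tryClaim(String fileName) {
        return claimed.add(fileName); // atomic add-if-absent
    }

    /** Called after the file has been processed successfully. */
    public void commit(String fileName) {
        if (removeOnCommit) {
            claimed.remove(fileName); // a later file with the same name can be ingested
        }
        // Otherwise the entry stays and the same name is never ingested again.
    }

    public static void main(String[] args) {
        FileClaimModel removing = new FileClaimModel(true);
        System.out.println(removing.tryClaim("orders.csv")); // true: first server wins
        System.out.println(removing.tryClaim("orders.csv")); // false: second server is locked out
        removing.commit("orders.csv");
        System.out.println(removing.tryClaim("orders.csv")); // true: entry was removed on commit

        FileClaimModel keeping = new FileClaimModel(false);
        keeping.tryClaim("orders.csv");
        keeping.commit("orders.csv");
        System.out.println(keeping.tryClaim("orders.csv")); // false: entry was kept
    }
}
```

The atomic add-if-absent is the important part: whichever server inserts first owns the file, and every other server sees the claim fail.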
So how do you set up an idempotent repository using JDBC and Spring?
First, you will need to create the table and the necessary Spring beans. Here, I am using the JdbcMessageIdRepository. At the time of development, a bug in the JPA version was still open, so I could not use it. This has since been fixed, and you can easily swap one for the other. This example assumes you already have a bean for your data source set up. You can use the same fileConsumerRepo bean for multiple routes in your deployment.
CREATE TABLE CAMEL_MESSAGEPROCESSED ( processorName VARCHAR(255), messageId VARCHAR(100), createdAt TIMESTAMP );
import org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository;
import org.springframework.context.annotation.Bean;

@Bean
public JdbcMessageIdRepository fileConsumerRepo() {
    // The second argument is the processorName value stored in each CAMEL_MESSAGEPROCESSED row.
    // Any construction failure propagates, so the Spring context fails fast
    // instead of registering a null repository that the route cannot use.
    return new JdbcMessageIdRepository(dataSource(), "fileConsumerRepo");
}
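The second constructor argument, "fileConsumerRepo", is the processorName written into every row, and the repository looks entries up by processorName and messageId together. The small model below (a hypothetical class, not Camel code) illustrates the consequence: all routes sharing the fileConsumerRepo bean share one set of message IDs, while a repository created with a different processorName (the hypothetical "otherRepo") tracks the same ID separately. The model treats each pair as unique because the map enforces it; a real table only does so if you declare it, for example with a primary key on (processorName, messageId), which the CREATE TABLE statement above does not include. Since several servers insert concurrently, such a constraint may be worth considering.

```java
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory model of CAMEL_MESSAGEPROCESSED, keyed by (processorName, messageId). */
public class MessageProcessedTable {
    // Key = (processorName, messageId); value = createdAt, mirroring the three columns.
    private final Map<Map.Entry<String, String>, Instant> rows = new ConcurrentHashMap<>();

    /** Inserts a row unless one already exists for this pair; returns true if it was inserted. */
    public boolean add(String processorName, String messageId) {
        return rows.putIfAbsent(Map.entry(processorName, messageId), Instant.now()) == null;
    }

    /** True if a row exists for this exact processorName and messageId. */
    public boolean contains(String processorName, String messageId) {
        return rows.containsKey(Map.entry(processorName, messageId));
    }

    public static void main(String[] args) {
        MessageProcessedTable table = new MessageProcessedTable();
        // Every route sharing the fileConsumerRepo bean writes the same processorName...
        System.out.println(table.add("fileConsumerRepo", "report.csv")); // true
        System.out.println(table.add("fileConsumerRepo", "report.csv")); // false
        // ...whereas a repository created with another processorName keeps separate rows.
        System.out.println(table.add("otherRepo", "report.csv"));        // true
    }
}
```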
In addition, make sure to include org.apache.camel.processor.idempotent.jpa in your packages to scan or in your persistence unit, depending on your exact setup; this package contains the entity used by Camel's JPA-based idempotent repository. In my case, I was using a LocalContainerEntityManagerFactoryBean for JPA.
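import javax.naming.NamingException;
import org.springframework.context.annotation.Bean;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.orm.jpa.vendor.Database;
import org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter;

@Bean
public LocalContainerEntityManagerFactoryBean entityManagerFactory() throws NamingException {
    final LocalContainerEntityManagerFactoryBean em = new LocalContainerEntityManagerFactoryBean();
    em.setPersistenceUnitName("camel");
    final HibernateJpaVendorAdapter vendorAdapter = new HibernateJpaVendorAdapter();
    vendorAdapter.setDatabase(Database.ORACLE);
    vendorAdapter.setShowSql(false);
    vendorAdapter.setGenerateDdl(false); // the tables are created manually, as shown above
    em.setJpaVendorAdapter(vendorAdapter);
    // Scan your own entities plus Camel's JPA idempotent-repository package
    em.setPackagesToScan("com.my.package.model", "org.apache.camel.processor.idempotent.jpa");
    em.setJpaProperties(additionalProperties()); // additionalProperties() is defined elsewhere in the configuration
    em.setDataSource(this.dataSource());
    return em;
}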
Now your idempotent repository is ready for Camel to use. Below is an example route:
from("file:/my/test/dir?moveFailed=.error&autoCreate=true&readLockLoggingLevel=WARN&shuffle=true&readLock=idempotent&idempotentRepository=#fileConsumerRepo&readLockRemoveOnCommit=true")
    .routeId("ingestionFile")
    .convertBodyTo(String.class)
    .log(LoggingLevel.INFO, "File received");
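The endpoint URI packs seven options into one long string. If that is hard to review, one option is to assemble it from an ordered map so that each option sits on its own line with a comment. This is a plain-Java sketch; the class and helper names are hypothetical and not part of Camel. The map below reproduces the URI used in the route exactly.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class FileEndpointUri {
    /** Hypothetical helper: appends '?' and the options as key=value pairs, keeping insertion order. */
    static String build(String base, Map<String, String> options) {
        return options.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining("&", base + "?", ""));
    }

    static String ingestionUri() {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("moveFailed", ".error");                      // failed files go to a .error subdirectory
        options.put("autoCreate", "true");                        // create the directory if it is missing
        options.put("readLockLoggingLevel", "WARN");              // log level when a read lock cannot be acquired
        options.put("shuffle", "true");                           // consider files in random order
        options.put("readLock", "idempotent");                    // use the idempotent repository as the read lock
        options.put("idempotentRepository", "#fileConsumerRepo"); // the Spring bean defined earlier
        options.put("readLockRemoveOnCommit", "true");            // remove the entry once processing succeeds
        return build("file:/my/test/dir", options);
    }

    public static void main(String[] args) {
        System.out.println(ingestionUri());
    }
}
```

Inside a RouteBuilder, from(FileEndpointUri.ingestionUri()) would then behave the same as the route above, since it receives the identical string.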
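Let's break down the options:
moveFailed=.error: if processing fails, the file is moved into a .error subdirectory instead of being picked up again.
autoCreate=true: the starting directory is created if it does not exist.
readLockLoggingLevel=WARN: the logging level used when a read lock on a file cannot be acquired, which can happen often when several servers compete for the same files.
shuffle=true: the list of files is sorted in random order, so the servers are less likely to all reach for the same file first.
readLock=idempotent: the idempotent repository is used as the read lock, which is what lets the lock work across a cluster.
idempotentRepository=#fileConsumerRepo: refers to the fileConsumerRepo Spring bean defined above.
readLockRemoveOnCommit=true: once the file has been processed successfully, its entry is removed from the repository, so a file with the same name can be ingested later.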
Now deploy your route to multiple servers and watch the magic! You can do this easily with EAP or Karaf by running several instances with a port offset. If you are looking for more examples, check out this sample project put together by fellow Red Hatter Josh Reagan.
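As a rough illustration of what happens when several servers poll the same directory, the following simulation uses plain Java threads as stand-ins for servers and a shared concurrent set as a stand-in for the repository table; all names are hypothetical. Each node shuffles its file list, much as shuffle=true does, and processes only the files it manages to claim. Every file ends up processed exactly once, whichever node gets it. For simplicity the model never removes a claim; with readLockRemoveOnCommit=true, the real route also relies on the processed file having been moved out of the polled directory (by default Camel's file consumer moves it into a .camel subdirectory).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ClusterSimulation {
    /** Returns how many times each file was processed across all simulated nodes. */
    static Map<String, AtomicInteger> run(int nodes, List<String> files) {
        Set<String> repository = ConcurrentHashMap.newKeySet();          // shared "table"
        Map<String, AtomicInteger> processed = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(nodes);
        for (int n = 0; n < nodes; n++) {
            pool.submit(() -> {
                List<String> poll = new ArrayList<>(files);
                Collections.shuffle(poll);                                  // random order, like shuffle=true
                for (String file : poll) {
                    if (repository.add(file)) {                            // claim succeeded: this node owns it
                        processed.computeIfAbsent(file, f -> new AtomicInteger()).incrementAndGet();
                    }                                                       // otherwise another node owns it
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> files = new ArrayList<>();
        for (int i = 0; i < 50; i++) {
            files.add("file-" + i + ".csv");
        }
        Map<String, AtomicInteger> counts = run(4, files);
        boolean exactlyOnce = counts.size() == files.size()
                && counts.values().stream().allMatch(c -> c.get() == 1);
        System.out.println("Each file processed exactly once: " + exactlyOnce);
    }
}
```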
Published at DZone with permission of Mary Cochran, DZone MVB. See the original article here.