
A Universal Streamer for Apache Ignite Based on Apache Camel


by Raúl Kripalani · Feb. 02, 16 · News

Apache Ignite has the concept of Data Streamers—components to ingest fast data in a streaming fashion into an Ignite cache from a variety of protocols, technologies, or platforms, such as JMS, MQTT, Twitter, Flume, Kafka, etc.

However, with Apache Ignite 1.5.0 we released the jack of all trades: an Apache Camel streamer. In case you are not familiar with Camel, skip below for a little introduction, and then come back.

NOTE: I have also committed a camel-ignite component (woohoo!) to the Camel codebase, but in this post, I'll be talking about the Ignite Camel streamer. I'll cover the camel-ignite component in another post.

Ignite Camel Streamer

This streamer allows you to consume from any Camel endpoint straight into Ignite. Thus, you can ingest entries straight into an Ignite cache based on:

  • Calls received on a Web Service (SOAP or REST), by extracting the body or headers
  • Listening on a TCP or UDP channel for messages
  • The content of files received via FTP or written to the local filesystem
  • Email messages received via POP3 or IMAP
  • A MongoDB tailable cursor
  • An AWS SQS queue
  • And many others

This is what I call a direct ingestion from an endpoint.

Moreover, you can also create a Camel route that performs more complex processing on incoming messages, such as transformation, validation, splitting, aggregation, idempotency, resequencing, or enrichment, and ingest only the result into the Ignite cache. This is what I call mediated ingestion.

A Brief Example

Let's assume we want to consume from an HTTP endpoint serviced by Jetty. We are going to receive HTTP POSTs with temperature readings from weather stations.

The payload's Content-Type is text/plain and it contains the reading. The station ID is passed in an HTTP request header called X-StationId:

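import java.util.Map;

import org.apache.camel.Exchange;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.Ignition;
import org.apache.ignite.internal.util.lang.GridMapEntry;
import org.apache.ignite.stream.StreamSingleTupleExtractor;
import org.apache.ignite.stream.camel.CamelStreamer;

// Start Apache Ignite.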
Ignite ignite = Ignition.start();

// Create a streamer pipe which ingests into the 'mycache' cache.
IgniteDataStreamer<String, String> pipe = ignite.dataStreamer("mycache");

// Create a Camel streamer and connect it.
CamelStreamer<String, String> streamer = new CamelStreamer<>(); 

streamer.setIgnite(ignite);  
streamer.setStreamer(pipe);

// This endpoint starts a Jetty server and consumes from all network interfaces on port 8080 and context path /ignite.
streamer.setEndpointUri("jetty:http://0.0.0.0:8080/ignite?httpMethodRestrict=POST");

// This is the tuple extractor. We'll assume each message contains only one tuple.
// If your message contains multiple tuples, use a StreamMultipleTupleExtractor.
// The Tuple Extractor receives the Camel Exchange and returns a Map.Entry<?,?> with the key and value.
streamer.setSingleTupleExtractor(new StreamSingleTupleExtractor<Exchange, String, String>() {  
    @Override public Map.Entry<String, String> extract(Exchange exchange) {
        String stationId = exchange.getIn().getHeader("X-StationId", String.class);
        String temperature = exchange.getIn().getBody(String.class);
        return new GridMapEntry<>(stationId, temperature);
    }
});

// Start the streamer.
streamer.start();
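
The comments above mention StreamMultipleTupleExtractor for messages that carry several tuples. As a rough sketch of the parsing such an extractor might delegate to, the plain-Java helper below turns a text body with one stationId=temperature pair per line into a Map. The line format, the class name ReadingBatchParser, and its parse method are all assumptions made for illustration; none of them come from the Ignite or Camel APIs.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not part of Ignite or Camel): parses a batch of readings.
final class ReadingBatchParser {

    // Assumed payload format: one "stationId=temperature" pair per line.
    static Map<String, String> parse(String body) {
        Map<String, String> tuples = new LinkedHashMap<>();
        if (body == null) {
            return tuples;
        }
        for (String line : body.split("\\R")) {
            String trimmed = line.trim();
            int sep = trimmed.indexOf('=');
            // Skip blank or malformed lines: no '=', nothing before it, or nothing after it.
            if (sep <= 0 || sep == trimmed.length() - 1) {
                continue;
            }
            String stationId = trimmed.substring(0, sep).trim();
            String temperature = trimmed.substring(sep + 1).trim();
            if (stationId.isEmpty() || temperature.isEmpty()) {
                continue;
            }
            // A later line for the same station overwrites an earlier one.
            tuples.put(stationId, temperature);
        }
        return tuples;
    }

    public static void main(String[] args) {
        // prints {station-1=21.5, station-2=19.0}
        System.out.println(parse("station-1=21.5\nstation-2=19.0\n\nbroken-line"));
    }
}
```

Assuming the multiple-tuple extractor is expected to return a Map of keys to values, its extract method could read the body with exchange.getIn().getBody(String.class) and hand the string to a helper like this one.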

By default, the response sent back to the caller (if it is a synchronous endpoint) is simply an echo of the original request. If you want to customize the response, set a Camel Processor as a responseProcessor:

streamer.setResponseProcessor(new Processor() {  
    @Override public void process(Exchange exchange) throws Exception {
        exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE, 200);
        exchange.getOut().setBody("OK");
    }
});

You can also pass in a custom CamelContext to tailor Camel's own configuration. In fact, that is exactly how you implement mediated ingestion: create a route that consumes from the endpoint and sends the exchange to a direct: endpoint, from which the CamelStreamer consumes.

As I said before, you'll want to use this approach when you need more sophistication. In the following example, we'll be consuming from the same Jetty endpoint as before, but we'll be receiving JSON in this case. We will unmarshal it into an object and we'll validate it with Bean Validation.

If all goes OK, we'll dispatch it to the direct:ignite.ingest endpoint, which our Ignite streamer consumes from (we need to set this URI in our streamer). Direct endpoints in Camel are just in-memory transfers of messages.

// Create a CamelContext with a custom route that will:
//  (1) consume from our Jetty endpoint.
//  (2) transform incoming JSON into a Java object with Jackson.
//  (3) use JSR 303 Bean Validation to validate the object.
//  (4) dispatch to the direct:ignite.ingest endpoint, where the streamer is consuming from.
CamelContext context = new DefaultCamelContext();  
context.addRoutes(new RouteBuilder() {  
    @Override
    public void configure() throws Exception {
        from("jetty:http://0.0.0.0:8080/ignite?httpMethodRestrict=POST")
            .unmarshal().json(JsonLibrary.Jackson)
            .to("bean-validator:validate")
            .to("direct:ignite.ingest");
    }
});

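// Hand the custom context to the streamer so that it uses the route defined above.
streamer.setCamelContext(context);

// Remember our Streamer is now consuming from the Direct endpoint above.
streamer.setEndpointUri("direct:ignite.ingest");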
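
Stripped of Camel, mediated ingestion boils down to "process first, store only what survives". The plain-Java sketch below mimics that flow under stated assumptions: a ConcurrentHashMap stands in for the Ignite cache, and the hypothetical MediatedIngest class, its method names, and the -90 to 60 °C plausibility range are illustrative choices, not anything taken from Camel, Ignite, or Bean Validation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a mediated route: validate first, then ingest.
final class MediatedIngest {

    // Stand-in for the Ignite cache (assumption: station ID -> latest reading).
    private final Map<String, Double> cache = new ConcurrentHashMap<>();

    // Returns true if the reading passed validation and was stored.
    boolean ingest(String stationId, String rawTemperature) {
        if (stationId == null || stationId.trim().isEmpty() || rawTemperature == null) {
            return false;
        }
        final double celsius;
        try {
            celsius = Double.parseDouble(rawTemperature.trim());
        } catch (NumberFormatException e) {
            return false; // not a number: reject instead of ingesting
        }
        // Assumed plausibility range; NaN fails both comparisons and is rejected too.
        if (!(celsius >= -90.0 && celsius <= 60.0)) {
            return false;
        }
        cache.put(stationId.trim(), celsius);
        return true;
    }

    Double get(String stationId) {
        return cache.get(stationId);
    }

    int size() {
        return cache.size();
    }

    public static void main(String[] args) {
        MediatedIngest route = new MediatedIngest();
        System.out.println(route.ingest("station-42", "21.5")); // prints true
        System.out.println(route.ingest("station-42", "hot"));  // prints false
        System.out.println(route.ingest("", "18.0"));           // prints false
        System.out.println(route.ingest("station-7", "250"));   // prints false
        System.out.println(route.size());                       // prints 1
    }
}
```

In the real route, the validation step is delegated to the bean-validator endpoint and the hand-off to the streamer happens through the direct: endpoint; the sketch only shows why rejected messages never reach the cache.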

Feel free to contact the Ignite team at Gitter or the Mailing Lists in case you have questions.

About Apache Camel

(skip if you're already familiar)

Apache Camel is an enterprise integration framework that revolves around the well-known Enterprise Integration Patterns popularized by Gregor Hohpe and Bobby Woolf, such as channels, pipes, filters, splitters, aggregators, routers, and resequencers, which you piece together like a Lego puzzle to create integration routes that connect systems.

To date, there are over 200 components for Camel, many of which are adapters for different technologies such as JMS, SOAP, HTTP, files, FTP, POP3, SMTP, and SSH; cloud services such as Amazon Web Services, Google Compute Engine, and Salesforce; social networks such as Twitter and Facebook; new-generation databases such as MongoDB and Cassandra; and data processing technologies such as Hadoop (HDFS, HBase) and Spark.

Camel runs in any environment: standalone Java, OSGi, Servlet containers, Spring Boot, JEE application servers, and so on. It is also fully modular, so you only deploy the components you'll actually be using and nothing else.

Check out What is Camel? for more information.


Published at DZone with permission of Raúl Kripalani. See the original article here.
