Learn Apache Camel: Indexing Tweets in Real-time
An introduction to Apache Camel using a sample application that reads tweets from Twitter’s sample feed and indexes them in real time using ElasticSearch.
There’s a point in most software development projects when the application needs to start communicating with other applications or third-party components.
Whether it’s sending an email notification, calling an external API, writing to a file, or migrating data from one place to another, you either roll out your own solution or leverage an existing framework.
As for existing frameworks in the Java ecosystem, on one end of the spectrum we find TIBCO BusinessWorks and Mule ESB, and on the other end there’s Spring Integration and Apache Camel.
In this tutorial I’m going to introduce you to Apache Camel through a sample application that reads tweets from Twitter’s sample feed and indexes those tweets in real time using Elasticsearch.
What is Apache Camel?
Integrating an application with internal or external components in an ecosystem is one of the most complex tasks in software development, and when it’s not done right, it can result in a huge mess that is a real pain to maintain in the long term.
Fortunately, Camel, an open-source integration framework hosted at Apache, is based on the Enterprise Integration Patterns, and these patterns can help you write more readable and maintainable code. Similar to Lego, these patterns can be used as building blocks to create a solid software design.
Apache Camel also supports a wide array of connectors to integrate your application with different frameworks and technologies. And by the way, it also plays together nicely with Spring.
If you’re not familiar with Spring, you may find this post helpful: Processing Twitter Feed Using Spring Boot.
In the following sections we’ll go through a sample application where Camel is integrated with both Twitter’s sample feed and Elasticsearch.
What is Elasticsearch?
Elasticsearch, similar to Apache Solr, is a highly scalable, open-source, Java-based full-text search engine built on top of Apache Lucene.
In this sample application we’re going to use Elasticsearch to index tweets in real time and also to provide full-text search capabilities on these tweets.
Other technologies used
Besides Apache Camel and Elasticsearch, I also included other frameworks in this application: Gradle as the build tool, Spring Boot as the web application framework, and Twitter4j to read tweets from the Twitter sample feed.
Getting started
The skeleton of the project was generated at http://start.spring.io, where I checked the Web dependency option, filled out the Project Metadata section, and selected ‘Gradle Project’ as the type of the project.
Once the project is generated, you can download and import it into your favourite IDE. I’m not going to go into more detail on Gradle now, but here’s the list of all the dependencies in the build.gradle file:
def camelVersion = '2.15.2'
dependencies {
    compile("org.springframework.boot:spring-boot-starter-web")
    compile("org.apache.camel:camel-core:${camelVersion}")
    compile("org.apache.camel:camel-spring-boot:${camelVersion}")
    compile("org.apache.camel:camel-twitter:${camelVersion}")
    compile("org.apache.camel:camel-elasticsearch:${camelVersion}")
    compile("org.apache.camel:camel-jackson:${camelVersion}")
    compile("joda-time:joda-time:2.8.2")
    testCompile("org.springframework.boot:spring-boot-starter-test")
}
Integration using Camel routes
Camel implements a message-oriented architecture, and its main building blocks are routes that describe the flow of messages.
Routes can be described in either XML (the older way) or Camel’s Java DSL (the newer way). We’re only going to discuss the Java DSL in this post, as that’s the preferred and more elegant option.
All right, let’s look at a simple route then:
from("file://orders").
    convertBodyTo(String.class).
    to("log:com.mycompany.order?level=debug").
    to("jms:topic:orderstopic");
There are a few things to note here:
- Messages flow between endpoints, which are represented by and configured using URIs.
- A route reads messages from a single endpoint (in Camel terms the consumer endpoint; in this case file://orders, which reads files from the orders folder) and can send them to multiple producer endpoints:
  - log:com.mycompany.order?level=debug, which logs the content of a file in a debug message under the com.mycompany.order logging category
  - jms:topic:orderstopic, which writes the content of the file to a JMS topic
- In between endpoints, messages can be altered, e.g. by convertBodyTo(String.class), which converts the message body to a String.
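To make the anatomy of an endpoint URI more concrete, here is a minimal plain-Java sketch that splits a URI into the component name, the component-specific path, and its options. It is not how Camel itself resolves endpoints; the EndpointUriSketch class and its parse() method are hypothetical names used only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustration only: Camel resolves endpoint URIs internally, not with this class. */
final class EndpointUriSketch {
    final String component;             // e.g. "file", "log", "jms"
    final String path;                  // component-specific part, e.g. "orders"
    final Map<String, String> options;  // e.g. {level=debug}

    private EndpointUriSketch(String component, String path, Map<String, String> options) {
        this.component = component;
        this.path = path;
        this.options = options;
    }

    static EndpointUriSketch parse(String uri) {
        int colon = uri.indexOf(':');
        if (colon < 0) {
            throw new IllegalArgumentException("Missing component name: " + uri);
        }
        String component = uri.substring(0, colon);
        String rest = uri.substring(colon + 1);
        if (rest.startsWith("//")) {
            rest = rest.substring(2); // this sketch treats "file://orders" like "file:orders"
        }
        Map<String, String> options = new LinkedHashMap<>();
        int question = rest.indexOf('?');
        String path = question < 0 ? rest : rest.substring(0, question);
        if (question >= 0) {
            // Options are "key=value" pairs separated by '&'
            for (String pair : rest.substring(question + 1).split("&")) {
                int eq = pair.indexOf('=');
                if (eq > 0) {
                    options.put(pair.substring(0, eq), pair.substring(eq + 1));
                }
            }
        }
        return new EndpointUriSketch(component, path, options);
    }

    public static void main(String[] args) {
        EndpointUriSketch log = parse("log:com.mycompany.order?level=debug");
        System.out.println(log.component + " | " + log.path + " | " + log.options);
    }
}
```

Reading the three URIs of the route this way shows the pattern: the part before the first colon selects the component (file, log, jms), and everything after it is interpreted by that component.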
Also note that the same URI can be used for a consumer endpoint in one route and a producer endpoint in another:
from("file://orders").
    convertBodyTo(String.class).
    to("direct:orders");
from("direct:orders").
    to("log:com.mycompany.order?level=debug").
    to("jms:topic:orderstopic");
The direct endpoint is one of the generic endpoints, and it allows passing messages synchronously from one route to another.
This helps to create readable code and to reuse routes in multiple places in the code.
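As a rough mental model of what "synchronously" means here, the following plain-Java analogy (not Camel code) treats a direct: URI as a name under which one route registers itself, so that sending to that name is just a method call on the caller's thread. DirectEndpointSketch and its from()/to() methods are hypothetical and only mirror the DSL names for readability.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Analogy only: a "direct" endpoint modelled as an in-process, same-thread hand-off. */
final class DirectEndpointSketch {
    private final Map<String, Consumer<String>> consumers = new HashMap<>();

    /** Registers the route that reads from the given URI (the role of from(uri)). */
    void from(String uri, Consumer<String> route) {
        if (consumers.putIfAbsent(uri, route) != null) {
            throw new IllegalStateException("This sketch allows one consumer per URI: " + uri);
        }
    }

    /** Hands the message to the registered route on the caller's thread (the role of to(uri)). */
    void to(String uri, String body) {
        Consumer<String> route = consumers.get(uri);
        if (route == null) {
            throw new IllegalStateException("No consumer registered for " + uri);
        }
        route.accept(body); // returns only after the downstream route has finished
    }

    public static void main(String[] args) {
        DirectEndpointSketch context = new DirectEndpointSketch();
        context.from("direct:orders", body -> System.out.println("received " + body));
        context.to("direct:orders", "order-1");
    }
}
```

Because the hand-off is an ordinary call, the sending route blocks until the receiving route completes, which is the main difference from queue-based endpoints.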
Indexing tweets
Now let’s take a look at some routes from our code. Let’s start with something simple:
private String ES_TWEET_INDEXER_ENDPOINT = "direct:tweet-indexer-es";
...
from("twitter://streaming/sample?type=event&consumerKey={{twitter4j.oauth.consumerKey}}&consumerSecret={{twitter4j.oauth.consumerSecret}}&accessToken={{twitter4j.oauth.accessToken}}&accessTokenSecret={{twitter4j.oauth.accessTokenSecret}}")
    .to(ES_TWEET_INDEXER_ENDPOINT)
;
This is so simple, right? By now you may have figured out that this route reads tweets from the Twitter sample feed and passes them to the direct:tweet-indexer-es endpoint. Note that consumerKey, consumerSecret, etc. are configured and passed in as system properties (see http://twitter4j.org/en/configuration.html).
Now let’s look at a slightly more complex route that reads from the direct:tweet-indexer-es endpoint and inserts tweets into Elasticsearch in batches (see the comments for a detailed explanation of each step):
@Value("${elasticsearch.tweet.uri}")
private String elasticsearchTweetUri;
...
from(ES_TWEET_INDEXER_ENDPOINT)
    // groups tweets into separate indexes on a weekly basis to make it easier to clean up old tweets:
    .process(new WeeklyIndexNameHeaderUpdater(ES_TWEET_INDEX_TYPE))
    // converts the Twitter4j tweet object into an Elasticsearch document represented by a Map:
    .process(new ElasticSearchTweetConverter())
    // collects tweets into batches, one per (weekly) index name:
    .aggregate(header("indexName"), new ListAggregationStrategy())
        // completes the current batch every 2 seconds:
        .completionInterval(2000)
        // makes sure the last batch will be processed before the application shuts down:
        .forceCompletionOnStop()
    // inserts a batch of tweets into Elasticsearch:
    .to(elasticsearchTweetUri)
    .log("uploaded documents to elasticsearch index ${headers.indexName}: ${body.size()}")
;
Notes on this route:
- elasticsearchTweetUri is a field whose value is taken by Spring from the application.properties file (elasticsearch.tweet.uri=elasticsearch://tweet-indexer?operation=BULK_INDEX&ip=127.0.0.1&port=9300) and injected into the field.
- To implement custom processing logic within a route, we can create classes that implement the Processor interface. See WeeklyIndexNameHeaderUpdater and ElasticSearchTweetConverter.
- The tweets are aggregated using the custom ListAggregationStrategy, which collects messages into an ArrayList that is passed on to the next endpoint every 2 seconds (or when the application stops).
- Camel implements an expression language that we’re using to log the size of the batch (${body.size()}) and the name of the index (${headers.indexName}) into which the messages were inserted.
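The two ideas behind this route, a per-week index name and batches keyed by that name, can be sketched in plain Java without Camel. The code below is an assumption-laden illustration, not the project's implementation: the "type-year-wNN" naming scheme, the WeeklyBatchSketch class, and its methods are all hypothetical, and it uses java.time rather than the Joda-Time library listed in the build file.

```java
import java.time.LocalDate;
import java.time.temporal.IsoFields;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/** Illustration only: weekly index naming plus list-based batching per index. */
final class WeeklyBatchSketch {
    private final Map<String, List<Object>> pending = new LinkedHashMap<>();

    /** Builds a name such as "tweet-2015-w36" from the ISO week (naming scheme is an assumption). */
    static String weeklyIndexName(String indexType, LocalDate date) {
        int year = date.get(IsoFields.WEEK_BASED_YEAR);
        int week = date.get(IsoFields.WEEK_OF_WEEK_BASED_YEAR);
        return String.format(Locale.ROOT, "%s-%d-w%02d", indexType, year, week);
    }

    /** Appends a document to the batch of its index, creating the batch on first use. */
    void add(String indexName, Object document) {
        pending.computeIfAbsent(indexName, key -> new ArrayList<>()).add(document);
    }

    /** Returns the batches collected so far and starts over (the job of the 2-second interval). */
    Map<String, List<Object>> flush() {
        Map<String, List<Object>> completed = new LinkedHashMap<>(pending);
        pending.clear();
        return completed;
    }

    public static void main(String[] args) {
        WeeklyBatchSketch batches = new WeeklyBatchSketch();
        batches.add(weeklyIndexName("tweet", LocalDate.of(2015, 9, 1)), "first tweet");
        batches.add(weeklyIndexName("tweet", LocalDate.of(2015, 9, 2)), "second tweet");
        System.out.println(batches.flush());
    }
}
```

Keying batches by index name means one bulk request never mixes documents for different weeks, and old data can be removed by dropping a whole index instead of deleting individual documents.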
Searching tweets in Elasticsearch
Now that we have tweets indexed in Elasticsearch, it’s time to run some searches on them.
First let’s look at the route that receives a search query and the maxSize param that limits the number of search results:
public static final String TWEET_SEARCH_URI = "vm:tweetSearch";
...
from(TWEET_SEARCH_URI)
    .setHeader("CamelFileName", simple("tweet-${body}-${header.maxSize}-${date:now:yyyyMMddHHmmss}.txt"))
    // calls the search() method of esTweetService, which returns an iterator
    // to process search results - better than keeping the whole result set in memory:
    .split(method(esTweetService, "search"))
        // converts the Elasticsearch document to a Map object:
        .process(new ElasticSearchSearchHitConverter())
        // serializes the Map object to JSON:
        .marshal(new JacksonDataFormat())
        // appends a new line at the end of every tweet:
        .setBody(simple("${body}\n"))
        // writes search results as JSON into a file under the /tmp folder:
        .to("file:/tmp?fileExist=Append")
    .end()
    .log("wrote search results to /tmp/${headers.CamelFileName}")
;
This route is triggered when a message is passed to the vm:tweetSearch endpoint (which uses an in-memory queue to process messages asynchronously).
The SearchController class implements a REST API allowing users to run a tweet search by sending a message to the vm:tweetSearch endpoint using Camel’s ProducerTemplate class:
@Autowired
private ProducerTemplate producerTemplate;

@RequestMapping(value = "/tweet/search", method = { RequestMethod.GET, RequestMethod.POST },
        produces = MediaType.TEXT_PLAIN_VALUE)
@ResponseBody
public String tweetSearch(@RequestParam("q") String query,
                          @RequestParam(value = "max") int maxSize) {
    log.info("tweet search request received with query: {} and max: {}", query, maxSize);
    Map<String, Object> headers = new HashMap<String, Object>();
    // "content" is the field in the Elasticsearch index that we'll be querying:
    headers.put("queryField", "content");
    headers.put("maxSize", maxSize);
    producerTemplate.asyncRequestBodyAndHeaders(CamelRouter.TWEET_SEARCH_URI, query, headers);
    return "request is queued";
}
This triggers the search in Elasticsearch; however, the result is not returned in the response but written to a file in the /tmp folder (as discussed earlier).
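The "queue it and return immediately" behaviour can be pictured with JDK classes alone. The sketch below is an analogy rather than Camel code: QueuedRequestSketch and enqueue() are hypothetical names, and a single-threaded executor with its in-memory task queue stands in for the queue behind the vm: endpoint.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Analogy only: requests are queued in memory and processed by a background worker. */
final class QueuedRequestSketch implements AutoCloseable {
    // One worker thread takes tasks from an in-memory queue, one at a time.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    /** Queues the task and returns at once; the caller is free to ignore the Future. */
    <T> Future<T> enqueue(Callable<T> task) {
        return worker.submit(task);
    }

    /** Stops accepting new tasks; tasks already queued still run. */
    @Override
    public void close() {
        worker.shutdown();
    }

    public static void main(String[] args) throws Exception {
        try (QueuedRequestSketch queue = new QueuedRequestSketch()) {
            Future<String> result = queue.enqueue(() -> "search finished");
            System.out.println("request is queued");   // what the HTTP caller would see
            System.out.println(result.get());           // only needed if the result matters
        }
    }
}
```

The trade-off is the same as in the controller: the HTTP response comes back quickly, but the caller has to look elsewhere (here the Future, in the application the file under /tmp) for the actual outcome.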
This route uses the ElasticSearchService class to search tweets in Elasticsearch. When this route is executed, Camel calls the search() method and passes in the search query and maxSize as input parameters:
public SearchHitIterator search(@Body String query, @Header(value = "queryField") String queryField, @Header(value = "maxSize") int maxSize) {
    boolean scroll = maxSize > batchSize;
    log.info("executing {} on index type: '{}' with query: '{}' and max: {}", scroll ? "scan & scroll" : "search", indexType, query, maxSize);
    QueryBuilder qb = termQuery(queryField, query);
    long startTime = System.currentTimeMillis();
    SearchResponse response = scroll ? prepareSearchForScroll(maxSize, qb) : prepareSearchForRegular(maxSize, qb);
    return new SearchHitIterator(client, response, scroll, maxSize, KEEP_ALIVE_MILLIS, startTime);
}
Note that depending on maxSize and batchSize, the code either executes a regular search that returns a single page of results, or executes a scroll request, which allows us to retrieve a large number of results. In the case of scrolling, SearchHitIterator will make subsequent calls to Elasticsearch to retrieve the results in batches.
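The general shape of such an iterator, fetching the next batch only when the current one runs out and stopping at maxSize, can be sketched without any Elasticsearch classes. This is not the project's SearchHitIterator: PagedIteratorSketch is a hypothetical name, and the page-fetching function is a stand-in for the follow-up scroll calls.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.IntFunction;

/** Illustration only: lazily walks through paged results, capped at maxSize items. */
final class PagedIteratorSketch<T> implements Iterator<T> {
    private final IntFunction<List<T>> fetchPage; // page number -> results; empty list means "no more"
    private final int maxSize;
    private Iterator<T> current = Collections.emptyIterator();
    private int nextPage = 0;
    private int returned = 0;
    private boolean exhausted = false;

    PagedIteratorSketch(IntFunction<List<T>> fetchPage, int maxSize) {
        this.fetchPage = fetchPage;
        this.maxSize = maxSize;
    }

    @Override
    public boolean hasNext() {
        if (returned >= maxSize) {
            return false; // cap reached: do not fetch any further pages
        }
        while (!current.hasNext() && !exhausted) {
            List<T> page = fetchPage.apply(nextPage++);
            if (page.isEmpty()) {
                exhausted = true;
            } else {
                current = page.iterator();
            }
        }
        return current.hasNext();
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        returned++;
        return current.next();
    }
}
```

Since only one page is held in memory at a time, a consumer such as the split() step in the route can stream a large result set to the output file without loading it all at once.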
Installing Elasticsearch
- Download Elasticsearch from https://www.elastic.co/downloads/elasticsearch
- Install it to a local folder ($ES_HOME).
- Edit $ES_HOME/config/elasticsearch.yml and add this line: cluster.name: tweet-indexer
- Install the BigDesk plugin to monitor Elasticsearch: $ES_HOME/bin/plugin -install lukas-vlcek/bigdesk
- Run Elasticsearch: $ES_HOME/bin/elasticsearch.sh or $ES_HOME/bin/elasticsearch.bat
These steps will allow you to run a standalone Elasticsearch instance with minimal configuration, but keep in mind that they’re not intended for production use.
Running the application
This is the entry point to the application, and it can be run from the command line.
package com.kaviddiss.twittercamel;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
To run the application, either run the Application.main() method from your favorite IDE or execute the line below from the command line:
$GRADLE_HOME/bin/gradlew build && java -jar build/libs/twitter-camel-ingester-0.0.1-SNAPSHOT.jar
Once the application starts up, it automatically starts indexing tweets. Go to http://localhost:9200/_plugin/bigdesk/#cluster to visualize your indexes; the BigDesk plugin lets you monitor how Elasticsearch is indexing tweets.
To search tweets, enter a URL similar to this one into the browser: http://localhost:8080/tweet/search?q=toronto&max=100
Conclusion
In this introduction to Apache Camel we covered how to use this integration framework to communicate with external components like the Twitter sample feed and Elasticsearch to index and search tweets in real time.
The source code of the sample application is available at https://github.com/davidkiss/twitter-camel-ingester .
You may also find this post interesting: How to Get Started with Storm Framework in 5 Minutes.
Published at DZone with permission of David Kiss, DZone MVB. See the original article here.