Real Time Twitter Search via Websocket or Comet

by Jean-Francois Arcand · Oct. 17, 10 · Interview

Currently Twitter supports a streaming API described as follows:

"The Twitter Streaming API allows high-throughput near-realtime access to various subsets of public and protected Twitter data."

Unfortunately, no such streaming API exists for real time search. Thanks to the freshly released Atmosphere Framework 0.6.2, let's create one in fewer than 25 lines. As an extra, let's use the Atmosphere JQuery Plugin to select the best transport for communicating with this API: Websocket, http-streaming or long-polling.

The Server Side

What we need to accomplish:

  1. Provide a URI that can be used to send search requests (hashtag)
  2. Based on a keyword/hashtag, connect to the Twitter search API and poll it every second (or at a longer interval) to collect the JSON result. Since we want to support thousands of clients, let's make sure we never block waiting for the result, and instead stay fully asynchronous.
  3. Broadcast/push the JSON object back to our set of suspended responses. By suspended I mean a connection that uses long-polling, http-streaming or Websocket. As you might be aware, Atmosphere hides such details from your application, so you don't have to focus on how the transport works, only on the application itself.
  4. Provide a URI for stopping the real time search.

Now let's build it. If you can't wait, you can read the entire code here and here. For our application, let's use the atmosphere-annotations and atmosphere-jersey modules:

@Path("/search/{tagid}")
@Singleton
public class TwitterFeed {

    private final AsyncHttpClient asyncClient = new AsyncHttpClient();
    private final ConcurrentHashMap<String, Future<?>> futures
                 = new ConcurrentHashMap<String, Future<?>>();

    @GET
    public SuspendResponse<String>
             search(final @PathParam("tagid") Broadcaster feed,
                    final @PathParam("tagid") String tagid) {

We want our class to be invoked when the request takes the form of /search/{hashtag}, where the hashtag can be anything you want to search on. Next we define our first method by asking Jersey to inject a Broadcaster and the hashtag (tagid) from the URL. For those of you who aren't familiar with Atmosphere's concepts, a Broadcaster is an object that can be used to broadcast/push events back to the browser. A Broadcaster holds the list of connections that have been suspended, independently of the transport used: long-polling, streaming or Websocket. Hence a Broadcaster can be used to broadcast real time events. For our current application, we will create one Broadcaster per hashtag. The next step is to configure our Broadcaster to poll the Twitter Search API by simply doing:

if (feed.getAtmosphereResources().size() == 0) {
    // Nobody is subscribed to this hashtag yet: start polling Twitter every second
    Future<?> future = feed.scheduleFixedBroadcast(new Callable<String>() {

        // Query string returned by the previous search; empty until the first response
        private final AtomicReference<String> refreshUrl = new AtomicReference<String>("");

        public String call() throws Exception {
            String query = null;
            if (!refreshUrl.get().isEmpty()) {
                query = refreshUrl.get();
            } else {
                query = "?q=" + tagid;
            }
            // Non-blocking: execute() returns immediately and the handler is called back later
            asyncClient.prepareGet(
                "http://search.twitter.com/search.json" + query)
                    .execute(new AsyncCompletionHandler<Integer>() {

                        @Override
                        public Integer onCompleted(Response response) throws Exception {
                            String s = response.getResponseBody();
                            JSONObject json = new JSONObject(s);
                            refreshUrl.set(json.getString("refresh_url"));
                            feed.broadcast(s).get();
                            return response.getStatusCode();
                        }
                    });
            return "OK";
        }
    }, 1, TimeUnit.SECONDS);

    futures.put(tagid, future);
}

First, we query our Broadcaster (feed) to see if there is already a connection that asked for the real time search. If there is none, we invoke Broadcaster.scheduleFixedBroadcast(..) with a Callable. That Callable will be executed every second. Inside the Callable we use my other active open source project, AsyncHttpClient, which allows the request to be executed asynchronously, i.e. we send the request but we don't block waiting for the response.
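To make the "one Broadcaster per hashtag, start polling only for the first connection" idea concrete, here is a minimal JDK-only sketch. It is not Atmosphere's API: the class HashtagChannels and its methods are hypothetical names invented for illustration, and plain Consumer callbacks stand in for suspended connections.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-in for the Broadcaster idea; not Atmosphere's actual API.
class HashtagChannels {
    // One list of subscribers per hashtag, like one Broadcaster per hashtag.
    private final Map<String, List<Consumer<String>>> channels = new ConcurrentHashMap<>();

    // Registers a subscriber. Returns true when it is the first one for this
    // hashtag, which is the moment the caller would start the polling task.
    synchronized boolean subscribe(String tag, Consumer<String> listener) {
        List<Consumer<String>> listeners =
                channels.computeIfAbsent(tag, t -> new CopyOnWriteArrayList<>());
        listeners.add(listener);
        return listeners.size() == 1;
    }

    // Pushes a message to every subscriber of the hashtag and returns how
    // many subscribers received it.
    int broadcast(String tag, String message) {
        int delivered = 0;
        for (Consumer<String> listener : channels.getOrDefault(tag, Collections.emptyList())) {
            listener.accept(message);
            delivered++;
        }
        return delivered;
    }
}
```

In the real application Atmosphere keeps this bookkeeping for you, and the subscribers are suspended long-polling, streaming or Websocket connections rather than callbacks.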

The AsyncHttpClient will take care of calling back the AsyncCompletionHandler once the Twitter API has sent us the entire response. We could have streamed the response, but to keep the sample simple we just use an AsyncCompletionHandler that buffers the entire JSON response.
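The polling pattern above can be sketched with the JDK alone: a scheduled task fires a non-blocking request, and a completion callback pushes the body and remembers the query to use next time (the role refresh_url plays in the code above). Everything here is an assumption made for illustration: IncrementalPoller, Page, the fetcher function and the sink are hypothetical names, and a CompletableFuture stands in for AsyncHttpClient's callback.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical JDK-only sketch of the polling pattern; the article itself
// relies on Atmosphere's Broadcaster and on AsyncHttpClient.
class IncrementalPoller {
    // Result of one fetch: the body to push and the query to use next time.
    static final class Page {
        final String body;
        final String refreshQuery;

        Page(String body, String refreshQuery) {
            this.body = body;
            this.refreshQuery = refreshQuery;
        }
    }

    private final AtomicReference<String> nextQuery;
    private final Function<String, CompletableFuture<Page>> fetcher;
    private final Consumer<String> sink;

    IncrementalPoller(String initialQuery,
                      Function<String, CompletableFuture<Page>> fetcher,
                      Consumer<String> sink) {
        this.nextQuery = new AtomicReference<>(initialQuery);
        this.fetcher = fetcher;
        this.sink = sink;
    }

    // One polling step: starts the fetch and returns without waiting for it.
    // The callback runs once the response is complete.
    CompletableFuture<Void> pollOnce() {
        return fetcher.apply(nextQuery.get()).thenAccept(page -> {
            nextQuery.set(page.refreshQuery); // only ask for newer results next time
            sink.accept(page.body);           // push the raw body to subscribers
        });
    }

    // Runs pollOnce at a fixed rate; the returned handle can cancel the polling.
    ScheduledFuture<?> start(ScheduledExecutorService scheduler, long periodSeconds) {
        return scheduler.scheduleAtFixedRate(() -> {
            try {
                pollOnce();
            } catch (RuntimeException e) {
                // Swallowed so that one failed poll does not end the schedule.
            }
        }, 0, periodSeconds, TimeUnit.SECONDS);
    }
}
```

The scheduler thread is only busy for the time it takes to start the request, which is why a single thread can drive many hashtags.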

From the JSON object we get the refresh_url value which we will use next time we query the Twitter Search API in order to receive only the new results instead of the entire set. Next we just need to tell Atmosphere to suspend the connection and use this Broadcaster:

return new SuspendResponse.SuspendResponseBuilder<String>()
                .broadcaster(feed)
                .outputComments(true)
                .addListener(new EventsLogger())
                .build();

Finally we just broadcast the result as it is, so our client can use JSON to read the response. We do store the Future returned so later we can stop the real time Broadcast:

    @GET
    @Path("stop")
    public String stopSearch(
            final @PathParam("tagid") Broadcaster feed,
            final @PathParam("tagid") String tagid) {

        // Resume all connections associated with a hashtag
        feed.resumeAll();
        // Cancel the scheduled polling, if a search is running for this hashtag
        Future<?> future = futures.remove(tagid);
        if (future != null) {
            future.cancel(true);
        }
        return "DONE";
    }
}

To stop a real time search, we just issue /search/{#hashtag}/stop. That's all we need to do on the server side. Our server side application is now able to receive Websocket, long-polling or http-streaming requests.
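The start/stop bookkeeping can be isolated in a few lines. The sketch below is a hypothetical helper (SearchRegistry is not part of Atmosphere or of the sample) that keeps one Future per hashtag and tolerates a stop request for a hashtag that has no running search.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;

// Hypothetical helper that tracks one polling task per hashtag.
class SearchRegistry {
    private final ConcurrentHashMap<String, Future<?>> futures = new ConcurrentHashMap<>();

    // Remembers the polling task for a hashtag.
    // Returns false if a task is already registered for it.
    boolean register(String tag, Future<?> task) {
        return futures.putIfAbsent(tag, task) == null;
    }

    // Cancels and forgets the polling task for a hashtag.
    // Returns false when no search was running for it.
    boolean stop(String tag) {
        Future<?> task = futures.remove(tag);
        if (task == null) {
            return false;
        }
        task.cancel(true);
        return true;
    }
}
```

Removing the entry on stop, rather than only reading it, lets a later search on the same hashtag register a fresh task.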

The client side

On the client side, you can use any existing JavaScript library supporting Websocket or Comet to query the real time API. It is as simple as:

/search/#hashtag                for subscribing to real time update (try it with WebSocket!)

/search/#hashtag/stop    to stop the real time update

But the easiest way is to use Atmosphere's JQuery Plugin (of course :-)), which supports Websocket and Comet. More importantly, the Plugin is able to detect the best transport to use based on what the client and the server support. As an example, if the application is deployed in Tomcat and you use Chrome, you can let the Plugin find the best transport or specify the one you want to use. This is as simple as:

   $.atmosphere.subscribe(document.location.toString() + 'search/' + hashtag,
              callback,
              $.atmosphere.request = {transport: 'websocket'});

Above, we invoke the subscribe method, passing the URL containing the hashtag, a callback and some request properties. The Plugin will invoke the callback as soon as the real time search starts on the server. The callback looks like:

// Holds a partial JSON message until the rest of it arrives
var incompleteMessage = "";

function callback(response) {
    if (response.transport != 'polling' &&
            response.state != 'connected' &&
            response.state != 'closed') {

        if (response.status == 200) {
            var data = response.responseBody;

            try {
                var result = $.parseJSON(incompleteMessage + data);
                incompleteMessage = "";

                // Walk the results backwards so the first result ends up on top
                for (var i = result.results.length - 1; i > -1; i--) {
                    $('ul').prepend($('<li></li>').text("["
                        + response.transport + "] "
                        + result.results[i].from_user + " "
                        + result.results[i].text));
                }
            } catch (err) {
                // Parsing failed: keep everything received so far for the next callback
                incompleteMessage += data;
            }
        }
    }
}

The critical piece of code above is parseJSON(incompleteMessage + data). The size of the data sent back by the server may vary between servers, so we may not get the entire JSON object in one invocation of the callback. So we need to wrap that call inside a try/catch (since parseJSON will fail on a partial object) and make sure that the next time the callback gets invoked we prepend the previously received data. That scenario will happen only if you search for a popular hashtag and you get a large number of responses (try #Nordiques !!!).
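The same buffering idea, written in Java to match the server-side code, looks roughly like this. It is only a sketch: MessageAssembler is a hypothetical class, and the parser is passed in as a function that is assumed to throw a RuntimeException on incomplete input, the way parseJSON does in the browser.

```java
import java.util.Optional;
import java.util.function.Function;

// Hypothetical sketch: accumulates chunks until the parser accepts the whole text.
class MessageAssembler<T> {
    private final Function<String, T> parser;
    private final StringBuilder pending = new StringBuilder();

    // The parser is assumed to throw a RuntimeException on incomplete input.
    MessageAssembler(Function<String, T> parser) {
        this.parser = parser;
    }

    // Adds a chunk. Returns the parsed message once the buffered text is
    // complete, or an empty Optional while more data is still needed.
    Optional<T> offer(String chunk) {
        pending.append(chunk);
        T parsed;
        try {
            parsed = parser.apply(pending.toString());
        } catch (RuntimeException incomplete) {
            return Optional.empty(); // keep the buffer for the next chunk
        }
        pending.setLength(0); // message complete: start a fresh buffer
        return Optional.of(parsed);
    }
}
```

Note that the buffer grows with every failed attempt, so a message split across three or more chunks is still reassembled.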

That's it for the client side. You can download the sample here and deploy it in any webserver, whether it supports Comet, Websocket, both, or neither!! The interface is simple and demonstrates the transport's auto detection.

Any cooler and better designed interface is welcome :-) Finally, you can get more information about this sample by reading my JavaOne talk.

For any questions or to download Atmosphere Client and Server Framework, go to our main site and use our Nabble forum (no subscription needed), or follow the team or myself and tweet your questions there! You can also check out the code on Github.

 Content from jfarcand.wordpress.com
