Performing Aggregate Query on Twitter Data

To ensure that their internal knowledge is the most up-to-date, systems are always hungry for newly updated data. Learn how to stream public tweets into a distributed knowledge base.

GRAKN.AI is the database for AI. It is a distributed knowledge base designed specifically to handle complex data in a knowledge-oriented system — a task for which traditional database technologies are not the best fit.

To ensure that their internal knowledge is the most up-to-date and relevant, AI systems are always hungry for newly updated data. Working seamlessly with streaming data is therefore useful for building knowledge-oriented systems. In this blog post, we will look at how to stream public tweets into Grakn's distributed knowledge base.

Continuing Where We Left Off

In my previous post, we covered data insertion aspects such as defining a schema as well as retrieving and inserting Twitter data. In this post, we will look specifically at performing an aggregate query.

Crafting a Group Aggregate Query in Graql

We will write a query that counts the number of tweets each user has posted since the program started, using Graql's aggregate query feature. We use Graql rather than the Java API for this task because it is declarative and therefore much easier to use for complex queries.

Let's look at how we can build it, step-by-step.

Start by creating a QueryBuilder object, which we will use to craft the query in Graql.

QueryBuilder qb = tx.graql();

Now, let's begin crafting the query. For this tutorial, let's create a match query where we retrieve both the user and tweet.

We will bind them to vars named user and tweet, respectively. Notice how we deliberately give the vars the same names as their respective entity types. This is not a necessity; in practice, you are free to name them anything you want.

Also, pay attention to how we supply the user-tweet-relationship relationship as part of the condition.

qb.match(
  var("user").isa("user"),
  var("tweet").isa("tweet"),
  var().rel("posts", "user").rel("posted_by", "tweet").isa("user-tweet-relationship"));

The query we've just defined will return every user and tweet along with their relationships. We will use it as the basis of the aggregate query.

Let's do some aggregation over the result here.

We will pass "user" and count() as the arguments to group(), which tells Grakn to group the results by the user variable and count the number of results in each group, i.e. the number of tweets per user.

qb.match(
  var("user").isa("user"),
  var("tweet").isa("tweet"),
  var().rel("posts", "user").rel("posted_by", "tweet").isa("user-tweet-relationship")
).aggregate(group("user", count()));
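
For intuition, the grouping and counting step is roughly analogous to a groupingBy/counting collector in plain Java. The sketch below involves no Grakn at all: it assumes each match result can be reduced to a (screen name, tweet text) pair, and the class GroupCountSketch, the method countPerUser, and the sample data are hypothetical names made up for illustration.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-in for the aggregate query: plain Java, no Grakn involved.
class GroupCountSketch {

  // Groups (screenName, tweetText) pairs by screen name and counts the pairs in each group.
  static Map<String, Long> countPerUser(List<Map.Entry<String, String>> userTweetPairs) {
    return userTweetPairs.stream()
        .collect(Collectors.groupingBy(Map.Entry::getKey, Collectors.counting()));
  }

  public static void main(String[] args) {
    // Made-up sample data: one entry per (user, tweet) match result.
    List<Map.Entry<String, String>> pairs = Arrays.asList(
        new SimpleImmutableEntry<>("alice", "first tweet"),
        new SimpleImmutableEntry<>("bob", "hello"),
        new SimpleImmutableEntry<>("alice", "second tweet"));

    // Prints one count per screen name (map iteration order is not guaranteed).
    System.out.println(countPerUser(pairs));
  }
}
```

In this sample, alice appears in two pairs and bob in one, so the counts are 2 and 1 respectively.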

The query will now return the number of tweets each user has posted, which is what we want, as an object of type Map<Concept, Long>.

To be able to conveniently iterate, we will transform it into the relatively more straightforward type Stream<Map.Entry<String, Long>>, i.e. a stream of pairs of username and tweet count.

// execute the aggregate query built above (q is the AggregateQuery returned by aggregate(...))
Map<Concept, Long> result = ((Map<Concept, Long>) q.execute());
// map Map<Concept, Long> into Stream<Map.Entry<String, Long>> before returning
AttributeType screenNameAttributeType = tx.getAttributeType("screen_name");
Stream<Map.Entry<String, Long>> mapped = result.entrySet().stream().map(entry -> {
  Concept key = entry.getKey();
  Long value = entry.getValue();
  // look up the user's screen_name attribute; assumes every user has one
  String screenName = (String) key.asEntity().attributes(screenNameAttributeType).iterator().next().getValue();
  return new AbstractMap.SimpleImmutableEntry<>(screenName, value);
});
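
The same reshaping can be tried without a running Grakn instance. The following is a minimal sketch under stated assumptions: FakeUser is a hypothetical class standing in for a Concept of type user, its screenName field stands in for the screen_name attribute lookup, and ResultMappingSketch and toScreenNameCounts are made-up names.

```java
import java.util.AbstractMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Stream;

class ResultMappingSketch {

  // Hypothetical stand-in for a Grakn Concept of type "user".
  static final class FakeUser {
    final String screenName;

    FakeUser(String screenName) {
      this.screenName = screenName;
    }
  }

  // Re-keys each entry from the user object to its screen name, keeping the count as-is.
  static Stream<Map.Entry<String, Long>> toScreenNameCounts(Map<FakeUser, Long> result) {
    return result.entrySet().stream()
        .map(e -> new AbstractMap.SimpleImmutableEntry<>(e.getKey().screenName, e.getValue()));
  }

  public static void main(String[] args) {
    // Made-up aggregate result; LinkedHashMap keeps insertion order for predictable printing.
    Map<FakeUser, Long> result = new LinkedHashMap<>();
    result.put(new FakeUser("alice"), 2L);
    result.put(new FakeUser("bob"), 1L);

    toScreenNameCounts(result)
        .forEach(e -> System.out.println(e.getKey() + " -> " + e.getValue()));
  }
}
```

Keep in mind that a Java Stream can be consumed only once, so a caller that needs the pairs more than once would have to collect them into a list first.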

Let's put them all together in a new function calculateTweetCountPerUser:

public static Stream<Map.Entry<String, Long>> calculateTweetCountPerUser(GraknTx tx) {
  // build query
  QueryBuilder qb = tx.graql();
  AggregateQuery q = qb.match(
      var("user").isa("user"),
      var("tweet").isa("tweet"),
      var().rel("posts", "user").rel("posted_by", "tweet").isa("user-tweet-relationship")
      ).aggregate(group("user", count()));
  // execute query
  Map<Concept, Long> result = ((Map<Concept, Long>) q.execute());
  // map Map<Concept, Long> into Stream<Map.Entry<String, Long>> before returning
  AttributeType screenNameAttributeType = tx.getAttributeType("screen_name");
  Stream<Map.Entry<String, Long>> mapped = result.entrySet().stream().map(entry -> {
    Concept key = entry.getKey();
    Long value = entry.getValue();
    String screenName = (String) key.asEntity().attributes(screenNameAttributeType).iterator().next().getValue();
    return new AbstractMap.SimpleImmutableEntry<>(screenName, value);
  });
  return mapped;
}

With that done, let's update the main function like so:

public class Main {
  // Twitter credentials
  private static final String consumerKey = "...";
  private static final String consumerSecret = "...";
  private static final String accessToken = "...";
  private static final String accessTokenSecret = "...";
  // Grakn settings
  private static final String implementation = Grakn.IN_MEMORY;
  private static final String keyspace = "twitter-example";
  public static void main(String[] args) {
    try (GraknSession session = Grakn.session(implementation, keyspace)) {
      withGraknTx(session, tx -> initTweetSchema(tx)); // initialize schema
      listenToTwitterStreamAsync(consumerKey, consumerSecret, accessToken, accessTokenSecret, (screenName, tweet) -> {
        withGraknTx(session, tx -> {
          insertUserTweet(tx, screenName, tweet); // insert tweet
          Stream<Map.Entry<String, Long>> result = calculateTweetCountPerUser(tx); // query
          prettyPrintQueryResult(result); // display
        });
      });
    }
  }
  public static void prettyPrintQueryResult(Stream<Map.Entry<String, Long>> result) {
    System.out.println("------");
    result.forEach(e -> System.out.println("-- user " + e.getKey() + " tweeted " + e.getValue() + " time(s)."));
    System.out.println("------");
  }
}

Notice the two changes introduced here. First, we've added a call to our newly made function calculateTweetCountPerUser. Second, we've added a pretty-print function, prettyPrintQueryResult, to display the query result in a readable way.
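
The printing logic can also be checked on its own, without Twitter credentials or Grakn, by feeding it hand-built entries. In this sketch, PrettyPrintSketch and formatLine are hypothetical names: formatLine is a helper extracted here only so the line format can be verified, and the sample entries are made up.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.stream.Stream;

class PrettyPrintSketch {

  // Hypothetical helper: builds the same line that prettyPrintQueryResult prints per entry.
  static String formatLine(Map.Entry<String, Long> e) {
    return "-- user " + e.getKey() + " tweeted " + e.getValue() + " time(s).";
  }

  // Same output as the article's prettyPrintQueryResult, routed through formatLine.
  static void prettyPrintQueryResult(Stream<Map.Entry<String, Long>> result) {
    System.out.println("------");
    result.forEach(e -> System.out.println(formatLine(e)));
    System.out.println("------");
  }

  public static void main(String[] args) {
    // Made-up entries standing in for the output of calculateTweetCountPerUser.
    Stream<Map.Entry<String, Long>> sample = Stream.of(
        new SimpleImmutableEntry<>("alice", 2L),
        new SimpleImmutableEntry<>("bob", 1L));
    prettyPrintQueryResult(sample);
  }
}
```

Running it prints one line per entry between two separator lines, for example "-- user alice tweeted 2 time(s)." for the first entry.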

Running the Application

We're all set! The only thing left is to run the application:

$ mvn package
$ java -jar target/twitterexample-1.0-SNAPSHOT.jar

Watch the terminal as the application runs. You should see output like the following printed every time there's an incoming tweet:

------
-- user <user-1> tweeted 2 time(s).
-- user <user-2> tweeted 1 time(s).
-- user <user-3> tweeted 1 time(s).
-- user <user-n> tweeted 1 time(s).
------

Wrapping Up

In this post, we've looked at performing an aggregate query on real-time data. The query groups tweets by user and displays the number of tweets per user, akin to a SELECT with COUNT and a GROUP BY clause in regular SQL.

Aggregate queries are a bread-and-butter tool of data analysis, useful even when working with the sort of highly connected data at which Grakn particularly excels.

There are other types of supported aggregation functions, which you can find in the documentation. They are all expressed with a similar syntax so you shouldn't have any trouble understanding them.

Thanks for staying tuned! In the next article, we will look at how we can work with batch insertion. Batching is an important technique for achieving better throughput on high volume data.

And don't forget: we have a working sample which you can just clone and run.

Topics:
big data, twitter data, querying, tutorial

Published at DZone with permission of Ganeshwara Herawan Hananda, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.
