
Using GRAKN.AI to Stream Twitter Data


Learn how to use artificial intelligence to stream public tweets into a distributed knowledge base. Let's get streaming!


GRAKN.AI is the database for AI. It is a distributed knowledge base designed specifically to handle complex data in knowledge-oriented systems, something for which traditional database technologies are simply inadequate.

To ensure that their internal knowledge is the most up-to-date and relevant, AI systems are always hungry for newly updated data. Working seamlessly with streaming data is therefore useful for building knowledge-oriented systems. In this post, we will look at how to stream public tweets into Grakn’s distributed knowledge base.

Everyday Use Cases

In many of the posts that we’ve written so far, we’ve encountered data (covering everything from pharmaceutical bio-targets to financial instruments) that might be less readily available to the average developer along with use cases that might be daunting to more novice developers. GRAKN.AI is optimized to deal with complex data and use cases and it offers capabilities that existing databases simply cannot compete with.

Nonetheless, many day-to-day use cases can be easily built with Grakn. For more novice users or for users with less complicated use cases, getting a handle on Grakn would be a huge benefit to their dev skills.

Therefore, while streaming is relevant to most complex data analytics use cases, we decided that it would be appropriate to use a data set that would be very familiar to all users: social media data.

Specifically, we’re going to look at using GRAKN.AI to stream Twitter data.

Breakdown

Since this topic will go into great tutorial depth (and length), it has been split into a three-part series.

Streaming Data Into the Knowledge Base

This post will cover key concepts such as receiving, inserting, and querying data. It will be by far the longest of the three. By the end of the post, you will have learned about these concepts:

  1. Defining a simple GRAKN.AI ontology using the Java API.
  2. Streaming public tweets into the application with the Twitter4J library.
  3. Inserting tweets into the knowledge graph using Grakn’s Graph API.

Querying the Knowledge Base

The second post will focus mostly on simple queries, for example grouping tweets by user and showing the aggregate number of tweets per user. It will also cover a few more advanced queries in order to highlight the analytic capabilities of Grakn.

Optimization

  • We will be looking into adding batch insertion, which offers better performance — especially when consuming high-volume data.

OK, let’s get streaming!

(No Need to) Install GRAKN.AI

You do not need to download and install Grakn for the purposes of this tutorial, so we won’t cover installation in depth here. The tutorial uses only in-memory graphs, and the required JARs are downloaded automatically as project dependencies, so everything is self-contained.

Nonetheless, if you’re not familiar with Grakn and its query language, Graql, we recommend checking out this introductory post along with this explanation of some key terminology. They cover core Grakn concepts that are fundamental to using the platform. And, if you’re ready to download and install Grakn, check our setup guide.

Registering Your Own Twitter Application

As of today, you will need valid credentials in order to call practically every endpoint in the Twitter API. Therefore, you must already own a Twitter application (or register a new one) before proceeding further.

You can register your own application on the Twitter Application Management page. Once you’ve done so, you can get the credentials by visiting the Keys and Access Tokens tab. The values we care about in particular are Consumer Key, Consumer Secret, Access Token, and Access Token Secret.
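
One way to keep these four values out of source control is to read them from environment variables at startup. The sketch below is only an illustration: the class name TwitterCredentials and the variable names (TWITTER_CONSUMER_KEY and so on) are assumptions made for this example and are not defined by Twitter or Twitter4J.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative helper, not part of the tutorial's project.
class TwitterCredentials {
  // Hypothetical environment variable names; rename them to suit your setup.
  static final String[] REQUIRED = {
    "TWITTER_CONSUMER_KEY", "TWITTER_CONSUMER_SECRET",
    "TWITTER_ACCESS_TOKEN", "TWITTER_ACCESS_TOKEN_SECRET"
  };

  // Copies the four required values out of env and fails fast
  // if any of them is missing or blank.
  static Map<String, String> fromEnvironment(Map<String, String> env) {
    Map<String, String> credentials = new LinkedHashMap<>();
    List<String> missing = new ArrayList<>();
    for (String name : REQUIRED) {
      String value = env.get(name);
      if (value == null || value.trim().isEmpty()) {
        missing.add(name);
      } else {
        credentials.put(name, value);
      }
    }
    if (!missing.isEmpty()) {
      throw new IllegalStateException("Missing Twitter credentials: " + missing);
    }
    return credentials;
  }
}
```

In an application, this could be called as TwitterCredentials.fromEnvironment(System.getenv()), with the resulting values passed on wherever the credentials are needed.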

Getting Started: Bootstrapping the Skeleton Project

Let’s bootstrap a new Maven project!

Hit the command line and run the following command to generate a new Maven project:

mvn archetype:generate \
  -DgroupId=ai.grakn \
  -DartifactId=twitterexample \
  -DarchetypeArtifactId=maven-archetype-quickstart \
  -DinteractiveMode=false

Now that you have a basic project structure and pom.xml in place, let’s start customizing them to our needs. We will add two things to the <build> section:

  1. maven-compiler-plugin configuration in order to enable lambda and other nifty Java 8 features.
  2. maven-shade-plugin configuration, which points to our Main class to create a fat JAR.

Then, continue to the <dependencies> section and make sure you have all the required dependencies, i.e. grakn-graph, grakn-graql, twitter4j-core, and twitter4j-stream.

You can see the complete pom.xml definition here.

The Main Class

Let’s kick things off by defining a Main class inside the ai.grakn.twitterexample package. Aside from Twitter credentials, it contains a few important Grakn settings.

First, we have decided to use an in-memory graph for simplicity’s sake. Working with an in-memory graph frees us from having to set up a Grakn distribution on the local machine. Note that an in-memory graph does not persist anything: its data is lost once the program finishes execution. Second, the graph will be stored in a keyspace named twitter-example.

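package ai.grakn.twitterexample;
import ai.grakn.Grakn;
import ai.grakn.GraknSession;
public class Main {
  // Twitter credentials: replace the placeholders with the values of your own application
  private static final String consumerKey = "<consumer key>";
  private static final String consumerSecret = "<consumer secret>";
  private static final String accessToken = "<access token>";
  private static final String accessTokenSecret = "<access token secret>";
  private static final String graphImplementation = Grakn.IN_MEMORY; // in-memory graph
  private static final String keyspace = "twitter-example";
}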

We then define a GraknSession object in main(). Enclosing it in a try-with-resources construct is good practice, because the session is then closed automatically and we cannot forget to call session.close().

public static void main(String[] args) {
  try (GraknSession session = Grakn.session(graphImplementation, keyspace)) {
    // our code will go here
  }
}

Following that, another equally important object for operating on the graph is GraknGraph. After performing the operations we desire, we must not forget to commit. For convenience, let’s define a helper method that opens a GraknGraph in write mode and commits it after executing the function fn. We will be using this function in various places throughout the tutorial.

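import java.util.function.Consumer;
import ai.grakn.GraknGraph;
import ai.grakn.GraknSession;
import ai.grakn.GraknTxType;
public class GraknTweetOntologyHelper {
  // Opens the graph in write mode, runs fn against it, then commits the changes
  public static void withGraknGraph(GraknSession session, Consumer<GraknGraph> fn) {
    GraknGraph graphWriter = session.open(GraknTxType.WRITE);
    fn.accept(graphWriter);
    graphWriter.commit();
  }
}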

We have decided to omit exception handling to keep the tutorial simple. In production code, however, it will be very important and must not be forgotten.
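
To give a rough idea of what that handling could look like, here is a minimal sketch of the same "run, then commit" pattern with cleanup on failure. It deliberately does not use the Grakn API: WriteTx is a hypothetical stand-in interface invented for this example, so treat it as an outline of the pattern rather than production code for Grakn.

```java
import java.util.function.Consumer;

// Stand-in for a write transaction; NOT the Grakn API, just enough to show the pattern.
interface WriteTx extends AutoCloseable {
  void commit();

  @Override
  void close(); // narrowed so that close() declares no checked exception
}

class TxHelper {
  // Runs fn against tx, commits only if fn completes normally,
  // and always closes tx, even when fn or commit throws.
  static void withTx(WriteTx tx, Consumer<WriteTx> fn) {
    try (WriteTx t = tx) {
      fn.accept(t);
      t.commit();
    }
  }
}
```

If fn throws, the commit is skipped, the transaction is still closed, and the exception propagates to the caller, who can decide whether to log it, retry, or abort.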

Defining the Ontology

Let’s define the ontology. In case you skipped the familiarization links above and are a bit rusty on the concept of an ontology, it is recommended that you read the What Is an Ontology? post before continuing.

As we are mainly interested in both the tweet and who posted the tweet, let's capture these concepts by defining two entities: user and tweet.

The user entity will hold the user’s actual username in a resource called screen_name, while the tweet entity will contain the user’s tweet in another resource called text. We will also define a resource called identifier for the ID.

Next we will define two roles, posts and posted_by, to express that a user posts a tweet and, similarly, that a tweet is posted by a user. We will tie these two roles together with a relation called user-tweet-relation.

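The structure can be summarized as follows: a user, which holds a screen_name, plays the posts role, and a tweet, which holds an identifier and a text, plays the posted_by role. The user-tweet-relation connects the two.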

With that set, let’s define a new method initTweetOntology inside the GraknTweetOntologyHelper class and create our ontology there.

public class GraknTweetOntologyHelper {
  public static void initTweetOntology(GraknGraph graknGraph) {
  }
}

Start by defining our resources:

// resources
ResourceType idType = graknGraph.putResourceType("identifier", ResourceType.DataType.STRING);
ResourceType textType = graknGraph.putResourceType("text", ResourceType.DataType.STRING);
ResourceType screenNameType = graknGraph.putResourceType("screen_name", ResourceType.DataType.STRING);

Entities:

// entities
EntityType tweetType = graknGraph.putEntityType("tweet");
EntityType userType = graknGraph.putEntityType("user");

Roles and relations:

// roles
RoleType postsType = graknGraph.putRoleType("posts");
RoleType postedByType = graknGraph.putRoleType("posted_by");
// relations
RelationType userTweetRelationType = graknGraph.putRelationType("user-tweet-relation")
  .relates(postsType)
  .relates(postedByType);

And finally, assign resources and roles appropriately.

// resource and relation assignments
tweetType.resource(idType);
tweetType.resource(textType);
userType.resource(screenNameType);
userType.plays(postsType);
tweetType.plays(postedByType);

Now invoke the method in main so the ontology is created at the start of the application.

public static void main(String[] args) {
  try (GraknSession session = Grakn.session(graphImplementation, keyspace)) {
    withGraknGraph(session, graknGraph -> initTweetOntology(graknGraph)); // initialize ontology
  }
}

Streaming Data From Twitter

Now that we’re done with ontology creation, let’s develop the code for listening to the public tweet stream.

Define a new method listenToTwitterStreamAsync and put it in a class named AsyncTweetStreamProcessorHelper. In addition to accepting Twitter credential settings, we will also need to supply a callback onTweetReceived, which will be invoked whenever the application receives a new tweet. Further down, we will use this callback for storing, querying, and displaying tweets as they come.

public class AsyncTweetStreamProcessorHelper {
  public static TwitterStream listenToTwitterStreamAsync(String consumerKey, String consumerSecret, String accessToken, String accessTokenSecret, BiConsumer<String, String> onTweetReceived) {
    final String DEFAULT_LANGUAGE = "en";
    // …
  }
}

The first thing we need to do here is to create a Configuration object out of the Twitter credential settings. Let’s write a dedicated method just for that and name it createTwitterConfiguration. Afterward, use that method to create the Configuration object which we will need in listenToTwitterStreamAsync.

public class AsyncTweetStreamProcessorHelper {
  public static TwitterStream listenToTwitterStreamAsync(String consumerKey, String consumerSecret, String accessToken, String accessTokenSecret, BiConsumer<String, String> onTweetReceived) {
    final String DEFAULT_LANGUAGE = "en";
    Configuration conf = createTwitterConfiguration(consumerKey, consumerSecret, accessToken, accessTokenSecret);
    // …
  }
  private static Configuration createTwitterConfiguration(String consumerKey, String consumerSecret, String accessToken, String accessTokenSecret) {
    return new ConfigurationBuilder()
      .setDebugEnabled(false)
      .setOAuthConsumerKey(consumerKey)
      .setOAuthConsumerSecret(consumerSecret)
      .setOAuthAccessToken(accessToken)
      .setOAuthAccessTokenSecret(accessTokenSecret)
      .build();
  }
}

Next, we will create a private class TweetListener and make it implement the StatusListener interface from Twitter4J. This interface has a bunch of methods we can override depending on what we want to receive from Twitter. As we only care about receiving tweet updates and nothing else, we only need to override a single method onStatus.

The constructor of our TweetListener class accepts a callback onStatusReceived, which will be executed every time we receive a new tweet.

Once we’re done defining the class, let’s come back to listenToTwitterStreamAsync and instantiate it. We will also instantiate two other classes, TwitterStreamFactory and TwitterStream. Now we can start listening to Twitter by calling the sample method. We pass "en" as the language, which means we are only interested in English tweets.

import java.util.function.BiConsumer;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.Configuration;
import twitter4j.conf.ConfigurationBuilder;
public class AsyncTweetStreamProcessorHelper {
  public static TwitterStream listenToTwitterStreamAsync(String consumerKey, String consumerSecret, String accessToken, String accessTokenSecret, BiConsumer<String, String> onTweetReceived) {
    final String DEFAULT_LANGUAGE = "en";
    Configuration conf = createTwitterConfiguration(consumerKey, consumerSecret, accessToken, accessTokenSecret);
    TweetListener tweetListener = new TweetListener(onTweetReceived);
    TwitterStreamFactory twitterStreamFactory = new TwitterStreamFactory(conf);
    TwitterStream twitterStreamSingleton = twitterStreamFactory.getInstance();
    twitterStreamSingleton.addListener(tweetListener);
    twitterStreamSingleton.sample(DEFAULT_LANGUAGE); // start receiving a sample of public English tweets
    return twitterStreamSingleton;
  }
  // createTwitterConfiguration as defined above
}
// An implementation of twitter4j’s StatusListener that forwards each tweet to a callback
class TweetListener implements StatusListener {
  private final BiConsumer<String, String> onStatusReceived;
  public TweetListener(BiConsumer<String, String> onStatusReceived) {
    this.onStatusReceived = onStatusReceived;
  }
  @Override
  public void onStatus(Status status) {
    // pass the author's screen name and the tweet text to the callback
    onStatusReceived.accept(status.getUser().getScreenName(), status.getText());
  }
  @Override
  public void onException(Exception ex) {
    ex.printStackTrace();
  }
  // a bunch of empty event handler implementations, we’re not using them
  @Override public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {}
  @Override public void onTrackLimitationNotice(int numberOfLimitedStatuses) {}
  @Override public void onScrubGeo(long userId, long upToStatusId) {}
  @Override public void onStallWarning(StallWarning stallWarning) {}
}

Let’s wrap up this section by adding the call to listenToTwitterStreamAsync into main.

public static void main(String[] args) {
  try (GraknSession session = Grakn.session(graphImplementation, keyspace)) {
    withGraknGraph(session, graknGraph -> initTweetOntology(graknGraph)); // initialize ontology
    listenToTwitterStreamAsync(consumerKey, consumerSecret, accessToken, accessTokenSecret, (screenName, tweet) -> {
      // TODO: do something upon receiving a new tweet
    });
  }
}
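
Because the callback is just a BiConsumer<String, String>, it can be exercised without a live Twitter connection. The sketch below assumes a hypothetical class TweetCallbackDemo and made-up sample data, and simply replays a few (screenName, tweet) pairs through a callback in the way the stream listener would.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Illustrative only: replays canned tweets instead of connecting to Twitter.
class TweetCallbackDemo {
  // Feeds hard-coded (screenName, text) pairs to the callback,
  // standing in for the live stream.
  static void replay(String[][] samples, BiConsumer<String, String> onTweetReceived) {
    for (String[] sample : samples) {
      onTweetReceived.accept(sample[0], sample[1]);
    }
  }

  // Formats and collects each tweet, as a callback might do before storing it.
  static List<String> collect(String[][] samples) {
    List<String> received = new ArrayList<>();
    replay(samples, (screenName, tweet) -> received.add("@" + screenName + ": " + tweet));
    return received;
  }
}
```

Swapping the collecting lambda for one that prints or stores the tweet is a quick way to check the callback logic before wiring it to the real stream.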

Inserting Tweets Into the Knowledge Graph

At this point, our little program already has a clearly defined ontology and is able to listen to incoming tweets. However, we have yet to decide what exactly we’re going to do with them. In this section, we will have a look at how to:

  1. Insert an incoming tweet into the knowledge graph.
  2. Insert a user who posted the tweet, only once — we don’t want to insert the same user twice.
  3. Maintain an association between a tweet and the user.

We will be using the graph API for inserting the data in the graph because it is lightweight and efficient.
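
Before turning to the Grakn calls, it may help to see the three steps modeled with plain Java collections. This is only an analogy under simplifying assumptions: TweetStore and its methods are hypothetical names, and a map from user to tweets stands in for the entities and the relation between them.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the three steps; the names are illustrative, not Grakn types.
class TweetStore {
  // user screen name -> texts of that user's tweets, in arrival order
  private final Map<String, List<String>> tweetsByUser = new LinkedHashMap<>();

  void insertUserTweet(String screenName, String text) {
    // computeIfAbsent creates the user entry only the first time it is seen (step 2);
    // the tweet is then stored (step 1) under that user (step 3).
    tweetsByUser.computeIfAbsent(screenName, name -> new ArrayList<>()).add(text);
  }

  int userCount() {
    return tweetsByUser.size();
  }

  List<String> tweetsOf(String screenName) {
    return tweetsByUser.getOrDefault(screenName, new ArrayList<>());
  }
}
```

The knowledge graph version that follows has the same shape: look up or create the user, create the tweet, and then connect the two.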

Insert a Tweet

To insert a tweet, we must create a tweet entity and a text resource to hold the tweet’s textual data, before associating said resource with the entity.

Let’s do that with a new method. It will accept a single String, insert it into the knowledge graph, and return the Entity of said tweet.

Pay attention to how we first retrieve the EntityType and ResourceType of the entity and resource we are interested in, since we need them in order to perform the actual insertion.

public static Entity insertTweet(GraknGraph graknGraph, String tweet) {
  // look up the types defined in the ontology
  EntityType tweetEntityType = graknGraph.getEntityType("tweet");
  ResourceType tweetResourceType = graknGraph.getResourceType("text");
  // create the tweet entity and attach its text
  Entity tweetEntity = tweetEntityType.addEntity();
  Resource tweetResource = tweetResourceType.putResource(tweet);
  return tweetEntity.resource(tweetResource);
}

Insert a User

In addition to the tweet, we also want to store who posted the tweet. One constraint we need to enforce is that a particular user is inserted only once, since it doesn’t make sense to store the same user twice.

Therefore, let’s add a method for checking whether we’ve previously stored a particular user. We will be using Java 8’s Optional<T>, where we return the Entity object of that user only if it exists in the knowledge graph. Otherwise, an Optional.empty() will be returned.

// requires: import static ai.grakn.graql.Graql.var;
public static Optional<Entity> findUser(QueryBuilder queryBuilder, String user) {
  // match at most one user entity with the given screen name
  MatchQuery findUser = queryBuilder.match(var("x").isa("user").has("screen_name", user)).limit(1);
  Iterator<Concept> concepts = findUser.get("x").iterator();
  if (concepts.hasNext()) {
    Entity entity = concepts.next().asEntity();
    return Optional.of(entity);
  }
  return Optional.empty();
}

Next, add the following method for inserting a user. This one is quite similar to the one we made for inserting a tweet.

public static Entity insertUser(GraknGraph graknGraph, String user) {
  EntityType userEntityType = graknGraph.getEntityType("user");
  ResourceType userResourceType = graknGraph.getResourceType("screen_name");
  Entity userEntity = userEntityType.addEntity();
  Resource userResource = userResourceType.putResource(user);
  return userEntity.resource(userResource);
}

And finally, write a function for inserting a user only if it’s not yet there in the knowledge graph.

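public static Entity insertUserIfNotExist(GraknGraph graknGraph, String screenName) {
  QueryBuilder qb = graknGraph.graql();
  // orElseGet is lazy: insertUser runs only when the user was not found.
  // orElse(insertUser(...)) would evaluate its argument first and insert the user every time.
  return findUser(qb, screenName).orElseGet(() -> insertUser(graknGraph, screenName));
}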
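
One detail of Java’s Optional matters for any insert-if-absent logic like this: orElse evaluates its argument before it is called, whether or not a value is present, while orElseGet takes a supplier and invokes it only when the Optional is empty. The following standalone sketch demonstrates the difference by counting calls to a hypothetical insert method; OptionalLaziness and its members are names made up for this illustration.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

// Demonstrates eager vs. lazy fallbacks on Optional; no Grakn involved.
class OptionalLaziness {
  static final AtomicInteger inserts = new AtomicInteger();

  // Stand-in for an insert with a side effect we can count.
  static String insert(String name) {
    inserts.incrementAndGet();
    return name;
  }

  static String eager(Optional<String> found, String name) {
    // insert(name) is evaluated before orElse is even called, so it always runs
    return found.orElse(insert(name));
  }

  static String lazy(Optional<String> found, String name) {
    // the supplier is invoked only when found is empty
    return found.orElseGet(() -> insert(name));
  }
}
```

With a user that already exists, eager still performs an insert, whereas lazy leaves the counter untouched. For side-effecting fallbacks such as inserting into a knowledge graph, the lazy form is the one that preserves the "only once" guarantee.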

Relating the Tweet to the User

We’re almost there with a complete tweet insertion functionality! There’s only one thing left to do, which is to relate the tweet entity with the user entity. Preserving this connection is crucial, after all.

The following function will create a relation between the user and tweet that we specify.

public static Relation insertUserTweetRelation(GraknGraph graknGraph, Entity user, Entity tweet) {
  RelationType userTweetRelationType = graknGraph.getRelationType("user-tweet-relation");
  RoleType postsType = graknGraph.getRoleType("posts");
  RoleType postedByType = graknGraph.getRoleType("posted_by");
  Relation userTweetRelation = userTweetRelationType.addRelation()
    .addRolePlayer(postsType, user)
    .addRolePlayer(postedByType, tweet);
  return userTweetRelation;
}

Tweet Insertion Wrap-Up

Finally, let’s wrap up by defining a function whose sole responsibility is to call the methods we have defined above in the right order.

public static Relation insertUserTweet(GraknGraph graknGraph, String screenName, String tweet) {
  Entity tweetEntity = insertTweet(graknGraph, tweet);
  Entity userEntity = insertUserIfNotExist(graknGraph, screenName);
  return insertUserTweetRelation(graknGraph, userEntity, tweetEntity);
}

Let’s add the method we’ve just defined to the main method as shown below.

public static void main(String[] args) {
  try (GraknSession session = Grakn.session(graphImplementation, keyspace)) {
    withGraknGraph(session, graknGraph -> initTweetOntology(graknGraph)); // initialize ontology
    listenToTwitterStreamAsync(consumerKey, consumerSecret, accessToken, accessTokenSecret, (screenName, tweet) -> {
      withGraknGraph(session, graknGraph -> insertUserTweet(graknGraph, screenName, tweet)); // insert tweet
    });
  }
}

Conclusion

We’re done with tweet insertion functionality! In the upcoming post in this series, we will look at how we can harness Graql for querying information from the data.


Published at DZone with permission of Ganeshwara Herawan Hananda, DZone MVB. See the original article here.
