DZone
Big Data Zone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
  • Refcardz
  • Trend Reports
  • Webinars
  • Zones
  • |
    • Agile
    • AI
    • Big Data
    • Cloud
    • Database
    • DevOps
    • Integration
    • IoT
    • Java
    • Microservices
    • Open Source
    • Performance
    • Security
    • Web Dev
DZone > Big Data Zone > Streaming Real-Time Chat Messages Into Scylla With Apache Pulsar

Streaming Real-Time Chat Messages Into Scylla With Apache Pulsar

Learn how to build real-time messaging and streaming applications using a variety of OSS libraries, schemas, languages, frameworks, and tools utilizing ScyllaDB.

Timothy Spann user avatar by
Timothy Spann
·
Mar. 17, 22 · Big Data Zone · Tutorial
Like (4)
Save
Tweet
4.67K Views

Join the DZone community and get the full member experience.

Join For Free

At Scylla Summit 2022, I presented "FLiP Into Apache Pulsar Apps with ScyllaDB". Using the same content, in this blog, we'll demonstrate step-by-step how to build real-time messaging and streaming applications using a variety of OSS libraries, schemas, languages, frameworks, and tools utilizing ScyllaDB. We'll also introduce options from MQTT, Web Sockets, Java, Golang, Python, NodeJS, Apache NiFi, Kafka on Pulsar, Pulsar protocol, and more. You will learn how to quickly deploy an app to a production cloud cluster with StreamNative, and build your own fast applications using the Apache Pulsar and Scylla integration.

Before we jump into the how, let's review why this integration can be used for a speedy application build. Scylla is an ultra-fast, low-latency, high-throughput, open-source NoSQL platform that is fully compatible with Cassandra. Populating Scylla tables utilizing the Scylla-compatible Pulsar IO sink doesn't require any complex or specialized coding, and the sink makes it easy to load data to Scylla using a simple configuration file pointing to Pulsar topics that stream all events directly to Scylla tables.

Now, let's build a streaming real-time chat message system utilizing Scylla and Apache Pulsar!

Let's start the process to create a chat application that publishes messages to an event bus anytime someone fills out a web form. After the message is published, sentiment analysis is performed on the "comments" text field of the payload, and the result of the analysis is output to a downstream topic.

Event-driven applications, like our chat application, use a message bus to communicate between loosely-coupled, collaborating services. Different services communicate with each other by exchanging messages asynchronously. In the context of microservices, these messages are often referred to as events.

The message bus receives events from producers, filters the events, and then pushes the events to consumers without tying the events to individual services. Other services can subscribe to the event bus to receive those events for processing (consumers).

Apache Pulsar is a cloud-native, distributed messaging and event-streaming platform that acts as a message bus. It supports common messaging paradigms with its diverse subscription types and consumption patterns.

As a feature required for our integration, Pulsar supports IO Connectors. Pulsar IO connectors enable you to create, deploy, and manage connectors utilizing simple configuration files and basic CLI tools and REST APIs. We will utilize a Pulsar IO Connector to sink data from Pulsar topics to Scylla DB.

First, we download the Cassandra connector to deploy it to my Pulsar cluster. This process is documented at the Pulsar IO Cassandra Sink connector information.

Next, we download the pulsar-io-cassandra-X.nar archive to our connectors directory. Scylla DB is fully compatible with Cassandra, so we can use that connector to stream messages to it.

When using a Pulsar IO connector like the Scylla DB one I used for my demo, you can specify the configuration details inside a YAML file like the one shown below.

Properties files
 
configs:
    roots: "172.17.0.2:9042"
    keyspace: "pulsar_test_keyspace"
    columnFamily: "pulsar_test_table"
    keyname: "key"
    columnName: "col"


The main configuration shown above is done in YAML format and lists the root server with port, a keyspace, a column family, keyname, and column name to populate.

First, we will need to create a topic to consume from.

Shell
 
bin/pulsar-admin topics create persistent://public/default/chatresult2 


When you deploy the connector you pass in these configuration properties by command line call as shown below.

Shell
 
bin/pulsar-admin sinks create --tenant public --namespace default --name "scylla-test-sink" --sink-type cassandra --sink-config-file conf/scylla.yml --inputs chatresult2 


For new data, create a keyspace, table, and index, or use one of your existing ones.

SQL
 
CREATE KEYSPACE pulsar_test_keyspace with replication = {‘class’:’SimpleStrategy’, ‘replication_factor’:1};
CREATE TABLE pulsar_test_table (key text PRIMARY KEY, col text);
CREATE INDEX on pulsar_test_table(col);


In the previous section, we discussed why Apache Pulsar is well-suited for event-driven applications. In this section, we'll cover Pulsar Functions-a lightweight, serverless computing framework (similar to AWS Lambda). We'll leverage a Pulsar Function to deploy our ML model to transform or process messages in Pulsar. The diagram below illustrates our chat application example.

Chat application diagram

Keep in mind: Pulsar Functions give you the flexibility to use Java, Python, or Go for implementing your processing logic. You can easily use alternative libraries for your sentiment analysis algorithm.

The code below is a Pulsar Function that runs Sentiment Analysis on my stream of events. (The function runs once per event.)

Python
 
from pulsar import Function
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
import json

class Chat(Function):
    def __init__(self):
        pass

    def process(self, input, context):
        logger = context.get_logger()
        logger.info("Message Content: {0}".format(input))
        msg_id = context.get_message_id()

        fields = json.loads(input)
        sid = SentimentIntensityAnalyzer()
        ss = sid.polarity_scores(fields["comment"])
        logger.info("Polarity: {0}".format(ss['compound']))
        sentimentVal = 'Neutral'
        if ss['compound'] == 0.00:
            sentimentVal = 'Neutral'
        elif ss['compound'] < 0.00:
            sentimentVal = 'Negative'
        else:
            sentimentVal = 'Positive'
        row = { }

        row['id'] = str(msg_id)
        row['sentiment'] = str(sentimentVal)
        row['userInfo'] = str(fields["userInfo"])
        row['comment'] = str(fields["comment"])
        row['contactInfo'] = str(fields["contactInfo"])
        json_string = json.dumps(row)
        return json_string


Here, we use the Vader Sentiment NLP ML Library to analyze the user's sentiment on the comment. We enrich our input record with the sentiment and then write it in JSON format to the output topic.

I use the Pulsar context to do logging. I could also push data values to state storage or record some metrics. For this example, we will just do some logging.

Below is the deployment script where you can find all of the options and tools in its github directory. We have to make sure we have our NLP library installed on all of our nodes.

Shell
 
bin/pulsar-admin functions create --auto-ack true 
--py pulsar-pychat-function/src/sentiment.py --classname "sentiment.Chat" --inputs "persistent://public/default/chat2" --log-topic "persistent://public/default/chatlog2" --name Chat --namespace default --output "persistent://public/default/chatresult2" --tenant public

pip3 install vaderSentiment


Now that we have built our topic, Function, and sink, let's build our application. The full web page is in the GitHub directory, but I'll show you the critical portions here. For this Single Page Application (SPA), I am using JQuery and DataTables that are included from their public CDNs. Datatable.html

HTML
 
<form action="/datatable.html" method="post" enctype="multipart/form-data" id="form-id">    
<div id="demo" name="demo"></demo>    
<p><label>User: </label><input name="user" type="text" id="user-id" size="75" value="" maxlength="100"/></p>
<p><label>Question: </label><input type="text"  name="other-field" type="text" id="other-field-id" size="75" maxlength="200" value=""/></p>
<p><label>Contact Info: </label><input name="contactinfo" type="text" id="contactinfo-id" size="75" maxlength="100" value=""/></p>
<p><input type="button" value="Send to Pulsar" onClick="loadDoc()" /></p>
    </form>


In the above HTML Form, we let users add a comment to our chat.

Now we are using JavaScript to send the form data as JSON to a Pulsar topic via WebSockets. WebSockets are a supported protocol for Apache Pulsar. The WebSocket URL is ws://pulsar1:8080/ws/v2/producer/persistent/public/default/chat2.

Where ws is the protocol, pulsar1 is the Pulsar server, port 8080 is our REST port, producer is what we are doing, persistent is our type of topic, public is our tenant, default is our namespace and chat2 is our topic: We populate an object and convert it to a JSON String and encode that payload as a Base64-encoded ASCII string. Then, we add that encoded String as the payload in a new JSON string that includes payload, properties, and context for our Pulsar Message. This format is required for the WebSocket protocol to convert to a regular message in our Pulsar topic.

JavaScript
 
<script>
function loadDoc() {
  var xhttp = new XMLHttpRequest();
  xhttp.onreadystatechange = function() {
    if (this.readyState == 4 && this.status == 200) {
      document.getElementById("demo").innerHTML = '';
    }
  };
var wsUri = "ws://pulsar1:8080/ws/v2/producer/persistent/public/default/chat2";

websocket = new WebSocket(wsUri);

const pulsarObject = { 
         userInfo: document.getElementById('user-id').value.substring(0,200), 
        contactInfo: document.getElementById('contactinfo-id').value.substring(0,200), 
        comment: document.getElementById('other-field-id').value.substring(0, 200)};
const jsonStr = JSON.stringify(pulsarObject);
var payloadStr = btoa(jsonStr);
const propertiesObject = {key: Date.now() }
var data = JSON.stringify({ "payload": payloadStr, "properties": propertiesObject, "context": "cs" });

websocket.onopen = function(evt) {
   if (websocket.readyState === WebSocket.OPEN) {
           websocket.send(data);
   }
};
websocket.onerror = function(evt) {console.log('ERR', evt)};
websocket.onmessage = function(evt) {}
websocket.onclose = function(evt) {
  if (evt.wasClean) {    console.log(evt);
  } else {    console.log('[close] Connection died');
  }
};
}
var form = document.getElementById('form-id');
form.onsubmit = function() {
    var formData = new FormData(form);
    var action = form.getAttribute('action');
    loadDoc();
    return false;
  }
</script>


In the above code, we'll grab the value of the fields from the form, stop the form from reloading the page, and then send the data to Pulsar.

Now, let's consume any messages sent to the result topic of our Sentiment Pulsar function.

In the below code we consume from a Pulsar topic: ws://pulsar1:8080/ws/v2/consumer/persistent/public/default/chatresult2/chatrreader?subscriptionType=Shared&receiverQueueSize=500.

In this URI, we can see this differs some from the producer URI. We have a receiverQueueSize, consumer tag and a subscription Type of Shared.

JavaScript:

JavaScript
 
$(document).ready(function() {
    var t = $('#example').DataTable();

var wsUri = "ws://pulsar1:8080/ws/v2/consumer/persistent/public/default/chatresult2/chatrreader?subscriptionType=Shared&receiverQueueSize=500";
websocket = new WebSocket(wsUri);
websocket.onopen = function(evt) {
   console.log('open');
};
websocket.onerror = function(evt) {console.log('ERR', evt)};
websocket.onmessage = function(evt) {

    var dataPoints = JSON.parse(evt.data);
    if ( dataPoints === undefined || dataPoints == null || dataPoints.payload === undefined || dataPoints.payload == null ) {
            return;
    }
    if (IsJsonString(atob(dataPoints.payload))) {
         var pulsarMessage = JSON.parse(atob(dataPoints.payload));
         if ( pulsarMessage === undefined || pulsarMessage == null ) {
                 return;
         }
         var sentiment = "";
         if ( !isEmpty(pulsarMessage.sentiment) ) {
                 sentiment = pulsarMessage.sentiment;
         }
         var publishTime = "";
         if ( !isEmpty(dataPoints.publishTime) ) {
                 publishTime = dataPoints.publishTime;
         }
         var comment = "";
         if ( !isEmpty(pulsarMessage.comment) ) {
                 comment = pulsarMessage.comment;
         }
         var userInfo= "";
         if ( !isEmpty(pulsarMessage.userInfo) ) {
                userInfo = pulsarMessage.userInfo;
         }
         var contactInfo= "";
         if ( !isEmpty(pulsarMessage.contactInfo) ) {
                 contactInfo = pulsarMessage.contactInfo;
         }

                 t.row.add( [ sentiment, publishTime, comment, userInfo, contactInfo ] ).draw(true);
       }
};

} );


For messages consumed in JavaScript WebSockets, we have to Base64-decode the payload and parse the JSON into an object, and then use the DataTable row.add method to add these new table rows to our results. This will happen whenever messages are received.

In this blog, we explained how to use Apache Pulsar to build simple, streaming applications regardless of the data source. We chose to add a Scylla compatible sink to our Chat application; however, we could do this for any data store in Apache Pulsar.

You can find the source code in the Github repo Scylla FLiPS The Stream With Apache Pulsar.

Database application Connector (mathematics) Event Sentiment analysis Data (computing)

Published at DZone with permission of Timothy Spann, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.

Popular on DZone

  • Creating Event-Based Architecture on Top of Existing API Ecosystem
  • Federated Schema Design
  • A First Look at CSS When and Else Statements
  • Augmented Analytics: The Future of Business Intelligence

Comments

Big Data Partner Resources

X

ABOUT US

  • About DZone
  • Send feedback
  • Careers
  • Sitemap

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • MVB Program
  • Become a Contributor
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 600 Park Offices Drive
  • Suite 300
  • Durham, NC 27709
  • support@dzone.com
  • +1 (919) 678-0300

Let's be friends:

DZone.com is powered by 

AnswerHub logo