Understanding WSO2 Stream Processor - Part 2

DZone 's Guide to

Understanding WSO2 Stream Processor - Part 2

In this post, we look at how to create and work with data stream processors using the WSO2 stream processor as an example.

· Big Data Zone ·
Free Resource

In the first part of this tutorial, I have explained the concepts around WSO2 Stream Processor and how they are correlated with each other and which components users can use to implement their streaming analytics requirements. It laid out the platform for this tutorial (part 2) where we get our hands dirty with WSO2 SP. 

The first thing you have to do is download the WSO2 SP runtime from WSO2 website.


Once you download the product distribution, you can extract that into a directory and run the product from the bin directory. You need to set the “JAVA_HOME” environment variable to your Java installation (1.8 or higher) before starting the product. In this part of the tutorial, we are going to implement some streaming analytics use cases with WSO2 SP. Hence we need to start the SP in “editor” mode using the following command (for Linux).

 $ sh bin/editor.sh

This command will start the editor profile of the WSO2 SP and prints the URL of the editor in the console similar to below.


Now you can click on the above link and it will open up the editor in a browser window. 

This is your playgorund where you can implement your streaming analytics use cases and test, debug and deploy into the runtime. All these activities can be done without moving away from the editor. The editor comes with so many samples which are self-explanatory and easy to execute. Let’s open up an existing sample to get things going.

Let’s start with the sample “ReceiveAndCount” by clicking on the sample. This will open the source file of this Siddhi application. If you ignore the comments section, the code looks like below. You can save this file with the name “ReceiveAndCount.siddhi.”


@App:description('Receive events via HTTP transport and view the output on the console')

@Source(type = 'http',
define stream SweetProductionStream (name string, amount double);

define stream TotalCountStream (totalCount long);

-- Count the incoming events
from SweetProductionStream
select count() as totalCount
insert into TotalCountStream;

Let’s go through this code and understand what we are doing here. First we define the name of the Siddhi application and a description about the use case.


@App:description('Receive events via HTTP transport and view the output on the console')

Then we define the source of the events with the following code segment. Here we are specifying the protocol as “http” and the data type as “json.” Also we specify the URL of the exposed service and the format of data which is coming (schema). 

@Source(type = 'http',
define stream SweetProductionStream (name string, amount double);

After that we define the sink where we specify an action on the output and the format of the output stream. Here we are pushing the result to the “log” file.

define stream TotalCountStream (totalCount long);

Finally, we have the processing logic where we give the name “query1” through the @info  annotation for identification of this query. Here we are taking events from the input stream which we have defined in the source section and then using the count() function to count the number of events and push the result into output stream which we have defined within the sink section.

-- Count the incoming events
from SweetProductionStream
select count() as totalCount
insert into TotalCountStream;

With this, let’s run this Siddhi application from the editor by saving this file and clicking on the “Run” button or selecting the relevant menu item. If it is deployed and started successfully, you will see the below log message in the editor console.

ReceiveAndCount.siddhi - Started Successfully! 

Now let’s send some events to this Siddhi application. You can either use a tool like PostMan/SOAPUI or the built in event simulation feature of the editor. Here I’m using the event simulator which is coming with the editor. You can click on the “event simultor” icon which is on the left side panel (second icon) and it will expand that panel and open the event simulation section. 

Here you need to select the following values.

  • Siddhi App Name = ReceiveAndCount
  • Stream Name - SweetProductionStream
  • name(STRING) - Flour (sample value)
  • amount(DOUBLE) - 23 (sample value)

Once you select those values, you can click on “Send” button and it will send an event with following JSON format:

{ name: “Flour”, amount: 23} 

If you observe the console when you start the editor at the beginning, you will see the following line getting printed.

[2018-06-01 10:57:01,776] INFO {org.wso2.siddhi.core.stream.output.sink.LogSink} - ReceiveAndCount : TotalCountStream : Event{timestamp=1527830821771, data=[1], isExpired=false}

If you click on 'send event' two more times, you will see that “data” element of the above log line is aggregating to thenumber of events you have sent. 

[2018-06-01 10:58:51,500] INFO {org.wso2.siddhi.core.stream.output.sink.LogSink} - ReceiveAndCount : TotalCountStream : Event{timestamp=1527830931494, data=[2], isExpired=false}

[2018-06-01 10:58:52,846] INFO {org.wso2.siddhi.core.stream.output.sink.LogSink} - ReceiveAndCount : TotalCountStream : Event{timestamp=1527830932845, data=[3], isExpired=false}

Congratulations! You have run your first Siddhi application with WSO2 SP which counts the number of events received to a given HTTP service. 

Let’s do something meaningful with the next sample. Let’s say we want to implement a fraud detection use case where if someone is spending more than 100K within a 10 minute time interval from one credit card, that needs to be considered as a red flag and send an email to the user. We can implement this use case with the following Siddhi application.


@App:description('Simulate a single event and receive alerts as e-mail when a predefined threshold value is exceeded')

define stream TransactionStream(creditCardNo string, country string, item string, transaction double);

      username ='sender.username',
      address ='sender.email',
      password= 'XXXXXXX', 
      subject='Alert for large value transaction: cardNo:{{creditCardNo}}',
      port = '465',
      host = 'smtp.gmail.com',
      ssl.enable = 'true',
      auth = 'true', 
define stream AlertStream(creditCardNo string, country string, item string, lastTransaction double);

partition with(creditCardNo of TransactionStream)
from TransactionStream#window.time(10 minute)[sum(transaction) > 100000]
select creditCardNo, country, item, transaction as lastTransaction
insert into AlertStream;

The above application sends an email when there is a fraudulent that event occurs. The execution flow and the application logic can be explained using the below figure.

Here we create a partition of the event stream using a given credit card number. Within that partition, we check for a 10 minute time window and within that period, we do an aggregation and check the value to be greater than 100K. If all those conditions are satisfied, we choose the last arrived event and send those details through an email to the relevant user. 

You can save the above Siddhi application as “AlertsAndThresholds.siddhi” file within the editor and then send a series of events from event simulation section and observe that when there are transactions which sums up to 100K for a given credit card number, it will send an email to the configured email address. The email will look similar to below.

Alert for large value transaction: cardNo:444444

That’s it. You just wrote a Siddhi application to detect fraudulent activities. You can extend this application based on your conditions.

big data ,data streams ,stream processing ,wso2

Published at DZone with permission of Chanaka Fernando , DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}