Understanding WSO2 Stream Processor - Part 2

In the first part of this tutorial, I explained the concepts around WSO2 Stream Processor, how they relate to each other, and which components you can use to implement your streaming analytics requirements. That laid the groundwork for this second part, where we get our hands dirty with WSO2 SP.
The first thing you have to do is download the WSO2 SP runtime from the WSO2 website:

https://wso2.com/analytics/install

Once you download the product distribution, extract it into a directory and run the product from the bin directory. Before starting the product, you need to set the “JAVA_HOME” environment variable to point to your Java installation (1.8 or higher). In this part of the tutorial we are going to implement some streaming analytics use cases with WSO2 SP, so we need to start SP in “editor” mode using the following command (for Linux):

$ sh bin/editor.sh
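
If “JAVA_HOME” is not already set in your shell, export it before running the script. The JDK path below is only an example; point it at your own installation:

$ export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64   # example path; use your own JDK location
$ export PATH=$JAVA_HOME/bin:$PATH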

This command will start the editor profile of WSO2 SP and print the URL of the editor in the console (by default, http://localhost:9390/editor).


Now you can open that URL, and the editor will load in a browser window.



This is your playground, where you can implement your streaming analytics use cases and test, debug, and deploy them into the runtime. All these activities can be done without moving away from the editor. The editor comes with many samples which are self-explanatory and easy to execute. Let’s open an existing sample to get things going.

Let’s start with the “ReceiveAndCount” sample by clicking on it. This will open the source file of this Siddhi application. If you ignore the comments section, the code looks like the following. You can save this file with the name “ReceiveAndCount.siddhi”.

@App:name("ReceiveAndCount")

@App:description('Receive events via HTTP transport and view the output on the console')

@Source(type = 'http',
       receiver.url='http://localhost:8006/productionStream',
       basic.auth.enabled='false',
       @map(type='json'))
define stream SweetProductionStream (name string, amount double);

@sink(type='log')
define stream TotalCountStream (totalCount long);

-- Count the incoming events
@info(name='query1')
from SweetProductionStream
select count() as totalCount
insert into TotalCountStream;

Let’s go through this code and understand what we are doing here. First, we define the name of the Siddhi application and a description of the use case.

@App:name("ReceiveAndCount")

@App:description('Receive events via HTTP transport and view the output on the console')

Then we define the source of the events with the following code segment. Here we specify the transport as “http” and the message format as “json”. We also specify the URL of the exposed service and the schema of the incoming data (the stream definition).

@Source(type = 'http',
       receiver.url='http://localhost:8006/productionStream',
       basic.auth.enabled='false',
       @map(type='json'))
define stream SweetProductionStream (name string, amount double);
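
As a side note, the default json mapping expects each incoming event to be wrapped in an “event” envelope, so a payload posted to the above URL would look something like the following:

{"event": {"name": "Flour", "amount": 23.0}}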

After that, we define the sink, where we specify what to do with the output and the format of the output stream. Here we push the result to the log, which appears in the console.

@sink(type='log')
define stream TotalCountStream (totalCount long);

Finally, we have the processing logic. The @info annotation gives this query the name “query1” so it can be identified. The query takes events from the input stream we defined in the source section, counts them using the “count()” function, and pushes the result into the output stream we defined with the sink.

-- Count the incoming events
@info(name='query1')
from SweetProductionStream
select count() as totalCount
insert into TotalCountStream;
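
As a quick aside, if you wanted a count per fixed interval instead of a running total, a sketch along the following lines (a hypothetical variant, not part of the sample) would use a tumbling window over the same streams:

-- hypothetical variant: emit the event count for each 1-minute batch
@info(name='query1')
from SweetProductionStream#window.timeBatch(1 minute)
select count() as totalCount
insert into TotalCountStream;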

With this understanding, let’s run this Siddhi application from the editor by saving the file and clicking the “Run” button or selecting the relevant menu item. If it is deployed and started successfully, you will see the following log message in the editor console.

ReceiveAndCount.siddhi - Started Successfully!

Now let’s send some events to this Siddhi application. You can either use a tool like Postman or SoapUI, or the built-in event simulation feature of the editor. Here I’m using the event simulator that comes with the editor. Click on the “event simulator” icon on the left side panel (the second icon); it will expand the panel and open the event simulation section.



Here you need to select the following values.
  • Siddhi App Name = ReceiveAndCount
  • Stream Name = SweetProductionStream
  • name(STRING) = Flour (sample value)
  • amount(DOUBLE) = 23 (sample value)

Once you select those values, click the “Send” button, and it will send an event equivalent to the following JSON:

{"name": "Flour", "amount": 23}
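
Alternatively, if you prefer the command line over the simulator or a GUI tool, you can post an equivalent event with curl. This is a sketch assuming the source configuration above and the “event” envelope of the default json mapping:

$ curl -X POST http://localhost:8006/productionStream \
    -H "Content-Type: application/json" \
    -d '{"event": {"name": "Flour", "amount": 23.0}}'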

If you observe the console where you started the editor at the beginning, you will see the following line printed.
[2018-06-01 10:57:01,776] INFO {org.wso2.siddhi.core.stream.output.sink.LogSink} - ReceiveAndCount : TotalCountStream : Event{timestamp=1527830821771, data=[1], isExpired=false}

If you click “Send” two more times, you will see that the “data” element of the log line increments to match the number of events you have sent.

[2018-06-01 10:58:51,500] INFO {org.wso2.siddhi.core.stream.output.sink.LogSink} - ReceiveAndCount : TotalCountStream : Event{timestamp=1527830931494, data=[2], isExpired=false}

[2018-06-01 10:58:52,846] INFO {org.wso2.siddhi.core.stream.output.sink.LogSink} - ReceiveAndCount : TotalCountStream : Event{timestamp=1527830932845, data=[3], isExpired=false}


Congratulations! You have run your first Siddhi application with WSO2 SP, which counts the number of events received by a given HTTP service.

Let’s do something more meaningful with the next sample. Say we want to implement a fraud detection use case: if someone spends more than 100K from one credit card within a 10-minute time interval, that is considered a red flag, and an email is sent to the user. We can implement this use case with the following Siddhi application.


@App:name("AlertsAndThresholds")

@App:description('Simulate a single event and receive alerts as e-mail when a predefined threshold value is exceeded')

define stream TransactionStream(creditCardNo string, country string, item string, transaction double);

@sink(type='email',
     username ='sender.username',
     address ='sender.email',
     password= 'XXXXXXX',
     subject='Alert for large value transaction: cardNo:{{creditCardNo}}',
     to='email.address.to.be.sent',
     port = '465',
     host = 'smtp.gmail.com',
     ssl.enable = 'true',
     auth = 'true',
     @map(type='text'))
define stream AlertStream(creditCardNo string, country string, item string, lastTransaction double);

@info(name='query1')
partition with(creditCardNo of TransactionStream)
begin
from TransactionStream#window.time(10 minute)[sum(transaction) > 100000]
select creditCardNo, country, item, transaction as lastTransaction
insert into AlertStream;
end;

The above application sends an email when a fraudulent event occurs. The execution flow and the application logic can be explained as follows.



Here we partition the event stream by credit card number. Within each partition, we keep a 10-minute time window, aggregate the transaction amounts, and check whether the sum exceeds 100K. When all those conditions are satisfied, we take the last arrived event and send its details through an email to the relevant user.
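
For example, two events like the following for the same card number within ten minutes would push the sum past 100K, and the second event would trigger the alert (the values are illustrative and line up with the sample email below):

creditCardNo="444444", country="lk", item="test", transaction=60000.0
creditCardNo="444444", country="lk", item="test", transaction=50000.0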

You can save the above Siddhi application as “AlertsAndThresholds.siddhi” within the editor, then send a series of events from the event simulation section. When the transactions for a given credit card number sum to more than 100K within the window, an email is sent to the configured email address. It will look similar to the following.

Alert for large value transaction: cardNo:444444

creditCardNo:"444444",
country:"lk",
item:"test",
lastTransaction:50000.0

That’s it. You just wrote a Siddhi application to detect fraudulent activities. You can extend this application based on your own conditions.
