
Building an In-Order, Reliable Messaging system for your enterprise with WSO2 Products

Business Requirement

Today, business is all about working with information. If you are a business owner and you want to grow your business, the best way is to make it accessible over the internet. As your business grows, thousands, millions or even billions of people may end up interacting with it. Online stores like eBay, Amazon or Google Play can need to process millions of messages per second.

When a customer interacts with such a system, that information needs to be saved somewhere (e.g. a database). This can be illustrated as below.



In the above figure, the client uses a web browser to access your system, which is implemented as a web service hosted somewhere on the internet. At any given moment, there can be any number of parallel users accessing your business. To handle each and every request from your customers, you need a robust, reliable messaging system.

How can WSO2 serve your requirement?

WSO2 is an enterprise middleware company which provides a full set of products for enterprise integration scenarios. Mapping WSO2 products onto the above business scenario is not hard, since WSO2 provides a well-designed product for most enterprise integration needs.

The diagram, redrawn with the WSO2 product mapping, is given below.



According to the above figure, we need 3 WSO2 products which are specifically designed for the respective use cases.

WSO2 ESB - Integration engine which can be used to connect heterogeneous systems.

WSO2 MB - Reliable message broker which can be used to store your messages.

WSO2 DSS - Simple yet powerful engine which can be used to perform database operations in a reliable manner.

Let's make it happen with actual configurations

First you need to download the 3 products mentioned above from the WSO2 site (links are provided above).

After downloading the products, extract them to 3 directories. Let's refer to these 3 directories as below. You need to follow the below link for installing the WSO2 products in a Linux environment.


ESB_HOME - extracted directory for WSO2 ESB
MB_HOME - extracted directory for WSO2 MB
DSS_HOME - extracted directory for WSO2 DSS

Once you have installed the products, let's integrate them to meet the business requirement.

1. Configure the 3 products so that they can work together by copying the required libraries.

  • ESB+MB integration
You can use the following link to integrate WSO2 ESB with WSO2 MB.

You don't need to create the proxy service at this point, since we will create it later.
  • MB+DSS integration
You can use the following link to integrate WSO2 MB with WSO2 DSS.

Make sure you have changed the port offset of the DSS server to 2. Your servers are now running at the following offset values (management console HTTPS port in brackets).
ESB - 0 (9443)
MB - 1 (9444)
DSS - 2 (9445)
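
The offset arithmetic above can be sketched in a few lines of Python. This is only an illustration: the base ports (9443 for the management console over HTTPS, 9763 for HTTP and 5672 for the AMQP listener of the MB) are the usual Carbon defaults as far as I know, and you should treat them as assumptions and confirm them against your own installation.

```python
# Default ports before any offset is applied.
# Assumption: these are the stock Carbon defaults; verify them in your setup.
DEFAULT_PORTS = {
    "console_https": 9443,  # management console (HTTPS)
    "console_http": 9763,   # HTTP servlet transport
    "amqp": 5672,           # AMQP listener, only meaningful for the MB
}

# Offsets used in this post.
OFFSETS = {"ESB": 0, "MB": 1, "DSS": 2}


def effective_ports(offset):
    """Return every default port shifted by the given offset."""
    return {name: port + offset for name, port in DEFAULT_PORTS.items()}


if __name__ == "__main__":
    for product, offset in OFFSETS.items():
        print(product, effective_ports(offset))
```

With these assumptions, the DSS (offset 2) serves HTTP on 9765, which is the port used for the data service endpoint later in this post, and the MB (offset 1) accepts AMQP connections on 5673.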


2. Create an interface in the ESB that can be accessed from the web browser.
We can create a proxy service to handle the requests coming from the web browser into the ESB. The proxy configuration is given below.

<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse"
      name="RequestInProxy"
      transports="https,http"
      statistics="enable"
      trace="disable"
      startOnLoad="true">
  <target>
     <inSequence>
        <property name="OUT_ONLY" value="true"/>
        <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
        <property name="transportNonBlocking" scope="axis2" action="remove"/>
        <log level="custom">
           <property name="STATUS" value="Message Received"/>
        </log>
        <send>
           <endpoint>
              <address uri="jms:/StockQuotesQueue?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&amp;java.naming.factory.initial=org.wso2.andes.jndi.PropertiesFileInitialContextFactory&amp;java.naming.provider.url=repository/conf/jndi.properties&amp;transport.jms.DestinationType=queue"
                       format="pox"/>
           </endpoint>
        </send>
     </inSequence>
     <faultSequence>
        <log level="custom">
           <property name="ERROR" value="Error in RequestInProxy"/>
        </log>
     </faultSequence>
  </target>
  <description/>
</proxy>

You can save this proxy in the ESB_HOME/repository/deployment/server/synapse-configs/default/proxy-services directory. This proxy stores every message it receives in the JMS queue named StockQuotesQueue.
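
The address URI in the proxy is just a JMS destination followed by a query string of JNDI and transport parameters. As a rough sketch (the helper name build_jms_uri is my own, not part of any WSO2 API), the following Python assembles the same kind of URI from a dictionary, so that each parameter appears exactly once, and then escapes it for use inside an XML attribute.

```python
from xml.sax.saxutils import escape

# Parameters taken from the proxy configuration in this post.
JMS_PARAMS = {
    "transport.jms.ConnectionFactoryJNDIName": "QueueConnectionFactory",
    "java.naming.factory.initial":
        "org.wso2.andes.jndi.PropertiesFileInitialContextFactory",
    "java.naming.provider.url": "repository/conf/jndi.properties",
    "transport.jms.DestinationType": "queue",
}


def build_jms_uri(destination, params):
    """Join the parameters into a jms:/ URI (hypothetical helper)."""
    query = "&".join(f"{key}={value}" for key, value in params.items())
    return f"jms:/{destination}?{query}"


uri = build_jms_uri("StockQuotesQueue", JMS_PARAMS)

# Inside the proxy XML every '&' has to be written as '&amp;'.
xml_attribute_value = escape(uri)

if __name__ == "__main__":
    print(xml_attribute_value)
```

Keeping the parameters in a dictionary is a simple way to avoid setting the same key twice with conflicting values.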

3. Once the messages are saved in the queue, we need the DSS to read them from the queue and write them to the database. This can be achieved using one of the following approaches.

  • Using the JMS endpoint in the DSS
If we use the JMS endpoint, we need to write a data service that consumes the messages from the MB queue. First you need to configure your database with the DSS server. If you are using a MySQL database, you can follow this link to configure it.


Once you have configured the database with the DSS, you can define your data service to retrieve the messages from the MB over the JMS transport. For this, you need the following two files in the DSS_HOME/repository/deployment/server/dataservices directory.

StockQuotesQueue_services.xml
----------------------------------------------

<serviceGroup>
    <service name="StockQuotesQueue">
       <parameter name="transport.jms.ContentType">
       <rules>
           <jmsProperty>contentType</jmsProperty>
           <default>application/xml</default>
       </rules>
       </parameter>
        <parameter name="JMS_REPLY_TO">DSS_FAULTS</parameter>
    </service>
</serviceGroup>


StockQuotesQueue.dbs
---------------------------------------------
<data enableBatchRequests="true" name="StockQuotesQueue">
  <description>Sample Data Service</description>
  <config id="DemoDataSource">
     <property name="driverClassName">com.mysql.jdbc.Driver</property>
     <property name="url">jdbc:mysql://localhost:3306/ESB_SP_SAMPLE</property>
     <property name="username">root</property>
     <property name="password">root123</property>
  </config>
  <query id="insertRequestData" useConfig="DemoDataSource">
     <sql>Call ESB_SP_SAMPLE.InsertRequestData(?,?,?,?)</sql>
     <properties>
        <property name="org.wso2.ws.dataservice.query_timeout">100</property>
        <property name="org.wso2.ws.dataservice.force_jdbc_batch_requests">true</property>
     </properties>
     <param name="name" ordinal="1" sqlType="STRING"/>
     <param name="id" ordinal="2" sqlType="STRING"/>
     <param name="price" ordinal="3" sqlType="DOUBLE"/>
     <param name="location" ordinal="4" sqlType="STRING"/>
  </query>
  <operation name="insertRequestData" returnRequestStatus="false">
     <call-query href="insertRequestData">
        <with-param name="name" query-param="name"/>
        <with-param name="id" query-param="id"/>
        <with-param name="price" query-param="price"/>
        <with-param name="location" query-param="location"/>        
     </call-query>
  </operation>
 </data>
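
To make the parameter mapping in the .dbs file concrete, here is a small Python model of it. It is not how the DSS is implemented; it only shows how the four child elements of an insertRequestData payload line up with ordinals 1 to 4 of the stored procedure call. The function name bind_parameters is hypothetical.

```python
import xml.etree.ElementTree as ET

NS = {"p": "http://ws.wso2.org/dataservice"}

# Same order as the ordinal attributes of the <param> elements (1..4).
PARAM_ORDER = ["name", "id", "price", "location"]
SQL = "CALL ESB_SP_SAMPLE.InsertRequestData(?,?,?,?)"

PAYLOAD = """<p:insertRequestData xmlns:p="http://ws.wso2.org/dataservice">
  <p:name>WSO2-Mob</p:name>
  <p:id>c45</p:id>
  <p:price>33.33</p:price>
  <p:location>SL</p:location>
</p:insertRequestData>"""


def bind_parameters(payload_xml):
    """Return the payload values as a tuple in ordinal order."""
    root = ET.fromstring(payload_xml)
    values = []
    for param in PARAM_ORDER:
        element = root.find(f"p:{param}", NS)
        if element is None:
            raise ValueError(f"missing parameter: {param}")
        text = element.text or ""
        # price is declared as DOUBLE, the others as STRING.
        values.append(float(text) if param == "price" else text)
    return tuple(values)


if __name__ == "__main__":
    print(SQL, bind_parameters(PAYLOAD))
```

The point of the sketch is that the element names in the request must match the query-param names, while the position in the SQL call is decided by the ordinal.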

In this data service, the message retrieved from the MB is written to the database using a stored procedure defined in the MySQL database. You can create the database and the stored procedure with the following commands.

1. Connect to mysql server.
chanaka@chanaka-laptop:~$ mysql -u root -p
Enter password:

2. Create a sample database.
DROP DATABASE IF EXISTS ESB_SP_SAMPLE;
CREATE DATABASE ESB_SP_SAMPLE;

3. Create a table using the following statement.
USE ESB_SP_SAMPLE;

DROP TABLE IF EXISTS company;
CREATE TABLE company(name VARCHAR(10), id VARCHAR(10), price DOUBLE, location VARCHAR(10));

4. Insert some data into the company table using the following statements.
INSERT INTO company VALUES ('WSO2','c1',2.9563,'SL');
INSERT INTO company VALUES ('IBM','c2',3.7563,'US');
INSERT INTO company VALUES ('SUN','c3',3.8349,'US');
INSERT INTO company VALUES ('MSFT','c4',3.2938,'US');

5. Create necessary Stored Procedures.

DROP PROCEDURE IF EXISTS InsertRequestData;
CREATE PROCEDURE InsertRequestData(compName VARCHAR(10), compId VARCHAR(10), compPrice DOUBLE, compLocation VARCHAR(10)) INSERT INTO company VALUES(compName,compId,compPrice,compLocation) ;

Now we are all set. Send a message to the proxy service using SoapUI, and you can observe that the data is stored in the database once the message arrives at the DSS. You can change the configuration so that the actual message content is saved in the database. Here is a sample message that you can send from SoapUI to be saved in the database.
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">
  <soapenv:Header/>
  <soapenv:Body>
  <p:insertRequestData xmlns:p="http://ws.wso2.org/dataservice">
     <p:name>WSO2-Mob</p:name>
     <p:id>c45</p:id>
     <p:price>33.33</p:price>
     <p:location>SL</p:location>
  </p:insertRequestData>
</soapenv:Body>
</soapenv:Envelope>

Now you can see that the data you sent to the ESB is saved in the MySQL database.
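
If you prefer a script over SoapUI, the same request can be sent with the Python standard library. This is a minimal sketch under a few assumptions: the proxy is reachable at http://localhost:8280/services/RequestInProxy (8280 being the usual HTTP port of the ESB with offset 0), and the function names are my own. Because the proxy sets FORCE_SC_ACCEPTED, the call should come back with HTTP 202 rather than a SOAP response.

```python
import urllib.request
from xml.sax.saxutils import escape

# Assumption: default ESB HTTP port with offset 0; adjust to your setup.
PROXY_URL = "http://localhost:8280/services/RequestInProxy"


def build_envelope(name, company_id, price, location):
    """Return the SOAP 1.1 envelope for one insertRequestData call."""
    return (
        '<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">'
        "<soapenv:Header/>"
        "<soapenv:Body>"
        '<p:insertRequestData xmlns:p="http://ws.wso2.org/dataservice">'
        f"<p:name>{escape(str(name))}</p:name>"
        f"<p:id>{escape(str(company_id))}</p:id>"
        f"<p:price>{price}</p:price>"
        f"<p:location>{escape(str(location))}</p:location>"
        "</p:insertRequestData>"
        "</soapenv:Body>"
        "</soapenv:Envelope>"
    )


def build_request(url, envelope):
    """Wrap the envelope in an HTTP POST request object."""
    return urllib.request.Request(
        url,
        data=envelope.encode("utf-8"),
        headers={
            "Content-Type": "text/xml; charset=UTF-8",
            "SOAPAction": '"urn:insertRequestData"',
        },
        method="POST",
    )


def send(url, envelope):
    """Send the request and return the HTTP status code."""
    with urllib.request.urlopen(build_request(url, envelope), timeout=10) as response:
        return response.status


# Example (needs the servers from this post to be running):
# status = send(PROXY_URL, build_envelope("WSO2-Mob", "c45", 33.33, "SL"))
```

The network call is left commented out on purpose, so the snippet can be loaded without a running ESB.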


  • Using a Message Processor defined in the ESB

You can achieve the same result by using a message processor defined in the ESB instead of defining a JMS data service on the DSS side. For this, you need to change the proxy service so that it saves the message in a message store, and then use the message processor to send the message to the DSS service.

1. Define a message store by copying and pasting the following config into the ESB source view. Alternatively, you can use the message store UI.

<messageStore name="JMSMS" class="org.apache.synapse.message.store.impl.jms.JmsStore" xmlns="http://ws.apache.org/ns/synapse">
  <parameter name="java.naming.factory.initial">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>
  <parameter name="java.naming.provider.url">repository/conf/jndi.properties</parameter>
  <parameter name="store.jms.destination">JMSMS</parameter>
  <parameter name="store.jms.JMSSpecVersion">1.1</parameter>
  <parameter name="store.jms.cache.connection">false</parameter>
</messageStore>

2. Define an endpoint to which the messages are sent. In our case, this is the DSS service.

<endpoint xmlns="http://ws.apache.org/ns/synapse" name="StockQuoteDataService">
  <address uri="http://localhost:9765/services/StockQuoteDataService">
     <suspendOnFailure>
        <progressionFactor>1.0</progressionFactor>
     </suspendOnFailure>
     <markForSuspension>
        <retriesBeforeSuspension>0</retriesBeforeSuspension>
        <retryDelay>0</retryDelay>
     </markForSuspension>
  </address>
</endpoint>

For this approach, you only need the .dbs file in the DSS, and you can change it as below.

StockQuoteDataService.dbs
----------------------------------------
<data enableBatchRequests="true" name="StockQuoteDataService">
  <description>Sample Data Service</description>
  <config id="DemoDataSource">
     <property name="driverClassName">com.mysql.jdbc.Driver</property>
     <property name="url">jdbc:mysql://localhost:3306/ESB_SP_SAMPLE</property>
     <property name="username">root</property>
     <property name="password">root123</property>
  </config>
  <query id="insertRequestData" useConfig="DemoDataSource">
     <sql>Call ESB_SP_SAMPLE.InsertRequestData(?,?,?,?)</sql>
     <properties>
        <property name="org.wso2.ws.dataservice.query_timeout">100</property>
        <property name="org.wso2.ws.dataservice.force_jdbc_batch_requests">true</property>
     </properties>
     <param name="name" ordinal="1" sqlType="STRING"/>
     <param name="id" ordinal="2" sqlType="STRING"/>
     <param name="price" ordinal="3" sqlType="DOUBLE"/>
     <param name="location" ordinal="4" sqlType="STRING"/>
  </query>   
  <operation name="insertRequestData" returnRequestStatus="false">
     <call-query href="insertRequestData">
        <with-param name="name" query-param="name"/>
        <with-param name="id" query-param="id"/>
        <with-param name="price" query-param="price"/>
        <with-param name="location" query-param="location"/>        
     </call-query>
  </operation>  
</data>

Copy this file into the DSS_HOME/repository/deployment/server/dataservices/ directory.

3. Define a message forwarding processor as below by copying and pasting code or using the Management Console UI in ESB.

<messageProcessor name="Processor1" class="org.apache.synapse.message.processor.impl.forwarder.ScheduledMessageForwardingProcessor" targetEndpoint="StockQuoteDataService" messageStore="JMSMS" xmlns="http://ws.apache.org/ns/synapse">
  <parameter name="interval">1000</parameter>
  <parameter name="client.retry.interval">1000</parameter>
  <parameter name="max.delivery.attempts">4</parameter>
  <parameter name="is.active">true</parameter>
</messageProcessor>

4. Invoke the scenario through a proxy service like the one below.
<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse"
      name="RequestInProxy2"
      transports="https,http"
      statistics="disable"
      trace="disable"
      startOnLoad="true">
  <target>
     <inSequence>
        <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
        <property name="OUT_ONLY" value="true"/>
        <property name="target.endpoint" value="StockQuoteDataService"/>
        <log level="full"/>
        <store messageStore="JMSMS"/>
     </inSequence>
  </target>
  <description/>
</proxy>

Whenever a message arrives at this proxy service, it is stored in the JMS message store (which is the JMSMS queue in the Message Broker). If you send messages while the message processor is disabled, you will see the message count of the JMSMS queue increase in the Message Broker's management console.
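
The store-and-forward behaviour described above can be modelled with a short Python simulation. It is a toy model under stated assumptions, not the Synapse implementation: the class ForwardingProcessor is hypothetical, the waiting between attempts (interval and client.retry.interval) is left out, and the rule that the processor deactivates itself once max.delivery.attempts is used up reflects my understanding of the forwarding processor and should be checked against your ESB version.

```python
from collections import deque


class ForwardingProcessor:
    """Toy model of a message forwarding processor (hypothetical class)."""

    def __init__(self, store, send, max_delivery_attempts=4, active=True):
        self.store = store                  # FIFO queue, like the JMSMS queue
        self.send = send                    # callable returning True on success
        self.max_delivery_attempts = max_delivery_attempts
        self.active = active
        self.delivered = []

    def tick(self):
        """One polling cycle: try to forward the message at the head."""
        if not self.active or not self.store:
            return
        message = self.store[0]             # peek; keep it until delivered
        for _attempt in range(self.max_delivery_attempts):
            if self.send(message):
                self.store.popleft()        # remove only after success
                self.delivered.append(message)
                return
        # Assumption: the processor stops after exhausting its attempts,
        # so the failed message stays at the head and order is preserved.
        self.active = False


if __name__ == "__main__":
    store = deque()
    backend_up = {"value": False}
    processor = ForwardingProcessor(store, lambda m: backend_up["value"],
                                    active=False)

    store.extend(["m1", "m2", "m3"])        # proxy stores three messages
    processor.tick()
    print(len(store))                       # processor disabled: count stays 3

    processor.active = True
    processor.tick()                        # backend down: gives up, deactivates
    backend_up["value"] = True
    processor.active = True                 # reactivate once the backend is back
    while store:
        processor.tick()
    print(processor.delivered)
```

Because a message is removed from the store only after a successful delivery, and a later message is never attempted before the one at the head, the messages reach the backend in the order in which they were stored.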

Now we are all set. Send a message to the proxy service using SoapUI, and you can observe that the data is stored in the database once the message arrives at the DSS. You can change the configuration so that the actual message content is saved in the database.
Here is a sample message that you can send from SoapUI to be saved in the database.
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">
  <soapenv:Header/>
  <soapenv:Body>
  <p:insertRequestData xmlns:p="http://ws.wso2.org/dataservice">
     <p:name>WSO2-Mob</p:name>
     <p:id>c45</p:id>
     <p:price>33.33</p:price>
     <p:location>SL</p:location>
  </p:insertRequestData>
</soapenv:Body>
</soapenv:Envelope>

You need to enable WS-Addressing in SoapUI, set the Action to urn:insertRequestData, and set the To header so that it points to the SOAP service.
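
For reference, this is roughly what the request looks like once the WS-Addressing headers are added. The Python sketch below builds such an envelope with the standard library. It assumes the WS-Addressing 1.0 namespace (http://www.w3.org/2005/08/addressing), which may differ from the version you pick in SoapUI, and the function name build_addressed_envelope is my own.

```python
import xml.etree.ElementTree as ET

SOAP_NS = "http://schemas.xmlsoap.org/soap/envelope/"
WSA_NS = "http://www.w3.org/2005/08/addressing"  # assumption: WS-Addressing 1.0
DS_NS = "http://ws.wso2.org/dataservice"

# Use readable prefixes in the serialized XML.
ET.register_namespace("soapenv", SOAP_NS)
ET.register_namespace("wsa", WSA_NS)
ET.register_namespace("p", DS_NS)


def build_addressed_envelope(to, action, fields):
    """Return a SOAP envelope with wsa:To and wsa:Action headers."""
    envelope = ET.Element(f"{{{SOAP_NS}}}Envelope")
    header = ET.SubElement(envelope, f"{{{SOAP_NS}}}Header")
    ET.SubElement(header, f"{{{WSA_NS}}}To").text = to
    ET.SubElement(header, f"{{{WSA_NS}}}Action").text = action
    body = ET.SubElement(envelope, f"{{{SOAP_NS}}}Body")
    request = ET.SubElement(body, f"{{{DS_NS}}}insertRequestData")
    for name, value in fields.items():
        ET.SubElement(request, f"{{{DS_NS}}}{name}").text = str(value)
    return ET.tostring(envelope, encoding="unicode")


if __name__ == "__main__":
    print(build_addressed_envelope(
        # Pass whichever SOAP service address you use in SoapUI.
        to="http://localhost:9765/services/StockQuoteDataService",
        action="urn:insertRequestData",
        fields={"name": "WSO2-Mob", "id": "c45", "price": 33.33, "location": "SL"},
    ))
```

The To address is a parameter here because it depends on your setup; the value in the example is the data service address from the endpoint definition in step 2.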
