
Extending WSO2 ESB with a Custom Transport Implementation - Part II

This blog post is a continuation of my previous post, where I described the concepts behind the WSO2 ESB transport mechanism. Now that we have covered the basics, let's start writing some real code. I will use the ISO8583 standard as the subject of this custom implementation, and I will borrow some content from the following blog post as my reference for the ISO8583 Java implementation (the business logic). Thanks to Manoj Fernando for writing such an informative post.

http://manoj-fernando.blogspot.com/2013/08/iso8583-with-wso2-esb.html

The idea of a custom transport implementation is to provide a mechanism for writing your own business logic that plugs in to the WSO2 ESB runtime. I am not going to say more about ISO8583 or its internal implementation. I will use an existing Java library, jPOS, for this purpose. It has the functionality to cover the basic use cases of ISO8583 implementations.

Sample use case

Let’s take the scenario of a financial application that needs to make a credit transaction by sending an XML message, which must be converted to an ISO8583 byte stream before being passed on to the wire through a TCP channel.



Implementation

First, we need to define our ISO8583 field definition. This might be a bit confusing to some. If we are dealing with a specification, why do we need a field definition? This is because the ISO8583 specification does not hard-bind any data elements or field ordering. It is entirely up to the application designer to define which field types/IDs are needed for their specific transactional requirements.

At a glance, the field definition file looks like the following.

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!DOCTYPE isopackager SYSTEM "genericpackager.dtd">
<isopackager>
  <isofield
      id="0"
      length="4"
      name="Message Type Indicator"
      class="org.jpos.iso.IFA_NUMERIC"/>
  <isofield
      id="1"
      length="16"
      name="Bitmap"
      class="org.jpos.iso.IFA_BITMAP"/>
  <isofield
      id="2"
      length="19"
      name="Primary Account number"
      class="org.jpos.iso.IFA_LLNUM"/>
  <isofield
      id="3"
      length="6"
      name="Processing Code"
      class="org.jpos.iso.IFA_NUMERIC"/>

</isopackager>

Please refer to [1] and [2] for a complete reference of ISO8583. For now, let me just say that each field should have an ID, a length and a type specified in its definition. I have only listed a snippet of the XML config here; you can find the full definition, jposdef.xml, inside the codebase.
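
To make the role of the length and type attributes a little more concrete, here is a small sketch that uses only the JDK and not jPOS. It assumes the usual ISO8583 conventions: a fixed-length numeric field is left-padded with zeros to its declared length, and an "LL" numeric field carries a two-digit length in front of its digits. The class and method names (FieldEncodingSketch, padNumeric, encodeLlNum) are made up for illustration.

```java
/** Hypothetical helper illustrating fixed-length and LL-prefixed numeric fields; not part of jPOS. */
final class FieldEncodingSketch {

    /** Left-pads a value with zeros up to the declared length of a fixed-length numeric field. */
    static String padNumeric(String value, int length) {
        if (value.length() > length) {
            throw new IllegalArgumentException(
                    "Value '" + value + "' is longer than the field length " + length);
        }
        StringBuilder padded = new StringBuilder(length);
        for (int i = value.length(); i < length; i++) {
            padded.append('0');
        }
        return padded.append(value).toString();
    }

    /** Prefixes a variable-length value with its two-digit length (the "LL" convention). */
    static String encodeLlNum(String value, int maxLength) {
        if (maxLength > 99 || value.length() > maxLength) {
            throw new IllegalArgumentException(
                    "Value does not fit in an LL field with maximum length " + maxLength);
        }
        return padNumeric(Integer.toString(value.length()), 2) + value;
    }

    public static void main(String[] args) {
        // Processing Code (id 3) is declared with length 6 in the definition above.
        System.out.println(padNumeric("110", 6));
        // Primary Account Number (id 2) is an LL field with a maximum of 19 digits.
        System.out.println(encodeLlNum("4111111111111111", 19));
    }
}
```

With the definition above, a Processing Code of 110 would therefore travel as 000110, which matches what the test server prints later in this post.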
I have created a simple Maven project to implement this transport. Make sure that you have included the jPOS dependencies in pom.xml as follows.
              <dependency>
                     <groupId>org.jpos</groupId>
                     <artifactId>jpos</artifactId>
                     <version>1.9.0</version>
              </dependency>
              <dependency>
                     <groupId>com.sleepycat</groupId>
                     <artifactId>je</artifactId>
                     <version>4.0.92</version>
              </dependency>


To implement the transport sender, you need to subclass AbstractTransportSender and implement its sendMessage method as follows.

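import java.io.IOException;
import java.net.URI;

import org.apache.axiom.om.OMOutputFormat;
import org.apache.axis2.AxisFault;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.transport.OutTransportInfo;
import org.apache.axis2.transport.base.AbstractTransportSender;
import org.apache.axis2.transport.base.BaseUtils;
import org.jpos.iso.ISOPackager;
import org.jpos.iso.channel.ASCIIChannel;
import org.jpos.iso.packager.GenericPackager;
import org.wso2.transport.iso8583.message.ISO8583MessageFormatter;

public class ISO8583TransportSender extends AbstractTransportSender {

    @Override
    public void sendMessage(MessageContext msgCtx, String targetEPR,
                            OutTransportInfo outTransportInfo) throws AxisFault {
        try {
            // The endpoint address (e.g. iso8583://localhost:5000) carries the host and port.
            URI isoURL = new URI(targetEPR);
            ISOPackager packager = new GenericPackager(
                    this.getClass().getResourceAsStream("jposdef.xml"));

            ASCIIChannel chl = new ASCIIChannel(isoURL.getHost(),
                    isoURL.getPort(), packager);

            writeMessageOut(msgCtx, chl);
        } catch (Exception e) {
            // Keep the original exception as the cause so that the failure can be diagnosed.
            throw new AxisFault(
                    "An exception occurred in sending the ISO message", e);
        }
    }

    /**
     * Writes the message to the output channel after applying the correct message formatter.
     *
     * @param msgContext the message context of the outgoing message
     * @param chl        the jPOS channel pointing at the back-end server
     * @throws org.apache.axis2.AxisFault
     * @throws java.io.IOException
     */
    private void writeMessageOut(MessageContext msgContext,
                                 ASCIIChannel chl) throws AxisFault, IOException {
        ISO8583MessageFormatter messageFormatter =
                (ISO8583MessageFormatter) BaseUtils.getMessageFormatter(msgContext);
        OMOutputFormat format = BaseUtils.getOMOutputFormat(msgContext);
        messageFormatter.setAsciiChannel(chl);
        messageFormatter.writeTo(msgContext, format, null, true);
    }
}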

Within the transport sender, we extract the URL, create the entities that the message formatter needs, and pass control to the message formatter. Within the message formatter, we send the actual message to the back-end server.
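
The URL extraction step can be tried out on its own. The following is a minimal sketch using only java.net.URI; the class name IsoEndpointSketch and the validation it performs are my own additions for illustration and are not part of the transport code.

```java
import java.net.URI;

/** Hypothetical helper showing how a custom-scheme endpoint address breaks down into host and port. */
final class IsoEndpointSketch {

    final String host;
    final int port;

    private IsoEndpointSketch(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /** Parses an address such as iso8583://localhost:5000 and rejects one without a host or port. */
    static IsoEndpointSketch parse(String targetEpr) {
        URI uri = URI.create(targetEpr);
        // getHost() is null for opaque URIs and getPort() is -1 when no port is given.
        if (uri.getHost() == null || uri.getPort() == -1) {
            throw new IllegalArgumentException(
                    "Expected an address of the form iso8583://host:port but got " + targetEpr);
        }
        return new IsoEndpointSketch(uri.getHost(), uri.getPort());
    }

    public static void main(String[] args) {
        IsoEndpointSketch endpoint = parse("iso8583://localhost:5000");
        System.out.println(endpoint.host + ":" + endpoint.port);
    }
}
```

Checking for a missing host or port up front gives a clearer error than letting the channel fail later with a connection problem.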

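import java.io.IOException;
import java.io.OutputStream;
import java.net.URL;
import java.util.Iterator;

import javax.xml.namespace.QName;

import org.apache.axiom.om.OMElement;
import org.apache.axiom.om.OMOutputFormat;
import org.apache.axiom.soap.SOAPEnvelope;
import org.apache.axis2.AxisFault;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.transport.MessageFormatter;
import org.jpos.iso.ISOException;
import org.jpos.iso.ISOMsg;
import org.jpos.iso.channel.ASCIIChannel;

public class ISO8583MessageFormatter implements MessageFormatter {

    // Set by the transport sender before writeTo is called. If the same formatter
    // instance is shared between concurrent requests, this field is not thread-safe.
    private ASCIIChannel asciiChannel;

    @Override
    public byte[] getBytes(MessageContext messageContext, OMOutputFormat omOutputFormat) throws AxisFault {
        // Not used by this transport: writeTo sends the message through the jPOS channel.
        return new byte[0];
    }

    @Override
    public void writeTo(MessageContext messageContext, OMOutputFormat omOutputFormat, OutputStream outputStream, boolean b) throws AxisFault {
        ISOMsg isoMsg = toISO8583(messageContext);
        ASCIIChannel chl = this.asciiChannel;
        try {
            chl.connect();
            chl.send(isoMsg);
        } catch (Exception ex) {
            throw new AxisFault(
                    "An exception occurred in sending the ISO message", ex);
        } finally {
            // Always release the connection, even when connect or send fails.
            try {
                chl.disconnect();
            } catch (IOException ignored) {
                // Closing is best effort; the outcome of the send is already decided.
            }
        }
    }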

    @Override
    public String getContentType(MessageContext messageContext, OMOutputFormat omOutputFormat, String s) {
        return null;
    }

    @Override
    public URL getTargetAddress(MessageContext messageContext, OMOutputFormat omOutputFormat, URL url) throws AxisFault {
        return null;
    }

    @Override
    public String formatSOAPAction(MessageContext messageContext, OMOutputFormat omOutputFormat, String s) {
        return null;
    }

    public ISOMsg toISO8583(MessageContext messageContext) throws AxisFault {
        SOAPEnvelope soapEnvelope = messageContext.getEnvelope();
        OMElement isoElements = soapEnvelope.getBody().getFirstElement();

        ISOMsg isoMsg = new ISOMsg();

        @SuppressWarnings("unchecked")
        Iterator<OMElement> fieldItr = isoElements.getFirstChildWithName(
                new QName(ISO8583Constant.TAG_DATA)).getChildrenWithLocalName(
                ISO8583Constant.TAG_FIELD);

        String mtiVal = isoElements
                .getFirstChildWithName(new QName(ISO8583Constant.TAG_CONFIG))
                .getFirstChildWithName(new QName(ISO8583Constant.TAG_MTI))
                .getText();

        try {
            isoMsg.setMTI(mtiVal);

            while (fieldItr.hasNext()) {
                OMElement isoElement = fieldItr.next();
                String isoValue = isoElement.getText();

                // The id attribute of each <field> element is the ISO8583 field number.
                int isoTypeID = Integer.parseInt(isoElement.getAttribute(
                        new QName("id")).getAttributeValue());

                isoMsg.set(isoTypeID, isoValue);
            }

            return isoMsg;

        } catch (ISOException ex) {
            throw new AxisFault("Error parsing the ISO8583 payload", ex);
        } catch (Exception e) {
            throw new AxisFault("Error processing stream", e);
        }

    }


    public ASCIIChannel getAsciiChannel() {
        return asciiChannel;
    }

    public void setAsciiChannel(ASCIIChannel asciiChannel) {
        this.asciiChannel = asciiChannel;
    }

}

Within the formatter, we transform the XML message into an ISO8583 binary message and send it to the back-end server.
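
If you want to experiment with the XML-to-fields extraction outside the ESB, the following sketch does the same kind of walk with the JDK's DOM parser instead of Axiom, and collects the values into a plain map instead of an ISOMsg. It assumes the tag names used in the sample payload of this post (mti, field and the id attribute); the class name PayloadParsingSketch is hypothetical.

```java
import java.io.StringReader;
import java.util.Map;
import java.util.TreeMap;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

/** Stdlib-only illustration of pulling the MTI and the numbered fields out of the XML payload. */
final class PayloadParsingSketch {

    /** Returns the text of the first mti element. */
    static String readMti(String xml) {
        NodeList nodes = parse(xml).getElementsByTagName("mti");
        if (nodes.getLength() == 0) {
            throw new IllegalArgumentException("Payload has no mti element");
        }
        return nodes.item(0).getTextContent().trim();
    }

    /** Returns field number to value, sorted by field number. */
    static Map<Integer, String> readFields(String xml) {
        NodeList nodes = parse(xml).getElementsByTagName("field");
        Map<Integer, String> fields = new TreeMap<>();
        for (int i = 0; i < nodes.getLength(); i++) {
            Element field = (Element) nodes.item(i);
            fields.put(Integer.parseInt(field.getAttribute("id")), field.getTextContent());
        }
        return fields;
    }

    private static Document parse(String xml) {
        try {
            return DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new InputSource(new StringReader(xml)));
        } catch (Exception e) {
            throw new IllegalArgumentException("Payload is not well-formed XML", e);
        }
    }

    public static void main(String[] args) {
        String xml = "<iso8583message><config><mti>1800</mti></config>"
                + "<data><field id=\"3\">110</field>"
                + "<field id=\"48\">Simple Credit Transaction</field></data>"
                + "</iso8583message>";
        System.out.println(readMti(xml) + " " + readFields(xml));
    }
}
```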

This is only an example of dividing your message-sending logic between a message sender and a message formatter. You can design your implementation according to your requirements. Sometimes you may not need a specific formatter and can do the formatting in the sender itself, but I have delegated part of the message handling to the message formatter for demonstration purposes.

Likewise, you can write a message receiver and a message builder for receiving messages via the ISO8583 protocol. I will leave that as an exercise for the reader.
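
As a starting point for that exercise, the core of a builder is the reverse mapping: turning an MTI and a set of numbered fields back into the XML shape used in this post. The sketch below shows only that mapping with plain JDK classes; it is not an Axis2 message builder, and the class name PayloadBuildingSketch is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Stdlib-only illustration of rendering an MTI and numbered fields as the XML payload. */
final class PayloadBuildingSketch {

    /** Renders the fields in ascending field-number order. */
    static String toXml(String mti, Map<Integer, String> fields) {
        StringBuilder xml = new StringBuilder();
        xml.append("<iso8583message><config><mti>")
           .append(escape(mti))
           .append("</mti></config><data>");
        for (Map.Entry<Integer, String> field : new TreeMap<>(fields).entrySet()) {
            xml.append("<field id=\"").append(field.getKey()).append("\">")
               .append(escape(field.getValue()))
               .append("</field>");
        }
        return xml.append("</data></iso8583message>").toString();
    }

    /** Escapes the characters that are not allowed in XML text content. */
    private static String escape(String text) {
        return text.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;");
    }

    public static void main(String[] args) {
        Map<Integer, String> fields = new HashMap<>();
        fields.put(39, "00");
        fields.put(3, "000110");
        System.out.println(toXml("1810", fields));
    }
}
```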

Once we have the message sender and formatter implemented, we need to register them in the axis2.xml file. Open axis2.xml and add the following two entries.

<messageFormatter contentType="application/iso8583"
                  class="org.wso2.transport.iso8583.message.ISO8583MessageFormatter"/>

<transportSender name="iso8583" class="org.wso2.transport.iso8583.ISO8583TransportSender"/>

Once you have built the jar file from your custom transport implementation, place it in the ESB_HOME/repository/components/lib directory.

Once you are done with the above steps, you can start the ESB server.

Let's create a sample API to interact with this custom transport implementation. Here is the API definition.

<api xmlns="http://ws.apache.org/ns/synapse" name="iso8583" context="/iso8583">
   <resource methods="POST GET">
      <inSequence>
         <log level="full"></log>
         <property name="OUT_ONLY" value="true"></property>
         <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"></property>
         <property name="messageType" value="application/iso8583" scope="axis2"></property>
         <send>
            <endpoint name="isoserver">
               <address uri="iso8583://localhost:5000"></address>
            </endpoint>
         </send>
      </inSequence>
   </resource>
</api>

In the above configuration, I have set messageType to application/iso8583 so that the correct message formatter is engaged within the mediation flow.
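
Conceptually, this selection is a lookup from a content type to a registered formatter. The following is a toy model of that idea and not Axis2's actual code; the class name FormatterLookupSketch and the fallback value are assumptions made purely for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of content-type based formatter selection; this is not Axis2 code. */
final class FormatterLookupSketch {

    // Hypothetical fallback name, used here only so that the lookup always returns something.
    static final String DEFAULT_FORMATTER = "default-formatter";

    private final Map<String, String> formatterByContentType = new HashMap<>();

    /** Mirrors a messageFormatter entry: a content type mapped to a formatter class name. */
    void register(String contentType, String formatterClass) {
        formatterByContentType.put(contentType, formatterClass);
    }

    /** Picks the formatter registered for the given messageType, or the fallback if none matches. */
    String select(String messageType) {
        return formatterByContentType.getOrDefault(messageType, DEFAULT_FORMATTER);
    }

    public static void main(String[] args) {
        FormatterLookupSketch registry = new FormatterLookupSketch();
        registry.register("application/iso8583",
                "org.wso2.transport.iso8583.message.ISO8583MessageFormatter");
        System.out.println(registry.select("application/iso8583"));
        System.out.println(registry.select("text/plain"));
    }
}
```

This is also why the property matters for the sender shown earlier: it casts whatever formatter comes back to ISO8583MessageFormatter, so a message without the matching messageType would most likely fail with a ClassCastException.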

Now we need a sample TestServer that acts as the ISO8583 back-end server so that we can test this functionality. We can create a mock server using the jPOS library itself. Here is the code for the TestServer.

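import java.io.IOException;

import org.jpos.iso.BaseChannel;
import org.jpos.iso.ISOException;
import org.jpos.iso.ISOMsg;
import org.jpos.iso.ISOPackager;
import org.jpos.iso.ISORequestListener;
import org.jpos.iso.ISOServer;
import org.jpos.iso.ISOSource;
import org.jpos.iso.ServerChannel;
import org.jpos.iso.channel.ASCIIChannel;
import org.jpos.iso.packager.GenericPackager;

public class TestServer implements ISORequestListener {
    static final String hostname = "localhost";
    static final int portNumber = 5000;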


    public static void main(String[] args) throws ISOException {


        ISOPackager packager = new GenericPackager("jposdef.xml");
        ServerChannel channel = new ASCIIChannel(hostname, portNumber, packager);
        ISOServer server = new ISOServer(portNumber, channel, null);

        server.addISORequestListener(new TestServer());

        System.out.println("ISO8583 server started...");
        new Thread(server).start();
    }

    public boolean process(ISOSource isoSrc, ISOMsg isoMsg) {
        try {
            System.out.println("ISO8583 incoming message on host ["
                    + ((BaseChannel) isoSrc).getSocket().getInetAddress()
                    .getHostAddress() + "]");

            if (isoMsg.getMTI().equalsIgnoreCase("1800")) {

                receiveMessage(isoSrc, isoMsg);
                logISOMsg(isoMsg);

            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        return true;
    }

    private void receiveMessage(ISOSource isoSrc, ISOMsg isoMsg)
            throws ISOException, IOException {
        System.out.println("ISO8583 Message received...");
        ISOMsg reply = (ISOMsg) isoMsg.clone();
        reply.setMTI("1810");
        reply.set(39, "00");

        isoSrc.send(reply);
    }

    private static void logISOMsg(ISOMsg msg) {
        System.out.println("----ISO MESSAGE-----");
        try {
            System.out.println("  MTI : " + msg.getMTI());
            for (int i = 1; i <= msg.getMaxField(); i++) {
                if (msg.hasField(i)) {
                    System.out.println("    Field-" + i + " : "
                            + msg.getString(i));
                }
            }
        } catch (ISOException e) {
            e.printStackTrace();
        } finally {
            System.out.println("--------------------");
        }

    }

}

You can run the above program to mimic the ISO8583 server, and then send a message from a client such as the Advanced REST Client plugin for the Chrome browser. The message payload should look like the following.

<iso8583message>
       <config>
              <mti>1800</mti>
       </config>
       <data>
              <field id="3">110</field>
              <field id="5">4200.00</field>
              <field id="48">Simple Credit Transaction</field>
              <field id="6">645.23</field>
              <field id="88">66377125</field>
       </data>
</iso8583message>
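
If you prefer to send the request from code instead of a browser plugin, a small Java client can do the same thing. This is a sketch under assumptions: it assumes the ESB accepts HTTP on localhost port 8280 and that the payload is posted with the application/xml content type; adjust both to your installation. The class name IsoApiClientSketch is hypothetical.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;

/** Hypothetical command-line client that posts the sample payload to the iso8583 API. */
final class IsoApiClientSketch {

    // Assumption: the ESB listens for HTTP on localhost:8280; change this for your setup.
    static final String API_URL = "http://localhost:8280/iso8583";

    static final String PAYLOAD =
            "<iso8583message>"
            + "<config><mti>1800</mti></config>"
            + "<data>"
            + "<field id=\"3\">110</field>"
            + "<field id=\"5\">4200.00</field>"
            + "<field id=\"48\">Simple Credit Transaction</field>"
            + "<field id=\"6\">645.23</field>"
            + "<field id=\"88\">66377125</field>"
            + "</data>"
            + "</iso8583message>";

    /** Posts the body to the given URL and returns the HTTP status code. */
    static int post(String url, String body) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) URI.create(url).toURL().openConnection();
        try {
            conn.setRequestMethod("POST");
            conn.setDoOutput(true);
            // Assumption: the ESB parses the body as XML when this content type is used.
            conn.setRequestProperty("Content-Type", "application/xml");
            try (OutputStream out = conn.getOutputStream()) {
                out.write(body.getBytes(StandardCharsets.UTF_8));
            }
            return conn.getResponseCode();
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("HTTP status: " + post(API_URL, PAYLOAD));
    }
}
```

Because the API sets OUT_ONLY and FORCE_SC_ACCEPTED, the client should not expect a response body; the status code is the only feedback it gets.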

When we send this message from the client, the ESB accepts it, invokes the transport sender we have written, selects the message formatter, and sends the message to the mock back-end server. If you have done everything right, you will see the following log printed on the TestServer side.

ISO8583 server started...
ISO8583 incoming message on host [127.0.0.1]
ISO8583 Message received...
----ISO MESSAGE-----
  MTI : 1800
    Field-3 : 000110
    Field-5 : 000004200.00
    Field-6 : 000000645.23
    Field-48 : Simple Credit Transaction
    Field-88 : 0000000066377125
--------------------
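
If you are curious about what travels over the TCP connection, the framing can be sketched without jPOS. As far as I understand it, ASCIIChannel puts a four-digit ASCII length header in front of each packed message; treat that as an assumption and check the jPOS documentation for your version. The class and method names below are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Locale;

/** Illustrates a four-digit ASCII length header in front of a message; not jPOS code. */
final class AsciiFramingSketch {

    /** Returns the length header followed by the payload. */
    static byte[] frame(byte[] payload) {
        if (payload.length > 9999) {
            throw new IllegalArgumentException("Payload does not fit a four-digit length header");
        }
        byte[] header = String.format(Locale.ROOT, "%04d", payload.length)
                .getBytes(StandardCharsets.US_ASCII);
        ByteArrayOutputStream out = new ByteArrayOutputStream(header.length + payload.length);
        out.write(header, 0, header.length);
        out.write(payload, 0, payload.length);
        return out.toByteArray();
    }

    /** Reads one framed message from the stream, for example from a socket's input stream. */
    static byte[] readFrame(InputStream in) {
        try {
            DataInputStream data = new DataInputStream(in);
            byte[] header = new byte[4];
            data.readFully(header);
            int length = Integer.parseInt(new String(header, StandardCharsets.US_ASCII));
            byte[] payload = new byte[length];
            data.readFully(payload);
            return payload;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] framed = frame("1800".getBytes(StandardCharsets.US_ASCII));
        System.out.println(new String(framed, StandardCharsets.US_ASCII));
        byte[] payload = readFrame(new ByteArrayInputStream(framed));
        System.out.println(new String(payload, StandardCharsets.US_ASCII));
    }
}
```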

Once you have configured the ESB, you will get exceptions if you do not copy the following jar files to the lib directory alongside the custom transport jar file.

  • jpos-1.9.0.jar
  • jdom-1.1.3.jar
  • commons-cli-1.3.1.jar
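
A quick way to confirm that a set of jars provides the expected classes is to try loading one class from each. The sketch below can be run as a standalone program with the same jars on its classpath; it does not run inside the ESB and therefore says nothing about the ESB's own class loading. The class name ClasspathCheckSketch is hypothetical, and the three class names are ones I expect to find in those jars.

```java
/** Hypothetical diagnostic that reports whether a few expected classes can be loaded. */
final class ClasspathCheckSketch {

    /** Returns true if the named class can be found by this class's class loader. */
    static boolean isLoadable(String className) {
        try {
            // initialize=false: we only want to know that the class exists.
            Class.forName(className, false, ClasspathCheckSketch.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String[] expected = {
            "org.jpos.iso.ISOMsg",                  // jpos
            "org.jdom.Element",                     // jdom 1.x
            "org.apache.commons.cli.CommandLine"    // commons-cli
        };
        for (String className : expected) {
            System.out.println(className + " : " + (isLoadable(className) ? "found" : "MISSING"));
        }
    }
}
```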

We have now written our message sender and message formatter implementations. Likewise, you can implement the message receiver and message builder. I have created an archive with all the relevant artifacts that I developed for this blog post and uploaded it to GitHub. You can download all the projects and relevant jar files from the following location.

