Tag Archive: message pump

We can take the Message Pump sample one step further. We can incorporate real-time web page communication. The web server supports WebSockets. WebSockets allow for real-time bidirectional communication. We can leverage this feature and have these messages put on the Message Pump so that our Java application can get them.

Our web page for the example looks like this.

In the code we have a button handler that will send the json formatted message to the web server.

            // register an onclick method so that we can post a message to get a random number from our 
            // sample java application.
            document.getElementById('random-number-button').onclick = function (e) {
                var getUpdates = {Message: "Post Message", Number: 2000, Content: "get-random-number"};
                _comm.sendJson(getUpdates);
            };

 

A full listing of the web page is as follows

<!DOCTYPE html>
<html lang="en">

    <head>
        <base href="/messagepumpsample/">

        <title>Message Pump Web Interface</title>

        <script src="comm.js" type="text/javascript"></script>
        <script src="md5.js" type="text/javascript"></script>

        <style>
            body { padding: 5px; }
        </style>

    </head>


    <body>

        <div>

            <h1>Message Pump Web Interface</h1>

            <button id="random-number-button">Get Random Number</button>
            <p style="width: 600px;">
                Clicking the button above will cause a message to get sent to the Web Server.  The Web Server 
                then places it in the System Message Pump.  Our sample Java application will then receive it and 
                for the sake of the sample will respond with a random number.  By placing the random number in a 
                message and back on to the System Message Pump with the same message type that it received, the 
                Operating System knows to send it back to the Web Server and in turn back to the sending Web 
                Socket connection.
            </p>

        </div>


        <script type="text/javascript">
            // our websocket communication object
            var _comm;
            var _loggedIn = false;

            // method called when the connection is successfully opened
            var onopen = function () {
                console.log("comm opened");
            };
            
            // method called when our websocket object receives a message from the web server
            var onmessage = function (evt) {
                var json = JSON.parse(evt.data);
                console.log(json);

                if (json.Message === "Monitor") {
                    if (!_loggedIn) {
                    }
                    _loggedIn = true;
                }

                //
                else if (json.Message === "Reply Message") {
                    if (2000 === json.Number) {
                        alert("Your Random Number is " + json.Content);
                    }
                }
            };
            
            // instantiate our websocket communications object and wire up some methods.  lastly call connect!
            _comm = new Comm();
            _comm.onopen = onopen;
            _comm.onmessage = onmessage;
            _comm.connect();


            
            // register an onclick method so that we can post a message to get a random number from our 
            // sample java application.
            document.getElementById('random-number-button').onclick = function (e) {
                var getUpdates = {Message: "Post Message", Number: 2000, Content: "get-random-number"};
                _comm.sendJson(getUpdates);
            };
        </script>
    </body>

</html>

To accomplish this we created a class to handle the get-random-number request. We could have handled this in our Main class but hey I like modularity.

package com.integpg.messagepumpsample;

import com.integpg.system.SystemMsg;
import java.util.Random;

public class RandomNumberMessageHandler implements MessagePumpListener {

    @Override
    public void messageReceived(SystemMsg systemMsg) {
        // this sample uses message type 2000
        if (2000 == systemMsg.type) {
            String content = new String(systemMsg.msg);
            System.out.println("content: " + content);
            
            Random rand = new Random();
            int randomNumber = rand.nextInt();
            
            SystemMsg responseMsg = new SystemMsg();
            responseMsg.type = 2000;
            responseMsg.msg = String.valueOf(randomNumber).getBytes();
            
            MessagePumpEngine.postMessage(responseMsg);
        }
    }

}

We also had to modify our init() method to attach our MessagePumpHandler.

package com.integpg.messagepumpsample;

import com.integpg.system.SystemMsg;
import java.util.Random;

public class RandomNumberMessageHandler implements MessagePumpListener {

    @Override
    public void messageReceived(SystemMsg systemMsg) {
        // this sample uses message type 2000
        if (2000 == systemMsg.type) {
            String content = new String(systemMsg.msg);
            System.out.println("content: " + content);
            
            // get a random number
            Random rand = new Random();
            int randomNumber = rand.nextInt();
            
            // build our response
            SystemMsg responseMsg = new SystemMsg();
            responseMsg.type = 2000;
            responseMsg.msg = String.valueOf(randomNumber).getBytes();
            MessagePumpEngine.postMessage(responseMsg);
        }
    }

}

Almost immediately upon clicking the button we get our response.

As you can see here the web page gives you an amazing ability to communicate with a Java application in real-time.

There are many times that you want to get a message from the system. You may also want to communicate between processes. To do either of these we employ the Message Pump.

In this example  we will monitor the messages that are going around the message pump inside the JNIOR.

We first need a class that will define all the message pump types

package com.integpg.messagepumpsample;

/**
 * A class that contains an enumeration of values for each message type.
 */
public class SystemMessageTypes {

    public static final int SM_SHUTDOWN = 0x01;
    public static final int SM_PROBE = 0x02;
    public static final int SM_GCRUN = 0x10;
    public static final int SM_WATCHDOG = 0x11;
    public static final int SM_SYSLOG = 0x12;
    public static final int SM_REGUPDATE = 0x40;
    public static final int SM_WEBSTARTUP = 0x60;
    public static final int SM_WEBSHUTDOWN = 0x61;
    public static final int SM_PROTCMDMSG = 0x70;
    public static final int SM_PROTCMDRESP = 0x71;
    public static final int SM_PIPEOPEN = 0x80;
    public static final int SM_PIPECLOSE = 0x81;
    public static final int SM_USER = 0x400;



    /**
     * @return a String definition based on the type parameter
     */
    public static String getMessageNameByType(int type) {
        switch (type) {
            case SM_SHUTDOWN:
                return "Shutdown";
            case SM_PROBE:
                return "Probe";
            case SM_GCRUN:
                return "GC Run";
            case SM_WATCHDOG:
                return "Watchdog";
            case SM_SYSLOG:
                return "Syslog";
            case SM_REGUPDATE:
                return "Registry Update";
            case SM_WEBSTARTUP:
                return "Web Server Startup";
            case SM_WEBSHUTDOWN:
                return "Web Server Shutdown";
            case SM_PROTCMDMSG:
                return "Protocol Command Message";
            case SM_PROTCMDRESP:
                return "Protocol Command Reesponse";
            case SM_PIPEOPEN:
                return "Pipe Opened";
            case SM_PIPECLOSE:
                return "Pipe Closed";
            default:
                if (type >= SM_USER) return "User Defined Type " + String.valueOf(type);
                else return "Unknown Type " + String.valueOf(type);
        }
    }
}

Not only does the above class define the types but it gives us a nice helper function to get a nice textual description for the message type.

We will create a class that will monitor the message pump. The biggest note to observe is that we MUST forward the message once we receive it so that other processes can see the message. The message pump message delivery is also time sensitive. If we receive the message and perform a lengthy process before re-posting it the we could cause the system to assert as the message pump timing has been violated.

package com.integpg.messagepumpsample;

import com.integpg.system.MessagePump;
import com.integpg.system.SystemMsg;
import java.util.Vector;

public class MessagePumpEngine implements Runnable {

    private static final MessagePumpEngine THIS = new MessagePumpEngine();
    private static final MessagePump MESSAGE_PUMP = new MessagePump();
    private static final Vector LISTENERS = new Vector<>();

    private static Thread _thread;



    /**
     * Singleton constructor
     */
    private MessagePumpEngine() {
    }



    /**
     * adds a listener that will be alerted of all received messages. The listener will be responsible for determining
     * if the message has meaning to them
     */
    public static void addListener(MessagePumpListener listener) {
        synchronized (LISTENERS) {
            LISTENERS.addElement(listener);
        }
    }



    /**
     * starts our message pump engine.
     */
    static void start() {
        if (null == _thread) {
            _thread = new Thread(THIS);
            _thread.setName("message-pump-engine");
            _thread.setDaemon(true);
            _thread.start();
        }
    }



    @Override
    public void run() {
        System.out.println("open the message pump and start monitoring.\r\n");
        MESSAGE_PUMP.open();

        for (;;) {
            // read all messages from the message pump
            SystemMsg systemMsg = MESSAGE_PUMP.getMessage();

            // we must repost as fast as we can
            MESSAGE_PUMP.postMessage(systemMsg);

            // notify all of our listeners
            synchronized (LISTENERS) {
                for (int i = 0; i < LISTENERS.size(); i++) {
                    MessagePumpListener listener = LISTENERS.elementAt(i);
                    listener.messageReceived(systemMsg);
                }
            }

        }
    }



    /**
     * Exposes the postMesssage method of the System MessagePump
     *
     * @param msg the message the will be sent to the system message pump
     */
    public static void postMessage(SystemMsg msg) {
        MESSAGE_PUMP.postMessage(msg);
    }
}

When the above class receives a message from the message pump it will want to alert any listeners.  Lets define the listener interface now.

package com.integpg.messagepumpsample;

import com.integpg.system.SystemMsg;

/**
 * An interface that outlines the method or methods that all classes that implement this interface must define.
 */
public interface MessagePumpListener {

    public void messageReceived(SystemMsg systemMsg);
}

Now that we have laid the ground work for the application lets glue the peices together.  Here is our Main class

package com.integpg.messagepumpsample;

/**
 * HexUtils will give us access to the hex dump so that we can show a meaningful representation of the data in each
 * message
 */
import com.integpg.janoslib.utils.HexUtils;
/**
 * System classes pertaining to the message pump itself and the class that will be returned from the getMessage call
 */
import com.integpg.system.SystemMsg;
/**
 * A class that will format a date string quickly
 */
import com.integpg.janoslib.text.QuickDateFormat;

/**
 * A class to demonstrate the Message Pump feature in the JNIOR.
 */
public class MessagePumpSampleMain implements MessagePumpListener {

    /**
     * the QuickDateFormat() class is part of a library of classes and are beyond the scope of this example.
     */
    private final static QuickDateFormat QUICK_DATE_FORMAT = new QuickDateFormat("MM-dd-yy HH:mm:ss.fff");



    public static void main(String[] args) throws InterruptedException {
        // instantiate our main class
        MessagePumpSampleMain msgPumpSample = new MessagePumpSampleMain();
        msgPumpSample.init();

        // the main method has nothing left to do.  let it sleep forever.  Most if not all of our other threads 
        // are daemon threads that would not keep the process from sxiting if this main thread completes.  
        // Sometimes we might do somehting in this thread but for the sake of this example we have nothing to do.
        Thread.sleep(Integer.MAX_VALUE);
    }



    public void init() {
        // the message pump engine is a singleton.  We do not have to instatiate another instance.  We can just 
        // use static methods to interface with it.  
        //
        // first we need to add in any listeners that should be alerted when a new message is received.  It 
        // will be up to those llisteners to determine if it cares about the message.
        MessagePumpEngine.addListener(this);
        MessagePumpEngine.addListener(new RegistryMessageHandler());

        // we are ready.  start listening
        MessagePumpEngine.start();
    }



    @Override
    public void messageReceived(SystemMsg systemMsg) {
        // the QUICK_DATE_FORMAT.format() call is part of a library of classes and are beyond the scope of this 
        // example.
        String dateString = QUICK_DATE_FORMAT.format(System.currentTimeMillis());
        // so we have a message.  what kind is it?
        String messageName = SystemMessageTypes.getMessageNameByType(systemMsg.type);
        // print out what we know about this message
        System.out.println(dateString + " - Process Msg: " + messageName);

        // if there is data in the message then we should dump it so that we can see what it is.
        if (0 < systemMsg.msg.length) {
            // the HexUtils.hexDump() call is part of a library of classes and are beyond the scope of this 
            // example.
            String hexDumpString = HexUtils.hexDump(systemMsg.msg);
            System.out.println(hexDumpString);
        }
    }

}

In the above lines 48 and 49 we add our listener classes.  Line 48 is the this object representing the Main class.  You can see what happens in the messageReceived() method starting at line 58.  The other listener is the RegistryMessageHandler class.  Here is the definition for that class.

package com.integpg.messagepumpsample;

import com.integpg.system.JANOS;
import com.integpg.system.SystemMsg;

public class RegistryMessageHandler implements MessagePumpListener {

    @Override
    public void messageReceived(SystemMsg systemMsg) {
        // we only care about registry key updates
        if (SystemMessageTypes.SM_REGUPDATE == systemMsg.type) {
            // get the key name from the message data
            String keyName = new String(systemMsg.msg);
            // now get teh new value
            String keyValue = JANOS.getRegistryString(keyName, "");
            System.out.println("Registry Update: " + keyName + " = " + keyValue);
        }
    }

}

Each time a message is received from the message pump each of our listeners is notified.  Each listener can check the message type to see if it cares about the message.  A sample output from the application shows three interesting events.  First, I opened the web page for the unit.  The web server is NOT always running unless the web server port is accessed.  Once the web server port was hit it caused the web server to spin up and send the Web Server Startup message.  The next two events were registry updates.  The Main class listener shows us that there was a Registry Update along with the HEX dump of the message payload.  Then the RegistryMessageHandler listener used the registry key that was updated to get the new value.  You can download the sample application JAR file below.

msgpump-monitor.jar - 23 KB - MD5: 00eb2d3b97bbc67c5d1d13a9901e9fbe