Sending Messages In The Message Pump
This post explains how to send and receive messages through the System Message Pump. A previous post shows how to create a System Message Pump program, and you can access it here. Please read that post first, because this one reuses and references its code.
After creating a Message Pump program from the previous System Message Pump sample, the next step is to send and receive messages between two different programs. To start, we can create a new program that will interact with the previous System Message Pump program. We’ll call this program our PostMessage program. This sample program works with only two message types, 1300 and 1301, but it could be edited to add more. These messages only pulse output relays, but they could be edited to perform many other actions as well.
import com.integpg.system.JANOS;
import com.integpg.system.SystemMsg;
import com.integpg.system.MessagePump;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Json;
public class CommunicationPost2 {
//This function uses the user's input to decide whether the program should post another message or close.
//The only message you can create is message 1300
public static int getresponse(MessagePump closesPump) throws IOException {
String type = "";
BufferedReader reading = new BufferedReader(new InputStreamReader(System.in));
int testtype = 0;
boolean running = true;
boolean typecheck = false;
while (running == true) {
System.out.println("\nPlease enter a System Pump command. Type 'create' to make a command. Type 'finished' when done.\n");
String choice = reading.readLine();
System.out.println("|" + choice + "|");
switch (choice) {
case "finished":
System.out.println("Finishing program...\n");
System.out.println();
closesPump.close();
System.out.println();
System.exit(0);
case "create":
System.out.println();
System.out.println("\nPlease enter the type number\n");
System.out.println();
while (typecheck == false) {
type = reading.readLine();
try{
testtype = Integer.parseInt(type);
typecheck = true;
}
catch (NumberFormatException e) {
System.out.println("\nIncorrect input. Try again\n");
}
}
return testtype;
default:
System.out.println("\nBad input. Try again.\n");
}
}
return testtype;
}
//adds a value to the Json object that will be sent as part of the message to the other program
public static Json addToJson(Json gettingJson, String value, Object jsonvalue) {
gettingJson.put(value, jsonvalue);
return gettingJson;
}
//creates the message that will be sent to the other program to complete
public static SystemMsg createMessage(int messagetype, byte[] messageinfo) {
SystemMsg newMsg = new SystemMsg();
newMsg.type = messagetype;
newMsg.msg = messageinfo;
System.out.println(newMsg);
return newMsg;
}
//completes the message 1301 if it was sent from the other program
public static void completeMessage1301(String recievedJsonInfo) {
Json holdinginfo = new Json(recievedJsonInfo);
String[] arrayofkeys = holdinginfo.keyarray();
int togglecheck = holdinginfo.getInt(arrayofkeys[0]);
int channelscheck = holdinginfo.getInt(arrayofkeys[1]);
int durationcheck = holdinginfo.getInt(arrayofkeys[2]);
System.out.println(arrayofkeys[0] + ": " + togglecheck + ", " + arrayofkeys[1] + ": " + channelscheck + ", " + arrayofkeys[2] + ": " + durationcheck);
try {
JANOS.setOutputPulsed(togglecheck, channelscheck, durationcheck);
} catch (IOException ex) {
ex.printStackTrace();
}
}
public static Json getInfoToJson() throws IOException {
Json newJson = new Json();
String tempduration = "";
String tempchannels = "";
String temp_O_or_C = "";
boolean successcheck1 = false;
boolean successcheck2 = false;
boolean successcheck3 = false;
BufferedReader reading = new BufferedReader(new InputStreamReader(System.in));
System.out.println("\nThis is for toggling pulse outputs.\n");
System.out.println("\nPlease enter which channels you are pulsing: '1 - 255'\n");
while (successcheck1 == false) {
try {
temp_O_or_C = reading.readLine();
if (temp_O_or_C == null) throw new IOException("input closed");
//parse inside the try so a non-numeric entry is caught and the user is asked again
int O_or_C = Integer.parseInt(temp_O_or_C);
newJson = addToJson(newJson, "Pulse", O_or_C);
successcheck1 = true;
} catch (NumberFormatException e) {
System.out.println("\nIncorrect format, try again.\n");
}
}
System.out.println("\nPlease enter what channels: '1 - 255'\n");
while (successcheck2 == false) {
try {
tempchannels = reading.readLine();
if (tempchannels == null) throw new IOException("input closed");
int channels = Integer.parseInt(tempchannels);
newJson = addToJson(newJson, "Channels", channels);
successcheck2 = true;
} catch (NumberFormatException e) {
System.out.println("\nIncorrect format, try again.\n");
}
}
System.out.println("\nPlease enter duration in milliseconds:\n");
while (successcheck3 == false) {
try {
tempduration = reading.readLine();
if (tempduration == null) throw new IOException("input closed");
int duration = Integer.parseInt(tempduration);
newJson = addToJson(newJson, "Duration", duration);
successcheck3 = true;
} catch (NumberFormatException e) {
System.out.println("\nIncorrect format, try again.\n");
}
}
return newJson;
}
public static void main(String[] args) throws IOException, InterruptedException {
MessagePump MESSAGE_PUMP = new MessagePump();
MESSAGE_PUMP.open();
while (true) {
boolean checkmsg = false;
int fixedtype = getresponse(MESSAGE_PUMP);
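if (fixedtype != 1300) {
//only message 1300 gets a reply, so skip the wait for message 1301 below
System.out.println("\nOnly message type 1300 is supported.\n");
continue;
}
if (fixedtype == 1300) {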
Json firstJson = getInfoToJson();
SystemMsg systemMsg = createMessage(fixedtype, firstJson.toString().getBytes());
MESSAGE_PUMP.postMessage(systemMsg);
System.out.println();
}
while (checkmsg == false) {
SystemMsg msgRecieved = MESSAGE_PUMP.getMessage(1301);
if (msgRecieved.type == 1301) {
System.out.println("Message 1301 recieved, pulsing output 1.");
String jsoninfo2 = new String(msgRecieved.msg);
completeMessage1301(jsoninfo2);
checkmsg = true;
}
}
}
}
}
This PostMessage program gets inputs from the user, creates a message from them, and sends it to the previously created System Message Pump program. The program starts by calling the “getresponse” function, which asks the user whether they wish to create a message or close the program and then carries out whichever the user chooses.
public static int getresponse(MessagePump closesPump) throws IOException {
String type = "";
BufferedReader reading = new BufferedReader(new InputStreamReader(System.in));
int testtype = 0;
boolean running = true;
boolean typecheck = false;
while (running == true) {
System.out.println("\nPlease enter a System Pump command. Type 'create' to make a command. Type 'finished' when done.\n");
String choice = reading.readLine();
System.out.println("|" + choice + "|");
switch (choice) {
case "finished":
System.out.println("Finishing program...\n");
System.out.println();
closesPump.close();
System.out.println();
System.exit(0);
case "create":
System.out.println();
System.out.println("\nPlease enter the type number\n");
System.out.println();
while (typecheck == false) {
type = reading.readLine();
try{
testtype = Integer.parseInt(type);
typecheck = true;
}
catch (NumberFormatException e) {
System.out.println("\nIncorrect input. Try again\n");
}
}
return testtype;
default:
System.out.println("\nBad input. Try again.\n");
}
}
return testtype;
}
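The retry-until-valid pattern that “getresponse” uses for the type number can be pulled into a small helper. The following is a minimal sketch in plain Java, not part of the original program: the class name PromptSketch, the method readInt, and the fallback parameter are all hypothetical names chosen for illustration. It reads from any BufferedReader, so it can be tried without a console.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;

// Hypothetical helper, not part of the original PostMessage program.
class PromptSketch {
    // Reads lines until one parses as an int. Returns fallback if the
    // input ends or fails before a valid number is entered.
    static int readInt(BufferedReader in, int fallback) {
        try {
            String line;
            while ((line = in.readLine()) != null) {
                try {
                    return Integer.parseInt(line.trim());
                } catch (NumberFormatException e) {
                    System.out.println("Incorrect input. Try again");
                }
            }
        } catch (IOException e) {
            System.out.println("Input error: " + e.getMessage());
        }
        return fallback;
    }

    public static void main(String[] args) {
        // Simulated user input: one bad entry, then a valid type number.
        BufferedReader in = new BufferedReader(new StringReader("abc\n1300\n"));
        System.out.println(readInt(in, -1));
    }
}
```

Returning a fallback when the input ends is a design choice made for this sketch, so the loop cannot spin forever on a closed input stream.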
If the user decides to create a message, the program calls the “getInfoToJson” function, which gathers the information needed to build a Json object. The first two values range from 1 to 255 and are bit values representing the 8 outputs, and the last value is a time in milliseconds. “getInfoToJson” calls the “addToJson” function to add each value to the Json object that it returns.
public static Json getInfoToJson() throws IOException {
Json newJson = new Json();
String tempduration = "";
String tempchannels = "";
String temp_O_or_C = "";
boolean successcheck1 = false;
boolean successcheck2 = false;
boolean successcheck3 = false;
BufferedReader reading = new BufferedReader(new InputStreamReader(System.in));
System.out.println("\nThis is for toggling pulse outputs.\n");
System.out.println("\nPlease enter which channels you are pulsing: '1 - 255'\n");
while (successcheck1 == false) {
try {
temp_O_or_C = reading.readLine();
if (temp_O_or_C == null) throw new IOException("input closed");
//parse inside the try so a non-numeric entry is caught and the user is asked again
int O_or_C = Integer.parseInt(temp_O_or_C);
newJson = addToJson(newJson, "Pulse", O_or_C);
successcheck1 = true;
} catch (NumberFormatException e) {
System.out.println("\nIncorrect format, try again.\n");
}
}
System.out.println("\nPlease enter what channels: '1 - 255'\n");
while (successcheck2 == false) {
try {
tempchannels = reading.readLine();
if (tempchannels == null) throw new IOException("input closed");
int channels = Integer.parseInt(tempchannels);
newJson = addToJson(newJson, "Channels", channels);
successcheck2 = true;
} catch (NumberFormatException e) {
System.out.println("\nIncorrect format, try again.\n");
}
}
System.out.println("\nPlease enter duration in milliseconds:\n");
while (successcheck3 == false) {
try {
tempduration = reading.readLine();
if (tempduration == null) throw new IOException("input closed");
int duration = Integer.parseInt(tempduration);
newJson = addToJson(newJson, "Duration", duration);
successcheck3 = true;
} catch (NumberFormatException e) {
System.out.println("\nIncorrect format, try again.\n");
}
}
return newJson;
}
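To make the 1 to 255 bit values more concrete, here is a small sketch in plain Java that lists which of the 8 outputs a given value selects. It assumes the lowest bit corresponds to output 1, which is a common convention but is not stated in this post. The class name ChannelMaskSketch and the method outputsFor are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration, not part of the original program.
class ChannelMaskSketch {
    // Returns the 1-based output numbers whose bits are set in an 8-bit value.
    // Assumption: bit 0 (value 1) is output 1, bit 7 (value 128) is output 8.
    static List<Integer> outputsFor(int mask) {
        List<Integer> outputs = new ArrayList<>();
        for (int bit = 0; bit < 8; bit++) {
            if ((mask & (1 << bit)) != 0) {
                outputs.add(bit + 1);
            }
        }
        return outputs;
    }

    public static void main(String[] args) {
        System.out.println(outputsFor(1));   // [1]
        System.out.println(outputsFor(5));   // [1, 3]
        System.out.println(outputsFor(255)); // [1, 2, 3, 4, 5, 6, 7, 8]
    }
}
```

Under that assumption, entering 5 selects outputs 1 and 3, and entering 255 selects all eight.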
Next, the program calls the “createMessage” function, which builds the message we are going to send from its type ID and its payload.
public static SystemMsg createMessage(int messagetype, byte[] messageinfo) {
SystemMsg newMsg = new SystemMsg();
newMsg.type = messagetype;
newMsg.msg = messageinfo;
System.out.println(newMsg);
return newMsg;
}
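The message payload is a byte array, so the Json text is converted to bytes before sending and back to a String on the receiving side. The sample uses getBytes() and new String(bytes), which rely on the platform default character set. The sketch below shows the same round trip with an explicit character set in plain Java. The class PayloadSketch and its methods are hypothetical, the sample Json text is only an assumption about what the payload might look like, and whether StandardCharsets is available on your target runtime should be checked.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical illustration, not part of the original program.
class PayloadSketch {
    // Converts the Json text to the bytes carried in the message.
    static byte[] encode(String json) {
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // Converts received message bytes back to Json text.
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Assumed example text; the real output of Json.toString() may differ.
        String json = "{\"Pulse\":1,\"Channels\":1,\"Duration\":5000}";
        byte[] payload = encode(json);
        System.out.println(decode(payload));
    }
}
```

Naming the character set on both sides keeps the two programs in agreement even if their default encodings differ.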
After this message is sent, an edited version of the System Message Pump program receives it and then sends a message back. The code for the edited Engine Pump file of the System Message Pump program is below.
import com.integpg.system.JANOS;
import com.integpg.system.MessagePump;
import com.integpg.system.SystemMsg;
import java.io.IOException;
import java.util.Json;
import java.util.Vector;
public class MessagePumpEngine implements Runnable {
private static final MessagePumpEngine THIS = new MessagePumpEngine();
private static final MessagePump MESSAGE_PUMP = new MessagePump();
private static final Vector<MessagePumpListener> LISTENERS = new Vector<>();
private static Thread _thread;
/**
* Singleton constructor
*/
private MessagePumpEngine() {
}
/**
* adds a listener that will be alerted of all received messages. The
* listener will be responsible for determining if the message has meaning
* to them
*
* @param listener
*/
public static void addListener(MessagePumpListener listener) {
synchronized (LISTENERS) {
LISTENERS.addElement(listener);
}
}
/**
* starts our message pump engine.
*/
static void start() {
if (null == _thread) {
_thread = new Thread(THIS);
_thread.setName("message-pump-engine");
_thread.setDaemon(true);
_thread.start();
}
}
//completes the message 1300 after being sent from the other application
public static void completeMessage1300 (String recievedJsonInfo) {
Json holdinginfo = new Json(recievedJsonInfo);
String[] arrayofkeys = holdinginfo.keyarray();
int togglecheck = holdinginfo.getInt(arrayofkeys[0]);
int channelscheck = holdinginfo.getInt(arrayofkeys[1]);
int durationcheck = holdinginfo.getInt(arrayofkeys[2]);
System.out.println(arrayofkeys[0] + ": " + togglecheck + ", " + arrayofkeys[1] + ": " + channelscheck + ", " + arrayofkeys[2] + ": " + durationcheck);
try {
JANOS.setOutputPulsed(togglecheck, channelscheck, durationcheck);
}
catch (IOException ex) {
ex.printStackTrace();
}
}
//creates the message that will be sent to the other program to complete
public static SystemMsg createMessage (int messageType, byte [] messageInfo) {
SystemMsg returnMsg = new SystemMsg();
returnMsg.type = messageType;
returnMsg.msg = messageInfo;
System.out.println(returnMsg);
return returnMsg;
}
//adds a value to the Json object that will be sent as part of the message to the other program
public static Json addToJson(Json Jsonadd, String valuename, Object jsonvalue) {
Jsonadd.put(valuename, jsonvalue);
return Jsonadd;
}
@Override
public void run() {
System.out.println("open the message pump and start monitoring.\r\n");
MESSAGE_PUMP.open();
for (;;) {
// read all messages from the message pump
SystemMsg systemMsg = MESSAGE_PUMP.getMessage();
// we must repost as fast as we can
MESSAGE_PUMP.postMessage(systemMsg);
// handle message 1300 sent from the PostMessage program
if (systemMsg.type == 1300) {
System.out.println();
System.out.println("Recieved command 1300, pulsing output. Sending another command.\n");
String jsoninfo = new String(systemMsg.msg);
completeMessage1300(jsoninfo);
Json thirdjson = new Json();
addToJson(thirdjson, "setclose", 1);
addToJson(thirdjson, "settingchannel", 1);
addToJson(thirdjson, "timeset", 5000);
SystemMsg returnMsg = createMessage(1301, thirdjson.toString().getBytes());
MESSAGE_PUMP.postMessage(returnMsg);
}
}
}
/**
* Exposes the postMesssage method of the System MessagePump
*
* @param msg the message the will be sent to the system message pump
*/
public static void postMessage(SystemMsg msg) {
MESSAGE_PUMP.postMessage(msg);
}
}
When this file replaces the engine pump file in the System Message Pump program, its loop listens for the message of type 1300 sent from our PostMessage program. After it receives that message, it creates a string from the message’s payload and calls the “completeMessage1300” function to pulse the correct outputs. It then uses the “createMessage” function to create another message, of type 1301, for the PostMessage program to read. The PostMessage program has its own loop that reads that message when it’s sent.
public void run() {
System.out.println("open the message pump and start monitoring.\r\n");
MESSAGE_PUMP.open();
for (;;) {
// read all messages from the message pump
SystemMsg systemMsg = MESSAGE_PUMP.getMessage();
// we must repost as fast as we can
MESSAGE_PUMP.postMessage(systemMsg);
if (systemMsg.type == 1300) {
System.out.println();
System.out.println("Recieved command 1300, pulsing output. Sending another command.\n");
String jsoninfo = new String(systemMsg.msg);
completeMessage1300(jsoninfo);
Json thirdjson = new Json();
addToJson(thirdjson, "setclose", 1);
addToJson(thirdjson, "settingchannel", 1);
addToJson(thirdjson, "timeset", 5000);
SystemMsg returnMsg = createMessage(1301, thirdjson.toString().getBytes());
MESSAGE_PUMP.postMessage(returnMsg);
}
}
}
This takes us back to the PostMessage program, which receives the message the Engine Pump file sent back. The PostMessage program calls the “completeMessage1301” function to grab the values from the message and pulse the correct outputs. After that, the program loops back to the beginning and asks the user again whether to create a message or finish, repeating this process until the user decides to finish the program.
while (checkmsg == false) {
SystemMsg msgRecieved = MESSAGE_PUMP.getMessage(1301);
if (msgRecieved.type == 1301) {
System.out.println("Message 1301 recieved, pulsing output 1.");
String jsoninfo2 = new String(msgRecieved.msg);
completeMessage1301(jsoninfo2);
checkmsg = true;
}
}
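The whole exchange can be modeled without any hardware: a request of type 1300 goes in, the engine handles it and queues a reply of type 1301, and the first program picks that reply up. The following is a simplified sketch in plain Java that uses an in-memory queue as a stand-in for the message pump. It does not use the real MessagePump or SystemMsg classes, it does not repost messages the way the real engine does, and the names ExchangeSketch, Msg, engineStep, and takeReply are all hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical in-memory model of the exchange, not the real message pump API.
class ExchangeSketch {
    // Stand-in for a system message: a type number plus a text payload.
    static final class Msg {
        final int type;
        final String payload;

        Msg(int type, String payload) {
            this.type = type;
            this.payload = payload;
        }
    }

    static final int REQUEST = 1300;
    static final int REPLY = 1301;

    // Engine side: take one message and, if it is a request, queue a reply.
    static void engineStep(Deque<Msg> pump) {
        Msg msg = pump.poll();
        if (msg != null && msg.type == REQUEST) {
            pump.add(new Msg(REPLY, "reply to " + msg.payload));
        }
    }

    // PostMessage side: take the next message only if it is a reply.
    static Msg takeReply(Deque<Msg> pump) {
        Msg head = pump.peek();
        return (head != null && head.type == REPLY) ? pump.poll() : null;
    }

    public static void main(String[] args) {
        Deque<Msg> pump = new ArrayDeque<>();
        pump.add(new Msg(REQUEST, "pulse"));
        engineStep(pump);
        Msg reply = takeReply(pump);
        System.out.println(reply.type + ": " + reply.payload); // 1301: reply to pulse
    }
}
```

In the real programs the two sides run as separate applications and share the system message pump, so the ordering shown here is only an approximation of what happens on the device.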