/*
 * Decompiled with CFR 0.152.
 */
package com.integ.mqtt.protocol;

import com.integ.common.logging.AppLog;
import com.integ.common.logging.Logger;
import com.integ.common.logging.RollingFileLog;
import com.integ.common.system.Application;
import com.integ.mqtt.protocol.ConnAckPacket;
import com.integ.mqtt.protocol.ConnectOptions;
import com.integ.mqtt.protocol.ConnectPacket;
import com.integ.mqtt.protocol.ConnectionAcknowledgedEventObject;
import com.integ.mqtt.protocol.ControlPacketTypes;
import com.integ.mqtt.protocol.MqttClientListener;
import com.integ.mqtt.protocol.MqttIncommingPacket;
import com.integ.mqtt.protocol.MqttOutgoingPacket;
import com.integ.mqtt.protocol.PingPacket;
import com.integ.mqtt.protocol.PublishPacket;
import com.integ.mqtt.protocol.SubscribeOptions;
import com.integ.mqtt.protocol.SubscribePacket;
import com.integ.mqtt.protocol.SubscriptionListener;
import com.integ.mqtt.protocol.UnhandledSubscriptionListener;
import com.integ.mqtt.protocol.UnsubscribeOptions;
import com.integ.mqtt.protocol.UnsubscribePacket;
import com.integpg.system.ArrayUtils;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.text.QuickDateFormat;
import java.util.ArrayList;
import java.util.EventObject;
import java.util.Hashtable;
import java.util.Iterator;

public class MqttClient
implements Runnable {
    private static final Logger MQTT_LOG = RollingFileLog.getLogger("/temp/" + Application.getAppName() + "_protocol.log");
    private static final Logger MQTT_ERROR_LOG = RollingFileLog.getLogger("/" + Application.getAppName() + "_protocol_error.log");
    private static QuickDateFormat QUICK_DATE_FORMAT = new QuickDateFormat("MM-dd-yyyy HH:mm:ss");
    private ConnAckPacket _connAckPacket = null;
    private boolean _connecting = false;
    private STATES _state = STATES.DISCONNECTED;
    private Socket _client = null;
    private String _host = "";
    private int _port = 1883;
    private String _connectionInfoString;
    private boolean _secure = false;
    private final Object _connectLock = new Object();
    private final ArrayList<MqttClientListener> _connecitonListeners = new ArrayList();
    private ConnectOptions _connectOptions;
    private boolean _gracefulClose;
    private DataInputStream _dataIn = null;
    private final ByteArrayOutputStream _baosOut = new ByteArrayOutputStream();
    private final DataOutputStream _dataOut = new DataOutputStream(this._baosOut);
    private Thread _listenerThread = null;
    private long pubStart = System.currentTimeMillis();
    private long _nextPingTime = System.currentTimeMillis();
    private final ArrayList<SubscriptionListener> _subscriptionListeners = new ArrayList();
    private UnhandledSubscriptionListener _unhandledSubscriptionListener;
    private final Hashtable<String, Byte[]> _publishedTopics = new Hashtable();

    public MqttClient setHost(String host) {
        System.out.println("set host: " + host);
        this._host = host;
        return this;
    }

    public String getHost() {
        return this._host;
    }

    public MqttClient setPort(int port) {
        System.out.println("set port: " + port);
        this._port = port;
        return this;
    }

    public MqttClient setSecure(boolean secure) {
        this._secure = secure;
        return this;
    }

    public void addConnectionListener(MqttClientListener listener) {
        this._connecitonListeners.add(listener);
    }

    public void addSubscriptionListener(SubscriptionListener listener) {
        this._subscriptionListeners.add(listener);
    }

    public void setUnhandledSubscriptionListener(UnhandledSubscriptionListener unhandledSubscriptionListener) {
        this._unhandledSubscriptionListener = unhandledSubscriptionListener;
    }

    public String getConnectionInfo() {
        String connectionString = this._connectionInfoString;
        if (null != this._connAckPacket) {
            connectionString = connectionString + this._connAckPacket.getReturnStatus();
        }
        return this._connectionInfoString;
    }

    public boolean getClosedGracefully() {
        return this._gracefulClose;
    }

    @Override
    public void run() {
        try {
            try {
                this._gracefulClose = false;
                while (true) {
                    this.readPacket();
                    if (this._nextPingTime >= System.currentTimeMillis()) continue;
                    this.sendPing();
                }
            }
            catch (Exception ex) {
                if (!this._gracefulClose) {
                    ex.printStackTrace();
                    MQTT_ERROR_LOG.error("error in mqtt run", ex);
                    this.close(false);
                    this._state = STATES.CONNECTION_LOST;
                }
                this._listenerThread = null;
            }
        }
        catch (Throwable throwable) {
            this._listenerThread = null;
            throw throwable;
        }
    }

    public void close() {
        this.close(true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized void close(boolean graceful) {
        AppLog.info("Closing graceful: " + graceful + " " + Thread.currentThread().getName());
        this._gracefulClose = graceful;
        if (null != this._client) {
            try {
                if (graceful) {
                    AppLog.warn("Closing gracefully");
                } else {
                    AppLog.warn("Closing due to an ERROR");
                }
                this._client.close();
                this._client = null;
                System.out.println("_listenerThread: " + this._listenerThread);
                System.out.println("Thread.currentThread(): " + Thread.currentThread());
                if (null != this._listenerThread && Thread.currentThread() != this._listenerThread) {
                    this._listenerThread.interrupt();
                    this._listenerThread = null;
                }
                for (MqttClientListener connectionListener : this._connecitonListeners) {
                    connectionListener.connectionClosed(new EventObject(this));
                }
            }
            catch (Exception ex) {
                MQTT_LOG.error(ex);
            }
            finally {
                this._state = graceful ? STATES.DISCONNECTED : STATES.CONNECTION_LOST;
            }
        }
    }

    public boolean isConnectionAccepted() {
        return null != this._client && this._state == STATES.CONNECTION_ACCEPTED;
    }

    public boolean isConnected() {
        return null != this._client && (this._state == STATES.CONNECTED || this._state == STATES.CONNECTION_ACCEPTED);
    }

    public boolean isConnecting() {
        return this._connecting;
    }

    public void setConnectOptions(ConnectOptions connectOptions) {
        this._connectOptions = connectOptions;
    }

    public synchronized void connect() {
        if (!this.isConnected() && !this.isConnecting()) {
            this._connecting = true;
            AppLog.info("Connect " + Thread.currentThread().getName());
            this._connAckPacket = null;
            try {
                for (MqttClientListener connectionListener : this._connecitonListeners) {
                    connectionListener.connectionAttempt(new EventObject(this));
                }
                if (null != this._host && !"".equalsIgnoreCase(this._host)) {
                    MQTT_LOG.info(String.format("connect to %s:%d, secure: %s", this._host, this._port, String.valueOf(this._secure)));
                    this._client = new Socket(this._host, this._port);
                    if (this._secure) {
                        this._client.setSecure(true);
                    }
                    this._client.setSoTimeout(60000);
                    this._state = STATES.CONNECTED;
                    this._connectionInfoString = this._client.getInetAddress().toString() + ":" + this._client.getPort() + " using local port: " + this._client.getLocalPort();
                    this._gracefulClose = false;
                    MQTT_LOG.info(String.format("connected to %s", this._connectionInfoString));
                    this._dataIn = new DataInputStream(this._client.getInputStream());
                    if (null == this._listenerThread) {
                        this._listenerThread = new Thread(this);
                        this._listenerThread.setName(this.getClass().getName() + ":listener");
                        this._listenerThread.start();
                    }
                    this.sendConnectPacket();
                    for (MqttClientListener connectionListener : this._connecitonListeners) {
                        connectionListener.connectionAknowledged(new ConnectionAcknowledgedEventObject(this).setConnAckPacket(this._connAckPacket));
                    }
                }
            }
            catch (Exception ex) {
                ex.printStackTrace();
                MQTT_LOG.error(ex);
                MQTT_ERROR_LOG.error("error in mqtt connect", ex);
                this.close(false);
                this._state = STATES.CONNECT_FAILED;
            }
            this._connecting = false;
        }
    }

    public void readPacket() throws Exception {
        int b0 = 0;
        try {
            b0 = this._dataIn.read();
        }
        catch (SocketTimeoutException ex) {
            return;
        }
        int packetType = b0 >> 4;
        int packetFlags = b0 & 0xF;
        System.out.println(String.format("packetType: %d, - %s, packetFlags: %d", packetType, ControlPacketTypes.getDescription(packetType), packetFlags));
        int remainingLength = this._dataIn.read();
        System.out.println("remainingLength: " + remainingLength);
        byte[] packet = new byte[remainingLength];
        this._dataIn.read(packet);
        ConnAckPacket incommingPacket = null;
        switch (packetType) {
            case 2: {
                incommingPacket = new ConnAckPacket();
                this.readConnAckPacket(packet);
                break;
            }
            case 3: {
                this.readPublishPacket(packetFlags, packet);
                break;
            }
            case 4: {
                this.readPublishAckPacket(packet);
                break;
            }
            case 9: {
                this.readSubAckPacket(packet);
                break;
            }
            case 11: {
                this.readUnsubAckPacket(packet);
                break;
            }
            case 13: {
                this.readPingRespPacket(packet);
                break;
            }
            default: {
                throw new Exception("Packet Type (" + packetType + ") not yet implemented");
            }
        }
        if (null != incommingPacket) {
            try {
                ((MqttIncommingPacket)incommingPacket).read(packet);
            }
            catch (Exception ex) {
                MQTT_ERROR_LOG.error("error reading incomming packet", ex);
            }
        }
    }

    private String readString(byte[] bytes, int offset) throws Exception {
        short length = ArrayUtils.getShort((byte[])bytes, (int)offset);
        return new String(bytes, offset + 2, (int)length);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void send(MqttOutgoingPacket controlPacket) throws Exception {
        block6: {
            try {
                if (!this.isConnected()) break block6;
                byte[] bytes = controlPacket.getPacket();
                if (255 < bytes.length) {
                    throw new Exception("Value to publish is too large");
                }
                ByteArrayOutputStream byteArrayOutputStream = this._baosOut;
                synchronized (byteArrayOutputStream) {
                    this._baosOut.reset();
                    this._dataOut.write((controlPacket.getPacketType() << 4) + controlPacket.getFlags());
                    this._dataOut.write(bytes.length);
                    this._dataOut.write(bytes);
                    byte[] outBytes = this._baosOut.toByteArray();
                    this._client.getOutputStream().write(outBytes);
                    this._client.getOutputStream().flush();
                    MQTT_LOG.info(String.format("sent %s: %s", ControlPacketTypes.getDescription(controlPacket.getPacketType()), Thread.currentThread().getName()));
                }
            }
            catch (Exception ex) {
                MQTT_ERROR_LOG.error("error in mqtt send control packet: " + controlPacket.getClass().getName(), ex);
                this.close(false);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void sendConnectPacket() throws Exception {
        block9: {
            try {
                if (!this.isConnected()) break block9;
                ConnectPacket connectPacket = new ConnectPacket(this._connectOptions);
                this.send(connectPacket);
                Iterator<MqttClientListener> iterator = this._connectLock;
                synchronized (iterator) {
                    System.out.println("connection lock wait");
                    this._connectLock.wait(30000L);
                    System.out.println("connection lock notified");
                }
                if (STATES.CONNECTION_ACCEPTED == this._state) {
                    for (MqttClientListener connectionListener : this._connecitonListeners) {
                        connectionListener.connectionEstablished(new EventObject(this));
                    }
                } else if (this._state == STATES.CONNECT_FAILED) {
                    this.close();
                }
            }
            catch (Exception ex) {
                MQTT_ERROR_LOG.error("error in mqtt send connect packet", ex);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void readConnAckPacket(byte[] packet) throws Exception {
        ConnAckPacket connAckPacket = new ConnAckPacket();
        connAckPacket.read(packet);
        this._connAckPacket = connAckPacket;
        MQTT_LOG.info(String.format("connectAcknowledgeFlags: %d", connAckPacket.getFlags()));
        MQTT_LOG.info(String.format("returnCode: %d", connAckPacket.getReturnCode().ordinal()));
        MQTT_LOG.info(String.format("returnStatus: %s", connAckPacket.getReturnStatus()));
        switch (connAckPacket.getReturnCode()) {
            case CONNECTION_ACCEPTED: {
                this._state = STATES.CONNECTION_ACCEPTED;
                break;
            }
            default: {
                this._state = STATES.CONNECT_FAILED;
            }
        }
        Object object = this._connectLock;
        synchronized (object) {
            this._connectLock.notify();
        }
    }

    public void subscribe(SubscribeOptions subscribeOptions) throws Exception {
        if (this.isConnectionAccepted()) {
            SubscribePacket subscribePacket = new SubscribePacket(subscribeOptions);
            this.send(subscribePacket);
        } else {
            MQTT_LOG.warn("not connected in subscribe");
        }
    }

    private void readSubAckPacket(byte[] packet) throws Exception {
        short packetIdentifier = ArrayUtils.getShort((byte[])packet, (int)0);
        MQTT_LOG.info(String.format("packetIdentifier: %d", packetIdentifier));
        byte returnCode = packet[2];
        MQTT_LOG.info(String.format("returnCode: %d", returnCode));
        switch (returnCode) {
            case 0: {
                MQTT_LOG.info("Success - Maximum QoS 0");
                break;
            }
            case 1: {
                MQTT_LOG.info("Success - Maximum QoS 1");
                break;
            }
            case 2: {
                MQTT_LOG.info("Success - Maximum QoS 2");
                break;
            }
            case 128: {
                MQTT_LOG.info("Failure");
                break;
            }
            default: {
                MQTT_LOG.info("FUTURE USE");
            }
        }
    }

    public void unsubscribe(UnsubscribeOptions unsubscribeOptions) throws Exception {
        if (this.isConnectionAccepted()) {
            UnsubscribePacket unsubscribePacket = new UnsubscribePacket(unsubscribeOptions);
            this.send(unsubscribePacket);
        } else {
            MQTT_LOG.warn("not connected in unsubscribe");
        }
    }

    private void readUnsubAckPacket(byte[] packet) throws Exception {
        short packetIdentifier = ArrayUtils.getShort((byte[])packet, (int)0);
        MQTT_LOG.info(String.format("packetIdentifier: %d", packetIdentifier));
    }

    public void publish(String topicName, byte[] valueBytes) throws Exception {
        this.publish(topicName, valueBytes, false);
    }

    public void publish(String topicName, byte[] valueBytes, boolean retainFlag) throws Exception {
        this.pubStart = System.currentTimeMillis();
        MQTT_LOG.info(String.format("publish: %s", topicName));
        if (this.isConnectionAccepted()) {
            Byte[] byteObject = new Byte[valueBytes.length];
            int i = 0;
            for (byte b : valueBytes) {
                byteObject[i++] = b;
            }
            this._publishedTopics.put(topicName, byteObject);
            MQTT_LOG.info(String.format("%s added, %d published topics in collection", topicName, this._publishedTopics.size()));
            PublishPacket publishPacket = new PublishPacket(topicName, valueBytes);
            publishPacket.setRetainFlag(retainFlag);
            this.send(publishPacket);
        } else {
            MQTT_LOG.warn("not connected in publish");
        }
    }

    private void readPublishAckPacket(byte[] packet) throws Exception {
        short packetIdentifier = ArrayUtils.getShort((byte[])packet, (int)0);
        MQTT_LOG.info(String.format("packetIdentifier: %d", packetIdentifier));
        long elapsed = System.currentTimeMillis() - this.pubStart;
        MQTT_LOG.info(String.format("took %.3f to publish and ack", (double)elapsed / 1000.0));
    }

    private void readPublishPacket(int flags, byte[] packet) throws Exception {
        System.out.println("read publish packet");
        int dupFlag = flags >> 3 & 1;
        int qosLevel = flags >> 1 & 3;
        int retain = flags & 1;
        MQTT_LOG.info(String.format("dupFlag: %d, qosLevel: %d, retain: ", dupFlag, qosLevel, retain));
        int pos = 0;
        String topicName = this.readString(packet, 0);
        pos += topicName.length() + 2;
        short packetIdentifier = 0;
        if (0 != (flags & 6)) {
            packetIdentifier = ArrayUtils.getShort((byte[])packet, (int)pos);
            MQTT_LOG.info(String.format("packetIdentifier: %d", packetIdentifier));
            pos += 2;
        }
        if (packet.length > pos) {
            byte[] data = new byte[packet.length - pos];
            ArrayUtils.arraycopy((Object)packet, (int)pos, (Object)data, (int)0, (int)(packet.length - pos));
            MQTT_LOG.info(String.format("topicName: %s = %s", topicName, new String(data)));
            boolean contains = this._publishedTopics.containsKey(topicName);
            MQTT_LOG.info(String.format("published topics contains %s = %s", topicName, String.valueOf(contains)));
            if (contains && this._publishedTopics.get(topicName).equals(data)) {
                this._publishedTopics.remove(topicName);
                MQTT_LOG.info(String.format("%s removed, %d published topics in collection", topicName, this._publishedTopics.size()));
            } else {
                boolean handled = false;
                for (SubscriptionListener _subscriptionListener : this._subscriptionListeners) {
                    handled |= _subscriptionListener.subscriptionUpdate(topicName, data);
                }
                if (null != this._unhandledSubscriptionListener && !handled) {
                    this._unhandledSubscriptionListener.unhandledSubscriptionUpdate(topicName, data);
                }
            }
        }
        if (1 == qosLevel) {
            this.sendPubAck(packetIdentifier);
        } else if (2 == qosLevel) {
            this.sendPubRec();
        }
    }

    private void sendPubAck(int packetIdentifier) throws Exception {
        try {
            if (this.isConnectionAccepted()) {
                int SUBSCRIBE_PACKET = 4;
                int packetType = SUBSCRIBE_PACKET << 4;
                this._baosOut.reset();
                this._dataOut.write(packetType);
                this._dataOut.write(2);
                this._dataOut.writeShort(packetIdentifier);
                byte[] outBytes = this._baosOut.toByteArray();
                this._client.getOutputStream().write(outBytes);
                this._client.getOutputStream().flush();
            }
        }
        catch (Exception ex) {
            MQTT_ERROR_LOG.error("error in mqtt send pub ack", ex);
        }
    }

    private void sendPubRec() {
        if (this.isConnectionAccepted()) {
            // empty if block
        }
    }

    public void sendPing() throws Exception {
        PingPacket pingPacket = new PingPacket();
        this.send(pingPacket);
        this._nextPingTime += 1800000L;
        System.out.println(String.format("next ping @ %s", QUICK_DATE_FORMAT.format(this._nextPingTime)));
    }

    private void readPingRespPacket(byte[] packet) {
        System.out.println("ping response");
    }

    protected void finalize() {
        System.out.println(this.getClass().getName() + " finalizer");
        this.close(true);
    }

    private static enum STATES {
        DISCONNECTED,
        CONNECTING,
        CONNECTED,
        CONNECTION_ACCEPTED,
        CONNECT_FAILED,
        CONNECTION_LOST;

    }
}

