package org.dacframe.broker;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.concurrent.ExecutorService;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
import org.dacframe.Agent;
import org.dacframe.AgentBroker;
import org.dacframe.DACException;
import org.dacframe.cs.CacheServiceHM;
import org.dacframe.worker.WorkerSingleThreaded;

/* loaded from: input_file:org/dacframe/broker/ActiveMQBroker.class */
/**
 * {@link AgentBroker} implementation that moves agents and their results
 * through two ActiveMQ queues ({@code AB.AGENTS} and {@code AB.RESULTS}).
 *
 * <p>The broker keeps four JMS connections, each with its own transacted
 * session: one for producing agents, one for consuming agents, one for
 * producing results and one for consuming results. {@link #commit()} and
 * {@link #rollback()} act on all four sessions.
 *
 * <p>When no JMS broker can be reached at the given URL, an embedded broker
 * is started for that URL and a single-threaded worker with its own
 * {@code ActiveMQBroker} is launched on a new thread.
 *
 * <p>This class performs no synchronization of its own.
 */
public class ActiveMQBroker implements AgentBroker {
    private static final Logger log = Logger.getLogger(ActiveMQBroker.class);
    private static final String AGENTS_QUEUE = "AB.AGENTS";
    private static final String RESULTS_QUEUE = "AB.RESULTS";
    private static final String RECIPIENT_PROPERTY_NAME = "identString";
    /** Optional message property carrying a String result verbatim (used for logging). */
    private static final String RESULT_PROPERTY_NAME = "result";
    /** ActiveMQ destination option that disables consumer prefetching. */
    private static final String NO_PREFETCH_OPTION = "?consumer.prefetchSize=0";
    /** How long a single blocking receive waits while a fixed number of results is awaited. */
    private static final long RESULT_POLL_TIMEOUT_MS = 1000L;

    private Connection agentProducerConnection;
    private Connection agentConsumerConnection;
    private Connection resultProducerConnection;
    private Connection resultConsumerConnection;
    private Session agentProducerSession;
    private Session agentConsumerSession;
    private Session resultProducerSession;
    private Session resultConsumerSession;
    private MessageProducer agentProducer;
    private MessageProducer resultProducer;
    private MessageConsumer agentConsumer;
    private MessageConsumer resultConsumer;
    private String brokerURL;

    /**
     * Connects to the JMS broker at the given URL and prepares all
     * connections, sessions, producers and the agent consumer.
     *
     * @param str URL of the JMS broker
     * @throws DACException if the JMS setup fails; connections opened before
     *         the failure are closed again
     */
    public ActiveMQBroker(String str) throws DACException {
        this.brokerURL = str;
        try {
            initJMS();
        } catch (JMSException e) {
            // Do not leak connections that were opened before the failure;
            // secondary close failures are only logged.
            closeConnections(e);
            throw new DACException((Throwable) e);
        }
    }

    /**
     * Commits the current transaction of all four sessions.
     *
     * @throws DACException if any session fails to commit
     */
    @Override // org.dacframe.AgentReceiver
    public void commit() throws DACException {
        try {
            this.resultProducerSession.commit();
            this.resultConsumerSession.commit();
            this.agentProducerSession.commit();
            this.agentConsumerSession.commit();
        } catch (JMSException e) {
            throw new DACException((Throwable) e);
        }
    }

    /**
     * Blocks until a message arrives on the agents queue and returns the
     * agent it carries.
     *
     * @return the received agent
     * @throws DACException if the message is not an {@link ObjectMessage}
     *         (including the case where no message is returned) or a JMS
     *         error occurs
     */
    @Override // org.dacframe.AgentReceiver
    public Agent receiveAgent() throws DACException {
        try {
            Message message = this.agentConsumer.receive();
            if (message instanceof ObjectMessage) {
                return (Agent) ((ObjectMessage) message).getObject();
            }
            throw new DACException("Received bad agent message " + message);
        } catch (JMSException e) {
            throw new DACException((Throwable) e);
        }
    }

    /**
     * Rolls back the current transaction of all four sessions.
     *
     * @throws DACException if any session fails to roll back
     */
    @Override // org.dacframe.AgentReceiver
    public void rollback() throws DACException {
        try {
            this.resultProducerSession.rollback();
            this.resultConsumerSession.rollback();
            this.agentProducerSession.rollback();
            this.agentConsumerSession.rollback();
        } catch (JMSException e) {
            throw new DACException((Throwable) e);
        }
    }

    /**
     * Does nothing: all sessions are created as transacted, so there is no
     * explicit "begin" step to perform here.
     */
    @Override // org.dacframe.AgentReceiver
    public void startTransaction() {
    }

    /**
     * Publishes a result on the results queue, addressed to the given agent.
     *
     * @param str identifier of the recipient; stored in the
     *        {@code identString} message property
     * @param obj the result; must be {@link Serializable}. A {@code String}
     *        result is additionally copied into the {@code result} property.
     * @throws IllegalArgumentException if {@code obj} is not serializable
     *         (which includes {@code null})
     * @throws DACException if a JMS error occurs
     */
    @Override // org.dacframe.AgentManager
    public void putResult(String str, Object obj) throws DACException {
        log.debug("Putting result for agent " + str + "...");
        if (!(obj instanceof Serializable)) {
            throw new IllegalArgumentException("Results must be serializable!");
        }
        try {
            ObjectMessage message = this.resultProducerSession.createObjectMessage((Serializable) obj);
            message.setStringProperty(RECIPIENT_PROPERTY_NAME, str);
            if (obj instanceof String) {
                message.setStringProperty(RESULT_PROPERTY_NAME, (String) obj);
            }
            this.resultProducer.send(message);
            log.debug("Result put");
        } catch (JMSException e) {
            throw new DACException((Throwable) e);
        }
    }

    /**
     * Collects results addressed to one recipient.
     *
     * <p>With a positive {@code i} the method blocks until exactly {@code i}
     * results have been consumed. With any non-positive {@code i} it returns
     * everything that can be received without waiting. (Previously only
     * {@code -1} terminated; {@code 0} and other negative values looped
     * forever.)
     *
     * @param str recipient identifier used to select messages; not null
     * @param i number of results to wait for, or a non-positive value to
     *        drain what is currently available
     * @return the payloads of the consumed messages, in consumption order
     * @throws IllegalArgumentException if {@code str} is null
     * @throws DACException if a non-object message is received or a JMS
     *         error occurs
     */
    private List<Object> receiveResultsImpl(String str, int i) throws DACException {
        if (str == null) {
            throw new IllegalArgumentException("recipientIdentString cannot be null");
        }
        log.debug("Receiving agent results for agent " + str + "...");
        List<Object> results = new ArrayList<Object>();
        boolean waitForCount = i > 0;
        initResultConsumer(str);
        try {
            while (true) {
                // Block (with a timeout) while a fixed count is awaited instead of
                // hammering the broker with non-blocking polls.
                Message message = waitForCount
                        ? this.resultConsumer.receive(RESULT_POLL_TIMEOUT_MS)
                        : this.resultConsumer.receiveNoWait();
                if (message == null) {
                    if (waitForCount) {
                        continue;
                    }
                    log.debug("No message received, returning what I've collected so far");
                    break;
                }
                if (!(message instanceof ObjectMessage)) {
                    throw new DACException("Received bad result message " + message);
                }
                log.debug("Consumed result message " + message);
                if (message.propertyExists(RESULT_PROPERTY_NAME)) {
                    log.info("result: " + message.getStringProperty(RESULT_PROPERTY_NAME));
                }
                results.add(((ObjectMessage) message).getObject());
                if (waitForCount && results.size() == i) {
                    log.debug("Consumed all needed results, returning");
                    break;
                }
            }
        } catch (JMSException e) {
            throw new DACException((Throwable) e);
        } finally {
            // Release the per-call consumer on every exit path, including failures.
            closeResultConsumer();
        }
        return results;
    }

    /**
     * Returns all results for the given recipient that can be received
     * without waiting.
     *
     * @param str recipient identifier; not null
     * @return the result payloads collected
     * @throws DACException if receiving fails
     */
    @Override // org.dacframe.AgentManager
    public List<Object> receiveAgentResults(String str) throws DACException {
        return receiveResultsImpl(str, -1);
    }

    /**
     * Returns results for the given recipient, waiting until {@code i} of
     * them have arrived when {@code i} is positive.
     *
     * @param str recipient identifier; not null
     * @param i number of results to wait for; a non-positive value returns
     *        what is available without waiting
     * @return the result payloads collected
     * @throws DACException if receiving fails
     */
    @Override // org.dacframe.AgentManager
    public List<Object> receiveAgentResults(String str, int i) throws DACException {
        return receiveResultsImpl(str, i);
    }

    /**
     * Not supported by this broker.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // org.dacframe.AgentManager
    public ExecutorService getExecutorService() throws DACException {
        throw new UnsupportedOperationException("NOT IMPLEMENTED!");
    }

    /**
     * Sends an agent; the second argument is ignored by this implementation.
     *
     * @param agent the agent to send
     * @param i unused
     * @throws DACException if sending fails
     */
    @Override // org.dacframe.AgentManager
    public void sendAgent(Agent agent, int i) throws DACException {
        sendAgent(agent);
    }

    /**
     * Publishes an agent on the agents queue.
     *
     * @param agent the agent to send
     * @throws DACException if a JMS error occurs
     */
    @Override // org.dacframe.AgentManager
    public void sendAgent(Agent agent) throws DACException {
        log.debug("Sending agent " + agent + "...");
        try {
            this.agentProducer.send(this.agentProducerSession.createObjectMessage(agent));
            log.debug("Agent sent");
        } catch (JMSException e) {
            throw new DACException((Throwable) e);
        }
    }

    /**
     * Closes producers, consumers, sessions and connections.
     *
     * <p>Closing is best-effort: a failure (or a component that was never
     * created) does not prevent the connections from being closed. The first
     * failure is reported to the caller; later ones are logged.
     *
     * @throws DACException wrapping the first JMS error hit while closing
     */
    public void cleanup() throws DACException {
        JMSException firstFailure = null;
        try {
            closeProducersConsumersAndSessions();
        } catch (JMSException e) {
            firstFailure = e;
        }
        firstFailure = closeConnections(firstFailure);
        if (firstFailure != null) {
            throw new DACException((Throwable) firstFailure);
        }
    }

    /**
     * Lists the messages currently on the results queue without consuming
     * them.
     *
     * @return the browsed messages
     * @throws DACException if a browsed message is not an
     *         {@link ObjectMessage} or a JMS error occurs
     */
    public List<ObjectMessage> browseAllResults() throws DACException {
        QueueBrowser browser = null;
        try {
            browser = this.resultProducerSession.createBrowser(new ActiveMQQueue(RESULTS_QUEUE));
            List<ObjectMessage> messages = new ArrayList<ObjectMessage>();
            Enumeration<?> pending = browser.getEnumeration();
            while (pending.hasMoreElements()) {
                Object element = pending.nextElement();
                if (!(element instanceof ObjectMessage)) {
                    throw new DACException("Found bad result message " + element);
                }
                messages.add((ObjectMessage) element);
            }
            return messages;
        } catch (JMSException e) {
            throw new DACException((Throwable) e);
        } finally {
            if (browser != null) {
                try {
                    browser.close();
                } catch (JMSException e) {
                    log.error("Unable to close result queue browser", e);
                }
            }
        }
    }

    /**
     * Creates the connections, transacted sessions, producers and the agent
     * consumer, then starts the connections. Falls back to an embedded
     * broker (plus a default worker) when the first connection attempt fails.
     *
     * @throws JMSException if the JMS objects cannot be created or the
     *         embedded broker cannot be started
     */
    private void initJMS() throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, this.brokerURL);
        boolean embeddedBrokerStarted = false;
        try {
            this.agentProducerConnection = factory.createConnection();
        } catch (JMSException e) {
            log.debug("Initial connection attempt to " + this.brokerURL + " failed", e);
            startEmbeddedBroker();
            embeddedBrokerStarted = true;
            this.agentProducerConnection = factory.createConnection();
        }
        this.agentConsumerConnection = factory.createConnection();
        this.resultProducerConnection = factory.createConnection();
        this.resultConsumerConnection = factory.createConnection();

        this.agentProducerSession = this.agentProducerConnection.createSession(true, Session.SESSION_TRANSACTED);
        this.agentConsumerSession = this.agentConsumerConnection.createSession(true, Session.SESSION_TRANSACTED);
        this.resultProducerSession = this.resultProducerConnection.createSession(true, Session.SESSION_TRANSACTED);
        this.resultConsumerSession = this.resultConsumerConnection.createSession(true, Session.SESSION_TRANSACTED);

        this.agentProducer = this.agentProducerSession.createProducer(new ActiveMQQueue(AGENTS_QUEUE));
        this.resultProducer = this.resultProducerSession.createProducer(new ActiveMQQueue(RESULTS_QUEUE));
        this.agentConsumer = this.agentConsumerSession.createConsumer(
                new ActiveMQQueue(AGENTS_QUEUE + NO_PREFETCH_OPTION));

        this.agentProducerConnection.start();
        this.agentConsumerConnection.start();
        this.resultProducerConnection.start();
        this.resultConsumerConnection.start();

        if (embeddedBrokerStarted) {
            startDefaultWorker();
        }
    }

    /**
     * Announces the fallback on the console and starts an embedded broker
     * for the configured URL.
     *
     * @throws JMSException if the embedded broker cannot be started; the
     *         underlying failure is attached as cause and linked exception
     */
    private void startEmbeddedBroker() throws JMSException {
        // A console appender is attached only for this one message and removed again.
        ConsoleAppender consoleAppender = new ConsoleAppender(new PatternLayout());
        log.addAppender(consoleAppender);
        log.info("Unable to connect to JMS broker - schedule lazy initialization");
        log.removeAppender(consoleAppender);
        try {
            new ActiveMQEmbeddedBroker(this.brokerURL).start();
        } catch (Exception e) {
            log.error("Unable to start Default Environment.", e);
            JMSException failure = new JMSException("Unable to start Default Environment.");
            failure.setLinkedException(e);
            failure.initCause(e);
            throw failure;
        }
    }

    /**
     * Starts a single-threaded worker on a new thread, wired to its own
     * broker instance for the same URL and a fresh cache service. A failure
     * to create the worker's broker is logged, not propagated.
     */
    private void startDefaultWorker() {
        WorkerSingleThreaded worker = new WorkerSingleThreaded();
        try {
            worker.setAgentBroker(new ActiveMQBroker(this.brokerURL));
            worker.setCacheService(new CacheServiceHM());
            new Thread(worker).start();
        } catch (DACException e) {
            log.error("Error while creating Worker for Default Environment: ", e);
        }
    }

    /**
     * Closes producers, consumers and sessions in a fixed order, skipping
     * any that were never created.
     *
     * @throws JMSException on the first close that fails
     */
    private void closeProducersConsumersAndSessions() throws JMSException {
        if (this.agentProducer != null) {
            this.agentProducer.close();
        }
        if (this.agentConsumer != null) {
            this.agentConsumer.close();
        }
        if (this.resultProducer != null) {
            this.resultProducer.close();
        }
        closeResultConsumer();
        if (this.resultProducerSession != null) {
            this.resultProducerSession.close();
        }
        if (this.resultConsumerSession != null) {
            this.resultConsumerSession.close();
        }
        if (this.agentProducerSession != null) {
            this.agentProducerSession.close();
        }
        if (this.agentConsumerSession != null) {
            this.agentConsumerSession.close();
        }
    }

    /**
     * Closes every connection that was opened, continuing past failures.
     *
     * @param earlierFailure a failure that already happened, or null
     * @return {@code earlierFailure} if non-null, otherwise the first
     *         failure hit while closing, or null if all closes succeeded;
     *         failures that are not returned are logged
     */
    private JMSException closeConnections(JMSException earlierFailure) {
        JMSException firstFailure = earlierFailure;
        Connection[] connections = {
            this.resultProducerConnection,
            this.resultConsumerConnection,
            this.agentProducerConnection,
            this.agentConsumerConnection
        };
        for (Connection connection : connections) {
            if (connection == null) {
                continue;
            }
            try {
                connection.close();
            } catch (JMSException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    log.error("Unable to close JMS connection", e);
                }
            }
        }
        return firstFailure;
    }

    /**
     * Closes the current result consumer, if any, and forgets it. A close
     * failure is logged and otherwise ignored.
     */
    private void closeResultConsumer() {
        MessageConsumer consumer = this.resultConsumer;
        if (consumer == null) {
            return;
        }
        this.resultConsumer = null;
        try {
            consumer.close();
        } catch (JMSException e) {
            log.error("Unable to close result consumer", e);
        }
    }

    /**
     * Replaces the result consumer with one that only selects messages whose
     * {@code identString} property equals the given recipient.
     *
     * @param str recipient identifier
     * @throws DACException if the consumer cannot be created
     */
    private void initResultConsumer(String str) throws DACException {
        // Inside a selector string literal a single quote is written as two
        // single quotes; escaping keeps the identifier from altering the selector.
        String escapedRecipient = String.valueOf(str).replace("'", "''");
        String selector = String.format("%s = '%s'", RECIPIENT_PROPERTY_NAME, escapedRecipient);
        log.debug("Using message selector: " + selector);
        closeResultConsumer();
        try {
            this.resultConsumer = this.resultConsumerSession.createConsumer(
                    new ActiveMQQueue(RESULTS_QUEUE + NO_PREFETCH_OPTION), selector);
        } catch (JMSException e) {
            throw new DACException((Throwable) e);
        }
    }
}
