package org.apache.james.queue.activemq.metric;

import com.google.common.annotations.VisibleForTesting;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.JMSException;
import jakarta.jms.MapMessage;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TemporaryQueue;
import java.util.HashMap;
import java.util.Map;
import org.apache.james.metrics.api.GaugeRegistry;
import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.queue.activemq.ActiveMQConfiguration;
import org.apache.james.queue.api.MailQueueName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/**
 * Periodically queries ActiveMQ statistics destinations over JMS and feeds the
 * replies into the registered {@link ActiveMQMetrics} instances.
 *
 * <p>Statistics sources are registered through {@link #collectBrokerStatistics()} and
 * {@link #collectQueueStatistics(MailQueueName)}; polling is started by {@link #start()}
 * and cancelled by {@link #stop()}. Nothing is registered or polled when the metric
 * configuration reports that collection is disabled.
 */
public class ActiveMQMetricCollectorImpl implements ActiveMQMetricCollector {
    private static final Logger LOGGER = LoggerFactory.getLogger(ActiveMQMetricCollectorImpl.class);
    private final ActiveMQMetricConfiguration config;
    private final ConnectionFactory connectionFactory;
    private final MetricFactory metricFactory;
    private final GaugeRegistry gaugeRegistry;
    // NOTE(review): plain HashMap; registrations and the polling pipeline may run on
    // different threads — confirm whether a concurrent map is needed here.
    private final Map<String, ActiveMQMetrics> registeredStatistics = new HashMap<>();
    /** Handle of the polling subscription; {@code null} until {@link #start()} has subscribed. */
    private Disposable disposable;

    /**
     * @param activeMQConfiguration source of the metric configuration (enabled flag, delays, timeouts)
     * @param connectionFactory     factory for the JMS connections used to query statistics
     * @param metricFactory         used to time every statistics fetch
     * @param gaugeRegistry         registry handed to the {@link ActiveMQMetrics} instances created here
     */
    @Inject
    public ActiveMQMetricCollectorImpl(ActiveMQConfiguration activeMQConfiguration, ConnectionFactory connectionFactory, MetricFactory metricFactory, GaugeRegistry gaugeRegistry) {
        this.config = activeMQConfiguration.getMetricConfiguration();
        this.connectionFactory = connectionFactory;
        this.metricFactory = metricFactory;
        this.gaugeRegistry = gaugeRegistry;
    }

    /** Registers the broker-level statistics for periodic collection. */
    @Override // org.apache.james.queue.activemq.metric.ActiveMQMetricCollector
    public void collectBrokerStatistics() {
        collectStatistics(ActiveMQMetrics.forBroker(this.gaugeRegistry));
    }

    /**
     * Registers the statistics of a single mail queue for periodic collection.
     *
     * @param mailQueueName name of the queue whose statistics should be collected
     */
    @Override // org.apache.james.queue.activemq.metric.ActiveMQMetricCollector
    public void collectQueueStatistics(MailQueueName mailQueueName) {
        collectStatistics(ActiveMQMetrics.forQueue(mailQueueName.asString(), this.gaugeRegistry));
    }

    /**
     * Adds the given metrics to the set polled by the collector. Does nothing when
     * collection is disabled or when metrics with the same name are already registered.
     */
    private void collectStatistics(ActiveMQMetrics activeMQMetrics) {
        String name = activeMQMetrics.getName();
        if (!this.config.isEnabled() || this.registeredStatistics.containsKey(name)) {
            return;
        }
        LOGGER.info("collecting statistics for {}", name);
        this.registeredStatistics.put(name, activeMQMetrics);
    }

    /**
     * Registers the broker statistics and starts the periodic polling pipeline.
     * Only logs a message and returns when collection is disabled.
     */
    @Override // org.apache.james.queue.activemq.metric.ActiveMQMetricCollector
    public void start() {
        if (!this.config.isEnabled()) {
            LOGGER.info("collecting statistics disabled");
            return;
        }
        collectBrokerStatistics();
        LOGGER.info("start delay={} interval={} timeout={} aqmp_timeout={}",
            this.config.getStartDelay(), this.config.getInterval(), this.config.getTimeout(), this.config.getAqmpTimeout());
        // NOTE(review): the scheduler created by Schedulers.newSingle is never disposed,
        // only the subscription is — confirm whether its thread should be released in stop().
        this.disposable = Flux.interval(this.config.getStartDelay(), this.config.getInterval())
            .flatMap(tick -> Flux.fromStream(() -> this.registeredStatistics.values().stream())
                .flatMap(metrics -> this.metricFactory.decoratePublisherWithTimerMetric(
                    metrics.getName() + "._time",
                    Mono.fromCallable(() -> fetchAndUpdate(metrics)).timeout(this.config.getTimeout()))))
            // A failing fetch is logged and dropped so that the interval keeps ticking.
            .onErrorContinue(this::logError)
            .subscribeOn(Schedulers.newSingle(ActiveMQMetricCollectorImpl.class.getSimpleName()))
            .subscribe();
    }

    /**
     * Cancels the polling subscription. Safe to call when {@link #start()} was never
     * invoked or returned early because collection is disabled.
     */
    @Override // org.apache.james.queue.activemq.metric.ActiveMQMetricCollector
    @PreDestroy
    public void stop() {
        // disposable stays null when start() never subscribed; guard against an NPE on shutdown.
        if (this.disposable != null) {
            this.disposable.dispose();
        }
    }

    /** Error hook of the polling pipeline: logs the failure; the offending element is ignored. */
    private void logError(Throwable th, Object obj) {
        LOGGER.warn("failed to fetch and update broker statistics", th);
    }

    /**
     * Sends an empty request message to the destination named after the given metrics,
     * waits for the reply on a temporary queue and passes the reply to
     * {@link ActiveMQMetrics#updateMetrics}.
     *
     * <p>All JMS resources opened here are released before returning, whether the
     * exchange succeeded or failed.
     *
     * @param activeMQMetrics metrics to refresh; its name is used as the request destination
     * @return always {@code null}
     * @throws JMSException if any JMS call fails, if no reply arrives within the configured
     *                      timeout, or if the reply is not a {@link MapMessage}
     */
    @VisibleForTesting
    Void fetchAndUpdate(ActiveMQMetrics activeMQMetrics) throws JMSException {
        Connection connection = null;
        Session session = null;
        TemporaryQueue replyQueue = null;
        MessageConsumer consumer = null;
        MessageProducer producer = null;
        try {
            connection = this.connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            replyQueue = session.createTemporaryQueue();
            consumer = session.createConsumer(replyQueue);
            producer = session.createProducer(session.createQueue(activeMQMetrics.getName()));

            Message request = session.createMessage();
            request.setJMSReplyTo(replyQueue);
            producer.send(request);

            long timeoutMillis = this.config.getAqmpTimeout().toMillis();
            Message reply = consumer.receive(timeoutMillis);
            if (reply == null) {
                throw new JMSException("no message received, timed out after " + timeoutMillis + " ms");
            }
            if (!(reply instanceof MapMessage)) {
                throw new JMSException("expected MapMessage but got " + reply.getClass());
            }
            activeMQMetrics.updateMetrics((MapMessage) reply);
            return null;
        } finally {
            // Release in reverse order of creation; each step is best-effort so that one
            // failing close does not prevent the remaining resources from being released.
            if (producer != null) {
                releaseQuietly("producer", producer::close);
            }
            if (consumer != null) {
                releaseQuietly("consumer", consumer::close);
            }
            if (replyQueue != null) {
                releaseQuietly("temporary queue", replyQueue::delete);
            }
            if (session != null) {
                releaseQuietly("session", session::close);
            }
            if (connection != null) {
                releaseQuietly("connection", connection::close);
            }
        }
    }

    /** Runs a cleanup action, logging (instead of propagating) a {@link JMSException}. */
    private static void releaseQuietly(String resource, JmsCleanup cleanup) {
        try {
            cleanup.run();
        } catch (JMSException ignored) {
            // Cleanup must not mask the outcome of the statistics exchange itself.
            LOGGER.debug("failed to release JMS {}", resource, ignored);
        }
    }

    /** A JMS cleanup step that may fail with a {@link JMSException}. */
    @FunctionalInterface
    private interface JmsCleanup {
        void run() throws JMSException;
    }
}
