package org.apache.james.mailetcontainer.impl;

import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
import jakarta.mail.MessagingException;
import java.io.IOException;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.configuration2.HierarchicalConfiguration;
import org.apache.commons.configuration2.tree.ImmutableNode;
import org.apache.james.core.MailAddress;
import org.apache.james.lifecycle.api.Configurable;
import org.apache.james.lifecycle.api.Disposable;
import org.apache.james.lifecycle.api.LifecycleUtil;
import org.apache.james.mailetcontainer.api.MailProcessor;
import org.apache.james.mailetcontainer.api.jmx.MailSpoolerMBean;
import org.apache.james.mailrepository.api.MailRepository;
import org.apache.james.mailrepository.api.MailRepositoryPath;
import org.apache.james.mailrepository.api.MailRepositoryStore;
import org.apache.james.mailrepository.api.MailRepositoryUrl;
import org.apache.james.mailrepository.api.Protocol;
import org.apache.james.metrics.api.GaugeRegistry;
import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.metrics.api.TimeMetric;
import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.api.MailQueueFactory;
import org.apache.mailet.Attribute;
import org.apache.mailet.AttributeName;
import org.apache.mailet.AttributeValue;
import org.apache.mailet.Mail;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:org/apache/james/mailetcontainer/impl/JamesMailSpooler.class */
/**
 * Dequeues mails from the spool queue ({@link MailQueueFactory#SPOOL}) and hands each of them to
 * the configured {@link MailProcessor}.
 *
 * <p>When processing a mail fails, the mail is re-enqueued with an incremented
 * {@link #MAIL_PROCESSING_ERROR_COUNT} attribute. Once the computed failure count exceeds
 * {@link #MAXIMUM_FAILURE_COUNT}, the mail is stored in the configured error repository instead.
 * If that error handling itself fails, the queue item is rejected.
 */
public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMBean {
    private static final Logger LOGGER = LoggerFactory.getLogger(JamesMailSpooler.class);

    /** Name of the timer metric recorded around the processing of each queue item. */
    public static final String SPOOL_PROCESSING = "spoolProcessing";
    /** Failure count above which a mail is moved to the error repository instead of being re-enqueued. */
    public static final int MAXIMUM_FAILURE_COUNT = 5;
    /** Mail attribute carrying the failure count written by the previous failed attempt. */
    public static final AttributeName MAIL_PROCESSING_ERROR_COUNT = AttributeName.of("mail-processing-error-count");
    /** Path of the error repository used when no {@code errorRepository} property is configured. */
    public static final MailRepositoryPath ERROR_REPOSITORY_PATH = MailRepositoryPath.from("var/mail/error");

    /**
     * Spooler settings: the concurrency level and the URL of the repository receiving mails that
     * repeatedly failed processing.
     */
    public static class Configuration {
        private final int concurrencyLevel;
        private final MailRepositoryUrl errorRepositoryURL;

        /**
         * Reads the spooler configuration.
         *
         * <p>{@code threads} defaults to 100. {@code errorRepository} is optional: when absent, the
         * URL is built from the store's default protocol and {@link #ERROR_REPOSITORY_PATH}.
         *
         * @param mailRepositoryStore store queried for its default protocol when no
         *                            {@code errorRepository} property is set
         * @param hierarchicalConfiguration configuration node to read {@code threads} and
         *                                  {@code errorRepository} from
         * @return the parsed configuration
         * @throws IllegalStateException if {@code errorRepository} is not set and the store has no
         *                               default protocol
         * @throws IllegalArgumentException if {@code threads} is negative
         */
        public static Configuration from(MailRepositoryStore mailRepositoryStore, HierarchicalConfiguration<ImmutableNode> hierarchicalConfiguration) {
            int concurrencyLevel = hierarchicalConfiguration.getInt("threads", 100);
            MailRepositoryUrl errorRepositoryURL = Optional.ofNullable(hierarchicalConfiguration.getString("errorRepository", null))
                .map(MailRepositoryUrl::from)
                .orElseGet(() -> MailRepositoryUrl.fromPathAndProtocol(
                    mailRepositoryStore.defaultProtocol()
                        .orElseThrow(() -> new IllegalStateException("Cannot retrieve mailRepository URL, you need to configure an `errorRepository` property for the spooler.")),
                    ERROR_REPOSITORY_PATH));
            return new Configuration(concurrencyLevel, errorRepositoryURL);
        }

        /**
         * @param concurrencyLevel maximum number of mails processed concurrently; zero disables the spooler
         * @param errorRepositoryURL URL of the repository storing mails that exceeded the failure limit
         * @throws IllegalArgumentException if {@code concurrencyLevel} is negative
         */
        public Configuration(int concurrencyLevel, MailRepositoryUrl errorRepositoryURL) {
            Preconditions.checkArgument(concurrencyLevel >= 0, "'threads' needs to be greater than or equal to zero");
            this.concurrencyLevel = concurrencyLevel;
            this.errorRepositoryURL = errorRepositoryURL;
        }

        public int getConcurrencyLevel() {
            return concurrencyLevel;
        }

        /** @return {@code true} when the concurrency level is strictly positive */
        public boolean isEnabled() {
            return concurrencyLevel > 0;
        }

        public MailRepositoryUrl getErrorRepositoryURL() {
            return errorRepositoryURL;
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(this)
                .add("concurrencyLevel", concurrencyLevel)
                .add("errorRepositoryURL", errorRepositoryURL)
                .toString();
        }
    }

    /**
     * One running consumption of the spool queue. The queue subscription is started by the
     * constructor and stopped by {@link #dispose()}.
     */
    private static class Runner {
        private static final Duration GRACEFUL_SHUTDOWN_TIMEOUT = Duration.ofSeconds(5);

        private final MetricFactory metricFactory;
        private final MailProcessor mailProcessor;
        private final MailRepository errorRepository;
        private final reactor.core.Disposable disposable;
        private final MailQueue queue;
        private final Configuration configuration;
        private final Scheduler scheduler;
        // Number of queue items currently being handled; exposed through the gauge and JMX.
        private final AtomicInteger processingActive = new AtomicInteger(0);
        // Single-threaded scheduler on which the queue subscription itself runs.
        private final Scheduler queueScheduler = Schedulers.newBoundedElastic(1, Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "queueScheduler");

        /**
         * Creates the processing scheduler, subscribes to the queue and registers the in-flight gauge.
         */
        private Runner(MetricFactory metricFactory, GaugeRegistry gaugeRegistry, MailProcessor mailProcessor,
                       MailRepository errorRepository, MailQueue queue, Configuration configuration) {
            this.metricFactory = metricFactory;
            this.mailProcessor = mailProcessor;
            this.errorRepository = errorRepository;
            this.queue = queue;
            this.configuration = configuration;
            this.scheduler = Schedulers.newBoundedElastic(configuration.getConcurrencyLevel() + 1, Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "spooler");
            // Must come after the scheduler and configuration are assigned: run() reads both.
            this.disposable = run(queue);
            gaugeRegistry.register(SPOOL_PROCESSING + ".inFlight", processingActive::get);
        }

        /**
         * Subscribes to the queue and processes at most {@code concurrencyLevel} items at a time.
         *
         * @return the handle of the subscription
         */
        private reactor.core.Disposable run(MailQueue queue) {
            return Flux.from(queue.deQueue())
                .flatMap(item -> handleOnQueueItem(item).subscribeOn(scheduler), configuration.getConcurrencyLevel())
                .onErrorContinue((throwable, item) -> LOGGER.error("Exception processing mail while spooling {}", item, throwable))
                .subscribeOn(queueScheduler)
                .subscribe();
        }

        /**
         * Wraps the processing of one item with the in-flight counter and the
         * {@link #SPOOL_PROCESSING} timer. The timer is only published on successful completion.
         */
        private Mono<Void> handleOnQueueItem(MailQueue.MailQueueItem queueItem) {
            TimeMetric timer = metricFactory.timer(SPOOL_PROCESSING);
            return Mono.fromCallable(processingActive::incrementAndGet)
                .flatMap(ignored -> processMail(queueItem))
                .doOnSuccess(ignored -> timer.stopAndPublish())
                .doOnTerminate(processingActive::decrementAndGet);
        }

        /** Processes the mail of the given item and disposes the mail afterwards. */
        private Mono<Void> processMail(MailQueue.MailQueueItem queueItem) {
            return Mono.using(
                queueItem::getMail,
                mail -> Mono.fromRunnable(() -> performProcessMail(queueItem, mail)),
                LifecycleUtil::dispose);
        }

        /**
         * Runs the mail through the processor and acknowledges the queue item on success. Any
         * failure, including an interruption of the current thread, is routed to
         * {@link #handleError}.
         */
        private void performProcessMail(MailQueue.MailQueueItem queueItem, Mail mail) {
            LOGGER.debug("==== Begin processing mail {} ====", mail.getName());
            // Snapshot taken before processing so the recipients can be restored on failure.
            ImmutableList<MailAddress> originalRecipients = ImmutableList.copyOf(mail.getRecipients());
            try {
                mailProcessor.service(mail);
                if (Thread.currentThread().isInterrupted()) {
                    throw new InterruptedException("Thread has been interrupted");
                }
                queueItem.done(MailQueue.MailQueueItem.CompletionStatus.SUCCESS);
            } catch (Throwable e) {
                handleError(queueItem, mail, originalRecipients, e);
            } finally {
                LOGGER.debug("==== End processing mail {} ====", mail.getName());
            }
        }

        /**
         * Restores the original recipients, then either re-enqueues the mail with an incremented
         * failure count or, once that count exceeds {@link #MAXIMUM_FAILURE_COUNT}, stores it in
         * the error repository. If that handling throws, the queue item is rejected.
         */
        private void handleError(MailQueue.MailQueueItem queueItem, Mail mail, ImmutableList<MailAddress> originalRecipients, Throwable processingException) {
            int failureCount = computeFailureCount(mail);
            queueItem.getMail().setRecipients(originalRecipients);
            try {
                if (failureCount > MAXIMUM_FAILURE_COUNT) {
                    LOGGER.error("Failed {} processing {} consecutive times. Abort. Mail is saved in {}",
                        mail.getName(), failureCount, configuration.getErrorRepositoryURL().asString());
                    storeInErrorRepository(queueItem);
                } else {
                    LOGGER.error("Failed {} processing {} consecutive times. Mail is requeued with increased failure count.",
                        mail.getName(), failureCount, processingException);
                    reEnqueue(queueItem, failureCount);
                }
            } catch (Exception e) {
                LOGGER.error("Could not apply standard error handling for {}, defaulting to nack", mail.getName(), e);
                nack(queueItem, processingException);
            }
        }

        /** @return the failure count stored on the mail (0 when absent or not an Integer) plus one */
        private int computeFailureCount(Mail mail) {
            int previousFailureCount = mail.getAttribute(MAIL_PROCESSING_ERROR_COUNT)
                .flatMap(attribute -> attribute.getValue().valueAs(Integer.class))
                .orElse(0);
            return previousFailureCount + 1;
        }

        /** Puts the mail back on the queue with the given failure count, then acknowledges the item. */
        private void reEnqueue(MailQueue.MailQueueItem queueItem, int failureCount) throws MailQueue.MailQueueException {
            Mail mail = queueItem.getMail();
            mail.setAttribute(new Attribute(MAIL_PROCESSING_ERROR_COUNT, AttributeValue.of(Integer.valueOf(failureCount))));
            queue.enQueue(mail);
            queueItem.done(MailQueue.MailQueueItem.CompletionStatus.SUCCESS);
        }

        /** Stores the mail in the error repository, then acknowledges the item. */
        private void storeInErrorRepository(MailQueue.MailQueueItem queueItem) throws MessagingException {
            errorRepository.store(queueItem.getMail());
            queueItem.done(MailQueue.MailQueueItem.CompletionStatus.SUCCESS);
        }

        /**
         * Rejects the queue item.
         *
         * @throws RuntimeException wrapping {@code processingException} when the rejection itself
         *                          fails; the rejection failure is attached as a suppressed exception
         */
        private void nack(MailQueue.MailQueueItem queueItem, Throwable processingException) {
            try {
                queueItem.done(MailQueue.MailQueueItem.CompletionStatus.REJECT);
            } catch (MailQueue.MailQueueException e) {
                RuntimeException failure = new RuntimeException(processingException);
                // Keep the reason the nack failed instead of dropping it.
                failure.addSuppressed(e);
                throw failure;
            }
        }

        /**
         * Stops queue consumption, waits up to five seconds for the processing scheduler to shut
         * down gracefully, cancels the subscription and closes the queue.
         */
        public void dispose() {
            LOGGER.info("start dispose() ...");
            LOGGER.info("Cancel queue consumption...");
            queueScheduler.dispose();
            LOGGER.info("Queue consumption canceled, shutting down processor threads...");
            scheduler.disposeGracefully()
                .timeout(GRACEFUL_SHUTDOWN_TIMEOUT)
                .onErrorResume(e -> {
                    // Best effort: shutdown carries on regardless, but the failure is no longer silent.
                    LOGGER.warn("Graceful shutdown of spooler threads did not complete within {}", GRACEFUL_SHUTDOWN_TIMEOUT, e);
                    return Mono.empty();
                })
                .block();
            disposable.dispose();
            LOGGER.info("Thread shutdown completed. Turning off mail queue.");
            try {
                queue.close();
            } catch (IOException e) {
                LOGGER.debug("error closing queue", e);
            }
        }

        public int getCurrentSpoolCount() {
            return processingActive.get();
        }
    }

    private final MetricFactory metricFactory;
    private final GaugeRegistry gaugeRegistry;
    private final MailProcessor mailProcessor;
    private final MailRepositoryStore mailRepositoryStore;
    private final MailQueueFactory<?> queueFactory;
    private Configuration configuration;
    // Empty until init() starts a runner; stays empty when the spooler is disabled.
    private Optional<Runner> runner = Optional.empty();
    // Only assigned by init() when the configuration is enabled.
    private MailQueue queue;

    @Inject
    public JamesMailSpooler(MetricFactory metricFactory, GaugeRegistry gaugeRegistry, MailProcessor mailProcessor, MailRepositoryStore mailRepositoryStore, MailQueueFactory<?> mailQueueFactory) {
        this.metricFactory = metricFactory;
        this.gaugeRegistry = gaugeRegistry;
        this.mailProcessor = mailProcessor;
        this.mailRepositoryStore = mailRepositoryStore;
        this.queueFactory = mailQueueFactory;
    }

    /**
     * Starts a new runner with the current configuration, then disposes the previous one if any.
     * Does nothing when the spooler was never started (no queue was created by {@link #init()}).
     */
    public void restart() {
        if (queue == null) {
            LOGGER.info("Spooler is not started, ignoring restart request");
            return;
        }
        Optional<Runner> previous = runner;
        runner = Optional.of(new Runner(metricFactory, gaugeRegistry, mailProcessor, errorRepository(), queue, configuration));
        // NOTE(review): Runner.dispose() also closes the queue, which is the same instance handed
        // to the new runner above - confirm the queue implementation tolerates this.
        previous.ifPresent(Runner::dispose);
    }

    /**
     * Parses and applies the given configuration.
     *
     * @see Configuration#from(MailRepositoryStore, HierarchicalConfiguration)
     */
    @Override
    public void configure(HierarchicalConfiguration<ImmutableNode> hierarchicalConfiguration) {
        configure(Configuration.from(mailRepositoryStore, hierarchicalConfiguration));
    }

    /** Applies the given configuration; it is read by {@link #init()} and {@link #restart()}. */
    public void configure(Configuration configuration) {
        this.configuration = configuration;
    }

    /**
     * Creates the spool queue and starts consuming it, unless the configured concurrency level
     * is zero.
     */
    @PostConstruct
    public void init() {
        if (!configuration.isEnabled()) {
            LOGGER.info("Spooler had been deactivated. To enable it set 'threads' count to a value greater than zero");
            return;
        }
        LOGGER.info("init...");
        LOGGER.info("Concurrency level is {}", configuration.getConcurrencyLevel());
        queue = queueFactory.createQueue(MailQueueFactory.SPOOL, MailQueueFactory.prefetchCount(configuration.getConcurrencyLevel()));
        runner = Optional.of(new Runner(metricFactory, gaugeRegistry, mailProcessor, errorRepository(), queue, configuration));
        LOGGER.info("Spooler started");
    }

    /**
     * @return the repository selected by the configured error repository URL
     * @throws RuntimeException wrapping the store exception when the repository cannot be selected
     */
    private MailRepository errorRepository() {
        try {
            return mailRepositoryStore.select(configuration.getErrorRepositoryURL());
        } catch (MailRepositoryStore.MailRepositoryStoreException e) {
            throw new RuntimeException(e);
        }
    }

    /** Disposes the current runner, if one was started. */
    @PreDestroy
    @Override
    public void dispose() {
        runner.ifPresent(Runner::dispose);
    }

    /** @return the configured concurrency level */
    @Override
    public int getThreadCount() {
        return configuration.getConcurrencyLevel();
    }

    /** @return the number of queue items currently being processed, or 0 when no runner is started */
    @Override
    public int getCurrentSpoolCount() {
        return runner.map(Runner::getCurrentSpoolCount).orElse(0);
    }
}
