package org.apache.james.webadmin.service;

import com.github.fge.lambdas.Throwing;
import java.io.Closeable;
import java.io.IOException;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import javax.inject.Inject;
import javax.mail.MessagingException;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.james.lifecycle.api.LifecycleUtil;
import org.apache.james.mailrepository.api.MailKey;
import org.apache.james.mailrepository.api.MailRepository;
import org.apache.james.mailrepository.api.MailRepositoryPath;
import org.apache.james.mailrepository.api.MailRepositoryStore;
import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.api.MailQueueFactory;
import org.apache.james.queue.api.MailQueueName;
import org.apache.james.task.Task;
import org.apache.james.util.streams.Iterators;
import org.apache.james.util.streams.Limit;
import org.apache.mailet.Attribute;
import org.apache.mailet.AttributeName;
import org.apache.mailet.AttributeValue;
import org.apache.mailet.Mail;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/apache/james/webadmin/service/ReprocessingService.class */
public class ReprocessingService {
    private static final Logger LOGGER = LoggerFactory.getLogger(ReprocessingService.class);
    public static final AttributeName RETRY_ATTRIBUTE_NAME = AttributeName.of("mailRepository-reprocessing");
    private final MailQueueFactory<?> mailQueueFactory;
    private final MailRepositoryStoreService mailRepositoryStoreService;

    /* loaded from: input_file:org/apache/james/webadmin/service/ReprocessingService$Configuration.class */
    /**
     * Immutable set of options driving a reprocessing run.
     *
     * <ul>
     *   <li>{@code mailQueueName}: name used to look up the queue the mails are enqueued on;</li>
     *   <li>{@code targetProcessor}: when present, set as the mail state before enqueuing;</li>
     *   <li>{@code maxRetries}: when present, upper bound compared against the mail's retry attribute;</li>
     *   <li>{@code consume}: when {@code true}, the mail is removed from its repository after enqueuing;</li>
     *   <li>{@code limit}: limit applied to the flux of mails handled by a bulk run.</li>
     * </ul>
     */
    public static class Configuration {
        private final MailQueueName mailQueueName;
        private final Optional<String> targetProcessor;
        private final Optional<Integer> maxRetries;
        private final boolean consume;
        private final Limit limit;

        /**
         * Captures the given options as-is; no validation is performed here.
         *
         * @param mailQueueName   name of the destination queue
         * @param targetProcessor optional state to set on each mail before it is enqueued
         * @param maxRetries      optional maximum number of reprocessing attempts
         * @param consume         whether a mail is removed from its repository once enqueued
         * @param limit           limit applied to the mails handled by a bulk run
         */
        public Configuration(MailQueueName mailQueueName,
                             Optional<String> targetProcessor,
                             Optional<Integer> maxRetries,
                             boolean consume,
                             Limit limit) {
            this.mailQueueName = mailQueueName;
            this.targetProcessor = targetProcessor;
            this.maxRetries = maxRetries;
            this.consume = consume;
            this.limit = limit;
        }

        /** @return the name of the destination queue */
        public MailQueueName getMailQueueName() {
            return mailQueueName;
        }

        /** @return the optional state to set on each mail before it is enqueued */
        public Optional<String> getTargetProcessor() {
            return targetProcessor;
        }

        /** @return the optional maximum number of reprocessing attempts */
        public Optional<Integer> getMaxRetries() {
            return maxRetries;
        }

        /** @return {@code true} when a mail is removed from its repository once enqueued */
        public boolean isConsume() {
            return consume;
        }

        /** @return the limit applied to the mails handled by a bulk run */
        public Limit getLimit() {
            return limit;
        }
    }

    /* loaded from: input_file:org/apache/james/webadmin/service/ReprocessingService$MissingKeyException.class */
    /**
     * Signals that a mail key could not be resolved to a mail; the message embeds
     * the key's string form.
     */
    public static class MissingKeyException extends RuntimeException {
        /**
         * @param mailKey the key that could not be found
         */
        MissingKeyException(MailKey mailKey) {
            super(describe(mailKey));
        }

        /** Builds the exception message for the given key. */
        private static String describe(MailKey missingKey) {
            return missingKey.asString() + " can not be found";
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/james/webadmin/service/ReprocessingService$Reprocessor.class */
    /**
     * Enqueues mails on a single {@link MailQueue} according to a {@link Configuration}.
     *
     * <p>Each reprocessed mail gets its {@link ReprocessingService#RETRY_ATTRIBUTE_NAME}
     * attribute incremented, which is what {@link #retryExceeded(Mail)} later compares
     * against the configured maximum. Closing the reprocessor closes the queue.</p>
     */
    public static class Reprocessor implements Closeable {
        private final MailQueue mailQueue;
        private final Configuration configuration;

        /**
         * @param mailQueue     queue the mails are enqueued on; closed by {@link #close()}
         * @param configuration options applied to every reprocessed mail
         */
        Reprocessor(MailQueue mailQueue, Configuration configuration) {
            this.mailQueue = mailQueue;
            this.configuration = configuration;
        }

        /**
         * Reprocesses one mail: bumps its retry counter, applies the optional target
         * processor as the mail state, enqueues it, and removes it from the repository
         * when the configuration asks for consumption.
         *
         * <p>The mail is always disposed before returning, whether or not an error occurred.</p>
         *
         * @param mailRepository repository the mail is removed from when consuming
         * @param mail           mail to enqueue
         * @param mailKey        key of the mail within {@code mailRepository}
         * @throws RuntimeException wrapping any exception raised while reprocessing
         */
        public void reprocess(MailRepository mailRepository, Mail mail, MailKey mailKey) {
            try {
                incrementRetries(mail);
                configuration.getTargetProcessor().ifPresent(mail::setState);
                mailQueue.enQueue(mail);
                if (configuration.isConsume()) {
                    mailRepository.remove(mailKey);
                }
            } catch (Exception e) {
                throw new RuntimeException("Error encountered while reprocessing mail " + mail.getName(), e);
            } finally {
                LifecycleUtil.dispose(mail);
            }
        }

        /**
         * Tells whether the mail already reached the configured maximum number of retries.
         *
         * @param mail mail whose retry attribute is inspected
         * @return {@code true} when a maximum is configured and the mail's retry count
         *         is greater than or equal to it; {@code false} when no maximum is configured
         */
        private boolean retryExceeded(Mail mail) {
            int retries = currentRetryCount(mail);
            return configuration.getMaxRetries()
                .map(maxRetries -> retries >= maxRetries)
                .orElse(false);
        }

        /** Stores the mail's retry count, incremented by one, back into its retry attribute. */
        private void incrementRetries(Mail mail) {
            int nextCount = currentRetryCount(mail) + 1;
            mail.setAttribute(new Attribute(ReprocessingService.RETRY_ATTRIBUTE_NAME, AttributeValue.of(Integer.valueOf(nextCount))));
        }

        /**
         * Reads the retry attribute of the mail.
         *
         * @return the attribute value when it is present and holds an {@link Integer}, 0 otherwise
         */
        private static int currentRetryCount(Mail mail) {
            return mail.getAttribute(ReprocessingService.RETRY_ATTRIBUTE_NAME)
                .map(attribute -> attribute.getValue().getValue())
                .filter(Integer.class::isInstance)
                .map(Integer.class::cast)
                .orElse(0);
        }

        /**
         * Closes the underlying queue. An {@link IOException} raised while closing is
         * deliberately not propagated; it is only logged at debug level.
         */
        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            try {
                mailQueue.close();
            } catch (IOException e) {
                ReprocessingService.LOGGER.debug("error closing queue", e);
            }
        }
    }

    /**
     * Creates the service from its two collaborators.
     *
     * @param mailQueueFactory           factory used to look up the destination queue by name
     * @param mailRepositoryStoreService service used to list the repositories behind a path
     */
    @Inject
    public ReprocessingService(MailQueueFactory<?> mailQueueFactory,
                               MailRepositoryStoreService mailRepositoryStoreService) {
        this.mailRepositoryStoreService = mailRepositoryStoreService;
        this.mailQueueFactory = mailQueueFactory;
    }

    /**
     * Reprocesses the mails stored under the given repository path.
     *
     * <p>A {@link Reprocessor} bound to the configured queue is created for the run and
     * closed once the returned {@link Mono} terminates.</p>
     *
     * @param mailRepositoryPath path of the repositories to read mails from
     * @param configuration      options of the run
     * @param consumer           callback invoked with each mail key listed from a repository
     * @return the combined result of all individual reprocessing attempts
     */
    public Mono<Task.Result> reprocessAll(MailRepositoryPath mailRepositoryPath, Configuration configuration, Consumer<MailKey> consumer) {
        return Mono.using(
            () -> new Reprocessor(getMailQueue(configuration.getMailQueueName()), configuration),
            reprocessor -> reprocessAll(reprocessor, mailRepositoryPath, configuration, consumer),
            Reprocessor::close);
    }

    /**
     * Streams every mail of every repository behind {@code mailRepositoryPath}, drops the
     * mails that already reached the configured retry maximum, applies the configured
     * limit, and reprocesses what remains.
     *
     * @param reprocessor        reprocessor used for the retry check and the enqueuing
     * @param mailRepositoryPath path of the repositories to read mails from
     * @param configuration      options of the run; only its limit is read here
     * @param consumer           callback invoked with each listed mail key, before retrieval
     * @return {@link Task.Result#COMPLETED} combined with the result of each attempt
     */
    private Mono<Task.Result> reprocessAll(Reprocessor reprocessor, MailRepositoryPath mailRepositoryPath, Configuration configuration, Consumer<MailKey> consumer) {
        return configuration.getLimit().applyOnFlux(Flux.fromStream(Throwing.supplier(() -> mailRepositoryStoreService.getRepositories(mailRepositoryPath)))
            .flatMap(Throwing.function((MailRepository repository) -> Iterators.toFlux(repository.list())
                .doOnNext(consumer)
                .flatMap(mailKey -> Mono.fromCallable(() -> repository.retrieve(mailKey))
                    .map(mail -> Triple.of(mail, repository, mailKey)))
                .filter(candidate -> keepForReprocessing(reprocessor, candidate.getLeft())))))
            .flatMap(candidate -> reprocess(candidate.getRight(), candidate.getLeft(), candidate.getMiddle(), reprocessor))
            .reduce(Task.Result.COMPLETED, Task::combine);
    }

    /**
     * Retry filter for the bulk run.
     *
     * <p>A rejected mail never reaches {@link Reprocessor#reprocess}, which is the only
     * other place retrieved mails get disposed, so it is disposed here to avoid leaking
     * the resources it holds.</p>
     *
     * @return {@code true} when the mail has not exceeded its retries and must be reprocessed
     */
    private static boolean keepForReprocessing(Reprocessor reprocessor, Mail mail) {
        if (reprocessor.retryExceeded(mail)) {
            LifecycleUtil.dispose(mail);
            return false;
        }
        return true;
    }

    /**
     * Wraps a single reprocessing attempt so that a failure does not abort the whole run.
     *
     * @return {@link Task.Result#COMPLETED} on success; on any error the failure is logged
     *         at warn level and {@link Task.Result#PARTIAL} is emitted instead
     */
    private Mono<Task.Result> reprocess(MailKey mailKey, Mail mail, MailRepository mailRepository, Reprocessor reprocessor) {
        Runnable attempt = () -> reprocessor.reprocess(mailRepository, mail, mailKey);
        return Mono.fromRunnable(attempt)
            .thenReturn(Task.Result.COMPLETED)
            .onErrorResume(failure -> {
                LOGGER.warn("Failed when reprocess mail {}", mailKey.asString(), failure);
                return Mono.just(Task.Result.PARTIAL);
            });
    }

    /**
     * Reprocesses a single mail identified by its key.
     *
     * <p>The repositories behind {@code mailRepositoryPath} are probed in order and the
     * first one returning a non-null mail for {@code mailKey} is used. The reprocessor,
     * and with it the destination queue, is closed when the method exits, normally or not.</p>
     *
     * @param mailRepositoryPath path of the repositories to search
     * @param mailKey            key of the mail to reprocess
     * @param configuration      options of the run
     * @throws MissingKeyException when no repository holds a mail for {@code mailKey}
     * @throws MailRepositoryStore.MailRepositoryStoreException declared by the repository lookup
     * @throws MessagingException declared for callers; kept for interface compatibility
     */
    public void reprocess(MailRepositoryPath mailRepositoryPath, MailKey mailKey, Configuration configuration) throws MailRepositoryStore.MailRepositoryStoreException, MessagingException {
        try (Reprocessor reprocessor = new Reprocessor(getMailQueue(configuration.getMailQueueName()), configuration)) {
            Pair<MailRepository, Mail> located = mailRepositoryStoreService.getRepositories(mailRepositoryPath)
                .map(Throwing.function((MailRepository repository) -> Pair.of(repository, Optional.ofNullable(repository.retrieve(mailKey)))))
                .filter(candidate -> candidate.getRight().isPresent())
                .map(candidate -> Pair.of(candidate.getLeft(), candidate.getRight().get()))
                .findFirst()
                .orElseThrow(() -> new MissingKeyException(mailKey));

            reprocessor.reprocess(located.getLeft(), located.getRight(), mailKey);
        }
    }

    /**
     * Looks up the queue with the given name.
     *
     * @param mailQueueName name of the queue to fetch from the factory
     * @return the matching queue
     * @throws RuntimeException when the factory knows no queue under that name
     */
    private MailQueue getMailQueue(MailQueueName mailQueueName) {
        Optional<? extends MailQueue> found = mailQueueFactory.getQueue(mailQueueName);
        if (!found.isPresent()) {
            throw new RuntimeException("Can not find queue " + mailQueueName.asString());
        }
        return found.get();
    }
}
