package org.apache.james.mailbox.cassandra.mail.task;

import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import jakarta.inject.Inject;
import jakarta.mail.Flags;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.james.backends.cassandra.init.configuration.JamesExecutionProfiles;
import org.apache.james.mailbox.cassandra.ids.CassandraId;
import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
import org.apache.james.mailbox.cassandra.mail.CassandraMailboxCounterDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraMailboxDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO;
import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
import org.apache.james.mailbox.model.Mailbox;
import org.apache.james.mailbox.model.MailboxCounters;
import org.apache.james.mailbox.model.MessageRange;
import org.apache.james.task.Task;
import org.apache.james.util.streams.Limit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Recomputes the per-mailbox counters (total and unseen message counts) from the
 * message listing held by {@link CassandraMessageIdDAO}, and hands the recomputed
 * values to {@link CassandraMailboxCounterDAO#resetCounters}.
 *
 * <p>Depending on the supplied {@link Options}, each listed entry is either counted
 * as is, or first re-read from {@link CassandraMessageIdToImapUidDAO} before being counted.
 */
public class RecomputeMailboxCountersService {
    private static final Logger LOGGER = LoggerFactory.getLogger(RecomputeMailboxCountersService.class);
    /** Concurrency passed to {@code flatMap} when iterating over mailboxes. */
    private static final int MAILBOX_CONCURRENCY = 2;
    /** Concurrency passed to {@code flatMap} when resolving the metadata of each listed message. */
    private static final int MESSAGE_CONCURRENCY = 8;
    private final CassandraMailboxDAO mailboxDAO;
    private final CassandraMessageIdDAO imapUidToMessageIdDAO;
    private final CassandraMessageIdToImapUidDAO messageIdToImapUidDAO;
    private final CassandraMailboxCounterDAO counterDAO;

    /**
     * Mutable progress tracker for a recomputation run: counts the mailboxes that were
     * processed successfully and collects the ids of those that failed.
     *
     * <p>State is held in an {@link AtomicLong} and a {@link ConcurrentLinkedDeque}, as
     * mailboxes are processed through a concurrent {@code flatMap}.
     */
    public static class Context {
        private final AtomicLong processedMailboxCount = new AtomicLong();
        private final ConcurrentLinkedDeque<CassandraId> failedMailboxes = new ConcurrentLinkedDeque<>();

        /** Immutable point-in-time copy of a {@link Context}. */
        static class Snapshot {
            private final long processedMailboxCount;
            private final ImmutableList<CassandraId> failedMailboxes;

            private Snapshot(long processedMailboxCount, ImmutableList<CassandraId> failedMailboxes) {
                this.processedMailboxCount = processedMailboxCount;
                this.failedMailboxes = failedMailboxes;
            }

            /** @return the number of mailboxes recorded as processed when the snapshot was taken */
            public long getProcessedMailboxCount() {
                return this.processedMailboxCount;
            }

            /** @return the ids of the mailboxes recorded as failed when the snapshot was taken */
            public ImmutableList<CassandraId> getFailedMailboxes() {
                return this.failedMailboxes;
            }

            @Override
            public final boolean equals(Object obj) {
                if (!(obj instanceof Snapshot)) {
                    return false;
                }
                Snapshot other = (Snapshot) obj;
                // Primitive comparison: no need to box the counters to compare them.
                return this.processedMailboxCount == other.processedMailboxCount
                    && Objects.equals(this.failedMailboxes, other.failedMailboxes);
            }

            @Override
            public final int hashCode() {
                return Objects.hash(this.processedMailboxCount, this.failedMailboxes);
            }

            @Override
            public String toString() {
                return MoreObjects.toStringHelper(this)
                    .add("processedMailboxCount", this.processedMailboxCount)
                    .add("failedMailboxes", this.failedMailboxes)
                    .toString();
            }
        }

        /** Records one more successfully processed mailbox. */
        void incrementProcessed() {
            this.processedMailboxCount.incrementAndGet();
        }

        /**
         * Records a mailbox whose counters could not be recomputed.
         *
         * @param cassandraId id of the failed mailbox
         */
        void addToFailedMailboxes(CassandraId cassandraId) {
            this.failedMailboxes.add(cassandraId);
        }

        /** @return an immutable copy of the current progress */
        public Snapshot snapshot() {
            return new Snapshot(this.processedMailboxCount.get(), ImmutableList.copyOf(this.failedMailboxes));
        }
    }

    /**
     * Accumulates the total and unseen message counts of a single mailbox while its
     * messages are streamed through {@link #process}.
     */
    public static class Counter {
        private final CassandraId mailboxId;
        private final AtomicLong unseen = new AtomicLong();
        private final AtomicLong total = new AtomicLong();

        private Counter(CassandraId mailboxId) {
            this.mailboxId = mailboxId;
        }

        /**
         * Accounts for one message: always increments the total, and increments the
         * unseen count when the message does not carry the {@code SEEN} flag.
         *
         * @param composedMessageIdWithMetaData metadata of the message to account for
         */
        void process(ComposedMessageIdWithMetaData composedMessageIdWithMetaData) {
            this.total.incrementAndGet();
            if (!composedMessageIdWithMetaData.getFlags().contains(Flags.Flag.SEEN)) {
                this.unseen.incrementAndGet();
            }
        }

        /** @return the counters accumulated so far for this mailbox */
        MailboxCounters snapshot() {
            return MailboxCounters.builder()
                .mailboxId(this.mailboxId)
                .count(this.total.get())
                .unseen(this.unseen.get())
                .build();
        }
    }

    /**
     * Run options. When the message projection is trusted, entries listed by
     * {@link CassandraMessageIdDAO} are counted as is; otherwise each entry is re-read
     * from {@link CassandraMessageIdToImapUidDAO} before being counted.
     */
    public static class Options {
        private final boolean trustMessageProjection;

        /** @return options that count listed entries without re-reading them */
        public static Options trustMessageProjection() {
            return of(true);
        }

        /** @return options that re-read every listed entry before counting it */
        public static Options recheckMessageProjection() {
            return of(false);
        }

        /**
         * @param z {@code true} to trust the message projection, {@code false} to recheck it
         * @return the matching options
         */
        public static Options of(boolean z) {
            return new Options(z);
        }

        private Options(boolean trustMessageProjection) {
            this.trustMessageProjection = trustMessageProjection;
        }

        /** @return {@code true} when listed entries are counted without being re-read */
        public boolean isMessageProjectionTrusted() {
            return this.trustMessageProjection;
        }

        @Override
        public final boolean equals(Object obj) {
            if (obj instanceof Options) {
                return this.trustMessageProjection == ((Options) obj).trustMessageProjection;
            }
            return false;
        }

        @Override
        public final int hashCode() {
            return Objects.hash(this.trustMessageProjection);
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(this)
                .add("trustMessageProjection", this.trustMessageProjection)
                .toString();
        }
    }

    @Inject
    RecomputeMailboxCountersService(CassandraMailboxDAO cassandraMailboxDAO, CassandraMessageIdDAO cassandraMessageIdDAO, CassandraMessageIdToImapUidDAO cassandraMessageIdToImapUidDAO, CassandraMailboxCounterDAO cassandraMailboxCounterDAO) {
        this.mailboxDAO = cassandraMailboxDAO;
        this.imapUidToMessageIdDAO = cassandraMessageIdDAO;
        this.messageIdToImapUidDAO = cassandraMessageIdToImapUidDAO;
        this.counterDAO = cassandraMailboxCounterDAO;
    }

    /**
     * Recomputes the counters of every mailbox returned by
     * {@code mailboxDAO.retrieveAllMailboxes()} and combines the per-mailbox results
     * with {@code Task::combine}.
     *
     * @param context progress tracker updated as mailboxes succeed or fail
     * @param options whether listed entries are trusted or re-read before counting
     * @return the combined result; an error surfacing from the pipeline is logged and
     *         turned into {@link Task.Result#PARTIAL}
     */
    public Mono<Task.Result> recomputeMailboxCounters(Context context, Options options) {
        return this.mailboxDAO.retrieveAllMailboxes()
            .flatMap(mailbox -> recomputeMailboxCounter(context, mailbox, options), MAILBOX_CONCURRENCY)
            .reduce(Task.Result.COMPLETED, Task::combine)
            .onErrorResume(th -> {
                LOGGER.error("Error listing mailboxes", th);
                return Mono.just(Task.Result.PARTIAL);
            });
    }

    /**
     * Recomputes the counters of a single mailbox from its listed messages and passes
     * them to {@code counterDAO.resetCounters}.
     *
     * @param context progress tracker: incremented on success, given the mailbox id on failure
     * @param mailbox mailbox to recompute; its id is cast to {@link CassandraId}
     * @param options whether listed entries are trusted or re-read before counting
     * @return {@link Task.Result#COMPLETED} on success; any error is logged, recorded in
     *         {@code context} and turned into {@link Task.Result#PARTIAL}
     */
    public Mono<Task.Result> recomputeMailboxCounter(Context context, Mailbox mailbox, Options options) {
        CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
        Counter counter = new Counter(mailboxId);

        // The pipeline is kept fully typed: a raw Flux here would erase the element type
        // needed by the counter::process method reference and by the returned Mono.
        return this.imapUidToMessageIdDAO.retrieveMessages(mailboxId, MessageRange.all(), Limit.unlimited())
            .map(metadata -> metadata.getComposedMessageId())
            .flatMap(listedEntry -> latestMetadata(mailboxId, listedEntry, options), MESSAGE_CONCURRENCY)
            .doOnNext(counter::process)
            // Deferred so that the snapshot is taken only once every message has been counted.
            .then(Mono.defer(() -> this.counterDAO.resetCounters(counter.snapshot())))
            .then(Mono.just(Task.Result.COMPLETED))
            .doOnNext(result -> {
                LOGGER.info("Counters recomputed for {}", mailboxId.serialize());
                context.incrementProcessed();
            })
            .onErrorResume(th -> {
                context.addToFailedMailboxes(mailboxId);
                LOGGER.error("Error while recomputing counters for {}", mailboxId.serialize(), th);
                return Mono.just(Task.Result.PARTIAL);
            });
    }

    /**
     * Resolves the metadata to count for one listed entry.
     *
     * <p>When the projection is trusted the listed entry is returned unchanged. Otherwise
     * the entry is re-read from {@code messageIdToImapUidDAO} using the {@code STRONG}
     * consistency choice; a warning is logged when the re-read value differs from the
     * listed one, or when nothing is found (in which case nothing is emitted, so the
     * entry is not counted).
     *
     * @param cassandraId mailbox the entry was listed in
     * @param composedMessageIdWithMetaData entry as listed for that mailbox
     * @param options whether the listed entry is trusted
     * @return the metadata to feed to the counter, possibly empty
     */
    private Flux<ComposedMessageIdWithMetaData> latestMetadata(CassandraId cassandraId, ComposedMessageIdWithMetaData composedMessageIdWithMetaData, Options options) {
        if (options.isMessageProjectionTrusted()) {
            return Flux.just(composedMessageIdWithMetaData);
        }
        CassandraMessageId messageId = (CassandraMessageId) composedMessageIdWithMetaData.getComposedMessageId().getMessageId();
        return this.messageIdToImapUidDAO.retrieve(messageId, Optional.of(cassandraId), JamesExecutionProfiles.ConsistencyChoice.STRONG)
            .map(metadata -> metadata.getComposedMessageId())
            .doOnNext(latest -> {
                if (!latest.equals(composedMessageIdWithMetaData)) {
                    LOGGER.warn("Possible denormalization issue on {}. Mismatch between the two denormalization table. This can also be due to concurrent modifications.", composedMessageIdWithMetaData.getComposedMessageId());
                }
            })
            // Explicit type witness: without it Flux.empty() is inferred as Flux<Object>,
            // which is not an acceptable fallback publisher for this typed Flux.
            .switchIfEmpty(Flux.<ComposedMessageIdWithMetaData>empty().doOnComplete(() -> {
                LOGGER.warn("Possible denormalization issue on {}. Source of truth do not contain listed entry.This can also be due to concurrent modifications.", composedMessageIdWithMetaData.getComposedMessageId());
            }));
    }
}
