package org.apache.james.mailbox.cassandra.mail.task;

import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import jakarta.inject.Inject;
import jakarta.mail.Flags;
import java.time.Duration;
import java.util.Collection;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import org.apache.james.backends.cassandra.init.configuration.JamesExecutionProfiles;
import org.apache.james.mailbox.MessageUid;
import org.apache.james.mailbox.cassandra.ids.CassandraId;
import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraMessageMetadata;
import org.apache.james.mailbox.model.ComposedMessageId;
import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
import org.apache.james.mailbox.model.UpdatedFlags;
import org.apache.james.task.Task;
import org.apache.james.util.ReactorUtils;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.class */
public class SolveMessageInconsistenciesService {
    private static final Inconsistency NO_INCONSISTENCY = (context, cassandraMessageIdToImapUidDAO, cassandraMessageIdDAO) -> {
        return Mono.just(Task.Result.COMPLETED);
    };
    private static final Logger LOGGER = LoggerFactory.getLogger(SolveMessageInconsistenciesService.class);
    private static final Duration PERIOD = Duration.ofSeconds(1);
    private final CassandraMessageIdToImapUidDAO messageIdToImapUidDAO;
    private final CassandraMessageIdDAO messageIdDAO;

    /* loaded from: input_file:org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService$Context.class */
    public static class Context {
        private final AtomicLong processedImapUidEntries;
        private final AtomicLong processedMessageIdEntries;
        private final AtomicLong addedMessageIdEntries;
        private final AtomicLong updatedMessageIdEntries;
        private final AtomicLong removedMessageIdEntries;
        private final ConcurrentLinkedDeque<ComposedMessageId> fixedInconsistencies;
        private final ConcurrentLinkedDeque<ComposedMessageId> errors;

        /* loaded from: input_file:org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService$Context$Snapshot.class */
        static class Snapshot {
            private final long processedImapUidEntries;
            private final long processedMessageIdEntries;
            private final long addedMessageIdEntries;
            private final long updatedMessageIdEntries;
            private final long removedMessageIdEntries;
            private final ImmutableList<ComposedMessageId> fixedInconsistencies;
            private final ImmutableList<ComposedMessageId> errors;

            /* loaded from: input_file:org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService$Context$Snapshot$Builder.class */
            static class Builder {
                private Optional<Long> processedImapUidEntries = Optional.empty();
                private Optional<Long> processedMessageIdEntries = Optional.empty();
                private Optional<Long> addedMessageIdEntries = Optional.empty();
                private Optional<Long> updatedMessageIdEntries = Optional.empty();
                private Optional<Long> removedMessageIdEntries = Optional.empty();
                private ImmutableList.Builder<ComposedMessageId> fixedInconsistencies = ImmutableList.builder();
                private ImmutableList.Builder<ComposedMessageId> errors = ImmutableList.builder();

                Builder() {
                }

                public Builder processedImapUidEntries(long j) {
                    this.processedImapUidEntries = Optional.of(Long.valueOf(j));
                    return this;
                }

                public Builder processedMessageIdEntries(long j) {
                    this.processedMessageIdEntries = Optional.of(Long.valueOf(j));
                    return this;
                }

                public Builder addedMessageIdEntries(long j) {
                    this.addedMessageIdEntries = Optional.of(Long.valueOf(j));
                    return this;
                }

                public Builder updatedMessageIdEntries(long j) {
                    this.updatedMessageIdEntries = Optional.of(Long.valueOf(j));
                    return this;
                }

                public Builder removedMessageIdEntries(long j) {
                    this.removedMessageIdEntries = Optional.of(Long.valueOf(j));
                    return this;
                }

                public Builder addFixedInconsistencies(ComposedMessageId composedMessageId) {
                    this.fixedInconsistencies.add(composedMessageId);
                    return this;
                }

                public Builder errors(ComposedMessageId composedMessageId) {
                    this.errors.add(composedMessageId);
                    return this;
                }

                public Snapshot build() {
                    return new Snapshot(this.processedImapUidEntries.orElse(0L).longValue(), this.processedMessageIdEntries.orElse(0L).longValue(), this.addedMessageIdEntries.orElse(0L).longValue(), this.updatedMessageIdEntries.orElse(0L).longValue(), this.removedMessageIdEntries.orElse(0L).longValue(), this.fixedInconsistencies.build(), this.errors.build());
                }
            }

            public static Builder builder() {
                return new Builder();
            }

            private Snapshot(long j, long j2, long j3, long j4, long j5, ImmutableList<ComposedMessageId> immutableList, ImmutableList<ComposedMessageId> immutableList2) {
                this.processedImapUidEntries = j;
                this.processedMessageIdEntries = j2;
                this.addedMessageIdEntries = j3;
                this.updatedMessageIdEntries = j4;
                this.removedMessageIdEntries = j5;
                this.fixedInconsistencies = immutableList;
                this.errors = immutableList2;
            }

            public long getProcessedImapUidEntries() {
                return this.processedImapUidEntries;
            }

            public long getProcessedMessageIdEntries() {
                return this.processedMessageIdEntries;
            }

            public long getAddedMessageIdEntries() {
                return this.addedMessageIdEntries;
            }

            public long getUpdatedMessageIdEntries() {
                return this.updatedMessageIdEntries;
            }

            public long getRemovedMessageIdEntries() {
                return this.removedMessageIdEntries;
            }

            public ImmutableList<ComposedMessageId> getFixedInconsistencies() {
                return this.fixedInconsistencies;
            }

            public ImmutableList<ComposedMessageId> getErrors() {
                return this.errors;
            }

            public final boolean equals(Object obj) {
                if (!(obj instanceof Snapshot)) {
                    return false;
                }
                Snapshot snapshot = (Snapshot) obj;
                return Objects.equals(Long.valueOf(this.processedImapUidEntries), Long.valueOf(snapshot.processedImapUidEntries)) && Objects.equals(Long.valueOf(this.processedMessageIdEntries), Long.valueOf(snapshot.processedMessageIdEntries)) && Objects.equals(Long.valueOf(this.addedMessageIdEntries), Long.valueOf(snapshot.addedMessageIdEntries)) && Objects.equals(Long.valueOf(this.updatedMessageIdEntries), Long.valueOf(snapshot.updatedMessageIdEntries)) && Objects.equals(Long.valueOf(this.removedMessageIdEntries), Long.valueOf(snapshot.removedMessageIdEntries)) && Objects.equals(this.errors, snapshot.errors) && Objects.equals(this.fixedInconsistencies, snapshot.fixedInconsistencies);
            }

            public final int hashCode() {
                return Objects.hash(Long.valueOf(this.processedImapUidEntries), Long.valueOf(this.processedMessageIdEntries), Long.valueOf(this.addedMessageIdEntries), Long.valueOf(this.updatedMessageIdEntries), Long.valueOf(this.removedMessageIdEntries), this.fixedInconsistencies, this.errors);
            }

            public String toString() {
                return MoreObjects.toStringHelper(this).add("processedImapUidEntries", this.processedImapUidEntries).add("processedMessageIdEntries", this.processedMessageIdEntries).add("addedMessageIdEntries", this.addedMessageIdEntries).add("updatedMessageIdEntries", this.updatedMessageIdEntries).add("removedMessageIdEntries", this.removedMessageIdEntries).add("fixedInconsistencies", this.fixedInconsistencies).add("errors", this.errors).toString();
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Context() {
            this(new AtomicLong(), new AtomicLong(), new AtomicLong(), new AtomicLong(), new AtomicLong(), ImmutableList.of(), ImmutableList.of());
        }

        private Context(AtomicLong atomicLong, AtomicLong atomicLong2, AtomicLong atomicLong3, AtomicLong atomicLong4, AtomicLong atomicLong5, Collection<ComposedMessageId> collection, Collection<ComposedMessageId> collection2) {
            this.processedImapUidEntries = atomicLong;
            this.processedMessageIdEntries = atomicLong2;
            this.addedMessageIdEntries = atomicLong3;
            this.updatedMessageIdEntries = atomicLong4;
            this.removedMessageIdEntries = atomicLong5;
            this.fixedInconsistencies = new ConcurrentLinkedDeque<>(collection);
            this.errors = new ConcurrentLinkedDeque<>(collection2);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void incrementProcessedImapUidEntries() {
            this.processedImapUidEntries.incrementAndGet();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void incrementMessageIdEntries() {
            this.processedMessageIdEntries.incrementAndGet();
        }

        void incrementAddedMessageIdEntries() {
            this.addedMessageIdEntries.incrementAndGet();
        }

        void incrementUpdatedMessageIdEntries() {
            this.updatedMessageIdEntries.incrementAndGet();
        }

        void incrementRemovedMessageIdEntries() {
            this.removedMessageIdEntries.incrementAndGet();
        }

        void addFixedInconsistency(ComposedMessageId composedMessageId) {
            this.fixedInconsistencies.add(composedMessageId);
        }

        void addErrors(ComposedMessageId composedMessageId) {
            this.errors.add(composedMessageId);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Snapshot snapshot() {
            return new Snapshot(this.processedImapUidEntries.get(), this.processedMessageIdEntries.get(), this.addedMessageIdEntries.get(), this.updatedMessageIdEntries.get(), this.removedMessageIdEntries.get(), ImmutableList.copyOf(this.fixedInconsistencies), ImmutableList.copyOf(this.errors));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService$FailedToRetrieveRecord.class */
    public static class FailedToRetrieveRecord implements Inconsistency {
        private final CassandraMessageMetadata message;

        private FailedToRetrieveRecord(CassandraMessageMetadata cassandraMessageMetadata) {
            this.message = cassandraMessageMetadata;
        }

        @Override // org.apache.james.mailbox.cassandra.mail.task.SolveMessageInconsistenciesService.Inconsistency
        public Mono<Task.Result> fix(Context context, CassandraMessageIdToImapUidDAO cassandraMessageIdToImapUidDAO, CassandraMessageIdDAO cassandraMessageIdDAO) {
            context.addErrors(this.message.getComposedMessageId().getComposedMessageId());
            SolveMessageInconsistenciesService.LOGGER.error("Failed to retrieve record: {}", this.message.getComposedMessageId());
            return Mono.just(Task.Result.PARTIAL);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @FunctionalInterface
    /* loaded from: input_file:org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService$Inconsistency.class */
    public interface Inconsistency {
        Mono<Task.Result> fix(Context context, CassandraMessageIdToImapUidDAO cassandraMessageIdToImapUidDAO, CassandraMessageIdDAO cassandraMessageIdDAO);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService$OrphanImapUidEntry.class */
    public static class OrphanImapUidEntry implements Inconsistency {
        private final CassandraMessageMetadata message;

        private OrphanImapUidEntry(CassandraMessageMetadata cassandraMessageMetadata) {
            this.message = cassandraMessageMetadata;
        }

        @Override // org.apache.james.mailbox.cassandra.mail.task.SolveMessageInconsistenciesService.Inconsistency
        public Mono<Task.Result> fix(Context context, CassandraMessageIdToImapUidDAO cassandraMessageIdToImapUidDAO, CassandraMessageIdDAO cassandraMessageIdDAO) {
            return cassandraMessageIdDAO.insert(this.message).doOnSuccess(r5 -> {
                notifySuccess(context);
            }).thenReturn(Task.Result.COMPLETED).onErrorResume(th -> {
                notifyFailure(context);
                return Mono.just(Task.Result.PARTIAL);
            });
        }

        private void notifyFailure(Context context) {
            context.addErrors(this.message.getComposedMessageId().getComposedMessageId());
            SolveMessageInconsistenciesService.LOGGER.error("Failed to fix inconsistency for orphan message in ImapUid: {}", this.message.getComposedMessageId());
        }

        private void notifySuccess(Context context) {
            SolveMessageInconsistenciesService.LOGGER.info("Inconsistency fixed for orphan message in ImapUid: {}", this.message.getComposedMessageId());
            context.incrementAddedMessageIdEntries();
            context.addFixedInconsistency(this.message.getComposedMessageId().getComposedMessageId());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService$OrphanMessageIdEntry.class */
    public static class OrphanMessageIdEntry implements Inconsistency {
        private final CassandraMessageMetadata message;

        private OrphanMessageIdEntry(CassandraMessageMetadata cassandraMessageMetadata) {
            this.message = cassandraMessageMetadata;
        }

        @Override // org.apache.james.mailbox.cassandra.mail.task.SolveMessageInconsistenciesService.Inconsistency
        public Mono<Task.Result> fix(Context context, CassandraMessageIdToImapUidDAO cassandraMessageIdToImapUidDAO, CassandraMessageIdDAO cassandraMessageIdDAO) {
            return cassandraMessageIdDAO.delete((CassandraId) this.message.getComposedMessageId().getComposedMessageId().getMailboxId(), this.message.getComposedMessageId().getComposedMessageId().getUid()).doOnSuccess(r5 -> {
                notifySuccess(context);
            }).thenReturn(Task.Result.COMPLETED).onErrorResume(th -> {
                notifyFailure(context);
                return Mono.just(Task.Result.PARTIAL);
            });
        }

        private void notifyFailure(Context context) {
            context.addErrors(this.message.getComposedMessageId().getComposedMessageId());
            SolveMessageInconsistenciesService.LOGGER.error("Failed to fix inconsistency for orphan message in MessageId: {}", this.message.getComposedMessageId());
        }

        private void notifySuccess(Context context) {
            SolveMessageInconsistenciesService.LOGGER.info("Inconsistency fixed for orphan message in MessageId: {}", this.message.getComposedMessageId());
            context.incrementRemovedMessageIdEntries();
            context.addFixedInconsistency(this.message.getComposedMessageId().getComposedMessageId());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService$OutdatedMessageIdEntry.class */
    public static class OutdatedMessageIdEntry implements Inconsistency {
        private final CassandraMessageMetadata messageFromMessageId;
        private final CassandraMessageMetadata messageFromImapUid;

        private OutdatedMessageIdEntry(CassandraMessageMetadata cassandraMessageMetadata, CassandraMessageMetadata cassandraMessageMetadata2) {
            this.messageFromMessageId = cassandraMessageMetadata;
            this.messageFromImapUid = cassandraMessageMetadata2;
        }

        @Override // org.apache.james.mailbox.cassandra.mail.task.SolveMessageInconsistenciesService.Inconsistency
        public Mono<Task.Result> fix(Context context, CassandraMessageIdToImapUidDAO cassandraMessageIdToImapUidDAO, CassandraMessageIdDAO cassandraMessageIdDAO) {
            ComposedMessageIdWithMetaData composedMessageId = this.messageFromImapUid.getComposedMessageId();
            return cassandraMessageIdDAO.updateMetadata(composedMessageId.getComposedMessageId(), UpdatedFlags.builder().oldFlags(new Flags()).newFlags(composedMessageId.getFlags()).modSeq(composedMessageId.getModSeq()).messageId(composedMessageId.getComposedMessageId().getMessageId()).uid(composedMessageId.getComposedMessageId().getUid()).build()).doOnSuccess(r5 -> {
                notifySuccess(context);
            }).thenReturn(Task.Result.COMPLETED).onErrorResume(th -> {
                notifyFailure(context);
                return Mono.just(Task.Result.PARTIAL);
            });
        }

        private void notifyFailure(Context context) {
            context.addErrors(this.messageFromMessageId.getComposedMessageId().getComposedMessageId());
            SolveMessageInconsistenciesService.LOGGER.error("Failed to fix inconsistency for outdated message in MessageId: {}", this.messageFromMessageId.getComposedMessageId());
        }

        private void notifySuccess(Context context) {
            SolveMessageInconsistenciesService.LOGGER.info("Inconsistency fixed for outdated message in MessageId: {}", this.messageFromMessageId.getComposedMessageId());
            context.incrementUpdatedMessageIdEntries();
            context.addFixedInconsistency(this.messageFromMessageId.getComposedMessageId().getComposedMessageId());
        }
    }

    /* loaded from: input_file:org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService$RunningOptions.class */
    public static class RunningOptions {
        public static final RunningOptions DEFAULT = new RunningOptions(100);
        private final int messagesPerSecond;

        public RunningOptions(int i) {
            Preconditions.checkArgument(i > 0, "'messagesPerSecond' must be strictly positive");
            this.messagesPerSecond = i;
        }

        public int getMessagesPerSecond() {
            return this.messagesPerSecond;
        }
    }

    @Inject
    SolveMessageInconsistenciesService(CassandraMessageIdToImapUidDAO cassandraMessageIdToImapUidDAO, CassandraMessageIdDAO cassandraMessageIdDAO) {
        this.messageIdToImapUidDAO = cassandraMessageIdToImapUidDAO;
        this.messageIdDAO = cassandraMessageIdDAO;
    }

    public Mono<Task.Result> fixMessageInconsistencies(Context context, RunningOptions runningOptions) {
        return Flux.concat(new Publisher[]{fixInconsistenciesInMessageId(context, runningOptions), fixInconsistenciesInImapUid(context, runningOptions)}).reduce(Task.Result.COMPLETED, Task::combine);
    }

    private Flux<Task.Result> fixInconsistenciesInImapUid(Context context, RunningOptions runningOptions) {
        return this.messageIdToImapUidDAO.retrieveAllMessages().transform(ReactorUtils.throttle().elements(runningOptions.getMessagesPerSecond()).per(PERIOD).forOperation(cassandraMessageMetadata -> {
            return detectInconsistencyInImapUid(cassandraMessageMetadata).doOnNext(inconsistency -> {
                context.incrementProcessedImapUidEntries();
            }).flatMap(inconsistency2 -> {
                return inconsistency2.fix(context, this.messageIdToImapUidDAO, this.messageIdDAO);
            });
        }));
    }

    private Mono<Inconsistency> detectInconsistencyInImapUid(CassandraMessageMetadata cassandraMessageMetadata) {
        return compareWithMessageIdRecord(cassandraMessageMetadata).onErrorResume(th -> {
            return Mono.just(new FailedToRetrieveRecord(cassandraMessageMetadata));
        });
    }

    private Mono<Inconsistency> compareWithMessageIdRecord(CassandraMessageMetadata cassandraMessageMetadata) {
        ComposedMessageId composedMessageId = cassandraMessageMetadata.getComposedMessageId().getComposedMessageId();
        CassandraId cassandraId = (CassandraId) composedMessageId.getMailboxId();
        MessageUid uid = composedMessageId.getUid();
        CassandraMessageId cassandraMessageId = (CassandraMessageId) composedMessageId.getMessageId();
        return this.messageIdDAO.retrieve(cassandraId, uid).handle(ReactorUtils.publishIfPresent()).flatMap(cassandraMessageMetadata2 -> {
            return cassandraMessageMetadata2.equals(cassandraMessageMetadata) ? Mono.just(NO_INCONSISTENCY) : detectOutdatedMessageIdEntry(cassandraId, cassandraMessageId, cassandraMessageMetadata2);
        }).switchIfEmpty(detectOrphanImapUidEntry(cassandraId, cassandraMessageId));
    }

    private Mono<Inconsistency> detectOutdatedMessageIdEntry(CassandraId cassandraId, CassandraMessageId cassandraMessageId, CassandraMessageMetadata cassandraMessageMetadata) {
        return this.messageIdToImapUidDAO.retrieve(cassandraMessageId, Optional.of(cassandraId), JamesExecutionProfiles.ConsistencyChoice.STRONG).filter(Predicate.not(Predicate.isEqual(cassandraMessageMetadata))).map(cassandraMessageMetadata2 -> {
            return new OutdatedMessageIdEntry(cassandraMessageMetadata, cassandraMessageMetadata2);
        }).next().switchIfEmpty(Mono.just(NO_INCONSISTENCY));
    }

    private Mono<Inconsistency> detectOrphanImapUidEntry(CassandraId cassandraId, CassandraMessageId cassandraMessageId) {
        return this.messageIdToImapUidDAO.retrieve(cassandraMessageId, Optional.of(cassandraId), JamesExecutionProfiles.ConsistencyChoice.STRONG).next().map(OrphanImapUidEntry::new).switchIfEmpty(Mono.just(NO_INCONSISTENCY));
    }

    private Flux<Task.Result> fixInconsistenciesInMessageId(Context context, RunningOptions runningOptions) {
        return this.messageIdDAO.retrieveAllMessages().transform(ReactorUtils.throttle().elements(runningOptions.getMessagesPerSecond()).per(PERIOD).forOperation(cassandraMessageMetadata -> {
            return detectInconsistencyInMessageId(cassandraMessageMetadata).doOnNext(inconsistency -> {
                context.incrementMessageIdEntries();
            }).flatMap(inconsistency2 -> {
                return inconsistency2.fix(context, this.messageIdToImapUidDAO, this.messageIdDAO);
            });
        }));
    }

    private Mono<Inconsistency> detectInconsistencyInMessageId(CassandraMessageMetadata cassandraMessageMetadata) {
        return this.messageIdToImapUidDAO.retrieve((CassandraMessageId) cassandraMessageMetadata.getComposedMessageId().getComposedMessageId().getMessageId(), Optional.of((CassandraId) cassandraMessageMetadata.getComposedMessageId().getComposedMessageId().getMailboxId()), JamesExecutionProfiles.ConsistencyChoice.STRONG).map(cassandraMessageMetadata2 -> {
            return NO_INCONSISTENCY;
        }).next().switchIfEmpty(detectOrphanMessageIdEntry(cassandraMessageMetadata)).onErrorResume(th -> {
            return Mono.just(new FailedToRetrieveRecord(cassandraMessageMetadata));
        });
    }

    private Mono<Inconsistency> detectOrphanMessageIdEntry(CassandraMessageMetadata cassandraMessageMetadata) {
        return this.messageIdDAO.retrieve((CassandraId) cassandraMessageMetadata.getComposedMessageId().getComposedMessageId().getMailboxId(), cassandraMessageMetadata.getComposedMessageId().getComposedMessageId().getUid()).handle(ReactorUtils.publishIfPresent()).map(OrphanMessageIdEntry::new).switchIfEmpty(Mono.just(NO_INCONSISTENCY));
    }
}
