package org.apache.james.rspamd.task;

import com.github.fge.lambdas.Throwing;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Date;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.core.Username;
import org.apache.james.mailbox.MailboxManager;
import org.apache.james.mailbox.MessageIdManager;
import org.apache.james.mailbox.model.MessageResult;
import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
import org.apache.james.rspamd.client.RspamdClientConfiguration;
import org.apache.james.rspamd.client.RspamdHttpClient;
import org.apache.james.task.Task;
import org.apache.james.task.TaskExecutionDetails;
import org.apache.james.task.TaskType;
import org.apache.james.user.api.UsersRepository;
import org.apache.james.util.ReactorUtils;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;

/**
 * Task that reports the ham messages of all users to Rspamd, throttled to a configured
 * number of messages per second, and that keeps counters of reported and failed messages.
 */
public class FeedHamToRspamdTask implements Task {
    /** Type identifier of this task, built from the literal {@code "FeedHamToRspamdTask"}. */
    public static final TaskType TASK_TYPE = TaskType.of("FeedHamToRspamdTask");
    // Service queried by run() for the ham messages of all users.
    private final GetMailboxMessagesService messagesService;
    // HTTP client used to submit each message to Rspamd as ham.
    private final RspamdHttpClient rspamdHttpClient;
    // Rspamd client configuration; consulted for usePerUserBayes() when reporting a message.
    private final RspamdClientConfiguration configuration;
    // Options of this run: optional period in seconds, messages per second, Rspamd timeout.
    private final RunningOptions runningOptions;
    // Mutable progress counters, updated while the task runs and exposed through details()/snapshot().
    private final Context context = new Context();
    // Clock used by run() to turn the configured period into a start date.
    private final Clock clock;

    /**
     * Immutable progress report of the task: a timestamp, the three counters taken from a
     * {@link Context} snapshot and the {@link RunningOptions} the task was started with.
     */
    public static class AdditionalInformation implements TaskExecutionDetails.AdditionalInformation {
        private final Instant timestamp;
        private final long hamMessageCount;
        private final long reportedHamMessageCount;
        private final long errorCount;
        private final RunningOptions runningOptions;

        /**
         * Builds a report from the current counters of {@code context}, timestamped with the
         * current instant of the system UTC clock.
         */
        private static AdditionalInformation from(Context context, RunningOptions runningOptions) {
            Context.Snapshot counters = context.snapshot();
            Instant now = Clock.systemUTC().instant();
            return new AdditionalInformation(
                now,
                counters.getHamMessageCount(),
                counters.getReportedHamMessageCount(),
                counters.getErrorCount(),
                runningOptions);
        }

        /**
         * @param instant        timestamp of the report
         * @param j              ham message count
         * @param j2             reported ham message count
         * @param j3             error count
         * @param runningOptions options the task runs with
         */
        public AdditionalInformation(Instant instant, long j, long j2, long j3, RunningOptions runningOptions) {
            timestamp = instant;
            hamMessageCount = j;
            reportedHamMessageCount = j2;
            errorCount = j3;
            this.runningOptions = runningOptions;
        }

        /** @return the ham message count stored in this report */
        public long getHamMessageCount() {
            return hamMessageCount;
        }

        /** @return the reported ham message count stored in this report */
        public long getReportedHamMessageCount() {
            return reportedHamMessageCount;
        }

        /** @return the error count stored in this report */
        public long getErrorCount() {
            return errorCount;
        }

        /** @return the running options stored in this report */
        public RunningOptions getRunningOptions() {
            return runningOptions;
        }

        /** @return the instant this report was stamped with */
        public Instant timestamp() {
            return timestamp;
        }

        /** Two reports are equal when all counters, the timestamp and the running options are equal. */
        @Override
        public final boolean equals(Object obj) {
            if (obj instanceof AdditionalInformation) {
                AdditionalInformation other = (AdditionalInformation) obj;
                return hamMessageCount == other.hamMessageCount
                    && reportedHamMessageCount == other.reportedHamMessageCount
                    && errorCount == other.errorCount
                    && Objects.equals(timestamp, other.timestamp)
                    && Objects.equals(runningOptions, other.runningOptions);
            }
            return false;
        }

        @Override
        public final int hashCode() {
            // The long counters are auto-boxed, so the hash matches the explicitly boxed form.
            return Objects.hash(timestamp, hamMessageCount, reportedHamMessageCount, errorCount, runningOptions);
        }
    }

    /**
     * Mutable progress counters of the task, each one held in an {@link AtomicLong}.
     */
    public static class Context {
        private final AtomicLong hamMessageCount = new AtomicLong();
        private final AtomicLong reportedHamMessageCount = new AtomicLong();
        private final AtomicLong errorCount = new AtomicLong();

        /**
         * Immutable copy of the three counters of a {@link Context}.
         */
        public static class Snapshot {
            private final long hamMessageCount;
            private final long reportedHamMessageCount;
            private final long errorCount;

            /**
             * Fluent builder of {@link Snapshot}; a counter that is never set is built as {@code 0}.
             */
            public static class Builder {
                private long hamMessageCount;
                private long reportedHamMessageCount;
                private long errorCount;

                Builder() {
                }

                /** @return a snapshot holding the values set so far, {@code 0} for unset counters */
                public Snapshot build() {
                    return new Snapshot(hamMessageCount, reportedHamMessageCount, errorCount);
                }

                /** Sets the ham message count and returns this builder. */
                public Builder hamMessageCount(long j) {
                    hamMessageCount = j;
                    return this;
                }

                /** Sets the reported ham message count and returns this builder. */
                public Builder reportedHamMessageCount(long j) {
                    reportedHamMessageCount = j;
                    return this;
                }

                /** Sets the error count and returns this builder. */
                public Builder errorCount(long j) {
                    errorCount = j;
                    return this;
                }
            }

            /** @return a new, empty {@link Builder} */
            public static Builder builder() {
                return new Builder();
            }

            /**
             * @param j  ham message count
             * @param j2 reported ham message count
             * @param j3 error count
             */
            public Snapshot(long j, long j2, long j3) {
                hamMessageCount = j;
                reportedHamMessageCount = j2;
                errorCount = j3;
            }

            /** @return the ham message count captured by this snapshot */
            public long getHamMessageCount() {
                return hamMessageCount;
            }

            /** @return the reported ham message count captured by this snapshot */
            public long getReportedHamMessageCount() {
                return reportedHamMessageCount;
            }

            /** @return the error count captured by this snapshot */
            public long getErrorCount() {
                return errorCount;
            }

            /** Two snapshots are equal when their three counters are equal. */
            @Override
            public final boolean equals(Object obj) {
                if (obj instanceof Snapshot) {
                    Snapshot other = (Snapshot) obj;
                    return hamMessageCount == other.hamMessageCount
                        && reportedHamMessageCount == other.reportedHamMessageCount
                        && errorCount == other.errorCount;
                }
                return false;
            }

            @Override
            public final int hashCode() {
                // The long counters are auto-boxed, so the hash matches the explicitly boxed form.
                return Objects.hash(hamMessageCount, reportedHamMessageCount, errorCount);
            }

            @Override
            public String toString() {
                return MoreObjects.toStringHelper(this)
                    .add("hamMessageCount", hamMessageCount)
                    .add("reportedHamMessageCount", reportedHamMessageCount)
                    .add("errorCount", errorCount)
                    .toString();
            }
        }

        /** Adds one to the ham message counter. */
        public void incrementHamMessageCount() {
            hamMessageCount.incrementAndGet();
        }

        /** Adds {@code i} to the reported ham message counter. */
        public void incrementReportedHamMessageCount(int i) {
            reportedHamMessageCount.addAndGet(i);
        }

        /** Adds one to the error counter. */
        public void incrementErrorCount() {
            errorCount.incrementAndGet();
        }

        /**
         * Reads the three counters one after the other (ham, reported, error) and returns
         * them as an immutable {@link Snapshot}.
         */
        public Snapshot snapshot() {
            long ham = hamMessageCount.get();
            long reported = reportedHamMessageCount.get();
            long errors = errorCount.get();
            return Snapshot.builder()
                .hamMessageCount(ham)
                .reportedHamMessageCount(reported)
                .errorCount(errors)
                .build();
        }
    }

    /**
     * Creates the task and the {@link GetMailboxMessagesService} it uses to list messages.
     *
     * @param mailboxManager              passed to the messages service
     * @param usersRepository             passed to the messages service
     * @param messageIdManager            passed to the messages service
     * @param mailboxSessionMapperFactory passed to the messages service
     * @param rspamdHttpClient            client used to report messages to Rspamd
     * @param runningOptions              options of this run
     * @param clock                       clock used to compute the start of the reporting period
     * @param rspamdClientConfiguration   Rspamd client configuration
     */
    public FeedHamToRspamdTask(MailboxManager mailboxManager, UsersRepository usersRepository, MessageIdManager messageIdManager, MailboxSessionMapperFactory mailboxSessionMapperFactory, RspamdHttpClient rspamdHttpClient, RunningOptions runningOptions, Clock clock, RspamdClientConfiguration rspamdClientConfiguration) {
        this.messagesService = new GetMailboxMessagesService(mailboxManager, usersRepository, mailboxSessionMapperFactory, messageIdManager);
        this.rspamdHttpClient = rspamdHttpClient;
        this.configuration = rspamdClientConfiguration;
        this.runningOptions = runningOptions;
        this.clock = clock;
    }

    /**
     * Reports the ham messages of all users to Rspamd and blocks until every message has
     * been processed.
     *
     * <p>When the running options carry a period, only messages after
     * {@code clock.instant() - period} are requested from the messages service. Messages are
     * processed at the configured rate of messages per second, and each report is bounded
     * by the configured Rspamd timeout.
     *
     * @return {@code COMPLETED} when there was no message; otherwise the per-message results
     *         ({@code COMPLETED} on success, {@code PARTIAL} on failure) folded with
     *         {@code Task.combine}
     */
    @Override
    public Task.Result run() {
        Optional<Date> rangeStart = runningOptions.getPeriodInSecond()
            .map(periodInSecond -> Date.from(clock.instant().minusSeconds(periodInSecond)));

        return messagesService.getHamMessagesOfAllUser(rangeStart, runningOptions, context)
            // Explicit type arguments: throttle() is the receiver of a call chain, so the
            // compiler has no target type to infer them from.
            .transform(ReactorUtils.<Pair<Username, MessageResult>, Task.Result>throttle()
                .elements(runningOptions.getMessagesPerSecond())
                .per(Duration.ofSeconds(1L))
                .forOperation(this::reportAndCount))
            .reduce(Task::combine)
            .switchIfEmpty(Mono.just(Task.Result.COMPLETED))
            .block();
    }

    /** Reports one message, then updates the reported/error counter and maps the outcome to a result. */
    private Mono<Task.Result> reportAndCount(Pair<Username, MessageResult> pair) {
        // Mono.defer keeps a synchronous failure of reportHam (e.g. while reading the message
        // content) inside this per-message pipeline, so it is counted as an error below
        // instead of aborting the whole task.
        return Mono.defer(() -> reportHam(pair))
            .timeout(runningOptions.getRspamdTimeout())
            .then(Mono.fromCallable(() -> {
                context.incrementReportedHamMessageCount(1);
                return Task.Result.COMPLETED;
            }))
            .onErrorResume(error -> {
                LOGGER.error("Error when report ham message to Rspamd", error);
                context.incrementErrorCount();
                return Mono.just(Task.Result.PARTIAL);
            });
    }

    /**
     * @return the type identifier of this task, the {@code TASK_TYPE} constant of this class
     */
    @Override
    public TaskType type() {
        return TASK_TYPE;
    }

    /**
     * Builds a fresh progress report from the current counters and the running options.
     *
     * @return a non-empty optional holding the report
     */
    @Override
    public Optional<TaskExecutionDetails.AdditionalInformation> details() {
        AdditionalInformation information = AdditionalInformation.from(context, runningOptions);
        return Optional.of(information);
    }

    /**
     * @return an immutable copy of the current progress counters
     */
    @VisibleForTesting
    public Context.Snapshot snapshot() {
        Context.Snapshot current = context.snapshot();
        return current;
    }

    /**
     * @return the running options this task was created with
     */
    public RunningOptions getRunningOptions() {
        return runningOptions;
    }

    /**
     * Submits the full content of one message to Rspamd as ham.
     *
     * <p>When the configuration enables per-user Bayes, the report carries options built
     * for the owning user; otherwise the content is reported without options.
     *
     * @param pair the user (left) and the message (right) to report
     * @return the {@link Mono} returned by the Rspamd client
     */
    private Mono<Void> reportHam(Pair<Username, MessageResult> pair) {
        MessageResult message = pair.getRight();
        if (configuration.usePerUserBayes()) {
            Username user = pair.getLeft();
            // Throwing.supplier lets the content accessors be called from a lambda despite checked exceptions.
            return rspamdHttpClient.reportAsHam(
                Throwing.supplier(() -> message.getFullContent().reactiveBytes()).get(),
                RspamdHttpClient.Options.forUser(user));
        }
        return rspamdHttpClient.reportAsHam(
            Throwing.supplier(() -> message.getFullContent().reactiveBytes()).get());
    }
}
