package org.apache.james.mailbox.quota.task;

import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import jakarta.inject.Inject;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.james.core.Username;
import org.apache.james.core.quota.QuotaComponent;
import org.apache.james.task.Task;
import org.apache.james.user.api.UsersRepository;
import org.apache.james.user.api.UsersRepositoryException;
import org.apache.james.util.ReactorUtils;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.class */
public class RecomputeCurrentQuotasService {
    private static final Logger LOGGER = LoggerFactory.getLogger(RecomputeCurrentQuotasService.class);
    private final UsersRepository usersRepository;
    private final Map<QuotaComponent, RecomputeSingleComponentCurrentQuotasService> recomputeSingleComponentCurrentQuotasServiceMap;

    /**
     * Progress accumulator for a quota recomputation run.
     *
     * <p>Keeps one {@link Statistic} per {@link QuotaComponent}; statistics are created on first
     * access and are stored in concurrent collections.
     */
    public static class Context {
        private final Map<QuotaComponent, Statistic> mapQuotaComponentToStatistic;

        /**
         * Point-in-time view of the per-component results, as produced by {@link Context#snapshot()}.
         */
        static class Snapshot {
            private final List<RecomputeSingleQuotaComponentResult> recomputeSingleQuotaComponentResults;

            private Snapshot(List<RecomputeSingleQuotaComponentResult> results) {
                this.recomputeSingleQuotaComponentResults = results;
            }

            /**
             * @return the per-component results held by this snapshot
             */
            public List<RecomputeSingleQuotaComponentResult> getResults() {
                return recomputeSingleQuotaComponentResults;
            }

            @Override
            public final boolean equals(Object other) {
                if (!(other instanceof Snapshot)) {
                    return false;
                }
                Snapshot that = (Snapshot) other;
                return Objects.equals(recomputeSingleQuotaComponentResults, that.recomputeSingleQuotaComponentResults);
            }

            @Override
            public final int hashCode() {
                return Objects.hash(recomputeSingleQuotaComponentResults);
            }

            @Override
            public String toString() {
                return MoreObjects.toStringHelper(this)
                    .add("results", recomputeSingleQuotaComponentResults)
                    .toString();
            }
        }

        /**
         * Counters for a single quota component: how many identifiers were processed
         * and which identifiers failed.
         */
        public static class Statistic {
            private final AtomicLong processedIdentifierCount;
            private final ConcurrentLinkedDeque<String> failedIdentifiers;

            /**
             * Wraps the supplied counter and deque; both are used as-is (not copied).
             */
            public Statistic(AtomicLong atomicLong, ConcurrentLinkedDeque<String> concurrentLinkedDeque) {
                this.processedIdentifierCount = atomicLong;
                this.failedIdentifiers = concurrentLinkedDeque;
            }

            /**
             * Builds a statistic from plain values; the failed identifiers are copied into a new deque.
             */
            public Statistic(long j, Collection<String> collection) {
                this.processedIdentifierCount = new AtomicLong(j);
                this.failedIdentifiers = new ConcurrentLinkedDeque<>(collection);
            }

            /** Adds one to the processed-identifier counter. */
            public void incrementProcessed() {
                processedIdentifierCount.incrementAndGet();
            }

            /** Appends {@code str} to the failed identifiers. */
            public void addToFailedIdentifiers(String str) {
                failedIdentifiers.add(str);
            }
        }

        /** Creates a context with no statistics recorded yet. */
        public Context() {
            this.mapQuotaComponentToStatistic = new ConcurrentHashMap<>();
        }

        /** Creates a context pre-populated with a copy of the entries of {@code map}. */
        public Context(Map<QuotaComponent, Statistic> map) {
            this.mapQuotaComponentToStatistic = new ConcurrentHashMap<>(map);
        }

        /**
         * Returns the statistic tracked for {@code quotaComponent}, registering an empty one
         * (zero processed, no failures) when none exists yet.
         */
        public Statistic getStatistic(QuotaComponent quotaComponent) {
            return mapQuotaComponentToStatistic.computeIfAbsent(
                quotaComponent,
                absentComponent -> new Statistic(new AtomicLong(), new ConcurrentLinkedDeque<String>()));
        }

        /**
         * Captures the current counters of every tracked component into an unmodifiable list of results.
         */
        public Snapshot snapshot() {
            List<RecomputeSingleQuotaComponentResult> results = mapQuotaComponentToStatistic.entrySet()
                .stream()
                .map(entry -> new RecomputeSingleQuotaComponentResult(
                    entry.getKey().getValue(),
                    entry.getValue().processedIdentifierCount.get(),
                    ImmutableList.copyOf(entry.getValue().failedIdentifiers)))
                .collect(Collectors.toUnmodifiableList());
            return new Snapshot(results);
        }
    }

    /**
     * Immutable options of a quota recomputation run.
     *
     * <p>{@code usersPerSecond} is the rate at which users are processed and must be strictly
     * positive. {@code quotaComponents} restricts the run to the listed components; an empty list
     * means every registered component is recomputed.
     */
    public static class RunningOptions {
        public static final int DEFAULT_USERS_PER_SECOND = 1;
        public static final RunningOptions DEFAULT = of(DEFAULT_USERS_PER_SECOND, ImmutableList.of());
        private final int usersPerSecond;
        private final List<QuotaComponent> quotaComponents;

        /**
         * @param i number of users to process per second, strictly positive
         * @param list components to recompute; {@code null} is treated as empty
         * @return the corresponding options
         * @throws IllegalArgumentException if {@code i} is not strictly positive
         */
        public static RunningOptions of(int i, List<QuotaComponent> list) {
            return new RunningOptions(i, list);
        }

        /**
         * @param i number of users to process per second, strictly positive
         * @return options with the given rate and no component restriction
         * @throws IllegalArgumentException if {@code i} is not strictly positive
         */
        public static RunningOptions withUsersPerSecond(int i) {
            return new RunningOptions(i, ImmutableList.of());
        }

        private RunningOptions(int i, List<QuotaComponent> list) {
            Preconditions.checkArgument(i > 0, "'usersPerSecond' needs to be strictly positive");
            this.usersPerSecond = i;
            // Defensive immutable copy: later mutation of the caller's list must not alter these
            // options, and the getter must not hand out a mutable internal list.
            this.quotaComponents = list == null ? ImmutableList.of() : ImmutableList.copyOf(list);
        }

        /**
         * @return the number of users processed per second
         */
        public int getUsersPerSecond() {
            return this.usersPerSecond;
        }

        /**
         * @return an immutable, never-null list of the components to recompute (empty means all)
         */
        public List<QuotaComponent> getQuotaComponents() {
            return this.quotaComponents;
        }
    }

    /**
     * Indexes the injected per-component services by the quota component each one reports.
     *
     * @param usersRepository source of the users to process
     * @param set the per-component recomputation services; the collector rejects two services
     *            reporting the same component
     */
    @Inject
    public RecomputeCurrentQuotasService(UsersRepository usersRepository, Set<RecomputeSingleComponentCurrentQuotasService> set) {
        this.usersRepository = usersRepository;
        this.recomputeSingleComponentCurrentQuotasServiceMap = set.stream()
            .collect(Collectors.toUnmodifiableMap(
                RecomputeSingleComponentCurrentQuotasService::getQuotaComponent,
                service -> service));
    }

    /**
     * Recomputes the current quotas of every user listed by the users repository, throttled to
     * {@code runningOptions.getUsersPerSecond()} users per second.
     *
     * <p>Requested components that have no registered recomputation service are reported once per
     * run with a warning; such components make the per-user result {@code PARTIAL}.
     *
     * @param context accumulator updated with per-component progress and failures
     * @param runningOptions throttling rate and the components to recompute (empty means all)
     * @return the combined result of all users; a {@link UsersRepositoryException} raised while
     *         running is logged and turned into {@code PARTIAL}
     */
    public Mono<Task.Result> recomputeCurrentQuotas(Context context, RunningOptions runningOptions) {
        List<QuotaComponent> requestedComponents = runningOptions.getQuotaComponents();
        return Flux.from(this.usersRepository.listReactive())
            .transform(ReactorUtils.<Username, Task.Result>throttle()
                .elements(runningOptions.getUsersPerSecond())
                .per(Duration.ofSeconds(1L))
                .forOperation(username -> recomputeQuotasOfUser(requestedComponents, context, username)))
            .reduce(Task.Result.COMPLETED, Task::combine)
            // Log unsupported components once per run (at subscription), rather than leaving the
            // operator with an unexplained PARTIAL result.
            .doOnSubscribe(subscription -> requestedComponents.stream()
                .filter(component -> !this.recomputeSingleComponentCurrentQuotasServiceMap.containsKey(component))
                .forEach(component -> LOGGER.warn(
                    "No quota recomputation service is registered for component {}, the task result will be partial",
                    component.getValue())))
            .onErrorResume(UsersRepositoryException.class, usersRepositoryException -> {
                LOGGER.error("Error while accessing users from repository", usersRepositoryException);
                return Mono.just(Task.Result.PARTIAL);
            });
    }

    /**
     * Recomputes the quotas of one user and folds the per-component results into a single result.
     *
     * <p>With an empty {@code list} every registered component service is run. Otherwise only the
     * listed components are run, and a listed component without a registered service contributes
     * {@code PARTIAL}.
     */
    private Mono<Task.Result> recomputeQuotasOfUser(List<QuotaComponent> list, Context context, Username username) {
        if (list.isEmpty()) {
            List<Mono<Task.Result>> allComponentRuns = this.recomputeSingleComponentCurrentQuotasServiceMap.values()
                .stream()
                .map(service -> recomputeCurrentQuotas(service, context, username))
                .collect(Collectors.toUnmodifiableList());
            return Flux.merge(allComponentRuns)
                .reduce(Task.Result.COMPLETED, Task::combine);
        }

        return Flux.fromIterable(list)
            .flatMap(component -> {
                RecomputeSingleComponentCurrentQuotasService service =
                    this.recomputeSingleComponentCurrentQuotasServiceMap.get(component);
                if (service == null) {
                    return Mono.just(Task.Result.PARTIAL);
                }
                return recomputeCurrentQuotas(service, context, username);
            })
            .reduce(Task.Result.COMPLETED, Task::combine);
    }

    /**
     * Recomputes the current quota of one user for the component handled by the given service.
     *
     * <p>On success the component's processed counter in {@code context} is incremented. On
     * failure the error is logged, the username is recorded among the component's failed
     * identifiers, and the result is {@code PARTIAL} instead of an error signal.
     *
     * @param recomputeSingleComponentCurrentQuotasService service performing the recomputation
     * @param context accumulator updated with the outcome
     * @param username user whose quota is recomputed
     * @return {@code COMPLETED} on success, {@code PARTIAL} when the recomputation failed
     */
    public Mono<Task.Result> recomputeCurrentQuotas(RecomputeSingleComponentCurrentQuotasService recomputeSingleComponentCurrentQuotasService, Context context, Username username) {
        // The service is generic over quota components, so the logs name the actual component
        // rather than a hard-coded one.
        QuotaComponent quotaComponent = recomputeSingleComponentCurrentQuotasService.getQuotaComponent();
        return recomputeSingleComponentCurrentQuotasService.recomputeCurrentQuotas(username)
            .then(Mono.just(Task.Result.COMPLETED))
            .doOnNext(result -> {
                LOGGER.info("Current quota of component {} recomputed for {}", quotaComponent.getValue(), username);
                context.getStatistic(quotaComponent).incrementProcessed();
            })
            .onErrorResume(error -> {
                LOGGER.error("Error while recomputing current quota of component {} for {}", quotaComponent.getValue(), username, error);
                context.getStatistic(quotaComponent).addToFailedIdentifiers(username.asString());
                return Mono.just(Task.Result.PARTIAL);
            });
    }
}
