package org.apache.james.imapserver.netty;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.james.imap.api.ImapMessage;
import org.apache.james.metrics.api.GaugeRegistry;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

/* loaded from: input_file:org/apache/james/imapserver/netty/ReactiveThrottler.class */
public class ReactiveThrottler {
    private final int maxConcurrentRequests;
    private final int maxQueueSize;
    private final AtomicInteger concurrentRequests = new AtomicInteger(0);
    private final Queue<Publisher<Void>> queue = new ConcurrentLinkedQueue();

    /* loaded from: input_file:org/apache/james/imapserver/netty/ReactiveThrottler$RejectedException.class */
    public static class RejectedException extends RuntimeException {
        private final ImapMessage imapMessage;

        public RejectedException(String str, ImapMessage imapMessage) {
            super(str);
            this.imapMessage = imapMessage;
        }

        public ImapMessage getImapMessage() {
            return this.imapMessage;
        }
    }

    public ReactiveThrottler(GaugeRegistry gaugeRegistry, int i, int i2) {
        gaugeRegistry.register("imap.request.queue.size", () -> {
            return Integer.valueOf(Math.max(this.concurrentRequests.get() - i, 0));
        });
        this.maxConcurrentRequests = i;
        this.maxQueueSize = i2;
    }

    public Mono<Void> throttle(Publisher<Void> publisher, ImapMessage imapMessage) {
        if (this.maxConcurrentRequests < 0) {
            return Mono.from(publisher);
        }
        int incrementAndGet = this.concurrentRequests.incrementAndGet();
        if (incrementAndGet <= this.maxConcurrentRequests) {
            return Mono.from(publisher).doFinally(signalType -> {
                onRequestDone();
            });
        }
        if (incrementAndGet > this.maxQueueSize + this.maxConcurrentRequests) {
            this.concurrentRequests.decrementAndGet();
            return Mono.error(new RejectedException(String.format("The IMAP server has reached its maximum capacity (concurrent requests: %d, queue size: %d)", Integer.valueOf(this.maxConcurrentRequests), Integer.valueOf(this.maxQueueSize)), imapMessage));
        }
        Sinks.One one = Sinks.one();
        this.queue.add(Mono.from(publisher).then(Mono.fromRunnable(() -> {
            one.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST);
        })));
        return one.asMono();
    }

    private void onRequestDone() {
        this.concurrentRequests.decrementAndGet();
        Publisher<Void> poll = this.queue.poll();
        if (poll != null) {
            Mono.from(poll).doFinally(signalType -> {
                onRequestDone();
            }).subscribe();
        }
    }
}
