package org.apache.james.imapserver.netty;

import com.github.fge.lambdas.Throwing;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.james.imap.api.ImapMessage;
import org.apache.james.imapserver.netty.ReactiveThrottler;
import org.apache.james.metrics.api.NoopGaugeRegistry;
import org.apache.james.util.concurrency.ConcurrentTestRunner;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:org/apache/james/imapserver/netty/ReactiveThrottlerTest.class */
class ReactiveThrottlerTest {
    private static final ImapMessage NO_IMAP_MESSAGE = null;

    ReactiveThrottlerTest() {
    }

    @Test
    void throttleShouldExecuteSubmittedTasks() {
        ReactiveThrottler reactiveThrottler = new ReactiveThrottler(new NoopGaugeRegistry(), 2, 2);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        Mono.from(reactiveThrottler.throttle(Mono.delay(Duration.ofMillis(50L)).then(Mono.fromRunnable(() -> {
            atomicBoolean.getAndSet(true);
        })), NO_IMAP_MESSAGE)).block();
        Assertions.assertThat(atomicBoolean.get()).isTrue();
    }

    @Test
    void throttleShouldNotExecuteQueuedTasksLogicRightAway() {
        ReactiveThrottler reactiveThrottler = new ReactiveThrottler(new NoopGaugeRegistry(), 2, 2);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        Mono.from(reactiveThrottler.throttle(Mono.delay(Duration.ofMillis(200L)).then(), NO_IMAP_MESSAGE)).subscribe();
        Mono.from(reactiveThrottler.throttle(Mono.delay(Duration.ofMillis(200L)).then(), NO_IMAP_MESSAGE)).subscribe();
        Mono.from(reactiveThrottler.throttle(Mono.fromRunnable(() -> {
            atomicBoolean.getAndSet(true);
        }), NO_IMAP_MESSAGE)).subscribe();
        Assertions.assertThat(atomicBoolean.get()).isFalse();
    }

    @RepeatedTest(10)
    void shouldPropagateCancel() throws Exception {
        ReactiveThrottler reactiveThrottler = new ReactiveThrottler(new NoopGaugeRegistry(), 2, 5);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        Objects.requireNonNull(countDownLatch);
        Disposable subscribe = Mono.from(reactiveThrottler.throttle(Mono.fromRunnable(Throwing.runnable(countDownLatch::await)).subscribeOn(Schedulers.boundedElastic()).then(), NO_IMAP_MESSAGE)).subscribe();
        Objects.requireNonNull(countDownLatch);
        Disposable subscribe2 = Mono.from(reactiveThrottler.throttle(Mono.fromRunnable(Throwing.runnable(countDownLatch::await)).subscribeOn(Schedulers.boundedElastic()).then(), NO_IMAP_MESSAGE)).subscribe();
        Objects.requireNonNull(countDownLatch);
        Disposable subscribe3 = Mono.from(reactiveThrottler.throttle(Mono.fromRunnable(Throwing.runnable(countDownLatch::await)).subscribeOn(Schedulers.boundedElastic()).then(), NO_IMAP_MESSAGE)).subscribe();
        Objects.requireNonNull(countDownLatch);
        Disposable subscribe4 = Mono.from(reactiveThrottler.throttle(Mono.fromRunnable(Throwing.runnable(countDownLatch::await)).subscribeOn(Schedulers.boundedElastic()).then(), NO_IMAP_MESSAGE)).subscribe();
        Objects.requireNonNull(countDownLatch);
        Disposable subscribe5 = Mono.from(reactiveThrottler.throttle(Mono.fromRunnable(Throwing.runnable(countDownLatch::await)).subscribeOn(Schedulers.boundedElastic()).then(), NO_IMAP_MESSAGE)).subscribe();
        Objects.requireNonNull(countDownLatch);
        Disposable subscribe6 = Mono.from(reactiveThrottler.throttle(Mono.fromRunnable(Throwing.runnable(countDownLatch::await)).subscribeOn(Schedulers.boundedElastic()).then(), NO_IMAP_MESSAGE)).subscribe();
        Objects.requireNonNull(countDownLatch);
        Mono.from(reactiveThrottler.throttle(Mono.fromRunnable(Throwing.runnable(countDownLatch::await)).subscribeOn(Schedulers.boundedElastic()).then(), NO_IMAP_MESSAGE)).subscribe().dispose();
        subscribe6.dispose();
        subscribe5.dispose();
        subscribe4.dispose();
        subscribe3.dispose();
        subscribe2.dispose();
        subscribe.dispose();
        Thread.sleep(200L);
        Mono.from(reactiveThrottler.throttle(Mono.fromRunnable(() -> {
            atomicBoolean.getAndSet(true);
        }), NO_IMAP_MESSAGE)).block();
        Assertions.assertThat(atomicBoolean.get()).isTrue();
    }

    @RepeatedTest(10)
    void shouldPropagateCancelInReverseOrder() throws Exception {
        ReactiveThrottler reactiveThrottler = new ReactiveThrottler(new NoopGaugeRegistry(), 2, 5);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        Objects.requireNonNull(countDownLatch);
        Disposable subscribe = Mono.from(reactiveThrottler.throttle(Mono.fromRunnable(Throwing.runnable(countDownLatch::await)).subscribeOn(Schedulers.boundedElastic()).then(), NO_IMAP_MESSAGE)).subscribe();
        Objects.requireNonNull(countDownLatch);
        Disposable subscribe2 = Mono.from(reactiveThrottler.throttle(Mono.fromRunnable(Throwing.runnable(countDownLatch::await)).subscribeOn(Schedulers.boundedElastic()).then(), NO_IMAP_MESSAGE)).subscribe();
        Objects.requireNonNull(countDownLatch);
        Disposable subscribe3 = Mono.from(reactiveThrottler.throttle(Mono.fromRunnable(Throwing.runnable(countDownLatch::await)).subscribeOn(Schedulers.boundedElastic()).then(), NO_IMAP_MESSAGE)).subscribe();
        Objects.requireNonNull(countDownLatch);
        Disposable subscribe4 = Mono.from(reactiveThrottler.throttle(Mono.fromRunnable(Throwing.runnable(countDownLatch::await)).subscribeOn(Schedulers.boundedElastic()).then(), NO_IMAP_MESSAGE)).subscribe();
        Objects.requireNonNull(countDownLatch);
        Disposable subscribe5 = Mono.from(reactiveThrottler.throttle(Mono.fromRunnable(Throwing.runnable(countDownLatch::await)).subscribeOn(Schedulers.boundedElastic()).then(), NO_IMAP_MESSAGE)).subscribe();
        Objects.requireNonNull(countDownLatch);
        Disposable subscribe6 = Mono.from(reactiveThrottler.throttle(Mono.fromRunnable(Throwing.runnable(countDownLatch::await)).subscribeOn(Schedulers.boundedElastic()).then(), NO_IMAP_MESSAGE)).subscribe();
        Objects.requireNonNull(countDownLatch);
        Disposable subscribe7 = Mono.from(reactiveThrottler.throttle(Mono.fromRunnable(Throwing.runnable(countDownLatch::await)).subscribeOn(Schedulers.boundedElastic()).then(), NO_IMAP_MESSAGE)).subscribe();
        subscribe.dispose();
        subscribe2.dispose();
        subscribe3.dispose();
        subscribe4.dispose();
        subscribe5.dispose();
        subscribe6.dispose();
        subscribe7.dispose();
        Thread.sleep(200L);
        Mono.from(reactiveThrottler.throttle(Mono.fromRunnable(() -> {
            atomicBoolean.getAndSet(true);
        }), NO_IMAP_MESSAGE)).block();
        Assertions.assertThat(atomicBoolean.get()).isTrue();
    }

    @Test
    void throttleShouldEventuallyExecuteQueuedTasks() {
        ReactiveThrottler reactiveThrottler = new ReactiveThrottler(new NoopGaugeRegistry(), 2, 2);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        Mono.from(reactiveThrottler.throttle(Mono.delay(Duration.ofMillis(50L)).then(), NO_IMAP_MESSAGE)).subscribe();
        Mono.from(reactiveThrottler.throttle(Mono.delay(Duration.ofMillis(50L)).then(), NO_IMAP_MESSAGE)).subscribe();
        Mono.from(reactiveThrottler.throttle(Mono.fromRunnable(() -> {
            atomicBoolean.getAndSet(true);
        }), NO_IMAP_MESSAGE)).subscribe();
        Awaitility.await().atMost(Duration.ofSeconds(10L)).untilAsserted(() -> {
            Assertions.assertThat(atomicBoolean.get()).isTrue();
        });
    }

    @Test
    void throttleShouldCompleteWhenSubmittedTaskCompletes() {
        ReactiveThrottler reactiveThrottler = new ReactiveThrottler(new NoopGaugeRegistry(), 2, 2);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        Mono.from(reactiveThrottler.throttle(Mono.delay(Duration.ofMillis(50L)).then(), NO_IMAP_MESSAGE)).subscribe();
        Mono.from(reactiveThrottler.throttle(Mono.delay(Duration.ofMillis(50L)).then(), NO_IMAP_MESSAGE)).subscribe();
        Mono.from(reactiveThrottler.throttle(Mono.fromRunnable(() -> {
            atomicBoolean.getAndSet(true);
        }), NO_IMAP_MESSAGE)).block();
        Assertions.assertThat(atomicBoolean.get()).isTrue();
    }

    @Test
    void throttleShouldRejectTasksWhenTheQueueIsFull() {
        ReactiveThrottler reactiveThrottler = new ReactiveThrottler(new NoopGaugeRegistry(), 2, 2);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        Mono.from(reactiveThrottler.throttle(Mono.delay(Duration.ofMillis(50L)).then(), NO_IMAP_MESSAGE)).subscribe();
        Mono.from(reactiveThrottler.throttle(Mono.delay(Duration.ofMillis(50L)).then(), NO_IMAP_MESSAGE)).subscribe();
        Mono.from(reactiveThrottler.throttle(Mono.delay(Duration.ofMillis(50L)).then(), NO_IMAP_MESSAGE)).subscribe();
        Mono.from(reactiveThrottler.throttle(Mono.delay(Duration.ofMillis(50L)).then(), NO_IMAP_MESSAGE)).subscribe();
        Assertions.assertThatThrownBy(() -> {
            Mono.from(reactiveThrottler.throttle(Mono.fromRunnable(() -> {
                atomicBoolean.getAndSet(true);
            }), NO_IMAP_MESSAGE)).block();
        }).isInstanceOf(ReactiveThrottler.RejectedException.class);
        Assertions.assertThat(atomicBoolean.get()).isFalse();
    }

    @Test
    void throttleShouldRecoverFromABurst() throws Exception {
        ReactiveThrottler reactiveThrottler = new ReactiveThrottler(new NoopGaugeRegistry(), 2, 2);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        Mono.from(reactiveThrottler.throttle(Mono.delay(Duration.ofMillis(50L)).then(), NO_IMAP_MESSAGE)).subscribe();
        Mono.from(reactiveThrottler.throttle(Mono.delay(Duration.ofMillis(50L)).then(), NO_IMAP_MESSAGE)).subscribe();
        Mono.from(reactiveThrottler.throttle(Mono.delay(Duration.ofMillis(50L)).then(), NO_IMAP_MESSAGE)).subscribe();
        Mono.from(reactiveThrottler.throttle(Mono.delay(Duration.ofMillis(50L)).then(), NO_IMAP_MESSAGE)).subscribe();
        Mono.from(reactiveThrottler.throttle(Mono.delay(Duration.ofMillis(50L)).then(), NO_IMAP_MESSAGE)).subscribe();
        Mono.from(reactiveThrottler.throttle(Mono.delay(Duration.ofMillis(50L)).then(), NO_IMAP_MESSAGE)).subscribe();
        Mono.from(reactiveThrottler.throttle(Mono.delay(Duration.ofMillis(50L)).then(), NO_IMAP_MESSAGE)).subscribe();
        Mono.from(reactiveThrottler.throttle(Mono.delay(Duration.ofMillis(50L)).then(), NO_IMAP_MESSAGE)).subscribe();
        Mono.from(reactiveThrottler.throttle(Mono.delay(Duration.ofMillis(50L)).then(), NO_IMAP_MESSAGE)).subscribe();
        Mono.from(reactiveThrottler.throttle(Mono.delay(Duration.ofMillis(50L)).then(), NO_IMAP_MESSAGE)).subscribe();
        Mono.from(reactiveThrottler.throttle(Mono.delay(Duration.ofMillis(50L)).then(), NO_IMAP_MESSAGE)).subscribe();
        Mono.from(reactiveThrottler.throttle(Mono.delay(Duration.ofMillis(50L)).then(), NO_IMAP_MESSAGE)).subscribe();
        Mono.from(reactiveThrottler.throttle(Mono.delay(Duration.ofMillis(50L)).then(), NO_IMAP_MESSAGE)).subscribe();
        Thread.sleep(500L);
        Mono.from(reactiveThrottler.throttle(Mono.fromRunnable(() -> {
            atomicBoolean.getAndSet(true);
        }), NO_IMAP_MESSAGE)).block();
        Assertions.assertThat(atomicBoolean.get()).isTrue();
    }

    @Test
    void throttleShouldHandleDisposal() throws Exception {
        ReactiveThrottler reactiveThrottler = new ReactiveThrottler(new NoopGaugeRegistry(), 2, 2);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        Disposable subscribe = Mono.from(reactiveThrottler.throttle(Mono.delay(Duration.ofMillis(50L)).then(), NO_IMAP_MESSAGE)).subscribe();
        Disposable subscribe2 = Mono.from(reactiveThrottler.throttle(Mono.delay(Duration.ofMillis(50L)).then(), NO_IMAP_MESSAGE)).subscribe();
        Disposable subscribe3 = Mono.from(reactiveThrottler.throttle(Mono.delay(Duration.ofMillis(50L)).then(), NO_IMAP_MESSAGE)).subscribe();
        Disposable subscribe4 = Mono.from(reactiveThrottler.throttle(Mono.delay(Duration.ofMillis(50L)).then(), NO_IMAP_MESSAGE)).subscribe();
        Disposable subscribe5 = Mono.from(reactiveThrottler.throttle(Mono.delay(Duration.ofMillis(50L)).then(), NO_IMAP_MESSAGE)).subscribe();
        Disposable subscribe6 = Mono.from(reactiveThrottler.throttle(Mono.delay(Duration.ofMillis(50L)).then(), NO_IMAP_MESSAGE)).subscribe();
        Disposable subscribe7 = Mono.from(reactiveThrottler.throttle(Mono.delay(Duration.ofMillis(50L)).then(), NO_IMAP_MESSAGE)).subscribe();
        Disposable subscribe8 = Mono.from(reactiveThrottler.throttle(Mono.delay(Duration.ofMillis(50L)).then(), NO_IMAP_MESSAGE)).subscribe();
        subscribe.dispose();
        subscribe2.dispose();
        subscribe3.dispose();
        subscribe4.dispose();
        subscribe5.dispose();
        subscribe6.dispose();
        subscribe7.dispose();
        subscribe8.dispose();
        Thread.sleep(100L);
        Mono.from(reactiveThrottler.throttle(Mono.fromRunnable(() -> {
            atomicBoolean.getAndSet(true);
        }), NO_IMAP_MESSAGE)).block();
        Assertions.assertThat(atomicBoolean.get()).isTrue();
    }

    @RepeatedTest(10)
    void throttleShouldBeConcurrentFriendly() throws Exception {
        ReactiveThrottler reactiveThrottler = new ReactiveThrottler(new NoopGaugeRegistry(), 2, 2);
        ConcurrentTestRunner.builder().operation((i, i2) -> {
            Mono.from(reactiveThrottler.throttle(Mono.delay(Duration.ofMillis(50L)).then(), NO_IMAP_MESSAGE)).onErrorResume(ReactiveThrottler.RejectedException.class, rejectedException -> {
                return Mono.empty();
            }).block();
        }).threadCount(20).operationCount(5).runSuccessfullyWithin(Duration.ofSeconds(10L));
        Thread.sleep(100L);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        Mono.from(reactiveThrottler.throttle(Mono.fromRunnable(() -> {
            atomicBoolean.getAndSet(true);
        }), NO_IMAP_MESSAGE)).block();
        Assertions.assertThat(atomicBoolean.get()).isTrue();
    }

    @Test
    void throttleShouldNotAwaitOtherTasks() throws Exception {
        ReactiveThrottler reactiveThrottler = new ReactiveThrottler(new NoopGaugeRegistry(), 2, 2);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        Mono.from(reactiveThrottler.throttle(Mono.delay(Duration.ofMillis(100L)).then(), NO_IMAP_MESSAGE)).then(Mono.fromRunnable(() -> {
            atomicBoolean.getAndSet(true);
        })).subscribe();
        Mono.from(reactiveThrottler.throttle(Mono.delay(Duration.ofSeconds(2L)).then(), NO_IMAP_MESSAGE)).subscribe();
        Thread.sleep(200L);
        Assertions.assertThat(atomicBoolean.get()).isTrue();
    }

    @Test
    void throttleShouldNotExceedItsConcurrency() {
        ReactiveThrottler reactiveThrottler = new ReactiveThrottler(new NoopGaugeRegistry(), 2, 2);
        AtomicInteger atomicInteger = new AtomicInteger(0);
        ConcurrentLinkedDeque concurrentLinkedDeque = new ConcurrentLinkedDeque();
        Mono then = Mono.fromRunnable(() -> {
            concurrentLinkedDeque.add(Integer.valueOf(atomicInteger.incrementAndGet()));
        }).then(Mono.delay(Duration.ofMillis(50L))).then(Mono.fromRunnable(() -> {
            concurrentLinkedDeque.add(Integer.valueOf(atomicInteger.getAndDecrement()));
        }));
        Mono.from(reactiveThrottler.throttle(then, NO_IMAP_MESSAGE)).subscribe();
        Mono.from(reactiveThrottler.throttle(then, NO_IMAP_MESSAGE)).subscribe();
        Mono.from(reactiveThrottler.throttle(then, NO_IMAP_MESSAGE)).subscribe();
        Mono.from(reactiveThrottler.throttle(then, NO_IMAP_MESSAGE)).subscribe();
        Awaitility.await().untilAsserted(() -> {
            Assertions.assertThat(concurrentLinkedDeque.size()).isEqualTo(8);
        });
        Assertions.assertThat(concurrentLinkedDeque).allSatisfy(num -> {
            Assertions.assertThat(num).isBetween(0, 2);
        });
    }
}
