package org.apache.james.events;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.time.Duration;
import java.util.List;
import java.util.Set;
import java.util.stream.IntStream;
import org.apache.james.events.EventBusContract;
import org.apache.james.events.EventBusTestFixture;
import org.apache.james.util.concurrency.ConcurrentTestRunner;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionFactory;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/apache/james/events/EventBusConcurrentTestContract.class */
public interface EventBusConcurrentTestContract {
    // Interface fields are implicitly public, static and final, so the modifiers are omitted.

    /** Thread count for the concurrent runs in this contract. */
    int THREAD_COUNT = 10;

    /** Operation count per thread for the concurrent runs in this contract. */
    int OPERATION_COUNT = 30;

    /** Total number of operations in one run: threads times operations per thread (300). */
    int TOTAL_DISPATCH_OPERATIONS = THREAD_COUNT * OPERATION_COUNT;

    /** Five-second time budget. */
    Duration FIVE_SECONDS = Duration.ofSeconds(5);

    /** Awaitility condition configured with a five-second timeout. */
    ConditionFactory AWAIT_CONDITION = Awaitility.await().timeout(FIVE_SECONDS);

    /** The three fixture registration keys {@code KEY_1}, {@code KEY_2} and {@code KEY_3}. */
    Set<RegistrationKey> ALL_KEYS = ImmutableSet.of(
        EventBusTestFixture.KEY_1,
        EventBusTestFixture.KEY_2,
        EventBusTestFixture.KEY_3);

    /**
     * Concurrent-dispatch contract for implementations that expose several event buses.
     *
     * <p>Every test registers counting listeners, dispatches {@code EventBusTestFixture.EVENT}
     * through {@code eventBus()} from 10 threads running 30 operations each, and then waits
     * until the listeners have collectively counted the expected number of calls.
     */
    public interface MultiEventBusConcurrentContract extends EventBusContract.MultipleEventBusContract {
        /** A third event bus, used next to {@code eventBus()} and {@code eventBus2()} by the tests below. */
        EventBus eventBus3();

        /**
         * Registers one listener per group (A, B, C) on both {@code eventBus()} and
         * {@code eventBus2()}, dispatches without keys, and expects three listener calls
         * per dispatch.
         */
        @Test
        default void concurrentDispatchGroupShouldDeliverAllEventsToListenersWithMultipleEventBus() throws Exception {
            EventBusTestFixture.EventListenerCountingSuccessfulExecution listenerA = EventBusConcurrentTestContract.newCountingListener();
            EventBusTestFixture.EventListenerCountingSuccessfulExecution listenerB = EventBusConcurrentTestContract.newCountingListener();
            EventBusTestFixture.EventListenerCountingSuccessfulExecution listenerC = EventBusConcurrentTestContract.newCountingListener();
            ImmutableList<EventBusTestFixture.EventListenerCountingSuccessfulExecution> listeners =
                ImmutableList.of(listenerA, listenerB, listenerC);

            eventBus().register(listenerA, new EventBusTestFixture.GroupA());
            eventBus().register(listenerB, new EventBusTestFixture.GroupB());
            eventBus().register(listenerC, new EventBusTestFixture.GroupC());

            eventBus2().register(listenerA, new EventBusTestFixture.GroupA());
            eventBus2().register(listenerB, new EventBusTestFixture.GroupB());
            eventBus2().register(listenerC, new EventBusTestFixture.GroupC());

            int groupCount = 3; // GroupA + GroupB + GroupC
            int expectedCalls = groupCount * EventBusConcurrentTestContract.TOTAL_DISPATCH_OPERATIONS;

            ConcurrentTestRunner.builder()
                .reactorOperation((threadNumber, step) ->
                    eventBus().dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS))
                .threadCount(EventBusConcurrentTestContract.THREAD_COUNT)
                .operationCount(EventBusConcurrentTestContract.OPERATION_COUNT)
                .runSuccessfullyWithin(EventBusConcurrentTestContract.FIVE_SECONDS);

            EventBusConcurrentTestContract.AWAIT_CONDITION.untilAsserted(() ->
                Assertions.assertThat(EventBusConcurrentTestContract.totalEventsReceived(listeners))
                    .isEqualTo(expectedCalls));
        }

        /**
         * Registers one listener per key (KEY_1, KEY_2, KEY_3) on both {@code eventBus()} and
         * {@code eventBus2()}, dispatches to all keys, and expects 3 x 2 listener calls per dispatch.
         */
        @Test
        default void concurrentDispatchKeyShouldDeliverAllEventsToListenersWithMultipleEventBus() throws Exception {
            EventBusTestFixture.EventListenerCountingSuccessfulExecution listener1 = EventBusConcurrentTestContract.newCountingListener();
            EventBusTestFixture.EventListenerCountingSuccessfulExecution listener2 = EventBusConcurrentTestContract.newCountingListener();
            EventBusTestFixture.EventListenerCountingSuccessfulExecution listener3 = EventBusConcurrentTestContract.newCountingListener();
            ImmutableList<EventBusTestFixture.EventListenerCountingSuccessfulExecution> listeners =
                ImmutableList.of(listener1, listener2, listener3);

            // Key registrations are publishers: block so each one is complete before dispatching.
            Mono.from(eventBus().register(listener1, EventBusTestFixture.KEY_1)).block();
            Mono.from(eventBus().register(listener2, EventBusTestFixture.KEY_2)).block();
            Mono.from(eventBus().register(listener3, EventBusTestFixture.KEY_3)).block();

            Mono.from(eventBus2().register(listener1, EventBusTestFixture.KEY_1)).block();
            Mono.from(eventBus2().register(listener2, EventBusTestFixture.KEY_2)).block();
            Mono.from(eventBus2().register(listener3, EventBusTestFixture.KEY_3)).block();

            int keyRegistrationCount = 3; // KEY_1 + KEY_2 + KEY_3
            int eventBusCount = 2; // eventBus() + eventBus2()
            int expectedCalls = keyRegistrationCount * eventBusCount * EventBusConcurrentTestContract.TOTAL_DISPATCH_OPERATIONS;

            ConcurrentTestRunner.builder()
                .reactorOperation((threadNumber, step) ->
                    eventBus().dispatch(EventBusTestFixture.EVENT, EventBusConcurrentTestContract.ALL_KEYS))
                .threadCount(EventBusConcurrentTestContract.THREAD_COUNT)
                .operationCount(EventBusConcurrentTestContract.OPERATION_COUNT)
                .runSuccessfullyWithin(EventBusConcurrentTestContract.FIVE_SECONDS);

            EventBusConcurrentTestContract.AWAIT_CONDITION.untilAsserted(() ->
                Assertions.assertThat(EventBusConcurrentTestContract.totalEventsReceived(listeners))
                    .isEqualTo(expectedCalls));
        }

        /**
         * Combines group registrations (on {@code eventBus2()} only) with key registrations on
         * all three buses, dispatches to all keys, and expects the sum of the group deliveries
         * (3 per dispatch) and the key deliveries (2 x 3 per dispatch).
         */
        @Test
        default void concurrentDispatchShouldDeliverAllEventsToListenersWithMultipleEventBus() throws Exception {
            EventBusTestFixture.EventListenerCountingSuccessfulExecution listener1 = EventBusConcurrentTestContract.newCountingListener();
            EventBusTestFixture.EventListenerCountingSuccessfulExecution listener2 = EventBusConcurrentTestContract.newCountingListener();
            EventBusTestFixture.EventListenerCountingSuccessfulExecution listener3 = EventBusConcurrentTestContract.newCountingListener();
            ImmutableList<EventBusTestFixture.EventListenerCountingSuccessfulExecution> listeners =
                ImmutableList.of(listener1, listener2, listener3);

            eventBus2().register(listener1, EventDeadLettersContract.GROUP_A);
            eventBus2().register(listener2, new EventBusTestFixture.GroupB());
            eventBus2().register(listener3, new EventBusTestFixture.GroupC());
            int groupRegistrationCount = 3; // three group registrations above
            int expectedGroupCalls = groupRegistrationCount * EventBusConcurrentTestContract.TOTAL_DISPATCH_OPERATIONS;

            Mono.from(eventBus().register(listener1, EventBusTestFixture.KEY_1)).block();
            Mono.from(eventBus().register(listener2, EventBusTestFixture.KEY_2)).block();

            Mono.from(eventBus2().register(listener1, EventBusTestFixture.KEY_1)).block();
            Mono.from(eventBus2().register(listener2, EventBusTestFixture.KEY_2)).block();

            Mono.from(eventBus3().register(listener3, EventBusTestFixture.KEY_1)).block();
            Mono.from(eventBus3().register(listener3, EventBusTestFixture.KEY_2)).block();

            int keyCount = 2; // KEY_1 + KEY_2
            int eventBusCount = 3; // eventBus() + eventBus2() + eventBus3()
            int expectedKeyCalls = keyCount * eventBusCount * EventBusConcurrentTestContract.TOTAL_DISPATCH_OPERATIONS;
            int expectedCalls = expectedGroupCalls + expectedKeyCalls;

            ConcurrentTestRunner.builder()
                .reactorOperation((threadNumber, step) ->
                    eventBus().dispatch(EventBusTestFixture.EVENT, EventBusConcurrentTestContract.ALL_KEYS))
                .threadCount(EventBusConcurrentTestContract.THREAD_COUNT)
                .operationCount(EventBusConcurrentTestContract.OPERATION_COUNT)
                .runSuccessfullyWithin(EventBusConcurrentTestContract.FIVE_SECONDS);

            EventBusConcurrentTestContract.AWAIT_CONDITION.untilAsserted(() ->
                Assertions.assertThat(EventBusConcurrentTestContract.totalEventsReceived(listeners))
                    .isEqualTo(expectedCalls));
        }
    }

    /**
     * Concurrent-dispatch contract for a single event bus.
     *
     * <p>Every test registers counting listeners on {@code eventBus()}, dispatches
     * {@code EventBusTestFixture.EVENT} from {@code THREAD_COUNT} threads running
     * {@code OPERATION_COUNT} operations each, and then waits until the listeners have
     * counted the expected number of calls.
     */
    public interface SingleEventBusConcurrentContract extends EventBusContract {
        /**
         * Registers one listener per group (A, B, C), dispatches without keys, and expects
         * three listener calls per dispatch.
         */
        @Test
        default void concurrentDispatchGroupShouldDeliverAllEventsToListenersWithSingleEventBus() throws Exception {
            EventBusTestFixture.EventListenerCountingSuccessfulExecution listenerA = EventBusConcurrentTestContract.newCountingListener();
            EventBusTestFixture.EventListenerCountingSuccessfulExecution listenerB = EventBusConcurrentTestContract.newCountingListener();
            EventBusTestFixture.EventListenerCountingSuccessfulExecution listenerC = EventBusConcurrentTestContract.newCountingListener();
            ImmutableList<EventBusTestFixture.EventListenerCountingSuccessfulExecution> listeners =
                ImmutableList.of(listenerA, listenerB, listenerC);

            eventBus().register(listenerA, new EventBusTestFixture.GroupA());
            eventBus().register(listenerB, new EventBusTestFixture.GroupB());
            eventBus().register(listenerC, new EventBusTestFixture.GroupC());

            int groupCount = 3; // GroupA + GroupB + GroupC
            int expectedCalls = groupCount * EventBusConcurrentTestContract.TOTAL_DISPATCH_OPERATIONS;

            ConcurrentTestRunner.builder()
                .reactorOperation((threadNumber, step) ->
                    eventBus().dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS))
                .threadCount(EventBusConcurrentTestContract.THREAD_COUNT)
                .operationCount(EventBusConcurrentTestContract.OPERATION_COUNT)
                .runSuccessfullyWithin(EventBusConcurrentTestContract.FIVE_SECONDS);

            EventBusConcurrentTestContract.AWAIT_CONDITION.untilAsserted(() ->
                Assertions.assertThat(EventBusConcurrentTestContract.totalEventsReceived(listeners))
                    .isEqualTo(expectedCalls));
        }

        /**
         * Registers one listener per key (KEY_1, KEY_2, KEY_3), dispatches to all keys, and
         * expects three listener calls per dispatch.
         */
        @Test
        default void concurrentDispatchKeyShouldDeliverAllEventsToListenersWithSingleEventBus() throws Exception {
            EventBusTestFixture.EventListenerCountingSuccessfulExecution listener1 = EventBusConcurrentTestContract.newCountingListener();
            EventBusTestFixture.EventListenerCountingSuccessfulExecution listener2 = EventBusConcurrentTestContract.newCountingListener();
            EventBusTestFixture.EventListenerCountingSuccessfulExecution listener3 = EventBusConcurrentTestContract.newCountingListener();
            ImmutableList<EventBusTestFixture.EventListenerCountingSuccessfulExecution> listeners =
                ImmutableList.of(listener1, listener2, listener3);

            // Key registrations are publishers: block so each one is complete before dispatching.
            Mono.from(eventBus().register(listener1, EventBusTestFixture.KEY_1)).block();
            Mono.from(eventBus().register(listener2, EventBusTestFixture.KEY_2)).block();
            Mono.from(eventBus().register(listener3, EventBusTestFixture.KEY_3)).block();

            int keyRegistrationCount = 3; // KEY_1 + KEY_2 + KEY_3
            int eventBusCount = 1; // eventBus() only
            int expectedCalls = keyRegistrationCount * eventBusCount * EventBusConcurrentTestContract.TOTAL_DISPATCH_OPERATIONS;

            ConcurrentTestRunner.builder()
                .reactorOperation((threadNumber, step) ->
                    eventBus().dispatch(EventBusTestFixture.EVENT, EventBusConcurrentTestContract.ALL_KEYS))
                .threadCount(EventBusConcurrentTestContract.THREAD_COUNT)
                .operationCount(EventBusConcurrentTestContract.OPERATION_COUNT)
                .runSuccessfullyWithin(EventBusConcurrentTestContract.FIVE_SECONDS);

            EventBusConcurrentTestContract.AWAIT_CONDITION.untilAsserted(() ->
                Assertions.assertThat(EventBusConcurrentTestContract.totalEventsReceived(listeners))
                    .isEqualTo(expectedCalls));
        }

        /**
         * Registers each listener both under a group and under a key, dispatches to all keys,
         * and expects three group deliveries plus three key deliveries per dispatch.
         */
        @Test
        default void concurrentDispatchShouldDeliverAllEventsToListenersWithSingleEventBus() throws Exception {
            EventBusTestFixture.EventListenerCountingSuccessfulExecution listener1 = EventBusConcurrentTestContract.newCountingListener();
            EventBusTestFixture.EventListenerCountingSuccessfulExecution listener2 = EventBusConcurrentTestContract.newCountingListener();
            EventBusTestFixture.EventListenerCountingSuccessfulExecution listener3 = EventBusConcurrentTestContract.newCountingListener();
            ImmutableList<EventBusTestFixture.EventListenerCountingSuccessfulExecution> listeners =
                ImmutableList.of(listener1, listener2, listener3);

            eventBus().register(listener1, new EventBusTestFixture.GroupA());
            eventBus().register(listener2, new EventBusTestFixture.GroupB());
            eventBus().register(listener3, new EventBusTestFixture.GroupC());
            int groupRegistrationCount = 3; // GroupA + GroupB + GroupC
            int expectedGroupCalls = groupRegistrationCount * EventBusConcurrentTestContract.TOTAL_DISPATCH_OPERATIONS;

            Mono.from(eventBus().register(listener1, EventBusTestFixture.KEY_1)).block();
            Mono.from(eventBus().register(listener2, EventBusTestFixture.KEY_2)).block();
            Mono.from(eventBus().register(listener3, EventBusTestFixture.KEY_3)).block();
            int keyRegistrationCount = 3; // KEY_1 + KEY_2 + KEY_3
            int expectedKeyCalls = keyRegistrationCount * EventBusConcurrentTestContract.TOTAL_DISPATCH_OPERATIONS;

            int expectedCalls = expectedGroupCalls + expectedKeyCalls;

            ConcurrentTestRunner.builder()
                .reactorOperation((threadNumber, step) ->
                    eventBus().dispatch(EventBusTestFixture.EVENT, EventBusConcurrentTestContract.ALL_KEYS))
                .threadCount(EventBusConcurrentTestContract.THREAD_COUNT)
                .operationCount(EventBusConcurrentTestContract.OPERATION_COUNT)
                .runSuccessfullyWithin(EventBusConcurrentTestContract.FIVE_SECONDS);

            EventBusConcurrentTestContract.AWAIT_CONDITION.untilAsserted(() ->
                Assertions.assertThat(EventBusConcurrentTestContract.totalEventsReceived(listeners))
                    .isEqualTo(expectedCalls));
        }

        /**
         * Has every operation register its own fresh listener on KEY_1 and then dispatch to all
         * keys; expects the listener with the most calls to reach
         * {@code TOTAL_DISPATCH_OPERATIONS}.
         */
        @Test
        default void concurrentRegisterThenDispatchShouldDeliverAllEventsToAtLeastOneListenerWithSingleEventBus() throws Exception {
            // Typed (not raw) list: with a raw List the stream elements below would be Object,
            // numberOfEventCalls() could not be resolved, and register(...) would need a cast.
            List<EventBusTestFixture.EventListenerCountingSuccessfulExecution> listeners =
                IntStream.range(0, EventBusConcurrentTestContract.TOTAL_DISPATCH_OPERATIONS)
                    .mapToObj(index -> EventBusConcurrentTestContract.newCountingListener())
                    .toList();

            ConcurrentTestRunner.builder()
                .reactorOperation((threadNumber, step) -> {
                    // One distinct listener per (thread, step) pair.
                    int listenerIndex = threadNumber * EventBusConcurrentTestContract.OPERATION_COUNT + step;
                    return Mono.from(eventBus().register(listeners.get(listenerIndex), EventBusTestFixture.KEY_1))
                        .then(eventBus().dispatch(EventBusTestFixture.EVENT, EventBusConcurrentTestContract.ALL_KEYS));
                })
                .threadCount(EventBusConcurrentTestContract.THREAD_COUNT)
                .operationCount(EventBusConcurrentTestContract.OPERATION_COUNT)
                .runSuccessfullyWithin(EventBusConcurrentTestContract.FIVE_SECONDS);

            EventBusConcurrentTestContract.AWAIT_CONDITION.untilAsserted(() ->
                Assertions.assertThat(listeners.stream()
                        .mapToInt(EventBusTestFixture.EventListenerCountingSuccessfulExecution::numberOfEventCalls)
                        .max()
                        .getAsInt())
                    .isEqualTo(EventBusConcurrentTestContract.TOTAL_DISPATCH_OPERATIONS));
        }
    }

    /**
     * Creates a new counting listener.
     *
     * @return a freshly constructed {@code EventListenerCountingSuccessfulExecution}
     */
    static EventBusTestFixture.EventListenerCountingSuccessfulExecution newCountingListener() {
        EventBusTestFixture.EventListenerCountingSuccessfulExecution countingListener =
            new EventBusTestFixture.EventListenerCountingSuccessfulExecution();
        return countingListener;
    }

    /**
     * Adds up {@code numberOfEventCalls()} over the given listeners.
     *
     * @param immutableList the listeners whose call counts are summed
     * @return the sum of the listeners' call counts; {@code 0} for an empty list
     */
    static int totalEventsReceived(ImmutableList<EventBusTestFixture.EventListenerCountingSuccessfulExecution> immutableList) {
        int totalCalls = 0;
        for (EventBusTestFixture.EventListenerCountingSuccessfulExecution countingListener : immutableList) {
            totalCalls += countingListener.numberOfEventCalls();
        }
        return totalCalls;
    }
}
