package org.apache.james.events;

import com.google.common.collect.ImmutableSet;
import com.rabbitmq.client.Delivery;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.james.backends.rabbitmq.QueueArguments;
import org.apache.james.backends.rabbitmq.RabbitMQExtension;
import org.apache.james.backends.rabbitmq.RabbitMQFixture;
import org.apache.james.backends.rabbitmq.RabbitMQManagementAPI;
import org.apache.james.backends.rabbitmq.ReceiverProvider;
import org.apache.james.events.EventBusConcurrentTestContract;
import org.apache.james.events.EventBusContract;
import org.apache.james.events.EventBusTestFixture;
import org.apache.james.events.EventCollector;
import org.apache.james.events.EventListener;
import org.apache.james.events.GroupConsumerRetry;
import org.apache.james.events.GroupContract;
import org.apache.james.events.KeyContract;
import org.apache.james.events.RegistrationKey;
import org.apache.james.events.RoutingKeyConverter;
import org.apache.james.metrics.tests.RecordingMetricFactory;
import org.apache.james.util.concurrency.ConcurrentTestRunner;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.SoftAssertions;
import org.assertj.core.api.ThrowingConsumer;
import org.assertj.core.data.Percentage;
import org.awaitility.Awaitility;
import org.awaitility.Durations;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.BindingSpecification;
import reactor.rabbitmq.ExchangeSpecification;
import reactor.rabbitmq.OutboundMessage;
import reactor.rabbitmq.QueueSpecification;
import reactor.rabbitmq.Receiver;
import reactor.rabbitmq.Sender;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/james/events/RabbitMQEventBusTest.class */
public class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, GroupContract.MultipleEventBusGroupContract, KeyContract.SingleEventBusKeyContract, KeyContract.MultipleEventBusKeyContract, ErrorHandlingContract {
    static EventBusName TEST_EVENT_BUS = new EventBusName("test");
    static NamingStrategy TEST_NAMING_STRATEGY = new NamingStrategy(TEST_EVENT_BUS);
    static DispatchingFailureGroup dispatchingFailureGroup = new DispatchingFailureGroup(TEST_EVENT_BUS);

    @RegisterExtension
    static RabbitMQExtension rabbitMQExtension = RabbitMQExtension.singletonRabbitMQ().isolationPolicy(RabbitMQExtension.IsolationPolicy.WEAK);
    private RabbitMQEventBus eventBus;
    private RabbitMQEventBus eventBus2;
    private RabbitMQEventBus eventBus3;
    private RabbitMQEventBus eventBusWithKeyHandlerNotStarted;
    private EventSerializer eventSerializer;
    private RoutingKeyConverter routingKeyConverter;
    private MemoryEventDeadLetters memoryEventDeadLetters;

    @Nested
    /* loaded from: input_file:org/apache/james/events/RabbitMQEventBusTest$AtLeastOnceTest.class */
    class AtLeastOnceTest {
        AtLeastOnceTest() {
        }

        @Test
        void inProcessingEventShouldBeReDispatchedToAnotherEventBusWhenOneIsDown() {
            EventBusTestFixture.EventListenerCountingSuccessfulExecution eventListenerCountingSuccessfulExecution = (EventBusTestFixture.EventListenerCountingSuccessfulExecution) Mockito.spy(new EventBusTestFixture.EventListenerCountingSuccessfulExecution());
            EventBusTestFixture.EventListenerCountingSuccessfulExecution eventListenerCountingSuccessfulExecution2 = (EventBusTestFixture.EventListenerCountingSuccessfulExecution) Mockito.spy(new EventBusTestFixture.EventListenerCountingSuccessfulExecution());
            EventBusTestFixture.EventListenerCountingSuccessfulExecution eventListenerCountingSuccessfulExecution3 = (EventBusTestFixture.EventListenerCountingSuccessfulExecution) Mockito.spy(new EventBusTestFixture.EventListenerCountingSuccessfulExecution());
            Answer answer = invocationOnMock -> {
                invocationOnMock.callRealMethod();
                TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
                return null;
            };
            ((EventBusTestFixture.EventListenerCountingSuccessfulExecution) Mockito.doAnswer(answer).when(eventListenerCountingSuccessfulExecution)).event((Event) ArgumentMatchers.any());
            ((EventBusTestFixture.EventListenerCountingSuccessfulExecution) Mockito.doAnswer(answer).when(eventListenerCountingSuccessfulExecution2)).event((Event) ArgumentMatchers.any());
            RabbitMQEventBusTest.this.eventBus.register(eventListenerCountingSuccessfulExecution, EventBusTestFixture.GROUP_A);
            RabbitMQEventBusTest.this.eventBus2.register(eventListenerCountingSuccessfulExecution2, EventBusTestFixture.GROUP_A);
            RabbitMQEventBusTest.this.eventBus3.register(eventListenerCountingSuccessfulExecution3, EventBusTestFixture.GROUP_A);
            RabbitMQEventBusTest.this.eventBus.dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block();
            RabbitMQEventBusTest.this.getSpeedProfile().shortWaitCondition().untilAsserted(() -> {
                Assertions.assertThat(eventListenerCountingSuccessfulExecution.numberOfEventCalls()).isEqualTo(1);
            });
            RabbitMQEventBusTest.this.eventBus.stop();
            RabbitMQEventBusTest.this.getSpeedProfile().shortWaitCondition().untilAsserted(() -> {
                Assertions.assertThat(eventListenerCountingSuccessfulExecution2.numberOfEventCalls()).isEqualTo(1);
            });
            RabbitMQEventBusTest.this.eventBus2.stop();
            RabbitMQEventBusTest.this.getSpeedProfile().shortWaitCondition().untilAsserted(() -> {
                Assertions.assertThat(eventListenerCountingSuccessfulExecution3.numberOfEventCalls()).isEqualTo(1);
            });
        }
    }

    @Nested
    /* loaded from: input_file:org/apache/james/events/RabbitMQEventBusTest$ConcurrentTest.class */
    class ConcurrentTest implements EventBusConcurrentTestContract.MultiEventBusConcurrentContract, EventBusConcurrentTestContract.SingleEventBusConcurrentContract {
        ConcurrentTest() {
        }

        public EventBusContract.EnvironmentSpeedProfile getSpeedProfile() {
            return EventBusContract.EnvironmentSpeedProfile.SLOW;
        }

        @Test
        void rabbitMQEventBusShouldHandleBulksGracefully() throws Exception {
            EventBusTestFixture.EventListenerCountingSuccessfulExecution newCountingListener = EventBusConcurrentTestContract.newCountingListener();
            eventBus().register(newCountingListener, new EventBusTestFixture.GroupA());
            int i = 1;
            int i2 = 10 * 10000;
            RabbitMQEventBusTest.this.eventBus = eventBus();
            ConcurrentTestRunner.builder().reactorOperation((i3, i4) -> {
                return RabbitMQEventBusTest.this.eventBus.dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS);
            }).threadCount(10).operationCount(10000).runSuccessfullyWithin(Duration.ofMinutes(3L));
            Awaitility.await().pollInterval(Durations.FIVE_SECONDS).timeout(Durations.TEN_MINUTES).untilAsserted(() -> {
                Assertions.assertThat(newCountingListener.numberOfEventCalls()).isEqualTo(i * i2);
            });
        }

        public EventBus eventBus3() {
            return RabbitMQEventBusTest.this.eventBus3;
        }

        public EventBus eventBus2() {
            return RabbitMQEventBusTest.this.eventBus2;
        }

        public EventBus eventBus() {
            return RabbitMQEventBusTest.this.eventBus;
        }
    }

    @Nested
    /* loaded from: input_file:org/apache/james/events/RabbitMQEventBusTest$ErrorDispatchingTest.class */
    class ErrorDispatchingTest {
        ErrorDispatchingTest() {
        }

        @AfterEach
        void tearDown() {
            RabbitMQEventBusTest.rabbitMQExtension.getRabbitMQ().unpause();
        }

        @Test
        void dispatchShouldNotSendToGroupListenerWhenError() {
            EventCollector eventCollector = RabbitMQEventBusTest.this.eventCollector();
            RabbitMQEventBusTest.this.eventBus().register(eventCollector, EventBusTestFixture.GROUP_A);
            RabbitMQEventBusTest.rabbitMQExtension.getRabbitMQ().pause();
            doQuietly(() -> {
                RabbitMQEventBusTest.this.eventBus().dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block();
            });
            Assertions.assertThat(eventCollector.getEvents()).isEmpty();
        }

        @Test
        void dispatchShouldPersistEventWhenDispatchingNoKeyGetError() {
            RabbitMQEventBusTest.this.eventBus().register(RabbitMQEventBusTest.this.eventCollector(), EventBusTestFixture.GROUP_A);
            RabbitMQEventBusTest.rabbitMQExtension.getRabbitMQ().pause();
            doQuietly(() -> {
                RabbitMQEventBusTest.this.eventBus().dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block();
            });
            Assertions.assertThat(dispatchingFailureEvents()).containsOnly(new Event[]{EventBusTestFixture.EVENT});
        }

        @Test
        void dispatchShouldPersistEventWhenDispatchingWithKeysGetError() {
            EventCollector eventCollector = RabbitMQEventBusTest.this.eventCollector();
            RabbitMQEventBusTest.this.eventBus().register(eventCollector, EventBusTestFixture.GROUP_A);
            Mono.from(RabbitMQEventBusTest.this.eventBus().register(eventCollector, EventBusTestFixture.KEY_1)).block();
            RabbitMQEventBusTest.rabbitMQExtension.getRabbitMQ().pause();
            doQuietly(() -> {
                RabbitMQEventBusTest.this.eventBus().dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block();
            });
            Assertions.assertThat(dispatchingFailureEvents()).containsOnly(new Event[]{EventBusTestFixture.EVENT});
        }

        @Test
        void dispatchShouldPersistOnlyOneEventWhenDispatchingMultiGroupsGetError() {
            EventCollector eventCollector = RabbitMQEventBusTest.this.eventCollector();
            RabbitMQEventBusTest.this.eventBus().register(eventCollector, EventBusTestFixture.GROUP_A);
            RabbitMQEventBusTest.this.eventBus().register(eventCollector, EventBusTestFixture.GROUP_B);
            RabbitMQEventBusTest.rabbitMQExtension.getRabbitMQ().pause();
            doQuietly(() -> {
                RabbitMQEventBusTest.this.eventBus().dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block();
            });
            Assertions.assertThat(dispatchingFailureEvents()).containsOnly(new Event[]{EventBusTestFixture.EVENT});
        }

        @Test
        void dispatchShouldPersistEventsWhenDispatchingGroupsGetErrorMultipleTimes() {
            RabbitMQEventBusTest.this.eventBus().register(RabbitMQEventBusTest.this.eventCollector(), EventBusTestFixture.GROUP_A);
            RabbitMQEventBusTest.rabbitMQExtension.getRabbitMQ().pause();
            doQuietly(() -> {
                RabbitMQEventBusTest.this.eventBus().dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block();
            });
            doQuietly(() -> {
                RabbitMQEventBusTest.this.eventBus().dispatch(EventBusTestFixture.EVENT_2, EventBusTestFixture.NO_KEYS).block();
            });
            Assertions.assertThat(dispatchingFailureEvents()).containsExactly(new Event[]{EventBusTestFixture.EVENT, EventBusTestFixture.EVENT_2});
        }

        @Test
        void dispatchShouldPersistEventsWhenDispatchingTheSameEventGetErrorMultipleTimes() {
            RabbitMQEventBusTest.this.eventBus().register(RabbitMQEventBusTest.this.eventCollector(), EventBusTestFixture.GROUP_A);
            RabbitMQEventBusTest.rabbitMQExtension.getRabbitMQ().pause();
            doQuietly(() -> {
                RabbitMQEventBusTest.this.eventBus().dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block();
            });
            doQuietly(() -> {
                RabbitMQEventBusTest.this.eventBus().dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block();
            });
            Assertions.assertThat(dispatchingFailureEvents()).containsExactly(new Event[]{EventBusTestFixture.EVENT, EventBusTestFixture.EVENT});
        }

        @Test
        void reDeliverShouldDeliverToAllGroupsWhenDispatchingFailure() {
            EventCollector eventCollector = RabbitMQEventBusTest.this.eventCollector();
            RabbitMQEventBusTest.this.eventBus().register(eventCollector, EventBusTestFixture.GROUP_A);
            EventCollector eventCollector2 = RabbitMQEventBusTest.this.eventCollector();
            RabbitMQEventBusTest.this.eventBus().register(eventCollector2, EventBusTestFixture.GROUP_B);
            RabbitMQEventBusTest.rabbitMQExtension.getRabbitMQ().pause();
            doQuietly(() -> {
                RabbitMQEventBusTest.this.eventBus().dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block();
            });
            RabbitMQEventBusTest.rabbitMQExtension.getRabbitMQ().unpause();
            dispatchingFailureEvents().forEach(event -> {
                RabbitMQEventBusTest.this.eventBus().reDeliver(RabbitMQEventBusTest.dispatchingFailureGroup, event).block();
            });
            RabbitMQEventBusTest.this.getSpeedProfile().shortWaitCondition().untilAsserted(() -> {
                Assertions.assertThat(eventCollector.getEvents()).hasSameElementsAs(eventCollector2.getEvents()).containsExactly(new Event[]{EventBusTestFixture.EVENT});
            });
        }

        @Test
        void reDeliverShouldAddEventInDeadLetterWhenGettingError() {
            RabbitMQEventBusTest.this.eventBus().register(RabbitMQEventBusTest.this.eventCollector(), EventBusTestFixture.GROUP_A);
            RabbitMQEventBusTest.rabbitMQExtension.getRabbitMQ().pause();
            doQuietly(() -> {
                RabbitMQEventBusTest.this.eventBus().dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block();
            });
            RabbitMQEventBusTest.this.getSpeedProfile().longWaitCondition().until(() -> {
                return (Boolean) RabbitMQEventBusTest.this.deadLetter().containEvents().block();
            });
            doQuietly(() -> {
                RabbitMQEventBusTest.this.eventBus().reDeliver(RabbitMQEventBusTest.dispatchingFailureGroup, EventBusTestFixture.EVENT).block();
            });
            RabbitMQEventBusTest.rabbitMQExtension.getRabbitMQ().unpause();
            RabbitMQEventBusTest.this.getSpeedProfile().shortWaitCondition().untilAsserted(() -> {
                Assertions.assertThat(dispatchingFailureEvents()).containsExactly(new Event[]{EventBusTestFixture.EVENT, EventBusTestFixture.EVENT});
            });
        }

        @Test
        void reDeliverShouldNotStoreEventInAnotherGroupWhenGettingError() {
            RabbitMQEventBusTest.this.eventBus().register(RabbitMQEventBusTest.this.eventCollector(), EventBusTestFixture.GROUP_A);
            RabbitMQEventBusTest.rabbitMQExtension.getRabbitMQ().pause();
            doQuietly(() -> {
                RabbitMQEventBusTest.this.eventBus().dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block();
            });
            RabbitMQEventBusTest.this.getSpeedProfile().longWaitCondition().until(() -> {
                return (Boolean) RabbitMQEventBusTest.this.deadLetter().containEvents().block();
            });
            doQuietly(() -> {
                RabbitMQEventBusTest.this.eventBus().reDeliver(RabbitMQEventBusTest.dispatchingFailureGroup, EventBusTestFixture.EVENT).block();
            });
            RabbitMQEventBusTest.rabbitMQExtension.getRabbitMQ().unpause();
            RabbitMQEventBusTest.this.getSpeedProfile().shortWaitCondition().untilAsserted(() -> {
                Assertions.assertThat(RabbitMQEventBusTest.this.deadLetter().groupsWithFailedEvents().toStream()).hasOnlyElementsOfType(DispatchingFailureGroup.class);
            });
        }

        private Stream<Event> dispatchingFailureEvents() {
            return RabbitMQEventBusTest.this.deadLetter().failedIds(RabbitMQEventBusTest.dispatchingFailureGroup).flatMap(insertionId -> {
                return RabbitMQEventBusTest.this.deadLetter().failedEvent(RabbitMQEventBusTest.dispatchingFailureGroup, insertionId);
            }).toStream();
        }

        private void doQuietly(Runnable runnable) {
            try {
                runnable.run();
            } catch (Exception e) {
            }
        }
    }

    @Nested
    /* loaded from: input_file:org/apache/james/events/RabbitMQEventBusTest$IsolationTest.class */
    class IsolationTest {
        private RabbitMQEventBus otherEventBus;

        IsolationTest() {
        }

        @BeforeEach
        void beforeEach() throws Exception {
            this.otherEventBus = RabbitMQEventBusTest.this.newEventBus(new NamingStrategy(new EventBusName("other")), RabbitMQEventBusTest.rabbitMQExtension.getSender(), RabbitMQEventBusTest.rabbitMQExtension.getReceiverProvider());
            this.otherEventBus.start();
        }

        @AfterEach
        void tearDown() {
            this.otherEventBus.stop();
        }

        @Test
        void eventBusGroupsWithDistinctNamingStrategiesShouldBeIsolated() throws Exception {
            EventCollector eventCollector = new EventCollector();
            EventCollector eventCollector2 = new EventCollector();
            RabbitMQEventBusTest.this.eventBus.register(eventCollector, EventBusTestFixture.GROUP_A);
            this.otherEventBus.register(eventCollector2, EventBusTestFixture.GROUP_B);
            RabbitMQEventBusTest.this.eventBus.dispatch(EventBusTestFixture.EVENT, ImmutableSet.of()).block();
            TimeUnit.SECONDS.sleep(1L);
            SoftAssertions.assertSoftly(softAssertions -> {
                softAssertions.assertThat(eventCollector.getEvents()).hasSize(1);
                softAssertions.assertThat(eventCollector2.getEvents()).isEmpty();
            });
        }

        @Test
        void eventBusPubSubWithDistinctNamingStrategiesShouldBeIsolated() throws Exception {
            EventCollector eventCollector = new EventCollector();
            EventCollector eventCollector2 = new EventCollector();
            Mono.from(RabbitMQEventBusTest.this.eventBus.register(eventCollector, EventBusTestFixture.KEY_1)).block();
            Mono.from(this.otherEventBus.register(eventCollector2, EventBusTestFixture.KEY_1)).block();
            RabbitMQEventBusTest.this.eventBus.dispatch(EventBusTestFixture.EVENT, ImmutableSet.of(EventBusTestFixture.KEY_1)).block();
            TimeUnit.SECONDS.sleep(1L);
            SoftAssertions.assertSoftly(softAssertions -> {
                softAssertions.assertThat(eventCollector.getEvents()).hasSize(1);
                softAssertions.assertThat(eventCollector2.getEvents()).isEmpty();
            });
        }
    }

    @Nested
    /* loaded from: input_file:org/apache/james/events/RabbitMQEventBusTest$LifeCycleTest.class */
    class LifeCycleTest {
        private static final int THREAD_COUNT = 10;
        private static final int OPERATION_COUNT = 100000;
        private RabbitMQManagementAPI rabbitManagementAPI;

        @Nested
        /* loaded from: input_file:org/apache/james/events/RabbitMQEventBusTest$LifeCycleTest$MultiEventBus.class */
        class MultiEventBus {
            MultiEventBus() {
            }

            @Test
            void multipleEventBusStartShouldCreateOnlyOneEventExchange() {
                Assertions.assertThat(LifeCycleTest.this.rabbitManagementAPI.listExchanges()).filteredOn(exchange -> {
                    return exchange.getName().equals(RabbitMQEventBusTest.TEST_NAMING_STRATEGY.exchange());
                }).hasSize(1);
            }

            @Test
            void multipleEventBusShouldNotThrowWhenStartAndStopContinuously() {
                Assertions.assertThatCode(() -> {
                    RabbitMQEventBusTest.this.eventBus.start();
                    RabbitMQEventBusTest.this.eventBus.start();
                    RabbitMQEventBusTest.this.eventBus2.start();
                    RabbitMQEventBusTest.this.eventBus2.start();
                    RabbitMQEventBusTest.this.eventBus.stop();
                    RabbitMQEventBusTest.this.eventBus.stop();
                    RabbitMQEventBusTest.this.eventBus.stop();
                    RabbitMQEventBusTest.this.eventBus3.start();
                    RabbitMQEventBusTest.this.eventBus3.start();
                    RabbitMQEventBusTest.this.eventBus3.start();
                    RabbitMQEventBusTest.this.eventBus3.stop();
                    RabbitMQEventBusTest.this.eventBus.start();
                    RabbitMQEventBusTest.this.eventBus2.start();
                    RabbitMQEventBusTest.this.eventBus.stop();
                    RabbitMQEventBusTest.this.eventBus2.stop();
                }).doesNotThrowAnyException();
            }

            @Test
            void multipleEventBusStopShouldNotDeleteEventBusExchange() {
                RabbitMQEventBusTest.this.eventBus.stop();
                RabbitMQEventBusTest.this.eventBus2.stop();
                RabbitMQEventBusTest.this.eventBus3.stop();
                RabbitMQEventBusTest.this.eventBusWithKeyHandlerNotStarted.stop();
                Assertions.assertThat(LifeCycleTest.this.rabbitManagementAPI.listExchanges()).anySatisfy(exchange -> {
                    Assertions.assertThat(exchange.getName()).isEqualTo(RabbitMQEventBusTest.TEST_NAMING_STRATEGY.exchange());
                });
            }

            @Test
            void multipleEventBusStopShouldNotDeleteGroupRegistrationWorkQueue() {
                RabbitMQEventBusTest.this.eventBus.register((EventListener) Mockito.mock(EventListener.class), EventBusTestFixture.GROUP_A);
                RabbitMQEventBusTest.this.eventBus.stop();
                RabbitMQEventBusTest.this.eventBus2.stop();
                RabbitMQEventBusTest.this.eventBus3.stop();
                RabbitMQEventBusTest.this.eventBusWithKeyHandlerNotStarted.stop();
                Assertions.assertThat(LifeCycleTest.this.rabbitManagementAPI.listQueues()).anySatisfy(messageQueue -> {
                    Assertions.assertThat(messageQueue.getName()).contains(new CharSequence[]{EventBusTestFixture.GroupA.class.getName()});
                });
            }

            @Test
            void multipleEventBusStopShouldDeleteAllKeyRegistrationsWorkQueue() {
                RabbitMQEventBusTest.this.eventBus.stop();
                RabbitMQEventBusTest.this.eventBus2.stop();
                RabbitMQEventBusTest.this.eventBus3.stop();
                RabbitMQEventBusTest.this.eventBusWithKeyHandlerNotStarted.stop();
                Assertions.assertThat(LifeCycleTest.this.rabbitManagementAPI.listQueues()).filteredOn(messageQueue -> {
                    return (messageQueue.getName().startsWith("test-") || messageQueue.getName().startsWith("other-")) ? false : true;
                }).isEmpty();
            }

            @Test
            void dispatchShouldStopDeliveringEventsShortlyAfterStopIsCalled() throws Exception {
                RabbitMQEventBusTest.this.eventBus.start();
                RabbitMQEventBusTest.this.eventBus2.start();
                EventBusTestFixture.EventListenerCountingSuccessfulExecution eventListenerCountingSuccessfulExecution = new EventBusTestFixture.EventListenerCountingSuccessfulExecution();
                RabbitMQEventBusTest.this.eventBus.register(eventListenerCountingSuccessfulExecution, EventBusTestFixture.GROUP_A);
                RabbitMQEventBusTest.this.eventBus2.register(eventListenerCountingSuccessfulExecution, EventBusTestFixture.GROUP_A);
                ConcurrentTestRunner run = ConcurrentTestRunner.builder().operation((i, i2) -> {
                    RabbitMQEventBusTest.this.eventBus.dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.KEY_1).block();
                }).threadCount(LifeCycleTest.THREAD_COUNT).operationCount(LifeCycleTest.OPERATION_COUNT).noErrorLogs().run();
                try {
                    TimeUnit.SECONDS.sleep(2L);
                    RabbitMQEventBusTest.this.eventBus.stop();
                    RabbitMQEventBusTest.this.eventBus2.stop();
                    int numberOfEventCalls = eventListenerCountingSuccessfulExecution.numberOfEventCalls();
                    TimeUnit.SECONDS.sleep(1L);
                    Assertions.assertThat(eventListenerCountingSuccessfulExecution.numberOfEventCalls()).isCloseTo(numberOfEventCalls, Percentage.withPercentage(2.0d));
                    if (run != null) {
                        run.close();
                    }
                } catch (Throwable th) {
                    if (run != null) {
                        try {
                            run.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            }
        }

        @Nested
        /* loaded from: input_file:org/apache/james/events/RabbitMQEventBusTest$LifeCycleTest$SingleEventBus.class */
        class SingleEventBus {

            @Nested
            /* loaded from: input_file:org/apache/james/events/RabbitMQEventBusTest$LifeCycleTest$SingleEventBus$DispatchingWhenNetWorkIssue.class */
            class DispatchingWhenNetWorkIssue {

                @RegisterExtension
                static RabbitMQExtension rabbitMQNetWorkIssueExtension = RabbitMQExtension.defaultRabbitMQ().restartPolicy(RabbitMQExtension.DockerRestartPolicy.PER_TEST).isolationPolicy(RabbitMQExtension.IsolationPolicy.WEAK);
                private RabbitMQEventBus rabbitMQEventBusWithNetWorkIssue;

                DispatchingWhenNetWorkIssue() {
                }

                @BeforeEach
                void beforeEach() throws Exception {
                    this.rabbitMQEventBusWithNetWorkIssue = RabbitMQEventBusTest.this.newEventBus(RabbitMQEventBusTest.TEST_NAMING_STRATEGY, rabbitMQNetWorkIssueExtension.getSender(), rabbitMQNetWorkIssueExtension.getReceiverProvider());
                }

                @Test
                void dispatchShouldWorkAfterNetworkIssuesForOldRegistrationAndKey() {
                    this.rabbitMQEventBusWithNetWorkIssue.start();
                    EventListener newListener = EventBusTestFixture.newListener();
                    Mono.from(this.rabbitMQEventBusWithNetWorkIssue.register(newListener, EventBusTestFixture.KEY_1)).block();
                    rabbitMQNetWorkIssueExtension.getRabbitMQ().pause();
                    Assertions.assertThatThrownBy(() -> {
                        this.rabbitMQEventBusWithNetWorkIssue.dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block();
                    }).cause().isInstanceOf(NoSuchElementException.class).hasMessageContaining("Timeout waiting for idle object");
                    rabbitMQNetWorkIssueExtension.getRabbitMQ().unpause();
                    this.rabbitMQEventBusWithNetWorkIssue.dispatch(EventBusTestFixture.EVENT, ImmutableSet.of(EventBusTestFixture.KEY_1)).block();
                    RabbitMQEventBusTest.this.assertThatListenerReceiveOneEvent(newListener);
                }
            }

            SingleEventBus() {
            }

            @Test
            void startShouldCreateEventExchange() {
                RabbitMQEventBusTest.this.eventBus.start();
                Assertions.assertThat(LifeCycleTest.this.rabbitManagementAPI.listExchanges()).filteredOn(exchange -> {
                    return exchange.getName().equals(RabbitMQEventBusTest.TEST_NAMING_STRATEGY.exchange());
                }).singleElement().satisfies(new ThrowingConsumer[]{exchange2 -> {
                    Assertions.assertThat(exchange2.isDurable()).isTrue();
                    Assertions.assertThat(exchange2.getType()).isEqualTo("direct");
                }});
            }

            @Test
            void dispatchShouldWorkAfterRestartForOldRegistration() throws Exception {
                RabbitMQEventBusTest.this.eventBus.start();
                EventListener newListener = EventBusTestFixture.newListener();
                RabbitMQEventBusTest.this.eventBus.register(newListener, EventBusTestFixture.GROUP_A);
                RabbitMQEventBusTest.rabbitMQExtension.getRabbitMQ().restart();
                RabbitMQEventBusTest.this.eventBus.dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block();
                RabbitMQEventBusTest.this.assertThatListenerReceiveOneEvent(newListener);
            }

            @Test
            void dispatchShouldWorkAfterRestartForNewRegistration() throws Exception {
                RabbitMQEventBusTest.this.eventBus.start();
                EventListener newListener = EventBusTestFixture.newListener();
                RabbitMQEventBusTest.rabbitMQExtension.getRabbitMQ().restart();
                RabbitMQEventBusTest.this.eventBus.register(newListener, EventBusTestFixture.GROUP_A);
                RabbitMQEventBusTest.this.eventBus.dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block();
                RabbitMQEventBusTest.this.assertThatListenerReceiveOneEvent(newListener);
            }

            @Test
            void redeliverShouldWorkAfterRestartForOldRegistration() throws Exception {
                RabbitMQEventBusTest.this.eventBus.start();
                EventListener newListener = EventBusTestFixture.newListener();
                RabbitMQEventBusTest.this.eventBus.register(newListener, EventBusTestFixture.GROUP_A);
                RabbitMQEventBusTest.rabbitMQExtension.getRabbitMQ().restart();
                RabbitMQEventBusTest.this.eventBus.reDeliver(EventBusTestFixture.GROUP_A, EventBusTestFixture.EVENT).block();
                RabbitMQEventBusTest.this.assertThatListenerReceiveOneEvent(newListener);
            }

            @Test
            void redeliverShouldWorkAfterRestartForNewRegistration() throws Exception {
                RabbitMQEventBusTest.this.eventBus.start();
                EventListener newListener = EventBusTestFixture.newListener();
                RabbitMQEventBusTest.rabbitMQExtension.getRabbitMQ().restart();
                RabbitMQEventBusTest.this.eventBus.register(newListener, EventBusTestFixture.GROUP_A);
                RabbitMQEventBusTest.this.eventBus.reDeliver(EventBusTestFixture.GROUP_A, EventBusTestFixture.EVENT).block();
                RabbitMQEventBusTest.this.assertThatListenerReceiveOneEvent(newListener);
            }

            @Test
            void dispatchShouldWorkAfterRestartForOldKeyRegistration() throws Exception {
                RabbitMQEventBusTest.this.eventBus.start();
                EventListener newListener = EventBusTestFixture.newListener();
                Mono.from(RabbitMQEventBusTest.this.eventBus.register(newListener, EventBusTestFixture.KEY_1)).block();
                RabbitMQEventBusTest.rabbitMQExtension.getRabbitMQ().restart();
                RabbitMQEventBusTest.this.eventBus.dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.KEY_1).block();
                RabbitMQEventBusTest.this.assertThatListenerReceiveOneEvent(newListener);
            }

            @Test
            void dispatchedMessagesShouldSurviveARabbitMQRestart() throws Exception {
                RabbitMQEventBusTest.this.eventBusWithKeyHandlerNotStarted.startWithoutStartingKeyRegistrationHandler();
                EventListener newAsyncListener = EventBusTestFixture.newAsyncListener();
                Mono.from(RabbitMQEventBusTest.this.eventBusWithKeyHandlerNotStarted.register(newAsyncListener, EventBusTestFixture.KEY_1)).block();
                RabbitMQEventBusTest.this.eventBusWithKeyHandlerNotStarted.dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.KEY_1).block();
                RabbitMQEventBusTest.rabbitMQExtension.getRabbitMQ().restart();
                RabbitMQEventBusTest.this.eventBusWithKeyHandlerNotStarted.startKeyRegistrationHandler();
                RabbitMQEventBusTest.this.assertThatListenerReceiveOneEvent(newAsyncListener);
            }

            @Disabled("JAMES-3083 Disable this unstable test")
            @Test
            void dispatchShouldWorkAfterRestartForNewKeyRegistration() throws Exception {
                RabbitMQEventBusTest.this.eventBus.start();
                EventListener newListener = EventBusTestFixture.newListener();
                RabbitMQEventBusTest.rabbitMQExtension.getRabbitMQ().restart();
                Mono.from(RabbitMQEventBusTest.this.eventBus.register(newListener, EventBusTestFixture.KEY_1)).block();
                RabbitMQEventBusTest.this.eventBus.dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.KEY_1).block();
                RabbitMQEventBusTest.this.assertThatListenerReceiveOneEvent(newListener);
            }

            @Test
            void dispatchShouldWorkAfterNetworkIssuesForNewRegistration() {
                RabbitMQEventBusTest.this.eventBus.start();
                EventListener newListener = EventBusTestFixture.newListener();
                RabbitMQEventBusTest.rabbitMQExtension.getRabbitMQ().pause();
                Assertions.assertThatThrownBy(() -> {
                    RabbitMQEventBusTest.this.eventBus.dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block();
                }).cause().isInstanceOf(NoSuchElementException.class).hasMessageContaining("Timeout waiting for idle object");
                RabbitMQEventBusTest.rabbitMQExtension.getRabbitMQ().unpause();
                RabbitMQEventBusTest.this.eventBus.register(newListener, EventBusTestFixture.GROUP_A);
                RabbitMQEventBusTest.this.eventBus.dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block();
                RabbitMQEventBusTest.this.assertThatListenerReceiveOneEvent(newListener);
            }

            @Test
            void redeliverShouldWorkAfterNetworkIssuesForNewRegistration() {
                RabbitMQEventBusTest.this.eventBus.start();
                EventListener newListener = EventBusTestFixture.newListener();
                RabbitMQEventBusTest.rabbitMQExtension.getRabbitMQ().pause();
                Assertions.assertThatThrownBy(() -> {
                    RabbitMQEventBusTest.this.eventBus.reDeliver(EventBusTestFixture.GROUP_A, EventBusTestFixture.EVENT).block();
                }).isInstanceOf(GroupRegistrationNotFound.class);
                RabbitMQEventBusTest.rabbitMQExtension.getRabbitMQ().unpause();
                RabbitMQEventBusTest.this.eventBus.register(newListener, EventBusTestFixture.GROUP_A);
                RabbitMQEventBusTest.this.eventBus.reDeliver(EventBusTestFixture.GROUP_A, EventBusTestFixture.EVENT).block();
                RabbitMQEventBusTest.this.assertThatListenerReceiveOneEvent(newListener);
            }

            @Test
            void dispatchShouldWorkAfterNetworkIssuesForOldKeyRegistration() {
                RabbitMQEventBusTest.this.eventBus.start();
                EventListener newListener = EventBusTestFixture.newListener();
                Mockito.when(newListener.getExecutionMode()).thenReturn(EventListener.ExecutionMode.ASYNCHRONOUS);
                Mono.from(RabbitMQEventBusTest.this.eventBus.register(newListener, EventBusTestFixture.KEY_1)).block();
                RabbitMQEventBusTest.rabbitMQExtension.getRabbitMQ().pause();
                Assertions.assertThatThrownBy(() -> {
                    RabbitMQEventBusTest.this.eventBus.dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block();
                }).cause().isInstanceOf(NoSuchElementException.class).hasMessageContaining("Timeout waiting for idle object");
                RabbitMQEventBusTest.rabbitMQExtension.getRabbitMQ().unpause();
                RabbitMQEventBusTest.this.eventBus.dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.KEY_1).block();
                RabbitMQEventBusTest.this.assertThatListenerReceiveOneEvent(newListener);
            }

            @Test
            void dispatchShouldWorkAfterNetworkIssuesForNewKeyRegistration() {
                RabbitMQEventBusTest.this.eventBus.start();
                EventListener newListener = EventBusTestFixture.newListener();
                Mockito.when(newListener.getExecutionMode()).thenReturn(EventListener.ExecutionMode.ASYNCHRONOUS);
                RabbitMQEventBusTest.rabbitMQExtension.getRabbitMQ().pause();
                Assertions.assertThatThrownBy(() -> {
                    RabbitMQEventBusTest.this.eventBus.dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block();
                }).cause().isInstanceOf(NoSuchElementException.class).hasMessageContaining("Timeout waiting for idle object");
                RabbitMQEventBusTest.rabbitMQExtension.getRabbitMQ().unpause();
                Mono.from(RabbitMQEventBusTest.this.eventBus.register(newListener, EventBusTestFixture.KEY_1)).block();
                RabbitMQEventBusTest.this.eventBus.dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.KEY_1).block();
                RabbitMQEventBusTest.this.assertThatListenerReceiveOneEvent(newListener);
            }

            @Test
            void stopShouldNotDeleteEventBusExchange() {
                RabbitMQEventBusTest.this.eventBus.start();
                RabbitMQEventBusTest.this.eventBus.stop();
                Assertions.assertThat(LifeCycleTest.this.rabbitManagementAPI.listExchanges()).anySatisfy(exchange -> {
                    Assertions.assertThat(exchange.getName()).isEqualTo(RabbitMQEventBusTest.TEST_NAMING_STRATEGY.exchange());
                });
            }

            @Test
            void stopShouldNotDeleteGroupRegistrationWorkQueue() {
                RabbitMQEventBusTest.this.eventBus.start();
                RabbitMQEventBusTest.this.eventBus.register((EventListener) Mockito.mock(EventListener.class), EventBusTestFixture.GROUP_A);
                RabbitMQEventBusTest.this.eventBus.stop();
                Assertions.assertThat(LifeCycleTest.this.rabbitManagementAPI.listQueues()).anySatisfy(messageQueue -> {
                    Assertions.assertThat(messageQueue.getName()).contains(new CharSequence[]{EventBusTestFixture.GroupA.class.getName()});
                });
            }

            @Test
            void eventBusShouldNotThrowWhenContinuouslyStartAndStop() {
                Assertions.assertThatCode(() -> {
                    RabbitMQEventBusTest.this.eventBus.start();
                    RabbitMQEventBusTest.this.eventBus.stop();
                    RabbitMQEventBusTest.this.eventBus.stop();
                    RabbitMQEventBusTest.this.eventBus.start();
                    RabbitMQEventBusTest.this.eventBus.start();
                    RabbitMQEventBusTest.this.eventBus.start();
                    RabbitMQEventBusTest.this.eventBus.stop();
                    RabbitMQEventBusTest.this.eventBus.stop();
                }).doesNotThrowAnyException();
            }

            @Test
            void dispatchShouldStopDeliveringEventsShortlyAfterStopIsCalled() throws Exception {
                RabbitMQEventBusTest.this.eventBus.start();
                EventBusTestFixture.EventListenerCountingSuccessfulExecution eventListenerCountingSuccessfulExecution = new EventBusTestFixture.EventListenerCountingSuccessfulExecution();
                RabbitMQEventBusTest.this.eventBus.register(eventListenerCountingSuccessfulExecution, EventBusTestFixture.GROUP_A);
                ConcurrentTestRunner run = ConcurrentTestRunner.builder().operation((i, i2) -> {
                    RabbitMQEventBusTest.this.eventBus.dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.KEY_1).block();
                }).threadCount(LifeCycleTest.THREAD_COUNT).operationCount(LifeCycleTest.OPERATION_COUNT).noErrorLogs().run();
                try {
                    TimeUnit.SECONDS.sleep(2L);
                    RabbitMQEventBusTest.this.eventBus.stop();
                    RabbitMQEventBusTest.this.eventBus2.stop();
                    int numberOfEventCalls = eventListenerCountingSuccessfulExecution.numberOfEventCalls();
                    TimeUnit.SECONDS.sleep(1L);
                    Assertions.assertThat(eventListenerCountingSuccessfulExecution.numberOfEventCalls()).isCloseTo(numberOfEventCalls, Percentage.withPercentage(2.0d));
                    if (run != null) {
                        run.close();
                    }
                } catch (Throwable th) {
                    if (run != null) {
                        try {
                            run.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            }
        }

        LifeCycleTest() {
        }

        @BeforeEach
        void setUp() throws Exception {
            this.rabbitManagementAPI = RabbitMQEventBusTest.rabbitMQExtension.managementAPI();
        }

        @AfterEach
        void tearDown() {
            RabbitMQEventBusTest.rabbitMQExtension.getRabbitMQ().unpause();
        }
    }

    @Nested
    /* loaded from: input_file:org/apache/james/events/RabbitMQEventBusTest$PublishingTest.class */
    class PublishingTest {
        private static final String WORK_QUEUE_NAME = "test-workQueue";

        PublishingTest() {
        }

        @BeforeEach
        void setUp() {
            Sender sender = RabbitMQEventBusTest.rabbitMQExtension.getSender();
            sender.declareQueue(QueueSpecification.queue(WORK_QUEUE_NAME).durable(true).exclusive(false).autoDelete(false).arguments(QueueArguments.NO_ARGUMENTS)).block();
            sender.bind(BindingSpecification.binding().exchange(RabbitMQEventBusTest.TEST_NAMING_STRATEGY.exchange()).queue(WORK_QUEUE_NAME).routingKey("")).block();
        }

        @Test
        void dispatchShouldPublishSerializedEventToRabbitMQ() {
            RabbitMQEventBusTest.this.eventBus.dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block();
            Assertions.assertThat(dequeueEvent()).isEqualTo(EventBusTestFixture.EVENT);
        }

        @Test
        void dispatchShouldPublishSerializedEventToRabbitMQWhenNotBlocking() {
            RabbitMQEventBusTest.this.eventBus.dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block();
            Assertions.assertThat(dequeueEvent()).isEqualTo(EventBusTestFixture.EVENT);
        }

        private Event dequeueEvent() {
            Receiver createReceiver = RabbitMQEventBusTest.rabbitMQExtension.getReceiverProvider().createReceiver();
            try {
                Event asEvent = RabbitMQEventBusTest.this.eventSerializer.asEvent(new String(((Delivery) createReceiver.consumeAutoAck(WORK_QUEUE_NAME).blockFirst()).getBody(), StandardCharsets.UTF_8));
                if (createReceiver != null) {
                    createReceiver.close();
                }
                return asEvent;
            } catch (Throwable th) {
                if (createReceiver != null) {
                    try {
                        createReceiver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
    }

    RabbitMQEventBusTest() {
    }

    public EventBusContract.EnvironmentSpeedProfile getSpeedProfile() {
        return EventBusContract.EnvironmentSpeedProfile.SLOW;
    }

    @BeforeEach
    void setUp() throws Exception {
        this.memoryEventDeadLetters = new MemoryEventDeadLetters();
        this.eventSerializer = new EventBusTestFixture.TestEventSerializer();
        this.routingKeyConverter = RoutingKeyConverter.forFactories(new RegistrationKey.Factory[]{new EventBusTestFixture.TestRegistrationKeyFactory()});
        this.eventBus = newEventBus();
        this.eventBus2 = newEventBus();
        this.eventBus3 = newEventBus();
        this.eventBusWithKeyHandlerNotStarted = newEventBus();
        this.eventBus.start();
        this.eventBus2.start();
        this.eventBus3.start();
        this.eventBusWithKeyHandlerNotStarted.startWithoutStartingKeyRegistrationHandler();
    }

    @AfterEach
    void tearDown() {
        this.eventBus.stop();
        this.eventBus2.stop();
        this.eventBus3.stop();
        this.eventBusWithKeyHandlerNotStarted.stop();
        Stream concat = Stream.concat(EventBusTestFixture.ALL_GROUPS.stream(), Stream.of(GroupRegistrationHandler.GROUP));
        NamingStrategy namingStrategy = TEST_NAMING_STRATEGY;
        Objects.requireNonNull(namingStrategy);
        concat.map(namingStrategy::workQueue).forEach(workQueueName -> {
            rabbitMQExtension.getSender().delete(QueueSpecification.queue(workQueueName.asString())).block();
        });
        rabbitMQExtension.getSender().delete(ExchangeSpecification.exchange(TEST_NAMING_STRATEGY.exchange())).block();
        rabbitMQExtension.getSender().delete(TEST_NAMING_STRATEGY.deadLetterQueue()).block();
    }

    private RabbitMQEventBus newEventBus() throws Exception {
        return newEventBus(TEST_NAMING_STRATEGY, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider());
    }

    private RabbitMQEventBus newEventBus(NamingStrategy namingStrategy, Sender sender, ReceiverProvider receiverProvider) throws Exception {
        return new RabbitMQEventBus(namingStrategy, sender, receiverProvider, this.eventSerializer, EventBusTestFixture.RETRY_BACKOFF_CONFIGURATION, this.routingKeyConverter, this.memoryEventDeadLetters, new RecordingMetricFactory(), rabbitMQExtension.getRabbitChannelPool(), EventBusId.random(), rabbitMQExtension.getRabbitMQ().getConfiguration());
    }

    public EventBus eventBus() {
        return this.eventBus;
    }

    public EventBus eventBus2() {
        return this.eventBus2;
    }

    public EventDeadLetters deadLetter() {
        return this.memoryEventDeadLetters;
    }

    @Disabled("This test is failing by design as the different registration keys are handled by distinct messages")
    @Test
    public void dispatchShouldCallListenerOnceWhenSeveralKeysMatching() {
    }

    @Test
    void eventProcessingShouldNotCrashOnInvalidMessage() {
        EventCollector eventCollector = new EventCollector();
        this.eventBus.register(eventCollector, new EventBusTestFixture.GroupA());
        rabbitMQExtension.getSender().send(Mono.just(new OutboundMessage(TEST_NAMING_STRATEGY.exchange(), "", "BAD_PAYLOAD!".getBytes(StandardCharsets.UTF_8)))).block();
        this.eventBus.dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block();
        Awaitility.await().timeout(Durations.TEN_SECONDS).untilAsserted(() -> {
            Assertions.assertThat(eventCollector.getEvents()).containsOnly(new Event[]{EventBusTestFixture.EVENT});
        });
    }

    @Test
    void eventProcessingShouldNotCrashOnInvalidMessages() {
        EventCollector eventCollector = new EventCollector();
        this.eventBus.register(eventCollector, new EventBusTestFixture.GroupA());
        String str = "";
        IntStream.range(0, 10).forEach(i -> {
            rabbitMQExtension.getSender().send(Mono.just(new OutboundMessage(TEST_NAMING_STRATEGY.exchange(), str, "BAD_PAYLOAD!".getBytes(StandardCharsets.UTF_8)))).block();
        });
        this.eventBus.dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block();
        Awaitility.await().timeout(Durations.TEN_SECONDS).untilAsserted(() -> {
            Assertions.assertThat(eventCollector.getEvents()).containsOnly(new Event[]{EventBusTestFixture.EVENT});
        });
    }

    @Test
    void eventProcessingShouldStoreInvalidMessagesInDeadLetterQueue() {
        this.eventBus.register(new EventCollector(), new EventBusTestFixture.GroupA());
        rabbitMQExtension.getSender().send(Mono.just(new OutboundMessage(TEST_NAMING_STRATEGY.exchange(), "", "BAD_PAYLOAD!".getBytes(StandardCharsets.UTF_8)))).block();
        AtomicInteger atomicInteger = new AtomicInteger();
        rabbitMQExtension.getRabbitChannelPool().createReceiver().consumeAutoAck(TEST_NAMING_STRATEGY.deadLetterQueue().getName()).doOnNext(delivery -> {
            atomicInteger.incrementAndGet();
        }).subscribeOn(Schedulers.newSingle("test")).subscribe();
        Awaitility.await().atMost(Durations.TEN_SECONDS).untilAsserted(() -> {
            Assertions.assertThat(atomicInteger.get()).isEqualTo(1);
        });
    }

    @Test
    void registrationShouldNotCrashOnInvalidMessage() {
        EventCollector eventCollector = new EventCollector();
        Mono.from(this.eventBus.register(eventCollector, EventBusTestFixture.KEY_1)).block();
        rabbitMQExtension.getSender().send(Mono.just(new OutboundMessage(TEST_NAMING_STRATEGY.exchange(), RoutingKeyConverter.RoutingKey.of(EventBusTestFixture.KEY_1).asString(), "BAD_PAYLOAD!".getBytes(StandardCharsets.UTF_8)))).block();
        this.eventBus.dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.KEY_1).block();
        Awaitility.await().timeout(Durations.TEN_SECONDS).untilAsserted(() -> {
            Assertions.assertThat(eventCollector.getEvents()).containsOnly(new Event[]{EventBusTestFixture.EVENT});
        });
    }

    @Test
    void registrationShouldNotCrashOnInvalidMessages() {
        EventCollector eventCollector = new EventCollector();
        Mono.from(this.eventBus.register(eventCollector, EventBusTestFixture.KEY_1)).block();
        IntStream.range(0, 100).forEach(i -> {
            rabbitMQExtension.getSender().send(Mono.just(new OutboundMessage(TEST_NAMING_STRATEGY.exchange(), RoutingKeyConverter.RoutingKey.of(EventBusTestFixture.KEY_1).asString(), "BAD_PAYLOAD!".getBytes(StandardCharsets.UTF_8)))).block();
        });
        this.eventBus.dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.KEY_1).block();
        Awaitility.await().timeout(Durations.TEN_SECONDS).untilAsserted(() -> {
            Assertions.assertThat(eventCollector.getEvents()).containsOnly(new Event[]{EventBusTestFixture.EVENT});
        });
    }

    @Test
    void deserializeEventCollectorGroup() throws Exception {
        Assertions.assertThat(Group.deserialize("org.apache.james.events.EventCollector$EventCollectorGroup")).isEqualTo(new EventCollector.EventCollectorGroup());
    }

    @Test
    void registerGroupShouldCreateRetryExchange() throws Exception {
        EventListener newListener = EventBusTestFixture.newListener();
        EventBusTestFixture.GroupA groupA = new EventBusTestFixture.GroupA();
        this.eventBus.register(newListener, groupA);
        GroupConsumerRetry.RetryExchangeName retryExchange = TEST_NAMING_STRATEGY.retryExchange(groupA);
        Assertions.assertThat(rabbitMQExtension.managementAPI().listExchanges()).anyMatch(exchange -> {
            return exchange.getName().equals(retryExchange.asString());
        });
    }

    private void assertThatListenerReceiveOneEvent(EventListener eventListener) {
        RabbitMQFixture.awaitAtMostThirtySeconds.untilAsserted(() -> {
            ((EventListener) Mockito.verify(eventListener)).event(EventBusTestFixture.EVENT);
        });
    }
}
