package org.apache.james.backends.rabbitmq;

import com.github.fge.lambdas.Throwing;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.rabbitmq.client.Address;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import org.apache.james.backends.rabbitmq.DockerClusterRabbitMQExtension;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.awaitility.Durations;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
@Disabled("JAMES-2577 RabbitMQClusterTest is not stable and usually reach out of wait timeout when starting up 3 dockers or joining them together")
/* loaded from: input_file:org/apache/james/backends/rabbitmq/RabbitMQClusterTest.class */
public class RabbitMQClusterTest {
    private static final String QUEUE = "queue";
    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQClusterTest.class);

    @RegisterExtension
    static DockerClusterRabbitMQExtension testExtension = new DockerClusterRabbitMQExtension();

    @Nested
    /* loaded from: input_file:org/apache/james/backends/rabbitmq/RabbitMQClusterTest$ClusterNodesFailure.class */
    class ClusterNodesFailure {
        private ConnectionFactory node1ConnectionFactory;
        private Connection resilientConnection;
        private Channel resilientChannel;
        private Connection node2Connection;
        private Channel node2Channel;

        ClusterNodesFailure() {
        }

        @BeforeEach
        void setup(DockerClusterRabbitMQExtension.DockerRabbitMQCluster dockerRabbitMQCluster) throws IOException, TimeoutException {
            this.node1ConnectionFactory = dockerRabbitMQCluster.getRabbitMQ1().connectionFactory();
            this.node1ConnectionFactory.setNetworkRecoveryInterval(100);
            this.resilientConnection = this.node1ConnectionFactory.newConnection(dockerRabbitMQCluster.getAddresses());
            this.resilientChannel = this.resilientConnection.createChannel();
            this.node2Connection = dockerRabbitMQCluster.getRabbitMQ2().connectionFactory().newConnection();
            this.node2Channel = this.node2Connection.createChannel();
        }

        @AfterEach
        void tearDown() {
            RabbitMQClusterTest.this.closeQuietly(this.resilientConnection, this.resilientChannel);
        }

        @Test
        void connectionShouldBeRecoveredWhenConnectedNodeIsDown(DockerClusterRabbitMQExtension.DockerRabbitMQCluster dockerRabbitMQCluster) throws Exception {
            getConnectedNode(dockerRabbitMQCluster, this.resilientChannel).stop();
            Thread.sleep(100L);
            this.resilientChannel.exchangeDeclare(RabbitMQFixture.EXCHANGE_NAME, "direct", true);
            this.resilientChannel.queueDeclare(RabbitMQClusterTest.QUEUE, true, false, false, ImmutableMap.of()).getQueue();
            this.resilientChannel.queueBind(RabbitMQClusterTest.QUEUE, RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.ROUTING_KEY);
            int i = 10;
            IntStream.range(0, 10).mapToObj(i2 -> {
                return RabbitMQClusterTest.this.asBytes(String.valueOf(i2));
            }).forEach(Throwing.consumer(bArr -> {
                this.resilientChannel.basicPublish(RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.ROUTING_KEY, Constants.NO_PROPERTIES, bArr);
            }));
            InMemoryConsumer inMemoryConsumer = new InMemoryConsumer(this.resilientChannel);
            this.resilientChannel.basicConsume(RabbitMQClusterTest.QUEUE, inMemoryConsumer);
            RabbitMQFixture.awaitAtMostOneMinute.until(() -> {
                return Boolean.valueOf(inMemoryConsumer.getConsumedMessages().size() == i);
            });
            Assertions.assertThat(inMemoryConsumer.getConsumedMessages()).containsOnly((Integer[]) IntStream.range(0, 10).boxed().toArray(i3 -> {
                return new Integer[i3];
            }));
        }

        private DockerRabbitMQ getConnectedNode(DockerClusterRabbitMQExtension.DockerRabbitMQCluster dockerRabbitMQCluster, Channel channel) {
            return (DockerRabbitMQ) dockerRabbitMQCluster.getNodes().stream().filter(dockerRabbitMQ -> {
                return dockerRabbitMQ.getNodeName().equals(channel.getConnection().getServerProperties().get("cluster_name").toString());
            }).findFirst().get();
        }

        @Disabled("JAMES-2334 For some reason, we are unable to recover topology when reconnectingSee https://github.com/rabbitmq/rabbitmq-server/issues/959")
        @Test
        void nodeKillingWhenProducing(DockerClusterRabbitMQExtension.DockerRabbitMQCluster dockerRabbitMQCluster) throws Exception {
            this.resilientChannel.exchangeDeclare(RabbitMQFixture.EXCHANGE_NAME, "direct", true);
            this.resilientChannel.queueDeclare(RabbitMQClusterTest.QUEUE, true, false, false, ImmutableMap.of()).getQueue();
            this.resilientChannel.queueBind(RabbitMQClusterTest.QUEUE, RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.ROUTING_KEY);
            int i = 20;
            int i2 = 20 / 2;
            IntStream.range(0, i2).mapToObj(i3 -> {
                return RabbitMQClusterTest.this.asBytes(String.valueOf(i3));
            }).forEach(Throwing.consumer(bArr -> {
                this.resilientChannel.basicPublish(RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.ROUTING_KEY, Constants.NO_PROPERTIES, bArr);
            }));
            InMemoryConsumer inMemoryConsumer = new InMemoryConsumer(this.node2Channel);
            this.node2Channel.basicConsume(RabbitMQClusterTest.QUEUE, inMemoryConsumer);
            RabbitMQFixture.awaitAtMostOneMinute.until(() -> {
                return Boolean.valueOf(inMemoryConsumer.getConsumedMessages().size() == i2);
            });
            dockerRabbitMQCluster.getRabbitMQ1().stop();
            IntStream.range(i2, 20).mapToObj(i4 -> {
                return RabbitMQClusterTest.this.asBytes(String.valueOf(i4));
            }).forEach(this::tryPublishWithRetry);
            RabbitMQFixture.awaitAtMostOneMinute.until(() -> {
                return Boolean.valueOf(inMemoryConsumer.getConsumedMessages().size() == i);
            });
            Assertions.assertThat(inMemoryConsumer.getConsumedMessages()).containsOnly((Integer[]) IntStream.range(0, 20).boxed().toArray(i5 -> {
                return new Integer[i5];
            }));
        }

        private void tryPublishWithRetry(byte[] bArr) {
            Awaitility.waitAtMost(Durations.ONE_MINUTE).pollInterval(Durations.ONE_SECOND).until(() -> {
                return Boolean.valueOf(tryPublish(bArr));
            });
        }

        private boolean tryPublish(byte[] bArr) {
            try {
                this.resilientChannel.basicPublish(RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.ROUTING_KEY, Constants.NO_PROPERTIES, bArr);
                return true;
            } catch (Exception e) {
                RabbitMQClusterTest.LOGGER.error("failed publish", e);
                return false;
            }
        }

        @Test
        void connectingToAClusterWithAFailedRabbit(DockerClusterRabbitMQExtension.DockerRabbitMQCluster dockerRabbitMQCluster) throws Exception {
            ConnectionFactory connectionFactory = dockerRabbitMQCluster.getRabbitMQ3().connectionFactory();
            ImmutableList<Address> addresses = dockerRabbitMQCluster.getAddresses();
            dockerRabbitMQCluster.getRabbitMQ3().stop();
            Connection newConnection = connectionFactory.newConnection(addresses);
            try {
                Channel createChannel = newConnection.createChannel();
                try {
                    createChannel.exchangeDeclare(RabbitMQFixture.EXCHANGE_NAME, "direct", true);
                    createChannel.queueDeclare(RabbitMQClusterTest.QUEUE, true, false, false, ImmutableMap.of()).getQueue();
                    createChannel.queueBind(RabbitMQClusterTest.QUEUE, RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.ROUTING_KEY);
                    int i = 10;
                    IntStream.range(0, 10).mapToObj(i2 -> {
                        return RabbitMQClusterTest.this.asBytes(String.valueOf(i2));
                    }).forEach(Throwing.consumer(bArr -> {
                        createChannel.basicPublish(RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.ROUTING_KEY, Constants.NO_PROPERTIES, bArr);
                    }));
                    InMemoryConsumer inMemoryConsumer = new InMemoryConsumer(createChannel);
                    createChannel.basicConsume(RabbitMQClusterTest.QUEUE, inMemoryConsumer);
                    RabbitMQFixture.awaitAtMostOneMinute.until(() -> {
                        return Boolean.valueOf(inMemoryConsumer.getConsumedMessages().size() == i);
                    });
                    Assertions.assertThat(inMemoryConsumer.getConsumedMessages()).containsOnly((Integer[]) IntStream.range(0, 10).boxed().toArray(i3 -> {
                        return new Integer[i3];
                    }));
                    if (createChannel != null) {
                        createChannel.close();
                    }
                    if (newConnection != null) {
                        newConnection.close();
                    }
                } finally {
                }
            } catch (Throwable th) {
                if (newConnection != null) {
                    try {
                        newConnection.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }

        @Disabled("JAMES-2334 For some reason, we are unable to recover topology when reconnectingSee https://github.com/rabbitmq/rabbitmq-server/issues/959This test have roughly 4% chance to fail")
        @Test
        void nodeKillingWhenConsuming(DockerClusterRabbitMQExtension.DockerRabbitMQCluster dockerRabbitMQCluster) throws Exception {
            this.resilientChannel.exchangeDeclare(RabbitMQFixture.EXCHANGE_NAME, "direct", true);
            this.resilientChannel.queueDeclare(RabbitMQClusterTest.QUEUE, true, false, false, ImmutableMap.of()).getQueue();
            this.resilientChannel.queueBind(RabbitMQClusterTest.QUEUE, RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.ROUTING_KEY);
            int i = 10;
            IntStream.range(0, 10).mapToObj(i2 -> {
                return RabbitMQClusterTest.this.asBytes(String.valueOf(i2));
            }).forEach(Throwing.consumer(bArr -> {
                this.resilientChannel.basicPublish(RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.ROUTING_KEY, Constants.NO_PROPERTIES, bArr);
            }));
            AtomicInteger atomicInteger = new AtomicInteger(0);
            InMemoryConsumer inMemoryConsumer = new InMemoryConsumer(this.resilientChannel, (envelope, bArr2) -> {
                stopWhenHalfProcessed(dockerRabbitMQCluster, i, atomicInteger);
            });
            this.resilientChannel.basicConsume(RabbitMQClusterTest.QUEUE, inMemoryConsumer);
            RabbitMQFixture.awaitAtMostOneMinute.until(() -> {
                return Boolean.valueOf(inMemoryConsumer.getConsumedMessages().size() == i);
            });
            Assertions.assertThat(inMemoryConsumer.getConsumedMessages()).containsOnly((Integer[]) IntStream.range(0, 10).boxed().toArray(i3 -> {
                return new Integer[i3];
            }));
        }

        private void stopWhenHalfProcessed(DockerClusterRabbitMQExtension.DockerRabbitMQCluster dockerRabbitMQCluster, int i, AtomicInteger atomicInteger) {
            if (atomicInteger.incrementAndGet() == i / 2) {
                dockerRabbitMQCluster.getRabbitMQ1().stop();
            }
        }
    }

    @Nested
    /* loaded from: input_file:org/apache/james/backends/rabbitmq/RabbitMQClusterTest$ClusterSharing.class */
    class ClusterSharing {
        private ConnectionFactory node1ConnectionFactory;
        private ConnectionFactory node2ConnectionFactory;
        private Connection node1Connection;
        private Connection node2Connection;
        private Channel node1Channel;
        private Channel node2Channel;

        ClusterSharing() {
        }

        @BeforeEach
        void setup(DockerClusterRabbitMQExtension.DockerRabbitMQCluster dockerRabbitMQCluster) throws IOException, TimeoutException {
            this.node1ConnectionFactory = dockerRabbitMQCluster.getRabbitMQ1().connectionFactory();
            this.node2ConnectionFactory = dockerRabbitMQCluster.getRabbitMQ2().connectionFactory();
            this.node1Connection = this.node1ConnectionFactory.newConnection();
            this.node2Connection = this.node2ConnectionFactory.newConnection();
            this.node1Channel = this.node1Connection.createChannel();
            this.node2Channel = this.node2Connection.createChannel();
        }

        @AfterEach
        void tearDown() {
            RabbitMQClusterTest.this.closeQuietly(this.node1Channel, this.node2Channel, this.node1Connection, this.node2Connection);
        }

        @Test
        void rabbitMQManagerShouldReturnThreeNodesWhenAskingForStatus(DockerClusterRabbitMQExtension.DockerRabbitMQCluster dockerRabbitMQCluster) throws Exception {
            Assertions.assertThat(dockerRabbitMQCluster.getRabbitMQ1().container().execInContainer(new String[]{"rabbitmqctl", "cluster_status"}).getStdout()).contains(new CharSequence[]{dockerRabbitMQCluster.getRabbitMQ1().getNodeName(), dockerRabbitMQCluster.getRabbitMQ2().getNodeName(), dockerRabbitMQCluster.getRabbitMQ3().getNodeName()});
        }

        @Test
        void queuesShouldBeShared() throws Exception {
            this.node1Channel.exchangeDeclare(RabbitMQFixture.EXCHANGE_NAME, "direct", true);
            this.node1Channel.queueDeclare(RabbitMQClusterTest.QUEUE, true, false, false, ImmutableMap.of()).getQueue();
            this.node1Channel.queueBind(RabbitMQClusterTest.QUEUE, RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.ROUTING_KEY);
            int i = 10;
            IntStream.range(0, 10).mapToObj(i2 -> {
                return RabbitMQClusterTest.this.asBytes(String.valueOf(i2));
            }).forEach(Throwing.consumer(bArr -> {
                this.node1Channel.basicPublish(RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.ROUTING_KEY, Constants.NO_PROPERTIES, bArr);
            }));
            InMemoryConsumer inMemoryConsumer = new InMemoryConsumer(this.node2Channel);
            this.node2Channel.basicConsume(RabbitMQClusterTest.QUEUE, inMemoryConsumer);
            RabbitMQFixture.awaitAtMostOneMinute.until(() -> {
                return Boolean.valueOf(inMemoryConsumer.getConsumedMessages().size() == i);
            });
            Assertions.assertThat(inMemoryConsumer.getConsumedMessages()).containsOnly((Integer[]) IntStream.range(0, 10).boxed().toArray(i3 -> {
                return new Integer[i3];
            }));
        }

        @Test
        void queuesShouldBeDeclarableOnAnotherNode() throws Exception {
            this.node1Channel.exchangeDeclare(RabbitMQFixture.EXCHANGE_NAME, "direct", true);
            this.node2Channel.queueDeclare(RabbitMQClusterTest.QUEUE, true, false, false, ImmutableMap.of()).getQueue();
            this.node2Channel.queueBind(RabbitMQClusterTest.QUEUE, RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.ROUTING_KEY);
            int i = 10;
            IntStream.range(0, 10).mapToObj(i2 -> {
                return RabbitMQClusterTest.this.asBytes(String.valueOf(i2));
            }).forEach(Throwing.consumer(bArr -> {
                this.node1Channel.basicPublish(RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.ROUTING_KEY, Constants.NO_PROPERTIES, bArr);
            }));
            InMemoryConsumer inMemoryConsumer = new InMemoryConsumer(this.node2Channel);
            this.node2Channel.basicConsume(RabbitMQClusterTest.QUEUE, inMemoryConsumer);
            RabbitMQFixture.awaitAtMostOneMinute.until(() -> {
                return Boolean.valueOf(inMemoryConsumer.getConsumedMessages().size() == i);
            });
            Assertions.assertThat(inMemoryConsumer.getConsumedMessages()).containsOnly((Integer[]) IntStream.range(0, 10).boxed().toArray(i3 -> {
                return new Integer[i3];
            }));
        }
    }

    RabbitMQClusterTest() {
    }

    @BeforeAll
    static void setup() {
        testExtension.beforeAll();
    }

    @AfterAll
    static void tearDown() throws Exception {
        testExtension.afterAll();
    }

    private void closeQuietly(AutoCloseable... autoCloseableArr) {
        Arrays.stream(autoCloseableArr).forEach(this::closeQuietly);
    }

    private void closeQuietly(AutoCloseable autoCloseable) {
        try {
            autoCloseable.close();
        } catch (Exception e) {
        }
    }

    private byte[] asBytes(String str) {
        return str.getBytes(StandardCharsets.UTF_8);
    }
}
