/*
 * Decompiled with CFR 0.152.
 */
package org.apache.james.queue.pulsar;

import com.github.fge.lambdas.Throwing;
import com.sksamuel.pulsar4s.ConsumerMessage;
import jakarta.mail.MessagingException;
import jakarta.mail.internet.MimeMessage;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.IntConsumer;
import java.util.stream.IntStream;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.james.backends.pulsar.DockerPulsarExtension;
import org.apache.james.backends.pulsar.PulsarClients;
import org.apache.james.backends.pulsar.PulsarConfiguration;
import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.BlobStore;
import org.apache.james.blob.api.BlobStoreDAO;
import org.apache.james.blob.api.BucketName;
import org.apache.james.blob.api.PlainBlobId;
import org.apache.james.blob.api.Store;
import org.apache.james.blob.mail.MimeMessagePartsId;
import org.apache.james.blob.mail.MimeMessageStore;
import org.apache.james.blob.memory.MemoryBlobStoreDAO;
import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.queue.api.DelayedMailQueueContract;
import org.apache.james.queue.api.DelayedManageableMailQueueContract;
import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.api.MailQueueContract;
import org.apache.james.queue.api.MailQueueItemDecoratorFactory;
import org.apache.james.queue.api.MailQueueMetricContract;
import org.apache.james.queue.api.MailQueueMetricExtension;
import org.apache.james.queue.api.MailQueueName;
import org.apache.james.queue.api.Mails;
import org.apache.james.queue.api.ManageableMailQueue;
import org.apache.james.queue.api.ManageableMailQueueContract;
import org.apache.james.queue.api.RawMailQueueItemDecoratorFactory;
import org.apache.james.queue.pulsar.JavaClient;
import org.apache.james.queue.pulsar.PulsarMailQueue;
import org.apache.james.queue.pulsar.PulsarMailQueueConfiguration;
import org.apache.james.server.blob.deduplication.PassThroughBlobStore;
import org.apache.mailet.Mail;
import org.apache.mailet.base.MailAddressFixture;
import org.apache.pekko.actor.ActorSystem;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import scala.jdk.javaapi.OptionConverters;

/**
 * Integration tests for {@link PulsarMailQueue}, run against the Pulsar instance supplied by
 * {@link DockerPulsarExtension}. Mail bodies are kept in an in-memory blob store so that tests
 * can check that blobs are released once a mail leaves the queue.
 *
 * <p>Every queue and every {@link PulsarClients} built through {@link #newInstance} is tracked
 * and released in {@link #tearDown()}, including the additional instances that some tests
 * create to exercise two queues sharing one name.
 */
@Tag("unstable")
@ExtendWith(DockerPulsarExtension.class)
public class PulsarMailQueueTest
        implements MailQueueContract,
        MailQueueMetricContract,
        ManageableMailQueueContract,
        DelayedMailQueueContract,
        DelayedManageableMailQueueContract {
    int maxConcurrency = 10;
    PulsarMailQueue mailQueue;
    private PlainBlobId.Factory blobIdFactory;
    private Store<MimeMessage, MimeMessagePartsId> mimeMessageStore;
    private MailQueueItemDecoratorFactory factory;
    private MailQueueName mailQueueName;
    private MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem;
    private PulsarConfiguration pulsarConfiguration;
    private PulsarClients pulsarClients;
    private ActorSystem system;
    private MemoryBlobStoreDAO memoryBlobStore;
    // Everything newInstance() hands out, so tearDown() can release all of it and not only
    // the most recently created instance.
    private final List<PulsarMailQueue> createdQueues = new ArrayList<>();
    private final List<PulsarClients> createdClients = new ArrayList<>();

    /**
     * Builds a fresh in-memory blob store, a randomly named queue and a new actor system,
     * then creates the queue under test.
     *
     * @param pulsar the Pulsar instance injected by the extension
     * @param metricTestSystem the metric fixtures passed on to the queue
     */
    @BeforeEach
    void setUp(DockerPulsarExtension.DockerPulsar pulsar, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem) {
        this.metricTestSystem = metricTestSystem;
        this.blobIdFactory = new PlainBlobId.Factory();
        this.memoryBlobStore = new MemoryBlobStoreDAO();
        BlobStore blobStore = new PassThroughBlobStore(this.memoryBlobStore, BucketName.DEFAULT, this.blobIdFactory);
        MimeMessageStore.Factory mimeMessageStoreFactory = new MimeMessageStore.Factory(blobStore);
        this.mimeMessageStore = mimeMessageStoreFactory.mimeMessageStore();
        this.factory = new RawMailQueueItemDecoratorFactory();
        // A random 10-letter name is generated for every test.
        this.mailQueueName = MailQueueName.of(RandomStringUtils.randomAlphabetic(10));
        this.system = ActorSystem.apply();
        this.mailQueue = this.newInstance(pulsar);
    }

    /**
     * Closes every queue created during the test, terminates the actor system and stops every
     * Pulsar client. All steps are attempted even when one of them fails.
     *
     * @throws Exception the first cleanup failure, with later failures attached as suppressed
     */
    @AfterEach
    void tearDown() throws Exception {
        Exception failure = null;
        for (PulsarMailQueue queue : this.createdQueues) {
            try {
                queue.close();
            } catch (Exception e) {
                failure = chain(failure, e);
            }
        }
        this.createdQueues.clear();
        // system stays null when setUp failed before reaching ActorSystem.apply().
        if (this.system != null) {
            try {
                this.system.terminate();
            } catch (Exception e) {
                failure = chain(failure, e);
            }
        }
        for (PulsarClients clients : this.createdClients) {
            try {
                clients.stop();
            } catch (Exception e) {
                failure = chain(failure, e);
            }
        }
        this.createdClients.clear();
        if (failure != null) {
            throw failure;
        }
    }

    /** Returns {@code first} with {@code next} added as suppressed, or {@code next} when there is no earlier failure. */
    private static Exception chain(Exception first, Exception next) {
        if (first == null) {
            return next;
        }
        first.addSuppressed(next);
        return first;
    }

    /**
     * Pauses the calling thread for 50 ms.
     * NOTE(review): presumably gives an asynchronous removal time to take effect - confirm.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} if the sleep is
     *         interrupted; the thread's interrupt flag is restored first
     */
    public void awaitRemove() {
        try {
            Thread.sleep(50L);
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /** Returns the queue created in {@link #setUp}. */
    public MailQueue getMailQueue() {
        return this.mailQueue;
    }

    /** Returns the concurrency limit handed to the queue configuration. */
    public int getMailQueueMaxConcurrency() {
        return this.maxConcurrency;
    }

    /** Returns the queue created in {@link #setUp}, typed as a manageable queue. */
    public ManageableMailQueue getManageableMailQueue() {
        return this.mailQueue;
    }

    /**
     * Creates a queue bound to the current queue name, backed by a newly created
     * {@link PulsarClients}. Both the queue and the client are recorded for cleanup.
     *
     * @param pulsar the Pulsar instance whose configuration is used
     * @return the new queue instance
     */
    public PulsarMailQueue newInstance(DockerPulsarExtension.DockerPulsar pulsar) {
        this.pulsarConfiguration = pulsar.getConfiguration();
        this.pulsarClients = PulsarClients.create(this.pulsarConfiguration);
        this.createdClients.add(this.pulsarClients);
        int enqueueBufferSize = 10;
        int requeueBufferSize = 10;
        PulsarMailQueueConfiguration configuration = new PulsarMailQueueConfiguration(
                this.mailQueueName, this.pulsarConfiguration, this.maxConcurrency, enqueueBufferSize, requeueBufferSize);
        MetricFactory metricFactory = this.metricTestSystem.getMetricFactory();
        PulsarMailQueue queue = new PulsarMailQueue(
                configuration,
                this.pulsarClients,
                this.blobIdFactory,
                this.mimeMessageStore,
                this.factory,
                metricFactory,
                this.metricTestSystem.getSpyGaugeRegistry(),
                this.system);
        this.createdQueues.add(queue);
        return queue;
    }

    /**
     * Sends the literal payload {@code "BAD"} straight to the queue's topic, without going
     * through the queue's own {@code enQueue}, and waits for the send to complete.
     */
    private void sendBadMessage(DockerPulsarExtension.DockerPulsar pulsar) throws Exception {
        String topic = String.format("persistent://%s/James-%s", pulsar.getConfiguration().namespace().asString(), this.mailQueueName.asString());
        new JavaClient(pulsar.getConfiguration().brokerUri(), topic).send("BAD").get();
    }

    /** A valid mail enqueued after a "BAD" payload must still be dequeued. */
    @Test
    void badMessagesShouldNotAlterDelivery(DockerPulsarExtension.DockerPulsar pulsar) throws Exception {
        this.sendBadMessage(pulsar);
        this.getMailQueue().enQueue(Mails.defaultMail().name("name").build());

        MailQueue.MailQueueItem mail = Flux.from(this.getMailQueue().deQueue()).take(1L).single().block();

        Assertions.assertThat(mail.getMail().getName()).isEqualTo("name");
    }

    /** After two valid mails have been dequeued, the "BAD" payload must be readable from the DLQ topic. */
    @Test
    void badMessagesShouldBeMovedToADeadLetterTopic(DockerPulsarExtension.DockerPulsar pulsar) throws Exception {
        this.sendBadMessage(pulsar);
        this.getMailQueue().enQueue(Mails.defaultMail().name("name").build());
        this.getMailQueue().enQueue(Mails.defaultMail().name("name2").build());

        Flux.from(this.getMailQueue().deQueue()).delayElements(Duration.ofSeconds(3L)).take(2L).collectList().block();

        String deadLetterTopic = String.format("persistent://%s/James-%s-subscription-%s-DLQ", pulsar.getConfiguration().namespace().asString(), this.mailQueueName.asString(), this.mailQueueName.asString());
        Optional<String> deadletterMessage = OptionConverters.toJava(new JavaClient(pulsar.getConfiguration().brokerUri(), deadLetterTopic).consumeOne())
                .map(ConsumerMessage::value);
        Assertions.assertThat(deadletterMessage).contains("BAD");
    }

    /**
     * With two queue instances on the same name, a removal by recipient must not affect a mail
     * enqueued afterwards: the next item dequeued from either instance is that later mail.
     */
    @Test
    void ensureThatDeletionDoNotDeleteFutureEmailsWithTwoInstancesOfMailQueue(DockerPulsarExtension.DockerPulsar pulsar) throws MessagingException, InterruptedException {
        PulsarMailQueue secondQueue = this.newInstance(pulsar);
        IntStream.range(0, 50).forEach(Throwing.intConsumer(i -> this.enQueue(Mails.defaultMail().name("name" + i).build())));
        // Wait until both instances report all 50 mails before removing.
        Awaitility.await().untilAsserted(() -> Assertions.assertThat(this.getManageableMailQueue().getSize()).isEqualTo(50L));
        Awaitility.await().untilAsserted(() -> Assertions.assertThat(secondQueue.getSize()).isEqualTo(50L));

        this.getManageableMailQueue().remove(ManageableMailQueue.Type.Recipient, MailAddressFixture.RECIPIENT1.asString());
        this.enQueue(Mails.defaultMail().name("namez").build());

        Awaitility.await().untilAsserted(() -> Assertions.assertThat(
                Flux.merge(
                        Flux.from(secondQueue.deQueue()),
                        Flux.from(this.getManageableMailQueue().deQueue()))
                    .blockFirst()
                    .getMail()
                    .getName())
            .isEqualTo("namez"));
    }

    /**
     * With two queue instances on the same name, a removal by recipient issued on one instance
     * must be reflected when browsing either instance.
     */
    @Test
    void ensureThatDeletionApplyOnBrowsingBothInstancesWithTwoInstancesOfMailQueue(DockerPulsarExtension.DockerPulsar pulsar) throws MessagingException, InterruptedException {
        PulsarMailQueue secondQueue = this.newInstance(pulsar);
        IntStream.range(0, 50).forEach(Throwing.intConsumer(i -> this.enQueue(Mails.defaultMail().name("name" + i).build())));
        // Wait until both instances report all 50 mails before removing.
        Awaitility.await().untilAsserted(() -> Assertions.assertThat(this.getManageableMailQueue().getSize()).isEqualTo(50L));
        Awaitility.await().untilAsserted(() -> Assertions.assertThat(secondQueue.getSize()).isEqualTo(50L));

        this.getManageableMailQueue().remove(ManageableMailQueue.Type.Recipient, MailAddressFixture.RECIPIENT1.asString());
        this.enQueue(Mails.defaultMail().name("namez").build());

        Assertions.assertThat(secondQueue.browse())
            .toIterable()
            .extracting(mail -> mail.getMail().getName())
            .containsExactly("namez");
        Assertions.assertThat(this.getManageableMailQueue().browse())
            .toIterable()
            .extracting(mail -> mail.getMail().getName())
            .containsExactly("namez");
    }

    /** Completing a dequeued item with SUCCESS must eventually leave the blob store empty. */
    @Test
    void queueShouldRemoveMailFromStoreOnAcknowledgedDequeue() throws Exception {
        String expectedName = "name";
        this.enQueue(Mails.defaultMail().name(expectedName).build());

        MailQueue.MailQueueItem mailQueueItem = Flux.from(this.getMailQueue().deQueue()).blockFirst();
        mailQueueItem.done(MailQueue.MailQueueItem.CompletionStatus.SUCCESS);

        Assertions.assertThat(mailQueueItem.getMail().getName()).isEqualTo(expectedName);
        Awaitility.await().untilAsserted(this::assertThatStoreIsEmpty);
    }

    /**
     * After removing one of three mails by name and completing the two remaining ones, the
     * blob store must eventually be empty.
     */
    @Test
    void removeShouldRemoveMailFromStoreWhenFilteredOut() throws Exception {
        this.enQueue(Mails.defaultMail().name("name1").build());
        this.enQueue(Mails.defaultMail().name("name2").build());
        this.enQueue(Mails.defaultMail().name("name3").build());

        this.getManageableMailQueue().remove(ManageableMailQueue.Type.Name, "name2");
        this.awaitRemove();

        Assertions.assertThat(this.getManageableMailQueue().browse())
            .toIterable()
            .extracting(ManageableMailQueue.MailQueueItemView::getMail)
            .extracting(Mail::getName)
            .containsExactly("name1", "name3");

        Flux.from(this.getMailQueue().deQueue())
            .take(2L)
            .doOnNext(Throwing.consumer(x -> x.done(MailQueue.MailQueueItem.CompletionStatus.SUCCESS)))
            .blockLast();
        Awaitility.await().untilAsserted(this::assertThatStoreIsEmpty);
    }

    /** Asserts that the default bucket of the in-memory blob store lists no blobs. */
    private void assertThatStoreIsEmpty() {
        List<String> blobIds = Flux.from(this.memoryBlobStore.listBlobs(BucketName.DEFAULT))
            .map(Objects::toString)
            .collectList()
            .defaultIfEmpty(List.of())
            .block();
        Assertions.assertThat(blobIds).isEmpty();
    }

    /**
     * Intentionally empty and disabled.
     * NOTE(review): presumably replaces a test of the same name inherited from one of the
     * contract interfaces - confirm.
     */
    @Disabled("this guarantee is too strong for Pulsar implementation and doesn't match any domain requirement")
    public void flushShouldPreserveBrowseOrder() {
    }

    /** Once the only mail is completed and the blob store is empty, browsing must return nothing. */
    @Test
    public void browseShouldReturnEmptyWhenSingleDequeueMessageEvenWhenStoreIsGuaranteedEmpty() throws Exception {
        this.enQueue(Mails.defaultMail().name("name").build());

        MailQueue.MailQueueItem mailQueueItem = Flux.from(this.getMailQueue().deQueue()).blockFirst();
        mailQueueItem.done(MailQueue.MailQueueItem.CompletionStatus.SUCCESS);
        Awaitility.await().untilAsserted(this::assertThatStoreIsEmpty);

        ManageableMailQueue.MailQueueIterator items = this.getManageableMailQueue().browse();
        Assertions.assertThat(items).toIterable().isEmpty();
    }
}

