package org.apache.james.queue.pulsar;

import com.github.fge.lambdas.Throwing;
import jakarta.mail.MessagingException;
import jakarta.mail.internet.MimeMessage;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.IntStream;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.james.backends.pulsar.DockerPulsarExtension;
import org.apache.james.backends.pulsar.PulsarClients;
import org.apache.james.backends.pulsar.PulsarConfiguration;
import org.apache.james.blob.api.BucketName;
import org.apache.james.blob.api.PlainBlobId;
import org.apache.james.blob.api.Store;
import org.apache.james.blob.mail.MimeMessagePartsId;
import org.apache.james.blob.mail.MimeMessageStore;
import org.apache.james.blob.memory.MemoryBlobStoreDAO;
import org.apache.james.queue.api.DelayedMailQueueContract;
import org.apache.james.queue.api.DelayedManageableMailQueueContract;
import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.api.MailQueueContract;
import org.apache.james.queue.api.MailQueueItemDecoratorFactory;
import org.apache.james.queue.api.MailQueueMetricContract;
import org.apache.james.queue.api.MailQueueMetricExtension;
import org.apache.james.queue.api.MailQueueName;
import org.apache.james.queue.api.Mails;
import org.apache.james.queue.api.ManageableMailQueue;
import org.apache.james.queue.api.ManageableMailQueueContract;
import org.apache.james.queue.api.RawMailQueueItemDecoratorFactory;
import org.apache.james.server.blob.deduplication.PassThroughBlobStore;
import org.apache.mailet.base.MailAddressFixture;
import org.apache.pekko.actor.ActorSystem;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import scala.jdk.javaapi.OptionConverters;

/**
 * Exercises {@link PulsarMailQueue} against the mail queue contract suites it implements,
 * plus a handful of Pulsar-specific scenarios (undecodable messages, several queue
 * instances sharing one queue name, blob store clean-up).
 *
 * <p>Each test gets a randomly named queue, its own {@link ActorSystem} and an in-memory
 * blob store. Every {@link PulsarMailQueue} and {@link PulsarClients} created through
 * {@link #newInstance(DockerPulsarExtension.DockerPulsar)} is tracked and released in
 * {@link #tearDown()}.
 */
@Tag("unstable")
@ExtendWith(DockerPulsarExtension.class)
public class PulsarMailQueueTest implements MailQueueContract, MailQueueMetricContract, ManageableMailQueueContract, DelayedMailQueueContract, DelayedManageableMailQueueContract {
    int maxConcurrency = 10;
    PulsarMailQueue mailQueue;
    private PlainBlobId.Factory blobIdFactory;
    private Store<MimeMessage, MimeMessagePartsId> mimeMessageStore;
    private MailQueueItemDecoratorFactory factory;
    private MailQueueName mailQueueName;
    private MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem;
    private PulsarConfiguration pulsarConfiguration;
    private PulsarClients pulsarClients;
    private ActorSystem system;
    private MemoryBlobStoreDAO memoryBlobStore;

    // Everything handed out by newInstance(); some tests create a second queue, and both
    // the queue and its clients must be released once the test is over.
    private final List<PulsarMailQueue> createdQueues = new ArrayList<>();
    private final List<PulsarClients> createdClients = new ArrayList<>();

    /**
     * Builds the collaborators of the queue under test and creates the primary queue instance.
     *
     * @param dockerPulsar handle on the Pulsar broker, injected by {@link DockerPulsarExtension}
     * @param mailQueueMetricTestSystem metric factory and gauge registry handed to the queue
     */
    @BeforeEach
    void setUp(DockerPulsarExtension.DockerPulsar dockerPulsar, MailQueueMetricExtension.MailQueueMetricTestSystem mailQueueMetricTestSystem) {
        this.metricTestSystem = mailQueueMetricTestSystem;
        this.blobIdFactory = new PlainBlobId.Factory();
        this.memoryBlobStore = new MemoryBlobStoreDAO();
        PassThroughBlobStore blobStore = new PassThroughBlobStore(this.memoryBlobStore, BucketName.DEFAULT, this.blobIdFactory);
        this.mimeMessageStore = new MimeMessageStore.Factory(blobStore).mimeMessageStore();
        this.factory = new RawMailQueueItemDecoratorFactory();
        // A random name keeps tests from sharing a queue on the broker.
        this.mailQueueName = MailQueueName.of(RandomStringUtils.randomAlphabetic(10));
        this.system = ActorSystem.apply();
        this.mailQueue = newInstance(dockerPulsar);
    }

    /**
     * Releases every queue and client created during the test, then the actor system.
     * The nested {@code finally} blocks make sure a failure while closing a queue does not
     * prevent the actor system and the Pulsar clients from being released.
     *
     * @throws Exception if closing a queue fails
     */
    @AfterEach
    void tearDown() throws Exception {
        try {
            for (PulsarMailQueue queue : this.createdQueues) {
                queue.close();
            }
        } finally {
            try {
                this.system.terminate();
            } finally {
                for (PulsarClients clients : this.createdClients) {
                    clients.stop();
                }
            }
        }
    }

    /**
     * Pauses briefly (50 ms) so that a previously requested removal can take effect.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} if the wait is
     *         interrupted; the thread's interrupt flag is restored first
     */
    public void awaitRemove() {
        try {
            Thread.sleep(50L);
        } catch (InterruptedException e) {
            // Keep the interruption visible to callers further up the stack.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /** Returns the primary queue created in {@link #setUp}. */
    public MailQueue getMailQueue() {
        return this.mailQueue;
    }

    /** Returns the concurrency limit the queues of this test are configured with. */
    public int getMailQueueMaxConcurrency() {
        return this.maxConcurrency;
    }

    /** Returns the primary queue created in {@link #setUp}, seen as a manageable queue. */
    public ManageableMailQueue getManageableMailQueue() {
        return this.mailQueue;
    }

    /**
     * Creates a new queue instance bound to this test's queue name, with its own Pulsar clients.
     * Both are registered for clean-up in {@link #tearDown()}. As before, the
     * {@code pulsarConfiguration} and {@code pulsarClients} fields are updated to the latest
     * values.
     *
     * @param dockerPulsar handle on the Pulsar broker providing the configuration
     * @return the newly created queue
     */
    public PulsarMailQueue newInstance(DockerPulsarExtension.DockerPulsar dockerPulsar) {
        this.pulsarConfiguration = dockerPulsar.getConfiguration();
        this.pulsarClients = PulsarClients.create(this.pulsarConfiguration);
        this.createdClients.add(this.pulsarClients);

        PulsarMailQueueConfiguration queueConfiguration =
            new PulsarMailQueueConfiguration(this.mailQueueName, this.pulsarConfiguration, this.maxConcurrency, 10, 10);
        PulsarMailQueue queue = new PulsarMailQueue(
            queueConfiguration,
            this.pulsarClients,
            this.blobIdFactory,
            this.mimeMessageStore,
            this.factory,
            this.metricTestSystem.getMetricFactory(),
            this.metricTestSystem.getSpyGaugeRegistry(),
            this.system);
        this.createdQueues.add(queue);
        return queue;
    }

    /** Name of the topic a raw message is published to in the bad-message tests. */
    private String queueTopic(DockerPulsarExtension.DockerPulsar dockerPulsar) {
        return String.format("persistent://%s/James-%s",
            dockerPulsar.getConfiguration().namespace().asString(),
            this.mailQueueName.asString());
    }

    /** A raw, undecodable payload published to the queue topic must not block real mails. */
    @Test
    void badMessagesShouldNotAlterDelivery(DockerPulsarExtension.DockerPulsar dockerPulsar) throws Exception {
        new JavaClient(dockerPulsar.getConfiguration().brokerUri(), queueTopic(dockerPulsar)).send("BAD").get();
        getMailQueue().enQueue(Mails.defaultMail().name("name").build());

        MailQueue.MailQueueItem dequeued = Flux.from(getMailQueue().deQueue()).take(1L).single().block();

        Assertions.assertThat(dequeued.getMail().getName()).isEqualTo("name");
    }

    /** A raw, undecodable payload must end up readable from the subscription's DLQ topic. */
    @Test
    void badMessagesShouldBeMovedToADeadLetterTopic(DockerPulsarExtension.DockerPulsar dockerPulsar) throws Exception {
        new JavaClient(dockerPulsar.getConfiguration().brokerUri(), queueTopic(dockerPulsar)).send("BAD").get();
        getMailQueue().enQueue(Mails.defaultMail().name("name").build());
        getMailQueue().enQueue(Mails.defaultMail().name("name2").build());

        // NOTE(review): the 3 s spacing presumably leaves the broker time to route the bad
        // message to the dead letter topic - confirm against the queue's redelivery settings.
        Flux.from(getMailQueue().deQueue()).delayElements(Duration.ofSeconds(3L)).take(2L).collectList().block();

        String deadLetterTopic = String.format("persistent://%s/James-%s-subscription-%s-DLQ",
            dockerPulsar.getConfiguration().namespace().asString(),
            this.mailQueueName.asString(),
            this.mailQueueName.asString());
        JavaClient deadLetterClient = new JavaClient(dockerPulsar.getConfiguration().brokerUri(), deadLetterTopic);
        Assertions.assertThat(OptionConverters.toJava(deadLetterClient.consumeOne()).map(message -> message.value()))
            .contains("BAD");
    }

    /** A removal issued on one instance must not hide mails enqueued after it, on either instance. */
    @Test
    void ensureThatDeletionDoNotDeleteFutureEmailsWithTwoInstancesOfMailQueue(DockerPulsarExtension.DockerPulsar dockerPulsar) throws MessagingException, InterruptedException {
        PulsarMailQueue secondQueue = newInstance(dockerPulsar);
        IntStream.range(0, 50)
            .forEach(Throwing.intConsumer(i -> enQueue(Mails.defaultMail().name("name" + i).build())));
        // Wait until both instances have seen all 50 mails before removing them.
        Awaitility.await().untilAsserted(() -> Assertions.assertThat(getManageableMailQueue().getSize()).isEqualTo(50L));
        Awaitility.await().untilAsserted(() -> Assertions.assertThat(secondQueue.getSize()).isEqualTo(50L));

        getManageableMailQueue().remove(ManageableMailQueue.Type.Recipient, MailAddressFixture.RECIPIENT1.asString());
        enQueue(Mails.defaultMail().name("namez").build());

        Awaitility.await().untilAsserted(() -> {
            MailQueue.MailQueueItem first = Flux.merge(
                    Flux.from(secondQueue.deQueue()),
                    Flux.from(getManageableMailQueue().deQueue()))
                .blockFirst();
            Assertions.assertThat(first.getMail().getName()).isEqualTo("namez");
        });
    }

    /** A removal issued on one instance must be reflected when browsing either instance. */
    @Test
    void ensureThatDeletionApplyOnBrowsingBothInstancesWithTwoInstancesOfMailQueue(DockerPulsarExtension.DockerPulsar dockerPulsar) throws MessagingException, InterruptedException {
        PulsarMailQueue secondQueue = newInstance(dockerPulsar);
        IntStream.range(0, 50)
            .forEach(Throwing.intConsumer(i -> enQueue(Mails.defaultMail().name("name" + i).build())));
        // Wait until both instances have seen all 50 mails before removing them.
        Awaitility.await().untilAsserted(() -> Assertions.assertThat(getManageableMailQueue().getSize()).isEqualTo(50L));
        Awaitility.await().untilAsserted(() -> Assertions.assertThat(secondQueue.getSize()).isEqualTo(50L));

        getManageableMailQueue().remove(ManageableMailQueue.Type.Recipient, MailAddressFixture.RECIPIENT1.asString());
        enQueue(Mails.defaultMail().name("namez").build());

        Assertions.assertThat(secondQueue.browse())
            .toIterable()
            .extracting(view -> view.getMail().getName())
            .containsExactly("namez");
        Assertions.assertThat(getManageableMailQueue().browse())
            .toIterable()
            .extracting(view -> view.getMail().getName())
            .containsExactly("namez");
    }

    /** Acknowledging a dequeued mail must eventually leave the blob store empty. */
    @Test
    void queueShouldRemoveMailFromStoreOnAcknowledgedDequeue() throws Exception {
        enQueue(Mails.defaultMail().name("name").build());

        MailQueue.MailQueueItem dequeued = Flux.from(getMailQueue().deQueue()).blockFirst();
        dequeued.done(MailQueue.MailQueueItem.CompletionStatus.SUCCESS);

        Assertions.assertThat(dequeued.getMail().getName()).isEqualTo("name");
        Awaitility.await().untilAsserted(this::assertThatStoreIsEmpty);
    }

    /** Blobs of a mail removed by name must be gone once the remaining mails are acknowledged. */
    @Test
    void removeShouldRemoveMailFromStoreWhenFilteredOut() throws Exception {
        enQueue(Mails.defaultMail().name("name1").build());
        enQueue(Mails.defaultMail().name("name2").build());
        enQueue(Mails.defaultMail().name("name3").build());

        getManageableMailQueue().remove(ManageableMailQueue.Type.Name, "name2");
        awaitRemove();

        Assertions.assertThat(getManageableMailQueue().browse())
            .toIterable()
            .extracting(view -> view.getMail())
            .extracting(mail -> mail.getName())
            .containsExactly("name1", "name3");

        Flux.from(getMailQueue().deQueue())
            .take(2L)
            .doOnNext(Throwing.consumer(item -> item.done(MailQueue.MailQueueItem.CompletionStatus.SUCCESS)))
            .blockLast();
        Awaitility.await().untilAsserted(this::assertThatStoreIsEmpty);
    }

    /** Asserts that the default bucket of the in-memory blob store lists no blob. */
    private void assertThatStoreIsEmpty() {
        List<String> blobIds = Flux.from(this.memoryBlobStore.listBlobs(BucketName.DEFAULT))
            .map(Objects::toString)
            .collectList()
            .defaultIfEmpty(List.of())
            .block();
        Assertions.assertThat(blobIds).isEmpty();
    }

    // NOTE(review): presumably shadows a contract test of the same name so that it does not
    // run for Pulsar - confirm against the contract interfaces.
    @Disabled("this guarantee is too strong for Pulsar implementation and doesn't match any domain requirement")
    public void flushShouldPreserveBrowseOrder() {
    }

    /** Once the only mail is acknowledged and its blobs are gone, browsing must return nothing. */
    @Test
    public void browseShouldReturnEmptyWhenSingleDequeueMessageEvenWhenStoreIsGuaranteedEmpty() throws Exception {
        enQueue(Mails.defaultMail().name("name").build());

        MailQueue.MailQueueItem dequeued = Flux.from(getMailQueue().deQueue()).blockFirst();
        dequeued.done(MailQueue.MailQueueItem.CompletionStatus.SUCCESS);

        Awaitility.await().untilAsserted(this::assertThatStoreIsEmpty);
        Assertions.assertThat(getManageableMailQueue().browse()).toIterable().isEmpty();
    }
}
