package org.apache.james.modules.mailbox;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.MessageProperties;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
import java.util.Date;
import java.util.Optional;
import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
import org.apache.james.blob.api.BlobId;
import org.apache.james.core.Username;
import org.apache.james.lifecycle.api.Startable;
import org.apache.james.mailbox.cassandra.DeleteMessageListener;
import org.apache.james.mailbox.model.MailboxId;
import org.apache.james.mailbox.model.MessageId;
import org.apache.james.vault.metadata.DeletedMessageVaultDeletionCallback;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.AcknowledgableDelivery;
import reactor.rabbitmq.BindingSpecification;
import reactor.rabbitmq.ConsumeOptions;
import reactor.rabbitmq.ExchangeSpecification;
import reactor.rabbitmq.OutboundMessage;
import reactor.rabbitmq.QueueSpecification;
import reactor.rabbitmq.Receiver;
import reactor.rabbitmq.Sender;

/* loaded from: input_file:org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.class */
/**
 * Deletion callback that hands the copy of deleted messages over to a RabbitMQ work queue.
 *
 * <p>{@link #forMessage(DeleteMessageListener.DeletedMessageCopyCommand)} serializes the copy command as
 * JSON and publishes it on the {@value #EXCHANGE} exchange. Once {@link #init()} has been called, this
 * instance also consumes the {@value #QUEUE} queue and forwards every received command to the wrapped
 * {@link DeletedMessageVaultDeletionCallback}.
 */
public class DistributedDeletedMessageVaultDeletionCallback implements DeleteMessageListener.DeletionCallback, Startable {
    public static final Logger LOGGER = LoggerFactory.getLogger(DistributedDeletedMessageVaultDeletionCallback.class);

    private static final String EXCHANGE = "deleted-message-vault";
    private static final String QUEUE = "deleted-message-vault-work-queue";
    private static final String DEAD_LETTER = "deleted-message-vault-work-queue-dead-letter";
    /** Flag handed to {@code nack}: {@code REQUEUE} asks for redelivery, {@code !REQUEUE} rejects the delivery. */
    private static final boolean REQUEUE = true;
    /** QoS value handed to the consume options of the work queue consumer. */
    private static final int QOS = 5;

    private final ReactorRabbitMQChannelPool channelPool;
    private final RabbitMQConfiguration rabbitMQConfiguration;
    private final DeletedMessageVaultDeletionCallback callback;
    private final Sender sender;
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final MailboxId.Factory mailboxIdFactory;
    private final MessageId.Factory messageIdFactory;
    private final BlobId.Factory blobIdFactory;
    // Both are assigned by init() and stay null until then; stop() tolerates the null case.
    private Receiver receiver;
    private Disposable disposable;

    /**
     * JSON representation of a {@link DeleteMessageListener.DeletedMessageCopyCommand} as exchanged over
     * RabbitMQ. Identifiers are carried in their string form.
     */
    public static class CopyCommandDTO {
        private final String messageId;
        private final String mailboxId;
        private final String owner;
        private final Date internalDate;
        private final long size;
        private final boolean hasAttachments;
        private final String headerId;
        private final String bodyId;

        /**
         * Converts a copy command into its serializable form.
         *
         * @param deletedMessageCopyCommand the command to convert
         * @return a DTO holding the string form of every identifier of the command
         */
        public static CopyCommandDTO of(DeleteMessageListener.DeletedMessageCopyCommand deletedMessageCopyCommand) {
            return new CopyCommandDTO(
                deletedMessageCopyCommand.getMessageId().serialize(),
                deletedMessageCopyCommand.getMailboxId().serialize(),
                deletedMessageCopyCommand.getOwner().asString(),
                deletedMessageCopyCommand.getInternalDate(),
                deletedMessageCopyCommand.getSize(),
                deletedMessageCopyCommand.hasAttachments(),
                deletedMessageCopyCommand.getHeaderId().asString(),
                deletedMessageCopyCommand.getBodyId().asString());
        }

        /**
         * Creates the DTO; also used by Jackson when reading the JSON payload.
         *
         * @param messageId serialized message id
         * @param mailboxId serialized mailbox id
         * @param owner owner of the message, as a string
         * @param internalDate internal date of the message
         * @param size size of the message
         * @param hasAttachments whether the message has attachments
         * @param headerId blob id of the headers, as a string
         * @param bodyId blob id of the body, as a string
         */
        @JsonCreator
        public CopyCommandDTO(@JsonProperty("messageId") String messageId,
                              @JsonProperty("mailboxId") String mailboxId,
                              @JsonProperty("owner") String owner,
                              @JsonProperty("internalDate") Date internalDate,
                              @JsonProperty("size") long size,
                              @JsonProperty("hasAttachments") boolean hasAttachments,
                              @JsonProperty("headerId") String headerId,
                              @JsonProperty("bodyId") String bodyId) {
            this.messageId = messageId;
            this.mailboxId = mailboxId;
            this.owner = owner;
            this.internalDate = internalDate;
            this.size = size;
            this.hasAttachments = hasAttachments;
            this.headerId = headerId;
            this.bodyId = bodyId;
        }

        public String getMessageId() {
            return this.messageId;
        }

        public String getMailboxId() {
            return this.mailboxId;
        }

        public String getOwner() {
            return this.owner;
        }

        public Date getInternalDate() {
            return this.internalDate;
        }

        public long getSize() {
            return this.size;
        }

        public boolean isHasAttachments() {
            return this.hasAttachments;
        }

        public String getHeaderId() {
            return this.headerId;
        }

        public String getBodyId() {
            return this.bodyId;
        }

        /**
         * Rebuilds the copy command from this DTO.
         *
         * @param mailboxIdFactory parses {@link #getMailboxId()}
         * @param messageIdFactory parses {@link #getMessageId()}
         * @param blobIdFactory parses {@link #getHeaderId()} and {@link #getBodyId()}
         * @return the command described by this DTO
         */
        @JsonIgnore
        DeleteMessageListener.DeletedMessageCopyCommand asPojo(MailboxId.Factory mailboxIdFactory,
                                                               MessageId.Factory messageIdFactory,
                                                               BlobId.Factory blobIdFactory) {
            return new DeleteMessageListener.DeletedMessageCopyCommand(
                messageIdFactory.fromString(this.messageId),
                // The mailbox id must be parsed from the mailbox id field, not from the message id.
                mailboxIdFactory.fromString(this.mailboxId),
                Username.of(this.owner),
                this.internalDate,
                this.size,
                this.hasAttachments,
                blobIdFactory.parse(this.headerId),
                blobIdFactory.parse(this.bodyId));
        }
    }

    /**
     * @param sender used to declare the RabbitMQ topology and to publish copy commands
     * @param reactorRabbitMQChannelPool used to create the receiver consuming the work queue
     * @param rabbitMQConfiguration provides the arguments the queues are declared with
     * @param deletedMessageVaultDeletionCallback callback invoked for each consumed copy command
     * @param mailboxIdFactory parses mailbox ids of consumed commands
     * @param messageIdFactory parses message ids of consumed commands
     * @param blobIdFactory parses blob ids of consumed commands
     */
    @Inject
    public DistributedDeletedMessageVaultDeletionCallback(Sender sender,
                                                          ReactorRabbitMQChannelPool reactorRabbitMQChannelPool,
                                                          RabbitMQConfiguration rabbitMQConfiguration,
                                                          DeletedMessageVaultDeletionCallback deletedMessageVaultDeletionCallback,
                                                          MailboxId.Factory mailboxIdFactory,
                                                          MessageId.Factory messageIdFactory,
                                                          BlobId.Factory blobIdFactory) {
        this.sender = sender;
        this.rabbitMQConfiguration = rabbitMQConfiguration;
        this.callback = deletedMessageVaultDeletionCallback;
        this.mailboxIdFactory = mailboxIdFactory;
        this.messageIdFactory = messageIdFactory;
        this.blobIdFactory = blobIdFactory;
        this.channelPool = reactorRabbitMQChannelPool;
    }

    /**
     * Declares the exchange, the dead-letter queue, the work queue and its binding (blocking until the
     * declarations complete), then starts consuming the work queue with manual acknowledgement.
     */
    public void init() {
        // NOTE(review): the dead-letter queue is itself declared with a dead-letter argument pointing at
        // DEAD_LETTER. Kept as is: altering the arguments of an already declared queue may be refused by
        // the broker - confirm before changing.
        Flux.concat(
                this.sender.declareExchange(ExchangeSpecification.exchange(EXCHANGE)
                    .durable(true)
                    .type("direct")),
                this.sender.declareQueue(QueueSpecification.queue(DEAD_LETTER)
                    .durable(true)
                    .exclusive(false)
                    .autoDelete(false)
                    .arguments(this.rabbitMQConfiguration.workQueueArgumentsBuilder()
                        .deadLetter(DEAD_LETTER)
                        .build())),
                this.sender.declareQueue(QueueSpecification.queue(QUEUE)
                    .durable(true)
                    .exclusive(false)
                    .autoDelete(false)
                    .arguments(this.rabbitMQConfiguration.workQueueArgumentsBuilder()
                        .deadLetter(DEAD_LETTER)
                        .build())),
                this.sender.bind(BindingSpecification.binding()
                    .exchange(EXCHANGE)
                    .queue(QUEUE)
                    .routingKey("")))
            .then()
            .block();

        this.receiver = this.channelPool.createReceiver();
        this.disposable = this.receiver.consumeManualAck(QUEUE, new ConsumeOptions().qos(QOS))
            .flatMap(this::handleMessage)
            .subscribeOn(Schedulers.boundedElastic())
            .subscribe();
    }

    /**
     * Stops consuming the work queue and closes the receiver. Does nothing for the parts that
     * {@link #init()} never created.
     */
    @PreDestroy
    public void stop() {
        Optional.ofNullable(this.disposable).ifPresent(Disposable::dispose);
        Optional.ofNullable(this.receiver).ifPresent(Receiver::close);
    }

    /**
     * Processes one delivery of the work queue.
     *
     * <p>The delivery is acknowledged when the wrapped callback completes, and nacked with requeue when
     * the callback fails or the processing is cancelled. A delivery that cannot be turned into a copy
     * command is nacked without requeue.
     *
     * @param acknowledgableDelivery the delivery to process
     * @return a Mono that always completes without error, so that one failing delivery does not
     *         terminate the consuming Flux subscribed in {@link #init()}
     */
    private Mono<Void> handleMessage(AcknowledgableDelivery acknowledgableDelivery) {
        try {
            CopyCommandDTO copyCommandDTO = this.objectMapper.readValue(acknowledgableDelivery.getBody(), CopyCommandDTO.class);
            return this.callback.forMessage(copyCommandDTO.asPojo(this.mailboxIdFactory, this.messageIdFactory, this.blobIdFactory))
                .doOnSuccess(ignored -> acknowledgableDelivery.ack())
                .doOnCancel(() -> acknowledgableDelivery.nack(REQUEUE))
                // Swallow the error once it is logged and nacked: letting it propagate into the
                // surrounding flatMap would cancel the consumer and stop all further processing.
                .onErrorResume(e -> {
                    LOGGER.error("Failed executing deletion callback for {}", copyCommandDTO.getMessageId(), e);
                    acknowledgableDelivery.nack(REQUEUE);
                    return Mono.empty();
                });
        } catch (Exception e) {
            LOGGER.error("Deserialization error: reject poisonous message for distributed Deleted message vault callback", e);
            // Redelivering a payload that cannot be read would fail again: reject it without requeue.
            acknowledgableDelivery.nack(!REQUEUE);
            return Mono.empty();
        }
    }

    /**
     * Publishes the given copy command, serialized as JSON, on the {@value #EXCHANGE} exchange with an
     * empty routing key and the persistent text/plain message properties.
     *
     * @param deletedMessageCopyCommand the command to publish
     * @return a Mono completing once the sender has sent the message, or failing with the
     *         {@link JsonProcessingException} raised while serializing the command
     */
    public Mono<Void> forMessage(DeleteMessageListener.DeletedMessageCopyCommand deletedMessageCopyCommand) {
        try {
            byte[] payload = this.objectMapper.writeValueAsBytes(CopyCommandDTO.of(deletedMessageCopyCommand));
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .deliveryMode(MessageProperties.PERSISTENT_TEXT_PLAIN.getDeliveryMode())
                .priority(MessageProperties.PERSISTENT_TEXT_PLAIN.getPriority())
                .contentType(MessageProperties.PERSISTENT_TEXT_PLAIN.getContentType())
                .build();
            return this.sender.send(Mono.just(new OutboundMessage(EXCHANGE, "", properties, payload)));
        } catch (JsonProcessingException e) {
            return Mono.error(e);
        }
    }
}
