package org.apache.james.task.eventsourcing.distributed;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Delivery;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
import java.io.Closeable;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.Optional;
import org.apache.james.backends.rabbitmq.Constants;
import org.apache.james.backends.rabbitmq.QueueArguments;
import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
import org.apache.james.backends.rabbitmq.ReceiverProvider;
import org.apache.james.eventsourcing.Event;
import org.apache.james.eventsourcing.eventstore.JsonEventSerializer;
import org.apache.james.lifecycle.api.Startable;
import org.apache.james.task.eventsourcing.TerminationSubscriber;
import org.apache.james.util.ReactorUtils;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.BindingSpecification;
import reactor.rabbitmq.ExchangeSpecification;
import reactor.rabbitmq.OutboundMessage;
import reactor.rabbitmq.QueueSpecification;
import reactor.rabbitmq.Sender;

/* loaded from: input_file:org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.class */
/**
 * {@link TerminationSubscriber} that relays events through RabbitMQ.
 *
 * <p>Events handed to {@link #addEvent(Event)} are serialized to JSON and published on
 * {@link #EXCHANGE_NAME} with {@link #ROUTING_KEY}. Messages delivered to the queue named by the
 * injected {@link TerminationQueueName} are deserialized and re-emitted to the subscribers of
 * {@link #listenEvents()}.
 *
 * <p>Lifecycle: the sinks and subscriptions are only created by {@link #start()}, so
 * {@link #addEvent(Event)}, {@link #listenEvents()} and {@link #restart()} must not be called
 * before it. {@link #close()} may be called at any time.
 */
public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Startable, Closeable {
    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQTerminationSubscriber.class);
    static final String EXCHANGE_NAME = "terminationSubscriberExchange";
    static final String QUEUE_NAME_PREFIX = "terminationSubscriber";
    static final String ROUTING_KEY = "terminationSubscriberRoutingKey";
    private final TerminationQueueName queueName;
    private final JsonEventSerializer serializer;
    private final Sender sender;
    private final ReceiverProvider receiverProvider;
    private final RabbitMQConfiguration rabbitMQConfiguration;
    // The four fields below stay null until start() has run.
    private Sinks.Many<OutboundMessage> sendQueue;
    private Sinks.Many<Event> listener;
    private Disposable sendQueueHandle;
    private Disposable listenQueueHandle;

    @Inject
    RabbitMQTerminationSubscriber(TerminationQueueName terminationQueueName, Sender sender, ReceiverProvider receiverProvider, JsonEventSerializer jsonEventSerializer, RabbitMQConfiguration rabbitMQConfiguration) {
        this.queueName = terminationQueueName;
        this.sender = sender;
        this.receiverProvider = receiverProvider;
        this.serializer = jsonEventSerializer;
        this.rabbitMQConfiguration = rabbitMQConfiguration;
    }

    /**
     * Declares the exchange, the queue and their binding (blocking on each declaration), then
     * subscribes the outbound publisher and the inbound consumer.
     */
    public void start() {
        this.sender.declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME)).block();

        // Apply the queue TTL only when the configuration defines one.
        QueueArguments.Builder queueArguments = this.rabbitMQConfiguration.workQueueArgumentsBuilder();
        this.rabbitMQConfiguration.getQueueTTL().ifPresent(queueArguments::queueTTL);

        boolean quorumQueuesUsed = this.rabbitMQConfiguration.isQuorumQueuesUsed();
        String queue = this.queueName.asString();
        this.sender.declare(QueueSpecification.queue(queue)
                .durable(Constants.evaluateDurable(false, quorumQueuesUsed))
                .autoDelete(Constants.evaluateAutoDelete(false, quorumQueuesUsed))
                .arguments(queueArguments.build()))
            .block();
        this.sender.bind(BindingSpecification.binding(EXCHANGE_NAME, ROUTING_KEY, queue)).block();

        this.sendQueue = Sinks.many().unicast().onBackpressureBuffer();
        this.sendQueueHandle = this.sender.send(this.sendQueue.asFlux())
            .subscribeOn(Schedulers.boundedElastic())
            .subscribe();

        this.listener = Sinks.many().multicast().directBestEffort();
        this.listenQueueHandle = consumeTerminationQueue();
    }

    /**
     * Replaces the queue consumer: a new consumer is subscribed first, and only then is the
     * previous subscription disposed.
     */
    public void restart() {
        Disposable previousConsumer = this.listenQueueHandle;
        this.listenQueueHandle = consumeTerminationQueue();
        previousConsumer.dispose();
    }

    /**
     * Subscribes to the termination queue in auto-ack mode and forwards every delivery that
     * deserializes successfully to {@link #listener}.
     *
     * @return the handle of the subscription; the receiver created for it is closed through the
     *         cleanup callback of {@code Flux.using}
     */
    private Disposable consumeTerminationQueue() {
        return Flux.using(
                this.receiverProvider::createReceiver,
                receiver -> receiver.consumeAutoAck(this.queueName.asString()),
                receiver -> receiver.close())
            .subscribeOn(Schedulers.boundedElastic())
            .map(this::toEvent)
            .handle(ReactorUtils.publishIfPresent())
            .subscribe(event -> this.listener.emitNext(event, Sinks.EmitFailureHandler.FAIL_FAST));
    }

    /**
     * Serializes the event as UTF-8 JSON and queues it for publication on the exchange.
     *
     * @param event the event to publish
     * @throws RuntimeException wrapping the {@link JsonProcessingException} when the event cannot
     *         be serialized
     */
    public void addEvent(Event event) {
        byte[] payload;
        try {
            payload = this.serializer.serialize(event).getBytes(StandardCharsets.UTF_8);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
        OutboundMessage message = new OutboundMessage(EXCHANGE_NAME, ROUTING_KEY, new AMQP.BasicProperties.Builder().build(), payload);
        this.sendQueue.emitNext(message, Sinks.EmitFailureHandler.FAIL_FAST);
    }

    /**
     * @return a view of the events received from the termination queue
     */
    public Publisher<Event> listenEvents() {
        return this.listener.asFlux();
    }

    /**
     * Decodes a delivery body as UTF-8 JSON and deserializes it.
     *
     * @param delivery the message received from the queue
     * @return the deserialized event, or empty when deserialization fails; the failure is logged
     *         rather than propagated
     */
    private Optional<Event> toEvent(Delivery delivery) {
        String json = new String(delivery.getBody(), StandardCharsets.UTF_8);
        try {
            return Optional.of(this.serializer.deserialize(json));
        } catch (Exception e) {
            LOGGER.error("Unable to deserialize '{}'", json, e);
            return Optional.empty();
        }
    }

    /**
     * Disposes the publishing and consuming subscriptions, skipping any that was never created.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    @PreDestroy
    public void close() {
        if (this.sendQueueHandle != null) {
            this.sendQueueHandle.dispose();
        }
        if (this.listenQueueHandle != null) {
            this.listenQueueHandle.dispose();
        }
    }
}
