package org.apache.james.events;

import com.google.common.collect.ImmutableList;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.backends.rabbitmq.Constants;
import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
import org.apache.james.backends.rabbitmq.ReceiverProvider;
import org.apache.james.events.EventListener;
import org.apache.james.events.GroupRegistration;
import org.apache.james.util.ReactorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.AcknowledgableDelivery;
import reactor.rabbitmq.BindingSpecification;
import reactor.rabbitmq.ConsumeOptions;
import reactor.rabbitmq.QueueSpecification;
import reactor.rabbitmq.Sender;
import reactor.util.retry.Retry;

/* loaded from: input_file:org/apache/james/events/GroupRegistrationHandler.class */
class GroupRegistrationHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(GroupRegistrationHandler.class);
    static final Group GROUP = new GroupRegistrationHandlerGroup();
    private final NamingStrategy namingStrategy;
    private final EventSerializer eventSerializer;
    private final ReactorRabbitMQChannelPool channelPool;
    private final Sender sender;
    private final ReceiverProvider receiverProvider;
    private final RetryBackoffConfiguration retryBackoff;
    private final EventDeadLetters eventDeadLetters;
    private final ListenerExecutor listenerExecutor;
    private final RabbitMQConfiguration configuration;
    private final GroupRegistration.WorkQueueName queueName;
    private final Map<Group, GroupRegistration> groupRegistrations = new ConcurrentHashMap();
    private final Scheduler scheduler = Schedulers.newBoundedElastic(10, ReactorUtils.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "groups-handler");
    private Optional<Disposable> consumer = Optional.empty();

    /* loaded from: input_file:org/apache/james/events/GroupRegistrationHandler$GroupRegistrationHandlerGroup.class */
    public static class GroupRegistrationHandlerGroup extends Group {
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public GroupRegistrationHandler(NamingStrategy namingStrategy, EventSerializer eventSerializer, ReactorRabbitMQChannelPool reactorRabbitMQChannelPool, Sender sender, ReceiverProvider receiverProvider, RetryBackoffConfiguration retryBackoffConfiguration, EventDeadLetters eventDeadLetters, ListenerExecutor listenerExecutor, EventBusId eventBusId, RabbitMQConfiguration rabbitMQConfiguration) {
        this.namingStrategy = namingStrategy;
        this.eventSerializer = eventSerializer;
        this.channelPool = reactorRabbitMQChannelPool;
        this.sender = sender;
        this.receiverProvider = receiverProvider;
        this.retryBackoff = retryBackoffConfiguration;
        this.eventDeadLetters = eventDeadLetters;
        this.listenerExecutor = listenerExecutor;
        this.configuration = rabbitMQConfiguration;
        this.queueName = namingStrategy.workQueue(GROUP);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public GroupRegistration retrieveGroupRegistration(Group group) {
        return (GroupRegistration) Optional.ofNullable(this.groupRegistrations.get(group)).orElseThrow(() -> {
            return new GroupRegistrationNotFound(group);
        });
    }

    public void start() {
        this.channelPool.createWorkQueue(QueueSpecification.queue(this.queueName.asString()).durable(Constants.evaluateDurable(true, this.configuration.isQuorumQueuesUsed())).exclusive(Constants.evaluateExclusive(false, this.configuration.isQuorumQueuesUsed())).autoDelete(Constants.evaluateAutoDelete(false, this.configuration.isQuorumQueuesUsed())).arguments(this.configuration.workQueueArgumentsBuilder().deadLetter(this.namingStrategy.deadLetterExchange()).build()), BindingSpecification.binding().exchange(this.namingStrategy.exchange()).queue(this.queueName.asString()).routingKey("")).retryWhen(Retry.backoff(this.retryBackoff.getMaxRetries(), this.retryBackoff.getFirstBackoff()).jitter(this.retryBackoff.getJitterFactor()).scheduler(Schedulers.boundedElastic())).block();
        this.consumer = Optional.of(consumeWorkQueue());
    }

    private Disposable consumeWorkQueue() {
        ReceiverProvider receiverProvider = this.receiverProvider;
        Objects.requireNonNull(receiverProvider);
        return Flux.using(receiverProvider::createReceiver, receiver -> {
            return receiver.consumeManualAck(this.queueName.asString(), new ConsumeOptions().qos(10));
        }, (v0) -> {
            v0.close();
        }).filter(acknowledgableDelivery -> {
            return Objects.nonNull(acknowledgableDelivery.getBody());
        }).flatMap(this::deliver, 10).subscribeOn(this.scheduler).subscribe();
    }

    private Mono<Void> deliver(AcknowledgableDelivery acknowledgableDelivery) {
        Flux flatMap = deserializeEvent(acknowledgableDelivery.getBody()).flatMapIterable(event -> {
            return (Iterable) this.groupRegistrations.values().stream().map(groupRegistration -> {
                return Pair.of(groupRegistration, event);
            }).collect(ImmutableList.toImmutableList());
        }).flatMap(pair -> {
            return ((GroupRegistration) pair.getLeft()).runListenerReliably(0, (Event) pair.getRight());
        });
        Objects.requireNonNull(acknowledgableDelivery);
        return flatMap.then(Mono.fromRunnable(acknowledgableDelivery::ack).subscribeOn(Schedulers.boundedElastic())).then().onErrorResume(th -> {
            LOGGER.error("Unable to process delivery for group {}", GROUP, th);
            return Mono.fromRunnable(() -> {
                acknowledgableDelivery.nack(false);
            }).subscribeOn(Schedulers.boundedElastic()).then();
        });
    }

    private Mono<Event> deserializeEvent(byte[] bArr) {
        return Mono.fromCallable(() -> {
            return this.eventSerializer.fromBytes(bArr);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void stop() {
        this.groupRegistrations.values().forEach(groupRegistration -> {
            Mono.from(groupRegistration.m3unregister()).block();
        });
        this.consumer.ifPresent((v0) -> {
            v0.dispose();
        });
        Optional.ofNullable(this.scheduler).ifPresent((v0) -> {
            v0.dispose();
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void restart() {
        Optional<Disposable> optional = this.consumer;
        this.consumer = Optional.of(consumeWorkQueue());
        optional.filter(Predicate.not((v0) -> {
            return v0.isDisposed();
        })).ifPresent((v0) -> {
            v0.dispose();
        });
        this.groupRegistrations.values().forEach((v0) -> {
            v0.restart();
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Registration register(EventListener.ReactiveEventListener reactiveEventListener, Group group) {
        if (this.groupRegistrations.isEmpty()) {
            start();
        }
        return this.groupRegistrations.compute(group, (group2, groupRegistration) -> {
            if (groupRegistration != null) {
                throw new GroupAlreadyRegistered(group);
            }
            return newGroupRegistration(reactiveEventListener, group2);
        }).start();
    }

    private GroupRegistration newGroupRegistration(EventListener.ReactiveEventListener reactiveEventListener, Group group) {
        return new GroupRegistration(this.namingStrategy, this.channelPool, this.sender, this.receiverProvider, this.eventSerializer, reactiveEventListener, group, this.retryBackoff, this.eventDeadLetters, () -> {
            this.groupRegistrations.remove(group);
        }, this.listenerExecutor, this.configuration);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Collection<Group> registeredGroups() {
        return this.groupRegistrations.keySet();
    }
}
