package org.apache.james.backends.rabbitmq;

import com.github.fge.lambdas.Throwing;
import com.rabbitmq.client.Connection;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
import java.io.IOException;
import java.time.Duration;
import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.james.lifecycle.api.Startable;
import org.apache.james.util.ReactorUtils;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;

/* loaded from: input_file:org/apache/james/backends/rabbitmq/SimpleConnectionPool.class */
public class SimpleConnectionPool implements AutoCloseable, Startable {
    public static final Logger LOGGER = LoggerFactory.getLogger(SimpleConnectionPool.class);
    private final RabbitMQConnectionFactory connectionFactory;
    private final Configuration configuration;
    private Set<ReconnectionHandler> reconnectionHandlers = new HashSet();
    private final AtomicReference<Connection> connectionReference = new AtomicReference<>();

    /* loaded from: input_file:org/apache/james/backends/rabbitmq/SimpleConnectionPool$Configuration.class */
    public static class Configuration {
        public static final Configuration DEFAULT = builder().retries(10).initialDelay(Duration.ofMillis(100));
        private final int numRetries;
        private final Duration initialDelay;

        @FunctionalInterface
        /* loaded from: input_file:org/apache/james/backends/rabbitmq/SimpleConnectionPool$Configuration$RequiresInitialDelay.class */
        public interface RequiresInitialDelay {
            Configuration initialDelay(Duration duration);
        }

        @FunctionalInterface
        /* loaded from: input_file:org/apache/james/backends/rabbitmq/SimpleConnectionPool$Configuration$RequiresRetries.class */
        public interface RequiresRetries {
            RequiresInitialDelay retries(int i);
        }

        public static RequiresRetries builder() {
            return i -> {
                return duration -> {
                    return new Configuration(i, duration);
                };
            };
        }

        public static Configuration from(org.apache.commons.configuration2.Configuration configuration) {
            return builder().retries(configuration.getInt("connection.pool.retries", 10)).initialDelay(Duration.ofMillis(configuration.getLong("connection.pool.min.delay.ms", 100L)));
        }

        public Configuration(int i, Duration duration) {
            this.numRetries = i;
            this.initialDelay = duration;
        }

        public int getNumRetries() {
            return this.numRetries;
        }

        public Duration getInitialDelay() {
            return this.initialDelay;
        }
    }

    /* loaded from: input_file:org/apache/james/backends/rabbitmq/SimpleConnectionPool$ReconnectionHandler.class */
    public interface ReconnectionHandler {
        Publisher<Void> handleReconnection(Connection connection);
    }

    @Inject
    public SimpleConnectionPool(RabbitMQConnectionFactory rabbitMQConnectionFactory, Configuration configuration) {
        this.connectionFactory = rabbitMQConnectionFactory;
        this.configuration = configuration;
    }

    public void init(Set<ReconnectionHandler> set) {
        this.reconnectionHandlers = set;
    }

    @Override // java.lang.AutoCloseable
    @PreDestroy
    public void close() {
        Optional.ofNullable(this.connectionReference.get()).filter((v0) -> {
            return v0.isOpen();
        }).ifPresent(Throwing.consumer((v0) -> {
            v0.close();
        }).orDoNothing());
    }

    public Mono<Connection> getResilientConnection() {
        return getOpenConnection().retryWhen(Retry.backoff(this.configuration.getNumRetries(), this.configuration.getInitialDelay()).scheduler(Schedulers.boundedElastic()));
    }

    private Mono<Connection> getOpenConnection() {
        return Mono.defer(() -> {
            Connection connection = this.connectionReference.get();
            Optional filter = Optional.ofNullable(connection).filter((v0) -> {
                return v0.isOpen();
            });
            RabbitMQConnectionFactory rabbitMQConnectionFactory = this.connectionFactory;
            Objects.requireNonNull(rabbitMQConnectionFactory);
            Connection connection2 = (Connection) filter.orElseGet(rabbitMQConnectionFactory::create);
            if (!this.connectionReference.compareAndSet(connection, connection2)) {
                try {
                    connection2.close();
                } catch (IOException e) {
                }
                return Mono.error(new RuntimeException("unable to create and register a new Connection"));
            }
            if (connection == null || connection == connection2) {
                return Mono.just(connection2);
            }
            LOGGER.warn("Replacing current RabbitMQ connection...");
            return Flux.fromIterable(this.reconnectionHandlers).concatMap(reconnectionHandler -> {
                return reconnectionHandler.handleReconnection(connection2);
            }).then().thenReturn(connection2);
        }).subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER);
    }

    public Mono<Boolean> tryConnection() {
        return getOpenConnection().timeout(Duration.ofSeconds(1L)).hasElement().onErrorResume(th -> {
            return Mono.just(false);
        });
    }

    public Mono<RabbitMQServerVersion> version() {
        return getOpenConnection().map((v0) -> {
            return v0.getServerProperties();
        }).map(map -> {
            return Optional.ofNullable(map.get("version"));
        }).handle(ReactorUtils.publishIfPresent()).map((v0) -> {
            return v0.toString();
        }).map(RabbitMQServerVersion::of).timeout(Duration.ofSeconds(1L)).onErrorResume(th -> {
            return Mono.empty();
        });
    }
}
