package org.apache.james.backends.cassandra.init;

import com.datastax.oss.driver.api.core.CqlSession;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
import jakarta.inject.Provider;
import jakarta.inject.Singleton;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.Callable;
import org.apache.james.backends.cassandra.init.configuration.ClusterConfiguration;
import org.apache.james.backends.cassandra.init.configuration.KeyspaceConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;

@Singleton
/* loaded from: input_file:org/apache/james/backends/cassandra/init/ResilientClusterProvider.class */
public class ResilientClusterProvider implements Provider<CqlSession> {
    private static final Logger LOGGER = LoggerFactory.getLogger(ResilientClusterProvider.class);
    private final CqlSession cluster;
    private final ClusterConfiguration clusterConfiguration;

    @Inject
    @VisibleForTesting
    ResilientClusterProvider(ClusterConfiguration clusterConfiguration, KeyspaceConfiguration keyspaceConfiguration) {
        this.clusterConfiguration = clusterConfiguration;
        Duration ofMillis = Duration.ofMillis(this.clusterConfiguration.getMinDelay());
        this.cluster = (CqlSession) Mono.fromCallable(getClusterRetryCallable(clusterConfiguration, keyspaceConfiguration)).doOnError(th -> {
            LOGGER.warn("Error establishing Cassandra connection. Next retry scheduled in {} ms", ofMillis, th);
        }).retryWhen(Retry.backoff(clusterConfiguration.getMaxRetry(), ofMillis).scheduler(Schedulers.boundedElastic())).block();
    }

    private Callable<CqlSession> getClusterRetryCallable(ClusterConfiguration clusterConfiguration, KeyspaceConfiguration keyspaceConfiguration) {
        LOGGER.info("Trying to connect to Cassandra service at {} (list {})", LocalDateTime.now(), ImmutableList.copyOf(clusterConfiguration.getHosts()).toString());
        return () -> {
            CqlSession create = ClusterFactory.create(clusterConfiguration, keyspaceConfiguration);
            try {
                KeyspaceFactory.keyspaceExist(create, "any");
                return create;
            } catch (Exception e) {
                create.close();
                throw e;
            }
        };
    }

    /* renamed from: get, reason: merged with bridge method [inline-methods] */
    public CqlSession m9get() {
        return this.cluster;
    }

    public CqlSession get(KeyspaceConfiguration keyspaceConfiguration) {
        Duration ofMillis = Duration.ofMillis(this.clusterConfiguration.getMinDelay());
        return (CqlSession) Mono.fromCallable(getClusterRetryCallable(this.clusterConfiguration, keyspaceConfiguration)).doOnError(th -> {
            LOGGER.warn("Error establishing Cassandra connection. Next retry scheduled in {} ms", ofMillis, th);
        }).retryWhen(Retry.backoff(this.clusterConfiguration.getMaxRetry(), ofMillis).scheduler(Schedulers.boundedElastic())).block();
    }

    @PreDestroy
    public void stop() {
        this.cluster.closeAsync();
    }
}
