package org.apache.james.backends.cassandra.migration;

import com.github.fge.lambdas.Throwing;
import com.google.common.annotations.VisibleForTesting;
import java.time.Clock;
import java.time.Instant;
import java.util.Optional;
import javax.inject.Inject;
import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO;
import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionManager;
import org.apache.james.backends.cassandra.versions.SchemaTransition;
import org.apache.james.backends.cassandra.versions.SchemaVersion;
import org.apache.james.task.Task;
import org.apache.james.task.TaskExecutionDetails;
import org.apache.james.task.TaskType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

/* loaded from: input_file:org/apache/james/backends/cassandra/migration/MigrationTask.class */
public class MigrationTask implements Task {
    private static final Logger LOGGER = LoggerFactory.getLogger(MigrationTask.class);
    public static final TaskType CASSANDRA_MIGRATION = TaskType.of("cassandra-migration");
    private final CassandraSchemaVersionDAO schemaVersionDAO;
    private final CassandraSchemaTransitions transitions;
    private final SchemaVersion target;

    /* loaded from: input_file:org/apache/james/backends/cassandra/migration/MigrationTask$AdditionalInformation.class */
    public static class AdditionalInformation implements TaskExecutionDetails.AdditionalInformation {
        private final SchemaVersion toVersion;
        private final Instant timestamp;

        public AdditionalInformation(SchemaVersion schemaVersion, Instant instant) {
            this.toVersion = schemaVersion;
            this.timestamp = instant;
        }

        public int getToVersion() {
            return this.toVersion.getValue();
        }

        public Instant timestamp() {
            return this.timestamp;
        }
    }

    /* loaded from: input_file:org/apache/james/backends/cassandra/migration/MigrationTask$Factory.class */
    public interface Factory {
        MigrationTask create(SchemaVersion schemaVersion);
    }

    /* loaded from: input_file:org/apache/james/backends/cassandra/migration/MigrationTask$Impl.class */
    public static class Impl implements Factory {
        private final CassandraSchemaVersionDAO schemaVersionDAO;
        private final CassandraSchemaTransitions transitions;

        @Inject
        private Impl(CassandraSchemaVersionDAO cassandraSchemaVersionDAO, CassandraSchemaTransitions cassandraSchemaTransitions) {
            this.schemaVersionDAO = cassandraSchemaVersionDAO;
            this.transitions = cassandraSchemaTransitions;
        }

        @Override // org.apache.james.backends.cassandra.migration.MigrationTask.Factory
        public MigrationTask create(SchemaVersion schemaVersion) {
            return new MigrationTask(this.schemaVersionDAO, this.transitions, schemaVersion);
        }
    }

    @VisibleForTesting
    public MigrationTask(CassandraSchemaVersionDAO cassandraSchemaVersionDAO, CassandraSchemaTransitions cassandraSchemaTransitions, SchemaVersion schemaVersion) {
        this.schemaVersionDAO = cassandraSchemaVersionDAO;
        this.transitions = cassandraSchemaTransitions;
        this.target = schemaVersion;
    }

    public Task.Result run() {
        getCurrentVersion().listTransitionsForTarget(this.target).stream().map(this::migration).forEach(Throwing.consumer(this::runMigration).sneakyThrow());
        return Task.Result.COMPLETED;
    }

    private SchemaVersion getCurrentVersion() {
        return (SchemaVersion) ((Optional) this.schemaVersionDAO.getCurrentSchemaVersion().block()).orElse(CassandraSchemaVersionManager.DEFAULT_VERSION);
    }

    private Tuple2<SchemaTransition, Migration> migration(SchemaTransition schemaTransition) {
        return Tuples.of(schemaTransition, this.transitions.findMigration(schemaTransition).orElseThrow(() -> {
            return new MigrationException("unable to find a required Migration for transition " + schemaTransition);
        }));
    }

    private void runMigration(Tuple2<SchemaTransition, Migration> tuple2) throws InterruptedException {
        SchemaVersion currentVersion = getCurrentVersion();
        SchemaTransition schemaTransition = (SchemaTransition) tuple2.getT1();
        if (currentVersion.isAfterOrEquals(schemaTransition.to())) {
            return;
        }
        LOGGER.info("Migrating to version {} ", schemaTransition.toAsString());
        ((Migration) tuple2.getT2()).asTask().run().onComplete(new Task.Operation[]{() -> {
            this.schemaVersionDAO.updateVersion(schemaTransition.to()).block();
        }, () -> {
            LOGGER.info("Migrating to version {} done", schemaTransition.toAsString());
        }}).onFailure(new Task.Operation[]{() -> {
            LOGGER.warn(failureMessage(schemaTransition.to()));
        }, () -> {
            throwMigrationException(schemaTransition.to());
        }});
    }

    private void throwMigrationException(SchemaVersion schemaVersion) {
        throw new MigrationException(failureMessage(schemaVersion));
    }

    private String failureMessage(SchemaVersion schemaVersion) {
        return String.format("Migrating to version %d partially done. Please check logs for cause of failure and re-run this migration.", Integer.valueOf(schemaVersion.getValue()));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SchemaVersion getTarget() {
        return this.target;
    }

    public TaskType type() {
        return CASSANDRA_MIGRATION;
    }

    public Optional<TaskExecutionDetails.AdditionalInformation> details() {
        return Optional.of(new AdditionalInformation(this.target, Clock.systemUTC().instant()));
    }
}
