package org.apache.james.queue.rabbitmq.view.cassandra;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.data.TupleValue;
import com.datastax.oss.driver.api.core.type.DataType;
import com.datastax.oss.driver.api.core.type.DataTypes;
import com.datastax.oss.driver.api.core.type.TupleType;
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
import com.datastax.oss.driver.api.querybuilder.delete.Delete;
import com.datastax.oss.driver.api.querybuilder.select.Select;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import jakarta.inject.Inject;
import java.nio.ByteBuffer;
import java.util.Optional;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.mail.MimeMessagePartsId;
import org.apache.james.queue.rabbitmq.EnqueuedItem;
import org.apache.james.queue.rabbitmq.MailQueueName;
import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule;
import org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices;
import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedItemWithSlicingContext;
import org.apache.mailet.Mail;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.class */
public class EnqueuedMailsDAO {
    private final CassandraAsyncExecutor executor;
    private final PreparedStatement selectStatement;
    private final PreparedStatement selectBlobIdsStatement;
    private final PreparedStatement insertStatement;
    private final PreparedStatement deleteBucketStatement;
    private final BlobId.Factory blobFactory;
    private final TupleType userHeaderNameHeaderValueTriple = DataTypes.tupleOf(new DataType[]{DataTypes.TEXT, DataTypes.TEXT, DataTypes.TEXT});

    @Inject
    @VisibleForTesting
    public EnqueuedMailsDAO(CqlSession cqlSession, BlobId.Factory factory) {
        this.executor = new CassandraAsyncExecutor(cqlSession);
        this.selectStatement = prepareSelectFrom(cqlSession);
        this.insertStatement = prepareInsert(cqlSession);
        this.deleteBucketStatement = prepareDeleteBucket(cqlSession);
        this.selectBlobIdsStatement = prepareSelectBlobIds(cqlSession);
        this.blobFactory = factory;
    }

    private PreparedStatement prepareSelectFrom(CqlSession cqlSession) {
        return cqlSession.prepare(((Select) ((Select) ((Select) QueryBuilder.selectFrom(CassandraMailQueueViewModule.EnqueuedMailsTable.TABLE_NAME).all().whereColumn("queueName").isEqualTo(QueryBuilder.bindMarker("queueName"))).whereColumn(CassandraMailQueueViewModule.EnqueuedMailsTable.TIME_RANGE_START).isEqualTo(QueryBuilder.bindMarker(CassandraMailQueueViewModule.EnqueuedMailsTable.TIME_RANGE_START))).whereColumn(CassandraMailQueueViewModule.EnqueuedMailsTable.BUCKET_ID).isEqualTo(QueryBuilder.bindMarker(CassandraMailQueueViewModule.EnqueuedMailsTable.BUCKET_ID))).build());
    }

    private PreparedStatement prepareSelectBlobIds(CqlSession cqlSession) {
        return cqlSession.prepare(QueryBuilder.selectFrom(CassandraMailQueueViewModule.EnqueuedMailsTable.TABLE_NAME).columns(new String[]{CassandraMailQueueViewModule.EnqueuedMailsTable.HEADER_BLOB_ID, CassandraMailQueueViewModule.EnqueuedMailsTable.BODY_BLOB_ID}).build());
    }

    private PreparedStatement prepareDeleteBucket(CqlSession cqlSession) {
        return cqlSession.prepare(((Delete) ((Delete) ((Delete) QueryBuilder.deleteFrom(CassandraMailQueueViewModule.EnqueuedMailsTable.TABLE_NAME).whereColumn("queueName").isEqualTo(QueryBuilder.bindMarker("queueName"))).whereColumn(CassandraMailQueueViewModule.EnqueuedMailsTable.TIME_RANGE_START).isEqualTo(QueryBuilder.bindMarker(CassandraMailQueueViewModule.EnqueuedMailsTable.TIME_RANGE_START))).whereColumn(CassandraMailQueueViewModule.EnqueuedMailsTable.BUCKET_ID).isEqualTo(QueryBuilder.bindMarker(CassandraMailQueueViewModule.EnqueuedMailsTable.BUCKET_ID))).build());
    }

    private PreparedStatement prepareInsert(CqlSession cqlSession) {
        return cqlSession.prepare(QueryBuilder.insertInto(CassandraMailQueueViewModule.EnqueuedMailsTable.TABLE_NAME).value("queueName", QueryBuilder.bindMarker("queueName")).value(CassandraMailQueueViewModule.EnqueuedMailsTable.TIME_RANGE_START, QueryBuilder.bindMarker(CassandraMailQueueViewModule.EnqueuedMailsTable.TIME_RANGE_START)).value(CassandraMailQueueViewModule.EnqueuedMailsTable.BUCKET_ID, QueryBuilder.bindMarker(CassandraMailQueueViewModule.EnqueuedMailsTable.BUCKET_ID)).value("enqueueId", QueryBuilder.bindMarker("enqueueId")).value(CassandraMailQueueViewModule.EnqueuedMailsTable.NAME, QueryBuilder.bindMarker(CassandraMailQueueViewModule.EnqueuedMailsTable.NAME)).value(CassandraMailQueueViewModule.EnqueuedMailsTable.HEADER_BLOB_ID, QueryBuilder.bindMarker(CassandraMailQueueViewModule.EnqueuedMailsTable.HEADER_BLOB_ID)).value(CassandraMailQueueViewModule.EnqueuedMailsTable.BODY_BLOB_ID, QueryBuilder.bindMarker(CassandraMailQueueViewModule.EnqueuedMailsTable.BODY_BLOB_ID)).value(CassandraMailQueueViewModule.EnqueuedMailsTable.ENQUEUED_TIME, QueryBuilder.bindMarker(CassandraMailQueueViewModule.EnqueuedMailsTable.ENQUEUED_TIME)).value(CassandraMailQueueViewModule.EnqueuedMailsTable.STATE, QueryBuilder.bindMarker(CassandraMailQueueViewModule.EnqueuedMailsTable.STATE)).value(CassandraMailQueueViewModule.EnqueuedMailsTable.SENDER, QueryBuilder.bindMarker(CassandraMailQueueViewModule.EnqueuedMailsTable.SENDER)).value(CassandraMailQueueViewModule.EnqueuedMailsTable.RECIPIENTS, QueryBuilder.bindMarker(CassandraMailQueueViewModule.EnqueuedMailsTable.RECIPIENTS)).value(CassandraMailQueueViewModule.EnqueuedMailsTable.ATTRIBUTES, QueryBuilder.bindMarker(CassandraMailQueueViewModule.EnqueuedMailsTable.ATTRIBUTES)).value(CassandraMailQueueViewModule.EnqueuedMailsTable.ERROR_MESSAGE, QueryBuilder.bindMarker(CassandraMailQueueViewModule.EnqueuedMailsTable.ERROR_MESSAGE)).value(CassandraMailQueueViewModule.EnqueuedMailsTable.REMOTE_ADDR, QueryBuilder.bindMarker(CassandraMailQueueViewModule.EnqueuedMailsTable.REMOTE_ADDR)).value(CassandraMailQueueViewModule.EnqueuedMailsTable.REMOTE_HOST, QueryBuilder.bindMarker(CassandraMailQueueViewModule.EnqueuedMailsTable.REMOTE_HOST)).value(CassandraMailQueueViewModule.EnqueuedMailsTable.LAST_UPDATED, QueryBuilder.bindMarker(CassandraMailQueueViewModule.EnqueuedMailsTable.LAST_UPDATED)).value(CassandraMailQueueViewModule.EnqueuedMailsTable.PER_RECIPIENT_SPECIFIC_HEADERS, QueryBuilder.bindMarker(CassandraMailQueueViewModule.EnqueuedMailsTable.PER_RECIPIENT_SPECIFIC_HEADERS)).build());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Mono<Void> insert(EnqueuedItemWithSlicingContext enqueuedItemWithSlicingContext) {
        EnqueuedItem enqueuedItem = enqueuedItemWithSlicingContext.getEnqueuedItem();
        EnqueuedItemWithSlicingContext.SlicingContext slicingContext = enqueuedItemWithSlicingContext.getSlicingContext();
        Mail mail = enqueuedItem.getMail();
        MimeMessagePartsId partsId = enqueuedItem.getPartsId();
        BoundStatementBuilder list = this.insertStatement.boundStatementBuilder(new Object[0]).setString("queueName", enqueuedItem.getMailQueueName().asString()).setInstant(CassandraMailQueueViewModule.EnqueuedMailsTable.TIME_RANGE_START, slicingContext.getTimeRangeStart()).setInt(CassandraMailQueueViewModule.EnqueuedMailsTable.BUCKET_ID, slicingContext.getBucketId().getValue()).setInstant(CassandraMailQueueViewModule.EnqueuedMailsTable.ENQUEUED_TIME, enqueuedItem.getEnqueuedTime()).setUuid("enqueueId", enqueuedItem.getEnqueueId().asUUID()).setString(CassandraMailQueueViewModule.EnqueuedMailsTable.NAME, mail.getName()).setString(CassandraMailQueueViewModule.EnqueuedMailsTable.HEADER_BLOB_ID, partsId.getHeaderBlobId().asString()).setString(CassandraMailQueueViewModule.EnqueuedMailsTable.BODY_BLOB_ID, partsId.getBodyBlobId().asString()).setString(CassandraMailQueueViewModule.EnqueuedMailsTable.STATE, mail.getState()).setList(CassandraMailQueueViewModule.EnqueuedMailsTable.RECIPIENTS, EnqueuedMailsDaoUtil.asStringList(mail.getRecipients()), String.class).setString(CassandraMailQueueViewModule.EnqueuedMailsTable.REMOTE_ADDR, mail.getRemoteAddr()).setString(CassandraMailQueueViewModule.EnqueuedMailsTable.REMOTE_HOST, mail.getRemoteHost()).setMap(CassandraMailQueueViewModule.EnqueuedMailsTable.ATTRIBUTES, EnqueuedMailsDaoUtil.toRawAttributeMap(mail), String.class, ByteBuffer.class).setList(CassandraMailQueueViewModule.EnqueuedMailsTable.PER_RECIPIENT_SPECIFIC_HEADERS, EnqueuedMailsDaoUtil.toTupleList(this.userHeaderNameHeaderValueTriple, mail.getPerRecipientSpecificHeaders()), TupleValue.class);
        Optional.ofNullable(mail.getErrorMessage()).ifPresent(str -> {
            list.setString(CassandraMailQueueViewModule.EnqueuedMailsTable.ERROR_MESSAGE, mail.getErrorMessage());
        });
        Optional.ofNullable(mail.getLastUpdated()).map((v0) -> {
            return v0.toInstant();
        }).ifPresent(instant -> {
            list.setInstant(CassandraMailQueueViewModule.EnqueuedMailsTable.LAST_UPDATED, instant);
        });
        mail.getMaybeSender().asOptional().map((v0) -> {
            return v0.asString();
        }).ifPresent(str2 -> {
            list.setString(CassandraMailQueueViewModule.EnqueuedMailsTable.SENDER, str2);
        });
        return this.executor.executeVoid(list.build());
    }

    @VisibleForTesting
    public Flux<EnqueuedItemWithSlicingContext> selectEnqueuedMails(MailQueueName mailQueueName, BucketedSlices.Slice slice, BucketedSlices.BucketId bucketId) {
        return this.executor.executeRows(this.selectStatement.bind(new Object[0]).setString("queueName", mailQueueName.asString()).setInstant(CassandraMailQueueViewModule.EnqueuedMailsTable.TIME_RANGE_START, slice.getStartSliceInstant()).setInt(CassandraMailQueueViewModule.EnqueuedMailsTable.BUCKET_ID, bucketId.getValue())).map(row -> {
            return EnqueuedMailsDaoUtil.toEnqueuedMail(row, this.blobFactory);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Mono<Void> deleteBucket(MailQueueName mailQueueName, BucketedSlices.Slice slice, BucketedSlices.BucketId bucketId) {
        return this.executor.executeVoid(this.deleteBucketStatement.bind(new Object[0]).setString("queueName", mailQueueName.asString()).setInstant(CassandraMailQueueViewModule.EnqueuedMailsTable.TIME_RANGE_START, slice.getStartSliceInstant()).setInt(CassandraMailQueueViewModule.EnqueuedMailsTable.BUCKET_ID, bucketId.getValue()));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Flux<BlobId> listBlobIds() {
        return this.executor.executeRows(this.selectBlobIdsStatement.bind(new Object[0])).flatMapIterable(row -> {
            return ImmutableList.of(this.blobFactory.parse(row.getString(CassandraMailQueueViewModule.EnqueuedMailsTable.HEADER_BLOB_ID)), this.blobFactory.parse(row.getString(CassandraMailQueueViewModule.EnqueuedMailsTable.BODY_BLOB_ID)));
        });
    }
}
