package org.apache.james.queue.rabbitmq.view.cassandra;

import jakarta.inject.Inject;
import jakarta.mail.internet.MimeMessage;
import java.time.Instant;
import java.util.Objects;
import org.apache.james.blob.api.Store;
import org.apache.james.blob.mail.MimeMessagePartsId;
import org.apache.james.blob.mail.MimeMessageStore;
import org.apache.james.queue.api.ManageableMailQueue;
import org.apache.james.queue.rabbitmq.EnqueueId;
import org.apache.james.queue.rabbitmq.EnqueuedItem;
import org.apache.james.queue.rabbitmq.MailQueueName;
import org.apache.james.queue.rabbitmq.view.api.DeleteCondition;
import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueBrowser;
import org.apache.james.util.FunctionalUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.class */
/**
 * {@link MailQueueView} implementation that delegates storing, browsing and deletion of
 * queue items to the Cassandra mail queue helpers ({@link CassandraMailQueueMailStore},
 * {@link CassandraMailQueueBrowser}, {@link CassandraMailQueueMailDelete}) and removes the
 * associated message content through a {@link MimeMessage} {@link Store}.
 *
 * <p>Every instance is bound to a single {@link MailQueueName}, supplied at construction.
 */
public class CassandraMailQueueView implements MailQueueView<CassandraMailQueueBrowser.CassandraMailQueueItemView> {
    /** Concurrency argument handed to {@code Flux.flatMap} when deleting items in bulk. */
    private static final int DELETION_CONCURRENCY = 8;

    private final CassandraMailQueueMailStore storeHelper;
    private final CassandraMailQueueBrowser cassandraMailQueueBrowser;
    private final CassandraMailQueueMailDelete cassandraMailQueueMailDelete;
    private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore;
    private final MailQueueName mailQueueName;

    /**
     * Injectable factory producing {@link CassandraMailQueueView} instances that share the
     * same Cassandra helpers but are bound to different queue names.
     */
    public static class Factory implements MailQueueView.Factory {
        private final CassandraMailQueueMailStore storeHelper;
        private final CassandraMailQueueBrowser cassandraMailQueueBrowser;
        private final CassandraMailQueueMailDelete cassandraMailQueueMailDelete;
        private final MimeMessageStore.Factory mimeMessageStoreFactory;

        /**
         * @param cassandraMailQueueMailStore helper used to store mails and initialize the queue
         * @param cassandraMailQueueBrowser helper used to browse the queue content
         * @param cassandraMailQueueMailDelete helper used to mark items as deleted
         * @param factory source of the mime message store handed to each created view
         */
        @Inject
        public Factory(CassandraMailQueueMailStore cassandraMailQueueMailStore, CassandraMailQueueBrowser cassandraMailQueueBrowser, CassandraMailQueueMailDelete cassandraMailQueueMailDelete, MimeMessageStore.Factory factory) {
            this.storeHelper = cassandraMailQueueMailStore;
            this.cassandraMailQueueBrowser = cassandraMailQueueBrowser;
            this.cassandraMailQueueMailDelete = cassandraMailQueueMailDelete;
            this.mimeMessageStoreFactory = factory;
        }

        /**
         * Creates a view bound to {@code mailQueueName}. A mime message store is requested
         * from the store factory on each call.
         *
         * @param mailQueueName name of the queue the view operates on
         * @return a new {@link CassandraMailQueueView}
         */
        @Override
        public MailQueueView create(MailQueueName mailQueueName) {
            return new CassandraMailQueueView(this.storeHelper, mailQueueName, this.cassandraMailQueueBrowser, this.cassandraMailQueueMailDelete, this.mimeMessageStoreFactory.mimeMessageStore());
        }
    }

    /**
     * @param cassandraMailQueueMailStore helper used to store mails and initialize the queue
     * @param mailQueueName name of the queue this view operates on
     * @param cassandraMailQueueBrowser helper used to browse the queue content
     * @param cassandraMailQueueMailDelete helper used to mark items as deleted
     * @param store store from which message content is deleted
     */
    CassandraMailQueueView(CassandraMailQueueMailStore cassandraMailQueueMailStore, MailQueueName mailQueueName, CassandraMailQueueBrowser cassandraMailQueueBrowser, CassandraMailQueueMailDelete cassandraMailQueueMailDelete, Store<MimeMessage, MimeMessagePartsId> store) {
        this.mailQueueName = mailQueueName;
        this.storeHelper = cassandraMailQueueMailStore;
        this.cassandraMailQueueBrowser = cassandraMailQueueBrowser;
        this.cassandraMailQueueMailDelete = cassandraMailQueueMailDelete;
        this.mimeMessageStore = store;
    }

    /**
     * Delegates to {@code CassandraMailQueueMailDelete.updateBrowseStartReactive} for this
     * view's queue.
     *
     * @return the publisher returned by the deletion helper
     */
    public Mono<Void> updateBrowseStart() {
        return this.cassandraMailQueueMailDelete.updateBrowseStartReactive(this.mailQueueName);
    }

    /**
     * Initializes the browse start, then the content start, of the given queue, blocking
     * the calling thread until each step has completed.
     *
     * <p>Note that the queue name passed as argument is used here, not the one this view
     * was constructed with.
     *
     * @param mailQueueName name of the queue to initialize
     */
    @Override
    public void initialize(MailQueueName mailQueueName) {
        this.storeHelper.initializeBrowseStart(mailQueueName).block();
        this.storeHelper.initializeContentStart(mailQueueName).block();
    }

    /**
     * Stores the given item through the store helper.
     *
     * @param enqueuedItem item to store
     * @return the publisher returned by the store helper
     */
    @Override
    public Mono<Void> storeMail(EnqueuedItem enqueuedItem) {
        return this.storeHelper.storeMail(enqueuedItem);
    }

    /**
     * Blocking counterpart of {@link #browseReactive()}: exposes the browsed items through
     * an iterator obtained from {@code Flux.toIterable()}.
     *
     * @return an iterator over the items of this view's queue
     */
    @Override
    public ManageableMailQueue.MailQueueIterator browse() {
        return new CassandraMailQueueBrowser.CassandraMailQueueIterator(browseReactive().toIterable().iterator());
    }

    /**
     * @return the items the browser reports for this view's queue
     */
    @Override
    public Flux<CassandraMailQueueBrowser.CassandraMailQueueItemView> browseReactive() {
        return this.cassandraMailQueueBrowser.browse(this.mailQueueName);
    }

    /**
     * @param instant bound forwarded to {@code CassandraMailQueueBrowser.browseOlderThan}
     * @return the items the browser reports for this view's queue and the given instant
     */
    @Override
    public Flux<CassandraMailQueueBrowser.CassandraMailQueueItemView> browseOlderThanReactive(Instant instant) {
        return this.cassandraMailQueueBrowser.browseOlderThan(this.mailQueueName, instant);
    }

    /**
     * Blocking counterpart of {@link #getSizeReactive()}.
     *
     * @return the number of references currently browsable in this view's queue
     */
    @Override
    public long getSize() {
        return getSizeReactive().block();
    }

    /**
     * @return the number of elements emitted by
     *         {@code CassandraMailQueueBrowser.browseReferences} for this view's queue
     */
    @Override
    public Mono<Long> getSizeReactive() {
        return this.cassandraMailQueueBrowser.browseReferences(this.mailQueueName).count();
    }

    /**
     * Deletes the items matching {@code deleteCondition}.
     *
     * <p>A {@link DeleteCondition.WithEnqueueId} condition carries the enqueue id and blob
     * ids of the item to remove, so that item is deleted directly and {@code 1} is
     * returned. Any other condition requires browsing the queue to find matching items.
     *
     * @param deleteCondition condition selecting the items to delete
     * @return the number of deleted items
     */
    @Override
    /* renamed from: delete, reason: merged with bridge method [inline-methods] */
    public Mono<Long> mo10delete(DeleteCondition deleteCondition) {
        if (!(deleteCondition instanceof DeleteCondition.WithEnqueueId)) {
            return browseThenDelete(deleteCondition);
        }
        DeleteCondition.WithEnqueueId withEnqueueId = (DeleteCondition.WithEnqueueId) deleteCondition;
        return delete(withEnqueueId.getEnqueueId(), withEnqueueId.getBlobIds()).thenReturn(1L);
    }

    /**
     * Browses the references of this view's queue, deletes every item accepted by
     * {@code deleteCondition.shouldBeDeleted} and, once the count is available, asks the
     * deletion helper to update the browse start.
     *
     * @param deleteCondition condition selecting the items to delete; must not be null
     * @return the number of items that were deleted
     * @throws NullPointerException if {@code deleteCondition} is null
     */
    private Mono<Long> browseThenDelete(DeleteCondition deleteCondition) {
        Objects.requireNonNull(deleteCondition, "deleteCondition");
        return this.cassandraMailQueueBrowser.browseReferences(this.mailQueueName)
            .map(reference -> reference.getEnqueuedItem())
            .filter(deleteCondition::shouldBeDeleted)
            // delete(...) is a Mono<Void> and therefore never emits: without thenReturn the
            // flatMap output would be empty and count() would always report 0.
            .flatMap(enqueuedItem -> delete(enqueuedItem.getEnqueueId(), enqueuedItem.getPartsId()).thenReturn(enqueuedItem), DELETION_CONCURRENCY)
            .count()
            .doOnNext(deletedCount -> this.cassandraMailQueueMailDelete.updateBrowseStart(this.mailQueueName));
    }

    /**
     * Marks a single item as deleted in this view's queue, then deletes its message parts
     * from the mime message store.
     *
     * @param enqueueId identifier of the item to delete
     * @param mimeMessagePartsId identifier of the message parts to remove from the store
     * @return a publisher completing, without emitting, once both steps are done
     */
    private Mono<Void> delete(EnqueueId enqueueId, MimeMessagePartsId mimeMessagePartsId) {
        return this.cassandraMailQueueMailDelete.considerDeleted(enqueueId, this.mailQueueName)
            .then(Mono.from(this.mimeMessageStore.delete(mimeMessagePartsId)));
    }

    /**
     * Tells whether an item is still present, by mapping the deletion helper's
     * {@code isDeleted} answer through {@link FunctionalUtils#negate()}.
     *
     * @param enqueueId identifier of the item to look up
     * @return the negated result of {@code isDeleted} for this view's queue
     */
    @Override
    public Mono<Boolean> isPresent(EnqueueId enqueueId) {
        return this.cassandraMailQueueMailDelete.isDeleted(enqueueId, this.mailQueueName).map(FunctionalUtils.negate());
    }
}
