package org.apache.james.queue.rabbitmq.view.cassandra;

import com.google.common.collect.ImmutableList;
import java.time.Clock;
import java.time.Instant;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.IntStream;
import javax.inject.Inject;
import org.apache.james.queue.rabbitmq.EnqueueId;
import org.apache.james.queue.rabbitmq.MailQueueName;
import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfiguration;
import org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices;
import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedItemWithSlicingContext;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.class */
public class CassandraMailQueueMailDelete {
    private final DeletedMailsDAO deletedMailsDao;
    private final BrowseStartDAO browseStartDao;
    private final ContentStartDAO contentStartDAO;
    private final EnqueuedMailsDAO enqueuedMailsDAO;
    private final CassandraMailQueueBrowser cassandraMailQueueBrowser;
    private final CassandraMailQueueViewConfiguration configuration;
    private final Clock clock;

    @Inject
    CassandraMailQueueMailDelete(DeletedMailsDAO deletedMailsDAO, BrowseStartDAO browseStartDAO, ContentStartDAO contentStartDAO, EnqueuedMailsDAO enqueuedMailsDAO, CassandraMailQueueBrowser cassandraMailQueueBrowser, CassandraMailQueueViewConfiguration cassandraMailQueueViewConfiguration, Clock clock) {
        this.deletedMailsDao = deletedMailsDAO;
        this.browseStartDao = browseStartDAO;
        this.contentStartDAO = contentStartDAO;
        this.enqueuedMailsDAO = enqueuedMailsDAO;
        this.cassandraMailQueueBrowser = cassandraMailQueueBrowser;
        this.configuration = cassandraMailQueueViewConfiguration;
        this.clock = clock;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Mono<Void> considerDeleted(EnqueueId enqueueId, MailQueueName mailQueueName) {
        return this.deletedMailsDao.markAsDeleted(mailQueueName, enqueueId).doFinally(signalType -> {
            maybeUpdateBrowseStart(mailQueueName);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Mono<Boolean> isDeleted(EnqueueId enqueueId, MailQueueName mailQueueName) {
        return this.deletedMailsDao.isDeleted(mailQueueName, enqueueId);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void updateBrowseStart(MailQueueName mailQueueName) {
        findNewBrowseStart(mailQueueName).flatMap(instant -> {
            return updateNewBrowseStart(mailQueueName, instant).then(clearContentBeforeBrowse(mailQueueName, instant));
        }).subscribeOn(Schedulers.parallel()).subscribe();
    }

    private void maybeUpdateBrowseStart(MailQueueName mailQueueName) {
        if (shouldUpdateBrowseStart()) {
            updateBrowseStart(mailQueueName);
        }
    }

    private Mono<Instant> findNewBrowseStart(MailQueueName mailQueueName) {
        BucketedSlices.Slice of = BucketedSlices.Slice.of(this.clock.instant());
        return this.browseStartDao.findBrowseStart(mailQueueName).filter(instant -> {
            return instant.isBefore(of.getStartSliceInstant());
        }).flatMapMany(instant2 -> {
            return this.cassandraMailQueueBrowser.browseReferences(mailQueueName, instant2);
        }).map(enqueuedItemWithSlicingContext -> {
            return enqueuedItemWithSlicingContext.getSlicingContext().getTimeRangeStart();
        }).next();
    }

    private Mono<Void> updateNewBrowseStart(MailQueueName mailQueueName, Instant instant) {
        return this.browseStartDao.updateBrowseStart(mailQueueName, instant);
    }

    private Mono<Void> clearContentBeforeBrowse(MailQueueName mailQueueName, Instant instant) {
        return this.contentStartDAO.findContentStart(mailQueueName).flatMapIterable(instant2 -> {
            return (Iterable) BucketedSlices.Slice.of(instant2).allSlicesTill(instant, this.configuration.getSliceWindow()).filter(slice -> {
                return slice.getStartSliceInstant().isBefore(instant);
            }).flatMap(slice2 -> {
                return IntStream.range(0, this.configuration.getBucketCount()).boxed().map(num -> {
                    return EnqueuedItemWithSlicingContext.SlicingContext.of(BucketedSlices.BucketId.of(num.intValue()), slice2.getStartSliceInstant());
                });
            }).collect(ImmutableList.toImmutableList());
        }).concatMap(slicingContext -> {
            return deleteEmailsFromBrowseProjection(mailQueueName, slicingContext);
        }).concatMap(slicingContext2 -> {
            return this.enqueuedMailsDAO.deleteBucket(mailQueueName, BucketedSlices.Slice.of(slicingContext2.getTimeRangeStart()), slicingContext2.getBucketId());
        }).then(this.contentStartDAO.updateContentStart(mailQueueName, instant));
    }

    private Mono<EnqueuedItemWithSlicingContext.SlicingContext> deleteEmailsFromBrowseProjection(MailQueueName mailQueueName, EnqueuedItemWithSlicingContext.SlicingContext slicingContext) {
        return this.enqueuedMailsDAO.selectEnqueuedMails(mailQueueName, BucketedSlices.Slice.of(slicingContext.getTimeRangeStart()), slicingContext.getBucketId()).flatMap(enqueuedItemWithSlicingContext -> {
            Mono<Void> removeDeletedMark = this.deletedMailsDao.removeDeletedMark(mailQueueName, enqueuedItemWithSlicingContext.getEnqueuedItem().getEnqueueId());
            Objects.requireNonNull(enqueuedItemWithSlicingContext);
            return removeDeletedMark.then(Mono.fromRunnable(enqueuedItemWithSlicingContext::dispose).subscribeOn(Schedulers.boundedElastic()));
        }, 16).then().thenReturn(slicingContext);
    }

    private boolean shouldUpdateBrowseStart() {
        return Math.abs(ThreadLocalRandom.current().nextInt()) % this.configuration.getUpdateBrowseStartPace() == 0;
    }
}
