package org.apache.james.blob.cassandra.cache;

import com.github.fge.lambdas.Throwing;
import com.google.common.base.Preconditions;
import com.google.common.io.ByteSource;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.BlobStore;
import org.apache.james.blob.api.BucketName;
import org.apache.james.blob.api.ObjectNotFoundException;
import org.apache.james.blob.api.ObjectStoreIOException;
import org.apache.james.metrics.api.Metric;
import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.util.ReactorUtils;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;

/**
 * {@link BlobStore} decorator that keeps small blobs of the backend's default bucket in a
 * {@link BlobStoreCache}.
 *
 * <p>A blob is only considered for caching when it lives in the backend's default bucket, the
 * storage policy is not {@link BlobStore.StoragePolicy#LOW_COST} (for the save paths), and its
 * whole content fits within the configured size threshold. Every other request is passed
 * straight to the backend.
 *
 * <p>Decompiled from {@code org/apache/james/blob/cassandra/cache/CachedBlobStore.class}; the
 * {@code mNN...}/{@code moNN...} method names are decompiler renames and are kept as-is so that
 * the rest of the decompiled tree keeps resolving them.
 */
public class CachedBlobStore implements BlobStore {
    /** Injection name of the decorated backend {@link BlobStore}. */
    public static final String BACKEND = "blobStoreBackend";
    /** Timer metric wrapped around cache reads. */
    public static final String BLOBSTORE_CACHED_LATENCY_METRIC_NAME = "blobStoreCacheLatency";
    /** Timer metric wrapped around backend reads. */
    public static final String BLOBSTORE_BACKEND_LATENCY_METRIC_NAME = "blobStoreBackEndLatency";
    /** Counter incremented when a cacheable blob had to be fetched from the backend. */
    public static final String BLOBSTORE_CACHED_MISS_COUNT_METRIC_NAME = "blobStoreCacheMisses";
    /** Counter incremented each time the cache emits a value. */
    public static final String BLOBSTORE_CACHED_HIT_COUNT_METRIC_NAME = "blobStoreCacheHits";

    private final MetricFactory metricFactory;
    private final Metric metricRetrieveHitCount;
    private final Metric metricRetrieveMissCount;
    private final BlobStoreCache cache;
    private final BlobStore backend;
    /** Largest blob size, in bytes, that is still eligible for caching. */
    private final int sizeThresholdInBytes;

    /** Deferred backend save of a {@link ByteSource}; yields the id of the stored blob. */
    @FunctionalInterface
    public interface ByteSourceBackendSaver {
        Mono<BlobId> save();
    }

    /** Backend save of a stream whose first bytes have already been inspected. */
    @FunctionalInterface
    public interface InputStreamBackendSaver {
        Mono<BlobId> save(ReadAheadInputStream readAheadInputStream);
    }

    /**
     * An input stream whose leading bytes have been read ahead and then pushed back, so that the
     * stream can still be consumed from its beginning while the prefix is available separately.
     */
    public static class ReadAheadInputStream {
        /** The wrapped stream, positioned at the start of the content. */
        final PushbackInputStream in;
        /** Copy of the leading bytes (at most the requested length) of the stream. */
        final Optional<byte[]> firstBytes;
        /** Whether the stream holds at least one byte beyond {@link #firstBytes}. */
        final boolean hasMore;

        /** Second builder step: how many bytes to read ahead. */
        public interface RequireLength {
            ReadAheadInputStream length(int i) throws IOException;
        }

        /** First builder step: which stream to read ahead from. */
        @FunctionalInterface
        interface RequireStream {
            RequireLength of(InputStream inputStream);
        }

        /**
         * Starts a builder that performs the read-ahead as soon as the length is supplied.
         *
         * @return the first builder step
         */
        static RequireStream eager() {
            return source -> length -> readAhead(source, length);
        }

        /**
         * Reads up to {@code length} bytes, probes for one more byte, then pushes everything back.
         *
         * @param source stream to inspect
         * @param length maximum number of leading bytes to capture
         * @return the wrapped stream together with its captured prefix
         * @throws IOException when reading from or pushing back into the stream fails
         */
        private static ReadAheadInputStream readAhead(InputStream source, int length) throws IOException {
            // One extra slot so the probe byte read by hasMore fits next to the prefix.
            PushbackInputStream stream = new PushbackInputStream(source, length + 1);
            byte[] buffer = new byte[length];
            // IOUtils.read returns the number of bytes actually read (0 at immediate EOF),
            // never a negative value, so no "nothing read" special case is needed.
            int readCount = IOUtils.read(stream, buffer);
            byte[] prefix = Arrays.copyOf(buffer, readCount);
            // Probe first: the probe byte must be pushed back before the prefix so that the
            // prefix is read again first.
            boolean moreAvailable = hasMore(stream);
            stream.unread(prefix);
            return new ReadAheadInputStream(stream, Optional.of(prefix), moreAvailable);
        }

        /**
         * Tells whether at least one more byte can be read, without consuming it.
         *
         * @param pushbackInputStream stream to probe
         * @return {@code true} when a byte was available (it is pushed back)
         * @throws IOException when reading from the stream fails
         */
        private static boolean hasMore(PushbackInputStream pushbackInputStream) throws IOException {
            int probe = pushbackInputStream.read();
            if (probe < 0) {
                return false;
            }
            pushbackInputStream.unread(probe);
            return true;
        }

        private ReadAheadInputStream(PushbackInputStream pushbackInputStream, Optional<byte[]> optional, boolean z) {
            this.in = pushbackInputStream;
            this.firstBytes = optional;
            this.hasMore = z;
        }
    }

    /**
     * @param blobStoreCache cache holding the small blobs
     * @param blobStore decorated backend, injected under the {@link #BACKEND} name
     * @param cassandraCacheConfiguration supplies the caching size threshold
     * @param metricFactory used for the latency timers and the hit/miss counters
     */
    @Inject
    public CachedBlobStore(BlobStoreCache blobStoreCache, @Named(BACKEND) BlobStore blobStore, CassandraCacheConfiguration cassandraCacheConfiguration, MetricFactory metricFactory) {
        this.cache = blobStoreCache;
        this.backend = blobStore;
        this.sizeThresholdInBytes = cassandraCacheConfiguration.getSizeThresholdInBytes();
        this.metricFactory = metricFactory;
        this.metricRetrieveMissCount = metricFactory.generate(BLOBSTORE_CACHED_MISS_COUNT_METRIC_NAME);
        this.metricRetrieveHitCount = metricFactory.generate(BLOBSTORE_CACHED_HIT_COUNT_METRIC_NAME);
    }

    /**
     * Blocking read. {@code LOW_COST} reads go straight to the backend; other policies go
     * through the cache-aware path.
     *
     * @throws ObjectNotFoundException when the cache-aware path completes without a value
     */
    public InputStream read(BucketName bucketName, BlobId blobId, BlobStore.StoragePolicy storagePolicy) throws ObjectStoreIOException, ObjectNotFoundException {
        if (storagePolicy == BlobStore.StoragePolicy.LOW_COST) {
            return this.backend.read(bucketName, blobId);
        }
        return readInputStream(bucketName, blobId)
            .blockOptional()
            .orElseThrow(() -> blobNotFound(blobId));
    }

    /**
     * Reactive read. {@code LOW_COST} reads go straight to the backend; other policies go
     * through the cache-aware path and signal {@link ObjectNotFoundException} when it is empty.
     */
    public Publisher<InputStream> readReactive(BucketName bucketName, BlobId blobId, BlobStore.StoragePolicy storagePolicy) {
        if (storagePolicy == BlobStore.StoragePolicy.LOW_COST) {
            return this.backend.readReactive(bucketName, blobId);
        }
        return readInputStream(bucketName, blobId)
            .switchIfEmpty(Mono.error(() -> blobNotFound(blobId)));
    }

    /** Builds the exception reported when a cache-aware read yields nothing. */
    private ObjectNotFoundException blobNotFound(BlobId blobId) {
        return new ObjectNotFoundException(String.format("Could not retrieve blob metadata for %s", blobId.asString()));
    }

    /** Uses the cache only for the default bucket; other buckets are read from the backend. */
    private Mono<InputStream> readInputStream(BucketName bucketName, BlobId blobId) {
        if (bucketName.equals(getDefaultBucketName())) {
            return readInDefaultBucket(bucketName, blobId);
        }
        return readFromBackend(bucketName, blobId);
    }

    /** Serves the blob from the cache, falling back to the backend when the cache is empty. */
    private Mono<InputStream> readInDefaultBucket(BucketName bucketName, BlobId blobId) {
        return readFromCache(blobId)
            .<InputStream>map(ByteArrayInputStream::new)
            // Deferred so the backend call and its timer are only assembled on a cache miss.
            .switchIfEmpty(Mono.defer(() -> readFromBackendAndCache(bucketName, blobId)));
    }

    /**
     * Reads the blob from the backend, peeks at its first bytes on the blocking-call scheduler,
     * caches them when the whole blob fits the threshold, and returns the rewound stream.
     */
    private Mono<InputStream> readFromBackendAndCache(BucketName bucketName, BlobId blobId) {
        return readFromBackend(bucketName, blobId)
            .flatMap(inputStream -> Mono.fromCallable(() -> ReadAheadInputStream.eager().of(inputStream).length(this.sizeThresholdInBytes))
                .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER)
                .flatMap(readAhead -> putInCacheIfNeeded(bucketName, readAhead, blobId)
                    .<InputStream>thenReturn(readAhead.in)));
    }

    /**
     * Reads a blob as bytes. The cache is consulted only for the default bucket and a policy
     * other than {@code LOW_COST}.
     *
     * <p>Renamed by the decompiler from {@code readBytes} (merged with bridge method).
     */
    public Mono<byte[]> m14readBytes(BucketName bucketName, BlobId blobId, BlobStore.StoragePolicy storagePolicy) {
        boolean cacheEligible = storagePolicy != BlobStore.StoragePolicy.LOW_COST
            && getDefaultBucketName().equals(bucketName);
        if (cacheEligible) {
            return readBytesInDefaultBucket(bucketName, blobId);
        }
        return readBytesFromBackend(bucketName, blobId);
    }

    /** Policy-less variant: reads with {@code LOW_COST}, i.e. directly from the backend. */
    public Publisher<byte[]> readBytes(BucketName bucketName, BlobId blobId) {
        return m14readBytes(bucketName, blobId, BlobStore.StoragePolicy.LOW_COST);
    }

    /** Policy-less variant: reads with {@code LOW_COST}, i.e. directly from the backend. */
    public InputStream read(BucketName bucketName, BlobId blobId) {
        return read(bucketName, blobId, BlobStore.StoragePolicy.LOW_COST);
    }

    /** Policy-less variant: reads with {@code LOW_COST}, i.e. directly from the backend. */
    public Publisher<InputStream> readReactive(BucketName bucketName, BlobId blobId) {
        return readReactive(bucketName, blobId, BlobStore.StoragePolicy.LOW_COST);
    }

    /** Serves the bytes from the cache, falling back to the backend when the cache is empty. */
    private Mono<byte[]> readBytesInDefaultBucket(BucketName bucketName, BlobId blobId) {
        return readFromCache(blobId)
            // Deferred so the backend call and its timer are only assembled on a cache miss.
            .switchIfEmpty(Mono.defer(() -> readBytesFromBackend(bucketName, blobId)
                .flatMap(bytes -> cacheAfterMiss(blobId, bytes))));
    }

    /**
     * Stores bytes fetched from the backend in the cache when they fit the threshold, counting
     * a cache miss in that case; oversized content is returned untouched and not counted.
     */
    private Mono<byte[]> cacheAfterMiss(BlobId blobId, byte[] bytes) {
        if (!isAbleToCache(bytes)) {
            return Mono.just(bytes);
        }
        this.metricRetrieveMissCount.increment();
        return saveInCache(blobId, bytes).thenReturn(bytes);
    }

    /** Saves to the backend, then caches the bytes when bucket, policy and size allow it. */
    public Publisher<BlobId> save(BucketName bucketName, byte[] bArr, BlobStore.StoragePolicy storagePolicy) {
        return Mono.from(this.backend.save(bucketName, bArr, storagePolicy))
            .flatMap(blobId -> cacheAfterSave(bucketName, bArr, storagePolicy, blobId));
    }

    /**
     * Saves a stream. When bucket and policy allow caching, the first bytes are read ahead so
     * they can be cached once the backend save succeeded; otherwise the backend is called directly.
     */
    public Publisher<BlobId> save(BucketName bucketName, InputStream inputStream, BlobStore.StoragePolicy storagePolicy) {
        Preconditions.checkNotNull(inputStream, "InputStream must not be null");
        if (!isAbleToCache(bucketName, storagePolicy)) {
            return this.backend.save(bucketName, inputStream, storagePolicy);
        }
        return saveInCache(bucketName, inputStream, storagePolicy,
            readAhead -> Mono.from(this.backend.save(bucketName, readAhead.in, storagePolicy)));
    }

    /**
     * Saves a byte source. When bucket and policy allow caching, the content is additionally
     * cached after the backend save if it fits the threshold.
     */
    public Publisher<BlobId> save(BucketName bucketName, ByteSource byteSource, BlobStore.StoragePolicy storagePolicy) {
        Preconditions.checkNotNull(byteSource, "ByteSource must not be null");
        if (!isAbleToCache(bucketName, storagePolicy)) {
            return this.backend.save(bucketName, byteSource, storagePolicy);
        }
        return saveInCache(byteSource,
            () -> Mono.from(this.backend.save(bucketName, byteSource, storagePolicy)));
    }

    /**
     * Same as {@link #save(BucketName, byte[], BlobStore.StoragePolicy)} with a caller-supplied
     * blob id provider.
     *
     * <p>Renamed by the decompiler from {@code save} (merged with bridge method).
     */
    public Mono<BlobId> m15save(BucketName bucketName, byte[] bArr, BlobStore.BlobIdProvider blobIdProvider, BlobStore.StoragePolicy storagePolicy) {
        return Mono.from(this.backend.save(bucketName, bArr, blobIdProvider, storagePolicy))
            .flatMap(blobId -> cacheAfterSave(bucketName, bArr, storagePolicy, blobId));
    }

    /**
     * Same as {@link #save(BucketName, InputStream, BlobStore.StoragePolicy)} with a
     * caller-supplied blob id provider.
     */
    public Publisher<BlobId> save(BucketName bucketName, InputStream inputStream, BlobStore.BlobIdProvider blobIdProvider, BlobStore.StoragePolicy storagePolicy) {
        Preconditions.checkNotNull(inputStream, "InputStream must not be null");
        if (!isAbleToCache(bucketName, storagePolicy)) {
            return this.backend.save(bucketName, inputStream, blobIdProvider, storagePolicy);
        }
        return saveInCache(bucketName, inputStream, storagePolicy,
            readAhead -> Mono.from(this.backend.save(bucketName, readAhead.in, blobIdProvider, storagePolicy)));
    }

    /**
     * Same as {@link #save(BucketName, ByteSource, BlobStore.StoragePolicy)} with a
     * caller-supplied blob id provider.
     */
    public Publisher<BlobId> save(BucketName bucketName, ByteSource byteSource, BlobStore.BlobIdProvider blobIdProvider, BlobStore.StoragePolicy storagePolicy) {
        Preconditions.checkNotNull(byteSource, "ByteSource must not be null");
        if (!isAbleToCache(bucketName, storagePolicy)) {
            return this.backend.save(bucketName, byteSource, blobIdProvider, storagePolicy);
        }
        return saveInCache(byteSource,
            () -> Mono.from(this.backend.save(bucketName, byteSource, blobIdProvider, storagePolicy)));
    }

    /** Caches freshly saved bytes when bucket, policy and size allow it; always yields the id. */
    private Mono<BlobId> cacheAfterSave(BucketName bucketName, byte[] bytes, BlobStore.StoragePolicy storagePolicy, BlobId blobId) {
        if (isAbleToCache(bucketName, bytes, storagePolicy)) {
            return saveInCache(blobId, bytes).thenReturn(blobId);
        }
        return Mono.just(blobId);
    }

    /** Runs the backend save, then caches the byte source content when it fits the threshold. */
    private Mono<BlobId> saveInCache(ByteSource byteSource, ByteSourceBackendSaver byteSourceBackendSaver) {
        return Mono.from(byteSourceBackendSaver.save())
            .flatMap(Throwing.function(blobId -> cacheIfWithinThreshold(byteSource, blobId)));
    }

    /**
     * Re-opens the byte source, reads at most the threshold, and caches those bytes only when
     * they are the complete content. Caching a prefix of a larger source would later be served
     * as if it were the whole blob. The stream opened here is closed before returning.
     *
     * @throws IOException when the source cannot be opened or read
     */
    private Mono<BlobId> cacheIfWithinThreshold(ByteSource byteSource, BlobId blobId) throws IOException {
        try (InputStream content = byteSource.openStream()) {
            ReadAheadInputStream readAhead = ReadAheadInputStream.eager().of(content).length(this.sizeThresholdInBytes);
            return readAhead.firstBytes
                .filter(bytes -> !readAhead.hasMore)
                .map(bytes -> saveInCache(blobId, bytes).thenReturn(blobId))
                .orElse(Mono.just(blobId));
        }
    }

    /** The default bucket of the backend; the only bucket whose blobs are cached. */
    public BucketName getDefaultBucketName() {
        return this.backend.getDefaultBucketName();
    }

    /**
     * Deletes from the backend and, when the backend reports a deletion in the default bucket,
     * removes the blob from the cache as well.
     *
     * <p>Renamed by the decompiler from {@code delete} (merged with bridge method).
     */
    public Mono<Boolean> m13delete(BucketName bucketName, BlobId blobId) {
        return Mono.from(this.backend.delete(bucketName, blobId))
            .flatMap(deleted -> {
                boolean evict = this.backend.getDefaultBucketName().equals(bucketName) && deleted.booleanValue();
                if (evict) {
                    return Mono.from(this.cache.mo18remove(blobId)).thenReturn(deleted);
                }
                return Mono.just(deleted);
            });
    }

    /**
     * Deletes the bucket in the backend.
     *
     * <p>NOTE(review): nothing is evicted from the cache here, even if the default bucket is
     * deleted - confirm this is intended.
     */
    public Publisher<Void> deleteBucket(BucketName bucketName) {
        return Mono.from(this.backend.deleteBucket(bucketName));
    }

    /**
     * Reads the first bytes of the stream ahead, hands the rewound stream to the backend saver,
     * then caches those bytes when they are the whole content.
     */
    private Mono<BlobId> saveInCache(BucketName bucketName, InputStream inputStream, BlobStore.StoragePolicy storagePolicy, InputStreamBackendSaver inputStreamBackendSaver) {
        return Mono.fromCallable(() -> ReadAheadInputStream.eager().of(inputStream).length(this.sizeThresholdInBytes))
            .flatMap(readAhead -> inputStreamBackendSaver.save(readAhead)
                .flatMap(blobId -> putInCacheIfNeeded(bucketName, storagePolicy, readAhead, blobId)
                    .thenReturn(blobId)));
    }

    /** Save path: caches the read-ahead bytes when bucket, policy and completeness allow it. */
    private Mono<Void> putInCacheIfNeeded(BucketName bucketName, BlobStore.StoragePolicy storagePolicy, ReadAheadInputStream readAheadInputStream, BlobId blobId) {
        return readAheadInputStream.firstBytes
            .filter(bytes -> isAbleToCache(bucketName, readAheadInputStream, storagePolicy))
            .map(bytes -> saveInCache(blobId, bytes))
            .orElse(Mono.empty());
    }

    /**
     * Read path: caches the read-ahead bytes when they are the whole content of a default-bucket
     * blob, counting a cache miss on subscription in that case.
     */
    private Mono<Void> putInCacheIfNeeded(BucketName bucketName, ReadAheadInputStream readAheadInputStream, BlobId blobId) {
        return readAheadInputStream.firstBytes
            .filter(bytes -> isAbleToCache(readAheadInputStream, bucketName))
            .map(bytes -> Mono.fromRunnable(this.metricRetrieveMissCount::increment)
                .then(saveInCache(blobId, bytes)))
            .orElse(Mono.empty());
    }

    /** Writes the bytes to the cache under the given blob id. */
    private Mono<Void> saveInCache(BlobId blobId, byte[] bArr) {
        return Mono.from(this.cache.mo20cache(blobId, bArr));
    }

    /** Bucket, policy and size must all allow caching. */
    private boolean isAbleToCache(BucketName bucketName, byte[] bArr, BlobStore.StoragePolicy storagePolicy) {
        return isAbleToCache(bucketName, storagePolicy) && isAbleToCache(bArr);
    }

    /** Bucket and policy must allow caching and the read-ahead must have seen the whole content. */
    private boolean isAbleToCache(BucketName bucketName, ReadAheadInputStream readAheadInputStream, BlobStore.StoragePolicy storagePolicy) {
        return isAbleToCache(bucketName, storagePolicy) && !readAheadInputStream.hasMore;
    }

    /** Only the default bucket with a policy other than {@code LOW_COST} is cached. */
    private boolean isAbleToCache(BucketName bucketName, BlobStore.StoragePolicy storagePolicy) {
        return this.backend.getDefaultBucketName().equals(bucketName) && !storagePolicy.equals(BlobStore.StoragePolicy.LOW_COST);
    }

    /** Read path check: whole content seen and blob located in the default bucket. */
    private boolean isAbleToCache(ReadAheadInputStream readAheadInputStream, BucketName bucketName) {
        return !readAheadInputStream.hasMore && this.backend.getDefaultBucketName().equals(bucketName);
    }

    /** Size check against the configured threshold (inclusive). */
    private boolean isAbleToCache(byte[] bArr) {
        return bArr.length <= this.sizeThresholdInBytes;
    }

    /** Timed cache read; every emitted value counts as a cache hit. */
    private Mono<byte[]> readFromCache(BlobId blobId) {
        return Mono.from(this.metricFactory.decoratePublisherWithTimerMetric(BLOBSTORE_CACHED_LATENCY_METRIC_NAME, this.cache.mo19read(blobId)))
            .doOnNext(bytes -> this.metricRetrieveHitCount.increment());
    }

    /** Timed backend read as a stream. */
    private Mono<InputStream> readFromBackend(BucketName bucketName, BlobId blobId) {
        return Mono.from(this.metricFactory.decoratePublisherWithTimerMetric(BLOBSTORE_BACKEND_LATENCY_METRIC_NAME, Mono.from(this.backend.readReactive(bucketName, blobId))));
    }

    /** Timed backend read as bytes. */
    private Mono<byte[]> readBytesFromBackend(BucketName bucketName, BlobId blobId) {
        return Mono.from(this.metricFactory.decoratePublisherWithTimerMetric(BLOBSTORE_BACKEND_LATENCY_METRIC_NAME, this.backend.readBytes(bucketName, blobId)));
    }

    /** Delegates to the backend. */
    public Publisher<BucketName> listBuckets() {
        return this.backend.listBuckets();
    }

    /** Delegates to the backend. */
    public Publisher<BlobId> listBlobs(BucketName bucketName) {
        return this.backend.listBlobs(bucketName);
    }
}
