package org.apache.james.blob.cassandra.cache;

import com.google.common.base.Strings;
import com.google.common.io.ByteSource;
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Objects;
import javax.mail.internet.MimeMessage;
import org.apache.james.backends.cassandra.CassandraCluster;
import org.apache.james.backends.cassandra.CassandraClusterExtension;
import org.apache.james.backends.cassandra.Scenario;
import org.apache.james.backends.cassandra.StatementRecorder;
import org.apache.james.backends.cassandra.components.CassandraModule;
import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.BlobStore;
import org.apache.james.blob.api.BlobStoreContract;
import org.apache.james.blob.api.BucketName;
import org.apache.james.blob.api.HashBlobId;
import org.apache.james.blob.api.ObjectNotFoundException;
import org.apache.james.blob.api.Store;
import org.apache.james.blob.api.TestBlobId;
import org.apache.james.blob.cassandra.CassandraBlobModule;
import org.apache.james.blob.cassandra.CassandraBlobStoreFactory;
import org.apache.james.blob.cassandra.cache.CassandraCacheConfiguration;
import org.apache.james.blob.mail.MimeMessagePartsId;
import org.apache.james.blob.mail.MimeMessageStore;
import org.apache.james.core.builder.MimeMessageBuilder;
import org.apache.james.metrics.tests.RecordingMetricFactory;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.IntegerAssert;
import org.assertj.core.api.SoftAssertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/apache/james/blob/cassandra/cache/CachedBlobStoreTest.class */
public class CachedBlobStoreTest implements BlobStoreContract {
    // The default bucket; the tests in this class expect blobs stored here to be eligible for caching.
    private static final BucketName DEFAULT_BUCKETNAME = BucketName.DEFAULT;
    // A non-default bucket; the tests in this class expect blobs stored here to bypass the cache.
    private static final BucketName TEST_BUCKETNAME = BucketName.of("test");
    // 500 repetitions of an 11-character ASCII line, i.e. 5,500 bytes once UTF-8 encoded.
    private static final byte[] APPROXIMATELY_FIVE_KILOBYTES = Strings.repeat("0123456789\n", 500).getBytes(StandardCharsets.UTF_8);

    // Cassandra test extension set up with both the blob module and the blob-cache module.
    @RegisterExtension
    static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraModule.aggregateModules(new CassandraModule[]{CassandraBlobModule.MODULE, CassandraBlobCacheModule.MODULE}));
    // Store under test; assigned a CachedBlobStore in setUp.
    private BlobStore testee;
    // Store that testee delegates to; also used directly to seed or inspect data without going through testee.
    private BlobStore backend;
    // Cache handed to testee; also read and modified directly by the tests.
    private BlobStoreCache cache;
    // Metric recorder handed to testee only (backend gets its own instance in setUp).
    private RecordingMetricFactory metricFactory;

    /**
     * Checks the counters and timers that the store under test publishes to {@code metricFactory}
     * when blobs are read as streams ({@code read}) or as byte arrays ({@code readBytes}).
     *
     * <p>Each test saves (or deliberately does not save) a blob, performs the reads, then inspects
     * the recorded hit/miss counters and the cache/backend latency timers.
     */
    @Nested
    class MetricsTest {
        // Metric names looked up on the recording metric factory; they double as assertion descriptions.
        private static final String CACHE_MISSES = "blobStoreCacheMisses";
        private static final String CACHE_HITS = "blobStoreCacheHits";
        private static final String CACHE_LATENCY = "blobStoreCacheLatency";
        private static final String BACKEND_LATENCY = "blobStoreBackEndLatency";

        /** Stream reads from a non-default bucket must leave every cache metric untouched. */
        @Test
        void readBlobStoreCacheWithNoneDefaultBucketNameShouldNotImpact() {
            BlobId blobId = Mono.from(testee.save(TEST_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, BlobStore.StoragePolicy.SIZE_BASED)).block();

            testee.read(TEST_BUCKETNAME, blobId, BlobStore.StoragePolicy.HIGH_PERFORMANCE);
            testee.read(TEST_BUCKETNAME, blobId, BlobStore.StoragePolicy.HIGH_PERFORMANCE);

            SoftAssertions.assertSoftly(softly -> {
                softly.assertThat(metricFactory.countFor(CACHE_MISSES)).describedAs(CACHE_MISSES).isEqualTo(0);
                softly.assertThat(metricFactory.countFor(CACHE_HITS)).describedAs(CACHE_HITS).isEqualTo(0);
                softly.assertThat(metricFactory.executionTimesFor(CACHE_LATENCY)).describedAs(CACHE_LATENCY).hasSize(0);
            });
        }

        /** Two stream reads from a non-default bucket must record two backend timings. */
        @Test
        void readBlobStoreWithNoneDefaultBucketNameShouldRecordByBackendLatency() {
            BlobId blobId = Mono.from(testee.save(TEST_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, BlobStore.StoragePolicy.SIZE_BASED)).block();

            testee.read(TEST_BUCKETNAME, blobId, BlobStore.StoragePolicy.HIGH_PERFORMANCE);
            testee.read(TEST_BUCKETNAME, blobId, BlobStore.StoragePolicy.HIGH_PERFORMANCE);

            SoftAssertions.assertSoftly(softly ->
                softly.assertThat(metricFactory.executionTimesFor(BACKEND_LATENCY)).describedAs(BACKEND_LATENCY).hasSize(2));
        }

        /** Byte reads from a non-default bucket must leave cache metrics untouched and time the backend twice. */
        @Test
        void readBytesWithNoneDefaultBucketNameShouldNotImpact() {
            BlobId blobId = Mono.from(testee.save(TEST_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, BlobStore.StoragePolicy.SIZE_BASED)).block();

            Mono.from(testee.readBytes(TEST_BUCKETNAME, blobId, BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block();
            Mono.from(testee.readBytes(TEST_BUCKETNAME, blobId, BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block();

            SoftAssertions.assertSoftly(softly -> {
                // Every check goes through the soft-assertion collector so that one failure
                // does not hide the outcome of the checks that follow it.
                softly.assertThat(metricFactory.countFor(CACHE_MISSES)).describedAs(CACHE_MISSES).isEqualTo(0);
                softly.assertThat(metricFactory.countFor(CACHE_HITS)).describedAs(CACHE_HITS).isEqualTo(0);
                softly.assertThat(metricFactory.executionTimesFor(CACHE_LATENCY)).describedAs(CACHE_LATENCY).hasSize(0);
                softly.assertThat(metricFactory.executionTimesFor(BACKEND_LATENCY)).describedAs(BACKEND_LATENCY).hasSize(2);
            });
        }

        /** Two byte reads from a non-default bucket must record two backend timings. */
        @Test
        void readBytesWithNoneDefaultBucketNameShouldPublishBackendTimerMetrics() {
            BlobId blobId = Mono.from(testee.save(TEST_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, BlobStore.StoragePolicy.SIZE_BASED)).block();

            Mono.from(testee.readBytes(TEST_BUCKETNAME, blobId, BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block();
            Mono.from(testee.readBytes(TEST_BUCKETNAME, blobId, BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block();

            SoftAssertions.assertSoftly(softly ->
                softly.assertThat(metricFactory.executionTimesFor(BACKEND_LATENCY)).describedAs(BACKEND_LATENCY).hasSize(2));
        }

        /** Two stream reads of a small default-bucket blob must record two cache timings. */
        @Test
        void readBlobStoreCacheShouldPublishTimerMetrics() {
            BlobId blobId = Mono.from(testee.save(DEFAULT_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, BlobStore.StoragePolicy.SIZE_BASED)).block();

            testee.read(DEFAULT_BUCKETNAME, blobId, BlobStore.StoragePolicy.HIGH_PERFORMANCE);
            testee.read(DEFAULT_BUCKETNAME, blobId, BlobStore.StoragePolicy.HIGH_PERFORMANCE);

            SoftAssertions.assertSoftly(softly ->
                softly.assertThat(metricFactory.executionTimesFor(CACHE_LATENCY)).describedAs(CACHE_LATENCY).hasSize(2));
        }

        /** Two byte reads of a small default-bucket blob must record two cache timings and two hits. */
        @Test
        void readBytesCacheShouldPublishTimerMetrics() {
            BlobId blobId = Mono.from(testee.save(DEFAULT_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, BlobStore.StoragePolicy.SIZE_BASED)).block();

            Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId, BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block();
            Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId, BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block();

            SoftAssertions.assertSoftly(softly -> {
                softly.assertThat(metricFactory.executionTimesFor(CACHE_LATENCY)).describedAs(CACHE_LATENCY).hasSize(2);
                softly.assertThat(metricFactory.countFor(CACHE_HITS)).describedAs(CACHE_HITS).isEqualTo(2);
            });
        }

        /** Two byte reads of an eleven-kilobyte blob seeded through the backend must record two backend timings. */
        @Test
        void readBytesShouldPublishBackendTimerMetricsForBigBlobs() {
            BlobId blobId = Mono.from(backend.save(DEFAULT_BUCKETNAME, BlobStoreContract.ELEVEN_KILOBYTES, BlobStore.StoragePolicy.SIZE_BASED)).block();

            Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId, BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block();
            Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId, BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block();

            SoftAssertions.assertSoftly(softly ->
                softly.assertThat(metricFactory.executionTimesFor(BACKEND_LATENCY)).describedAs(BACKEND_LATENCY).hasSize(2));
        }

        /** Two stream reads of an eleven-kilobyte blob seeded through the backend must record two backend timings. */
        @Test
        void readInputStreamShouldPublishBackendTimerForBigBlobs() {
            BlobId blobId = Mono.from(backend.save(DEFAULT_BUCKETNAME, BlobStoreContract.ELEVEN_KILOBYTES, BlobStore.StoragePolicy.SIZE_BASED)).block();

            testee.read(DEFAULT_BUCKETNAME, blobId, BlobStore.StoragePolicy.HIGH_PERFORMANCE);
            testee.read(DEFAULT_BUCKETNAME, blobId, BlobStore.StoragePolicy.HIGH_PERFORMANCE);

            SoftAssertions.assertSoftly(softly ->
                softly.assertThat(metricFactory.executionTimesFor(BACKEND_LATENCY)).describedAs(BACKEND_LATENCY).hasSize(2));
        }

        /** Byte reads of an eleven-kilobyte blob must not move the hit or miss counters. */
        @Test
        void readBytesShouldNotIncreaseCacheCounterForBigBlobs() {
            BlobId blobId = Mono.from(backend.save(DEFAULT_BUCKETNAME, BlobStoreContract.ELEVEN_KILOBYTES, BlobStore.StoragePolicy.SIZE_BASED)).block();

            Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId, BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block();
            Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId, BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block();

            SoftAssertions.assertSoftly(softly -> {
                softly.assertThat(metricFactory.countFor(CACHE_MISSES)).describedAs(CACHE_MISSES).isEqualTo(0);
                softly.assertThat(metricFactory.countFor(CACHE_HITS)).describedAs(CACHE_HITS).isEqualTo(0);
            });
        }

        /** Stream reads of an eleven-kilobyte blob must not move the hit or miss counters. */
        @Test
        void readInputStreamShouldNotIncreaseCacheCounterForBigBlobs() {
            BlobId blobId = Mono.from(backend.save(DEFAULT_BUCKETNAME, BlobStoreContract.ELEVEN_KILOBYTES, BlobStore.StoragePolicy.SIZE_BASED)).block();

            // NOTE(review): unlike the sibling tests, these reads pass no storage policy —
            // confirm the two-argument overload exercises the intended code path.
            testee.read(DEFAULT_BUCKETNAME, blobId);
            testee.read(DEFAULT_BUCKETNAME, blobId);

            SoftAssertions.assertSoftly(softly -> {
                softly.assertThat(metricFactory.countFor(CACHE_MISSES)).describedAs(CACHE_MISSES).isEqualTo(0);
                softly.assertThat(metricFactory.countFor(CACHE_HITS)).describedAs(CACHE_HITS).isEqualTo(0);
            });
        }

        /**
         * Repeats a byte read of an eleven-kilobyte blob three times, pausing 500 ms after each read,
         * and expects three backend timings that each stay below the pause (i.e. the pause is not timed).
         */
        @Test
        void readBytesShouldRecordDistinctTimingsWhenRepeatAndBackendRead() {
            BlobId blobId = Mono.from(testee.save(DEFAULT_BUCKETNAME, BlobStoreContract.ELEVEN_KILOBYTES, BlobStore.StoragePolicy.SIZE_BASED)).block();
            Duration pause = Duration.ofMillis(500L);

            Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId, BlobStore.StoragePolicy.HIGH_PERFORMANCE))
                .then(Mono.delay(pause))
                .repeat(2L)
                .blockLast();

            Assertions.assertThat(metricFactory.executionTimesFor(BACKEND_LATENCY))
                .hasSize(3)
                .allSatisfy(timing -> Assertions.assertThat(timing).isLessThan(pause));
        }

        /**
         * Repeats a byte read of a small blob three times, pausing 500 ms after each read,
         * and expects three cache timings that each stay below the pause (i.e. the pause is not timed).
         */
        @Test
        void readBytesShouldRecordDistinctTimingsWhenRepeat() {
            BlobId blobId = Mono.from(testee.save(DEFAULT_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, BlobStore.StoragePolicy.SIZE_BASED)).block();
            Duration pause = Duration.ofMillis(500L);

            Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId, BlobStore.StoragePolicy.HIGH_PERFORMANCE))
                .then(Mono.delay(pause))
                .repeat(2L)
                .blockLast();

            Assertions.assertThat(metricFactory.executionTimesFor(CACHE_LATENCY))
                .hasSize(3)
                .allSatisfy(timing -> Assertions.assertThat(timing).isLessThan(pause));
        }

        /** Two stream reads of a small blob saved through the store under test must count two hits. */
        @Test
        void readBlobStoreCacheShouldCountWhenHit() {
            BlobId blobId = Mono.from(testee.save(DEFAULT_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, BlobStore.StoragePolicy.SIZE_BASED)).block();

            testee.read(DEFAULT_BUCKETNAME, blobId, BlobStore.StoragePolicy.HIGH_PERFORMANCE);
            testee.read(DEFAULT_BUCKETNAME, blobId, BlobStore.StoragePolicy.HIGH_PERFORMANCE);

            Assertions.assertThat(metricFactory.countFor(CACHE_HITS)).isEqualTo(2);
        }

        /** Two byte reads of a small blob saved through the store under test must count two hits. */
        @Test
        void readBytesCacheShouldCountWhenHit() {
            BlobId blobId = Mono.from(testee.save(DEFAULT_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, BlobStore.StoragePolicy.SIZE_BASED)).block();

            Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId, BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block();
            Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId, BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block();

            Assertions.assertThat(metricFactory.countFor(CACHE_HITS)).isEqualTo(2);
        }

        /** A stream read of a small blob that is absent from the cache must count one miss. */
        @Test
        void readBlobStoreCacheShouldCountWhenMissed() {
            BlobId blobId = Mono.from(backend.save(DEFAULT_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, BlobStore.StoragePolicy.SIZE_BASED)).block();
            Mono.from(cache.remove(blobId)).block();

            testee.read(DEFAULT_BUCKETNAME, blobId, BlobStore.StoragePolicy.HIGH_PERFORMANCE);

            Assertions.assertThat(metricFactory.countFor(CACHE_MISSES)).isEqualTo(1);
        }

        /** A byte read of a small blob evicted from the cache must count one miss. */
        @Test
        void readBytesCacheShouldCountWhenMissed() {
            BlobId blobId = Mono.from(testee.save(DEFAULT_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, BlobStore.StoragePolicy.SIZE_BASED)).block();
            Mono.from(cache.remove(blobId)).block();

            Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId, BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block();

            Assertions.assertThat(metricFactory.countFor(CACHE_MISSES)).isEqualTo(1);
        }

        /**
         * A stream read of an unknown blob must fail with {@link ObjectNotFoundException},
         * leave the hit/miss counters at zero and still record one cache and one backend timing.
         */
        @Test
        void metricsShouldNotWorkExceptLatencyWhenReadNonExistingBlob() {
            SoftAssertions.assertSoftly(softly -> {
                softly.assertThatThrownBy(() ->
                        testee.read(DEFAULT_BUCKETNAME, new TestBlobId.Factory().randomId(), BlobStore.StoragePolicy.HIGH_PERFORMANCE))
                    .isInstanceOf(ObjectNotFoundException.class);

                softly.assertThat(metricFactory.countFor(CACHE_MISSES)).describedAs(CACHE_MISSES).isEqualTo(0);
                softly.assertThat(metricFactory.countFor(CACHE_HITS)).describedAs(CACHE_HITS).isEqualTo(0);
                softly.assertThat(metricFactory.executionTimesFor(CACHE_LATENCY)).describedAs(CACHE_LATENCY).hasSize(1);
                softly.assertThat(metricFactory.executionTimesFor(BACKEND_LATENCY)).describedAs(BACKEND_LATENCY).hasSize(1);
            });
        }

        /**
         * A byte read of an unknown blob must fail with {@link ObjectNotFoundException},
         * leave the hit/miss counters at zero and still record one cache and one backend timing.
         */
        @Test
        void metricsShouldNotWorkExceptLatencyWhenReadNonExistingBlobAsBytes() {
            SoftAssertions.assertSoftly(softly -> {
                softly.assertThatThrownBy(() ->
                        Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, new TestBlobId.Factory().randomId(), BlobStore.StoragePolicy.HIGH_PERFORMANCE))
                            .blockOptional())
                    .isInstanceOf(ObjectNotFoundException.class);

                softly.assertThat(metricFactory.countFor(CACHE_MISSES)).describedAs(CACHE_MISSES).isEqualTo(0);
                softly.assertThat(metricFactory.countFor(CACHE_HITS)).describedAs(CACHE_HITS).isEqualTo(0);
                softly.assertThat(metricFactory.executionTimesFor(CACHE_LATENCY)).describedAs(CACHE_LATENCY).hasSize(1);
                softly.assertThat(metricFactory.executionTimesFor(BACKEND_LATENCY)).describedAs(BACKEND_LATENCY).hasSize(1);
            });
        }
    }

    /**
     * Rebuilds the fixture before every test: a pass-through Cassandra backend, a Cassandra cache,
     * and the cached store under test wired to a fresh recording metric factory.
     *
     * <p>The cache size threshold is set to one byte more than
     * {@code BlobStoreCacheContract.EIGHT_KILOBYTES.length}.
     *
     * @param cassandraCluster2 the Cassandra cluster injected by the registered extension
     */
    @BeforeEach
    void setUp(CassandraCluster cassandraCluster2) {
        backend = CassandraBlobStoreFactory
            .forTesting(cassandraCluster2.getConf(), new RecordingMetricFactory())
            .passthrough();

        int sizeThreshold = BlobStoreCacheContract.EIGHT_KILOBYTES.length + 1;
        CassandraCacheConfiguration cacheConfiguration = new CassandraCacheConfiguration.Builder()
            .sizeThresholdInBytes(sizeThreshold)
            .build();

        // Only the store under test reports to this factory; the backend has its own instance.
        metricFactory = new RecordingMetricFactory();
        cache = new CassandraBlobStoreCache(cassandraCluster2.getConf(), cacheConfiguration);
        testee = new CachedBlobStore(cache, backend, cacheConfiguration, metricFactory);
    }

    /**
     * Exposes the store under test.
     * Presumably consumed by the inherited {@code BlobStoreContract} tests — verify against the interface.
     *
     * @return the store assigned in {@code setUp}
     */
    public BlobStore testee() {
        return testee;
    }

    /**
     * Supplies the blob id factory used by this test class.
     * Presumably consumed by the inherited {@code BlobStoreContract} tests — verify against the interface.
     *
     * @return a new {@link HashBlobId.Factory} on every call
     */
    public BlobId.Factory blobIdFactory() {
        BlobId.Factory hashBasedFactory = new HashBlobId.Factory();
        return hashBasedFactory;
    }

    /** An eight-kilobyte blob saved in the default bucket must be readable straight from the cache. */
    @Test
    public void shouldCacheWhenDefaultBucketName() {
        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKETNAME, BlobStoreCacheContract.EIGHT_KILOBYTES, BlobStore.StoragePolicy.SIZE_BASED)).block();

        byte[] cachedBytes = Mono.from(cache.read(blobId)).block();

        Assertions.assertThat(cachedBytes).containsExactly(BlobStoreCacheContract.EIGHT_KILOBYTES);
    }

    /** A blob saved in a non-default bucket must reach the backend but not the cache. */
    @Test
    public void shouldNotCacheWhenNotDefaultBucketName() {
        BlobId blobId = Mono.from(testee().save(TEST_BUCKETNAME, BlobStoreCacheContract.EIGHT_KILOBYTES, BlobStore.StoragePolicy.SIZE_BASED)).block();

        SoftAssertions.assertSoftly(softly -> {
            softly.assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();

            byte[] storedBytes = Mono.from(backend.readBytes(TEST_BUCKETNAME, blobId)).block();
            softly.assertThat(storedBytes).containsExactly(BlobStoreCacheContract.EIGHT_KILOBYTES);
        });
    }

    /** A twelve-megabyte blob saved with the size-based policy must reach the backend but not the cache. */
    @Test
    public void shouldNotCacheWhenDefaultBucketNameAndBigByteDataAndSizeBase() {
        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKETNAME, TWELVE_MEGABYTES, BlobStore.StoragePolicy.SIZE_BASED)).block();

        SoftAssertions.assertSoftly(softly -> {
            softly.assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();

            byte[] storedBytes = Mono.from(backend.readBytes(DEFAULT_BUCKETNAME, blobId)).block();
            softly.assertThat(storedBytes).containsExactly(TWELVE_MEGABYTES);
        });
    }

    /** With the size-based policy, an eight-kilobyte blob must end up in both the cache and the backend. */
    @Test
    public void shouldSavedBothInCacheAndBackendWhenSizeBase() {
        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKETNAME, BlobStoreCacheContract.EIGHT_KILOBYTES, BlobStore.StoragePolicy.SIZE_BASED)).block();

        SoftAssertions.assertSoftly(softly -> {
            byte[] cachedBytes = Mono.from(cache.read(blobId)).block();
            softly.assertThat(cachedBytes).containsExactly(BlobStoreCacheContract.EIGHT_KILOBYTES);

            byte[] storedBytes = Mono.from(backend.readBytes(DEFAULT_BUCKETNAME, blobId)).block();
            softly.assertThat(storedBytes).containsExactly(BlobStoreCacheContract.EIGHT_KILOBYTES);
        });
    }

    /** With the high-performance policy, an eight-kilobyte blob must end up in both the cache and the backend. */
    @Test
    public void shouldSavedBothInCacheAndBackendWhenHighPerformance() {
        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKETNAME, BlobStoreCacheContract.EIGHT_KILOBYTES, BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block();

        SoftAssertions.assertSoftly(softly -> {
            byte[] cachedBytes = Mono.from(cache.read(blobId)).block();
            softly.assertThat(cachedBytes).containsExactly(BlobStoreCacheContract.EIGHT_KILOBYTES);

            byte[] storedBytes = Mono.from(backend.readBytes(DEFAULT_BUCKETNAME, blobId)).block();
            softly.assertThat(storedBytes).containsExactly(BlobStoreCacheContract.EIGHT_KILOBYTES);
        });
    }

    /** With the low-cost policy, an eight-kilobyte blob must reach the backend but not the cache. */
    @Test
    public void shouldNotCacheWhenLowCost() {
        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKETNAME, BlobStoreCacheContract.EIGHT_KILOBYTES, BlobStore.StoragePolicy.LOW_COST)).block();

        SoftAssertions.assertSoftly(softly -> {
            softly.assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();

            byte[] storedBytes = Mono.from(backend.readBytes(DEFAULT_BUCKETNAME, blobId)).block();
            softly.assertThat(storedBytes).containsExactly(BlobStoreCacheContract.EIGHT_KILOBYTES);
        });
    }

    /** An empty blob saved from an input stream must be present, empty, in both the cache and the backend. */
    @Test
    public void shouldCacheWhenEmptyStream() {
        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKETNAME, new ByteArrayInputStream(EMPTY_BYTEARRAY), BlobStore.StoragePolicy.SIZE_BASED)).block();

        SoftAssertions.assertSoftly(softly -> {
            byte[] cachedBytes = Mono.from(cache.read(blobId)).block();
            softly.assertThat(new ByteArrayInputStream(cachedBytes))
                .hasSameContentAs(new ByteArrayInputStream(EMPTY_BYTEARRAY));

            byte[] storedBytes = Mono.from(backend.readBytes(DEFAULT_BUCKETNAME, blobId)).block();
            softly.assertThat(storedBytes).containsExactly(EMPTY_BYTEARRAY);
        });
    }

    /** An empty blob saved from a byte source must be present, empty, in both the cache and the backend. */
    @Test
    public void shouldCacheWhenEmptyByteSource() {
        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKETNAME, ByteSource.wrap(EMPTY_BYTEARRAY), BlobStore.StoragePolicy.SIZE_BASED)).block();

        SoftAssertions.assertSoftly(softly -> {
            byte[] cachedBytes = Mono.from(cache.read(blobId)).block();
            softly.assertThat(new ByteArrayInputStream(cachedBytes))
                .hasSameContentAs(new ByteArrayInputStream(EMPTY_BYTEARRAY));

            byte[] storedBytes = Mono.from(backend.readBytes(DEFAULT_BUCKETNAME, blobId)).block();
            softly.assertThat(storedBytes).containsExactly(EMPTY_BYTEARRAY);
        });
    }

    /**
     * An empty blob saved from a byte array must be present, empty, in both the cache and the backend.
     *
     * <p>NOTE(review): the method name says "should not cache", yet the first assertion reads the
     * entry back from the cache and expects empty content — confirm which of the two is intended.
     */
    @Test
    public void shouldNotCacheWhenEmptyByteArray() {
        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKETNAME, EMPTY_BYTEARRAY, BlobStore.StoragePolicy.SIZE_BASED)).block();

        SoftAssertions.assertSoftly(softly -> {
            byte[] cachedBytes = Mono.from(cache.read(blobId)).block();
            softly.assertThat(new ByteArrayInputStream(cachedBytes))
                .hasSameContentAs(new ByteArrayInputStream(EMPTY_BYTEARRAY));

            byte[] storedBytes = Mono.from(backend.readBytes(DEFAULT_BUCKETNAME, blobId)).block();
            softly.assertThat(storedBytes).containsExactly(EMPTY_BYTEARRAY);
        });
    }

    /** A roughly five-kilobyte blob saved from an input stream must be stored in both the cache and the backend. */
    @Test
    public void shouldCacheWhenFiveKilobytesSteam() {
        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKETNAME, new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES), BlobStore.StoragePolicy.SIZE_BASED)).block();

        SoftAssertions.assertSoftly(softly -> {
            byte[] cachedBytes = Mono.from(cache.read(blobId)).block();
            softly.assertThat(new ByteArrayInputStream(cachedBytes))
                .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));

            byte[] storedBytes = Mono.from(backend.readBytes(DEFAULT_BUCKETNAME, blobId)).block();
            softly.assertThat(new ByteArrayInputStream(storedBytes))
                .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));
        });
    }

    /** A roughly five-kilobyte blob saved from a byte source must be stored in both the cache and the backend. */
    @Test
    public void shouldCacheWhenFiveKilobytesByteSource() {
        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKETNAME, ByteSource.wrap(APPROXIMATELY_FIVE_KILOBYTES), BlobStore.StoragePolicy.SIZE_BASED)).block();

        SoftAssertions.assertSoftly(softly -> {
            byte[] cachedBytes = Mono.from(cache.read(blobId)).block();
            softly.assertThat(new ByteArrayInputStream(cachedBytes))
                .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));

            byte[] storedBytes = Mono.from(backend.readBytes(DEFAULT_BUCKETNAME, blobId)).block();
            softly.assertThat(new ByteArrayInputStream(storedBytes))
                .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));
        });
    }

    /** A roughly five-kilobyte blob saved from a byte array must be stored in both the cache and the backend. */
    @Test
    public void shouldCacheWhenFiveKilobytesBytes() {
        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, BlobStore.StoragePolicy.SIZE_BASED)).block();

        SoftAssertions.assertSoftly(softly -> {
            byte[] cachedBytes = Mono.from(cache.read(blobId)).block();
            softly.assertThat(new ByteArrayInputStream(cachedBytes))
                .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));

            byte[] storedBytes = Mono.from(backend.readBytes(DEFAULT_BUCKETNAME, blobId)).block();
            softly.assertThat(new ByteArrayInputStream(storedBytes))
                .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));
        });
    }

    /**
     * Deleting a default-bucket blob through the store under test must succeed, empty the cache entry,
     * and make a subsequent backend read fail with {@link ObjectNotFoundException}.
     */
    @Test
    public void shouldRemoveBothInCacheAndBackendWhenDefaultBucketName() {
        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKETNAME, BlobStoreCacheContract.EIGHT_KILOBYTES, BlobStore.StoragePolicy.SIZE_BASED)).block();

        SoftAssertions.assertSoftly(softly -> {
            // The publisher is built outside assertThatCode so that only subscribing/blocking is guarded.
            Mono<?> deletion = Mono.from(testee().delete(DEFAULT_BUCKETNAME, blobId));
            softly.assertThatCode(deletion::block).doesNotThrowAnyException();

            softly.assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
            softly.assertThatThrownBy(() -> Mono.from(backend.readBytes(DEFAULT_BUCKETNAME, blobId)).block())
                .isInstanceOf(ObjectNotFoundException.class);
        });
    }

    /**
     * A small blob seeded directly in the backend starts out absent from the cache;
     * reading its bytes through the store under test must return the content and populate the cache.
     */
    @Test
    public void shouldCacheWhenReadBytesWithDefaultBucket() {
        BlobId blobId = Mono.from(backend.save(DEFAULT_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, BlobStore.StoragePolicy.SIZE_BASED)).block();

        SoftAssertions.assertSoftly(softly -> {
            softly.assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();

            byte[] readBytes = Mono.from(testee().readBytes(DEFAULT_BUCKETNAME, blobId, BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block();
            softly.assertThat(new ByteArrayInputStream(readBytes))
                .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));

            byte[] cachedBytes = Mono.from(cache.read(blobId)).block();
            softly.assertThat(new ByteArrayInputStream(cachedBytes))
                .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));
        });
    }

    /**
     * A small blob seeded directly in the backend starts out absent from the cache;
     * reading it as a stream through the store under test must return the content and populate the cache.
     */
    @Test
    public void shouldCacheWhenReadWithDefaultBucket() {
        BlobId blobId = Mono.from(backend.save(DEFAULT_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, BlobStore.StoragePolicy.SIZE_BASED)).block();

        SoftAssertions.assertSoftly(softly -> {
            softly.assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();

            softly.assertThat(testee().read(DEFAULT_BUCKETNAME, blobId, BlobStore.StoragePolicy.HIGH_PERFORMANCE))
                .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));

            byte[] cachedBytes = Mono.from(cache.read(blobId)).block();
            softly.assertThat(new ByteArrayInputStream(cachedBytes))
                .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));
        });
    }

    /** Reading the bytes of a non-default-bucket blob must return the content and leave the cache empty. */
    @Test
    public void shouldNotCacheWhenReadBytesWithOutDefaultBucket() {
        BlobId blobId = Mono.from(backend.save(TEST_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, BlobStore.StoragePolicy.SIZE_BASED)).block();

        SoftAssertions.assertSoftly(softly -> {
            softly.assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();

            byte[] readBytes = Mono.from(testee().readBytes(TEST_BUCKETNAME, blobId, BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block();
            softly.assertThat(new ByteArrayInputStream(readBytes))
                .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));

            softly.assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
        });
    }

    /** Reading a non-default-bucket blob as a stream must return the content and leave the cache empty. */
    @Test
    public void shouldNotCacheWhenReadWithOutDefaultBucket() {
        BlobId blobId = Mono.from(backend.save(TEST_BUCKETNAME, new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES), BlobStore.StoragePolicy.SIZE_BASED)).block();

        SoftAssertions.assertSoftly(softly -> {
            softly.assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();

            softly.assertThat(testee().read(TEST_BUCKETNAME, blobId, BlobStore.StoragePolicy.HIGH_PERFORMANCE))
                .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));

            softly.assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
        });
    }

    /**
     * Saves the {@code TWELVE_MEGABYTES} payload into {@code DEFAULT_BUCKETNAME} via
     * {@code backend}, then asserts that {@code cache.read} yields nothing for that blob both
     * before and after a {@code HIGH_PERFORMANCE} {@code readBytes} through {@code testee()}.
     */
    @Test
    public void shouldNotCacheWhenReadWithBigByteArray() {
        ByteArrayInputStream payload = new ByteArrayInputStream(TWELVE_MEGABYTES);
        BlobId savedId = (BlobId) Mono.from(
                this.backend.save(DEFAULT_BUCKETNAME, payload, BlobStore.StoragePolicy.SIZE_BASED))
            .block();

        SoftAssertions.assertSoftly(softly -> {
            // Before the read: nothing is expected in the cache.
            softly.assertThat(Mono.from(this.cache.read(savedId)).blockOptional())
                .isEmpty();

            // The read itself must still return the full stored content.
            byte[] fetched = (byte[]) Mono.from(
                    testee().readBytes(DEFAULT_BUCKETNAME, savedId, BlobStore.StoragePolicy.HIGH_PERFORMANCE))
                .block();
            softly.assertThat(new ByteArrayInputStream(fetched))
                .hasSameContentAs(new ByteArrayInputStream(TWELVE_MEGABYTES));

            // After the read: the cache must still be empty for this blob.
            softly.assertThat(Mono.from(this.cache.read(savedId)).blockOptional())
                .isEmpty();
        });
    }

    /**
     * Saves the {@code TWELVE_MEGABYTES} payload as a stream into {@code DEFAULT_BUCKETNAME},
     * then asserts that {@code cache.read} yields nothing for that blob both before and after
     * a {@code HIGH_PERFORMANCE} stream read through {@code testee()}.
     *
     * <p>The blob is seeded through {@code backend}, like the other
     * {@code shouldNotCacheWhenRead*} tests in this class, so that only the read goes through
     * the store under test.
     */
    @Test
    public void shouldNotCacheWhenReadWithBigStream() {
        ByteArrayInputStream payload = new ByteArrayInputStream(TWELVE_MEGABYTES);
        // Seed via backend (not testee) so the write path of the store under test is not involved.
        BlobId savedId = (BlobId) Mono.from(
                this.backend.save(DEFAULT_BUCKETNAME, payload, BlobStore.StoragePolicy.SIZE_BASED))
            .block();

        SoftAssertions.assertSoftly(softly -> {
            // Before the read: nothing is expected in the cache.
            softly.assertThat(Mono.from(this.cache.read(savedId)).blockOptional())
                .isEmpty();

            // The stream read must still return the full stored content.
            softly.assertThat(testee().read(DEFAULT_BUCKETNAME, savedId, BlobStore.StoragePolicy.HIGH_PERFORMANCE))
                .hasSameContentAs(new ByteArrayInputStream(TWELVE_MEGABYTES));

            // After the read: the cache must still be empty for this blob.
            softly.assertThat(Mono.from(this.cache.read(savedId)).blockOptional())
                .isEmpty();
        });
    }

    /**
     * Saves the {@code APPROXIMATELY_FIVE_KILOBYTES} payload as a {@link ByteSource} into
     * {@code TEST_BUCKETNAME} via {@code backend}, then asserts that {@code cache.read} yields
     * nothing for that blob both before and after a {@code HIGH_PERFORMANCE} stream read
     * through {@code testee()}.
     */
    @Test
    public void shouldNotCacheWhenReadWithOutDefaultBucketByteSource() {
        ByteSource payload = ByteSource.wrap(APPROXIMATELY_FIVE_KILOBYTES);
        BlobId savedId = (BlobId) Mono.from(
                this.backend.save(TEST_BUCKETNAME, payload, BlobStore.StoragePolicy.SIZE_BASED))
            .block();

        SoftAssertions.assertSoftly(softly -> {
            // Before the read: nothing is expected in the cache.
            softly.assertThat(Mono.from(this.cache.read(savedId)).blockOptional())
                .isEmpty();

            // The stream read must still return the stored content.
            softly.assertThat(testee().read(TEST_BUCKETNAME, savedId, BlobStore.StoragePolicy.HIGH_PERFORMANCE))
                .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));

            // After the read: the cache must still be empty for this blob.
            softly.assertThat(Mono.from(this.cache.read(savedId)).blockOptional())
                .isEmpty();
        });
    }

    /**
     * Saves the {@code TWELVE_MEGABYTES} payload as a {@link ByteSource} into
     * {@code DEFAULT_BUCKETNAME} via {@code backend}, then asserts that {@code cache.read}
     * yields nothing for that blob both before and after a {@code HIGH_PERFORMANCE}
     * {@code readBytes} through {@code testee()}.
     */
    @Test
    public void shouldNotCacheWhenReadWithBigByteSource() {
        ByteSource payload = ByteSource.wrap(TWELVE_MEGABYTES);
        BlobId savedId = (BlobId) Mono.from(
                this.backend.save(DEFAULT_BUCKETNAME, payload, BlobStore.StoragePolicy.SIZE_BASED))
            .block();

        SoftAssertions.assertSoftly(softly -> {
            // Before the read: nothing is expected in the cache.
            softly.assertThat(Mono.from(this.cache.read(savedId)).blockOptional())
                .isEmpty();

            // The read itself must still return the full stored content.
            byte[] fetched = (byte[]) Mono.from(
                    testee().readBytes(DEFAULT_BUCKETNAME, savedId, BlobStore.StoragePolicy.HIGH_PERFORMANCE))
                .block();
            softly.assertThat(new ByteArrayInputStream(fetched))
                .hasSameContentAs(new ByteArrayInputStream(TWELVE_MEGABYTES));

            // After the read: the cache must still be empty for this blob.
            softly.assertThat(Mono.from(this.cache.read(savedId)).blockOptional())
                .isEmpty();
        });
    }

    /**
     * Saves the {@code APPROXIMATELY_FIVE_KILOBYTES} payload through {@code testee}, registers
     * a scenario that fails once for queries starting with
     * {@code SELECT * FROM blob_cache WHERE id=:id}, reads the cache directly, and finally
     * asserts that a {@code HIGH_PERFORMANCE} read through {@code testee()} still returns the
     * stored content.
     *
     * @param cassandraCluster2 cluster handle supplied as a test parameter; used to register the failure scenario
     */
    @Test
    public void readShouldNotPropagateFailures(CassandraCluster cassandraCluster2) {
        BlobId savedId = (BlobId) Mono.from(
                this.testee.save(DEFAULT_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, BlobStore.StoragePolicy.SIZE_BASED))
            .block();

        Scenario.ExecutionHook failCacheSelectOnce = Scenario.Builder.fail()
            .times(1)
            .whenQueryStartsWith("SELECT * FROM blob_cache WHERE id=:id");
        cassandraCluster2.getConf().registerScenario(new Scenario.ExecutionHook[]{failCacheSelectOnce});

        // NOTE(review): this direct cache read runs while the one-shot failure is armed and
        // presumably consumes it; confirm the testee().read below is still meant to be the
        // call that meets the failure.
        Mono.from(this.cache.read(savedId)).block();

        Assertions.assertThat(testee().read(DEFAULT_BUCKETNAME, savedId, BlobStore.StoragePolicy.HIGH_PERFORMANCE))
            .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));
    }

    /**
     * Stores a small MIME message through a {@link MimeMessageStore} built on {@code testee()},
     * reads it back while recording Cassandra statements, and asserts that exactly one prepared
     * statement starting with {@code SELECT data FROM blob_cache} was executed during the read.
     *
     * @param cassandraCluster2 cluster handle supplied as a test parameter; used to print and record statements
     * @throws Exception propagated unchanged from any step of the test
     */
    @Test
    public void cachedBlobStoreShouldOnlyBeQueriedForHeaders(CassandraCluster cassandraCluster2) throws Exception {
        cassandraCluster2.getConf().printStatements();

        MimeMessage message = MimeMessageBuilder.mimeMessageBuilder()
            .addHeader("Date", "Thu, 6 Sep 2018 13:29:13 +0700 (ICT)")
            .addHeader("Message-ID", "<84739718.0.1536215353507@localhost.localdomain>")
            .addFrom("any@any.com")
            .addToRecipient("toddy@any.com")
            .setSubject("Important Mail")
            .setText("Important mail content")
            .build();

        // Parameterised instead of raw so save() yields MimeMessagePartsId without a cast.
        Store<MimeMessage, MimeMessagePartsId> store = new MimeMessageStore.Factory(testee()).mimeMessageStore();
        MimeMessagePartsId partsId = store.save(message).block();

        // Recording starts only after the save, so the count below covers the read alone.
        StatementRecorder recorder = new StatementRecorder();
        cassandraCluster2.getConf().recordStatements(recorder);
        store.read(partsId).block();

        Assertions.assertThat(
                recorder.listExecutedStatements(
                    StatementRecorder.Selector.preparedStatementStartingWith("SELECT data FROM blob_cache")))
            .hasSize(1);
    }
}
