package org.apache.james;

import com.datastax.oss.driver.api.core.CqlSession;
import com.github.fge.lambdas.Throwing;
import com.google.common.collect.ImmutableList;
import com.google.inject.AbstractModule;
import com.google.inject.Inject;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import com.google.inject.multibindings.Multibinder;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.function.Predicate;
import org.apache.james.JamesServerExtension;
import org.apache.james.backends.cassandra.StatementRecorder;
import org.apache.james.backends.cassandra.TestingSession;
import org.apache.james.backends.cassandra.init.SessionWithInitializedTablesFactory;
import org.apache.james.core.Domain;
import org.apache.james.mailbox.store.BatchSizes;
import org.apache.james.modules.MailboxProbeImpl;
import org.apache.james.modules.TestJMAPServerModule;
import org.apache.james.util.Port;
import org.apache.james.utils.DataProbeImpl;
import org.apache.james.utils.GuiceProbe;
import org.apache.james.utils.SMTPMessageSender;
import org.apache.james.utils.SpoolerProbe;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:org/apache/james/WithCassandraBlobStoreTest.class */
class WithCassandraBlobStoreTest implements MailsShouldBeWellReceivedConcreteContract {

    @RegisterExtension
    static JamesServerExtension jamesServerExtension = TestingDistributedJamesServerBuilder.withSearchConfiguration(SearchConfiguration.scanning()).extension(new CassandraExtension()).server(cassandraJamesServerConfiguration -> {
        return CassandraJamesServerMain.createServer(cassandraJamesServerConfiguration).overrideWith(new Module[]{new TestJMAPServerModule()});
    }).overrideServerModule(new TestingSessionModule()).overrideServerModule(binder -> {
        binder.bind(BatchSizes.class).toInstance(BatchSizes.uniqueBatchSize(5));
    }).lifeCycle(JamesServerExtension.Lifecycle.PER_TEST).build();

    /* loaded from: input_file:org/apache/james/WithCassandraBlobStoreTest$TestingSessionModule.class */
    private static class TestingSessionModule extends AbstractModule {
        private TestingSessionModule() {
        }

        protected void configure() {
            Multibinder.newSetBinder(binder(), GuiceProbe.class).addBinding().to(TestingSessionProbe.class);
            bind(CqlSession.class).to(TestingSession.class);
        }

        @Singleton
        @Provides
        TestingSession provideSession(SessionWithInitializedTablesFactory sessionWithInitializedTablesFactory) {
            return new TestingSession(sessionWithInitializedTablesFactory.get());
        }
    }

    /* loaded from: input_file:org/apache/james/WithCassandraBlobStoreTest$TestingSessionProbe.class */
    private static class TestingSessionProbe implements GuiceProbe {
        private final TestingSession testingSession;

        @Inject
        private TestingSessionProbe(TestingSession testingSession) {
            this.testingSession = testingSession;
        }

        public TestingSession getTestingSession() {
            return this.testingSession;
        }
    }

    WithCassandraBlobStoreTest() {
    }

    @Test
    void imapFetchBackPressureShouldNotLoadMoreDataThanNecessary(GuiceJamesServer guiceJamesServer) throws Exception {
        String str = "MIME-Version: 1.0\r\n\r\nCONTENT\r\n\r\n" + "0123456789\r\n0123456789\r\n0123456789\r\n".repeat(25600);
        guiceJamesServer.getProbe(DataProbeImpl.class).fluent().addDomain("apache.org").addUser("james-user@apache.org", "secret").addUser("bob@apache.org", "secret");
        guiceJamesServer.getProbe(MailboxProbeImpl.class).createMailbox("#private", "james-user@apache.org", "INBOX");
        Port of = Port.of(smtpPort(guiceJamesServer));
        SMTPMessageSender sMTPMessageSender = new SMTPMessageSender(Domain.LOCALHOST.asString());
        try {
            Mono.fromRunnable(Throwing.runnable(() -> {
                sMTPMessageSender.connect("127.0.0.1", of).authenticate("bob@apache.org", "secret");
                sMTPMessageSender.sendMessageWithHeaders("bob@apache.org", "james-user@apache.org", str);
            })).repeat(20 - 1).subscribeOn(Schedulers.fromExecutor(EXECUTOR)).blockLast();
            sMTPMessageSender.close();
            CALMLY_AWAIT_FIVE_MINUTE.until(() -> {
                return Boolean.valueOf(guiceJamesServer.getProbe(SpoolerProbe.class).processingFinished());
            });
            SocketChannel open = SocketChannel.open();
            open.connect(new InetSocketAddress("127.0.0.1", imapPort(guiceJamesServer)));
            readBytes(open);
            open.write(ByteBuffer.wrap(String.format("a0 LOGIN %s %s\r\n", "james-user@apache.org", "secret").getBytes(StandardCharsets.UTF_8)));
            readBytes(open);
            open.write(ByteBuffer.wrap("A1 SELECT INBOX\r\n".getBytes(StandardCharsets.UTF_8)));
            readStringUntil(open, str2 -> {
                return str2.contains("A1 OK [READ-WRITE] SELECT completed.");
            });
            StatementRecorder recordStatements = ((TestingSessionProbe) guiceJamesServer.getProbe(TestingSessionProbe.class)).getTestingSession().recordStatements();
            open.write(ByteBuffer.wrap("A2 UID FETCH 1:500 (BODY.PEEK[])\r\n".getBytes(StandardCharsets.UTF_8)));
            Thread.sleep(2000L);
            Assertions.assertThat(recordStatements.listExecutedStatements(StatementRecorder.Selector.preparedStatement("SELECT * FROM blobs WHERE id=:id"))).hasSizeLessThanOrEqualTo(30);
        } catch (Throwable th) {
            try {
                sMTPMessageSender.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    private byte[] readBytes(SocketChannel socketChannel) throws IOException {
        ByteBuffer allocate = ByteBuffer.allocate(1024);
        socketChannel.read(allocate);
        allocate.rewind();
        byte[] bArr = new byte[allocate.remaining()];
        allocate.get(bArr);
        return bArr;
    }

    private List<String> readStringUntil(SocketChannel socketChannel, Predicate<String> predicate) throws IOException {
        String str;
        ImmutableList.Builder builder = ImmutableList.builder();
        do {
            str = new String(readBytes(socketChannel), StandardCharsets.US_ASCII);
            System.out.println(str);
            builder.add(str);
        } while (!predicate.test(str));
        return builder.build();
    }
}
