package org.apache.james.util;

import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Bytes;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.awaitility.Durations;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
import org.slf4j.MDC;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:org/apache/james/util/ReactorUtilsTest.class */
class ReactorUtilsTest {
    // Chunk size passed to ReactorUtils.toChunks and byte count used for several reads in the nested tests.
    static final int BUFFER_SIZE = 5;
    // Shared cached thread pool; tests wrap it with Schedulers.fromExecutor to subscribe off the calling thread.
    public static final ExecutorService EXECUTOR = Executors.newCachedThreadPool();

    @Nested
    /* loaded from: input_file:org/apache/james/util/ReactorUtilsTest$ExecuteAndEmpty.class */
    class ExecuteAndEmpty {

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/james/util/ReactorUtilsTest$ExecuteAndEmpty$Counter.class */
        /**
         * Minimal mutable accumulator used by the tests to observe which callbacks ran.
         */
        public class Counter {
            // Running total; boxed because the accessor exposes an Integer.
            private Integer total;

            /** Starts the accumulator at {@code num}. */
            public Counter(Integer num) {
                total = num;
            }

            /** Adds {@code num} to the running total. */
            public void increment(Integer num) {
                total = total + num;
            }

            /** Returns the current running total. */
            public Integer getCounter() {
                return total;
            }
        }

        // Package-private no-argument constructor; its body is empty, so no state is initialised here.
        ExecuteAndEmpty() {
        }

        /**
         * Starting from an empty Mono, the {@code executeAndEmpty} fallback is subscribed:
         * the test asserts that no element is emitted and that the counter ends at 3
         * (initial 1 plus the fallback's 2, without the 4 added by the mapping step).
         */
        @Test
        void shouldExecuteTheRunnableAndReturnEmpty() {
            Counter tally = new Counter(1);

            Boolean emitted = (Boolean) Mono.empty()
                .switchIfEmpty(ReactorUtils.executeAndEmpty(() -> tally.increment(2)))
                .map(FunctionalUtils.toFunction(ignored -> tally.increment(4)))
                .hasElement()
                .block();

            Assertions.assertThat(emitted).isFalse();
            Assertions.assertThat(tally.getCounter()).isEqualTo(3);
        }

        /**
         * Starting from a non-empty Mono, the test asserts that an element is emitted and that
         * the counter ends at 5: the initial 1 plus the 4 added by the mapping step, without
         * the 2 that the {@code executeAndEmpty} fallback would have added.
         */
        @Test
        void shouldNotExecuteTheRunnableAndReturnTheValue() {
            Counter counter = new Counter(1);
            Assertions.assertThat((Boolean) Mono.just(42).switchIfEmpty(ReactorUtils.executeAndEmpty(() -> {
                counter.increment(2);
            })).map(FunctionalUtils.toFunction(num -> {
                counter.increment(4);
            })).hasElement().block()).isTrue();
            // Expected total is 1 + 4; spelled as a literal because it is unrelated to BUFFER_SIZE.
            Assertions.assertThat(counter.getCounter()).isEqualTo(5);
        }
    }

    /**
     * Checks how {@code ReactorUtils.context} entries written with {@code contextWrite}
     * are visible through {@code MDC} inside a {@code ReactorUtils.log} callback.
     */
    @Nested
    class MDCTest {
        MDCTest() {
        }

        /** A single context entry must be readable from MDC inside the log callback. */
        @Test
        void contextShouldEnhanceMDC() {
            String key = "key";
            String value = "value";

            Flux.just(1)
                .doOnEach(ReactorUtils.log(() -> {
                    Assertions.assertThat(MDC.get(key)).isEqualTo(value);
                }))
                .contextWrite(ReactorUtils.context("test", MDCBuilder.ofValue(key, value)))
                .blockLast();
        }

        /**
         * Two writes under the same context name and MDC key: the test asserts that the value
         * from the first {@code contextWrite} in the chain ("value1") is the one observed.
         */
        @Test
        void contextShouldNotOverwritePreviousKeys() {
            String key = "key";
            String expectedValue = "value1";

            Flux.just(1)
                .doOnEach(ReactorUtils.log(() -> {
                    Assertions.assertThat(MDC.get(key)).isEqualTo(expectedValue);
                }))
                .contextWrite(ReactorUtils.context("test", MDCBuilder.ofValue(key, expectedValue)))
                .contextWrite(ReactorUtils.context("test", MDCBuilder.ofValue(key, "value2")))
                .blockLast();
        }

        /** Entries written under different context names and keys must both be visible. */
        @Test
        void contextShouldCombineMDCs() {
            String firstKey = "key1";
            String firstValue = "value1";
            String secondKey = "key2";
            String secondValue = "value2";

            Flux.just(1)
                .doOnEach(ReactorUtils.log(() -> {
                    Assertions.assertThat(MDC.get(firstKey)).isEqualTo(firstValue);
                    Assertions.assertThat(MDC.get(secondKey)).isEqualTo(secondValue);
                }))
                .contextWrite(ReactorUtils.context("test1", MDCBuilder.ofValue(firstKey, firstValue)))
                .contextWrite(ReactorUtils.context("test2", MDCBuilder.ofValue(secondKey, secondValue)))
                .blockLast();
        }
    }

    /**
     * Exercises {@code ReactorUtils.throttle()}: argument validation, window sizing,
     * error tolerance, and behaviour with slow, short or very large upstreams.
     */
    @Nested
    class Throttling {
        Throttling() {
        }

        /** A negative element count must be rejected with IllegalArgumentException. */
        @Test
        void windowShouldThrowWhenMaxSizeIsNegative() {
            Assertions.assertThatThrownBy(() -> {
                ReactorUtils.throttle().elements(-1).per(Duration.ofSeconds(1L)).forOperation(Mono::just);
            }).isInstanceOf(IllegalArgumentException.class);
        }

        /** A zero element count must be rejected with IllegalArgumentException. */
        @Test
        void windowShouldThrowWhenMaxSizeIsZero() {
            Assertions.assertThatThrownBy(() -> {
                ReactorUtils.throttle().elements(0).per(Duration.ofSeconds(1L)).forOperation(Mono::just);
            }).isInstanceOf(IllegalArgumentException.class);
        }

        /** A negative window duration must be rejected with IllegalArgumentException. */
        @Test
        void windowShouldThrowWhenDurationIsNegative() {
            Assertions.assertThatThrownBy(() -> {
                ReactorUtils.throttle().elements(1).per(Duration.ofSeconds(-1L)).forOperation(Mono::just);
            }).isInstanceOf(IllegalArgumentException.class);
        }

        /** A zero window duration must be rejected with IllegalArgumentException. */
        @Test
        void windowShouldThrowWhenDurationIsZero() {
            Assertions.assertThatThrownBy(() -> {
                ReactorUtils.throttle().elements(1).per(Duration.ofSeconds(0L)).forOperation(Mono::just);
            }).isInstanceOf(IllegalArgumentException.class);
        }

        /**
         * Ten elements throttled at 3 per 100 ms: each element records the elapsed time when it is
         * processed, and the 100 ms bucket indexes are expected to be 1,1,1,2,2,2,3,3,3,4.
         */
        @Test
        void throttleShouldApplyMaxSize() {
            Duration windowDuration = Duration.ofMillis(100L);
            Stopwatch stopwatch = Stopwatch.createUnstarted();
            Assertions.assertThat((ImmutableList) Flux.range(0, 10)
                    .transform(ReactorUtils.throttle().elements(3).per(windowDuration).forOperation(element ->
                        Mono.fromCallable(() -> Long.valueOf(stopwatch.elapsed(TimeUnit.MILLISECONDS)))))
                    .map(elapsedMillis -> Long.valueOf(elapsedMillis.longValue() / 100))
                    .doOnSubscribe(subscription -> stopwatch.start())
                    .collect(ImmutableList.toImmutableList())
                    .block())
                .containsExactly(new Long[]{1L, 1L, 1L, 2L, 2L, 2L, 3L, 3L, 3L, 4L});
        }

        /** A 3000-element window over 10000 elements, each delayed by two windows, must not throw. */
        @Test
        void largeWindowShouldNotOverrunIntermediateBuffers() {
            int windowMaxSize = 3000;
            Duration windowDuration = Duration.ofMillis(100L);
            Assertions.assertThatCode(() -> {
                Flux.range(0, 10000)
                    .transform(ReactorUtils.throttle().elements(windowMaxSize).per(windowDuration).forOperation(element ->
                        Mono.delay(windowDuration.multipliedBy(2L))))
                    .blockLast();
            }).doesNotThrowAnyException();
        }

        /**
         * Each operation increments a shared counter, waits two window durations, then decrements it.
         * The value observed right after each increment must never exceed the window size.
         */
        @Test
        void throttleDownStreamConcurrencyShouldNotExceedWindowMaxSize() {
            int windowMaxSize = 3;
            Duration windowDuration = Duration.ofMillis(100L);
            AtomicInteger ongoingProcessing = new AtomicInteger();
            Assertions.assertThat((ImmutableList) Flux.range(0, 10)
                    .transform(ReactorUtils.throttle().elements(windowMaxSize).per(windowDuration).forOperation(element ->
                        // Lambda parameters are named distinctly from `element`: redeclaring an
                        // enclosing lambda parameter does not compile.
                        Mono.fromCallable(ongoingProcessing::incrementAndGet)
                            .flatMap(observed -> Mono.delay(windowDuration.multipliedBy(2L)).thenReturn(observed))
                            .flatMap(observed -> Mono.fromRunnable(ongoingProcessing::decrementAndGet).thenReturn(observed))))
                    .collect(ImmutableList.toImmutableList())
                    .block())
                .allSatisfy(observed -> {
                    Assertions.assertThat(observed).isLessThanOrEqualTo(windowMaxSize);
                });
        }

        /** An operation failing for element 5 must not prevent the other elements from being emitted. */
        @Test
        void throttleShouldNotAbortProcessingUponError() {
            Assertions.assertThat((List) Flux.range(0, 10)
                    .transform(ReactorUtils.throttle().elements(3).per(Duration.ofMillis(100L)).forOperation(element ->
                        // 5 is the element chosen to fail; it is unrelated to BUFFER_SIZE.
                        element.intValue() == 5 ? Mono.error(new RuntimeException()) : Mono.just(element)))
                    .collectList()
                    .block())
                .containsExactly(new Integer[]{0, 1, 2, 3, 4, 6, 7, 8, 9});
        }

        /**
         * Same expectation as above, but the failure for element 5 is produced by a
         * {@code flatMap} placed before the throttling transformation.
         */
        @Test
        void throttleShouldNotAbortProcessingUponUpstreamError() {
            Assertions.assertThat((List) Flux.range(0, 10)
                    .flatMap(element ->
                        // 5 is the element chosen to fail; it is unrelated to BUFFER_SIZE.
                        element.intValue() == 5 ? Mono.error(new RuntimeException()) : Mono.just(element))
                    .transform(ReactorUtils.throttle().elements(3).per(Duration.ofMillis(100L)).forOperation(Mono::just))
                    .collectList()
                    .block())
                .containsExactly(new Integer[]{0, 1, 2, 3, 4, 6, 7, 8, 9});
        }

        /** An {@code onErrorResume} declared inside the operation must still be invoked (exactly once here). */
        @Test
        void throttleShouldNotOverwriteErrorHandling() {
            Duration windowDuration = Duration.ofMillis(20L);
            Flux source = Flux.just(0L);
            ConcurrentLinkedDeque recordedErrors = new ConcurrentLinkedDeque();
            source.transform(ReactorUtils.throttle().elements(3).per(windowDuration).forOperation(element ->
                    Mono.error(new RuntimeException())
                        .onErrorResume(error -> Mono.fromRunnable(() -> recordedErrors.add(error)).thenReturn(element))))
                .blockLast();
            Assertions.assertThat(recordedErrors).hasSize(1);
        }

        /** 10000 elements throttled at 2 per millisecond must complete without throwing. */
        @Test
        void throttleShouldHandleLargeFluxes() {
            int windowMaxSize = 2;
            Duration windowDuration = Duration.ofMillis(1L);
            Flux source = Flux.range(0, 10000);
            Assertions.assertThatCode(() -> {
                source.transform(ReactorUtils.throttle().elements(windowMaxSize).per(windowDuration).forOperation(Mono::just))
                    .blockLast();
            }).doesNotThrowAnyException();
        }

        /**
         * Disabled (see annotation text). With a 10 ms interval source and 20 ms windows,
         * no 20 ms bucket would be expected to hold more than 2 elements.
         */
        @Disabled("We no longer rely on 'windowTimeout', this breakage is expected.'windowTimeout' solves this but create other, more critical issues (large flux cannot be throttledas described in https://github.com/reactor/reactor-core/issues/1099")
        @Test
        void throttleShouldGenerateSmallerWindowsWhenUpstreamIsSlow() {
            Duration windowDuration = Duration.ofMillis(20L);
            Stopwatch stopwatch = Stopwatch.createUnstarted();
            Assertions.assertThat((ImmutableList) Flux.interval(Duration.ofMillis(10L))
                    .transform(ReactorUtils.throttle().elements(3).per(windowDuration).forOperation(element ->
                        Mono.fromCallable(() -> Long.valueOf(stopwatch.elapsed(TimeUnit.MILLISECONDS)))))
                    .map(elapsedMillis -> Long.valueOf(elapsedMillis.longValue() / 20))
                    .doOnSubscribe(subscription -> stopwatch.start())
                    .take(10L)
                    .groupBy(Function.identity())
                    .flatMap(group -> group.count())
                    .collect(ImmutableList.toImmutableList())
                    .block())
                .allSatisfy(bucketSize -> {
                    Assertions.assertThat(bucketSize).isLessThanOrEqualTo(2L);
                });
        }

        /** A source slower than the window must still deliver its first ten elements in order. */
        @Test
        void throttleShouldNotDropEntriesWhenUpstreamIsSlow() {
            Assertions.assertThat((ImmutableList) Flux.interval(Duration.ofMillis(10L))
                    .transform(ReactorUtils.throttle().elements(3).per(Duration.ofMillis(20L)).forOperation(Mono::just))
                    .take(10L)
                    .collect(ImmutableList.toImmutableList())
                    .block())
                .containsExactly(new Long[]{0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L});
        }

        /** A source with fewer elements than one window must still complete and deliver them all. */
        @Test
        void throttleShouldCompleteWhenOriginalFluxDoesNotFillAWindow() {
            Assertions.assertThat((ImmutableList) Flux.just(new Long[]{0L, 1L})
                    .transform(ReactorUtils.throttle().elements(3).per(Duration.ofMillis(20L)).forOperation(Mono::just))
                    .take(10L)
                    .collect(ImmutableList.toImmutableList())
                    .block())
                .containsExactly(new Long[]{0L, 1L});
        }

        /**
         * Two elements followed by a never-completing source: both elements must reach the
         * operation within one second even though the window (3) is never filled.
         */
        @Test
        void throttleShouldSupportEmittingPartiallyCompleteWindowImmediately() {
            Duration windowDuration = Duration.ofMillis(20L);
            ConcurrentLinkedDeque processed = new ConcurrentLinkedDeque();
            Flux.concat(new Publisher[]{Flux.just(new Long[]{0L, 1L}), Flux.never()})
                .transform(ReactorUtils.throttle().elements(3).per(windowDuration).forOperation(element -> {
                    processed.add(element);
                    return Mono.just(element);
                }))
                .subscribeOn(Schedulers.fromExecutor(ReactorUtilsTest.EXECUTOR))
                .subscribe();
            Awaitility.await().atMost(Durations.ONE_SECOND).untilAsserted(() -> {
                Assertions.assertThat(processed).containsExactly(new Long[]{0L, 1L});
            });
        }

        /** A 150 ms gap in the source (many 5 ms windows with no element) must not lose elements. */
        @Test
        void throttleShouldTolerateSeveralEmptySlices() {
            List results = (List) Flux.concat(new Publisher[]{
                    Flux.just(new Long[]{0L, 1L}),
                    Mono.delay(Duration.ofMillis(150L)).thenReturn(2L)})
                .transform(ReactorUtils.throttle().elements(3).per(Duration.ofMillis(5L)).forOperation(Mono::just))
                .collectList()
                .block();
            Assertions.assertThat(results).containsExactly(new Long[]{0L, 1L, 2L});
        }

        /** Same scenario with a 165 ms gap in the source. */
        @Test
        void throttleShouldTolerateManyEmptySuccessiveWindows() {
            List results = (List) Flux.concat(new Publisher[]{
                    Flux.just(new Long[]{0L, 1L}),
                    Mono.delay(Duration.ofMillis(165L)).thenReturn(2L)})
                .transform(ReactorUtils.throttle().elements(3).per(Duration.ofMillis(5L)).forOperation(Mono::just))
                .collectList()
                .block();
            Assertions.assertThat(results).containsExactly(new Long[]{0L, 1L, 2L});
        }

        /** Two successive 150 ms gaps in the source must not lose elements either. */
        @Test
        void throttleShouldTolerateManyEmptyWindows() {
            List results = (List) Flux.concat(new Publisher[]{
                    Flux.just(new Long[]{0L, 1L}),
                    Mono.delay(Duration.ofMillis(150L)).thenReturn(2L),
                    Mono.delay(Duration.ofMillis(150L)).thenReturn(3L)})
                .transform(ReactorUtils.throttle().elements(3).per(Duration.ofMillis(5L)).forOperation(Mono::just))
                .collectList()
                .block();
            Assertions.assertThat(results).containsExactly(new Long[]{0L, 1L, 2L, 3L});
        }
    }

    /**
     * Checks how {@code ReactorUtils.toChunks} splits an InputStream into ByteBuffers
     * of at most {@code BUFFER_SIZE} bytes.
     */
    @Nested
    class ToChunks {
        ToChunks() {
        }

        /** Three bytes with a five-byte chunk size: a single chunk holding the whole payload. */
        @Test
        void givenInputStreamSmallerThanBufferSizeShouldReturnOneChunk() {
            byte[] payload = "foo".getBytes(StandardCharsets.UTF_8);
            List chunks = (List) ReactorUtils.toChunks(new ByteArrayInputStream(payload), ReactorUtilsTest.BUFFER_SIZE)
                .collectList()
                .block();

            Assertions.assertThat(chunks).isEqualTo(ImmutableList.of(ByteBuffer.wrap(payload)));
        }

        /** Exactly five bytes: still a single chunk. */
        @Test
        void givenInputStreamEqualToBufferSizeShouldReturnOneChunk() {
            byte[] payload = "foooo".getBytes(StandardCharsets.UTF_8);
            List chunks = (List) ReactorUtils.toChunks(new ByteArrayInputStream(payload), ReactorUtilsTest.BUFFER_SIZE)
                .collectList()
                .block();

            Assertions.assertThat(chunks).isEqualTo(ImmutableList.of(ByteBuffer.wrap(payload)));
        }

        /** Nine bytes: a full five-byte chunk followed by the four remaining bytes. */
        @Test
        void givenInputStreamSlightlyBiggerThanBufferSizeShouldReturnTwoChunks() {
            ByteArrayInputStream source = new ByteArrayInputStream("foobar...".getBytes(StandardCharsets.UTF_8));
            List chunks = (List) ReactorUtils.toChunks(source, ReactorUtilsTest.BUFFER_SIZE)
                .collectList()
                .block();

            Assertions.assertThat(chunks).isEqualTo(ImmutableList.of(
                ByteBuffer.wrap("fooba".getBytes(StandardCharsets.UTF_8)),
                ByteBuffer.wrap("r...".getBytes(StandardCharsets.UTF_8))));
        }

        /**
         * 41111 random bytes: the chunks must equal the payload cut into consecutive
         * windows of {@code BUFFER_SIZE} bytes.
         */
        @Test
        void givenInputStreamBiggerThanBufferSizeShouldReturnMultipleChunks() {
            byte[] payload = RandomStringUtils.randomAlphabetic(41111).getBytes(StandardCharsets.UTF_8);
            List chunks = (List) ReactorUtils.toChunks(new ByteArrayInputStream(payload), ReactorUtilsTest.BUFFER_SIZE)
                .collectList()
                .block();
            List expectedChunks = (List) Flux.fromIterable(Bytes.asList(payload))
                .window(ReactorUtilsTest.BUFFER_SIZE)
                .flatMapSequential(Flux::collectList)
                .map(Bytes::toArray)
                .map(ByteBuffer::wrap)
                .collectList()
                .block();

            Assertions.assertThat(chunks).isEqualTo(expectedChunks);
        }

        /** An empty stream is expected to yield exactly one chunk wrapping zero bytes. */
        @Test
        void givenEmptyInputStreamShouldReturnEmptyChunk() {
            byte[] payload = "".getBytes(StandardCharsets.UTF_8);
            Assertions.assertThat((List) ReactorUtils.toChunks(new ByteArrayInputStream(payload), ReactorUtilsTest.BUFFER_SIZE)
                    .collectList()
                    .block())
                .isEqualTo(ImmutableList.of(ByteBuffer.wrap(payload)));
        }
    }

    /**
     * Checks {@code ReactorUtils.toInputStream}: content fidelity, cancellation of the source
     * on close, error propagation, and how many elements are requested from upstream.
     */
    @Nested
    class ToInputStream {
        ToInputStream() {
        }

        /** A short payload emitted as 3-byte buffers must be read back unchanged. */
        @Test
        void givenAFluxOf3BytesShouldReadSuccessfullyTheWholeSource() {
            byte[] bytes = "foo bar ...".getBytes(StandardCharsets.US_ASCII);
            Assertions.assertThat(ReactorUtils.toInputStream(Flux.fromIterable(Bytes.asList(bytes))
                    .window(3)
                    .flatMapSequential(Flux::collectList)
                    .map(Bytes::toArray)
                    .map(ByteBuffer::wrap)))
                .hasSameContentAs(new ByteArrayInputStream(bytes));
        }

        /** A 41111-byte payload emitted as 3-byte buffers must be read back unchanged. */
        @Test
        void givenALongFluxBytesShouldReadSuccessfullyTheWholeSource() {
            byte[] bytes = RandomStringUtils.randomAlphabetic(41111).getBytes(StandardCharsets.US_ASCII);
            Assertions.assertThat(ReactorUtils.toInputStream(Flux.fromIterable(Bytes.asList(bytes))
                    .window(3)
                    .flatMapSequential(Flux::collectList)
                    .map(Bytes::toArray)
                    .map(ByteBuffer::wrap)))
                .hasSameContentAs(new ByteArrayInputStream(bytes));
        }

        /** Closing the stream without reading must trigger the source's {@code doOnCancel} hook. */
        @Test
        void givenALongFluxBytesWhenIDoNotReadItBeforeClosingItThenTheOriginalFluxShouldBeDisposed() throws Exception {
            byte[] bytes = RandomStringUtils.randomAlphabetic(41111).getBytes(StandardCharsets.US_ASCII);
            AtomicBoolean canceled = new AtomicBoolean(false);
            ReactorUtils.toInputStream(Flux.fromIterable(Bytes.asList(bytes))
                    .window(3)
                    .flatMapSequential(Flux::collectList, 1, 1)
                    .map(Bytes::toArray)
                    .map(ByteBuffer::wrap)
                    .doOnCancel(() -> canceled.set(true)))
                .close();
            Assertions.assertThat(canceled.get()).isTrue();
        }

        /** Closing the stream after one partial read must trigger the source's {@code doOnCancel} hook. */
        @Test
        void givenALongFluxBytesWhenIReadItPartiallyBeforeClosingItThenTheOriginalFluxShouldBeDisposed() throws Exception {
            byte[] bytes = RandomStringUtils.randomAlphabetic(41111).getBytes(StandardCharsets.US_ASCII);
            AtomicBoolean canceled = new AtomicBoolean(false);
            InputStream inputStream = ReactorUtils.toInputStream(Flux.fromIterable(Bytes.asList(bytes))
                .window(3)
                .flatMapSequential(Flux::collectList, 1, 1)
                .map(Bytes::toArray)
                .map(ByteBuffer::wrap)
                .doOnCancel(() -> canceled.set(true)));
            inputStream.read(new byte[3]);
            inputStream.close();
            Assertions.assertThat(canceled.get()).isTrue();
        }

        /** Reading every byte, without closing, must let the source terminate ({@code doFinally} fires). */
        @Test
        void givenALongFluxBytesWhenIReadItFullyWithoutClosingItThenTheOriginalFluxShouldBeDisposed() throws Exception {
            byte[] bytes = RandomStringUtils.randomAlphabetic(41111).getBytes(StandardCharsets.US_ASCII);
            AtomicBoolean terminated = new AtomicBoolean(false);
            IOUtils.readFully(ReactorUtils.toInputStream(Flux.fromIterable(Bytes.asList(bytes))
                .window(3)
                .flatMapSequential(Flux::collectList, 1, 1)
                .map(Bytes::toArray)
                .map(ByteBuffer::wrap)
                .doFinally(signalType -> terminated.set(true))), 41111);
            Assertions.assertThat(terminated.get()).isTrue();
        }

        /** When the third element fails, the outer source must be terminated ({@code doFinally} fires). */
        @Test
        void exceptionsShouldCancelOriginalFluxSubscription() {
            AtomicBoolean terminated = new AtomicBoolean(false);
            InputStream inputStream = ReactorUtils.toInputStream(Flux.fromIterable(ImmutableList.of(
                    Mono.just("abc"),
                    Mono.just("def"),
                    Mono.error(new RuntimeException("Dummy")),
                    Mono.just("mno")))
                .doFinally(signalType -> terminated.set(true))
                .concatMap(mono -> mono, 1)
                // Explicit charset, matching the US_ASCII payloads used by the other tests in this class.
                .map(text -> text.getBytes(StandardCharsets.US_ASCII))
                .map(ByteBuffer::wrap));
            try {
                byte[] buffer = new byte[3];
                inputStream.read(buffer);
                inputStream.read(buffer);
                inputStream.read(buffer);
            } catch (Exception ignored) {
                // Deliberately ignored: a read may fail because of the erroring element;
                // this test only checks that the source was terminated.
            }
            Assertions.assertThat(terminated.get()).isTrue();
        }

        /** An error emitted by the source must surface to the reader with its original message. */
        @Test
        void exceptionsShouldBePropagated() {
            InputStream inputStream = ReactorUtils.toInputStream(Flux.fromIterable(ImmutableList.of(
                    Mono.just("abc"),
                    Mono.just("def"),
                    Mono.just("ghi"),
                    Mono.just("jkl"),
                    Mono.error(new RuntimeException("Dummy")),
                    Mono.just("mno")))
                .concatMap(mono -> mono, 1)
                // Explicit charset, matching the US_ASCII payloads used by the other tests in this class.
                .map(text -> text.getBytes(StandardCharsets.US_ASCII))
                .map(ByteBuffer::wrap));
            Assertions.assertThatThrownBy(() -> {
                IOUtils.toByteArray(inputStream);
            }).hasMessage("Dummy");
        }

        /**
         * One byte per element: after reading five bytes, the total demand recorded by
         * {@code doOnRequest} is expected to be 6.
         */
        @Test
        void givenAFluxOnOneByteShouldConsumeOnlyTheReadBytesAndThePrefetch() throws IOException, InterruptedException {
            AtomicInteger requested = new AtomicInteger(0);
            Assertions.assertThat(IOUtils.readFully(ReactorUtils.toInputStream(Flux.range(0, 10)
                    // Shared pool instead of a fresh cached pool that would never be shut down.
                    .subscribeOn(Schedulers.fromExecutor(ReactorUtilsTest.EXECUTOR))
                    .limitRate(2)
                    .doOnRequest(demand -> requested.getAndAdd((int) demand))
                    .map(index -> new byte[]{(byte) index.intValue()})
                    .map(ByteBuffer::wrap)), ReactorUtilsTest.BUFFER_SIZE))
                .contains(new int[]{0, 1, 2, 3, 4});
            // Leave time for any in-flight request to be recorded before asserting.
            Thread.sleep(200L);
            Assertions.assertThat(requested.get()).isEqualTo(6);
        }

        /**
         * Three bytes per element: after reading five bytes, the total demand recorded by
         * {@code doOnRequest} is expected to be 3.
         */
        @Test
        void givenAFluxOf3BytesShouldConsumeOnlyTheReadBytesAndThePrefetch() throws IOException, InterruptedException {
            AtomicInteger requested = new AtomicInteger(0);
            Assertions.assertThat(IOUtils.readFully(ReactorUtils.toInputStream(
                    // Passed as varargs of byte[] so the element type is byte[].
                    Flux.just(new byte[]{0, 1, 2}, new byte[]{3, 4, 5}, new byte[]{6, 7, 8})
                        // Shared pool instead of a fresh cached pool that would never be shut down.
                        .subscribeOn(Schedulers.fromExecutor(ReactorUtilsTest.EXECUTOR))
                        .map(ByteBuffer::wrap)
                        .limitRate(2)
                        .doOnRequest(demand -> requested.getAndAdd((int) demand))), ReactorUtilsTest.BUFFER_SIZE))
                .contains(new int[]{0, 1, 2, 3, 4});
            // Leave time for any in-flight request to be recorded before asserting.
            Thread.sleep(200L);
            Assertions.assertThat(requested.get()).isEqualTo(3);
        }

        /** An empty buffer in the middle of the source must be skipped: bytes 5 and 6 follow the first five. */
        @Test
        void givenAFluxOf3BytesWithAnEmptyByteArrayShouldConsumeOnlyTheReadBytesAndThePrefetch() throws IOException {
            AtomicInteger requested = new AtomicInteger(0);
            InputStream inputStream = ReactorUtils.toInputStream(
                // Passed as varargs of byte[] so the element type is byte[].
                Flux.just(new byte[]{0, 1, 2}, new byte[0], new byte[]{3, 4, 5}, new byte[]{6, 7, 8}, new byte[]{9, 10, 11})
                    // Shared pool instead of a fresh cached pool that would never be shut down.
                    .subscribeOn(Schedulers.fromExecutor(ReactorUtilsTest.EXECUTOR))
                    .map(ByteBuffer::wrap)
                    .limitRate(2)
                    .doOnRequest(demand -> requested.getAndAdd((int) demand)));
            IOUtils.readFully(inputStream, ReactorUtilsTest.BUFFER_SIZE);
            Assertions.assertThat(IOUtils.readFully(inputStream, 2)).contains(new int[]{5, 6});
        }

        /**
         * An empty source: the read leaves the buffer zero-filled and the total demand
         * recorded by {@code doOnRequest} is expected to be 1.
         */
        @Test
        void givenAnEmptyFluxShouldConsumeOnlyThePrefetch() throws IOException, InterruptedException {
            AtomicInteger requested = new AtomicInteger(0);
            // The explicit element type lets map(ByteBuffer::wrap) resolve to wrap(byte[]).
            InputStream inputStream = ReactorUtils.toInputStream(Flux.<byte[]>empty()
                // Shared pool instead of a fresh cached pool that would never be shut down.
                .subscribeOn(Schedulers.fromExecutor(ReactorUtilsTest.EXECUTOR))
                .map(ByteBuffer::wrap)
                .limitRate(2)
                .doOnRequest(demand -> requested.getAndAdd((int) demand)));
            byte[] readBytes = new byte[ReactorUtilsTest.BUFFER_SIZE];
            inputStream.read(readBytes, 0, readBytes.length);
            Assertions.assertThat(readBytes).contains(new int[]{0, 0, 0, 0, 0});
            // Leave time for any in-flight request to be recorded before asserting.
            Thread.sleep(200L);
            Assertions.assertThat(requested.get()).isEqualTo(1);
        }
    }

    // Package-private no-argument constructor; its body is empty, so no state is initialised here.
    ReactorUtilsTest() {
    }
}
