package org.apache.james.util;

import com.google.common.base.Preconditions;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Iterator;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Signal;
import reactor.core.publisher.SynchronousSink;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;
import reactor.util.context.ContextView;

/* loaded from: input_file:recursive/extensions-jars/james-server-guice-custom-mailets-3.9.0-SNAPSHOT-jar-with-dependencies.jar:org/apache/james/util/ReactorUtils.class */
public class ReactorUtils {
    public static final String MDC_KEY_PREFIX = "MDC-";
    public static final int DEFAULT_CONCURRENCY = 16;
    public static final int LOW_CONCURRENCY = 4;
    private static final boolean DAEMON = true;
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) ReactorUtils.class);
    private static final int DEFAULT_BOUNDED_ELASTIC_SIZE = ((Integer) Optional.ofNullable(System.getProperty("james.schedulers.defaultBoundedElasticSize")).map(Integer::parseInt).orElseGet(() -> {
        return Integer.valueOf(10 * Runtime.getRuntime().availableProcessors());
    })).intValue();
    public static final int DEFAULT_BOUNDED_ELASTIC_QUEUESIZE = ((Integer) Optional.ofNullable(System.getProperty("james.schedulers.defaultBoundedElasticQueueSize")).map(Integer::parseInt).orElse(100000)).intValue();
    private static final int TTL_SECONDS = 60;
    public static final Scheduler BLOCKING_CALL_WRAPPER = Schedulers.newBoundedElastic(DEFAULT_BOUNDED_ELASTIC_SIZE, DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "blocking-call-wrapper", TTL_SECONDS, true);

    @FunctionalInterface
    /* loaded from: input_file:recursive/extensions-jars/james-server-guice-custom-mailets-3.9.0-SNAPSHOT-jar-with-dependencies.jar:org/apache/james/util/ReactorUtils$RequiresOperation.class */
    public interface RequiresOperation<T, U> {
        Function<Flux<T>, Flux<U>> forOperation(Function<T, Publisher<U>> function);
    }

    @FunctionalInterface
    /* loaded from: input_file:recursive/extensions-jars/james-server-guice-custom-mailets-3.9.0-SNAPSHOT-jar-with-dependencies.jar:org/apache/james/util/ReactorUtils$RequiresPeriod.class */
    public interface RequiresPeriod<T, U> {
        RequiresOperation<T, U> per(Duration duration);
    }

    @FunctionalInterface
    /* loaded from: input_file:recursive/extensions-jars/james-server-guice-custom-mailets-3.9.0-SNAPSHOT-jar-with-dependencies.jar:org/apache/james/util/ReactorUtils$RequiresQuantity.class */
    public interface RequiresQuantity<T, U> {
        RequiresPeriod<T, U> elements(int i);
    }

    /* loaded from: input_file:recursive/extensions-jars/james-server-guice-custom-mailets-3.9.0-SNAPSHOT-jar-with-dependencies.jar:org/apache/james/util/ReactorUtils$StreamInputStream.class */
    private static class StreamInputStream extends InputStream {
        private static final int NO_MORE_DATA = -1;
        private final Iterator<ByteBuffer> source;
        private final Stream<ByteBuffer> sourceAsStream;
        private Optional<ByteBuffer> currentItemByteStream = Optional.empty();

        StreamInputStream(Stream<ByteBuffer> stream) {
            this.source = stream.iterator();
            this.sourceAsStream = stream;
        }

        @Override // java.io.InputStream
        public int read(byte[] bArr, int i, int i2) throws IOException {
            return ((Integer) nextNonEmptyBuffer().map(byteBuffer -> {
                int min = Math.min(i2, byteBuffer.remaining());
                byteBuffer.get(bArr, i, min);
                return Integer.valueOf(min);
            }).orElse(-1)).intValue();
        }

        @Override // java.io.InputStream
        public int read() {
            return ((Integer) nextNonEmptyBuffer().map(ReactorUtils::byteToInt).orElse(-1)).intValue();
        }

        private Optional<ByteBuffer> nextNonEmptyBuffer() {
            if (!((Boolean) this.currentItemByteStream.map(byteBuffer -> {
                return Boolean.valueOf(!byteBuffer.hasRemaining());
            }).orElse(true)).booleanValue()) {
                return this.currentItemByteStream;
            }
            if (!this.source.hasNext()) {
                return Optional.empty();
            }
            this.currentItemByteStream = Optional.of(this.source.next());
            return nextNonEmptyBuffer();
        }

        @Override // java.io.InputStream, java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            this.sourceAsStream.close();
        }
    }

    public static <T, U> RequiresQuantity<T, U> throttle() {
        return i -> {
            return duration -> {
                return function -> {
                    Preconditions.checkArgument(i > 0, "'windowMaxSize' must be strictly positive");
                    Preconditions.checkArgument(!duration.isNegative(), "'windowDuration' must be strictly positive");
                    Preconditions.checkArgument(!duration.isZero(), "'windowDuration' must be strictly positive");
                    return flux -> {
                        return flux.onErrorContinue((th, obj) -> {
                            LOGGER.error("Error encountered while generating throttled entries", th);
                        }).window(i).delayElements(duration).concatMap(flux -> {
                            return flux.flatMap(function, Queues.SMALL_BUFFER_SIZE).onErrorResume(th2 -> {
                                LOGGER.error("Error encountered while throttling", th2);
                                return Mono.empty();
                            });
                        });
                    };
                };
            };
        };
    }

    public static <T> Mono<T> executeAndEmpty(Runnable runnable) {
        return Mono.fromRunnable(runnable).then(Mono.empty());
    }

    public static <T> BiConsumer<Optional<T>, SynchronousSink<T>> publishIfPresent() {
        return (optional, synchronousSink) -> {
            Objects.requireNonNull(synchronousSink);
            optional.ifPresent(synchronousSink::next);
        };
    }

    public static InputStream toInputStream(Flux<ByteBuffer> flux) {
        return new StreamInputStream(flux.toStream(1));
    }

    public static Flux<ByteBuffer> toChunks(InputStream inputStream, int i) {
        return Flux.generate(synchronousSink -> {
            try {
                byte[] bArr = new byte[i];
                int read = inputStream.read(bArr);
                if (read >= 0) {
                    synchronousSink.next(ByteBuffer.wrap(bArr, 0, read));
                } else {
                    synchronousSink.complete();
                }
            } catch (IOException e) {
                synchronousSink.error(e);
            }
        }).defaultIfEmpty(ByteBuffer.wrap(new byte[0]));
    }

    private static int byteToInt(ByteBuffer byteBuffer) {
        return byteBuffer.get() & 255;
    }

    public static Consumer<Signal<?>> logOnError(Consumer<Throwable> consumer) {
        return signal -> {
            if (signal.isOnError()) {
                logWithContext(() -> {
                    consumer.accept(signal.getThrowable());
                }, signal.getContextView());
            }
        };
    }

    public static Consumer<Signal<?>> log(Runnable runnable) {
        return signal -> {
            logWithContext(runnable, signal.getContextView());
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void logWithContext(Runnable runnable, ContextView contextView) {
        try {
            Closeable build = retrieveMDCBuilder(contextView).build();
            try {
                runnable.run();
                if (build != null) {
                    build.close();
                }
            } finally {
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static Mono<Void> logAsMono(Runnable runnable) {
        return Mono.deferContextual(contextView -> {
            return Mono.fromRunnable(() -> {
                logWithContext(runnable, contextView);
            });
        });
    }

    public static Consumer<Signal<?>> logOnError(Class<? extends Throwable> cls, Consumer<Throwable> consumer) {
        return signal -> {
            if (signal.hasError() && cls.isInstance(signal.getThrowable())) {
                logWithContext(() -> {
                    consumer.accept(signal.getThrowable());
                }, signal.getContextView());
            }
        };
    }

    public static Context context(String str, MDCBuilder mDCBuilder) {
        return Context.of(mdcKey(str), mDCBuilder);
    }

    private static String mdcKey(String str) {
        return "MDC-" + str;
    }

    public static MDCBuilder retrieveMDCBuilder(Signal<?> signal) {
        return retrieveMDCBuilder(signal.getContextView());
    }

    public static MDCBuilder retrieveMDCBuilder(ContextView contextView) {
        return (MDCBuilder) contextView.stream().filter(entry -> {
            return entry.getKey() instanceof String;
        }).filter(entry2 -> {
            return entry2.getValue() instanceof MDCBuilder;
        }).filter(entry3 -> {
            return ((String) entry3.getKey()).startsWith(MDC_KEY_PREFIX);
        }).map(entry4 -> {
            return (MDCBuilder) entry4.getValue();
        }).reduce(MDCBuilder.create(), (v0, v1) -> {
            return v0.addToContext(v1);
        });
    }
}
