package org.apache.james.rspamd;

import com.github.fge.lambdas.Throwing;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import jakarta.inject.Inject;
import jakarta.mail.MessagingException;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.core.MailAddress;
import org.apache.james.lifecycle.api.LifecycleUtil;
import org.apache.james.rspamd.client.RspamdClientConfiguration;
import org.apache.james.rspamd.client.RspamdHttpClient;
import org.apache.james.rspamd.model.AnalysisResult;
import org.apache.james.util.AuditTrail;
import org.apache.james.util.ReactorUtils;
import org.apache.mailet.Attribute;
import org.apache.mailet.AttributeName;
import org.apache.mailet.AttributeValue;
import org.apache.mailet.Mail;
import org.apache.mailet.PerRecipientHeaders;
import org.apache.mailet.ProcessingState;
import org.apache.mailet.base.GenericMailet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/apache/james/rspamd/RspamdScanner.class */
public class RspamdScanner extends GenericMailet {
    private static final Logger LOGGER = LoggerFactory.getLogger(RspamdScanner.class);
    public static final AttributeName FLAG_MAIL = AttributeName.of("org.apache.james.rspamd.flag");
    public static final AttributeName STATUS_MAIL = AttributeName.of("org.apache.james.rspamd.status");
    private final RspamdHttpClient rspamdHttpClient;
    private final RspamdClientConfiguration configuration;
    private boolean rewriteSubject;
    private Optional<String> virusProcessor;
    private Optional<String> rejectSpamProcessor;

    @Inject
    public RspamdScanner(RspamdHttpClient rspamdHttpClient, RspamdClientConfiguration rspamdClientConfiguration) {
        this.rspamdHttpClient = rspamdHttpClient;
        this.configuration = rspamdClientConfiguration;
    }

    public void init() {
        this.rewriteSubject = getBooleanParameter(getInitParameter("rewriteSubject"), false);
        this.virusProcessor = getInitParameterAsOptional("virusProcessor");
        this.rejectSpamProcessor = getInitParameterAsOptional("rejectSpamProcessor");
    }

    public void service(Mail mail) throws MessagingException {
        if (this.configuration.usePerUserBayes()) {
            scanPerUser(mail);
        } else {
            scanAll(mail);
        }
    }

    private void scanPerUser(Mail mail) {
        Flux.fromIterable(mail.getRecipients()).flatMap(Throwing.function(mailAddress -> {
            return this.rspamdHttpClient.checkV2(mail, RspamdHttpClient.Options.forMailAddress(mailAddress)).map(analysisResult -> {
                return Pair.of(mailAddress, analysisResult);
            });
        }), 16).concatMap(pair -> {
            return Mono.fromRunnable(() -> {
                handleScanResult(mail, pair);
            }).subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER);
        }).blockLast();
    }

    private void handleScanResult(Mail mail, Pair<MailAddress, AnalysisResult> pair) {
        AuditTrail.entry().protocol("mailetcontainer").action("RspamdScanner").parameters(Throwing.supplier(() -> {
            return ImmutableMap.of("mailId", mail.getName(), "mimeMessageId", (String) Optional.ofNullable(mail.getMessage()).map(Throwing.function((v0) -> {
                return v0.getMessageID();
            })).orElse(""), "sender", mail.getMaybeSender().asString(), "recipient", ((MailAddress) pair.getKey()).asString(), "rspamDAction", ((AnalysisResult) pair.getValue()).getAction().name(), "rspamDRequiredScore", Float.toString(((AnalysisResult) pair.getValue()).getRequiredScore()), "rspamRewrittenSubject", ((AnalysisResult) pair.getValue()).getDesiredRewriteSubject().orElse(""), "rspamDScore", Float.toString(((AnalysisResult) pair.getValue()).getScore()));
        })).log("Mail scanned with RSpamD.");
        if (AnalysisResult.Action.REJECT == ((AnalysisResult) pair.getValue()).getAction()) {
            this.rejectSpamProcessor.ifPresent(str -> {
                processorPerUser(mail, (MailAddress) pair.getKey(), str);
            });
        }
        appendRspamdResultHeader(mail, (MailAddress) pair.getKey(), (AnalysisResult) pair.getRight());
        if (((AnalysisResult) pair.getRight()).hasVirus()) {
            this.virusProcessor.ifPresent(str2 -> {
                processorPerUser(mail, (MailAddress) pair.getKey(), str2);
            });
        }
    }

    private void processorPerUser(Mail mail, MailAddress mailAddress, String str) {
        Mail mail2 = null;
        try {
            try {
                mail2 = mail.duplicate();
                mail2.setRecipients(ImmutableList.of(mailAddress));
                getMailetContext().sendMail(mail2, str);
                if (mail2 != null) {
                    mail.setRecipients(Sets.difference(ImmutableSet.copyOf(mail.getRecipients()), ImmutableSet.of(mailAddress)));
                    LifecycleUtil.dispose(mail2);
                }
                if (this.virusProcessor.equals(Optional.of(str))) {
                    LOGGER.info("Detected a mail containing virus. Sending mail {} to {}", mail, this.virusProcessor);
                    mail.setState(str);
                }
            } catch (MessagingException e) {
                throw new RuntimeException("Error when processor per user", e);
            }
        } catch (Throwable th) {
            if (mail2 != null) {
                mail.setRecipients(Sets.difference(ImmutableSet.copyOf(mail.getRecipients()), ImmutableSet.of(mailAddress)));
                LifecycleUtil.dispose(mail2);
            }
            throw th;
        }
    }

    private void scanAll(Mail mail) throws MessagingException {
        AnalysisResult analysisResult = (AnalysisResult) this.rspamdHttpClient.checkV2(mail).block();
        Preconditions.checkNotNull(analysisResult);
        if (analysisResult.getAction() == AnalysisResult.Action.REJECT) {
            Optional<String> optional = this.rejectSpamProcessor;
            Objects.requireNonNull(mail);
            optional.ifPresent(mail::setState);
        }
        mail.getRecipients().forEach(mailAddress -> {
            appendRspamdResultHeader(mail, mailAddress, analysisResult);
        });
        if (this.rewriteSubject) {
            analysisResult.getDesiredRewriteSubject().ifPresent(Throwing.consumer(str -> {
                mail.getMessage().setSubject(str);
            }));
        }
        if (analysisResult.hasVirus()) {
            this.virusProcessor.ifPresent(str2 -> {
                LOGGER.info("Detected a mail containing virus. Sending mail {} to {}", mail, this.virusProcessor);
                mail.setState(str2);
            });
        }
    }

    private void appendRspamdResultHeader(Mail mail, MailAddress mailAddress, AnalysisResult analysisResult) {
        for (Attribute attribute : getHeadersAsAttributes(analysisResult)) {
            mail.addSpecificHeaderForRecipient(PerRecipientHeaders.Header.builder().name(attribute.getName().asString()).value((String) attribute.getValue().value()).build(), mailAddress);
        }
    }

    private List<Attribute> getHeadersAsAttributes(AnalysisResult analysisResult) {
        String str = "NO";
        Object obj = "No";
        if (analysisResult.getAction().equals(AnalysisResult.Action.REJECT) || analysisResult.getAction().equals(AnalysisResult.Action.ADD_HEADER) || analysisResult.getAction().equals(AnalysisResult.Action.REWRITE_SUBJECT)) {
            str = "YES";
            obj = "Yes";
        }
        return ImmutableList.of(new Attribute(FLAG_MAIL, AttributeValue.of(str)), new Attribute(STATUS_MAIL, AttributeValue.of(obj + ", actions=" + analysisResult.getAction().getDescription() + " score=" + analysisResult.getScore() + " requiredScore=" + analysisResult.getRequiredScore())));
    }

    public Collection<ProcessingState> requiredProcessingState() {
        return (Collection) Stream.of((Object[]) new Optional[]{this.virusProcessor, this.rejectSpamProcessor}).flatMap((v0) -> {
            return v0.stream();
        }).map(ProcessingState::new).collect(ImmutableList.toImmutableList());
    }
}
