package org.apache.james.jmap.cassandra.filtering;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
import com.datastax.oss.driver.api.querybuilder.select.Select;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import javax.inject.Inject;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
import org.apache.james.core.Username;
import org.apache.james.eventsourcing.AggregateId;
import org.apache.james.eventsourcing.Event;
import org.apache.james.eventsourcing.EventId;
import org.apache.james.eventsourcing.EventWithState;
import org.apache.james.eventsourcing.ReactiveSubscriber;
import org.apache.james.jmap.api.filtering.Rule;
import org.apache.james.jmap.api.filtering.Rules;
import org.apache.james.jmap.api.filtering.Version;
import org.apache.james.jmap.api.filtering.impl.EventSourcingFilteringManagement;
import org.apache.james.jmap.api.filtering.impl.FilteringAggregate;
import org.apache.james.jmap.api.filtering.impl.FilteringAggregateId;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;

/**
 * Cassandra-backed read projection of the filtering rules of a user.
 *
 * <p>The projection is fed through {@link #handleReactive(EventWithState)}: for every event it
 * receives, the rules carried by the accompanying state are serialized to JSON and written,
 * together with the aggregate key and the event id, to the projection table. Reads
 * ({@link #listRulesForUser(Username)} and {@link #getLatestVersion(Username)}) fetch a single
 * row by aggregate key.
 */
public class CassandraFilteringProjection implements EventSourcingFilteringManagement.ReadProjection, ReactiveSubscriber {
    /** Jackson type token describing the JSON array stored in the rules column. */
    private static final TypeReference<List<RuleDTO>> RULE_LIST_TYPE = new TypeReference<List<RuleDTO>>() {
    };

    private final CassandraAsyncExecutor executor;
    private final PreparedStatement insertStatement;
    private final PreparedStatement readStatement;
    private final PreparedStatement readVersionStatement;
    private final ObjectMapper objectMapper = new ObjectMapper();

    /**
     * Prepares the insert and the two select statements against the given session.
     *
     * @param cqlSession session used both to prepare the statements and to execute them
     */
    @Inject
    public CassandraFilteringProjection(CqlSession cqlSession) {
        this.executor = new CassandraAsyncExecutor(cqlSession);
        this.insertStatement = cqlSession.prepare(
            QueryBuilder.insertInto(CassandraFilteringProjectionModule.TABLE_NAME)
                .value(CassandraFilteringProjectionModule.AGGREGATE_ID,
                    QueryBuilder.bindMarker(CassandraFilteringProjectionModule.AGGREGATE_ID))
                .value(CassandraFilteringProjectionModule.EVENT_ID,
                    QueryBuilder.bindMarker(CassandraFilteringProjectionModule.EVENT_ID))
                .value(CassandraFilteringProjectionModule.RULES,
                    QueryBuilder.bindMarker(CassandraFilteringProjectionModule.RULES))
                .build());
        this.readStatement = cqlSession.prepare(
            QueryBuilder.selectFrom(CassandraFilteringProjectionModule.TABLE_NAME)
                .all()
                .whereColumn(CassandraFilteringProjectionModule.AGGREGATE_ID)
                .isEqualTo(QueryBuilder.bindMarker(CassandraFilteringProjectionModule.AGGREGATE_ID))
                .build());
        this.readVersionStatement = cqlSession.prepare(
            QueryBuilder.selectFrom(CassandraFilteringProjectionModule.TABLE_NAME)
                .column(CassandraFilteringProjectionModule.EVENT_ID)
                .whereColumn(CassandraFilteringProjectionModule.AGGREGATE_ID)
                .isEqualTo(QueryBuilder.bindMarker(CassandraFilteringProjectionModule.AGGREGATE_ID))
                .build());
    }

    /**
     * Reads the projected rules of a user.
     *
     * @param username user whose rules are looked up
     * @return a publisher emitting the rules built from the row read for this user; a stored
     *         value that cannot be parsed as JSON is signalled as a {@link JsonProcessingException}
     */
    public Publisher<Rules> listRulesForUser(Username username) {
        String aggregateKey = new FilteringAggregateId(username).asAggregateKey();
        return this.executor
            .executeSingleRow(this.readStatement.bind()
                .setString(CassandraFilteringProjectionModule.AGGREGATE_ID, aggregateKey))
            .handle((row, sink) -> {
                // parseRules throws a checked exception, so it cannot be used with a plain map().
                try {
                    sink.next(parseRules(row));
                } catch (JsonProcessingException e) {
                    sink.error(e);
                }
            });
    }

    /**
     * Reads the version (event id) stored alongside the projected rules of a user.
     *
     * @param username user whose version is looked up
     * @return a publisher emitting the version built from the row read for this user
     */
    public Publisher<Version> getLatestVersion(Username username) {
        String aggregateKey = new FilteringAggregateId(username).asAggregateKey();
        return this.executor
            .executeSingleRow(this.readVersionStatement.bind()
                .setString(CassandraFilteringProjectionModule.AGGREGATE_ID, aggregateKey))
            .map(this::parseVersion);
    }

    /**
     * Exposes this projection as the subscriber that keeps it up to date.
     *
     * @param function unused by this implementation
     * @return an optional that always contains this instance
     */
    public Optional<ReactiveSubscriber> subscriber(Function<Username, Publisher<Rules>> function) {
        return Optional.of(this);
    }

    /**
     * Persists the rules carried by the state attached to the given event.
     *
     * <p>The work is deferred until subscription so that any failure while unwrapping the state
     * (no state attached, or a state that is not a {@link FilteringAggregate.FilterState}) is
     * delivered as an error signal of the returned publisher rather than thrown at call time.
     *
     * @param eventWithState the event together with the aggregate state it produced
     * @return a publisher completing once the insert has been executed
     */
    public Publisher<Void> handleReactive(EventWithState eventWithState) {
        return Mono.defer(() -> {
            Event event = eventWithState.event();
            FilteringAggregate.FilterState state = (FilteringAggregate.FilterState) eventWithState.state().get();
            return persistRules(event.getAggregateId(), event.eventId(), state.getRules());
        });
    }

    /**
     * Serializes the rules to JSON and writes them with the aggregate key and event id.
     *
     * @return the insert execution, or an error {@link Mono} when the rules cannot be serialized
     */
    private Mono<Void> persistRules(AggregateId aggregateId, EventId eventId, ImmutableList<Rule> rules) {
        String rulesAsJson;
        try {
            rulesAsJson = this.objectMapper.writeValueAsString(RuleDTO.from(rules));
        } catch (JsonProcessingException e) {
            return Mono.error(e);
        }
        return this.executor.executeVoid(this.insertStatement.bind()
            .setString(CassandraFilteringProjectionModule.AGGREGATE_ID, aggregateId.asAggregateKey())
            .setInt(CassandraFilteringProjectionModule.EVENT_ID, eventId.value())
            .setString(CassandraFilteringProjectionModule.RULES, rulesAsJson));
    }

    /** Builds a {@link Version} from the event id column of the row. */
    private Version parseVersion(Row row) {
        return new Version(row.getInt(CassandraFilteringProjectionModule.EVENT_ID));
    }

    /**
     * Builds {@link Rules} from the JSON held in the rules column and the event id of the row.
     *
     * @throws JsonProcessingException when the stored value is not a valid JSON list of rules
     */
    private Rules parseRules(Row row) throws JsonProcessingException {
        String rulesAsJson = row.getString(CassandraFilteringProjectionModule.RULES);
        List<RuleDTO> ruleDtos = this.objectMapper.readValue(rulesAsJson, RULE_LIST_TYPE);
        return new Rules(RuleDTO.toRules(ruleDtos), parseVersion(row));
    }
}
