/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals.assignment;

import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
import org.slf4j.Logger;

/**
 * Makes every repartition topic of a copartition group use a single, consistent
 * number of partitions, and rejects groups for which no such number exists.
 */
public class CopartitionedTopicsEnforcer {
    private final String logPrefix;
    private final Logger log;

    /**
     * @param logPrefix prefix prepended to the messages built by this class; it is
     *                  also used to create the {@link LogContext} backing the logger
     */
    public CopartitionedTopicsEnforcer(String logPrefix) {
        this.logPrefix = logPrefix;
        LogContext logContext = new LogContext(logPrefix);
        this.log = logContext.logger(this.getClass());
    }

    /**
     * Determines the partition count the repartition topics of {@code copartitionGroup}
     * must have, applies it to every repartition topic config that does not enforce its
     * own count, and verifies that each config ends up with exactly that count.
     *
     * <p>The target count is chosen as follows:
     * <ul>
     *   <li>the group contains at least one non-repartition topic: the count shared by
     *       all non-repartition topics, as reported by {@code metadata};</li>
     *   <li>the group contains only repartition topics and some enforce a count: that
     *       enforced count (all enforced counts must agree);</li>
     *   <li>otherwise: the largest count currently set on any repartition topic.</li>
     * </ul>
     *
     * @param copartitionGroup                  topics that must be copartitioned; an empty set is a no-op
     * @param allRepartitionTopicsNumPartitions repartition topic configs keyed by topic name;
     *                                          configs belonging to the group may be mutated
     * @param metadata                          source of partition counts for non-repartition topics
     * @throws IllegalStateException if a non-repartition topic has no partition count in
     *                               {@code metadata}, or no positive count can be derived
     *                               from the repartition topics
     * @throws TopologyException     if the topics of the group cannot be given the same
     *                               number of partitions
     */
    public void enforce(Set<String> copartitionGroup, Map<String, InternalTopicConfig> allRepartitionTopicsNumPartitions, Cluster metadata) {
        if (copartitionGroup.isEmpty()) {
            return;
        }

        Map<String, InternalTopicConfig> repartitionTopicConfigs = copartitionGroup.stream()
            .filter(allRepartitionTopicsNumPartitions::containsKey)
            .collect(Collectors.toMap(topic -> topic, allRepartitionTopicsNumPartitions::get));

        Map<String, Integer> nonRepartitionTopicPartitions = copartitionGroup.stream()
            .filter(topic -> !allRepartitionTopicsNumPartitions.containsKey(topic))
            .collect(Collectors.toMap(topic -> topic, topic -> this.partitionCountOrThrow(metadata, topic)));

        Collection<InternalTopicConfig> internalTopicConfigs = repartitionTopicConfigs.values();

        final int numPartitionsToUseForRepartitionTopics;
        if (copartitionGroup.equals(repartitionTopicConfigs.keySet())) {
            // The group consists solely of repartition topics: there is no external topic to align with.
            Collection<InternalTopicConfig> internalTopicConfigsWithEnforcedNumberOfPartitions = internalTopicConfigs.stream()
                .filter(InternalTopicConfig::hasEnforcedNumberOfPartitions)
                .collect(Collectors.toList());
            if (!internalTopicConfigsWithEnforcedNumberOfPartitions.isEmpty()) {
                numPartitionsToUseForRepartitionTopics = this.validateAndGetNumOfPartitions(repartitionTopicConfigs, internalTopicConfigsWithEnforcedNumberOfPartitions);
            } else {
                numPartitionsToUseForRepartitionTopics = this.getMaxPartitions(repartitionTopicConfigs);
            }
        } else {
            // At least one non-repartition topic is present, so its partition count dictates the target.
            numPartitionsToUseForRepartitionTopics = this.getSamePartitions(nonRepartitionTopicPartitions);
        }

        for (InternalTopicConfig config : internalTopicConfigs) {
            CopartitionedTopicsEnforcer.maybeSetNumberOfPartitionsForInternalTopic(numPartitionsToUseForRepartitionTopics, config);
            int numberOfPartitionsOfInternalTopic = config.numberOfPartitions()
                .orElseThrow(CopartitionedTopicsEnforcer.emptyNumberOfPartitionsExceptionSupplier(config.name()));
            // Only configs with an enforced count can still differ at this point.
            if (numberOfPartitionsOfInternalTopic != numPartitionsToUseForRepartitionTopics) {
                String msg = String.format("%sNumber of partitions [%d] of repartition topic [%s] doesn't match number of partitions [%d] of the source topic.", this.logPrefix, numberOfPartitionsOfInternalTopic, config.name(), numPartitionsToUseForRepartitionTopics);
                throw new TopologyException(msg);
            }
        }
    }

    /**
     * Returns the partition count {@code metadata} reports for {@code topic}; logs an
     * error and throws {@link IllegalStateException} when no count is available.
     */
    private int partitionCountOrThrow(Cluster metadata, String topic) {
        Integer partitions = metadata.partitionCountForTopic(topic);
        if (partitions == null) {
            String str = String.format("%sTopic not found: %s", this.logPrefix, topic);
            this.log.error(str);
            throw new IllegalStateException(str);
        }
        return partitions;
    }

    /** Sets the partition count on {@code config} unless the config enforces its own count. */
    private static void maybeSetNumberOfPartitionsForInternalTopic(int numPartitionsToUseForRepartitionTopics, InternalTopicConfig config) {
        if (!config.hasEnforcedNumberOfPartitions()) {
            config.setNumberOfPartitions(numPartitionsToUseForRepartitionTopics);
        }
    }

    /**
     * Verifies that all configs in {@code internalTopicConfigs} carry the same partition
     * count and returns it.
     *
     * @param repartitionTopicConfigs all repartition topics of the group; used only to
     *                                build the error message
     * @param internalTopicConfigs    non-empty collection of configs to compare
     * @throws TopologyException if a config has no partition count or the counts differ
     */
    private int validateAndGetNumOfPartitions(Map<String, InternalTopicConfig> repartitionTopicConfigs, Collection<InternalTopicConfig> internalTopicConfigs) {
        InternalTopicConfig firstInternalTopicConfig = internalTopicConfigs.iterator().next();
        int firstNumberOfPartitionsOfInternalTopic = firstInternalTopicConfig.numberOfPartitions()
            .orElseThrow(CopartitionedTopicsEnforcer.emptyNumberOfPartitionsExceptionSupplier(firstInternalTopicConfig.name()));

        for (InternalTopicConfig internalTopicConfig : internalTopicConfigs) {
            int numberOfPartitions = internalTopicConfig.numberOfPartitions()
                .orElseThrow(CopartitionedTopicsEnforcer.emptyNumberOfPartitionsExceptionSupplier(internalTopicConfig.name()));
            if (numberOfPartitions != firstNumberOfPartitionsOfInternalTopic) {
                String msg = String.format("%sFollowing topics do not have the same number of partitions: [%s]", this.logPrefix, CopartitionedTopicsEnforcer.describePartitionCounts(repartitionTopicConfigs));
                throw new TopologyException(msg);
            }
        }
        return firstNumberOfPartitionsOfInternalTopic;
    }

    /**
     * Builds a topic-name-sorted view of the partition counts for use in error messages.
     * A topic without a partition count is rendered as {@code unset}; calling
     * {@code Optional.get()} on it instead would throw {@code NoSuchElementException}
     * and hide the error that is actually being reported.
     */
    private static Map<String, String> describePartitionCounts(Map<String, InternalTopicConfig> repartitionTopicConfigs) {
        Map<String, String> described = new TreeMap<>();
        for (Map.Entry<String, InternalTopicConfig> entry : repartitionTopicConfigs.entrySet()) {
            described.put(entry.getKey(), entry.getValue().numberOfPartitions().map(Object::toString).orElse("unset"));
        }
        return described;
    }

    /** Supplies the exception thrown when {@code topic} has no partition count set. */
    private static Supplier<TopologyException> emptyNumberOfPartitionsExceptionSupplier(String topic) {
        return () -> new TopologyException("Number of partitions is not set for topic: " + topic);
    }

    /**
     * Returns the partition count shared by all given non-repartition topics.
     *
     * @param nonRepartitionTopicsInCopartitionGroup non-empty map of topic name to partition count
     * @throws TopologyException if the topics do not all have the same partition count
     */
    private int getSamePartitions(Map<String, Integer> nonRepartitionTopicsInCopartitionGroup) {
        int partitions = nonRepartitionTopicsInCopartitionGroup.values().iterator().next();
        for (Map.Entry<String, Integer> entry : nonRepartitionTopicsInCopartitionGroup.entrySet()) {
            if (entry.getValue() != partitions) {
                // Sort by topic name so the message lists the topics in a stable order.
                TreeMap<String, Integer> sorted = new TreeMap<>(nonRepartitionTopicsInCopartitionGroup);
                throw new TopologyException(String.format("%sTopics not co-partitioned: [%s]", this.logPrefix, sorted));
            }
        }
        return partitions;
    }

    /**
     * Returns the largest partition count set on any of the given repartition topics;
     * topics without a count are ignored.
     *
     * @throws IllegalStateException if no topic yields a positive partition count
     */
    private int getMaxPartitions(Map<String, InternalTopicConfig> repartitionTopicsInCopartitionGroup) {
        int maxPartitions = 0;
        for (InternalTopicConfig config : repartitionTopicsInCopartitionGroup.values()) {
            Optional<Integer> partitions = config.numberOfPartitions();
            maxPartitions = Integer.max(maxPartitions, partitions.orElse(maxPartitions));
        }
        if (maxPartitions <= 0) {
            throw new IllegalStateException(this.logPrefix + "Could not validate the copartitioning of topics: " + String.valueOf(repartitionTopicsInCopartitionGroup.keySet()));
        }
        return maxPartitions;
    }
}

