/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.coordinator.group.streams.topics;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalInt;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
import org.slf4j.Logger;

/**
 * Derives the expected number of partitions for every state changelog topic of a
 * topology.
 *
 * <p>For each subtopology, the partition count assigned to its state changelog topics
 * is the maximum partition count found among that subtopology's source topics and
 * repartition source topics, as reported by the supplied partition-count provider.
 */
public class ChangelogTopics {
    private final Logger log;
    private final Collection<StreamsGroupTopologyValue.Subtopology> subtopologies;
    private final Function<String, OptionalInt> topicPartitionCountProvider;

    /**
     * Creates a new instance.
     *
     * @param logContext                  context used to create this instance's logger
     * @param subtopologies               the subtopologies whose changelog topics are inspected
     * @param topicPartitionCountProvider returns the partition count for a topic name, or an
     *                                    empty {@link OptionalInt} when the count is unknown
     */
    public ChangelogTopics(LogContext logContext, Collection<StreamsGroupTopologyValue.Subtopology> subtopologies, Function<String, OptionalInt> topicPartitionCountProvider) {
        this.log = logContext.logger(this.getClass());
        this.subtopologies = subtopologies;
        this.topicPartitionCountProvider = topicPartitionCountProvider;
    }

    /**
     * Computes the expected partition count of every state changelog topic.
     *
     * <p>If the same changelog topic name is declared by more than one subtopology, the
     * value computed for the subtopology iterated last is the one kept in the result.
     *
     * @return a mutable map from state changelog topic name to expected partition count;
     *         empty when no subtopology declares a state changelog topic
     * @throws StreamsInvalidTopologyException if a subtopology has neither source topics nor
     *                                         repartition source topics (checked even when the
     *                                         subtopology declares no state changelog topics)
     * @throws IllegalStateException           if the provider has no partition count for one of
     *                                         the source or repartition source topics
     */
    public Map<String, Integer> setup() {
        Map<String, Integer> changelogTopicPartitions = new HashMap<>();
        for (StreamsGroupTopologyValue.Subtopology subtopology : this.subtopologies) {
            int numPartitions = maxSourcePartitionCount(subtopology);
            for (StreamsGroupTopologyValue.TopicInfo topicInfo : subtopology.stateChangelogTopics()) {
                changelogTopicPartitions.put(topicInfo.name(), numPartitions);
            }
        }
        // The message argument is built by streaming over the whole map, so only pay
        // for it when there is something to report and debug logging is enabled.
        if (!changelogTopicPartitions.isEmpty() && this.log.isDebugEnabled()) {
            this.log.debug("Expecting state changelog topic partitions {} for the requested topology.", describe(changelogTopicPartitions));
        }
        return changelogTopicPartitions;
    }

    /**
     * Returns the largest partition count among the source topics and repartition source
     * topics of the given subtopology.
     */
    private int maxSourcePartitionCount(StreamsGroupTopologyValue.Subtopology subtopology) {
        return Stream.concat(
                subtopology.sourceTopics().stream(),
                subtopology.repartitionSourceTopics().stream().map(StreamsGroupTopologyValue.TopicInfo::name))
            .mapToInt(this::getPartitionCountOrFail)
            .max()
            .orElseThrow(() -> new StreamsInvalidTopologyException("No source topics found for subtopology " + subtopology.subtopologyId()));
    }

    /** Renders the map as comma-separated {@code topic:partitions} pairs for logging. */
    private static String describe(Map<String, Integer> topicPartitions) {
        return topicPartitions.entrySet().stream()
            .map(entry -> entry.getKey() + ":" + entry.getValue())
            .collect(Collectors.joining(", "));
    }

    /**
     * Looks up the partition count of a topic through the provider.
     *
     * @throws IllegalStateException if the provider returns an empty result for the topic
     */
    private int getPartitionCountOrFail(String topic) {
        return this.topicPartitionCountProvider.apply(topic)
            .orElseThrow(() -> new IllegalStateException("No partition count for source topic " + topic));
    }
}

