/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.changelog.inmemory;

import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.changelog.SequenceNumber;
import org.apache.flink.runtime.state.changelog.StateChange;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
import org.apache.flink.runtime.state.changelog.inmemory.InMemoryChangelogStateHandle;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
class InMemoryStateChangelogWriter
implements StateChangelogWriter<InMemoryChangelogStateHandle> {
    private static final Logger LOG = LoggerFactory.getLogger(InMemoryStateChangelogWriter.class);
    private static final SequenceNumber INITIAL_SQN = SequenceNumber.of(0L);
    private final Map<Integer, NavigableMap<SequenceNumber, byte[]>> changesByKeyGroup = new HashMap<Integer, NavigableMap<SequenceNumber, byte[]>>();
    private final KeyGroupRange keyGroupRange;
    private SequenceNumber sqn = INITIAL_SQN;
    private boolean closed;

    public InMemoryStateChangelogWriter(KeyGroupRange keyGroupRange) {
        this.keyGroupRange = keyGroupRange;
    }

    @Override
    public void append(int keyGroup, byte[] value) {
        Preconditions.checkState(!this.closed, "LogWriter is closed");
        LOG.trace("append, keyGroup={}, {} bytes", (Object)keyGroup, (Object)value.length);
        this.changesByKeyGroup.computeIfAbsent(keyGroup, unused -> new TreeMap()).put(this.sqn, value);
        this.sqn = this.sqn.next();
    }

    @Override
    public SequenceNumber initialSequenceNumber() {
        return INITIAL_SQN;
    }

    @Override
    public SequenceNumber nextSequenceNumber() {
        return this.sqn;
    }

    @Override
    public CompletableFuture<InMemoryChangelogStateHandle> persist(SequenceNumber from) {
        LOG.debug("Persist after {}", (Object)from);
        Preconditions.checkNotNull(from);
        return CompletableFuture.completedFuture(new InMemoryChangelogStateHandle(this.collectChanges(from), from, this.sqn, this.keyGroupRange));
    }

    private List<StateChange> collectChanges(SequenceNumber after) {
        return this.changesByKeyGroup.entrySet().stream().flatMap(e -> this.toChangeStream((NavigableMap)e.getValue(), after, (Integer)e.getKey())).sorted(Comparator.comparing(sqnAndChange -> (SequenceNumber)sqnAndChange.f0)).map(t -> (StateChange)t.f1).collect(Collectors.toList());
    }

    private Stream<Tuple2<SequenceNumber, StateChange>> toChangeStream(NavigableMap<SequenceNumber, byte[]> changeMap, SequenceNumber after, int keyGroup) {
        return changeMap.tailMap(after, true).entrySet().stream().map(e2 -> Tuple2.of(e2.getKey(), new StateChange(keyGroup, (byte[])e2.getValue())));
    }

    @Override
    public void close() {
        Preconditions.checkState(!this.closed);
        this.closed = true;
    }

    @Override
    public SequenceNumber getLowestSequenceNumber() {
        return this.changesByKeyGroup.values().stream().filter(map -> !map.isEmpty()).map(SortedMap::firstKey).min(Comparator.naturalOrder()).orElse(this.nextSequenceNumber());
    }

    @Override
    public void truncate(SequenceNumber before) {
        this.changesByKeyGroup.forEach((kg, changesBySqn) -> changesBySqn.headMap(before, false).clear());
    }

    @Override
    public void confirm(SequenceNumber from, SequenceNumber to) {
    }

    @Override
    public void reset(SequenceNumber from, SequenceNumber to) {
    }
}

