/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.rank;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.RowDataUtil;
import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
import org.apache.flink.table.runtime.generated.RecordEqualiser;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.rank.AbstractTopNFunction;
import org.apache.flink.table.runtime.operators.rank.ComparableRecordComparator;
import org.apache.flink.table.runtime.operators.rank.RankRange;
import org.apache.flink.table.runtime.operators.rank.RankType;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.typeutils.SortedMapTypeInfo;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RetractableTopNFunction
extends AbstractTopNFunction {
    /** Serialization version identifier of this function. */
    private static final long serialVersionUID = 1365312180599454480L;
    /** Class logger; used to report stale-state warnings. */
    private static final Logger LOG = LoggerFactory.getLogger(RetractableTopNFunction.class);
    /** Message logged when state that the function expected to find is missing. */
    private static final String STATE_CLEARED_WARN_MSG = "The state is cleared because of state ttl. This will result in incorrect result. You can increase the state ttl to avoid this.";
    /** Type information of the sort key; obtained from the sort-key selector in the constructor. */
    private final InternalTypeInfo<RowData> sortKeyType;
    // NOTE(review): constant true; presumably selects "warn and continue" over "fail" when
    // state turns out to be stale -- confirm the intended use.
    private final boolean lenient = true;
    /** Keyed map state ("data-state"): sort key -> list of input rows stored under that sort key. */
    private transient MapState<RowData, List<RowData>> dataState;
    /** Keyed value state ("sorted-map"): sorted map from sort key -> number of rows counted for it. */
    private transient ValueState<SortedMap<RowData, Long>> treeMap;
    /** Generated equaliser; instantiated in {@code open} and then set to {@code null}. */
    private GeneratedRecordEqualiser generatedEqualiser;
    /** Row equaliser created in {@code open}; used to locate the stored row that is being retracted. */
    private RecordEqualiser equaliser;
    /** Comparator passed to the sorted-map type information of the "sorted-map" state. */
    private final ComparableRecordComparator serializableComparator;
    /** Serializer for input rows; used to copy a stored row before a delete is emitted for it. */
    private final TypeSerializer<RowData> inputRowSer;

    /**
     * Creates the function and captures everything needed later to build its state.
     *
     * <p>The generated record comparator wrapped by {@code comparableRecordComparator} is
     * forwarded to the parent class; the wrapper itself is kept for the "sorted-map" state type
     * information. The input-row serializer is created with a fresh {@link ExecutionConfig}.
     *
     * @param ttlConfig state TTL configuration, forwarded to the parent
     * @param inputRowType type information of the input rows
     * @param comparableRecordComparator comparator for sort keys
     * @param sortKeySelector selector that extracts the sort key from an input row
     * @param rankType rank type, forwarded to the parent
     * @param rankRange rank range, forwarded to the parent
     * @param generatedEqualiser generated equaliser, instantiated later in {@code open}
     * @param generateUpdateBefore forwarded to the parent
     * @param outputRankNumber forwarded to the parent
     */
    public RetractableTopNFunction(
            StateTtlConfig ttlConfig,
            InternalTypeInfo<RowData> inputRowType,
            ComparableRecordComparator comparableRecordComparator,
            RowDataKeySelector sortKeySelector,
            RankType rankType,
            RankRange rankRange,
            GeneratedRecordEqualiser generatedEqualiser,
            boolean generateUpdateBefore,
            boolean outputRankNumber) {
        super(
                ttlConfig,
                inputRowType,
                comparableRecordComparator.getGeneratedRecordComparator(),
                sortKeySelector,
                rankType,
                rankRange,
                generateUpdateBefore,
                outputRankNumber);
        this.generatedEqualiser = generatedEqualiser;
        this.serializableComparator = comparableRecordComparator;
        this.inputRowSer = inputRowType.createSerializer(new ExecutionConfig());
        this.sortKeyType = sortKeySelector.getProducedType();
    }

    /**
     * Instantiates the record equaliser and registers the two keyed states of this function.
     *
     * <ul>
     *   <li>"data-state": map state from sort key to the list of rows stored under that key.
     *   <li>"sorted-map": value state holding a sorted map from sort key to row count.
     * </ul>
     *
     * <p>Time-to-live is enabled on both descriptors when the TTL configuration is enabled.
     *
     * @param parameters configuration forwarded to the parent {@code open}
     * @throws Exception if the parent initialisation or state registration fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.equaliser = (RecordEqualiser)this.generatedEqualiser.newInstance(this.getRuntimeContext().getUserCodeClassLoader());
        // The generated code holder is not needed once the equaliser has been instantiated.
        this.generatedEqualiser = null;

        // Fully parameterised descriptors: no raw types, no unchecked conversions into the
        // typed state fields.
        ListTypeInfo<RowData> valueTypeInfo = new ListTypeInfo<>(this.inputRowType);
        MapStateDescriptor<RowData, List<RowData>> mapStateDescriptor =
                new MapStateDescriptor<>("data-state", this.sortKeyType, valueTypeInfo);
        if (this.ttlConfig.isEnabled()) {
            mapStateDescriptor.enableTimeToLive(this.ttlConfig);
        }
        this.dataState = this.getRuntimeContext().getMapState(mapStateDescriptor);

        ValueStateDescriptor<SortedMap<RowData, Long>> valueStateDescriptor =
                new ValueStateDescriptor<>(
                        "sorted-map",
                        new SortedMapTypeInfo<RowData, Long>(this.sortKeyType, BasicTypeInfo.LONG_TYPE_INFO, this.serializableComparator));
        if (this.ttlConfig.isEnabled()) {
            valueStateDescriptor.enableTimeToLive(this.ttlConfig);
        }
        this.treeMap = this.getRuntimeContext().getState(valueStateDescriptor);
    }

    /**
     * Handles one input row: updates the per-sort-key counts and stored rows, and emits the
     * resulting changes through {@code out}.
     *
     * <p>For an accumulate message the count of the row's sort key is incremented, changes are
     * emitted, and the row is then appended to the list stored under its sort key. For any other
     * message the matching retract routine runs first, the count is then decremented (the entry
     * is removed when it reaches zero), and, if the retract routine did not already remove the
     * row from the stored list, the first stored row equal to the input is removed here.
     *
     * <p>The row kind of {@code input} is overwritten with {@link RowKind#INSERT} after the
     * accumulate/retract decision has been taken.
     *
     * @param input the incoming row; its row kind is modified
     * @param ctx process-function context (not used by this method)
     * @param out collector receiving the emitted rows
     * @throws Exception if state access fails
     */
    @Override
    public void processElement(RowData input, KeyedProcessFunction.Context ctx, Collector<RowData> out) throws Exception {
        this.initRankEnd(input);

        SortedMap<RowData, Long> counts = this.treeMap.value();
        if (counts == null) {
            // Nothing stored for the current key: start from an empty sorted map.
            counts = new TreeMap<RowData, Long>(this.sortKeyComparator);
        }

        RowData sortKey = (RowData)this.sortKeySelector.getKey(input);
        boolean accumulate = RowDataUtil.isAccumulateMsg(input);
        input.setRowKind(RowKind.INSERT);

        if (accumulate) {
            Long previous = counts.get(sortKey);
            counts.put(sortKey, previous == null ? 1L : previous + 1L);

            // Emission happens before the new row is stored, so the stored lists still hold
            // only the rows that were present before this input.
            if (this.outputRankNumber || this.hasOffset()) {
                this.emitRecordsWithRowNumber(counts, sortKey, input, out);
            } else {
                this.emitRecordsWithoutRowNumber(counts, sortKey, input, out);
            }

            List<RowData> rows = this.dataState.get(sortKey);
            if (rows == null) {
                rows = new ArrayList<RowData>();
            }
            rows.add(input);
            this.dataState.put(sortKey, rows);
        } else {
            boolean stateRemoved;
            if (this.outputRankNumber || this.hasOffset()) {
                stateRemoved = this.retractRecordWithRowNumber(counts, sortKey, input, out);
            } else {
                stateRemoved = this.retractRecordWithoutRowNumber(counts, sortKey, input, out);
            }

            // Looked up only after the retract call, which may itself drop entries from the map.
            Long previous = counts.get(sortKey);
            if (previous == null) {
                this.stateStaledErrorHandle();
            } else {
                long remaining = previous - 1L;
                if (remaining == 0L) {
                    counts.remove(sortKey);
                } else {
                    counts.put(sortKey, remaining);
                }
            }

            if (!stateRemoved) {
                List<RowData> rows = this.dataState.get(sortKey);
                if (rows != null) {
                    // Remove the first stored row that equals the retracted one.
                    Iterator<RowData> rowIter = rows.iterator();
                    while (rowIter.hasNext()) {
                        if (this.equaliser.equals(rowIter.next(), input)) {
                            rowIter.remove();
                            break;
                        }
                    }
                    if (rows.isEmpty()) {
                        this.dataState.remove(sortKey);
                    } else {
                        this.dataState.put(sortKey, rows);
                    }
                }
            }
        }

        this.treeMap.update(counts);
    }

    /**
     * Removes the sorted-map entry last returned by {@code sortedMapIterator} and then reports
     * the stale state.
     *
     * @param sortedMapIterator iterator positioned on the entry whose stored rows are missing
     */
    private void processStateStaled(Iterator<Map.Entry<RowData, Long>> sortedMapIterator) {
        // Remove first, then report: the entry is gone even if reporting ever fails.
        sortedMapIterator.remove();
        stateStaledErrorHandle();
    }

    /**
     * Reports that state the function expected to find is missing.
     *
     * <p>When {@code lenient} is {@code true} the condition is only logged as a warning and
     * processing continues; otherwise it is raised as an exception. {@code lenient} is currently
     * a constant {@code true}, so only the warning path is taken.
     *
     * @throws IllegalStateException if {@code lenient} is {@code false}
     */
    private void stateStaledErrorHandle() {
        if (this.lenient) {
            LOG.warn(STATE_CLEARED_WARN_MSG);
        } else {
            throw new IllegalStateException(STATE_CLEARED_WARN_MSG);
        }
    }

    /**
     * Emits the changes caused by adding {@code inputRow}, passing a rank with every emitted row.
     *
     * <p>The sorted map is walked in order while the running rank is within the rank end:
     *
     * <ul>
     *   <li>entries before {@code sortKey} only advance the rank by their count;
     *   <li>at {@code sortKey} the rank advances by that entry's count and {@code inputRow}
     *       becomes the row waiting to be placed;
     *   <li>for every later entry, each stored row gets an update-before at the current rank,
     *       the waiting row gets an update-after at that rank, and the stored row becomes the
     *       new waiting row. A later entry without stored rows is dropped as stale.
     * </ul>
     *
     * <p>If the rank is still within the rank end after the walk, the waiting row is emitted as
     * an insert at that rank.
     *
     * @param sortedMap sort key to count mapping, already including {@code inputRow}
     * @param sortKey sort key of {@code inputRow}
     * @param inputRow the row being added
     * @param out collector receiving the emitted rows
     * @throws Exception if state access fails
     */
    private void emitRecordsWithRowNumber(SortedMap<RowData, Long> sortedMap, RowData sortKey, RowData inputRow, Collector<RowData> out) throws Exception {
        long rank = 0L;
        RowData pending = null;
        boolean reachedSortKey = false;

        Iterator<Map.Entry<RowData, Long>> entries = sortedMap.entrySet().iterator();
        while (entries.hasNext() && this.isInRankEnd(rank)) {
            Map.Entry<RowData, Long> entry = entries.next();
            RowData key = entry.getKey();

            if (reachedSortKey) {
                List<RowData> rows = this.dataState.get(key);
                if (rows == null) {
                    this.processStateStaled(entries);
                } else {
                    int i = 0;
                    while (i < rows.size() && this.isInRankEnd(rank)) {
                        RowData shifted = rows.get(i);
                        this.collectUpdateBefore(out, shifted, rank);
                        this.collectUpdateAfter(out, pending, rank);
                        pending = shifted;
                        rank++;
                        i++;
                    }
                }
            } else if (key.equals(sortKey)) {
                rank += entry.getValue();
                pending = inputRow;
                reachedSortKey = true;
            } else {
                rank += entry.getValue();
            }
        }

        if (this.isInRankEnd(rank)) {
            this.collectInsert(out, pending, rank);
        }
    }

    /**
     * Emits the changes caused by adding {@code inputRow} when no rank is passed with the rows:
     * at most one delete for a stored row and at most one insert for {@code inputRow}.
     *
     * <p>The sorted map is walked in order while the running rank is within the rank end.
     * Reaching {@code sortKey} advances the rank by that entry's count and marks
     * {@code inputRow} for insertion if the resulting rank is in the rank range. For later
     * entries, the rank advances by the entry's count as long as the result stays within the
     * rank end; the first entry that would exceed it supplies the row to delete, taken at index
     * {@code rankEnd - rank} of its stored list, and ends the walk. A later entry without stored
     * rows is dropped as stale.
     *
     * <p>NOTE(review): the index lookup assumes the stored list has at least as many rows as
     * the entry's count -- confirm that the two states cannot diverge.
     *
     * @param sortedMap sort key to count mapping, already including {@code inputRow}
     * @param sortKey sort key of {@code inputRow}
     * @param inputRow the row being added
     * @param out collector receiving the emitted rows
     * @throws Exception if state access fails
     */
    private void emitRecordsWithoutRowNumber(SortedMap<RowData, Long> sortedMap, RowData sortKey, RowData inputRow, Collector<RowData> out) throws Exception {
        long rank = 0L;
        boolean reachedSortKey = false;
        RowData toInsert = null;
        RowData evicted = null;

        Iterator<Map.Entry<RowData, Long>> entries = sortedMap.entrySet().iterator();
        while (entries.hasNext() && this.isInRankEnd(rank)) {
            Map.Entry<RowData, Long> entry = entries.next();
            RowData key = entry.getKey();

            if (reachedSortKey) {
                List<RowData> rows = this.dataState.get(key);
                if (rows == null) {
                    this.processStateStaled(entries);
                } else {
                    long lastRankOfKey = rank + entry.getValue();
                    if (!this.isInRankEnd(lastRankOfKey)) {
                        // This entry crosses the rank end: pick the row at the crossing position.
                        evicted = rows.get((int)(this.rankEnd - rank));
                        break;
                    }
                    rank = lastRankOfKey;
                }
            } else if (key.equals(sortKey)) {
                rank += entry.getValue();
                if (this.isInRankRange(rank)) {
                    toInsert = inputRow;
                }
                reachedSortKey = true;
            } else {
                rank += entry.getValue();
            }
        }

        if (evicted != null) {
            // The stored row is copied before it is handed to the collector.
            this.collectDelete(out, this.inputRowSer.copy(evicted));
        }
        if (toInsert != null) {
            this.collectInsert(out, inputRow);
        }
    }

    /**
     * Emits the changes caused by retracting {@code inputRow}, passing a rank with every emitted
     * row, and removes the retracted row from the stored list when it is found.
     *
     * <p>The sorted map is walked in order while the running rank is within the rank end:
     *
     * <ul>
     *   <li>entries before {@code sortKey} only advance the rank by their count;
     *   <li>at {@code sortKey} the stored rows are scanned; the first row equal to
     *       {@code inputRow} is removed from the list and remembered, and every row after it
     *       produces an update-before for the remembered row and an update-after for itself at
     *       the current rank, then becomes the remembered row. The list is written back, or the
     *       map-state entry removed if the list became empty;
     *   <li>once the row was found, rows of later entries are processed the same way.
     * </ul>
     *
     * <p>Entries whose stored rows are missing are dropped as stale. If the rank is still within
     * the rank end after the walk, the remembered row is emitted as a delete at that rank; if
     * nothing was found, the stale-state handler is invoked instead.
     *
     * @param sortedMap sort key to count mapping, not yet decremented for {@code inputRow}
     * @param sortKey sort key of {@code inputRow}
     * @param inputRow the row being retracted
     * @param out collector receiving the emitted rows
     * @return {@code true} if the retracted row was found and removed from the stored list
     * @throws Exception if state access fails
     */
    private boolean retractRecordWithRowNumber(SortedMap<RowData, Long> sortedMap, RowData sortKey, RowData inputRow, Collector<RowData> out) throws Exception {
        long rank = 0L;
        RowData previous = null;
        boolean found = false;

        Iterator<Map.Entry<RowData, Long>> entries = sortedMap.entrySet().iterator();
        while (entries.hasNext() && this.isInRankEnd(rank)) {
            Map.Entry<RowData, Long> entry = entries.next();
            RowData key = entry.getKey();

            if (found) {
                List<RowData> rows = this.dataState.get(key);
                if (rows == null) {
                    this.processStateStaled(entries);
                } else {
                    int i = 0;
                    while (i < rows.size() && this.isInRankEnd(rank)) {
                        RowData row = rows.get(i);
                        this.collectUpdateBefore(out, previous, rank);
                        this.collectUpdateAfter(out, row, rank);
                        previous = row;
                        rank++;
                        i++;
                    }
                }
            } else if (key.equals(sortKey)) {
                List<RowData> rows = this.dataState.get(key);
                if (rows == null) {
                    this.processStateStaled(entries);
                } else {
                    Iterator<RowData> rowIter = rows.iterator();
                    while (rowIter.hasNext() && this.isInRankEnd(rank)) {
                        RowData row = rowIter.next();
                        if (found) {
                            this.collectUpdateBefore(out, previous, rank);
                            this.collectUpdateAfter(out, row, rank);
                            previous = row;
                        } else if (this.equaliser.equals(row, inputRow)) {
                            previous = row;
                            found = true;
                            rowIter.remove();
                        }
                        rank++;
                    }
                    if (rows.isEmpty()) {
                        this.dataState.remove(key);
                    } else {
                        this.dataState.put(key, rows);
                    }
                }
            } else {
                rank += entry.getValue();
            }
        }

        if (this.isInRankEnd(rank)) {
            if (found || previous != null) {
                this.collectDelete(out, previous, rank);
            } else {
                this.stateStaledErrorHandle();
            }
        }
        return found;
    }

    /**
     * Emits the changes caused by retracting {@code inputRow} when no rank is output, and
     * removes the retracted row from the stored list when it is found.
     *
     * <p>{@code nextRank} starts at 1 and the sorted map is walked in order while it is within
     * the rank end:
     *
     * <ul>
     *   <li>entries before {@code sortKey} advance {@code nextRank} by their count;
     *   <li>at {@code sortKey} the stored rows are scanned; the first row equal to
     *       {@code inputRow} is emitted as a delete and removed from the list, and a following
     *       row whose {@code nextRank} equals the rank end is emitted as an insert. The list is
     *       written back, or the map-state entry removed if the list became empty;
     *   <li>once the row was found, later entries advance {@code nextRank} while their last row
     *       stays below the rank end; the first entry reaching it supplies the row to insert,
     *       taken at index {@code rankEnd - nextRank} of its stored list, and ends the walk.
     * </ul>
     *
     * <p>Entries whose stored rows are missing are dropped as stale.
     *
     * <p>NOTE(review): the index lookup assumes the stored list has at least as many rows as
     * the entry's count -- confirm that the two states cannot diverge.
     *
     * @param sortedMap sort key to count mapping, not yet decremented for {@code inputRow}
     * @param sortKey sort key of {@code inputRow}
     * @param inputRow the row being retracted
     * @param out collector receiving the emitted rows
     * @return {@code true} if the retracted row was found and removed from the stored list
     * @throws Exception if state access fails
     */
    private boolean retractRecordWithoutRowNumber(SortedMap<RowData, Long> sortedMap, RowData sortKey, RowData inputRow, Collector<RowData> out) throws Exception {
        long nextRank = 1L;
        boolean found = false;

        Iterator<Map.Entry<RowData, Long>> entries = sortedMap.entrySet().iterator();
        while (entries.hasNext() && this.isInRankEnd(nextRank)) {
            Map.Entry<RowData, Long> entry = entries.next();
            RowData key = entry.getKey();

            if (found) {
                long lastRankOfKey = nextRank + entry.getValue() - 1L;
                if (lastRankOfKey < this.rankEnd) {
                    nextRank = lastRankOfKey + 1L;
                } else {
                    int index = (int)(this.rankEnd - nextRank);
                    List<RowData> rows = this.dataState.get(key);
                    if (rows == null) {
                        this.processStateStaled(entries);
                    } else {
                        this.collectInsert(out, rows.get(index));
                        break;
                    }
                }
            } else if (key.equals(sortKey)) {
                List<RowData> rows = this.dataState.get(key);
                if (rows == null) {
                    this.processStateStaled(entries);
                } else {
                    Iterator<RowData> rowIter = rows.iterator();
                    while (rowIter.hasNext() && this.isInRankEnd(nextRank)) {
                        RowData row = rowIter.next();
                        if (found) {
                            if (nextRank == this.rankEnd) {
                                this.collectInsert(out, row, nextRank);
                            }
                        } else if (this.equaliser.equals(row, inputRow)) {
                            this.collectDelete(out, row, nextRank);
                            // Cancels the increment below: the removed row frees its rank.
                            nextRank--;
                            found = true;
                            rowIter.remove();
                        }
                        nextRank++;
                    }
                    if (rows.isEmpty()) {
                        this.dataState.remove(key);
                    } else {
                        this.dataState.put(key, rows);
                    }
                }
            } else {
                nextRank += entry.getValue();
            }
        }
        return found;
    }
}

