package org.apache.flink.table.runtime.operators.rank;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/rank/AppendOnlyTopNFunction.class */
/**
 * Top-N function that keeps, per partition key, a {@link TopNBuffer} of the rows currently held
 * for ranking and emits change messages as new rows arrive.
 *
 * <p>Two copies of the data are maintained:
 *
 * <ul>
 *   <li>{@link #dataState}: keyed map state (sort key -&gt; rows) registered under the name
 *       {@code data-state-with-append};
 *   <li>{@link #kvSortedMap}: a size-bounded on-heap cache (partition key -&gt; buffer). On a cache
 *       miss the buffer is rebuilt from {@link #dataState}.
 * </ul>
 *
 * <p>Every modification of the buffer made by this class is written to {@link #dataState} in the
 * same call, so the cache never holds data that is missing from state.
 */
public class AppendOnlyTopNFunction extends AbstractTopNFunction {
    private static final long serialVersionUID = -4708453213104128011L;
    private static final Logger LOG = LoggerFactory.getLogger(AppendOnlyTopNFunction.class);

    /** Type information of the sort key; used as the key type of {@link #dataState}. */
    private final InternalTypeInfo<RowData> sortKeyType;

    /** Serializer used to copy an input row before it is stored in the buffer. */
    private final TypeSerializer<RowData> inputRowSer;

    /** Configured cache budget; divided by the default Top-N size to get the cache entry limit. */
    private final long cacheSize;

    /** Keyed state mapping a sort key to the rows stored for it. */
    private transient MapState<RowData, List<RowData>> dataState;

    /** Buffer of the partition key currently being processed; set by {@link #initHeapStates()}. */
    private transient TopNBuffer buffer;

    /** On-heap cache from partition key to that key's buffer. */
    private transient Cache<RowData, TopNBuffer> kvSortedMap;

    /**
     * Creates the function.
     *
     * @param stateTtlConfig state TTL configuration, forwarded to the superclass
     * @param internalTypeInfo type information of the input rows; also used to create the row
     *     serializer
     * @param generatedRecordComparator generated comparator, forwarded to the superclass
     * @param rowDataKeySelector selector extracting the sort key from an input row; its produced
     *     type becomes the sort key type
     * @param rankType rank type, forwarded to the superclass
     * @param rankRange rank range, forwarded to the superclass
     * @param z boolean flag forwarded to the superclass (name lost in decompilation)
     * @param z2 boolean flag forwarded to the superclass (name lost in decompilation)
     * @param j cache size budget, see {@link #cacheSize}
     */
    public AppendOnlyTopNFunction(StateTtlConfig stateTtlConfig, InternalTypeInfo<RowData> internalTypeInfo, GeneratedRecordComparator generatedRecordComparator, RowDataKeySelector rowDataKeySelector, RankType rankType, RankRange rankRange, boolean z, boolean z2, long j) {
        super(stateTtlConfig, internalTypeInfo, generatedRecordComparator, rowDataKeySelector, rankType, rankRange, z, z2);
        this.sortKeyType = rowDataKeySelector.mo6180getProducedType();
        this.inputRowSer = internalTypeInfo.createSerializer(new ExecutionConfig());
        this.cacheSize = j;
    }

    /**
     * Builds the partition-key cache, registers the keyed map state and registers the metric.
     *
     * <p>The cache holds at most {@code max(1, cacheSize / getDefaultTopNSize())} entries. When TTL
     * is enabled, the same TTL is applied to cache entries (expire-after-write) and to the state.
     *
     * @param configuration configuration passed on to the superclass
     * @throws Exception if the superclass or state registration fails
     */
    @Override // org.apache.flink.table.runtime.operators.rank.AbstractTopNFunction
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);

        // Kept as long: maximumSize accepts a long, and narrowing the quotient to int first
        // could wrap around for very large cache budgets.
        long lruCacheSize = Math.max(1L, this.cacheSize / getDefaultTopNSize());
        CacheBuilder<Object, Object> cacheBuilder = CacheBuilder.newBuilder();
        if (this.ttlConfig.isEnabled()) {
            cacheBuilder.expireAfterWrite(this.ttlConfig.getTtl().toMilliseconds(), TimeUnit.MILLISECONDS);
        }
        this.kvSortedMap = cacheBuilder.maximumSize(lruCacheSize).build();
        LOG.info("Top{} operator is using LRU caches key-size: {}", getDefaultTopNSize(), lruCacheSize);

        ListTypeInfo<RowData> rowListType = new ListTypeInfo<>(this.inputRowType);
        MapStateDescriptor<RowData, List<RowData>> stateDescriptor =
                new MapStateDescriptor<>("data-state-with-append", this.sortKeyType, rowListType);
        if (this.ttlConfig.isEnabled()) {
            stateDescriptor.enableTimeToLive(this.ttlConfig);
        }
        this.dataState = getRuntimeContext().getMapState(stateDescriptor);

        registerMetric(this.kvSortedMap.size() * getDefaultTopNSize());
    }

    /**
     * Processes one input row: loads the buffer of the current key, and if the row's sort key is
     * accepted by {@code checkSortKeyInBufferRange}, stores the row in buffer and state and emits
     * the resulting change messages.
     *
     * @param rowData the input row
     * @param context the process function context (not used by this method)
     * @param collector receives the emitted change messages
     * @throws Exception if state access fails
     */
    public void processElement(RowData rowData, KeyedProcessFunction<RowData, RowData, RowData>.Context context, Collector<RowData> collector) throws Exception {
        initHeapStates();
        initRankEnd(rowData);

        RowData sortKey = (RowData) this.sortKeySelector.getKey(rowData);
        if (!checkSortKeyInBufferRange(sortKey, this.buffer)) {
            // The row is not accepted into the buffer: nothing is stored or emitted.
            return;
        }

        // The buffer stores a copy of the input row, not the instance that was passed in.
        this.buffer.put(sortKey, this.inputRowSer.copy(rowData));
        // State receives a fresh list rather than the buffer's own collection, presumably so that
        // later buffer mutations do not alias the value held by the state backend.
        this.dataState.put(sortKey, new ArrayList<>(this.buffer.get(sortKey)));

        if (this.outputRankNumber || hasOffset()) {
            processElementWithRowNumber(sortKey, rowData, collector);
        } else {
            processElementWithoutRowNumber(rowData, collector);
        }
    }

    /**
     * Points {@link #buffer} at the buffer of the current key, updating the request/hit counters.
     *
     * <p>On a cache miss a new buffer is created, put into the cache and filled from
     * {@link #dataState}.
     *
     * @throws Exception if reading the state fails
     */
    private void initHeapStates() throws Exception {
        this.requestCount++;
        RowData currentKey = (RowData) this.keyContext.getCurrentKey();
        this.buffer = this.kvSortedMap.getIfPresent(currentKey);
        if (this.buffer != null) {
            this.hitCount++;
            return;
        }

        this.buffer = new TopNBuffer(this.sortKeyComparator, ArrayList::new);
        this.kvSortedMap.put(currentKey, this.buffer);

        Iterator<Map.Entry<RowData, List<RowData>>> stateEntries = this.dataState.iterator();
        // The original code guards against a null iterator; the guard is preserved.
        if (stateEntries != null) {
            while (stateEntries.hasNext()) {
                Map.Entry<RowData, List<RowData>> stateEntry = stateEntries.next();
                this.buffer.putAll(stateEntry.getKey(), stateEntry.getValue());
            }
        }
    }

    /**
     * Emits rank-numbered change messages for a row that has just been added to the buffer, then
     * drops every sort key that was not reached before the rank end.
     *
     * <p>The buffer entries are walked in iteration order while {@code isInRankEnd(rank)} holds.
     * Entries up to and including the one matching {@code sortKey} only advance the rank by their
     * size. For each row after that, an update-before for that row and an update-after for the row
     * carried from the previous step are emitted with the same rank, i.e. every following row is
     * shifted down by one position.
     *
     * @param sortKey sort key of the new row
     * @param input the new row
     * @param out receives the emitted change messages
     * @throws Exception if state access fails
     */
    private void processElementWithRowNumber(RowData sortKey, RowData input, Collector<RowData> out) throws Exception {
        Iterator<Map.Entry<RowData, Collection<RowData>>> entries = this.buffer.entrySet().iterator();
        long currentRank = 0;
        boolean sortKeyFound = false;
        // Row that takes the current rank; starts as the input and is then replaced by each
        // displaced row in turn.
        RowData currentRow = null;

        while (entries.hasNext() && isInRankEnd(currentRank)) {
            Map.Entry<RowData, Collection<RowData>> entry = entries.next();
            Collection<RowData> records = entry.getValue();
            if (!sortKeyFound && entry.getKey().equals(sortKey)) {
                currentRank += records.size();
                currentRow = input;
                sortKeyFound = true;
            } else if (sortKeyFound) {
                Iterator<RowData> recordIterator = records.iterator();
                while (recordIterator.hasNext() && isInRankEnd(currentRank)) {
                    RowData displacedRow = recordIterator.next();
                    collectUpdateBefore(out, displacedRow, currentRank);
                    collectUpdateAfter(out, currentRow, currentRank);
                    currentRow = displacedRow;
                    currentRank++;
                }
            } else {
                currentRank += records.size();
            }
        }

        if (isInRankEnd(currentRank)) {
            // The rank end was not reached: the last carried row gets an INSERT at the next rank.
            collectInsert(out, currentRow, currentRank);
        }

        // Entries not consumed above are removed from state immediately; their keys are collected
        // first so the buffer is not modified while its entry set is still being iterated.
        List<RowData> sortKeysToRemove = new ArrayList<>();
        while (entries.hasNext()) {
            RowData staleKey = entries.next().getKey();
            this.dataState.remove(staleKey);
            sortKeysToRemove.add(staleKey);
        }
        for (RowData staleKey : sortKeysToRemove) {
            this.buffer.removeAll(staleKey);
        }
    }

    /**
     * Emits change messages without rank numbers for a row that has just been added to the buffer.
     *
     * <p>If the buffer now holds more than {@code rankEnd} rows, its last element is removed from
     * buffer and state. When the removed element equals the new row (or the last entry was empty)
     * nothing is emitted; otherwise a DELETE for the removed element precedes the INSERT of the
     * new row.
     *
     * @param input the new row
     * @param out receives the emitted change messages
     * @throws Exception if state access fails
     */
    private void processElementWithoutRowNumber(RowData input, Collector<RowData> out) throws Exception {
        if (this.buffer.getCurrentTopNum() > this.rankEnd) {
            Map.Entry<RowData, Collection<RowData>> lastEntry = this.buffer.lastEntry();
            RowData lastKey = lastEntry.getKey();
            Collection<RowData> lastRecords = lastEntry.getValue();
            // Read the last element and the size before the buffer is modified below.
            RowData lastElement = this.buffer.lastElement();
            int lastSize = lastRecords.size();

            if (lastSize <= 1) {
                this.buffer.removeAll(lastKey);
                this.dataState.remove(lastKey);
            } else {
                this.buffer.removeLast();
                // Write back a fresh copy of the collection for the last key after removeLast().
                this.dataState.put(lastKey, new ArrayList<>(lastRecords));
            }

            if (lastSize == 0 || input.equals(lastElement)) {
                // The row that was dropped is the new row itself: no visible change.
                return;
            }
            collectDelete(out, lastElement);
        }
        collectInsert(out, input);
    }

    public /* bridge */ /* synthetic */ void processElement(Object obj, KeyedProcessFunction.Context context, Collector collector) throws Exception {
        processElement((RowData) obj, (KeyedProcessFunction<RowData, RowData, RowData>.Context) context, (Collector<RowData>) collector);
    }
}
