package org.apache.flink.table.runtime.operators.sort;

import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.generated.GeneratedNormalizedKeyComputer;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.generated.NormalizedKeyComputer;
import org.apache.flink.table.runtime.generated.RecordComparator;
import org.apache.flink.table.runtime.operators.TableStreamOperator;
import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
import org.apache.flink.table.runtime.util.StreamRecordCollector;
import org.apache.flink.util.MutableObjectIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/sort/SortOperator.class */
/**
 * Operator that buffers every incoming row in a {@link BinaryExternalSorter} and emits the
 * rows returned by the sorter's iterator once the input has ended.
 *
 * <p>The sorter is created and its threads are started in {@link #open()}; it is released in
 * {@link #close()}.
 */
public class SortOperator extends TableStreamOperator<RowData> implements OneInputStreamOperator<RowData, RowData>, BoundedOneInput {
    private static final Logger LOG = LoggerFactory.getLogger(SortOperator.class);

    // Generated code holders; both are set to null in open() right after being instantiated.
    private GeneratedNormalizedKeyComputer gComputer;
    private GeneratedRecordComparator gComparator;

    // Runtime state, (re)created in open().
    private transient BinaryExternalSorter sorter;
    private transient StreamRecordCollector<RowData> collector;
    private transient BinaryRowDataSerializer binarySerializer;

    /**
     * Creates the operator.
     *
     * @param generatedNormalizedKeyComputer generated class used in {@link #open()} to create the
     *     {@link NormalizedKeyComputer} handed to the sorter
     * @param generatedRecordComparator generated class used in {@link #open()} to create the
     *     {@link RecordComparator} handed to the sorter
     */
    public SortOperator(GeneratedNormalizedKeyComputer generatedNormalizedKeyComputer, GeneratedRecordComparator generatedRecordComparator) {
        this.gComputer = generatedNormalizedKeyComputer;
        this.gComparator = generatedRecordComparator;
    }

    /**
     * Instantiates the generated key computer and comparator, builds and starts the external
     * sorter, creates the output collector and registers three gauges backed by the sorter.
     *
     * @throws Exception if the parent operator or any step of the setup fails
     */
    @Override
    public void open() throws Exception {
        super.open();
        LOG.info("Opening SortOperator");
        ClassLoader userCodeClassLoader = getContainingTask().getUserCodeClassLoader();
        AbstractRowDataSerializer inputSerializer = (AbstractRowDataSerializer) getOperatorConfig().getTypeSerializerIn1(getUserCodeClassloader());
        this.binarySerializer = new BinaryRowDataSerializer(inputSerializer.getArity());
        NormalizedKeyComputer computer = this.gComputer.newInstance(userCodeClassLoader);
        RecordComparator comparator = this.gComparator.newInstance(userCodeClassLoader);
        // The generated holders are no longer needed once the instances exist.
        this.gComputer = null;
        this.gComparator = null;
        this.sorter = new BinaryExternalSorter(getContainingTask(), getContainingTask().getEnvironment().getMemoryManager(), computeMemorySize(), getContainingTask().getEnvironment().getIOManager(), inputSerializer, this.binarySerializer, computer, comparator, getContainingTask().getJobConfiguration());
        this.sorter.startThreads();
        this.collector = new StreamRecordCollector<>(this.output);
        // A bound method reference null-checks its receiver when it is created, so no explicit
        // check on the sorter is required before registering the gauges.
        getMetricGroup().gauge("memoryUsedSizeInBytes", this.sorter::getUsedMemoryInBytes);
        getMetricGroup().gauge("numSpillFiles", this.sorter::getNumSpillFiles);
        getMetricGroup().gauge("spillInBytes", this.sorter::getSpillInBytes);
    }

    /**
     * Hands the record's row to the sorter; nothing is emitted here.
     *
     * @param streamRecord the incoming record whose value is written to the sorter
     * @throws Exception if the sorter fails to accept the row
     */
    @Override
    public void processElement(StreamRecord<RowData> streamRecord) throws Exception {
        this.sorter.write(streamRecord.getValue());
    }

    /**
     * Drains the sorter's iterator and forwards every returned row to the collector.
     *
     * @throws Exception if obtaining the iterator or reading from it fails
     */
    @Override
    public void endInput() throws Exception {
        // NOTE(review): "m6240createInstance" looks like a decompiler-generated name for the
        // serializer's createInstance() - confirm against the serializer class before renaming.
        BinaryRowData row = this.binarySerializer.m6240createInstance();
        MutableObjectIterator<BinaryRowData> iterator = this.sorter.getIterator();
        // The instance returned by next(...) is passed back in on the following call; a null
        // return ends the loop.
        while ((row = iterator.next(row)) != null) {
            this.collector.collect(row);
        }
    }

    /**
     * Closes the parent operator and then the sorter.
     *
     * <p>The sorter is closed in a {@code finally} block so that it is released even when
     * {@code super.close()} throws. It may still be {@code null} here if {@link #open()} did not
     * get as far as creating it.
     *
     * @throws Exception if closing the parent operator or the sorter fails
     */
    @Override
    public void close() throws Exception {
        LOG.info("Closing SortOperator");
        try {
            super.close();
        } finally {
            if (this.sorter != null) {
                this.sorter.close();
            }
        }
    }
}
