package com.starrocks.connector.flink.table.sink;

import com.starrocks.connector.flink.manager.StarRocksSinkBufferEntity;
import com.starrocks.connector.flink.manager.StarRocksSinkManager;
import com.starrocks.connector.flink.row.sink.StarRocksIRowTransformer;
import com.starrocks.connector.flink.row.sink.StarRocksISerializer;
import com.starrocks.connector.flink.row.sink.StarRocksSerializerFactory;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
import net.sf.jsqlparser.statement.Statement;
import net.sf.jsqlparser.statement.alter.Alter;
import net.sf.jsqlparser.statement.truncate.Truncate;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.NestedRowData;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.InstantiationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunction.class */
/**
 * Flink sink function that hands incoming records to a {@link StarRocksSinkManager}.
 *
 * <p>Three input shapes are handled by {@link #invoke}:
 * <ul>
 *   <li>no serializer configured and the value is a {@code StarRocksSinkRowDataWithMeta}: its rows
 *       are written to the database/table carried by the value itself;</li>
 *   <li>no serializer configured and any other value: the value is cast to {@code String} and
 *       written to the configured database/table;</li>
 *   <li>serializer configured: the value is transformed, serialized and written to the configured
 *       database/table.</li>
 * </ul>
 *
 * <p>When the configured semantic is {@code EXACTLY_ONCE}, the manager's buffered batch map is
 * stored in operator list state on every snapshot, and any batches found in that state are flushed
 * at the start of the next {@code invoke}, {@code snapshotState} or {@code close} call. For any
 * other semantic the manager is simply flushed on every snapshot.
 *
 * @param <T> type of the records consumed by this sink
 */
public class StarRocksDynamicSinkFunction<T> extends RichSinkFunction<T> implements CheckpointedFunction {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(StarRocksDynamicSinkFunction.class);
    private static final String COUNTER_INVOKE_ROWS_COST_TIME = "totalInvokeRowsTimeNs";
    private static final String COUNTER_INVOKE_ROWS = "totalInvokeRows";

    /** Number of leading bytes skipped in a {@link NestedRowData} segment before the serialized metadata map. */
    private static final int DDL_HEADER_SIZE = 256;

    private final StarRocksSinkManager sinkManager;
    /** {@code null} when the function was created without a table schema. */
    private final StarRocksIRowTransformer<T> rowTransformer;
    private final StarRocksSinkOptions sinkOptions;
    /** {@code null} when the function was created without a table schema. */
    private final StarRocksISerializer serializer;

    private transient Counter totalInvokeRowsTime;
    private transient Counter totalInvokeRows;
    /** Only initialised when the semantic is {@code EXACTLY_ONCE}; see {@link #initializeState}. */
    private transient ListState<Map<String, StarRocksSinkBufferEntity>> checkpointedState;

    /**
     * Creates a sink that transforms and serializes every record before writing it.
     *
     * @param starRocksSinkOptions sink configuration
     * @param tableSchema schema handed to the manager, the transformer and the serializer factory
     * @param starRocksIRowTransformer transformer applied to every record; must not be {@code null}
     */
    public StarRocksDynamicSinkFunction(StarRocksSinkOptions starRocksSinkOptions, TableSchema tableSchema, StarRocksIRowTransformer<T> starRocksIRowTransformer) {
        this.sinkManager = new StarRocksSinkManager(starRocksSinkOptions, tableSchema);
        starRocksIRowTransformer.setTableSchema(tableSchema);
        this.serializer = StarRocksSerializerFactory.createSerializer(starRocksSinkOptions, tableSchema.getFieldNames());
        this.rowTransformer = starRocksIRowTransformer;
        this.sinkOptions = starRocksSinkOptions;
    }

    /**
     * Creates a sink without transformer or serializer; records are written as they arrive
     * (see the serializer-less branches of {@link #invoke}).
     *
     * @param starRocksSinkOptions sink configuration
     */
    public StarRocksDynamicSinkFunction(StarRocksSinkOptions starRocksSinkOptions) {
        this.sinkManager = new StarRocksSinkManager(starRocksSinkOptions, null);
        this.rowTransformer = null;
        this.sinkOptions = starRocksSinkOptions;
        this.serializer = null;
    }

    /**
     * Passes the runtime context to the manager (and the transformer, if any), registers the two
     * invocation counters and starts the manager's scheduler and asynchronous flushing.
     *
     * @param configuration forwarded unchanged to the superclass
     * @throws Exception if the superclass or the manager fails to start
     */
    @Override
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        this.sinkManager.setRuntimeContext(getRuntimeContext());
        this.totalInvokeRows = getRuntimeContext().getMetricGroup().counter(COUNTER_INVOKE_ROWS);
        this.totalInvokeRowsTime = getRuntimeContext().getMetricGroup().counter(COUNTER_INVOKE_ROWS_COST_TIME);
        if (null != this.rowTransformer) {
            this.rowTransformer.setRuntimeContext(getRuntimeContext());
        }
        this.sinkManager.startScheduler();
        this.sinkManager.startAsyncFlushing();
    }

    /**
     * Writes one record to the sink manager, or silently drops it when it is filtered out.
     *
     * <p>Dropped without being written: incomplete {@code StarRocksSinkRowDataWithMeta} values,
     * {@link NestedRowData} records rejected by {@link #shouldSkipDdlRecord}, rows of kind
     * {@code UPDATE_BEFORE}, and rows of kind {@code DELETE} when the options do not support
     * upsert/delete.
     *
     * @param value record to write
     * @param context sink context (unused)
     * @throws Exception if flushing restored state, parsing, serializing or writing fails
     */
    @Override
    public synchronized void invoke(T value, SinkFunction.Context context) throws Exception {
        long startNanos = System.nanoTime();
        if (isExactlyOnce()) {
            flushPreviousState();
        }
        if (null == this.serializer) {
            if (value instanceof StarRocksSinkRowDataWithMeta) {
                // NOTE(review): this path does not update the invocation counters, unlike the
                // other write paths below — confirm whether that is intended.
                StarRocksSinkRowDataWithMeta data = (StarRocksSinkRowDataWithMeta) value;
                if (Strings.isNullOrEmpty(data.getDatabase()) || Strings.isNullOrEmpty(data.getTable()) || null == data.getDataRows()) {
                    LOG.warn(String.format("json row data not fullfilled. {database: %s, table: %s, dataRows: %s}", data.getDatabase(), data.getTable(), data.getDataRows()));
                    return;
                }
                this.sinkManager.writeRecords(data.getDatabase(), data.getTable(), data.getDataRows());
                return;
            }
            // Any other value is written verbatim; this cast fails if T is not String.
            this.sinkManager.writeRecords(this.sinkOptions.getDatabaseName(), this.sinkOptions.getTableName(), (String) value);
            recordInvocation(startNanos);
            return;
        }
        if (value instanceof NestedRowData && shouldSkipDdlRecord((NestedRowData) value)) {
            return;
        }
        if (value instanceof RowData) {
            RowData row = (RowData) value;
            if (RowKind.UPDATE_BEFORE.equals(row.getRowKind())) {
                return;
            }
            if (!this.sinkOptions.supportUpsertDelete() && RowKind.DELETE.equals(row.getRowKind())) {
                return;
            }
        }
        this.sinkManager.writeRecords(
                this.sinkOptions.getDatabaseName(),
                this.sinkOptions.getTableName(),
                this.serializer.serialize(this.rowTransformer.transform(value, this.sinkOptions.supportUpsertDelete())));
        recordInvocation(startNanos);
    }

    /**
     * Obtains the {@code "buffered-rows"} operator list state when the semantic is
     * {@code EXACTLY_ONCE}; does nothing otherwise.
     *
     * @param functionInitializationContext provides the operator state store
     * @throws Exception if the state cannot be obtained
     */
    @Override
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        if (!isExactlyOnce()) {
            return;
        }
        ListStateDescriptor<Map<String, StarRocksSinkBufferEntity>> descriptor = new ListStateDescriptor<>(
                "buffered-rows",
                TypeInformation.of(new TypeHint<Map<String, StarRocksSinkBufferEntity>>() {
                }));
        this.checkpointedState = functionInitializationContext.getOperatorStateStore().getListState(descriptor);
    }

    /**
     * For {@code EXACTLY_ONCE}: flushes batches left in state, then stores the manager's current
     * buffered batch map in state. For any other semantic: flushes the manager.
     *
     * @param functionSnapshotContext snapshot context (unused)
     * @throws Exception if flushing or updating the state fails
     */
    @Override
    public synchronized void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        if (isExactlyOnce()) {
            flushPreviousState();
            this.checkpointedState.add(this.sinkManager.getBufferedBatchMap());
            return;
        }
        this.sinkManager.flush(null, true);
    }

    /**
     * Flushes batches left in state (for {@code EXACTLY_ONCE}) and closes the sink manager.
     *
     * <p>The manager is closed even when that flush fails, so it is not left open; the flush
     * failure is rethrown with any close failure attached as suppressed.
     *
     * @throws Exception the first failure encountered while flushing or closing
     */
    @Override
    public synchronized void close() throws Exception {
        super.close();
        Exception failure = null;
        try {
            if (isExactlyOnce()) {
                flushPreviousState();
            }
        } catch (Exception e) {
            failure = e;
        }
        try {
            this.sinkManager.close();
        } catch (Exception e) {
            if (failure == null) {
                failure = e;
            } else {
                failure.addSuppressed(e);
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    /** Returns whether the configured sink semantic is {@code EXACTLY_ONCE}. */
    private boolean isExactlyOnce() {
        return StarRocksSinkSemantic.EXACTLY_ONCE.equals(this.sinkOptions.getSemantic());
    }

    /** Counts one written record and adds the time elapsed since {@code startNanos} to the time counter. */
    private void recordInvocation(long startNanos) {
        this.totalInvokeRows.inc(1L);
        this.totalInvokeRowsTime.inc(System.nanoTime() - startNanos);
    }

    /**
     * Decides whether a {@link NestedRowData} record must be dropped by {@link #invoke}.
     *
     * <p>The record is dropped when it does not consist of exactly one segment of at least
     * {@link #DDL_HEADER_SIZE} bytes, when the map deserialized from the bytes after that offset
     * is {@code null}, has {@code "snapshot"} equal to {@code "true"} or lacks a non-empty
     * {@code "ddl"} or {@code "databaseName"} entry, or when the DDL is a TRUNCATE of a table
     * other than the configured one. Every other record (including ALTER statements) is kept.
     *
     * @param ddlData record to inspect
     * @return {@code true} if the record must not be written
     * @throws Exception if the payload cannot be deserialized or the DDL cannot be parsed
     */
    private boolean shouldSkipDdlRecord(NestedRowData ddlData) throws Exception {
        if (ddlData.getSegments().length != 1 || ddlData.getSegments()[0].size() < DDL_HEADER_SIZE) {
            return true;
        }
        byte[] payload = new byte[ddlData.getSegments()[0].size() - DDL_HEADER_SIZE];
        ddlData.getSegments()[0].get(DDL_HEADER_SIZE, payload);
        // NOTE(review): this is Java-native deserialization of bytes taken from the record;
        // confirm the upstream producer is trusted.
        Map<String, String> ddlMap = InstantiationUtil.deserializeObject(payload, HashMap.class.getClassLoader());
        if (null == ddlMap
                || "true".equals(ddlMap.get("snapshot"))
                || Strings.isNullOrEmpty(ddlMap.get("ddl"))
                || Strings.isNullOrEmpty(ddlMap.get("databaseName"))) {
            return true;
        }
        Statement statement = CCJSqlParserUtil.parse(ddlMap.get("ddl"));
        if (statement instanceof Truncate) {
            String truncatedTable = ((Truncate) statement).getTable().getName();
            return !this.sinkOptions.getTableName().equalsIgnoreCase(truncatedTable);
        }
        return false;
    }

    /**
     * Flushes every batch map currently held in the checkpointed state through the sink manager
     * and then clears the state. Does nothing when the state was never initialised.
     *
     * @throws Exception if reading the state or flushing fails
     */
    private void flushPreviousState() throws Exception {
        if (this.checkpointedState == null) {
            return;
        }
        Iterable<Map<String, StarRocksSinkBufferEntity>> restoredBatches = this.checkpointedState.get();
        if (restoredBatches != null) {
            for (Map<String, StarRocksSinkBufferEntity> batch : restoredBatches) {
                this.sinkManager.setBufferedBatchMap(batch);
                this.sinkManager.flush(null, true);
            }
        }
        this.checkpointedState.clear();
    }
}
