package com.starrocks.connector.flink.manager;

import com.starrocks.connector.flink.connection.StarRocksJdbcConnectionOptions;
import com.starrocks.connector.flink.connection.StarRocksJdbcConnectionProvider;
import com.starrocks.connector.flink.table.StarRocksSinkOptions;
import com.starrocks.connector.flink.table.StarRocksSinkSemantic;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/starrocks/connector/flink/manager/StarRocksSinkManager.class */
public class StarRocksSinkManager implements Serializable {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(StarRocksSinkManager.class);
    private final StarRocksJdbcConnectionProvider jdbcConnProvider;
    private final StarRocksQueryVisitor starrocksQueryVisitor;
    private final StarRocksStreamLoadVisitor starrocksStreamLoadVisitor;
    private final StarRocksSinkOptions sinkOptions;
    private transient Counter totalFlushBytes;
    private transient Counter totalFlushRows;
    private transient Counter totalFlushTime;
    private transient Counter totalFlushTimeWithoutRetries;
    private static final String COUNTER_TOTAL_FLUSH_BYTES = "totalFlushBytes";
    private static final String COUNTER_TOTAL_FLUSH_ROWS = "totalFlushRows";
    private static final String COUNTER_TOTAL_FLUSH_COST_TIME_WITHOUT_RETRIES = "totalFlushTimeNsWithoutRetries";
    private static final String COUNTER_TOTAL_FLUSH_COST_TIME = "totalFlushTimeNs";
    private volatile Exception flushException;
    private ScheduledExecutorService scheduler;
    private ScheduledFuture<?> scheduledFuture;
    private final LinkedBlockingDeque<Tuple3<String, Long, ArrayList<byte[]>>> flushQueue = new LinkedBlockingDeque<>(1);
    private final ArrayList<byte[]> buffer = new ArrayList<>();
    private int batchCount = 0;
    private long batchSize = 0;
    private volatile boolean closed = false;
    private final Map<String, List<LogicalTypeRoot>> typesMap = new HashMap();

    public StarRocksSinkManager(StarRocksSinkOptions starRocksSinkOptions, TableSchema tableSchema) {
        this.sinkOptions = starRocksSinkOptions;
        this.jdbcConnProvider = new StarRocksJdbcConnectionProvider(new StarRocksJdbcConnectionOptions(starRocksSinkOptions.getJdbcUrl(), starRocksSinkOptions.getUsername(), starRocksSinkOptions.getPassword()));
        this.starrocksQueryVisitor = new StarRocksQueryVisitor(this.jdbcConnProvider, starRocksSinkOptions.getDatabaseName(), starRocksSinkOptions.getTableName());
        this.typesMap.put("bigint", Lists.newArrayList(new LogicalTypeRoot[]{LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER}));
        this.typesMap.put("largeint", Lists.newArrayList(new LogicalTypeRoot[]{LogicalTypeRoot.DECIMAL, LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER}));
        this.typesMap.put("boolean", Lists.newArrayList(new LogicalTypeRoot[]{LogicalTypeRoot.BOOLEAN}));
        this.typesMap.put("char", Lists.newArrayList(new LogicalTypeRoot[]{LogicalTypeRoot.CHAR, LogicalTypeRoot.VARCHAR}));
        this.typesMap.put("date", Lists.newArrayList(new LogicalTypeRoot[]{LogicalTypeRoot.DATE, LogicalTypeRoot.VARCHAR}));
        this.typesMap.put("datetime", Lists.newArrayList(new LogicalTypeRoot[]{LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, LogicalTypeRoot.VARCHAR}));
        this.typesMap.put("decimal", Lists.newArrayList(new LogicalTypeRoot[]{LogicalTypeRoot.DECIMAL, LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.DOUBLE, LogicalTypeRoot.FLOAT}));
        this.typesMap.put("double", Lists.newArrayList(new LogicalTypeRoot[]{LogicalTypeRoot.DOUBLE, LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER}));
        this.typesMap.put("float", Lists.newArrayList(new LogicalTypeRoot[]{LogicalTypeRoot.FLOAT, LogicalTypeRoot.INTEGER}));
        this.typesMap.put("int", Lists.newArrayList(new LogicalTypeRoot[]{LogicalTypeRoot.INTEGER}));
        this.typesMap.put("tinyint", Lists.newArrayList(new LogicalTypeRoot[]{LogicalTypeRoot.TINYINT, LogicalTypeRoot.INTEGER}));
        this.typesMap.put("smallint", Lists.newArrayList(new LogicalTypeRoot[]{LogicalTypeRoot.SMALLINT, LogicalTypeRoot.INTEGER}));
        this.typesMap.put("varchar", Lists.newArrayList(new LogicalTypeRoot[]{LogicalTypeRoot.VARCHAR}));
        this.typesMap.put("bitmap", Lists.newArrayList(new LogicalTypeRoot[]{LogicalTypeRoot.VARCHAR, LogicalTypeRoot.TINYINT, LogicalTypeRoot.SMALLINT, LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER}));
        validateTableStructure(tableSchema);
        this.starrocksStreamLoadVisitor = new StarRocksStreamLoadVisitor(starRocksSinkOptions, null == tableSchema ? new String[0] : tableSchema.getFieldNames());
    }

    public void setRuntimeContext(RuntimeContext runtimeContext) {
        this.totalFlushBytes = runtimeContext.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_BYTES);
        this.totalFlushRows = runtimeContext.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_ROWS);
        this.totalFlushTime = runtimeContext.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_COST_TIME);
        this.totalFlushTimeWithoutRetries = runtimeContext.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_COST_TIME_WITHOUT_RETRIES);
    }

    public void startAsyncFlushing() {
        Thread thread = new Thread(new Runnable() { // from class: com.starrocks.connector.flink.manager.StarRocksSinkManager.1
            @Override // java.lang.Runnable
            public void run() {
                while (true) {
                    try {
                        StarRocksSinkManager.this.asyncFlush();
                    } catch (Exception e) {
                        StarRocksSinkManager.this.flushException = e;
                    }
                }
            }
        });
        thread.setDaemon(true);
        thread.start();
    }

    public void startScheduler() throws IOException {
        if (StarRocksSinkSemantic.EXACTLY_ONCE.equals(this.sinkOptions.getSemantic())) {
            return;
        }
        stopScheduler();
        this.scheduler = Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("starrocks-interval-sink"));
        this.scheduledFuture = this.scheduler.schedule(() -> {
            synchronized (this) {
                if (!this.closed) {
                    try {
                        String createBatchLabel = createBatchLabel();
                        LOG.info(String.format("StarRocks interval Sinking triggered: label[%s].", createBatchLabel));
                        if (this.batchCount == 0) {
                            startScheduler();
                        }
                        flush(createBatchLabel, false);
                    } catch (Exception e) {
                        this.flushException = e;
                    }
                }
            }
        }, this.sinkOptions.getSinkMaxFlushInterval(), TimeUnit.MILLISECONDS);
    }

    public void stopScheduler() {
        if (this.scheduledFuture != null) {
            this.scheduledFuture.cancel(false);
            this.scheduler.shutdown();
        }
    }

    public final synchronized void writeRecord(String str) throws IOException {
        checkFlushException();
        try {
            this.buffer.add(str.getBytes());
            this.batchCount++;
            this.batchSize += r0.length;
            if (StarRocksSinkSemantic.EXACTLY_ONCE.equals(this.sinkOptions.getSemantic())) {
                return;
            }
            if (this.batchCount >= this.sinkOptions.getSinkMaxRows() || this.batchSize >= this.sinkOptions.getSinkMaxBytes()) {
                String createBatchLabel = createBatchLabel();
                LOG.info(String.format("StarRocks buffer Sinking triggered: rows[%d] label[%s].", Integer.valueOf(this.batchCount), createBatchLabel));
                flush(createBatchLabel, false);
            }
        } catch (Exception e) {
            throw new IOException("Writing records to StarRocks failed.", e);
        }
    }

    public synchronized void flush(String str, boolean z) throws Exception {
        checkFlushException();
        if (this.batchCount == 0) {
            if (z) {
                waitAsyncFlushingDone();
            }
        } else {
            this.flushQueue.put(new Tuple3<>(str, Long.valueOf(this.batchSize), new ArrayList(this.buffer)));
            if (z) {
                waitAsyncFlushingDone();
            }
            this.buffer.clear();
            this.batchCount = 0;
            this.batchSize = 0L;
        }
    }

    public synchronized void close() {
        if (!this.closed) {
            this.closed = true;
            if (this.scheduledFuture != null) {
                this.scheduledFuture.cancel(false);
                this.scheduler.shutdown();
            }
            try {
                String createBatchLabel = createBatchLabel();
                if (this.batchCount > 0) {
                    LOG.info(String.format("StarRocks Sink is about to close: label[%s].", createBatchLabel));
                }
                flush(createBatchLabel, true);
            } catch (Exception e) {
                throw new RuntimeException("Writing records to StarRocks failed.", e);
            }
        }
        checkFlushException();
    }

    public String createBatchLabel() {
        return UUID.randomUUID().toString();
    }

    public List<byte[]> getBufferedBatchList() {
        return this.buffer;
    }

    public void setBufferedBatchList(List<byte[]> list) throws IOException {
        if (StarRocksSinkSemantic.EXACTLY_ONCE.equals(this.sinkOptions.getSemantic())) {
            this.buffer.clear();
            this.batchCount = 0;
            this.batchSize = 0L;
            Iterator<byte[]> it = list.iterator();
            while (it.hasNext()) {
                writeRecord(new String(it.next(), StandardCharsets.UTF_8));
            }
        }
    }

    private void waitAsyncFlushingDone() throws InterruptedException {
        this.flushQueue.put(new Tuple3<>("", 0L, (Object) null));
        this.flushQueue.put(new Tuple3<>("", 0L, (Object) null));
        checkFlushException();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void asyncFlush() throws Exception {
        Tuple3<String, Long, ArrayList<byte[]>> take = this.flushQueue.take();
        if (Strings.isNullOrEmpty((String) take.f0)) {
            return;
        }
        stopScheduler();
        LOG.info(String.format("Async stream load: rows[%d] bytes[%d] label[%s].", Integer.valueOf(((ArrayList) take.f2).size()), take.f1, take.f0));
        long nanoTime = System.nanoTime();
        for (int i = 0; i <= this.sinkOptions.getSinkMaxRetries(); i++) {
            try {
                long nanoTime2 = System.nanoTime();
                this.starrocksStreamLoadVisitor.doStreamLoad(take);
                LOG.info(String.format("Async stream load finished: label[%s].", take.f0));
                if (null != this.totalFlushBytes) {
                    this.totalFlushBytes.inc(((Long) take.f1).longValue());
                    this.totalFlushRows.inc(((ArrayList) take.f2).size());
                    this.totalFlushTime.inc(System.nanoTime() - nanoTime);
                    this.totalFlushTimeWithoutRetries.inc(System.nanoTime() - nanoTime2);
                }
                startScheduler();
                return;
            } catch (Exception e) {
                LOG.warn("Failed to flush batch data to StarRocks, retry times = {}", Integer.valueOf(i), e);
                if (i >= this.sinkOptions.getSinkMaxRetries()) {
                    throw new IOException(e);
                }
                try {
                    Thread.sleep(1000 * (i + 1));
                } catch (InterruptedException e2) {
                    Thread.currentThread().interrupt();
                    throw new IOException("Unable to flush, interrupted while doing another attempt", e);
                }
            }
        }
    }

    private void checkFlushException() {
        if (this.flushException != null) {
            StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
            for (int i = 0; i < stackTrace.length; i++) {
                LOG.info(stackTrace[i].getClassName() + "." + stackTrace[i].getMethodName() + " line:" + stackTrace[i].getLineNumber());
            }
            throw new RuntimeException("Writing records to StarRocks failed.", this.flushException);
        }
    }

    private void validateTableStructure(TableSchema tableSchema) {
        if (null == tableSchema) {
            return;
        }
        Optional primaryKey = tableSchema.getPrimaryKey();
        List<Map<String, Object>> tableColumnsMetaData = this.starrocksQueryVisitor.getTableColumnsMetaData();
        if (null == tableColumnsMetaData) {
            throw new IllegalArgumentException("Couldn't get the sink table's column info.");
        }
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < tableColumnsMetaData.size(); i++) {
            if ("PRI".equals(tableColumnsMetaData.get(i).get("COLUMN_KEY").toString())) {
                arrayList.add(tableColumnsMetaData.get(i).get("COLUMN_NAME").toString().toLowerCase());
            }
        }
        if (!arrayList.isEmpty()) {
            if (!primaryKey.isPresent()) {
                throw new IllegalArgumentException("Source table schema should contain primary keys.");
            }
            if (((UniqueConstraint) primaryKey.get()).getColumns().size() != arrayList.size() || !((UniqueConstraint) primaryKey.get()).getColumns().stream().allMatch(str -> {
                return arrayList.contains(str);
            })) {
                throw new IllegalArgumentException("Primary keys of the source table does not match the ones of the sink table.");
            }
            this.sinkOptions.enableUpsertDelete();
        }
        if (this.sinkOptions.hasColumnMappingProperty()) {
            return;
        }
        if (tableSchema.getFieldCount() != tableColumnsMetaData.size()) {
            throw new IllegalArgumentException("Fields count mismatch.");
        }
        List tableColumns = tableSchema.getTableColumns();
        for (int i2 = 0; i2 < tableColumnsMetaData.size(); i2++) {
            String lowerCase = tableColumnsMetaData.get(i2).get("COLUMN_NAME").toString().toLowerCase();
            String lowerCase2 = tableColumnsMetaData.get(i2).get("DATA_TYPE").toString().toLowerCase();
            if (((List) tableColumns.stream().filter(tableColumn -> {
                return tableColumn.getName().toLowerCase().equals(lowerCase) && (!this.typesMap.containsKey(lowerCase2) || this.typesMap.get(lowerCase2).contains(tableColumn.getType().getLogicalType().getTypeRoot()));
            }).collect(Collectors.toList())).isEmpty()) {
                throw new IllegalArgumentException("Fields name or type mismatch for:" + lowerCase);
            }
        }
    }
}
