package com.starrocks.connector.flink.table.source;

import com.starrocks.connector.flink.row.source.StarRocksSourceFlinkRows;
import com.starrocks.connector.flink.table.source.struct.ColunmRichInfo;
import com.starrocks.connector.flink.table.source.struct.Const;
import com.starrocks.connector.flink.table.source.struct.SelectColumn;
import com.starrocks.connector.flink.table.source.struct.StarRocksSchema;
import com.starrocks.shade.org.apache.thrift.TException;
import com.starrocks.shade.org.apache.thrift.protocol.TBinaryProtocol;
import com.starrocks.shade.org.apache.thrift.protocol.TMultiplexedProtocol;
import com.starrocks.shade.org.apache.thrift.transport.TSocket;
import com.starrocks.shade.org.apache.thrift.transport.TTransportException;
import com.starrocks.thrift.TScanBatchResult;
import com.starrocks.thrift.TScanCloseParams;
import com.starrocks.thrift.TScanNextBatchParams;
import com.starrocks.thrift.TScanOpenParams;
import com.starrocks.thrift.TScanOpenResult;
import com.starrocks.thrift.TStarrocksExternalService;
import com.starrocks.thrift.TStatusCode;
import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import org.apache.flink.table.data.GenericRowData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/starrocks/connector/flink/table/source/StarRocksSourceBeReader.class */
public class StarRocksSourceBeReader implements StarRocksSourceDataReader, Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(StarRocksSourceBeReader.class);
    private TStarrocksExternalService.Client client;
    private final String IP;
    private final int PORT;
    private final List<ColunmRichInfo> colunmRichInfos;
    private final SelectColumn[] selectColumns;
    private String contextId;
    private int readerOffset = 0;
    private StarRocksSchema srSchema;
    private StarRocksSourceFlinkRows curFlinkRows;
    private GenericRowData curData;

    public StarRocksSourceBeReader(String str, List<ColunmRichInfo> list, SelectColumn[] selectColumnArr, StarRocksSourceOptions starRocksSourceOptions) {
        if (starRocksSourceOptions.getBeHostMappingList().length() > 0) {
            String beHostMappingList = starRocksSourceOptions.getBeHostMappingList();
            HashMap hashMap = new HashMap();
            for (String str2 : beHostMappingList.split(";")) {
                String[] split = str2.split(",");
                hashMap.put(split[1].trim(), split[0].trim());
            }
            if (!hashMap.containsKey(str)) {
                throw new RuntimeException("Not find be node info from the be port mappping list");
            }
            str = (String) hashMap.get(str);
            LOG.info("query data from be by using be-hostname");
        } else {
            LOG.info("query data from be by using be-ip");
        }
        String[] split2 = str.split(TMultiplexedProtocol.SEPARATOR);
        String trim = split2[0].trim();
        int parseInt = Integer.parseInt(split2[1].trim());
        this.IP = trim;
        this.PORT = parseInt;
        this.colunmRichInfos = list;
        this.selectColumns = selectColumnArr;
        TBinaryProtocol.Factory factory = new TBinaryProtocol.Factory();
        TSocket tSocket = new TSocket(this.IP, this.PORT, starRocksSourceOptions.getConnectTimeoutMs(), starRocksSourceOptions.getConnectTimeoutMs());
        try {
            tSocket.open();
            this.client = new TStarrocksExternalService.Client(factory.getProtocol(tSocket));
        } catch (TTransportException e) {
            tSocket.close();
            throw new RuntimeException("Failed to create brpc source:" + e.getMessage());
        }
    }

    public void openScanner(List<Long> list, String str, StarRocksSourceOptions starRocksSourceOptions) {
        TScanOpenParams tScanOpenParams = new TScanOpenParams();
        tScanOpenParams.setTablet_ids(list);
        tScanOpenParams.setOpaqued_query_plan(str);
        tScanOpenParams.setCluster(Const.DEFAULT_CLUSTER_NAME);
        tScanOpenParams.setDatabase(starRocksSourceOptions.getDatabaseName());
        tScanOpenParams.setTable(starRocksSourceOptions.getTableName());
        tScanOpenParams.setUser(starRocksSourceOptions.getUsername());
        tScanOpenParams.setPasswd(starRocksSourceOptions.getPassword());
        tScanOpenParams.setBatch_size(starRocksSourceOptions.getBatchRows());
        if (starRocksSourceOptions.getProperties() != null) {
            tScanOpenParams.setProperties(starRocksSourceOptions.getProperties());
        }
        tScanOpenParams.setKeep_alive_min((short) starRocksSourceOptions.getKeepAliveMin());
        tScanOpenParams.setQuery_timeout(starRocksSourceOptions.getQueryTimeout());
        tScanOpenParams.setMem_limit(starRocksSourceOptions.getMemLimit());
        LOG.info("open Scan params.mem_limit {} B", Long.valueOf(tScanOpenParams.getMem_limit()));
        LOG.info("open Scan params.keep-alive-min {} min", Short.valueOf(tScanOpenParams.getKeep_alive_min()));
        try {
            TScanOpenResult open_scanner = this.client.open_scanner(tScanOpenParams);
            if (!TStatusCode.OK.equals(open_scanner.getStatus().getStatus_code())) {
                throw new RuntimeException("Failed to open scanner." + open_scanner.getStatus().getStatus_code() + open_scanner.getStatus().getError_msgs());
            }
            this.srSchema = StarRocksSchema.genSchema(open_scanner.getSelected_columns());
            this.contextId = open_scanner.getContext_id();
        } catch (TException e) {
            throw new RuntimeException("Failed to open scanner." + e.getMessage());
        }
    }

    public void startToRead() {
        TScanNextBatchParams tScanNextBatchParams = new TScanNextBatchParams();
        tScanNextBatchParams.setContext_id(this.contextId);
        tScanNextBatchParams.setOffset(this.readerOffset);
        try {
            TScanBatchResult tScanBatchResult = this.client.get_next(tScanNextBatchParams);
            if (!TStatusCode.OK.equals(tScanBatchResult.getStatus().getStatus_code())) {
                throw new RuntimeException("Failed to get next from be -> ip:[" + this.IP + "] " + tScanBatchResult.getStatus().getStatus_code() + " msg:" + tScanBatchResult.getStatus().getError_msgs());
            }
            if (!tScanBatchResult.eos) {
                handleResult(tScanBatchResult);
            }
        } catch (TException e) {
            throw new RuntimeException(e.getMessage());
        }
    }

    @Override // com.starrocks.connector.flink.table.source.StarRocksSourceDataReader
    public boolean hasNext() {
        return this.curData != null;
    }

    @Override // com.starrocks.connector.flink.table.source.StarRocksSourceDataReader
    public GenericRowData getNext() {
        GenericRowData genericRowData = this.curData;
        this.curData = null;
        if (this.curFlinkRows.hasNext()) {
            this.curData = this.curFlinkRows.next();
        }
        if (this.curData != null) {
            return genericRowData;
        }
        startToRead();
        return genericRowData;
    }

    private void handleResult(TScanBatchResult tScanBatchResult) {
        try {
            StarRocksSourceFlinkRows genFlinkRowsFromArrow = new StarRocksSourceFlinkRows(tScanBatchResult, this.colunmRichInfos, this.srSchema, this.selectColumns).genFlinkRowsFromArrow();
            this.readerOffset = genFlinkRowsFromArrow.getReadRowCount() + this.readerOffset;
            this.curFlinkRows = genFlinkRowsFromArrow;
            this.curData = genFlinkRowsFromArrow.next();
        } catch (IOException e) {
            throw new RuntimeException(e.getMessage());
        }
    }

    @Override // com.starrocks.connector.flink.table.source.StarRocksSourceDataReader
    public void close() {
        TScanCloseParams tScanCloseParams = new TScanCloseParams();
        tScanCloseParams.setContext_id(this.contextId);
        try {
            this.client.close_scanner(tScanCloseParams);
        } catch (TException e) {
            throw new RuntimeException(e.getMessage());
        }
    }
}
