package com.starrocks.connector.flink.manager;

import com.starrocks.connector.flink.row.StarRocksDelimiterParser;
import com.starrocks.connector.flink.table.StarRocksSinkOptions;
import com.starrocks.shade.com.alibaba.fastjson.JSON;
import com.starrocks.shade.org.apache.commons.codec.binary.Base64;
import java.io.IOException;
import java.io.Serializable;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.java.tuple.Tuple3;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/starrocks/connector/flink/manager/StarRocksStreamLoadVisitor.class */
public class StarRocksStreamLoadVisitor implements Serializable {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(StarRocksStreamLoadVisitor.class);
    private final StarRocksSinkOptions sinkOptions;
    private final String[] fieldNames;
    private int pos;

    public StarRocksStreamLoadVisitor(StarRocksSinkOptions starRocksSinkOptions, String[] strArr) {
        this.fieldNames = strArr;
        this.sinkOptions = starRocksSinkOptions;
    }

    public void doStreamLoad(Tuple3<String, Long, ArrayList<byte[]>> tuple3) throws IOException {
        String availableHost = getAvailableHost();
        if (null == availableHost) {
            throw new IOException("None of the hosts in `load_url` could be connected.");
        }
        String str = availableHost + "/api/" + this.sinkOptions.getDatabaseName() + "/" + this.sinkOptions.getTableName() + "/_stream_load";
        LOG.info(String.format("Start to join batch data: rows[%d] bytes[%d] label[%s].", Integer.valueOf(((ArrayList) tuple3.f2).size()), tuple3.f1, tuple3.f0));
        Map<String, Object> doHttpPut = doHttpPut(str, (String) tuple3.f0, joinRows((List) tuple3.f2, ((Long) tuple3.f1).intValue()));
        if (null == doHttpPut || !doHttpPut.containsKey("Status")) {
            throw new IOException("Unable to flush data to StarRocks: unknown result status, usually caused by authentication or permission related problems.");
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug(String.format("Stream Load response: \n%s\n", JSON.toJSONString(doHttpPut)));
        }
        if (doHttpPut.get("Status").equals("Fail")) {
            LOG.error(String.format("Stream Load response: \n%s\n", JSON.toJSONString(doHttpPut)));
            throw new StarRocksStreamLoadFailedException(String.format("Failed to flush data to StarRocks, Error response: \n%s\n", JSON.toJSONString(doHttpPut)), doHttpPut);
        }
    }

    private String getAvailableHost() {
        List<String> loadUrlList = this.sinkOptions.getLoadUrlList();
        if (this.pos >= loadUrlList.size()) {
            this.pos = 0;
        }
        while (this.pos < loadUrlList.size()) {
            String str = "http://" + loadUrlList.get(this.pos);
            if (tryHttpConnection(str)) {
                return str;
            }
            this.pos++;
        }
        return null;
    }

    private boolean tryHttpConnection(String str) {
        try {
            HttpURLConnection httpURLConnection = (HttpURLConnection) new URL(str).openConnection();
            httpURLConnection.setConnectTimeout(this.sinkOptions.getConnectTimout());
            httpURLConnection.connect();
            httpURLConnection.disconnect();
            return true;
        } catch (Exception e) {
            LOG.warn("Failed to connect to address:{}", str, e);
            return false;
        }
    }

    private byte[] joinRows(List<byte[]> list, int i) throws IOException {
        if (StarRocksSinkOptions.StreamLoadFormat.CSV.equals(this.sinkOptions.getStreamLoadFormat())) {
            byte[] bytes = StarRocksDelimiterParser.parse(this.sinkOptions.getSinkStreamLoadProperties().get("row_delimiter"), "\n").getBytes(StandardCharsets.UTF_8);
            ByteBuffer allocate = ByteBuffer.allocate(i + (list.size() * bytes.length));
            Iterator<byte[]> it = list.iterator();
            while (it.hasNext()) {
                allocate.put(it.next());
                allocate.put(bytes);
            }
            return allocate.array();
        }
        if (!StarRocksSinkOptions.StreamLoadFormat.JSON.equals(this.sinkOptions.getStreamLoadFormat())) {
            throw new RuntimeException("Failed to join rows data, unsupported `format` from stream load properties:");
        }
        ByteBuffer allocate2 = ByteBuffer.allocate(i + (list.isEmpty() ? 2 : list.size() + 1));
        allocate2.put("[".getBytes(StandardCharsets.UTF_8));
        byte[] bytes2 = ",".getBytes(StandardCharsets.UTF_8);
        boolean z = true;
        for (byte[] bArr : list) {
            if (!z) {
                allocate2.put(bytes2);
            }
            allocate2.put(bArr);
            z = false;
        }
        allocate2.put("]".getBytes(StandardCharsets.UTF_8));
        return allocate2.array();
    }

    /* JADX WARN: Code restructure failed: missing block: B:14:0x00af, code lost:
    
        if (com.starrocks.connector.flink.table.StarRocksSinkOptions.StreamLoadFormat.CSV.equals(r7.sinkOptions.getStreamLoadFormat()) != false) goto L13;
     */
    /* JADX WARN: Finally extract failed */
    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v109, types: [java.lang.Throwable, java.util.Map$Entry] */
    /* JADX WARN: Type inference failed for: r0v25 */
    /* JADX WARN: Type inference failed for: r0v29, types: [com.starrocks.shade.org.apache.http.client.methods.CloseableHttpResponse] */
    /* JADX WARN: Type inference failed for: r0v30, types: [com.starrocks.shade.org.apache.http.client.methods.CloseableHttpResponse] */
    /* JADX WARN: Type inference failed for: r16v0, types: [java.util.Iterator] */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private java.util.Map<java.lang.String, java.lang.Object> doHttpPut(java.lang.String r8, java.lang.String r9, byte[] r10) throws java.io.IOException {
        /*
            Method dump skipped, instructions count: 796
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: com.starrocks.connector.flink.manager.StarRocksStreamLoadVisitor.doHttpPut(java.lang.String, java.lang.String, byte[]):java.util.Map");
    }

    private String getBasicAuthHeader(String str, String str2) {
        return "Basic " + new String(Base64.encodeBase64((str + ":" + str2).getBytes(StandardCharsets.UTF_8)));
    }
}
