package org.apache.hadoop.streaming;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.streaming.io.InputWriter;
import org.apache.hadoop.streaming.io.OutputReader;
import org.apache.hadoop.streaming.io.TextInputWriter;
import org.apache.hadoop.streaming.io.TextOutputReader;
import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.util.ReflectionUtils;

/* JADX WARN: Classes with same name are omitted:
  input_file:classes/org/apache/hadoop/streaming/PipeMapRed.class
 */
/* loaded from: input_file:hadoop-streaming-2.7.3.jar:org/apache/hadoop/streaming/PipeMapRed.class */
public abstract class PipeMapRed {
    protected static final Log LOG = LogFactory.getLog(PipeMapRed.class.getName());
    static final int OUTSIDE = 1;
    static final int SINGLEQ = 2;
    static final int DOUBLEQ = 3;
    private static final int BUFFER_SIZE = 131072;
    long startTime_;
    long joinDelay_;
    JobConf job_;
    boolean doPipe_;
    Class<? extends InputWriter> mapInputWriterClass_;
    Class<? extends OutputReader> mapOutputReaderClass_;
    Class<? extends InputWriter> reduceInputWriterClass_;
    Class<? extends OutputReader> reduceOutputReaderClass_;
    boolean nonZeroExitIsFailure_;
    Process sim;
    InputWriter inWriter_;
    OutputReader outReader_;
    MROutputThread outThread_;
    MRErrorThread errThread_;
    DataOutputStream clientOut_;
    DataInputStream clientErr_;
    DataInputStream clientIn_;
    int numExceptions_;
    protected volatile Throwable outerrThreadsThrowable;
    long numRecRead_ = 0;
    long numRecWritten_ = 0;
    long numRecSkipped_ = 0;
    long nextRecReadLog_ = 1;
    long minRecWrittenToEnableSkip_ = Long.MAX_VALUE;
    long reporterOutDelay_ = 10000;
    long reporterErrDelay_ = 10000;
    volatile boolean processProvidedStatus_ = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:classes/org/apache/hadoop/streaming/PipeMapRed$MRErrorThread.class
     */
    /* loaded from: input_file:hadoop-streaming-2.7.3.jar:org/apache/hadoop/streaming/PipeMapRed$MRErrorThread.class */
    public class MRErrorThread extends Thread {
        long lastStderrReport = 0;
        volatile Reporter reporter;
        private final String reporterPrefix;
        private final String counterPrefix;
        private final String statusPrefix;

        public MRErrorThread() {
            this.reporterPrefix = PipeMapRed.this.job_.get("stream.stderr.reporter.prefix", "reporter:");
            this.counterPrefix = this.reporterPrefix + "counter:";
            this.statusPrefix = this.reporterPrefix + "status:";
            setDaemon(true);
        }

        public void setReporter(Reporter reporter) {
            this.reporter = reporter;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            Text text = new Text();
            LineReader lineReader = null;
            try {
                lineReader = new LineReader(PipeMapRed.this.clientErr_, PipeMapRed.this.job_);
                while (lineReader.readLine(text) > 0) {
                    String text2 = text.toString();
                    if (!matchesReporter(text2)) {
                        System.err.println(text2);
                    } else if (matchesCounter(text2)) {
                        incrCounter(text2);
                    } else if (matchesStatus(text2)) {
                        PipeMapRed.this.processProvidedStatus_ = true;
                        setStatus(text2);
                    } else {
                        PipeMapRed.LOG.warn("Cannot parse reporter line: " + text2);
                    }
                    long currentTimeMillis = System.currentTimeMillis();
                    if (this.reporter != null && currentTimeMillis - this.lastStderrReport > PipeMapRed.this.reporterErrDelay_) {
                        this.lastStderrReport = currentTimeMillis;
                        this.reporter.progress();
                    }
                    text.clear();
                }
                if (lineReader != null) {
                    lineReader.close();
                }
                if (PipeMapRed.this.clientErr_ != null) {
                    PipeMapRed.this.clientErr_.close();
                    PipeMapRed.this.clientErr_ = null;
                    PipeMapRed.LOG.info("MRErrorThread done");
                }
            } catch (Throwable th) {
                PipeMapRed.this.outerrThreadsThrowable = th;
                PipeMapRed.LOG.warn(th);
                if (lineReader != null) {
                    try {
                        lineReader.close();
                    } catch (IOException e) {
                        PipeMapRed.LOG.info(e);
                        return;
                    }
                }
                if (PipeMapRed.this.clientErr_ != null) {
                    PipeMapRed.this.clientErr_.close();
                    PipeMapRed.this.clientErr_ = null;
                }
            }
        }

        private boolean matchesReporter(String str) {
            return str.startsWith(this.reporterPrefix);
        }

        private boolean matchesCounter(String str) {
            return str.startsWith(this.counterPrefix);
        }

        private boolean matchesStatus(String str) {
            return str.startsWith(this.statusPrefix);
        }

        private void incrCounter(String str) {
            String[] split = str.substring(this.counterPrefix.length()).trim().split(",");
            if (split.length != PipeMapRed.DOUBLEQ) {
                PipeMapRed.LOG.warn("Cannot parse counter line: " + str);
                return;
            }
            try {
                this.reporter.incrCounter(split[0], split[PipeMapRed.OUTSIDE], Long.parseLong(split[PipeMapRed.SINGLEQ]));
            } catch (NumberFormatException e) {
                PipeMapRed.LOG.warn("Cannot parse counter increment '" + split[PipeMapRed.SINGLEQ] + "' from line: " + str);
            }
        }

        private void setStatus(String str) {
            this.reporter.setStatus(str.substring(this.statusPrefix.length()).trim());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:classes/org/apache/hadoop/streaming/PipeMapRed$MROutputThread.class
     */
    /* loaded from: input_file:hadoop-streaming-2.7.3.jar:org/apache/hadoop/streaming/PipeMapRed$MROutputThread.class */
    public class MROutputThread extends Thread {
        OutputReader outReader;
        OutputCollector outCollector;
        Reporter reporter;
        long lastStdoutReport = 0;

        MROutputThread(OutputReader outputReader, OutputCollector outputCollector, Reporter reporter) {
            this.outReader = null;
            this.outCollector = null;
            this.reporter = null;
            setDaemon(true);
            this.outReader = outputReader;
            this.outCollector = outputCollector;
            this.reporter = reporter;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (this.outReader.readKeyValue()) {
                try {
                    try {
                        this.outCollector.collect(this.outReader.getCurrentKey(), this.outReader.getCurrentValue());
                        PipeMapRed.this.numRecWritten_++;
                        long currentTimeMillis = System.currentTimeMillis();
                        if (currentTimeMillis - this.lastStdoutReport > PipeMapRed.this.reporterOutDelay_) {
                            this.lastStdoutReport = currentTimeMillis;
                            String str = "Records R/W=" + PipeMapRed.this.numRecRead_ + "/" + PipeMapRed.this.numRecWritten_;
                            if (PipeMapRed.this.processProvidedStatus_) {
                                this.reporter.progress();
                            } else {
                                this.reporter.setStatus(str);
                            }
                            PipeMapRed.LOG.info(str);
                        }
                    } catch (Throwable th) {
                        PipeMapRed.this.outerrThreadsThrowable = th;
                        PipeMapRed.LOG.warn(th);
                        try {
                            if (PipeMapRed.this.clientIn_ != null) {
                                PipeMapRed.this.clientIn_.close();
                                PipeMapRed.this.clientIn_ = null;
                            }
                            return;
                        } catch (IOException e) {
                            PipeMapRed.LOG.info(e);
                            return;
                        }
                    }
                } catch (Throwable th2) {
                    try {
                        if (PipeMapRed.this.clientIn_ != null) {
                            PipeMapRed.this.clientIn_.close();
                            PipeMapRed.this.clientIn_ = null;
                        }
                    } catch (IOException e2) {
                        PipeMapRed.LOG.info(e2);
                    }
                    throw th2;
                }
            }
            try {
                if (PipeMapRed.this.clientIn_ != null) {
                    PipeMapRed.this.clientIn_.close();
                    PipeMapRed.this.clientIn_ = null;
                }
            } catch (IOException e3) {
                PipeMapRed.LOG.info(e3);
            }
        }
    }

    public Configuration getConfiguration() {
        return this.job_;
    }

    public DataOutput getClientOutput() {
        return this.clientOut_;
    }

    public DataInput getClientInput() {
        return this.clientIn_;
    }

    public abstract byte[] getInputSeparator();

    public abstract byte[] getFieldSeparator();

    public abstract int getNumOfKeyFields();

    abstract String getPipeCommand(JobConf jobConf);

    abstract boolean getDoPipe();

    /* JADX WARN: Multi-variable type inference failed */
    static String[] splitArgs(String str) {
        ArrayList arrayList = new ArrayList();
        char[] charArray = str.toCharArray();
        int length = charArray.length;
        boolean z = OUTSIDE;
        int i = 0;
        int i2 = 0;
        while (i2 <= length) {
            boolean z2 = i2 == length;
            boolean z3 = z;
            boolean z4 = false;
            if (!z2) {
                if (charArray[i2] == '\'') {
                    if (z == OUTSIDE) {
                        z = SINGLEQ;
                    } else if (z == SINGLEQ) {
                        z = OUTSIDE;
                    }
                    z4 = z != z3;
                } else if (charArray[i2] == '\"') {
                    if (z == OUTSIDE) {
                        z = DOUBLEQ;
                    } else if (z == DOUBLEQ) {
                        z = OUTSIDE;
                    }
                    z4 = z != z3;
                } else if (charArray[i2] == ' ' && z == OUTSIDE) {
                    z4 = OUTSIDE;
                }
            }
            if (z2 || z4) {
                if (i2 != i) {
                    arrayList.add(str.substring(i, i2));
                }
                i = i2 + OUTSIDE;
            }
            i2 += OUTSIDE;
        }
        return (String[]) arrayList.toArray(new String[0]);
    }

    public void configure(JobConf jobConf) {
        try {
            String pipeCommand = getPipeCommand(jobConf);
            this.joinDelay_ = jobConf.getLong("stream.joindelay.milli", 0L);
            this.job_ = jobConf;
            this.mapInputWriterClass_ = this.job_.getClass("stream.map.input.writer.class", TextInputWriter.class, InputWriter.class);
            this.mapOutputReaderClass_ = this.job_.getClass("stream.map.output.reader.class", TextOutputReader.class, OutputReader.class);
            this.reduceInputWriterClass_ = this.job_.getClass("stream.reduce.input.writer.class", TextInputWriter.class, InputWriter.class);
            this.reduceOutputReaderClass_ = this.job_.getClass("stream.reduce.output.reader.class", TextOutputReader.class, OutputReader.class);
            this.nonZeroExitIsFailure_ = this.job_.getBoolean("stream.non.zero.exit.is.failure", true);
            this.doPipe_ = getDoPipe();
            if (this.doPipe_) {
                setStreamJobDetails(jobConf);
                String[] splitArgs = splitArgs(pipeCommand);
                String str = splitArgs[0];
                File absoluteFile = new File(".").getAbsoluteFile();
                if (!new File(str).isAbsolute()) {
                    FileUtil.chmod(new File(absoluteFile, str).toString(), "a+x");
                }
                if (!new File(splitArgs[0]).isAbsolute()) {
                    PathFinder pathFinder = new PathFinder("PATH");
                    pathFinder.prependPathComponent(absoluteFile.toString());
                    File absolutePath = pathFinder.getAbsolutePath(splitArgs[0]);
                    if (absolutePath != null) {
                        splitArgs[0] = absolutePath.getAbsolutePath();
                    }
                }
                LOG.info("PipeMapRed exec " + Arrays.asList(splitArgs));
                Environment environment = (Environment) StreamUtil.env().clone();
                addJobConfToEnvironment(this.job_, environment);
                addEnvironment(environment, this.job_.get("stream.addenvironment"));
                envPut(environment, "TMPDIR", System.getProperty("java.io.tmpdir"));
                ProcessBuilder processBuilder = new ProcessBuilder(splitArgs);
                processBuilder.environment().putAll(environment.toMap());
                this.sim = processBuilder.start();
                this.clientOut_ = new DataOutputStream(new BufferedOutputStream(this.sim.getOutputStream(), BUFFER_SIZE));
                this.clientIn_ = new DataInputStream(new BufferedInputStream(this.sim.getInputStream(), BUFFER_SIZE));
                this.clientErr_ = new DataInputStream(new BufferedInputStream(this.sim.getErrorStream()));
                this.startTime_ = System.currentTimeMillis();
            }
        } catch (IOException e) {
            LOG.error("configuration exception", e);
            throw new RuntimeException("configuration exception", e);
        } catch (InterruptedException e2) {
            LOG.error("configuration exception", e2);
            throw new RuntimeException("configuration exception", e2);
        }
    }

    void setStreamJobDetails(JobConf jobConf) {
        String str = jobConf.get("stream.minRecWrittenToEnableSkip_");
        if (str != null) {
            this.minRecWrittenToEnableSkip_ = Long.parseLong(str);
            LOG.info("JobConf set minRecWrittenToEnableSkip_ =" + this.minRecWrittenToEnableSkip_);
        }
    }

    void addJobConfToEnvironment(JobConf jobConf, Properties properties) {
        JobConf jobConf2 = new JobConf(jobConf);
        jobConf2.setDeprecatedProperties();
        Iterator it = jobConf2.iterator();
        while (it.hasNext()) {
            String str = (String) ((Map.Entry) it.next()).getKey();
            envPut(properties, safeEnvVarName(str), jobConf2.get(str));
        }
    }

    String safeEnvVarName(String str) {
        StringBuffer stringBuffer = new StringBuffer();
        int length = str.length();
        for (int i = 0; i < length; i += OUTSIDE) {
            char charAt = str.charAt(i);
            stringBuffer.append(((charAt < '0' || charAt > '9') && (charAt < 'A' || charAt > 'Z') && (charAt < 'a' || charAt > 'z')) ? '_' : charAt);
        }
        return stringBuffer.toString();
    }

    void addEnvironment(Properties properties, String str) {
        if (str == null) {
            return;
        }
        String[] split = str.split(" ");
        for (int i = 0; i < split.length; i += OUTSIDE) {
            String[] split2 = split[i].split("=", SINGLEQ);
            if (split2.length != SINGLEQ) {
                LOG.info("Skip env entry:" + split[i]);
            } else {
                envPut(properties, split2[0], split2[OUTSIDE]);
            }
        }
    }

    void envPut(Properties properties, String str, String str2) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Add  env entry:" + str + "=" + str2);
        }
        properties.put(str, str2);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void startOutputThreads(OutputCollector outputCollector, Reporter reporter) throws IOException {
        this.inWriter_ = createInputWriter();
        this.outReader_ = createOutputReader();
        this.outThread_ = new MROutputThread(this.outReader_, outputCollector, reporter);
        this.outThread_.start();
        this.errThread_ = new MRErrorThread();
        this.errThread_.setReporter(reporter);
        this.errThread_.start();
    }

    void waitOutputThreads() throws IOException {
        try {
            if (this.outThread_ == null) {
                startOutputThreads(new OutputCollector() { // from class: org.apache.hadoop.streaming.PipeMapRed.1
                    public void collect(Object obj, Object obj2) throws IOException {
                    }
                }, Reporter.NULL);
            }
            int waitFor = this.sim.waitFor();
            if (waitFor != 0) {
                if (this.nonZeroExitIsFailure_) {
                    throw new RuntimeException("PipeMapRed.waitOutputThreads(): subprocess failed with code " + waitFor);
                }
                LOG.info("PipeMapRed.waitOutputThreads(): subprocess exited with code " + waitFor + " in " + PipeMapRed.class.getName());
            }
            if (this.outThread_ != null) {
                this.outThread_.join(this.joinDelay_);
            }
            if (this.errThread_ != null) {
                this.errThread_.join(this.joinDelay_);
            }
            if (this.outerrThreadsThrowable != null) {
                throw new RuntimeException(this.outerrThreadsThrowable);
            }
        } catch (InterruptedException e) {
        }
    }

    abstract InputWriter createInputWriter() throws IOException;

    /* JADX INFO: Access modifiers changed from: package-private */
    public InputWriter createInputWriter(Class<? extends InputWriter> cls) throws IOException {
        InputWriter inputWriter = (InputWriter) ReflectionUtils.newInstance(cls, this.job_);
        inputWriter.initialize(this);
        return inputWriter;
    }

    abstract OutputReader createOutputReader() throws IOException;

    /* JADX INFO: Access modifiers changed from: package-private */
    public OutputReader createOutputReader(Class<? extends OutputReader> cls) throws IOException {
        OutputReader outputReader = (OutputReader) ReflectionUtils.newInstance(cls, this.job_);
        outputReader.initialize(this);
        return outputReader;
    }

    public void mapRedFinished() {
        try {
            if (!this.doPipe_) {
                LOG.info("mapRedFinished");
                return;
            }
            if (this.clientOut_ != null) {
                try {
                    this.clientOut_.flush();
                    this.clientOut_.close();
                } catch (IOException e) {
                    LOG.warn(e);
                }
            }
            try {
                waitOutputThreads();
            } catch (IOException e2) {
                LOG.warn(e2);
            }
            if (this.sim != null) {
                this.sim.destroy();
            }
            LOG.info("mapRedFinished");
        } catch (RuntimeException e3) {
            LOG.info("PipeMapRed failed!", e3);
            throw e3;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void maybeLogRecord() {
        if (this.numRecRead_ >= this.nextRecReadLog_) {
            LOG.info(numRecInfo());
            if (this.nextRecReadLog_ < 100000) {
                this.nextRecReadLog_ *= 10;
            } else {
                this.nextRecReadLog_ += 100000;
            }
        }
    }

    public String getContext() {
        String str = ((((numRecInfo() + "\n") + "minRecWrittenToEnableSkip_=" + this.minRecWrittenToEnableSkip_ + " ") + envline("HOST")) + envline("USER")) + envline("HADOOP_USER");
        if (this.outThread_ != null) {
            str = str + "last tool output: |" + this.outReader_.getLastOutput() + "|\n";
        }
        return str;
    }

    String envline(String str) {
        return str + "=" + StreamUtil.env().get(str) + "\n";
    }

    String numRecInfo() {
        long currentTimeMillis = (System.currentTimeMillis() - this.startTime_) / 1000;
        return "R/W/S=" + this.numRecRead_ + "/" + this.numRecWritten_ + "/" + this.numRecSkipped_ + " in:" + safeDiv(this.numRecRead_, currentTimeMillis) + " [rec/s] out:" + safeDiv(this.numRecWritten_, currentTimeMillis) + " [rec/s]";
    }

    String safeDiv(long j, long j2) {
        return j2 == 0 ? "NA" : "" + (j / j2) + "=" + j + "/" + j2;
    }
}
