package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import junit.framework.TestCase;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInput;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.streaming.api.operators.SyncMailboxExecutor;
import org.apache.flink.streaming.runtime.tasks.TestSubtaskCheckpointCoordinator;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

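/**
 * Tests checkpoint barrier handling through a {@link SingleCheckpointBarrierHandler} that is driven
 * by an {@link AlternatingController}.
 *
 * <p>Decompiled from org/apache/flink/streaming/runtime/io/AlternatingControllerTest.class.
 */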
public class AlternatingControllerTest {
    /** Runs the shared two-channel barrier scenario with {@link CheckpointType#CHECKPOINT} barriers. */
    @Test
    public void testCheckpointHandling() throws Exception {
        final CheckpointType barrierType = CheckpointType.CHECKPOINT;
        testBarrierHandling(barrierType);
    }

    /** Runs the shared two-channel barrier scenario with {@link CheckpointType#SAVEPOINT} barriers. */
    @Test
    public void testSavepointHandling() throws Exception {
        final CheckpointType barrierType = CheckpointType.SAVEPOINT;
        testBarrierHandling(barrierType);
    }

    /**
     * Sends barriers for consecutive ids on every channel of the gate, using CHECKPOINT for even
     * ids and SAVEPOINT for odd ids, and asserts that the handler's {@code triggeredCheckpoints}
     * equals the list of all ids that were sent.
     */
    @Test
    public void testAlternation() throws Exception {
        final int numChannels = 123;
        final int numBarriers = 123;
        ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
        CheckpointedInputGate gate = buildGate(target, numChannels);
        ArrayList<Long> expectedIds = new ArrayList<>(numBarriers);

        for (long barrierId = 0; barrierId < numBarriers; barrierId++) {
            expectedIds.add(barrierId);
            // Alternate the barrier type with the parity of the id.
            CheckpointType type =
                    barrierId % 2 == 0 ? CheckpointType.CHECKPOINT : CheckpointType.SAVEPOINT;
            for (int channel = 0; channel < numChannels; channel++) {
                sendBarrier(barrierId, type, (TestInputChannel) gate.getChannel(channel), gate);
            }
        }

        Assert.assertEquals(expectedIds, target.triggeredCheckpoints);
    }

    /**
     * Sends the same CHECKPOINT barrier to both channels of a two-channel gate and asserts that
     * the checkpoint is not triggered after the first barrier (even with data arriving on the
     * other channel) but is triggered once the second barrier arrives.
     */
    @Test
    public void testAlignedTimeoutableCheckpoint() throws Exception {
        ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
        CheckpointedInputGate gate = buildGate(target, 2);

        // Long.MAX_VALUE is forwarded as the last argument of CheckpointOptions.create(...);
        // presumably the alignment timeout, i.e. effectively "never time out" - TODO confirm.
        long creationTime = System.currentTimeMillis();
        Buffer checkpointBarrier =
                barrier(1L, CheckpointType.CHECKPOINT, creationTime, Long.MAX_VALUE);

        send(checkpointBarrier, gate, 0);
        sendBuffer(1000, gate, 1);
        Assert.assertEquals(0L, target.getTriggeredCheckpointCounter());

        send(checkpointBarrier, gate, 1);
        Assert.assertEquals(1L, target.getTriggeredCheckpointCounter());
    }

    /**
     * Checks the metrics reported by the barrier handler (latest checkpoint id, alignment
     * duration, checkpoint start delay, bytes processed during alignment) while the barrier type
     * alternates CHECKPOINT, SAVEPOINT, CHECKPOINT on a two-channel gate.
     *
     * <p>Each barrier is stamped with a creation time slightly in the past so that a lower bound
     * on the start delay can be asserted; the sleeps between the two barriers of one checkpoint
     * give a lower bound on the alignment duration.
     */
    @Test
    public void testMetricsAlternation() throws Exception {
        final int bufferSize = 1000;
        ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
        CheckpointedInputGate gate = buildGate(target, 2);

        // Checkpoint 1 (CHECKPOINT), created 10 ms ago, barriers 6 ms apart.
        long startNanos = System.nanoTime();
        long checkpoint1CreationTime = System.currentTimeMillis() - 10;
        sendBarrier(1L, checkpoint1CreationTime, CheckpointType.CHECKPOINT, gate, 0);
        sendBuffer(bufferSize, gate, 0);
        sendBuffer(bufferSize, gate, 1);
        Thread.sleep(6L);
        sendBarrier(1L, checkpoint1CreationTime, CheckpointType.CHECKPOINT, gate, 1);
        sendBuffer(bufferSize, gate, 0);
        assertMetrics(target, gate.getCheckpointBarrierHandler(), 1L, startNanos, 6000000L, 10000000L, bufferSize * 2);

        // Checkpoint 2 (SAVEPOINT), created 5 ms ago, barriers 5 ms apart.
        startNanos = System.nanoTime();
        long checkpoint2CreationTime = System.currentTimeMillis() - 5;
        sendBarrier(2L, checkpoint2CreationTime, CheckpointType.SAVEPOINT, gate, 0);
        sendBuffer(bufferSize, gate, 1);
        assertMetrics(target, gate.getCheckpointBarrierHandler(), 2L, startNanos, 0L, 5000000L, bufferSize * 2);
        Thread.sleep(5L);
        sendBarrier(2L, checkpoint2CreationTime, CheckpointType.SAVEPOINT, gate, 1);
        sendBuffer(bufferSize, gate, 0);
        assertMetrics(target, gate.getCheckpointBarrierHandler(), 2L, startNanos, 5000000L, 5000000L, bufferSize);

        // Checkpoint 3 (CHECKPOINT), created 7 ms ago, barriers 10 ms apart.
        startNanos = System.nanoTime();
        long checkpoint3CreationTime = System.currentTimeMillis() - 7;
        sendBarrier(3L, checkpoint3CreationTime, CheckpointType.CHECKPOINT, gate, 0);
        sendBuffer(bufferSize, gate, 0);
        sendBuffer(bufferSize, gate, 1);
        // -1 is the default used by assertMetrics when the bytes-processed future has no value yet.
        assertMetrics(target, gate.getCheckpointBarrierHandler(), 3L, startNanos, 0L, 7000000L, -1L);
        Thread.sleep(10L);
        // Both barriers of checkpoint 3 carry checkpoint 3's own creation time (previously the
        // second one reused checkpoint 2's timestamp).
        sendBarrier(3L, checkpoint3CreationTime, CheckpointType.CHECKPOINT, gate, 1);
        assertMetrics(target, gate.getCheckpointBarrierHandler(), 3L, startNanos, 10000000L, 7000000L, bufferSize * 2);
    }

    /**
     * Checks the handler metrics on a single-channel gate for a CHECKPOINT followed by a
     * SAVEPOINT: the alignment-duration lower bound and the expected bytes processed during
     * alignment are both 0, and the start delay is at least the age given to each barrier.
     */
    @Test
    public void testMetricsSingleChannel() throws Exception {
        final int bufferSize = 1000;
        ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
        CheckpointedInputGate gate = buildGate(target, 1);

        // Checkpoint 1: barrier stamped 10 ms in the past.
        long firstCreationTime = System.currentTimeMillis() - 10;
        long firstStartNanos = System.nanoTime();
        sendBuffer(bufferSize, gate, 0);
        sendBarrier(1L, firstCreationTime, CheckpointType.CHECKPOINT, gate, 0);
        sendBuffer(bufferSize, gate, 0);
        Thread.sleep(6L);
        assertMetrics(target, gate.getCheckpointBarrierHandler(), 1L, firstStartNanos, 0L, 10000000L, 0L);

        // Checkpoint 2: savepoint barrier stamped 5 ms in the past.
        long secondCreationTime = System.currentTimeMillis() - 5;
        long secondStartNanos = System.nanoTime();
        sendBuffer(bufferSize, gate, 0);
        sendBarrier(2L, secondCreationTime, CheckpointType.SAVEPOINT, gate, 0);
        sendBuffer(bufferSize, gate, 0);
        Thread.sleep(5L);
        assertMetrics(target, gate.getCheckpointBarrierHandler(), 2L, secondStartNanos, 0L, 5000000L, 0L);
    }

    /**
     * Asserts the metrics currently exposed by the barrier handler and the checkpoint handler.
     *
     * @param validatingCheckpointHandler supplies the future with the bytes processed during alignment
     * @param checkpointBarrierHandler handler whose id, alignment duration and start delay are checked
     * @param j expected latest checkpoint id
     * @param j2 {@link System#nanoTime()} reading taken by the caller before the checkpoint started;
     *     the time elapsed since then is the upper bound for the alignment duration
     * @param j3 lower bound (nanoseconds) for the alignment duration
     * @param j4 lower bound (nanoseconds) for the checkpoint start delay
     * @param j5 expected bytes processed during alignment; -1 is what is compared against when the
     *     future yields the default
     */
    private void assertMetrics(ValidatingCheckpointHandler validatingCheckpointHandler, CheckpointBarrierHandler checkpointBarrierHandler, long j, long j2, long j3, long j4, long j5) {
        MatcherAssert.assertThat(checkpointBarrierHandler.getLatestCheckpointId(), Matchers.equalTo(j));

        // Read the duration first, then the clock, so the elapsed time is a valid upper bound.
        final long alignmentDuration = checkpointBarrierHandler.getAlignmentDurationNanos();
        final long elapsedSinceStart = System.nanoTime() - j2;
        MatcherAssert.assertThat(alignmentDuration, Matchers.greaterThanOrEqualTo(j3));
        MatcherAssert.assertThat(alignmentDuration, Matchers.lessThanOrEqualTo(elapsedSinceStart));

        final long startDelay = checkpointBarrierHandler.getCheckpointStartDelayNanos();
        MatcherAssert.assertThat(startDelay, Matchers.greaterThanOrEqualTo(j4));

        MatcherAssert.assertThat(
                FutureUtils.getOrDefault(validatingCheckpointHandler.getLastBytesProcessedDuringAlignment(), -1L),
                Matchers.equalTo(j5));
    }

    /**
     * Processes four barriers with increasing ids directly on the barrier handler, alternating
     * SAVEPOINT (even iterations, channel 0) and CHECKPOINT (odd iterations, channel 1), and
     * asserts after each barrier that a checkpoint is pending, that not all barriers have been
     * received, and that only the expected channel (if any) is still blocked.
     */
    @Test
    public void testPreviousHandlerReset() throws Exception {
        SingleInputGate inputGate = new SingleInputGateBuilder().setNumberOfChannels(2).build();
        TestInputChannel[] channels = {new TestInputChannel(inputGate, 0), new TestInputChannel(inputGate, 1)};
        inputGate.setInputChannels(channels);
        ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
        SingleCheckpointBarrierHandler barrierHandler = barrierHandler(inputGate, target);

        for (int i = 0; i < 4; i++) {
            int channel = i % 2;
            CheckpointType type = channel == 0 ? CheckpointType.SAVEPOINT : CheckpointType.CHECKPOINT;
            target.setNextExpectedCheckpointId(-1L);

            if (type.isSavepoint()) {
                channels[channel].setBlocked(true);
            }
            barrierHandler.processBarrier(
                    new CheckpointBarrier(i, System.currentTimeMillis(), new CheckpointOptions(type, CheckpointStorageLocationReference.getDefault())),
                    new InputChannelInfo(0, channel));

            if (type.isSavepoint()) {
                // The channel that delivered the savepoint barrier stays blocked; the other does not.
                Assert.assertTrue(channels[channel].isBlocked());
                Assert.assertFalse(channels[(channel + 1) % 2].isBlocked());
            } else {
                Assert.assertFalse(channels[0].isBlocked());
                Assert.assertFalse(channels[1].isBlocked());
            }

            Assert.assertTrue(barrierHandler.isCheckpointPending());
            Assert.assertFalse(barrierHandler.getAllBarriersReceivedFuture(i).isDone());

            // Reset the blocked flags so the next iteration starts from a clean state.
            channels[0].setBlocked(false);
            channels[1].setBlocked(false);
        }
    }

    /**
     * Processes a single CHECKPOINT barrier on channel 0 of a two-channel gate and asserts that
     * the "all barriers received" future for that checkpoint is not yet done.
     */
    @Test
    public void testHasInflightDataBeforeProcessBarrier() throws Exception {
        SingleInputGate inputGate = new SingleInputGateBuilder().setNumberOfChannels(2).build();
        inputGate.setInputChannels(new InputChannel[]{new TestInputChannel(inputGate, 0), new TestInputChannel(inputGate, 1)});
        SingleCheckpointBarrierHandler handler = barrierHandler(inputGate, new ValidatingCheckpointHandler());

        final long checkpointId = 1L;
        CheckpointOptions options =
                new CheckpointOptions(CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault());
        CheckpointBarrier firstBarrier = new CheckpointBarrier(checkpointId, System.currentTimeMillis(), options);
        handler.processBarrier(firstBarrier, new InputChannelInfo(0, 0));

        Assert.assertFalse(handler.getAllBarriersReceivedFuture(checkpointId).isDone());
    }

    /**
     * Processes a CHECKPOINT barrier with id 10 on channel 0 and then a SAVEPOINT barrier with the
     * older id 5 on channel 1. Asserts that the latest checkpoint id stays 10 and that channel 1,
     * which was blocked before the stale barrier, is no longer blocked afterwards.
     */
    @Test
    public void testOutOfOrderBarrier() throws Exception {
        SingleInputGate inputGate = new SingleInputGateBuilder().setNumberOfChannels(2).build();
        // Declared as TestInputChannel: setBlocked/isBlocked are used on the second channel below.
        TestInputChannel firstChannel = new TestInputChannel(inputGate, 0);
        TestInputChannel secondChannel = new TestInputChannel(inputGate, 1);
        inputGate.setInputChannels(new InputChannel[]{firstChannel, secondChannel});
        SingleCheckpointBarrierHandler barrierHandler = barrierHandler(inputGate, new ValidatingCheckpointHandler());

        final long checkpointId = 10L;
        final long outOfOrderSavepointId = 5L;

        barrierHandler.processBarrier(
                new CheckpointBarrier(checkpointId, System.currentTimeMillis(), new CheckpointOptions(CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault())),
                new InputChannelInfo(0, 0));

        secondChannel.setBlocked(true);
        barrierHandler.processBarrier(
                new CheckpointBarrier(outOfOrderSavepointId, System.currentTimeMillis(), new CheckpointOptions(CheckpointType.SAVEPOINT, CheckpointStorageLocationReference.getDefault())),
                new InputChannelInfo(0, 1));

        Assert.assertEquals(checkpointId, barrierHandler.getLatestCheckpointId());
        Assert.assertFalse(secondChannel.isBlocked());
    }

    /**
     * Shared scenario for a two-channel gate: sends barrier 123 of the given type first on one
     * channel, then on the other.
     *
     * <p>After the first barrier the checkpoint must already be triggered unless the type is a
     * savepoint; after the second barrier exactly checkpoint 123 must have been triggered. For
     * savepoints both channels are blocked up front and must all be unblocked at the end.
     *
     * @param checkpointType type of the barrier to send
     */
    private void testBarrierHandling(CheckpointType checkpointType) throws Exception {
        final long barrierId = 123L;
        ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
        SingleInputGate inputGate = new SingleInputGateBuilder().setNumberOfChannels(2).build();
        // Declared as TestInputChannel: setBlocked(...) and sendBarrier(...) need the concrete type.
        TestInputChannel firstChannel = new TestInputChannel(inputGate, 0, false, true);
        TestInputChannel secondChannel = new TestInputChannel(inputGate, 1, false, true);
        inputGate.setInputChannels(new InputChannel[]{firstChannel, secondChannel});
        CheckpointedInputGate gate = new CheckpointedInputGate(inputGate, barrierHandler(inputGate, target), new SyncMailboxExecutor());

        if (checkpointType.isSavepoint()) {
            firstChannel.setBlocked(true);
            secondChannel.setBlocked(true);
        }

        sendBarrier(barrierId, checkpointType, firstChannel, gate);
        // Nothing is triggered yet for a savepoint; a checkpoint is triggered by the first barrier.
        Assert.assertEquals(Boolean.valueOf(checkpointType.isSavepoint()), Boolean.valueOf(target.triggeredCheckpoints.isEmpty()));

        sendBarrier(barrierId, checkpointType, secondChannel, gate);
        Assert.assertEquals(Collections.singletonList(barrierId), target.triggeredCheckpoints);

        if (checkpointType.isSavepoint()) {
            for (InputChannel channel : inputGate.getInputChannels().values()) {
                Assert.assertFalse(
                        String.format("channel %d should be resumed", channel.getChannelIndex()),
                        ((TestInputChannel) channel).isBlocked());
            }
        }
    }

    /**
     * Builds a barrier buffer and pushes it through the gate via {@link #send}.
     *
     * @param j checkpoint id of the barrier
     * @param j2 timestamp passed on to the barrier
     * @param checkpointType type of the barrier
     * @param checkpointedInputGate gate to send through
     * @param i index of the channel that receives the barrier
     */
    private void sendBarrier(long j, long j2, CheckpointType checkpointType, CheckpointedInputGate checkpointedInputGate, int i) throws Exception {
        Buffer serializedBarrier = barrier(j, checkpointType, j2);
        send(serializedBarrier, checkpointedInputGate, i);
    }

    /**
     * Feeds a retained copy of the buffer into the given channel and then polls the gate until it
     * returns nothing more.
     *
     * @param buffer buffer to enqueue; it is retained, so the caller may reuse it
     * @param checkpointedInputGate gate whose channel receives the buffer
     * @param i index of the receiving channel, which must be a {@link TestInputChannel}
     */
    private void send(Buffer buffer, CheckpointedInputGate checkpointedInputGate, int i) throws Exception {
        // read(...) is called on TestInputChannel, so the channel is cast as in testAlternation.
        TestInputChannel channel = (TestInputChannel) checkpointedInputGate.getChannel(i);
        channel.read(buffer.retainBuffer());
        while (checkpointedInputGate.pollNext().isPresent()) {
            // drain: polling is what drives the barrier handler
        }
    }

    /**
     * Enqueues a barrier, stamped with the current time by {@code barrier(long, CheckpointType)},
     * directly on the given channel and then polls the gate until it returns nothing more.
     *
     * @param j checkpoint id of the barrier
     * @param checkpointType type of the barrier
     * @param testInputChannel channel that receives the barrier
     * @param checkpointedInputGate gate to poll afterwards
     */
    private void sendBarrier(long j, CheckpointType checkpointType, TestInputChannel testInputChannel, CheckpointedInputGate checkpointedInputGate) throws Exception {
        Buffer serializedBarrier = barrier(j, checkpointType);
        testInputChannel.read(serializedBarrier.retainBuffer());
        while (checkpointedInputGate.pollNext().isPresent()) {
            // keep polling until the gate is drained
        }
    }

    /**
     * Enqueues a data buffer created by {@code TestBufferFactory.createBuffer(i)} on the given
     * channel and then polls the gate until it returns nothing more.
     *
     * @param i size argument passed to {@code TestBufferFactory.createBuffer}
     * @param checkpointedInputGate gate whose channel receives the buffer
     * @param i2 index of the receiving channel, which must be a {@link TestInputChannel}
     */
    private void sendBuffer(int i, CheckpointedInputGate checkpointedInputGate, int i2) throws Exception {
        // read(...) is called on TestInputChannel, so the channel is cast as in testAlternation.
        TestInputChannel channel = (TestInputChannel) checkpointedInputGate.getChannel(i2);
        channel.read(TestBufferFactory.createBuffer(i));
        while (checkpointedInputGate.pollNext().isPresent()) {
            // drain: polling is what makes the gate consume the buffer
        }
    }

    /**
     * Creates a barrier handler for the gate using {@link ChannelStateWriter#NO_OP} as the channel
     * state writer.
     *
     * @param singleInputGate gate whose channels the handler tracks
     * @param abstractInvokable task passed to the handler
     * @return the handler built by the three-argument overload
     */
    public static SingleCheckpointBarrierHandler barrierHandler(SingleInputGate singleInputGate, AbstractInvokable abstractInvokable) {
        final ChannelStateWriter noOpWriter = ChannelStateWriter.NO_OP;
        return barrierHandler(singleInputGate, abstractInvokable, noOpWriter);
    }

    /**
     * Creates a {@link SingleCheckpointBarrierHandler} named "test" whose controller is an
     * {@link AlternatingController} wrapping an {@link AlignedController} and an
     * {@link UnalignedController}, both operating on the given gate.
     *
     * @param singleInputGate the only checkpointable input; also provides the channel count
     * @param abstractInvokable task passed to the handler
     * @param channelStateWriter writer handed to the {@link TestSubtaskCheckpointCoordinator} used
     *     by the unaligned controller
     * @return the assembled handler
     */
    public static SingleCheckpointBarrierHandler barrierHandler(SingleInputGate singleInputGate, AbstractInvokable abstractInvokable, ChannelStateWriter channelStateWriter) {
        final int numberOfChannels = singleInputGate.getNumberOfInputChannels();
        AlignedController aligned =
                new AlignedController(new CheckpointableInput[]{singleInputGate});
        UnalignedController unaligned =
                new UnalignedController(
                        new TestSubtaskCheckpointCoordinator(channelStateWriter),
                        new CheckpointableInput[]{singleInputGate});
        AlternatingController alternating = new AlternatingController(aligned, unaligned);
        return new SingleCheckpointBarrierHandler("test", abstractInvokable, numberOfChannels, alternating);
    }

    /**
     * Creates a serialized barrier stamped with the current wall-clock time.
     *
     * @param j checkpoint id
     * @param checkpointType type of the barrier
     * @return buffer containing the serialized barrier
     */
    private Buffer barrier(long j, CheckpointType checkpointType) throws IOException {
        final long now = System.currentTimeMillis();
        return barrier(j, checkpointType, now);
    }

    /**
     * Creates a serialized barrier with the given timestamp, passing 0 as the last argument of
     * the four-argument overload.
     *
     * @param j checkpoint id
     * @param checkpointType type of the barrier
     * @param j2 timestamp of the barrier
     * @return buffer containing the serialized barrier
     */
    private Buffer barrier(long j, CheckpointType checkpointType, long j2) throws IOException {
        final long lastCreateArgument = 0L;
        return barrier(j, checkpointType, j2, lastCreateArgument);
    }

    /**
     * Creates a {@link CheckpointBarrier} and serializes it into a buffer.
     *
     * @param j checkpoint id
     * @param checkpointType type of the barrier
     * @param j2 timestamp of the barrier
     * @param j3 last argument of {@code CheckpointOptions.create(...)}; presumably the alignment
     *     timeout - TODO confirm against CheckpointOptions
     * @return buffer containing the serialized barrier
     */
    private Buffer barrier(long j, CheckpointType checkpointType, long j2, long j3) throws IOException {
        CheckpointStorageLocationReference location = CheckpointStorageLocationReference.getDefault();
        CheckpointOptions options = CheckpointOptions.create(checkpointType, location, true, true, j3);
        CheckpointBarrier checkpointBarrier = new CheckpointBarrier(j, j2, options);
        // The second argument of toBuffer follows whether the options denote an unaligned checkpoint.
        return EventSerializer.toBuffer(checkpointBarrier, options.isUnalignedCheckpoint());
    }

    /**
     * Builds a {@link CheckpointedInputGate} over a fresh {@link SingleInputGate} with {@code i}
     * {@link TestInputChannel}s, a barrier handler from {@link #barrierHandler} and a
     * {@link SyncMailboxExecutor}.
     *
     * @param abstractInvokable task passed to the barrier handler
     * @param i number of channels to create
     * @return the assembled gate
     */
    private static CheckpointedInputGate buildGate(AbstractInvokable abstractInvokable, int i) {
        SingleInputGate inputGate = new SingleInputGateBuilder().setNumberOfChannels(i).build();

        TestInputChannel[] channels = new TestInputChannel[i];
        int channelIndex = 0;
        while (channelIndex < i) {
            channels[channelIndex] = new TestInputChannel(inputGate, channelIndex, false, true);
            channelIndex++;
        }
        inputGate.setInputChannels(channels);

        SingleCheckpointBarrierHandler handler = barrierHandler(inputGate, abstractInvokable);
        return new CheckpointedInputGate(inputGate, handler, new SyncMailboxExecutor());
    }
}
