package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.writer.NonRecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordOrEventCollectingResultPartitionWriter;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.TestCheckpointStorageWorkerView;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.MoreExecutors;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTest;
import org.apache.flink.streaming.util.MockStreamTaskBuilder;
import org.apache.flink.util.ExceptionUtils;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.class */
public class SubtaskCheckpointCoordinatorTest {

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest$BlockingRunnableFuture.class */
    private static final class BlockingRunnableFuture implements RunnableFuture<SnapshotResult<KeyedStateHandle>> {
        private final CompletableFuture<SnapshotResult<KeyedStateHandle>> future;
        private final OneShotLatch signalRunLatch;
        private final CountDownLatch countDownLatch;
        private final SnapshotResult<KeyedStateHandle> value;

        private BlockingRunnableFuture() {
            this.future = new CompletableFuture<>();
            this.signalRunLatch = new OneShotLatch();
            this.countDownLatch = new CountDownLatch(2);
            this.value = SnapshotResult.empty();
        }

        @Override // java.util.concurrent.RunnableFuture, java.lang.Runnable
        public void run() {
            this.signalRunLatch.trigger();
            this.countDownLatch.countDown();
            try {
                this.countDownLatch.await();
            } catch (InterruptedException e) {
                ExceptionUtils.rethrow(e);
            }
            this.future.complete(this.value);
        }

        @Override // java.util.concurrent.Future
        public boolean cancel(boolean z) {
            this.future.cancel(z);
            return true;
        }

        @Override // java.util.concurrent.Future
        public boolean isCancelled() {
            return this.future.isCancelled();
        }

        @Override // java.util.concurrent.Future
        public boolean isDone() {
            return this.future.isDone();
        }

        @Override // java.util.concurrent.Future
        public SnapshotResult<KeyedStateHandle> get() throws InterruptedException, ExecutionException {
            return this.future.get();
        }

        @Override // java.util.concurrent.Future
        public SnapshotResult<KeyedStateHandle> get(long j, TimeUnit timeUnit) throws InterruptedException, ExecutionException {
            return this.future.get();
        }

        void awaitRun() throws InterruptedException {
            this.signalRunLatch.await();
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest$CheckpointOperator.class */
    private static class CheckpointOperator implements OneInputStreamOperator<String, String> {
        private static final long serialVersionUID = 1;
        private final OperatorSnapshotFutures operatorSnapshotFutures;
        private boolean checkpointed = false;

        CheckpointOperator(OperatorSnapshotFutures operatorSnapshotFutures) {
            this.operatorSnapshotFutures = operatorSnapshotFutures;
        }

        boolean isCheckpointed() {
            return this.checkpointed;
        }

        public void open() throws Exception {
        }

        public void close() throws Exception {
        }

        public void dispose() {
        }

        public void prepareSnapshotPreBarrier(long j) {
        }

        public OperatorSnapshotFutures snapshotState(long j, long j2, CheckpointOptions checkpointOptions, CheckpointStreamFactory checkpointStreamFactory) throws Exception {
            this.checkpointed = true;
            return this.operatorSnapshotFutures;
        }

        public void initializeState(StreamTaskStateInitializer streamTaskStateInitializer) throws Exception {
        }

        public void setKeyContextElement1(StreamRecord<?> streamRecord) {
        }

        public void setKeyContextElement2(StreamRecord<?> streamRecord) {
        }

        public MetricGroup getMetricGroup() {
            return null;
        }

        public OperatorID getOperatorID() {
            return new OperatorID();
        }

        public void notifyCheckpointComplete(long j) {
        }

        public void notifyCheckpointAborted(long j) {
        }

        public void setCurrentKey(Object obj) {
        }

        public Object getCurrentKey() {
            return null;
        }

        public void processElement(StreamRecord<String> streamRecord) throws Exception {
        }

        public void processWatermark(Watermark watermark) throws Exception {
        }

        public void processLatencyMarker(LatencyMarker latencyMarker) {
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest$MapOperator.class */
    private static class MapOperator extends StreamMap<String, String> {
        private static final long serialVersionUID = 1;

        public MapOperator() {
            super(str -> {
                return str;
            });
        }

        public void notifyCheckpointAborted(long j) throws Exception {
        }

        private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
            String implMethodName = serializedLambda.getImplMethodName();
            boolean z = -1;
            switch (implMethodName.hashCode()) {
                case -2104837527:
                    if (implMethodName.equals("lambda$new$e0defa2f$1")) {
                        z = false;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest$MapOperator") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;)Ljava/lang/String;")) {
                        return str -> {
                            return str;
                        };
                    }
                    break;
            }
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }
    }

    @Test
    public void testInitCheckpoint() throws IOException {
        Assert.assertTrue(initCheckpoint(true, CheckpointType.CHECKPOINT));
        Assert.assertFalse(initCheckpoint(false, CheckpointType.CHECKPOINT));
        Assert.assertFalse(initCheckpoint(false, CheckpointType.SAVEPOINT));
    }

    private boolean initCheckpoint(boolean z, CheckpointType checkpointType) throws IOException {
        ChannelStateWriter channelStateWriter = new ChannelStateWriter.NoOpChannelStateWriter() { // from class: org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorTest.1MockWriter
            private boolean started;

            public void start(long j, CheckpointOptions checkpointOptions) {
                this.started = true;
            }
        };
        coordinator(z, channelStateWriter).initCheckpoint(1L, new CheckpointOptions(checkpointType, CheckpointStorageLocationReference.getDefault(), true, z, 0L));
        return ((C1MockWriter) channelStateWriter).started;
    }

    @Test
    public void testNotifyCheckpointComplete() throws Exception {
        TestTaskStateManager testTaskStateManager = new TestTaskStateManager();
        Environment build = MockEnvironment.builder().setTaskStateManager(testTaskStateManager).build();
        SubtaskCheckpointCoordinator build2 = new MockSubtaskCheckpointCoordinatorBuilder().setEnvironment(build).build();
        OperatorChain<?, ?> operatorChain = getOperatorChain(build);
        build2.notifyCheckpointComplete(42L, operatorChain, () -> {
            return true;
        });
        Assert.assertEquals(42L, testTaskStateManager.getNotifiedCompletedCheckpointId());
        long j = 42 + 1;
        build2.notifyCheckpointComplete(j, operatorChain, () -> {
            return false;
        });
        Assert.assertEquals(j, testTaskStateManager.getNotifiedCompletedCheckpointId());
    }

    @Test
    public void testSavepointNotResultingInPriorityEvents() throws Exception {
        Environment build = MockEnvironment.builder().build();
        SubtaskCheckpointCoordinator build2 = new MockSubtaskCheckpointCoordinatorBuilder().setUnalignedCheckpointEnabled(true).setEnvironment(build).build();
        final AtomicReference atomicReference = new AtomicReference(null);
        build2.checkpointState(new CheckpointMetaData(0L, 0L), new CheckpointOptions(CheckpointType.SAVEPOINT, CheckpointStorageLocationReference.getDefault()), new CheckpointMetricsBuilder(), new OperatorChain(new MockStreamTaskBuilder(build).build(), new NonRecordWriter()) { // from class: org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorTest.1
            public void broadcastEvent(AbstractEvent abstractEvent, boolean z) throws IOException {
                super.broadcastEvent(abstractEvent, z);
                atomicReference.set(Boolean.valueOf(z));
            }
        }, () -> {
            return true;
        });
        Assert.assertEquals(false, atomicReference.get());
    }

    @Test
    public void testSkipChannelStateForSavepoints() throws Exception {
        new MockSubtaskCheckpointCoordinatorBuilder().setUnalignedCheckpointEnabled(true).setPrepareInputSnapshot((channelStateWriter, l) -> {
            Assert.fail("should not prepare input snapshot for savepoint");
            return null;
        }).build().checkpointState(new CheckpointMetaData(0L, 0L), new CheckpointOptions(CheckpointType.SAVEPOINT, CheckpointStorageLocationReference.getDefault()), new CheckpointMetricsBuilder(), new OperatorChain(new StreamTaskTest.NoOpStreamTask(new DummyEnvironment()), new NonRecordWriter()), () -> {
            return true;
        });
    }

    @Test
    public void testNotifyCheckpointAbortedManyTimes() throws Exception {
        Environment build = MockEnvironment.builder().build();
        SubtaskCheckpointCoordinatorImpl build2 = new MockSubtaskCheckpointCoordinatorBuilder().setEnvironment(build).setMaxRecordAbortedCheckpoints(256).build();
        OperatorChain<?, ?> operatorChain = getOperatorChain(build);
        long j = 256 + 42;
        for (int i = 1; i < j; i++) {
            build2.notifyCheckpointAborted(i, operatorChain, () -> {
                return true;
            });
            Assert.assertEquals(Math.min(256, i), build2.getAbortedCheckpointSize());
        }
    }

    @Test
    public void testNotifyCheckpointAbortedBeforeAsyncPhase() throws Exception {
        TestTaskStateManager testTaskStateManager = new TestTaskStateManager();
        SubtaskCheckpointCoordinatorImpl build = new MockSubtaskCheckpointCoordinatorBuilder().setEnvironment(MockEnvironment.builder().setTaskStateManager(testTaskStateManager).build()).setUnalignedCheckpointEnabled(true).build();
        CheckpointOperator checkpointOperator = new CheckpointOperator(new OperatorSnapshotFutures());
        OperatorChain operatorChain = operatorChain(checkpointOperator);
        build.notifyCheckpointAborted(42L, operatorChain, () -> {
            return true;
        });
        Assert.assertEquals(1L, build.getAbortedCheckpointSize());
        build.getChannelStateWriter().start(42L, CheckpointOptions.forCheckpointWithDefaultLocation());
        build.checkpointState(new CheckpointMetaData(42L, System.currentTimeMillis()), CheckpointOptions.forCheckpointWithDefaultLocation(), new CheckpointMetricsBuilder(), operatorChain, () -> {
            return false;
        });
        Assert.assertFalse(checkpointOperator.isCheckpointed());
        Assert.assertEquals(-1L, testTaskStateManager.getReportedCheckpointId());
        Assert.assertEquals(0L, build.getAbortedCheckpointSize());
        Assert.assertEquals(0L, build.getAsyncCheckpointRunnableSize());
    }

    @Test
    public void testBroadcastCancelCheckpointMarkerOnAbortingFromCoordinator() throws Exception {
        OneInputStreamTaskTestHarness oneInputStreamTaskTestHarness = new OneInputStreamTaskTestHarness(OneInputStreamTask::new, 1, 1, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        oneInputStreamTaskTestHarness.setupOutputForSingletonOperatorChain();
        StreamConfig streamConfig = oneInputStreamTaskTestHarness.getStreamConfig();
        streamConfig.setStreamOperator(new MapOperator());
        oneInputStreamTaskTestHarness.invoke();
        oneInputStreamTaskTestHarness.waitForTaskRunning();
        Environment build = MockEnvironment.builder().build();
        SubtaskCheckpointCoordinator build2 = new MockSubtaskCheckpointCoordinatorBuilder().setEnvironment(build).build();
        ArrayList arrayList = new ArrayList();
        build.addOutputs(Collections.singletonList(new RecordOrEventCollectingResultPartitionWriter(arrayList, new StreamElementSerializer(StringSerializer.INSTANCE))));
        OperatorChain operatorChain = new OperatorChain(oneInputStreamTaskTestHarness.mo134getTask(), StreamTask.createRecordWriterDelegate(streamConfig, build));
        build2.notifyCheckpointAborted(42L, operatorChain, () -> {
            return true;
        });
        build2.checkpointState(new CheckpointMetaData(42L, System.currentTimeMillis()), CheckpointOptions.forCheckpointWithDefaultLocation(), new CheckpointMetricsBuilder(), operatorChain, () -> {
            return false;
        });
        Assert.assertEquals(1L, arrayList.size());
        Object obj = arrayList.get(0);
        Assert.assertTrue(obj instanceof CancelCheckpointMarker);
        Assert.assertEquals(42L, ((CancelCheckpointMarker) obj).getCheckpointId());
        oneInputStreamTaskTestHarness.endInput();
        oneInputStreamTaskTestHarness.waitForTaskCompletion();
    }

    @Test
    public void testNotifyCheckpointAbortedDuringAsyncPhase() throws Exception {
        SubtaskCheckpointCoordinatorImpl build = new MockSubtaskCheckpointCoordinatorBuilder().setEnvironment(MockEnvironment.builder().build()).setExecutor(Executors.newSingleThreadExecutor()).setUnalignedCheckpointEnabled(true).build();
        BlockingRunnableFuture blockingRunnableFuture = new BlockingRunnableFuture();
        OperatorChain operatorChain = operatorChain(new CheckpointOperator(new OperatorSnapshotFutures(DoneFuture.of(SnapshotResult.empty()), blockingRunnableFuture, DoneFuture.of(SnapshotResult.empty()), DoneFuture.of(SnapshotResult.empty()), DoneFuture.of(SnapshotResult.empty()), DoneFuture.of(SnapshotResult.empty()))));
        build.getChannelStateWriter().start(42L, CheckpointOptions.forCheckpointWithDefaultLocation());
        build.checkpointState(new CheckpointMetaData(42L, System.currentTimeMillis()), CheckpointOptions.forCheckpointWithDefaultLocation(), new CheckpointMetricsBuilder(), operatorChain, () -> {
            return false;
        });
        blockingRunnableFuture.awaitRun();
        Assert.assertEquals(1L, build.getAsyncCheckpointRunnableSize());
        Assert.assertFalse(blockingRunnableFuture.isCancelled());
        build.notifyCheckpointAborted(42L, operatorChain, () -> {
            return true;
        });
        Assert.assertTrue(blockingRunnableFuture.isCancelled());
        Assert.assertEquals(0L, build.getAsyncCheckpointRunnableSize());
    }

    @Test
    public void testNotifyCheckpointAbortedAfterAsyncPhase() throws Exception {
        TestTaskStateManager testTaskStateManager = new TestTaskStateManager();
        Environment build = MockEnvironment.builder().setTaskStateManager(testTaskStateManager).build();
        SubtaskCheckpointCoordinatorImpl build2 = new MockSubtaskCheckpointCoordinatorBuilder().setEnvironment(build).build();
        OperatorChain<?, ?> operatorChain = getOperatorChain(build);
        build2.checkpointState(new CheckpointMetaData(42L, System.currentTimeMillis()), CheckpointOptions.forCheckpointWithDefaultLocation(), new CheckpointMetricsBuilder(), operatorChain, () -> {
            return false;
        });
        build2.notifyCheckpointAborted(42L, operatorChain, () -> {
            return true;
        });
        Assert.assertEquals(0L, build2.getAbortedCheckpointSize());
        Assert.assertEquals(42L, testTaskStateManager.getNotifiedAbortedCheckpointId());
    }

    private OperatorChain<?, ?> getOperatorChain(MockEnvironment mockEnvironment) throws Exception {
        return new OperatorChain<>(new MockStreamTaskBuilder(mockEnvironment).build(), new NonRecordWriter());
    }

    private <T> OperatorChain<T, AbstractStreamOperator<T>> operatorChain(OneInputStreamOperator<T, T>... oneInputStreamOperatorArr) throws Exception {
        return OperatorChainTest.setupOperatorChain(oneInputStreamOperatorArr);
    }

    private static SubtaskCheckpointCoordinator coordinator(boolean z, ChannelStateWriter channelStateWriter) throws IOException {
        return new SubtaskCheckpointCoordinatorImpl(new TestCheckpointStorageWorkerView(100), "test", StreamTaskActionExecutor.IMMEDIATE, new CloseableRegistry(), MoreExecutors.newDirectExecutorService(), new DummyEnvironment(), (str, th) -> {
            Assert.fail(str);
        }, (channelStateWriter2, l) -> {
            return CompletableFuture.completedFuture(null);
        }, 0, channelStateWriter);
    }
}
