package org.springframework.data.redis.connection.lettuce;

import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.reactive.RedisClusterReactiveCommands;
import io.lettuce.core.codec.RedisCodec;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.ReactiveGeoCommands;
import org.springframework.data.redis.connection.ReactiveHashCommands;
import org.springframework.data.redis.connection.ReactiveHyperLogLogCommands;
import org.springframework.data.redis.connection.ReactiveKeyCommands;
import org.springframework.data.redis.connection.ReactiveListCommands;
import org.springframework.data.redis.connection.ReactiveNumberCommands;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveScriptingCommands;
import org.springframework.data.redis.connection.ReactiveServerCommands;
import org.springframework.data.redis.connection.ReactiveSetCommands;
import org.springframework.data.redis.connection.ReactiveStringCommands;
import org.springframework.data.redis.connection.ReactiveZSetCommands;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:BOOT-INF/lib/spring-data-redis-2.0.6.RELEASE.jar:org/springframework/data/redis/connection/lettuce/LettuceReactiveRedisConnection.class */
class LettuceReactiveRedisConnection implements ReactiveRedisConnection {
    static final RedisCodec<ByteBuffer, ByteBuffer> CODEC = ByteBufferCodec.INSTANCE;
    private final AsyncConnect dedicatedConnection;

    @Nullable
    private Mono<StatefulConnection<ByteBuffer, ByteBuffer>> sharedConnection;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/spring-data-redis-2.0.6.RELEASE.jar:org/springframework/data/redis/connection/lettuce/LettuceReactiveRedisConnection$AsyncConnect.class */
    public static class AsyncConnect {
        private final Mono<StatefulConnection<ByteBuffer, ByteBuffer>> connectionPublisher;
        private final LettuceConnectionProvider connectionProvider;
        private AtomicReference<State> state = new AtomicReference<>(State.INITIAL);

        @Nullable
        private volatile CompletableFuture<StatefulConnection<ByteBuffer, ByteBuffer>> connection;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:BOOT-INF/lib/spring-data-redis-2.0.6.RELEASE.jar:org/springframework/data/redis/connection/lettuce/LettuceReactiveRedisConnection$AsyncConnect$State.class */
        public enum State {
            INITIAL,
            CONNECTION_REQUESTED,
            CLOSING,
            CLOSED
        }

        AsyncConnect(LettuceConnectionProvider lettuceConnectionProvider) {
            Assert.notNull(lettuceConnectionProvider, "LettuceConnectionProvider must not be null!");
            this.connectionProvider = lettuceConnectionProvider;
            this.connectionPublisher = Mono.defer(() -> {
                return Mono.just(lettuceConnectionProvider.getConnection(StatefulConnection.class));
            }).subscribeOn(Schedulers.elastic());
        }

        Mono<StatefulConnection<ByteBuffer, ByteBuffer>> getConnection() {
            Thread currentThread;
            if (this.state.get() == State.CLOSED) {
                throw new IllegalStateException("Unable to connect. Connection is closed!");
            }
            CompletableFuture<StatefulConnection<ByteBuffer, ByteBuffer>> completableFuture = this.connection;
            if (completableFuture != null) {
                return Mono.fromCompletionStage(completableFuture);
            }
            if (this.state.compareAndSet(State.INITIAL, State.CONNECTION_REQUESTED)) {
                this.connection = this.connectionPublisher.toFuture();
            }
            do {
                CompletableFuture<StatefulConnection<ByteBuffer, ByteBuffer>> completableFuture2 = this.connection;
                if (completableFuture2 != null) {
                    return Mono.fromCompletionStage(completableFuture2);
                }
                currentThread = Thread.currentThread();
            } while (!currentThread.isInterrupted());
            currentThread.interrupt();
            return Mono.error(new InterruptedException());
        }

        void close() {
            if (this.state.compareAndSet(State.CONNECTION_REQUESTED, State.CLOSING)) {
                Mono<StatefulConnection<ByteBuffer, ByteBuffer>> connection = getConnection();
                LettuceConnectionProvider lettuceConnectionProvider = this.connectionProvider;
                lettuceConnectionProvider.getClass();
                connection.doOnSuccess(lettuceConnectionProvider::release).doOnSuccess(statefulConnection -> {
                    this.state.set(State.CLOSED);
                }).block();
            }
            this.state.compareAndSet(State.INITIAL, State.CLOSED);
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/spring-data-redis-2.0.6.RELEASE.jar:org/springframework/data/redis/connection/lettuce/LettuceReactiveRedisConnection$ByteBufferCodec.class */
    enum ByteBufferCodec implements RedisCodec<ByteBuffer, ByteBuffer> {
        INSTANCE;

        /* renamed from: decodeKey, reason: merged with bridge method [inline-methods] */
        public ByteBuffer m13782decodeKey(ByteBuffer byteBuffer) {
            ByteBuffer allocate = ByteBuffer.allocate(byteBuffer.remaining());
            allocate.put(byteBuffer);
            allocate.flip();
            return allocate;
        }

        /* renamed from: decodeValue, reason: merged with bridge method [inline-methods] */
        public ByteBuffer m13781decodeValue(ByteBuffer byteBuffer) {
            return m13782decodeKey(byteBuffer);
        }

        public ByteBuffer encodeKey(ByteBuffer byteBuffer) {
            return byteBuffer.duplicate();
        }

        public ByteBuffer encodeValue(ByteBuffer byteBuffer) {
            return byteBuffer.duplicate();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/spring-data-redis-2.0.6.RELEASE.jar:org/springframework/data/redis/connection/lettuce/LettuceReactiveRedisConnection$LettuceReactiveCallback.class */
    public interface LettuceReactiveCallback<T> {
        Publisher<T> doWithCommands(RedisClusterReactiveCommands<ByteBuffer, ByteBuffer> redisClusterReactiveCommands);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public LettuceReactiveRedisConnection(LettuceConnectionProvider lettuceConnectionProvider) {
        Assert.notNull(lettuceConnectionProvider, "LettuceConnectionProvider must not be null!");
        this.dedicatedConnection = new AsyncConnect(lettuceConnectionProvider);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public LettuceReactiveRedisConnection(StatefulConnection<ByteBuffer, ByteBuffer> statefulConnection, LettuceConnectionProvider lettuceConnectionProvider) {
        Assert.notNull(statefulConnection, "Shared StatefulConnection must not be null!");
        Assert.notNull(lettuceConnectionProvider, "LettuceConnectionProvider must not be null!");
        this.dedicatedConnection = new AsyncConnect(lettuceConnectionProvider);
        this.sharedConnection = Mono.just(statefulConnection);
    }

    @Override // org.springframework.data.redis.connection.ReactiveRedisConnection
    public ReactiveKeyCommands keyCommands() {
        return new LettuceReactiveKeyCommands(this);
    }

    @Override // org.springframework.data.redis.connection.ReactiveRedisConnection
    public ReactiveStringCommands stringCommands() {
        return new LettuceReactiveStringCommands(this);
    }

    @Override // org.springframework.data.redis.connection.ReactiveRedisConnection
    public ReactiveNumberCommands numberCommands() {
        return new LettuceReactiveNumberCommands(this);
    }

    @Override // org.springframework.data.redis.connection.ReactiveRedisConnection
    public ReactiveListCommands listCommands() {
        return new LettuceReactiveListCommands(this);
    }

    @Override // org.springframework.data.redis.connection.ReactiveRedisConnection
    public ReactiveSetCommands setCommands() {
        return new LettuceReactiveSetCommands(this);
    }

    @Override // org.springframework.data.redis.connection.ReactiveRedisConnection
    public ReactiveZSetCommands zSetCommands() {
        return new LettuceReactiveZSetCommands(this);
    }

    @Override // org.springframework.data.redis.connection.ReactiveRedisConnection
    public ReactiveHashCommands hashCommands() {
        return new LettuceReactiveHashCommands(this);
    }

    @Override // org.springframework.data.redis.connection.ReactiveRedisConnection
    public ReactiveGeoCommands geoCommands() {
        return new LettuceReactiveGeoCommands(this);
    }

    @Override // org.springframework.data.redis.connection.ReactiveRedisConnection
    public ReactiveHyperLogLogCommands hyperLogLogCommands() {
        return new LettuceReactiveHyperLogLogCommands(this);
    }

    @Override // org.springframework.data.redis.connection.ReactiveRedisConnection
    public ReactiveScriptingCommands scriptingCommands() {
        return new LettuceReactiveScriptingCommands(this);
    }

    @Override // org.springframework.data.redis.connection.ReactiveRedisConnection
    public ReactiveServerCommands serverCommands() {
        return new LettuceReactiveServerCommands(this);
    }

    @Override // org.springframework.data.redis.connection.ReactiveRedisConnection
    public Mono<String> ping() {
        return execute((v0) -> {
            return v0.ping();
        }).next();
    }

    public <T> Flux<T> execute(LettuceReactiveCallback<T> lettuceReactiveCallback) {
        Mono<? extends RedisClusterReactiveCommands<ByteBuffer, ByteBuffer>> commands = getCommands();
        lettuceReactiveCallback.getClass();
        return commands.flatMapMany(lettuceReactiveCallback::doWithCommands).onErrorMap(translateException());
    }

    public <T> Flux<T> executeDedicated(LettuceReactiveCallback<T> lettuceReactiveCallback) {
        Mono<? extends RedisClusterReactiveCommands<ByteBuffer, ByteBuffer>> dedicatedCommands = getDedicatedCommands();
        lettuceReactiveCallback.getClass();
        return dedicatedCommands.flatMapMany(lettuceReactiveCallback::doWithCommands).onErrorMap(translateException());
    }

    @Override // org.springframework.data.redis.connection.ReactiveRedisConnection, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.dedicatedConnection.close();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Mono<? extends StatefulConnection<ByteBuffer, ByteBuffer>> getConnection() {
        return this.sharedConnection != null ? this.sharedConnection : getDedicatedConnection();
    }

    protected Mono<StatefulConnection<ByteBuffer, ByteBuffer>> getDedicatedConnection() {
        return this.dedicatedConnection.getConnection().onErrorMap(translateException());
    }

    protected Mono<? extends RedisClusterReactiveCommands<ByteBuffer, ByteBuffer>> getCommands() {
        return this.sharedConnection != null ? this.sharedConnection.map(LettuceReactiveRedisConnection::getRedisClusterReactiveCommands) : getDedicatedCommands();
    }

    protected Mono<? extends RedisClusterReactiveCommands<ByteBuffer, ByteBuffer>> getDedicatedCommands() {
        return this.dedicatedConnection.getConnection().map(LettuceReactiveRedisConnection::getRedisClusterReactiveCommands);
    }

    private static RedisClusterReactiveCommands<ByteBuffer, ByteBuffer> getRedisClusterReactiveCommands(StatefulConnection<ByteBuffer, ByteBuffer> statefulConnection) {
        if (statefulConnection instanceof StatefulRedisConnection) {
            return ((StatefulRedisConnection) statefulConnection).reactive();
        }
        if (statefulConnection instanceof StatefulRedisClusterConnection) {
            return ((StatefulRedisClusterConnection) statefulConnection).reactive();
        }
        throw new IllegalStateException("o.O unknown connection type " + statefulConnection);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public <T> Function<Throwable, Throwable> translateException() {
        return th -> {
            DataAccessException convert2;
            if ((th instanceof RuntimeException) && (convert2 = LettuceConverters.exceptionConverter().convert2((RuntimeException) th)) != null) {
                return convert2;
            }
            return th;
        };
    }
}
