package net.openhft.chronicle.tcp;

import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import net.openhft.chronicle.Chronicle;
import net.openhft.chronicle.ChronicleQueueBuilder;
import net.openhft.chronicle.ExcerptAppender;
import net.openhft.chronicle.ExcerptTailer;
import net.openhft.chronicle.IndexedChronicle;
import net.openhft.chronicle.MappingFunction;
import net.openhft.chronicle.MappingProvider;
import net.openhft.chronicle.VanillaChronicle;
import net.openhft.lang.io.Bytes;
import net.openhft.lang.io.DirectByteBufferBytes;
import net.openhft.lang.io.IByteBufferBytes;
import net.openhft.lang.model.constraints.NotNull;
import net.openhft.lang.thread.LightPauser;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/openhft/chronicle/tcp/SourceTcp.class */
public abstract class SourceTcp {
    protected final Logger logger;
    protected final String name;
    protected final AtomicBoolean running = new AtomicBoolean(false);
    protected final ChronicleQueueBuilder.ReplicaChronicleQueueBuilder builder;
    protected final ThreadPoolExecutor executor;
    protected final LightPauser pauser;

    /* loaded from: input_file:net/openhft/chronicle/tcp/SourceTcp$IndexedSessionHandler.class */
    private class IndexedSessionHandler extends SessionHandler {
        private long index;

        private IndexedSessionHandler(@NotNull SocketChannel socketChannel) {
            super(socketChannel);
            this.index = -1L;
        }

        @Override // net.openhft.chronicle.tcp.SourceTcp.SessionHandler
        protected boolean onSubscribe(SelectionKey selectionKey, long j) throws IOException {
            this.index = j;
            if (this.index == -1) {
                this.index = -1L;
            } else if (this.index == -2) {
                this.index = this.tailer.toEnd().index();
            }
            sendSizeAndIndex(ChronicleTcp.SYNC_IDX_LEN, this.index);
            selectionKey.interestOps(5);
            return true;
        }

        @Override // net.openhft.chronicle.tcp.SourceTcp.SessionHandler
        protected boolean onSubmit(SelectionKey selectionKey, long j, boolean z) throws IOException {
            if (!z) {
                return true;
            }
            sendSizeAndIndex(ChronicleTcp.NACK_LEN, -4L);
            return true;
        }

        @Override // net.openhft.chronicle.tcp.SourceTcp.SessionHandler
        protected boolean write(Object obj) throws IOException {
            if (!this.tailer.index(this.index)) {
                if (this.tailer.wasPadding()) {
                    if (this.index >= 0) {
                        sendSizeAndIndex(ChronicleTcp.PADDED_LEN, this.tailer.index());
                    }
                    this.index++;
                }
                pause();
                if (SourceTcp.this.running.get() && !this.tailer.index(this.index)) {
                    return false;
                }
            }
            pauseReset();
            Bytes applyMapping = applyMapping(this.tailer, obj);
            int limit = (int) applyMapping.limit();
            this.writeBuffer.clear();
            this.writeBuffer.putInt(limit);
            this.writeBuffer.putLong(this.tailer.index());
            if (limit > this.writeBuffer.capacity() / 2) {
                while (limit > 0) {
                    int min = Math.min(limit, this.writeBuffer.remaining());
                    applyMapping.read(this.writeBuffer, min);
                    this.writeBuffer.flip();
                    this.connection.writeAll(this.writeBuffer);
                    limit -= min;
                    if (limit > 0) {
                        this.writeBuffer.clear();
                    }
                }
            } else {
                applyMapping.read(this.writeBuffer, limit);
                int maxExcerptsPerMessage = SourceTcp.this.builder.maxExcerptsPerMessage();
                while (maxExcerptsPerMessage > 0 && this.tailer.index(this.index + 1)) {
                    if (!this.tailer.wasPadding()) {
                        Bytes applyMapping2 = applyMapping(this.tailer, obj);
                        if (!hasRoomForExcerpt(this.writeBuffer, applyMapping2)) {
                            break;
                        }
                        int limit2 = (int) applyMapping2.limit();
                        this.writeBuffer.putInt(limit2);
                        this.writeBuffer.putLong(this.tailer.index());
                        applyMapping2.read(this.writeBuffer, limit2);
                        this.index++;
                        maxExcerptsPerMessage--;
                        this.tailer.finish();
                    } else {
                        if (!hasRoomFor(this.writeBuffer, 12L)) {
                            break;
                        }
                        this.writeBuffer.putInt(ChronicleTcp.PADDED_LEN);
                        this.writeBuffer.putLong(this.index);
                        this.index++;
                    }
                }
                this.writeBuffer.flip();
                this.connection.writeAll(this.writeBuffer);
            }
            if (this.writeBuffer.remaining() > 0) {
                throw new EOFException("Failed to send index=" + this.index);
            }
            this.index++;
            return true;
        }
    }

    /* loaded from: input_file:net/openhft/chronicle/tcp/SourceTcp$SessionHandler.class */
    private abstract class SessionHandler implements Runnable, Closeable {
        private final SocketChannel socketChannel;
        private long lastUnpausedNS;
        protected final TcpConnection connection;
        protected ExcerptTailer tailer;
        protected ExcerptAppender appender;
        protected long lastHeartbeat;
        protected final ByteBuffer writeBuffer;
        protected IByteBufferBytes readBuffer;
        private Bytes withMappedBuffer;

        private SessionHandler(@NotNull SocketChannel socketChannel) {
            this.socketChannel = socketChannel;
            this.connection = new TcpConnection(socketChannel);
            this.tailer = null;
            this.appender = null;
            this.lastHeartbeat = 0L;
            this.lastUnpausedNS = 0L;
            this.readBuffer = new DirectByteBufferBytes(16);
            this.readBuffer.clearThreadAssociation();
            this.writeBuffer = ChronicleTcp.createBuffer(SourceTcp.this.builder.minBufferSize());
            this.writeBuffer.limit(0);
            this.withMappedBuffer = new DirectByteBufferBytes(1024);
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            if (this.tailer != null) {
                this.tailer.close();
                this.tailer = null;
            }
            if (this.appender != null) {
                this.appender.close();
                this.appender = null;
            }
            if (this.socketChannel.isOpen()) {
                this.socketChannel.close();
            }
        }

        @Override // java.lang.Runnable
        public void run() {
            VanillaSelectionKeySet vanillaSelectionKeySet = null;
            try {
                try {
                    this.socketChannel.configureBlocking(false);
                    this.socketChannel.socket().setTcpNoDelay(true);
                    this.socketChannel.socket().setSoTimeout(0);
                    this.socketChannel.socket().setSoLinger(false, 0);
                    if (SourceTcp.this.builder.receiveBufferSize() > 0) {
                        this.socketChannel.socket().setReceiveBufferSize(SourceTcp.this.builder.receiveBufferSize());
                    }
                    if (SourceTcp.this.builder.sendBufferSize() > 0) {
                        this.socketChannel.socket().setSendBufferSize(SourceTcp.this.builder.sendBufferSize());
                    }
                    VanillaSelector register = new VanillaSelector().open().register(this.socketChannel, 1, new Attached());
                    this.tailer = SourceTcp.this.builder.chronicle().createTailer();
                    this.appender = SourceTcp.this.builder.chronicle().createAppender();
                    VanillaSelectionKeySet vanillaSelectionKeys = register.vanillaSelectionKeys();
                    if (vanillaSelectionKeys != null) {
                        vanillaNioLoop(register, vanillaSelectionKeys);
                    } else {
                        nioLoop(register);
                    }
                    if (vanillaSelectionKeys != null) {
                        vanillaSelectionKeys.clear();
                    }
                } catch (EOFException e) {
                    if (SourceTcp.this.running.get()) {
                        SourceTcp.this.logger.info("Connection {} died", this.socketChannel);
                    }
                    if (0 != 0) {
                        vanillaSelectionKeySet.clear();
                    }
                } catch (Exception e2) {
                    if (SourceTcp.this.running.get()) {
                        String message = e2.getMessage();
                        if (message == null || !(message.contains("reset by peer") || message.contains("Broken pipe") || message.contains("was aborted by"))) {
                            SourceTcp.this.logger.info("Connection {} died", this.socketChannel, e2);
                        } else {
                            SourceTcp.this.logger.info("Connection {} closed from the other end: ", this.socketChannel, e2.getMessage());
                        }
                    }
                    if (0 != 0) {
                        vanillaSelectionKeySet.clear();
                    }
                }
                try {
                    close();
                } catch (IOException e3) {
                    SourceTcp.this.logger.warn("", (Throwable) e3);
                }
            } catch (Throwable th) {
                if (0 != 0) {
                    vanillaSelectionKeySet.clear();
                }
                throw th;
            }
        }

        private void vanillaNioLoop(VanillaSelector vanillaSelector, VanillaSelectionKeySet vanillaSelectionKeySet) throws IOException {
            SelectionKey selectionKey;
            int selectorSpinLoopCount = SourceTcp.this.builder.selectorSpinLoopCount();
            long selectTimeout = SourceTcp.this.builder.selectTimeout();
            while (SourceTcp.this.running.get()) {
                if (vanillaSelector.select(selectorSpinLoopCount, selectTimeout) > 0) {
                    SelectionKey[] keys = vanillaSelectionKeySet.keys();
                    int size = vanillaSelectionKeySet.size();
                    for (int i = 0; i < size && ((selectionKey = keys[i]) == null || onSelectionKey(selectionKey)); i++) {
                    }
                    vanillaSelectionKeySet.clear();
                }
            }
        }

        private void nioLoop(VanillaSelector vanillaSelector) throws IOException {
            int selectorSpinLoopCount = SourceTcp.this.builder.selectorSpinLoopCount();
            long selectTimeout = SourceTcp.this.builder.selectTimeout();
            while (SourceTcp.this.running.get()) {
                if (vanillaSelector.select(selectorSpinLoopCount, selectTimeout) > 0) {
                    Set<SelectionKey> selectionKeys = vanillaSelector.selectionKeys();
                    Iterator<SelectionKey> it = selectionKeys.iterator();
                    while (it.hasNext() && onSelectionKey(it.next())) {
                    }
                    selectionKeys.clear();
                }
            }
        }

        protected boolean hasRoomForExcerpt(ByteBuffer byteBuffer, Bytes bytes) {
            return hasRoomFor(byteBuffer, bytes.remaining() + 12);
        }

        protected boolean hasRoomFor(ByteBuffer byteBuffer, long j) {
            return ((long) byteBuffer.remaining()) >= j;
        }

        protected void pauseReset() {
            this.lastUnpausedNS = System.nanoTime();
            SourceTcp.this.pauser.reset();
        }

        protected void pause() {
            if (this.lastUnpausedNS + ChronicleTcp.BUSY_WAIT_TIME_NS > System.nanoTime()) {
                return;
            }
            SourceTcp.this.pauser.pause();
        }

        protected void setLastHeartbeat() {
            this.lastHeartbeat = System.currentTimeMillis() + SourceTcp.this.builder.heartbeatIntervalMillis();
        }

        protected void setLastHeartbeat(long j) {
            this.lastHeartbeat = j + SourceTcp.this.builder.heartbeatIntervalMillis();
        }

        protected void sendSizeAndIndex(int i, long j) throws IOException {
            this.connection.writeSizeAndIndex(this.writeBuffer, i, j);
            setLastHeartbeat();
        }

        protected IByteBufferBytes readUpTo(int i) throws IOException {
            if (this.readBuffer.capacity() < i) {
                this.readBuffer = new DirectByteBufferBytes(i);
            }
            this.readBuffer.clear();
            this.readBuffer.buffer().clear();
            this.readBuffer.limit(i);
            this.readBuffer.buffer().limit(i);
            this.connection.readFullyOrEOF(this.readBuffer.buffer());
            this.readBuffer.buffer().flip();
            this.readBuffer.position(0L);
            this.readBuffer.limit(this.readBuffer.limit());
            return this.readBuffer;
        }

        protected boolean onSelectionKey(SelectionKey selectionKey) throws IOException {
            if (selectionKey != null) {
                return selectionKey.isReadable() ? onRead(selectionKey) : !selectionKey.isWritable() || onWrite(selectionKey);
            }
            return true;
        }

        protected boolean onRead(SelectionKey selectionKey) throws IOException {
            try {
                long readLong = readUpTo(8).readLong();
                switch ((int) readLong) {
                    case 1:
                        return onSubscribe(selectionKey, readUpTo(8).readLong());
                    case 2:
                        return onUnsubscribe(selectionKey, readUpTo(8).readLong());
                    case 10:
                        return onQuery(selectionKey, readUpTo(8).readLong());
                    case 20:
                        return onSubmit(selectionKey, readUpTo(8).readLong(), true);
                    case 21:
                        return onSubmit(selectionKey, readUpTo(8).readLong(), false);
                    case 30:
                        return onMapping(selectionKey, readUpTo(4).readInt());
                    default:
                        throw new IOException("Unknown action received (" + readLong + ")");
                }
            } catch (IOException e) {
                selectionKey.selector().close();
                throw e;
            }
        }

        protected boolean onWrite(SelectionKey selectionKey) throws IOException {
            long currentTimeMillis = System.currentTimeMillis();
            Object attachment = selectionKey.attachment();
            if (!SourceTcp.this.running.get() || write(attachment) || this.lastHeartbeat > currentTimeMillis) {
                return true;
            }
            sendSizeAndIndex(ChronicleTcp.IN_SYNC_LEN, 0L);
            return true;
        }

        protected boolean onMapping(SelectionKey selectionKey, int i) throws IOException {
            MappingProvider mappingProvider = (MappingProvider) selectionKey.attachment();
            if (mappingProvider == null) {
                return true;
            }
            mappingProvider.withMapping((MappingFunction) readUpTo(i).readObject(MappingFunction.class));
            return true;
        }

        protected boolean onQuery(SelectionKey selectionKey, long j) throws IOException {
            if (!this.tailer.index(j)) {
                sendSizeAndIndex(ChronicleTcp.IN_SYNC_LEN, 0L);
                return true;
            }
            long currentTimeMillis = System.currentTimeMillis();
            setLastHeartbeat(currentTimeMillis);
            while (!this.tailer.nextIndex()) {
                if (this.lastHeartbeat <= currentTimeMillis) {
                    sendSizeAndIndex(ChronicleTcp.IN_SYNC_LEN, 0L);
                    return true;
                }
            }
            sendSizeAndIndex(ChronicleTcp.SYNC_IDX_LEN, this.tailer.index());
            this.tailer.finish();
            return true;
        }

        protected boolean onUnsubscribe(SelectionKey selectionKey, long j) throws IOException {
            selectionKey.interestOps(selectionKey.interestOps() & (-5));
            return true;
        }

        protected abstract boolean onSubscribe(SelectionKey selectionKey, long j) throws IOException;

        protected abstract boolean onSubmit(SelectionKey selectionKey, long j, boolean z) throws IOException;

        protected abstract boolean write(Object obj) throws IOException;

        protected Bytes applyMapping(@NotNull ExcerptTailer excerptTailer, @Nullable Object obj) {
            MappingFunction withMapping;
            if (obj != null && (withMapping = ((MappingProvider) obj).withMapping()) != null) {
                this.withMappedBuffer.clear();
                if (this.withMappedBuffer.capacity() < excerptTailer.limit()) {
                    this.withMappedBuffer = new DirectByteBufferBytes((int) excerptTailer.capacity());
                }
                try {
                    withMapping.apply(excerptTailer, this.withMappedBuffer);
                } catch (IllegalArgumentException e) {
                    if (!e.getMessage().contains("Attempt to write")) {
                        throw e;
                    }
                    if (this.withMappedBuffer.capacity() == 2147483647L) {
                        throw e;
                    }
                    this.withMappedBuffer = new DirectByteBufferBytes(Math.min(Integer.MAX_VALUE, (int) (this.withMappedBuffer.capacity() * 1.5d)));
                }
                return this.withMappedBuffer.flip();
            }
            return excerptTailer;
        }
    }

    /* loaded from: input_file:net/openhft/chronicle/tcp/SourceTcp$VanillaSessionHandler.class */
    private class VanillaSessionHandler extends SessionHandler {
        private boolean nextIndex;
        private long index;

        private VanillaSessionHandler(@NotNull SocketChannel socketChannel) {
            super(socketChannel);
            this.nextIndex = true;
            this.index = -1L;
        }

        @Override // net.openhft.chronicle.tcp.SourceTcp.SessionHandler
        protected boolean onSubscribe(SelectionKey selectionKey, long j) throws IOException {
            this.index = j;
            if (this.index == -1) {
                this.nextIndex = true;
                this.tailer = this.tailer.toStart();
                this.index = -1L;
            } else if (this.index == -2) {
                this.nextIndex = false;
                this.tailer = this.tailer.toEnd();
                this.index = this.tailer.index();
                if (this.index == -1) {
                    this.nextIndex = true;
                    this.tailer = this.tailer.toStart();
                    this.index = -1L;
                }
            } else {
                this.nextIndex = false;
            }
            sendSizeAndIndex(ChronicleTcp.SYNC_IDX_LEN, this.index);
            selectionKey.interestOps(5);
            return false;
        }

        @Override // net.openhft.chronicle.tcp.SourceTcp.SessionHandler
        protected boolean onSubmit(SelectionKey selectionKey, long j, boolean z) throws IOException {
            readUpTo((int) j);
            this.appender.startExcerpt((int) j);
            this.appender.write(this.readBuffer);
            this.appender.finish();
            SourceTcp.this.pauser.unpause();
            if (!z) {
                return true;
            }
            sendSizeAndIndex(ChronicleTcp.ACK_LEN, this.appender.lastWrittenIndex());
            return true;
        }

        @Override // net.openhft.chronicle.tcp.SourceTcp.SessionHandler
        protected boolean write(Object obj) throws IOException {
            if (this.nextIndex) {
                if (!this.tailer.nextIndex()) {
                    pause();
                    if (SourceTcp.this.running.get() && !this.tailer.nextIndex()) {
                        return false;
                    }
                }
            } else {
                if (!this.tailer.index(this.index)) {
                    return false;
                }
                this.nextIndex = true;
            }
            pauseReset();
            Bytes applyMapping = applyMapping(this.tailer, obj);
            int limit = (int) applyMapping.limit();
            this.writeBuffer.clear();
            this.writeBuffer.putInt(limit);
            this.writeBuffer.putLong(this.tailer.index());
            if (limit > this.writeBuffer.limit() / 2) {
                while (limit > 0) {
                    int min = Math.min(limit, this.writeBuffer.remaining());
                    applyMapping.read(this.writeBuffer, limit);
                    this.writeBuffer.flip();
                    this.connection.writeAll(this.writeBuffer);
                    this.writeBuffer.clear();
                    limit -= min;
                    if (limit > 0) {
                        this.writeBuffer.clear();
                    }
                }
            } else {
                applyMapping.read(this.writeBuffer, limit);
                long index = this.tailer.index();
                int maxExcerptsPerMessage = SourceTcp.this.builder.maxExcerptsPerMessage();
                while (true) {
                    if (maxExcerptsPerMessage <= 0 || !this.tailer.nextIndex()) {
                        break;
                    }
                    long index2 = this.tailer.index();
                    Bytes applyMapping2 = applyMapping(this.tailer, obj);
                    if (!hasRoomForExcerpt(this.writeBuffer, applyMapping2)) {
                        this.tailer.finish();
                        this.tailer.index(index);
                        break;
                    }
                    int limit2 = (int) applyMapping2.limit();
                    index = index2;
                    this.writeBuffer.putInt(limit2);
                    this.writeBuffer.putLong(index2);
                    applyMapping2.read(this.writeBuffer, limit2);
                    maxExcerptsPerMessage--;
                    this.tailer.finish();
                }
                this.writeBuffer.flip();
                this.connection.writeAll(this.writeBuffer);
            }
            if (this.writeBuffer.remaining() > 0) {
                throw new EOFException("Failed to send index=" + this.tailer.index());
            }
            return true;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public SourceTcp(String str, ChronicleQueueBuilder.ReplicaChronicleQueueBuilder replicaChronicleQueueBuilder, ThreadPoolExecutor threadPoolExecutor) {
        this.builder = replicaChronicleQueueBuilder;
        this.name = ChronicleTcp.connectionName(str, this.builder.bindAddress(), this.builder.connectAddress());
        this.logger = LoggerFactory.getLogger(this.name);
        this.executor = threadPoolExecutor;
        this.pauser = new LightPauser(replicaChronicleQueueBuilder.busyPeriodTimeNanos(), replicaChronicleQueueBuilder.parkPeriodTimeNanos());
    }

    public SourceTcp open() {
        this.running.set(true);
        this.executor.execute(createHandler());
        return this;
    }

    public boolean close() {
        this.running.set(false);
        this.executor.shutdown();
        try {
            this.executor.awaitTermination(this.builder.selectTimeout() * 2, this.builder.selectTimeoutUnit());
        } catch (InterruptedException e) {
        }
        return !this.running.get();
    }

    public String toString() {
        return this.name;
    }

    public void dataNotification() {
        this.pauser.unpause();
    }

    public abstract boolean isLocalhost();

    protected abstract Runnable createHandler();

    /* JADX INFO: Access modifiers changed from: protected */
    public Runnable createSessionHandler(@NotNull SocketChannel socketChannel) {
        Chronicle chronicle = this.builder.chronicle();
        if (chronicle == null) {
            throw new IllegalStateException("Chronicle can't be null");
        }
        if (chronicle instanceof IndexedChronicle) {
            return new IndexedSessionHandler(socketChannel);
        }
        if (chronicle instanceof VanillaChronicle) {
            return new VanillaSessionHandler(socketChannel);
        }
        throw new IllegalStateException("Chronicle must be Indexed or Vanilla");
    }
}
