package com.taobao.hsf.io.stream;

import com.google.common.collect.Maps;
import com.taobao.hsf.domain.HSFResponse;
import com.taobao.hsf.domain.ResponseStatus;
import com.taobao.hsf.invocation.Invocation;
import com.taobao.hsf.invocation.InvocationType;
import com.taobao.hsf.invocation.RPCResult;
import com.taobao.hsf.io.PacketFactory;
import com.taobao.hsf.io.PacketFactorySelector;
import com.taobao.hsf.io.RpcResultResponsePacketWrapper;
import com.taobao.hsf.io.StreamManager;
import com.taobao.hsf.io.client.MessageAnswerHandler;
import com.taobao.hsf.io.client.ResponseMessageAnswerHandler;
import com.taobao.hsf.io.client.SerializePhase;
import com.taobao.hsf.io.common.RemotingConstants;
import com.taobao.hsf.io.serialize.SerializeType;
import com.taobao.hsf.logger.LoggerInit;
import com.taobao.hsf.threadpool.ThreadPoolService;
import com.taobao.hsf.util.ConcurrentAttributeMap;
import com.taobao.hsf.util.HSFServiceContainer;
import com.taobao.hsf.util.concurrent.DefaultListenableFuture;
import com.taobao.hsf.util.concurrent.Futures;
import com.taobao.hsf.util.concurrent.ListenableFuture;
import com.taobao.hsf.util.concurrent.UserThreadPreferedExecutor;
import com.taobao.middleware.logger.Logger;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

/* loaded from: input_file:lib/hsf-io-2.2.8.2.jar:com/taobao/hsf/io/stream/AbstractStream.class */
/**
 * Skeleton {@link Stream} implementation holding the state shared by concrete streams:
 * the enabled/disabled flag, the bi-direction support flag, the attribute map and the
 * table of pending {@link MessageAnswerHandler}s keyed by request id.
 *
 * <p>Subclasses supply the transport-specific pieces: {@link #closeChannel()},
 * {@link #isWritable()} and {@link #send(Object)}.
 */
public abstract class AbstractStream implements Stream {
    private static final Logger LOGGER = LoggerInit.LOGGER_REMOTING;

    /** Set by {@link #disable()} and cleared by {@link #enable()}. */
    protected volatile boolean disabled = false;

    private final AtomicBoolean supportBiDirection = new AtomicBoolean();
    private final ConcurrentAttributeMap attributeMap = new ConcurrentAttributeMap(STREAM_ATTRIBUTE_NAMESPACE, 8);
    private final StreamManager connectionManager = (StreamManager) HSFServiceContainer.SHARED_CONTAINER.getInstance(StreamManager.class);

    /**
     * Pending answer handlers keyed by request id. A concurrent map is used because
     * handlers may be registered and removed from different threads; a plain
     * {@code HashMap} is not safe under concurrent modification.
     */
    private final Map<Long, MessageAnswerHandler> requestToHandler = Maps.newConcurrentMap();

    private final ThreadPoolService threadPoolService = (ThreadPoolService) HSFServiceContainer.SHARED_CONTAINER.getInstance(ThreadPoolService.class);

    /**
     * Disables this stream, removes it from the stream manager (when one is available)
     * and then closes the underlying channel.
     */
    @Override // com.taobao.hsf.io.stream.Stream
    public void close() {
        disable();
        if (this.connectionManager != null) {
            removeFromManager();
        }
        closeChannel();
    }

    /**
     * Removes this stream from the stream manager. Does nothing when no manager
     * instance was obtained from the service container.
     */
    public void removeFromManager() {
        if (this.connectionManager != null) {
            this.connectionManager.removeStream(this);
        }
    }

    /** Clears the disabled flag. */
    @Override // com.taobao.hsf.io.stream.Stream
    public void enable() {
        this.disabled = false;
    }

    /** @return the attribute map owned by this stream */
    @Override // com.taobao.hsf.io.stream.Stream
    public ConcurrentAttributeMap attributeMap() {
        return this.attributeMap;
    }

    /** Sets the disabled flag. */
    @Override // com.taobao.hsf.io.stream.Stream
    public void disable() {
        this.disabled = true;
    }

    /** Closes the underlying channel; invoked as the last step of {@link #close()}. */
    protected abstract void closeChannel();

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract boolean isWritable();

    /** @return {@code true} if {@link #disable()} was called more recently than {@link #enable()} */
    public boolean isDisabled() {
        return this.disabled;
    }

    /**
     * Renders the stream as {@code [L:<local> - R:<remote>]}; the separator is {@code "-"}
     * while {@code isActive()} returns true and {@code "!"} otherwise.
     */
    @Override
    public String toString() {
        return String.format("[L:%s %s R:%s]", getLocalAddress(), isActive() ? "-" : "!", getRemoteAddress());
    }

    /**
     * Sends a response. Depending on the invocation type and serialize type, the result is
     * wrapped as-is, converted to a packet by the selected {@link PacketFactory}, or
     * replaced by a heartbeat response packet.
     *
     * @param rPCResult the result to write back
     */
    @Override // com.taobao.hsf.io.stream.Stream
    public void write(RPCResult rPCResult) {
        PacketFactory packetFactory = PacketFactorySelector.getInstance().select(rPCResult.getResponseContext().getProtocolType());
        StreamWriteRequest writeRequest;
        if (rPCResult.getInvocationType() != InvocationType.BIZ) {
            // Anything that is not a business invocation is answered with a heartbeat response.
            writeRequest = new StreamWriteRequest(packetFactory.serverCreateHeartbeatResponse(rPCResult.getResponseContext().getRequestId()));
        } else if (rPCResult.getResponseContext().getSerializeType() == SerializeType.OPTIMIZED_HESSIAN2.getCode()) {
            // For OPTIMIZED_HESSIAN2 the raw result is handed to send() without building a packet here.
            writeRequest = new StreamWriteRequest(rPCResult);
        } else {
            writeRequest = new StreamWriteRequest(packetFactory.serverCreate(rPCResult, this));
        }
        send(writeRequest);
    }

    /**
     * Sends a request and returns a future for its result.
     *
     * @param invocation the invocation to write
     * @return a future derived from the settable future via the response answer handler
     * @throws IllegalStateException if the invocation is bi-directional but this stream
     *         does not support bi-direction
     */
    @Override // com.taobao.hsf.io.stream.Stream
    public ListenableFuture<RPCResult> write(Invocation invocation) {
        if (invocation.getInvokerContext().isBiDirectionalInvocation() && !supportBiDirection()) {
            LOGGER.warn("", "Ignored request to stream:{} cause the client is not bi-directional", getRemoteAddress());
            throw new IllegalStateException("Client Stream is not support bi-direction");
        }

        // Fall back to the shared callback executor when the invocation carries none.
        Executor executor = invocation.getExecutor();
        if (executor == null) {
            executor = this.threadPoolService.callbackExecutor();
        } else if (executor instanceof UserThreadPreferedExecutor) {
            ((UserThreadPreferedExecutor) executor).activate();
        }

        DefaultListenableFuture settableFuture = Futures.createSettableFuture(executor);
        ResponseMessageAnswerHandler answerHandler = new ResponseMessageAnswerHandler(invocation.getApplicationModel(), SerializePhase.BIZ, settableFuture, this);
        ListenableFuture<RPCResult> resultFuture = Futures.map(settableFuture, answerHandler);

        PacketFactory packetFactory = PacketFactorySelector.getInstance().select(invocation.getInvokerContext().getProtocolType());
        StreamWriteRequest writeRequest;
        if (invocation.getInvocationType() != InvocationType.BIZ) {
            writeRequest = new StreamWriteRequest(packetFactory.clientCreateHeartbeatRequest(), answerHandler);
        } else if (invocation.getInvokerContext().getSerializeType() == SerializeType.OPTIMIZED_HESSIAN2.getCode()) {
            // The raw invocation is passed on and the handler is switched to the IO serialize phase.
            writeRequest = new StreamWriteRequest(invocation, answerHandler);
            answerHandler.setSerializePhase(SerializePhase.IO);
        } else {
            writeRequest = new StreamWriteRequest(packetFactory.clientCreate(invocation, this), answerHandler);
        }
        send(writeRequest);
        return resultFuture;
    }

    /** @return the current value of the bi-direction support flag */
    @Override // com.taobao.hsf.io.stream.Stream
    public boolean supportBiDirection() {
        return this.supportBiDirection.get();
    }

    /** @param z the new value of the bi-direction support flag */
    @Override // com.taobao.hsf.io.stream.Stream
    public void setBiDirectionSupport(boolean z) {
        this.supportBiDirection.set(z);
    }

    /**
     * Hands a write request to the transport. Both {@code write} overloads pass a
     * {@code StreamWriteRequest} here.
     *
     * @param obj the object to send
     */
    public abstract void send(Object obj);

    /**
     * Removes the handler registered for the given request id and calls
     * {@code cancelWriteTimeout} for that id.
     *
     * @param j the request id
     * @return the removed handler, or {@code null} if none was registered
     */
    @Override // com.taobao.hsf.io.stream.Stream
    public MessageAnswerHandler removeAnswerHandler(long j) {
        MessageAnswerHandler removed = this.requestToHandler.remove(Long.valueOf(j));
        cancelWriteTimeout(j);
        return removed;
    }

    /**
     * Removes every registered handler and answers each one with a
     * {@code COMM_ERROR} result carrying {@code RemotingConstants.INVALID_CAUSE_CLOSE}.
     */
    @Override // com.taobao.hsf.io.stream.Stream
    public void removeAnswerHandlers() {
        // Iterate over a snapshot of the ids: removeAnswerHandler mutates the map.
        for (Long requestId : new HashSet<Long>(this.requestToHandler.keySet())) {
            MessageAnswerHandler handler = removeAnswerHandler(requestId.longValue());
            if (handler == null) {
                // Already removed elsewhere between the snapshot and this call.
                continue;
            }
            RPCResult errorResult = new RPCResult();
            errorResult.setHsfResponse(new HSFResponse());
            errorResult.setClientErrorMsg(RemotingConstants.INVALID_CAUSE_CLOSE);
            errorResult.setStatus(ResponseStatus.COMM_ERROR.getCode());
            handler.setAnswer(new RpcResultResponsePacketWrapper(errorResult));
        }
    }

    /**
     * Registers the handler for the given request id and calls
     * {@code startWriteTimeout} with the id and the supplied value.
     *
     * @param j the request id
     * @param messageAnswerHandler the handler to register; a {@code null} handler clears
     *        any existing registration for the id, since the concurrent map cannot
     *        store null values
     * @param i value forwarded unchanged to {@code startWriteTimeout}
     */
    @Override // com.taobao.hsf.io.stream.Stream
    public void putAnswerHandler(long j, MessageAnswerHandler messageAnswerHandler, int i) {
        Long requestId = Long.valueOf(j);
        if (messageAnswerHandler == null) {
            this.requestToHandler.remove(requestId);
        } else {
            this.requestToHandler.put(requestId, messageAnswerHandler);
        }
        startWriteTimeout(j, i);
    }
}
