package com.taobao.hsf.remoting;

import com.taobao.hsf.io.ClientConnectionHook;
import com.taobao.hsf.io.ConnectionListener;
import com.taobao.hsf.io.ConnectionManager;
import com.taobao.hsf.io.StreamManager;
import com.taobao.hsf.io.client.ClientConnectionListener;
import com.taobao.hsf.io.server.ServerConnectionListener;
import com.taobao.hsf.io.stream.ClientStream;
import com.taobao.hsf.io.stream.ServerStream;
import com.taobao.hsf.io.stream.Stream;
import com.taobao.hsf.logger.LoggerInit;
import com.taobao.middleware.logger.Logger;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:lib/hsf-feature-default-2.2.8.2.jar:com/taobao/hsf/remoting/DefaultConnectionManager.class */
public class DefaultConnectionManager implements ConnectionManager, StreamManager {
    private static final Logger LOGGER = LoggerInit.LOGGER;
    private final List<ConnectionListener> clientListeners = new CopyOnWriteArrayList();
    private final List<ConnectionListener> serverListeners = new CopyOnWriteArrayList();
    private final List<ConnectionListener> commonListeners = new CopyOnWriteArrayList();
    private final List<ClientConnectionHook> hooks = new CopyOnWriteArrayList();
    private final ConcurrentMap<String, Stream> streams = new ConcurrentHashMap();
    private final ExecutorService notifyThread = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), new ThreadFactory() { // from class: com.taobao.hsf.remoting.DefaultConnectionManager.1
        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(Runnable runnable) {
            return new Thread(runnable, "HSF-Connection-Notifier-Thread");
        }
    });

    @Override // com.taobao.hsf.io.ConnectionManager
    public void addListener(ConnectionListener connectionListener) {
        if (connectionListener instanceof ServerConnectionListener) {
            this.serverListeners.add(connectionListener);
        } else if (connectionListener instanceof ClientConnectionListener) {
            this.clientListeners.add(connectionListener);
        } else {
            this.commonListeners.add(connectionListener);
        }
        notifyAllStream(connectionListener);
    }

    @Override // com.taobao.hsf.io.ConnectionManager
    public void addConnectionHook(ClientConnectionHook clientConnectionHook) {
        if (this.hooks.contains(clientConnectionHook)) {
            return;
        }
        this.hooks.add(clientConnectionHook);
    }

    @Override // com.taobao.hsf.io.ConnectionManager
    public Stream getStream(String str) {
        return this.streams.get(str);
    }

    @Override // com.taobao.hsf.io.ConnectionManager
    public Collection<Stream> getAllStream() {
        return Collections.unmodifiableCollection(this.streams.values());
    }

    @Override // com.taobao.hsf.io.ConnectionManager
    public Collection<Stream> getAllBiDirectionStream() {
        Collection<Stream> allStream = getAllStream();
        ArrayList arrayList = new ArrayList(allStream.size());
        for (Stream stream : allStream) {
            if (stream.supportBiDirection()) {
                arrayList.add(stream);
            }
        }
        return arrayList;
    }

    @Override // com.taobao.hsf.io.StreamManager
    public void removeStream(final Stream stream) {
        final String addressFromStream = getAddressFromStream(stream);
        this.notifyThread.execute(new Runnable() { // from class: com.taobao.hsf.remoting.DefaultConnectionManager.2
            @Override // java.lang.Runnable
            public void run() {
                try {
                    if (DefaultConnectionManager.this.isClientStream(stream)) {
                        Iterator it = DefaultConnectionManager.this.clientListeners.iterator();
                        while (it.hasNext()) {
                            ((ConnectionListener) it.next()).onDisconnected(addressFromStream);
                        }
                    } else if (DefaultConnectionManager.this.isServerStream(stream)) {
                        Iterator it2 = DefaultConnectionManager.this.serverListeners.iterator();
                        while (it2.hasNext()) {
                            ((ConnectionListener) it2.next()).onDisconnected(addressFromStream);
                        }
                    } else {
                        Iterator it3 = DefaultConnectionManager.this.commonListeners.iterator();
                        while (it3.hasNext()) {
                            ((ConnectionListener) it3.next()).onDisconnected(addressFromStream);
                        }
                    }
                } catch (Throwable th) {
                    DefaultConnectionManager.LOGGER.error("HSF-133", "ConnectionManager failed to notify listener size:{} {} {}", Integer.valueOf(DefaultConnectionManager.this.commonListeners.size()), th);
                }
                DefaultConnectionManager.this.streams.remove(addressFromStream, stream);
            }
        });
    }

    @Override // com.taobao.hsf.io.StreamManager
    public void addStream(final Stream stream) {
        final String addressFromStream = getAddressFromStream(stream);
        this.streams.putIfAbsent(addressFromStream, stream);
        this.notifyThread.execute(new Runnable() { // from class: com.taobao.hsf.remoting.DefaultConnectionManager.3
            @Override // java.lang.Runnable
            public void run() {
                try {
                    if (DefaultConnectionManager.this.isClientStream(stream)) {
                        Iterator it = DefaultConnectionManager.this.clientListeners.iterator();
                        while (it.hasNext()) {
                            ((ConnectionListener) it.next()).onConnected(addressFromStream);
                        }
                    } else if (DefaultConnectionManager.this.isServerStream(stream)) {
                        Iterator it2 = DefaultConnectionManager.this.serverListeners.iterator();
                        while (it2.hasNext()) {
                            ((ConnectionListener) it2.next()).onConnected(addressFromStream);
                        }
                    } else {
                        Iterator it3 = DefaultConnectionManager.this.commonListeners.iterator();
                        while (it3.hasNext()) {
                            ((ConnectionListener) it3.next()).onConnected(addressFromStream);
                        }
                    }
                } catch (Throwable th) {
                    DefaultConnectionManager.LOGGER.error("HSF-133", "ConnectionManager failed to notify listener size:{} {} {}", Integer.valueOf(DefaultConnectionManager.this.commonListeners.size()), th);
                }
            }
        });
    }

    @Override // com.taobao.hsf.io.StreamManager
    public void callClientHook(final Stream stream) {
        this.notifyThread.execute(new Runnable() { // from class: com.taobao.hsf.remoting.DefaultConnectionManager.4
            @Override // java.lang.Runnable
            public void run() {
                try {
                    Iterator it = DefaultConnectionManager.this.hooks.iterator();
                    while (it.hasNext()) {
                        ((ClientConnectionHook) it.next()).afterConnect(stream);
                    }
                } catch (Throwable th) {
                    DefaultConnectionManager.LOGGER.error("HSF-133", "ConnectionManager failed to notify hook listener size:{} ", Integer.valueOf(DefaultConnectionManager.this.hooks.size()), th);
                }
            }
        });
    }

    private void notifyAllStream(final ConnectionListener connectionListener) {
        this.notifyThread.execute(new Runnable() { // from class: com.taobao.hsf.remoting.DefaultConnectionManager.5
            @Override // java.lang.Runnable
            public void run() {
                try {
                    Iterator it = new ArrayList(DefaultConnectionManager.this.streams.values()).iterator();
                    while (it.hasNext()) {
                        connectionListener.onConnected(DefaultConnectionManager.this.getAddressFromStream((Stream) it.next()));
                    }
                } catch (Throwable th) {
                    DefaultConnectionManager.LOGGER.error("HSF-133", "ConnectionManager failed to notify listener size:{} {} {}", Integer.valueOf(DefaultConnectionManager.this.commonListeners.size()), th);
                }
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String getAddressFromStream(Stream stream) {
        return stream.remoteIp() + ":" + ((InetSocketAddress) stream.getRemoteAddress()).getPort();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean isClientStream(Stream stream) {
        return stream instanceof ClientStream;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean isServerStream(Stream stream) {
        return stream instanceof ServerStream;
    }
}
