package org.apache.tephra.shaded.org.apache.twill.internal.kafka.client;

import com.google.gson.Gson;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.tephra.shaded.com.google.common.base.Charsets;
import org.apache.tephra.shaded.com.google.common.base.Function;
import org.apache.tephra.shaded.com.google.common.base.Joiner;
import org.apache.tephra.shaded.com.google.common.base.Preconditions;
import org.apache.tephra.shaded.com.google.common.base.Supplier;
import org.apache.tephra.shaded.com.google.common.base.Suppliers;
import org.apache.tephra.shaded.com.google.common.base.Throwables;
import org.apache.tephra.shaded.com.google.common.cache.CacheBuilder;
import org.apache.tephra.shaded.com.google.common.cache.CacheLoader;
import org.apache.tephra.shaded.com.google.common.cache.LoadingCache;
import org.apache.tephra.shaded.com.google.common.collect.ImmutableList;
import org.apache.tephra.shaded.com.google.common.collect.Iterables;
import org.apache.tephra.shaded.com.google.common.collect.Sets;
import org.apache.tephra.shaded.com.google.common.primitives.Ints;
import org.apache.tephra.shaded.com.google.common.util.concurrent.AbstractIdleService;
import org.apache.tephra.shaded.com.google.common.util.concurrent.FutureCallback;
import org.apache.tephra.shaded.com.google.common.util.concurrent.Futures;
import org.apache.tephra.shaded.com.google.common.util.concurrent.SettableFuture;
import org.apache.tephra.shaded.org.apache.twill.common.Cancellable;
import org.apache.tephra.shaded.org.apache.twill.common.Threads;
import org.apache.tephra.shaded.org.apache.twill.kafka.client.BrokerInfo;
import org.apache.tephra.shaded.org.apache.twill.kafka.client.BrokerService;
import org.apache.tephra.shaded.org.apache.twill.kafka.client.TopicPartition;
import org.apache.tephra.shaded.org.apache.twill.zookeeper.NodeChildren;
import org.apache.tephra.shaded.org.apache.twill.zookeeper.NodeData;
import org.apache.tephra.shaded.org.apache.twill.zookeeper.ZKClient;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tephra/shaded/org/apache/twill/internal/kafka/client/ZKBrokerService.class */
public final class ZKBrokerService extends AbstractIdleService implements BrokerService {
    /** ZooKeeper node whose children are listed by {@code getBrokers()} and parsed as broker ids. */
    private static final String BROKER_IDS_PATH = "/brokers/ids";
    /** Root ZooKeeper path used by {@code KeyPathTopicPartition} to build partition state paths. */
    private static final String BROKER_TOPICS_PATH = "/brokers/topics";
    /** Delay, in seconds, handed to {@code actOnExists} as the wait before a failed exists check is retried. */
    private static final long FAILURE_RETRY_SECONDS = 5;
    /** Client used for every ZooKeeper exists / getData / getChildren call in this class. */
    private final ZKClient zkClient;
    /**
     * Cache from broker id to a supplier of the decoded {@code BrokerInfo}. The invalidater handed to the
     * loader evicts the given key from this same cache.
     */
    private final LoadingCache<BrokerId, Supplier<BrokerInfo>> brokerInfos = CacheBuilder.newBuilder().build(createCacheLoader(new CacheInvalidater<BrokerId>() { // from class: org.apache.tephra.shaded.org.apache.twill.internal.kafka.client.ZKBrokerService.3
        @Override // org.apache.tephra.shaded.org.apache.twill.internal.kafka.client.ZKBrokerService.CacheInvalidater
        public void invalidate(BrokerId brokerId) {
            ZKBrokerService.this.brokerInfos.invalidate(brokerId);
        }
    }, BrokerInfo.class));
    /**
     * Cache from topic partition to a supplier of the decoded {@code PartitionInfo}. The invalidater handed
     * to the loader evicts the given key from this same cache.
     */
    private final LoadingCache<KeyPathTopicPartition, Supplier<PartitionInfo>> partitionInfos = CacheBuilder.newBuilder().build(createCacheLoader(new CacheInvalidater<KeyPathTopicPartition>() { // from class: org.apache.tephra.shaded.org.apache.twill.internal.kafka.client.ZKBrokerService.4
        @Override // org.apache.tephra.shaded.org.apache.twill.internal.kafka.client.ZKBrokerService.CacheInvalidater
        public void invalidate(KeyPathTopicPartition keyPathTopicPartition) {
            ZKBrokerService.this.partitionInfos.invalidate(keyPathTopicPartition);
        }
    }, PartitionInfo.class));
    /** Listeners registered through {@code addChangeListener}, each wrapped together with its executor. */
    private final Set<ListenerExecutor> listeners = Sets.newCopyOnWriteArraySet();
    /** Created in {@code startUp()} and shut down in {@code shutDown()}; passed as the executor for ZooKeeper callbacks. */
    private ExecutorService executorService;
    /** Assigned on the first {@code getBrokers()} call; {@code null} until then. */
    private Supplier<Iterable<BrokerInfo>> brokerList;
    private static final Logger LOG = LoggerFactory.getLogger(ZKBrokerService.class);
    /** Shared Gson instance used by {@code decodeNodeData} to parse node payloads. */
    private static final Gson GSON = new Gson();
    /** Turns a child node name into a {@code BrokerId} by parsing it as a decimal integer. */
    private static final Function<String, BrokerId> BROKER_ID_TRANSFORMER = new Function<String, BrokerId>() { // from class: org.apache.tephra.shaded.org.apache.twill.internal.kafka.client.ZKBrokerService.1
        @Override // org.apache.tephra.shaded.com.google.common.base.Function
        public BrokerId apply(String str) {
            return new BrokerId(Integer.parseInt(str));
        }
    };
    /** Formats a {@code BrokerInfo} as {@code host:port}; used to build the string returned by {@code getBrokerList()}. */
    private static final Function<BrokerInfo, String> BROKER_INFO_TO_ADDRESS = new Function<BrokerInfo, String>() { // from class: org.apache.tephra.shaded.org.apache.twill.internal.kafka.client.ZKBrokerService.2
        @Override // org.apache.tephra.shaded.com.google.common.base.Function
        public String apply(BrokerInfo brokerInfo) {
            return String.format("%s:%d", brokerInfo.getHost(), Integer.valueOf(brokerInfo.getPort()));
        }
    };

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX INFO: Add missing generic type declarations: [T, K] */
    /* renamed from: org.apache.tephra.shaded.org.apache.twill.internal.kafka.client.ZKBrokerService$7, reason: invalid class name */
    /* loaded from: input_file:org/apache/tephra/shaded/org/apache/twill/internal/kafka/client/ZKBrokerService$7.class */
    /**
     * Cache loader that resolves a {@link KeyPath} key to a supplier backed by the ZooKeeper node at
     * {@code key.getPath()}.
     *
     * <p>The node payload is decoded into {@code T} and stored in an {@link AtomicReference}; the returned
     * supplier reads that reference, so later data-change watch events are visible through the same supplier.
     *
     * @param <K> cache key type; must expose the ZooKeeper path to read
     * @param <T> type the node payload is decoded into
     */
    public class AnonymousClass7<K extends KeyPath, T> extends CacheLoader<K, Supplier<T>> {
        /** Type passed to {@code decodeNodeData} when decoding the node payload. */
        final /* synthetic */ Class<T> val$resultType;
        /** Evicts a key from the cache this loader was created for. */
        final /* synthetic */ CacheInvalidater<K> val$invalidater;

        AnonymousClass7(Class<T> cls, CacheInvalidater<K> cacheInvalidater) {
            this.val$resultType = cls;
            this.val$invalidater = cacheInvalidater;
        }

        /**
         * Loads the value for {@code keyPath}, blocking until the first read attempt completes.
         *
         * @param keyPath the key whose path is read from ZooKeeper
         * @return a supplier of the most recently decoded value; it yields {@code null} when the node does
         *     not exist or has no data
         * @throws Exception if waiting for the first read is interrupted or the read fails with anything
         *     other than {@code NoNodeException}
         */
        @Override
        public Supplier<T> load(final K keyPath) throws Exception {
            final SettableFuture<T> ready = SettableFuture.create();
            final AtomicReference<T> latest = new AtomicReference<T>();
            final String path = keyPath.getPath();
            ZKBrokerService.this.actOnExists(path, new Runnable() {
                @Override
                public void run() {
                    final FutureCallback<NodeData> dataCallback = new FutureCallback<NodeData>() {
                        @Override
                        public void onSuccess(NodeData nodeData) {
                            T decoded = ZKBrokerService.this.decodeNodeData(nodeData, AnonymousClass7.this.val$resultType);
                            latest.set(decoded);
                            ready.set(decoded);
                        }

                        @Override
                        public void onFailure(Throwable th) {
                            ZKBrokerService.LOG.error("Failed to fetch node data on {}", path, th);
                            if (th instanceof KeeperException.NoNodeException) {
                                // A missing node is a valid state: expose it as a null value.
                                latest.set(null);
                                ready.set(null);
                            } else {
                                // Evict the key so the next lookup triggers a fresh load.
                                AnonymousClass7.this.val$invalidater.invalidate(keyPath);
                                ready.setException(th);
                            }
                        }
                    };
                    Futures.addCallback(ZKBrokerService.this.zkClient.getData(path, new Watcher() {
                        @Override
                        public void process(WatchedEvent watchedEvent) {
                            if (!ZKBrokerService.this.isRunning()) {
                                return;
                            }
                            Watcher.Event.EventType type = watchedEvent.getType();
                            if (type == Watcher.Event.EventType.NodeDataChanged) {
                                // Re-read the node and re-register this watcher.
                                Futures.addCallback(ZKBrokerService.this.zkClient.getData(path, this), dataCallback, ZKBrokerService.this.executorService);
                            } else if (type == Watcher.Event.EventType.NodeDeleted) {
                                // Evict from the cache this loader serves. Using brokerInfos here
                                // unconditionally would leave deleted partition entries cached.
                                AnonymousClass7.this.val$invalidater.invalidate(keyPath);
                            }
                        }
                    }), dataCallback, ZKBrokerService.this.executorService);
                }
            }, ready, ZKBrokerService.FAILURE_RETRY_SECONDS, TimeUnit.SECONDS);
            ready.get();
            return ZKBrokerService.this.createSupplier(latest);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tephra/shaded/org/apache/twill/internal/kafka/client/ZKBrokerService$BrokerId.class */
    /** Cache key identifying a broker by its integer id; its path is the broker's node under the broker ids path. */
    public static final class BrokerId implements KeyPath {
        private final int id;

        private BrokerId(int i) {
            id = i;
        }

        /** Two ids are equal when they are of exactly this class and carry the same integer. */
        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj == null || obj.getClass() != getClass()) {
                return false;
            }
            BrokerId other = (BrokerId) obj;
            return other.id == id;
        }

        @Override
        public int hashCode() {
            return Ints.hashCode(id);
        }

        /** Returns the ZooKeeper path of this broker's node. */
        @Override
        public String getPath() {
            return "/brokers/ids/" + id;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tephra/shaded/org/apache/twill/internal/kafka/client/ZKBrokerService$CacheInvalidater.class */
    /**
     * Callback the cache loader uses to evict a key from the cache it serves.
     *
     * @param <T> the cache key type
     */
    public interface CacheInvalidater<T> {
        /** Evicts the given key. */
        void invalidate(T t);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tephra/shaded/org/apache/twill/internal/kafka/client/ZKBrokerService$KeyPath.class */
    /** A cache key that knows the ZooKeeper path its value is read from. */
    public interface KeyPath {
        /** Returns the ZooKeeper path for this key. */
        String getPath();
    }

    /* loaded from: input_file:org/apache/tephra/shaded/org/apache/twill/internal/kafka/client/ZKBrokerService$KeyPathTopicPartition.class */
    /** A {@code TopicPartition} that doubles as a cache key by exposing the path of its partition state node. */
    private static final class KeyPathTopicPartition extends TopicPartition implements KeyPath {
        private KeyPathTopicPartition(String str, int i) {
            super(str, i);
        }

        /** Returns the path of this partition's {@code state} node under the topics root. */
        @Override
        public String getPath() {
            final Object[] parts = {ZKBrokerService.BROKER_TOPICS_PATH, getTopic(), Integer.valueOf(getPartition())};
            return String.format("%s/%s/partitions/%d/state", parts);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tephra/shaded/org/apache/twill/internal/kafka/client/ZKBrokerService$ListenerExecutor.class */
    public static final class ListenerExecutor extends BrokerService.BrokerChangeListener {
        private final BrokerService.BrokerChangeListener listener;
        private final Executor executor;

        private ListenerExecutor(BrokerService.BrokerChangeListener brokerChangeListener, Executor executor) {
            this.listener = brokerChangeListener;
            this.executor = executor;
        }

        @Override // org.apache.tephra.shaded.org.apache.twill.kafka.client.BrokerService.BrokerChangeListener
        public void changed(final BrokerService brokerService) {
            try {
                this.executor.execute(new Runnable() { // from class: org.apache.tephra.shaded.org.apache.twill.internal.kafka.client.ZKBrokerService.ListenerExecutor.1
                    @Override // java.lang.Runnable
                    public void run() {
                        try {
                            ListenerExecutor.this.listener.changed(brokerService);
                        } catch (Throwable th) {
                            ZKBrokerService.LOG.error("Failure when calling BrokerChangeListener.", th);
                        }
                    }
                });
            } catch (Throwable th) {
                ZKBrokerService.LOG.error("Failure when calling BrokerChangeListener.", th);
            }
        }
    }

    /* loaded from: input_file:org/apache/tephra/shaded/org/apache/twill/internal/kafka/client/ZKBrokerService$PartitionInfo.class */
    /**
     * Holder for the decoded content of a partition state node. Instances are produced by
     * {@code decodeNodeData} via Gson, which is why the field names must stay as they are.
     */
    private static final class PartitionInfo {
        private int[] isr;
        private int leader;

        private PartitionInfo() {
        }

        /** Returns the broker id stored in the {@code leader} field. */
        public int getLeader() {
            return leader;
        }

        /** Returns the array stored in the {@code isr} field. */
        private int[] getIsr() {
            return isr;
        }
    }

    /**
     * Creates a broker service that reads broker and partition information through the given client.
     * No ZooKeeper calls are made here.
     *
     * @param zKClient the ZooKeeper client used for all subsequent reads and watches
     */
    public ZKBrokerService(ZKClient zKClient) {
        this.zkClient = zKClient;
    }

    /** Creates the cached thread pool, built from the daemon thread factory named {@code zk-kafka-broker}, on which ZooKeeper callbacks run. */
    @Override
    protected void startUp() throws Exception {
        executorService = Executors.newCachedThreadPool(
                Threads.createDaemonThreadFactory("zk-kafka-broker"));
    }

    /** Stops the callback executor immediately via {@code shutdownNow()}. */
    @Override
    protected void shutDown() throws Exception {
        final ExecutorService callbackExecutor = executorService;
        callbackExecutor.shutdownNow();
    }

    /**
     * Looks up the leader broker of a topic partition through the partition and broker caches.
     *
     * @param str the topic name
     * @param i the partition number
     * @return the broker info of the leader, or {@code null} when no partition info or broker info is available
     * @throws IllegalStateException if the service is not running
     */
    @Override
    public BrokerInfo getLeader(String str, int i) {
        Preconditions.checkState(isRunning(), "BrokerService is not running.");
        final KeyPathTopicPartition partitionKey = new KeyPathTopicPartition(str, i);
        final PartitionInfo partition = partitionInfos.getUnchecked(partitionKey).get();
        if (partition != null) {
            final BrokerId leaderId = new BrokerId(partition.getLeader());
            return brokerInfos.getUnchecked(leaderId).get();
        }
        return null;
    }

    /**
     * Returns the current list of brokers.
     *
     * <p>The first call lists the children of the broker ids node, resolves each child through the broker
     * info cache, and blocks until that first attempt completes. A children watcher refreshes the list and
     * notifies registered listeners on every change; later calls return the latest list without blocking.
     *
     * <p>NOTE(review): {@code brokerList} is assigned before the first fetch completes, so if that fetch
     * fails, later calls return the initial empty list instead of retrying. Confirm whether that is intended.
     *
     * @return the most recently fetched brokers; empty if the broker ids node does not exist yet
     * @throws IllegalStateException if the service is not running
     * @throws RuntimeException if the first fetch fails or the wait for it is interrupted (the interrupt
     *     flag is restored before throwing)
     */
    @Override
    public synchronized Iterable<BrokerInfo> getBrokers() {
        Preconditions.checkState(isRunning(), "BrokerService is not running.");
        if (this.brokerList != null) {
            return this.brokerList.get();
        }
        final SettableFuture<?> ready = SettableFuture.create();
        final AtomicReference<Iterable<BrokerInfo>> brokers =
                new AtomicReference<Iterable<BrokerInfo>>(ImmutableList.<BrokerInfo>of());
        actOnExists(BROKER_IDS_PATH, new Runnable() {
            @Override
            public void run() {
                final FutureCallback<NodeChildren> childrenCallback = new FutureCallback<NodeChildren>() {
                    @Override
                    public void onSuccess(NodeChildren nodeChildren) {
                        try {
                            // Resolve every child id through the broker info cache and snapshot the result.
                            Iterable<BrokerId> ids = Iterables.transform(nodeChildren.getChildren(), ZKBrokerService.BROKER_ID_TRANSFORMER);
                            brokers.set(ImmutableList.copyOf(Iterables.transform(
                                    ZKBrokerService.this.brokerInfos.getAll(ids).values(),
                                    Suppliers.<BrokerInfo>supplierFunction())));
                            // Release the first caller before notifying listeners.
                            ready.set(null);
                            for (ListenerExecutor listener : ZKBrokerService.this.listeners) {
                                listener.changed(ZKBrokerService.this);
                            }
                        } catch (ExecutionException e) {
                            ready.setException(e.getCause());
                        }
                    }

                    @Override
                    public void onFailure(Throwable th) {
                        ready.setException(th);
                    }
                };
                Futures.addCallback(ZKBrokerService.this.zkClient.getChildren(ZKBrokerService.BROKER_IDS_PATH, new Watcher() {
                    @Override
                    public void process(WatchedEvent watchedEvent) {
                        if (ZKBrokerService.this.isRunning() && watchedEvent.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
                            // Re-list the children and re-register this watcher.
                            Futures.addCallback(ZKBrokerService.this.zkClient.getChildren(ZKBrokerService.BROKER_IDS_PATH, this), childrenCallback, ZKBrokerService.this.executorService);
                        }
                    }
                }), childrenCallback, ZKBrokerService.this.executorService);
            }
        }, ready, FAILURE_RETRY_SECONDS, TimeUnit.SECONDS);
        this.brokerList = createSupplier(brokers);
        try {
            ready.get();
        } catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
            throw Throwables.propagate(e);
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
        return this.brokerList.get();
    }

    /**
     * Returns the brokers from {@code getBrokers()} as a comma-separated list of {@code host:port} entries.
     */
    @Override
    public String getBrokerList() {
        final Iterable<String> addresses = Iterables.transform(getBrokers(), BROKER_INFO_TO_ADDRESS);
        return Joiner.on(',').join(addresses);
    }

    /**
     * Registers a listener that is notified, on the given executor, whenever the broker list is refreshed.
     *
     * @param brokerChangeListener the listener to notify
     * @param executor the executor on which the listener is invoked
     * @return a handle whose {@code cancel()} removes this registration
     */
    @Override
    public Cancellable addChangeListener(BrokerService.BrokerChangeListener brokerChangeListener, Executor executor) {
        final ListenerExecutor registration = new ListenerExecutor(brokerChangeListener, executor);
        listeners.add(registration);
        final Cancellable handle = new Cancellable() {
            @Override
            public void cancel() {
                listeners.remove(registration);
            }
        };
        return handle;
    }

    /**
     * Creates the cache loader that reads and watches the ZooKeeper node behind a key.
     *
     * @param cacheInvalidater callback used by the loader to evict a key from the owning cache
     * @param cls type the node payload is decoded into
     * @return a loader producing suppliers of the latest decoded value
     */
    private <K extends KeyPath, T> CacheLoader<K, Supplier<T>> createCacheLoader(CacheInvalidater<K> cacheInvalidater, Class<T> cls) {
        // Instantiate with explicit type arguments instead of the raw type to avoid an unchecked conversion.
        return new AnonymousClass7<K, T>(cls, cacheInvalidater);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Decodes a node's payload as UTF-8 JSON into the requested type.
     *
     * @param nodeData the node data, possibly {@code null}
     * @param cls the target type
     * @return the decoded object, or {@code null} when the node data or its byte payload is {@code null}
     */
    public <T> T decodeNodeData(NodeData nodeData, Class<T> cls) {
        if (nodeData == null) {
            return null;
        }
        final byte[] payload = nodeData.getData();
        if (payload == null) {
            return null;
        }
        final String json = new String(payload, Charsets.UTF_8);
        return GSON.fromJson(json, cls);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Runs an action once the given ZooKeeper node exists.
     *
     * <p>If the node already exists, the action runs as soon as the exists call returns. If it does not,
     * the future is completed with {@code null} and the action runs later when the exists watcher reports
     * {@code NodeCreated} while the service is running. If the exists call fails, it is retried after the
     * given delay on a daemon thread.
     *
     * @param str the ZooKeeper path to check
     * @param runnable the action to run when the node exists
     * @param settableFuture completed with {@code null} when the node is absent, and failed if the retry
     *     wait is interrupted, so that a caller blocked on it is released
     * @param j the retry delay
     * @param timeUnit the unit of the retry delay
     */
    public void actOnExists(final String str, final Runnable runnable, final SettableFuture<?> settableFuture, final long j, final TimeUnit timeUnit) {
        Futures.addCallback(this.zkClient.exists(str, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                if (ZKBrokerService.this.isRunning() && watchedEvent.getType() == Watcher.Event.EventType.NodeCreated) {
                    runnable.run();
                }
            }
        }), new FutureCallback<Stat>() {
            @Override
            public void onSuccess(Stat stat) {
                if (stat != null) {
                    runnable.run();
                } else {
                    // Node is absent: release the waiter now; the watcher above runs the action later.
                    settableFuture.set(null);
                }
            }

            @Override
            public void onFailure(Throwable th) {
                // Record why the check failed instead of dropping the cause silently.
                ZKBrokerService.LOG.warn("Failed to check existence of {}. Will retry.", str, th);
                Thread thread = new Thread("zk-broker-service-retry") {
                    @Override
                    public void run() {
                        try {
                            timeUnit.sleep(j);
                        } catch (InterruptedException e) {
                            ZKBrokerService.LOG.warn("ZK retry thread interrupted. Action not retried.");
                            // Restore the flag and fail the future; otherwise a caller blocked on it
                            // would wait forever because no further retry will complete it.
                            Thread.currentThread().interrupt();
                            settableFuture.setException(e);
                            return;
                        }
                        ZKBrokerService.this.actOnExists(str, runnable, settableFuture, j, timeUnit);
                    }
                };
                thread.setDaemon(true);
                thread.start();
            }
        }, this.executorService);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Wraps an atomic reference in a supplier that reads the reference on every call, so updates made to
     * the reference later are seen by holders of the supplier.
     *
     * @param atomicReference the reference to read from
     * @return a supplier returning the reference's current value
     */
    public <T> Supplier<T> createSupplier(final AtomicReference<T> atomicReference) {
        final Supplier<T> liveView = new Supplier<T>() {
            @Override
            public T get() {
                return atomicReference.get();
            }
        };
        return liveView;
    }
}
