package com.beiming.pigeons.service.rocketmq;

import com.beiming.pigeons.api.exception.KangarooException;
import com.beiming.pigeons.domain.message.RocketMqInfo;
import com.beiming.pigeons.service.RocketMqInfoService;
import com.google.common.collect.Maps;
import java.util.Iterator;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import javax.annotation.Resource;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:com/beiming/pigeons/service/rocketmq/RocketMqFactory.class */
public class RocketMqFactory implements ApplicationListener<ContextRefreshedEvent>, DisposableBean {
    private static Logger logger = LoggerFactory.getLogger(RocketMqFactory.class);
    private static final Map<String, DefaultMQProducer> PRODUCER_POOL = Maps.newHashMap();
    private static final Map<String, DefaultMQAdminExt> ADMIN_POOL = Maps.newHashMap();
    private static final Map<String, RocketMqInfo> ROCKET_MQ_POOL = Maps.newHashMap();
    private static final Map<String, DefaultMQPullConsumer> CONSUMER_POOL = Maps.newHashMap();

    @Resource
    private RocketMqInfoService rocketMqInfoService;
    private volatile boolean isInit = false;

    public synchronized void init() throws MQClientException {
        Iterator<RocketMqInfo> it = this.rocketMqInfoService.getRocketMqList().iterator();
        while (it.hasNext()) {
            addRocketMq(it.next());
        }
        this.isInit = true;
        logger.info("rocketMq producer and admin start success!");
    }

    public boolean isInit() {
        return this.isInit;
    }

    public DefaultMQProducer getMqProducer(String str) {
        if (this.isInit) {
            return PRODUCER_POOL.get(str);
        }
        throw new KangarooException("producerPool not init");
    }

    public DefaultMQAdminExt getMqAdmin(String str) {
        if (this.isInit) {
            return ADMIN_POOL.get(str);
        }
        throw new KangarooException("ADMIN_POOL not init");
    }

    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        new Timer().schedule(new TimerTask() { // from class: com.beiming.pigeons.service.rocketmq.RocketMqFactory.1
            @Override // java.util.TimerTask, java.lang.Runnable
            public void run() {
                try {
                    RocketMqFactory.this.init();
                } catch (MQClientException e) {
                    RocketMqFactory.logger.error("RocketMqFactory 启动时，RocketMQClient 异常", e);
                }
            }
        }, 500L);
    }

    public void addRocketMq(RocketMqInfo rocketMqInfo) {
        String name = rocketMqInfo.getName();
        String address = rocketMqInfo.getAddress();
        ROCKET_MQ_POOL.put(name, rocketMqInfo);
        try {
            String str = name + "RetryGroup";
            DefaultMQProducer defaultMQProducer = new DefaultMQProducer(str);
            logger.info("DefaultMQProducer " + str + " initialize!");
            logger.info("producerGroup:" + str);
            logger.info("mNamesrvAddr:" + address);
            defaultMQProducer.setVipChannelEnabled(false);
            defaultMQProducer.setNamesrvAddr(address);
            defaultMQProducer.setUnitName(str);
            defaultMQProducer.start();
            PRODUCER_POOL.put(name, defaultMQProducer);
        } catch (Throwable th) {
            logger.error("初始化 defaultMQProducer 失败， rocketMqName=" + name, th);
        }
        try {
            DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt();
            String str2 = name + "AdminGroup";
            defaultMQAdminExt.setNamesrvAddr(address);
            defaultMQAdminExt.setAdminExtGroup(str2);
            defaultMQAdminExt.setUnitName(str2);
            defaultMQAdminExt.setVipChannelEnabled(false);
            defaultMQAdminExt.start();
            ADMIN_POOL.put(name, defaultMQAdminExt);
        } catch (Throwable th2) {
            logger.error("初始化 defaultMQAdminExt 失败， rocketMqName=" + name, th2);
        }
        try {
            DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer();
            String str3 = name + "pullConsumerGroup";
            defaultMQPullConsumer.setConsumerGroup(str3);
            defaultMQPullConsumer.setNamesrvAddr(address);
            defaultMQPullConsumer.setUnitName(str3);
            defaultMQPullConsumer.setVipChannelEnabled(false);
            defaultMQPullConsumer.start();
            CONSUMER_POOL.put(name, defaultMQPullConsumer);
        } catch (Throwable th3) {
            logger.error("初始化 defaultPullConsumer 失败， rocketMqName=" + name, th3);
        }
    }

    public void updateRocketMq(RocketMqInfo rocketMqInfo) {
        String name = rocketMqInfo.getName();
        RocketMqInfo rocketMqInfo2 = ROCKET_MQ_POOL.get(name);
        ROCKET_MQ_POOL.put(name, rocketMqInfo);
        try {
            if (!rocketMqInfo.getAddress().equals(rocketMqInfo2.getAddress())) {
                shutdownRocketMqClient(name);
                String str = name + "RetryGroup";
                DefaultMQProducer defaultMQProducer = new DefaultMQProducer(str);
                defaultMQProducer.setVipChannelEnabled(false);
                defaultMQProducer.setNamesrvAddr(rocketMqInfo.getAddress());
                defaultMQProducer.setUnitName(str);
                defaultMQProducer.start();
                PRODUCER_POOL.put(name, defaultMQProducer);
                String str2 = name + "AdminGroup";
                DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt();
                defaultMQAdminExt.setNamesrvAddr(rocketMqInfo.getAddress());
                defaultMQAdminExt.setAdminExtGroup(str2);
                defaultMQAdminExt.setUnitName(str2);
                defaultMQAdminExt.setVipChannelEnabled(false);
                defaultMQAdminExt.start();
                ADMIN_POOL.put(name, defaultMQAdminExt);
                DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer();
                String str3 = name + "pullConsumerGroup";
                defaultMQPullConsumer.setConsumerGroup(str3);
                defaultMQPullConsumer.setNamesrvAddr(rocketMqInfo.getAddress());
                defaultMQPullConsumer.setUnitName(str3);
                defaultMQPullConsumer.setVipChannelEnabled(false);
                defaultMQPullConsumer.start();
                CONSUMER_POOL.put(name, defaultMQPullConsumer);
            }
        } catch (Throwable th) {
            logger.error("更新rocketMq信息，启动rocketMq客户端失败", th);
        }
    }

    public void shutdownRocketMqClient(String str) {
        try {
            DefaultMQProducer defaultMQProducer = PRODUCER_POOL.get(str);
            if (defaultMQProducer != null) {
                defaultMQProducer.shutdown();
            }
            DefaultMQAdminExt defaultMQAdminExt = ADMIN_POOL.get(str);
            if (defaultMQAdminExt != null) {
                defaultMQAdminExt.shutdown();
            }
            DefaultMQPullConsumer defaultMQPullConsumer = CONSUMER_POOL.get(str);
            if (defaultMQPullConsumer != null) {
                defaultMQPullConsumer.shutdown();
            }
        } catch (Exception e) {
            logger.error("关闭rocketMqClient 失败;rocketMqName=" + str, e);
        }
    }

    public DefaultMQPullConsumer getMQConsumer(String str) {
        return CONSUMER_POOL.get(str);
    }

    public void destroy() throws Exception {
        Iterator<Map.Entry<String, RocketMqInfo>> it = ROCKET_MQ_POOL.entrySet().iterator();
        while (it.hasNext()) {
            shutdownRocketMqClient(it.next().getKey());
        }
    }

    public void removeRocketMq(String str) {
        shutdownRocketMqClient(str);
        PRODUCER_POOL.remove(str);
        ADMIN_POOL.remove(str);
        CONSUMER_POOL.remove(str);
    }
}
