package com.beiming.pigeons.api.producer.rocketmq;

import com.alibaba.fastjson.JSONObject;
import com.beiming.framework.domain.DubboResult;
import com.beiming.framework.domain.DubboResultBuilder;
import com.beiming.framework.domain.PlatformConfig;
import com.beiming.framework.enums.DubboResultCodeEnums;
import com.beiming.framework.util.RequestIdUtils;
import com.beiming.pigeons.api.constants.KangarooConstants;
import com.beiming.pigeons.api.producer.MessageDto;
import com.beiming.pigeons.api.producer.ProducerService;
import com.beiming.pigeons.api.rocketmq.RocketMqAddrService;
import com.beiming.pigeons.api.rocketmq.RocketMqClientDto;
import com.beiming.pigeons.api.utils.InetUtils;
import com.beiming.pigeons.api.utils.JsonSerializeUtil;
import com.beiming.pigeons.api.utils.StopWatch;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.util.StringUtils;

/* loaded from: input_file:WEB-INF/lib/hainan-pigeons-api-1.0-SNAPSHOT.jar:com/beiming/pigeons/api/producer/rocketmq/RocketProducerClient.class */
/**
 * Spring-managed wrapper around a RocketMQ {@link DefaultMQProducer}.
 *
 * <p>The producer is started asynchronously a short while after the application context is
 * refreshed. Producers are registered per producer group in a static registry so that several
 * clients configured with the same group share one producer. When sending through RocketMQ
 * throws, the message is handed to {@link ProducerService} instead.
 *
 * <p>Threads calling {@link #sendMessage(RocketMessageDto)} before start-up has finished block
 * (bounded) on this object's monitor until the producer is marked as started or start-up is
 * flagged as failed.
 */
public class RocketProducerClient implements ApplicationListener<ContextRefreshedEvent>, DisposableBean {
    /** Topics for which a "save topic" request has already been issued (shared by all clients). */
    private static final Set<String> TOPICS = Sets.newConcurrentHashSet();
    /** Producer registry keyed by producer group (shared by all clients). */
    private static final ConcurrentMap<String, DefaultMQProducer> PRODUCERS = Maps.newConcurrentMap();

    /** Delay between the context refresh event and the first start attempt. */
    private static final long INIT_DELAY_MILLIS = 2000L;
    /** Pause after {@code start()} returns before the client is announced as started. */
    private static final long POST_START_PAUSE_MILLIS = 500L;
    /** Number of additional start attempts after the first one failed. */
    private static final int MAX_RESTART_ATTEMPTS = 5;
    /** Pause before each additional start attempt. */
    private static final long RESTART_PAUSE_MILLIS = 3000L;
    /** Maximum number of wait rounds a sender spends waiting for start-up. */
    private static final int MAX_WAIT_ROUNDS = 20;
    /** Length of one wait round. */
    private static final long WAIT_ROUND_MILLIS = 2000L;

    private final Logger logger = LoggerFactory.getLogger(RocketProducerClient.class);

    private String rocketMqName;
    private String mqProducerGroup;
    private ProducerService producerService;
    private RocketMqAddrService rocketMqAddrService;
    private DefaultMQProducer defaultMQProducer;
    private volatile boolean isStarted = false;
    private volatile boolean startError = false;
    String localIp = InetUtils.getLocalHostIp();

    /**
     * Resolves the name-server address and starts (or adopts) the producer for the configured
     * producer group.
     *
     * <p>If the address lookup fails, or the group / address is empty, an error is logged and the
     * method returns without starting anything and without flagging a start failure.
     *
     * @throws MQClientException    if the producer fails to start; the failed producer is removed
     *                              from the registry so that a later call can create a new one
     * @throws InterruptedException if the thread is interrupted during the post-start pause
     */
    public void init() throws MQClientException, InterruptedException {
        final String group = this.mqProducerGroup;

        RocketMqClientDto clientDto = new RocketMqClientDto();
        clientDto.setClientIp(this.localIp);
        clientDto.setClientGroup(group);
        clientDto.setRocketMqName(this.rocketMqName);
        clientDto.setClientType(KangarooConstants.PRODUCER_CLIENT);

        DubboResult<String> rmNamesrvAddr = this.rocketMqAddrService.getRmNamesrvAddr(clientDto);
        if (!rmNamesrvAddr.isSuccess()) {
            this.logger.error("获取 rocketMq：" + this.rocketMqName + " nameServer 地址失败，message=" + rmNamesrvAddr.getMessage());
            return;
        }
        String namesrvAddr = rmNamesrvAddr.getData();
        if (StringUtils.isEmpty(group) || StringUtils.isEmpty(namesrvAddr)) {
            this.logger.error("mqProducerGroup or namesrvAddr parameters is null");
            return;
        }
        this.logger.info("DefaultMQProducer " + group + " initialize!");
        this.logger.info("mqProducerGroup:" + group);
        this.logger.info("namesrvAddr:" + namesrvAddr);

        // Cheap lookup first; only build a new producer when none is registered yet. The value
        // returned by putIfAbsent is authoritative if another thread registered concurrently.
        DefaultMQProducer created = null;
        DefaultMQProducer existing = PRODUCERS.get(group);
        if (existing == null) {
            created = new DefaultMQProducer(group);
            existing = PRODUCERS.putIfAbsent(group, created);
        }
        if (existing != null) {
            this.defaultMQProducer = existing;
            this.logger.info(group + " 的Producer已经被创建，直接返回，建议检查配置是否重复");
            // The shared producer is owned by whoever registered it; this client only needs to
            // stop its senders from waiting for a start that will never happen here.
            markStarted();
            return;
        }

        this.defaultMQProducer = created;
        created.setUnitName(this.rocketMqName + "_" + group);
        created.setNamesrvAddr(namesrvAddr);
        created.setVipChannelEnabled(false);

        boolean startSucceeded = false;
        try {
            created.start();
            startSucceeded = true;
        } finally {
            if (!startSucceeded) {
                // Drop the unusable producer so that a retry builds a new one instead of
                // adopting this one through the "already created" branch.
                PRODUCERS.remove(group, created);
            }
        }
        Thread.sleep(POST_START_PAUSE_MILLIS);
        markStarted();
        this.logger.info("DefaultMQProducer start success!");
    }

    /** Marks this client as started and wakes every sender waiting on this object's monitor. */
    private void markStarted() {
        // Clear the error flag first so that no sender observes "started" together with a stale
        // start error.
        this.startError = false;
        this.isStarted = true;
        synchronized (this) {
            notifyAll();
        }
    }

    /**
     * Sends a message through RocketMQ.
     *
     * @param rocketMessageDto the message to send; must not be {@code null}
     * @return the start-up validation error if the client is not usable; a parameter error if
     *         {@code rocketMessageDto} is {@code null}; a success result (carrying the send status
     *         when it is not {@code SEND_OK}) after sending; or, if sending throws, the result of
     *         handing the message to {@link ProducerService}
     */
    public DubboResult sendMessage(RocketMessageDto rocketMessageDto) {
        DubboResult clientStatus = validateProducerClientStatus();
        if (!clientStatus.isSuccess()) {
            return clientStatus;
        }
        if (rocketMessageDto == null) {
            this.logger.error("msgDto不能为null。");
            return DubboResultBuilder.error(DubboResultCodeEnums.PARAM_ERROR.value(), "msgDto不能为null。");
        }
        saveTopic(rocketMessageDto.getTopic());
        StopWatch stopWatch = new StopWatch();
        try {
            this.logger.info("开始发送消息，消息为：{}, 业务key为:{} topic为:{}, tags为:{}", JSONObject.toJSONString(rocketMessageDto.getData()), rocketMessageDto.getKey(), rocketMessageDto.getTopic(), rocketMessageDto.getTags());
            SendResult sendResult = this.defaultMQProducer.send(rocketMessageDto.getMessage());
            if (sendResult == null || sendResult.getSendStatus() == SendStatus.SEND_OK) {
                return DubboResultBuilder.success(null);
            }
            this.logger.error("发送消息状态有错:" + JSONObject.toJSONString(sendResult));
            return DubboResultBuilder.success(sendResult.getSendStatus());
        } catch (Exception e) {
            this.logger.error("通过Rocket发送消息失败,将消息保存至消息中心。", e);
            return saveToMessageCenter(rocketMessageDto);
        } finally {
            this.logger.info("rocketmq mq 发送消息结束， 时间为：{}ms", Long.valueOf(stopWatch.elapsedTime()));
        }
    }

    /**
     * Hands a message that could not be sent through RocketMQ to {@link ProducerService}.
     *
     * @param rocketMessageDto the message that failed to send
     * @return the result reported by {@link ProducerService#sendMessage}
     */
    private DubboResult saveToMessageCenter(RocketMessageDto rocketMessageDto) {
        MessageDto messageDto = new MessageDto();
        String requestId = RequestIdUtils.generateNextRequestId();
        messageDto.setReceiverParam(JsonSerializeUtil.serialize(rocketMessageDto.getData()));
        messageDto.setReceiverAddress(this.rocketMqName);
        messageDto.setSourcePlatform(PlatformConfig.getPlatform());
        messageDto.setDeliverType(3);
        messageDto.setKeyword(rocketMessageDto.getKey());
        messageDto.setTopic(rocketMessageDto.getTopic());
        messageDto.setTags(rocketMessageDto.getTags());
        messageDto.setRequestId(requestId);
        messageDto.setClientIp(this.localIp);
        return this.producerService.sendMessage(messageDto);
    }

    /**
     * Reports a topic to {@link ProducerService} on a separate thread the first time the topic is
     * seen; later calls with the same topic do nothing.
     *
     * @param str the topic name
     */
    private void saveTopic(final String str) {
        // contains() is a read-only fast path; add() decides the winner among concurrent callers.
        if (TOPICS.contains(str) || !TOPICS.add(str)) {
            return;
        }
        new Thread(new Runnable() {
            @Override
            public void run() {
                RocketProducerTopicDto topicDto = new RocketProducerTopicDto();
                topicDto.setProducerGroup(RocketProducerClient.this.mqProducerGroup);
                topicDto.setRocketMqName(RocketProducerClient.this.rocketMqName);
                topicDto.setTopic(str);
                RocketProducerClient.this.producerService.saveTopic(topicDto);
            }
        }).start();
    }

    /**
     * Waits (bounded) for start-up to finish and reports whether the client may be used.
     *
     * @return a success result unless start-up is flagged as failed, in which case an
     *         {@code INTERNAL_ERROR} result is returned
     * @throws RuntimeException wrapping the {@link InterruptedException} if the calling thread is
     *                          interrupted while waiting; the interrupt status is restored first
     */
    private DubboResult validateProducerClientStatus() {
        if (!this.isStarted) {
            try {
                synchronized (this) {
                    int roundsLeft = MAX_WAIT_ROUNDS;
                    while (!this.isStarted && roundsLeft > 0) {
                        roundsLeft--;
                        if (this.startError) {
                            return DubboResultBuilder.error(DubboResultCodeEnums.INTERNAL_ERROR.value(), "rocketMqProducer启动失败，请检查日志");
                        }
                        wait(WAIT_ROUND_MILLIS);
                    }
                }
            } catch (InterruptedException e) {
                this.logger.error("发送消息等待初始化错误", e);
                // Keep the interrupt visible to the caller's own cancellation handling.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
        if (!this.startError) {
            return DubboResultBuilder.success();
        }
        this.logger.error("rocketMqProducer启动失败，请检查启动日志是否有异常。");
        return DubboResultBuilder.error(DubboResultCodeEnums.INTERNAL_ERROR.value(), "rocketMqProducer启动失败，请检查日志");
    }

    /**
     * Shuts the producer down and removes it from the shared registry so that a client created
     * later does not adopt a producer that has already been shut down. Failures are logged and
     * not propagated.
     */
    @Override
    public void destroy() {
        try {
            DefaultMQProducer producer = this.defaultMQProducer;
            if (producer != null) {
                String group = this.mqProducerGroup;
                if (group != null) {
                    // Two-argument remove: only drops the entry if it still maps to this producer.
                    PRODUCERS.remove(group, producer);
                }
                producer.shutdown();
            }
        } catch (Throwable th) {
            this.logger.error("RocketMq 关闭异常", th);
        }
    }

    public void setMqProducerGroup(String str) {
        this.mqProducerGroup = str;
    }

    public void setRocketMqName(String str) {
        this.rocketMqName = str;
    }

    public void setProducerService(ProducerService producerService) {
        this.producerService = producerService;
    }

    public void setRocketMqAddrService(RocketMqAddrService rocketMqAddrService) {
        this.rocketMqAddrService = rocketMqAddrService;
    }

    /**
     * Schedules the asynchronous start of the producer after the context has been refreshed.
     * A failed first attempt is retried up to {@value #MAX_RESTART_ATTEMPTS} times.
     *
     * @param contextRefreshedEvent the refresh event (not inspected)
     */
    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        final Timer initTimer = new Timer("rocket-producer-init");
        initTimer.schedule(new TimerTask() {
            @Override
            public void run() {
                try {
                    RocketProducerClient.this.init();
                } catch (Exception e) {
                    RocketProducerClient.this.logger.error("defaultMqProducer启动失败", e);
                    RocketProducerClient.this.startError = true;
                    for (int i = 0; i < MAX_RESTART_ATTEMPTS && !RocketProducerClient.this.isStarted; i++) {
                        reInit(i);
                    }
                    if (RocketProducerClient.this.startError) {
                        RocketProducerClient.this.logger.error("5次重新启动producerClient:" + RocketProducerClient.this.mqProducerGroup + "仍然失败");
                    }
                } finally {
                    // One-shot task: release the timer thread instead of leaving it parked forever.
                    initTimer.cancel();
                }
            }

            /** Runs one additional start attempt after a pause; failures are logged only. */
            private void reInit(int i) {
                try {
                    Thread.sleep(RESTART_PAUSE_MILLIS);
                    RocketProducerClient.this.init();
                    RocketProducerClient.this.startError = false;
                    RocketProducerClient.this.logger.info("重新启动producerClient" + RocketProducerClient.this.mqProducerGroup + ", 重新启动成功");
                } catch (Exception e) {
                    RocketProducerClient.this.logger.error(RocketProducerClient.this.mqProducerGroup + "第" + (i + 1) + "次重新启动失败", e);
                }
            }
        }, INIT_DELAY_MILLIS);
    }
}
