package com.beiming.pigeons.api.consumer.rocketmq;

import com.alibaba.dubbo.monitor.MonitorService;
import com.alibaba.fastjson.JSONObject;
import com.beiming.framework.constant.LogConstants;
import com.beiming.framework.domain.DubboResult;
import com.beiming.framework.domain.PlatformConfig;
import com.beiming.framework.enums.RequestTypeEnums;
import com.beiming.framework.log.ActionLoggerImpl;
import com.beiming.framework.log.TraceLogger;
import com.beiming.framework.util.RemoteAddressUtil;
import com.beiming.framework.util.RequestIdUtils;
import com.beiming.pigeons.api.constants.KangarooConstants;
import com.beiming.pigeons.api.consumer.ConsumeErrorDto;
import com.beiming.pigeons.api.consumer.ConsumeErrorService;
import com.beiming.pigeons.api.exception.KangarooConsumerException;
import com.beiming.pigeons.api.rocketmq.RocketMqAddrService;
import com.beiming.pigeons.api.rocketmq.RocketMqClientDto;
import com.beiming.pigeons.api.utils.InetUtils;
import com.beiming.pigeons.api.utils.JsonSerializeUtil;
import com.beiming.pigeons.api.utils.StopWatch;
import com.google.common.collect.Maps;
import java.lang.reflect.Field;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Proxy;
import java.lang.reflect.Type;
import java.util.Iterator;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.aop.SpringProxy;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;

/* JADX WARN: Classes with same name are omitted:
  input_file:WEB-INF/lib/suqianodr-pigeons-api-1.0.1-20221024.072509-2.jar:com/beiming/pigeons/api/consumer/rocketmq/RocketConsumerClient.class
 */
/* loaded from: input_file:WEB-INF/lib/suqianodr-pigeons-api-1.0.1-SNAPSHOT.jar:com/beiming/pigeons/api/consumer/rocketmq/RocketConsumerClient.class */
public class RocketConsumerClient implements ApplicationListener<ContextRefreshedEvent>, DisposableBean {
    private DefaultMQPushConsumer defaultMQPushConsumer;
    private String mqConsumerGroup;
    private String topic;
    private String rocketMqName;
    private RocketMqAddrService rocketMqAddrService;
    private ConsumeErrorService consumeErrorService;
    private RocketMsgProcessor processor;
    private static ConcurrentMap<String, DefaultMQPushConsumer> consumers = Maps.newConcurrentMap();
    private String messageModel;
    private String tag;
    private Integer batchMaxSize;
    private Integer consumeThreadMax;
    private Integer consumeThreadMin;
    private Integer queueNum;
    private final Logger logger = LoggerFactory.getLogger((Class<?>) RocketConsumerClient.class);
    String localIp = InetUtils.getLocalHostIp();

    private void init(MessageListenerConcurrently messageListenerConcurrently) throws InterruptedException, MQClientException {
        RocketMqClientDto rocketMqClientDto = new RocketMqClientDto();
        rocketMqClientDto.setClientIp(this.localIp);
        rocketMqClientDto.setClientGroup(this.mqConsumerGroup);
        rocketMqClientDto.setRocketMqName(this.rocketMqName);
        rocketMqClientDto.setTopic(this.topic);
        rocketMqClientDto.setClientType(KangarooConstants.CONSUMER_CLIENT);
        rocketMqClientDto.setQueueNum(this.queueNum);
        DubboResult<String> rmNamesrvAddr = this.rocketMqAddrService.getRmNamesrvAddr(rocketMqClientDto);
        if (!rmNamesrvAddr.isSuccess()) {
            this.logger.error("获取 rocketMq nameServer 地址失败，message=" + rmNamesrvAddr.getMessage());
            return;
        }
        String data = rmNamesrvAddr.getData();
        if (this.batchMaxSize == null || this.batchMaxSize.intValue() < 1) {
            this.batchMaxSize = 1;
        }
        if (StringUtils.isEmpty(this.mqConsumerGroup) || StringUtils.isEmpty(data) || StringUtils.isEmpty(this.topic)) {
            this.logger.error("缺少初始化参数不能为空。。。。。。");
            return;
        }
        this.logger.info("DefaultMQPushConsumer " + this.mqConsumerGroup + " initialize!");
        this.logger.info("mqConsumerGroup:" + this.mqConsumerGroup);
        this.logger.info("namesrvAddr:" + data);
        this.logger.info("topic:" + this.topic);
        this.logger.info("tag:" + this.tag);
        this.logger.info("messageModel:" + getMessageModelByCN(this.messageModel));
        if (consumers.containsKey(this.mqConsumerGroup)) {
            this.logger.info(this.mqConsumerGroup + "的 consumer 已经创建，直接返回，建议检查配置是否重复");
            this.defaultMQPushConsumer = consumers.get(this.mqConsumerGroup);
            return;
        }
        if (this.processor == null || this.consumeErrorService == null) {
            this.logger.error("消息的客户端没有配置processor或者consumeErrorService");
            return;
        }
        this.defaultMQPushConsumer = new DefaultMQPushConsumer(this.mqConsumerGroup);
        if (consumers.putIfAbsent(this.mqConsumerGroup, this.defaultMQPushConsumer) != null) {
            this.logger.info(this.mqConsumerGroup + "的 consumer 已经创建，直接返回，建议检查配置是否重复");
            this.defaultMQPushConsumer = consumers.get(this.mqConsumerGroup);
            return;
        }
        this.defaultMQPushConsumer.setNamesrvAddr(data);
        if (StringUtils.isEmpty(this.tag)) {
            this.tag = "*";
        }
        this.defaultMQPushConsumer.subscribe(this.topic, this.tag);
        this.defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        if (StringUtils.isEmpty(this.messageModel)) {
            this.defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);
        } else {
            this.defaultMQPushConsumer.setMessageModel(getMessageModelByCN(this.messageModel));
        }
        this.defaultMQPushConsumer.setConsumeMessageBatchMaxSize(this.batchMaxSize.intValue());
        this.defaultMQPushConsumer.setVipChannelEnabled(false);
        this.defaultMQPushConsumer.setUnitName(this.rocketMqName + "_" + this.mqConsumerGroup);
        this.defaultMQPushConsumer.registerMessageListener(messageListenerConcurrently);
        if (this.consumeThreadMax != null) {
            this.defaultMQPushConsumer.setConsumeThreadMax(this.consumeThreadMax.intValue());
        }
        if (this.consumeThreadMin != null) {
            this.defaultMQPushConsumer.setConsumeThreadMin(this.consumeThreadMin.intValue());
        }
        this.defaultMQPushConsumer.start();
        this.logger.info("DefaultMQPushConsumer: " + this.mqConsumerGroup + " start success!");
    }

    public void start() throws MQClientException, InterruptedException {
        if (this.processor == null) {
            throw new KangarooConsumerException("没有处理消息的processor");
        }
        init(new MessageListenerConcurrently() { // from class: com.beiming.pigeons.api.consumer.rocketmq.RocketConsumerClient.1
            @Override // org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                if (CollectionUtils.isEmpty(list)) {
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                Iterator<MessageExt> it = list.iterator();
                while (it.hasNext()) {
                    consumeMessage(it.next());
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }

            private void consumeMessage(MessageExt messageExt) {
                String keys = messageExt.getKeys();
                TraceLogger.get().initialize();
                ActionLoggerImpl initActionLogger = RocketConsumerClient.this.initActionLogger(messageExt, PlatformConfig.getPlatform());
                StopWatch stopWatch = new StopWatch();
                RocketConsumerClient.this.logger.info("======= begin request processing =======");
                try {
                    try {
                        String processMsg = RocketConsumerClient.this.processMsg(messageExt, initActionLogger);
                        if (StringUtils.isNotBlank(processMsg)) {
                            if (processMsg.contains("成功") || processMsg.contains(MonitorService.SUCCESS)) {
                                RocketConsumerClient.this.logger.warn("如果消息处理成功，topic={}, msgKey={}，方法不需要返回处理结果, 处理结果为={}", RocketConsumerClient.this.topic, keys, processMsg);
                            } else if ("SmsAndPushMsgTopic".equalsIgnoreCase(RocketConsumerClient.this.topic) && processMsg.contains("模板")) {
                                RocketConsumerClient.this.logger.warn("消息处理失败，topic={}, msgKey={}, errorReason={}", RocketConsumerClient.this.topic, keys, processMsg);
                            } else {
                                RocketConsumerClient.this.dealConsumeResult(messageExt, processMsg, false);
                            }
                        }
                        RocketConsumerClient.this.logger.info("消息处理完毕，处理结果={}，耗时={}ms.", processMsg, Long.valueOf(stopWatch.elapsedTime()));
                        RocketConsumerClient.this.logger.info("======= finish request processing =======\r\n");
                        initActionLogger.currentActionLog().setElapsedTime(stopWatch.elapsedTime());
                        TraceLogger.get().cleanup(true);
                        RequestIdUtils.removeRequestId();
                        initActionLogger.save();
                    } catch (Exception e) {
                        RocketConsumerClient.this.logger.error(RocketConsumerClient.this.mqConsumerGroup + "消费者消费消息失败, msgKey=" + keys + ", 错误message=" + e.getMessage(), (Throwable) e);
                        String str = e.getClass().getSimpleName() + ":" + e.getMessage();
                        RocketConsumerClient.this.dealConsumeResult(messageExt, str, true);
                        RocketConsumerClient.this.logger.info("消息处理完毕，处理结果={}，耗时={}ms.", str, Long.valueOf(stopWatch.elapsedTime()));
                        RocketConsumerClient.this.logger.info("======= finish request processing =======\r\n");
                        initActionLogger.currentActionLog().setElapsedTime(stopWatch.elapsedTime());
                        TraceLogger.get().cleanup(true);
                        RequestIdUtils.removeRequestId();
                        initActionLogger.save();
                    }
                } catch (Throwable th) {
                    RocketConsumerClient.this.logger.info("消息处理完毕，处理结果={}，耗时={}ms.", "", Long.valueOf(stopWatch.elapsedTime()));
                    RocketConsumerClient.this.logger.info("======= finish request processing =======\r\n");
                    initActionLogger.currentActionLog().setElapsedTime(stopWatch.elapsedTime());
                    TraceLogger.get().cleanup(true);
                    RequestIdUtils.removeRequestId();
                    initActionLogger.save();
                    throw th;
                }
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public <M> String processMsg(MessageExt messageExt, ActionLoggerImpl actionLoggerImpl) throws ClassNotFoundException {
        Class<?> cls = this.processor.getClass();
        if (this.processor instanceof SpringProxy) {
            cls = this.processor.getClass().getSuperclass();
            if (cls.equals(Proxy.class)) {
                cls = Class.forName(this.processor.toString().split("@")[0]);
            }
        }
        Type[] genericInterfaces = cls.getGenericInterfaces();
        Class cls2 = null;
        int length = genericInterfaces.length;
        int i = 0;
        while (true) {
            if (i >= length) {
                break;
            }
            Type type = genericInterfaces[i];
            if ((type instanceof ParameterizedType) && ((ParameterizedType) type).getRawType().equals(RocketMsgProcessor.class)) {
                cls2 = (Class) ((ParameterizedType) type).getActualTypeArguments()[0];
                break;
            }
            i++;
        }
        if (cls2 == null) {
            throw new KangarooConsumerException("json要转换的class为空， 父类的泛型types为" + genericInterfaces.toString());
        }
        String simpleName = ClassUtils.getSimpleName(cls);
        actionLoggerImpl.currentActionLog().setAction(simpleName);
        MDC.put(LogConstants.ACTION, simpleName);
        Object deserialize = JsonSerializeUtil.deserialize(messageExt.getBody(), cls2);
        String jSONString = JSONObject.toJSONString(deserialize);
        this.logger.info("处理消息，处理该消息的类为={}，topic={}, msgKey={}, 消息={}", this.processor.getClass().getName(), this.topic, messageExt.getKeys(), jSONString);
        actionLoggerImpl.currentActionLog().setRequestContent(jSONString);
        String process = this.processor.process(deserialize);
        if (StringUtils.isNotBlank(process)) {
            this.logger.error("消息消费结果:{}", process);
        }
        return process;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void dealConsumeResult(MessageExt messageExt, String str, boolean z) {
        if (this.consumeErrorService == null) {
            this.logger.error("consumeErrorService为空将不能把错误消息发送到消息系统，请在spring配置文件中配置consumerErrorService");
            return;
        }
        try {
            this.logger.info("将错误的消息写入数据库，是否自动重试={}", Boolean.valueOf(z));
            ConsumeErrorDto consumeErrorDto = new ConsumeErrorDto();
            consumeErrorDto.setClientIp(this.localIp);
            consumeErrorDto.setErrorReason(str);
            Field declaredField = MessageExt.class.getDeclaredField("msgId");
            declaredField.setAccessible(true);
            consumeErrorDto.setMqMsgId((String) declaredField.get(messageExt));
            consumeErrorDto.setTopic(messageExt.getTopic());
            consumeErrorDto.setRetry(z);
            consumeErrorDto.setRocketMqName(this.rocketMqName);
            consumeErrorDto.setGroupName(this.mqConsumerGroup);
            consumeErrorDto.setRequestId(messageExt.getProperty(KangarooConstants.ROCKET_REQUEST_ID_NAME));
            consumeErrorDto.setMsgSourcePlatform(messageExt.getProperty(KangarooConstants.PLATFORM_NAME));
            this.consumeErrorService.consumeError(consumeErrorDto);
        } catch (IllegalAccessException e) {
            this.logger.error("通过dubbo发送异常消息到消息系统失败", (Throwable) e);
        } catch (NoSuchFieldException e2) {
            this.logger.error("通过dubbo发送异常消息到消息系统失败", (Throwable) e2);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public ActionLoggerImpl initActionLogger(MessageExt messageExt, String str) {
        ActionLoggerImpl actionLoggerImpl = ActionLoggerImpl.get();
        actionLoggerImpl.initialize();
        actionLoggerImpl.currentActionLog().setSourcePlatform(messageExt.getProperty(KangarooConstants.PLATFORM_NAME));
        actionLoggerImpl.currentActionLog().setCurrentPlatform(str);
        String property = messageExt.getProperty(KangarooConstants.ROCKET_REQUEST_ID_NAME);
        if (StringUtils.isBlank(property)) {
            property = RequestIdUtils.generateDefaultRequestId();
        }
        MDC.put(LogConstants.REQUEST_ID, property);
        this.logger.info("requestId={}", property);
        RequestIdUtils.setRequestId(property);
        actionLoggerImpl.currentActionLog().setRequestId(property);
        actionLoggerImpl.currentActionLog().setClientIP(messageExt.getBornHostString());
        actionLoggerImpl.currentActionLog().setServerIP(new RemoteAddressUtil().getServerIP());
        actionLoggerImpl.currentActionLog().setRequestType(RequestTypeEnums.MQ);
        return actionLoggerImpl;
    }

    private DefaultMQPushConsumer getDefaultMQPushConsumer() {
        return this.defaultMQPushConsumer;
    }

    @Override // org.springframework.context.ApplicationListener
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        new Timer().schedule(new TimerTask() { // from class: com.beiming.pigeons.api.consumer.rocketmq.RocketConsumerClient.2
            @Override // java.util.TimerTask, java.lang.Runnable
            public void run() {
                try {
                    RocketConsumerClient.this.start();
                } catch (Throwable th) {
                    RocketConsumerClient.this.logger.error(RocketConsumerClient.this.mqConsumerGroup + "消息系统客户端启动异常", th);
                    boolean z = true;
                    for (int i = 0; i < 5 && z; i++) {
                        z = isStartError(z, i);
                    }
                    if (z) {
                        RocketConsumerClient.this.logger.error("5次重新启动consumerClient:" + RocketConsumerClient.this.mqConsumerGroup + "仍然失败");
                    }
                }
            }

            private boolean isStartError(boolean z, int i) {
                try {
                    Thread.sleep(3000L);
                    RocketConsumerClient.this.start();
                    z = false;
                    RocketConsumerClient.this.logger.info("重新启动consumerClient" + RocketConsumerClient.this.mqConsumerGroup + ", 重新启动成功");
                } catch (Exception e) {
                    RocketConsumerClient.this.logger.error(RocketConsumerClient.this.mqConsumerGroup + " consumerClient 第" + (i + 1) + "次重新启动失败", (Throwable) e);
                }
                return z;
            }
        }, 500L);
    }

    @Override // org.springframework.beans.factory.DisposableBean
    public void destroy() {
        try {
            if (this.defaultMQPushConsumer != null) {
                this.defaultMQPushConsumer.shutdown();
            }
        } catch (Throwable th) {
            this.logger.error("RConsumer defaultMQPushConsumer " + this.mqConsumerGroup + " destroy exception!", th);
        }
        this.logger.info("RConsumer defaultMQPushConsumer " + this.mqConsumerGroup + " destroy success");
    }

    public String getTag() {
        return this.tag;
    }

    public void setMqConsumerGroup(String str) {
        this.mqConsumerGroup = str;
    }

    public void setTopic(String str) {
        this.topic = str;
    }

    public void setTag(String str) {
        this.tag = str;
    }

    public void setBatchMaxSize(int i) {
        this.batchMaxSize = Integer.valueOf(i);
    }

    public void setConsumeThreadMax(Integer num) {
        this.consumeThreadMax = num;
    }

    public void setConsumeThreadMin(Integer num) {
        this.consumeThreadMin = num;
    }

    public void setMessageModel(String str) {
        this.messageModel = str;
    }

    public void setRocketMqName(String str) {
        this.rocketMqName = str;
    }

    public void setRocketMqAddrService(RocketMqAddrService rocketMqAddrService) {
        this.rocketMqAddrService = rocketMqAddrService;
    }

    public void setConsumeErrorService(ConsumeErrorService consumeErrorService) {
        this.consumeErrorService = consumeErrorService;
    }

    public void setQueueNum(Integer num) {
        this.queueNum = num;
    }

    public void setBatchMaxSize(Integer num) {
        this.batchMaxSize = num;
    }

    public void setProcessor(RocketMsgProcessor rocketMsgProcessor) {
        this.processor = rocketMsgProcessor;
    }

    private MessageModel getMessageModelByCN(String str) {
        for (MessageModel messageModel : MessageModel.values()) {
            if (messageModel.getModeCN().equals(str)) {
                return messageModel;
            }
        }
        return MessageModel.CLUSTERING;
    }
}
