/*
 * Decompiled with CFR 0.152.
 */
package com.beiming.pigeons.api.consumer.rocketmq;

import com.alibaba.fastjson.JSONObject;
import com.beiming.framework.domain.DubboResult;
import com.beiming.framework.domain.PlatformConfig;
import com.beiming.framework.enums.RequestTypeEnums;
import com.beiming.framework.log.ActionLoggerImpl;
import com.beiming.framework.log.TraceLogger;
import com.beiming.framework.util.RemoteAddressUtil;
import com.beiming.framework.util.RequestIdUtils;
import com.beiming.pigeons.api.consumer.ConsumeErrorDto;
import com.beiming.pigeons.api.consumer.ConsumeErrorService;
import com.beiming.pigeons.api.consumer.rocketmq.RocketMsgProcessor;
import com.beiming.pigeons.api.exception.KangarooConsumerException;
import com.beiming.pigeons.api.rocketmq.RocketMqAddrService;
import com.beiming.pigeons.api.rocketmq.RocketMqClientDto;
import com.beiming.pigeons.api.utils.InetUtils;
import com.beiming.pigeons.api.utils.JsonSerializeUtil;
import com.beiming.pigeons.api.utils.StopWatch;
import com.google.common.collect.Maps;
import java.lang.reflect.Field;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Proxy;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.aop.SpringProxy;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;

public class RocketConsumerClient
implements ApplicationListener<ContextRefreshedEvent>,
DisposableBean {
    private final Logger logger = LoggerFactory.getLogger(RocketConsumerClient.class);
    private DefaultMQPushConsumer defaultMQPushConsumer;
    private String mqConsumerGroup;
    private String topic;
    private String rocketMqName;
    private RocketMqAddrService rocketMqAddrService;
    private ConsumeErrorService consumeErrorService;
    private RocketMsgProcessor processor;
    private static ConcurrentMap<String, DefaultMQPushConsumer> consumers = Maps.newConcurrentMap();
    String localIp = InetUtils.getLocalHostIp();
    private String messageModel;
    private String tag;
    private Integer batchMaxSize;
    private Integer consumeThreadMax;
    private Integer consumeThreadMin;
    private Integer queueNum;

    private void init(MessageListenerConcurrently msgListener) throws InterruptedException, MQClientException {
        RocketMqClientDto clientDto = new RocketMqClientDto();
        clientDto.setClientIp(this.localIp);
        clientDto.setClientGroup(this.mqConsumerGroup);
        clientDto.setRocketMqName(this.rocketMqName);
        clientDto.setTopic(this.topic);
        clientDto.setClientType("CONSUMER");
        clientDto.setQueueNum(this.queueNum);
        DubboResult<String> namesrvAddrResult = this.rocketMqAddrService.getRmNamesrvAddr(clientDto);
        if (!namesrvAddrResult.isSuccess()) {
            this.logger.error("\u83b7\u53d6 rocketMq nameServer \u5730\u5740\u5931\u8d25\uff0cmessage=" + namesrvAddrResult.getMessage());
            return;
        }
        String namesrvAddr = (String)((Object)namesrvAddrResult.getData());
        if (this.batchMaxSize == null || this.batchMaxSize < 1) {
            this.batchMaxSize = 1;
        }
        if (StringUtils.isEmpty((CharSequence)this.mqConsumerGroup) || StringUtils.isEmpty((CharSequence)namesrvAddr) || StringUtils.isEmpty((CharSequence)this.topic)) {
            this.logger.error("\u7f3a\u5c11\u521d\u59cb\u5316\u53c2\u6570\u4e0d\u80fd\u4e3a\u7a7a\u3002\u3002\u3002\u3002\u3002\u3002");
            return;
        }
        this.logger.info("DefaultMQPushConsumer " + this.mqConsumerGroup + " initialize!");
        this.logger.info("mqConsumerGroup:" + this.mqConsumerGroup);
        this.logger.info("namesrvAddr:" + namesrvAddr);
        this.logger.info("topic:" + this.topic);
        this.logger.info("tag:" + this.tag);
        this.logger.info("messageModel:" + this.getMessageModelByCN(this.messageModel));
        if (consumers.containsKey(this.mqConsumerGroup)) {
            this.logger.info(this.mqConsumerGroup + "\u7684 consumer \u5df2\u7ecf\u521b\u5efa\uff0c\u76f4\u63a5\u8fd4\u56de\uff0c\u5efa\u8bae\u68c0\u67e5\u914d\u7f6e\u662f\u5426\u91cd\u590d");
            this.defaultMQPushConsumer = (DefaultMQPushConsumer)consumers.get(this.mqConsumerGroup);
            return;
        }
        if (this.processor == null || this.consumeErrorService == null) {
            this.logger.error("\u6d88\u606f\u7684\u5ba2\u6237\u7aef\u6ca1\u6709\u914d\u7f6eprocessor\u6216\u8005consumeErrorService");
            return;
        }
        this.defaultMQPushConsumer = new DefaultMQPushConsumer(this.mqConsumerGroup);
        if (consumers.putIfAbsent(this.mqConsumerGroup, this.defaultMQPushConsumer) != null) {
            this.logger.info(this.mqConsumerGroup + "\u7684 consumer \u5df2\u7ecf\u521b\u5efa\uff0c\u76f4\u63a5\u8fd4\u56de\uff0c\u5efa\u8bae\u68c0\u67e5\u914d\u7f6e\u662f\u5426\u91cd\u590d");
            this.defaultMQPushConsumer = (DefaultMQPushConsumer)consumers.get(this.mqConsumerGroup);
            return;
        }
        this.defaultMQPushConsumer.setNamesrvAddr(namesrvAddr);
        if (StringUtils.isEmpty((CharSequence)this.tag)) {
            this.tag = "*";
        }
        this.defaultMQPushConsumer.subscribe(this.topic, this.tag);
        this.defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        if (StringUtils.isEmpty((CharSequence)this.messageModel)) {
            this.defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);
        } else {
            this.defaultMQPushConsumer.setMessageModel(this.getMessageModelByCN(this.messageModel));
        }
        this.defaultMQPushConsumer.setConsumeMessageBatchMaxSize(this.batchMaxSize.intValue());
        this.defaultMQPushConsumer.setVipChannelEnabled(false);
        this.defaultMQPushConsumer.setUnitName(this.rocketMqName + "_" + this.mqConsumerGroup);
        this.defaultMQPushConsumer.registerMessageListener(msgListener);
        if (this.consumeThreadMax != null) {
            this.defaultMQPushConsumer.setConsumeThreadMax(this.consumeThreadMax.intValue());
        }
        if (this.consumeThreadMin != null) {
            this.defaultMQPushConsumer.setConsumeThreadMin(this.consumeThreadMin.intValue());
        }
        this.defaultMQPushConsumer.start();
        this.logger.info("DefaultMQPushConsumer: " + this.mqConsumerGroup + " start success!");
    }

    public void start() throws MQClientException, InterruptedException {
        if (this.processor == null) {
            throw new KangarooConsumerException("\u6ca1\u6709\u5904\u7406\u6d88\u606f\u7684processor");
        }
        this.init(new MessageListenerConcurrently(){

            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExtList, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                if (CollectionUtils.isEmpty(messageExtList)) {
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                for (MessageExt messageExt : messageExtList) {
                    this.consumeMessage(messageExt);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            private void consumeMessage(MessageExt messageExt) {
                String msgKey = messageExt.getKeys();
                boolean retry = false;
                String errorReason = "";
                TraceLogger.get().initialize();
                ActionLoggerImpl actionLogger = RocketConsumerClient.this.initActionLogger(messageExt, PlatformConfig.getPlatform());
                StopWatch stopWatch = new StopWatch();
                RocketConsumerClient.this.logger.info("======= begin request processing =======");
                try {
                    errorReason = RocketConsumerClient.this.processMsg(messageExt, actionLogger);
                    RocketConsumerClient.this.logger.info("=======" + errorReason + "=======");
                    if (StringUtils.isNotBlank((CharSequence)errorReason)) {
                        if (errorReason.contains("\u6210\u529f") || errorReason.contains("success")) {
                            RocketConsumerClient.this.logger.info("\u5982\u679c\u6d88\u606f\u5904\u7406\u6210\u529f\uff0ctopic={}, msgKey={}\uff0c\u65b9\u6cd5\u4e0d\u9700\u8981\u8fd4\u56de\u5904\u7406\u7ed3\u679c, \u5904\u7406\u7ed3\u679c\u4e3a={}", new Object[]{RocketConsumerClient.this.topic, msgKey, errorReason});
                        } else if ("SmsAndPushMsgTopic".equalsIgnoreCase(RocketConsumerClient.this.topic) && errorReason.contains("\u6a21\u677f")) {
                            RocketConsumerClient.this.logger.info("\u6d88\u606f\u5904\u7406\u5931\u8d25\uff0ctopic={}, msgKey={}, errorReason={}", new Object[]{RocketConsumerClient.this.topic, msgKey, errorReason});
                        } else {
                            RocketConsumerClient.this.dealConsumeResult(messageExt, errorReason, retry);
                        }
                    }
                }
                catch (Exception e) {
                    RocketConsumerClient.this.logger.info(RocketConsumerClient.this.mqConsumerGroup + "\u6d88\u8d39\u8005\u6d88\u8d39\u6d88\u606f\u5931\u8d25, msgKey=" + msgKey + ", \u9519\u8befmessage=" + e.getMessage(), (Throwable)e);
                    errorReason = e.getClass().getSimpleName() + ":" + e.getMessage();
                    retry = true;
                    RocketConsumerClient.this.dealConsumeResult(messageExt, errorReason, retry);
                }
                finally {
                    RocketConsumerClient.this.logger.info("\u6d88\u606f\u5904\u7406\u5b8c\u6bd5\uff0c\u5904\u7406\u7ed3\u679c={}\uff0c\u8017\u65f6={}ms.", (Object)errorReason, (Object)stopWatch.elapsedTime());
                    RocketConsumerClient.this.logger.info("======= finish request processing =======\r\n");
                    actionLogger.currentActionLog().setElapsedTime(stopWatch.elapsedTime());
                    TraceLogger.get().cleanup(true);
                    RequestIdUtils.removeRequestId();
                    actionLogger.save();
                }
            }
        });
    }

    private <M> String processMsg(MessageExt msg, ActionLoggerImpl actionLogger) throws ClassNotFoundException {
        Class<?> originClazz = this.processor.getClass();
        if (this.processor instanceof SpringProxy && (originClazz = this.processor.getClass().getSuperclass()).equals(Proxy.class)) {
            String processorProxyName = this.processor.toString();
            String originClazzName = processorProxyName.split("@")[0];
            originClazz = Class.forName(originClazzName);
        }
        Type[] types = originClazz.getGenericInterfaces();
        Class clazz = null;
        for (Type type : types) {
            if (!(type instanceof ParameterizedType) || !((ParameterizedType)type).getRawType().equals(RocketMsgProcessor.class)) continue;
            clazz = (Class)((ParameterizedType)type).getActualTypeArguments()[0];
            break;
        }
        if (clazz != null) {
            String action = ClassUtils.getSimpleName(originClazz);
            actionLogger.currentActionLog().setAction(action);
            MDC.put((String)"ACTION", (String)action);
            Object messageData = JsonSerializeUtil.deserialize(msg.getBody(), clazz);
            String messageContent = JSONObject.toJSONString(messageData);
            this.logger.info("\u5904\u7406\u6d88\u606f\uff0c\u5904\u7406\u8be5\u6d88\u606f\u7684\u7c7b\u4e3a={}\uff0ctopic={}, msgKey={}, \u6d88\u606f={}", new Object[]{this.processor.getClass().getName(), this.topic, msg.getKeys(), messageContent});
            actionLogger.currentActionLog().setRequestContent(messageContent);
            String result = this.processor.process(messageData);
            if (StringUtils.isNotBlank((CharSequence)result)) {
                this.logger.error("\u6d88\u606f\u6d88\u8d39\u7ed3\u679c:{}", (Object)result);
            }
            return result;
        }
        throw new KangarooConsumerException("json\u8981\u8f6c\u6362\u7684class\u4e3a\u7a7a\uff0c \u7236\u7c7b\u7684\u6cdb\u578btypes\u4e3a" + types.toString());
    }

    private void dealConsumeResult(MessageExt messageExt, String errorReason, boolean retry) {
        if (this.consumeErrorService == null) {
            this.logger.info("consumeErrorService\u4e3a\u7a7a\u5c06\u4e0d\u80fd\u628a\u9519\u8bef\u6d88\u606f\u53d1\u9001\u5230\u6d88\u606f\u7cfb\u7edf\uff0c\u8bf7\u5728spring\u914d\u7f6e\u6587\u4ef6\u4e2d\u914d\u7f6econsumerErrorService");
            return;
        }
        try {
            this.logger.info("\u5c06\u9519\u8bef\u7684\u6d88\u606f\u5199\u5165\u6570\u636e\u5e93\uff0c\u662f\u5426\u81ea\u52a8\u91cd\u8bd5={}", (Object)retry);
            ConsumeErrorDto consumeErrorDto = new ConsumeErrorDto();
            consumeErrorDto.setClientIp(this.localIp);
            consumeErrorDto.setErrorReason(errorReason);
            Field msgIdField = MessageExt.class.getDeclaredField("msgId");
            msgIdField.setAccessible(true);
            String msgId = (String)msgIdField.get(messageExt);
            consumeErrorDto.setMqMsgId(msgId);
            consumeErrorDto.setTopic(messageExt.getTopic());
            consumeErrorDto.setRetry(retry);
            consumeErrorDto.setRocketMqName(this.rocketMqName);
            consumeErrorDto.setGroupName(this.mqConsumerGroup);
            consumeErrorDto.setRequestId(messageExt.getProperty("requestId"));
            consumeErrorDto.setMsgSourcePlatform(messageExt.getProperty("PLATFORM"));
            this.consumeErrorService.consumeError(consumeErrorDto);
        }
        catch (NoSuchFieldException e) {
            this.logger.error("\u901a\u8fc7dubbo\u53d1\u9001\u5f02\u5e38\u6d88\u606f\u5230\u6d88\u606f\u7cfb\u7edf\u5931\u8d25", (Throwable)e);
        }
        catch (IllegalAccessException e) {
            this.logger.error("\u901a\u8fc7dubbo\u53d1\u9001\u5f02\u5e38\u6d88\u606f\u5230\u6d88\u606f\u7cfb\u7edf\u5931\u8d25", (Throwable)e);
        }
    }

    private ActionLoggerImpl initActionLogger(MessageExt msg, String platform) {
        ActionLoggerImpl actionLogger = ActionLoggerImpl.get();
        actionLogger.initialize();
        String sourcePlatform = msg.getProperty("PLATFORM");
        actionLogger.currentActionLog().setSourcePlatform(sourcePlatform);
        actionLogger.currentActionLog().setCurrentPlatform(platform);
        String requestId = msg.getProperty("requestId");
        if (StringUtils.isBlank((CharSequence)requestId)) {
            requestId = RequestIdUtils.generateDefaultRequestId();
        }
        MDC.put((String)"REQUEST_ID", (String)requestId);
        this.logger.info("requestId={}", (Object)requestId);
        RequestIdUtils.setRequestId((String)requestId);
        actionLogger.currentActionLog().setRequestId(requestId);
        actionLogger.currentActionLog().setClientIP(msg.getBornHostString());
        actionLogger.currentActionLog().setServerIP(new RemoteAddressUtil().getServerIP());
        actionLogger.currentActionLog().setRequestType(RequestTypeEnums.MQ);
        return actionLogger;
    }

    private DefaultMQPushConsumer getDefaultMQPushConsumer() {
        return this.defaultMQPushConsumer;
    }

    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        Timer timer = new Timer();
        timer.schedule(new TimerTask(){

            @Override
            public void run() {
                block3: {
                    try {
                        RocketConsumerClient.this.start();
                    }
                    catch (Throwable e) {
                        RocketConsumerClient.this.logger.error(RocketConsumerClient.this.mqConsumerGroup + "\u6d88\u606f\u7cfb\u7edf\u5ba2\u6237\u7aef\u542f\u52a8\u5f02\u5e38", e);
                        boolean startError = true;
                        for (int i = 0; i < 5 && startError; ++i) {
                            startError = this.isStartError(startError, i);
                        }
                        if (!startError) break block3;
                        RocketConsumerClient.this.logger.error("5\u6b21\u91cd\u65b0\u542f\u52a8consumerClient:" + RocketConsumerClient.this.mqConsumerGroup + "\u4ecd\u7136\u5931\u8d25");
                    }
                }
            }

            private boolean isStartError(boolean startError, int i) {
                try {
                    Thread.sleep(3000L);
                    RocketConsumerClient.this.start();
                    startError = false;
                    RocketConsumerClient.this.logger.info("\u91cd\u65b0\u542f\u52a8consumerClient" + RocketConsumerClient.this.mqConsumerGroup + ", \u91cd\u65b0\u542f\u52a8\u6210\u529f");
                }
                catch (Exception e1) {
                    RocketConsumerClient.this.logger.error(RocketConsumerClient.this.mqConsumerGroup + " consumerClient \u7b2c" + (i + 1) + "\u6b21\u91cd\u65b0\u542f\u52a8\u5931\u8d25", (Throwable)e1);
                }
                return startError;
            }
        }, 500L);
    }

    public void destroy() {
        try {
            if (this.defaultMQPushConsumer != null) {
                this.defaultMQPushConsumer.shutdown();
            }
        }
        catch (Throwable t) {
            this.logger.error("RConsumer defaultMQPushConsumer " + this.mqConsumerGroup + " destroy exception!", t);
        }
        this.logger.info("RConsumer defaultMQPushConsumer " + this.mqConsumerGroup + " destroy success");
    }

    public String getTag() {
        return this.tag;
    }

    public void setMqConsumerGroup(String mqConsumerGroup) {
        this.mqConsumerGroup = mqConsumerGroup;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public void setTag(String tag) {
        this.tag = tag;
    }

    public void setBatchMaxSize(int batchMaxSize) {
        this.batchMaxSize = batchMaxSize;
    }

    public void setConsumeThreadMax(Integer consumeThreadMax) {
        this.consumeThreadMax = consumeThreadMax;
    }

    public void setConsumeThreadMin(Integer consumeThreadMin) {
        this.consumeThreadMin = consumeThreadMin;
    }

    public void setMessageModel(String messageModel) {
        this.messageModel = messageModel;
    }

    public void setRocketMqName(String rocketMqName) {
        this.rocketMqName = rocketMqName;
    }

    public void setRocketMqAddrService(RocketMqAddrService rocketMqAddrService) {
        this.rocketMqAddrService = rocketMqAddrService;
    }

    public void setConsumeErrorService(ConsumeErrorService consumeErrorService) {
        this.consumeErrorService = consumeErrorService;
    }

    public void setQueueNum(Integer queueNum) {
        this.queueNum = queueNum;
    }

    public void setBatchMaxSize(Integer batchMaxSize) {
        this.batchMaxSize = batchMaxSize;
    }

    public void setProcessor(RocketMsgProcessor processor) {
        this.processor = processor;
    }

    private MessageModel getMessageModelByCN(String modeCN) {
        MessageModel[] messageModels;
        for (MessageModel messageModel : messageModels = MessageModel.values()) {
            if (!messageModel.getModeCN().equals(modeCN)) continue;
            return messageModel;
        }
        return MessageModel.CLUSTERING;
    }
}

