package com.beiming.pigeons.exchange.service.impl;

import com.beiming.framework.domain.DubboResult;
import com.beiming.framework.domain.DubboResultBuilder;
import com.beiming.framework.enums.DubboResultCodeEnums;
import com.beiming.framework.util.UUIDUtils;
import com.beiming.pigeons.common.enums.ClientType;
import com.beiming.pigeons.common.enums.DeliverType;
import com.beiming.pigeons.common.enums.HandleType;
import com.beiming.pigeons.domain.message.BasicMessage;
import com.beiming.pigeons.domain.message.MessageRelation;
import com.beiming.pigeons.domain.message.MessageTopic;
import com.beiming.pigeons.domain.message.NotifyMessage;
import com.beiming.pigeons.exchange.service.MessageExchangeService;
import com.beiming.pigeons.service.MessageRelationService;
import com.beiming.pigeons.service.MessageRetryTimeService;
import com.beiming.pigeons.service.MessageTopicService;
import com.beiming.pigeons.service.NotifyMessageService;
import com.beiming.pigeons.service.listener.KangarooMessageListener;
import com.google.common.base.Joiner;
import java.util.Date;
import javax.annotation.Resource;
import javax.transaction.Transactional;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.AopContext;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:com/beiming/pigeons/exchange/service/impl/MessageExchangeServiceImpl.class */
public class MessageExchangeServiceImpl implements MessageExchangeService {
    private static Logger logger = LoggerFactory.getLogger(MessageExchangeService.class);

    @Resource
    private NotifyMessageService notifyMessageService;

    @Resource
    private MessageTopicService messageTopicService;

    @Resource
    private KangarooMessageListener kangarooMessageListener;

    @Resource
    private MessageRelationService messageRelationService;

    @Resource
    private MessageRetryTimeService messageRetryTimeService;

    @Resource
    private RedisTemplate redisTemplate;
    private volatile MessageExchangeServiceImpl proxy;

    @Override // com.beiming.pigeons.exchange.service.MessageExchangeService
    public DubboResult exchange(BasicMessage basicMessage) {
        long currentTimeMillis = System.currentTimeMillis();
        String str = null;
        if (StringUtils.isEmpty(basicMessage.getTopic())) {
            str = "topic is null";
        }
        if (StringUtils.isEmpty(basicMessage.getReceiverAddress())) {
            str = "ReceiverAddress is null";
        }
        if (null == basicMessage.getDeliverType()) {
            str = "deliverType is null";
        }
        if (DeliverType.HESSIAN_RPC.equalsValue(basicMessage.getDeliverType())) {
            if (!basicMessage.getReceiverAddress().contains("$$")) {
                str = "ReceiverAddress is not correct";
            }
        } else if (DeliverType.HTTP.equalsValue(basicMessage.getDeliverType()) && !basicMessage.getReceiverAddress().contains("http")) {
            str = "ReceiverAddress is not correct";
        }
        if (basicMessage.getReceiverParam() == null) {
            str = "ReceiverParam is null";
        }
        if (str != null) {
            return DubboResultBuilder.error(DubboResultCodeEnums.PARAM_ERROR.value(), str);
        }
        NotifyMessage notifyMessage = new NotifyMessage();
        notifyMessage.setStatus(0);
        notifyMessage.setSourcePlatform(basicMessage.getSourcePlatform());
        long currentTimeMillis2 = System.currentTimeMillis();
        notifyMessage.setCreateAt(Long.valueOf(currentTimeMillis2));
        notifyMessage.setUpdateAt(Long.valueOf(currentTimeMillis2));
        notifyMessage.setRequestId(basicMessage.getRequestId());
        notifyMessage.setSendNum(0);
        notifyMessage.setLastSendAt(Long.valueOf(currentTimeMillis2));
        notifyMessage.setDeliverType(basicMessage.getDeliverType());
        notifyMessage.setKeyword(basicMessage.getKeyword());
        notifyMessage.setCallbackAddress(basicMessage.getCallbackAddress());
        notifyMessage.setReceiverAddress(basicMessage.getReceiverAddress());
        notifyMessage.setReceiverParam(basicMessage.getReceiverParam());
        notifyMessage.setTopic(basicMessage.getTopic());
        notifyMessage.setClientIp(basicMessage.getClientIp());
        if (StringUtils.isNotEmpty(basicMessage.getErrorReason())) {
            String errorReason = basicMessage.getErrorReason();
            if (basicMessage.getErrorReason().length() > 450) {
                errorReason = errorReason.substring(0, 450);
            }
            notifyMessage.setErrorReason(errorReason);
        }
        notifyMessage.setGroupName(basicMessage.getGroupName());
        notifyMessage.setClientType(basicMessage.getClientType());
        notifyMessage.setMqMsgId(basicMessage.getMqMsgId());
        notifyMessage.setTags(basicMessage.getTags());
        notifyMessage.setRetry(basicMessage.getRetry());
        if (DeliverType.ROCKET_MQ.equalsValue(notifyMessage.getDeliverType())) {
            saveRocketTypeMessage(notifyMessage);
        } else {
            saveNormalTypeMessage(notifyMessage);
        }
        long currentTimeMillis3 = System.currentTimeMillis() - currentTimeMillis;
        if (currentTimeMillis3 > 100) {
            logger.info("msgId={}, 存储消息花费时间:{}", notifyMessage.getId(), Long.valueOf(currentTimeMillis3));
        }
        return DubboResultBuilder.success(basicMessage.getId());
    }

    private void saveRocketTypeMessage(NotifyMessage notifyMessage) {
        int saveMessageToDB = getProxy().saveMessageToDB(notifyMessage);
        if (needSaveToQueue(notifyMessage)) {
            Date deliverTime = this.messageRetryTimeService.getDeliverTime(saveMessageToDB);
            if (deliverTime == null) {
                logger.info("消息不再重发，msgKey={}, mqMsgId={}，level={}", new Object[]{notifyMessage.getKeyword(), notifyMessage.getMqMsgId(), Integer.valueOf(saveMessageToDB)});
                return;
            }
            logger.info("消息需要重发：msgKey={}, mqMsgId={}, 下次重发时间为={}", new Object[]{notifyMessage.getKeyword(), notifyMessage.getMqMsgId(), deliverTime});
            long time = deliverTime.getTime();
            String join = Joiner.on("_").join("msg_rocket_queue_set", notifyMessage.getReceiverAddress(), new Object[]{Integer.valueOf(saveMessageToDB)});
            this.redisTemplate.opsForZSet().add(join, Long.valueOf(time), time);
            String join2 = Joiner.on("_").join("msg_queue", notifyMessage.getReceiverAddress(), new Object[]{Integer.valueOf(saveMessageToDB), Long.valueOf(time)});
            logger.info("存储消息的重发队列， queueSetKey={}, queueId={}", join, join2);
            this.notifyMessageService.saveMessageToRedis(join2, notifyMessage);
        }
    }

    @Transactional
    public int saveMessageToDB(NotifyMessage notifyMessage) {
        boolean z = false;
        int i = 2;
        String str = null;
        if (needSaveToQueue(notifyMessage)) {
            notifyMessage.setHandleType(Integer.valueOf(HandleType.WAIT.getValue()));
        } else {
            notifyMessage.setHandleType(Integer.valueOf(HandleType.NO_NEED.getValue()));
        }
        String uuidWithoutSeparator = UUIDUtils.uuidWithoutSeparator();
        if (StringUtils.isEmpty(notifyMessage.getMqMsgId())) {
            z = true;
        } else {
            MessageRelation relationFromRedis = this.messageRelationService.getRelationFromRedis(notifyMessage.getMqMsgId());
            if (relationFromRedis == null) {
                z = true;
                MessageRelation messageRelation = new MessageRelation();
                messageRelation.setErrorReason(notifyMessage.getErrorReason());
                messageRelation.setMqMsgId(notifyMessage.getMqMsgId());
                messageRelation.setMsgId(uuidWithoutSeparator);
                messageRelation.setHandleType(notifyMessage.getHandleType());
                messageRelation.setOrderSeq(1);
                messageRelation.setCreateAt(new Date());
                this.messageRelationService.saveRelationToDB(messageRelation);
            } else {
                i = relationFromRedis.getOrderSeq().intValue() + 1;
                str = relationFromRedis.getMsgId();
                notifyMessage.setId(str);
                this.messageRelationService.updateErrorReason(relationFromRedis.getMqMsgId(), notifyMessage.getErrorReason());
                this.redisTemplate.opsForSet().remove("compensate_set", new Object[]{str});
            }
        }
        if (z) {
            notifyMessage.setId(uuidWithoutSeparator);
            this.notifyMessageService.saveToDB(notifyMessage);
        } else if (StringUtils.isNotEmpty(str)) {
            this.notifyMessageService.updateMqMsgError(str, notifyMessage.getErrorReason());
        }
        return i;
    }

    private boolean needSaveToQueue(NotifyMessage notifyMessage) {
        return ClientType.PRODUCER.equalsValue(notifyMessage.getClientType()) || (notifyMessage.getRetry() != null && notifyMessage.getRetry().booleanValue());
    }

    private void saveNormalTypeMessage(NotifyMessage notifyMessage) {
        try {
            long currentTimeMillis = System.currentTimeMillis();
            this.notifyMessageService.saveToDB(notifyMessage);
            logger.info("save DB time====" + (System.currentTimeMillis() - currentTimeMillis));
            long currentTimeMillis2 = System.currentTimeMillis();
            boolean saveMessageToRedis = this.notifyMessageService.saveMessageToRedis(Joiner.on("_").join("msg_queue", "1", new Object[]{notifyMessage.getTopic()}), notifyMessage);
            logger.info("saveToRedis time====" + (System.currentTimeMillis() - currentTimeMillis2));
            if (!saveMessageToRedis) {
                MessageTopic messageTopic = new MessageTopic();
                messageTopic.setName(notifyMessage.getTopic());
                this.messageTopicService.saveToDB(messageTopic);
            }
            this.kangarooMessageListener.newMessage();
        } catch (Exception e) {
            logger.error("保存message 到 redis或保存消息topic失败", e);
        }
    }

    private MessageExchangeServiceImpl getProxy() {
        if (this.proxy == null) {
            this.proxy = (MessageExchangeServiceImpl) AopContext.currentProxy();
        }
        return this.proxy;
    }
}
