package com.beiming.pigeons.service.rocketmq;

import com.alibaba.fastjson.JSONObject;
import com.beiming.framework.domain.DubboResult;
import com.beiming.framework.domain.DubboResultBuilder;
import com.beiming.framework.enums.DubboResultCodeEnums;
import com.beiming.pigeons.api.constants.KangarooConstants;
import com.beiming.pigeons.api.exception.KangarooException;
import com.beiming.pigeons.common.constants.RedisKeyConstants;
import com.beiming.pigeons.common.enums.HandleType;
import com.beiming.pigeons.domain.message.BasicMessage;
import com.beiming.pigeons.domain.message.MessageRelation;
import com.beiming.pigeons.service.MessageRelationService;
import com.google.common.collect.Lists;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.annotation.Resource;
import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

@Service
/* loaded from: input_file:WEB-INF/lib/pigeons-service-2.0.0-SNAPSHOT.jar:com/beiming/pigeons/service/rocketmq/RocketMqRetryProducer.class */
public class RocketMqRetryProducer {
    private static Logger logger = LoggerFactory.getLogger((Class<?>) RocketMqRetryProducer.class);

    @Resource
    private RocketMqFactory rocketMqFactory;

    @Resource
    private RocketMqMessageService rocketMqMessageService;

    @Resource
    private MessageRelationService messageRelationService;

    @Resource
    private RedisTemplate redisTemplate;

    public DubboResult<String> retrySendMessage(BasicMessage basicMessage, int i) {
        try {
            logger.info("开始发送消息，msgKey为: {}, 消息为：{}, topic为:{}, tags为:{}", basicMessage.getKeyword(), new String(basicMessage.getReceiverParam()), basicMessage.getTopic(), basicMessage.getTags());
            MessageRelation messageRelation = null;
            if (!StringUtils.isEmpty(basicMessage.getId())) {
                messageRelation = this.messageRelationService.getLastRelationByMsgId(basicMessage.getId());
            } else if (!StringUtils.isEmpty(basicMessage.getMqMsgId())) {
                messageRelation = this.messageRelationService.getRelationByMqMsgId(basicMessage.getMqMsgId());
            }
            SendResult send = this.rocketMqFactory.getMqProducer(basicMessage.getReceiverAddress()).send(getRocketMessage(basicMessage));
            logger.info("消息重新发送， msgKey为: {}", basicMessage.getKeyword());
            if (send != null && send.getSendStatus() != SendStatus.SEND_OK) {
                logger.error("发送消息状态有错:" + JSONObject.toJSONString(send));
            }
            int i2 = 1;
            String id = basicMessage.getId();
            if (messageRelation != null) {
                i2 = messageRelation.getOrderSeq().intValue() + 1;
                id = messageRelation.getMsgId();
            }
            generateNewRelation(id, i2, send.getOffsetMsgId(), i);
            return DubboResultBuilder.success(send.getOffsetMsgId());
        } catch (Exception e) {
            throw new KangarooException(e);
        }
    }

    private void saveMessageRelation(MessageRelation messageRelation) {
        String str = RedisKeyConstants.MSG_RELATION_SEND_KEY_PREFIX + messageRelation.getMqMsgId();
        this.redisTemplate.opsForValue().set(str, messageRelation);
        this.redisTemplate.expire(str, 2L, TimeUnit.MINUTES);
        this.messageRelationService.saveRelationToDB(messageRelation);
    }

    private void generateNewRelation(String str, int i, String str2, int i2) {
        logger.info("存入新的messageRelation msgId={}，newMqMsgId={}", str, str2);
        MessageRelation messageRelation = new MessageRelation();
        messageRelation.setMqMsgId(str2);
        messageRelation.setMsgId(str);
        messageRelation.setOrderSeq(Integer.valueOf(i));
        messageRelation.setHandleType(Integer.valueOf(i2));
        messageRelation.setCreateAt(new Date());
        saveMessageRelation(messageRelation);
    }

    public DubboResult<ArrayList<String>> retrySendMessage(String str, List<String> list) {
        if (CollectionUtils.isEmpty(list) || StringUtils.isEmpty(str)) {
            return DubboResultBuilder.error(DubboResultCodeEnums.PARAM_ERROR.value(), "参数均不能为空");
        }
        ArrayList newArrayList = Lists.newArrayList();
        ArrayList newArrayList2 = Lists.newArrayList();
        for (String str2 : list) {
            try {
                Message rocketMessage = this.rocketMqMessageService.getRocketMessage(str, str2);
                logger.info("从RocketMq中获取消息重发：msgKey={}, mqMsgId={}", rocketMessage.getKeys(), str2);
                if (rocketMessage != null) {
                    SendResult send = this.rocketMqFactory.getMqProducer(str).send(rocketMessage);
                    if (send != null && send.getSendStatus() != SendStatus.SEND_OK) {
                        logger.error("发送消息状态有错:" + JSONObject.toJSONString(send));
                    }
                    MessageRelation relationByMqMsgId = this.messageRelationService.getRelationByMqMsgId(str2);
                    if (relationByMqMsgId != null) {
                        this.redisTemplate.opsForSet().add(RedisKeyConstants.COMPENSATE_MSG_SET_KEY, relationByMqMsgId.getMsgId());
                        generateNewRelation(relationByMqMsgId.getMsgId(), relationByMqMsgId.getOrderSeq().intValue() + 1, send.getOffsetMsgId(), HandleType.MANUAL.getValue());
                    }
                    newArrayList2.add(send.getOffsetMsgId());
                } else {
                    newArrayList.add(str2);
                }
            } catch (Throwable th) {
                logger.error("发送消息失败， rocketMqName=" + str + "; msgId=" + str2, th);
                newArrayList.add(str2);
            }
        }
        return newArrayList.size() > 0 ? DubboResultBuilder.error(DubboResultCodeEnums.INTERNAL_ERROR.value(), "发送失败消息mqMsgId为：" + newArrayList) : DubboResultBuilder.success(null);
    }

    public Message getRocketMessage(BasicMessage basicMessage) {
        Message message = new Message();
        message.setKeys(basicMessage.getKeyword());
        message.setTags(basicMessage.getTags());
        message.setTopic(basicMessage.getTopic());
        message.setTags(basicMessage.getTags());
        message.setBody(basicMessage.getReceiverParam());
        message.putUserProperty(KangarooConstants.ROCKET_REQUEST_ID_NAME, basicMessage.getRequestId());
        message.putUserProperty(KangarooConstants.PLATFORM_NAME, basicMessage.getSourcePlatform());
        return message;
    }
}
