package com.beiming.pigeons.exchange.client;

import com.beiming.framework.domain.DubboResult;
import com.beiming.framework.domain.DubboResultBuilder;
import com.beiming.framework.enums.DubboResultCodeEnums;
import com.beiming.pigeons.api.consumer.ConsumeErrorDto;
import com.beiming.pigeons.api.consumer.ConsumeErrorService;
import com.beiming.pigeons.common.enums.ClientType;
import com.beiming.pigeons.common.enums.DeliverType;
import com.beiming.pigeons.domain.message.BasicMessage;
import com.beiming.pigeons.domain.message.RocketMqRelation;
import com.beiming.pigeons.domain.message.query.RocketMqRelationQuery;
import com.beiming.pigeons.exchange.service.MessageExchangeService;
import com.beiming.pigeons.service.RocketMqRelationService;
import com.beiming.pigeons.service.rocketmq.RocketMqMessageService;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import javax.annotation.Resource;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

@Service("consumeErrorService")
/* loaded from: input_file:com/beiming/pigeons/exchange/client/ConsumeErrorServiceImpl.class */
public class ConsumeErrorServiceImpl implements ConsumeErrorService {
    private static Logger logger = LoggerFactory.getLogger(ConsumeErrorServiceImpl.class);

    @Resource
    private MessageExchangeService messageExchangeService;

    @Resource
    private RocketMqMessageService rocketMqMessageService;

    @Resource
    private RedisTemplate redisTemplate;

    @Resource
    private RocketMqRelationService rocketMqRelationService;

    public DubboResult<String> consumeError(ConsumeErrorDto consumeErrorDto) {
        logger.info("客户端消费异常消息：" + consumeErrorDto.toString());
        try {
            BasicMessage basicMessage = new BasicMessage();
            basicMessage.setClientType(ClientType.CONSUMER.getName());
            basicMessage.setRequestId(consumeErrorDto.getRequestId());
            boolean booleanValue = this.redisTemplate.opsForValue().setIfAbsent(consumeErrorDto.getMqMsgId(), 1).booleanValue();
            this.redisTemplate.expire(consumeErrorDto.getMqMsgId(), 30L, TimeUnit.SECONDS);
            if (!booleanValue) {
                return DubboResultBuilder.error(DubboResultCodeEnums.PARAM_ERROR.value(), "这是一个重复的请求");
            }
            if (consumeErrorDto.getTopic().contains("%RETRY%")) {
                String groupName = consumeErrorDto.getGroupName();
                RocketMqRelationQuery rocketMqRelationQuery = new RocketMqRelationQuery();
                rocketMqRelationQuery.setConsumerGroup(groupName);
                rocketMqRelationQuery.setRocketMqName(consumeErrorDto.getRocketMqName());
                ArrayList<RocketMqRelation> relationByQuery = this.rocketMqRelationService.getRelationByQuery(rocketMqRelationQuery);
                if (!CollectionUtils.isEmpty(relationByQuery)) {
                    consumeErrorDto.setTopic(relationByQuery.get(0).getTopic());
                }
            }
            Message rocketMessage = this.rocketMqMessageService.getRocketMessage(consumeErrorDto.getRocketMqName(), consumeErrorDto.getTopic(), consumeErrorDto.getMqMsgId());
            if (rocketMessage == null) {
                logger.error("没有从RocketMq中查询到消息，topic={}, mqMessageId={}", consumeErrorDto.getTopic(), consumeErrorDto.getMqMsgId());
                return DubboResultBuilder.error("没有查询到消息");
            }
            logger.info("错误消费的消息查询到消息：topic={}, mqMsgId={}, msgKey={}", new Object[]{consumeErrorDto.getTopic(), consumeErrorDto.getMqMsgId(), rocketMessage.getKeys()});
            basicMessage.setReceiverParam(rocketMessage.getBody());
            basicMessage.setTopic(rocketMessage.getTopic());
            basicMessage.setKeyword(rocketMessage.getKeys());
            basicMessage.setDeliverType(Integer.valueOf(DeliverType.ROCKET_MQ.getValue()));
            if (StringUtils.isNotEmpty(rocketMessage.getTags()) && !"null".equalsIgnoreCase(rocketMessage.getTags())) {
                basicMessage.setTags(rocketMessage.getTags());
            }
            basicMessage.setGroupName(consumeErrorDto.getGroupName());
            basicMessage.setClientIp(consumeErrorDto.getClientIp());
            basicMessage.setReceiverAddress(consumeErrorDto.getRocketMqName());
            basicMessage.setRetry(Boolean.valueOf(consumeErrorDto.isRetry()));
            basicMessage.setMqMsgId(consumeErrorDto.getMqMsgId());
            basicMessage.setErrorReason(consumeErrorDto.getErrorReason());
            basicMessage.setSourcePlatform(consumeErrorDto.getMsgSourcePlatform());
            return this.messageExchangeService.exchange(basicMessage);
        } catch (Exception e) {
            logger.error("接收消息异常 mqMsgId=" + consumeErrorDto.getMqMsgId(), e);
            return DubboResultBuilder.error("接收消息异常");
        }
    }
}
