package com.beiming.pigeons.distribute.service;

import com.beiming.pigeons.common.util.SpringContextUtil;
import com.beiming.pigeons.distribute.service.concurrent.MsgExecutorServiceFactory;
import com.beiming.pigeons.distribute.service.deliver.MsgDeliverTask;
import com.beiming.pigeons.distribute.service.filter.DeliverFilterChain;
import com.beiming.pigeons.distribute.service.filter.FilterResult;
import com.beiming.pigeons.domain.message.BasicMessage;
import com.beiming.pigeons.service.NotifyMessageService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.annotation.Scope;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.SetOperations;
import org.springframework.stereotype.Service;

@Scope("prototype")
@Service("queueConsumer")
/* loaded from: input_file:com/beiming/pigeons/distribute/service/QueueConsumer.class */
public class QueueConsumer implements Runnable, DisposableBean {
    private Logger logger = LoggerFactory.getLogger(QueueConsumer.class);

    @Resource
    private DeliverFilterChain distributeFilterChain;

    @Resource
    private NotifyMessageService notifyMessageService;

    @Resource
    private RedisTemplate redisTemplate;

    @Resource
    private MsgExecutorServiceFactory msgExecutorServiceFactory;
    private String queueId;
    private String queueSetId;
    private String[] queueIdItems;
    private int level;
    private CountDownLatch latch;
    private static final int FIRST_LEVEL = 1;
    private volatile boolean stop;

    public void init(String str, String str2, int i, CountDownLatch countDownLatch) {
        this.queueId = str;
        this.queueIdItems = str.split("_");
        this.queueSetId = str2;
        this.level = i;
        this.latch = countDownLatch;
    }

    @Override // java.lang.Runnable
    public void run() {
        try {
            consumeQueue();
        } finally {
            this.latch.countDown();
        }
    }

    private void consumeQueue() {
        ThreadPoolExecutor otherLevelThreadPool;
        int activeCount;
        int maximumPoolSize;
        long taskCount;
        Long size = this.redisTemplate.opsForList().size(this.queueId);
        SetOperations opsForSet = this.redisTemplate.opsForSet();
        this.logger.info("处理分发的消息队列：queueId:{}, size:{}", this.queueId, size);
        if (size == null || size.longValue() == 0) {
            this.logger.info("要分发的消息队列为空,queueId=" + this.queueId);
            if (this.level == FIRST_LEVEL) {
                String topic = getTopic();
                opsForSet.remove("msg_remain_topic_set", new Object[]{topic});
                this.logger.info("删除已经执行完成的topic=" + topic);
            }
            if (this.level > FIRST_LEVEL) {
                long queueTimeMillis = getQueueTimeMillis();
                this.redisTemplate.opsForZSet().remove(this.queueSetId, new Object[]{Long.valueOf(queueTimeMillis)});
                this.logger.info("queueSetId={},删除没有值的队列Id={}", this.queueSetId, Long.valueOf(queueTimeMillis));
                return;
            }
            return;
        }
        while (true) {
            if (this.stop) {
                break;
            }
            try {
                if (this.level == FIRST_LEVEL) {
                    String topic2 = getTopic();
                    FilterResult doFilter = this.distributeFilterChain.doFilter(topic2);
                    if (!doFilter.isCanContinue()) {
                        this.logger.error("分发filter过滤1:" + doFilter.getMessage());
                        break;
                    }
                    otherLevelThreadPool = this.msgExecutorServiceFactory.getFirstLevelThreadPool(topic2);
                } else {
                    otherLevelThreadPool = this.msgExecutorServiceFactory.getOtherLevelThreadPool(this.level);
                }
                activeCount = otherLevelThreadPool.getActiveCount();
                maximumPoolSize = otherLevelThreadPool.getMaximumPoolSize();
                taskCount = otherLevelThreadPool.getTaskCount();
            } catch (Exception e) {
                this.logger.error("消费消息失败,队列ID:" + this.queueId, e);
            }
            if (activeCount == maximumPoolSize) {
                this.logger.info("队列ID:{}投递线程池活跃线程数:{}达到最大值:{},taskCount数量为:{},不再分配投递任务", new Object[]{this.queueId, Integer.valueOf(activeCount), Integer.valueOf(maximumPoolSize), Long.valueOf(taskCount)});
                break;
            }
            BasicMessage popMsg = this.notifyMessageService.popMsg(this.queueId);
            this.logger.info("队列:" + this.queueId + "从redis取到消息:" + popMsg);
            if (popMsg == null) {
                this.logger.info("从队列中queueId:{}获取消息为空, queue Size 为：{}", this.queueId, size);
                break;
            }
            Boolean isMember = opsForSet.isMember("compensate_set", popMsg.getId());
            if (isMember == null || !isMember.booleanValue()) {
                if (popMsg.getReceiverAddress() == null) {
                    popMsg = this.notifyMessageService.getByIdFromDB(popMsg.getId());
                    this.logger.info("从缓存中未获取到消息，从DB中获取：" + popMsg);
                    if (popMsg == null) {
                    }
                }
                this.logger.info("准备执行发送程序Task,message:" + popMsg);
                if (this.level > FIRST_LEVEL) {
                    FilterResult doFilter2 = this.distributeFilterChain.doFilter(popMsg.getTopic());
                    if (!doFilter2.isCanContinue()) {
                        this.logger.error("分发filter过滤2:" + doFilter2.getMessage());
                        break;
                    }
                }
                MsgDeliverTask msgDeliverTask = (MsgDeliverTask) SpringContextUtil.getBean("msgDeliverTask");
                msgDeliverTask.setBasicMessage(popMsg);
                msgDeliverTask.setLevel(this.level);
                otherLevelThreadPool.execute(msgDeliverTask);
            }
        }
        this.logger.info("处理分发的消息队列完成：queueId:{}", this.queueId);
    }

    private String getTopic() {
        return this.queueIdItems[3];
    }

    private long getQueueTimeMillis() {
        return Long.parseLong(this.queueIdItems[this.queueIdItems.length - FIRST_LEVEL]);
    }

    public void destroy() throws Exception {
        this.stop = true;
    }
}
