package com.beiming.pigeons.distribute.service;

import ch.qos.logback.core.spi.AbstractComponentTracker;
import com.beiming.framework.domain.DubboResult;
import com.beiming.framework.domain.DubboResultBuilder;
import com.beiming.pigeons.common.constants.RedisKeyConstants;
import com.beiming.pigeons.common.enums.DeliverType;
import com.beiming.pigeons.common.util.SpringContextUtil;
import com.beiming.pigeons.distribute.service.concurrent.MsgExecutorServiceFactory;
import com.beiming.pigeons.domain.message.NotifyMessage;
import com.beiming.pigeons.domain.message.NotifyMessageQuery;
import com.beiming.pigeons.service.NotifyMessageService;
import com.beiming.pigeons.service.listener.KangarooMessageListener;
import com.google.common.base.Joiner;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.SetOperations;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

/**
 * Drives message distribution from Redis-backed queues to worker thread pools.
 *
 * <p>On the first {@link ContextRefreshedEvent} two background tasks are scheduled:
 * <ul>
 *   <li>a first-level loop that, until the bean is destroyed, reads the topic set stored under
 *       {@code RedisKeyConstants.REMAIN_TOPIC_KEY} and hands one {@code QueueConsumer} per topic
 *       to the first-level executor;</li>
 *   <li>a periodic task that, for levels 2..8, reads the due members of each level's sorted set
 *       (score between 0 and the current time in milliseconds) and hands one
 *       {@code QueueConsumer} per member to the other-level executor.</li>
 * </ul>
 * Each round waits on a {@link CountDownLatch} that is passed to every consumer of that round.
 */
@Service("distributeService")
/* loaded from: input_file:WEB-INF/lib/pigeons-service-2.0.0-SNAPSHOT.jar:com/beiming/pigeons/distribute/service/DistributeServiceImpl.class */
public class DistributeServiceImpl implements ApplicationListener<ContextRefreshedEvent>, DisposableBean {
    private static final Logger logger = LoggerFactory.getLogger(DistributeServiceImpl.class);

    /** Level number used for the first-level queue key and passed to its consumers. */
    private static final int FIRST_LEVEL = 1;

    /** Lowest and highest level handled by the periodic "other queues" task. */
    private static final int MIN_OTHER_LEVEL = 2;
    private static final int MAX_OTHER_LEVEL = 8;

    /** Error code returned by {@link #distributeByQuery} when the query matches no message. */
    private static final int EMPTY_DISTRIBUTE_MESSAGE_CODE = 999;

    /** Delay before the first-level loop starts. */
    private static final long FIRST_QUEUE_START_DELAY_SECONDS = 10L;

    /**
     * Delay before the first run of the other-queues task. Same value (10 000 ms) as the logback
     * constant {@code AbstractComponentTracker.LINGERING_TIMEOUT} that was referenced here before;
     * a scheduling delay should not depend on a logging library's internals.
     */
    private static final long OTHER_QUEUE_INITIAL_DELAY_MILLIS = 10000L;

    /** Pause between the end of one other-queues run and the start of the next. */
    private static final long OTHER_QUEUE_DELAY_MILLIS = 2000L;

    /** Pause after a failed first-level round so a persistent failure does not spin the loop. */
    private static final long FAILURE_BACKOFF_MILLIS = 1000L;

    @Resource
    private NotifyMessageService notifyMessageService;

    @Resource
    private RedisTemplate redisTemplate;

    @Resource
    private MsgExecutorServiceFactory msgExecutorServiceFactory;

    @Resource
    private KangarooMessageListener kangarooMessageListener;

    @Resource
    private ManualDeliverMessageService manualDeliverMessageService;

    /** Set by {@link #destroy()}; checked by the first-level loop before every round. */
    private volatile boolean stop;

    /** Guards against scheduling the tasks more than once when several refresh events arrive. */
    private volatile boolean isStart = false;

    ScheduledExecutorService firstQueueExecutor = Executors.newSingleThreadScheduledExecutor();
    ScheduledExecutorService otherQueueExecutor = Executors.newSingleThreadScheduledExecutor();

    /**
     * Schedules the two distribution tasks: the first-level loop (started once after
     * {@value #FIRST_QUEUE_START_DELAY_SECONDS} seconds) and the other-queues task (first run after
     * {@value #OTHER_QUEUE_INITIAL_DELAY_MILLIS} ms, then with a fixed delay of
     * {@value #OTHER_QUEUE_DELAY_MILLIS} ms between runs).
     */
    public void distributeMsg() {
        logger.info("消息分发开始执行============================");
        SetOperations opsForSet = this.redisTemplate.opsForSet();
        Runnable firstQueueTask = () -> runFirstQueueLoop(opsForSet);
        this.firstQueueExecutor.schedule(firstQueueTask, FIRST_QUEUE_START_DELAY_SECONDS, TimeUnit.SECONDS);

        ZSetOperations opsForZSet = this.redisTemplate.opsForZSet();
        Runnable otherQueueTask = () -> runOtherQueuesOnce(opsForZSet);
        this.otherQueueExecutor.scheduleWithFixedDelay(
                otherQueueTask, OTHER_QUEUE_INITIAL_DELAY_MILLIS, OTHER_QUEUE_DELAY_MILLIS, TimeUnit.MILLISECONDS);
    }

    /**
     * Repeats first-level distribution rounds until {@link #stop} is set or the thread is
     * interrupted. A runtime failure in one round is logged and followed by a short pause instead
     * of terminating the loop, because this task is scheduled only once and would otherwise never
     * run again.
     */
    private void runFirstQueueLoop(SetOperations opsForSet) {
        while (!this.stop) {
            try {
                if (!distributeFirstQueueRound(opsForSet)) {
                    return;
                }
            } catch (RuntimeException e) {
                logger.error("消息分发有异常", e);
                if (!pauseAfterFailure()) {
                    return;
                }
            }
        }
    }

    /**
     * Runs one first-level round: submits a consumer per remaining topic, waits for the submitted
     * consumers, then notifies the listener.
     *
     * @return {@code false} if the thread was interrupted while waiting (the interrupt flag is
     *         restored), {@code true} otherwise
     */
    private boolean distributeFirstQueueRound(SetOperations opsForSet) {
        Set<String> members = opsForSet.members(RedisKeyConstants.REMAIN_TOPIC_KEY);
        logger.debug("分发的topics:{}", members);
        boolean noTopics = CollectionUtils.isEmpty(members);
        if (noTopics) {
            // NOTE(review): with no topics finishMessage() is invoked here and again after the
            // (zero-count) latch below; kept as is — confirm the double call is intended.
            this.kangarooMessageListener.finishMessage();
        }
        // isEmpty() also covers null, so the size must not be read unconditionally.
        int topicCount = noTopics ? 0 : members.size();
        CountDownLatch countDownLatch = new CountDownLatch(topicCount);
        int submitted = 0;
        if (!noTopics) {
            for (String topic : members) {
                ThreadPoolExecutor executor = this.msgExecutorServiceFactory.getFirstQueueDistributeExecutor();
                int activeCount = executor.getActiveCount();
                int maximumPoolSize = executor.getMaximumPoolSize();
                if (activeCount >= maximumPoolSize) {
                    logger.error("一级分发线程池活跃线程数:{}达到最大值:{},不再分配投递任务", activeCount, maximumPoolSize);
                    break;
                }
                String queueKey = Joiner.on("_").join(RedisKeyConstants.QUEUE_KEY_PREFIX, FIRST_LEVEL, topic);
                logger.info("一级分发队列开始分发消息====================");
                QueueConsumer queueConsumer = SpringContextUtil.getBean("queueConsumer", QueueConsumer.class);
                queueConsumer.init(queueKey, null, FIRST_LEVEL, countDownLatch);
                executor.execute(queueConsumer);
                submitted++;
            }
        }
        // Topics skipped because the pool was saturated will never count down; release them here
        // so await() only waits for consumers that were actually submitted.
        releaseUnsubmitted(countDownLatch, topicCount - submitted);
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            logger.error("消息分发有异常", e);
            Thread.currentThread().interrupt();
            return false;
        }
        this.kangarooMessageListener.finishMessage();
        return true;
    }

    /**
     * Runs one pass over levels {@value #MIN_OTHER_LEVEL}..{@value #MAX_OTHER_LEVEL}. Runtime
     * failures are caught and logged here because an exception escaping a
     * {@code scheduleWithFixedDelay} task suppresses all subsequent executions.
     */
    private void runOtherQueuesOnce(ZSetOperations opsForZSet) {
        try {
            logger.info("其他队列发送消息开始执行");
            long now = System.currentTimeMillis();
            for (int level = MIN_OTHER_LEVEL; level <= MAX_OTHER_LEVEL; level++) {
                if (!distributeOtherLevel(opsForZSet, level, now)) {
                    // Interrupted: abandon the remaining levels of this pass.
                    return;
                }
            }
            logger.debug("其他队列消息分发完成");
        } catch (RuntimeException e) {
            logger.error("消息分发有异常", e);
        }
    }

    /**
     * Submits a consumer for every member of the given level's sorted set whose score lies between
     * 0 and {@code now}, then waits for the submitted consumers.
     *
     * @param level queue level, used in both the sorted-set key and the per-member queue key
     * @param now   upper score bound, in milliseconds since the epoch
     * @return {@code false} if the thread was interrupted while waiting (the interrupt flag is
     *         restored), {@code true} otherwise
     */
    private boolean distributeOtherLevel(ZSetOperations opsForZSet, int level, long now) {
        String levelSetKey = Joiner.on("_").join(RedisKeyConstants.MSG_OTHER_QUEUE_KEY_SET_PREFIX, level);
        Set dueMembers = opsForZSet.rangeByScore(levelSetKey, 0.0d, now);
        if (CollectionUtils.isEmpty(dueMembers)) {
            return true;
        }
        int memberCount = dueMembers.size();
        CountDownLatch countDownLatch = new CountDownLatch(memberCount);
        int submitted = 0;
        for (Object member : dueMembers) {
            // NOTE(review): members are cast to Long; confirm the Redis value serializer yields
            // Long here, otherwise this throws ClassCastException (logged by the caller).
            String queueKey = Joiner.on("_").join(RedisKeyConstants.QUEUE_KEY_PREFIX, level, (Long) member);
            ThreadPoolExecutor executor = this.msgExecutorServiceFactory.getOtherQueueDistributeExecutor();
            int activeCount = executor.getActiveCount();
            int maximumPoolSize = executor.getMaximumPoolSize();
            if (activeCount >= maximumPoolSize) {
                logger.error("其他分发线程池活跃线程数:{}达到最大值:{},不再分配投递任务", activeCount, maximumPoolSize);
                break;
            }
            QueueConsumer queueConsumer = SpringContextUtil.getBean("queueConsumer", QueueConsumer.class);
            queueConsumer.init(queueKey, levelSetKey, level, countDownLatch);
            executor.execute(queueConsumer);
            submitted++;
        }
        // Same reasoning as for the first level: do not wait for consumers that were never started.
        releaseUnsubmitted(countDownLatch, memberCount - submitted);
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            logger.error("消息分发有异常", e);
            Thread.currentThread().interrupt();
            return false;
        }
        return true;
    }

    /** Counts the latch down once for each task that was planned but never submitted. */
    private static void releaseUnsubmitted(CountDownLatch latch, int unsubmitted) {
        for (int i = 0; i < unsubmitted; i++) {
            latch.countDown();
        }
    }

    /**
     * Sleeps for {@value #FAILURE_BACKOFF_MILLIS} ms after a failed round.
     *
     * @return {@code false} if interrupted while sleeping (the interrupt flag is restored)
     */
    private boolean pauseAfterFailure() {
        try {
            TimeUnit.MILLISECONDS.sleep(FAILURE_BACKOFF_MILLIS);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Looks up messages matching the query and hands them to the manual deliver service.
     *
     * <p>The query's {@code notDeliverType} is set to the value of {@code DeliverType.ROCKET_MQ}
     * before the lookup (presumably to exclude RocketMQ-delivered messages — confirm against
     * {@code NotifyMessageQuery}).
     *
     * @param notifyMessageQuery query to run; it is modified by this call
     * @return an error result with code {@value #EMPTY_DISTRIBUTE_MESSAGE_CODE} when nothing
     *         matches, otherwise whatever {@code manualDeliverMessageService.deliverMessage} returns
     */
    public DubboResult distributeByQuery(NotifyMessageQuery notifyMessageQuery) {
        notifyMessageQuery.setNotDeliverType(Integer.valueOf(DeliverType.ROCKET_MQ.getValue()));
        ArrayList<NotifyMessage> listByQuery = this.notifyMessageService.getListByQuery(notifyMessageQuery);
        if (CollectionUtils.isEmpty(listByQuery)) {
            return DubboResultBuilder.error(EMPTY_DISTRIBUTE_MESSAGE_CODE, "没有需要分发的消息");
        }
        return this.manualDeliverMessageService.deliverMessage(listByQuery);
    }

    /** Prefixes the given suffix with the first-level number and an underscore ("1_"). */
    private String getFirstQueueIdSuffix(String str) {
        return FIRST_LEVEL + "_" + str;
    }

    /**
     * Starts distribution on the first context-refreshed event; later events are ignored.
     */
    @Override // org.springframework.context.ApplicationListener
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        synchronized (this) {
            if (!this.isStart) {
                this.isStart = true;
                distributeMsg();
            }
        }
    }

    /**
     * Signals the first-level loop to stop after its current round and shuts both schedulers
     * down. {@code shutdown()} does not interrupt a task that is already running.
     */
    @Override // org.springframework.beans.factory.DisposableBean
    public void destroy() throws Exception {
        this.stop = true;
        this.firstQueueExecutor.shutdown();
        this.otherQueueExecutor.shutdown();
    }
}
