package org.apache.rocketmq.client.impl.consumer;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.hook.ConsumeMessageContext;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.stat.ConsumerStatsManager;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.CMResult;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/* loaded from: input_file:WEB-INF/lib/rocketmq-client-4.3.0.jar:org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.class */
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
    /** Client logger shared by all instances of this service. */
    private static final InternalLogger log = ClientLogger.getLog();
    /** Owning consumer implementation; used here for hooks, send-back, offset store, stats and the process-queue table. */
    private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
    /** Consumer configuration; read here for thread counts, batch size, consume timeout and message model. */
    private final DefaultMQPushConsumer defaultMQPushConsumer;
    /** User-supplied listener that actually consumes each batch. */
    private final MessageListenerConcurrently messageListener;
    /** Thread pool that executes {@link ConsumeRequest} tasks. */
    private final ThreadPoolExecutor consumeExecutor;
    /** Consumer group name, cached from {@link #defaultMQPushConsumer} at construction. */
    private final String consumerGroup;
    /** Work queue handed to {@link #consumeExecutor}; created without a capacity bound. */
    private final BlockingQueue<Runnable> consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
    /** Single-thread scheduler used to re-submit consume requests after a delay. */
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
    /** Single-thread scheduler that periodically runs the expired-message cleanup. */
    private final ScheduledExecutorService cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:WEB-INF/lib/rocketmq-client-4.3.0.jar:org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService$ConsumeRequest.class */
    /**
     * One unit of work for the consume thread pool: a batch of messages pulled from a single
     * message queue, together with the {@link ProcessQueue} that tracks them.
     *
     * <p>Running the request invokes the registered {@link MessageListenerConcurrently}, records
     * consume statistics, fires the before/after consume hooks (when hooks were present at the
     * start of the run) and finally hands the outcome to
     * {@link ConsumeMessageConcurrentlyService#processConsumeResult}.
     */
    public class ConsumeRequest implements Runnable {
        private final List<MessageExt> msgs;
        private final ProcessQueue processQueue;
        private final MessageQueue messageQueue;

        /**
         * @param list         messages to consume in this request
         * @param processQueue process queue the messages belong to
         * @param messageQueue message queue the messages were pulled from
         */
        public ConsumeRequest(List<MessageExt> list, ProcessQueue processQueue, MessageQueue messageQueue) {
            this.msgs = list;
            this.processQueue = processQueue;
            this.messageQueue = messageQueue;
        }

        /** @return the message list held by this request (the same instance passed to the constructor) */
        public List<MessageExt> getMsgs() {
            return this.msgs;
        }

        /** @return the process queue this request belongs to */
        public ProcessQueue getProcessQueue() {
            return this.processQueue;
        }

        /**
         * Consumes the batch. Does nothing (apart from logging) when the process queue has already
         * been dropped. Any {@link Throwable} thrown by the listener is caught, logged, and treated
         * like a {@code null} return, i.e. the batch is reported as {@code RECONSUME_LATER}.
         */
        @Override // java.lang.Runnable
        public void run() {
            if (this.processQueue.isDropped()) {
                ConsumeMessageConcurrentlyService.log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
                return;
            }

            final MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
            final ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(this.messageQueue);
            ConsumeConcurrentlyStatus status = null;

            // The hook context is created at most once. Every later hook step is guarded by this
            // reference instead of re-querying hasHook(), so a hook that gets registered while the
            // listener is running cannot lead to a null dereference.
            ConsumeMessageContext hookContext = null;
            if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                hookContext = new ConsumeMessageContext();
                hookContext.setConsumerGroup(ConsumeMessageConcurrentlyService.this.defaultMQPushConsumer.getConsumerGroup());
                hookContext.setProps(new HashMap<String, String>());
                hookContext.setMq(this.messageQueue);
                hookContext.setMsgList(this.msgs);
                hookContext.setSuccess(false);
                ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(hookContext);
            }

            final long beginTimestamp = System.currentTimeMillis();
            boolean listenerThrew = false;
            ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
            try {
                ConsumeMessageConcurrentlyService.this.resetRetryTopic(this.msgs);
                if (this.msgs != null && !this.msgs.isEmpty()) {
                    for (MessageExt msg : this.msgs) {
                        MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
                    }
                }
                // The listener only sees a read-only view of the batch.
                status = listener.consumeMessage(Collections.unmodifiableList(this.msgs), context);
            } catch (Throwable th) {
                ConsumeMessageConcurrentlyService.log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", RemotingHelper.exceptionSimpleDesc(th), ConsumeMessageConcurrentlyService.this.consumerGroup, this.msgs, this.messageQueue);
                listenerThrew = true;
            }
            final long consumeRT = System.currentTimeMillis() - beginTimestamp;

            // Classify the outcome for the hook; consumeTimeout is expressed in minutes.
            if (null == status) {
                returnType = listenerThrew ? ConsumeReturnType.EXCEPTION : ConsumeReturnType.RETURNNULL;
            } else if (consumeRT >= ConsumeMessageConcurrentlyService.this.defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
                returnType = ConsumeReturnType.TIME_OUT;
            } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
                returnType = ConsumeReturnType.FAILED;
            } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
                returnType = ConsumeReturnType.SUCCESS;
            }
            if (hookContext != null) {
                hookContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
            }

            // A null status (listener returned null or threw) is handled as "retry later".
            if (null == status) {
                ConsumeMessageConcurrentlyService.log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.msgs, this.messageQueue);
                status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            if (hookContext != null) {
                hookContext.setStatus(status.toString());
                hookContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
                ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(hookContext);
            }

            ConsumeMessageConcurrentlyService.this.getConsumerStatsManager().incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue.getTopic(), consumeRT);

            // The queue may have been dropped while the listener was running; in that case the
            // result is discarded rather than acknowledged.
            if (this.processQueue.isDropped()) {
                ConsumeMessageConcurrentlyService.log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", this.messageQueue, this.msgs);
            } else {
                ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
            }
        }

        /** @return the message queue the batch was pulled from */
        public MessageQueue getMessageQueue() {
            return this.messageQueue;
        }
    }

    public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl, MessageListenerConcurrently messageListenerConcurrently) {
        this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
        this.messageListener = messageListenerConcurrently;
        this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
        this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
        this.consumeExecutor = new ThreadPoolExecutor(this.defaultMQPushConsumer.getConsumeThreadMin(), this.defaultMQPushConsumer.getConsumeThreadMax(), 60000L, TimeUnit.MILLISECONDS, this.consumeRequestQueue, new ThreadFactoryImpl("ConsumeMessageThread_"));
    }

    /**
     * Starts the periodic expired-message cleanup. Both the initial delay and the period equal
     * the consumer's {@code consumeTimeout}, interpreted in minutes.
     */
    @Override // org.apache.rocketmq.client.impl.consumer.ConsumeMessageService
    public void start() {
        final Runnable cleanupTask = new Runnable() {
            @Override // java.lang.Runnable
            public void run() {
                ConsumeMessageConcurrentlyService.this.cleanExpireMsg();
            }
        };
        this.cleanExpireMsgExecutors.scheduleAtFixedRate(
            cleanupTask,
            this.defaultMQPushConsumer.getConsumeTimeout(),
            this.defaultMQPushConsumer.getConsumeTimeout(),
            TimeUnit.MINUTES);
    }

    /**
     * Requests shutdown of the retry scheduler, the consume thread pool and the cleanup
     * scheduler, in that order. This method only calls {@code shutdown()} on each executor;
     * it does not wait for them to terminate.
     */
    @Override // org.apache.rocketmq.client.impl.consumer.ConsumeMessageService
    public void shutdown() {
        this.scheduledExecutorService.shutdown();
        this.consumeExecutor.shutdown();
        this.cleanExpireMsgExecutors.shutdown();
    }

    /**
     * Changes the core size of the consume thread pool. The request is silently ignored unless
     * {@code 0 < i <= Short.MAX_VALUE} and {@code i} is strictly below the consumer's
     * {@code consumeThreadMax}.
     *
     * @param i requested core pool size
     */
    @Override // org.apache.rocketmq.client.impl.consumer.ConsumeMessageService
    public void updateCorePoolSize(int i) {
        final boolean withinShortRange = i > 0 && i <= Short.MAX_VALUE;
        if (withinShortRange && i < this.defaultMQPushConsumer.getConsumeThreadMax()) {
            this.consumeExecutor.setCorePoolSize(i);
        }
    }

    /**
     * Intentionally a no-op in this implementation: the body is empty, so calling it does not
     * change the consume thread pool.
     */
    @Override // org.apache.rocketmq.client.impl.consumer.ConsumeMessageService
    public void incCorePoolSize() {
    }

    /**
     * Intentionally a no-op in this implementation: the body is empty, so calling it does not
     * change the consume thread pool.
     */
    @Override // org.apache.rocketmq.client.impl.consumer.ConsumeMessageService
    public void decCorePoolSize() {
    }

    /**
     * @return the current core pool size as reported by the consume thread pool
     */
    @Override // org.apache.rocketmq.client.impl.consumer.ConsumeMessageService
    public int getCorePoolSize() {
        return this.consumeExecutor.getCorePoolSize();
    }

    /**
     * Consumes a single message synchronously on the calling thread, bypassing the consume
     * thread pool, and reports how the listener responded.
     *
     * <p>The listener's outcome is mapped to a {@link CMResult}: success, later, returned
     * {@code null}, or threw. Exceptions from the listener are caught and recorded in the
     * result's remark; they are never propagated to the caller.
     *
     * @param messageExt message to consume
     * @param str        broker name to put on the synthetic {@link MessageQueue} passed to the listener
     * @return the consume result, including the time spent in milliseconds
     */
    @Override // org.apache.rocketmq.client.impl.consumer.ConsumeMessageService
    public ConsumeMessageDirectlyResult consumeMessageDirectly(MessageExt messageExt, String str) {
        ConsumeMessageDirectlyResult result = new ConsumeMessageDirectlyResult();
        result.setOrder(false);
        result.setAutoCommit(true);

        // Typed list (previously a raw ArrayList) so the calls below are checked by the compiler.
        List<MessageExt> msgs = new ArrayList<MessageExt>();
        msgs.add(messageExt);

        MessageQueue messageQueue = new MessageQueue();
        messageQueue.setBrokerName(str);
        messageQueue.setTopic(messageExt.getTopic());
        messageQueue.setQueueId(messageExt.getQueueId());
        ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);

        resetRetryTopic(msgs);
        long beginTimestamp = System.currentTimeMillis();
        log.info("consumeMessageDirectly receive new message: {}", messageExt);
        try {
            ConsumeConcurrentlyStatus status = this.messageListener.consumeMessage(msgs, context);
            if (status != null) {
                switch (status) {
                    case CONSUME_SUCCESS:
                        result.setConsumeResult(CMResult.CR_SUCCESS);
                        break;
                    case RECONSUME_LATER:
                        result.setConsumeResult(CMResult.CR_LATER);
                        break;
                    default:
                        // Any other status leaves the consume result unset, as before.
                        break;
                }
            } else {
                result.setConsumeResult(CMResult.CR_RETURN_NULL);
            }
        } catch (Throwable th) {
            result.setConsumeResult(CMResult.CR_THROW_EXCEPTION);
            result.setRemark(RemotingHelper.exceptionSimpleDesc(th));
            log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s", RemotingHelper.exceptionSimpleDesc(th), this.consumerGroup, msgs, messageQueue), th);
        }
        result.setSpentTimeMills(System.currentTimeMillis() - beginTimestamp);
        log.info("consumeMessageDirectly Result: {}", result);
        return result;
    }

    /**
     * Submits pulled messages to the consume thread pool, splitting them into batches of at
     * most {@code consumeMessageBatchMaxSize} messages.
     *
     * <p>If the pool rejects a batch, that batch absorbs every message not yet submitted and is
     * retried as a single request after a delay, so nothing is dropped.
     *
     * <p>A configured batch size below 1 is treated as 1. Previously a batch size of 0 made the
     * splitting loop spin forever submitting empty requests, and a negative size made the batch
     * list constructor throw {@link IllegalArgumentException}.
     *
     * @param list         messages to consume
     * @param processQueue process queue the messages belong to
     * @param messageQueue message queue the messages were pulled from
     * @param z            not used by this implementation
     */
    @Override // org.apache.rocketmq.client.impl.consumer.ConsumeMessageService
    public void submitConsumeRequest(List<MessageExt> list, ProcessQueue processQueue, MessageQueue messageQueue, boolean z) {
        final int batchSize = Math.max(1, this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize());
        final int total = list.size();

        // Fast path: everything fits in one request, so the caller's list is passed through as-is.
        if (total <= batchSize) {
            ConsumeRequest single = new ConsumeRequest(list, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(single);
            } catch (RejectedExecutionException e) {
                submitConsumeRequestLater(single);
            }
            return;
        }

        int next = 0;
        while (next < total) {
            final int end = next + Math.min(batchSize, total - next);
            // Copy the slice: the request owns its list and may mutate it later.
            List<MessageExt> batch = new ArrayList<MessageExt>(list.subList(next, end));
            next = end;

            ConsumeRequest request = new ConsumeRequest(batch, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(request);
            } catch (RejectedExecutionException e) {
                // Fold all remaining messages into the rejected batch and retry it once, later.
                batch.addAll(list.subList(next, total));
                next = total;
                submitConsumeRequestLater(request);
            }
        }
    }

    /**
     * Restores the topic recorded in the {@code PROPERTY_RETRY_TOPIC} property on every message
     * whose current topic equals this group's retry topic. Messages without that property, or
     * with a different topic, are left untouched.
     *
     * @param list messages to inspect and update in place; must not be {@code null}
     */
    public void resetRetryTopic(List<MessageExt> list) {
        final String groupRetryTopic = MixAll.getRetryTopic(this.consumerGroup);
        for (final Iterator<MessageExt> cursor = list.iterator(); cursor.hasNext(); ) {
            final MessageExt msg = cursor.next();
            final String recordedTopic = msg.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
            if (recordedTopic == null) {
                continue;
            }
            if (groupRetryTopic.equals(msg.getTopic())) {
                msg.setTopic(recordedTopic);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Asks every process queue currently in the rebalance table to clean its expired messages.
     * Invoked periodically by the task scheduled in {@link #start()}.
     */
    public void cleanExpireMsg() {
        for (Map.Entry<MessageQueue, ProcessQueue> entry : this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet()) {
            final ProcessQueue queue = entry.getValue();
            queue.cleanExpiredMsg(this.defaultMQPushConsumer);
        }
    }

    /**
     * Applies the listener's outcome for one consume request: updates the OK/failed statistics,
     * handles the messages that were not acknowledged, removes the handled messages from the
     * process queue and advances the consume offset.
     *
     * <p>Messages at index {@code <= ackIndex} count as consumed; the rest count as failed.
     * For {@code RECONSUME_LATER} the whole batch counts as failed. In broadcasting mode failed
     * messages are only logged. In clustering mode they are sent back to the broker; those that
     * cannot be sent back get their reconsume count incremented, are taken out of this request
     * and are re-submitted locally after a delay.
     *
     * <p>An {@code ackIndex} below {@code -1} is treated as {@code -1} (nothing acknowledged).
     * Previously such a value produced negative statistics and an out-of-range list access.
     *
     * @param consumeConcurrentlyStatus  status returned by the listener; must not be {@code null}
     * @param consumeConcurrentlyContext context the listener ran with (carries ackIndex and delay level)
     * @param consumeRequest             the request that was consumed
     */
    public void processConsumeResult(ConsumeConcurrentlyStatus consumeConcurrentlyStatus, ConsumeConcurrentlyContext consumeConcurrentlyContext, ConsumeRequest consumeRequest) {
        int ackIndex = consumeConcurrentlyContext.getAckIndex();
        final List<MessageExt> msgs = consumeRequest.getMsgs();
        if (msgs.isEmpty()) {
            return;
        }

        switch (consumeConcurrentlyStatus) {
            case CONSUME_SUCCESS:
                // Clamp ackIndex into [-1, size - 1].
                if (ackIndex >= msgs.size()) {
                    ackIndex = msgs.size() - 1;
                } else if (ackIndex < -1) {
                    ackIndex = -1;
                }
                int okCount = ackIndex + 1;
                int failedCount = msgs.size() - okCount;
                getConsumerStatsManager().incConsumeOKTPS(this.consumerGroup, consumeRequest.getMessageQueue().getTopic(), okCount);
                getConsumerStatsManager().incConsumeFailedTPS(this.consumerGroup, consumeRequest.getMessageQueue().getTopic(), failedCount);
                break;
            case RECONSUME_LATER:
                ackIndex = -1;
                getConsumerStatsManager().incConsumeFailedTPS(this.consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                break;
            default:
                break;
        }

        switch (this.defaultMQPushConsumer.getMessageModel()) {
            case BROADCASTING:
                // No retry in broadcasting mode: un-acked messages are logged and dropped.
                for (int idx = ackIndex + 1; idx < msgs.size(); idx++) {
                    log.warn("BROADCASTING, the message consume failed, drop it, {}", msgs.get(idx).toString());
                }
                break;
            case CLUSTERING:
                List<MessageExt> sendBackFailed = new ArrayList<MessageExt>(msgs.size());
                for (int idx = ackIndex + 1; idx < msgs.size(); idx++) {
                    MessageExt msg = msgs.get(idx);
                    if (!sendMessageBack(msg, consumeConcurrentlyContext)) {
                        msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                        sendBackFailed.add(msg);
                    }
                }
                if (!sendBackFailed.isEmpty()) {
                    // Keep these out of the removal/offset update below; they are retried locally.
                    msgs.removeAll(sendBackFailed);
                    submitConsumeRequestLater(sendBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
                }
                break;
            default:
                break;
        }

        long offset = consumeRequest.getProcessQueue().removeMessage(msgs);
        // Only persist a non-negative offset, and only while the queue is still owned.
        if (offset < 0 || consumeRequest.getProcessQueue().isDropped()) {
            return;
        }
        this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
    }

    /**
     * @return the statistics manager obtained from the owning consumer implementation
     */
    public ConsumerStatsManager getConsumerStatsManager() {
        return this.defaultMQPushConsumerImpl.getConsumerStatsManager();
    }

    /**
     * Sends a message back to the broker, using the delay level and the broker name taken from
     * the given context.
     *
     * @param messageExt                 message to send back
     * @param consumeConcurrentlyContext context supplying the delay level and target broker name
     * @return {@code true} if the send-back call completed; {@code false} if it threw an
     *         {@link Exception} (which is logged, not propagated)
     */
    public boolean sendMessageBack(MessageExt messageExt, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        boolean sent;
        try {
            final int delayLevel = consumeConcurrentlyContext.getDelayLevelWhenNextConsume();
            final String brokerName = consumeConcurrentlyContext.getMessageQueue().getBrokerName();
            this.defaultMQPushConsumerImpl.sendMessageBack(messageExt, delayLevel, brokerName);
            sent = true;
        } catch (Exception e) {
            log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + messageExt.toString(), (Throwable) e);
            sent = false;
        }
        return sent;
    }

    /**
     * Schedules the given messages to be submitted again through
     * {@link #submitConsumeRequest} after 5000 ms.
     *
     * @param list         messages to re-submit
     * @param processQueue process queue the messages belong to
     * @param messageQueue message queue the messages were pulled from
     */
    private void submitConsumeRequestLater(final List<MessageExt> list, final ProcessQueue processQueue, final MessageQueue messageQueue) {
        final Runnable resubmit = new Runnable() {
            @Override // java.lang.Runnable
            public void run() {
                ConsumeMessageConcurrentlyService.this.submitConsumeRequest(list, processQueue, messageQueue, true);
            }
        };
        this.scheduledExecutorService.schedule(resubmit, 5000L, TimeUnit.MILLISECONDS);
    }

    /**
     * Schedules an already-built consume request to be handed to the consume thread pool again
     * after 5000 ms.
     *
     * @param consumeRequest request to re-submit
     */
    private void submitConsumeRequestLater(final ConsumeRequest consumeRequest) {
        final Runnable resubmit = new Runnable() {
            @Override // java.lang.Runnable
            public void run() {
                ConsumeMessageConcurrentlyService.this.consumeExecutor.submit(consumeRequest);
            }
        };
        this.scheduledExecutorService.schedule(resubmit, 5000L, TimeUnit.MILLISECONDS);
    }
}
