package org.wso2.andes.server.cassandra;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.derby.impl.sql.compile.SQLParserConstants;
import org.wso2.andes.server.AMQChannel;
import org.wso2.andes.server.ClusterResourceHolder;
import org.wso2.andes.server.configuration.ClusterConfiguration;
import org.wso2.andes.server.message.AMQMessage;
import org.wso2.andes.server.queue.AMQQueue;
import org.wso2.andes.server.queue.QueueEntry;
import org.wso2.andes.server.store.CassandraMessageStore;
import org.wso2.andes.server.subscription.Subscription;
import org.wso2.andes.server.subscription.SubscriptionImpl;
import org.wso2.andes.server.util.AndesConstants;

/* loaded from: input_file:org/wso2/andes/server/cassandra/QueueDeliveryWorker.class */
public class QueueDeliveryWorker extends Thread {
    private final AMQQueue queue;
    private final String nodeQueue;
    private static Log log = LogFactory.getLog(QueueDeliveryWorker.class);
    private static final Log traceLog = LogFactory.getLog(AndesConstants.TRACE_LOGGER);
    private int messageCountToRead;
    private int maxMessageCountToRead;
    private int minMessageCountToRead;
    private int maxNumberOfUnAckedMessages;
    private int maxNumberOfReadButUndeliveredMessages;
    private int resetCounter;
    private SequentialThreadPoolExecutor executor;
    private final int queueWorkerWaitInterval;
    private int queueMsgDeliveryCurserResetTimeInterval;
    private OnflightMessageTracker onflightMessageTracker;
    private Map<String, Map<String, CassandraSubscription>> subscriptionMap;
    private boolean isInMemoryMode;
    private boolean running = true;
    private long lastProcessedId = 0;
    private int maxRestCounter = 10;
    private long totMsgSent = 0;
    private long totMsgRead = 0;
    private long lastRestTime = 0;
    private long iterations = 0;
    private int workqueueSize = 0;
    private long failureCount = 0;
    private AtomicInteger totalReadButUndeliveredMessages = new AtomicInteger(0);
    ConcurrentLinkedQueue<QueueEntry> laggards = new ConcurrentLinkedQueue<>();
    private Map<String, QueueDeliveryInfo> subscriptionCursar4QueueMap = new HashMap();
    private ConcurrentHashMap<String, ArrayList<QueueEntry>> undeliveredMessagesMap = new ConcurrentHashMap<>();

    /* loaded from: input_file:org/wso2/andes/server/cassandra/QueueDeliveryWorker$QueueDeliveryInfo.class */
    public class QueueDeliveryInfo {
        String queueName;
        Iterator<CassandraSubscription> iterator;
        List<QueueEntry> readButUndeliveredMessages = Collections.synchronizedList(new ArrayList());
        boolean messageIgnored = false;
        boolean hasQueueFullAndMessagesIgnored = false;
        long ignoredFirstMessageId = -1;
        boolean needToReset = false;

        public QueueDeliveryInfo() {
        }

        public void setIgnoredFirstMessageId(long j) {
            this.ignoredFirstMessageId = j;
        }

        public void setNeedToReset(boolean z) {
            this.needToReset = z;
        }
    }

    public CassandraSubscription findNextSubscriptionToSent(String str) {
        Map<String, CassandraSubscription> map = this.subscriptionMap.get(str);
        if (map == null || map.size() == 0) {
            this.subscriptionCursar4QueueMap.remove(str);
            return null;
        }
        QueueDeliveryInfo queueDeliveryInfo = getQueueDeliveryInfo(str);
        Iterator<CassandraSubscription> it = queueDeliveryInfo.iterator;
        if (it.hasNext()) {
            return it.next();
        }
        Iterator<CassandraSubscription> it2 = map.values().iterator();
        queueDeliveryInfo.iterator = it2;
        if (it2.hasNext()) {
            return it2.next();
        }
        return null;
    }

    public QueueDeliveryInfo getQueueDeliveryInfo(String str) {
        QueueDeliveryInfo queueDeliveryInfo = this.subscriptionCursar4QueueMap.get(str);
        if (queueDeliveryInfo == null) {
            queueDeliveryInfo = new QueueDeliveryInfo();
            queueDeliveryInfo.queueName = str;
            Map<String, CassandraSubscription> map = this.subscriptionMap.get(str);
            if (map != null) {
                queueDeliveryInfo.iterator = map.values().iterator();
            } else {
                queueDeliveryInfo.iterator = Collections.emptyList().iterator();
            }
            this.subscriptionCursar4QueueMap.put(str, queueDeliveryInfo);
        }
        return queueDeliveryInfo;
    }

    /* JADX WARN: Type inference failed for: r0v36, types: [org.wso2.andes.server.cassandra.QueueDeliveryWorker$1] */
    public QueueDeliveryWorker(final String str, final AMQQueue aMQQueue, Map<String, Map<String, CassandraSubscription>> map, SequentialThreadPoolExecutor sequentialThreadPoolExecutor, final int i, boolean z) {
        this.messageCountToRead = 50;
        this.maxMessageCountToRead = SQLParserConstants.LOGGED;
        this.minMessageCountToRead = 20;
        this.maxNumberOfUnAckedMessages = 20000;
        this.maxNumberOfReadButUndeliveredMessages = 1000;
        this.subscriptionMap = new ConcurrentHashMap();
        this.isInMemoryMode = false;
        this.queue = aMQQueue;
        this.nodeQueue = str;
        this.executor = sequentialThreadPoolExecutor;
        ClusterConfiguration clusterConfiguration = ClusterResourceHolder.getInstance().getClusterConfiguration();
        this.messageCountToRead = clusterConfiguration.getMessageBatchSizeForSubscribers();
        this.maxMessageCountToRead = clusterConfiguration.getMaxMessageBatchSizeForSubscribers();
        this.minMessageCountToRead = clusterConfiguration.getMinMessageBatchSizeForSubscribers();
        this.maxNumberOfUnAckedMessages = clusterConfiguration.getMaxNumberOfUnackedMessages();
        this.maxNumberOfReadButUndeliveredMessages = clusterConfiguration.getMaxNumberOfReadButUndeliveredMessages();
        this.queueMsgDeliveryCurserResetTimeInterval = clusterConfiguration.getQueueMsgDeliveryCurserResetTimeInterval();
        this.queueWorkerWaitInterval = i;
        this.subscriptionMap = map;
        this.onflightMessageTracker = OnflightMessageTracker.getInstance();
        this.isInMemoryMode = z;
        new Thread() { // from class: org.wso2.andes.server.cassandra.QueueDeliveryWorker.1
            /* JADX WARN: Code restructure failed: missing block: B:11:0x0031, code lost:
            
                if (r11 >= r9.this$0.lastProcessedId) goto L48;
             */
            @Override // java.lang.Thread, java.lang.Runnable
            /*
                Code decompiled incorrectly, please refer to instructions dump.
                To view partially-correct add '--show-bad-code' argument
            */
            public void run() {
                /*
                    Method dump skipped, instructions count: 413
                    To view this dump add '--comments-level debug' option
                */
                throw new UnsupportedOperationException("Method not decompiled: org.wso2.andes.server.cassandra.QueueDeliveryWorker.AnonymousClass1.run():void");
            }
        }.start();
        log.info("Queue worker started for queue: " + aMQQueue.getResourceName() + " with on flight message checks");
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        this.iterations = 0L;
        this.workqueueSize = 0;
        this.lastRestTime = System.currentTimeMillis();
        this.failureCount = 0L;
        while (this.running) {
            try {
                this.workqueueSize = this.executor.getSize();
                if (this.workqueueSize > 1000) {
                    if (this.workqueueSize > 5000) {
                        log.error("Flusher queue is growing, and this should not happen. Please check cassandra Flusher");
                    }
                    if (log.isDebugEnabled()) {
                        log.debug("skipping content cassandra reading thread as flusher queue has " + this.workqueueSize + " tasks");
                    }
                    sleep4waitInterval(this.queueWorkerWaitInterval);
                } else {
                    resetOffsetAtCassadraQueueIfNeeded(false);
                    int i = 0;
                    ArrayList arrayList = new ArrayList();
                    if (this.totalReadButUndeliveredMessages.get() < 10000) {
                        if (this.isInMemoryMode) {
                            CassandraMessageStore cassandraMessageStore = ClusterResourceHolder.getInstance().getCassandraMessageStore();
                            List<QueueEntry> nextIgnoredQueueMessagesToDeliver = cassandraMessageStore.getNextIgnoredQueueMessagesToDeliver(this.queue, this.messageCountToRead);
                            if (nextIgnoredQueueMessagesToDeliver.size() == 0) {
                                nextIgnoredQueueMessagesToDeliver = cassandraMessageStore.getNextQueueMessagesToDeliver(this.queue, this.messageCountToRead);
                            }
                            for (QueueEntry queueEntry : nextIgnoredQueueMessagesToDeliver) {
                                QueueDeliveryInfo queueDeliveryInfo = getQueueDeliveryInfo(((AMQMessage) queueEntry.getMessage()).getMessageMetaData().getMessagePublishInfo().getRoutingKey().toString());
                                if (!queueDeliveryInfo.messageIgnored) {
                                    if (queueDeliveryInfo.readButUndeliveredMessages.size() < this.maxNumberOfReadButUndeliveredMessages) {
                                        queueDeliveryInfo.readButUndeliveredMessages.add(queueEntry);
                                        this.totalReadButUndeliveredMessages.incrementAndGet();
                                    } else {
                                        queueDeliveryInfo.hasQueueFullAndMessagesIgnored = true;
                                        queueDeliveryInfo.ignoredFirstMessageId = queueEntry.getMessage().getMessageNumber().longValue();
                                        OnflightMessageTracker.getInstance().deleteFromAlreadyReadFromNodeQueueMessagesInstantly(queueEntry.getMessage().getMessageNumber().longValue());
                                    }
                                }
                                if (queueDeliveryInfo.messageIgnored) {
                                    this.lastProcessedId = queueEntry.getMessage().getMessageNumber().longValue();
                                    cassandraMessageStore.setNextIgnoredQueueMessageId(this.lastProcessedId);
                                }
                            }
                            if (nextIgnoredQueueMessagesToDeliver.size() == 0) {
                                sleep4waitInterval(this.queueWorkerWaitInterval);
                            }
                            if (nextIgnoredQueueMessagesToDeliver.size() == this.messageCountToRead) {
                                this.messageCountToRead += 100;
                                if (this.messageCountToRead > this.maxMessageCountToRead) {
                                    this.messageCountToRead = this.maxMessageCountToRead;
                                }
                            } else {
                                this.messageCountToRead -= 50;
                                if (this.messageCountToRead < this.minMessageCountToRead) {
                                    this.messageCountToRead = this.minMessageCountToRead;
                                }
                            }
                            this.totMsgRead += nextIgnoredQueueMessagesToDeliver.size();
                            i = nextIgnoredQueueMessagesToDeliver.size();
                        } else {
                            CassandraMessageStore cassandraMessageStore2 = ClusterResourceHolder.getInstance().getCassandraMessageStore();
                            ArrayList<QueueEntry> arrayList2 = new ArrayList();
                            for (QueueEntry queueEntry2 : cassandraMessageStore2.getMessagesFromNodeQueue(this.nodeQueue, this.queue, this.messageCountToRead, this.lastProcessedId, -1L)) {
                                Long messageNumber = queueEntry2.getMessage().getMessageNumber();
                                if (this.onflightMessageTracker.checkAlreadyReadFromNodeQueue(queueEntry2.getMessage().getMessageNumber().longValue())) {
                                    arrayList.add(queueEntry2.getMessage().getMessageNumber());
                                } else {
                                    this.onflightMessageTracker.addReadMessageFromNodeQueueToSet(messageNumber.longValue());
                                    if (traceLog.isTraceEnabled()) {
                                        traceLog.trace("TRACING>> QDW - ===Adding " + queueEntry2.getMessage().getMessageNumber() + " From leading thread to deliver");
                                    }
                                    arrayList2.add(queueEntry2);
                                }
                                this.lastProcessedId = messageNumber.longValue();
                            }
                            Iterator<QueueEntry> it = this.laggards.iterator();
                            while (it.hasNext()) {
                                QueueEntry next = it.next();
                                if (this.onflightMessageTracker.checkAlreadyReadFromNodeQueue(next.getMessage().getMessageNumber().longValue())) {
                                    arrayList.add(next.getMessage().getMessageNumber());
                                } else {
                                    if (((DefaultClusteringEnabledSubscriptionManager) ClusterResourceHolder.getInstance().getSubscriptionManager()).getNumberOfSubscriptionsForQueue(((AMQMessage) next.getMessage()).getMessageMetaData().getMessagePublishInfo().getRoutingKey().toString()) > 0) {
                                        arrayList2.add(next);
                                        this.onflightMessageTracker.addReadMessageFromNodeQueueToSet(next.getMessage().getMessageNumber().longValue());
                                        if (traceLog.isTraceEnabled()) {
                                            traceLog.trace("TRACING>> QDW - ===Adding " + next.getMessage().getMessageNumber() + " From laggards to deliver");
                                        }
                                    }
                                }
                                it.remove();
                            }
                            if (arrayList.size() > 0) {
                                OnflightMessageTracker.getInstance().checkAndRemoveAlreadySentAndAckedMessagesFromStore(arrayList);
                                arrayList.clear();
                            }
                            for (QueueEntry queueEntry3 : arrayList2) {
                                if (this.onflightMessageTracker.testMessage(queueEntry3.getMessage().getMessageNumber().longValue())) {
                                    QueueDeliveryInfo queueDeliveryInfo2 = getQueueDeliveryInfo(((AMQMessage) queueEntry3.getMessage()).getMessageMetaData().getMessagePublishInfo().getRoutingKey().toString());
                                    if (queueDeliveryInfo2.hasQueueFullAndMessagesIgnored) {
                                        if (queueDeliveryInfo2.hasQueueFullAndMessagesIgnored && queueDeliveryInfo2.ignoredFirstMessageId == -1) {
                                            queueDeliveryInfo2.ignoredFirstMessageId = queueEntry3.getMessage().getMessageNumber().longValue();
                                        }
                                        OnflightMessageTracker.getInstance().deleteFromAlreadyReadFromNodeQueueMessagesInstantly(queueEntry3.getMessage().getMessageNumber().longValue());
                                    } else if (queueDeliveryInfo2.readButUndeliveredMessages.size() < this.maxNumberOfReadButUndeliveredMessages) {
                                        queueEntry3.getMessage().getMessageNumber().longValue();
                                        queueDeliveryInfo2.readButUndeliveredMessages.add(queueEntry3);
                                        this.totalReadButUndeliveredMessages.incrementAndGet();
                                    } else {
                                        queueDeliveryInfo2.hasQueueFullAndMessagesIgnored = true;
                                        queueDeliveryInfo2.ignoredFirstMessageId = queueEntry3.getMessage().getMessageNumber().longValue();
                                        OnflightMessageTracker.getInstance().deleteFromAlreadyReadFromNodeQueueMessagesInstantly(queueEntry3.getMessage().getMessageNumber().longValue());
                                    }
                                }
                            }
                            if (arrayList2.size() == 0) {
                                sleep4waitInterval(this.queueWorkerWaitInterval);
                            }
                            if (arrayList2.size() == this.messageCountToRead) {
                                this.messageCountToRead += 100;
                                if (this.messageCountToRead > this.maxMessageCountToRead) {
                                    this.messageCountToRead = this.maxMessageCountToRead;
                                }
                            } else {
                                this.messageCountToRead -= 50;
                                if (this.messageCountToRead < this.minMessageCountToRead) {
                                    this.messageCountToRead = this.minMessageCountToRead;
                                }
                            }
                            this.totMsgRead += arrayList2.size();
                            i = arrayList2.size();
                        }
                    } else if (log.isDebugEnabled()) {
                        log.debug("QDW >> Total ReadButUndeliveredMessages count " + this.totalReadButUndeliveredMessages.get() + " is over the accepted limit ");
                    }
                    int i2 = 0;
                    for (QueueDeliveryInfo queueDeliveryInfo3 : this.subscriptionCursar4QueueMap.values()) {
                        i2 = sendMessagesToSubscriptions(queueDeliveryInfo3.queueName, queueDeliveryInfo3.readButUndeliveredMessages);
                        queueDeliveryInfo3.messageIgnored = false;
                    }
                    if (this.iterations % 20 == 0 && log.isDebugEnabled()) {
                        log.debug("[Flusher" + this + "]readNow=" + i + " totRead=" + this.totMsgRead + " totprocessed= " + this.totMsgSent + ", totalReadButNotSent=" + this.totalReadButUndeliveredMessages + ". workQueue= " + this.workqueueSize + " lastID=" + this.lastProcessedId);
                    }
                    this.iterations++;
                    if (i == 0) {
                        resetOffsetAtCassadraQueueIfNeeded(false);
                    }
                    if (i2 == 0 || this.iterations % 10 == 0) {
                        sleep4waitInterval(this.queueWorkerWaitInterval);
                    }
                    this.failureCount = 0L;
                }
            } catch (Throwable th) {
                long j = this.queueWorkerWaitInterval;
                this.failureCount++;
                try {
                    Thread.sleep(Math.max(j * 5, this.failureCount * j));
                } catch (InterruptedException e) {
                }
                log.error("Error running Cassandra Message Flusher" + th.getMessage(), th);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void sleep4waitInterval(long j) {
        try {
            Thread.sleep(this.queueWorkerWaitInterval);
        } catch (InterruptedException e) {
        }
    }

    private boolean isThisSubscriptionHasRoom(CassandraSubscription cassandraSubscription) {
        AMQChannel aMQChannel = null;
        if (cassandraSubscription != null && (cassandraSubscription.getSubscription() instanceof SubscriptionImpl.AckSubscription)) {
            aMQChannel = ((SubscriptionImpl.AckSubscription) cassandraSubscription.getSubscription()).getChannel();
        }
        int notAckedMessageCount = aMQChannel.getNotAckedMessageCount();
        if (notAckedMessageCount < this.maxNumberOfUnAckedMessages) {
            return true;
        }
        if (!log.isDebugEnabled()) {
            return false;
        }
        log.debug("Not selected, channel=" + this.queue.getName() + "/" + aMQChannel + " pending count =" + (notAckedMessageCount + this.workqueueSize));
        return false;
    }

    public int sendMessagesToSubscriptions(String str, List<QueueEntry> list) {
        if (((DefaultClusteringEnabledSubscriptionManager) ClusterResourceHolder.getInstance().getSubscriptionManager()).getNumberOfSubscriptionsForQueue(str) == 0) {
            if (!log.isDebugEnabled()) {
                return 0;
            }
            log.debug("TRACING >> QDW >> returning from sending messages to subscriptions for target queue " + str + " since the number of subscripion is 0");
            return 0;
        }
        ArrayList<QueueEntry> undeliveredMessagesOfQueue = getUndeliveredMessagesOfQueue(str);
        if (undeliveredMessagesOfQueue != null && undeliveredMessagesOfQueue.size() > 0) {
            list.addAll(undeliveredMessagesOfQueue);
            Collections.sort(list, new Comparator<QueueEntry>() { // from class: org.wso2.andes.server.cassandra.QueueDeliveryWorker.2
                @Override // java.util.Comparator
                public int compare(QueueEntry queueEntry, QueueEntry queueEntry2) {
                    return queueEntry.getMessage().getMessageNumber().compareTo(queueEntry2.getMessage().getMessageNumber());
                }
            });
        }
        int i = 0;
        Iterator<QueueEntry> it = list.iterator();
        while (it.hasNext()) {
            QueueEntry next = it.next();
            Map<String, CassandraSubscription> map = this.subscriptionMap.get(str);
            if (map != null) {
                if (str.contains("carbon:")) {
                    int i2 = 0;
                    while (true) {
                        if (i2 < map.size()) {
                            CassandraSubscription findNextSubscriptionToSent = findNextSubscriptionToSent(str);
                            if (isThisSubscriptionHasRoom(findNextSubscriptionToSent)) {
                                if (isThisSubscriptionInterestedInMessage(findNextSubscriptionToSent, next)) {
                                    ((AMQMessage) next.getMessage()).setClientIdentifier(findNextSubscriptionToSent.getSession());
                                    try {
                                        Thread.sleep(0L, 500000);
                                    } catch (InterruptedException e) {
                                    }
                                    deliverAsynchronously(findNextSubscriptionToSent.getSubscription(), next);
                                    this.totMsgSent++;
                                    i++;
                                    this.totalReadButUndeliveredMessages.decrementAndGet();
                                }
                                it.remove();
                            } else {
                                i2++;
                            }
                        }
                    }
                } else {
                    int i3 = 0;
                    while (true) {
                        if (i3 < map.size()) {
                            CassandraSubscription findNextSubscriptionToSent2 = findNextSubscriptionToSent(str);
                            if (isThisSubscriptionHasRoom(findNextSubscriptionToSent2) && isThisSubscriptionInterestedInMessage(findNextSubscriptionToSent2, next)) {
                                ((AMQMessage) next.getMessage()).setClientIdentifier(findNextSubscriptionToSent2.getSession());
                                try {
                                    Thread.sleep(0L, 500000);
                                } catch (InterruptedException e2) {
                                }
                                deliverAsynchronously(findNextSubscriptionToSent2.getSubscription(), next);
                                this.totMsgSent++;
                                i++;
                                this.totalReadButUndeliveredMessages.decrementAndGet();
                                it.remove();
                                break;
                            }
                            i3++;
                        }
                    }
                }
            }
        }
        return i;
    }

    private boolean isThisSubscriptionInterestedInMessage(CassandraSubscription cassandraSubscription, QueueEntry queueEntry) {
        return cassandraSubscription.getSubscription().hasInterest(queueEntry);
    }

    public AMQQueue getQueue() {
        return this.queue;
    }

    private void deliverAsynchronously(final Subscription subscription, final QueueEntry queueEntry) {
        if (!this.onflightMessageTracker.testMessage(queueEntry.getMessage().getMessageNumber().longValue())) {
            if (traceLog.isTraceEnabled()) {
                traceLog.trace("Rejecting message with messageID " + queueEntry.getMessage().getMessageNumber());
                return;
            }
            return;
        }
        AMQChannel aMQChannel = null;
        if (subscription instanceof SubscriptionImpl.AckSubscription) {
            aMQChannel = ((SubscriptionImpl.AckSubscription) subscription).getChannel();
        }
        aMQChannel.incrementNonAckedMessageCount();
        if (traceLog.isTraceEnabled()) {
            traceLog.trace("TRACING>> QDW - sent out message for for delivery channel id=" + aMQChannel + " " + this.queue.getName() + " message id " + queueEntry.getMessage().getMessageNumber());
        } else {
            try {
                Thread.sleep(0L, 500000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        this.executor.submit(new Runnable() { // from class: org.wso2.andes.server.cassandra.QueueDeliveryWorker.3
            @Override // java.lang.Runnable
            public void run() {
                try {
                    if (!(subscription instanceof SubscriptionImpl.AckSubscription)) {
                        QueueDeliveryWorker.log.error(new StringBuilder().append("Unexpected Subscription Implementation : ").append(subscription).toString() != null ? subscription.getClass().getName() : null);
                    } else if (!subscription.isActive()) {
                        QueueDeliveryWorker.this.storeUndeliveredMessagesDueToInactiveSubscriptions(queueEntry);
                        if (QueueDeliveryWorker.log.isDebugEnabled()) {
                            QueueDeliveryWorker.log.debug("TRACING>> QDW- storing due to subscription vanish - messageID:" + queueEntry.getMessage().getMessageNumber() + " for subscription " + ((SubscriptionImpl.AckSubscription) subscription).getName());
                        }
                    } else if (!OnflightMessageTracker.getInstance().testForAlreadyDeliveredMessage(queueEntry.getMessage().getMessageNumber().longValue())) {
                        subscription.send(queueEntry);
                        if (QueueDeliveryWorker.traceLog.isTraceEnabled()) {
                            QueueDeliveryWorker.traceLog.trace("TRACING>> QDW- sent messageID-" + queueEntry.getMessage().getMessageNumber() + "-to subscription " + ((SubscriptionImpl.AckSubscription) subscription).getName());
                        }
                    } else if (QueueDeliveryWorker.traceLog.isTraceEnabled()) {
                        QueueDeliveryWorker.traceLog.trace("TRACING >> QDW - Filtered out from sending out " + queueEntry.getMessage().getMessageNumber());
                    }
                } catch (Throwable th) {
                    QueueDeliveryWorker.log.error("Error while delivering message ", th);
                }
            }
        }, subscription.getSubscriptionID());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void storeUndeliveredMessagesDueToInactiveSubscriptions(QueueEntry queueEntry) {
        String name = queueEntry.getQueue().getName();
        ArrayList<QueueEntry> arrayList = this.undeliveredMessagesMap.get(name);
        if (arrayList != null) {
            arrayList.add(queueEntry);
            return;
        }
        ArrayList<QueueEntry> arrayList2 = new ArrayList<>();
        arrayList2.add(queueEntry);
        this.undeliveredMessagesMap.put(name, arrayList2);
    }

    private ArrayList<QueueEntry> getUndeliveredMessagesOfQueue(String str) {
        ArrayList<QueueEntry> arrayList = new ArrayList<>();
        ArrayList<QueueEntry> sentButNotAckedMessagesOfQueue = this.onflightMessageTracker.getSentButNotAckedMessagesOfQueue(str);
        if (sentButNotAckedMessagesOfQueue != null && !sentButNotAckedMessagesOfQueue.isEmpty()) {
            arrayList.addAll(sentButNotAckedMessagesOfQueue);
            if (traceLog.isTraceEnabled()) {
                Iterator<QueueEntry> it = sentButNotAckedMessagesOfQueue.iterator();
                while (it.hasNext()) {
                    traceLog.trace("TRACING >> QDW - scheduling sent but not acked message kept in memory to deliver messageID: " + it.next().getMessage().getMessageNumber());
                }
            }
        }
        ArrayList<QueueEntry> remove = this.undeliveredMessagesMap.remove(str);
        if (remove != null && !remove.isEmpty()) {
            arrayList.addAll(remove);
            if (traceLog.isTraceEnabled()) {
                Iterator<QueueEntry> it2 = remove.iterator();
                while (it2.hasNext()) {
                    traceLog.trace("TRACING >> QDW - scheduling delivery missed message due to inactive subscriptions message kept in memory to deliver messageID: " + it2.next().getMessage().getMessageNumber());
                }
            }
        }
        return arrayList;
    }

    public void stopFlusher() {
        this.running = false;
        log.debug("Shutting down the message flusher for the queue " + this.queue.getName());
    }

    public void startFlusher() {
        log.debug("staring flusher for " + this.queue.getName());
        this.running = true;
    }

    private boolean resetOffsetAtCassadraQueueIfNeeded(boolean z) {
        this.resetCounter++;
        if (!z && (this.resetCounter <= this.maxRestCounter || System.currentTimeMillis() - this.lastRestTime <= this.queueMsgDeliveryCurserResetTimeInterval)) {
            return false;
        }
        this.resetCounter = 0;
        this.lastRestTime = System.currentTimeMillis();
        this.lastProcessedId = getStartingIndex();
        if (!log.isDebugEnabled()) {
            return true;
        }
        log.debug("TRACING>> QDW - Reset offset called and Updated lastProcessedId is= " + this.lastProcessedId);
        return true;
    }

    private long getStartingIndex() {
        long j = this.lastProcessedId;
        if (this.subscriptionCursar4QueueMap.values().size() == 0) {
            j = 0;
        }
        for (QueueDeliveryInfo queueDeliveryInfo : this.subscriptionCursar4QueueMap.values()) {
            if (queueDeliveryInfo.hasQueueFullAndMessagesIgnored) {
                if (j > queueDeliveryInfo.ignoredFirstMessageId && queueDeliveryInfo.ignoredFirstMessageId != -1) {
                    j = queueDeliveryInfo.ignoredFirstMessageId;
                }
                if (queueDeliveryInfo.readButUndeliveredMessages.size() < this.maxNumberOfReadButUndeliveredMessages / 2) {
                    queueDeliveryInfo.hasQueueFullAndMessagesIgnored = false;
                }
            }
            if (queueDeliveryInfo.needToReset) {
                if (j > queueDeliveryInfo.ignoredFirstMessageId) {
                    j = queueDeliveryInfo.ignoredFirstMessageId;
                }
                queueDeliveryInfo.setNeedToReset(false);
            }
        }
        if (j > 0) {
            j--;
        }
        return j;
    }

    public void clearMessagesAccumilatedDueToInactiveSubscriptionsForQueue(String str) {
        this.undeliveredMessagesMap.remove(str);
        synchronized (getQueueDeliveryInfo(str).readButUndeliveredMessages) {
            getQueueDeliveryInfo(str).readButUndeliveredMessages.clear();
        }
        Iterator<QueueEntry> it = this.laggards.iterator();
        while (it.hasNext()) {
            if (((AMQMessage) it.next().getMessage()).getMessageMetaData().getMessagePublishInfo().getRoutingKey().toString().equals(str)) {
                it.remove();
            }
        }
    }
}
