package org.wso2.andes.server.cassandra;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.derby.impl.sql.compile.SQLParserConstants;
import org.wso2.andes.server.AMQChannel;
import org.wso2.andes.server.ClusterResourceHolder;
import org.wso2.andes.server.message.AMQMessage;
import org.wso2.andes.server.queue.AMQQueue;
import org.wso2.andes.server.queue.QueueEntry;
import org.wso2.andes.server.store.CassandraMessageStore;
import org.wso2.andes.server.subscription.Subscription;
import org.wso2.andes.server.subscription.SubscriptionImpl;
import org.wso2.andes.server.util.AndesUtils;

/* loaded from: input_file:org/wso2/andes/server/cassandra/CassandraMessageFlusher.class */
public class CassandraMessageFlusher extends Thread {
    private AMQQueue queue;
    private String nodeQueue;
    private static Log log = LogFactory.getLog(CassandraMessageFlusher.class);
    private int messageCountToRead;
    private int resetCounter;
    private SequentialThreadPoolExecutor executor;
    private int queueWorkerWaitInterval;
    private Map<String, Map<String, CassandraSubscription>> subscriptionMap;
    private boolean running = true;
    private int maxNumberOfUnAckedMessages = 50;
    private long lastProcessedId = 0;
    private int maxRestCounter = 50;
    private long totMsgSent = 0;
    private long totMsgRead = 0;
    private long lastRestTime = 0;
    private long iterations = 0;
    private int workqueueSize = 0;
    private long failureCount = 0;
    private int totalReadButUndeliveredMessages = 0;
    private Map<String, QueueDeliveryInfo> subscriptionCursar4QueueMap = new HashMap();
    private OnflightMessageTracker onflightMessageTracker = OnflightMessageTracker.getInstance();

    /* loaded from: input_file:org/wso2/andes/server/cassandra/CassandraMessageFlusher$QueueDeliveryInfo.class */
    public class QueueDeliveryInfo {
        String queueName;
        Iterator<CassandraSubscription> iterator;
        List<QueueEntry> readButUndeliveredMessages = new ArrayList();
        boolean messageIgnored = false;

        public QueueDeliveryInfo() {
        }
    }

    public CassandraSubscription findNextSubscriptionToSent(String str) {
        Map<String, CassandraSubscription> map = this.subscriptionMap.get(str);
        if (map == null || map.size() == 0) {
            this.subscriptionCursar4QueueMap.remove(str);
            return null;
        }
        QueueDeliveryInfo queueDeliveryInfo = getQueueDeliveryInfo(str);
        Iterator<CassandraSubscription> it = queueDeliveryInfo.iterator;
        if (it.hasNext()) {
            return it.next();
        }
        Iterator<CassandraSubscription> it2 = map.values().iterator();
        queueDeliveryInfo.iterator = it2;
        if (it2.hasNext()) {
            return it2.next();
        }
        return null;
    }

    private QueueDeliveryInfo getQueueDeliveryInfo(String str) {
        QueueDeliveryInfo queueDeliveryInfo = this.subscriptionCursar4QueueMap.get(str);
        if (queueDeliveryInfo == null) {
            queueDeliveryInfo = new QueueDeliveryInfo();
            queueDeliveryInfo.queueName = str;
            queueDeliveryInfo.iterator = this.subscriptionMap.get(str).values().iterator();
            this.subscriptionCursar4QueueMap.put(str, queueDeliveryInfo);
        }
        return queueDeliveryInfo;
    }

    public CassandraMessageFlusher(String str, AMQQueue aMQQueue, Map<String, Map<String, CassandraSubscription>> map, SequentialThreadPoolExecutor sequentialThreadPoolExecutor, int i) {
        this.messageCountToRead = 50;
        this.subscriptionMap = new ConcurrentHashMap();
        this.queue = aMQQueue;
        this.nodeQueue = str;
        this.executor = sequentialThreadPoolExecutor;
        this.messageCountToRead = ClusterResourceHolder.getInstance().getClusterConfiguration().getMessageBatchSizeForSubscribers();
        this.queueWorkerWaitInterval = i;
        this.subscriptionMap = map;
        log.info("Queue worker started for queue: " + aMQQueue.getResourceName() + " with on flight message checks");
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        this.iterations = 0L;
        this.workqueueSize = 0;
        this.lastRestTime = System.currentTimeMillis();
        this.failureCount = 0L;
        while (this.running) {
            try {
                this.workqueueSize = this.executor.getSize();
                if (this.workqueueSize > 1000) {
                    if (this.workqueueSize > 5000) {
                        log.error("Flusher queue is growing, and this should not happen. Please check cassandra Flusher");
                    }
                    log.info("skipping content cassandra reading thread as flusher queue has " + this.workqueueSize + " tasks");
                    sleep4waitInterval();
                } else {
                    resetOffsetAtCassadraQueueIfNeeded(false);
                    CassandraMessageStore cassandraMessageStore = ClusterResourceHolder.getInstance().getCassandraMessageStore();
                    int i = 0;
                    if (this.totalReadButUndeliveredMessages < 10000) {
                        List<QueueEntry> messagesFromUserQueue = cassandraMessageStore.getMessagesFromUserQueue(this.nodeQueue, this.queue, this.messageCountToRead, this.lastProcessedId);
                        for (QueueEntry queueEntry : messagesFromUserQueue) {
                            QueueDeliveryInfo queueDeliveryInfo = getQueueDeliveryInfo(((AMQMessage) queueEntry.getMessage()).getMessageMetaData().getMessagePublishInfo().getRoutingKey().toString());
                            if (!queueDeliveryInfo.messageIgnored) {
                                if (queueDeliveryInfo.readButUndeliveredMessages.size() < 5000) {
                                    queueDeliveryInfo.readButUndeliveredMessages.add(queueEntry);
                                    this.totalReadButUndeliveredMessages++;
                                    this.lastProcessedId = queueEntry.getMessage().getMessageNumber().longValue();
                                } else {
                                    queueDeliveryInfo.messageIgnored = true;
                                }
                            }
                        }
                        if (messagesFromUserQueue.size() == 0) {
                            resetOffsetAtCassadraQueueIfNeeded(false);
                            sleep4waitInterval();
                        }
                        if (messagesFromUserQueue.size() == this.messageCountToRead) {
                            this.messageCountToRead += 100;
                            if (this.messageCountToRead > 300) {
                                this.messageCountToRead = SQLParserConstants.LOGGED;
                            }
                        } else {
                            this.messageCountToRead -= 50;
                            if (this.messageCountToRead < 20) {
                                this.messageCountToRead = 20;
                            }
                        }
                        this.totMsgRead += messagesFromUserQueue.size();
                        i = messagesFromUserQueue.size();
                    }
                    int i2 = 0;
                    for (QueueDeliveryInfo queueDeliveryInfo2 : this.subscriptionCursar4QueueMap.values()) {
                        i2 = sendMessagesToSubscriptions(queueDeliveryInfo2.queueName, queueDeliveryInfo2.readButUndeliveredMessages);
                    }
                    log.info("[Flusher]readNow=" + i + " totRead=" + this.totMsgRead + " totprocessed= " + this.totMsgSent + ", totalReadButNotSent=" + this.totalReadButUndeliveredMessages + ". workQueue= " + this.workqueueSize + " lastID=" + this.lastProcessedId);
                    if (this.totalReadButUndeliveredMessages + this.totMsgSent != this.totMsgRead) {
                        log.error("Messages got lost in flusher");
                    }
                    this.iterations++;
                    if (i2 == 0 || this.iterations % 10 == 0) {
                        sleep4waitInterval();
                    }
                    this.failureCount = 0L;
                }
            } catch (Throwable th) {
                long j = this.queueWorkerWaitInterval;
                this.failureCount++;
                try {
                    Thread.sleep(Math.max(j * 5, this.failureCount * j));
                } catch (InterruptedException e) {
                }
                log.error("Error running Cassandra Message Flusher" + th.getMessage(), th);
            }
        }
    }

    private void sleep4waitInterval() {
        try {
            Thread.sleep(this.queueWorkerWaitInterval);
        } catch (InterruptedException e) {
        }
    }

    private boolean isThisSubscriptionHasRoom(CassandraSubscription cassandraSubscription) {
        AMQChannel aMQChannel = null;
        if (cassandraSubscription.getSubscription() instanceof SubscriptionImpl.AckSubscription) {
            aMQChannel = ((SubscriptionImpl.AckSubscription) cassandraSubscription.getSubscription()).getChannel();
        }
        int notAckedMessageCount = aMQChannel.getNotAckedMessageCount();
        if (notAckedMessageCount + this.workqueueSize < this.maxNumberOfUnAckedMessages) {
            return true;
        }
        if (!log.isDebugEnabled()) {
            return false;
        }
        log.debug("Not selected, channel=" + this.queue.getName() + "/" + aMQChannel + " pending count =" + (notAckedMessageCount + this.workqueueSize));
        return false;
    }

    public int sendMessagesToSubscriptions(String str, List<QueueEntry> list) {
        int i = 0;
        Iterator<QueueEntry> it = list.iterator();
        while (it.hasNext()) {
            QueueEntry next = it.next();
            boolean z = false;
            Map<String, CassandraSubscription> map = this.subscriptionMap.get(str);
            if (map != null) {
                int i2 = 0;
                while (true) {
                    if (i2 >= map.size()) {
                        break;
                    }
                    CassandraSubscription findNextSubscriptionToSent = findNextSubscriptionToSent(str);
                    if (isThisSubscriptionHasRoom(findNextSubscriptionToSent)) {
                        ((AMQMessage) next.getMessage()).setClientIdentifier(findNextSubscriptionToSent.getSession());
                        if (log.isDebugEnabled()) {
                            log.debug("readFromCassandra" + AndesUtils.printAMQMessage(next));
                        }
                        deliverAsynchronously(findNextSubscriptionToSent.getSubscription(), next);
                        this.totMsgSent++;
                        i++;
                        this.totalReadButUndeliveredMessages--;
                        z = true;
                        it.remove();
                    } else {
                        i2++;
                    }
                }
                if (!z) {
                    log.debug("All subscriptions for queue " + str + " have max Unacked messages " + this.queue.getName());
                }
            }
        }
        return i;
    }

    public AMQQueue getQueue() {
        return this.queue;
    }

    private void deliverAsynchronously(final Subscription subscription, final QueueEntry queueEntry) {
        if (this.onflightMessageTracker.testMessage(queueEntry.getMessage().getMessageNumber().longValue())) {
            AMQChannel aMQChannel = null;
            if (subscription instanceof SubscriptionImpl.AckSubscription) {
                aMQChannel = ((SubscriptionImpl.AckSubscription) subscription).getChannel();
            }
            aMQChannel.incrementNonAckedMessageCount();
            if (log.isDebugEnabled()) {
                log.debug("sent out message for channel id=" + aMQChannel + " " + this.queue.getName());
            }
            this.executor.submit(new Runnable() { // from class: org.wso2.andes.server.cassandra.CassandraMessageFlusher.1
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        if (subscription instanceof SubscriptionImpl.AckSubscription) {
                            subscription.send(queueEntry);
                        } else {
                            CassandraMessageFlusher.log.error(new StringBuilder().append("Unexpected Subscription Implementation : ").append(subscription).toString() != null ? subscription.getClass().getName() : null);
                        }
                    } catch (Throwable th) {
                        CassandraMessageFlusher.log.error("Error while delivering message ", th);
                    }
                }
            }, subscription);
        }
    }

    public void stopFlusher() {
        this.running = false;
        log.debug("Shutting down the message flusher for the queue " + this.queue.getName());
    }

    public void startFlusher() {
        log.debug("staring flusher for " + this.queue.getName());
        this.running = true;
    }

    private boolean resetOffsetAtCassadraQueueIfNeeded(boolean z) {
        this.resetCounter++;
        if (this.resetCounter <= this.maxRestCounter || System.currentTimeMillis() - this.lastRestTime <= 60000) {
            return false;
        }
        this.resetCounter = 0;
        this.lastRestTime = System.currentTimeMillis();
        this.lastProcessedId = 0L;
        for (QueueDeliveryInfo queueDeliveryInfo : this.subscriptionCursar4QueueMap.values()) {
            queueDeliveryInfo.messageIgnored = false;
            queueDeliveryInfo.readButUndeliveredMessages.clear();
        }
        this.totalReadButUndeliveredMessages = 0;
        this.totMsgRead = 0L;
        this.totMsgSent = 0L;
        log.info("Rest the next message ID to read for cassandra flusher");
        return true;
    }
}
