package org.wso2.andes.server.cassandra;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.derby.iapi.sql.compile.TypeCompiler;
import org.wso2.andes.AMQException;
import org.wso2.andes.AMQInternalException;
import org.wso2.andes.AMQStoreException;
import org.wso2.andes.server.AMQChannel;
import org.wso2.andes.server.ClusterResourceHolder;
import org.wso2.andes.server.binding.Binding;
import org.wso2.andes.server.cassandra.QueueDeliveryWorker;
import org.wso2.andes.server.cluster.GlobalQueueWorker;
import org.wso2.andes.server.cluster.coordination.CoordinationException;
import org.wso2.andes.server.exchange.Exchange;
import org.wso2.andes.server.queue.AMQQueue;
import org.wso2.andes.server.store.CassandraMessageStore;
import org.wso2.andes.server.store.util.CassandraDataAccessException;
import org.wso2.andes.server.subscription.SubscriptionImpl;
import org.wso2.andes.server.util.AndesConstants;
import org.wso2.andes.server.util.AndesUtils;
import org.wso2.andes.server.util.QueueMessageRemovalLock;

/* loaded from: input_file:org/wso2/andes/server/cassandra/DefaultClusteringEnabledSubscriptionManager.class */
public class DefaultClusteringEnabledSubscriptionManager implements ClusteringEnabledSubscriptionManager {
    // General logger for this manager.
    private static Log log = LogFactory.getLog(DefaultClusteringEnabledSubscriptionManager.class);
    // Separate logger obtained under AndesConstants.TRACE_LOGGER; used for the "TRACING>>" messages.
    private static final Log traceLog = LogFactory.getLog(AndesConstants.TRACE_LOGGER);
    // Read from the cluster configuration in init() and handed to every QueueDeliveryWorker created here.
    private int queueWorkerWaitInterval;
    // Destination queue name -> node queue names. A node queue name is repeated once per subscription
    // counted for it (see updateNodeQueuesForDestinationQueueMap).
    private ConcurrentHashMap<String, List<String>> globalSubscriptionsMap = new ConcurrentHashMap<>();
    // Used as a set of destination queue names: only the keys matter, every value is "".
    private ConcurrentHashMap<String, String> destinationQueues = new ConcurrentHashMap<>();
    // Node queue name -> delivery worker started for it (populated by handleSubscription).
    private Map<String, QueueDeliveryWorker> workMap = new ConcurrentHashMap();
    // Queue resource name -> (subscription id as string -> subscription) for subscriptions added on this node.
    // Also used as the monitor guarding creation of the first subscription of a queue.
    private Map<String, Map<String, CassandraSubscription>> subscriptionMap = new ConcurrentHashMap();
    // Executor that runs QueueDeliveryWorker instances; created in init().
    private ExecutorService messageFlusherExecutor = null;
    // Executor passed to each QueueDeliveryWorker; created in init().
    private SequentialThreadPoolExecutor messageDeliveryExecutor = null;
    // Not populated in this class; only exposed through getUnAcknowledgedMessageLocks().
    private Map<AMQChannel, Map<Long, Semaphore>> unAckedMessagelocks = new ConcurrentHashMap();
    // Not populated in this class; only exposed through getAcknowledgementHandlerMap().
    private Map<AMQChannel, QueueSubscriptionAcknowledgementHandler> acknowledgementHandlerMap = new ConcurrentHashMap();
    // NOTE(review): not referenced by any method in this class — confirm whether other classes in the package use it.
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    // Queues whose last local subscription was removed and that still await handleMessageRemoval.
    // Every access in this class synchronizes on the set itself.
    private HashSet<String> queueNamesWithNoSubscriptions = new HashSet<>();

    /**
     * Prepares this manager for use: creates the executor that runs the queue delivery
     * workers and the executor handed to them for delivery, reads the queue worker wait
     * interval from the cluster configuration and loads the destination queue list.
     */
    @Override
    public void init() {
        int flusherPoolSize = ClusterResourceHolder.getInstance().getClusterConfiguration().getFlusherPoolSize();
        messageFlusherExecutor = Executors.newFixedThreadPool(
                flusherPoolSize,
                new ThreadFactoryBuilder().setNameFormat("QueueDeliveryWorker-%d").build());

        messageDeliveryExecutor = new SequentialThreadPoolExecutor(
                ClusterResourceHolder.getInstance().getClusterConfiguration().getPublisherPoolSize(),
                "AsyncQueueDelivery");

        queueWorkerWaitInterval =
                ClusterResourceHolder.getInstance().getClusterConfiguration().getQueueWorkerInterval();

        clearAndUpdateDestinationQueueList();
    }

    /**
     * Adds a subscription for the given queue on this node.
     *
     * <p>The work is done in three steps:
     * <ol>
     *   <li>every binding of the queue to the {@code amq.topic} exchange is registered as a
     *       topic subscriber in the message store;</li>
     *   <li>a browser subscription is served once through a {@link QueueBrowserDeliveryWorker};
     *       any other subscription is recorded in {@code subscriptionMap} and counted in the
     *       message store;</li>
     *   <li>the change is announced, either through the subscription coordination manager
     *       (clustering enabled) or by refreshing the local view and resetting the affected
     *       global queue workers.</li>
     * </ol>
     *
     * @param aMQQueue              queue being subscribed to
     * @param cassandraSubscription subscription to add
     * @throws AMQException          if an exclusive subscription already exists for an exclusive
     *                               queue, or if the topic subscription change cannot be notified
     * @throws CoordinationException declared for the cluster notification call
     */
    @Override
    public void addSubscription(AMQQueue aMQQueue, CassandraSubscription cassandraSubscription) throws AMQException, CoordinationException {
        registerTopicBindingsOf(aMQQueue);

        if (cassandraSubscription.getSubscription() instanceof SubscriptionImpl.BrowserSubscription) {
            // Browser subscriptions are handed to a dedicated worker and never recorded in subscriptionMap.
            new QueueBrowserDeliveryWorker(cassandraSubscription.getSubscription(), aMQQueue, cassandraSubscription.getSession(), ClusterResourceHolder.getInstance().getClusterConfiguration().isInMemoryMode().booleanValue()).send();
        } else {
            recordQueueSubscription(aMQQueue, cassandraSubscription);
            if (log.isDebugEnabled()) {
                log.debug("Binding Subscription " + cassandraSubscription.getSubscription().getSubscriptionID() + " to queue " + aMQQueue.getName());
            }
        }

        if (ClusterResourceHolder.getInstance().getClusterConfiguration().isClusteringEnabled().booleanValue()) {
            if (log.isDebugEnabled()) {
                log.debug("Notifying queue subscription change to the cluster");
            }
            ClusterResourceHolder.getInstance().getSubscriptionCoordinationManager().handleSubscriptionChange();
        } else {
            for (String destinationQueue : ClusterResourceHolder.getInstance().getSubscriptionManager().updateNodeQueuesForDestinationQueueMap()) {
                ClusterResourceHolder.getInstance().getClusterManager().getGlobalQueueManager().resetGlobalQueueWorkerIfRunning(AndesUtils.getGlobalQueueNameForDestinationQueue(destinationQueue));
            }
        }
    }

    /**
     * Registers this node as a topic subscriber for every {@code amq.topic} binding of the queue
     * and notifies the topic subscription coordination manager about each binding key.
     */
    private void registerTopicBindingsOf(AMQQueue aMQQueue) throws AMQException, CoordinationException {
        List<Binding> bindings = aMQQueue.getBindings();
        if (bindings == null || bindings.isEmpty()) {
            return;
        }
        for (Binding binding : bindings) {
            Exchange exchange = binding.getExchange();
            if (!exchange.getName().equalsIgnoreCase("amq.topic")) {
                continue;
            }
            if (log.isDebugEnabled()) {
                // Plain "-" separator; the decompiled code borrowed an unrelated Derby constant for it.
                log.debug("adding subscription for binding - " + binding.getQueue() + "-" + binding.getBindingKey());
            }
            String topicNodeQueueName = AndesUtils.getTopicNodeQueueName();
            if (aMQQueue.isExclusive()) {
                if (log.isDebugEnabled()) {
                    log.debug("Checking if an exclusive subscription exists cluster wide");
                }
                boolean exclusiveSubscriptionExists = ClusterResourceHolder.getInstance().getCassandraMessageStore().checkIfDuableExclusiveSubscriptionAlreadyExists(aMQQueue.getResourceName());
                if (log.isDebugEnabled()) {
                    log.debug("check for an exclusive subscription. Result: " + exclusiveSubscriptionExists);
                }
                if (exclusiveSubscriptionExists) {
                    throw new AMQQueue.ExistingExclusiveSubscription();
                }
            }
            ClusterResourceHolder.getInstance().getCassandraMessageStore().registerSubscriberForTopic(binding.getBindingKey(), topicNodeQueueName, aMQQueue.getResourceName(), aMQQueue.isDurable(), true);
            if (log.isDebugEnabled()) {
                log.debug("DCESM - Adding subscriber for " + topicNodeQueueName + "binding key " + binding.getBindingKey() + " queueName " + aMQQueue.getResourceName());
            }
            if (!ClusterResourceHolder.getInstance().getTopicDeliveryWorker().isWorking()) {
                ClusterResourceHolder.getInstance().getTopicDeliveryWorker().setWorking();
            }
            try {
                ClusterResourceHolder.getInstance().getTopicSubscriptionCoordinationManager().notifyTopicSubscriptionChange(binding.getBindingKey());
                if (log.isDebugEnabled()) {
                    log.debug("DCESM - notifying topic subscription change " + binding.getBindingKey());
                }
            } catch (CoordinationException e) {
                throw new AMQInternalException("Error in notifying subscription change when adding subscription", e);
            }
        }
    }

    /**
     * Records a non-browser subscription in {@code subscriptionMap} and updates the persisted
     * subscription count. The first subscription of a queue is handled under the
     * {@code subscriptionMap} monitor so that only one thread sets the queue up.
     */
    private void recordQueueSubscription(AMQQueue aMQQueue, CassandraSubscription cassandraSubscription) throws AMQException, CoordinationException {
        String queueName = aMQQueue.getResourceName();
        String subscriptionId = String.valueOf(cassandraSubscription.getSubscription().getSubscriptionID());

        Map<String, CassandraSubscription> subscriptions = this.subscriptionMap.get(queueName);
        if (subscriptions != null && !subscriptions.isEmpty()) {
            // Queue already has subscribers on this node: just add and count.
            subscriptions.put(subscriptionId, cassandraSubscription);
            countAddedSubscription(false, queueName);
            return;
        }

        synchronized (this.subscriptionMap) {
            // Re-read under the monitor: another thread may have set the queue up meanwhile.
            subscriptions = this.subscriptionMap.get(queueName);
            boolean firstSubscription = subscriptions == null || subscriptions.isEmpty();
            if (subscriptions == null) {
                // Fill the map before publishing it so readers never observe it empty.
                subscriptions = new ConcurrentHashMap<String, CassandraSubscription>();
                subscriptions.put(subscriptionId, cassandraSubscription);
                this.subscriptionMap.put(queueName, subscriptions);
            } else {
                subscriptions.put(subscriptionId, cassandraSubscription);
            }
            if (firstSubscription) {
                if (aMQQueue.isDurable() || !aMQQueue.checkIfBoundToTopicExchange()) {
                    handleSubscription(aMQQueue);
                }
                countAddedSubscription(true, queueName);
            } else {
                // Lost the race for "first subscriber": the counter column already exists, but this
                // subscription must still be counted because removeSubscription always decrements.
                countAddedSubscription(false, queueName);
            }
        }
    }

    /** Updates the persisted subscription count for one added subscription and traces the call. */
    private void countAddedSubscription(boolean instantiateColumn, String queueName) {
        incrementSubscriptionCount(instantiateColumn, queueName);
        if (traceLog.isTraceEnabled()) {
            traceLog.trace("TRACING>> DCESM- Called Increment sub count for-" + queueName + "-with instantiateColumn=" + instantiateColumn);
        }
    }

    /**
     * Returns the destination queue names currently held in memory.
     *
     * @return a new list containing a copy of the known destination queue names; changes to
     *         the returned list do not affect this manager
     */
    @Override
    public List<String> getDestinationQueues() {
        // Typed copy instead of a raw ArrayList so the return is checked by the compiler.
        return new ArrayList<String>(this.destinationQueues.keySet());
    }

    /**
     * Synchronizes the in-memory destination queue list with the message store.
     *
     * <p>The store is read first and the in-memory list is only touched once that read has
     * succeeded: names missing from the store are dropped and names present in the store are
     * added. If the store cannot be read, the error is logged and the previous list is kept,
     * so a transient store failure no longer leaves this node with an empty list, and
     * concurrent readers never observe a fully cleared list in the middle of a refresh.
     */
    @Override
    public void clearAndUpdateDestinationQueueList() {
        HashSet<String> storedQueues = new HashSet<String>();
        try {
            for (String queueName : ClusterResourceHolder.getInstance().getCassandraMessageStore().getDestinationQueues()) {
                storedQueues.add(queueName);
            }
        } catch (AMQStoreException e) {
            log.error("Error in updating in-memory list of destination queues", e);
            return;
        }

        // Drop queues that no longer exist in the store.
        this.destinationQueues.keySet().retainAll(storedQueues);
        log.debug("DCESM-ClearAndUpdateDestinationQueueList- Cleared Destination Queues");

        for (String queueName : storedQueues) {
            this.destinationQueues.put(queueName, "");
            if (log.isDebugEnabled()) {
                log.debug("DCESM- ClearAndUpdateDestinationQueueList >> added queue" + queueName + "to destinationQueues");
            }
        }
    }

    /**
     * Rebuilds {@code globalSubscriptionsMap} from the message store and wakes up the global
     * queue workers of this node.
     *
     * <p>For every known destination queue the node queues bound to it are read from the
     * store; each node queue name is added to the queue's list once per subscription counted
     * for it. Entries are replaced in place rather than clearing the whole map first, so
     * concurrent readers do not see an empty map while the refresh is running.
     *
     * <p>Failures are logged, not thrown; in that case the returned list is empty.
     *
     * @return destination queues that had no subscriptions before this call and have at least
     *         one afterwards
     */
    @Override
    public List<String> updateNodeQueuesForDestinationQueueMap() {
        List<String> newlySubscribedQueues = new ArrayList<String>();
        try {
            log.debug("TRACING>> DCESM- UpdateNodeQueuesForDestinationQueueMap called. Updating globalSubscriptionsMap ");

            // Phase 1: remember how many subscriptions each queue had before the refresh.
            Map<String, Integer> previousCounts = new HashMap<String, Integer>();
            for (String destinationQueue : this.destinationQueues.keySet()) {
                List<String> previous = this.globalSubscriptionsMap.get(destinationQueue);
                previousCounts.put(destinationQueue, previous == null ? 0 : previous.size());
            }

            // Phase 2: refresh the map entry by entry.
            CassandraMessageStore cassandraMessageStore = ClusterResourceHolder.getInstance().getCassandraMessageStore();
            this.globalSubscriptionsMap.keySet().retainAll(this.destinationQueues.keySet());
            log.debug("TRACING>> DCESM- Removed unknown destination queues from globalSubscriptionsMap");
            for (String destinationQueue : this.destinationQueues.keySet()) {
                List<String> nodeQueues = cassandraMessageStore.getNodeQueuesForDestinationQueue(destinationQueue);
                if (log.isDebugEnabled()) {
                    log.debug("TRACING>> DCESM- Read node queues with size-" + nodeQueues.size() + "-for destination queue-" + destinationQueue);
                }
                if (nodeQueues.size() > 0) {
                    List<String> nodeQueuePerSubscription = new ArrayList<String>();
                    for (String nodeQueue : nodeQueues) {
                        long subscriptionCount = cassandraMessageStore.getSubscriptionCountForQueue(destinationQueue, nodeQueue);
                        if (log.isDebugEnabled()) {
                            log.debug("TRACING>> DCESM-Subscription count from cassandra for Destination queue-" + destinationQueue + "-in node queue-" + nodeQueue + "-is-" + subscriptionCount);
                        }
                        for (long n = 0; n < subscriptionCount; n++) {
                            nodeQueuePerSubscription.add(nodeQueue);
                        }
                    }
                    this.globalSubscriptionsMap.put(destinationQueue, nodeQueuePerSubscription);
                    if (log.isDebugEnabled()) {
                        log.debug("TRACING>> DCESM-UpdateNodeQueuesForDestinationQueueMap >> added queueList of size " + nodeQueuePerSubscription.size() + "to destination queue -" + destinationQueue);
                    }
                } else {
                    // No node queue is bound any more: same end state as not re-adding after a clear.
                    this.globalSubscriptionsMap.remove(destinationQueue);
                }
            }

            // Phase 3: report queues that went from zero to at least one subscription.
            for (String destinationQueue : this.destinationQueues.keySet()) {
                List<String> current = this.globalSubscriptionsMap.get(destinationQueue);
                int currentCount = current == null ? 0 : current.size();
                Integer previousCount = previousCounts.get(destinationQueue);
                if (currentCount > 0 && (previousCount == null || previousCount.intValue() == 0)) {
                    newlySubscribedQueues.add(destinationQueue);
                }
            }

            Iterator<GlobalQueueWorker> workers = ClusterResourceHolder.getInstance().getClusterManager().getGlobalQueueManager().getGlobalQueueWorkersInThisNode().iterator();
            while (workers.hasNext()) {
                workers.next().wakeUpGlobalQueueWorker();
            }
        } catch (CassandraDataAccessException e) {
            log.error("Error in getting the Node Queues as cassandra connection is down", e);
        } catch (Exception e) {
            // Route through the logger (with stack trace) instead of printing to stderr.
            log.error("Unexpected error while updating node queues for destination queues", e);
        }
        return newlySubscribedQueues;
    }

    /**
     * Removes a subscription from a queue on this node and announces the change.
     *
     * <p>If the subscription is known, it is removed from {@code subscriptionMap}, the
     * persisted subscription count of this node's queue is decremented by one and, when the
     * queue is left without local subscriptions, the queue is remembered for later message
     * removal. The change is then announced in the same way as in {@code addSubscription}.
     * Failures in either step are logged and not propagated.
     *
     * @param str  name of the queue the subscription belongs to
     * @param str2 id of the subscription to remove
     * @param z    not used by this implementation
     */
    @Override
    public void removeSubscription(String str, String str2, boolean z) {
        synchronized (this.queueNamesWithNoSubscriptions) {
            try {
                Map<String, CassandraSubscription> subscriptions = this.subscriptionMap.get(str);
                // remove() doubles as the existence check, so the test and the removal are one step.
                if (subscriptions != null && subscriptions.remove(str2) != null) {
                    if (log.isDebugEnabled()) {
                        log.debug("Removing Subscription " + str2 + " from queue " + str);
                        log.debug("TRACING>> DCESM - Removing subscription with id " + str2 + " from queue " + str);
                    }
                    ClusterResourceHolder.getInstance().getCassandraMessageStore().decrementSubscriptionCount(str, AndesUtils.getMyNodeQueueName(), 1L);
                    if (subscriptions.isEmpty()) {
                        this.queueNamesWithNoSubscriptions.add(str);
                    }
                }
            } catch (Exception e) {
                log.error("Error while removing subscription for queue: " + str, e);
            }
        }
        try {
            if (ClusterResourceHolder.getInstance().getClusterConfiguration().isClusteringEnabled().booleanValue()) {
                ClusterResourceHolder.getInstance().getSubscriptionCoordinationManager().handleSubscriptionChange();
            } else {
                for (String destinationQueue : ClusterResourceHolder.getInstance().getSubscriptionManager().updateNodeQueuesForDestinationQueueMap()) {
                    ClusterResourceHolder.getInstance().getClusterManager().getGlobalQueueManager().resetGlobalQueueWorkerIfRunning(AndesUtils.getGlobalQueueNameForDestinationQueue(destinationQueue));
                }
            }
        } catch (Exception e) {
            // Keep the cause: without it the failure cannot be diagnosed from the log.
            log.error("Error while notifying Subscription change", e);
        }
    }

    /**
     * Looks up the node queues recorded for a destination queue.
     *
     * @param str destination queue name
     * @return the list stored in {@code globalSubscriptionsMap} for that queue, or
     *         {@code null} when the queue has no entry
     */
    @Override
    public List<String> getNodeQueuesHavingSubscriptionsForQueue(String str) {
        final List<String> nodeQueues = globalSubscriptionsMap.get(str);
        return nodeQueues;
    }

    /**
     * Reacts to a subscription change in the cluster.
     *
     * <p>First the local view of node queues is refreshed and the global queue workers of
     * newly subscribed destination queues are reset. Then every queue that lost its last local
     * subscription is passed to {@code handleMessageRemoval} (only when clustering is enabled).
     *
     * <p>The pending queue names are copied and cleared under the monitor, and the slow
     * removal work runs outside it, so {@code removeSubscription} is not blocked while
     * messages are being moved. Names that could not be processed because of an unexpected
     * runtime failure are put back for the next invocation.
     */
    @Override
    public void handleFreshSubscriptionsJoiningToCluster() {
        for (String destinationQueue : updateNodeQueuesForDestinationQueueMap()) {
            ClusterResourceHolder.getInstance().getClusterManager().getGlobalQueueManager().resetGlobalQueueWorkerIfRunning(AndesUtils.getGlobalQueueNameForDestinationQueue(destinationQueue));
        }

        List<String> pendingQueues;
        synchronized (this.queueNamesWithNoSubscriptions) {
            pendingQueues = new ArrayList<String>(this.queueNamesWithNoSubscriptions);
            this.queueNamesWithNoSubscriptions.clear();
        }

        int handled = 0;
        try {
            for (String queueName : pendingQueues) {
                try {
                    log.debug("Executing subscription removal handler to minimize message losses");
                    if (ClusterResourceHolder.getInstance().getClusterConfiguration().isClusteringEnabled().booleanValue()) {
                        handleMessageRemoval(queueName, AndesUtils.getGlobalQueueNameForDestinationQueue(queueName));
                    }
                } catch (AMQStoreException e) {
                    log.error("Error while removing subscription for queue: " + queueName, e);
                }
                handled++;
            }
        } finally {
            if (handled < pendingQueues.size()) {
                // An unchecked failure aborted the loop: keep the unprocessed names for a retry.
                synchronized (this.queueNamesWithNoSubscriptions) {
                    this.queueNamesWithNoSubscriptions.addAll(pendingQueues.subList(handled, pendingQueues.size()));
                }
            }
        }
    }

    /**
     * Removes what the message store holds for a node that left the cluster.
     *
     * <p>Deletes the node data, zeroes the node's subscription count on every destination
     * queue and detaches its node queue from them, announces the queue subscription change,
     * removes the node's topic subscriptions and, when clustering is enabled, notifies a
     * topic subscription change for every topic. Store and coordination failures are logged
     * and not propagated.
     *
     * @param i id of the node that disappeared
     */
    public void clearAllPersistedStatesOfDissapearedNode(int i) {
        log.info("Clearing the Persisted State of Node with ID " + i);
        final CassandraMessageStore store = ClusterResourceHolder.getInstance().getCassandraMessageStore();
        final String departedNodeQueue = AndesUtils.getNodeQueueNameForNodeId(i);
        try {
            store.deleteNodeData(String.valueOf(i));

            for (String destinationQueue : store.getDestinationQueues()) {
                // Decrement by whatever is currently counted for the departed node.
                long counted = store.getSubscriptionCountForQueue(destinationQueue, departedNodeQueue);
                store.decrementSubscriptionCount(destinationQueue, departedNodeQueue, counted);
                store.removeNodeQueueFromDestinationQueue(destinationQueue, departedNodeQueue);
            }

            if (!ClusterResourceHolder.getInstance().getClusterConfiguration().isClusteringEnabled().booleanValue()) {
                for (String changedQueue : ClusterResourceHolder.getInstance().getSubscriptionManager().updateNodeQueuesForDestinationQueueMap()) {
                    ClusterResourceHolder.getInstance().getClusterManager().getGlobalQueueManager()
                            .resetGlobalQueueWorkerIfRunning(AndesUtils.getGlobalQueueNameForDestinationQueue(changedQueue));
                }
            } else {
                ClusterResourceHolder.getInstance().getSubscriptionCoordinationManager().handleSubscriptionChange();
            }

            store.removeTopicSubscriptionsOfDisappearedNode(AndesUtils.getTopicNodeQueueNameForNodeID(i));

            if (ClusterResourceHolder.getInstance().getClusterConfiguration().isClusteringEnabled().booleanValue()) {
                for (Iterator<String> topics = store.getTopics().iterator(); topics.hasNext();) {
                    ClusterResourceHolder.getInstance().getTopicSubscriptionCoordinationManager()
                            .notifyTopicSubscriptionChange(topics.next());
                }
            }
        } catch (AMQStoreException storeFailure) {
            log.error("Error while clearing state of disappeared node", storeFailure);
        } catch (CoordinationException coordinationFailure) {
            log.error("Error while notifying subscription change to cluster", coordinationFailure);
        } catch (CassandraDataAccessException accessFailure) {
            log.error("Error while reading from Cassandra", accessFailure);
        }
    }

    /**
     * Walks this node's queue after a destination queue lost its last local subscription and
     * moves messages back to the global queue where the rules below say so.
     *
     * <p>The whole operation runs under the {@code QueueMessageRemovalLock} class monitor and
     * starts with a fixed 10 second pause. Messages are read from the node queue in batches of
     * 40, each batch starting after the id of the last message processed. Finally the
     * in-memory messages accumulated for the queue are dropped and the queue's delivery
     * information is reset to the smallest message id seen.
     *
     * <p>An interrupt received during the pause does not abort the work; the interrupt status
     * is restored when the method exits so callers can still observe it.
     *
     * @param str  destination queue name
     * @param str2 global queue name that messages are moved to
     * @throws AMQStoreException declared for callers; store failures raised in the body are
     *                           caught and logged here
     */
    private void handleMessageRemoval(String str, String str2) throws AMQStoreException {
        synchronized (QueueMessageRemovalLock.class) {
            boolean interrupted = false;
            long smallestMessageId = Long.MAX_VALUE;
            try {
                try {
                    Thread.sleep(10000L);
                } catch (InterruptedException e) {
                    // Finish the removal, but remember the interrupt so it is not lost.
                    interrupted = true;
                }
                CassandraMessageStore cassandraMessageStore = ClusterResourceHolder.getInstance().getCassandraMessageStore();
                String myNodeQueueName = AndesUtils.getMyNodeQueueName();
                int movedCount = 0;
                long lastProcessedId = 0;
                for (List<CassandraQueueMessage> batch = cassandraMessageStore.getMessagesFromNodeQueue(myNodeQueueName, 40, 0L); batch.size() != 0; batch = cassandraMessageStore.getMessagesFromNodeQueue(myNodeQueueName, 40, lastProcessedId)) {
                    for (CassandraQueueMessage message : batch) {
                        // NOTE(review): this branch also matches messages addressed to OTHER queues and
                        // re-adds them to str2, while the log text only talks about already-acked
                        // messages. Behaviour kept as found; confirm against the original source.
                        if (!message.getDestinationQueueName().equals(str) || OnflightMessageTracker.getInstance().testForAlreadyDeliveredAndAckReceivedMessages(message.getMessageId())) {
                            cassandraMessageStore.removeMessageFromNodeQueue(myNodeQueueName, message.getMessageId());
                            if (log.isDebugEnabled()) {
                                log.debug("+++ TRACING >> DCESM >> Removing the message from node queue " + myNodeQueueName + " for the second time since it is already delivered and acked messageId " + message.getMessageId());
                            }
                            addToGlobalQueueLoggingFailure(cassandraMessageStore, myNodeQueueName, str2, message);
                        } else {
                            // Read once: the map can be refreshed concurrently, so repeated lookups
                            // could return null between the null check and the use.
                            List<String> subscribedNodeQueues = getNodeQueuesHavingSubscriptionsForQueue(str);
                            if (subscribedNodeQueues != null) {
                                if (subscribedNodeQueues.size() > 0 && !subscribedNodeQueues.contains(myNodeQueueName)) {
                                    movedCount++;
                                    cassandraMessageStore.removeMessageFromNodeQueue(myNodeQueueName, message.getMessageId());
                                    addToGlobalQueueLoggingFailure(cassandraMessageStore, myNodeQueueName, str2, message);
                                } else if (traceLog.isTraceEnabled()) {
                                    traceLog.trace("TRACING >> DCESM >> Skipped moving message " + AndesUtils.getHID(message.getAmqMessage()) + " with message ID" + message.getMessageId() + "-from NQ " + myNodeQueueName + " to GQ-" + str2 + " since there is no other subscriptions");
                                }
                            }
                            OnflightMessageTracker.getInstance().deleteFromAlreadyReadFromNodeQueueMessagesInstantly(message.getMessageId());
                            if (traceLog.isTraceEnabled()) {
                                traceLog.trace("TRACING >> DCESM >> Invoked deleteFromAlreadyReadFromNodeQueueMessagesInstantly for message " + AndesUtils.getHID(message.getAmqMessage()) + " with messageID " + message.getMessageId());
                            }
                        }
                        lastProcessedId = message.getMessageId();
                        if (smallestMessageId > lastProcessedId) {
                            smallestMessageId = lastProcessedId;
                        }
                    }
                }
                ClusterResourceHolder.getInstance().getClusterManager().removeInMemoryMessagesAccumulated(str);
                if (log.isDebugEnabled()) {
                    log.debug("Moved " + movedCount + " Number of Messages Addressed to Queue " + str + " from Node Queue " + myNodeQueueName + "to Global Queue");
                }
                updateQueueDeliveryInformation(str, smallestMessageId);
            } catch (AMQStoreException e) {
                log.error("Error removing messages addressed to " + str + "from relevant node queue", e);
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Adds one message to the given global queue. Any failure is logged with its stack trace
     * and swallowed so that the remaining messages of the batch are still processed.
     */
    private void addToGlobalQueueLoggingFailure(CassandraMessageStore cassandraMessageStore, String nodeQueueName, String globalQueueName, CassandraQueueMessage message) {
        try {
            cassandraMessageStore.addMessageToGlobalQueue(globalQueueName, message.getDestinationQueueName(), message.getMessageId(), message.getMessage(), false, 0L, false, true);
            if (traceLog.isTraceEnabled()) {
                traceLog.trace("TRACING>> DCESM- Moving message-" + AndesUtils.getHID(message.getAmqMessage()) + "- with MessageID-" + message.getMessageId() + "-from NQ " + nodeQueueName + " to GQ-" + globalQueueName);
            }
        } catch (Exception e) {
            log.error("Error while moving message " + message.getMessageId() + " from node queue " + nodeQueueName + " to global queue " + globalQueueName, e);
        }
    }

    /**
     * Marks the delivery information of a queue for reset on this node's delivery worker.
     * Does nothing when this node has no delivery worker or the worker has no delivery
     * information for the queue.
     *
     * @param str destination queue name
     * @param j   message id stored as the "ignored first message id"
     */
    private void updateQueueDeliveryInformation(String str, long j) {
        final QueueDeliveryWorker localWorker = workMap.get(AndesUtils.getMyNodeQueueName());
        if (localWorker == null) {
            return;
        }

        final QueueDeliveryWorker.QueueDeliveryInfo deliveryInfo = localWorker.getQueueDeliveryInfo(str);
        if (deliveryInfo == null) {
            return;
        }

        deliveryInfo.setIgnoredFirstMessageId(j);
        deliveryInfo.setNeedToReset(true);
        log.debug("TRACING>> DCESM-updateQueueDeliveryInformation >> Updated the QDI Object of queue-" + str + "-to ignoredFirstMessageID = " + j);
    }

    /**
     * Performs the store-side setup for the first subscription of a queue on this node and
     * makes sure a delivery worker exists for this node's queue.
     *
     * <p>This node's queue is attached to the queue's global queue and to the destination
     * queue, and a message counter is added for the queue. If no {@link QueueDeliveryWorker}
     * is registered yet for this node's queue, one is created, recorded in {@code workMap}
     * and submitted to the flusher executor. Failures are logged and not propagated.
     *
     * @param aMQQueue queue that received a subscription
     */
    private void handleSubscription(AMQQueue aMQQueue) {
        try {
            String globalQueueName = AndesUtils.getGlobalQueueNameForDestinationQueue(aMQQueue.getResourceName());
            String myNodeQueueName = AndesUtils.getMyNodeQueueName();
            ClusterResourceHolder.getInstance().getCassandraMessageStore().addNodeQueueToGlobalQueue(globalQueueName, myNodeQueueName);
            ClusterResourceHolder.getInstance().getCassandraMessageStore().addNodeQueueToDestinationQueue(aMQQueue.getResourceName(), myNodeQueueName);
            // NOTE(review): uses getName() while the calls above use getResourceName() — confirm this is intended.
            ClusterResourceHolder.getInstance().getCassandraMessageStore().addMessageCounterForQueue(aMQQueue.getName());

            QueueDeliveryWorker existingWorker = this.workMap.get(myNodeQueueName);
            if (existingWorker == null) {
                QueueDeliveryWorker queueDeliveryWorker = new QueueDeliveryWorker(myNodeQueueName, aMQQueue, this.subscriptionMap, this.messageDeliveryExecutor, this.queueWorkerWaitInterval, ClusterResourceHolder.getInstance().getClusterConfiguration().isInMemoryMode().booleanValue());
                this.workMap.put(myNodeQueueName, queueDeliveryWorker);
                this.messageFlusherExecutor.execute(queueDeliveryWorker);
            } else if (log.isDebugEnabled()) {
                log.debug("TRACING>> There exists a QueueDeliveryWorker for NodeQueue: " + existingWorker);
            }
        } catch (Exception e) {
            // The logger already records the stack trace; no separate printStackTrace() to stderr.
            log.error("Error while adding subscription to queue :" + aMQQueue, e);
        }
    }

    /**
     * Collects subscription information for topics and queues.
     *
     * @return a new list holding the topic subscriptions reported by the message store
     *         followed by the entries of {@link #getAllQueueSubscriptionInformationInCluster()}
     */
    @Override
    public List<String> getAllSubscriptionInformation() {
        // Typed list instead of a raw ArrayList so element types are checked at compile time.
        List<String> subscriptionInfo = new ArrayList<String>();
        subscriptionInfo.addAll(ClusterResourceHolder.getInstance().getCassandraMessageStore().getTopicSubscriptions());
        subscriptionInfo.addAll(getAllQueueSubscriptionInformationInCluster());
        return subscriptionInfo;
    }

    /**
     * Builds one descriptive string per entry of {@code globalSubscriptionsMap}, that is, one
     * per node queue occurrence recorded for each destination queue.
     *
     * <p>Each string has the form
     * {@code 1_<nodeId>@<queue>|<queue>|amq.direct|<queue>|true|true|<messageCount>|<nodeData>},
     * where the message count and node data are read from the message store.
     *
     * <p>The map is walked through its entry set, so a concurrent refresh of the map cannot
     * make a key lookup return {@code null} in the middle of the iteration.
     *
     * @return a new list of subscription strings; empty when no subscriptions are recorded
     */
    public List<String> getAllQueueSubscriptionInformationInCluster() {
        List<String> subscriptionInfo = new ArrayList<String>();
        for (Map.Entry<String, List<String>> entry : this.globalSubscriptionsMap.entrySet()) {
            String queueName = entry.getKey();
            for (String nodeQueueName : entry.getValue()) {
                String nodeId = AndesUtils.getNodeIDFromNodeQueueName(nodeQueueName);
                subscriptionInfo.add(("1_" + nodeId + "@" + queueName) + "|" + queueName + "|amq.direct|" + queueName + "|true|true|" + ClusterResourceHolder.getInstance().getCassandraMessageStore().getCassandraMessageCountForQueue(queueName) + "|" + ClusterResourceHolder.getInstance().getCassandraMessageStore().getNodeData(nodeId));
            }
        }
        return subscriptionInfo;
    }

    /**
     * Returns the delivery worker registered for this node's queue.
     *
     * @return the worker stored in {@code workMap} under this node's queue name, or
     *         {@code null} when none is registered
     */
    public QueueDeliveryWorker getQueueDeliveryWorkerOnCurrentNode() {
        final String localNodeQueue = AndesUtils.getMyNodeQueueName();
        return workMap.get(localNodeQueue);
    }

    /**
     * Records one more subscription for a queue on this node in the message store.
     *
     * @param z   {@code true} to call {@code addSubscriptionCounterForQueue}; {@code false} to
     *            call {@code incrementSubscriptionCount} with an increment of one
     * @param str destination queue name
     */
    public void incrementSubscriptionCount(boolean z, String str) {
        final String localNodeQueue = AndesUtils.getMyNodeQueueName();
        final CassandraMessageStore store = ClusterResourceHolder.getInstance().getCassandraMessageStore();
        if (!z) {
            store.incrementSubscriptionCount(str, localNodeQueue, 1L);
            return;
        }
        store.addSubscriptionCounterForQueue(str, localNodeQueue);
    }

    /**
     * Stops the flusher of the delivery worker stored in {@code workMap} under the given key.
     * Does nothing when no worker is stored under that key.
     *
     * @param str key into {@code workMap}
     */
    public void markSubscriptionForRemovel(String str) {
        final QueueDeliveryWorker worker = workMap.get(str);
        if (worker == null) {
            return;
        }
        worker.stopFlusher();
    }

    /**
     * Counts the subscriptions recorded on this node for a queue.
     *
     * @param str queue name used as key in {@code subscriptionMap}
     * @return number of recorded subscriptions, or 0 when the queue has no entry
     */
    public int getNumberOfSubscriptionsForQueue(String str) {
        final Map<String, CassandraSubscription> subscriptions = subscriptionMap.get(str);
        return subscriptions == null ? 0 : subscriptions.size();
    }

    /**
     * Calls {@code stopFlusher()} on every delivery worker held in {@code workMap}.
     */
    @Override
    public void stopAllMessageFlushers() {
        for (QueueDeliveryWorker worker : workMap.values()) {
            worker.stopFlusher();
        }
    }

    /**
     * Calls {@code startFlusher()} on every delivery worker held in {@code workMap}.
     */
    @Override
    public void startAllMessageFlushers() {
        for (QueueDeliveryWorker worker : workMap.values()) {
            worker.startFlusher();
        }
    }

    /**
     * Exposes the map of delivery workers.
     *
     * @return the internal {@code workMap} itself, not a copy
     */
    public Map<String, QueueDeliveryWorker> getWorkMap() {
        return workMap;
    }

    /**
     * Exposes the per-channel map of semaphores keyed by a {@code Long}.
     *
     * @return the internal {@code unAckedMessagelocks} map itself, not a copy
     */
    @Override
    public Map<AMQChannel, Map<Long, Semaphore>> getUnAcknowledgedMessageLocks() {
        return unAckedMessagelocks;
    }

    /**
     * Exposes the per-channel acknowledgement handlers.
     *
     * @return the internal {@code acknowledgementHandlerMap} itself, not a copy
     */
    @Override
    public Map<AMQChannel, QueueSubscriptionAcknowledgementHandler> getAcknowledgementHandlerMap() {
        return acknowledgementHandlerMap;
    }
}
