package org.wso2.andes.server.cassandra;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.andes.AMQStoreException;
import org.wso2.andes.server.AMQChannel;
import org.wso2.andes.server.ClusterResourceHolder;
import org.wso2.andes.server.queue.AMQQueue;
import org.wso2.andes.server.store.CassandraMessageStore;
import org.wso2.andes.server.subscription.SubscriptionImpl;
import org.wso2.andes.server.util.AndesUtils;

/* loaded from: input_file:org/wso2/andes/server/cassandra/DefaultClusteringEnabledSubscriptionManager.class */
public class DefaultClusteringEnabledSubscriptionManager implements ClusteringEnabledSubscriptionManager {
    private static Log log = LogFactory.getLog(DefaultClusteringEnabledSubscriptionManager.class);
    private Map<String, QueueDeliveryWorker> workMap = new ConcurrentHashMap();
    private Map<String, Map<String, CassandraSubscription>> subscriptionMap = new ConcurrentHashMap();
    private ExecutorService messageFlusherExecutor = null;
    private SequentialThreadPoolExecutor messagePublishingExecutor = null;
    private Map<AMQChannel, Map<Long, Semaphore>> unAckedMessagelocks = new ConcurrentHashMap();
    private Map<AMQChannel, QueueSubscriptionAcknowledgementHandler> acknowledgementHandlerMap = new ConcurrentHashMap();
    private int queueWorkerWaitInterval;

    @Override // org.wso2.andes.server.cassandra.ClusteringEnabledSubscriptionManager
    public void init() {
        this.messageFlusherExecutor = Executors.newFixedThreadPool(ClusterResourceHolder.getInstance().getClusterConfiguration().getSubscriptionPoolSize(), new ThreadFactoryBuilder().setNameFormat("QueueDeliveryWorker-%d").build());
        this.messagePublishingExecutor = new SequentialThreadPoolExecutor(ClusterResourceHolder.getInstance().getClusterConfiguration().getPublisherPoolSize(), "messagePublishingExecutor");
        this.queueWorkerWaitInterval = ClusterResourceHolder.getInstance().getClusterConfiguration().getQueueWorkerInterval();
    }

    @Override // org.wso2.andes.server.cassandra.ClusteringEnabledSubscriptionManager
    public void addSubscription(AMQQueue aMQQueue, CassandraSubscription cassandraSubscription) {
        try {
            if (cassandraSubscription.getSubscription() instanceof SubscriptionImpl.BrowserSubscription) {
                ClusterResourceHolder.getInstance().getCassandraMessageStore().addUserQueueToGlobalQueue(aMQQueue.getResourceName());
                new QueueBrowserDeliveryWorker(cassandraSubscription.getSubscription(), aMQQueue, cassandraSubscription.getSession()).send();
            } else {
                Map<String, CassandraSubscription> map = this.subscriptionMap.get(aMQQueue.getResourceName());
                if (map == null || map.size() == 0) {
                    synchronized (this.subscriptionMap) {
                        Map<String, CassandraSubscription> map2 = this.subscriptionMap.get(aMQQueue.getResourceName());
                        if (map2 == null || map2.size() == 0) {
                            Map<String, CassandraSubscription> map3 = this.subscriptionMap.get(aMQQueue.getResourceName());
                            if (map3 == null) {
                                ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
                                concurrentHashMap.put(cassandraSubscription.getSubscription().getSubscriptionID() + "", cassandraSubscription);
                                this.subscriptionMap.put(aMQQueue.getResourceName(), concurrentHashMap);
                                if (!aMQQueue.checkIfBoundToTopicExchange()) {
                                    handleSubscription(aMQQueue);
                                }
                            } else if (map3.size() == 0) {
                                map3.put(cassandraSubscription.getSubscription().getSubscriptionID() + "", cassandraSubscription);
                                if (!aMQQueue.checkIfBoundToTopicExchange()) {
                                    handleSubscription(aMQQueue);
                                }
                            }
                        } else {
                            map2.put(cassandraSubscription.getSubscription().getSubscriptionID() + "", cassandraSubscription);
                        }
                    }
                } else {
                    map.put(cassandraSubscription.getSubscription().getSubscriptionID() + "", cassandraSubscription);
                }
                log.info("Binding Subscription " + cassandraSubscription.getSubscription().getSubscriptionID() + " to queue " + aMQQueue.getName());
            }
            ClusterResourceHolder.getInstance().getSubscriptionCoordinationManager().handleSubscriptionChange();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override // org.wso2.andes.server.cassandra.ClusteringEnabledSubscriptionManager
    public void removeSubscription(String str, String str2, boolean z) {
        if (!z) {
            try {
                Map<String, CassandraSubscription> map = this.subscriptionMap.get(str);
                if (map != null && map.containsKey(str2)) {
                    map.remove(str2);
                    if (map.size() == 0) {
                        log.debug("Executing subscription removal handler to minimize message losses");
                        handleMessageRemoval(str, AndesUtils.getGlobalQueueNameForQueue(str));
                        if (ClusterResourceHolder.getInstance().getCassandraMessageStore().getCassandraMessageCountForQueue(str) == 0) {
                            ClusterResourceHolder.getInstance().getCassandraMessageStore().removeMessageCounterForQueue(str);
                        }
                    }
                }
            } catch (Exception e) {
                log.error("Error while removing subscription for queue: " + str, e);
            }
        }
        try {
            ClusterResourceHolder.getInstance().getSubscriptionCoordinationManager().handleSubscriptionChange();
        } catch (Exception e2) {
            log.error("Error while notifying Subscription change");
        }
    }

    private void handleMessageRemoval(String str, String str2) throws AMQStoreException {
        CassandraMessageStore cassandraMessageStore = ClusterResourceHolder.getInstance().getCassandraMessageStore();
        cassandraMessageStore.removeUserQueueFromQpidQueue(str2);
        String nodeQueueNameForQueue = AndesUtils.getNodeQueueNameForQueue(str);
        long j = 0;
        List<CassandraQueueMessage> messagesFromUserQueue = cassandraMessageStore.getMessagesFromUserQueue(nodeQueueNameForQueue, 40, 0L);
        while (true) {
            List<CassandraQueueMessage> list = messagesFromUserQueue;
            if (list.size() == 0) {
                return;
            }
            for (CassandraQueueMessage cassandraQueueMessage : list) {
                j = cassandraQueueMessage.getMessageId();
                if (cassandraQueueMessage.getQueue().equals(str)) {
                    cassandraMessageStore.removeMessageFromUserQueue(str, cassandraQueueMessage.getMessageId());
                    try {
                        cassandraMessageStore.addMessageToGlobalQueue(str2, cassandraQueueMessage.getQueue(), cassandraQueueMessage.getMessageId(), cassandraQueueMessage.getMessage());
                    } catch (Exception e) {
                        log.error(e);
                    }
                }
            }
            messagesFromUserQueue = cassandraMessageStore.getMessagesFromUserQueue(nodeQueueNameForQueue, 40, j);
        }
    }

    private void handleSubscription(AMQQueue aMQQueue) {
        try {
            ClusterResourceHolder.getInstance().getCassandraMessageStore().addUserQueueToGlobalQueue(AndesUtils.getGlobalQueueNameForQueue(aMQQueue.getResourceName()));
            String nodeQueueNameForQueue = AndesUtils.getNodeQueueNameForQueue(aMQQueue.getResourceName());
            ClusterResourceHolder.getInstance().getCassandraMessageStore().addMessageCounterForQueue(aMQQueue.getName());
            if (this.workMap.get(nodeQueueNameForQueue) == null) {
                QueueDeliveryWorker queueDeliveryWorker = new QueueDeliveryWorker(nodeQueueNameForQueue, aMQQueue, this.subscriptionMap, this.messagePublishingExecutor, this.queueWorkerWaitInterval);
                this.workMap.put(nodeQueueNameForQueue, queueDeliveryWorker);
                this.messageFlusherExecutor.execute(queueDeliveryWorker);
            }
        } catch (Exception e) {
            e.printStackTrace();
            log.error("Error while adding subscription to queue :" + aMQQueue, e);
        }
    }

    public void markSubscriptionForRemovel(String str) {
        QueueDeliveryWorker queueDeliveryWorker = this.workMap.get(str);
        if (queueDeliveryWorker != null) {
            queueDeliveryWorker.stopFlusher();
        }
    }

    public int getNumberOfSubscriptionsForQueue(String str) {
        return this.subscriptionMap.get(str).size();
    }

    @Override // org.wso2.andes.server.cassandra.ClusteringEnabledSubscriptionManager
    public void stopAllMessageFlushers() {
        Iterator<QueueDeliveryWorker> it = this.workMap.values().iterator();
        while (it.hasNext()) {
            it.next().stopFlusher();
        }
    }

    @Override // org.wso2.andes.server.cassandra.ClusteringEnabledSubscriptionManager
    public void startAllMessageFlushers() {
        Iterator<QueueDeliveryWorker> it = this.workMap.values().iterator();
        while (it.hasNext()) {
            it.next().startFlusher();
        }
    }

    public Map<String, QueueDeliveryWorker> getWorkMap() {
        return this.workMap;
    }

    @Override // org.wso2.andes.server.cassandra.ClusteringEnabledSubscriptionManager
    public Map<AMQChannel, Map<Long, Semaphore>> getUnAcknowledgedMessageLocks() {
        return this.unAckedMessagelocks;
    }

    @Override // org.wso2.andes.server.cassandra.ClusteringEnabledSubscriptionManager
    public Map<AMQChannel, QueueSubscriptionAcknowledgementHandler> getAcknowledgementHandlerMap() {
        return this.acknowledgementHandlerMap;
    }
}
