package org.wso2.andes.server.cassandra;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.andes.server.AMQChannel;
import org.wso2.andes.server.ClusterResourceHolder;
import org.wso2.andes.server.cluster.coordination.CoordinationException;
import org.wso2.andes.server.queue.AMQQueue;

/* loaded from: input_file:org/wso2/andes/server/cassandra/OnceInOrderEnabledSubscriptionManager.class */
public class OnceInOrderEnabledSubscriptionManager implements ClusteringEnabledSubscriptionManager {
    private static Log log = LogFactory.getLog(OnceInOrderEnabledSubscriptionManager.class);
    private Map<String, CassandraReliableMessageCoordinator> workMap = new ConcurrentHashMap();
    private Queue<String> subscriptionQueue = new ConcurrentLinkedQueue();
    private ExecutorService onceInOrderExecutor = null;
    private boolean active = true;
    private Map<AMQChannel, Map<Long, Semaphore>> unAckedMessagelocks = new ConcurrentHashMap();
    private Map<String, Semaphore> queueLock = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/wso2/andes/server/cassandra/OnceInOrderEnabledSubscriptionManager$CassandraReliableMessageFlusherManagerTask.class */
    public class CassandraReliableMessageFlusherManagerTask implements Runnable {
        private CassandraReliableMessageFlusherManagerTask() {
        }

        @Override // java.lang.Runnable
        public void run() {
            while (OnceInOrderEnabledSubscriptionManager.this.active) {
                if (OnceInOrderEnabledSubscriptionManager.this.subscriptionQueue.size() > 0) {
                    String str = (String) OnceInOrderEnabledSubscriptionManager.this.subscriptionQueue.peek();
                    if (OnceInOrderEnabledSubscriptionManager.this.workMap.containsKey(str)) {
                        CassandraReliableMessageCoordinator cassandraReliableMessageCoordinator = (CassandraReliableMessageCoordinator) OnceInOrderEnabledSubscriptionManager.this.workMap.get(str);
                        if (cassandraReliableMessageCoordinator.isMarkedForRemoval()) {
                            OnceInOrderEnabledSubscriptionManager.this.workMap.remove(str);
                            OnceInOrderEnabledSubscriptionManager.this.subscriptionQueue.remove();
                            if (OnceInOrderEnabledSubscriptionManager.log.isDebugEnabled()) {
                                OnceInOrderEnabledSubscriptionManager.log.debug("Removing subscription queue " + str + " from work map");
                            }
                        } else {
                            if (!cassandraReliableMessageCoordinator.isWorking()) {
                                OnceInOrderEnabledSubscriptionManager.this.onceInOrderExecutor.execute(cassandraReliableMessageCoordinator);
                            }
                            OnceInOrderEnabledSubscriptionManager.this.subscriptionQueue.remove();
                            OnceInOrderEnabledSubscriptionManager.this.subscriptionQueue.offer(str);
                        }
                    } else {
                        OnceInOrderEnabledSubscriptionManager.this.subscriptionQueue.remove();
                    }
                }
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    OnceInOrderEnabledSubscriptionManager.log.error("Error in thread sleep", e);
                }
            }
        }
    }

    @Override // org.wso2.andes.server.cassandra.ClusteringEnabledSubscriptionManager
    public void init() {
        this.onceInOrderExecutor = Executors.newFixedThreadPool(ClusterResourceHolder.getInstance().getClusterConfiguration().getSubscriptionPoolSize(), new ThreadFactoryBuilder().setNameFormat("OnceInOrderSubscriptionManager-%d").build());
        start();
    }

    @Override // org.wso2.andes.server.cassandra.ClusteringEnabledSubscriptionManager
    public void addSubscription(AMQQueue aMQQueue, CassandraSubscription cassandraSubscription) {
        InOrderMessageFlusher inOrderMessageFlusher = new InOrderMessageFlusher(cassandraSubscription.getSubscription(), aMQQueue, cassandraSubscription.getSession());
        if (ClusterResourceHolder.getInstance().getClusterConfiguration().isClusteringEnabled().booleanValue()) {
            try {
                ClusterResourceHolder.getInstance().getSubscriptionCoordinationManager().handleSubscriptionChange();
            } catch (CoordinationException e) {
                log.error("Error in notifying subscription change to the cluster nodes", e);
            }
        } else if (this.queueLock.get(aMQQueue.getResourceName()) == null) {
            this.queueLock.put(aMQQueue.getResourceName(), new Semaphore(1));
        }
        addWork(inOrderMessageFlusher);
    }

    @Override // org.wso2.andes.server.cassandra.ClusteringEnabledSubscriptionManager
    public void removeSubscription(String str, String str2, boolean z) {
        CassandraReliableMessageCoordinator cassandraReliableMessageCoordinator = this.workMap.get(str);
        if (cassandraReliableMessageCoordinator != null && cassandraReliableMessageCoordinator.removeSubscription(str2) == 0) {
            cassandraReliableMessageCoordinator.setMarkedForRemoval(true);
            this.queueLock.remove(str);
        }
        if (ClusterResourceHolder.getInstance().getClusterConfiguration().isClusteringEnabled().booleanValue()) {
            try {
                ClusterResourceHolder.getInstance().getSubscriptionCoordinationManager().handleSubscriptionChange();
            } catch (CoordinationException e) {
                log.error("Error in notifying subscription change to the cluster nodes", e);
            }
        }
    }

    @Override // org.wso2.andes.server.cassandra.ClusteringEnabledSubscriptionManager
    public Map<AMQChannel, Map<Long, Semaphore>> getUnAcknowledgedMessageLocks() {
        return this.unAckedMessagelocks;
    }

    @Override // org.wso2.andes.server.cassandra.ClusteringEnabledSubscriptionManager
    public Map<AMQChannel, QueueSubscriptionAcknowledgementHandler> getAcknowledgementHandlerMap() {
        throw new UnsupportedOperationException("Not yet supported for Once In order impl");
    }

    @Override // org.wso2.andes.server.cassandra.ClusteringEnabledSubscriptionManager
    public void stopAllMessageFlushers() {
        throw new UnsupportedOperationException("Not yet supported for Once In order impl");
    }

    @Override // org.wso2.andes.server.cassandra.ClusteringEnabledSubscriptionManager
    public void startAllMessageFlushers() {
        throw new UnsupportedOperationException("Not yet supported for Once In order impl");
    }

    @Override // org.wso2.andes.server.cassandra.ClusteringEnabledSubscriptionManager
    public void clearAndUpdateDestinationQueueList() {
        throw new UnsupportedOperationException("Not yet supported for Once In order impl");
    }

    @Override // org.wso2.andes.server.cassandra.ClusteringEnabledSubscriptionManager
    public List<String> getDestinationQueues() {
        throw new UnsupportedOperationException("Not yet supported for Once In order impl");
    }

    @Override // org.wso2.andes.server.cassandra.ClusteringEnabledSubscriptionManager
    public List<String> updateNodeQueuesForDestinationQueueMap() {
        return new ArrayList();
    }

    @Override // org.wso2.andes.server.cassandra.ClusteringEnabledSubscriptionManager
    public void handleFreshSubscriptionsJoiningToCluster() {
    }

    @Override // org.wso2.andes.server.cassandra.ClusteringEnabledSubscriptionManager
    public List<String> getAllSubscriptionInformation() {
        return new ArrayList();
    }

    @Override // org.wso2.andes.server.cassandra.ClusteringEnabledSubscriptionManager
    public List<String> getNodeQueuesHavingSubscriptionsForQueue(String str) {
        return null;
    }

    private void start() {
        this.active = true;
        this.onceInOrderExecutor.submit(new CassandraReliableMessageFlusherManagerTask());
    }

    public void stop() {
        this.active = false;
    }

    private void addWork(InOrderMessageFlusher inOrderMessageFlusher) {
        CassandraReliableMessageCoordinator cassandraReliableMessageCoordinator = this.workMap.get(inOrderMessageFlusher.getQueue().getName());
        if (cassandraReliableMessageCoordinator != null) {
            cassandraReliableMessageCoordinator.addFlusher(inOrderMessageFlusher);
            return;
        }
        ArrayList arrayList = new ArrayList();
        arrayList.add(inOrderMessageFlusher);
        CassandraReliableMessageCoordinator cassandraReliableMessageCoordinator2 = new CassandraReliableMessageCoordinator(inOrderMessageFlusher.getQueue().getName(), arrayList);
        this.subscriptionQueue.offer(inOrderMessageFlusher.getQueue().getName());
        this.workMap.put(inOrderMessageFlusher.getQueue().getName(), cassandraReliableMessageCoordinator2);
    }

    public Map<String, Semaphore> getQueueLock() {
        return this.queueLock;
    }
}
