package org.wso2.andes.server.cluster.coordination;

import java.nio.charset.Charset;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.wso2.andes.server.ClusterResourceHolder;
import org.wso2.andes.server.configuration.ClusterConfiguration;

/* loaded from: input_file:org/wso2/andes/server/cluster/coordination/TopicSubscriptionCoordinationManager.class */
public class TopicSubscriptionCoordinationManager {
    private static Log log = LogFactory.getLog(SubscriptionCoordinationManagerImpl.class);
    private ZooKeeperAgent zooKeeperAgent;
    private TopicSubscriptionParentDataChangeListener topicSubscriptionDataChangeListener;

    /* loaded from: input_file:org/wso2/andes/server/cluster/coordination/TopicSubscriptionCoordinationManager$TopicSubscriptionParentDataChangeListener.class */
    private class TopicSubscriptionParentDataChangeListener implements Watcher {
        private TopicSubscriptionParentDataChangeListener() {
        }

        public void process(WatchedEvent watchedEvent) {
            TopicSubscriptionCoordinationManager.log.debug("Topic subscription data change event received : " + watchedEvent);
            if (Watcher.Event.EventType.NodeDataChanged == watchedEvent.getType()) {
                try {
                    TopicSubscriptionCoordinationManager.this.notifyTopicSubscriptionChange(new String(TopicSubscriptionCoordinationManager.this.zooKeeperAgent.getZooKeeper().getData(CoordinationConstants.TOPIC_SUBSCRIPTION_COORDINATION_PARENT, TopicSubscriptionCoordinationManager.this.topicSubscriptionDataChangeListener, (Stat) null)));
                } catch (Exception e) {
                    TopicSubscriptionCoordinationManager.log.error("Error while processing topic subscription Data Change event");
                }
            }
        }
    }

    public void init() throws CoordinationException {
        try {
            ClusterConfiguration clusterConfiguration = ClusterResourceHolder.getInstance().getClusterConfiguration();
            if (clusterConfiguration.isClusteringEnabled().booleanValue()) {
                this.zooKeeperAgent = new ZooKeeperAgent(clusterConfiguration.getZookeeperConnection());
                this.zooKeeperAgent.initTopicSubscriptionCoordination();
                ZooKeeper zooKeeper = this.zooKeeperAgent.getZooKeeper();
                this.topicSubscriptionDataChangeListener = new TopicSubscriptionParentDataChangeListener();
                zooKeeper.getData(CoordinationConstants.TOPIC_SUBSCRIPTION_COORDINATION_PARENT, this.topicSubscriptionDataChangeListener, (Stat) null);
            }
        } catch (Exception e) {
            throw new CoordinationException("Error while initializing TopicSubscriptionCoordinationManager", e);
        }
    }

    public void handleSubscriptionChange(String str) throws CoordinationException {
        if (!ClusterResourceHolder.getInstance().getClusterConfiguration().isClusteringEnabled().booleanValue()) {
            notifyTopicSubscriptionChange(str);
            return;
        }
        ZooKeeper zooKeeper = this.zooKeeperAgent.getZooKeeper();
        if (zooKeeper == null) {
            throw new CoordinationException("Topic Subscription Coordination Manager not initialized yet");
        }
        try {
            zooKeeper.setData(CoordinationConstants.TOPIC_SUBSCRIPTION_COORDINATION_PARENT, str.getBytes(), -1);
        } catch (Exception e) {
            throw new CoordinationException("Error while handling topic subscription change");
        }
    }

    public void notifyTopicSubscriptionChange(String str) {
        try {
            if (log.isDebugEnabled()) {
                log.debug("Notifying topic Subscriptions has been modified. Load fresh from database nad updating memory map");
            }
            ClusterResourceHolder.getInstance().getCassandraMessageStore().syncTopicSubscriptionsWithDatabase(str);
            ClusterResourceHolder.getInstance().getCassandraMessageStore().syncTopicNodeQueuesWithDatabase(str);
        } catch (Exception e) {
            log.error("Error in synchronizing topic subscriptions with database. This may cause serious issues with topic operations", e);
        }
    }
}
