package org.wso2.andes.server.cluster.coordination;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.wso2.andes.pool.AndesExecuter;
import org.wso2.andes.server.ClusterResourceHolder;
import org.wso2.andes.server.configuration.ClusterConfiguration;

/* loaded from: input_file:org/wso2/andes/server/cluster/coordination/SubscriptionCoordinationManagerImpl.class */
/**
 * {@link SubscriptionCoordinationManager} implementation that fans subscription-change
 * notifications out to locally registered {@link SubscriptionListener}s.
 *
 * <p>When clustering is enabled, a change is also published by writing to the
 * {@link CoordinationConstants#SUBSCRIPTION_COORDINATION_PARENT} ZooKeeper node, and a
 * watcher on that same node triggers {@link #notifySubscriptionChange()} whenever its
 * data changes.
 */
public class SubscriptionCoordinationManagerImpl implements SubscriptionCoordinationManager {
    private static final Log log = LogFactory.getLog(SubscriptionCoordinationManagerImpl.class);

    /** Set by {@link #init()} only when clustering is enabled; {@code null} otherwise. */
    private ZooKeeperAgent zooKeeperAgent;

    /** Watcher registered on the subscription coordination parent node. */
    private SubscriptionParentDataChangeListener subscriptionParentDataChangeListener;

    /**
     * Registered listeners. Copy-on-write because the list is iterated on an
     * {@link AndesExecuter} thread while registration/removal may happen on caller threads;
     * a plain {@code ArrayList} could throw {@code ConcurrentModificationException} there.
     */
    private final List<SubscriptionListener> subscriptionListeners =
            new CopyOnWriteArrayList<SubscriptionListener>();

    /**
     * ZooKeeper watcher on the subscription coordination parent node. On a data-changed
     * event it re-registers the watch and notifies the local listeners.
     */
    private class SubscriptionParentDataChangeListener implements Watcher {
        private SubscriptionParentDataChangeListener() {
        }

        /**
         * Handles a watch event. Only {@code NodeDataChanged} events are acted upon;
         * every other event type is ignored.
         *
         * @param watchedEvent the event delivered by ZooKeeper
         */
        @Override
        public void process(WatchedEvent watchedEvent) {
            if (log.isDebugEnabled()) {
                log.debug("Subscription data change event received : " + watchedEvent);
            }
            if (Watcher.Event.EventType.NodeDataChanged != watchedEvent.getType()) {
                return;
            }
            try {
                // Reading the node again with a watcher argument re-arms the watch
                // before the listeners are notified.
                zooKeeperAgent.getZooKeeper().getData(
                        CoordinationConstants.SUBSCRIPTION_COORDINATION_PARENT,
                        subscriptionParentDataChangeListener,
                        (Stat) null);
                notifySubscriptionChange();
            } catch (Exception e) {
                // Include the exception so the failure cause is not lost.
                log.error("Error while processing subscription Data Change event", e);
            }
        }
    }

    /**
     * Initializes coordination. When clustering is enabled this creates the
     * {@link ZooKeeperAgent}, initializes subscription coordination on it and registers
     * a data watch on the subscription coordination parent node. When clustering is
     * disabled nothing is set up.
     *
     * @throws CoordinationException if any step of the initialization fails
     */
    @Override
    public void init() throws CoordinationException {
        try {
            ClusterConfiguration clusterConfiguration =
                    ClusterResourceHolder.getInstance().getClusterConfiguration();
            if (clusterConfiguration.isClusteringEnabled().booleanValue()) {
                this.zooKeeperAgent = new ZooKeeperAgent(clusterConfiguration.getZookeeperConnection());
                this.zooKeeperAgent.initSubscriptionCoordination();
                ZooKeeper zooKeeper = this.zooKeeperAgent.getZooKeeper();
                this.subscriptionParentDataChangeListener = new SubscriptionParentDataChangeListener();
                zooKeeper.getData(
                        CoordinationConstants.SUBSCRIPTION_COORDINATION_PARENT,
                        this.subscriptionParentDataChangeListener,
                        (Stat) null);
            }
        } catch (Exception e) {
            throw new CoordinationException("Error while initializing SubscriptionCoordinationManagerImpl", e);
        }
    }

    /**
     * Asynchronously invokes {@link SubscriptionListener#subscriptionsChanged()} on every
     * registered listener via {@link AndesExecuter#runAsync}. A failure in one listener is
     * logged and does not prevent the remaining listeners from being called.
     */
    @Override
    public void notifySubscriptionChange() {
        if (log.isDebugEnabled()) {
            log.debug("Notifying subscribers on Subscription changes ");
        }
        AndesExecuter.runAsync(new Runnable() {
            @Override
            public void run() {
                for (SubscriptionListener listener : subscriptionListeners) {
                    try {
                        listener.subscriptionsChanged();
                    } catch (Exception e) {
                        log.error("Error handling the subscription change ", e);
                    }
                }
            }
        });
    }

    /**
     * Publishes a subscription change. When clustering is enabled, the coordination
     * parent node's data is overwritten (any version) so that watchers on it fire; in
     * all cases the local listeners are then notified.
     *
     * @throws CoordinationException if clustering is enabled but this manager has not been
     *                               initialized, or if the ZooKeeper write fails
     */
    @Override
    public void handleSubscriptionChange() throws CoordinationException {
        if (ClusterResourceHolder.getInstance().getClusterConfiguration().isClusteringEnabled().booleanValue()) {
            // Guard the agent as well: before init() it is null and dereferencing it
            // would raise a NullPointerException instead of the intended error.
            ZooKeeperAgent agent = this.zooKeeperAgent;
            ZooKeeper zooKeeper = (agent == null) ? null : agent.getZooKeeper();
            if (zooKeeper == null) {
                throw new CoordinationException("Subscription Coordination Manager not initialized yet");
            }
            try {
                // Version -1 matches any node version.
                zooKeeper.setData(CoordinationConstants.SUBSCRIPTION_COORDINATION_PARENT, new byte[]{1}, -1);
            } catch (Exception e) {
                // Preserve the underlying cause for diagnosis.
                throw new CoordinationException("Error while handling subscription change", e);
            }
        }
        notifySubscriptionChange();
    }

    /**
     * Registers a listener to be called on subscription changes.
     *
     * @param subscriptionListener the listener to add; must not be {@code null}
     * @throws IllegalArgumentException if {@code subscriptionListener} is {@code null}
     */
    @Override
    public void registerSubscriptionListener(SubscriptionListener subscriptionListener) {
        if (subscriptionListener == null) {
            throw new IllegalArgumentException(
                    "Error while registering subscribers : invalid argument listener = null");
        }
        this.subscriptionListeners.add(subscriptionListener);
    }

    /**
     * Removes a previously registered listener. Does nothing if the listener is not
     * currently registered.
     *
     * @param subscriptionListener the listener to remove
     */
    @Override
    public void removeSubscriptionListener(SubscriptionListener subscriptionListener) {
        // remove() is already a no-op for absent elements; no contains() pre-check needed.
        this.subscriptionListeners.remove(subscriptionListener);
    }
}
