package org.wso2.carbon.databridge.datasink.cassandra.subscriber;

import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.databridge.commons.StreamDefinition;
import org.wso2.carbon.databridge.commons.utils.DataBridgeCommonsUtils;
import org.wso2.carbon.databridge.core.exception.StreamDefinitionStoreException;
import org.wso2.carbon.databridge.datasink.cassandra.internal.util.ServiceHolder;
import org.wso2.carbon.event.stream.manager.core.EventStreamListener;
import org.wso2.carbon.event.stream.manager.core.exception.EventStreamConfigurationException;

/* loaded from: input_file:org/wso2/carbon/databridge/datasink/cassandra/subscriber/BAMCassandraEventStreamListener.class */
public class BAMCassandraEventStreamListener implements EventStreamListener {
    private static final Log log = LogFactory.getLog(BAMCassandraEventStreamListener.class);
    private static final ConcurrentHashMap<String, BAMCassandraWSO2EventConsumer> consumerCache = new ConcurrentHashMap<>();

    public void removedEventStream(int i, String str, String str2) {
        String generateConsumerCacheKey = generateConsumerCacheKey(str, str2, i);
        if (consumerCache.get(generateConsumerCacheKey) != null) {
            ServiceHolder.getEventStreamService().unsubscribe(consumerCache.get(generateConsumerCacheKey), i);
        }
    }

    public void addedEventStream(int i, String str, String str2) {
        if (CassandraDataSinkConfiguration.getInstance().isPersistedStream(str, str2)) {
            BAMCassandraWSO2EventConsumer bAMCassandraWSO2EventConsumer = new BAMCassandraWSO2EventConsumer(DataBridgeCommonsUtils.generateStreamId(str, str2), i);
            consumerCache.put(generateConsumerCacheKey(str, str2, i), bAMCassandraWSO2EventConsumer);
            try {
                ServiceHolder.getEventStreamService().subscribe(bAMCassandraWSO2EventConsumer, i);
            } catch (EventStreamConfigurationException e) {
                log.error("Error while registering subscriber for stream name :" + str + " , version :" + str2 + " for tenant id " + i + ". " + e.getMessage(), e);
            }
        }
    }

    private String generateConsumerCacheKey(String str, String str2, int i) {
        return str + ":" + str2 + ":" + i;
    }

    public synchronized void loadEventStreams(int i) {
        try {
            for (StreamDefinition streamDefinition : ServiceHolder.getStreamDefinitionStoreService().getAllStreamDefinitionsFromStore(i)) {
                addedEventStream(i, streamDefinition.getName(), streamDefinition.getVersion());
            }
        } catch (StreamDefinitionStoreException e) {
            log.error("Error while loading the stream definitions from store for tenant " + i + ". " + e.getMessage(), e);
        }
    }

    public synchronized void unLoadEventStreams(int i) {
        try {
            for (StreamDefinition streamDefinition : ServiceHolder.getStreamDefinitionStoreService().getAllStreamDefinitionsFromStore(i)) {
                removedEventStream(i, streamDefinition.getName(), streamDefinition.getVersion());
            }
        } catch (StreamDefinitionStoreException e) {
            log.error("Error while loading the stream definitions from store for tenant " + i + ". " + e.getMessage(), e);
        }
    }
}
