package org.wso2.carbon.event.processor.core.internal;

import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import javax.sql.DataSource;
import javax.xml.stream.XMLStreamException;
import org.apache.axiom.om.OMElement;
import org.apache.axiom.om.util.AXIOMUtil;
import org.apache.axis2.engine.AxisConfiguration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.cassandra.dataaccess.ClusterInformation;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.databridge.commons.Attribute;
import org.wso2.carbon.event.processor.api.passthrough.PassthroughSenderConfigurator;
import org.wso2.carbon.event.processor.api.receive.EventReceiver;
import org.wso2.carbon.event.processor.api.receive.exception.EventReceiverException;
import org.wso2.carbon.event.processor.api.send.EventSender;
import org.wso2.carbon.event.processor.api.send.exception.EventProducerException;
import org.wso2.carbon.event.processor.core.EventProcessorService;
import org.wso2.carbon.event.processor.core.ExecutionPlan;
import org.wso2.carbon.event.processor.core.ExecutionPlanConfiguration;
import org.wso2.carbon.event.processor.core.ExecutionPlanConfigurationFile;
import org.wso2.carbon.event.processor.core.StreamConfiguration;
import org.wso2.carbon.event.processor.core.exception.ExecutionPlanConfigurationException;
import org.wso2.carbon.event.processor.core.exception.ExecutionPlanDependencyValidationException;
import org.wso2.carbon.event.processor.core.exception.ServiceDependencyValidationException;
import org.wso2.carbon.event.processor.core.internal.ds.EventProcessorValueHolder;
import org.wso2.carbon.event.processor.core.internal.listener.ExternalStreamConsumer;
import org.wso2.carbon.event.processor.core.internal.listener.ExternalStreamListener;
import org.wso2.carbon.event.processor.core.internal.listener.SiddhiInputEventDispatcher;
import org.wso2.carbon.event.processor.core.internal.listener.SiddhiOutputStreamListener;
import org.wso2.carbon.event.processor.core.internal.persistence.CassandraPersistenceStore;
import org.wso2.carbon.event.processor.core.internal.stream.EventConsumer;
import org.wso2.carbon.event.processor.core.internal.stream.EventJunction;
import org.wso2.carbon.event.processor.core.internal.stream.EventProducer;
import org.wso2.carbon.event.processor.core.internal.util.EventProcessorConfigurationFilesystemInvoker;
import org.wso2.carbon.event.processor.core.internal.util.EventProcessorConstants;
import org.wso2.carbon.event.processor.core.internal.util.EventProcessorUtil;
import org.wso2.carbon.event.processor.core.internal.util.helper.CassandraConnectionValidator;
import org.wso2.carbon.event.processor.core.internal.util.helper.EventProcessorConfigurationHelper;
import org.wso2.carbon.event.processor.core.internal.util.helper.SiddhiExtensionLoader;
import org.wso2.carbon.event.stream.manager.core.exception.EventStreamConfigurationException;
import org.wso2.carbon.ndatasource.common.DataSourceException;
import org.wso2.carbon.ndatasource.core.CarbonDataSource;
import org.wso2.carbon.user.core.UserStoreException;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.config.SiddhiConfiguration;
import org.wso2.siddhi.core.stream.input.InputHandler;
import org.wso2.siddhi.query.api.definition.StreamDefinition;
import org.wso2.siddhi.query.compiler.exception.SiddhiPraserException;

/* loaded from: input_file:org/wso2/carbon/event/processor/core/internal/CarbonEventProcessorService.class */
public class CarbonEventProcessorService implements EventProcessorService {
    private static final Log log = LogFactory.getLog(CarbonEventProcessorService.class);
    private Map<Integer, Map<String, ExecutionPlan>> tenantSpecificExecutionPlans = new ConcurrentHashMap();
    private Map<Integer, List<ExecutionPlanConfigurationFile>> tenantSpecificExecutionPlanFiles = new ConcurrentHashMap();
    private Map<Integer, Map<String, EventJunction>> tenantSpecificEventJunctions = new ConcurrentHashMap();

    private static void populateAttributes(StreamDefinition streamDefinition, List<Attribute> list, String str) {
        if (list != null) {
            Iterator<Attribute> it = list.iterator();
            while (it.hasNext()) {
                org.wso2.siddhi.query.api.definition.Attribute convertToSiddhiAttribute = EventProcessorUtil.convertToSiddhiAttribute(it.next(), str);
                streamDefinition.attribute(convertToSiddhiAttribute.getName(), convertToSiddhiAttribute.getType());
            }
        }
    }

    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public void deployExecutionPlanConfiguration(ExecutionPlanConfiguration executionPlanConfiguration, AxisConfiguration axisConfiguration) throws ExecutionPlanDependencyValidationException, ExecutionPlanConfigurationException {
        String name = executionPlanConfiguration.getName();
        OMElement om = EventProcessorConfigurationHelper.toOM(executionPlanConfiguration);
        EventProcessorConfigurationHelper.validateExecutionPlanConfiguration(om);
        File file = new File(axisConfiguration.getRepository().getPath());
        if (!file.exists() && file.mkdir()) {
            throw new ExecutionPlanConfigurationException("Cannot create directory to add tenant specific execution plan : " + name);
        }
        File file2 = new File(file.getAbsolutePath() + File.separator + EventProcessorConstants.EP_ELE_DIRECTORY);
        if (!file2.exists() && !file2.mkdir()) {
            throw new ExecutionPlanConfigurationException("Cannot create directory executionplans to add tenant specific  execution plan :" + name);
        }
        validateToRemoveInactiveExecutionPlanConfiguration(name, axisConfiguration);
        EventProcessorConfigurationFilesystemInvoker.save(om, name, name + ".xml", axisConfiguration);
    }

    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public void undeployInactiveExecutionPlanConfiguration(String str, AxisConfiguration axisConfiguration) throws ExecutionPlanConfigurationException {
        EventProcessorConfigurationFilesystemInvoker.delete(str, axisConfiguration);
    }

    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public void undeployActiveExecutionPlanConfiguration(String str, AxisConfiguration axisConfiguration) throws ExecutionPlanConfigurationException {
        EventProcessorConfigurationFilesystemInvoker.delete(getExecutionPlanConfigurationFileByPlanName(str, PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId()).getFileName(), axisConfiguration);
    }

    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public void editActiveExecutionPlanConfiguration(String str, String str2, AxisConfiguration axisConfiguration) throws ExecutionPlanConfigurationException {
        int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
        try {
            OMElement stringToOM = AXIOMUtil.stringToOM(str);
            EventProcessorConfigurationHelper.validateExecutionPlanConfiguration(stringToOM);
            ExecutionPlanConfiguration fromOM = EventProcessorConfigurationHelper.fromOM(stringToOM);
            if (!fromOM.getName().equals(str2) && !checkExecutionPlanValidity(fromOM.getName(), tenantId)) {
                throw new ExecutionPlanConfigurationException(fromOM.getName() + " already registered as an execution in this tenant");
            }
            if (str2 == null || str2.length() <= 0) {
                throw new ExecutionPlanConfigurationException("Invalid configuration provided, No execution plan name.");
            }
            ExecutionPlanConfigurationFile executionPlanConfigurationFileByPlanName = getExecutionPlanConfigurationFileByPlanName(str2, tenantId);
            String fileName = executionPlanConfigurationFileByPlanName == null ? str2 + ".xml" : executionPlanConfigurationFileByPlanName.getFileName();
            EventProcessorConfigurationFilesystemInvoker.delete(fileName, axisConfiguration);
            EventProcessorConfigurationFilesystemInvoker.save(str, str2, fileName, axisConfiguration);
        } catch (XMLStreamException e) {
            log.error("Error while creating the xml object");
            throw new ExecutionPlanConfigurationException("Not a valid xml object, ", e);
        }
    }

    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public void editInactiveExecutionPlanConfiguration(String str, String str2, AxisConfiguration axisConfiguration) throws ExecutionPlanConfigurationException {
        try {
            OMElement stringToOM = AXIOMUtil.stringToOM(str);
            EventProcessorConfigurationHelper.validateExecutionPlanConfiguration(stringToOM);
            ExecutionPlanConfiguration fromOM = EventProcessorConfigurationHelper.fromOM(stringToOM);
            EventProcessorConfigurationFilesystemInvoker.delete(str2, axisConfiguration);
            EventProcessorConfigurationFilesystemInvoker.save(str, fromOM.getName(), str2, axisConfiguration);
        } catch (XMLStreamException e) {
            log.error("Error while creating the xml object");
            throw new ExecutionPlanConfigurationException("Not a valid xml object ", e);
        }
    }

    public void addExecutionPlanConfiguration(ExecutionPlanConfiguration executionPlanConfiguration, AxisConfiguration axisConfiguration) throws ExecutionPlanDependencyValidationException, ExecutionPlanConfigurationException, ServiceDependencyValidationException {
        int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
        Map<String, ExecutionPlan> map = this.tenantSpecificExecutionPlans.get(Integer.valueOf(tenantId));
        if (map == null) {
            map = new ConcurrentHashMap();
            this.tenantSpecificExecutionPlans.put(Integer.valueOf(tenantId), map);
        } else if (map.get(executionPlanConfiguration.getName()) != null) {
            throw new ExecutionPlanConfigurationException("Execution plan with the same name already exists. Please remove it and retry.");
        }
        Map<String, EventJunction> map2 = this.tenantSpecificEventJunctions.get(Integer.valueOf(tenantId));
        if (map2 == null) {
            map2 = new ConcurrentHashMap();
            this.tenantSpecificEventJunctions.put(Integer.valueOf(tenantId), map2);
        }
        for (StreamConfiguration streamConfiguration : executionPlanConfiguration.getImportedStreams()) {
            if (map2.get(streamConfiguration.getStreamId()) == null) {
                log.info("Execution plan deployment held back and in inactive state: " + executionPlanConfiguration.getName() + ". Event receiver not found for stream ID : " + streamConfiguration.getStreamId());
                throw new ExecutionPlanDependencyValidationException(streamConfiguration.getStreamId(), "Event receiver not found for stream ID : " + streamConfiguration.getStreamId());
            }
        }
        SiddhiManager siddhiManagerFor = getSiddhiManagerFor(executionPlanConfiguration, getSiddhiConfigurationFor(executionPlanConfiguration, tenantId));
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap(executionPlanConfiguration.getImportedStreams().size());
        for (StreamConfiguration streamConfiguration2 : executionPlanConfiguration.getImportedStreams()) {
            StreamDefinition streamDefinition = new StreamDefinition();
            streamDefinition.name(streamConfiguration2.getSiddhiStreamName());
            org.wso2.carbon.databridge.commons.StreamDefinition streamDefinition2 = map2.get(streamConfiguration2.getStreamId()).getStreamDefinition();
            populateAttributes(streamDefinition, streamDefinition2.getMetaData(), "meta_");
            populateAttributes(streamDefinition, streamDefinition2.getCorrelationData(), "correlation_");
            populateAttributes(streamDefinition, streamDefinition2.getPayloadData(), "");
            concurrentHashMap.put(streamDefinition2.getStreamId(), siddhiManagerFor.defineStream(streamDefinition));
            log.debug("input handler created for " + streamDefinition.getStreamId());
        }
        try {
            siddhiManagerFor.addExecutionPlan(executionPlanConfiguration.getQueryExpressions());
            ConcurrentHashMap concurrentHashMap2 = new ConcurrentHashMap(executionPlanConfiguration.getExportedStreams().size());
            ArrayList arrayList = new ArrayList();
            ArrayList<String> arrayList2 = new ArrayList();
            for (StreamConfiguration streamConfiguration3 : executionPlanConfiguration.getExportedStreams()) {
                StreamDefinition streamDefinition3 = siddhiManagerFor.getStreamDefinition(streamConfiguration3.getSiddhiStreamName());
                if (streamDefinition3 == null) {
                    throw new ExecutionPlanConfigurationException("No matching Siddhi stream for " + streamConfiguration3.getStreamId() + " in the name of " + streamConfiguration3.getSiddhiStreamName());
                }
                org.wso2.carbon.databridge.commons.StreamDefinition convertToDatabridgeStreamDefinition = EventProcessorUtil.convertToDatabridgeStreamDefinition(streamDefinition3, streamConfiguration3);
                EventJunction eventJunction = map2.get(streamConfiguration3.getStreamId());
                if (eventJunction == null) {
                    eventJunction = createEventJunctionWithoutSubscriptions(tenantId, convertToDatabridgeStreamDefinition);
                    arrayList.add(eventJunction.getStreamDefinition().getStreamId());
                }
                if (streamConfiguration3.isPassThroughFlowSupported()) {
                    arrayList2.add(streamConfiguration3.getStreamId());
                }
                SiddhiOutputStreamListener siddhiOutputStreamListener = new SiddhiOutputStreamListener(streamConfiguration3.getSiddhiStreamName(), eventJunction, executionPlanConfiguration, tenantId);
                siddhiManagerFor.addCallback(streamConfiguration3.getSiddhiStreamName(), siddhiOutputStreamListener);
                concurrentHashMap2.put(streamConfiguration3.getStreamId(), siddhiOutputStreamListener);
            }
            for (StreamConfiguration streamConfiguration4 : executionPlanConfiguration.getImportedStreams()) {
                map2.get(streamConfiguration4.getStreamId()).addConsumer(new SiddhiInputEventDispatcher(streamConfiguration4.getStreamId(), (InputHandler) concurrentHashMap.get(streamConfiguration4.getStreamId()), executionPlanConfiguration, tenantId));
            }
            for (StreamConfiguration streamConfiguration5 : executionPlanConfiguration.getExportedStreams()) {
                map2.get(streamConfiguration5.getStreamId()).addProducer((EventProducer) concurrentHashMap2.get(streamConfiguration5.getStreamId()));
            }
            map.put(executionPlanConfiguration.getName(), new ExecutionPlan(executionPlanConfiguration.getName(), siddhiManagerFor, executionPlanConfiguration));
            List<PassthroughSenderConfigurator> passthroughSenderConfiguratorList = EventProcessorValueHolder.getPassthroughSenderConfiguratorList();
            for (String str : arrayList2) {
                Iterator<PassthroughSenderConfigurator> it = passthroughSenderConfiguratorList.iterator();
                while (it.hasNext()) {
                    it.next().deployDefaultEventSender(str, axisConfiguration);
                }
            }
            Iterator it2 = arrayList.iterator();
            while (it2.hasNext()) {
                activateInactiveExecutionPlanConfigurations(ExecutionPlanConfigurationFile.Status.WAITING_FOR_DEPENDENCY, (String) it2.next(), tenantId);
            }
        } catch (Exception e) {
            throw new ExecutionPlanConfigurationException("Invalid query specified, " + e.getMessage(), e);
        }
    }

    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public List<org.wso2.carbon.databridge.commons.StreamDefinition> getSiddhiStreams(String[] strArr, String str) throws SiddhiPraserException {
        SiddhiManager createMockSiddhiManager = createMockSiddhiManager(strArr, str);
        List<StreamDefinition> streamDefinitions = createMockSiddhiManager.getStreamDefinitions();
        ArrayList arrayList = new ArrayList(streamDefinitions.size());
        for (StreamDefinition streamDefinition : streamDefinitions) {
            arrayList.add(EventProcessorUtil.convertToDatabridgeStreamDefinition(streamDefinition, new StreamConfiguration(streamDefinition.getStreamId())));
        }
        createMockSiddhiManager.shutdown();
        return arrayList;
    }

    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public String getExecutionPlanStatusAsString(String str) {
        List<ExecutionPlanConfigurationFile> list = this.tenantSpecificExecutionPlanFiles.get(Integer.valueOf(PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId()));
        if (list == null) {
            return EventProcessorConstants.NO_DEPENDENCY_INFO_MSG;
        }
        for (ExecutionPlanConfigurationFile executionPlanConfigurationFile : list) {
            if (str != null && str.equals(executionPlanConfigurationFile.getFileName())) {
                String deploymentStatusMessage = executionPlanConfigurationFile.getDeploymentStatusMessage();
                if (executionPlanConfigurationFile.getDependency() != null) {
                    deploymentStatusMessage = deploymentStatusMessage + " [Dependency: " + executionPlanConfigurationFile.getDependency() + "]";
                }
                return deploymentStatusMessage;
            }
        }
        return EventProcessorConstants.NO_DEPENDENCY_INFO_MSG;
    }

    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public boolean validateSiddhiQueries(String[] strArr, String str) throws SiddhiPraserException {
        createMockSiddhiManager(strArr, str).shutdown();
        return true;
    }

    private SiddhiManager createMockSiddhiManager(String[] strArr, String str) throws SiddhiPraserException {
        SiddhiManager siddhiManager = new SiddhiManager();
        try {
            for (CarbonDataSource carbonDataSource : EventProcessorValueHolder.getDataSourceService().getAllDataSources()) {
                try {
                    if (carbonDataSource.getDSObject() instanceof DataSource) {
                        siddhiManager.getSiddhiContext().addDataSource(carbonDataSource.getDSMInfo().getName(), (DataSource) carbonDataSource.getDSObject());
                    }
                } catch (Exception e) {
                    log.error("Unable to add the datasource" + carbonDataSource.getDSMInfo().getName(), e);
                }
            }
        } catch (DataSourceException e2) {
            log.error("Unable to access the datasource service", e2);
        }
        for (String str2 : strArr) {
            if (str2.trim().length() > 0) {
                siddhiManager.defineStream(str2);
            }
        }
        siddhiManager.addExecutionPlan(str);
        return siddhiManager;
    }

    private SiddhiManager getSiddhiManagerFor(ExecutionPlanConfiguration executionPlanConfiguration, SiddhiConfiguration siddhiConfiguration) throws ExecutionPlanConfigurationException {
        SiddhiManager siddhiManager = new SiddhiManager(siddhiConfiguration);
        try {
            for (CarbonDataSource carbonDataSource : EventProcessorValueHolder.getDataSourceService().getAllDataSources()) {
                try {
                    if (carbonDataSource.getDSObject() instanceof DataSource) {
                        siddhiManager.getSiddhiContext().addDataSource(carbonDataSource.getDSMInfo().getName(), (DataSource) carbonDataSource.getDSObject());
                    }
                } catch (Exception e) {
                    log.error("Unable to add the datasource" + carbonDataSource.getDSMInfo().getName(), e);
                }
            }
        } catch (DataSourceException e2) {
            log.error("Unable to populate the data sources in Siddhi engine.", e2);
        }
        int i = 0;
        try {
            i = Integer.parseInt(executionPlanConfiguration.getSiddhiConfigurationProperties().get(EventProcessorConstants.SIDDHI_SNAPSHOT_INTERVAL));
        } catch (NumberFormatException e3) {
            log.error("Unable to parse snapshot time interval.", e3);
        }
        if (i > 0) {
            if (null == EventProcessorValueHolder.getPersistenceStore()) {
                if (EventProcessorValueHolder.getClusterInformation() == null) {
                    try {
                        ClusterInformation clusterInformation = new ClusterInformation(EventProcessorValueHolder.getUserRealm().getRealmConfiguration().getAdminUserName(), EventProcessorValueHolder.getUserRealm().getRealmConfiguration().getAdminPassword());
                        clusterInformation.setClusterName(CassandraPersistenceStore.CLUSTER_NAME);
                        EventProcessorValueHolder.setClusterInformation(clusterInformation);
                    } catch (UserStoreException e4) {
                        log.error("Unable to get realm configuration.", e4);
                    }
                }
                if (!CassandraConnectionValidator.getInstance().checkCassandraConnection(EventProcessorValueHolder.getClusterInformation().getUsername(), EventProcessorValueHolder.getClusterInformation().getPassword())) {
                    throw new ExecutionPlanConfigurationException("Cassandra is not up and running, All connection pools are down. Please enable cassandra with server startup (Command: ./wso2server.sh -Ddisable.cassandra.server.startup=false)");
                }
                EventProcessorValueHolder.setPersistenceStore(new CassandraPersistenceStore(EventProcessorValueHolder.getDataAccessService().getCluster(EventProcessorValueHolder.getClusterInformation())));
            }
            siddhiManager.setPersistStore(EventProcessorValueHolder.getPersistenceStore());
        }
        return siddhiManager;
    }

    private SiddhiConfiguration getSiddhiConfigurationFor(ExecutionPlanConfiguration executionPlanConfiguration, int i) throws ServiceDependencyValidationException {
        SiddhiConfiguration siddhiConfiguration = new SiddhiConfiguration();
        siddhiConfiguration.setAsyncProcessing(false);
        siddhiConfiguration.setInstanceIdentifier("org.wso2.siddhi.instance-" + i + "-" + UUID.randomUUID().toString());
        String str = executionPlanConfiguration.getSiddhiConfigurationProperties().get(EventProcessorConstants.SIDDHI_DISTRIBUTED_PROCESSING);
        if (str == null || !str.equalsIgnoreCase("true")) {
            siddhiConfiguration.setDistributedProcessing(false);
        } else {
            siddhiConfiguration.setDistributedProcessing(true);
            if (EventProcessorValueHolder.getHazelcastInstance() == null) {
                throw new ServiceDependencyValidationException(EventProcessorConstants.HAZELCAST_INSTANCE, "Hazelcast instance is not initialized.");
            }
            siddhiConfiguration.setInstanceIdentifier(EventProcessorValueHolder.getHazelcastInstance().getName());
        }
        siddhiConfiguration.setQueryPlanIdentifier("org.wso2.siddhi-" + i + "-" + executionPlanConfiguration.getName());
        siddhiConfiguration.setSiddhiExtensions(SiddhiExtensionLoader.loadSiddhiExtensions());
        return siddhiConfiguration;
    }

    public void notifyServiceAvailability(String str) {
        Iterator<Integer> it = this.tenantSpecificExecutionPlanFiles.keySet().iterator();
        while (it.hasNext()) {
            try {
                activateInactiveExecutionPlanConfigurations(ExecutionPlanConfigurationFile.Status.WAITING_FOR_OSGI_SERVICE, str, it.next().intValue());
            } catch (ExecutionPlanConfigurationException e) {
                log.error("Error while redeploying distributed execution plans.", e);
            }
        }
    }

    private void removeExecutionPlanConfiguration(String str, int i) {
        Map<String, ExecutionPlan> map = this.tenantSpecificExecutionPlans.get(Integer.valueOf(i));
        if (map == null || !map.containsKey(str)) {
            return;
        }
        ExecutionPlan remove = map.remove(str);
        remove.shutdown();
        ExecutionPlanConfiguration executionPlanConfiguration = remove.getExecutionPlanConfiguration();
        Iterator<StreamConfiguration> it = executionPlanConfiguration.getImportedStreams().iterator();
        while (it.hasNext()) {
            EventJunction eventJunction = getEventJunction(i, it.next().getStreamId());
            if (eventJunction != null) {
                for (EventConsumer eventConsumer : eventJunction.getAllEventConsumers()) {
                    if (eventConsumer.getOwner() == executionPlanConfiguration) {
                        eventJunction.removeConsumer(eventConsumer);
                    }
                }
            }
        }
        for (StreamConfiguration streamConfiguration : executionPlanConfiguration.getExportedStreams()) {
            EventJunction eventJunction2 = getEventJunction(i, streamConfiguration.getStreamId());
            if (eventJunction2 != null) {
                for (EventProducer eventProducer : eventJunction2.getAllEventProducers()) {
                    if (eventProducer.getOwner() == executionPlanConfiguration) {
                        eventJunction2.removeProducer(eventProducer);
                    }
                }
                if (eventJunction2.getAllEventProducers().size() == 0) {
                    deactivateActiveExecutionPlanConfigurations(streamConfiguration.getStreamId(), i);
                }
            }
        }
    }

    public void addExecutionPlanConfigurationFile(ExecutionPlanConfigurationFile executionPlanConfigurationFile, int i) {
        List<ExecutionPlanConfigurationFile> list = this.tenantSpecificExecutionPlanFiles.get(Integer.valueOf(i));
        if (list == null) {
            list = new ArrayList();
            this.tenantSpecificExecutionPlanFiles.put(Integer.valueOf(i), list);
        }
        list.add(executionPlanConfigurationFile);
    }

    public void removeExecutionPlanConfigurationFile(String str, int i) {
        Iterator<ExecutionPlanConfigurationFile> it = this.tenantSpecificExecutionPlanFiles.get(Integer.valueOf(i)).iterator();
        while (it.hasNext()) {
            ExecutionPlanConfigurationFile next = it.next();
            if (next.getFileName().equals(str)) {
                if (next.getStatus().equals(ExecutionPlanConfigurationFile.Status.DEPLOYED)) {
                    removeExecutionPlanConfiguration(next.getExecutionPlanName(), i);
                }
                it.remove();
                return;
            }
        }
    }

    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public String getActiveExecutionPlanConfigurationContent(String str, AxisConfiguration axisConfiguration) throws ExecutionPlanConfigurationException {
        ExecutionPlanConfigurationFile executionPlanConfigurationFileByPlanName = getExecutionPlanConfigurationFileByPlanName(str, PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId());
        if (executionPlanConfigurationFileByPlanName == null) {
            throw new ExecutionPlanConfigurationException("Configuration file for " + str + "doesn't exist.");
        }
        return EventProcessorConfigurationFilesystemInvoker.readExecutionPlanConfigFile(executionPlanConfigurationFileByPlanName.getFileName(), axisConfiguration);
    }

    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public String getInactiveExecutionPlanConfigurationContent(String str, AxisConfiguration axisConfiguration) throws ExecutionPlanConfigurationException {
        return EventProcessorConfigurationFilesystemInvoker.readExecutionPlanConfigFile(str, axisConfiguration);
    }

    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public Map<String, ExecutionPlanConfiguration> getAllActiveExecutionConfigurations(int i) {
        HashMap hashMap = new HashMap();
        Map<String, ExecutionPlan> map = this.tenantSpecificExecutionPlans.get(Integer.valueOf(i));
        if (map != null) {
            for (Map.Entry<String, ExecutionPlan> entry : map.entrySet()) {
                hashMap.put(entry.getKey(), entry.getValue().getExecutionPlanConfiguration());
            }
        }
        return hashMap;
    }

    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public ExecutionPlanConfiguration getActiveExecutionConfiguration(String str, int i) {
        ExecutionPlan executionPlan;
        Map<String, ExecutionPlan> map = this.tenantSpecificExecutionPlans.get(Integer.valueOf(i));
        if (map == null || (executionPlan = map.get(str)) == null) {
            return null;
        }
        return executionPlan.getExecutionPlanConfiguration();
    }

    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public List<ExecutionPlanConfigurationFile> getAllInactiveExecutionPlanConfiguration(int i) {
        List<ExecutionPlanConfigurationFile> list = this.tenantSpecificExecutionPlanFiles.get(Integer.valueOf(i));
        ArrayList arrayList = new ArrayList();
        if (list != null) {
            for (ExecutionPlanConfigurationFile executionPlanConfigurationFile : list) {
                if (executionPlanConfigurationFile.getStatus() == ExecutionPlanConfigurationFile.Status.ERROR || executionPlanConfigurationFile.getStatus() == ExecutionPlanConfigurationFile.Status.WAITING_FOR_DEPENDENCY || executionPlanConfigurationFile.getStatus() == ExecutionPlanConfigurationFile.Status.WAITING_FOR_OSGI_SERVICE) {
                    arrayList.add(executionPlanConfigurationFile);
                }
            }
        }
        return arrayList;
    }

    public void subscribeStreamListener(String str, EventSender eventSender) {
        int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
        Map<String, EventJunction> map = this.tenantSpecificEventJunctions.get(Integer.valueOf(tenantId));
        if (map == null) {
            map = new ConcurrentHashMap();
            this.tenantSpecificEventJunctions.put(Integer.valueOf(tenantId), map);
        }
        EventJunction eventJunction = map.get(str);
        if (eventJunction == null) {
            throw new EventProducerException("Junction not found for stream id : " + str);
        }
        eventJunction.addConsumer(new ExternalStreamConsumer(eventSender, eventSender));
    }

    public void unsubscribeStreamListener(String str, EventSender eventSender, int i) {
        EventJunction eventJunction;
        Map<String, EventJunction> map = this.tenantSpecificEventJunctions.get(Integer.valueOf(i));
        if (map == null || (eventJunction = map.get(str)) == null) {
            return;
        }
        for (EventConsumer eventConsumer : eventJunction.getAllEventConsumers()) {
            if (eventConsumer.getOwner() == eventSender) {
                eventJunction.removeConsumer(eventConsumer);
            }
        }
    }

    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public void setTracingEnabled(String str, boolean z, AxisConfiguration axisConfiguration) throws ExecutionPlanConfigurationException {
        int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
        Map<String, ExecutionPlan> map = this.tenantSpecificExecutionPlans.get(Integer.valueOf(tenantId));
        if (map != null) {
            ExecutionPlan executionPlan = map.get(str);
            executionPlan.getExecutionPlanConfiguration().setTracingEnabled(z);
            editExecutionPlanConfiguration(executionPlan.getExecutionPlanConfiguration(), str, tenantId, axisConfiguration);
        }
    }

    @Override // org.wso2.carbon.event.processor.core.EventProcessorService
    public void setStatisticsEnabled(String str, boolean z, AxisConfiguration axisConfiguration) throws ExecutionPlanConfigurationException {
        int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
        Map<String, ExecutionPlan> map = this.tenantSpecificExecutionPlans.get(Integer.valueOf(tenantId));
        if (map != null) {
            ExecutionPlan executionPlan = map.get(str);
            executionPlan.getExecutionPlanConfiguration().setStatisticsEnabled(z);
            editExecutionPlanConfiguration(executionPlan.getExecutionPlanConfiguration(), str, tenantId, axisConfiguration);
        }
    }

    public void registerPassthroughSenderConfigurator(PassthroughSenderConfigurator passthroughSenderConfigurator) {
        EventProcessorValueHolder.addPassthroughSenderConfigurator(passthroughSenderConfigurator);
    }

    public void activateInactiveExecutionPlanConfigurations(ExecutionPlanConfigurationFile.Status status, String str, int i) throws ExecutionPlanConfigurationException {
        List<ExecutionPlanConfigurationFile> list;
        ArrayList<ExecutionPlanConfigurationFile> arrayList = new ArrayList();
        if (this.tenantSpecificExecutionPlanFiles != null && this.tenantSpecificExecutionPlanFiles.size() > 0 && (list = this.tenantSpecificExecutionPlanFiles.get(Integer.valueOf(i))) != null) {
            for (ExecutionPlanConfigurationFile executionPlanConfigurationFile : list) {
                if (executionPlanConfigurationFile.getStatus().equals(status) && str.equalsIgnoreCase(executionPlanConfigurationFile.getDependency())) {
                    arrayList.add(executionPlanConfigurationFile);
                }
            }
        }
        for (ExecutionPlanConfigurationFile executionPlanConfigurationFile2 : arrayList) {
            try {
                EventProcessorConfigurationFilesystemInvoker.reload(executionPlanConfigurationFile2.getFileName(), executionPlanConfigurationFile2.getAxisConfiguration());
            } catch (Exception e) {
                log.error("Exception occurred while trying to deploy the Execution Plan configuration file : " + new File(executionPlanConfigurationFile2.getFileName()).getName());
            }
        }
    }

    public void deactivateActiveExecutionPlanConfigurations(String str, int i) {
        EventJunction eventJunction = getEventJunction(i, str);
        if (eventJunction == null || eventJunction.getAllEventProducers().size() != 0) {
            return;
        }
        for (EventConsumer eventConsumer : this.tenantSpecificEventJunctions.get(Integer.valueOf(i)).remove(str).getAllEventConsumers()) {
            if (eventConsumer instanceof SiddhiInputEventDispatcher) {
                ExecutionPlanConfigurationFile executionPlanConfigurationFileByPlanName = getExecutionPlanConfigurationFileByPlanName(((ExecutionPlanConfiguration) eventConsumer.getOwner()).getName(), i);
                try {
                    EventProcessorConfigurationFilesystemInvoker.reload(executionPlanConfigurationFileByPlanName.getFileName(), executionPlanConfigurationFileByPlanName.getAxisConfiguration());
                } catch (Exception e) {
                    log.error("Exception occurred while trying to deploy the Execution Plan configuration file : " + new File(executionPlanConfigurationFileByPlanName.getFileName()).getName());
                }
            }
        }
        if (EventProcessorValueHolder.getNotificationListener() != null) {
            EventProcessorValueHolder.getNotificationListener().removeEventStream(i, str);
        }
    }

    public void addExternalStream(String str, EventReceiver eventReceiver) throws ExecutionPlanConfigurationException {
        int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
        EventJunction eventJunction = getEventJunction(tenantId, str);
        try {
            org.wso2.carbon.databridge.commons.StreamDefinition streamDefinitionFromStore = EventProcessorValueHolder.getEventStreamService().getStreamDefinitionFromStore(str, tenantId);
            if (streamDefinitionFromStore == null) {
                throw new ExecutionPlanConfigurationException("The stream definition does not exist with stream ID : " + str);
            }
            if (eventJunction == null) {
                eventJunction = createEventJunctionWithoutSubscriptions(tenantId, streamDefinitionFromStore);
            } else if (!eventJunction.getStreamDefinition().equals(streamDefinitionFromStore)) {
                log.error("Externally defined stream: " + streamDefinitionFromStore + " is different from the existing stream :" + eventJunction.getStreamDefinition());
            }
            ExternalStreamListener externalStreamListener = new ExternalStreamListener(eventJunction, eventReceiver);
            eventJunction.addProducer(externalStreamListener);
            try {
                eventReceiver.subscribe(eventJunction.getStreamDefinition().getStreamId(), externalStreamListener, tenantId);
                activateInactiveExecutionPlanConfigurations(ExecutionPlanConfigurationFile.Status.WAITING_FOR_DEPENDENCY, str, tenantId);
            } catch (EventReceiverException e) {
                throw new ExecutionPlanConfigurationException("Error subscribing to event receiver : " + e.getMessage(), e);
            }
        } catch (EventStreamConfigurationException e2) {
            throw new ExecutionPlanConfigurationException("Could not retrieve stream definition with stream ID : " + str, e2);
        }
    }

    public void removeExternalStream(int i, String str, EventReceiver eventReceiver) throws ExecutionPlanConfigurationException {
        EventJunction eventJunction = getEventJunction(i, str);
        boolean z = true;
        if (eventJunction != null) {
            for (EventProducer eventProducer : eventJunction.getAllEventProducers()) {
                if (eventProducer instanceof ExternalStreamListener) {
                    if (eventProducer.getOwner() == eventReceiver) {
                        eventJunction.removeProducer(eventProducer);
                        try {
                            eventReceiver.unsubsribe(str, (ExternalStreamListener) eventProducer, i);
                        } catch (EventReceiverException e) {
                            throw new ExecutionPlanConfigurationException("Error unsubscribing from event receiver : " + e.getMessage(), e);
                        }
                    } else {
                        z = false;
                    }
                }
            }
        }
        if (z) {
            deactivateActiveExecutionPlanConfigurations(str, i);
        }
    }

    private EventJunction getEventJunction(int i, String str) {
        Map<String, EventJunction> map = this.tenantSpecificEventJunctions.get(Integer.valueOf(i));
        if (map != null) {
            return map.get(str);
        }
        return null;
    }

    public void notifyAllStreamsToFormatter() {
        for (Map.Entry<Integer, Map<String, EventJunction>> entry : this.tenantSpecificEventJunctions.entrySet()) {
            Iterator<Map.Entry<String, EventJunction>> it = entry.getValue().entrySet().iterator();
            while (it.hasNext()) {
                EventProcessorValueHolder.getNotificationListener().addedNewEventStream(entry.getKey().intValue(), it.next().getValue().getStreamDefinition().getStreamId());
            }
        }
    }

    private ExecutionPlanConfigurationFile getExecutionPlanConfigurationFileByPlanName(String str, int i) {
        List<ExecutionPlanConfigurationFile> list = this.tenantSpecificExecutionPlanFiles.get(Integer.valueOf(i));
        if (list == null) {
            return null;
        }
        for (ExecutionPlanConfigurationFile executionPlanConfigurationFile : list) {
            if (str.equals(executionPlanConfigurationFile.getExecutionPlanName()) && executionPlanConfigurationFile.getStatus().equals(ExecutionPlanConfigurationFile.Status.DEPLOYED)) {
                return executionPlanConfigurationFile;
            }
        }
        return null;
    }

    private void editExecutionPlanConfiguration(ExecutionPlanConfiguration executionPlanConfiguration, String str, int i, AxisConfiguration axisConfiguration) throws ExecutionPlanConfigurationException {
        ExecutionPlanConfigurationFile executionPlanConfigurationFileByPlanName = getExecutionPlanConfigurationFileByPlanName(str, i);
        String fileName = executionPlanConfigurationFileByPlanName.getFileName();
        EventProcessorConfigurationFilesystemInvoker.delete(executionPlanConfigurationFileByPlanName.getFileName(), axisConfiguration);
        EventProcessorConfigurationFilesystemInvoker.save(EventProcessorConfigurationHelper.toOM(executionPlanConfiguration), str, fileName, axisConfiguration);
    }

    private EventJunction createEventJunctionWithoutSubscriptions(int i, org.wso2.carbon.databridge.commons.StreamDefinition streamDefinition) {
        Map<String, EventJunction> map = this.tenantSpecificEventJunctions.get(Integer.valueOf(i));
        if (map == null) {
            map = new ConcurrentHashMap();
            this.tenantSpecificEventJunctions.put(Integer.valueOf(i), map);
        }
        EventJunction eventJunction = new EventJunction(streamDefinition);
        map.put(streamDefinition.getStreamId(), eventJunction);
        if (EventProcessorValueHolder.getNotificationListener() != null) {
            EventProcessorValueHolder.getNotificationListener().addedNewEventStream(i, streamDefinition.getStreamId());
        }
        return eventJunction;
    }

    private void validateToRemoveInactiveExecutionPlanConfiguration(String str, AxisConfiguration axisConfiguration) throws ExecutionPlanConfigurationException {
        String str2 = str + ".xml";
        List<ExecutionPlanConfigurationFile> list = this.tenantSpecificExecutionPlanFiles.get(Integer.valueOf(PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId()));
        if (list != null) {
            for (ExecutionPlanConfigurationFile executionPlanConfigurationFile : list) {
                if (executionPlanConfigurationFile.getFileName().equals(str2) && !executionPlanConfigurationFile.getStatus().equals(ExecutionPlanConfigurationFile.Status.DEPLOYED)) {
                    EventProcessorConfigurationFilesystemInvoker.delete(str2, axisConfiguration);
                    return;
                }
            }
        }
    }

    private boolean checkExecutionPlanValidity(String str, int i) throws ExecutionPlanConfigurationException {
        Map<String, ExecutionPlanConfiguration> allActiveExecutionConfigurations = getAllActiveExecutionConfigurations(i);
        if (allActiveExecutionConfigurations == null) {
            return true;
        }
        Iterator<String> it = allActiveExecutionConfigurations.keySet().iterator();
        while (it.hasNext()) {
            if (str.equalsIgnoreCase(it.next())) {
                return false;
            }
        }
        return true;
    }
}
