package org.wso2.carbon.databridge.core.internal;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.databridge.commons.Attribute;
import org.wso2.carbon.databridge.commons.Credentials;
import org.wso2.carbon.databridge.commons.StreamDefinition;
import org.wso2.carbon.databridge.commons.exception.DifferentStreamDefinitionAlreadyDefinedException;
import org.wso2.carbon.databridge.commons.exception.MalformedStreamDefinitionException;
import org.wso2.carbon.databridge.commons.utils.DataBridgeCommonsUtils;
import org.wso2.carbon.databridge.commons.utils.EventDefinitionConverterUtils;
import org.wso2.carbon.databridge.core.AgentCallback;
import org.wso2.carbon.databridge.core.EventConverter;
import org.wso2.carbon.databridge.core.RawDataAgentCallback;
import org.wso2.carbon.databridge.core.StreamAttributeComposite;
import org.wso2.carbon.databridge.core.StreamTypeHolder;
import org.wso2.carbon.databridge.core.Utils.AgentSession;
import org.wso2.carbon.databridge.core.Utils.EventComposite;
import org.wso2.carbon.databridge.core.conf.DataBridgeConfiguration;
import org.wso2.carbon.databridge.core.definitionstore.AbstractStreamDefinitionStore;
import org.wso2.carbon.databridge.core.exception.StreamDefinitionStoreException;
import org.wso2.carbon.databridge.core.internal.queue.EventQueue;

/* loaded from: input_file:org/wso2/carbon/databridge/core/internal/EventDispatcher.class */
/**
 * Registers stream definitions, forwards published events to the event queue and notifies
 * the registered callbacks when streams are defined or removed.
 *
 * <p>Known stream definitions are cached per domain name in a {@link StreamTypeHolder}; the
 * cache is filled from, and reconciled against, the {@link AbstractStreamDefinitionStore}.
 */
public class EventDispatcher {
    private static final Log log = LogFactory.getLog(EventDispatcher.class);

    private final AbstractStreamDefinitionStore streamDefinitionStore;
    private final EventQueue eventQueue;
    /** Callbacks notified of stream changes; the same list instance is handed to the event queue. */
    private final List<AgentCallback> subscribers = new ArrayList<AgentCallback>();
    /** Raw-data callbacks notified of stream changes; the same list instance is handed to the event queue. */
    private final List<RawDataAgentCallback> rawDataSubscribers = new ArrayList<RawDataAgentCallback>();
    /** Stream type holders keyed by the domain name of the credentials they were created for. */
    private final Map<String, StreamTypeHolder> domainNameStreamTypeHolderCache =
            new ConcurrentHashMap<String, StreamTypeHolder>();

    /**
     * Creates a dispatcher backed by the given definition store.
     *
     * @param abstractStreamDefinitionStore store used to load, save and delete stream definitions
     * @param dataBridgeConfiguration       configuration passed on to the internal event queue
     */
    public EventDispatcher(AbstractStreamDefinitionStore abstractStreamDefinitionStore, DataBridgeConfiguration dataBridgeConfiguration) {
        this.eventQueue = new EventQueue(this.subscribers, this.rawDataSubscribers, dataBridgeConfiguration);
        this.streamDefinitionStore = abstractStreamDefinitionStore;
    }

    /**
     * Registers a callback to be notified about defined and removed streams.
     *
     * @param agentCallback the callback to add
     */
    public void addCallback(AgentCallback agentCallback) {
        this.subscribers.add(agentCallback);
    }

    /**
     * Registers a raw-data callback to be notified about defined and removed streams.
     *
     * @param rawDataAgentCallback the callback to add
     */
    public void addCallback(RawDataAgentCallback rawDataAgentCallback) {
        this.rawDataSubscribers.add(rawDataAgentCallback);
    }

    /**
     * Defines a stream from its JSON definition and notifies all callbacks.
     *
     * <p>If an equal definition with the same stream id is already cached, that cached
     * definition is reused and nothing is saved to the store.
     *
     * @param str          the stream definition as JSON
     * @param agentSession the session whose credentials identify the domain
     * @return the stream id of the defined stream
     * @throws MalformedStreamDefinitionException               if the JSON cannot be converted
     * @throws DifferentStreamDefinitionAlreadyDefinedException if a conflicting definition exists
     * @throws StreamDefinitionStoreException                   if the definition store fails
     */
    public synchronized String defineStream(String str, AgentSession agentSession) throws MalformedStreamDefinitionException, DifferentStreamDefinitionAlreadyDefinedException, StreamDefinitionStoreException {
        StreamDefinition definition = resolveStreamDefinition(str, agentSession);
        notifyStreamDefined(definition, agentSession.getCredentials());
        return definition.getStreamId();
    }

    /**
     * Defines a stream from its JSON definition, applies the given index definition to it and
     * notifies all callbacks.
     *
     * @param str          the stream definition as JSON
     * @param agentSession the session whose credentials identify the domain
     * @param str2         the index definition passed to {@code StreamDefinition.createIndexDefinition}
     * @return the stream id of the defined stream
     * @throws MalformedStreamDefinitionException               if the JSON cannot be converted
     * @throws DifferentStreamDefinitionAlreadyDefinedException if a conflicting definition exists
     * @throws StreamDefinitionStoreException                   if the definition store fails
     */
    public synchronized String defineStream(String str, AgentSession agentSession, String str2) throws MalformedStreamDefinitionException, DifferentStreamDefinitionAlreadyDefinedException, StreamDefinitionStoreException {
        StreamDefinition definition = resolveStreamDefinition(str, agentSession);
        // The index definition is applied after the definition has been cached/saved and
        // before the callbacks see it.
        definition.createIndexDefinition(str2);
        notifyStreamDefined(definition, agentSession.getCredentials());
        return definition.getStreamId();
    }

    /**
     * Parses the JSON definition and either returns the equal, already cached definition or
     * validates, caches and saves the new one.
     */
    private StreamDefinition resolveStreamDefinition(String definitionJson, AgentSession agentSession) throws MalformedStreamDefinitionException, DifferentStreamDefinitionAlreadyDefinedException, StreamDefinitionStoreException {
        StreamDefinition requested = EventDefinitionConverterUtils.convertFromJson(definitionJson);
        Credentials credentials = agentSession.getCredentials();
        StreamTypeHolder holder = getStreamDefinitionHolder(credentials);

        StreamAttributeComposite cachedComposite = holder.getAttributeComposite(requested.getStreamId());
        if (cachedComposite != null) {
            StreamDefinition cached = cachedComposite.getStreamDefinition();
            if (!cached.equals(requested)) {
                throw new DifferentStreamDefinitionAlreadyDefinedException("Similar event stream for " + requested + " with the same name and version already exist: " + this.streamDefinitionStore.getStreamDefinition(credentials, requested.getName(), requested.getVersion()));
            }
            return cached;
        }

        // A new stream must not redefine attribute types used by same-named streams.
        for (StreamAttributeComposite composite : holder.getAttributeCompositeMap().values()) {
            validateStreamDefinition(requested, composite.getStreamDefinition());
        }
        updateDomainNameStreamTypeHolderCache(requested, credentials);
        this.streamDefinitionStore.saveStreamDefinition(credentials, requested);
        return requested;
    }

    /** Invokes {@code definedStream} on every callback, regular subscribers first. */
    private void notifyStreamDefined(StreamDefinition streamDefinition, Credentials credentials) {
        for (AgentCallback subscriber : this.subscribers) {
            subscriber.definedStream(streamDefinition, credentials);
        }
        for (RawDataAgentCallback rawSubscriber : this.rawDataSubscribers) {
            rawSubscriber.definedStream(streamDefinition, credentials);
        }
    }

    /** Invokes {@code removeStream} on every callback, regular subscribers first. */
    private void notifyStreamRemoved(StreamDefinition streamDefinition, Credentials credentials) {
        for (AgentCallback subscriber : this.subscribers) {
            subscriber.removeStream(streamDefinition, credentials);
        }
        for (RawDataAgentCallback rawSubscriber : this.rawDataSubscribers) {
            rawSubscriber.removeStream(streamDefinition, credentials);
        }
    }

    /**
     * Checks a new definition against an existing one; attribute types are only compared when
     * both definitions share the same stream name.
     */
    private void validateStreamDefinition(StreamDefinition streamDefinition, StreamDefinition streamDefinition2) throws DifferentStreamDefinitionAlreadyDefinedException {
        if (!streamDefinition.getName().equals(streamDefinition2.getName())) {
            return;
        }
        validateAttributes(streamDefinition.getMetaData(), streamDefinition2.getMetaData(), "meta", streamDefinition, streamDefinition2);
        validateAttributes(streamDefinition.getCorrelationData(), streamDefinition2.getCorrelationData(), "correlation", streamDefinition, streamDefinition2);
        validateAttributes(streamDefinition.getPayloadData(), streamDefinition2.getPayloadData(), "payload", streamDefinition, streamDefinition2);
    }

    /**
     * Rejects the new definition if any attribute shares a name with an attribute of the
     * existing definition but has a different type. A {@code null} list on either side means
     * there is nothing to compare.
     */
    private void validateAttributes(List<Attribute> list, List<Attribute> list2, String str, StreamDefinition streamDefinition, StreamDefinition streamDefinition2) throws DifferentStreamDefinitionAlreadyDefinedException {
        if (list == null || list2 == null) {
            return;
        }
        for (Attribute attribute : list) {
            for (Attribute attribute2 : list2) {
                if (attribute.getName().equals(attribute2.getName()) && attribute.getType() != attribute2.getType()) {
                    throw new DifferentStreamDefinitionAlreadyDefinedException("Attribute type mismatch " + str + " " + attribute.getName() + " type:" + attribute.getType() + " was already defined with type:" + attribute2.getType() + " in " + streamDefinition2 + ", hence " + streamDefinition + " cannot be defined");
                }
            }
        }
    }

    /**
     * Wraps the event data together with the domain's stream type holder and hands it to the
     * event queue.
     *
     * @param obj            the event data to publish
     * @param agentSession   the session whose credentials identify the domain
     * @param eventConverter the converter passed along with the event data
     */
    public void publish(Object obj, AgentSession agentSession, EventConverter eventConverter) {
        this.eventQueue.publish(new EventComposite(obj, getStreamDefinitionHolder(agentSession.getCredentials()), agentSession, eventConverter));
    }

    /**
     * Returns the cached holder for the credentials' domain, creating and populating it from
     * the definition store on first use.
     */
    private StreamTypeHolder getStreamDefinitionHolder(Credentials credentials) {
        StreamTypeHolder streamTypeHolder = this.domainNameStreamTypeHolderCache.get(credentials.getDomainName());
        if (streamTypeHolder == null) {
            return initDomainNameStreamTypeHolderCache(credentials);
        }
        if (log.isDebugEnabled()) {
            log.debug(describeStreamTypeHolder(credentials.getDomainName(), streamTypeHolder));
        }
        return streamTypeHolder;
    }

    /**
     * Reconciles the cached holder of the credentials' domain with the definition store.
     * Does nothing if no holder has been cached for that domain yet.
     *
     * @param credentials the credentials identifying the domain
     */
    public void updateStreamDefinitionHolder(Credentials credentials) {
        StreamTypeHolder streamTypeHolder = this.domainNameStreamTypeHolderCache.get(credentials.getDomainName());
        if (streamTypeHolder == null) {
            return;
        }
        if (log.isDebugEnabled()) {
            // Logs the holder's content as it is before the reconciliation below.
            log.debug(describeStreamTypeHolder(credentials.getDomainName(), streamTypeHolder));
        }
        updateDomainNameStreamTypeHolderCache(credentials);
    }

    /** Builds the debug message listing the attribute types of every stream in the holder. */
    private static String describeStreamTypeHolder(String domainName, StreamTypeHolder streamTypeHolder) {
        StringBuilder message = new StringBuilder();
        message.append("Event stream holder for domain name : ").append(domainName).append(" : \n ");
        message.append("Meta, Correlation & Payload Data Type Map : ");
        for (Map.Entry<String, StreamAttributeComposite> entry : streamTypeHolder.getAttributeCompositeMap().entrySet()) {
            // Indexes 0, 1 and 2 are reported as meta, correlation and payload types respectively.
            Object[][] attributeTypes = entry.getValue().getAttributeTypes();
            message.append("StreamID=").append(entry.getKey()).append(" :  ");
            message.append("Meta= ").append(Arrays.deepToString(attributeTypes[0])).append(" :  ");
            message.append("Correlation= ").append(Arrays.deepToString(attributeTypes[1])).append(" :  ");
            message.append("Payload= ").append(Arrays.deepToString(attributeTypes[2])).append("\n");
        }
        return message.toString();
    }

    /** Adds a single definition to the holder of the credentials' domain. */
    private void updateDomainNameStreamTypeHolderCache(StreamDefinition streamDefinition, Credentials credentials) {
        getStreamDefinitionHolder(credentials).putStreamDefinition(streamDefinition);
    }

    /**
     * Creates the holder for the credentials' domain, loads every stored definition into it,
     * notifies the callbacks about each one and finally caches the holder.
     */
    private StreamTypeHolder initDomainNameStreamTypeHolderCache(Credentials credentials) {
        // NOTE(review): this method is not synchronized and is reachable from the unsynchronized
        // publish(); two threads may both see no holder and both initialize one - confirm whether
        // that is acceptable.
        StreamTypeHolder streamTypeHolder = this.domainNameStreamTypeHolderCache.get(credentials.getDomainName());
        if (streamTypeHolder != null) {
            return streamTypeHolder;
        }
        streamTypeHolder = new StreamTypeHolder(credentials.getDomainName());
        Collection<StreamDefinition> allStreamDefinitions = this.streamDefinitionStore.getAllStreamDefinitions(credentials);
        if (allStreamDefinitions != null) {
            for (StreamDefinition streamDefinition : allStreamDefinitions) {
                streamTypeHolder.putStreamDefinition(streamDefinition);
                notifyStreamDefined(streamDefinition, credentials);
            }
        }
        this.domainNameStreamTypeHolderCache.put(credentials.getDomainName(), streamTypeHolder);
        return streamTypeHolder;
    }

    /**
     * Reconciles an already cached holder with the definition store: stored definitions that
     * are missing from the holder are added (and announced to the callbacks), and cached
     * entries whose stream id is no longer stored are dropped.
     *
     * @return the reconciled holder, or {@code null} if none is cached for the domain
     */
    private StreamTypeHolder updateDomainNameStreamTypeHolderCache(Credentials credentials) {
        StreamTypeHolder streamTypeHolder = this.domainNameStreamTypeHolderCache.get(credentials.getDomainName());
        if (streamTypeHolder == null) {
            return null;
        }
        Collection<StreamDefinition> allStreamDefinitions = this.streamDefinitionStore.getAllStreamDefinitions(credentials);
        if (allStreamDefinitions != null) {
            for (StreamDefinition streamDefinition : allStreamDefinitions) {
                if (streamTypeHolder.getAttributeComposite(streamDefinition.getStreamId()) == null) {
                    streamTypeHolder.putStreamDefinition(streamDefinition);
                    notifyStreamDefined(streamDefinition, credentials);
                }
            }
            // A hash set keeps the stale-entry check below linear in the number of cached streams.
            Set<String> storedStreamIds = new HashSet<String>();
            for (StreamDefinition streamDefinition : allStreamDefinitions) {
                storedStreamIds.add(streamDefinition.getStreamId());
            }
            Iterator<String> cachedStreamIds = streamTypeHolder.getAttributeCompositeMap().keySet().iterator();
            while (cachedStreamIds.hasNext()) {
                if (!storedStreamIds.contains(cachedStreamIds.next())) {
                    cachedStreamIds.remove();
                }
            }
        }
        this.domainNameStreamTypeHolderCache.put(credentials.getDomainName(), streamTypeHolder);
        return streamTypeHolder;
    }

    /**
     * Returns the list of registered callbacks. This is the internal list itself, not a copy.
     *
     * @return the registered callbacks
     */
    public List<AgentCallback> getSubscribers() {
        return this.subscribers;
    }

    /**
     * Returns the list of registered raw-data callbacks. This is the internal list itself,
     * not a copy.
     *
     * @return the registered raw-data callbacks
     */
    public List<RawDataAgentCallback> getRawDataSubscribers() {
        return this.rawDataSubscribers;
    }

    /**
     * Looks up the id of the stream with the given name and version, after reconciling the
     * domain's cached holder with the definition store.
     *
     * @param credentials the credentials identifying the domain
     * @param str         the stream name
     * @param str2        the stream version
     * @return the stream id, or {@code null} if no such stream is known
     * @throws StreamDefinitionStoreException declared for callers; not thrown by the code in this method
     */
    public String findStreamId(Credentials credentials, String str, String str2) throws StreamDefinitionStoreException {
        updateDomainNameStreamTypeHolderCache(credentials);
        StreamAttributeComposite attributeComposite = getStreamDefinitionHolder(credentials).getAttributeComposite(DataBridgeCommonsUtils.generateStreamId(str, str2));
        if (attributeComposite == null) {
            return null;
        }
        return attributeComposite.getStreamDefinition().getStreamId();
    }

    /**
     * Removes the stream from the domain's cached holder, notifies the callbacks if it was
     * cached, and deletes the definition from the store.
     *
     * @param credentials the credentials identifying the domain
     * @param str         the stream name
     * @param str2        the stream version
     * @return the result reported by the definition store's delete operation
     */
    public boolean deleteStream(Credentials credentials, String str, String str2) {
        StreamDefinition removedDefinition = removeStreamDefinitionFromStreamTypeHolder(credentials, DataBridgeCommonsUtils.generateStreamId(str, str2));
        if (removedDefinition != null) {
            notifyStreamRemoved(removedDefinition, credentials);
        }
        return this.streamDefinitionStore.deleteStreamDefinition(credentials, str, str2);
    }

    /**
     * Removes the stream id from the domain's cached holder.
     *
     * @return the removed definition, or {@code null} if the domain has no cached holder or
     *         the holder does not contain the stream id
     */
    private synchronized StreamDefinition removeStreamDefinitionFromStreamTypeHolder(Credentials credentials, String str) {
        StreamTypeHolder streamTypeHolder = this.domainNameStreamTypeHolderCache.get(credentials.getDomainName());
        if (streamTypeHolder == null) {
            return null;
        }
        // The stream may be unknown to the cache (e.g. never defined through this dispatcher);
        // in that case there is nothing to dereference.
        StreamAttributeComposite removedComposite = streamTypeHolder.getAttributeCompositeMap().remove(str);
        if (removedComposite == null) {
            return null;
        }
        return removedComposite.getStreamDefinition();
    }
}
