package org.wso2.carbon.databridge.core.internal;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.databridge.commons.Credentials;
import org.wso2.carbon.databridge.commons.StreamDefinition;
import org.wso2.carbon.databridge.commons.exception.DifferentStreamDefinitionAlreadyDefinedException;
import org.wso2.carbon.databridge.commons.exception.MalformedStreamDefinitionException;
import org.wso2.carbon.databridge.commons.exception.UndefinedEventTypeException;
import org.wso2.carbon.databridge.commons.utils.EventDefinitionConverterUtils;
import org.wso2.carbon.databridge.core.AgentCallback;
import org.wso2.carbon.databridge.core.AttributeComposite;
import org.wso2.carbon.databridge.core.EventConverter;
import org.wso2.carbon.databridge.core.RawDataAgentCallback;
import org.wso2.carbon.databridge.core.StreamTypeHolder;
import org.wso2.carbon.databridge.core.Utils.AgentSession;
import org.wso2.carbon.databridge.core.Utils.EventComposite;
import org.wso2.carbon.databridge.core.conf.DataBridgeConfiguration;
import org.wso2.carbon.databridge.core.definitionstore.AbstractStreamDefinitionStore;
import org.wso2.carbon.databridge.core.exception.StreamDefinitionNotFoundException;
import org.wso2.carbon.databridge.core.exception.StreamDefinitionStoreException;
import org.wso2.carbon.databridge.core.internal.queue.EventQueue;

/* loaded from: input_file:org/wso2/carbon/databridge/core/internal/EventDispatcher.class */
/**
 * Registers stream definitions with the backing {@link AbstractStreamDefinitionStore},
 * keeps a per-domain cache of stream attribute types, and hands published events to an
 * {@link EventQueue} together with the cached type information.
 *
 * <p>All cache mutations are guarded by the {@code EventDispatcher.class} monitor, so the
 * lock is shared by every instance of this class in the class loader.
 */
public class EventDispatcher {
    /** Not referenced inside this class; kept because it is part of the public API. */
    public static final String HACK_DOMAIN_CONSTANT = "-1234";
    private static final Log log = LogFactory.getLog(EventDispatcher.class);

    private static final String HOLDER_LOG_PREFIX = "Event stream holder for domain name : ";
    private static final String HOLDER_UPDATE_LOG_PREFIX = "Event Stream Type getting updated : Event stream holder for domain name : ";

    private final AbstractStreamDefinitionStore streamDefinitionStore;
    private final EventQueue eventQueue;
    // These two lists are handed to the EventQueue in the constructor and returned as-is by the
    // getters, so they must stay the same mutable instances for the lifetime of the dispatcher.
    private final List<AgentCallback> subscribers = new ArrayList<AgentCallback>();
    private final List<RawDataAgentCallback> rawDataSubscribers = new ArrayList<RawDataAgentCallback>();
    /** Attribute-type holders keyed by domain name. */
    private final Map<String, StreamTypeHolder> streamTypeCache = new ConcurrentHashMap<String, StreamTypeHolder>();

    /* loaded from: input_file:org/wso2/carbon/databridge/core/internal/EventDispatcher$StreamTypeCache.class */
    /** Empty marker class; unused within this file but retained so the compiled class set is unchanged. */
    private static class StreamTypeCache {
        private StreamTypeCache() {
        }
    }

    /**
     * Creates a dispatcher backed by the given definition store.
     *
     * @param abstractStreamDefinitionStore store used to look up, save and delete stream definitions
     * @param dataBridgeConfiguration       configuration passed through to the {@link EventQueue}
     */
    public EventDispatcher(AbstractStreamDefinitionStore abstractStreamDefinitionStore, DataBridgeConfiguration dataBridgeConfiguration) {
        this.eventQueue = new EventQueue(this.subscribers, this.rawDataSubscribers, dataBridgeConfiguration);
        this.streamDefinitionStore = abstractStreamDefinitionStore;
    }

    /**
     * Registers a callback that is notified of defined streams.
     *
     * @param agentCallback callback to add to the subscriber list
     */
    public void addCallback(AgentCallback agentCallback) {
        this.subscribers.add(agentCallback);
    }

    /**
     * Registers a raw-data callback.
     *
     * @param rawDataAgentCallback callback to add to the raw-data subscriber list
     */
    public void addCallback(RawDataAgentCallback rawDataAgentCallback) {
        this.rawDataSubscribers.add(rawDataAgentCallback);
    }

    /**
     * Defines a stream from its JSON description.
     *
     * <p>If the store already holds a definition with the same name and version it must be equal
     * to the requested one; the stored instance is then used. If the store holds none, the new
     * definition is saved. In both cases the type cache is updated and every registered
     * {@link AgentCallback} is notified.
     *
     * @param str          JSON stream definition
     * @param agentSession session supplying the credentials and domain name
     * @return the stream id of the definition that is now in effect
     * @throws MalformedStreamDefinitionException               declared for the JSON conversion
     * @throws DifferentStreamDefinitionAlreadyDefinedException if a different definition with the
     *                                                          same name and version already exists
     * @throws StreamDefinitionStoreException                   declared for the store operations
     */
    public String defineStream(String str, AgentSession agentSession) throws MalformedStreamDefinitionException, DifferentStreamDefinitionAlreadyDefinedException, StreamDefinitionStoreException {
        synchronized (EventDispatcher.class) {
            StreamDefinition definition = EventDefinitionConverterUtils.convertFromJson(str);

            // Only the lookup sits inside the try: "not found" is the single condition that
            // selects the save path, and the found path must never run with an unassigned value.
            StreamDefinition existing;
            try {
                existing = this.streamDefinitionStore.getStreamDefinition(agentSession.getCredentials(), definition.getName(), definition.getVersion());
            } catch (StreamDefinitionNotFoundException e) {
                existing = null;
            }

            if (existing == null) {
                this.streamDefinitionStore.saveStreamDefinition(agentSession.getCredentials(), definition);
                updateStreamTypeCache(agentSession.getDomainName(), definition);
            } else {
                if (!existing.equals(definition)) {
                    // Report the definition that was just fetched instead of querying the store again.
                    throw new DifferentStreamDefinitionAlreadyDefinedException("Similar event stream for " + definition + " with the same name and version already exist: " + existing);
                }
                definition = existing;
                updateStreamCacheIfNotStreamDefnExisting(agentSession.getDomainName(), definition, agentSession.getCredentials());
            }

            for (AgentCallback subscriber : this.subscribers) {
                subscriber.definedStream(definition, agentSession.getCredentials());
            }
            return definition.getStreamId();
        }
    }

    /**
     * Adds the attribute types of {@code streamDefinition} to the cached holder of the given
     * domain, creating and caching a new holder when the domain has none yet.
     *
     * @param str              domain name used as the cache key
     * @param streamDefinition definition whose attribute types are recorded
     */
    private void updateStreamTypeCache(String str, StreamDefinition streamDefinition) {
        synchronized (EventDispatcher.class) {
            StreamTypeHolder holder = this.streamTypeCache.get(str);
            if (holder == null) {
                holder = new StreamTypeHolder(str);
            }
            updateStreamTypeHolder(holder, streamDefinition);
            if (log.isTraceEnabled()) {
                log.trace(describeHolder(HOLDER_UPDATE_LOG_PREFIX, str, holder));
            }
            this.streamTypeCache.put(str, holder);
        }
    }

    /**
     * Makes sure the type cache knows about {@code streamDefinition}.
     *
     * <ul>
     *   <li>Domain already cached: the definition is added only if its stream id has no data type yet.</li>
     *   <li>Cache completely empty: every definition the store returns for {@code credentials} is
     *       cached under {@code credentials.getDomainName()}.</li>
     *   <li>Otherwise: just this definition is cached under {@code str}.</li>
     * </ul>
     *
     * @param str              domain name of the session
     * @param streamDefinition definition that must be present in the cache
     * @param credentials      credentials used to list all definitions when the cache is empty
     */
    private void updateStreamCacheIfNotStreamDefnExisting(String str, StreamDefinition streamDefinition, Credentials credentials) {
        synchronized (EventDispatcher.class) {
            StreamTypeHolder cached = this.streamTypeCache.get(str);
            if (cached != null) {
                if (null == cached.getDataType(streamDefinition.getStreamId())) {
                    updateStreamTypeCache(str, streamDefinition);
                }
            } else if (this.streamTypeCache.isEmpty()) {
                // NOTE(review): this branch keys by credentials.getDomainName() rather than str and
                // relies on the store's listing to include streamDefinition - confirm both hold.
                for (StreamDefinition storedDefinition : this.streamDefinitionStore.getAllStreamDefinitions(credentials)) {
                    updateStreamTypeCache(credentials.getDomainName(), storedDefinition);
                }
            } else {
                updateStreamTypeCache(str, streamDefinition);
            }
        }
    }

    /**
     * Wraps the event bundle with the caller's stream type holder and queues it.
     *
     * @param obj            the event bundle to publish
     * @param agentSession   session of the publishing agent
     * @param eventConverter converter passed along with the bundle
     * @throws UndefinedEventTypeException if resolving the stream type holder reports a missing
     *                                     stream definition
     */
    public void publish(Object obj, AgentSession agentSession, EventConverter eventConverter) throws UndefinedEventTypeException {
        try {
            this.eventQueue.publish(new EventComposite(obj, getStreamDefinitionHolder(agentSession.getCredentials()), agentSession, eventConverter));
        } catch (StreamDefinitionNotFoundException e) {
            throw new UndefinedEventTypeException("No event stream definition exist " + e.getErrorMessage());
        }
    }

    /**
     * Returns the stream type holder for the domain of {@code credentials}.
     *
     * <p>A cached holder is returned directly. On a miss the cache is re-checked under the class
     * lock; if it is still missing, a holder is built from every definition the store returns for
     * the credentials, and the same definitions are pushed into the cache.
     *
     * @param credentials credentials whose domain name selects the holder
     * @return the holder for that domain, never {@code null}
     * @throws StreamDefinitionNotFoundException declared by the original signature
     */
    private StreamTypeHolder getStreamDefinitionHolder(Credentials credentials) throws StreamDefinitionNotFoundException {
        StreamTypeHolder holder = this.streamTypeCache.get(credentials.getDomainName());
        if (log.isTraceEnabled()) {
            log.trace("Retrieving Event Stream Type Cache : " + this.streamTypeCache);
        }
        if (holder == null) {
            synchronized (EventDispatcher.class) {
                // Another thread may have populated the cache while this one waited for the lock.
                holder = this.streamTypeCache.get(credentials.getDomainName());
                if (null == holder) {
                    // The returned holder is filled locally; updateStreamTypeCache fills the cached
                    // holder with the same definitions. When the store has no definitions the
                    // returned holder is empty and nothing is cached.
                    holder = new StreamTypeHolder(credentials.getDomainName());
                    for (StreamDefinition storedDefinition : this.streamDefinitionStore.getAllStreamDefinitions(credentials)) {
                        updateStreamTypeHolder(holder, storedDefinition);
                        updateStreamTypeCache(credentials.getDomainName(), storedDefinition);
                    }
                }
            }
        }
        if (log.isDebugEnabled()) {
            log.debug(describeHolder(HOLDER_LOG_PREFIX, credentials.getDomainName(), holder));
        }
        return holder;
    }

    /**
     * Builds the multi-line log text listing the meta, correlation and payload attribute types
     * of every stream in {@code holder}.
     *
     * @param prefix     text placed before the domain name
     * @param domainName domain name to print
     * @param holder     holder whose attribute map is rendered
     * @return the formatted log message
     */
    private static String describeHolder(String prefix, String domainName, StreamTypeHolder holder) {
        StringBuilder message = new StringBuilder(prefix)
                .append(domainName)
                .append(" : \n ")
                .append("Meta, Correlation & Payload Data Type Map : ");
        for (Map.Entry<String, AttributeComposite> entry : holder.getAttributeCompositeMap().entrySet()) {
            // Index 0 = meta, 1 = correlation, 2 = payload, matching the putDataType argument order.
            Object[][] attributeTypes = entry.getValue().getAttributeTypes();
            message.append("StreamID=").append(entry.getKey()).append(" :  ")
                    .append("Meta= ").append(Arrays.deepToString(attributeTypes[0])).append(" :  ")
                    .append("Correlation= ").append(Arrays.deepToString(attributeTypes[1])).append(" :  ")
                    .append("Payload= ").append(Arrays.deepToString(attributeTypes[2])).append("\n");
        }
        return message.toString();
    }

    /**
     * Records the meta, correlation and payload attribute type arrays of {@code streamDefinition}
     * in {@code streamTypeHolder} under the definition's stream id.
     *
     * @param streamTypeHolder holder to update
     * @param streamDefinition definition supplying the stream id and attribute lists
     */
    private void updateStreamTypeHolder(StreamTypeHolder streamTypeHolder, StreamDefinition streamDefinition) {
        streamTypeHolder.putDataType(
                streamDefinition.getStreamId(),
                EventDefinitionConverterUtils.generateAttributeTypeArray(streamDefinition.getMetaData()),
                EventDefinitionConverterUtils.generateAttributeTypeArray(streamDefinition.getCorrelationData()),
                EventDefinitionConverterUtils.generateAttributeTypeArray(streamDefinition.getPayloadData()));
    }

    /**
     * @return the live list of registered callbacks (the same instance given to the event queue)
     */
    public List<AgentCallback> getSubscribers() {
        return this.subscribers;
    }

    /**
     * @return the live list of registered raw-data callbacks (the same instance given to the event queue)
     */
    public List<RawDataAgentCallback> getRawDataSubscribers() {
        return this.rawDataSubscribers;
    }

    /**
     * Looks up the id of a stream in the definition store.
     *
     * @param credentials credentials passed to the store
     * @param str         first lookup argument forwarded to the store (presumably the stream name - confirm)
     * @param str2        second lookup argument forwarded to the store (presumably the stream version - confirm)
     * @return the stream id reported by the store
     * @throws StreamDefinitionNotFoundException if the store reports no such definition; rethrown
     *                                           with a prefixed message
     * @throws StreamDefinitionStoreException    declared for the store lookup
     */
    public String findStreamId(Credentials credentials, String str, String str2) throws StreamDefinitionNotFoundException, StreamDefinitionStoreException {
        try {
            return this.streamDefinitionStore.getStreamId(credentials, str, str2);
        } catch (StreamDefinitionNotFoundException e) {
            throw new StreamDefinitionNotFoundException("No event stream definition exist " + e.getErrorMessage());
        }
    }

    /**
     * Deletes a stream definition through the store. The type cache is not touched.
     *
     * @param credentials credentials passed to the store
     * @param str         identifier forwarded to the store's single-argument delete
     * @return the store's result
     */
    public boolean deleteStream(Credentials credentials, String str) {
        return this.streamDefinitionStore.deleteStreamDefinition(credentials, str);
    }

    /**
     * Deletes a stream definition through the store. The type cache is not touched.
     *
     * @param credentials credentials passed to the store
     * @param str         first identifier forwarded to the store's two-argument delete
     * @param str2        second identifier forwarded to the store's two-argument delete
     * @return the store's result
     */
    public boolean deleteStream(Credentials credentials, String str, String str2) {
        return this.streamDefinitionStore.deleteStreamDefinition(credentials, str, str2);
    }
}
