package org.wso2.carbon.event.stream.manager.core.internal;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.axis2.engine.AxisConfiguration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.databridge.commons.Credentials;
import org.wso2.carbon.databridge.commons.StreamDefinition;
import org.wso2.carbon.databridge.commons.utils.DataBridgeCommonsUtils;
import org.wso2.carbon.databridge.commons.utils.EventDefinitionConverterUtils;
import org.wso2.carbon.databridge.core.exception.StreamDefinitionStoreException;
import org.wso2.carbon.event.processor.api.passthrough.PassthroughReceiverConfigurator;
import org.wso2.carbon.event.processor.api.passthrough.exception.PassthroughConfigurationException;
import org.wso2.carbon.event.processor.api.receive.exception.EventReceiverException;
import org.wso2.carbon.event.stream.manager.core.EventStreamListener;
import org.wso2.carbon.event.stream.manager.core.EventStreamService;
import org.wso2.carbon.event.stream.manager.core.exception.EventStreamConfigurationException;
import org.wso2.carbon.event.stream.manager.core.internal.ds.EventStreamServiceValueHolder;
import org.wso2.carbon.event.stream.manager.core.internal.util.EventStreamManagerConstants;
import org.wso2.carbon.registry.core.Resource;
import org.wso2.carbon.registry.core.exceptions.RegistryException;
import org.wso2.carbon.registry.core.session.UserRegistry;
import org.wso2.carbon.registry.core.utils.RegistryUtils;
import org.wso2.carbon.user.api.UserStoreException;

/* loaded from: input_file:org/wso2/carbon/event/stream/manager/core/internal/CarbonEventStreamService.class */
public class CarbonEventStreamService implements EventStreamService {
    private static final Log log = LogFactory.getLog(CarbonEventStreamService.class);
    private static final String STREAM_DEFINITION_STORE = "/StreamDefinitions";
    private List<EventStreamListener> eventStreamListenerList = new ArrayList();

    @Override // org.wso2.carbon.event.stream.manager.core.EventStreamService
    public void addStreamDefinitionToStore(Credentials credentials, StreamDefinition streamDefinition, AxisConfiguration axisConfiguration) throws StreamDefinitionStoreException {
        try {
            try {
                PrivilegedCarbonContext.startTenantFlow();
                PrivilegedCarbonContext threadLocalCarbonContext = PrivilegedCarbonContext.getThreadLocalCarbonContext();
                threadLocalCarbonContext.setTenantId(EventStreamServiceValueHolder.getRealmService().getTenantManager().getTenantId(credentials.getDomainName()));
                threadLocalCarbonContext.setTenantDomain(credentials.getDomainName());
                PassthroughReceiverConfigurator passthroughReceiverConfigurator = EventStreamServiceValueHolder.getPassthroughReceiverConfigurator();
                try {
                    UserRegistry governanceUserRegistry = EventStreamServiceValueHolder.getRegistryService().getGovernanceUserRegistry(credentials.getUsername(), credentials.getPassword());
                    Resource newResource = governanceUserRegistry.newResource();
                    newResource.setContent(EventDefinitionConverterUtils.convertToJson(streamDefinition));
                    newResource.setMediaType("application/json");
                    governanceUserRegistry.put("/StreamDefinitions/" + streamDefinition.getName() + "/" + streamDefinition.getVersion(), newResource);
                    passthroughReceiverConfigurator.deployDefaultEventBuilder(streamDefinition.getStreamId(), axisConfiguration);
                    log.info("Stream definition added to registry successfully : " + streamDefinition.getStreamId());
                    Iterator<EventStreamListener> it = this.eventStreamListenerList.iterator();
                    while (it.hasNext()) {
                        it.next().addedEventStream(PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId(), streamDefinition.getName(), streamDefinition.getVersion());
                    }
                } catch (RegistryException e) {
                    log.error("Error in saving Stream Definition " + streamDefinition);
                }
            } catch (UserStoreException e2) {
                throw new StreamDefinitionStoreException("Error in saving definition " + streamDefinition + " to registry, " + e2.getMessage(), e2);
            }
        } finally {
            PrivilegedCarbonContext.endTenantFlow();
        }
    }

    @Override // org.wso2.carbon.event.stream.manager.core.EventStreamService
    public void addEventStreamDefinitionToStore(StreamDefinition streamDefinition, AxisConfiguration axisConfiguration) throws EventStreamConfigurationException {
        int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
        StreamDefinition streamDefinitionFromStore = getStreamDefinitionFromStore(streamDefinition.getName(), streamDefinition.getVersion(), tenantId);
        if (streamDefinitionFromStore != null) {
            if (!streamDefinitionFromStore.equals(streamDefinition)) {
                throw new EventStreamConfigurationException("Another Stream with same name and version exist :" + EventDefinitionConverterUtils.convertToJson(streamDefinitionFromStore));
            }
            return;
        }
        saveStreamDefinitionToStore(streamDefinition, tenantId);
        PassthroughReceiverConfigurator passthroughReceiverConfigurator = EventStreamServiceValueHolder.getPassthroughReceiverConfigurator();
        if (passthroughReceiverConfigurator != null) {
            try {
                passthroughReceiverConfigurator.deployDefaultEventBuilder(streamDefinition.getStreamId(), axisConfiguration);
            } catch (EventReceiverException e) {
                throw new EventStreamConfigurationException((Throwable) e);
            }
        }
        Iterator<EventStreamListener> it = this.eventStreamListenerList.iterator();
        while (it.hasNext()) {
            it.next().addedEventStream(tenantId, streamDefinition.getName(), streamDefinition.getVersion());
        }
    }

    @Override // org.wso2.carbon.event.stream.manager.core.EventStreamService
    public void addEventStreamDefinitionToStore(StreamDefinition streamDefinition) throws EventStreamConfigurationException {
        int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
        StreamDefinition streamDefinitionFromStore = getStreamDefinitionFromStore(streamDefinition.getName(), streamDefinition.getVersion(), tenantId);
        if (streamDefinitionFromStore != null) {
            if (!streamDefinitionFromStore.equals(streamDefinition)) {
                throw new EventStreamConfigurationException("Another Stream with same name and version exist : " + EventDefinitionConverterUtils.convertToJson(streamDefinitionFromStore));
            }
            return;
        }
        saveStreamDefinitionToStore(streamDefinition, tenantId);
        PassthroughReceiverConfigurator passthroughReceiverConfigurator = EventStreamServiceValueHolder.getPassthroughReceiverConfigurator();
        if (passthroughReceiverConfigurator != null) {
            try {
                passthroughReceiverConfigurator.saveDefaultEventBuilder(streamDefinition.getStreamId());
            } catch (EventReceiverException e) {
                throw new EventStreamConfigurationException((Throwable) e);
            }
        }
        Iterator<EventStreamListener> it = this.eventStreamListenerList.iterator();
        while (it.hasNext()) {
            it.next().addedEventStream(tenantId, streamDefinition.getName(), streamDefinition.getVersion());
        }
    }

    @Override // org.wso2.carbon.event.stream.manager.core.EventStreamService
    public void removeEventStreamDefinition(String str, String str2, int i) throws EventStreamConfigurationException {
        if (removeStreamDefinitionFromStore(str, str2, i)) {
            log.info("Stream definition - " + str + EventStreamManagerConstants.STREAM_SEPARATOR + str2 + " removed from registry successfully");
        }
        Iterator<EventStreamListener> it = this.eventStreamListenerList.iterator();
        while (it.hasNext()) {
            it.next().removedEventStream(i, str, str2);
        }
    }

    @Override // org.wso2.carbon.event.stream.manager.core.EventStreamService
    public void registerEventStreamListener(EventStreamListener eventStreamListener) {
        if (eventStreamListener != null) {
            this.eventStreamListenerList.add(eventStreamListener);
        }
    }

    private void saveStreamDefinitionToStore(StreamDefinition streamDefinition, int i) throws EventStreamConfigurationException {
        try {
            UserRegistry governanceSystemRegistry = EventStreamServiceValueHolder.getRegistryService().getGovernanceSystemRegistry(i);
            Resource newResource = governanceSystemRegistry.newResource();
            newResource.setContent(EventDefinitionConverterUtils.convertToJson(streamDefinition));
            newResource.setMediaType("application/json");
            governanceSystemRegistry.put("/StreamDefinitions/" + streamDefinition.getName() + "/" + streamDefinition.getVersion(), newResource);
            log.info("Stream definition added to registry successfully : " + streamDefinition.getStreamId());
        } catch (RegistryException e) {
            log.error("Error in saving Stream Definition " + streamDefinition);
            throw new EventStreamConfigurationException("Error in saving Stream Definition " + streamDefinition, e);
        }
    }

    @Override // org.wso2.carbon.event.stream.manager.core.EventStreamService
    public StreamDefinition getStreamDefinitionFromStore(String str, String str2, int i) throws EventStreamConfigurationException {
        try {
            UserRegistry governanceSystemRegistry = EventStreamServiceValueHolder.getRegistryService().getGovernanceSystemRegistry(i);
            if (!governanceSystemRegistry.resourceExists("/StreamDefinitions/" + str + "/" + str2)) {
                return null;
            }
            Resource resource = governanceSystemRegistry.get("/StreamDefinitions/" + str + "/" + str2);
            if (resource.getContent() != null) {
                return EventDefinitionConverterUtils.convertFromJson(RegistryUtils.decodeBytes((byte[]) resource.getContent()));
            }
            return null;
        } catch (Exception e) {
            log.error("Error in getting Stream Definition " + str + EventStreamManagerConstants.STREAM_SEPARATOR + str2, e);
            throw new EventStreamConfigurationException("Error in getting Stream Definition " + str + EventStreamManagerConstants.STREAM_SEPARATOR + str2, e);
        }
    }

    @Override // org.wso2.carbon.event.stream.manager.core.EventStreamService
    public boolean removeStreamDefinitionFromStore(String str, String str2, int i) throws EventStreamConfigurationException {
        try {
            UserRegistry governanceSystemRegistry = EventStreamServiceValueHolder.getRegistryService().getGovernanceSystemRegistry(i);
            governanceSystemRegistry.delete("/StreamDefinitions/" + str + "/" + str2);
            return !governanceSystemRegistry.resourceExists(new StringBuilder().append("/StreamDefinitions/").append(str).append("/").append(str2).toString());
        } catch (RegistryException e) {
            log.error("Error in deleting Stream Definition " + str + EventStreamManagerConstants.STREAM_SEPARATOR + str2);
            throw new EventStreamConfigurationException("Error in deleting Stream Definition " + str + EventStreamManagerConstants.STREAM_SEPARATOR + str2, e);
        }
    }

    @Override // org.wso2.carbon.event.stream.manager.core.EventStreamService
    public Collection<StreamDefinition> getAllStreamDefinitionsFromStore(int i) throws EventStreamConfigurationException {
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        try {
            UserRegistry governanceSystemRegistry = EventStreamServiceValueHolder.getRegistryService().getGovernanceSystemRegistry(i);
            if (governanceSystemRegistry.resourceExists(STREAM_DEFINITION_STORE)) {
                for (String str : governanceSystemRegistry.get(STREAM_DEFINITION_STORE).getChildren()) {
                    for (String str2 : governanceSystemRegistry.get(str).getChildren()) {
                        Resource resource = governanceSystemRegistry.get(str2);
                        try {
                            StreamDefinition convertFromJson = EventDefinitionConverterUtils.convertFromJson(RegistryUtils.decodeBytes((byte[]) resource.getContent()));
                            concurrentHashMap.put(convertFromJson.getStreamId(), convertFromJson);
                        } catch (Throwable th) {
                            log.error("Error in retrieving streamDefinition from the resource at " + resource.getPath(), th);
                            throw new EventStreamConfigurationException("Error in retrieving streamDefinition from the resource at " + resource.getPath(), th);
                        }
                    }
                }
            } else {
                governanceSystemRegistry.put(STREAM_DEFINITION_STORE, governanceSystemRegistry.newCollection());
            }
            return concurrentHashMap.values();
        } catch (RegistryException e) {
            log.error("Error in retrieving streamDefinitions from the registry", e);
            throw new EventStreamConfigurationException("Error in retrieving streamDefinitions from the registry", e);
        }
    }

    @Override // org.wso2.carbon.event.stream.manager.core.EventStreamService
    public StreamDefinition getStreamDefinitionFromStore(String str, int i) throws EventStreamConfigurationException {
        return getStreamDefinitionFromStore(DataBridgeCommonsUtils.getStreamNameFromStreamId(str), DataBridgeCommonsUtils.getStreamVersionFromStreamId(str), i);
    }

    @Override // org.wso2.carbon.event.stream.manager.core.EventStreamService
    public List<String> getStreamIds(int i) throws EventStreamConfigurationException {
        Collection<StreamDefinition> allStreamDefinitionsFromStore = getAllStreamDefinitionsFromStore(i);
        ArrayList arrayList = new ArrayList(allStreamDefinitionsFromStore.size());
        Iterator<StreamDefinition> it = allStreamDefinitionsFromStore.iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().getStreamId());
        }
        return arrayList;
    }

    public void processPendingStreamList() {
        if (EventStreamServiceValueHolder.getPassthroughReceiverConfigurator() == null || EventStreamServiceValueHolder.getPendingStreamIdList() == null || EventStreamServiceValueHolder.getPendingStreamIdList().isEmpty()) {
            return;
        }
        Iterator<StreamDefinition> it = EventStreamServiceValueHolder.getPendingStreamIdList().iterator();
        while (it.hasNext()) {
            try {
                addEventStreamDefinitionToStore(it.next());
            } catch (EventStreamConfigurationException e) {
                throw new PassthroughConfigurationException("Error while loading streams from config ", e);
            }
        }
        EventStreamServiceValueHolder.setPendingStreamIdList(null);
    }
}
