package org.wso2.carbon.cep.siddhi.backend;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.cep.core.Expression;
import org.wso2.carbon.cep.core.backend.CEPBackEndRuntime;
import org.wso2.carbon.cep.core.exception.CEPConfigurationException;
import org.wso2.carbon.cep.core.internal.ds.CEPServiceValueHolder;
import org.wso2.carbon.cep.core.listener.CEPEventListener;
import org.wso2.carbon.cep.core.mapping.input.Input;
import org.wso2.carbon.cep.core.mapping.input.mapping.InputMapping;
import org.wso2.carbon.cep.core.mapping.input.mapping.MapInputMapping;
import org.wso2.carbon.cep.core.mapping.input.mapping.TupleInputMapping;
import org.wso2.carbon.cep.core.mapping.input.mapping.XMLInputMapping;
import org.wso2.carbon.cep.core.mapping.property.Property;
import org.wso2.carbon.databridge.commons.AttributeType;
import org.wso2.carbon.databridge.commons.Event;
import org.wso2.carbon.registry.core.exceptions.RegistryException;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.stream.input.InputHandler;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.definition.StreamDefinition;
import org.wso2.siddhi.query.compiler.exception.SiddhiPraserException;

/* loaded from: input_file:org/wso2/carbon/cep/siddhi/backend/SiddhiBackEndRuntime.class */
/**
 * {@link CEPBackEndRuntime} implementation that delegates to a {@link SiddhiManager}.
 *
 * <p>Each instance serves a single bucket. Input streams are defined on the Siddhi manager
 * through {@link #addInput(Input)}, queries are registered through
 * {@link #addQuery(String, Expression, CEPEventListener)}, and events are pushed through
 * {@link #insertEvent(Object, InputMapping)}. After {@link #init()} a single-threaded scheduler
 * periodically calls {@code SiddhiManager.persist()}.
 *
 * <p>{@link #removeQuery(String)}, {@link #removeAllQueries()} and {@link #removeInput(Input)}
 * are currently no-ops.
 */
public class SiddhiBackEndRuntime implements CEPBackEndRuntime {
    private static final Log log = LogFactory.getLog(SiddhiBackEndRuntime.class);

    /** Maps a fully qualified Java class name (e.g. {@code java.lang.Integer}) to its Siddhi attribute type. */
    static ConcurrentHashMap<String, Attribute.Type> javaToSiddhiType = new ConcurrentHashMap<>();

    static {
        javaToSiddhiType.put(Integer.class.getName(), Attribute.Type.INT);
        javaToSiddhiType.put(String.class.getName(), Attribute.Type.STRING);
        javaToSiddhiType.put(Double.class.getName(), Attribute.Type.DOUBLE);
        javaToSiddhiType.put(Long.class.getName(), Attribute.Type.LONG);
        javaToSiddhiType.put(Float.class.getName(), Attribute.Type.FLOAT);
        javaToSiddhiType.put(Boolean.class.getName(), Attribute.Type.BOOL);
    }

    private final String bucketName;
    private final SiddhiManager siddhiManager;
    /** Input handlers keyed by stream name; shared with (and supplied by) the creator of this runtime. */
    private final Map<String, InputHandler> siddhiInputHandlerMap;
    private final int tenantId;
    /** Delay between persistence runs, in minutes (see {@link #init()}). */
    private final long persistenceTimeInterval;
    private final ScheduledExecutorService persistenceScheduler = Executors.newScheduledThreadPool(1);
    /** Caller-supplied query name mapped to the reference returned by {@code SiddhiManager.addQuery}. */
    private final Map<String, String> queryReferenceMap = new HashMap<>();

    /**
     * Scheduled task that asks the Siddhi manager to persist its state.
     */
    class PersistenceWorker implements Runnable {
        private final SiddhiManager siddhiManager;

        PersistenceWorker(SiddhiManager siddhiManager) {
            this.siddhiManager = siddhiManager;
        }

        /**
         * Logs the run and calls {@code persist()} on the Siddhi manager.
         *
         * <p>Runtime failures are logged and not rethrown: a task that throws is never
         * executed again by a {@link ScheduledExecutorService}, which would silently stop
         * all further persistence runs after one failure.
         */
        @Override // java.lang.Runnable
        public void run() {
            try {
                SiddhiBackEndRuntime.log.info("Siddhi persisting sates of bucket " + SiddhiBackEndRuntime.this.bucketName);
                this.siddhiManager.persist();
            } catch (RuntimeException e) {
                SiddhiBackEndRuntime.log.error("Error while persisting states of bucket "
                        + SiddhiBackEndRuntime.this.bucketName, e);
            }
        }
    }

    /**
     * Creates a runtime for one bucket.
     *
     * @param str           name of the bucket (used in log messages)
     * @param siddhiManager Siddhi manager that holds the streams and queries
     * @param map           input handlers keyed by stream name; new handlers are added to it by
     *                      {@link #addInput(Input)}
     * @param i             tenant id used to look up the registry when a query is read from it
     * @param j             delay between persistence runs, in minutes
     */
    public SiddhiBackEndRuntime(String str, SiddhiManager siddhiManager, Map<String, InputHandler> map, int i, long j) {
        this.bucketName = str;
        this.siddhiManager = siddhiManager;
        this.siddhiInputHandlerMap = map;
        this.tenantId = i;
        this.persistenceTimeInterval = j;
    }

    /**
     * Sends an event to the input handler registered for the mapping's stream.
     *
     * <p>If no handler is registered for the stream, or the sending thread is interrupted,
     * the event is dropped and an error is logged.
     *
     * @param obj          the event; must be an {@link Event}, otherwise a
     *                     {@link ClassCastException} is thrown
     * @param inputMapping mapping whose stream name selects the input handler
     */
    public void insertEvent(Object obj, InputMapping inputMapping) {
        Event event = (Event) obj;
        InputHandler inputHandler = this.siddhiInputHandlerMap.get(inputMapping.getStream());
        if (inputHandler == null) {
            log.error("Unable to send event, no input handler for stream " + inputMapping.getStream() + ": " + obj);
            return;
        }
        try {
            inputHandler.send(event.getTimeStamp(), event.getPayloadData());
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            log.error("Unable to send event: " + obj, e);
        }
    }

    /**
     * Registers a query with the Siddhi manager and, if a listener is given, subscribes it to
     * the query's output stream.
     *
     * <p>When the expression type is {@code "inline"} the expression text is the query itself;
     * otherwise the trimmed text is used as a registry path from which the query is read.
     *
     * @param str              name under which the returned query reference is remembered
     * @param expression       the query, inline or as a registry path
     * @param cEPEventListener listener for the query's output stream; may be {@code null}
     * @throws CEPConfigurationException if the query cannot be parsed or cannot be read from
     *                                   the registry
     */
    public void addQuery(String str, Expression expression, CEPEventListener cEPEventListener) throws CEPConfigurationException {
        try {
            String queryText = expression.getType().equals("inline")
                    ? expression.getText()
                    : readSourceTextFromRegistry(expression.getText().trim());
            String queryReference = this.siddhiManager.addQuery(queryText);
            this.queryReferenceMap.put(str, queryReference);
            if (cEPEventListener != null) {
                String streamId = this.siddhiManager.getQuery(queryReference).getOutputStream().getStreamId();
                StreamDefinition streamDefinition = this.siddhiManager.getStreamDefinition(streamId);
                cEPEventListener.defineStream(createStreamTypeDef(streamDefinition));
                this.siddhiManager.addCallback(streamId, new SiddhiEventListner(streamDefinition, cEPEventListener));
            }
        } catch (SiddhiPraserException e) {
            throw new CEPConfigurationException("Error in query ", e);
        } catch (RegistryException e) {
            log.error("Error in reading query from registry", e);
            // Keep the original message text but also chain the cause so the stack trace survives.
            throw new CEPConfigurationException("Problem with reading query from registry " + e, e);
        }
    }

    /**
     * Converts a Siddhi stream definition into a data-bridge stream definition with the same
     * stream id, copying every attribute whose type has a data-bridge counterpart into the
     * payload data. Attributes of any other type are skipped.
     */
    private org.wso2.carbon.databridge.commons.StreamDefinition createStreamTypeDef(StreamDefinition streamDefinition) {
        org.wso2.carbon.databridge.commons.StreamDefinition dataBridgeDefinition =
                new org.wso2.carbon.databridge.commons.StreamDefinition(streamDefinition.getStreamId());
        ArrayList<org.wso2.carbon.databridge.commons.Attribute> payload = new ArrayList<>();
        for (Attribute attribute : streamDefinition.getAttributeList()) {
            AttributeType dataBridgeType = toDataBridgeType(attribute.getType());
            if (dataBridgeType != null) {
                payload.add(new org.wso2.carbon.databridge.commons.Attribute(attribute.getName(), dataBridgeType));
            }
        }
        dataBridgeDefinition.setPayloadData(payload);
        return dataBridgeDefinition;
    }

    /** Returns the data-bridge type matching the given Siddhi type, or {@code null} if there is none. */
    private static AttributeType toDataBridgeType(Attribute.Type type) {
        if (type == Attribute.Type.STRING) {
            return AttributeType.STRING;
        }
        if (type == Attribute.Type.INT) {
            return AttributeType.INT;
        }
        if (type == Attribute.Type.LONG) {
            return AttributeType.LONG;
        }
        if (type == Attribute.Type.BOOL) {
            return AttributeType.BOOL;
        }
        if (type == Attribute.Type.FLOAT) {
            return AttributeType.FLOAT;
        }
        if (type == Attribute.Type.DOUBLE) {
            return AttributeType.DOUBLE;
        }
        return null;
    }

    /** Not implemented: currently a no-op. */
    public void removeQuery(String str) throws CEPConfigurationException {
    }

    /** Not implemented: currently a no-op. */
    public void removeAllQueries() throws CEPConfigurationException {
    }

    /**
     * Builds a Siddhi stream definition from the input's mapping and, unless the Siddhi manager
     * already has a definition with that stream id, defines the stream and stores the returned
     * input handler under the stream name.
     *
     * @param input input whose mapping supplies the stream name and the typed properties
     * @throws CEPConfigurationException if a property has a type that has no Siddhi counterpart
     */
    public void addInput(Input input) throws CEPConfigurationException {
        InputMapping inputMapping = input.getInputMapping();
        StreamDefinition streamDefinition = new StreamDefinition();
        streamDefinition.name(inputMapping.getStream());
        for (Property property : inputMapping instanceof TupleInputMapping
                ? ((TupleInputMapping) inputMapping).getProperties()
                : inputMapping instanceof MapInputMapping
                        ? ((MapInputMapping) inputMapping).getProperties()
                        : ((XMLInputMapping) inputMapping).getProperties()) {
            String javaType = property.getType();
            // ConcurrentHashMap rejects null keys, so check before the lookup.
            Attribute.Type siddhiType = javaType == null ? null : javaToSiddhiType.get(javaType);
            if (siddhiType == null) {
                throw new CEPConfigurationException("Unsupported type " + javaType + " for property "
                        + property.getName() + " of stream " + inputMapping.getStream());
            }
            streamDefinition.attribute(property.getName(), siddhiType);
        }
        if (this.siddhiManager.getStreamDefinition(streamDefinition.getStreamId()) == null) {
            this.siddhiInputHandlerMap.put(inputMapping.getStream(), this.siddhiManager.defineStream(streamDefinition));
        }
    }

    /** Not implemented: currently a no-op. */
    public void removeInput(Input input) throws CEPConfigurationException {
    }

    /**
     * Calls {@code restoreLastRevision()} on the Siddhi manager and schedules the periodic
     * persistence task, with the configured interval (in minutes) used both as the initial
     * delay and as the delay between runs.
     *
     * <p>A non-positive interval disables periodic persistence, because
     * {@code scheduleWithFixedDelay} rejects a delay that is not greater than zero.
     */
    public void init() {
        this.siddhiManager.restoreLastRevision();
        if (this.persistenceTimeInterval <= 0) {
            log.warn("Periodic persistence is disabled for bucket " + this.bucketName
                    + ": persistence time interval is " + this.persistenceTimeInterval);
            return;
        }
        this.persistenceScheduler.scheduleWithFixedDelay(new PersistenceWorker(this.siddhiManager),
                this.persistenceTimeInterval, this.persistenceTimeInterval, TimeUnit.MINUTES);
    }

    /**
     * Reads the content of the registry resource at the given path, using this runtime's
     * tenant registry, and returns it as text.
     *
     * @param str registry path of the resource
     * @return the content itself if it is already a {@code String}, otherwise the content
     *         bytes decoded as UTF-8
     * @throws RegistryException if the registry lookup fails
     */
    private String readSourceTextFromRegistry(String str) throws RegistryException {
        Object content = CEPServiceValueHolder.getInstance().getRegistry(this.tenantId).get(str).getContent();
        if (content instanceof String) {
            return (String) content;
        }
        // Decode with a fixed charset instead of the platform default.
        return new String((byte[]) content, java.nio.charset.StandardCharsets.UTF_8);
    }
}
