package org.wso2.carbon.bam.notification.task;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import javax.xml.stream.XMLInputFactory;
import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.beans.OrderedRows;
import me.prettyprint.hector.api.beans.Row;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.Mutator;
import me.prettyprint.hector.api.query.RangeSlicesQuery;
import org.apache.axiom.om.OMElement;
import org.apache.axiom.om.impl.builder.StAXOMBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.bam.datasource.utils.DataSourceUtils;
import org.wso2.carbon.bam.notification.task.internal.NotificationDispatchComponent;
import org.wso2.carbon.databridge.agent.thrift.Agent;
import org.wso2.carbon.databridge.agent.thrift.DataPublisher;
import org.wso2.carbon.databridge.agent.thrift.conf.AgentConfiguration;
import org.wso2.carbon.databridge.commons.Attribute;
import org.wso2.carbon.databridge.commons.AttributeType;
import org.wso2.carbon.databridge.commons.Event;
import org.wso2.carbon.databridge.commons.StreamDefinition;
import org.wso2.carbon.ndatasource.rdbms.RDBMSConfiguration;
import org.wso2.carbon.ntask.core.AbstractTask;
import org.wso2.carbon.utils.CarbonUtils;

/* loaded from: input_file:org/wso2/carbon/bam/notification/task/NotificationDispatchTask.class */
public class NotificationDispatchTask extends AbstractTask {
    // File name and sub-directory (under the Carbon config directory) of the data bridge configuration.
    private static final String DATA_BRIDGE_CONFIG_XML = "data-bridge-config.xml";
    private static final String DATA_BRIDGE_DIR = "data-bridge";
    // Name of the column that holds the target stream id in a notification row.
    private static final String STREAM_ID = "streamId";
    // Column family that is queried for pending notification rows.
    public static final String BAM_NOTIFICATION_CF = "bam_notification_messages";
    public static final String TASK_TYPE = "BAM_NOTIFICATION_DISPATCHER_TASK";
    public static final String TASK_NAME = "NOTIFIER";
    // NOTE(review): presumably the scheduling interval in milliseconds - confirm where the task is registered.
    public static final int TASK_INTERVAL = 5000;
    // Data source used for both the Cassandra keyspace lookup and the publisher credentials.
    public static final String BAM_CASSANDRA_UTIL_DATASOURCE = "WSO2BAM_UTIL_DATASOURCE";
    // Lazily initialised by initPublisherKS(); shared by all instances of this task.
    private static DataPublisher publisher;
    private static Keyspace keyspace;
    private static final StringSerializer STRING_SERIALIZER = new StringSerializer();
    private static final Log log = LogFactory.getLog(NotificationDispatchTask.class);
    // Cache of stream definitions already fetched from the stream store, keyed by stream id.
    private static final Map<String, StreamDefinition> streamDefs = new HashMap<String, StreamDefinition>();
    // Stream ids for which defineStream() has already completed.
    private static final Set<String> streamsDefinedInDataBridge = new HashSet<String>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.wso2.carbon.bam.notification.task.NotificationDispatchTask$1, reason: invalid class name */
    /* loaded from: input_file:org/wso2/carbon/bam/notification/task/NotificationDispatchTask$1.class */
    /**
     * Compiler-style lookup table that maps each {@link AttributeType} ordinal to the
     * case label (1..6) used by the ordinal-based switch in this file. Entries for
     * constants that do not resolve at class-initialisation time stay at 0.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$wso2$carbon$databridge$commons$AttributeType = new int[AttributeType.values().length];

        static {
            // Local alias only to keep the assignments below readable.
            int[] caseLabels = $SwitchMap$org$wso2$carbon$databridge$commons$AttributeType;
            try {
                caseLabels[AttributeType.BOOL.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // constant not present in the loaded enum; leave its slot at 0
            }
            try {
                caseLabels[AttributeType.DOUBLE.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // constant not present in the loaded enum; leave its slot at 0
            }
            try {
                caseLabels[AttributeType.FLOAT.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // constant not present in the loaded enum; leave its slot at 0
            }
            try {
                caseLabels[AttributeType.INT.ordinal()] = 4;
            } catch (NoSuchFieldError ignored) {
                // constant not present in the loaded enum; leave its slot at 0
            }
            try {
                caseLabels[AttributeType.LONG.ordinal()] = 5;
            } catch (NoSuchFieldError ignored) {
                // constant not present in the loaded enum; leave its slot at 0
            }
            try {
                caseLabels[AttributeType.STRING.ordinal()] = 6;
            } catch (NoSuchFieldError ignored) {
                // constant not present in the loaded enum; leave its slot at 0
            }
        }
    }

    /* loaded from: input_file:org/wso2/carbon/bam/notification/task/NotificationDispatchTask$Record.class */
    public static class Record {
        private String streamId;
        private Object[] data;

        public Record(String str, Map<String, String> map) {
            this.streamId = str;
            this.data = convertStreamData(str, map);
        }

        public String getStreamId() {
            return this.streamId;
        }

        public Object[] getData() {
            return this.data;
        }

        private Object[] convertStreamData(String str, Map<String, String> map) {
            try {
                StreamDefinition streamDef = NotificationDispatchTask.getStreamDef(str);
                if (streamDef == null) {
                    return null;
                }
                ArrayList arrayList = new ArrayList();
                for (Attribute attribute : streamDef.getPayloadData()) {
                    arrayList.add(convert(map.get(attribute.getName()), attribute.getType()));
                }
                return arrayList.toArray();
            } catch (Exception e) {
                return null;
            }
        }

        private Object convert(String str, AttributeType attributeType) {
            if (str == null) {
                return null;
            }
            switch (AnonymousClass1.$SwitchMap$org$wso2$carbon$databridge$commons$AttributeType[attributeType.ordinal()]) {
                case 1:
                    return Boolean.valueOf(Boolean.parseBoolean(str));
                case 2:
                    return Double.valueOf(Double.parseDouble(str));
                case 3:
                    return Float.valueOf(Float.parseFloat(str));
                case 4:
                    return Integer.valueOf(Integer.parseInt(str));
                case 5:
                    return Long.valueOf(Long.parseLong(str));
                case 6:
                    return str;
                default:
                    return str;
            }
        }
    }

    /**
     * Creates the shared data publisher if it does not exist yet and (re)loads the
     * Cassandra keyspace from the BAM util data source. A missing keyspace is only
     * logged as a warning; the caller is expected to re-check the static fields.
     *
     * @throws Exception if the agent URLs, the credentials or the publisher cannot be set up
     */
    private void initPublisherKS() throws Exception {
        if (publisher == null) {
            Agent thriftAgent = new Agent(new AgentConfiguration());
            String[] urls = getAgentURLs();
            String[] login = getCredentials();
            // urls = {ssl URL, tcp URL}; login = {username, password}
            publisher = new DataPublisher(urls[0], urls[1], login[0], login[1], thriftAgent);
        }
        // The keyspace is read from index 1 of the array returned by the lookup.
        Object[] lookupResult = DataSourceUtils.getClusterKeyspaceFromRDBMSDataSource(-1234, BAM_CASSANDRA_UTIL_DATASOURCE);
        keyspace = (Keyspace) lookupResult[1];
        if (keyspace != null) {
            return;
        }
        log.warn("Unable to retrieve keyspace in initializing NotificationDispatchTask");
    }

    /**
     * Reads {@code <carbon-config-dir>/data-bridge/data-bridge-config.xml} and returns its
     * fully built root element.
     *
     * @return the document element of the data bridge configuration
     * @throws Exception if the file cannot be opened or parsed (the error is logged first)
     */
    private OMElement loadDataBridgeConfig() throws Exception {
        BufferedInputStream configStream = null;
        try {
            String configPath = CarbonUtils.getCarbonConfigDirPath() + File.separator + DATA_BRIDGE_DIR + File.separator + DATA_BRIDGE_CONFIG_XML;
            configStream = new BufferedInputStream(new FileInputStream(new File(configPath)));
            OMElement documentElement = new StAXOMBuilder(XMLInputFactory.newInstance().createXMLStreamReader(configStream)).getDocumentElement();
            // Force the whole tree to be built now, so the stream can be closed safely below.
            documentElement.build();
            return documentElement;
        } catch (Exception e) {
            log.error("Error in reading data bridge configuration: " + e.getMessage(), e);
            throw e;
        } finally {
            // The original never closed the file, leaking a handle on every call.
            if (configStream != null) {
                try {
                    configStream.close();
                } catch (Exception closeError) {
                    log.warn("Error in closing data bridge configuration stream: " + closeError.getMessage());
                }
            }
        }
    }

    /**
     * Reads the Thrift receiver ports from the {@code thriftDataReceiver} element of the
     * data bridge configuration.
     *
     * @return a two-element array {@code {securePort, port}}; index 0 is used for the
     *         {@code ssl://} URL and index 1 for the {@code tcp://} URL by
     *         {@link #getAgentURLs()}. Falls back to {@code {7711, 7611}} when the
     *         configuration cannot be read or parsed.
     */
    private int[] loadThriftReceiverPorts() {
        try {
            OMElement receiverElement = (OMElement) loadDataBridgeConfig().getChildrenWithLocalName("thriftDataReceiver").next();
            String securePortText = ((OMElement) receiverElement.getChildrenWithLocalName("securePort").next()).getText();
            String portText = ((OMElement) receiverElement.getChildrenWithLocalName("port").next()).getText();
            // trim() tolerates whitespace/newlines around the number in the XML text node
            return new int[]{Integer.parseInt(securePortText.trim()), Integer.parseInt(portText.trim())};
        } catch (Exception e) {
            // The fallback must use the same {securePort, port} order as the parsed result.
            // The original returned {7611, 7711}, which put the plain TCP port into the ssl:// URL.
            log.warn("Unable to load thrift receiver ports from data bridge configuration, using defaults (securePort=7711, port=7611): " + e);
            return new int[]{7711, 7611};
        }
    }

    /**
     * Builds the publisher endpoint URLs from the {@code carbon.local.ip} system property,
     * the configured Thrift receiver ports and the optional {@code portOffset} system property.
     *
     * @return {@code {"ssl://host:securePort", "tcp://host:port"}}, both ports shifted by the offset
     * @throws NumberFormatException if {@code portOffset} is set but is not an integer
     */
    private String[] getAgentURLs() {
        String host = System.getProperty("carbon.local.ip");
        String offsetText = System.getProperty("portOffset");
        int[] receiverPorts = loadThriftReceiverPorts();
        // No offset configured behaves like an offset of zero.
        int offset = (offsetText == null) ? 0 : Integer.parseInt(offsetText);
        String secureUrl = "ssl://" + host + ":" + (receiverPorts[0] + offset);
        String plainUrl = "tcp://" + host + ":" + (receiverPorts[1] + offset);
        return new String[]{secureUrl, plainUrl};
    }

    /**
     * Looks up the username and password configured on the BAM util data source.
     *
     * @return {@code {username, password}}
     * @throws Exception if the data source configuration is not available
     */
    private String[] getCredentials() throws Exception {
        RDBMSConfiguration dataSourceConfig = DataSourceUtils.getRDBMSDataSourceConfig(-1234, BAM_CASSANDRA_UTIL_DATASOURCE);
        if (dataSourceConfig != null) {
            String username = dataSourceConfig.getUsername();
            String password = dataSourceConfig.getPassword();
            return new String[]{username, password};
        }
        throw new Exception("The WSO2 BAM Util Cassandra Data Source is not available");
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Returns the stream definition for the given stream id, consulting the in-memory
     * cache first and the event stream service's store on a miss. Only non-null
     * definitions are cached, so a missing definition is looked up again on the next call.
     *
     * @param str the stream id
     * @return the stream definition, or {@code null} if the store has none for this id
     * @throws Exception if the store lookup fails
     */
    public static StreamDefinition getStreamDef(String str) throws Exception {
        StreamDefinition cached = streamDefs.get(str);
        if (cached != null) {
            return cached;
        }
        // NOTE(review): -1234 is presumably the super tenant id - confirm.
        StreamDefinition loaded = NotificationDispatchComponent.getEventStreamService().getStreamDefinitionFromStore(str, -1234);
        if (loaded != null) {
            streamDefs.put(str, loaded);
        }
        return loaded;
    }

    /**
     * Runs one dispatch pass: makes sure the publisher and keyspace are initialised and,
     * if both are available, processes the pending notification rows. Any exception is
     * logged and not propagated.
     */
    public void execute() {
        try {
            boolean ready = publisher != null && keyspace != null;
            if (!ready) {
                initPublisherKS();
                // Initialisation may still leave the keyspace unset; re-check before proceeding.
                ready = publisher != null && keyspace != null;
            }
            if (ready) {
                processNotificationRecords();
            }
        } catch (Exception e) {
            log.error("Error executing notification dispatch task: " + e.getMessage(), e);
        }
    }

    /**
     * Queries the notification column family over the full key range, publishes every row
     * that converts into a record, and deletes each row after it has been handled. Rows
     * that do not yield a record are deleted without being published.
     *
     * @throws Exception if publishing a record fails; the failing row and any rows after
     *         it in the result are then left undeleted
     */
    private void processNotificationRecords() throws Exception {
        RangeSlicesQuery<String, String, String> query = HFactory.createRangeSlicesQuery(keyspace, STRING_SERIALIZER, STRING_SERIALIZER, STRING_SERIALIZER);
        query.setColumnFamily(BAM_NOTIFICATION_CF);
        // Empty start/end keys: no restriction on the row key range.
        query.setKeys("", "");
        // No column-name bounds, forward order, no practical limit on columns per row.
        query.setRange(null, null, false, Integer.MAX_VALUE);
        OrderedRows<String, String, String> pendingRows = query.execute().get();
        for (Row<String, String, String> pendingRow : pendingRows.getList()) {
            Record notification = recordFromRow(pendingRow);
            if (notification != null) {
                publishData(notification);
            }
            deleteRow(pendingRow.getKey());
        }
    }

    /**
     * Deletes the row with the given key from the notification column family.
     *
     * @param str the row key to delete
     */
    private void deleteRow(String str) {
        Mutator<String> rowMutator = HFactory.createMutator(keyspace, STRING_SERIALIZER);
        rowMutator.addDeletion(str, BAM_NOTIFICATION_CF);
        rowMutator.execute();
    }

    /**
     * Converts a notification row into a {@link Record}. The {@code streamId} column
     * selects the stream; every other column becomes a name/value entry of the payload map.
     *
     * @param row the row read from the notification column family
     * @return the record, or {@code null} if the row has no {@code streamId} column or
     *         its payload could not be converted
     */
    private Record recordFromRow(Row<String, String, String> row) {
        HColumn<String, String> streamIdColumn = row.getColumnSlice().getColumnByName(STREAM_ID);
        if (streamIdColumn == null) {
            return null;
        }
        String streamId = streamIdColumn.getValue();
        Map<String, String> payloadColumns = new HashMap<String, String>();
        for (HColumn<String, String> column : row.getColumnSlice().getColumns()) {
            if (column.getName().equals(STREAM_ID)) {
                // the stream id column is routing metadata, not payload
                continue;
            }
            payloadColumns.put(column.getName(), column.getValue());
        }
        Record converted = new Record(streamId, payloadColumns);
        return converted.getData() == null ? null : converted;
    }

    /**
     * Defines the stream on the publisher unless the publisher already reports a stream
     * id for the definition's name and version.
     *
     * @param str the stream id whose definition should be registered
     * @throws Exception if the definition lookup or the publisher call fails
     */
    private void defineStream(String str) throws Exception {
        StreamDefinition definition = getStreamDef(str);
        boolean alreadyDefined = publisher.findStreamId(definition.getName(), definition.getVersion()) != null;
        if (!alreadyDefined) {
            publisher.defineStream(definition);
        }
    }

    /**
     * Publishes one record as an event stamped with the current time. The first time a
     * stream id is seen, the stream is defined on the publisher and the id is remembered
     * so the definition step is skipped for later records.
     *
     * @param record the record to publish
     * @throws Exception if defining the stream or publishing the event fails
     */
    private synchronized void publishData(Record record) throws Exception {
        String streamId = record.getStreamId();

        Event outgoing = new Event();
        outgoing.setStreamId(streamId);
        outgoing.setPayloadData(record.getData());
        outgoing.setTimeStamp(System.currentTimeMillis());

        boolean needsDefinition = !streamsDefinedInDataBridge.contains(streamId);
        if (needsDefinition) {
            defineStream(streamId);
            // remembered only after defineStream() succeeded, so a failure is retried next time
            streamsDefinedInDataBridge.add(streamId);
        }
        publisher.publish(outgoing);
    }
}
