package org.wso2.carbon.databridge.streamdefn.cassandra.datastore;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import me.prettyprint.cassandra.model.BasicColumnFamilyDefinition;
import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
import me.prettyprint.cassandra.serializers.LongSerializer;
import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.cassandra.service.ThriftCfDef;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.beans.ColumnSlice;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.beans.OrderedRows;
import me.prettyprint.hector.api.beans.Row;
import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition;
import me.prettyprint.hector.api.ddl.KeyspaceDefinition;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.Mutator;
import me.prettyprint.hector.api.query.ColumnQuery;
import me.prettyprint.hector.api.query.QueryResult;
import me.prettyprint.hector.api.query.RangeSlicesQuery;
import me.prettyprint.hector.api.query.SliceQuery;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.databridge.commons.Attribute;
import org.wso2.carbon.databridge.commons.AttributeType;
import org.wso2.carbon.databridge.commons.Event;
import org.wso2.carbon.databridge.commons.StreamDefinition;
import org.wso2.carbon.databridge.commons.exception.MalformedStreamDefinitionException;
import org.wso2.carbon.databridge.commons.utils.EventDefinitionConverterUtils;
import org.wso2.carbon.databridge.core.Utils.DataBridgeUtils;
import org.wso2.carbon.databridge.core.exception.EventProcessingException;
import org.wso2.carbon.databridge.core.exception.StreamDefinitionStoreException;
import org.wso2.carbon.databridge.streamdefn.cassandra.Utils.CassandraSDSUtils;
import org.wso2.carbon.databridge.streamdefn.cassandra.exception.NullValueException;
import org.wso2.carbon.databridge.streamdefn.cassandra.inserter.BoolInserter;
import org.wso2.carbon.databridge.streamdefn.cassandra.inserter.DoubleInserter;
import org.wso2.carbon.databridge.streamdefn.cassandra.inserter.FloatInserter;
import org.wso2.carbon.databridge.streamdefn.cassandra.inserter.IntInserter;
import org.wso2.carbon.databridge.streamdefn.cassandra.inserter.LongInserter;
import org.wso2.carbon.databridge.streamdefn.cassandra.inserter.StringInserter;
import org.wso2.carbon.databridge.streamdefn.cassandra.inserter.TypeInserter;
import org.wso2.carbon.databridge.streamdefn.cassandra.internal.util.ServiceHolder;
import org.wso2.carbon.databridge.streamdefn.cassandra.internal.util.Utils;
import org.wso2.carbon.utils.CarbonUtils;

/* loaded from: input_file:org/wso2/carbon/databridge/streamdefn/cassandra/datastore/CassandraConnector.class */
public class CassandraConnector {
    // Column names written into every event row by insertEvent / insertEventList.
    private static final String STREAM_NAME_KEY = "Name";
    private static final String STREAM_VERSION_KEY = "Version";
    private static final String STREAM_NICK_NAME_KEY = "Nick_Name";
    private static final String STREAM_TIMESTAMP_KEY = "Timestamp";
    private static final String STREAM_DESCRIPTION_KEY = "Description";
    // Event-row column holding the stream id; also the column that stores the stream key
    // in STREAM_DEFINITION_ID_TO_KEY rows (see saveStreamIdToStore).
    private static final String STREAM_ID_KEY = "StreamId";
    // Meta column family keyed by stream key; its STREAM_DEFINITION_ID column holds the stream id.
    public static final String BAM_META_STREAM_ID_CF = "STREAM_ID";
    // Same value as the literal column family name used when saving/loading JSON stream definitions.
    public static final String BAM_META_STREAM_DEF_CF = "STREAM_DEFINITION";
    // Meta column family keyed by stream id; its StreamId column holds the stream key.
    public static final String BAM_META_STREAM_ID_KEY_CF = "STREAM_DEFINITION_ID_TO_KEY";
    // Keyspace containing the three meta column families above.
    public static final String BAM_META_KEYSPACE = "META_KS";
    // Keyspace in which event rows (one column family per stream) are written.
    public static final String BAM_EVENT_DATA_KEYSPACE = "EVENT_KS";
    // Column name inside BAM_META_STREAM_ID_CF rows.
    private static final String STREAM_ID = "STREAM_DEFINITION_ID";
    // Same value as the literal column name used for the JSON definition column.
    private static final String STREAM_DEF = "STREAM_DEFINITION";
    // Incremented once per inserted event; not read anywhere in this file.
    private AtomicInteger eventCounter = new AtomicInteger();
    // Monotonic counter passed to CassandraSDSUtils.createRowKey for each event.
    private AtomicInteger rowkeyCounter = new AtomicInteger();
    // Attribute type -> inserter that adds a value of that type to a mutator; filled by createInserterMap().
    private Map<AttributeType, TypeInserter> inserterMap = new ConcurrentHashMap();
    // Port and host address detected in the constructor; both are passed to createRowKey.
    private int port;
    private String localAddress;
    // Shared Hector serializer singletons.
    private static final StringSerializer stringSerializer = StringSerializer.get();
    private static final LongSerializer longSerializer = LongSerializer.get();
    private static final ByteBufferSerializer byteBufferSerializer = ByteBufferSerializer.get();
    static Log log = LogFactory.getLog(CassandraConnector.class);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/wso2/carbon/databridge/streamdefn/cassandra/datastore/CassandraConnector$StreamDefnCache.class */
    public static class StreamDefnCache {
        private static LoadingCache<StreamIdClusterBean, StreamDefinition> streamDefnCache = null;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/wso2/carbon/databridge/streamdefn/cassandra/datastore/CassandraConnector$StreamDefnCache$StreamIdClusterBean.class */
        public static class StreamIdClusterBean {
            private Cluster cluster;
            private String streamId;

            private StreamIdClusterBean(Cluster cluster, String str) {
                this.cluster = cluster;
                this.streamId = str;
            }

            public Cluster getCluster() {
                return this.cluster;
            }

            public String getStreamId() {
                return this.streamId;
            }

            public boolean equals(Object obj) {
                if (this == obj) {
                    return true;
                }
                if (obj == null || getClass() != obj.getClass()) {
                    return false;
                }
                StreamIdClusterBean streamIdClusterBean = (StreamIdClusterBean) obj;
                return this.cluster.equals(streamIdClusterBean.cluster) && this.streamId.equals(streamIdClusterBean.streamId);
            }

            public int hashCode() {
                return (31 * this.cluster.hashCode()) + this.streamId.hashCode();
            }
        }

        private StreamDefnCache() {
        }

        private static void init() {
            synchronized (StreamDefnCache.class) {
                if (streamDefnCache != null) {
                    return;
                }
                streamDefnCache = CacheBuilder.newBuilder().maximumSize(1000L).expireAfterAccess(30L, TimeUnit.MINUTES).build(new CacheLoader<StreamIdClusterBean, StreamDefinition>() { // from class: org.wso2.carbon.databridge.streamdefn.cassandra.datastore.CassandraConnector.StreamDefnCache.1
                    public StreamDefinition load(StreamIdClusterBean streamIdClusterBean) throws Exception {
                        ColumnQuery createStringColumnQuery = HFactory.createStringColumnQuery(HFactory.createKeyspace(CassandraConnector.BAM_META_KEYSPACE, streamIdClusterBean.getCluster()));
                        createStringColumnQuery.setColumnFamily("STREAM_DEFINITION").setKey(streamIdClusterBean.getStreamId()).setName("STREAM_DEFINITION");
                        HColumn hColumn = (HColumn) createStringColumnQuery.execute().get();
                        if (hColumn == null) {
                            throw new NullValueException("No value found");
                        }
                        try {
                            return EventDefinitionConverterUtils.convertFromJson((String) hColumn.getValue());
                        } catch (MalformedStreamDefinitionException e) {
                            throw new StreamDefinitionStoreException("Retrieved definition from Cassandra store is malformed. Retrieved value : " + ((String) hColumn.getValue()));
                        }
                    }
                });
            }
        }

        public static StreamDefinition getStreamDefinition(Cluster cluster, String str) throws ExecutionException {
            init();
            return (StreamDefinition) streamDefnCache.get(new StreamIdClusterBean(cluster, str));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/wso2/carbon/databridge/streamdefn/cassandra/datastore/CassandraConnector$StreamIdCache.class */
    public static class StreamIdCache {
        private static LoadingCache<StreamKeyClusterBean, String> streamIdCache = null;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/wso2/carbon/databridge/streamdefn/cassandra/datastore/CassandraConnector$StreamIdCache$StreamKeyClusterBean.class */
        public static class StreamKeyClusterBean {
            private Cluster cluster;
            private String streamKey;

            private StreamKeyClusterBean(Cluster cluster, String str) {
                this.cluster = cluster;
                this.streamKey = str;
            }

            public Cluster getCluster() {
                return this.cluster;
            }

            public String getStreamKey() {
                return this.streamKey;
            }

            public boolean equals(Object obj) {
                if (this == obj) {
                    return true;
                }
                if (obj == null || getClass() != obj.getClass()) {
                    return false;
                }
                StreamKeyClusterBean streamKeyClusterBean = (StreamKeyClusterBean) obj;
                return this.cluster.equals(streamKeyClusterBean.cluster) && this.streamKey.equals(streamKeyClusterBean.streamKey);
            }

            public int hashCode() {
                return (31 * this.cluster.hashCode()) + this.streamKey.hashCode();
            }
        }

        private StreamIdCache() {
        }

        private static void init() {
            synchronized (StreamIdCache.class) {
                if (streamIdCache != null) {
                    return;
                }
                streamIdCache = CacheBuilder.newBuilder().maximumSize(1000L).expireAfterAccess(30L, TimeUnit.MINUTES).build(new CacheLoader<StreamKeyClusterBean, String>() { // from class: org.wso2.carbon.databridge.streamdefn.cassandra.datastore.CassandraConnector.StreamIdCache.1
                    public String load(StreamKeyClusterBean streamKeyClusterBean) throws Exception {
                        ColumnQuery createStringColumnQuery = HFactory.createStringColumnQuery(HFactory.createKeyspace(CassandraConnector.BAM_META_KEYSPACE, streamKeyClusterBean.getCluster()));
                        createStringColumnQuery.setColumnFamily(CassandraConnector.BAM_META_STREAM_ID_CF).setKey(streamKeyClusterBean.getStreamKey()).setName(CassandraConnector.STREAM_ID);
                        HColumn hColumn = (HColumn) createStringColumnQuery.execute().get();
                        if (hColumn != null) {
                            return (String) hColumn.getValue();
                        }
                        throw new NullValueException("No value found");
                    }
                });
            }
        }

        public static String getStreamIdFromStreamKey(Cluster cluster, String str) throws ExecutionException {
            init();
            return (String) streamIdCache.get(new StreamKeyClusterBean(cluster, str));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/wso2/carbon/databridge/streamdefn/cassandra/datastore/CassandraConnector$StreamKeyCache.class */
    public static class StreamKeyCache {
        private static LoadingCache<StreamIdClusterBean, String> streamKeyCache = null;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/wso2/carbon/databridge/streamdefn/cassandra/datastore/CassandraConnector$StreamKeyCache$StreamIdClusterBean.class */
        public static class StreamIdClusterBean {
            private Cluster cluster;
            private String streamId;

            private StreamIdClusterBean(Cluster cluster, String str) {
                this.cluster = cluster;
                this.streamId = str;
            }

            public Cluster getCluster() {
                return this.cluster;
            }

            public String getStreamId() {
                return this.streamId;
            }

            public boolean equals(Object obj) {
                if (this == obj) {
                    return true;
                }
                if (obj == null || getClass() != obj.getClass()) {
                    return false;
                }
                StreamIdClusterBean streamIdClusterBean = (StreamIdClusterBean) obj;
                return this.cluster.equals(streamIdClusterBean.cluster) && this.streamId.equals(streamIdClusterBean.streamId);
            }

            public int hashCode() {
                return (31 * this.cluster.hashCode()) + this.streamId.hashCode();
            }
        }

        private StreamKeyCache() {
        }

        private static void init() {
            synchronized (StreamKeyCache.class) {
                if (streamKeyCache != null) {
                    return;
                }
                streamKeyCache = CacheBuilder.newBuilder().maximumSize(1000L).expireAfterAccess(30L, TimeUnit.MINUTES).build(new CacheLoader<StreamIdClusterBean, String>() { // from class: org.wso2.carbon.databridge.streamdefn.cassandra.datastore.CassandraConnector.StreamKeyCache.1
                    public String load(StreamIdClusterBean streamIdClusterBean) throws Exception {
                        HColumn hColumn = (HColumn) HFactory.createStringColumnQuery(HFactory.createKeyspace(CassandraConnector.BAM_META_KEYSPACE, streamIdClusterBean.getCluster())).setColumnFamily(CassandraConnector.BAM_META_STREAM_ID_KEY_CF).setKey(streamIdClusterBean.getStreamId()).setName(CassandraConnector.STREAM_ID_KEY).execute().get();
                        if (hColumn != null) {
                            return (String) hColumn.getValue();
                        }
                        throw new NullValueException("No value found");
                    }
                });
            }
        }

        public static String getStreamKeyFromStreamId(Cluster cluster, String str) throws ExecutionException {
            init();
            return (String) streamKeyCache.get(new StreamIdClusterBean(cluster, str));
        }
    }

    /**
     * Creates a connector, detecting the local host address and the HTTPS transport
     * port (plus the configured {@code Ports.Offset}); both values are later embedded
     * in generated row keys.
     *
     * <p>Detection is best-effort: if anything fails, whichever value has not been
     * determined yet falls back to {@code 127.0.0.1} / {@code 9443}. The failure is
     * reported only when debug logging is enabled, and now includes the causing
     * exception so the fallback can actually be diagnosed.
     */
    public CassandraConnector() {
        this.port = 0;
        this.localAddress = null;
        try {
            this.port = CarbonUtils.getTransportPort(ServiceHolder.getConfigurationContextService().getServerConfigContext().getAxisConfiguration(), "https") + Integer.parseInt(CarbonUtils.getServerConfiguration().getFirstProperty("Ports.Offset"));
            this.localAddress = Utils.getLocalAddress().getHostAddress();
        } catch (Exception e) {
            if (log.isDebugEnabled()) {
                // Pass the exception along; without it the message gives no hint of what failed.
                log.warn("Error when detecting Host/Port, using defaults", e);
            }
            if (this.localAddress == null) {
                this.localAddress = "127.0.0.1";
            }
            if (this.port == 0) {
                this.port = 9443;
            }
        }
        createInserterMap();
    }

    /**
     * Creates a new string-keyed mutator bound to the event data keyspace
     * ({@code EVENT_KS}) of the given cluster.
     */
    private Mutator<String> getMutator(Cluster cluster) throws StreamDefinitionStoreException {
        return HFactory.createMutator(HFactory.createKeyspace(BAM_EVENT_DATA_KEYSPACE, cluster), stringSerializer);
    }

    /** Executes the mutator, sending everything queued on it; the result is discarded. */
    private void commit(Mutator mutator) throws StreamDefinitionStoreException {
        mutator.execute();
    }

    /**
     * Registers one inserter per supported attribute type. Called once from the
     * constructor; prepareDataForInsertion looks inserters up by attribute type.
     */
    private void createInserterMap() {
        // Boolean and text types.
        this.inserterMap.put(AttributeType.BOOL, new BoolInserter());
        this.inserterMap.put(AttributeType.STRING, new StringInserter());
        // Integral types.
        this.inserterMap.put(AttributeType.INT, new IntInserter());
        this.inserterMap.put(AttributeType.LONG, new LongInserter());
        // Floating-point types.
        this.inserterMap.put(AttributeType.FLOAT, new FloatInserter());
        this.inserterMap.put(AttributeType.DOUBLE, new DoubleInserter());
    }

    /**
     * Creates the column family {@code str2} in keyspace {@code str} unless it already
     * exists, then waits until the cluster reports it.
     *
     * <p>The keyspace description is re-fetched on every poll. The previous
     * implementation fetched it once before the loop and kept inspecting that same
     * snapshot, so a column family that was missing from the first snapshot could never
     * be detected and the method always slept for the full timeout before failing.
     *
     * <p>An interrupt received while waiting does not abort the wait, but the thread's
     * interrupt status is restored before the method exits.
     *
     * @param cluster cluster to create the column family on
     * @param str     keyspace name
     * @param str2    column family name
     * @throws RuntimeException if the column family is still not visible after 100 polls (100 ms apart)
     */
    public void createColumnFamily(Cluster cluster, String str, String str2) {
        synchronized (CassandraConnector.class) {
            Keyspace keyspace = HFactory.createKeyspace(str, cluster);
            String keyspaceName = keyspace.getKeyspaceName();
            if (columnFamilyExists(cluster, keyspaceName, str2)) {
                if (log.isDebugEnabled()) {
                    log.debug("Column Family " + str2 + " already exists.");
                }
                return;
            }
            BasicColumnFamilyDefinition columnFamilyDefinition = new BasicColumnFamilyDefinition();
            columnFamilyDefinition.setKeyspaceName(str);
            columnFamilyDefinition.setName(str2);
            cluster.addColumnFamily(new ThriftCfDef(columnFamilyDefinition), true);

            boolean interrupted = false;
            try {
                for (int attempt = 0; attempt < 100; attempt++) {
                    if (columnFamilyExists(cluster, keyspaceName, str2)) {
                        if (log.isDebugEnabled()) {
                            log.debug("Column Family " + str2 + " was created.");
                        }
                        return;
                    }
                    try {
                        Thread.sleep(100L);
                    } catch (InterruptedException e) {
                        // Keep waiting for the schema change, but remember the interrupt.
                        interrupted = true;
                    }
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
            throw new RuntimeException("The column family " + str2 + " was  not created");
        }
    }

    /** Fetches a fresh keyspace description and reports whether it lists the named column family. */
    private boolean columnFamilyExists(Cluster cluster, String keyspaceName, String columnFamilyName) {
        KeyspaceDefinition keyspaceDefinition = cluster.describeKeyspace(keyspaceName);
        if (keyspaceDefinition == null) {
            return false;
        }
        for (ColumnFamilyDefinition columnFamily : keyspaceDefinition.getCfDefs()) {
            if (columnFamily.getName().equals(columnFamilyName)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Creates the keyspace {@code str} if the cluster does not already describe it,
     * then waits for it to become visible.
     *
     * <p>The wait is bounded to 100 polls, 100 ms apart. The previous loop condition
     * compared two constants ({@code 0 < 100}) and therefore never terminated if the
     * keyspace failed to appear.
     *
     * @param cluster cluster to create the keyspace on
     * @param str     keyspace name
     * @return {@code false} if the keyspace already existed; {@code true} if creation
     *         was requested (also when the wait was interrupted or timed out)
     */
    public boolean createKeySpaceIfNotExisting(Cluster cluster, String str) {
        if (cluster.describeKeyspace(str) != null) {
            return false;
        }
        cluster.addKeyspace(HFactory.createKeyspaceDefinition(str));
        for (int attempt = 0; attempt < 100; attempt++) {
            if (cluster.describeKeyspace(str) != null) {
                return true;
            }
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
                // Stop waiting as before, but leave the interrupt visible to the caller.
                Thread.currentThread().interrupt();
                return true;
            }
        }
        log.warn("Keyspace " + str + " was added but is still not visible after waiting");
        return true;
    }

    /**
     * Writes all given events to the event keyspace in a single batch.
     *
     * <p>Nothing is sent to Cassandra until every event has been queued; if any event
     * fails validation the exception propagates and the batch is not committed.
     *
     * @param cluster cluster to write to
     * @param list    events to store
     * @return the generated row keys, in the same order as {@code list}
     * @throws StreamDefinitionStoreException if the stream definition or column family
     *                                        of an event cannot be resolved
     */
    public List<String> insertEventList(Cluster cluster, List<Event> list) throws StreamDefinitionStoreException {
        Mutator<String> mutator = getMutator(cluster);
        List<String> rowKeys = new ArrayList<String>();
        for (Event event : list) {
            rowKeys.add(queueEventInsertion(cluster, event, mutator));
        }
        commit(mutator);
        return rowKeys;
    }

    /**
     * Writes a single event to the event keyspace.
     *
     * @param cluster cluster to write to
     * @param event   event to store
     * @return the generated row key
     * @throws StreamDefinitionStoreException if the stream definition or column family
     *                                        of the event cannot be resolved
     */
    public String insertEvent(Cluster cluster, Event event) throws MalformedStreamDefinitionException, StreamDefinitionStoreException {
        Mutator<String> mutator = getMutator(cluster);
        String rowKey = queueEventInsertion(cluster, event, mutator);
        commit(mutator);
        return rowKey;
    }

    /**
     * Resolves the event's stream definition and column family, generates a row key and
     * queues all column insertions for the event on {@code mutator} (without executing it).
     *
     * <p>Each of the meta, correlation and payload sections is written only when both
     * the event and the definition provide it, so a missing array or attribute list on
     * either side skips the section instead of failing with a NullPointerException.
     *
     * @return the row key under which the event's columns were queued
     */
    private String queueEventInsertion(Cluster cluster, Event event, Mutator<String> mutator) throws StreamDefinitionStoreException {
        StreamDefinition definition = getStreamDefinitionFromStore(cluster, event.getStreamId());
        String columnFamily = getCFNameFromStreamId(cluster, event.getStreamId());
        if (definition == null || columnFamily == null) {
            log.error("Event stream definition or column family cannot be null");
            throw new StreamDefinitionStoreException("Event stream definition or column family cannot be null");
        }
        if (log.isTraceEnabled()) {
            // describeKeyspace is a call to the cluster; only pay for it when tracing.
            traceEventKeyspace(cluster);
        }
        this.eventCounter.incrementAndGet();

        // Events that carry no timestamp (0) are stamped with the current time.
        long timeStamp = event.getTimeStamp() != 0 ? event.getTimeStamp() : System.currentTimeMillis();
        String rowKey = CassandraSDSUtils.createRowKey(timeStamp, this.localAddress, this.port, this.rowkeyCounter.incrementAndGet());

        mutator.addInsertion(rowKey, columnFamily, HFactory.createStringColumn(STREAM_ID_KEY, definition.getStreamId()));
        mutator.addInsertion(rowKey, columnFamily, HFactory.createStringColumn(STREAM_NAME_KEY, definition.getName()));
        mutator.addInsertion(rowKey, columnFamily, HFactory.createStringColumn(STREAM_VERSION_KEY, definition.getVersion()));
        String description = definition.getDescription();
        if (description != null) {
            mutator.addInsertion(rowKey, columnFamily, HFactory.createStringColumn(STREAM_DESCRIPTION_KEY, description));
        }
        String nickName = definition.getNickName();
        if (nickName != null) {
            mutator.addInsertion(rowKey, columnFamily, HFactory.createStringColumn(STREAM_NICK_NAME_KEY, nickName));
        }
        mutator.addInsertion(rowKey, columnFamily, HFactory.createColumn(STREAM_TIMESTAMP_KEY, Long.valueOf(timeStamp), stringSerializer, longSerializer));

        if (event.getMetaData() != null && definition.getMetaData() != null) {
            prepareDataForInsertion(event.getMetaData(), definition.getMetaData(), DataType.meta, rowKey, columnFamily, mutator);
        }
        if (event.getCorrelationData() != null && definition.getCorrelationData() != null) {
            prepareDataForInsertion(event.getCorrelationData(), definition.getCorrelationData(), DataType.correlation, rowKey, columnFamily, mutator);
        }
        if (event.getPayloadData() != null && definition.getPayloadData() != null) {
            prepareDataForInsertion(event.getPayloadData(), definition.getPayloadData(), DataType.payload, rowKey, columnFamily, mutator);
        }
        return rowKey;
    }

    /** Logs, at trace level, the event keyspace description and the names of its column families. */
    private void traceEventKeyspace(Cluster cluster) {
        KeyspaceDefinition keyspaceDefinition = cluster.describeKeyspace(BAM_EVENT_DATA_KEYSPACE);
        log.trace("Keyspace desc. : " + keyspaceDefinition);
        if (keyspaceDefinition == null) {
            return;
        }
        StringBuilder names = new StringBuilder("CFs present \n");
        for (ColumnFamilyDefinition columnFamily : keyspaceDefinition.getCfDefs()) {
            names.append("cf name : ").append(columnFamily.getName()).append("\n");
        }
        log.trace(names.toString());
    }

    /**
     * Queues one column insertion per attribute on the given mutator. The value for the
     * attribute at position {@code i} of {@code list} is taken from {@code objArr[i]},
     * and the inserter is chosen by the attribute's type.
     *
     * @param objArr   values, positionally aligned with {@code list}
     * @param list     attribute definitions for this section
     * @param dataType section (meta, correlation or payload) used to derive column names
     * @param str      row key
     * @param str2     column family name
     * @param mutator  mutator to queue the insertions on
     * @return the same mutator that was passed in
     */
    private Mutator prepareDataForInsertion(Object[] objArr, List<Attribute> list, DataType dataType, String str, String str2, Mutator<String> mutator) {
        int position = 0;
        for (Attribute attribute : list) {
            this.inserterMap.get(attribute.getType())
                    .addDataToBatchInsertion(objArr[position], str2, CassandraSDSUtils.getColumnName(dataType, attribute), str, mutator);
            position++;
        }
        return mutator;
    }

    /**
     * Reads back a stored event.
     *
     * <p>Fixes over the previous implementation: each correlation value is added to the
     * result exactly once (previously the boolean returned by {@code List.add} was
     * appended after every value, producing {@code [v1, true, v2, true, ...]}), and an
     * unknown stream id is reported as an {@link EventProcessingException} instead of a
     * NullPointerException.
     *
     * @param cluster cluster to read from
     * @param str     stream id of the event
     * @param str2    row key of the event
     * @return the event with stream id, timestamp and the meta, correlation and payload
     *         arrays; an array is {@code null} when the definition has no attributes of
     *         that kind
     * @throws EventProcessingException if the stream definition cannot be resolved or a
     *                                  stored value cannot be converted
     */
    public Event getEvent(Cluster cluster, String str, String str2) throws EventProcessingException {
        try {
            StreamDefinition definition = getStreamDefinitionFromStore(cluster, str);
            if (definition == null) {
                // Handled by the catch below, which logs and wraps it.
                throw new StreamDefinitionStoreException("No stream definition found for stream Id : " + str);
            }
            List<Attribute> payloadAttributes = definition.getPayloadData();
            List<Attribute> correlationAttributes = definition.getCorrelationData();
            List<Attribute> metaAttributes = definition.getMetaData();

            SliceQuery<String, String, ByteBuffer> query = HFactory.createSliceQuery(HFactory.createKeyspace(BAM_EVENT_DATA_KEYSPACE, cluster), stringSerializer, stringSerializer, byteBufferSerializer);
            query.setKey(str2).setRange("", "", true, Integer.MAX_VALUE).setColumnFamily(getCFNameFromStreamId(cluster, str));
            ColumnSlice<String, ByteBuffer> columnSlice = query.execute().get();

            Event event = new Event();
            try {
                event.setStreamId(CassandraSDSUtils.getString(columnSlice.getColumnByName(STREAM_ID_KEY).getValue()));
                event.setTimeStamp(CassandraSDSUtils.getLong(columnSlice.getColumnByName(STREAM_TIMESTAMP_KEY).getValue()));
                Object[] payload = readAttributeValues(columnSlice, payloadAttributes, DataType.payload);
                Object[] meta = readAttributeValues(columnSlice, metaAttributes, DataType.meta);
                Object[] correlation = readAttributeValues(columnSlice, correlationAttributes, DataType.correlation);
                event.setMetaData(meta);
                event.setCorrelationData(correlation);
                event.setPayloadData(payload);
                return event;
            } catch (IOException e) {
                log.error("Error during event data conversions.", e);
                throw new EventProcessingException("Error during event data conversions.", e);
            }
        } catch (StreamDefinitionStoreException e2) {
            String message = "Error processing stream definition for stream Id : " + str;
            log.error(message, e2);
            throw new EventProcessingException(message, e2);
        }
    }

    /**
     * Converts the stored column of every attribute in {@code attributes} back to its
     * original value, preserving attribute order.
     *
     * @return the values, or {@code null} when {@code attributes} is {@code null}
     */
    private Object[] readAttributeValues(ColumnSlice<String, ByteBuffer> columnSlice, List<Attribute> attributes, DataType dataType) throws IOException {
        if (attributes == null) {
            return null;
        }
        List<Object> values = new ArrayList<Object>(attributes.size());
        for (Attribute attribute : attributes) {
            values.add(getValueForDataTypeList(columnSlice, attribute, dataType));
        }
        return values.toArray();
    }

    /**
     * Derives the event column family name for a stream id: stream id -> stream key
     * (via the cache) -> stream name -> column family name.
     */
    private String getCFNameFromStreamId(Cluster cluster, String str) {
        String streamKey = getStreamKeyFromStreamId(cluster, str);
        String streamName = DataBridgeUtils.getStreamNameFromStreamKey(streamKey);
        return CassandraSDSUtils.convertStreamNameToCFName(streamName);
    }

    /**
     * Looks up the column belonging to {@code attribute} in the given slice and converts
     * its raw bytes back to a value of the attribute's type.
     *
     * @throws IOException if the conversion helper fails
     */
    private Object getValueForDataTypeList(ColumnSlice<String, ByteBuffer> columnSlice, Attribute attribute, DataType dataType) throws IOException {
        String columnName = CassandraSDSUtils.getColumnName(dataType, attribute);
        ByteBuffer rawValue = (ByteBuffer) columnSlice.getColumnByName(columnName).getValue();
        return CassandraSDSUtils.getOriginalValueFromColumnValue(rawValue, attribute.getType());
    }

    /**
     * Convenience overload: resolves the stream key using the stream id carried by the
     * given definition.
     */
    public String getStreamKeyFromStreamId(Cluster cluster, StreamDefinition streamDefinition) {
        return getStreamKeyFromStreamId(cluster, streamDefinition.getStreamId());
    }

    /**
     * Resolves the stream key for a stream id through {@code StreamKeyCache}.
     *
     * @return the stream key, or {@code null} if the cache lookup fails with an
     *         {@link ExecutionException}
     */
    public String getStreamKeyFromStreamId(Cluster cluster, String str) {
        String streamKey;
        try {
            streamKey = StreamKeyCache.getStreamKeyFromStreamId(cluster, str);
        } catch (ExecutionException e) {
            // A failed load is reported to callers as "no key".
            streamKey = null;
        }
        return streamKey;
    }

    /**
     * Convenience overload: saves the definition under the stream id it carries.
     */
    public void saveStreamDefinitionToStore(Cluster cluster, StreamDefinition streamDefinition) {
        saveStreamDefinitionToStore(cluster, streamDefinition.getStreamId(), streamDefinition);
    }

    /**
     * Ensures the event column family for the stream exists, then stores the JSON form
     * of the definition in the STREAM_DEFINITION column family under the given stream id.
     *
     * <p>When debug logging is enabled the definition is read back and logged. The
     * read-back may yield {@code null} (the lookup returns {@code null} when the cache
     * load fails); that is now logged as "null" rather than raising a
     * NullPointerException after the write has already succeeded.
     *
     * @param cluster          cluster to write to
     * @param str              stream id used as the row key
     * @param streamDefinition definition to store
     */
    public void saveStreamDefinitionToStore(Cluster cluster, String str, StreamDefinition streamDefinition) {
        Keyspace metaKeyspace = HFactory.createKeyspace(BAM_META_KEYSPACE, cluster);
        createColumnFamily(cluster, BAM_EVENT_DATA_KEYSPACE, DataBridgeUtils.getStreamNameFromStreamKey(CassandraSDSUtils.convertStreamNameToCFName(DataBridgeUtils.getStreamNameFromStreamKey(getStreamKeyFromStreamId(cluster, str)))));
        Mutator<String> mutator = HFactory.createMutator(metaKeyspace, stringSerializer);
        mutator.addInsertion(str, "STREAM_DEFINITION", HFactory.createStringColumn("STREAM_DEFINITION", EventDefinitionConverterUtils.convertToJson(streamDefinition)));
        mutator.execute();
        log.info("Saving Stream Definition : " + streamDefinition);
        if (log.isDebugEnabled()) {
            String message = "saveStreamDefinition executed. \n";
            try {
                StreamDefinition saved = getStreamDefinitionFromStore(cluster, str);
                // String concatenation is null-safe; saved.toString() was not.
                message = message + " stream definition saved : " + saved + " \n";
            } catch (StreamDefinitionStoreException e) {
                log.error(e.getErrorMessage(), e);
            }
            log.debug(message);
        }
    }

    /**
     * Convenience overload: builds the stream key from the definition's name and
     * version and stores the mapping between that key and the definition's stream id.
     */
    public void saveStreamIdToStore(Cluster cluster, StreamDefinition streamDefinition) {
        saveStreamIdToStore(cluster, DataBridgeUtils.constructStreamKey(streamDefinition.getName(), streamDefinition.getVersion()), streamDefinition.getStreamId());
    }

    /**
     * Stores both directions of the stream key / stream id mapping in the meta keyspace
     * with a single mutation: id -> key in STREAM_DEFINITION_ID_TO_KEY and key -> id in
     * STREAM_ID.
     *
     * @param cluster cluster to write to
     * @param str     stream key
     * @param str2    stream id
     */
    public void saveStreamIdToStore(Cluster cluster, String str, String str2) {
        Mutator<String> batch = HFactory.createMutator(HFactory.createKeyspace(BAM_META_KEYSPACE, cluster), stringSerializer);
        // id -> key
        batch.addInsertion(str2, BAM_META_STREAM_ID_KEY_CF, HFactory.createStringColumn(STREAM_ID_KEY, str));
        // key -> id
        batch.addInsertion(str, BAM_META_STREAM_ID_CF, HFactory.createStringColumn(STREAM_ID, str2));
        batch.execute();
        if (!log.isDebugEnabled()) {
            return;
        }
        String savedId = getStreamIdFromStore(cluster, str);
        log.debug("saveStreamID executed. \n stream id saved : " + savedId + " \n");
    }

    /**
     * Convenience overload: builds the stream key from the definition's name and
     * version and looks up the stream id stored for it.
     */
    public String getStreamIdFromStore(Cluster cluster, StreamDefinition streamDefinition) {
        return getStreamIdFromStore(cluster, DataBridgeUtils.constructStreamKey(streamDefinition.getName(), streamDefinition.getVersion()));
    }

    /**
     * Resolves the stream id for a stream key through {@code StreamIdCache}.
     *
     * @return the stream id, or {@code null} if the cache lookup fails with an
     *         {@link ExecutionException}
     */
    public String getStreamIdFromStore(Cluster cluster, String str) {
        String streamId;
        try {
            streamId = StreamIdCache.getStreamIdFromStreamKey(cluster, str);
        } catch (ExecutionException e) {
            // A failed load is reported to callers as "no id".
            streamId = null;
        }
        return streamId;
    }

    /**
     * Resolves the stream definition for a stream id through {@code StreamDefnCache}.
     *
     * @return the definition, or {@code null} if the cache lookup fails with an
     *         {@link ExecutionException}
     */
    public StreamDefinition getStreamDefinitionFromStore(Cluster cluster, String str) throws StreamDefinitionStoreException {
        StreamDefinition definition;
        try {
            definition = StreamDefnCache.getStreamDefinition(cluster, str);
        } catch (ExecutionException e) {
            // A failed load is reported to callers as "no definition".
            definition = null;
        }
        return definition;
    }

    /**
     * Collects the stream definitions reachable from the STREAM_ID column family: every
     * row returned by a range query is mapped to its stream id, which is then resolved
     * to a definition.
     *
     * <p>Rows that are {@code null}, that lack the STREAM_DEFINITION_ID column, or
     * whose definition cannot be resolved are skipped. Previously a row without that
     * column caused a NullPointerException that aborted the whole listing.
     *
     * @param cluster cluster to read from
     * @return the resolved definitions; empty if none were found
     */
    public Collection<StreamDefinition> getAllStreamDefinitionFromStore(Cluster cluster) throws StreamDefinitionStoreException {
        Collection<StreamDefinition> definitions = new ArrayList<StreamDefinition>();
        RangeSlicesQuery<String, String, String> query = HFactory.createRangeSlicesQuery(HFactory.createKeyspace(BAM_META_KEYSPACE, cluster), stringSerializer, stringSerializer, stringSerializer);
        query.setColumnFamily(BAM_META_STREAM_ID_CF);
        query.setKeys("", "");
        query.setColumnNames(new String[]{STREAM_ID});
        QueryResult<OrderedRows<String, String, String>> result = query.execute();

        // Evaluate once so the message builder is either used throughout or not at all.
        boolean debug = log.isDebugEnabled();
        StringBuilder debugMessage = debug ? new StringBuilder("getAllStreamDefinitions called : \n") : null;

        for (Row<String, String, String> row : result.get()) {
            // Skip rows that do not carry the stream id column.
            if (row == null || row.getColumnSlice().getColumnByName(STREAM_ID) == null) {
                continue;
            }
            String streamId = row.getColumnSlice().getColumnByName(STREAM_ID).getValue();
            StreamDefinition definition = getStreamDefinitionFromStore(cluster, streamId);
            if (definition == null) {
                continue;
            }
            definitions.add(definition);
            if (debug) {
                debugMessage.append("Stream definitions with stream id : ").append(definition.getStreamId())
                        .append(" found. Stream Definition is : ").append(definition.toString()).append(" \n");
            }
        }
        if (debug) {
            log.debug(debugMessage.toString());
        }
        return definitions;
    }
}
