package org.wso2.carbon.databridge.persistence.cassandra.datastore;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import me.prettyprint.cassandra.model.BasicColumnDefinition;
import me.prettyprint.cassandra.model.BasicColumnFamilyDefinition;
import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
import me.prettyprint.cassandra.serializers.LongSerializer;
import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.cassandra.service.ThriftCfDef;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.beans.ColumnSlice;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.beans.OrderedRows;
import me.prettyprint.hector.api.beans.Row;
import me.prettyprint.hector.api.ddl.ColumnDefinition;
import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition;
import me.prettyprint.hector.api.ddl.ComparatorType;
import me.prettyprint.hector.api.ddl.KeyspaceDefinition;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.Mutator;
import me.prettyprint.hector.api.query.ColumnQuery;
import me.prettyprint.hector.api.query.QueryResult;
import me.prettyprint.hector.api.query.RangeSlicesQuery;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.databridge.commons.Attribute;
import org.wso2.carbon.databridge.commons.AttributeType;
import org.wso2.carbon.databridge.commons.Credentials;
import org.wso2.carbon.databridge.commons.Event;
import org.wso2.carbon.databridge.commons.StreamDefinition;
import org.wso2.carbon.databridge.commons.exception.MalformedStreamDefinitionException;
import org.wso2.carbon.databridge.commons.utils.DataBridgeCommonsUtils;
import org.wso2.carbon.databridge.commons.utils.EventDefinitionConverterUtils;
import org.wso2.carbon.databridge.core.exception.StreamDefinitionStoreException;
import org.wso2.carbon.databridge.persistence.cassandra.Utils.CassandraSDSUtils;
import org.wso2.carbon.databridge.persistence.cassandra.Utils.KeySpaceUtils;
import org.wso2.carbon.databridge.persistence.cassandra.caches.CFCache;
import org.wso2.carbon.databridge.persistence.cassandra.exception.NullValueException;
import org.wso2.carbon.databridge.persistence.cassandra.inserter.BoolInserter;
import org.wso2.carbon.databridge.persistence.cassandra.inserter.DoubleInserter;
import org.wso2.carbon.databridge.persistence.cassandra.inserter.FloatInserter;
import org.wso2.carbon.databridge.persistence.cassandra.inserter.IntInserter;
import org.wso2.carbon.databridge.persistence.cassandra.inserter.LongInserter;
import org.wso2.carbon.databridge.persistence.cassandra.inserter.StringInserter;
import org.wso2.carbon.databridge.persistence.cassandra.inserter.TypeInserter;
import org.wso2.carbon.databridge.persistence.cassandra.internal.util.AppendUtils;
import org.wso2.carbon.databridge.persistence.cassandra.internal.util.ServiceHolder;
import org.wso2.carbon.databridge.persistence.cassandra.internal.util.Utils;
import org.wso2.carbon.utils.CarbonUtils;
import org.wso2.carbon.utils.multitenancy.MultitenantUtils;

/* loaded from: input_file:org/wso2/carbon/databridge/persistence/cassandra/datastore/CassandraConnector.class */
public class CassandraConnector {
    private static final String STREAM_NAME_KEY = "Name";
    private static final String STREAM_VERSION_KEY = "Version";
    private static final String STREAM_NICK_NAME_KEY = "Nick_Name";
    private static final String STREAM_TIMESTAMP_KEY = "Timestamp";
    private static final String STREAM_DESCRIPTION_KEY = "Description";
    private static final String STREAM_ID_KEY = "StreamId";
    public static final String BAM_META_STREAM_DEF_CF = "STREAM_DEFINITION";
    public static final String BAM_META_KEYSPACE = "META_KS";
    private static final String STREAM_DEF = "STREAM_DEFINITION";
    private static final String COMPARATOR_BOOL_TYPE = "org.apache.cassandra.db.marshal.BooleanType";
    private static final String COMPARATOR_DOUBLE_TYPE = "org.apache.cassandra.db.marshal.DoubleType";
    private static final String COMPARATOR_FLOAT_TYPE = "org.apache.cassandra.db.marshal.FloatType";
    private int port;
    private String localAddress;
    private long startTime;
    private boolean IS_PERFORMANCE_MEASURED;
    private static final StringSerializer stringSerializer = StringSerializer.get();
    private static final LongSerializer longSerializer = LongSerializer.get();
    private static final ByteBufferSerializer byteBufferSerializer = ByteBufferSerializer.get();
    static Log log = LogFactory.getLog(CassandraConnector.class);
    private static Map<AttributeType, String> attributeComparatorMap = new HashMap();
    private volatile AtomicInteger eventCounter = new AtomicInteger();
    private volatile AtomicLong totalEventCounter = new AtomicLong();
    private AtomicInteger rowkeyCounter = new AtomicInteger();
    private Map<AttributeType, TypeInserter> inserterMap = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/wso2/carbon/databridge/persistence/cassandra/datastore/CassandraConnector$StreamDefnCache.class */
    public static class StreamDefnCache {
        private static volatile LoadingCache<StreamIdClusterBean, StreamDefinition> streamDefnCache = null;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/wso2/carbon/databridge/persistence/cassandra/datastore/CassandraConnector$StreamDefnCache$StreamIdClusterBean.class */
        public static class StreamIdClusterBean {
            private String tenantDomain;
            private String streamId;
            private Credentials credentials;

            private StreamIdClusterBean(Credentials credentials, String str) {
                this.credentials = credentials;
                this.tenantDomain = credentials.getDomainName();
                this.streamId = str;
            }

            public String getUserName() {
                return this.credentials.getUsername();
            }

            public String getPassword() {
                return this.credentials.getPassword();
            }

            public String getStreamId() {
                return this.streamId;
            }

            public boolean equals(Object obj) {
                if (this == obj) {
                    return true;
                }
                if (obj == null || getClass() != obj.getClass()) {
                    return false;
                }
                StreamIdClusterBean streamIdClusterBean = (StreamIdClusterBean) obj;
                return this.tenantDomain.equals(streamIdClusterBean.tenantDomain) && this.streamId.equals(streamIdClusterBean.streamId);
            }

            public int hashCode() {
                return (31 * this.tenantDomain.hashCode()) + this.streamId.hashCode();
            }
        }

        private StreamDefnCache() {
        }

        private static void init() {
            if (streamDefnCache != null) {
                return;
            }
            synchronized (StreamDefnCache.class) {
                if (streamDefnCache != null) {
                    return;
                }
                streamDefnCache = CacheBuilder.newBuilder().maximumSize(1000L).expireAfterAccess(30L, TimeUnit.MINUTES).build(new CacheLoader<StreamIdClusterBean, StreamDefinition>() { // from class: org.wso2.carbon.databridge.persistence.cassandra.datastore.CassandraConnector.StreamDefnCache.1
                    public StreamDefinition load(StreamIdClusterBean streamIdClusterBean) throws Exception {
                        StreamDefinition streamDefinition = ServiceHolder.getDataBridgeReceiverService().getStreamDefinition(ServiceHolder.getDataBridgeReceiverService().login(streamIdClusterBean.getUserName(), streamIdClusterBean.getPassword()), DataBridgeCommonsUtils.getStreamNameFromStreamId(streamIdClusterBean.getStreamId()), DataBridgeCommonsUtils.getStreamVersionFromStreamId(streamIdClusterBean.getStreamId()));
                        if (streamDefinition != null) {
                            return streamDefinition;
                        }
                        throw new NullValueException("No value found");
                    }
                });
            }
        }

        public static StreamDefinition getStreamDefinition(Credentials credentials, String str) throws ExecutionException {
            init();
            return (StreamDefinition) streamDefnCache.get(new StreamIdClusterBean(credentials, str));
        }

        public static void invalidateStreamDefinition(Credentials credentials, String str) {
            streamDefnCache.invalidate(new StreamIdClusterBean(credentials, str));
        }
    }

    public CassandraConnector() {
        this.port = 0;
        this.localAddress = null;
        this.IS_PERFORMANCE_MEASURED = false;
        if (System.getProperty("profile.receiver") != null) {
            this.IS_PERFORMANCE_MEASURED = System.getProperty("profile.receiver").equals("true");
        }
        try {
            this.port = CarbonUtils.getTransportPort(ServiceHolder.getConfigurationContextService().getServerConfigContext().getAxisConfiguration(), "https") + Integer.parseInt(CarbonUtils.getServerConfiguration().getFirstProperty("Ports.Offset"));
            this.localAddress = Utils.getLocalAddress().getHostAddress();
        } catch (Exception e) {
            if (log.isDebugEnabled()) {
                log.warn("Error when detecting Host/Port, using defaults");
            }
            this.localAddress = this.localAddress == null ? "127.0.0.1" : this.localAddress;
            this.port = this.port == 0 ? 9443 : this.port;
        }
        createInserterMap();
    }

    void commit(Mutator mutator) throws StreamDefinitionStoreException {
        mutator.execute();
    }

    private void createInserterMap() {
        this.inserterMap.put(AttributeType.INT, new IntInserter());
        this.inserterMap.put(AttributeType.BOOL, new BoolInserter());
        this.inserterMap.put(AttributeType.LONG, new LongInserter());
        this.inserterMap.put(AttributeType.FLOAT, new FloatInserter());
        this.inserterMap.put(AttributeType.STRING, new StringInserter());
        this.inserterMap.put(AttributeType.DOUBLE, new DoubleInserter());
    }

    public ColumnFamilyDefinition getColumnFamily(Cluster cluster, String str, String str2) {
        for (ColumnFamilyDefinition columnFamilyDefinition : cluster.describeKeyspace(getKeyspace(str, cluster).getKeyspaceName()).getCfDefs()) {
            if (columnFamilyDefinition.getName().equals(str2)) {
                return columnFamilyDefinition;
            }
        }
        return null;
    }

    public ColumnFamilyDefinition createColumnFamily(Cluster cluster, String str, String str2, StreamDefinition streamDefinition) {
        Keyspace keyspace = getKeyspace(str, cluster);
        for (ColumnFamilyDefinition columnFamilyDefinition : cluster.describeKeyspace(keyspace.getKeyspaceName()).getCfDefs()) {
            if (columnFamilyDefinition.getName().equals(str2)) {
                if (log.isDebugEnabled()) {
                    log.debug("Column Family " + str2 + " already exists.");
                }
                CFCache.putCF(cluster, str, str2, true);
                return columnFamilyDefinition;
            }
        }
        BasicColumnFamilyDefinition basicColumnFamilyDefinition = new BasicColumnFamilyDefinition();
        basicColumnFamilyDefinition.setKeyspaceName(str);
        basicColumnFamilyDefinition.setName(str2);
        basicColumnFamilyDefinition.setKeyValidationClass(ComparatorType.UTF8TYPE.getClassName());
        basicColumnFamilyDefinition.setComparatorType(ComparatorType.UTF8TYPE);
        HashMap hashMap = new HashMap();
        hashMap.put("sstable_compression", "SnappyCompressor");
        hashMap.put("chunk_length_kb", "128");
        basicColumnFamilyDefinition.setCompressionOptions(hashMap);
        addMetaColumnDefinitionsToColumnFamily(basicColumnFamilyDefinition);
        if (streamDefinition != null) {
            addColumnDefinitionsToColumnFamily(streamDefinition.getPayloadData(), DataType.payload, basicColumnFamilyDefinition);
            addColumnDefinitionsToColumnFamily(streamDefinition.getMetaData(), DataType.meta, basicColumnFamilyDefinition);
            addColumnDefinitionsToColumnFamily(streamDefinition.getCorrelationData(), DataType.correlation, basicColumnFamilyDefinition);
        }
        cluster.addColumnFamily(new ThriftCfDef(basicColumnFamilyDefinition), true);
        KeyspaceDefinition describeKeyspace = cluster.describeKeyspace(keyspace.getKeyspaceName());
        for (int i = 0; i < 100; i++) {
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
            }
            for (ColumnFamilyDefinition columnFamilyDefinition2 : describeKeyspace.getCfDefs()) {
                if (columnFamilyDefinition2.getName().equals(str2)) {
                    if (log.isDebugEnabled()) {
                        log.debug("Column Family " + str2 + " already exists.");
                    }
                    CFCache.putCF(cluster, str, str2, true);
                    return columnFamilyDefinition2;
                }
            }
        }
        throw new RuntimeException("The column family " + str2 + " was  not created");
    }

    public boolean createKeySpaceIfNotExisting(Cluster cluster, String str) {
        if (cluster.describeKeyspace(str) != null) {
            return false;
        }
        cluster.addKeyspace(HFactory.createKeyspaceDefinition(str, Utils.getStrategyClass(), Utils.getReplicationFactor(), (List) null));
        KeyspaceDefinition describeKeyspace = cluster.describeKeyspace(str);
        while (describeKeyspace == null && 0 < 100) {
            try {
                Thread.sleep(100L);
                describeKeyspace = cluster.describeKeyspace(str);
                if (describeKeyspace != null) {
                    return true;
                }
            } catch (InterruptedException e) {
                return true;
            }
        }
        return true;
    }

    public List<String> insertEventList(Credentials credentials, Cluster cluster, List<Event> list) throws StreamDefinitionStoreException {
        Mutator<String> mutator = getMutator(cluster);
        ArrayList arrayList = new ArrayList();
        startTimeMeasurement(this.IS_PERFORMANCE_MEASURED);
        for (Event event : list) {
            StreamDefinition streamDefinitionFromStore = getStreamDefinitionFromStore(credentials, event.getStreamId());
            String convertStreamNameToCFName = CassandraSDSUtils.convertStreamNameToCFName(streamDefinitionFromStore.getName());
            if (streamDefinitionFromStore == null || convertStreamNameToCFName == null) {
                log.error("Event stream definition or column family cannot be null");
                throw new StreamDefinitionStoreException("Event stream definition or column family cannot be null");
            }
            if (log.isTraceEnabled()) {
                KeyspaceDefinition describeKeyspace = cluster.describeKeyspace(KeySpaceUtils.getKeySpaceName());
                log.trace("Keyspace desc. : " + describeKeyspace);
                String str = "CFs present \n";
                Iterator it = describeKeyspace.getCfDefs().iterator();
                while (it.hasNext()) {
                    str = str + "cf name : " + ((ColumnFamilyDefinition) it.next()).getName() + "\n";
                }
                log.trace(str);
            }
            this.eventCounter.incrementAndGet();
            long timeStamp = event.getTimeStamp() != 0 ? event.getTimeStamp() : System.currentTimeMillis();
            String createRowKey = CassandraSDSUtils.createRowKey(timeStamp, this.localAddress, this.port, this.rowkeyCounter.incrementAndGet());
            String description = streamDefinitionFromStore.getDescription();
            String nickName = streamDefinitionFromStore.getNickName();
            mutator.addInsertion(createRowKey, convertStreamNameToCFName, HFactory.createStringColumn(STREAM_ID_KEY, streamDefinitionFromStore.getStreamId()));
            mutator.addInsertion(createRowKey, convertStreamNameToCFName, HFactory.createStringColumn(STREAM_NAME_KEY, streamDefinitionFromStore.getName()));
            mutator.addInsertion(createRowKey, convertStreamNameToCFName, HFactory.createStringColumn(STREAM_VERSION_KEY, streamDefinitionFromStore.getVersion()));
            if (description != null) {
                mutator.addInsertion(createRowKey, convertStreamNameToCFName, HFactory.createStringColumn(STREAM_DESCRIPTION_KEY, description));
            }
            if (nickName != null) {
                mutator.addInsertion(createRowKey, convertStreamNameToCFName, HFactory.createStringColumn(STREAM_NICK_NAME_KEY, nickName));
            }
            mutator.addInsertion(createRowKey, convertStreamNameToCFName, HFactory.createColumn(STREAM_TIMESTAMP_KEY, Long.valueOf(timeStamp), stringSerializer, longSerializer));
            if (event.getArbitraryDataMap() != null) {
                insertVariableFields(convertStreamNameToCFName, createRowKey, mutator, event.getArbitraryDataMap());
            }
            if (streamDefinitionFromStore.getMetaData() != null) {
                prepareDataForInsertion(event.getMetaData(), streamDefinitionFromStore.getMetaData(), DataType.meta, createRowKey, convertStreamNameToCFName, mutator);
            }
            if (event.getCorrelationData() != null) {
                prepareDataForInsertion(event.getCorrelationData(), streamDefinitionFromStore.getCorrelationData(), DataType.correlation, createRowKey, convertStreamNameToCFName, mutator);
            }
            if (event.getPayloadData() != null) {
                prepareDataForInsertion(event.getPayloadData(), streamDefinitionFromStore.getPayloadData(), DataType.payload, createRowKey, convertStreamNameToCFName, mutator);
            }
            arrayList.add(createRowKey);
        }
        commit(mutator);
        endTimeMeasurement(this.IS_PERFORMANCE_MEASURED);
        return arrayList;
    }

    private void endTimeMeasurement(boolean z) {
        if (!z || this.eventCounter.get() <= 100000) {
            return;
        }
        synchronized (this) {
            if (this.eventCounter.get() > 100000) {
                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
                Date date = new Date();
                long currentTimeMillis = System.currentTimeMillis();
                int andSet = this.eventCounter.getAndSet(0);
                this.totalEventCounter.addAndGet(andSet);
                try {
                    AppendUtils.appendToFile(IOUtils.toInputStream("[" + simpleDateFormat.format(date) + "] # of events : " + andSet + " start timestamp : " + this.startTime + " end time stamp : " + currentTimeMillis + " Throughput is (events / sec) : " + ((andSet * 1000) / (currentTimeMillis - this.startTime)) + " Total Event Count : " + this.totalEventCounter + " \n"), new File(CarbonUtils.getCarbonHome() + File.separator + "receiver-perf.txt"));
                } catch (IOException e) {
                    log.error(e.getMessage(), e);
                }
                this.startTime = 0L;
            }
        }
    }

    private void startTimeMeasurement(boolean z) {
        if (z && this.startTime == 0) {
            this.startTime = System.currentTimeMillis();
        }
    }

    private void addColumnDefinitionsToColumnFamily(List<Attribute> list, DataType dataType, ColumnFamilyDefinition columnFamilyDefinition) {
        if (list != null) {
            for (Attribute attribute : list) {
                BasicColumnDefinition basicColumnDefinition = new BasicColumnDefinition();
                basicColumnDefinition.setName(stringSerializer.toByteBuffer(CassandraSDSUtils.getColumnName(dataType, attribute)));
                basicColumnDefinition.setValidationClass(attributeComparatorMap.get(attribute.getType()));
                try {
                    columnFamilyDefinition.addColumnDefinition(basicColumnDefinition);
                } catch (UnsupportedOperationException e) {
                    if (log.isDebugEnabled()) {
                        log.debug("Cannot add the meta information to column family.", e);
                    }
                }
            }
        }
    }

    private void addFilteredColumnDefinitionsToColumnFamily(List<Attribute> list, DataType dataType, List<ColumnDefinition> list2, ColumnFamilyDefinition columnFamilyDefinition) {
        ArrayList arrayList = new ArrayList();
        if (list != null) {
            arrayList.addAll(list);
            Iterator<Attribute> it = arrayList.iterator();
            while (it.hasNext()) {
                Attribute next = it.next();
                boolean z = false;
                if (list2 != null) {
                    Iterator<ColumnDefinition> it2 = list2.iterator();
                    while (true) {
                        if (!it2.hasNext()) {
                            break;
                        }
                        if (stringSerializer.fromByteBuffer(it2.next().getName().asReadOnlyBuffer()).equals(CassandraSDSUtils.getColumnName(dataType, next))) {
                            z = true;
                            break;
                        }
                    }
                    if (z) {
                        it.remove();
                    }
                }
            }
            addColumnDefinitionsToColumnFamily(arrayList, dataType, columnFamilyDefinition);
        }
    }

    private void addMetaColumnDefinitionsToColumnFamily(ColumnFamilyDefinition columnFamilyDefinition) {
        BasicColumnDefinition basicColumnDefinition = new BasicColumnDefinition();
        basicColumnDefinition.setName(stringSerializer.toByteBuffer(STREAM_ID_KEY));
        basicColumnDefinition.setValidationClass(ComparatorType.UTF8TYPE.getClassName());
        columnFamilyDefinition.addColumnDefinition(basicColumnDefinition);
        BasicColumnDefinition basicColumnDefinition2 = new BasicColumnDefinition();
        basicColumnDefinition2.setName(stringSerializer.toByteBuffer(STREAM_NAME_KEY));
        basicColumnDefinition2.setValidationClass(ComparatorType.UTF8TYPE.getClassName());
        columnFamilyDefinition.addColumnDefinition(basicColumnDefinition2);
        BasicColumnDefinition basicColumnDefinition3 = new BasicColumnDefinition();
        basicColumnDefinition3.setName(stringSerializer.toByteBuffer(STREAM_VERSION_KEY));
        basicColumnDefinition3.setValidationClass(ComparatorType.UTF8TYPE.getClassName());
        columnFamilyDefinition.addColumnDefinition(basicColumnDefinition3);
        BasicColumnDefinition basicColumnDefinition4 = new BasicColumnDefinition();
        basicColumnDefinition4.setName(stringSerializer.toByteBuffer(STREAM_DESCRIPTION_KEY));
        basicColumnDefinition4.setValidationClass(ComparatorType.UTF8TYPE.getClassName());
        columnFamilyDefinition.addColumnDefinition(basicColumnDefinition4);
        BasicColumnDefinition basicColumnDefinition5 = new BasicColumnDefinition();
        basicColumnDefinition5.setName(stringSerializer.toByteBuffer(STREAM_NICK_NAME_KEY));
        basicColumnDefinition5.setValidationClass(ComparatorType.UTF8TYPE.getClassName());
        columnFamilyDefinition.addColumnDefinition(basicColumnDefinition5);
        BasicColumnDefinition basicColumnDefinition6 = new BasicColumnDefinition();
        basicColumnDefinition6.setName(stringSerializer.toByteBuffer(STREAM_TIMESTAMP_KEY));
        basicColumnDefinition6.setValidationClass(ComparatorType.LONGTYPE.getClassName());
        columnFamilyDefinition.addColumnDefinition(basicColumnDefinition6);
    }

    public void saveStreamDefinitionToStore(Cluster cluster, StreamDefinition streamDefinition) throws StreamDefinitionStoreException {
        String convertStreamNameToCFName = CassandraSDSUtils.convertStreamNameToCFName(streamDefinition.getName());
        try {
            if (!CFCache.getCF(cluster, KeySpaceUtils.getKeySpaceName(), convertStreamNameToCFName).booleanValue()) {
                createColumnFamily(cluster, KeySpaceUtils.getKeySpaceName(), convertStreamNameToCFName, streamDefinition);
            }
            Mutator createMutator = HFactory.createMutator(getKeyspace(BAM_META_KEYSPACE, cluster), stringSerializer);
            createMutator.addInsertion(streamDefinition.getStreamId(), "STREAM_DEFINITION", HFactory.createStringColumn("STREAM_DEFINITION", EventDefinitionConverterUtils.convertToJson(streamDefinition)));
            createMutator.execute();
            log.info("Saving Stream Definition : " + streamDefinition);
            if (log.isDebugEnabled()) {
                log.debug("saveStreamDefinition executed. \n stream definition saved : " + getStreamDefinitionFromStore(getCredentials(cluster), streamDefinition.getStreamId()).toString() + " \n");
            }
        } catch (ExecutionException e) {
            throw new StreamDefinitionStoreException("Error getting column family : " + convertStreamNameToCFName, e);
        }
    }

    public static Credentials getCredentials(Cluster cluster) {
        Credentials credentials = null;
        for (Map.Entry entry : cluster.getCredentials().entrySet()) {
            String str = (String) entry.getKey();
            credentials = new Credentials(str, (String) entry.getValue(), MultitenantUtils.getTenantDomain(str));
        }
        return credentials;
    }

    public boolean deleteStreamDefinitionFromStore(Cluster cluster, String str) throws StreamDefinitionStoreException {
        HFactory.createMutator(getKeyspace(BAM_META_KEYSPACE, cluster), stringSerializer).delete(str, "STREAM_DEFINITION", "STREAM_DEFINITION", stringSerializer);
        return true;
    }

    public boolean deleteStreamDefinitionFromCassandra(Cluster cluster, String str) throws StreamDefinitionStoreException {
        deleteDataFromStreamDefinition(getCredentials(cluster), cluster, str);
        HFactory.createMutator(getKeyspace(BAM_META_KEYSPACE, cluster), stringSerializer).delete(str, "STREAM_DEFINITION", "STREAM_DEFINITION", stringSerializer);
        return true;
    }

    private void deleteDataFromStreamDefinition(Credentials credentials, Cluster cluster, String str) {
        Keyspace keyspace = getKeyspace(KeySpaceUtils.getKeySpaceName(), cluster);
        String convertStreamNameToCFName = CassandraSDSUtils.convertStreamNameToCFName(DataBridgeCommonsUtils.getStreamNameFromStreamId(str));
        String streamVersionFromStreamId = DataBridgeCommonsUtils.getStreamVersionFromStreamId(str);
        RangeSlicesQuery createRangeSlicesQuery = HFactory.createRangeSlicesQuery(keyspace, stringSerializer, stringSerializer, stringSerializer);
        createRangeSlicesQuery.setColumnFamily(convertStreamNameToCFName).setColumnNames(new String[]{STREAM_VERSION_KEY});
        String str2 = "";
        createRangeSlicesQuery.setRowCount(1000);
        if (log.isDebugEnabled()) {
            log.debug("Deleting stream definition with id : " + str);
        }
        boolean z = false;
        Mutator createMutator = HFactory.createMutator(keyspace, stringSerializer);
        boolean z2 = false;
        while (!z) {
            createRangeSlicesQuery.setKeys(str2, "");
            QueryResult execute = createRangeSlicesQuery.execute();
            int i = 0;
            for (Row row : (OrderedRows) execute.get()) {
                i++;
                if (row != null && (str2.equals("") || i != 1)) {
                    HColumn columnByName = row.getColumnSlice().getColumnByName(STREAM_VERSION_KEY);
                    if (columnByName != null) {
                        if (streamVersionFromStreamId.equals((String) columnByName.getValue())) {
                            createMutator.addDeletion(row.getKey(), convertStreamNameToCFName);
                        } else {
                            z2 = true;
                        }
                        str2 = (String) row.getKey();
                    }
                }
            }
            createMutator.execute();
            if (((OrderedRows) execute.get()).getCount() < 1000) {
                z = true;
            }
        }
        if (z2) {
            return;
        }
        cluster.dropColumnFamily(keyspace.getKeyspaceName(), convertStreamNameToCFName);
    }

    public StreamDefinition getStreamDefinitionFromStore(Credentials credentials, String str) {
        try {
            return StreamDefnCache.getStreamDefinition(credentials, str);
        } catch (ExecutionException e) {
            return null;
        }
    }

    public StreamDefinition getStreamDefinitionFromCassandra(Cluster cluster, String str) throws StreamDefinitionStoreException {
        ColumnQuery createStringColumnQuery = HFactory.createStringColumnQuery(getKeyspace(BAM_META_KEYSPACE, cluster));
        createStringColumnQuery.setColumnFamily("STREAM_DEFINITION").setKey(str).setName("STREAM_DEFINITION");
        HColumn hColumn = (HColumn) createStringColumnQuery.execute().get();
        if (hColumn == null) {
            return null;
        }
        try {
            return EventDefinitionConverterUtils.convertFromJson((String) hColumn.getValue());
        } catch (MalformedStreamDefinitionException e) {
            throw new StreamDefinitionStoreException("Retrieved definition from Cassandra store is malformed. Retrieved value : " + ((String) hColumn.getValue()));
        }
    }

    public void definedStream(Cluster cluster, StreamDefinition streamDefinition) {
        String convertStreamNameToCFName = CassandraSDSUtils.convertStreamNameToCFName(streamDefinition.getName());
        try {
            ColumnFamilyDefinition columnFamily = getColumnFamily(cluster, KeySpaceUtils.getKeySpaceName(), convertStreamNameToCFName);
            if (!CFCache.getCF(cluster, KeySpaceUtils.getKeySpaceName(), convertStreamNameToCFName).booleanValue()) {
                if (columnFamily == null) {
                    createColumnFamily(cluster, KeySpaceUtils.getKeySpaceName(), convertStreamNameToCFName, streamDefinition);
                    return;
                }
                CFCache.putCF(cluster, KeySpaceUtils.getKeySpaceName(), convertStreamNameToCFName, true);
            }
            List<ColumnDefinition> columnMetadata = columnFamily.getColumnMetadata();
            int size = columnMetadata.size();
            addFilteredColumnDefinitionsToColumnFamily(streamDefinition.getPayloadData(), DataType.payload, columnMetadata, columnFamily);
            addFilteredColumnDefinitionsToColumnFamily(streamDefinition.getMetaData(), DataType.meta, columnMetadata, columnFamily);
            addFilteredColumnDefinitionsToColumnFamily(streamDefinition.getCorrelationData(), DataType.correlation, columnMetadata, columnFamily);
            if (size != columnFamily.getColumnMetadata().size()) {
                cluster.updateColumnFamily(columnFamily, true);
            }
        } catch (ExecutionException e) {
            log.error("Error while getting column family definition from cache at defined stream.", e);
        }
    }

    public void removeStream(Credentials credentials, Cluster cluster, StreamDefinition streamDefinition) {
        deleteDataFromStreamDefinition(credentials, cluster, streamDefinition.getStreamId());
        StreamDefnCache.invalidateStreamDefinition(credentials, streamDefinition.getStreamId());
    }

    public Collection<StreamDefinition> getAllStreamDefinitionFromStore(Cluster cluster) throws StreamDefinitionStoreException {
        QueryResult execute;
        ArrayList arrayList = new ArrayList();
        RangeSlicesQuery createRangeSlicesQuery = HFactory.createRangeSlicesQuery(getKeyspace(BAM_META_KEYSPACE, cluster), stringSerializer, stringSerializer, stringSerializer);
        createRangeSlicesQuery.setColumnFamily("STREAM_DEFINITION");
        String str = "";
        createRangeSlicesQuery.setColumnNames(new String[]{"STREAM_DEFINITION"});
        createRangeSlicesQuery.setRowCount(100);
        String str2 = log.isDebugEnabled() ? "getAllStreamDefinitions called : \n" : null;
        int i = 0;
        do {
            createRangeSlicesQuery.setKeys(str, "");
            execute = createRangeSlicesQuery.execute();
            int i2 = 0;
            for (Row row : (OrderedRows) execute.get()) {
                i2++;
                if (row != null && (str.equals("") || i2 != 1)) {
                    i++;
                    str = (String) row.getKey();
                    if (null != row.getColumnSlice().getColumnByName("STREAM_DEFINITION")) {
                        String str3 = (String) row.getColumnSlice().getColumnByName("STREAM_DEFINITION").getValue();
                        try {
                            arrayList.add(EventDefinitionConverterUtils.convertFromJson(str3));
                        } catch (MalformedStreamDefinitionException e) {
                            log.error("Malformed StreamDefinition " + str3);
                        }
                    }
                }
            }
        } while (((OrderedRows) execute.get()).getCount() >= 100);
        if (log.isDebugEnabled()) {
            log.debug(str2);
            log.info("Stream Id returned from cassandra: " + i);
        }
        return arrayList;
    }

    void insertVariableFields(String str, String str2, Mutator<String> mutator, Map<String, String> map) {
        for (Map.Entry<String, String> entry : map.entrySet()) {
            mutator.addInsertion(str2, str, HFactory.createStringColumn(entry.getKey(), entry.getValue()));
        }
    }

    Mutator prepareDataForInsertion(Object[] objArr, List<Attribute> list, DataType dataType, String str, String str2, Mutator<String> mutator) {
        for (int i = 0; i < list.size(); i++) {
            Attribute attribute = list.get(i);
            this.inserterMap.get(attribute.getType()).addDataToBatchInsertion(objArr[i], str2, CassandraSDSUtils.getColumnName(dataType, attribute), str, mutator);
        }
        return mutator;
    }

    Object getValueForDataTypeList(ColumnSlice<String, ByteBuffer> columnSlice, Attribute attribute, DataType dataType) throws IOException {
        return CassandraSDSUtils.getOriginalValueFromColumnValue((ByteBuffer) columnSlice.getColumnByName(CassandraSDSUtils.getColumnName(dataType, attribute)).getValue(), attribute.getType());
    }

    Mutator<String> getMutator(Cluster cluster) throws StreamDefinitionStoreException {
        return HFactory.createMutator(getKeyspace(KeySpaceUtils.getKeySpaceName(), cluster), stringSerializer);
    }

    static Keyspace getKeyspace(String str, Cluster cluster) {
        return HFactory.createKeyspace(str, cluster, Utils.getGlobalConsistencyLevelPolicy());
    }

    static {
        attributeComparatorMap.put(AttributeType.STRING, ComparatorType.UTF8TYPE.getClassName());
        attributeComparatorMap.put(AttributeType.INT, ComparatorType.INTEGERTYPE.getClassName());
        attributeComparatorMap.put(AttributeType.LONG, ComparatorType.LONGTYPE.getClassName());
        attributeComparatorMap.put(AttributeType.FLOAT, COMPARATOR_FLOAT_TYPE);
        attributeComparatorMap.put(AttributeType.DOUBLE, COMPARATOR_DOUBLE_TYPE);
        attributeComparatorMap.put(AttributeType.BOOL, COMPARATOR_BOOL_TYPE);
    }
}
