package org.wso2.carbon.bam.cassandra.data.archive.service;

import java.util.HashMap;
import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.cassandra.service.CassandraHostConfigurator;
import me.prettyprint.cassandra.service.ThriftCluster;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.query.ColumnQuery;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.analytics.hive.service.HiveExecutorService;
import org.wso2.carbon.bam.cassandra.data.archive.util.ArchiveConfiguration;
import org.wso2.carbon.bam.cassandra.data.archive.util.CassandraArchiveUtil;
import org.wso2.carbon.bam.cassandra.data.archive.util.GenerateHiveScript;
import org.wso2.carbon.cassandra.dataaccess.ClusterInformation;
import org.wso2.carbon.databridge.commons.StreamDefinition;
import org.wso2.carbon.databridge.commons.exception.MalformedStreamDefinitionException;
import org.wso2.carbon.databridge.commons.utils.EventDefinitionConverterUtils;
import org.wso2.carbon.databridge.core.exception.StreamDefinitionStoreException;

/* loaded from: input_file:org/wso2/carbon/bam/cassandra/data/archive/service/CassandraArchivalService.class */
/**
 * Archives the Cassandra data of a single event stream.
 *
 * <p>The service looks up the stream definition stored in the {@link #BAM_META_KEYSPACE} keyspace,
 * generates a Hive script from it through {@link GenerateHiveScript}, and then either stores the
 * script for scheduled execution or executes it immediately, depending on the supplied
 * {@link ArchiveConfiguration}.
 */
public class CassandraArchivalService {
    private static final Log log = LogFactory.getLog(CassandraArchivalService.class);
    public static final String BAM_META_KEYSPACE = "META_KS";
    public static final String BAM_META_STREAM_DEF_CF = "STREAM_DEFINITION";
    private static final String STREAM_DEF = "STREAM_DEFINITION";
    /** Cluster used by the most recent {@link #archiveCassandraData} call. */
    Cluster cluster;

    /**
     * Generates the archival Hive script for the configured stream and schedules or runs it.
     *
     * <p>When {@code archiveConfiguration.isSchedulingOn()} is true the script is saved under the
     * name {@code <streamName><streamVersion>_archiveScript} together with the configured cron
     * expression; otherwise it is executed right away.
     *
     * <p>A {@link StreamDefinitionStoreException} (definition missing or malformed) is logged and
     * not propagated, so the method returns normally without archiving in that case.
     *
     * @param archiveConfiguration connection, credential, stream and scheduling settings
     * @throws Exception if cluster setup, script generation, script storage or script execution fails
     */
    public void archiveCassandraData(ArchiveConfiguration archiveConfiguration) throws Exception {
        // Work on locals so this invocation does not depend on instance state after setup;
        // the field is still assigned because it is visible to the rest of the package.
        Cluster activeCluster = resolveCluster(archiveConfiguration);
        this.cluster = activeCluster;
        CassandraArchiveUtil.setCluster(activeCluster);
        try {
            StreamDefinition definition = getStreamDefinition(activeCluster, archiveConfiguration);
            GenerateHiveScript scriptGenerator = new GenerateHiveScript(activeCluster, archiveConfiguration);
            String script = buildArchiveScript(scriptGenerator, definition, archiveConfiguration);
            if (archiveConfiguration.isSchedulingOn()) {
                String scriptName = definition.getName() + definition.getVersion() + "_archiveScript";
                CassandraArchiveUtil.getHiveScriptStoreService()
                        .saveHiveScript(scriptName, script, archiveConfiguration.getCronExpression());
            } else {
                HiveExecutorService hiveExecutorService = CassandraArchiveUtil.getHiveExecutorService();
                if (log.isDebugEnabled()) {
                    log.debug(script);
                }
                hiveExecutorService.execute(script);
            }
        } catch (StreamDefinitionStoreException e) {
            log.error("Failed to get stream definition from Cassandra", e);
        }
    }

    /**
     * Chooses the cluster to archive from: the data access service's cluster when no connection
     * URL is configured, otherwise a new {@link ThriftCluster} for the configured URL.
     */
    private Cluster resolveCluster(ArchiveConfiguration archiveConfiguration) throws Exception {
        String connectionUrl = archiveConfiguration.getConnectionURL();
        if (connectionUrl == null) {
            ClusterInformation clusterInformation =
                    new ClusterInformation(archiveConfiguration.getUserName(), archiveConfiguration.getPassword());
            return CassandraArchiveUtil.getDataAccessService().getCluster(clusterInformation);
        }
        CassandraHostConfigurator hostConfigurator = new CassandraHostConfigurator(connectionUrl);
        HashMap<String, String> credentials = new HashMap<String, String>();
        credentials.put("username", archiveConfiguration.getUserName());
        credentials.put("password", archiveConfiguration.getPassword());
        return new ThriftCluster(CassandraArchiveUtil.DEFAULT_CASSANDRA_CLUSTER, hostConfigurator, credentials);
    }

    /**
     * Concatenates the generated Hive statements into one script. The generator methods are
     * invoked in the same order in which their output appears in the script.
     */
    private String buildArchiveScript(GenerateHiveScript scriptGenerator, StreamDefinition definition,
                                      ArchiveConfiguration archiveConfiguration) throws Exception {
        StringBuilder script = new StringBuilder();
        script.append(scriptGenerator.generateMappingForReadingCassandraOriginalCF(definition));
        script.append(scriptGenerator.createUDF());
        script.append(scriptGenerator.generateMappingForWritingToArchivalCF(definition)).append("\n");
        script.append(scriptGenerator.hiveQueryForWritingDataToArchivalCF(definition, archiveConfiguration)).append("\n");
        script.append(scriptGenerator.generateMappingForWritingToTmpCF(definition)).append("\n");
        script.append(scriptGenerator.hiveQueryForWritingDataToTmpCF(definition, archiveConfiguration)).append("\n");
        script.append(scriptGenerator.mapReduceJobAsHiveQuery());
        return script.toString();
    }

    /**
     * Reads the stream definition JSON stored for the configured stream and converts it.
     *
     * @param cluster              cluster holding the {@link #BAM_META_KEYSPACE} keyspace
     * @param archiveConfiguration supplies the stream name and version that form the row key
     * @return the parsed stream definition, never {@code null}
     * @throws StreamDefinitionStoreException if no definition is stored for the stream, or the
     *                                        stored value cannot be parsed
     */
    private StreamDefinition getStreamDefinition(Cluster cluster, ArchiveConfiguration archiveConfiguration) throws StreamDefinitionStoreException {
        String streamKey = getStreamKey(archiveConfiguration);
        ColumnQuery<String, String, String> definitionQuery =
                HFactory.createStringColumnQuery(HFactory.createKeyspace(BAM_META_KEYSPACE, cluster));
        definitionQuery.setColumnFamily(BAM_META_STREAM_DEF_CF).setKey(streamKey).setName(STREAM_DEF);
        HColumn<String, String> definitionColumn = definitionQuery.execute().get();
        if (definitionColumn == null) {
            // Fail with a descriptive error instead of returning null, which previously surfaced
            // as a NullPointerException when the caller first used the definition.
            throw new StreamDefinitionStoreException("No stream definition found in Cassandra for stream " + streamKey);
        }
        String definitionJson = definitionColumn.getValue();
        try {
            return EventDefinitionConverterUtils.convertFromJson(definitionJson);
        } catch (MalformedStreamDefinitionException e) {
            // Keep the parser's exception as the cause so the reason for the failure is not lost.
            throw new StreamDefinitionStoreException("Retrieved definition from Cassandra store is malformed. Retrieved value : " + definitionJson, e);
        }
    }

    /** Builds the row key of a stream definition: {@code <streamName>:<version>}. */
    private String getStreamKey(ArchiveConfiguration archiveConfiguration) {
        return archiveConfiguration.getStreamName() + ":" + archiveConfiguration.getVersion();
    }
}
