package org.wso2.carbon.bam.cassandra.data.archive.service;

import java.util.HashMap;
import me.prettyprint.cassandra.service.CassandraHostConfigurator;
import me.prettyprint.cassandra.service.ThriftCluster;
import me.prettyprint.hector.api.Cluster;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.quartz.CronExpression;
import org.wso2.carbon.analytics.hive.ServiceHolder;
import org.wso2.carbon.analytics.hive.exception.HiveExecutionException;
import org.wso2.carbon.analytics.hive.web.HiveScriptStoreService;
import org.wso2.carbon.bam.cassandra.data.archive.CassandraArchivalConstants;
import org.wso2.carbon.bam.cassandra.data.archive.exception.CassandraArchiveException;
import org.wso2.carbon.bam.cassandra.data.archive.exception.InvalidCronExpressionException;
import org.wso2.carbon.bam.cassandra.data.archive.util.ArchiveConfiguration;
import org.wso2.carbon.bam.cassandra.data.archive.util.ArchiveThreadExecutor;
import org.wso2.carbon.bam.cassandra.data.archive.util.CassandraArchiveUtil;
import org.wso2.carbon.bam.cassandra.data.archive.util.GenerateHiveScript;
import org.wso2.carbon.bam.cassandra.hector.datareader.HectorCassandraConfiguration;
import org.wso2.carbon.context.CarbonContext;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.databridge.commons.StreamDefinition;
import org.wso2.carbon.databridge.core.exception.StreamDefinitionStoreException;
import org.wso2.carbon.registry.core.Resource;
import org.wso2.carbon.registry.core.exceptions.RegistryException;
import org.wso2.carbon.registry.core.session.UserRegistry;

/* loaded from: input_file:org/wso2/carbon/bam/cassandra/data/archive/service/CassandraArchivalService.class */
public class CassandraArchivalService {
    private static final Log log = LogFactory.getLog(CassandraArchivalService.class);
    Cluster cluster;

    public void archiveCassandraData(ArchiveConfiguration archiveConfiguration) throws Exception {
        if (archiveConfiguration == null) {
            log.error("UI doesn't pass the configuration to backend");
            throw new CassandraArchiveException("UI doesn't pass the configuration to backend");
        }
        if (archiveConfiguration.isSchedulingOn() && !CronExpression.isValidExpression(archiveConfiguration.getCronExpression())) {
            log.error("Invalid cron expression: " + archiveConfiguration.getCronExpression());
            throw new InvalidCronExpressionException("Invalid cron expression: " + archiveConfiguration.getCronExpression());
        }
        HectorCassandraConfiguration hectorConfiguration = CassandraArchiveUtil.getHectorConfiguration(CarbonContext.getThreadLocalCarbonContext().getTenantId(), CassandraArchiveUtil.getEventSourceName());
        archiveConfiguration.setConnectionURL(hectorConfiguration.getHosts().trim().split(",")[0] + ":" + hectorConfiguration.getPort().trim());
        archiveConfiguration.setUserName(hectorConfiguration.getUsername());
        archiveConfiguration.setPassword(hectorConfiguration.getPassword());
        CassandraHostConfigurator cassandraHostConfigurator = new CassandraHostConfigurator(archiveConfiguration.getConnectionURL());
        HashMap hashMap = new HashMap();
        hashMap.put("username", archiveConfiguration.getUserName());
        hashMap.put("password", archiveConfiguration.getPassword());
        this.cluster = new ThriftCluster(CassandraArchiveUtil.DEFAULT_CASSANDRA_CLUSTER, cassandraHostConfigurator, hashMap);
        CassandraArchiveUtil.setCluster(this.cluster);
        try {
            StreamDefinition streamDefinition = getStreamDefinition(archiveConfiguration);
            if (streamDefinition == null) {
                String str = "Unable to find stream definition " + archiveConfiguration.getStreamName() + " with version " + archiveConfiguration.getVersion();
                log.error(str);
                throw new CassandraArchiveException(str);
            }
            GenerateHiveScript generateHiveScript = new GenerateHiveScript(this.cluster, archiveConfiguration);
            String str2 = generateHiveScript.generateMappingForReadingCassandraOriginalCF(streamDefinition) + generateHiveScript.createUDF();
            if (!archiveConfiguration.isDeletion()) {
                str2 = (str2 + generateHiveScript.generateMappingForWritingToArchivalCF(streamDefinition) + "\n") + generateHiveScript.hiveQueryForWritingDataToArchivalCF(streamDefinition, archiveConfiguration) + "\n";
            }
            final String str3 = ((streamDefinition.getIndexDefinition() == null ? (str2 + generateHiveScript.generateMappingForWritingToTmpCF(streamDefinition) + "\n") + generateHiveScript.hiveQueryForWritingDataToTmpCF(streamDefinition, archiveConfiguration) + "\n" : (str2 + generateHiveScript.generateMappingForWritingToTmpCFWithIndex(streamDefinition) + "\n") + generateHiveScript.hiveQueryForWritingDataToTmpCFWithIndex(streamDefinition, archiveConfiguration) + "\n") + generateHiveScript.dropTempColumnFamily(streamDefinition.getName()) + "\n") + generateHiveScript.mapReduceJobAsHiveQuery();
            if (archiveConfiguration.isSchedulingOn()) {
                HiveScriptStoreService hiveScriptStoreService = CassandraArchiveUtil.getHiveScriptStoreService();
                String str4 = !archiveConfiguration.isDeletion() ? CassandraArchiveUtil.HIVE_SCRIPT_ARCHIVAL_TYPE : CassandraArchiveUtil.HIVE_SCRIPT_DELETION_TYPE;
                String str5 = streamDefinition.getName() + streamDefinition.getVersion() + "_" + str4 + "Script";
                hiveScriptStoreService.saveHiveScriptWithType(str5, str3, archiveConfiguration.getCronExpression(), str4);
                saveConfigsToRegistry(str5, archiveConfiguration);
            } else {
                if (log.isDebugEnabled()) {
                    log.debug(str3);
                }
                final int tenantId = CarbonContext.getThreadLocalCarbonContext().getTenantId();
                ArchiveThreadExecutor.getExecutorServiceInstance().submit(new Runnable() { // from class: org.wso2.carbon.bam.cassandra.data.archive.service.CassandraArchivalService.1
                    @Override // java.lang.Runnable
                    public void run() {
                        try {
                            CassandraArchiveUtil.getHiveExecutorService().execute(tenantId, (String) null, str3);
                        } catch (HiveExecutionException e) {
                            CassandraArchivalService.log.error("Failed to execute hive query : " + e.getMessage(), e);
                        }
                    }
                });
            }
        } catch (StreamDefinitionStoreException e) {
            log.error("Failed to get stream definition from Cassandra", e);
            throw new CassandraArchiveException("Failed to get stream definition");
        }
    }

    private void saveConfigsToRegistry(String str, ArchiveConfiguration archiveConfiguration) throws RegistryException {
        UserRegistry configSystemRegistry = ServiceHolder.getRegistryService().getConfigSystemRegistry(CarbonContext.getThreadLocalCarbonContext().getTenantId());
        Resource newResource = configSystemRegistry.newResource();
        newResource.setProperty(CassandraArchivalConstants.STREAM_NAME, archiveConfiguration.getStreamName());
        newResource.setProperty(CassandraArchivalConstants.STREAM_VERSION, archiveConfiguration.getVersion());
        newResource.setProperty(CassandraArchivalConstants.NUMBER_OF_DAYS, Integer.toString(archiveConfiguration.getNoOfDays()));
        newResource.setProperty(CassandraArchivalConstants.CRON_EXPRESSION, archiveConfiguration.getCronExpression());
        newResource.setProperty(CassandraArchivalConstants.DELETION_OR_ARCHIVAL, Boolean.toString(archiveConfiguration.isDeletion()));
        configSystemRegistry.put(CassandraArchivalConstants.REGISTRY_BASE_PATH + str, newResource);
    }

    public String getJobProperty(String str, String str2) {
        try {
            return ServiceHolder.getRegistryService().getConfigSystemRegistry(CarbonContext.getThreadLocalCarbonContext().getTenantId()).get(CassandraArchivalConstants.REGISTRY_BASE_PATH + str).getProperty(str2);
        } catch (RegistryException e) {
            e.printStackTrace();
            return null;
        }
    }

    public boolean deleteJob(String str) {
        try {
            ServiceHolder.getRegistryService().getConfigSystemRegistry(CarbonContext.getThreadLocalCarbonContext().getTenantId()).delete(CassandraArchivalConstants.REGISTRY_BASE_PATH + str);
            return true;
        } catch (RegistryException e) {
            e.printStackTrace();
            return false;
        }
    }

    private StreamDefinition getStreamDefinition(ArchiveConfiguration archiveConfiguration) throws StreamDefinitionStoreException {
        return CassandraArchiveUtil.getStreamDefinitionStoreService().getStreamDefinition(archiveConfiguration.getStreamName() + ":" + archiveConfiguration.getVersion(), PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId());
    }

    private String getStreamKey(ArchiveConfiguration archiveConfiguration) {
        return archiveConfiguration.getStreamName() + ":" + archiveConfiguration.getVersion();
    }
}
