package org.apache.synapse.message.processor.impl;

import java.util.Map;
import java.util.Properties;
import java.util.StringTokenizer;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.axis2.deployment.DeploymentEngine;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.SynapseException;
import org.apache.synapse.core.SynapseEnvironment;
import org.apache.synapse.deployers.MessageProcessorDeployer;
import org.apache.synapse.message.processor.MessageProcessorConstants;
import org.apache.synapse.message.processor.impl.forwarder.ForwardingProcessorConstants;
import org.apache.synapse.message.processor.impl.forwarder.ForwardingService;
import org.apache.synapse.message.processor.impl.sampler.SamplingProcessor;
import org.apache.synapse.message.processor.impl.sampler.SamplingService;
import org.quartz.CronScheduleBuilder;
import org.quartz.JobBuilder;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.TriggerKey;
import org.quartz.UnableToInterruptJobException;
import org.quartz.impl.StdSchedulerFactory;
import org.springframework.beans.PropertyAccessor;

/* loaded from: input_file:WEB-INF/lib/synapse-core-2.1.2-wso2v5.jar:org/apache/synapse/message/processor/impl/ScheduledMessageProcessor.class */
public abstract class ScheduledMessageProcessor extends AbstractMessageProcessor {
    private static final Log logger = LogFactory.getLog(ScheduledMessageProcessor.class.getName());
    protected Scheduler scheduler = null;
    protected long interval = 1000;
    protected String quartzConfig = null;
    protected String cronExpression = null;
    protected AtomicBoolean isPaused = new AtomicBoolean(false);
    private AtomicBoolean isActivated = new AtomicBoolean(true);
    protected String[] nonRetryStatusCodes = null;

    @Override // org.apache.synapse.message.processor.impl.AbstractMessageProcessor, org.apache.synapse.ManagedLifecycle
    public void init(SynapseEnvironment synapseEnvironment) {
        if (!isPinnedServer(synapseEnvironment.getServerContextInformation().getServerConfigurationInformation().getServerName())) {
            setActivated(false);
        }
        super.init(synapseEnvironment);
        StdSchedulerFactory stdSchedulerFactory = null;
        try {
            stdSchedulerFactory = new StdSchedulerFactory(getSchedulerProperties(this.name));
            if (this.quartzConfig != null && !"".equals(this.quartzConfig)) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Initiating a Scheduler with configuration : " + this.quartzConfig);
                }
                stdSchedulerFactory.initialize(this.quartzConfig);
            }
            try {
                this.scheduler = stdSchedulerFactory.getScheduler();
                start();
            } catch (SchedulerException e) {
                throw new SynapseException("Error getting a  scheduler instance form scheduler factory " + stdSchedulerFactory, e);
            }
        } catch (SchedulerException e2) {
            throw new SynapseException("Error initiating scheduler factory " + stdSchedulerFactory + "with configuration loaded from " + this.quartzConfig, e2);
        }
    }

    @Override // org.apache.synapse.message.processor.MessageProcessor
    public boolean start() {
        Trigger build;
        try {
            if (this.isActivated.get()) {
                setMessageConsumer(this.configuration.getMessageStore(this.messageStore).getConsumer());
                this.scheduler.start();
                if (logger.isDebugEnabled()) {
                    logger.debug("Started message processor. [" + getName() + "].");
                }
            }
            TriggerBuilder<Trigger> withIdentity = TriggerBuilder.newTrigger().withIdentity(this.name + "-trigger");
            if (this.cronExpression == null || "".equals(this.cronExpression)) {
                build = withIdentity.withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInMilliseconds(isThrottling(this.interval) ? 1000L : this.interval).repeatForever().withMisfireHandlingInstructionNextWithRemainingCount()).build();
            } else {
                build = withIdentity.startNow().withSchedule(CronScheduleBuilder.cronSchedule(this.cronExpression).withMisfireHandlingInstructionDoNothing()).build();
            }
            JobDataMap jobDataMap = getJobDataMap();
            jobDataMap.put("parameters", (Object) this.parameters);
            JobDetail build2 = getJobBuilder().usingJobData(jobDataMap).build();
            try {
                this.scheduler.scheduleJob(build2, build);
                return true;
            } catch (SchedulerException e) {
                throw new SynapseException("Error scheduling job : " + build2 + " with trigger " + build, e);
            }
        } catch (SchedulerException e2) {
            throw new SynapseException("Error starting the scheduler", e2);
        }
    }

    @Override // org.apache.synapse.message.processor.MessageProcessor
    public boolean isDeactivated() {
        try {
            return this.scheduler.isInStandbyMode();
        } catch (SchedulerException e) {
            throw new SynapseException("Error Standing-by Message processor scheduler ", e);
        }
    }

    @Override // org.apache.synapse.message.processor.impl.AbstractMessageProcessor, org.apache.synapse.message.processor.MessageProcessor
    public void setParameters(Map<String, Object> map) {
        super.setParameters(map);
        if (map == null || map.isEmpty()) {
            return;
        }
        Object obj = map.get(MessageProcessorConstants.CRON_EXPRESSION);
        if (obj != null) {
            this.cronExpression = obj.toString();
        }
        if (map.get(MessageProcessorConstants.INTERVAL) != null) {
            this.interval = Integer.parseInt(r0.toString());
        }
        Object obj2 = map.get("quartz.conf");
        if (obj2 != null) {
            this.quartzConfig = obj2.toString();
        }
        Object obj3 = map.get(MessageProcessorConstants.IS_ACTIVATED);
        if (obj3 != null) {
            this.isActivated.set(Boolean.valueOf(obj3.toString()).booleanValue());
        }
        Object obj4 = map.get(ForwardingProcessorConstants.NON_RETRY_STATUS_CODES);
        if (obj4 != null) {
            this.nonRetryStatusCodes = obj4.toString().split(",");
        }
    }

    private JobBuilder getJobBuilder() {
        JobBuilder newJob = this instanceof SamplingProcessor ? JobBuilder.newJob(SamplingService.class) : JobBuilder.newJob(ForwardingService.class);
        newJob.withIdentity(this.name + "-job", MessageProcessorConstants.SCHEDULED_MESSAGE_PROCESSOR_GROUP);
        return newJob;
    }

    protected JobDataMap getJobDataMap() {
        return new JobDataMap();
    }

    @Override // org.apache.synapse.message.processor.MessageProcessor
    public boolean stop() {
        try {
            if (this.scheduler != null) {
                if (this.scheduler.isStarted()) {
                    this.scheduler.standby();
                    if (logger.isDebugEnabled()) {
                        logger.debug("ShuttingDown Message Processor Scheduler : " + this.scheduler.getMetaData());
                    }
                    try {
                        this.scheduler.interrupt(new JobKey(this.name + "-job", MessageProcessorConstants.SCHEDULED_MESSAGE_PROCESSOR_GROUP));
                    } catch (UnableToInterruptJobException e) {
                        logger.info("Unable to interrupt job [" + this.name + "-job]");
                    }
                }
                this.scheduler.shutdown(true);
            }
            if (!logger.isDebugEnabled()) {
                return true;
            }
            logger.debug("Stopped message processor [" + getName() + "].");
            return true;
        } catch (SchedulerException e2) {
            throw new SynapseException("Error ShuttingDown Message processor scheduler ", e2);
        }
    }

    @Override // org.apache.synapse.ManagedLifecycle
    public void destroy() {
        stop();
        if (getMessageConsumer() == null) {
            logger.warn(PropertyAccessor.PROPERTY_KEY_PREFIX + getName() + "] Could not find the message consumer to cleanup.");
        } else if (!getMessageConsumer().cleanup()) {
            logger.error(PropertyAccessor.PROPERTY_KEY_PREFIX + getName() + "] Could not cleanup message consumer.");
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Successfully destroyed message processor [" + getName() + "].");
        }
    }

    @Override // org.apache.synapse.message.processor.MessageProcessor
    public boolean deactivate() {
        try {
            if (this.scheduler == null || !this.scheduler.isStarted()) {
                return false;
            }
            if (logger.isDebugEnabled()) {
                logger.debug("Deactivating message processor [" + getName() + PropertyAccessor.PROPERTY_KEY_SUFFIX);
            }
            this.scheduler.standby();
            try {
                this.scheduler.interrupt(new JobKey(this.name + "-job", MessageProcessorConstants.SCHEDULED_MESSAGE_PROCESSOR_GROUP));
            } catch (UnableToInterruptJobException e) {
                logger.info("Unable to interrupt job [" + this.name + "-job]");
            }
            this.messageConsumer.cleanup();
            logger.info("Successfully deactivated the message processor [" + getName() + PropertyAccessor.PROPERTY_KEY_SUFFIX);
            setActivated(isActive());
            if (!isPaused()) {
                return true;
            }
            try {
                ((MessageProcessorDeployer) ((DeploymentEngine) this.configuration.getAxisConfiguration().getConfigurator()).getDeployer(this.configuration.getPathToConfigFile() + "/message-processors", "xml")).restoreSynapseArtifact(this.name);
                return true;
            } catch (Exception e2) {
                logger.warn("Couldn't persist the state of the message processor [" + this.name + PropertyAccessor.PROPERTY_KEY_SUFFIX);
                return true;
            }
        } catch (SchedulerException e3) {
            throw new SynapseException("Error Standing-by Message processor scheduler ", e3);
        }
    }

    @Override // org.apache.synapse.message.processor.MessageProcessor
    public boolean activate() {
        try {
            if (this.messageConsumer == null) {
                setMessageConsumer(this.configuration.getMessageStore(this.messageStore).getConsumer());
            }
            if (this.scheduler == null || !this.scheduler.isInStandbyMode()) {
                return false;
            }
            if (logger.isDebugEnabled()) {
                logger.debug("Starting Message Processor Scheduler : " + this.scheduler.getMetaData());
            }
            this.scheduler.start();
            if (isPaused()) {
                resumeService();
            }
            logger.info("Successfully re-activated the message processor [" + getName() + PropertyAccessor.PROPERTY_KEY_SUFFIX);
            setActivated(isActive());
            return true;
        } catch (SchedulerException e) {
            throw new SynapseException("Error Standing-by Message processor scheduler ", e);
        }
    }

    @Override // org.apache.synapse.message.processor.MessageProcessor
    public void pauseService() {
        try {
            this.scheduler.pauseTrigger(new TriggerKey(this.name + "-trigger"));
            this.isPaused.set(true);
        } catch (SchedulerException e) {
            throw new SynapseException("Error while pausing the service", e);
        }
    }

    @Override // org.apache.synapse.message.processor.MessageProcessor
    public void resumeService() {
        try {
            this.scheduler.resumeTrigger(new TriggerKey(this.name + "-trigger"));
            this.isPaused.set(false);
        } catch (SchedulerException e) {
            throw new SynapseException("Error while pausing the service", e);
        }
    }

    public boolean isActive() {
        return !isDeactivated();
    }

    @Override // org.apache.synapse.message.processor.MessageProcessor
    public boolean isPaused() {
        return this.isPaused.get();
    }

    public boolean getActivated() {
        return this.isActivated.get();
    }

    public void setActivated(boolean z) {
        this.isActivated.set(z);
        this.parameters.put(MessageProcessorConstants.IS_ACTIVATED, String.valueOf(getActivated()));
    }

    private Properties getSchedulerProperties(String str) {
        Properties properties = new Properties();
        properties.put("org.quartz.scheduler.instanceName", str);
        properties.put("org.quartz.scheduler.rmi.export", "false");
        properties.put("org.quartz.scheduler.rmi.proxy", "false");
        properties.put("org.quartz.scheduler.wrapJobExecutionInUserTransaction", "false");
        properties.put("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
        properties.put(MessageProcessorConstants.THREAD_POOL_THREAD_COUNT, "1");
        properties.put(MessageProcessorConstants.THREAD_POOL_THREAD_PRIORITY, "5");
        properties.put(MessageProcessorConstants.JOB_STORE_MISFIRE_THRESHOLD, "60000");
        properties.put(MessageProcessorConstants.THREAD_INHERIT_CONTEXT_CLASSLOADER_OF_INIT_THREAD, "true");
        properties.put("org.quartz.jobStore.class", "org.quartz.simpl.RAMJobStore");
        return properties;
    }

    private boolean isPinnedServer(String str) {
        boolean z = false;
        Object obj = this.parameters.get(MessageProcessorConstants.PINNED_SERVER);
        if (obj == null || !(obj instanceof String)) {
            z = true;
        } else {
            String str2 = (String) obj;
            StringTokenizer stringTokenizer = new StringTokenizer(str2, " ,");
            while (true) {
                if (!stringTokenizer.hasMoreTokens()) {
                    break;
                }
                if (str.equals(stringTokenizer.nextToken().trim())) {
                    z = true;
                    break;
                }
            }
            if (!z) {
                logger.info("Message processor '" + this.name + "' pinned on '" + str2 + "' not starting on this server '" + str + "'");
            }
        }
        return z;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isThrottling(long j) {
        return j == 0;
    }
}
