package org.wso2.carbon.bam.data.publisher.activity.mediation.jdbc;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.bam.data.publisher.activity.mediation.ActivityProcessor;
import org.wso2.carbon.bam.data.publisher.activity.mediation.MessageActivity;

/* loaded from: input_file:org/wso2/carbon/bam/data/publisher/activity/mediation/jdbc/JDBCActivityProcessor.class */
public class JDBCActivityProcessor implements ActivityProcessor {
    private static final Log log = LogFactory.getLog(JDBCActivityProcessor.class);
    private ExecutorService exec;
    private ActivityPersistenceManager pm;

    /* loaded from: input_file:org/wso2/carbon/bam/data/publisher/activity/mediation/jdbc/JDBCActivityProcessor$JDBCWorker.class */
    private class JDBCWorker implements Runnable {
        private MessageActivity[] activities;

        private JDBCWorker(MessageActivity[] messageActivityArr) {
            this.activities = messageActivityArr;
        }

        @Override // java.lang.Runnable
        public void run() {
            for (MessageActivity messageActivity : this.activities) {
                JDBCActivityProcessor.this.pm.persistActivity(messageActivity);
            }
        }
    }

    public JDBCActivityProcessor() {
        if (log.isDebugEnabled()) {
            log.debug("Initializing the JDBC based activity processor for BAM");
        }
        this.pm = new ActivityPersistenceManager();
        this.exec = Executors.newCachedThreadPool();
    }

    @Override // org.wso2.carbon.bam.data.publisher.activity.mediation.ActivityProcessor
    public void destroy() {
        this.exec.shutdown();
    }

    @Override // org.wso2.carbon.bam.data.publisher.activity.mediation.ActivityProcessor
    public void process(MessageActivity[] messageActivityArr) {
        this.exec.submit(new JDBCWorker(messageActivityArr));
    }
}
