package org.wso2.carbon.bam.data.publisher.activity.mediation;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:org/wso2/carbon/bam/data/publisher/activity/mediation/ActivityQueue.class */
public class ActivityQueue {
    private static final Log log = LogFactory.getLog(ActivityQueue.class);
    private volatile int threshold = 1;
    private boolean shutdown = false;
    private Queue<MessageActivity> activities = new ConcurrentLinkedQueue();
    private ExecutorService exec = Executors.newSingleThreadExecutor();
    private ActivityProcessor activityProcessor;

    /* loaded from: input_file:org/wso2/carbon/bam/data/publisher/activity/mediation/ActivityQueue$ActivityWorker.class */
    private class ActivityWorker implements Runnable {
        private ActivityWorker() {
        }

        @Override // java.lang.Runnable
        public void run() {
            if (ActivityQueue.log.isDebugEnabled()) {
                ActivityQueue.log.info("Initializing the activity processor thread...");
            }
            while (true) {
                try {
                    int size = ActivityQueue.this.activities.size();
                    if (size >= ActivityQueue.this.threshold || ActivityQueue.this.shutdown) {
                        if (ActivityQueue.log.isDebugEnabled()) {
                            ActivityQueue.log.debug("Activity threshold (" + ActivityQueue.this.threshold + ") exceeds current activity queue length (" + size + ")");
                        }
                        ActivityQueue.this.clearActivities(size);
                    } else {
                        ActivityQueue.this.delay();
                    }
                } catch (Throwable th) {
                    ActivityQueue.log.error("Unexpected runtime error in the activity processor", th);
                }
            }
        }
    }

    public ActivityQueue(ActivityProcessor activityProcessor) {
        this.activityProcessor = activityProcessor;
        this.exec.submit(new ActivityWorker());
    }

    public void cleanup() {
        this.shutdown = true;
        while (this.activities.size() > 0) {
            if (log.isDebugEnabled()) {
                log.debug("Waiting for the activity queue to become empty");
            }
            delay();
        }
        this.exec.shutdownNow();
        this.activityProcessor.destroy();
    }

    public void setThreshold(int i) {
        if (log.isDebugEnabled()) {
            log.debug("Initializing the activity queue with the threshold value: " + i);
        }
        this.threshold = i;
    }

    public void enqueue(MessageActivity messageActivity) {
        if (this.shutdown) {
            log.warn("BAM activity queue is shutting down... Not accepting the new activity...");
        } else {
            this.activities.offer(messageActivity);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void clearActivities(int i) {
        if (log.isDebugEnabled()) {
            log.debug("Clearing " + i + " activities from the activity queue...");
        }
        MessageActivity[] messageActivityArr = new MessageActivity[i];
        for (int i2 = 0; i2 < i; i2++) {
            messageActivityArr[i2] = this.activities.poll();
        }
        this.activityProcessor.process(messageActivityArr);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void delay() {
        try {
            Thread.sleep(100L);
        } catch (InterruptedException e) {
        }
    }
}
