package org.wso2.carbon.bam.core.receivers;

import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.axis2.context.MessageContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:org/wso2/carbon/bam/core/receivers/AbstractQueue.class */
/**
 * Base class for a bounded, in-memory event queue whose contents are handed
 * to {@link #processEvents(MessageContext[])} in batches by a fixed-size
 * thread pool.
 *
 * <p>Each successfully enqueued message schedules one {@code ServerWorker}
 * task; the task drains whatever is in the queue at the time it runs and
 * passes that batch to the subclass.</p>
 */
public abstract class AbstractQueue {
    private static final Log log = LogFactory.getLog(AbstractQueue.class);

    /** Pool that runs the drain tasks; sized by the constructor argument. */
    private final ExecutorService exec;

    /** Bounded buffer of pending messages (capacity 5000); offers beyond that are rejected. */
    private final Queue<MessageContext> serverQueue = new ArrayBlockingQueue<MessageContext>(5000);

    /**
     * Set by {@link #cleanup()} and read by {@link #enqueue(MessageContext)}.
     * Volatile so a write from the thread calling cleanup() is visible to
     * threads calling enqueue().
     */
    private volatile boolean shutdown = false;

    /* loaded from: input_file:org/wso2/carbon/bam/core/receivers/AbstractQueue$ServerWorker.class */
    /** Task that drains the messages currently in the queue and processes them as one batch. */
    private class ServerWorker implements Runnable {
        private ServerWorker() {
        }

        @Override // java.lang.Runnable
        public void run() {
            AbstractQueue.this.clearEventsInQueue(AbstractQueue.this.serverQueue.size());
        }
    }

    /**
     * Creates the queue together with its worker pool.
     *
     * @param i number of threads in the fixed thread pool that runs the drain tasks
     */
    public AbstractQueue(int i) {
        this.exec = Executors.newFixedThreadPool(i);
    }

    /**
     * Adds a message to the queue and schedules a worker to process it.
     *
     * <p>The message is dropped (with a warning) when {@link #cleanup()} has
     * already been called or when the queue is full.</p>
     *
     * @param messageContext the message to enqueue
     */
    public void enqueue(MessageContext messageContext) {
        if (this.shutdown) {
            log.warn("The queue is shutting down... Not accepting the new events...");
            return;
        }
        // NOTE(review): presumably forces the envelope to be fully built before the
        // message is handed to another thread - confirm against the Axis2 API.
        messageContext.getEnvelope().build();
        if (this.serverQueue.offer(messageContext)) {
            // Another worker may already have drained the queue; only schedule
            // a new drain task if something is still waiting.
            if (this.serverQueue.size() > 0) {
                this.exec.submit(new ServerWorker());
            }
        } else {
            log.warn("Queue filled up. Event rejected");
            if (log.isDebugEnabled()) {
                log.debug("Event rejected : " + messageContext.getEnvelope().toString());
            }
        }
    }

    /**
     * Stops accepting new messages, waits (polling every 200 ms) for the queue
     * to become empty, then shuts the worker pool down.
     *
     * <p>If the calling thread is interrupted while waiting, the wait is
     * abandoned, the interrupt status is preserved and the pool is shut down
     * immediately.</p>
     */
    public void cleanup() {
        this.shutdown = true;
        while (this.serverQueue.size() > 0) {
            if (log.isDebugEnabled()) {
                log.debug("Waiting for the queue to become empty");
            }
            try {
                Thread.sleep(200L);
            } catch (InterruptedException e) {
                // Restore the flag and stop waiting: with the flag set, every further
                // sleep() would throw immediately and this loop would busy-spin.
                Thread.currentThread().interrupt();
                log.warn("Interrupted while waiting for the queue to become empty");
                break;
            }
        }
        // NOTE(review): shutdownNow() interrupts workers that may still be inside
        // processEvents(); consider shutdown() + awaitTermination() if in-flight
        // batches must be allowed to finish.
        this.exec.shutdownNow();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Removes up to {@code i} messages from the queue and passes them to
     * {@link #processEvents(MessageContext[])} as a single batch.
     *
     * <p>Several workers may drain concurrently, so fewer than {@code i}
     * messages may actually be available. The batch passed on contains only
     * the messages really obtained (never {@code null} entries), and
     * {@code processEvents} is not called at all when nothing was obtained.</p>
     *
     * @param i upper bound on the number of messages to drain; values
     *          {@code <= 0} are a no-op
     */
    public void clearEventsInQueue(int i) {
        if (log.isDebugEnabled()) {
            log.debug("No of messages in queue : " + i);
        }
        if (i <= 0) {
            return;
        }
        MessageContext[] batch = new MessageContext[i];
        int drained = 0;
        while (drained < i) {
            MessageContext next = this.serverQueue.poll();
            if (next == null) {
                // Queue emptied by a concurrent worker; stop instead of storing nulls.
                break;
            }
            batch[drained++] = next;
        }
        if (drained == 0) {
            return;
        }
        if (drained < i) {
            // Trim so subclasses never see trailing null slots.
            MessageContext[] trimmed = new MessageContext[drained];
            System.arraycopy(batch, 0, trimmed, 0, drained);
            batch = trimmed;
        }
        processEvents(batch);
    }

    /**
     * Handles one batch of drained messages. Invoked on a worker-pool thread.
     *
     * @param messageContextArr the messages removed from the queue, in queue order
     */
    protected abstract void processEvents(MessageContext[] messageContextArr);
}
