package org.wso2.carbon.databridge.core.internal.queue;

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.databridge.commons.Event;
import org.wso2.carbon.databridge.core.AgentCallback;
import org.wso2.carbon.databridge.core.exception.EventConversionException;
import org.wso2.carbon.databridge.core.internal.utils.EventComposite;

/* loaded from: input_file:org/wso2/carbon/databridge/core/internal/queue/QueueWorker.class */
public class QueueWorker implements Runnable {
    private static final Log log = LogFactory.getLog(QueueWorker.class);
    private BlockingQueue<EventComposite> eventQueue;
    private List<AgentCallback> subscribers;

    public QueueWorker(BlockingQueue<EventComposite> blockingQueue, List<AgentCallback> list) {
        this.eventQueue = blockingQueue;
        this.subscribers = list;
    }

    @Override // java.lang.Runnable
    public void run() {
        List<Event> list = null;
        try {
            if (log.isDebugEnabled()) {
                log.debug(this.eventQueue.size() + " messages in queue before " + Thread.currentThread().getName() + " worker has polled queue");
            }
            EventComposite poll = this.eventQueue.poll();
            try {
                list = poll.getEventConverter().toEventList(poll.getEventBundle(), poll.getStreamTypeHolder());
                if (log.isDebugEnabled()) {
                    log.debug("Dispatching event to " + this.subscribers.size() + " subscriber(s)");
                }
                Iterator<AgentCallback> it = this.subscribers.iterator();
                while (it.hasNext()) {
                    it.next().receive(list, poll.getAgentSession().getCredentials());
                }
                if (log.isDebugEnabled()) {
                    log.debug(this.eventQueue.size() + " messages in queue after " + Thread.currentThread().getName() + " worker has finished work");
                }
            } catch (EventConversionException e) {
                log.error("Wrongly formatted event sent for " + poll.getStreamTypeHolder().getDomainName(), e);
            }
        } catch (Throwable th) {
            log.error("Error in passing events " + list + " to subscribers " + this.subscribers, th);
        }
    }
}
