package org.wso2.andes.server.cluster;

import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.andes.server.ClusterResourceHolder;
import org.wso2.andes.server.cassandra.CassandraQueueMessage;
import org.wso2.andes.server.stats.PerformanceCounter;
import org.wso2.andes.server.store.CassandraMessageStore;

/* loaded from: input_file:org/wso2/andes/server/cluster/GlobalQueueWorker.class */
/**
 * Background worker that repeatedly reads a batch of messages from one global
 * queue, assigns each message to one of the user queues currently registered
 * for that global queue (round-robin by position in the batch), and hands the
 * batch to the message store for transfer.
 *
 * <p>The loop in {@link #run()} only executes while the running flag is
 * {@code true}. The flag is {@code false} on construction, so callers must
 * invoke {@link #setRunning(boolean) setRunning(true)} before starting the
 * worker. The flag is {@code volatile}, so a stop request issued from another
 * thread is visible to the worker thread.
 */
public class GlobalQueueWorker implements Runnable {
    private static final Log log = LogFactory.getLog(GlobalQueueWorker.class);

    /**
     * Number of loop iterations between two refreshes of the user-queue list;
     * the worker also pauses on the same iterations.
     */
    private static final int REFRESH_PERIOD = 10;

    private final String globalQueueName;
    private volatile boolean running;
    private final int messageCountToReadFromCasssandra;
    private final CassandraMessageStore cassandraMessageStore;
    /** Running total of messages handed over for transfer; only touched by the worker thread. */
    private long totMsgMoved = 0;

    /**
     * Creates a worker for a single global queue.
     *
     * @param str                   name of the global queue to drain
     * @param cassandraMessageStore store used to read and transfer messages
     * @param i                     maximum number of messages requested per read
     */
    public GlobalQueueWorker(String str, CassandraMessageStore cassandraMessageStore, int i) {
        this.cassandraMessageStore = cassandraMessageStore;
        this.globalQueueName = str;
        this.messageCountToReadFromCasssandra = i;
    }

    /**
     * Moves message batches from the global queue to user queues until the
     * running flag is cleared or the worker thread is interrupted.
     *
     * <p>Any exception raised while processing a batch is logged and the loop
     * continues with the next iteration.
     */
    @Override // java.lang.Runnable
    public void run() {
        long iteration = 0;
        int queueWorkerInterval = ClusterResourceHolder.getInstance().getClusterConfiguration().getQueueWorkerInterval();
        List<String> userQueues = null;
        try {
            userQueues = this.cassandraMessageStore.getUserQueues(this.globalQueueName);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
        while (this.running) {
            try {
                Queue<CassandraQueueMessage> messages = this.cassandraMessageStore.getMessagesFromGlobalQueue(this.globalQueueName, this.messageCountToReadFromCasssandra);
                int size = messages.size();
                PerformanceCounter.recordGlobalQueueMsgMove(size);

                boolean refreshIteration = iteration % REFRESH_PERIOD == 0;
                if (refreshIteration) {
                    userQueues = this.cassandraMessageStore.getUserQueues(this.globalQueueName);
                }

                if (userQueues == null || userQueues.isEmpty()) {
                    // Nothing to deliver to; wait before polling again.
                    pause(queueWorkerInterval);
                } else {
                    // Rotate the queue once (poll + add) so every message is tagged
                    // with its target user queue while the batch keeps its contents.
                    for (int i = 0; i < size; i++) {
                        CassandraQueueMessage message = messages.poll();
                        String targetQueue = userQueues.get(i % userQueues.size());
                        message.setQueue(targetQueue);
                        messages.add(message);
                        if (log.isDebugEnabled()) {
                            log.debug("global worker moved " + message.getMessageId() + " to subscription " + targetQueue);
                        }
                    }
                    this.cassandraMessageStore.transferMessageBatchFromGlobalQueueToUserQueue(messages.toArray(new CassandraQueueMessage[messages.size()]), this.globalQueueName);

                    // Account for every transferred batch, including the ones
                    // handled on a refresh iteration.
                    if (size > 0) {
                        this.totMsgMoved += size;
                        if (log.isDebugEnabled()) {
                            log.debug("[Global, " + this.globalQueueName + "] moved " + size + " to user queues, tot = " + this.totMsgMoved);
                        }
                    }

                    if (size == 0 || refreshIteration) {
                        iteration = 0;
                        pause(queueWorkerInterval);
                    }
                }
            } catch (Exception e) {
                log.error("Error while moving messages from global queue " + this.globalQueueName, e);
            }
            iteration++;
        }
    }

    /**
     * Sleeps for the given time. If the thread is interrupted, the interrupt
     * status is restored and the running flag is cleared so that the loop in
     * {@link #run()} terminates instead of spinning on an already-interrupted
     * thread.
     *
     * @param millis time to sleep in milliseconds
     */
    private void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            this.running = false;
        }
    }

    /**
     * @return the current value of the running flag
     */
    public boolean isRunning() {
        return this.running;
    }

    /**
     * Sets the running flag; passing {@code false} makes the loop in
     * {@link #run()} exit once the current iteration has finished.
     *
     * @param z new value of the running flag
     */
    public void setRunning(boolean z) {
        this.running = z;
    }
}
