package org.wso2.andes.server.cassandra;

import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.derby.impl.sql.compile.SQLParserConstants;
import org.wso2.andes.server.ClusterResourceHolder;
import org.wso2.andes.server.message.AMQMessage;
import org.wso2.andes.server.queue.AMQQueue;
import org.wso2.andes.server.queue.QueueEntry;
import org.wso2.andes.server.subscription.Subscription;
import org.wso2.andes.server.subscription.SubscriptionImpl;

/* loaded from: input_file:org/wso2/andes/server/cassandra/CassandraMessageFlusher.class */
public class CassandraMessageFlusher extends Thread {
    private Map<String, CassandraSubscription> cassandraSubscriptions;
    private AMQQueue queue;
    private static Log log = LogFactory.getLog(CassandraMessageFlusher.class);
    private int messageCountToRead;
    private int resetCounter;
    private SequentialThreadPoolExecutor executor;
    private int queueWorkerWaitInterval;
    private boolean running = true;
    private long lastProcessedId = 0;
    private int resetCount = 50;
    private long messageProcessed = 0;
    private long lastRestTime = 0;

    public CassandraMessageFlusher(AMQQueue aMQQueue, Map<String, CassandraSubscription> map, SequentialThreadPoolExecutor sequentialThreadPoolExecutor, int i) {
        this.messageCountToRead = 20;
        this.cassandraSubscriptions = map;
        this.queue = aMQQueue;
        this.executor = sequentialThreadPoolExecutor;
        this.messageCountToRead = ClusterResourceHolder.getInstance().getClusterConfiguration().getMessageBatchSizeForSubscribers();
        this.queueWorkerWaitInterval = i;
        System.out.println("Queue worker started for queue: " + aMQQueue.getResourceName());
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        int size;
        CassandraSubscription next;
        long j = 0;
        this.lastRestTime = System.currentTimeMillis();
        long j2 = 0;
        while (this.running) {
            try {
                size = this.executor.getSize();
            } catch (Throwable th) {
                long j3 = this.queueWorkerWaitInterval;
                j2++;
                try {
                    Thread.sleep(Math.max(j3 * 5, j2 * j3));
                } catch (InterruptedException e) {
                }
                log.error("Error running Cassandra Message Flusher" + th.getMessage(), th);
            }
            if (size > 1000) {
                if (size > 5000) {
                    try {
                        log.error("Flusher queue is growing, and this should not happen. Please check cassandra Flusher");
                    } catch (InterruptedException e2) {
                        log.error(e2);
                    }
                }
                log.info("skipping content cassandra reading thread as flusher queue has " + size + " tasks");
                Thread.sleep(this.queueWorkerWaitInterval);
            }
            if (resetOffset() && System.currentTimeMillis() - this.lastRestTime < 30000) {
                this.lastProcessedId = 0L;
                this.lastRestTime = System.currentTimeMillis();
            }
            List<QueueEntry> messagesFromUserQueue = ClusterResourceHolder.getInstance().getCassandraMessageStore().getMessagesFromUserQueue(this.queue, this.messageCountToRead, this.lastProcessedId);
            if (messagesFromUserQueue.size() == this.messageCountToRead) {
                this.messageCountToRead += 10;
                if (this.messageCountToRead > 300) {
                    this.messageCountToRead = SQLParserConstants.LOGGED;
                }
            } else {
                this.messageCountToRead -= 10;
                if (this.messageCountToRead < 20) {
                    this.messageCountToRead = 20;
                }
            }
            if (messagesFromUserQueue.size() <= 0 || this.cassandraSubscriptions.size() <= 0) {
                resetOffset();
                try {
                    Thread.sleep(this.queueWorkerWaitInterval);
                } catch (InterruptedException e3) {
                }
            } else {
                Iterator<CassandraSubscription> it = this.cassandraSubscriptions.values().iterator();
                for (int i = 0; i < messagesFromUserQueue.size(); i++) {
                    QueueEntry queueEntry = messagesFromUserQueue.get(i);
                    try {
                        if (!it.hasNext()) {
                            it = this.cassandraSubscriptions.values().iterator();
                            if (!it.hasNext()) {
                                break;
                            } else {
                                next = it.next();
                            }
                        } else {
                            next = it.next();
                        }
                        Subscription subscription = next.getSubscription();
                        ((AMQMessage) queueEntry.getMessage()).setClientIdentifier(next.getSession());
                        if (log.isDebugEnabled()) {
                            ByteBuffer allocate = ByteBuffer.allocate(100);
                            log.debug("readFromCassandra(" + queueEntry.getMessage().getMessageNumber() + ")" + new String(allocate.array(), 0, queueEntry.getMessage().getContent(allocate, 0)));
                        }
                        deliverAsynchronously(subscription, queueEntry);
                        this.messageProcessed++;
                        if (i == messagesFromUserQueue.size() - 1) {
                            this.lastProcessedId = queueEntry.getMessage().getMessageNumber().longValue();
                        }
                    } catch (Exception e4) {
                        log.error("Unexpected Error in Message Flusher Task while delivering the message : ", e4);
                        e4.printStackTrace();
                    }
                }
                j++;
                if (this.messageProcessed > 10 || size > 100) {
                    log.debug("[Flusher]read=" + messagesFromUserQueue.size() + " tot= " + this.messageProcessed + ". queue size = " + size);
                }
                messagesFromUserQueue.clear();
                if (j % 10 == 0) {
                    try {
                        Thread.sleep(this.queueWorkerWaitInterval);
                    } catch (InterruptedException e5) {
                    }
                }
            }
            j2 = 0;
        }
    }

    public AMQQueue getQueue() {
        return this.queue;
    }

    private void deliverAsynchronously(final Subscription subscription, final QueueEntry queueEntry) {
        if (OnflightMessageTracker.getInstance().testMessage(queueEntry.getMessage().getMessageNumber().longValue())) {
            this.executor.submit(new Runnable() { // from class: org.wso2.andes.server.cassandra.CassandraMessageFlusher.1
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        if (subscription instanceof SubscriptionImpl.AckSubscription) {
                            subscription.send(queueEntry);
                        } else {
                            CassandraMessageFlusher.log.error(new StringBuilder().append("Unexpected Subscription Implementation : ").append(subscription).toString() != null ? subscription.getClass().getName() : null);
                        }
                    } catch (Throwable th) {
                        CassandraMessageFlusher.log.error("Error while delivering message ", th);
                    }
                }
            }, subscription);
        }
    }

    public void stopFlusher() {
        this.running = false;
    }

    private boolean resetOffset() {
        int i = this.resetCounter;
        this.resetCounter = i + 1;
        if (i <= this.resetCount) {
            return false;
        }
        this.resetCounter = 0;
        return true;
    }
}
