package org.wso2.andes.server.cassandra;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.andes.AMQStoreException;
import org.wso2.andes.server.ClusterResourceHolder;
import org.wso2.andes.server.protocol.AMQProtocolSession;
import org.wso2.andes.server.queue.AMQQueue;
import org.wso2.andes.server.queue.QueueEntry;
import org.wso2.andes.server.store.CassandraMessageStore;
import org.wso2.andes.server.store.util.CassandraDataAccessException;
import org.wso2.andes.server.subscription.Subscription;
import org.wso2.andes.server.subscription.SubscriptionImpl;

/* loaded from: input_file:org/wso2/andes/server/cassandra/QueueBrowserFlusher.class */
public class QueueBrowserFlusher {
    private Subscription subscription;
    private AMQQueue queue;
    private AMQProtocolSession session;
    private String id;
    private int defaultMessageCount = Integer.MAX_VALUE;
    private int messageCount = this.defaultMessageCount;
    private int messageBatchSize = ClusterResourceHolder.getInstance().getClusterConfiguration().getMessageBatchSizeForBrowserSubscriptions();
    private static Log log = LogFactory.getLog(QueueBrowserFlusher.class);

    /* loaded from: input_file:org/wso2/andes/server/cassandra/QueueBrowserFlusher$CustomComparator.class */
    public class CustomComparator implements Comparator<CassandraQueueMessage> {
        public CustomComparator() {
        }

        @Override // java.util.Comparator
        public int compare(CassandraQueueMessage cassandraQueueMessage, CassandraQueueMessage cassandraQueueMessage2) {
            return (int) (cassandraQueueMessage.getMessageId() - cassandraQueueMessage2.getMessageId());
        }
    }

    public QueueBrowserFlusher(Subscription subscription, AMQQueue aMQQueue, AMQProtocolSession aMQProtocolSession) {
        this.subscription = subscription;
        this.queue = aMQQueue;
        this.session = aMQProtocolSession;
        this.id = "" + subscription.getSubscriptionID();
    }

    public void send() {
        try {
            try {
                List<QueueEntry> sortedMessages = getSortedMessages();
                if (sortedMessages.size() > 0) {
                    int i = this.messageBatchSize;
                    if (sortedMessages.size() < this.messageBatchSize) {
                        i = sortedMessages.size();
                    }
                    for (int i2 = 0; i2 < i; i2++) {
                        QueueEntry queueEntry = sortedMessages.get(i2);
                        try {
                            if (this.subscription instanceof SubscriptionImpl.BrowserSubscription) {
                                this.subscription.send(queueEntry);
                            }
                        } catch (Exception e) {
                            log.error("Unexpected Error in Message Flusher Task while delivering the message : ", e);
                        }
                    }
                }
                this.subscription.confirmAutoClose();
            } catch (AMQStoreException e2) {
                log.error("Error while sending message for Browser subscription", e2);
                this.subscription.confirmAutoClose();
            } catch (Exception e3) {
                e3.printStackTrace();
                this.subscription.confirmAutoClose();
            }
        } catch (Throwable th) {
            this.subscription.confirmAutoClose();
            throw th;
        }
    }

    private List<QueueEntry> getSortedMessages() throws Exception {
        CassandraMessageStore cassandraMessageStore = ClusterResourceHolder.getInstance().getCassandraMessageStore();
        List<String> userQueues = cassandraMessageStore.getUserQueues(this.queue.getResourceName());
        ArrayList arrayList = new ArrayList();
        Iterator<String> it = userQueues.iterator();
        while (it.hasNext()) {
            Iterator<CassandraQueueMessage> it2 = cassandraMessageStore.getMessagesFromUserQueue(it.next(), this.queue.getResourceName(), this.messageBatchSize).iterator();
            while (it2.hasNext()) {
                arrayList.add(it2.next());
            }
        }
        Collections.sort(arrayList, new CustomComparator());
        return cassandraMessageStore.getPreparedBrowserMessages(this.queue, this.session, arrayList);
    }

    private void clearBrowserQueue(List<QueueEntry> list) throws CassandraDataAccessException {
        ClusterResourceHolder.getInstance().getCassandraMessageStore().clearBrowserQueue(list, this.queue.getResourceName());
    }
}
