package org.wso2.andes.server.cassandra;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.andes.AMQStoreException;
import org.wso2.andes.server.ClusterResourceHolder;
import org.wso2.andes.server.message.AMQMessage;
import org.wso2.andes.server.protocol.AMQProtocolSession;
import org.wso2.andes.server.queue.AMQQueue;
import org.wso2.andes.server.queue.DLCQueueUtils;
import org.wso2.andes.server.queue.IncomingMessage;
import org.wso2.andes.server.queue.QueueEntry;
import org.wso2.andes.server.queue.SimpleQueueEntryList;
import org.wso2.andes.server.store.CassandraMessageStore;
import org.wso2.andes.server.subscription.Subscription;
import org.wso2.andes.server.subscription.SubscriptionImpl;
import org.wso2.andes.server.util.AndesConstants;
import org.wso2.andes.server.util.AndesUtils;

/* loaded from: input_file:org/wso2/andes/server/cassandra/QueueBrowserDeliveryWorker.class */
public class QueueBrowserDeliveryWorker {
    private Subscription subscription;
    private AMQQueue queue;
    private AMQProtocolSession session;
    private String id;
    private int defaultMessageCount;
    private int messageCount;
    private int messageBatchSize;
    private boolean isInMemoryMode;
    private static Log log = LogFactory.getLog(QueueBrowserDeliveryWorker.class);
    private HashMap<String, Long> lastReadMessageIdMap;

    /* loaded from: input_file:org/wso2/andes/server/cassandra/QueueBrowserDeliveryWorker$CustomComparator.class */
    public class CustomComparator implements Comparator<CassandraQueueMessage> {
        public CustomComparator() {
        }

        @Override // java.util.Comparator
        public int compare(CassandraQueueMessage cassandraQueueMessage, CassandraQueueMessage cassandraQueueMessage2) {
            return (int) (cassandraQueueMessage.getMessageId() - cassandraQueueMessage2.getMessageId());
        }
    }

    /* loaded from: input_file:org/wso2/andes/server/cassandra/QueueBrowserDeliveryWorker$InMemoryMessageComparator.class */
    public class InMemoryMessageComparator implements Comparator<IncomingMessage> {
        public InMemoryMessageComparator() {
        }

        @Override // java.util.Comparator
        public int compare(IncomingMessage incomingMessage, IncomingMessage incomingMessage2) {
            return (int) (incomingMessage.getMessageNumber().longValue() - incomingMessage2.getMessageNumber().longValue());
        }
    }

    public QueueBrowserDeliveryWorker(Subscription subscription, AMQQueue aMQQueue, AMQProtocolSession aMQProtocolSession) {
        this(subscription, aMQQueue, aMQProtocolSession, false);
    }

    public QueueBrowserDeliveryWorker(Subscription subscription, AMQQueue aMQQueue, AMQProtocolSession aMQProtocolSession, boolean z) {
        this.defaultMessageCount = Integer.MAX_VALUE;
        this.isInMemoryMode = false;
        this.lastReadMessageIdMap = new HashMap<>();
        this.subscription = subscription;
        this.queue = aMQQueue;
        this.session = aMQProtocolSession;
        this.id = "" + subscription.getSubscriptionID();
        this.isInMemoryMode = z;
        this.messageCount = this.defaultMessageCount;
        this.messageBatchSize = ClusterResourceHolder.getInstance().getClusterConfiguration().getMessageBatchSizeForBrowserSubscriptions();
    }

    public void send() {
        try {
            if (this.isInMemoryMode) {
                try {
                    sendMessagesToClient(getSortedMessagesForInMemoryQueue(this.queue));
                    this.subscription.confirmAutoClose();
                } catch (AMQStoreException e) {
                    log.error("Error while sending message for Browser subscription", e);
                    this.subscription.confirmAutoClose();
                } catch (Exception e2) {
                    e2.printStackTrace();
                    this.subscription.confirmAutoClose();
                }
                return;
            }
            try {
                try {
                    try {
                        AndesUtils.flushBrowserMessageIDCorrelater();
                        AndesUtils.flushQueueNameCorelater();
                        sendMessagesToClient(getSortedMessages());
                        this.subscription.confirmAutoClose();
                    } catch (Exception e3) {
                        e3.printStackTrace();
                        this.subscription.confirmAutoClose();
                    }
                } catch (AMQStoreException e4) {
                    log.error("Error while sending message for Browser subscription", e4);
                    this.subscription.confirmAutoClose();
                }
            } catch (Throwable th) {
                this.subscription.confirmAutoClose();
                throw th;
            }
        } catch (Throwable th2) {
            this.subscription.confirmAutoClose();
            throw th2;
        }
    }

    private void sendMessagesToClient(List<QueueEntry> list) {
        if (list.size() > 0) {
            int i = this.messageBatchSize;
            if (list.size() < this.messageBatchSize) {
                i = list.size();
            }
            for (int i2 = 0; i2 < i; i2++) {
                QueueEntry queueEntry = list.get(i2);
                try {
                    if (this.subscription instanceof SubscriptionImpl.BrowserSubscription) {
                        this.subscription.send(queueEntry);
                    }
                } catch (Exception e) {
                    log.error("Unexpected Error in Message Flusher Task while delivering the message : ", e);
                }
            }
        }
    }

    private List<QueueEntry> getSortedMessagesForInMemoryQueue(AMQQueue aMQQueue) throws AMQStoreException {
        ArrayList arrayList = new ArrayList();
        Enumeration<IncomingMessage> elements = ClusterResourceHolder.getInstance().getCassandraMessageStore().getIncomingQueueMessageHashtable().elements();
        while (elements.hasMoreElements()) {
            IncomingMessage nextElement = elements.nextElement();
            if (nextElement.getRoutingKey().equals(aMQQueue.getName())) {
                arrayList.add(nextElement);
            }
        }
        Collections.sort(arrayList, new InMemoryMessageComparator());
        return getBrowserMessagesForInMemoryMode(aMQQueue, arrayList);
    }

    private List<QueueEntry> getBrowserMessagesForInMemoryMode(AMQQueue aMQQueue, List<IncomingMessage> list) throws AMQStoreException {
        ArrayList arrayList = new ArrayList();
        SimpleQueueEntryList simpleQueueEntryList = new SimpleQueueEntryList(aMQQueue);
        Iterator<IncomingMessage> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(simpleQueueEntryList.add(new AMQMessage(it.next().getStoredMessage())));
        }
        return arrayList;
    }

    private List<QueueEntry> getSortedMessages() throws Exception {
        CassandraMessageStore cassandraMessageStore = ClusterResourceHolder.getInstance().getCassandraMessageStore();
        List<CassandraQueueMessage> readMessages = readMessages(new ArrayList(), this.messageBatchSize);
        for (int i = 2; readMessages.size() < this.messageBatchSize && i < 5; i++) {
            readMessages = readMessages(readMessages, this.messageBatchSize * i);
        }
        if (readMessages.size() < this.messageBatchSize) {
            readMessages = readMessagesFromGlobalQueue(readMessages, this.messageBatchSize);
        }
        Collections.sort(readMessages, new CustomComparator());
        return cassandraMessageStore.getPreparedBrowserMessages(this.queue, this.session, readMessages);
    }

    private List<CassandraQueueMessage> readMessages(List<CassandraQueueMessage> list, int i) throws Exception {
        CassandraMessageStore cassandraMessageStore = ClusterResourceHolder.getInstance().getCassandraMessageStore();
        long j = 0;
        for (String str : cassandraMessageStore.getNodeQueuesForDestinationQueue(this.queue.getResourceName())) {
            if (this.lastReadMessageIdMap.get(str) != null) {
                j = this.lastReadMessageIdMap.get(str).longValue();
            }
            for (CassandraQueueMessage cassandraQueueMessage : cassandraMessageStore.getMessagesFromNodeQueue(str, i, j)) {
                if (cassandraQueueMessage.getDestinationQueueName().equals(this.queue.getResourceName())) {
                    list.add(cassandraQueueMessage);
                } else if (DLCQueueUtils.isDeadLetterQueue(this.queue.getResourceName()) && DLCQueueUtils.identifyTenantInformationAndGenerateDLCString(cassandraQueueMessage.getDestinationQueueName(), AndesConstants.DEAD_LETTER_CHANNEL_QUEUE).equals(this.queue.getResourceName())) {
                    list.add(cassandraQueueMessage);
                }
                this.lastReadMessageIdMap.put(str, Long.valueOf(cassandraQueueMessage.getMessageId()));
            }
        }
        return list;
    }

    private List<CassandraQueueMessage> readMessagesFromGlobalQueue(List<CassandraQueueMessage> list, int i) throws Exception {
        for (CassandraQueueMessage cassandraQueueMessage : ClusterResourceHolder.getInstance().getCassandraMessageStore().getMessagesFromGlobalQueue(AndesUtils.getGlobalQueueNameForDestinationQueue(this.queue.getResourceName()), i)) {
            if (cassandraQueueMessage.getDestinationQueueName().equals(this.queue.getResourceName())) {
                list.add(cassandraQueueMessage);
            }
        }
        return list;
    }
}
