package org.wso2.andes.server.cassandra;

import java.util.Iterator;
import java.util.Map;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.andes.AMQStoreException;
import org.wso2.andes.server.AMQChannel;
import org.wso2.andes.server.cassandra.OnflightMessageTracker;
import org.wso2.andes.server.stats.PerformanceCounter;
import org.wso2.andes.server.store.CassandraMessageStore;

/* loaded from: input_file:org/wso2/andes/server/cassandra/QueueSubscriptionAcknowledgementHandler.class */
/**
 * Handles acknowledgements for queue subscriptions whose messages live in a
 * {@link CassandraMessageStore}.
 *
 * <p>Registration of sent messages and resolution of acknowledgements are delegated to the
 * shared {@link OnflightMessageTracker}. When an acknowledgement resolves to a tracked message,
 * the message is removed from its user queue and a content deletion task is scheduled for it.
 *
 * <p>The class also keeps a set of delivery-tag / timestamp bookkeeping maps together with a
 * {@link QueueMessageTagCleanupJob} able to purge timed-out entries from them. Nothing in this
 * class populates those maps or starts the job.
 */
public class QueueSubscriptionAcknowledgementHandler {
    private static final Log log = LogFactory.getLog(QueueSubscriptionAcknowledgementHandler.class);

    /** Pause between two passes of the cleanup job, in milliseconds. */
    private static final long CLEANUP_INTERVAL_MILLIS = 60000L;

    private final CassandraMessageStore cassandraMessageStore;

    /** Never assigned in this class; kept so the field layout stays as it was. */
    private QueueMessageTagCleanupJob cleanupJob;

    /** Delivery tag -> tag record. */
    private final Map<Long, QueueMessageTag> deliveryTagMessageMap =
            new ConcurrentHashMap<Long, QueueMessageTag>();

    /** Message id -> tag record. */
    private final Map<Long, QueueMessageTag> sentMessagesMap =
            new ConcurrentHashMap<Long, QueueMessageTag>();

    /** Timestamp (millis) -> message id, for acknowledged messages; purged after {@link #ackedMessageTimeOut}. */
    private final SortedMap<Long, Long> timeStampAckedMessageIdMap =
            new ConcurrentSkipListMap<Long, Long>();

    /** Timestamp (millis) -> message id, for sent messages; purged after {@link #timeOutInMills}. */
    private final SortedMap<Long, Long> timeStampMessageIdMap =
            new ConcurrentSkipListMap<Long, Long>();

    /** Not read or written anywhere in this class. */
    private final Map<Long, Long> messageDeliveryTimeRecorderMap = new ConcurrentHashMap<Long, Long>();

    /** Age in milliseconds after which a sent-message entry is purged by the cleanup job. */
    private final long timeOutInMills = 10000;

    /** Age in milliseconds after which an acked-message entry is purged; three times the sent timeout. */
    private final long ackedMessageTimeOut = 3 * this.timeOutInMills;

    private final OnflightMessageTracker messageTracker = OnflightMessageTracker.getInstance();

    /**
     * Immutable record tying a queue name to a delivery tag and a message id.
     */
    private static final class QueueMessageTag {
        private final long deliveryTag;
        private final long messageId;
        private final String queue;

        public QueueMessageTag(String queue, long deliveryTag, long messageId) {
            this.queue = queue;
            this.deliveryTag = deliveryTag;
            this.messageId = messageId;
        }

        public long getDeliveryTag() {
            return this.deliveryTag;
        }

        public long getMessageId() {
            return this.messageId;
        }

        public String getQueue() {
            return this.queue;
        }
    }

    /**
     * Periodic job that purges timed-out entries from the bookkeeping maps of the enclosing
     * handler. Each pass runs while holding the monitor of the message store, then the job
     * sleeps for {@link #CLEANUP_INTERVAL_MILLIS}.
     *
     * <p>The loop ends when {@link #stop()} is called or when the executing thread is
     * interrupted.
     */
    private class QueueMessageTagCleanupJob implements Runnable {
        /** Volatile so that a {@link #stop()} issued from another thread is seen by the loop. */
        private volatile boolean running = true;

        private QueueMessageTagCleanupJob() {
        }

        @Override // java.lang.Runnable
        public void run() {
            while (this.running) {
                try {
                    synchronized (QueueSubscriptionAcknowledgementHandler.this.cassandraMessageStore) {
                        // Sample the clock on every pass; a value captured once before the loop
                        // would freeze the expiry cut-off at the moment the job started.
                        long now = System.currentTimeMillis();
                        purgeExpiredSentMessages(now);
                        purgeExpiredAckedMessages(now);
                    }
                } catch (Exception e) {
                    QueueSubscriptionAcknowledgementHandler.log.error(
                            "Error while running Queue Message Tag Cleanup Task", e);
                } finally {
                    // Runs on success, on a logged exception and before an Error propagates.
                    pauseBeforeNextPass();
                }
            }
        }

        /**
         * Removes every sent-message entry whose timestamp is older than
         * {@code now - timeOutInMills}, together with the matching tag records.
         */
        private void purgeExpiredSentMessages(long now) {
            // headMap is a live view of the keys strictly below the cut-off. It is simply empty
            // when nothing has expired, so no firstKey() probe (which throws on an empty map)
            // is needed.
            SortedMap<Long, Long> expired =
                    timeStampMessageIdMap.headMap(Long.valueOf(now - timeOutInMills));
            if (expired.isEmpty()) {
                return;
            }
            for (Long messageId : expired.values()) {
                QueueMessageTag sentTag = sentMessagesMap.get(messageId);
                if (sentTag == null) {
                    continue;
                }
                QueueMessageTag deliveredTag =
                        deliveryTagMessageMap.remove(Long.valueOf(sentTag.getDeliveryTag()));
                if (deliveredTag != null) {
                    sentMessagesMap.remove(Long.valueOf(deliveredTag.getMessageId()));
                }
            }
            // Clearing the view removes the expired timestamps from the backing map.
            expired.clear();
        }

        /**
         * Removes every acked-message entry whose timestamp is older than
         * {@code now - ackedMessageTimeOut}. Runs independently of the sent-message purge so
         * acked entries are released even when no sent entry has expired.
         */
        private void purgeExpiredAckedMessages(long now) {
            timeStampAckedMessageIdMap.headMap(Long.valueOf(now - ackedMessageTimeOut)).clear();
        }

        /**
         * Sleeps until the next pass is due. An interrupt is treated as a request to shut down:
         * the interrupt flag is restored and the loop is ended, since merely swallowing it would
         * hide the request and re-interrupting without exiting would make every later sleep
         * fail immediately.
         */
        private void pauseBeforeNextPass() {
            if (!this.running) {
                return;
            }
            try {
                Thread.sleep(CLEANUP_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                this.running = false;
            }
        }

        /**
         * Asks the job to finish. The loop exits after the current pass or sleep completes;
         * this method does not interrupt the executing thread.
         */
        public void stop() {
            this.running = false;
        }
    }

    /**
     * Creates a handler bound to the given message store.
     *
     * @param cassandraMessageStore store from which acknowledged messages are removed
     * @param str                   unused by this class
     */
    public QueueSubscriptionAcknowledgementHandler(CassandraMessageStore cassandraMessageStore, String str) {
        this.cassandraMessageStore = cassandraMessageStore;
    }

    /**
     * Delegates to {@link OnflightMessageTracker#testAndAddMessage}, passing the arguments
     * through unchanged.
     *
     * @param aMQChannel channel the message is being sent on
     * @param j          first numeric argument forwarded to the tracker
     *                   (presumably the delivery tag - TODO confirm against the tracker)
     * @param j2         second numeric argument forwarded to the tracker
     *                   (presumably the message id - TODO confirm against the tracker)
     * @param str        string argument forwarded to the tracker
     *                   (presumably the queue name - TODO confirm against the tracker)
     * @return whatever the tracker returns for this message
     */
    public boolean checkAndRegisterSent(AMQChannel aMQChannel, long j, long j2, String str) {
        return this.messageTracker.testAndAddMessage(aMQChannel, j, j2, str);
    }

    /**
     * Processes an acknowledgement received on a channel. If the tracker resolves it to a
     * tracked message, the message is removed from its user queue, its content is scheduled
     * for deletion and the delivery is recorded in the performance counter. Failures are
     * logged and never propagated to the caller.
     *
     * @param aMQChannel channel the acknowledgement arrived on
     * @param j          identifier of the acknowledgement as passed to the tracker
     *                   (presumably the delivery tag - TODO confirm against the tracker)
     */
    public void handleAcknowledgement(AMQChannel aMQChannel, long j) {
        try {
            OnflightMessageTracker.MsgData ackReceived = this.messageTracker.ackReceived(aMQChannel, j);
            if (ackReceived == null) {
                return;
            }
            this.cassandraMessageStore.removeMessageFromUserQueue(ackReceived.queue, ackReceived.msgID);
            this.cassandraMessageStore.addContentDeletionTask(ackReceived.msgID);
            if (log.isDebugEnabled()) {
                log.debug("Ack:" + ackReceived.msgID + " " + j);
            }
            PerformanceCounter.recordMessageDelivered(ackReceived.queue);
        } catch (AMQStoreException e) {
            log.error("Error while handling the ack for " + j, e);
        } catch (Exception e) {
            // Route unexpected failures through the logger instead of printing to stderr.
            log.error("Unexpected error while handling the ack for " + j, e);
        }
    }
}
