package org.wso2.andes.server.cassandra;

import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.derby.iapi.services.daemon.DaemonService;
import org.wso2.andes.AMQException;
import org.wso2.andes.AMQStoreException;
import org.wso2.andes.client.protocol.ProtocolBufferMonitorFilter;
import org.wso2.andes.framing.amqp_0_9.MessageOkBodyImpl;
import org.wso2.andes.server.AMQChannel;
import org.wso2.andes.server.ClusterResourceHolder;
import org.wso2.andes.server.message.AMQMessage;
import org.wso2.andes.server.queue.DLCQueueUtils;
import org.wso2.andes.server.queue.QueueEntry;
import org.wso2.andes.server.stats.PerformanceCounter;
import org.wso2.andes.server.store.CassandraMessageStore;
import org.wso2.andes.server.util.AndesConstants;
import org.wso2.andes.server.util.AndesUtils;

/* loaded from: input_file:org/wso2/andes/server/cassandra/OnflightMessageTracker.class */
public class OnflightMessageTracker {
    private int acktimeout;
    private int maximumRedeliveryTimes;
    private LinkedHashMap<Long, MsgData> msgId2MsgData;
    private boolean isInMemoryMode;
    private static Log log = LogFactory.getLog(OnflightMessageTracker.class);
    private static final Log traceLog = LogFactory.getLog(AndesConstants.TRACE_LOGGER);
    private static final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private static final ScheduledExecutorService addedMessagedDeletionScheduler = Executors.newSingleThreadScheduledExecutor();
    private static final ScheduledExecutorService acknowledgedMessageCounterDecrementingScheduler = Executors.newSingleThreadScheduledExecutor();
    private static OnflightMessageTracker instance = new OnflightMessageTracker();
    private ConcurrentHashMap<String, Long> deliveryTag2MsgID = new ConcurrentHashMap<>();
    private ConcurrentHashMap<UUID, HashSet<Long>> channelToMsgIDMap = new ConcurrentHashMap<>();
    private ConcurrentHashMap<Long, QueueEntry> messageIdToQueueEntryMap = new ConcurrentHashMap<>();
    private ConcurrentHashMap<String, AtomicInteger> queueNameToAckedCounterMap = new ConcurrentHashMap<>();
    private ConcurrentHashMap<String, Long> queueNameToLastAckReplicatedTimeStampMap = new ConcurrentHashMap<>();
    private HashSet<Long> deliveredButNotAckedMessages = new HashSet<>();
    private AtomicLong sendMessageCount = new AtomicLong();
    private AtomicLong sendButNotAckedMessageCount = new AtomicLong();
    private ConcurrentHashMap<String, ArrayList<QueueEntry>> queueTosentButNotAckedMessageMap = new ConcurrentHashMap<>();
    private long startTime = -1;
    private ConcurrentHashMap<Long, Long> alreadyReadFromNodeQueueMessages = new ConcurrentHashMap<>();

    /* loaded from: input_file:org/wso2/andes/server/cassandra/OnflightMessageTracker$MsgData.class */
    public class MsgData {
        final long msgID;
        boolean ackreceived;
        final String queue;
        final long timestamp;
        final String deliveryID;
        final AMQChannel channel;
        int numOfDeliveries;
        boolean ackWaitTimedOut;
        long ackReceivedTimeStamp = -1;

        public MsgData(long j, boolean z, String str, long j2, String str2, AMQChannel aMQChannel, int i, boolean z2) {
            this.ackreceived = false;
            this.msgID = j;
            this.ackreceived = z;
            this.queue = str;
            this.timestamp = j2;
            this.deliveryID = str2;
            this.channel = aMQChannel;
            this.numOfDeliveries = i;
            this.ackWaitTimedOut = z2;
        }
    }

    public static OnflightMessageTracker getInstance() {
        return instance;
    }

    private OnflightMessageTracker() {
        this.acktimeout = DaemonService.TIMER_DELAY;
        this.maximumRedeliveryTimes = 1;
        this.msgId2MsgData = new LinkedHashMap<>();
        this.isInMemoryMode = false;
        this.acktimeout = ClusterResourceHolder.getInstance().getClusterConfiguration().getMaxAckWaitTime() * 1000;
        this.maximumRedeliveryTimes = ClusterResourceHolder.getInstance().getClusterConfiguration().getNumberOfMaximumDeliveryCount();
        this.msgId2MsgData = new LinkedHashMap<Long, MsgData>() { // from class: org.wso2.andes.server.cassandra.OnflightMessageTracker.1
            private static final long serialVersionUID = -8681132571102532817L;

            @Override // java.util.LinkedHashMap
            protected boolean removeEldestEntry(Map.Entry<Long, MsgData> entry) {
                MsgData value = entry.getValue();
                boolean z = System.currentTimeMillis() - value.timestamp > ((long) (OnflightMessageTracker.this.acktimeout * 10)) && (value.ackreceived || value.ackWaitTimedOut);
                if (z && OnflightMessageTracker.this.deliveryTag2MsgID.remove(value.deliveryID) == null) {
                    OnflightMessageTracker.log.error("Cannot find delivery tag " + value.deliveryID + " and message id " + value.msgID);
                }
                return z;
            }
        };
        scheduler.scheduleAtFixedRate(new Runnable() { // from class: org.wso2.andes.server.cassandra.OnflightMessageTracker.2
            @Override // java.lang.Runnable
            public void run() {
                synchronized (OnflightMessageTracker.this.msgId2MsgData) {
                    OnflightMessageTracker.log.debug("Running the scheduler for cleaning msgId2MsgData...");
                    int i = 0;
                    Iterator it = OnflightMessageTracker.this.msgId2MsgData.values().iterator();
                    while (it.hasNext()) {
                        MsgData msgData = (MsgData) it.next();
                        if ((msgData.ackreceived && System.currentTimeMillis() - msgData.ackReceivedTimeStamp > 180000) || msgData.numOfDeliveries > OnflightMessageTracker.this.maximumRedeliveryTimes) {
                            it.remove();
                            i++;
                            OnflightMessageTracker.this.deliveryTag2MsgID.remove(msgData.deliveryID);
                            if (msgData.numOfDeliveries > OnflightMessageTracker.this.maximumRedeliveryTimes) {
                                OnflightMessageTracker.log.warn("Message " + msgData.msgID + " with " + msgData.deliveryID.substring(msgData.deliveryID.lastIndexOf("/") + 1) + " removed as it has gone though max redeliveries");
                            }
                        }
                    }
                    OnflightMessageTracker.log.debug("Cleared message data from msgId2MsgData for " + i + " entries..");
                }
            }
        }, 5L, 10L, TimeUnit.SECONDS);
        addedMessagedDeletionScheduler.scheduleAtFixedRate(new Runnable() { // from class: org.wso2.andes.server.cassandra.OnflightMessageTracker.3
            @Override // java.lang.Runnable
            public void run() {
                synchronized (this) {
                    Iterator it = OnflightMessageTracker.this.alreadyReadFromNodeQueueMessages.entrySet().iterator();
                    while (it.hasNext()) {
                        Map.Entry entry = (Map.Entry) it.next();
                        long longValue = ((Long) entry.getValue()).longValue();
                        if (longValue > 0 && System.currentTimeMillis() - longValue > 3600000) {
                            it.remove();
                            if (OnflightMessageTracker.traceLog.isTraceEnabled()) {
                                OnflightMessageTracker.traceLog.trace("TRACING>> OFMT-Removed Message Id-" + entry.getKey() + "-from alreadyReadFromNodeQueueMessages");
                            }
                        }
                    }
                }
            }
        }, 5L, 10L, TimeUnit.SECONDS);
        acknowledgedMessageCounterDecrementingScheduler.scheduleAtFixedRate(new Runnable() { // from class: org.wso2.andes.server.cassandra.OnflightMessageTracker.4
            @Override // java.lang.Runnable
            public void run() {
                synchronized (this) {
                    try {
                        Enumeration keys = OnflightMessageTracker.this.queueNameToAckedCounterMap.keys();
                        while (keys.hasMoreElements()) {
                            String str = (String) keys.nextElement();
                            if (OnflightMessageTracker.this.queueNameToLastAckReplicatedTimeStampMap.get(str) == null || System.currentTimeMillis() - ((Long) OnflightMessageTracker.this.queueNameToLastAckReplicatedTimeStampMap.get(str)).longValue() > ProtocolBufferMonitorFilter.DEFAULT_FREQUENCY) {
                                int i = ((AtomicInteger) OnflightMessageTracker.this.queueNameToAckedCounterMap.get(str)).get();
                                ClusterResourceHolder.getInstance().getCassandraMessageStore().decrementQueueCount(str, i);
                                ((AtomicInteger) OnflightMessageTracker.this.queueNameToAckedCounterMap.get(str)).set(((AtomicInteger) OnflightMessageTracker.this.queueNameToAckedCounterMap.get(str)).get() - i);
                                OnflightMessageTracker.this.queueNameToLastAckReplicatedTimeStampMap.put(str, Long.valueOf(System.currentTimeMillis()));
                            }
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }, 5L, 10L, TimeUnit.SECONDS);
        this.isInMemoryMode = ClusterResourceHolder.getInstance().getClusterConfiguration().isInMemoryMode().booleanValue();
    }

    public void stampMessageAsAckTimedOut(long j, UUID uuid) {
        System.currentTimeMillis();
        String stringBuffer = new StringBuffer(uuid.toString()).append("/").append(j).toString();
        Long l = this.deliveryTag2MsgID.get(stringBuffer);
        if (l == null) {
            if (log.isDebugEnabled()) {
                log.debug("Error - Though rejected , Unable to find Message with deliveryID = " + stringBuffer + " rejected from client side ");
            }
            if (traceLog.isTraceEnabled()) {
                traceLog.trace("Error - Though rejected , Unable to find Message with deliveryID = " + stringBuffer + " rejected from client side ");
                return;
            }
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("============Message with deliveryID = " + stringBuffer + " and channel=" + uuid + " rejected from client side ");
        }
        if (traceLog.isTraceEnabled()) {
            traceLog.trace("============Message with deliveryID = " + stringBuffer + " MessageID " + l + " and channel= " + uuid + " rejected from client side ");
        }
        synchronized (this.msgId2MsgData) {
            if (this.msgId2MsgData.containsKey(l)) {
                this.msgId2MsgData.get(l).ackWaitTimedOut = true;
            }
        }
        deleteFromAlreadyReadFromNodeQueueMessagesInstantly(l.longValue());
    }

    public boolean testMessage(long j) {
        synchronized (this.msgId2MsgData) {
            System.currentTimeMillis();
            MsgData msgData = this.msgId2MsgData.get(Long.valueOf(j));
            if (msgData != null && (msgData.ackreceived || !msgData.ackWaitTimedOut)) {
                return false;
            }
            if (msgData != null) {
                msgData.channel.decrementNonAckedMessageCount();
            }
            return true;
        }
    }

    public boolean testForAlreadyDeliveredMessage(long j) {
        synchronized (this.msgId2MsgData) {
            MsgData msgData = this.msgId2MsgData.get(Long.valueOf(j));
            return msgData != null && (msgData.ackreceived || !msgData.ackWaitTimedOut);
        }
    }

    public boolean testForAlreadyDeliveredAndAckReceivedMessages(long j) {
        synchronized (this.msgId2MsgData) {
            MsgData msgData = this.msgId2MsgData.get(Long.valueOf(j));
            return msgData != null && msgData.ackreceived;
        }
    }

    public boolean checkAlreadyReadFromNodeQueue(long j) {
        synchronized (this) {
            if (this.alreadyReadFromNodeQueueMessages.get(Long.valueOf(j)) == null) {
                if (traceLog.isTraceEnabled()) {
                    traceLog.trace("TRACING>> OFMT - checkAlreadyReadFromNodeQueue - There is no item with messageID -" + j);
                }
                return false;
            }
            if (traceLog.isTraceEnabled()) {
                traceLog.trace("TRACING>> OFMT - checkAlreadyReadFromNodeQueue - There exists an item with messageID -" + j);
            }
            return true;
        }
    }

    public void addReadMessageFromNodeQueueToSet(long j) {
        synchronized (this) {
            this.alreadyReadFromNodeQueueMessages.put(Long.valueOf(j), 0L);
        }
    }

    public void scheduleToDeleteMessageFromReadMessageFromNodeQueueMap(long j) {
        synchronized (this) {
            this.alreadyReadFromNodeQueueMessages.put(Long.valueOf(j), Long.valueOf(System.currentTimeMillis()));
        }
    }

    public void deleteFromAlreadyReadFromNodeQueueMessagesInstantly(long j) {
        this.alreadyReadFromNodeQueueMessages.remove(Long.valueOf(j));
    }

    public void removeMessage(AMQChannel aMQChannel, long j, long j2) {
        synchronized (this.msgId2MsgData) {
            String stringBuffer = new StringBuffer(aMQChannel.getId().toString()).append("/").append(j).toString();
            Long remove = this.deliveryTag2MsgID.remove(stringBuffer);
            if (remove != null && remove.longValue() != j2) {
                throw new RuntimeException("Delivery Tag " + stringBuffer + " reused for " + j2 + " and " + remove + " , this should not happen");
            }
            this.msgId2MsgData.remove(Long.valueOf(j2));
            log.info("OFMT-Unexpected remove for messageID- " + j2);
        }
    }

    public boolean testAndAddMessage(QueueEntry queueEntry, long j, AMQChannel aMQChannel) throws AMQException {
        int i;
        long longValue = queueEntry.getMessage().getMessageNumber().longValue();
        String aMQShortString = ((AMQMessage) queueEntry.getMessage()).getMessageMetaData().getMessagePublishInfo().getRoutingKey().toString();
        String str = aMQShortString + "_" + ClusterResourceHolder.getInstance().getClusterManager().getNodeId();
        String stringBuffer = new StringBuffer(aMQChannel.getId().toString()).append("/").append(j).toString();
        synchronized (this.msgId2MsgData) {
            long currentTimeMillis = System.currentTimeMillis();
            MsgData msgData = this.msgId2MsgData.get(Long.valueOf(longValue));
            int i2 = 0;
            if (this.deliveryTag2MsgID.containsKey(stringBuffer)) {
                throw new RuntimeException("Delivery Tag " + stringBuffer + " reused, this should not happen");
            }
            if (msgData == null) {
                this.deliveredButNotAckedMessages.add(Long.valueOf(longValue));
                if (traceLog.isTraceEnabled()) {
                    traceLog.trace("TRACING>> OFMT-testAndAdd-scheduling new message to deliver with MessageID-" + longValue);
                }
            } else {
                i2 = msgData.numOfDeliveries;
                queueEntry.setRedelivered();
                this.deliveryTag2MsgID.remove(msgData.deliveryID);
                this.msgId2MsgData.remove(Long.valueOf(longValue));
                if (traceLog.isTraceEnabled()) {
                    traceLog.trace("TRACING>> OFMT- testAndAdd-scheduling ack expired message or rejected message to deliver with MessageID-" + longValue + "number of deliveries: " + i2);
                }
            }
            i = i2 + 1;
            this.deliveryTag2MsgID.put(stringBuffer, Long.valueOf(longValue));
            this.msgId2MsgData.put(Long.valueOf(longValue), new MsgData(longValue, false, str, currentTimeMillis, stringBuffer, aMQChannel, i, false));
        }
        this.sendButNotAckedMessageCount.incrementAndGet();
        HashSet<Long> hashSet = this.channelToMsgIDMap.get(aMQChannel.getId());
        if (hashSet == null) {
            HashSet<Long> hashSet2 = new HashSet<>();
            hashSet2.add(Long.valueOf(longValue));
            this.channelToMsgIDMap.put(aMQChannel.getId(), hashSet2);
        } else {
            hashSet.add(Long.valueOf(longValue));
        }
        this.messageIdToQueueEntryMap.put(Long.valueOf(longValue), queueEntry);
        if (i > ClusterResourceHolder.getInstance().getClusterConfiguration().getNumberOfMaximumDeliveryCount()) {
            log.warn("Number of Maximum Redelivery Tries Has Breached. Dropping The Message: " + longValue + "From Queue " + aMQShortString);
            return false;
        }
        if (!queueEntry.expired()) {
            return true;
        }
        log.warn("Message is expired. Dropping The Message: " + longValue);
        return false;
    }

    public void ackReceived(AMQChannel aMQChannel, long j) throws AMQStoreException {
        Long l = this.deliveryTag2MsgID.get(new StringBuffer(aMQChannel.getId().toString()).append("/").append(j).toString());
        if (l != null) {
            synchronized (this.msgId2MsgData) {
                MsgData msgData = this.msgId2MsgData.get(l);
                if (msgData == null) {
                    throw new RuntimeException("No message data found for messageId " + l);
                }
                msgData.ackreceived = true;
                msgData.ackReceivedTimeStamp = System.currentTimeMillis();
                aMQChannel.decrementNonAckedMessageCount();
                handleMessageRemovalWhenAcked(msgData);
                if (traceLog.isTraceEnabled()) {
                    traceLog.trace("TRACING>> OFMT-Ack received for MessageID-" + msgData.msgID + "-With Delivery Tag-" + j);
                }
                PerformanceCounter.recordMessageDelivered(msgData.queue);
                this.sendButNotAckedMessageCount.decrementAndGet();
                this.channelToMsgIDMap.get(aMQChannel.getId()).remove(l);
                this.messageIdToQueueEntryMap.remove(l);
            }
        }
    }

    public void releaseAckTrackingSinceChannelClosed(AMQChannel aMQChannel) {
        Iterator<Long> it;
        HashSet<Long> hashSet = this.channelToMsgIDMap.get(aMQChannel.getId());
        if (hashSet != null && hashSet.size() > 0 && (it = hashSet.iterator()) != null) {
            while (it.hasNext()) {
                long longValue = it.next().longValue();
                synchronized (this.msgId2MsgData) {
                    if (this.msgId2MsgData.get(Long.valueOf(longValue)) != null && !this.msgId2MsgData.get(Long.valueOf(longValue)).ackreceived) {
                        String str = this.msgId2MsgData.remove(Long.valueOf(longValue)).queue;
                        String substring = str.substring(0, str.lastIndexOf("_"));
                        this.sendButNotAckedMessageCount.decrementAndGet();
                        QueueEntry remove = this.messageIdToQueueEntryMap.remove(Long.valueOf(longValue));
                        deleteFromAlreadyReadFromNodeQueueMessagesInstantly(longValue);
                        ArrayList<QueueEntry> arrayList = this.queueTosentButNotAckedMessageMap.get(substring);
                        if (arrayList == null) {
                            ArrayList<QueueEntry> arrayList2 = new ArrayList<>();
                            arrayList2.add(remove);
                            this.queueTosentButNotAckedMessageMap.put(substring, arrayList2);
                            if (traceLog.isTraceEnabled()) {
                                traceLog.trace("TRACING>> OFMT- Added message-" + longValue + "-to delivered but not acked list");
                            }
                        } else {
                            arrayList.add(remove);
                        }
                    }
                }
            }
        }
        this.channelToMsgIDMap.remove(aMQChannel.getId());
    }

    public void removeAckedMessagesFromMemory() {
        if (this.queueNameToAckedCounterMap != null) {
            for (Map.Entry<String, AtomicInteger> entry : this.queueNameToAckedCounterMap.entrySet()) {
                ClusterResourceHolder.getInstance().getCassandraMessageStore().decrementQueueCount(entry.getKey(), entry.getValue().longValue());
            }
        }
    }

    private void handleMessageRemovalWhenAcked(MsgData msgData) throws AMQStoreException {
        if (this.deliveredButNotAckedMessages.contains(Long.valueOf(msgData.msgID))) {
            String substring = msgData.queue.substring(0, msgData.queue.lastIndexOf("_"));
            CassandraMessageStore cassandraMessageStore = ClusterResourceHolder.getInstance().getCassandraMessageStore();
            if (this.isInMemoryMode) {
                cassandraMessageStore.removeIncomingQueueMessage(msgData.msgID);
            } else {
                cassandraMessageStore.removeMessageFromNodeQueue(AndesUtils.getMyNodeQueueName(), msgData.msgID);
                scheduleToDeleteMessageFromReadMessageFromNodeQueueMap(msgData.msgID);
                cassandraMessageStore.addContentDeletionTask(msgData.msgID);
                cassandraMessageStore.addMessageQueueMappingDeletionTask(substring, msgData.msgID);
            }
            if (this.queueNameToAckedCounterMap.get(substring) == null) {
                this.queueNameToAckedCounterMap.put(substring, new AtomicInteger(0));
            }
            if (this.queueNameToAckedCounterMap.get(substring).incrementAndGet() >= 500) {
                ClusterResourceHolder.getInstance().getCassandraMessageStore().decrementQueueCount(substring, 500L);
                this.queueNameToAckedCounterMap.get(substring).set(this.queueNameToAckedCounterMap.get(substring).get() - MessageOkBodyImpl.METHOD_ID);
                this.queueNameToLastAckReplicatedTimeStampMap.put(substring, Long.valueOf(System.currentTimeMillis()));
            }
            this.deliveredButNotAckedMessages.remove(Long.valueOf(msgData.msgID));
        }
    }

    private void addMessageToDeadLetterQueue(long j, CassandraQueueMessage cassandraQueueMessage) {
        CassandraMessageStore cassandraMessageStore = ClusterResourceHolder.getInstance().getCassandraMessageStore();
        String identifyTenantInformationAndGenerateDLCString = DLCQueueUtils.identifyTenantInformationAndGenerateDLCString(cassandraQueueMessage.getDestinationQueueName(), AndesConstants.DEAD_LETTER_CHANNEL_QUEUE);
        byte[] message = cassandraQueueMessage.getMessage();
        try {
            if (cassandraMessageStore == null || message == null) {
                log.error("The Message Store is Not Properly Initialised, Error While Adding the Content to Dead Letter Queue");
            } else {
                cassandraMessageStore.addMessageToNodeQueue(identifyTenantInformationAndGenerateDLCString, j, message);
                cassandraMessageStore.incrementQueueCount(identifyTenantInformationAndGenerateDLCString, 1L);
                log.info("Message :" + j + " Added to the Dead Letter Queue");
            }
        } catch (Exception e) {
            log.error("Error While Adding Content to Dead Letter Queue", e);
        }
    }

    public void removeNodeQueueMessageFromStorePermanentlyAndDecrementMsgCount(long j, String str) {
        try {
            CassandraMessageStore cassandraMessageStore = ClusterResourceHolder.getInstance().getCassandraMessageStore();
            String myNodeQueueName = AndesUtils.getMyNodeQueueName();
            CassandraQueueMessage messageFromNodeQueue = cassandraMessageStore.getMessageFromNodeQueue(myNodeQueueName, j - 1, 1);
            cassandraMessageStore.removeMessageFromNodeQueue(myNodeQueueName, j);
            if (log.isDebugEnabled()) {
                log.debug("Removed message " + j + "from" + myNodeQueueName + " when removeNodeQueueMessageFromStorePermanentlyAndDecrementMsgCount");
            }
            cassandraMessageStore.addMessageQueueMappingDeletionTask(str, j);
            cassandraMessageStore.decrementQueueCount(str, 1L);
            if (messageFromNodeQueue != null) {
                addMessageToDeadLetterQueue(j, messageFromNodeQueue);
                deleteFromAlreadyReadFromNodeQueueMessagesInstantly(j);
            } else {
                log.warn("Cannot Find Meta Information of The Message, Content Cannot be Added To Dead Letter Queue");
            }
            if (this.msgId2MsgData.get(Long.valueOf(j)) != null) {
                this.deliveredButNotAckedMessages.remove(Long.valueOf(j));
            }
        } catch (AMQStoreException e) {
            log.error("Error In Removing Message From Node Queue. ID: " + j);
        }
    }

    public long getSentButNotAckedMessageCount() {
        return this.sendButNotAckedMessageCount.get();
    }

    public ArrayList<QueueEntry> getSentButNotAckedMessagesOfQueue(String str) {
        return this.queueTosentButNotAckedMessageMap.remove(str);
    }

    public void checkAndRemoveAlreadySentAndAckedMessagesFromStore(List<Long> list) {
        CassandraMessageStore cassandraMessageStore = ClusterResourceHolder.getInstance().getCassandraMessageStore();
        Iterator<Long> it = list.iterator();
        while (it.hasNext()) {
            long longValue = it.next().longValue();
            MsgData msgData = this.msgId2MsgData.get(Long.valueOf(longValue));
            if (msgData != null && msgData.ackreceived) {
                String myNodeQueueName = AndesUtils.getMyNodeQueueName();
                try {
                    cassandraMessageStore.removeMessageFromNodeQueue(myNodeQueueName, longValue);
                    if (traceLog.isTraceEnabled()) {
                        traceLog.trace("Removed message " + longValue + " from NodeQueue " + myNodeQueueName + " since it is already acked ");
                    }
                } catch (AMQStoreException e) {
                    log.error("Error in removing already acked message " + longValue + "  from node queue " + myNodeQueueName, e);
                }
            }
        }
    }
}
