package org.wso2.andes.server.cassandra;

import java.util.List;
import java.util.Queue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.derby.impl.sql.compile.SQLParserConstants;
import org.wso2.andes.server.ClusterResourceHolder;
import org.wso2.andes.server.store.CassandraMessageStore;
import org.wso2.andes.server.util.AndesUtils;

/* loaded from: input_file:org/wso2/andes/server/cassandra/ExpiredCassandraMessageRemover.class */
/**
 * Singleton that periodically drops expired messages from this node's queue and
 * from the global queues reported by the global queue manager.
 *
 * <p>The sweep is scheduled from the constructor on a single-threaded scheduler, so
 * it starts as soon as this class is initialised and sweeps never run concurrently
 * with each other. Call {@link #stopTask()} to stop it.
 */
public class ExpiredCassandraMessageRemover {
    private static final Log log = LogFactory.getLog(ExpiredCassandraMessageRemover.class);
    private static final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    /** Initial delay and period of the sweep, in seconds. */
    private static final long SWEEP_INTERVAL_SECONDS = 10L;

    // Must be declared after log and scheduler: the constructor uses both.
    private static final ExpiredCassandraMessageRemover instance = new ExpiredCassandraMessageRemover();

    private final CassandraMessageStore messageStore = ClusterResourceHolder.getInstance().getCassandraMessageStore();

    /**
     * Maximum number of messages requested from a queue per sweep.
     * NOTE(review): referencing a Derby SQL parser constant here looks like a decompiler
     * artifact (presumably a plain numeric literal in the original source) - confirm the
     * intended value and replace it with a local named constant.
     */
    private final int messageCountToRead = SQLParserConstants.OUTER;

    /**
     * Returns the single shared instance.
     *
     * @return the singleton remover
     */
    public static ExpiredCassandraMessageRemover getInstance() {
        return instance;
    }

    /**
     * Schedules the periodic sweep: the first run happens after
     * {@link #SWEEP_INTERVAL_SECONDS} and then repeats at that fixed rate.
     */
    private ExpiredCassandraMessageRemover() {
        log.info("Starting periodic expired messages cleaning task");
        scheduler.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                // Each phase handles its own failures so that a problem in one
                // does not skip the other for this cycle.
                sweepNodeQueue();
                sweepGlobalQueues();
            }
        }, SWEEP_INTERVAL_SECONDS, SWEEP_INTERVAL_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Reads up to {@link #messageCountToRead} messages from this node's queue and removes
     * the expired ones, attributing each removal to the message's own destination queue.
     */
    private void sweepNodeQueue() {
        try {
            // The node queue does not depend on any destination queue, so it is read once.
            List<CassandraQueueMessage> messages =
                    messageStore.getMessagesFromNodeQueue(AndesUtils.getMyNodeQueueName(), messageCountToRead, 0L);
            long now = System.currentTimeMillis();
            for (CassandraQueueMessage message : messages) {
                if (isExpired(message, now)) {
                    removeNodeQueueMessagesFromCassandraStore(message.getMessageId(), message.getDestinationQueueName());
                    log.warn("Message is expired. Dropping message: " + message.getMessageId());
                }
            }
        } catch (Exception e) {
            // Logged rather than rethrown: an exception escaping the scheduled task
            // would cancel all subsequent executions.
            log.error("Error in removing expired messages from MessageStore", e);
        }
    }

    /**
     * For every global queue name reported by the global queue manager, reads up to
     * {@link #messageCountToRead} messages and removes the expired ones.
     */
    private void sweepGlobalQueues() {
        try {
            for (String globalQueueName : ClusterResourceHolder.getInstance().getClusterManager()
                    .getGlobalQueueManager().getWorkerRunningGlobalQueueNames()) {
                Queue<CassandraQueueMessage> messages =
                        messageStore.getMessagesFromGlobalQueue(globalQueueName, messageCountToRead);
                long now = System.currentTimeMillis();
                for (CassandraQueueMessage message : messages) {
                    if (isExpired(message, now)) {
                        removeGlobalQueueMessagesFromCassandraStore(
                                message.getMessageId(), globalQueueName, message.getDestinationQueueName());
                        log.warn("Message is expired. Dropping message: " + message.getMessageId());
                    }
                }
            }
        } catch (Exception e) {
            // Logged rather than rethrown: an exception escaping the scheduled task
            // would cancel all subsequent executions.
            log.error("Error in removing expired messages from MessageStore", e);
        }
    }

    /**
     * Tells whether a message has expired.
     *
     * @param message the message to inspect
     * @param now     the reference time in milliseconds
     * @return {@code true} when the expiration is non-zero and earlier than {@code now};
     *         an expiration of {@code 0} is treated as "never expires"
     */
    private static boolean isExpired(CassandraQueueMessage message, long now) {
        long expiration = message.getAmqMessage().getExpiration();
        return expiration != 0 && now > expiration;
    }

    /**
     * Removes a node-queue message by delegating to {@link OnflightMessageTracker}.
     *
     * @param j   id of the message to remove
     * @param str destination queue name passed to the tracker
     */
    public void removeNodeQueueMessagesFromCassandraStore(long j, String str) {
        OnflightMessageTracker.getInstance().removeNodeQueueMessageFromStorePermanentlyAndDecrementMsgCount(j, str);
    }

    /**
     * Removes a global-queue message: decrements the destination queue's count by one,
     * removes the message from the global queue, and registers deletion tasks for its
     * content and its queue mapping.
     *
     * @param j    id of the message to remove
     * @param str  name of the global queue holding the message
     * @param str2 name of the message's destination queue
     */
    public void removeGlobalQueueMessagesFromCassandraStore(long j, String str, String str2) {
        CassandraMessageStore store = ClusterResourceHolder.getInstance().getCassandraMessageStore();
        store.decrementQueueCount(str2, 1L);
        store.removeMessageFromGlobalQueue(str, j);
        store.addContentDeletionTask(j);
        store.addMessageQueueMappingDeletionTask(str2, j);
    }

    /**
     * Stops the periodic sweep by shutting the scheduler down immediately.
     * The scheduler is static and is not recreated, so the sweep cannot be restarted.
     */
    public void stopTask() {
        scheduler.shutdownNow();
    }
}
