package org.wso2.andes.server.cassandra;

import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.derby.iapi.services.daemon.DaemonService;
import org.wso2.andes.server.AMQChannel;
import org.wso2.andes.server.ClusterResourceHolder;

/* loaded from: input_file:org/wso2/andes/server/cassandra/OnflightMessageTracker.class */
public class OnflightMessageTracker {
    private LinkedHashMap<Long, MsgData> msgId2MsgData;
    private static Log log = LogFactory.getLog(OnflightMessageTracker.class);
    private static final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private static OnflightMessageTracker instance = new OnflightMessageTracker();
    private int acktimeout = DaemonService.TIMER_DELAY;
    private Map<String, Long> deliveryTag2MsgID = new HashMap();

    /* loaded from: input_file:org/wso2/andes/server/cassandra/OnflightMessageTracker$MsgData.class */
    public class MsgData {
        final long msgID;
        boolean ackreceived;
        final String queue;
        final long timestamp;
        final String deliveryID;
        final AMQChannel channel;

        public MsgData(long j, boolean z, String str, long j2, String str2, AMQChannel aMQChannel) {
            this.ackreceived = false;
            this.msgID = j;
            this.ackreceived = z;
            this.queue = str;
            this.timestamp = j2;
            this.deliveryID = str2;
            this.channel = aMQChannel;
        }
    }

    public static OnflightMessageTracker getInstance() {
        return instance;
    }

    private OnflightMessageTracker() {
        this.msgId2MsgData = new LinkedHashMap<>();
        this.msgId2MsgData = new LinkedHashMap<Long, MsgData>() { // from class: org.wso2.andes.server.cassandra.OnflightMessageTracker.1
            private static final long serialVersionUID = -8681132571102532817L;

            @Override // java.util.LinkedHashMap
            protected boolean removeEldestEntry(Map.Entry<Long, MsgData> entry) {
                MsgData value = entry.getValue();
                boolean z = System.currentTimeMillis() - value.timestamp > ((long) (OnflightMessageTracker.this.acktimeout * 10));
                if (z) {
                    if (!value.ackreceived) {
                        value.channel.decrementNonAckedMessageCount();
                        OnflightMessageTracker.log.debug("No ack received for delivery tag " + value.deliveryID + " and message id " + value.msgID);
                    }
                    if (OnflightMessageTracker.this.deliveryTag2MsgID.remove(value.deliveryID) == null) {
                        OnflightMessageTracker.log.error("Cannot find delivery tag " + value.deliveryID + " and message id " + value.msgID);
                    }
                }
                return z;
            }
        };
        scheduler.scheduleAtFixedRate(new Runnable() { // from class: org.wso2.andes.server.cassandra.OnflightMessageTracker.2
            @Override // java.lang.Runnable
            public void run() {
                synchronized (this) {
                    long currentTimeMillis = System.currentTimeMillis();
                    Iterator it = OnflightMessageTracker.this.msgId2MsgData.values().iterator();
                    while (it.hasNext()) {
                        MsgData msgData = (MsgData) it.next();
                        if (currentTimeMillis - msgData.timestamp > OnflightMessageTracker.this.acktimeout) {
                            it.remove();
                            OnflightMessageTracker.this.deliveryTag2MsgID.remove(msgData.deliveryID);
                            msgData.channel.decrementNonAckedMessageCount();
                        }
                    }
                }
            }
        }, 5L, 10L, TimeUnit.SECONDS);
    }

    public synchronized boolean testMessage(long j) {
        long currentTimeMillis = System.currentTimeMillis();
        MsgData msgData = this.msgId2MsgData.get(Long.valueOf(j));
        if (msgData != null && (msgData.ackreceived || currentTimeMillis - msgData.timestamp <= this.acktimeout)) {
            return false;
        }
        if (msgData == null) {
            return true;
        }
        this.msgId2MsgData.remove(Long.valueOf(j));
        this.deliveryTag2MsgID.remove(msgData.deliveryID);
        msgData.channel.decrementNonAckedMessageCount();
        return true;
    }

    public void removeMessage(AMQChannel aMQChannel, long j, long j2) {
        String stringBuffer = new StringBuffer(aMQChannel.getId().toString()).append("/").append(j).toString();
        Long remove = this.deliveryTag2MsgID.remove(stringBuffer);
        if (remove != null && remove.longValue() != j2) {
            throw new RuntimeException("Delivery Tag " + stringBuffer + " reused for " + j2 + " and " + remove + " , this should not happen");
        }
        this.msgId2MsgData.remove(Long.valueOf(j2));
    }

    public synchronized boolean testAndAddMessage(AMQChannel aMQChannel, long j, long j2, String str) {
        String stringBuffer = new StringBuffer(aMQChannel.getId().toString()).append("/").append(j).toString();
        long currentTimeMillis = System.currentTimeMillis();
        MsgData msgData = this.msgId2MsgData.get(Long.valueOf(j2));
        if (msgData != null && (msgData.ackreceived || currentTimeMillis - msgData.timestamp <= this.acktimeout)) {
            return false;
        }
        if (this.deliveryTag2MsgID.containsKey(stringBuffer)) {
            throw new RuntimeException("Delivery Tag " + stringBuffer + " reused, this should not happen");
        }
        if (msgData != null) {
            this.deliveryTag2MsgID.remove(msgData.deliveryID);
            this.msgId2MsgData.remove(Long.valueOf(j2));
            msgData.channel.decrementNonAckedMessageCount();
        }
        this.deliveryTag2MsgID.put(stringBuffer, Long.valueOf(j2));
        this.msgId2MsgData.put(new Long(j2), new MsgData(j2, false, str, currentTimeMillis, stringBuffer, aMQChannel));
        return true;
    }

    private void decrementCassandraMsgCount(MsgData msgData) {
        String str = msgData.queue;
        ClusterResourceHolder.getInstance().getCassandraMessageStore().decrementQueueCount(str.substring(0, str.lastIndexOf("_")), 1L);
    }

    public synchronized MsgData ackReceived(AMQChannel aMQChannel, long j) {
        Long l = this.deliveryTag2MsgID.get(new StringBuffer(aMQChannel.getId().toString()).append("/").append(j).toString());
        if (l == null) {
            return null;
        }
        MsgData msgData = this.msgId2MsgData.get(l);
        if (msgData == null) {
            throw new RuntimeException("No message data found for messageid " + l);
        }
        msgData.ackreceived = true;
        aMQChannel.decrementNonAckedMessageCount();
        decrementCassandraMsgCount(msgData);
        return msgData;
    }

    public MsgData getMsgData(long j) {
        return this.msgId2MsgData.get(Long.valueOf(j));
    }
}
