package org.wso2.andes.server.cassandra;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.andes.AMQStoreException;
import org.wso2.andes.exchange.ExchangeDefaults;
import org.wso2.andes.server.ClusterResourceHolder;
import org.wso2.andes.server.binding.Binding;
import org.wso2.andes.server.exchange.Exchange;
import org.wso2.andes.server.message.AMQMessage;
import org.wso2.andes.server.store.CassandraMessageStore;
import org.wso2.andes.server.util.AndesConstants;
import org.wso2.andes.server.util.AndesUtils;
import org.wso2.andes.server.virtualhost.VirtualHost;

/* loaded from: input_file:org/wso2/andes/server/cassandra/TopicDeliveryWorker.class */
public class TopicDeliveryWorker extends Thread {
    /** Virtual host whose exchange registry is consulted to look up the topic exchange. */
    private VirtualHost virtualHost;
    /** Exposed through isMarkedForRemoval()/setMarkedForRemoval(); volatile so writes from other threads are visible. */
    private volatile boolean markedForRemoval;
    /** Message store the worker reads topic messages from; assigned in the constructor. */
    private CassandraMessageStore messageStore;
    /** Selects which of the two delivery paths run() uses; assigned in the constructor. */
    private boolean isInMemoryMode;
    /** Executor on which matched messages are enqueued to subscriber queues. */
    private SequentialThreadPoolExecutor messagePublishingExecutor;
    /** Single shared scheduler thread that runs the periodic "already delivered ids" cleanup of every worker. */
    private static final ScheduledExecutorService alreadyDeliveredMessageIDsRemovingScheduler = Executors.newSingleThreadScheduledExecutor();
    private static final Log log = LogFactory.getLog(TopicDeliveryWorker.class);
    private static final Log traceLog = LogFactory.getLog(AndesConstants.TRACE_LOGGER);
    /*
     * The progress markers and control flags below are written by one thread and read by
     * another (the run() loop polls killMe/working, while the public setters are the only
     * writers of killMe). They are volatile so that such writes are guaranteed to become visible.
     */
    /** Id of the last message handed to delivery; also the lower bound of the next store read. */
    private volatile long lastDeliveredMessageID = 0;
    /** Id of the last message processed by the store-backed delivery path. */
    private volatile long lastProcessedId = 0;
    /** While false the run() loop idles instead of delivering. */
    private volatile boolean working = false;
    /** Cleanup tasks keyed by the System.nanoTime() at which they were registered; value is the message id. */
    private SortedMap<Long, Long> alreadyReadFromTopicNodeQueueMessagesRemovalTasks = new ConcurrentSkipListMap<>();
    /** Ids of messages already read from the topic node queue (id mapped to itself), used to reject duplicates. */
    private ConcurrentHashMap<Long, Long> alreadyReadFromTopicNodeQueueMessages = new ConcurrentHashMap<>();
    /** When true the run() loop terminates. */
    private volatile boolean killMe = false;
    /** Age after which a cleanup task is due. Compared against System.nanoTime(), i.e. nanoseconds (900 s). */
    private long timeOutPerMessage = 900000000000L;
    /** Messages queued for delivery alongside the next batch read from the store. */
    private ConcurrentLinkedQueue<AMQMessage> laggards = new ConcurrentLinkedQueue<>();
    /** Name of the topic node queue this worker reads from. */
    private String topicNodeQueueName = AndesUtils.getTopicNodeQueueName();
    /** Identifier returned by getQueueId(); same value as the topic node queue name. */
    private String id = this.topicNodeQueueName;

    /* JADX WARN: Type inference failed for: r0v21, types: [org.wso2.andes.server.cassandra.TopicDeliveryWorker$1] */
    /**
     * Creates the worker and immediately puts it to work.
     *
     * <p>The constructor obtains the Cassandra message store and the publisher pool size from
     * {@link ClusterResourceHolder}, creates the publishing executor, starts this thread, marks
     * it as working, registers the periodic cleanup of already-delivered message ids, and
     * finally starts a second anonymous helper thread.
     *
     * <p>NOTE(review): the body of the anonymous helper thread could not be decompiled (it
     * throws {@link UnsupportedOperationException} as shown here); its real behavior must be
     * taken from the original sources. Also note that threads are started before the
     * constructor has returned, so they can observe this object while it is still being built.
     *
     * @param virtualHost virtual host used later to resolve the topic exchange
     * @param z           stored as the in-memory-mode flag that selects the delivery path in run()
     */
    public TopicDeliveryWorker(VirtualHost virtualHost, boolean z) {
        this.messageStore = null;
        this.isInMemoryMode = false;
        this.messagePublishingExecutor = null;
        this.virtualHost = virtualHost;
        this.messageStore = ClusterResourceHolder.getInstance().getCassandraMessageStore();
        this.isInMemoryMode = z;
        this.messagePublishingExecutor = new SequentialThreadPoolExecutor(ClusterResourceHolder.getInstance().getClusterConfiguration().getPublisherPoolSize(), "TopicMessagePublishingExecutor");
        // Start the delivery loop (run()) and enable it right away.
        start();
        setWorking();
        // Register the periodic task that expires entries of the duplicate-detection map.
        startRemovingAlreadyDeliveredTopicMessageIDS();
        new Thread() { // from class: org.wso2.andes.server.cassandra.TopicDeliveryWorker.1
            /* JADX WARN: Code restructure failed: missing block: B:11:0x003c, code lost:
            
                if (r9 >= r7.this$0.lastProcessedId) goto L56;
             */
            @Override // java.lang.Thread, java.lang.Runnable
            /*
                Code decompiled incorrectly, please refer to instructions dump.
                To view partially-correct add '--show-bad-code' argument
            */
            public void run() {
                /*
                    Method dump skipped, instructions count: 597
                    To view this dump add '--comments-level debug' option
                */
                throw new UnsupportedOperationException("Method not decompiled: org.wso2.andes.server.cassandra.TopicDeliveryWorker.AnonymousClass1.run():void");
            }
        }.start();
    }

    /**
     * Delivery loop of the worker thread.
     *
     * <p>Runs until {@code killMe} is set. While {@code working} is false the loop idles in
     * one-second sleeps; otherwise it delivers one batch per iteration, either from the
     * in-memory path or from the store-backed path, depending on {@code isInMemoryMode}.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        while (!this.killMe) {
            if (!this.working) {
                sleep4waitInterval(1000L);
            } else if (this.isInMemoryMode) {
                deliverInMemoryBatch();
            } else {
                deliverStoredBatch();
            }
        }
    }

    /**
     * In-memory mode: fetches the next topic messages from the store, enqueues each to the
     * matching subscriber queues and then removes the delivered ids from the incoming table.
     * Always clears {@code working} afterwards, so the worker idles until setWorking() is
     * called again.
     */
    private void deliverInMemoryBatch() {
        try {
            List<AMQMessage> pending = this.messageStore.getNextTopicMessageToDeliver();
            if (pending == null) {
                // Nothing to deliver: back off for the configured worker interval.
                sleep4waitInterval(ClusterResourceHolder.getInstance().getClusterConfiguration().getQueueWorkerInterval());
            } else {
                List<Long> deliveredIds = new ArrayList<Long>();
                try {
                    for (AMQMessage message : pending) {
                        enqueueMessageToWorkerDestinationQueue(message);
                        deliveredIds.add(message.getMessageNumber());
                        this.lastDeliveredMessageID = message.getMessageNumber().longValue();
                        if (traceLog.isTraceEnabled()) {
                            traceLog.trace("TRACING>> TDW - Sending message  " + this.lastDeliveredMessageID + "from cassandra topic publisher");
                        }
                    }
                } catch (Exception e) {
                    // A failure aborts the rest of the batch; ids delivered so far are still removed below.
                    log.error("Error on enqueue messages to relevant queue:" + e.getMessage(), e);
                }
                this.messageStore.removeDeliveredTopicMessageIdsFromIncomingMessagesTable(deliveredIds);
            }
        } catch (Exception e) {
            log.error("Error in sending message out in in memory mode ", e);
        } finally {
            this.working = false;
        }
    }

    /**
     * Store-backed mode: reads messages newer than the last delivered id, drops the ones that
     * were already read, merges in any laggards, delivers the result in message-number order
     * and removes the delivered ids from the store.
     */
    private void deliverStoredBatch() {
        try {
            if (traceLog.isTraceEnabled()) {
                traceLog.trace("***************************************\r\n\r\n");
            }
            // NOTE(review): the id advances on every poll, even when the previous poll read nothing.
            this.lastDeliveredMessageID++;
            List<AMQMessage> fetched = this.messageStore.getSubscriberMessages(this.topicNodeQueueName, this.lastDeliveredMessageID, Long.MAX_VALUE);
            // A null result used to cause a NullPointerException below as soon as a laggard had
            // to be added, which terminated the worker thread. Fall back to an empty batch.
            List<AMQMessage> batch = fetched != null ? fetched : new ArrayList<AMQMessage>();
            if (!batch.isEmpty()) {
                if (log.isDebugEnabled()) {
                    log.debug("TRACING >> TDW - read " + batch.size() + " messages from store starting from id " + this.lastDeliveredMessageID);
                }
                Iterator<AMQMessage> fetchedIt = batch.iterator();
                while (fetchedIt.hasNext()) {
                    long messageId = fetchedIt.next().getMessageNumber().longValue();
                    if (!testAndAddMessage(messageId)) {
                        fetchedIt.remove();
                    }
                    // A cleanup task is registered for every id read, including rejected ones.
                    addTaskremoveMessageFromAlreadyReadFromTopicNodeQueueMessages(messageId);
                }
            }
            Iterator<AMQMessage> laggardIt = this.laggards.iterator();
            while (laggardIt.hasNext()) {
                AMQMessage laggard = laggardIt.next();
                batch.add(laggard);
                if (traceLog.isTraceEnabled()) {
                    traceLog.trace("TRACING>> TDW- adding from laggard delivery thread  id=" + laggard.getMessageNumber());
                }
                laggardIt.remove();
            }
            if (batch.isEmpty()) {
                sleep4waitInterval(ClusterResourceHolder.getInstance().getClusterConfiguration().getQueueWorkerInterval());
            } else {
                Collections.sort(batch, new Comparator<AMQMessage>() {
                    @Override // java.util.Comparator
                    public int compare(AMQMessage left, AMQMessage right) {
                        return left.getMessageNumber().compareTo(right.getMessageNumber());
                    }
                });
                List<Long> deliveredIds = new ArrayList<Long>();
                for (AMQMessage message : batch) {
                    try {
                        enqueueMessage(message);
                        deliveredIds.add(message.getMessageNumber());
                        this.lastDeliveredMessageID = message.getMessageNumber().longValue();
                        this.lastProcessedId = message.getMessageNumber().longValue();
                        if (traceLog.isTraceEnabled()) {
                            traceLog.trace("TRACING >> TDW - Sending message  " + this.lastDeliveredMessageID + " from cassandra topic publisher msgID: " + AndesUtils.getHID(message));
                        }
                    } catch (Exception e) {
                        // One bad message must not stop delivery of the rest of the batch.
                        log.error("Error on enqueue messages to relevant queue:" + e.getMessage(), e);
                    }
                }
                this.messageStore.removeDeliveredTopicMessageIds(deliveredIds, this.topicNodeQueueName);
                if (traceLog.isTraceEnabled()) {
                    traceLog.trace("***************************************\r\n\r\n");
                }
            }
            Thread.sleep(100L);
        } catch (InterruptedException ignored) {
            // Deliberately not propagated: the run() loop only stops when killMe is set.
        } catch (AMQStoreException e) {
            log.error("Error removing delivered Message Ids from Message store ", e);
        }
    }

    /**
     * Registers a periodic task (first run after 5 s, then every 10 s) on the shared scheduler
     * that expires cleanup tasks older than {@code timeOutPerMessage} and forgets the message
     * ids they refer to.
     *
     * <p>Each run makes a single pass over the tasks that are due. The previous implementation
     * looped until the task map was completely empty, which spun the single scheduler thread
     * at full speed for as long as any not-yet-due task existed.
     */
    private void startRemovingAlreadyDeliveredTopicMessageIDS() {
        alreadyDeliveredMessageIDsRemovingScheduler.scheduleAtFixedRate(new Runnable() { // from class: org.wso2.andes.server.cassandra.TopicDeliveryWorker.3
            @Override // java.lang.Runnable
            public void run() {
                try {
                    long cutoff = System.nanoTime() - TopicDeliveryWorker.this.timeOutPerMessage;
                    // Keys are registration timestamps, so the head map holds exactly the due tasks.
                    for (Long registeredAt : TopicDeliveryWorker.this.alreadyReadFromTopicNodeQueueMessagesRemovalTasks.headMap(Long.valueOf(cutoff)).keySet()) {
                        Long messageId = TopicDeliveryWorker.this.alreadyReadFromTopicNodeQueueMessagesRemovalTasks.remove(registeredAt);
                        if (messageId == null) {
                            // Entry vanished between listing and removal; nothing left to clean up.
                            continue;
                        }
                        TopicDeliveryWorker.this.alreadyReadFromTopicNodeQueueMessages.remove(messageId);
                        // Pass the message id (the value), matching what testAndAddMessage registered
                        // with the store; previously the timestamp key was passed here.
                        TopicDeliveryWorker.this.messageStore.removeAlreadyMetaDataConsumedMessageIdFromList(messageId.longValue());
                        if (TopicDeliveryWorker.traceLog.isTraceEnabled()) {
                            TopicDeliveryWorker.traceLog.trace("TRACING>> TDW - removing already delivered message id from list id=" + messageId.longValue());
                        }
                    }
                } catch (RuntimeException e) {
                    // An exception escaping a scheduleAtFixedRate task suppresses all later runs.
                    TopicDeliveryWorker.log.error("Error while removing already delivered topic message ids", e);
                }
            }
        }, 5L, 10L, TimeUnit.SECONDS);
    }

    /**
     * Sleeps the calling thread for the given number of milliseconds.
     *
     * <p>An interruption ends the sleep early and is otherwise ignored. (The decompiler
     * widened this method from private to public.)
     *
     * @param j sleep duration in milliseconds
     */
    public void sleep4waitInterval(long j) {
        try {
            Thread.sleep(j);
        } catch (InterruptedException ignored) {
            // Intentionally swallowed: callers treat an interrupt as nothing more than an early wake-up.
        }
    }

    /**
     * Overrides the id of the last delivered message. In store-backed mode the next read
     * starts from this value plus one.
     *
     * @param j new last-delivered message id
     */
    public void setLastdeliveredMessageID(long j) {
        lastDeliveredMessageID = j;
    }

    /**
     * Records a message id as "already read" and reports whether this is the first time it
     * has been seen.
     *
     * <p>For a new id the store is informed via {@code addAlreadyMetaDataConsumerMessageIdToList}.
     * For a duplicate the id is removed from the topic node queue in the store; a failure of
     * that removal is logged and does not change the result.
     *
     * <p>The check and the insertion are one atomic {@code putIfAbsent}, so two threads
     * testing the same id at the same time cannot both be told to deliver it.
     *
     * @param j message id to test
     * @return {@code true} if the id was not known and the message may be delivered,
     *         {@code false} if it is a duplicate
     */
    public boolean testAndAddMessage(long j) {
        if (traceLog.isTraceEnabled()) {
            traceLog.trace("TRACING>> TDW - Checking message " + j);
        }
        Long messageId = Long.valueOf(j);
        if (this.alreadyReadFromTopicNodeQueueMessages.putIfAbsent(messageId, messageId) == null) {
            this.messageStore.addAlreadyMetaDataConsumerMessageIdToList(j);
            if (traceLog.isTraceEnabled()) {
                traceLog.trace("TRACING>> TDW - testAndAddMessage - allowing to send - " + j);
            }
            return true;
        }
        if (traceLog.isTraceEnabled()) {
            traceLog.trace("TRACING>> TDW - testAndAddMessage - rejecting - " + j);
        }
        try {
            this.messageStore.removeDeliveredTopicMessageId(messageId, this.topicNodeQueueName);
        } catch (AMQStoreException e) {
            log.error("Error removing already delivered Message Id " + j + " from Message store ", e);
        }
        return false;
    }

    /**
     * Registers a cleanup task for the given message id, keyed by the current
     * {@link System#nanoTime()} reading.
     *
     * <p>NOTE(review): two registrations that observe the same nanoTime value share a key, so
     * the later one replaces the earlier one.
     *
     * @param j id of the message whose "already read" entry should eventually be dropped
     */
    public void addTaskremoveMessageFromAlreadyReadFromTopicNodeQueueMessages(long j) {
        final Long registeredAt = Long.valueOf(System.nanoTime());
        final Long messageId = Long.valueOf(j);
        alreadyReadFromTopicNodeQueueMessagesRemovalTasks.put(registeredAt, messageId);
    }

    /**
     * Hands the message to every binding of the topic exchange whose binding key matches the
     * message's routing key (see {@code isMatching}). Does nothing when the virtual host has
     * no topic exchange. Each step is reported on the trace logger when tracing is enabled.
     *
     * @param aMQMessage message to distribute
     */
    private void enqueueMessage(AMQMessage aMQMessage) {
        final Exchange topicExchange = this.virtualHost.getExchangeRegistry().getExchange(ExchangeDefaults.TOPIC_EXCHANGE_NAME);
        if (topicExchange == null) {
            if (traceLog.isTraceEnabled()) {
                traceLog.trace("TRACING>> TDW- enqueue message   with messageID-" + aMQMessage.getMessageNumber() + "exchange = null ");
            }
            return;
        }

        final String routingKey = aMQMessage.getMessageMetaData().getMessagePublishInfo().getRoutingKey().toString();
        if (traceLog.isTraceEnabled()) {
            traceLog.trace("TRACING>> TDW- enqueue message   with messageID-" + aMQMessage.getMessageNumber() + " binding size  " + topicExchange.getBindings().size());
        }

        for (Binding candidate : topicExchange.getBindings()) {
            if (isMatching(candidate.getBindingKey(), routingKey)) {
                // Flag the message as a topic message before it is queued for the subscriber.
                aMQMessage.setTopicMessage(true);
                deliverAsynchronously(candidate, aMQMessage);
                continue;
            }
            if (traceLog.isTraceEnabled()) {
                traceLog.trace("TRACING>> TDW- enqueue message   with messageID-" + aMQMessage.getMessageNumber() + " not matching  " + candidate.getBindingKey() + " to the queue name " + routingKey);
            }
        }
    }

    /**
     * In-memory-mode counterpart of {@code enqueueMessage}: hands the message to every binding
     * of the topic exchange whose binding key matches the routing key, without trace output.
     * Does nothing when the virtual host has no topic exchange.
     *
     * @param aMQMessage message to distribute
     */
    private void enqueueMessageToWorkerDestinationQueue(AMQMessage aMQMessage) {
        final Exchange topicExchange = this.virtualHost.getExchangeRegistry().getExchange(ExchangeDefaults.TOPIC_EXCHANGE_NAME);
        if (topicExchange == null) {
            return;
        }
        final String routingKey = aMQMessage.getMessageMetaData().getMessagePublishInfo().getRoutingKey().toString();
        for (Binding candidate : topicExchange.getBindings()) {
            if (!isMatching(candidate.getBindingKey(), routingKey)) {
                continue;
            }
            aMQMessage.setTopicMessage(true);
            deliverAsynchronously(candidate, aMQMessage);
        }
    }

    /**
     * Tests whether a routing key is matched by a binding key.
     *
     * <ul>
     *   <li>Identical strings always match.</li>
     *   <li>If the binding key contains {@code ".#"}, the text before it is used as a prefix
     *       and the routing key matches when it starts with that prefix.</li>
     *   <li>Otherwise, if the binding key contains {@code ".*"}, the routing key matches when
     *       it is the prefix followed by exactly one more dot-separated word.</li>
     * </ul>
     *
     * <p>The wildcard is honored whenever the prefix is non-empty. The previous test
     * ({@code indexOf > 1}) silently ignored wildcards after a one-character first word, so
     * keys such as {@code "a.#"} or {@code "a.*"} never matched anything but themselves.
     *
     * <p>NOTE(review): the prefix is inserted into the regular expression unescaped, so its
     * dots match any character and {@code "a.b.#"} also accepts {@code "a.bc"}. Left as is to
     * avoid changing what existing bindings receive.
     *
     * @param str  binding key, possibly containing a {@code .#} or {@code .*} wildcard
     * @param str2 routing key of the message
     * @return {@code true} if the routing key is matched by the binding key
     */
    public boolean isMatching(String str, String str2) {
        if (str.equals(str2)) {
            return true;
        }
        int multiWordWildcardAt = str.indexOf(".#");
        if (multiWordWildcardAt > 0) {
            return Pattern.compile(str.substring(0, multiWordWildcardAt) + ".*").matcher(str2).matches();
        }
        int singleWordWildcardAt = str.indexOf(".*");
        if (singleWordWildcardAt > 0) {
            return Pattern.compile("^" + str.substring(0, singleWordWildcardAt) + "[.][^.]+$").matcher(str2).matches();
        }
        return false;
    }

    /** @return {@code true} while the worker is enabled for delivery */
    public boolean isWorking() {
        return working;
    }

    /** Enables delivery; the run() loop starts processing batches again. */
    public void setWorking() {
        working = true;
    }

    /** Disables delivery; the run() loop idles until setWorking() is called. */
    public void stopWorking() {
        working = false;
    }

    /**
     * Sets the termination flag polled by the run() loop.
     *
     * @param z {@code true} to make the run() loop exit
     */
    public void setKillMe(boolean z) {
        killMe = z;
    }

    /** @return the current value of the marked-for-removal flag */
    public boolean isMarkedForRemoval() {
        return markedForRemoval;
    }

    /**
     * Sets the marked-for-removal flag. The flag is only stored here; nothing in this class
     * acts on it.
     *
     * @param z new flag value
     */
    public void setMarkedForRemoval(boolean z) {
        markedForRemoval = z;
    }

    /** @return the identifier of this worker, which is the topic node queue name */
    public String getQueueId() {
        return id;
    }

    /**
     * Submits a task to the publishing executor that enqueues the message on the binding's
     * queue, provided that queue is not durable. Any error raised while enqueueing is logged
     * inside the task and does not propagate.
     *
     * <p>The second argument passed to the executor is derived from the hash code of the
     * binding id and is guaranteed to be non-negative. {@code Math.abs} alone is not enough
     * for that: it returns {@code Integer.MIN_VALUE} unchanged, so that one hash value is
     * mapped to 0 here. All other hash values produce the same number as before.
     *
     * @param binding    binding whose queue receives the message
     * @param aMQMessage message to enqueue
     */
    private void deliverAsynchronously(final Binding binding, final AMQMessage aMQMessage) {
        final int idHash = binding.getId().hashCode();
        final int executorKey = idHash == Integer.MIN_VALUE ? 0 : Math.abs(idHash);
        this.messagePublishingExecutor.submit(new Runnable() { // from class: org.wso2.andes.server.cassandra.TopicDeliveryWorker.4
            @Override // java.lang.Runnable
            public void run() {
                try {
                    // Durable queues are skipped by this worker.
                    if (!binding.getQueue().isDurable()) {
                        binding.getQueue().enqueue(aMQMessage);
                        if (TopicDeliveryWorker.traceLog.isTraceEnabled()) {
                            TopicDeliveryWorker.traceLog.trace("TRACING>> TDW- Sent message " + AndesUtils.getHID(aMQMessage) + " with messageID-" + aMQMessage.getMessageNumber() + "-to subscription-" + binding.getQueue().getName());
                        }
                    }
                } catch (Throwable th) {
                    TopicDeliveryWorker.log.error("Error while delivering message ", th);
                }
            }
        }, executorKey);
    }
}
