package org.wso2.andes.server.cassandra;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.andes.AMQException;
import org.wso2.andes.AMQStoreException;
import org.wso2.andes.exchange.ExchangeDefaults;
import org.wso2.andes.server.ClusterResourceHolder;
import org.wso2.andes.server.binding.Binding;
import org.wso2.andes.server.exchange.AbstractExchange;
import org.wso2.andes.server.exchange.Exchange;
import org.wso2.andes.server.message.AMQMessage;
import org.wso2.andes.server.protocol.AMQProtocolSession;
import org.wso2.andes.server.queue.AMQQueue;
import org.wso2.andes.server.queue.SimpleAMQQueue;
import org.wso2.andes.server.store.CassandraMessageStore;
import org.wso2.andes.server.virtualhost.VirtualHost;

/* loaded from: input_file:org/wso2/andes/server/cassandra/CassandraTopicPublisher.class */
/**
 * Moves messages stored for a topic subscriber into that subscriber's queue.
 *
 * <p>On construction the publisher registers its queue as a subscriber for the
 * binding's key with the {@link CassandraMessageStore}. Each call to
 * {@link #run()} performs a single pass: it asks the store for the subscriber's
 * messages, enqueues each one through the matching binding on the topic
 * exchange, and finally asks the store to remove the ids it handled. When the
 * store returns nothing, the pass sleeps for the configured queue worker
 * interval instead.
 *
 * <p>NOTE(review): {@code working} and {@code markedForRemoval} are plain
 * fields. If they are read from a thread other than the one executing
 * {@link #run()}, they probably need to be {@code volatile} - confirm against
 * the callers.
 */
public class CassandraTopicPublisher extends Thread {
    private static final Log log = LogFactory.getLog(CassandraTopicPublisher.class);

    private AMQProtocolSession session;
    private final Binding binding;
    private final SimpleAMQQueue queue;
    private final AbstractExchange exchange;
    private final VirtualHost virtualHost;
    private boolean markedForRemoval;
    private final String id;
    private final CassandraMessageStore messageStore;
    private long lastDeliveredMessageID = 0;
    private boolean working = false;

    /**
     * Creates a publisher for the given queue and registers the queue as a
     * subscriber of the binding's key in the Cassandra message store.
     *
     * @param binding     binding whose key identifies the topic subscription
     * @param aMQQueue    destination queue; must be a {@link SimpleAMQQueue}
     * @param exchange    exchange of the binding; must be an {@link AbstractExchange}
     * @param virtualHost virtual host used to look up the topic exchange
     * @throws ClassCastException if {@code aMQQueue} or {@code exchange} is not
     *                            of the expected concrete type
     */
    public CassandraTopicPublisher(Binding binding, AMQQueue aMQQueue, Exchange exchange, VirtualHost virtualHost) {
        this.binding = binding;
        this.exchange = (AbstractExchange) exchange;
        this.queue = (SimpleAMQQueue) aMQQueue;
        this.virtualHost = virtualHost;
        this.id = aMQQueue.getName();
        this.messageStore = ClusterResourceHolder.getInstance().getCassandraMessageStore();
        this.messageStore.registerSubscriberForTopic(binding.getBindingKey(), aMQQueue.getName());
    }

    /**
     * Runs one delivery pass for this subscriber.
     *
     * <p>{@link #isWorking()} reports {@code true} for the duration of the pass
     * and is always reset afterwards, including when the pass fails. An
     * {@link AMQStoreException} is logged and ends the pass; any other
     * throwable propagates to the caller.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        this.working = true;
        try {
            // The stored cursor is advanced before the fetch; the fetch itself uses the previous value.
            final long fromId = this.lastDeliveredMessageID++;
            List<AMQMessage> pending = this.messageStore.getSubscriberMessages(this.queue, fromId);
            if (pending == null || pending.isEmpty()) {
                pauseForWorkerInterval();
                return;
            }

            List<Long> handledIds = new ArrayList<Long>(pending.size());
            for (AMQMessage message : pending) {
                try {
                    enqueueMessage(message);
                    // NOTE(review): the id is recorded even when enqueueMessage only logged an
                    // AMQException, so such a message is still removed below - confirm intended.
                    Long messageNumber = message.getMessageNumber();
                    handledIds.add(messageNumber);
                    this.lastDeliveredMessageID = messageNumber.longValue();
                    if (log.isDebugEnabled()) {
                        log.debug("Sending message  " + this.lastDeliveredMessageID + "from cassandra topic publisher" + this.queue.getName());
                    }
                } catch (Exception e) {
                    // One failing message must not stop delivery of the remaining ones.
                    log.error("Error on enqueing messages to relavent queue:" + e.getMessage(), e);
                }
            }
            this.messageStore.removeDeliveredMessageIds(handledIds, this.queue.getName());
        } catch (AMQStoreException e) {
            log.error("Error removing delivered Message Ids from Message store ", e);
        } finally {
            this.working = false;
        }
    }

    /**
     * Sleeps for the configured queue worker interval. If the sleep is
     * interrupted, the interrupt status is restored so that whoever owns the
     * executing thread can still observe it.
     */
    private void pauseForWorkerInterval() {
        try {
            Thread.sleep(ClusterResourceHolder.getInstance().getClusterConfiguration().getQueueWorkerInterval());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Enqueues the message through the first binding of the topic exchange
     * whose queue name equals this publisher's queue name, ignoring case.
     *
     * <p>If that enqueue throws an {@link AMQException}, the error is logged and
     * the remaining bindings are tried. Nothing happens when the topic exchange
     * is not registered or no binding matches.
     *
     * @param aMQMessage message to deliver; it is flagged as a topic message
     *                   before being enqueued
     */
    private void enqueueMessage(AMQMessage aMQMessage) {
        Exchange topicExchange = this.virtualHost.getExchangeRegistry().getExchange(ExchangeDefaults.TOPIC_EXCHANGE_NAME);
        if (topicExchange == null) {
            return;
        }
        final String queueName = this.queue.getName();
        for (Binding candidate : topicExchange.getBindings()) {
            if (!candidate.getQueue().getName().equalsIgnoreCase(queueName)) {
                continue;
            }
            try {
                aMQMessage.setTopicMessage(true);
                if (log.isDebugEnabled()) {
                    // Only the first 100 bytes of the content are read for the debug line.
                    ByteBuffer preview = ByteBuffer.allocate(100);
                    log.debug("sent (" + aMQMessage.getMessageNumber() + ")" + new String(preview.array(), 0, aMQMessage.getContent(preview, 0)) + " to " + this.queue.getName());
                }
                candidate.getQueue().enqueue(aMQMessage);
                return;
            } catch (AMQException e) {
                log.error("Error in enqueing message to queue", e);
            }
        }
    }

    /**
     * @return {@code true} while a {@link #run()} pass is in progress
     */
    public boolean isWorking() {
        return this.working;
    }

    /**
     * @return the flag last set through {@link #setMarkedForRemoval(boolean)}
     */
    public boolean isMarkedForRemoval() {
        return this.markedForRemoval;
    }

    /**
     * Sets the removal flag; this class only stores the value.
     *
     * @param z new value of the flag
     */
    public void setMarkedForRemoval(boolean z) {
        this.markedForRemoval = z;
    }

    /**
     * @return the name of the queue this publisher was created for
     */
    public String getQueueId() {
        return this.id;
    }
}
