package org.wso2.andes.server.store;

import com.google.common.base.Splitter;
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.SortedMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
import me.prettyprint.cassandra.serializers.BytesArraySerializer;
import me.prettyprint.cassandra.serializers.IntegerSerializer;
import me.prettyprint.cassandra.serializers.LongSerializer;
import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.beans.ColumnSlice;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.beans.OrderedRows;
import me.prettyprint.hector.api.beans.Row;
import me.prettyprint.hector.api.exceptions.HectorException;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.Mutator;
import me.prettyprint.hector.api.query.ColumnQuery;
import me.prettyprint.hector.api.query.QueryResult;
import me.prettyprint.hector.api.query.RangeSlicesQuery;
import me.prettyprint.hector.api.query.SliceQuery;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.derby.iapi.services.daemon.DaemonService;
import org.apache.derby.iapi.store.raw.RawStoreFactory;
import org.wso2.andes.AMQException;
import org.wso2.andes.AMQStoreException;
import org.wso2.andes.client.AMQSession;
import org.wso2.andes.framing.AMQShortString;
import org.wso2.andes.framing.FieldTable;
import org.wso2.andes.framing.amqp_0_9.MessageOkBodyImpl;
import org.wso2.andes.jms.BrokerDetails;
import org.wso2.andes.pool.AndesExecuter;
import org.wso2.andes.server.ClusterResourceHolder;
import org.wso2.andes.server.cassandra.AndesConsistantLevelPolicy;
import org.wso2.andes.server.cassandra.CassandraMessageContentCache;
import org.wso2.andes.server.cassandra.CassandraQueueMessage;
import org.wso2.andes.server.cassandra.CassandraTopicPublisherManager;
import org.wso2.andes.server.cassandra.ClusteringEnabledSubscriptionManager;
import org.wso2.andes.server.cassandra.DefaultClusteringEnabledSubscriptionManager;
import org.wso2.andes.server.cassandra.OnceInOrderEnabledSubscriptionManager;
import org.wso2.andes.server.cluster.ClusterManagementInformationMBean;
import org.wso2.andes.server.cluster.ClusterManager;
import org.wso2.andes.server.cluster.GlobalQueueManager;
import org.wso2.andes.server.cluster.coordination.MessageIdGenerator;
import org.wso2.andes.server.cluster.coordination.SubscriptionCoordinationManagerImpl;
import org.wso2.andes.server.cluster.coordination.TimeStampBasedMessageIdGenerator;
import org.wso2.andes.server.cluster.coordination.TopicSubscriptionCoordinationManager;
import org.wso2.andes.server.configuration.ClusterConfiguration;
import org.wso2.andes.server.exchange.Exchange;
import org.wso2.andes.server.information.management.QueueManagementInformationMBean;
import org.wso2.andes.server.logging.LogSubject;
import org.wso2.andes.server.message.AMQMessage;
import org.wso2.andes.server.message.MessageMetaData;
import org.wso2.andes.server.protocol.AMQProtocolSession;
import org.wso2.andes.server.queue.AMQQueue;
import org.wso2.andes.server.queue.BaseQueue;
import org.wso2.andes.server.queue.IncomingMessage;
import org.wso2.andes.server.queue.QueueEntry;
import org.wso2.andes.server.queue.SimpleQueueEntryList;
import org.wso2.andes.server.stats.PerformanceCounter;
import org.wso2.andes.server.store.ConfigurationRecoveryHandler;
import org.wso2.andes.server.store.TransactionLog;
import org.wso2.andes.server.store.util.CassandraDataAccessException;
import org.wso2.andes.server.store.util.CassandraDataAccessHelper;
import org.wso2.andes.server.util.AndesUtils;
import org.wso2.andes.server.virtualhost.VirtualHostConfigSynchronizer;
import org.wso2.andes.tools.utils.DataCollector;

/* loaded from: input_file:org/wso2/andes/server/store/CassandraMessageStore.class */
public class CassandraMessageStore implements MessageStore {
    private Cluster cluster;
    private Keyspace keyspace;
    public static final String KEYSPACE = "QpidKeySpace";
    private static final String LONG_TYPE = "LongType";
    private static final String UTF8_TYPE = "UTF8Type";
    private static final String INTEGER_TYPE = "IntegerType";
    private static final String QUEUE_COLUMN_FAMILY = "Queue";
    private static final String QUEUE_DETAILS_COLUMN_FAMILY = "QueueDetails";
    private static final String QUEUE_DETAILS_ROW = "QUEUE_DETAILS";
    private static final String QUEUE_ENTRY_COLUMN_FAMILY = "QueueEntries";
    private static final String QUEUE_ENTRY_ROW = "QueueEntriesRow";
    private static final String EXCHANGE_COLUMN_FAMILY = "ExchangeColumnFamily";
    private static final String EXCHANGE_ROW = "ExchangesRow";
    private static final String BINDING_COLUMN_FAMILY = "Binding";
    private static final String MESSAGE_CONTENT_COLUMN_FAMILY = "MessageContent";
    private static final String MESSAGE_CONTENT_ID_COLUMN_FAMILY = "MessageContentIDs";
    private static final String MESSAGE_QUEUE_MAPPING_COLUMN_FAMILY = "MessageQueueMappingColumnFamily";
    private static final String MESSAGE_QUEUE_MAPPING_ROW = "MessageQueueMappingRow";
    private static final String SQ_COLUMN_FAMILY = "SubscriptionQueues";
    private static final String GLOBAL_QUEUE_TO_USER_QUEUE_COLUMN_FAMILY = "QpidQueues";
    private static final String USER_QUEUES_COLUMN_FAMILY = "UserQueues";
    private static final String GLOBAL_QUEUES_COLUMN_FAMILY = "GlobalQueue";
    private static final String GLOBAL_QUEUE_LIST_COLUMN_FAMILY = "GlobalQueueList";
    private static final String GLOBAL_QUEUE_LIST_ROW = "GlobalQueueListRow";
    private static final String QMD_COLUMN_FAMILY = "MetaData";
    private static final String QMD_ROW_NAME = "qpidMetaData";
    private static final String MSG_CONTENT_IDS_ROW = "messageContentIds";
    private static final String TOPIC_EXCHANGE_MESSAGE_IDS = "TopicExchangeMessageIds";
    private static final String PUB_SUB_MESSAGE_IDS = "pubSubMessages";
    private static final String TOPIC_SUBSCRIBERS = "topicSubscribers";
    private static final String TOPIC_SUBSCRIBER_QUEUES = "topicSubscriberQueues";
    private static final String TOPICS_COLUMN_FAMILY = "topics";
    private static final String TOPICS_ROW = "TOPICS";
    private static final String ACKED_MESSAGE_IDS_COLUMN_FAMILY = "acknowledgedMessageIds";
    private static final String ACKED_MESSAGE_IDS_ROW = "acknowledgedMessageIdsRow";
    private static final String NODE_DETAIL_COLUMN_FAMILY = "CusterNodeDetails";
    private static final String NODE_DETAIL_ROW = "NodeDetailsRow";
    private static final String MESSAGE_COUNTERS_COLUMN_FAMILY = "MessageCountDetails";
    private static final String MESSAGE_COUNTERS_RAW_NAME = "QueueMessageCountRow";
    private ConcurrentHashMap<Long, Long> pubSubMessageContentDeletionTasks;
    private ClusterManagementInformationMBean clusterManagementMBean;
    private QueueManagementInformationMBean queueManagementMBean;
    private PublishMessageWriter publishMessageWriter;
    private PublishMessageContentWriter publishMessageContentWriter;
    private static StringSerializer stringSerializer = StringSerializer.get();
    private static LongSerializer longSerializer = LongSerializer.get();
    private static BytesArraySerializer bytesArraySerializer = BytesArraySerializer.get();
    private static IntegerSerializer integerSerializer = IntegerSerializer.get();
    private static ByteBufferSerializer byteBufferSerializer = ByteBufferSerializer.get();
    private static Log log = LogFactory.getLog(CassandraMessageStore.class);
    static final Splitter pipeSplitter = Splitter.on('|');
    private final String USERNAME_KEY = BrokerDetails.USERNAME;
    private final String PASSWORD_KEY = BrokerDetails.PASSWORD;
    private final String CONNECTION_STRING = "connectionString";
    private final String CLUSTER_KEY = "cluster";
    private final String ID_GENENRATOR = "idGenerator";
    private final AtomicLong _messageId = new AtomicLong(0);
    private MessageIdGenerator messageIdGenerator = null;
    private SortedMap<Long, Long> contentDeletionTasks = new ConcurrentSkipListMap();
    private ConcurrentHashMap<String, ArrayList<String>> topicSubscribersMap = new ConcurrentHashMap<>();
    private ConcurrentHashMap<String, ArrayList<String>> topicNodeQueuesMap = new ConcurrentHashMap<>();
    private CassandraMessageContentCache messageCacheForCassandra = null;
    private ContentRemoverTask messageContentRemovalTask = null;
    private PubSubMessageContentRemoverTask pubSubMessageContentRemoverTask = null;
    private boolean configured = false;
    private boolean isCassandraConnectionLive = false;

    /* loaded from: input_file:org/wso2/andes/server/store/CassandraMessageStore$CassandraTransaction.class */
    private class CassandraTransaction implements TransactionLog.Transaction {
        private CassandraTransaction() {
        }

        @Override // org.wso2.andes.server.store.TransactionLog.Transaction
        public void enqueueMessage(final TransactionLogResource transactionLogResource, final Long l) throws AMQStoreException {
            try {
                AndesExecuter.submit(new Runnable() { // from class: org.wso2.andes.server.store.CassandraMessageStore.CassandraTransaction.1
                    @Override // java.lang.Runnable
                    public void run() {
                        try {
                            Mutator createMutator = HFactory.createMutator(CassandraMessageStore.this.keyspace, CassandraMessageStore.stringSerializer);
                            createMutator.addInsertion(CassandraMessageStore.QUEUE_ENTRY_ROW, CassandraMessageStore.QUEUE_ENTRY_COLUMN_FAMILY, HFactory.createColumn(transactionLogResource.getResourceName(), l, CassandraMessageStore.stringSerializer, LongSerializer.get()));
                            createMutator.execute();
                        } catch (Throwable th) {
                            CassandraMessageStore.log.error("Error adding Queue Entry ", th);
                        }
                    }
                }, null);
            } catch (Throwable th) {
                CassandraMessageStore.log.error("Error adding Queue Entry ", th);
                throw new AMQStoreException("Error adding Queue Entry " + transactionLogResource.getResourceName(), th);
            }
        }

        @Override // org.wso2.andes.server.store.TransactionLog.Transaction
        public void dequeueMessage(final TransactionLogResource transactionLogResource, Long l) throws AMQStoreException {
            try {
                AndesExecuter.submit(new Runnable() { // from class: org.wso2.andes.server.store.CassandraMessageStore.CassandraTransaction.2
                    @Override // java.lang.Runnable
                    public void run() {
                        try {
                            CassandraDataAccessHelper.deleteStringColumnFromRaw(CassandraMessageStore.QUEUE_ENTRY_COLUMN_FAMILY, CassandraMessageStore.QUEUE_DETAILS_ROW, transactionLogResource.getResourceName(), CassandraMessageStore.this.keyspace);
                        } catch (Throwable th) {
                            CassandraMessageStore.log.error("Error deleting Queue Entry", th);
                        }
                    }
                }, null);
            } catch (Throwable th) {
                CassandraMessageStore.log.error("Error deleting Queue Entry", th);
                throw new AMQStoreException("Error deleting Queue Entry :" + transactionLogResource.getResourceName(), th);
            }
        }

        @Override // org.wso2.andes.server.store.TransactionLog.Transaction
        public void commitTran() throws AMQStoreException {
        }

        @Override // org.wso2.andes.server.store.TransactionLog.Transaction
        public TransactionLog.StoreFuture commitTranAsync() throws AMQStoreException {
            return new TransactionLog.StoreFuture() { // from class: org.wso2.andes.server.store.CassandraMessageStore.CassandraTransaction.3
                @Override // org.wso2.andes.server.store.TransactionLog.StoreFuture
                public boolean isComplete() {
                    return true;
                }

                @Override // org.wso2.andes.server.store.TransactionLog.StoreFuture
                public void waitForCompletion() {
                }
            };
        }

        @Override // org.wso2.andes.server.store.TransactionLog.Transaction
        public void abortTran() throws AMQStoreException {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/wso2/andes/server/store/CassandraMessageStore$ContentRemoverTask.class */
    public class ContentRemoverTask implements Runnable {
        private int waitInterval;
        private long timeOutPerMessage = 60000;
        private boolean running = true;

        public ContentRemoverTask(int i) {
            this.waitInterval = 5000;
            this.waitInterval = i;
        }

        @Override // java.lang.Runnable
        public void run() {
            while (this.running) {
                try {
                    if (!CassandraMessageStore.this.contentDeletionTasks.isEmpty()) {
                        SortedMap headMap = CassandraMessageStore.this.contentDeletionTasks.headMap(Long.valueOf(System.currentTimeMillis() - this.timeOutPerMessage));
                        ArrayList arrayList = new ArrayList();
                        Iterator it = headMap.keySet().iterator();
                        while (it.hasNext()) {
                            arrayList.add(new StringBuffer("mid").append((Long) it.next()).toString());
                        }
                        CassandraDataAccessHelper.deleteIntegerColumnsFromRow(CassandraMessageStore.MESSAGE_CONTENT_COLUMN_FAMILY, arrayList, CassandraMessageStore.this.keyspace);
                        Iterator it2 = headMap.keySet().iterator();
                        while (it2.hasNext()) {
                            CassandraMessageStore.this.contentDeletionTasks.remove((Long) it2.next());
                        }
                    }
                    try {
                        Thread.sleep(this.waitInterval);
                    } catch (InterruptedException e) {
                        CassandraMessageStore.log.error("Error while Executing content removal Task", e);
                    }
                } catch (Throwable th) {
                    CassandraMessageStore.log.error("Error while Executing content removal Task", th);
                }
            }
        }

        public boolean isRunning() {
            return this.running;
        }

        public void setRunning(boolean z) {
            this.running = z;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/wso2/andes/server/store/CassandraMessageStore$PubSubMessageContentRemoverTask.class */
    public class PubSubMessageContentRemoverTask implements Runnable {
        private int waitInterval;
        private boolean running = true;

        public PubSubMessageContentRemoverTask(int i) {
            this.waitInterval = 5000;
            this.waitInterval = i;
        }

        @Override // java.lang.Runnable
        public void run() {
            while (this.running) {
                while (!CassandraMessageStore.this.pubSubMessageContentDeletionTasks.isEmpty()) {
                    try {
                        Iterator it = CassandraMessageStore.this.pubSubMessageContentDeletionTasks.keySet().iterator();
                        while (it.hasNext()) {
                            long longValue = ((Long) it.next()).longValue();
                            if (CassandraMessageStore.this.isReadyAndRemovedMessageContent(longValue)) {
                                CassandraMessageStore.this.pubSubMessageContentDeletionTasks.remove(Long.valueOf(longValue));
                            }
                        }
                    } catch (Throwable th) {
                        th.printStackTrace();
                    }
                }
                try {
                    Thread.sleep(this.waitInterval);
                } catch (InterruptedException e) {
                    CassandraMessageStore.log.error(e);
                }
            }
        }

        public boolean isRunning() {
            return this.running;
        }

        public void setRunning(boolean z) {
            this.running = z;
        }
    }

    /* loaded from: input_file:org/wso2/andes/server/store/CassandraMessageStore$PublishMessageContentWriter.class */
    public class PublishMessageContentWriter implements Runnable {
        private int writeCount;
        private boolean start = false;
        private BlockingQueue<PublishMessageContentWriterMessage> messageQueue = new LinkedBlockingQueue();
        private List<PublishMessageContentWriterMessage> writtenMessages = new ArrayList();

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/wso2/andes/server/store/CassandraMessageStore$PublishMessageContentWriter$PublishMessageContentWriterMessage.class */
        public class PublishMessageContentWriterMessage {
            private String rowKey;
            private int offset;
            private byte[] message;

            public PublishMessageContentWriterMessage(String str, int i, byte[] bArr) throws InterruptedException {
                this.rowKey = str;
                this.offset = i;
                this.message = bArr;
            }

            public void release() {
            }

            public void waitForToBeWritten() throws InterruptedException {
            }
        }

        public PublishMessageContentWriter() {
            this.writeCount = 1;
            this.writeCount = ClusterResourceHolder.getInstance().getClusterConfiguration().getContentPublisherMessageBatchSize();
        }

        @Override // java.lang.Runnable
        public void run() {
            Mutator<String> createMutator = HFactory.createMutator(CassandraMessageStore.this.keyspace, CassandraMessageStore.stringSerializer);
            while (this.start) {
                try {
                    if (this.messageQueue.peek() == null) {
                        createMutator.execute();
                        Iterator<PublishMessageContentWriterMessage> it = this.writtenMessages.iterator();
                        while (it.hasNext()) {
                            it.next().release();
                        }
                        this.writtenMessages.clear();
                        bufferMessageToCassandra(this.messageQueue.take(), createMutator);
                        int i = 0 + 1;
                    } else {
                        bufferMessageToCassandra(this.messageQueue.take(), createMutator);
                        if (0 + 1 >= this.writeCount) {
                            createMutator.execute();
                            Iterator<PublishMessageContentWriterMessage> it2 = this.writtenMessages.iterator();
                            while (it2.hasNext()) {
                                it2.next().release();
                            }
                            this.writtenMessages.clear();
                        }
                    }
                } catch (InterruptedException e) {
                    CassandraMessageStore.log.error("Error while writing incoming messages content", e);
                }
            }
        }

        private void bufferMessageToCassandra(PublishMessageContentWriterMessage publishMessageContentWriterMessage, Mutator<String> mutator) {
            try {
                long currentTimeMillis = System.currentTimeMillis();
                CassandraDataAccessHelper.addIntegerByteArrayContentToRaw(CassandraMessageStore.MESSAGE_CONTENT_COLUMN_FAMILY, publishMessageContentWriterMessage.rowKey, publishMessageContentWriterMessage.offset, publishMessageContentWriterMessage.message, mutator, false);
                this.writtenMessages.add(publishMessageContentWriterMessage);
                PerformanceCounter.recordCassandraWrite(System.currentTimeMillis() - currentTimeMillis);
            } catch (Exception e) {
                CassandraMessageStore.log.error("Error in adding message to global queue", e);
            }
        }

        public void addMessage(String str, int i, byte[] bArr) {
            try {
                final PublishMessageContentWriterMessage publishMessageContentWriterMessage = new PublishMessageContentWriterMessage(str, i, bArr);
                if (CassandraMessageStore.this.isCassandraConnectionLive) {
                    AndesExecuter.submit(new Runnable() { // from class: org.wso2.andes.server.store.CassandraMessageStore.PublishMessageContentWriter.1
                        @Override // java.lang.Runnable
                        public void run() {
                            try {
                                long currentTimeMillis = System.currentTimeMillis();
                                Mutator createMutator = HFactory.createMutator(CassandraMessageStore.this.keyspace, CassandraMessageStore.stringSerializer);
                                CassandraDataAccessHelper.addIntegerByteArrayContentToRaw(CassandraMessageStore.MESSAGE_CONTENT_COLUMN_FAMILY, publishMessageContentWriterMessage.rowKey, publishMessageContentWriterMessage.offset, publishMessageContentWriterMessage.message, createMutator, false);
                                createMutator.execute();
                                PerformanceCounter.recordCassandraWrite(System.currentTimeMillis() - currentTimeMillis);
                            } catch (Throwable th) {
                                CassandraMessageStore.log.error(th);
                            }
                        }
                    }, null);
                } else {
                    CassandraMessageStore.log.error("Error while adding incoming message. Message Store is Inaccessible.");
                }
            } catch (InterruptedException e) {
                throw new RuntimeException("Error while adding Incoming message", e);
            }
        }

        public void start() {
            this.start = true;
        }

        public void stop() {
            this.start = false;
        }
    }

    /* loaded from: input_file:org/wso2/andes/server/store/CassandraMessageStore$PublishMessageWriter.class */
    public class PublishMessageWriter implements Runnable {
        private int writeCount;
        private boolean start = false;
        private BlockingQueue<PublishMessageWriterMessage> messageQueue = new LinkedBlockingQueue();
        private List<PublishMessageWriterMessage> writtenMessages = new ArrayList();
        private HashMap<String, Long> messageCountForQueues = new HashMap<>();

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/wso2/andes/server/store/CassandraMessageStore$PublishMessageWriter$PublishMessageWriterMessage.class */
        public class PublishMessageWriterMessage {
            private Semaphore messageCallBack = new Semaphore(1);
            private String queue;
            private String routingKey;
            private long messageId;
            private byte[] message;

            public PublishMessageWriterMessage(String str, String str2, long j, byte[] bArr) throws InterruptedException {
                this.queue = str;
                this.messageId = j;
                this.message = bArr;
                this.routingKey = str2;
                this.messageCallBack.acquire();
            }

            public void release() {
                this.messageCallBack.release();
            }

            public void waitForToBeWritten() throws InterruptedException {
            }
        }

        public PublishMessageWriter() {
            this.writeCount = 20;
            this.writeCount = ClusterResourceHolder.getInstance().getClusterConfiguration().getMetadataPublisherMessageBatchSize();
        }

        @Override // java.lang.Runnable
        public void run() {
            PublishMessageWriterMessage take;
            Mutator<String> createMutator = HFactory.createMutator(CassandraMessageStore.this.keyspace, CassandraMessageStore.stringSerializer);
            Mutator<String> createMutator2 = HFactory.createMutator(CassandraMessageStore.this.keyspace, CassandraMessageStore.stringSerializer);
            while (this.start) {
                try {
                    if (this.messageQueue.peek() == null) {
                        long currentTimeMillis = System.currentTimeMillis();
                        createMutator.execute();
                        createMutator2.execute();
                        updateCounters();
                        if (CassandraMessageStore.log.isDebugEnabled()) {
                            CassandraMessageStore.log.info("message Write, batch= 0 took " + (System.currentTimeMillis() - currentTimeMillis) + "ms");
                        }
                        Iterator<PublishMessageWriterMessage> it = this.writtenMessages.iterator();
                        while (it.hasNext()) {
                            it.next().release();
                        }
                        this.writtenMessages.clear();
                        take = this.messageQueue.take();
                        bufferMessageToCassandra(take, createMutator, createMutator2);
                        int i = 0 + 1;
                    } else {
                        take = this.messageQueue.take();
                        bufferMessageToCassandra(take, createMutator, createMutator2);
                        if (0 + 1 >= this.writeCount) {
                            createMutator.execute();
                            createMutator2.execute();
                            updateCounters();
                            Iterator<PublishMessageWriterMessage> it2 = this.writtenMessages.iterator();
                            while (it2.hasNext()) {
                                it2.next().release();
                            }
                            this.writtenMessages.clear();
                        }
                    }
                    if (CassandraMessageStore.log.isDebugEnabled()) {
                        CassandraMessageStore.log.debug("Adding Message with id " + take.messageId + " to Queue " + take.queue);
                    }
                } catch (InterruptedException e) {
                    CassandraMessageStore.log.error("Error while writing incoming messages", e);
                }
            }
        }

        private void bufferMessageToCassandra(PublishMessageWriterMessage publishMessageWriterMessage, Mutator<String> mutator, Mutator<String> mutator2) {
            ClusterResourceHolder.getInstance().getClusterManager();
            if (!CassandraMessageStore.this.isCassandraConnectionLive) {
                CassandraMessageStore.log.error("Error writing messages to global queue. Message Store is Inaccessible.");
                return;
            }
            try {
                long nanoTime = System.nanoTime();
                CassandraDataAccessHelper.addMessageToQueue(CassandraMessageStore.GLOBAL_QUEUES_COLUMN_FAMILY, publishMessageWriterMessage.queue, publishMessageWriterMessage.messageId, publishMessageWriterMessage.message, mutator, false);
                CassandraDataAccessHelper.addMappingToRaw(CassandraMessageStore.GLOBAL_QUEUE_LIST_COLUMN_FAMILY, CassandraMessageStore.GLOBAL_QUEUE_LIST_ROW, publishMessageWriterMessage.queue, publishMessageWriterMessage.queue, mutator2, false);
                this.writtenMessages.add(publishMessageWriterMessage);
                DataCollector.write(DataCollector.PUBLISHER_WRITE_LATENCY, System.nanoTime() - nanoTime);
                DataCollector.flush();
                if (this.messageCountForQueues.containsKey(publishMessageWriterMessage.routingKey)) {
                    this.messageCountForQueues.put(publishMessageWriterMessage.routingKey, Long.valueOf(this.messageCountForQueues.get(publishMessageWriterMessage.routingKey).longValue() + 1));
                } else {
                    CassandraMessageStore.this.addMessageCounterForQueue(publishMessageWriterMessage.routingKey);
                    this.messageCountForQueues.put(publishMessageWriterMessage.routingKey, 1L);
                }
            } catch (Exception e) {
                CassandraMessageStore.log.error("Error in adding message to global queue", e);
            }
        }

        public void addMessage(String str, String str2, long j, byte[] bArr) {
            try {
                this.messageQueue.add(new PublishMessageWriterMessage(str, str2, j, bArr));
            } catch (InterruptedException e) {
                throw new RuntimeException("Error while adding Incomming message", e);
            }
        }

        private void updateCounters() {
            for (String str : this.messageCountForQueues.keySet()) {
                CassandraMessageStore.this.incrementQueueCount(str, this.messageCountForQueues.get(str).longValue());
            }
            this.messageCountForQueues.clear();
        }

        public void start() {
            this.start = true;
        }

        public void stop() {
            this.start = false;
        }
    }

    /* loaded from: input_file:org/wso2/andes/server/store/CassandraMessageStore$StoredCassandraMessage.class */
    public class StoredCassandraMessage implements StoredMessage {
        private final long _messageId;
        private StorableMessageMetaData metaData;
        private String channelID;

        private StoredCassandraMessage(long j, StorableMessageMetaData storableMessageMetaData) {
            this._messageId = j;
            this.metaData = storableMessageMetaData;
        }

        private StoredCassandraMessage(long j, StorableMessageMetaData storableMessageMetaData, boolean z) {
            this._messageId = j;
            this.metaData = storableMessageMetaData;
        }

        @Override // org.wso2.andes.server.store.StoredMessage
        public StorableMessageMetaData getMetaData() {
            if (this.metaData == null) {
                this.metaData = CassandraMessageStore.this.getMetaData(this._messageId);
            }
            return this.metaData;
        }

        @Override // org.wso2.andes.server.store.StoredMessage
        public long getMessageNumber() {
            return this._messageId;
        }

        @Override // org.wso2.andes.server.store.StoredMessage
        public void addContent(final int i, final ByteBuffer byteBuffer) {
            AndesExecuter.submit(new Runnable() { // from class: org.wso2.andes.server.store.CassandraMessageStore.StoredCassandraMessage.1
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        CassandraMessageStore.this.addMessageContent(StoredCassandraMessage.this._messageId + "", i, byteBuffer);
                    } catch (Throwable th) {
                        CassandraMessageStore.log.error("Error processing completed messages", th);
                    }
                }
            }, this.channelID);
        }

        @Override // org.wso2.andes.server.store.StoredMessage
        public int getContent(int i, ByteBuffer byteBuffer) {
            return CassandraMessageStore.this.getContent(this._messageId + "", i, byteBuffer);
        }

        @Override // org.wso2.andes.server.store.StoredMessage
        public TransactionLog.StoreFuture flushToStore() {
            CassandraMessageStore.this.storeMetaData(this._messageId, this.metaData);
            return MessageStore.IMMEDIATE_FUTURE;
        }

        public String getChannelID() {
            return this.channelID;
        }

        public void setChannelID(String str) {
            this.channelID = str;
        }

        @Override // org.wso2.andes.server.store.StoredMessage
        public void remove() {
        }
    }

    public CassandraMessageStore() {
        ClusterResourceHolder.getInstance().setCassandraMessageStore(this);
    }

    public void addMessage(IncomingMessage incomingMessage) {
        long longValue = incomingMessage.getMessageNumber().longValue();
        MessageMetaData headersReceived = incomingMessage.headersReceived();
        Iterator<? extends BaseQueue> it = incomingMessage.getDestinationQueues().iterator();
        while (it.hasNext()) {
            BaseQueue next = it.next();
            try {
                if (this.isCassandraConnectionLive) {
                    byte[] bArr = new byte[1 + headersReceived.getStorableSize()];
                    bArr[0] = (byte) headersReceived.getType().ordinal();
                    ByteBuffer wrap = ByteBuffer.wrap(bArr);
                    wrap.position(1);
                    headersReceived.writeToBuffer(0, wrap.slice());
                    addMessageToGlobalQueue(AndesUtils.getGlobalQueueNameForQueue(next.getResourceName()), incomingMessage.getRoutingKey(), longValue, bArr);
                } else {
                    log.error("Error while adding messages to queues. Message Store is Inaccessible");
                }
            } catch (Exception e) {
                throw new RuntimeException("Error while adding messages to queues  ", e);
            }
        }
    }

    public List<QueueEntry> getMessagesFromUserQueue(String str, AMQQueue aMQQueue, int i, long j) throws AMQStoreException {
        SimpleQueueEntryList simpleQueueEntryList = new SimpleQueueEntryList(aMQQueue);
        ArrayList arrayList = new ArrayList();
        if (!this.isCassandraConnectionLive) {
            log.error("Cassandra Message Store is Inaccessible. Cannot Receive Messages from User Queues");
            return arrayList;
        }
        try {
            for (HColumn hColumn : CassandraDataAccessHelper.getMessagesFromQueue(str, USER_QUEUES_COLUMN_FAMILY, this.keyspace, j, i).getColumns()) {
                if (hColumn instanceof HColumn) {
                    long longValue = ((Long) hColumn.getName()).longValue();
                    byte[] bArr = (byte[]) hColumn.getValue();
                    ByteBuffer wrap = ByteBuffer.wrap(bArr);
                    wrap.position(1);
                    arrayList.add(simpleQueueEntryList.add(new AMQMessage(new StoredCassandraMessage(longValue, MessageMetaDataType.values()[bArr[0]].getFactory().createMetaData(wrap.slice())))));
                }
            }
            return arrayList;
        } catch (NumberFormatException e) {
            throw new AMQStoreException("Error while accessing user queue" + str, e);
        } catch (Exception e2) {
            throw new AMQStoreException("Error while accessing user queue" + str, e2);
        }
    }

    public List<CassandraQueueMessage> getMessagesFromUserQueue(String str, int i, long j) {
        ArrayList arrayList = new ArrayList();
        ClusterResourceHolder.getInstance().getClusterManager();
        if (!this.isCassandraConnectionLive) {
            log.error("Cassandra Message Store is Inaccessible. Cannot Receive Messages from User Queues");
            return arrayList;
        }
        try {
            for (HColumn hColumn : CassandraDataAccessHelper.getMessagesFromQueue(str.trim(), USER_QUEUES_COLUMN_FAMILY, this.keyspace, j, i).getColumns()) {
                if (hColumn instanceof HColumn) {
                    long longValue = ((Long) hColumn.getName()).longValue();
                    byte[] bArr = (byte[]) hColumn.getValue();
                    ByteBuffer wrap = ByteBuffer.wrap(bArr);
                    wrap.position(1);
                    arrayList.add(new CassandraQueueMessage(longValue, new AMQMessage(new StoredCassandraMessage(longValue, MessageMetaDataType.values()[bArr[0]].getFactory().createMetaData(wrap.slice()))).getMessageMetaData().getMessagePublishInfo().getRoutingKey().toString(), bArr));
                }
            }
        } catch (NumberFormatException e) {
            log.error(e);
        } catch (Exception e2) {
            log.error(e2);
        }
        return arrayList;
    }

    public int getMessageCountOfUserQueues(String str) {
        int i = 0;
        if (!this.isCassandraConnectionLive) {
            log.error("Error in Getting Messages from Global Queue: " + str + ". Message Store is Inaccessible.");
            return 0;
        }
        try {
            Iterator<String> it = getUserQueues(str).iterator();
            while (it.hasNext()) {
                i = CassandraDataAccessHelper.getMessagesFromQueue(it.next().trim(), USER_QUEUES_COLUMN_FAMILY, this.keyspace, Integer.MAX_VALUE).getColumns().size();
            }
        } catch (NumberFormatException e) {
            log.error("Number format error in getting messages from global queue : " + str, e);
        } catch (Exception e2) {
            log.error("Error in getting messages from global queue: " + str, e2);
        }
        return i;
    }

    public int getMessageCountOfGlobalQueue(String str) {
        int i = 0;
        if (!this.isCassandraConnectionLive) {
            log.error("Error in getting messages from global queue: " + str + ". Message Store is Inaccessible.");
            return 0;
        }
        try {
            i = CassandraDataAccessHelper.getMessagesFromQueue(str.trim(), GLOBAL_QUEUES_COLUMN_FAMILY, this.keyspace, Integer.MAX_VALUE).getColumns().size();
        } catch (NumberFormatException e) {
            log.error("Number format error in getting messages from global queue : " + str, e);
        } catch (Exception e2) {
            log.error("Error in getting messages from global queue: " + str, e2);
        }
        return i;
    }

    public Queue<CassandraQueueMessage> getMessagesFromGlobalQueue(String str, int i) throws AMQStoreException {
        LinkedList linkedList = new LinkedList();
        if (!this.isCassandraConnectionLive) {
            log.error("Error in getting messages from global queue: " + str + ". Message Store is Inaccessible.");
            return linkedList;
        }
        try {
            for (HColumn hColumn : CassandraDataAccessHelper.getMessagesFromQueue(str.trim(), GLOBAL_QUEUES_COLUMN_FAMILY, this.keyspace, i).getColumns()) {
                if (hColumn instanceof HColumn) {
                    linkedList.add(new CassandraQueueMessage(((Long) hColumn.getName()).longValue(), str, (byte[]) hColumn.getValue()));
                }
            }
            return linkedList;
        } catch (NumberFormatException e) {
            throw new AMQStoreException("Number format error in getting messages from global queue : " + str, e);
        } catch (Exception e2) {
            throw new AMQStoreException("Error in getting messages from global queue: " + str, e2);
        }
    }

    public List<QueueEntry> getMessagesFromGlobalQueue(AMQQueue aMQQueue, AMQProtocolSession aMQProtocolSession, int i) throws AMQStoreException {
        ArrayList arrayList = new ArrayList();
        SimpleQueueEntryList simpleQueueEntryList = new SimpleQueueEntryList(aMQQueue);
        if (!this.isCassandraConnectionLive) {
            log.error("Error while getting messages from queue : " + aMQQueue + ". Message Store is Inaccessible.");
            return arrayList;
        }
        try {
            for (HColumn hColumn : CassandraDataAccessHelper.getMessagesFromQueue(aMQQueue.getName().trim(), GLOBAL_QUEUES_COLUMN_FAMILY, this.keyspace, i).getColumns()) {
                if (hColumn instanceof HColumn) {
                    long longValue = ((Long) hColumn.getName()).longValue();
                    byte[] bArr = (byte[]) hColumn.getValue();
                    ByteBuffer wrap = ByteBuffer.wrap(bArr);
                    wrap.position(1);
                    AMQMessage aMQMessage = new AMQMessage(new StoredCassandraMessage(longValue, MessageMetaDataType.values()[bArr[0]].getFactory().createMetaData(wrap.slice())));
                    aMQMessage.setClientIdentifier(aMQProtocolSession);
                    arrayList.add(simpleQueueEntryList.add(aMQMessage));
                }
            }
            return arrayList;
        } catch (Exception e) {
            throw new AMQStoreException("Error while getting messages from queue : " + aMQQueue, e);
        }
    }

    public void dequeueMessages(AMQQueue aMQQueue, List<QueueEntry> list) {
        try {
            String str = aMQQueue.getResourceName() + "_" + ClusterResourceHolder.getInstance().getClusterManager().getNodeId();
            Iterator<QueueEntry> it = list.iterator();
            while (it.hasNext()) {
                removeMessageFromUserQueue(str, it.next().getMessage().getMessageNumber().longValue());
            }
        } catch (Exception e) {
            log.error("Error in dequeuing messages from " + aMQQueue.getName(), e);
        }
    }

    public void removeMessageFromUserQueue(String str, long j) throws AMQStoreException {
        if (!this.isCassandraConnectionLive) {
            log.error("Error while removing message from User queue. Message Store is Inaccessible.");
            return;
        }
        try {
            CassandraDataAccessHelper.deleteLongColumnFromRaw(USER_QUEUES_COLUMN_FAMILY, AndesUtils.getNodeQueueNameForQueue(str), j, this.keyspace);
        } catch (CassandraDataAccessException e) {
            throw new AMQStoreException("Error while removing message from User queue", e);
        }
    }

    public void removeMessageBatchFromUserQueue(String str, List<CassandraQueueMessage> list) throws AMQStoreException {
        if (!this.isCassandraConnectionLive) {
            log.error("Error while removing messages from User queue. Message Store is Inaccessible.");
            return;
        }
        Mutator createMutator = HFactory.createMutator(this.keyspace, stringSerializer);
        try {
            try {
                String nodeQueueNameForQueue = AndesUtils.getNodeQueueNameForQueue(str);
                Iterator<CassandraQueueMessage> it = list.iterator();
                while (it.hasNext()) {
                    CassandraDataAccessHelper.deleteLongColumnFromRaw(USER_QUEUES_COLUMN_FAMILY, nodeQueueNameForQueue, it.next().getMessageId(), createMutator, false);
                }
            } catch (CassandraDataAccessException e) {
                throw new AMQStoreException("Error while removing messages from User queue", e);
            }
        } finally {
            createMutator.execute();
        }
    }

    public void removeMessageFromGlobalQueue(String str, long j) {
        if (!this.isCassandraConnectionLive) {
            log.error("Error while removing messages from global queue " + str + ". Message Store is Inaccessible.");
            return;
        }
        try {
            CassandraDataAccessHelper.deleteLongColumnFromRaw(GLOBAL_QUEUES_COLUMN_FAMILY, str, j, this.keyspace);
        } catch (CassandraDataAccessException e) {
            log.error("Error while removing messages from global queue " + str, e);
        }
    }

    public void removeMessageFromGlobalQueue(String str, long j, Mutator<String> mutator) {
        if (!this.isCassandraConnectionLive) {
            log.error("Error while removing messages from global queue " + str + ". Message Store is Inaccessible.");
            return;
        }
        try {
            CassandraDataAccessHelper.deleteLongColumnFromRaw(GLOBAL_QUEUES_COLUMN_FAMILY, str, j, mutator, false);
        } catch (CassandraDataAccessException e) {
            log.error("Error while removing messages from global queue " + str, e);
        }
    }

    public void transferMessageBatchFromGlobalQueueToUserQueue(CassandraQueueMessage[] cassandraQueueMessageArr, String str) {
        if (!this.isCassandraConnectionLive) {
            log.error("Error while transferring messages from Global Queue to User Queues. Message Store is Inaccessible.");
            return;
        }
        Mutator<String> createMutator = HFactory.createMutator(this.keyspace, stringSerializer);
        try {
            try {
                for (CassandraQueueMessage cassandraQueueMessage : cassandraQueueMessageArr) {
                    addMessageToUserQueue(cassandraQueueMessage.getQueue(), cassandraQueueMessage.getMessageId(), cassandraQueueMessage.getMessage(), createMutator);
                    removeMessageFromGlobalQueue(str, cassandraQueueMessage.getMessageId(), createMutator);
                }
                createMutator.execute();
            } catch (CassandraDataAccessException e) {
                e.printStackTrace();
                log.error("Error while transferring messages from Global Queue to User Queues");
                createMutator.execute();
            }
        } catch (Throwable th) {
            createMutator.execute();
            throw th;
        }
    }

    public void removeMessageBatchFromGlobalQueue(List<CassandraQueueMessage> list, String str) {
        if (!this.isCassandraConnectionLive) {
            log.error("Error while removing messages from global queue " + str + ". Message Store is Inaccessible.");
            return;
        }
        Mutator<String> createMutator = HFactory.createMutator(this.keyspace, stringSerializer);
        try {
            Iterator<CassandraQueueMessage> it = list.iterator();
            while (it.hasNext()) {
                removeMessageFromGlobalQueue(str, it.next().getMessageId(), createMutator);
            }
        } finally {
            createMutator.execute();
        }
    }

    public void recover(ConfigurationRecoveryHandler configurationRecoveryHandler) throws AMQException {
        boolean z = false;
        boolean z2 = false;
        int i = 0;
        while (!z) {
            try {
                try {
                    ConfigurationRecoveryHandler.QueueRecoveryHandler begin = configurationRecoveryHandler.begin(this);
                    loadQueues(begin);
                    ConfigurationRecoveryHandler.ExchangeRecoveryHandler completeQueueRecovery = begin.completeQueueRecovery();
                    List<String> loadExchanges = loadExchanges(completeQueueRecovery);
                    ConfigurationRecoveryHandler.BindingRecoveryHandler completeExchangeRecovery = completeQueueRecovery.completeExchangeRecovery();
                    recoverBindings(completeExchangeRecovery, loadExchanges);
                    completeExchangeRecovery.completeBindingRecovery();
                    if (z2) {
                        long pow = 10 * 1000 * ((long) Math.pow(2.0d, i));
                        log.warn("Waiting for Cluster data to be synced Please ,start the other nodes soon, wait time: " + pow + "ms");
                        try {
                            Thread.sleep(pow);
                        } catch (InterruptedException e) {
                        }
                        if (i > 10) {
                            throw new AMQStoreException("Max Backoff attempts expired for data recovery");
                        }
                        i++;
                    } else {
                        z = true;
                    }
                } catch (Exception e2) {
                    z2 = true;
                    log.error("Error recovering persistent state: " + e2.getMessage(), e2);
                    if (1 == 0) {
                        z = true;
                    } else {
                        long pow2 = 10 * 1000 * ((long) Math.pow(2.0d, i));
                        log.warn("Waiting for Cluster data to be synced Please ,start the other nodes soon, wait time: " + pow2 + "ms");
                        try {
                            Thread.sleep(pow2);
                        } catch (InterruptedException e3) {
                        }
                        if (i > 10) {
                            throw new AMQStoreException("Max Backoff attempts expired for data recovery");
                        }
                        i++;
                    }
                }
            } catch (Throwable th) {
                if (z2) {
                    long pow3 = 10 * 1000 * ((long) Math.pow(2.0d, i));
                    log.warn("Waiting for Cluster data to be synced Please ,start the other nodes soon, wait time: " + pow3 + "ms");
                    try {
                        Thread.sleep(pow3);
                    } catch (InterruptedException e4) {
                    }
                    if (i > 10) {
                        throw new AMQStoreException("Max Backoff attempts expired for data recovery");
                    }
                    int i2 = i + 1;
                    throw th;
                }
                z = true;
            }
        }
    }

    private Keyspace createKeySpace() throws CassandraDataAccessException {
        this.keyspace = CassandraDataAccessHelper.createKeySpace(this.cluster, KEYSPACE);
        CassandraDataAccessHelper.createColumnFamily("Queue", KEYSPACE, this.cluster, LONG_TYPE);
        CassandraDataAccessHelper.createColumnFamily(BINDING_COLUMN_FAMILY, KEYSPACE, this.cluster, UTF8_TYPE);
        CassandraDataAccessHelper.createColumnFamily(MESSAGE_CONTENT_COLUMN_FAMILY, KEYSPACE, this.cluster, INTEGER_TYPE);
        CassandraDataAccessHelper.createColumnFamily(MESSAGE_CONTENT_ID_COLUMN_FAMILY, KEYSPACE, this.cluster, LONG_TYPE);
        CassandraDataAccessHelper.createColumnFamily(SQ_COLUMN_FAMILY, KEYSPACE, this.cluster, UTF8_TYPE);
        CassandraDataAccessHelper.createColumnFamily(GLOBAL_QUEUE_TO_USER_QUEUE_COLUMN_FAMILY, KEYSPACE, this.cluster, UTF8_TYPE);
        CassandraDataAccessHelper.createColumnFamily(QMD_COLUMN_FAMILY, KEYSPACE, this.cluster, LONG_TYPE);
        CassandraDataAccessHelper.createColumnFamily(QUEUE_DETAILS_COLUMN_FAMILY, KEYSPACE, this.cluster, UTF8_TYPE);
        CassandraDataAccessHelper.createColumnFamily(QUEUE_ENTRY_COLUMN_FAMILY, KEYSPACE, this.cluster, UTF8_TYPE);
        CassandraDataAccessHelper.createColumnFamily(EXCHANGE_COLUMN_FAMILY, KEYSPACE, this.cluster, UTF8_TYPE);
        CassandraDataAccessHelper.createColumnFamily(USER_QUEUES_COLUMN_FAMILY, KEYSPACE, this.cluster, LONG_TYPE);
        CassandraDataAccessHelper.createColumnFamily(MESSAGE_QUEUE_MAPPING_COLUMN_FAMILY, KEYSPACE, this.cluster, UTF8_TYPE);
        CassandraDataAccessHelper.createColumnFamily(GLOBAL_QUEUES_COLUMN_FAMILY, KEYSPACE, this.cluster, LONG_TYPE);
        CassandraDataAccessHelper.createColumnFamily(GLOBAL_QUEUE_LIST_COLUMN_FAMILY, KEYSPACE, this.cluster, UTF8_TYPE);
        CassandraDataAccessHelper.createColumnFamily(TOPIC_EXCHANGE_MESSAGE_IDS, KEYSPACE, this.cluster, LONG_TYPE);
        CassandraDataAccessHelper.createColumnFamily(PUB_SUB_MESSAGE_IDS, KEYSPACE, this.cluster, LONG_TYPE);
        CassandraDataAccessHelper.createColumnFamily(TOPIC_SUBSCRIBERS, KEYSPACE, this.cluster, UTF8_TYPE);
        CassandraDataAccessHelper.createColumnFamily(TOPIC_SUBSCRIBER_QUEUES, KEYSPACE, this.cluster, UTF8_TYPE);
        CassandraDataAccessHelper.createColumnFamily(TOPICS_COLUMN_FAMILY, KEYSPACE, this.cluster, UTF8_TYPE);
        CassandraDataAccessHelper.createColumnFamily(ACKED_MESSAGE_IDS_COLUMN_FAMILY, KEYSPACE, this.cluster, LONG_TYPE);
        CassandraDataAccessHelper.createColumnFamily(NODE_DETAIL_COLUMN_FAMILY, KEYSPACE, this.cluster, UTF8_TYPE);
        CassandraDataAccessHelper.createCounterColumnFamily(MESSAGE_COUNTERS_COLUMN_FAMILY, KEYSPACE, this.cluster);
        return this.keyspace;
    }

    public void addMessageCounterForQueue(String str) throws Exception {
        if (!this.isCassandraConnectionLive) {
            log.error("Error in adding message counters");
            return;
        }
        try {
            if (!getDestinationQueueNames().contains(str)) {
                CassandraDataAccessHelper.insertCounterColumn(MESSAGE_COUNTERS_COLUMN_FAMILY, MESSAGE_COUNTERS_RAW_NAME, str, this.keyspace);
            }
        } catch (Exception e) {
            log.error("Error in accessing message counters", e);
            throw e;
        }
    }

    public void removeMessageCounterForQueue(String str) {
        if (!this.isCassandraConnectionLive) {
            log.error("Error removing the counter. Message Store is Inaccessible.");
            return;
        }
        try {
            CassandraDataAccessHelper.removeCounterColumn(MESSAGE_COUNTERS_COLUMN_FAMILY, MESSAGE_COUNTERS_RAW_NAME, str, this.keyspace);
        } catch (CassandraDataAccessException e) {
            log.error("Error in accessing message counters", e);
        }
    }

    public void incrementQueueCount(String str, long j) {
        if (!this.isCassandraConnectionLive) {
            log.error("Error while incrementing message counters. Message Store is Inaccessible.");
            return;
        }
        try {
            CassandraDataAccessHelper.incrementCounter(str, MESSAGE_COUNTERS_COLUMN_FAMILY, MESSAGE_COUNTERS_RAW_NAME, this.keyspace, j);
        } catch (CassandraDataAccessException e) {
            log.error("Error in accessing message counters", e);
        }
    }

    public void decrementQueueCount(String str, long j) {
        if (!this.isCassandraConnectionLive) {
            log.error("Error while decrementing message counters. Message Store is Inaccessible.");
            return;
        }
        try {
            CassandraDataAccessHelper.decrementCounter(str, MESSAGE_COUNTERS_COLUMN_FAMILY, MESSAGE_COUNTERS_RAW_NAME, this.keyspace, j);
        } catch (CassandraDataAccessException e) {
            log.error("Error in accessing message counters", e);
        }
    }

    public long getCassandraMessageCountForQueue(String str) {
        long j = 0;
        if (!this.isCassandraConnectionLive) {
            log.error("Error while getting message count for queue. Message Store is Inaccessible.");
        }
        try {
            j = CassandraDataAccessHelper.getCountValue(this.keyspace, MESSAGE_COUNTERS_COLUMN_FAMILY, str, MESSAGE_COUNTERS_RAW_NAME);
        } catch (CassandraDataAccessException e) {
            log.error("Error in accessing message counters", e);
        }
        return j;
    }

    private int getUserQueueCount(String str) throws AMQStoreException {
        if (!this.isCassandraConnectionLive) {
            log.error("Error in getting user queue count for " + str + ". Message Store is Inaccessible.");
            return 0;
        }
        try {
            return CassandraDataAccessHelper.getStringTypeColumnsInARow(str, GLOBAL_QUEUE_TO_USER_QUEUE_COLUMN_FAMILY, this.keyspace, Integer.MAX_VALUE).getColumns().size();
        } catch (Exception e) {
            throw new AMQStoreException("Error in getting user queue count", e);
        }
    }

    public void addMessageToUserQueue(String str, long j, byte[] bArr, Mutator<String> mutator) throws CassandraDataAccessException {
        if (!this.isCassandraConnectionLive) {
            log.error("Error in adding message :" + j + " to user queue :" + str + ". Message Store is Inaccessible");
            return;
        }
        try {
            CassandraDataAccessHelper.addMessageToQueue(USER_QUEUES_COLUMN_FAMILY, str, j, bArr, mutator, false);
            CassandraDataAccessHelper.addMappingToRaw(MESSAGE_QUEUE_MAPPING_COLUMN_FAMILY, MESSAGE_QUEUE_MAPPING_ROW, "" + j, str, mutator, false);
        } catch (Exception e) {
            throw new CassandraDataAccessException("Error in adding message :" + j + " to user queue :" + str, e);
        }
    }

    /* JADX WARN: Finally extract failed */
    public void addMessageBatchToUserQueues(CassandraQueueMessage[] cassandraQueueMessageArr) throws CassandraDataAccessException {
        if (!this.isCassandraConnectionLive) {
            log.error("Error in adding message batch to Queues. Message Store is Inaccessible.");
            return;
        }
        try {
            Mutator<String> createMutator = HFactory.createMutator(this.keyspace, stringSerializer);
            try {
                for (CassandraQueueMessage cassandraQueueMessage : cassandraQueueMessageArr) {
                    addMessageToUserQueue(cassandraQueueMessage.getQueue(), cassandraQueueMessage.getMessageId(), cassandraQueueMessage.getMessage(), createMutator);
                }
                createMutator.execute();
            } catch (Throwable th) {
                createMutator.execute();
                throw th;
            }
        } catch (CassandraDataAccessException e) {
            throw new CassandraDataAccessException("Error in adding message batch to Queues ", e);
        }
    }

    public List<QueueEntry> getPreparedBrowserMessages(AMQQueue aMQQueue, AMQProtocolSession aMQProtocolSession, List<CassandraQueueMessage> list) throws AMQStoreException {
        ArrayList arrayList = new ArrayList();
        SimpleQueueEntryList simpleQueueEntryList = new SimpleQueueEntryList(aMQQueue);
        if (!this.isCassandraConnectionLive) {
            log.error("Error while getting messages from queue : " + aMQQueue + "Message Store is Inaccessible.");
            return arrayList;
        }
        try {
            for (CassandraQueueMessage cassandraQueueMessage : list) {
                long messageId = cassandraQueueMessage.getMessageId();
                byte[] message = cassandraQueueMessage.getMessage();
                ByteBuffer wrap = ByteBuffer.wrap(message);
                wrap.position(1);
                AMQMessage aMQMessage = new AMQMessage(new StoredCassandraMessage(messageId, MessageMetaDataType.values()[message[0]].getFactory().createMetaData(wrap.slice())));
                aMQMessage.setClientIdentifier(aMQProtocolSession);
                arrayList.add(simpleQueueEntryList.add(aMQMessage));
            }
            return arrayList;
        } catch (Exception e) {
            throw new AMQStoreException("Error while getting messages from queue : " + aMQQueue, e);
        }
    }

    public void addMessageToGlobalQueue(String str, String str2, long j, byte[] bArr) throws Exception {
        if (log.isDebugEnabled()) {
            log.debug("Adding Message with id " + j + " to Queue " + str);
        }
        this.publishMessageWriter.addMessage(str, str2, j, bArr);
    }

    public void addMessageContent(String str, int i, ByteBuffer byteBuffer) throws AMQStoreException {
        if (!this.isCassandraConnectionLive) {
            log.error("Error in adding message content. Message Store is Inaccessible.");
            return;
        }
        try {
            String str2 = "mid" + str;
            ByteBuffer slice = byteBuffer.slice();
            byte[] bArr = new byte[slice.limit()];
            slice.duplicate().get(bArr);
            long currentTimeMillis = System.currentTimeMillis();
            Mutator createMutator = HFactory.createMutator(this.keyspace, stringSerializer);
            CassandraDataAccessHelper.addIntegerByteArrayContentToRaw(MESSAGE_CONTENT_COLUMN_FAMILY, str2, i, bArr, createMutator, false);
            createMutator.execute();
            if (log.isDebugEnabled()) {
                log.debug("Content Write for " + str2 + " took " + (System.currentTimeMillis() - currentTimeMillis) + "ms");
            }
        } catch (Exception e) {
            throw new AMQStoreException("Error in adding message content", e);
        }
    }

    public int getContent(String str, int i, ByteBuffer byteBuffer) {
        int i2 = 0;
        if (!this.isCassandraConnectionLive) {
            log.error("Error in reading content. Message Store is Inaccessible.");
            return 0;
        }
        if (0 == 0) {
            try {
                String str2 = "mid" + str;
                if (i == 0) {
                    ColumnQuery createColumnQuery = HFactory.createColumnQuery(this.keyspace, stringSerializer, integerSerializer, byteBufferSerializer);
                    createColumnQuery.setColumnFamily(MESSAGE_CONTENT_COLUMN_FAMILY);
                    createColumnQuery.setKey(str2.trim());
                    createColumnQuery.setName(Integer.valueOf(i));
                    HColumn hColumn = (HColumn) createColumnQuery.execute().get();
                    if (hColumn == null) {
                        throw new RuntimeException("Unexpected Error , content already deleted");
                    }
                    int intValue = ((Integer) hColumn.getName()).intValue();
                    byte[] fromByteBuffer = bytesArraySerializer.fromByteBuffer((ByteBuffer) hColumn.getValue());
                    int length = fromByteBuffer.length - ((intValue + 0) - intValue);
                    if (length > byteBuffer.remaining()) {
                        length = byteBuffer.remaining();
                    }
                    byteBuffer.put(fromByteBuffer, 0, length);
                    i2 = length;
                } else {
                    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                    int i3 = i / 65534;
                    SliceQuery createSliceQuery = HFactory.createSliceQuery(this.keyspace, stringSerializer, integerSerializer, byteBufferSerializer);
                    createSliceQuery.setColumnFamily(MESSAGE_CONTENT_COLUMN_FAMILY);
                    createSliceQuery.setKey(str2.trim());
                    createSliceQuery.setRange(Integer.valueOf(i3 * 65534), Integer.valueOf(((i3 + 1) * 65534) + 1), false, 10);
                    Iterator it = ((ColumnSlice) createSliceQuery.execute().get()).getColumns().iterator();
                    while (it.hasNext()) {
                        byteArrayOutputStream.write(bytesArraySerializer.fromByteBuffer((ByteBuffer) ((HColumn) it.next()).getValue()));
                    }
                    byte[] byteArray = byteArrayOutputStream.toByteArray();
                    int length2 = byteArray.length;
                    int i4 = i - (i3 * 65534);
                    int i5 = length2 - i4;
                    if (i5 > byteBuffer.remaining()) {
                        i5 = byteBuffer.remaining();
                    }
                    byteBuffer.put(byteArray, i4, i5);
                    i2 = 0 + i5;
                }
            } catch (Exception e) {
                e.printStackTrace();
                log.error("Error in reading content", e);
            }
        }
        return i2;
    }

    public void storeMetaData(long j, StorableMessageMetaData storableMessageMetaData) {
        if (!this.isCassandraConnectionLive) {
            log.error("Error in storing meta data. Message Store is Inaccessible.");
            return;
        }
        try {
            byte[] bArr = new byte[1 + storableMessageMetaData.getStorableSize()];
            bArr[0] = (byte) storableMessageMetaData.getType().ordinal();
            ByteBuffer wrap = ByteBuffer.wrap(bArr);
            wrap.position(1);
            storableMessageMetaData.writeToBuffer(0, wrap.slice());
            Mutator createMutator = HFactory.createMutator(this.keyspace, stringSerializer);
            createMutator.addInsertion(QMD_ROW_NAME, QMD_COLUMN_FAMILY, HFactory.createColumn(Long.valueOf(j), bArr, longSerializer, bytesArraySerializer));
            createMutator.execute();
        } catch (Exception e) {
            log.error("Error in storing meta data", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public StorableMessageMetaData getMetaData(long j) {
        StorableMessageMetaData storableMessageMetaData = null;
        if (!this.isCassandraConnectionLive) {
            log.error("Error in getting meta data of provided message id. Message Store is Inaccessible.");
            return null;
        }
        try {
            byte[] bArr = (byte[]) CassandraDataAccessHelper.getLongByteArrayColumnInARow(QMD_ROW_NAME, QMD_COLUMN_FAMILY, j, this.keyspace).getValue();
            ByteBuffer wrap = ByteBuffer.wrap(bArr);
            wrap.position(1);
            storableMessageMetaData = MessageMetaDataType.values()[bArr[0]].getFactory().createMetaData(wrap.slice());
        } catch (Exception e) {
            log.error("Error in getting meta data of provided message id", e);
        }
        return storableMessageMetaData;
    }

    private void removeMetaData(long j) throws AMQStoreException {
        if (this.isCassandraConnectionLive) {
            log.error("Error in removing metadata. Message Store is Inaccessible.");
            return;
        }
        try {
            CassandraDataAccessHelper.deleteLongColumnFromRaw(QMD_COLUMN_FAMILY, QMD_ROW_NAME, j, HFactory.createMutator(this.keyspace, stringSerializer), true);
        } catch (Exception e) {
            throw new AMQStoreException("Error in removing metadata", e);
        }
    }

    public void addAckedMessage(long j) {
        if (this.isCassandraConnectionLive) {
            log.error("Error in storing meta data. Message Store is Inaccessible.");
            return;
        }
        try {
            this.pubSubMessageContentDeletionTasks.put(Long.valueOf(j), Long.valueOf(j));
            Mutator createMutator = HFactory.createMutator(this.keyspace, stringSerializer);
            createMutator.addInsertion(ACKED_MESSAGE_IDS_ROW, ACKED_MESSAGE_IDS_COLUMN_FAMILY, HFactory.createColumn(Long.valueOf(j), Long.valueOf(System.currentTimeMillis()), longSerializer, longSerializer));
            createMutator.execute();
        } catch (Exception e) {
            log.error("Error in storing meta data", e);
        }
    }

    private void removeAckedMessage(long j) throws AMQStoreException {
        if (!this.isCassandraConnectionLive) {
            log.error("Error in storing meta data. Message Store is Inaccessible.");
            return;
        }
        try {
            CassandraDataAccessHelper.deleteLongColumnFromRaw(ACKED_MESSAGE_IDS_COLUMN_FAMILY, ACKED_MESSAGE_IDS_ROW, j, HFactory.createMutator(this.keyspace, stringSerializer), true);
        } catch (Exception e) {
            throw new AMQStoreException("Error in storing meta data", e);
        }
    }

    public boolean isReadyAndRemovedMessageContent(long j) {
        long currentTimeMillis = System.currentTimeMillis();
        try {
            ColumnQuery createColumnQuery = HFactory.createColumnQuery(this.keyspace, stringSerializer, longSerializer, longSerializer);
            createColumnQuery.setKey(ACKED_MESSAGE_IDS_ROW);
            createColumnQuery.setColumnFamily(ACKED_MESSAGE_IDS_COLUMN_FAMILY);
            createColumnQuery.setName(Long.valueOf(j));
            QueryResult queryResult = null;
            if (this.isCassandraConnectionLive) {
                queryResult = createColumnQuery.execute();
            } else {
                log.error("Error while removing meta data. Message Store is Inaccessible.");
            }
            if (queryResult == null) {
                return true;
            }
            HColumn hColumn = (HColumn) queryResult.get();
            if (hColumn == null || hColumn.getValue() == null) {
                return false;
            }
            if (currentTimeMillis - ((Long) hColumn.getValue()).longValue() < ClusterResourceHolder.getInstance().getClusterConfiguration().getContentRemovalTimeDifference()) {
                return false;
            }
            removeMetaData(j);
            removeAckedMessage(j);
            return true;
        } catch (Exception e) {
            log.error("Error while removing Message data", e);
            return false;
        }
    }

    public void addBinding(Exchange exchange, AMQQueue aMQQueue, String str) throws CassandraDataAccessException {
        if (this.keyspace == null) {
            return;
        }
        if (!this.isCassandraConnectionLive) {
            log.error("Cannot add bindings. Message Store is Inaccessible.");
        } else {
            CassandraDataAccessHelper.addMappingToRaw(BINDING_COLUMN_FAMILY, exchange.getName(), str, aMQQueue.getName(), this.keyspace);
        }
    }

    public void removeBinding(Exchange exchange, AMQQueue aMQQueue, String str) throws CassandraDataAccessException {
        if (this.keyspace == null) {
            return;
        }
        if (this.isCassandraConnectionLive) {
            CassandraDataAccessHelper.deleteStringColumnFromRaw(BINDING_COLUMN_FAMILY, exchange.getName(), str, this.keyspace);
        } else {
            log.error("Cannot add bindings. Message Store is Inaccessible.");
        }
    }

    public void addTopicExchangeMessageIds(String str, long j) {
        try {
            List<String> registeredTopicNodeQueuesForTopic = getRegisteredTopicNodeQueuesForTopic(str);
            if (registeredTopicNodeQueuesForTopic != null) {
                for (String str2 : registeredTopicNodeQueuesForTopic) {
                    try {
                        addMessageIdToSubscriberQueue(str2, j);
                    } catch (AMQStoreException e) {
                        log.error("Error adding message id " + j + "To subscriber " + str2);
                    }
                }
            }
        } catch (Exception e2) {
            log.error("Error while adding Message Id to Subscriber queue", e2);
        }
    }

    public List<AMQMessage> getSubscriberMessages(String str, long j) {
        ArrayList arrayList = null;
        List<Long> pendingMessageIds = getPendingMessageIds(str, j);
        if (pendingMessageIds.size() > 0) {
            arrayList = new ArrayList();
            Iterator<Long> it = pendingMessageIds.iterator();
            while (it.hasNext()) {
                long longValue = it.next().longValue();
                StorableMessageMetaData metaData = getMetaData(longValue);
                if (metaData != null) {
                    arrayList.add(new AMQMessage(new StoredCassandraMessage(longValue, metaData, true), null));
                }
            }
        }
        return arrayList;
    }

    private void registerTopic(String str) {
        if (!this.isCassandraConnectionLive) {
            log.error("Error in registering queue for the topic. Message Store is Inaccessible.");
            return;
        }
        if (str != null) {
            try {
                if (this.topicSubscribersMap.get(str) == null) {
                    this.topicSubscribersMap.put(str, new ArrayList<>());
                }
            } catch (Exception e) {
                log.error("Error in registering queue for the topic", e);
                return;
            }
        }
        if (str != null && this.topicNodeQueuesMap.get(str) == null) {
            this.topicNodeQueuesMap.put(str, new ArrayList<>());
        }
        CassandraDataAccessHelper.addMappingToRaw(TOPICS_COLUMN_FAMILY, TOPICS_ROW, str, str, this.keyspace);
        log.info("Created Topic : " + str);
    }

    public List<String> getTopics() throws Exception {
        if (!this.isCassandraConnectionLive) {
            log.error("Error in getting the topic list. Message Store is Inaccessible.");
            return null;
        }
        try {
            return CassandraDataAccessHelper.getRowList(TOPICS_COLUMN_FAMILY, TOPICS_ROW, this.keyspace);
        } catch (Exception e) {
            log.error("Error in getting the topic list", e);
            throw e;
        }
    }

    public List<String> getUserQueues(String str) throws Exception {
        if (this.keyspace == null) {
            return new ArrayList();
        }
        if (!this.isCassandraConnectionLive) {
            log.error("Error in getting user queues for qpid queue :" + str + ". Message Store is Inaccessible.");
            return new ArrayList();
        }
        try {
            return CassandraDataAccessHelper.getRowList(GLOBAL_QUEUE_TO_USER_QUEUE_COLUMN_FAMILY, str, this.keyspace);
        } catch (Exception e) {
            e.printStackTrace();
            log.error("Error in getting user queues for qpid queue :" + str, e);
            throw e;
        }
    }

    public List<String> getDestinationQueueNames() throws Exception {
        if (this.keyspace == null) {
            return new ArrayList();
        }
        if (!this.isCassandraConnectionLive) {
            log.error("Error in getting global queues. Message Store is Inaccessible.");
        }
        try {
            return CassandraDataAccessHelper.getDestinationQueueNamesFromCounterColumns(MESSAGE_COUNTERS_COLUMN_FAMILY, MESSAGE_COUNTERS_RAW_NAME, this.keyspace);
        } catch (Exception e) {
            log.error("Error in getting global queues", e);
            throw e;
        }
    }

    private void unRegisterTopic(String str) throws AMQStoreException {
        if (!this.isCassandraConnectionLive) {
            log.error("Error in un registering topic. Cassandra Message Store is Inaccessible.");
        }
        try {
            CassandraDataAccessHelper.deleteStringColumnFromRaw(TOPICS_COLUMN_FAMILY, TOPICS_ROW, str, this.keyspace);
            log.info("Removing Topic : " + str);
        } catch (Exception e) {
            throw new AMQStoreException("Error in un registering topic", e);
        }
    }

    public void registerSubscriberForTopic(String str, String str2, String str3) {
        if (this.keyspace == null) {
            return;
        }
        if (!this.isCassandraConnectionLive) {
            log.error("Error in registering queue for the topic. Message store is inaccessible.");
            return;
        }
        try {
            registerTopic(str);
            CassandraDataAccessHelper.addMappingToRaw(TOPIC_SUBSCRIBERS, str, str2, str2, this.keyspace);
            CassandraDataAccessHelper.addMappingToRaw(TOPIC_SUBSCRIBER_QUEUES, str, str3, str3, this.keyspace);
            ClusterResourceHolder.getInstance().getTopicSubscriptionCoordinationManager().handleSubscriptionChange(str);
            log.info("Registered Subscription " + str2 + " for Topic " + str);
        } catch (Exception e) {
            log.error("Error in registering queue for the topic", e);
        }
    }

    public List<String> getRegisteredSubscribersForTopic(String str) throws Exception {
        try {
            return this.topicSubscribersMap.get(str);
        } catch (Exception e) {
            log.error("Error in getting registered subscribers for the topic", e);
            throw e;
        }
    }

    public List<String> getRegisteredTopicNodeQueuesForTopic(String str) throws Exception {
        try {
            return this.topicNodeQueuesMap.get(str);
        } catch (Exception e) {
            log.error("Error in getting registered subscribers for the topic", e);
            throw e;
        }
    }

    public void unRegisterQueueFromTopic(String str, String str2) {
        try {
            if (log.isDebugEnabled()) {
                log.debug(" removing queue = " + str2 + " from topic =" + str);
            }
            if (!this.isCassandraConnectionLive) {
                log.error("Error in un registering queue from the topic. Message store in inaccessible.");
                return;
            }
            CassandraDataAccessHelper.deleteStringColumnFromRaw(TOPIC_SUBSCRIBER_QUEUES, str, str2, this.keyspace);
            if (!str.startsWith("tmp_")) {
                log.info("Removing Subscription " + str2 + " from Topic " + str);
            }
            if (getRegisteredSubscribersForTopic(str) != null && getRegisteredSubscribersForTopic(str).size() == 0) {
                unRegisterTopic(str);
                this.topicSubscribersMap.remove(str);
            }
            if (getRegisteredTopicNodeQueuesForTopic(str) != null && getRegisteredTopicNodeQueuesForTopic(str).size() == 0) {
                this.topicNodeQueuesMap.remove(str);
            }
            ClusterResourceHolder.getInstance().getTopicSubscriptionCoordinationManager().handleSubscriptionChange(str);
        } catch (Exception e) {
            log.error("Error in un registering queue from the topic", e);
        }
    }

    private void addMessageIdToSubscriberQueue(String str, long j) throws AMQStoreException {
        if (!this.isCassandraConnectionLive) {
            log.error("Error in adding message Id to subscriber queue. Message store is Inaccessible.");
            return;
        }
        try {
            CassandraDataAccessHelper.addLongContentToRow(PUB_SUB_MESSAGE_IDS, str, j, j, this.keyspace);
        } catch (Exception e) {
            throw new AMQStoreException("Error in adding message Id to subscriber queue", e);
        }
    }

    private List<Long> getPendingMessageIds(String str, long j) {
        ArrayList arrayList = new ArrayList();
        if (!this.isCassandraConnectionLive) {
            log.error("Error in retriving message ids of the queue:" + str + ". Message store is inaccessible.");
            return arrayList;
        }
        try {
            SliceQuery createSliceQuery = HFactory.createSliceQuery(this.keyspace, stringSerializer, longSerializer, longSerializer);
            createSliceQuery.setKey(str);
            createSliceQuery.setColumnFamily(PUB_SUB_MESSAGE_IDS);
            createSliceQuery.setRange(Long.valueOf(j), Long.MAX_VALUE, false, 1000);
            Iterator it = ((ColumnSlice) createSliceQuery.execute().get()).getColumns().iterator();
            while (it.hasNext()) {
                arrayList.add(((HColumn) it.next()).getValue());
            }
        } catch (Exception e) {
            log.error("Error in retriving message ids of the queue", e);
        }
        return arrayList;
    }

    public void removeDeliveredMessageIds(List<Long> list, String str) throws AMQStoreException {
        if (!this.isCassandraConnectionLive) {
            log.error("Error in removing message ids from subscriber queue. Message Store is inaccessible");
            return;
        }
        try {
            Mutator createMutator = HFactory.createMutator(this.keyspace, stringSerializer);
            for (Long l : list) {
                CassandraDataAccessHelper.deleteLongColumnFromRaw(PUB_SUB_MESSAGE_IDS, str, l.longValue(), createMutator, false);
                if (log.isDebugEnabled()) {
                    log.debug(" removing mid = " + l + " from =" + str);
                }
            }
            createMutator.execute();
        } catch (Exception e) {
            throw new AMQStoreException("Error in removing message ids from subscriber queue", e);
        }
    }

    public void synchBindings(VirtualHostConfigSynchronizer virtualHostConfigSynchronizer) {
        try {
            if (!this.isCassandraConnectionLive) {
                log.error("Error in synchronizing bindings. Message store is unreachable.");
                return;
            }
            HFactory.createMutator(this.keyspace, stringSerializer);
            RangeSlicesQuery createRangeSlicesQuery = HFactory.createRangeSlicesQuery(this.keyspace, stringSerializer, stringSerializer, stringSerializer);
            createRangeSlicesQuery.setKeys("", "");
            createRangeSlicesQuery.setColumnFamily(BINDING_COLUMN_FAMILY);
            createRangeSlicesQuery.setRange("", "", false, 100);
            for (Row row : ((OrderedRows) createRangeSlicesQuery.execute().get()).getList()) {
                String str = (String) row.getKey();
                for (HColumn hColumn : row.getColumnSlice().getColumns()) {
                    if (hColumn instanceof HColumn) {
                        virtualHostConfigSynchronizer.binding(str, (String) hColumn.getValue(), (String) hColumn.getName(), null);
                    }
                }
            }
        } catch (NumberFormatException e) {
            log.error("Error in synchronizing bindings", e);
        }
    }

    public void recoverBindings(ConfigurationRecoveryHandler.BindingRecoveryHandler bindingRecoveryHandler, List<String> list) throws Exception {
        if (!this.isCassandraConnectionLive) {
            log.error("Error occurred when recovering bindings. Message store is inaccessible.");
        }
        try {
            HFactory.createMutator(this.keyspace, stringSerializer);
            RangeSlicesQuery createRangeSlicesQuery = HFactory.createRangeSlicesQuery(this.keyspace, stringSerializer, stringSerializer, stringSerializer);
            createRangeSlicesQuery.setKeys("", "");
            createRangeSlicesQuery.setColumnFamily(BINDING_COLUMN_FAMILY);
            createRangeSlicesQuery.setRange("", "", false, 100);
            for (Row row : ((OrderedRows) createRangeSlicesQuery.execute().get()).getList()) {
                String str = (String) row.getKey();
                for (HColumn hColumn : row.getColumnSlice().getColumns()) {
                    if (hColumn instanceof HColumn) {
                        bindingRecoveryHandler.binding(str, (String) hColumn.getValue(), (String) hColumn.getName(), null);
                    }
                }
            }
        } catch (NumberFormatException e) {
            log.error("Number formatting error occurred when recovering bindings", e);
        }
    }

    private List<String> getBindings(String str) {
        if (!this.isCassandraConnectionLive) {
            log.error("Error in getting bindings. Message store is inaccessible.");
            return new ArrayList();
        }
        ArrayList arrayList = new ArrayList();
        try {
            HFactory.createMutator(this.keyspace, stringSerializer);
            RangeSlicesQuery createRangeSlicesQuery = HFactory.createRangeSlicesQuery(this.keyspace, stringSerializer, stringSerializer, stringSerializer);
            createRangeSlicesQuery.setKeys("DirectExchange", "DirectExchange");
            createRangeSlicesQuery.setColumnFamily(BINDING_COLUMN_FAMILY);
            createRangeSlicesQuery.setRange(str, "", false, 10);
            OrderedRows orderedRows = (OrderedRows) createRangeSlicesQuery.execute().get();
            orderedRows.getList();
            for (HColumn hColumn : ((Row) orderedRows.getList().get(0)).getColumnSlice().getColumns()) {
                if (hColumn instanceof HColumn) {
                    arrayList.add(new String((String) hColumn.getValue()));
                }
            }
        } catch (Exception e) {
            log.error("Error in getting bindings", e);
        }
        return arrayList;
    }

    private void recoverMessages(MessageStoreRecoveryHandler messageStoreRecoveryHandler) {
        long j = 0;
        if (!this.isCassandraConnectionLive) {
            log.error("Error in recovering bindings. Message store is inaccessible.");
            return;
        }
        try {
            SliceQuery createSliceQuery = HFactory.createSliceQuery(this.keyspace, stringSerializer, LongSerializer.get(), BytesArraySerializer.get());
            createSliceQuery.setColumnFamily(QMD_COLUMN_FAMILY);
            createSliceQuery.setKey(QMD_ROW_NAME);
            createSliceQuery.setRange(Long.valueOf(Long.parseLong(RawStoreFactory.PAGE_RESERVED_ZERO_SPACE_STRING)), Long.MAX_VALUE, false, DaemonService.TIMER_DELAY);
            for (HColumn hColumn : ((ColumnSlice) createSliceQuery.execute().get()).getColumns()) {
                long longValue = ((Long) hColumn.getName()).longValue();
                if (longValue > j) {
                    j = longValue;
                }
                byte[] bArr = (byte[]) hColumn.getValue();
                ByteBuffer wrap = ByteBuffer.wrap(bArr);
                wrap.position(1);
                MessageMetaDataType.values()[bArr[0]].getFactory().createMetaData(wrap.slice());
            }
            this._messageId.set(j);
        } catch (Exception e) {
            log.error("Error in recovering bindings", e);
        }
    }

    public AtomicLong currentMessageId() {
        return this._messageId;
    }

    public void synchQueues(VirtualHostConfigSynchronizer virtualHostConfigSynchronizer) throws Exception {
        if (!this.isCassandraConnectionLive) {
            log.error("Error in queue synchronization. Message store is inaccessble.");
        }
        try {
            for (HColumn hColumn : CassandraDataAccessHelper.getStringTypeColumnsInARow(QUEUE_DETAILS_ROW, QUEUE_DETAILS_COLUMN_FAMILY, this.keyspace, Integer.MAX_VALUE).getColumns()) {
                if (hColumn instanceof HColumn) {
                    String str = (String) hColumn.getName();
                    Iterator it = pipeSplitter.split((String) hColumn.getValue()).iterator();
                    virtualHostConfigSynchronizer.queue(str, (String) it.next(), Boolean.parseBoolean((String) it.next()), null);
                }
            }
        } catch (Exception e) {
            throw new AMQStoreException("Error in queue synchronization", e);
        }
    }

    public void loadQueues(ConfigurationRecoveryHandler.QueueRecoveryHandler queueRecoveryHandler) throws Exception {
        if (!this.isCassandraConnectionLive) {
            log.error("Error in loading queues. Message store is inaccessible.");
            return;
        }
        try {
            for (HColumn hColumn : CassandraDataAccessHelper.getStringTypeColumnsInARow(QUEUE_DETAILS_ROW, QUEUE_DETAILS_COLUMN_FAMILY, this.keyspace, Integer.MAX_VALUE).getColumns()) {
                if (hColumn instanceof HColumn) {
                    String str = (String) hColumn.getName();
                    String[] split = ((String) hColumn.getValue()).split("|");
                    queueRecoveryHandler.queue(str, split[1], Boolean.parseBoolean(split[2]), null);
                }
            }
        } catch (Exception e) {
            throw new AMQStoreException("Error in loading queues", e);
        }
    }

    public void addUserQueueToGlobalQueue(String str) throws AMQStoreException {
        if (!this.isCassandraConnectionLive) {
            log.error("Error in adding user queue to global queue. Message store is inaccessible.");
            return;
        }
        try {
            String nodeQueueNameForQueue = AndesUtils.getNodeQueueNameForQueue(str);
            Mutator createMutator = HFactory.createMutator(this.keyspace, stringSerializer);
            CassandraDataAccessHelper.addMappingToRaw(GLOBAL_QUEUE_TO_USER_QUEUE_COLUMN_FAMILY, str, nodeQueueNameForQueue, nodeQueueNameForQueue, createMutator, false);
            CassandraDataAccessHelper.addMappingToRaw(GLOBAL_QUEUE_LIST_COLUMN_FAMILY, GLOBAL_QUEUE_LIST_ROW, str, str, createMutator, true);
        } catch (Exception e) {
            throw new AMQStoreException("Error in adding user queue to global queue", e);
        }
    }

    public void removeUserQueueFromQpidQueue(String str) {
        if (!this.isCassandraConnectionLive) {
            log.error("Error in removing user queue from qpid queue. Message store is inaccessible.");
            return;
        }
        try {
            CassandraDataAccessHelper.deleteStringColumnFromRaw(GLOBAL_QUEUE_TO_USER_QUEUE_COLUMN_FAMILY, str.trim(), str + "_" + ClusterResourceHolder.getInstance().getClusterManager().getNodeId(), this.keyspace);
        } catch (Exception e) {
            log.error("Error in removing user queue from qpid queue", e);
        }
    }

    @Override // org.wso2.andes.server.store.MessageStore
    public void configureMessageStore(String str, MessageStoreRecoveryHandler messageStoreRecoveryHandler, Configuration configuration, LogSubject logSubject) throws Exception {
        if (!this.configured) {
            performCommonConfiguration(configuration);
            ClusterResourceHolder clusterResourceHolder = ClusterResourceHolder.getInstance();
            CassandraTopicPublisherManager cassandraTopicPublisherManager = clusterResourceHolder.getCassandraTopicPublisherManager();
            if (cassandraTopicPublisherManager == null) {
                cassandraTopicPublisherManager = new CassandraTopicPublisherManager();
                clusterResourceHolder.setCassandraTopicPublisherManager(cassandraTopicPublisherManager);
            }
            cassandraTopicPublisherManager.init();
            cassandraTopicPublisherManager.start();
        }
        recoverMessages(messageStoreRecoveryHandler);
    }

    private void performCommonConfiguration(Configuration configuration) throws Exception {
        String str = (String) configuration.getProperty(BrokerDetails.USERNAME);
        String str2 = (String) configuration.getProperty(BrokerDetails.PASSWORD);
        Object property = configuration.getProperty("connectionString");
        String str3 = "";
        if (property instanceof ArrayList) {
            Iterator it = ((ArrayList) property).iterator();
            while (it.hasNext()) {
                str3 = str3 + ((String) it.next()) + ",";
            }
            str3 = str3.substring(0, str3.length() - 1);
        } else if (str3 instanceof String) {
            str3 = (String) property;
        }
        String str4 = (String) configuration.getProperty("cluster");
        String str5 = (String) configuration.getProperty("idGenerator");
        this.cluster = CassandraDataAccessHelper.createCluster(str, str2, str4, str3);
        checkCassandraConnection();
        this.keyspace = createKeySpace();
        if (str5 == null || "".equals(str5)) {
            this.messageIdGenerator = new TimeStampBasedMessageIdGenerator();
        } else {
            try {
                this.messageIdGenerator = (MessageIdGenerator) Class.forName(str5).newInstance();
            } catch (Exception e) {
                log.error("Error while loading Message id generator implementation : " + str5 + " adding TimeStamp based implementation as the default", e);
                this.messageIdGenerator = new TimeStampBasedMessageIdGenerator();
            }
        }
        this.messageContentRemovalTask = new ContentRemoverTask(ClusterResourceHolder.getInstance().getClusterConfiguration().getContentRemovalTaskInterval());
        this.messageContentRemovalTask.setRunning(true);
        Thread thread = new Thread(this.messageContentRemovalTask);
        thread.setName(this.messageContentRemovalTask.getClass().getSimpleName() + "-Thread");
        thread.start();
        this.pubSubMessageContentDeletionTasks = new ConcurrentHashMap<>();
        ClusterConfiguration clusterConfiguration = ClusterResourceHolder.getInstance().getClusterConfiguration();
        this.pubSubMessageContentRemoverTask = new PubSubMessageContentRemoverTask(clusterConfiguration.getPubSubMessageRemovalTaskInterval());
        this.pubSubMessageContentRemoverTask.setRunning(true);
        new Thread(this.pubSubMessageContentRemoverTask).start();
        this.publishMessageWriter = new PublishMessageWriter();
        this.publishMessageWriter.start();
        Thread thread2 = new Thread(this.publishMessageWriter);
        thread2.setName(PublishMessageWriter.class.getName());
        thread2.start();
        this.publishMessageContentWriter = new PublishMessageContentWriter();
        this.messageCacheForCassandra = new CassandraMessageContentCache();
        this.keyspace.setConsistencyLevelPolicy(new AndesConsistantLevelPolicy());
        if (ClusterResourceHolder.getInstance().getSubscriptionCoordinationManager() == null) {
            SubscriptionCoordinationManagerImpl subscriptionCoordinationManagerImpl = new SubscriptionCoordinationManagerImpl();
            subscriptionCoordinationManagerImpl.init();
            ClusterResourceHolder.getInstance().setSubscriptionCoordinationManager(subscriptionCoordinationManagerImpl);
        }
        if (ClusterResourceHolder.getInstance().getTopicSubscriptionCoordinationManager() == null) {
            TopicSubscriptionCoordinationManager topicSubscriptionCoordinationManager = new TopicSubscriptionCoordinationManager();
            topicSubscriptionCoordinationManager.init();
            ClusterResourceHolder.getInstance().setTopicSubscriptionCoordinationManager(topicSubscriptionCoordinationManager);
        }
        ClusterManager clusterManager = clusterConfiguration.isClusteringEnabled().booleanValue() ? new ClusterManager(ClusterResourceHolder.getInstance().getCassandraMessageStore(), clusterConfiguration.getZookeeperConnection()) : new ClusterManager(ClusterResourceHolder.getInstance().getCassandraMessageStore());
        ClusterResourceHolder.getInstance().setClusterManager(clusterManager);
        clusterManager.init();
        clusterManager.startAllGlobalQueueWorkers();
        this.clusterManagementMBean = new ClusterManagementInformationMBean(clusterManager);
        this.clusterManagementMBean.register();
        this.queueManagementMBean = new QueueManagementInformationMBean();
        this.queueManagementMBean.register();
        if (ClusterResourceHolder.getInstance().getClusterConfiguration().isOnceInOrderSupportEnabled()) {
            OnceInOrderEnabledSubscriptionManager onceInOrderEnabledSubscriptionManager = new OnceInOrderEnabledSubscriptionManager();
            ClusterResourceHolder.getInstance().setSubscriptionManager(onceInOrderEnabledSubscriptionManager);
            onceInOrderEnabledSubscriptionManager.init();
        } else {
            DefaultClusteringEnabledSubscriptionManager defaultClusteringEnabledSubscriptionManager = new DefaultClusteringEnabledSubscriptionManager();
            ClusterResourceHolder.getInstance().setSubscriptionManager(defaultClusteringEnabledSubscriptionManager);
            defaultClusteringEnabledSubscriptionManager.init();
        }
        this.configured = true;
    }

    public void syncTopicSubscriptionsWithDatabase(String str) throws Exception {
        if (!this.isCassandraConnectionLive) {
            log.error("Error Synchronizing subscribers for topic. Message store is inaccessible.");
            return;
        }
        if (str != null) {
            ArrayList<String> arrayList = new ArrayList<>();
            Iterator<String> it = CassandraDataAccessHelper.getRowList(TOPIC_SUBSCRIBER_QUEUES, str, this.keyspace).iterator();
            while (it.hasNext()) {
                arrayList.add(it.next());
            }
            this.topicSubscribersMap.remove(str);
            this.topicSubscribersMap.put(str, arrayList);
        }
        if (log.isDebugEnabled()) {
            log.debug("Synchronizing subscribers for topic" + str);
        }
    }

    public void syncTopicNodeQueuesWithDatabase(String str) throws Exception {
        if (!this.isCassandraConnectionLive) {
            log.error("Error Synchronizing subscribers for topic. Message store is inaccessible.");
            return;
        }
        if (str != null) {
            ArrayList<String> arrayList = new ArrayList<>();
            Iterator<String> it = CassandraDataAccessHelper.getRowList(TOPIC_SUBSCRIBERS, str, this.keyspace).iterator();
            while (it.hasNext()) {
                arrayList.add(it.next());
            }
            this.topicNodeQueuesMap.remove(str);
            this.topicNodeQueuesMap.put(str, arrayList);
        }
        if (log.isDebugEnabled()) {
            log.debug("Synchronizing subscribers for topic" + str);
        }
    }

    @Override // org.wso2.andes.server.store.MessageStore
    public void close() throws Exception {
        GlobalQueueManager globalQueueManager;
        if (!ClusterResourceHolder.getInstance().getClusterManager().isClusteringEnabled()) {
            ClusterResourceHolder.getInstance().getClusterManager().shutDownMyNode();
        }
        if (ClusterResourceHolder.getInstance().getClusterManager().isClusteringEnabled()) {
            deleteNodeData("" + ClusterResourceHolder.getInstance().getClusterManager().getNodeId());
        }
        if (this.messageContentRemovalTask != null && this.messageContentRemovalTask.isRunning()) {
            this.messageContentRemovalTask.setRunning(false);
        }
        if (this.pubSubMessageContentRemoverTask != null && this.pubSubMessageContentRemoverTask.isRunning()) {
            this.pubSubMessageContentRemoverTask.setRunning(false);
        }
        log.info("Stopping all current queue message publishers");
        ClusteringEnabledSubscriptionManager subscriptionManager = ClusterResourceHolder.getInstance().getSubscriptionManager();
        if (subscriptionManager != null) {
            subscriptionManager.stopAllMessageFlushers();
        }
        log.info("Stopping all current topic message publishers");
        CassandraTopicPublisherManager cassandraTopicPublisherManager = ClusterResourceHolder.getInstance().getCassandraTopicPublisherManager();
        if (cassandraTopicPublisherManager != null && cassandraTopicPublisherManager.isActive()) {
            cassandraTopicPublisherManager.stop();
        }
        log.info("Stopping all global queue workers locally");
        ClusterManager clusterManager = ClusterResourceHolder.getInstance().getClusterManager();
        if (clusterManager == null || (globalQueueManager = clusterManager.getGlobalQueueManager()) == null) {
            return;
        }
        globalQueueManager.stopAllQueueWorkersLocally();
    }

    @Override // org.wso2.andes.server.store.MessageStore
    public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T t) {
        long nextId = this.messageIdGenerator.getNextId();
        if (log.isDebugEnabled()) {
            log.debug("MessageID generated:" + nextId);
        }
        return new StoredCassandraMessage(nextId, t);
    }

    @Override // org.wso2.andes.server.store.MessageStore
    public boolean isPersistent() {
        return false;
    }

    @Override // org.wso2.andes.server.store.DurableConfigurationStore
    public void configureConfigStore(String str, ConfigurationRecoveryHandler configurationRecoveryHandler, Configuration configuration, LogSubject logSubject) throws Exception {
        if (this.configured) {
            return;
        }
        performCommonConfiguration(configuration);
        recover(configurationRecoveryHandler);
        ClusterResourceHolder clusterResourceHolder = ClusterResourceHolder.getInstance();
        CassandraTopicPublisherManager cassandraTopicPublisherManager = clusterResourceHolder.getCassandraTopicPublisherManager();
        if (cassandraTopicPublisherManager == null) {
            cassandraTopicPublisherManager = new CassandraTopicPublisherManager();
            clusterResourceHolder.setCassandraTopicPublisherManager(cassandraTopicPublisherManager);
        }
        cassandraTopicPublisherManager.init();
        cassandraTopicPublisherManager.start();
    }

    @Override // org.wso2.andes.server.store.DurableConfigurationStore
    public void createExchange(Exchange exchange) throws AMQStoreException {
        if (!this.isCassandraConnectionLive) {
            log.error("Error in creating exchange " + exchange.getName() + ". Message store is inaccessible.");
            return;
        }
        try {
            String name = exchange.getName();
            CassandraDataAccessHelper.addMappingToRaw(EXCHANGE_COLUMN_FAMILY, EXCHANGE_ROW, name, name + "|" + exchange.getTypeShortString().asString() + "|" + Short.valueOf(exchange.isAutoDelete() ? (short) 1 : (short) 0), this.keyspace);
        } catch (Exception e) {
            throw new AMQStoreException("Error in creating exchange " + exchange.getName(), e);
        }
    }

    public List<String> loadExchanges(ConfigurationRecoveryHandler.ExchangeRecoveryHandler exchangeRecoveryHandler) throws Exception {
        ArrayList arrayList = new ArrayList();
        if (!this.isCassandraConnectionLive) {
            log.error("Error in loading exchanges. Message store is inaccessible.");
            return arrayList;
        }
        try {
            for (HColumn hColumn : CassandraDataAccessHelper.getStringTypeColumnsInARow(EXCHANGE_ROW, EXCHANGE_COLUMN_FAMILY, this.keyspace, Integer.MAX_VALUE).getColumns()) {
                if (hColumn instanceof HColumn) {
                    String str = (String) hColumn.getName();
                    String[] split = ((String) hColumn.getValue()).split("|");
                    String str2 = split[1];
                    short parseShort = Short.parseShort(split[2]);
                    arrayList.add(str);
                    exchangeRecoveryHandler.exchange(str, str2, parseShort != 0);
                }
            }
            return arrayList;
        } catch (Exception e) {
            throw new AMQStoreException("Error in loading exchanges", e);
        }
    }

    public List<String> synchExchanges(VirtualHostConfigSynchronizer virtualHostConfigSynchronizer) throws Exception {
        ArrayList arrayList = new ArrayList();
        if (!this.isCassandraConnectionLive) {
            log.error("Error in synchronizing exchanges. Message store is inaccessible.");
            return arrayList;
        }
        try {
            for (HColumn hColumn : CassandraDataAccessHelper.getStringTypeColumnsInARow(EXCHANGE_ROW, EXCHANGE_COLUMN_FAMILY, this.keyspace, Integer.MAX_VALUE).getColumns()) {
                if (hColumn instanceof HColumn) {
                    String str = (String) hColumn.getName();
                    String[] split = ((String) hColumn.getValue()).split("|");
                    String str2 = split[1];
                    short parseShort = Short.parseShort(split[2]);
                    arrayList.add(str);
                    virtualHostConfigSynchronizer.exchange(str, str2, parseShort != 0);
                }
            }
            return arrayList;
        } catch (Exception e) {
            throw new AMQStoreException("Error in synchronizing exchanges", e);
        }
    }

    @Override // org.wso2.andes.server.store.DurableConfigurationStore
    public void removeExchange(Exchange exchange) throws AMQStoreException {
        throw new UnsupportedOperationException("removeExchange function is unsupported");
    }

    @Override // org.wso2.andes.server.store.DurableConfigurationStore
    public void bindQueue(Exchange exchange, AMQShortString aMQShortString, AMQQueue aMQQueue, FieldTable fieldTable) throws AMQStoreException {
        try {
            addBinding(exchange, aMQQueue, aMQShortString.asString());
        } catch (CassandraDataAccessException e) {
            throw new AMQStoreException("Error adding Binding details to cassandra store", e);
        }
    }

    @Override // org.wso2.andes.server.store.DurableConfigurationStore
    public void unbindQueue(Exchange exchange, AMQShortString aMQShortString, AMQQueue aMQQueue, FieldTable fieldTable) throws AMQStoreException {
        try {
            removeBinding(exchange, aMQQueue, aMQShortString.asString());
        } catch (CassandraDataAccessException e) {
            throw new AMQStoreException("Error removing binding details from cassandra store", e);
        }
    }

    @Override // org.wso2.andes.server.store.DurableConfigurationStore
    public void createQueue(AMQQueue aMQQueue, FieldTable fieldTable) throws AMQStoreException {
        createQueue(aMQQueue);
    }

    @Override // org.wso2.andes.server.store.DurableConfigurationStore
    public void createQueue(AMQQueue aMQQueue) {
        if (!this.isCassandraConnectionLive) {
            log.error("Error While creating queue" + aMQQueue.getName() + "Message store is inaccessible.");
            return;
        }
        try {
            CassandraDataAccessHelper.addMappingToRaw(QUEUE_DETAILS_COLUMN_FAMILY, QUEUE_DETAILS_ROW, aMQQueue.getNameShortString().toString(), aMQQueue.getNameShortString().toString() + "|" + (aMQQueue.getOwner() == null ? null : aMQQueue.getOwner().toString()) + "|" + (aMQQueue.isExclusive() ? AMQSession.STRICT_AMQP_FATAL_DEFAULT : "false"), this.keyspace);
        } catch (Exception e) {
            throw new RuntimeException("Error While creating queue" + aMQQueue.getName(), e);
        }
    }

    public void addNodeDetails(String str, String str2) {
        if (!this.isCassandraConnectionLive) {
            log.error("Error writing Node details to cassandra database. Message store is inaccessible.");
            return;
        }
        try {
            CassandraDataAccessHelper.addMappingToRaw(NODE_DETAIL_COLUMN_FAMILY, NODE_DETAIL_ROW, str, str2, this.keyspace);
        } catch (CassandraDataAccessException e) {
            throw new RuntimeException("Error writing Node details to cassandra database", e);
        }
    }

    public String getNodeData(String str) {
        if (!this.isCassandraConnectionLive) {
            log.error("Error accessing Node details to cassandra database. Message store is inaccessible.");
            return null;
        }
        try {
            HColumn columnByName = CassandraDataAccessHelper.getStringTypeColumnsInARow(NODE_DETAIL_ROW, NODE_DETAIL_COLUMN_FAMILY, this.keyspace, Integer.MAX_VALUE).getColumnByName(str);
            return (String) columnByName.getValue();
        } catch (CassandraDataAccessException e) {
            throw new RuntimeException("Error accessing Node details to cassandra database");
        }
    }

    public List<String> storedNodeDetails() {
        if (!this.isCassandraConnectionLive) {
            log.error("Error accessing Node details to cassandra database. Message store is inaccessible.");
            return new ArrayList();
        }
        try {
            List columns = CassandraDataAccessHelper.getStringTypeColumnsInARow(NODE_DETAIL_ROW, NODE_DETAIL_COLUMN_FAMILY, this.keyspace, Integer.MAX_VALUE).getColumns();
            ArrayList arrayList = new ArrayList();
            Iterator it = columns.iterator();
            while (it.hasNext()) {
                arrayList.add(((HColumn) it.next()).getName());
            }
            return arrayList;
        } catch (CassandraDataAccessException e) {
            throw new RuntimeException("Error accessing Node details to cassandra database");
        }
    }

    public void deleteNodeData(String str) {
        if (!this.isCassandraConnectionLive) {
            log.error("Error accessing Node details to cassandra database. Message store is inaccessible.");
        } else {
            try {
                CassandraDataAccessHelper.deleteStringColumnFromRaw(NODE_DETAIL_COLUMN_FAMILY, NODE_DETAIL_ROW, str, this.keyspace);
            } catch (CassandraDataAccessException e) {
                throw new RuntimeException("Error accessing Node details to cassandra database");
            }
        }
    }

    public void createGlobalQueue(String str) throws AMQStoreException {
        if (!this.isCassandraConnectionLive) {
            log.error("Error while adding Global Queue to Cassandra message store. Message store is inaccessible.");
            return;
        }
        try {
            CassandraDataAccessHelper.addMappingToRaw(GLOBAL_QUEUE_LIST_COLUMN_FAMILY, GLOBAL_QUEUE_LIST_ROW, str, str, this.keyspace);
            log.info("Created Queue : " + str);
        } catch (CassandraDataAccessException e) {
            throw new AMQStoreException("Error while adding Global Queue to Cassandra message store", e);
        }
    }

    @Override // org.wso2.andes.server.store.DurableConfigurationStore
    public void removeQueue(AMQQueue aMQQueue) throws AMQStoreException {
        try {
            CassandraDataAccessHelper.deleteStringColumnFromRaw(QUEUE_DETAILS_COLUMN_FAMILY, QUEUE_DETAILS_ROW, aMQQueue.getNameShortString().toString(), this.keyspace);
        } catch (CassandraDataAccessException e) {
            throw new AMQStoreException("Error while deleting queue : " + aMQQueue, e);
        }
    }

    public void removeGlobalQueue(String str) throws AMQStoreException {
        if (!this.isCassandraConnectionLive) {
            log.error("Error while removing Global Queue" + str + ". Message store is inaccessible.");
            return;
        }
        try {
            Iterator<String> it = getUserQueues(str).iterator();
            while (it.hasNext()) {
                CassandraDataAccessHelper.deleteStringColumnFromRaw(GLOBAL_QUEUE_TO_USER_QUEUE_COLUMN_FAMILY, str, it.next(), this.keyspace);
            }
            CassandraDataAccessHelper.deleteStringColumnFromRaw(GLOBAL_QUEUE_LIST_COLUMN_FAMILY, GLOBAL_QUEUE_LIST_ROW, str, this.keyspace);
            removeMessageCounterForQueue(str);
            if (str.startsWith("tmp_")) {
                log.info("Removed Global Queue Assigned for Topic Subscription: " + str);
            } else {
                log.info("Removed Global Queue Assigned for Queue: " + str);
            }
        } catch (Exception e) {
            throw new AMQStoreException("Error while removing Global Queue  : " + str, e);
        }
    }

    public void checkCassandraConnection() {
        new Thread(new Runnable() { // from class: org.wso2.andes.server.store.CassandraMessageStore.1
            @Override // java.lang.Runnable
            public void run() {
                GlobalQueueManager globalQueueManager;
                GlobalQueueManager globalQueueManager2;
                int i = 0;
                while (true) {
                    try {
                        if (CassandraMessageStore.this.cluster.describeClusterName() != null) {
                            boolean z = CassandraMessageStore.this.isCassandraConnectionLive;
                            CassandraMessageStore.this.isCassandraConnectionLive = true;
                            i = 0;
                            if (!z) {
                                CassandraMessageStore.log.info("Cassandra Message Store is alive....");
                                CassandraMessageStore.log.info("Starting all current queue message publishers");
                                ClusteringEnabledSubscriptionManager subscriptionManager = ClusterResourceHolder.getInstance().getSubscriptionManager();
                                if (subscriptionManager != null) {
                                    subscriptionManager.startAllMessageFlushers();
                                }
                                CassandraMessageStore.log.info("Starting all current topic message publishers");
                                CassandraTopicPublisherManager cassandraTopicPublisherManager = ClusterResourceHolder.getInstance().getCassandraTopicPublisherManager();
                                if (cassandraTopicPublisherManager != null && !cassandraTopicPublisherManager.isActive()) {
                                    cassandraTopicPublisherManager.start();
                                }
                                CassandraMessageStore.log.info("Starting all available Global Queue Workers");
                                ClusterManager clusterManager = ClusterResourceHolder.getInstance().getClusterManager();
                                if (clusterManager != null && (globalQueueManager2 = clusterManager.getGlobalQueueManager()) != null) {
                                    globalQueueManager2.startAllQueueWorkersLocally();
                                }
                                CassandraMessageStore.log.info("Starting all message content writers");
                                if (CassandraMessageStore.this.publishMessageContentWriter != null) {
                                    CassandraMessageStore.this.publishMessageWriter.start();
                                }
                                CassandraMessageStore.log.info("Starting message content deletion");
                                if (CassandraMessageStore.this.messageContentRemovalTask != null && !CassandraMessageStore.this.messageContentRemovalTask.isRunning()) {
                                    CassandraMessageStore.this.messageContentRemovalTask.setRunning(true);
                                }
                                CassandraMessageStore.log.info("Starting pub-sub message removal task");
                                if (CassandraMessageStore.this.pubSubMessageContentRemoverTask != null && !CassandraMessageStore.this.pubSubMessageContentRemoverTask.isRunning()) {
                                    CassandraMessageStore.this.pubSubMessageContentRemoverTask.setRunning(true);
                                }
                            }
                            Thread.sleep(10000L);
                        }
                    } catch (InterruptedException e) {
                    } catch (Exception e2) {
                        CassandraMessageStore.log.error("Error while checking if Cassandra Connection is alive.", e2);
                    } catch (HectorException e3) {
                        try {
                            if (e3.getMessage().contains("All host pools marked down. Retry burden pushed out to client")) {
                                CassandraMessageStore.this.isCassandraConnectionLive = false;
                                if (i < 5) {
                                    CassandraMessageStore.log.error(e3);
                                }
                                i++;
                                if (i == 4) {
                                    CassandraMessageStore.log.error("Cassandra Message Store is Inaccessible....");
                                    CassandraMessageStore.log.info("Stopping all current queue message publishers");
                                    ClusteringEnabledSubscriptionManager subscriptionManager2 = ClusterResourceHolder.getInstance().getSubscriptionManager();
                                    if (subscriptionManager2 != null) {
                                        subscriptionManager2.stopAllMessageFlushers();
                                    }
                                    CassandraMessageStore.log.info("Stopping all current topic message publishers");
                                    CassandraTopicPublisherManager cassandraTopicPublisherManager2 = ClusterResourceHolder.getInstance().getCassandraTopicPublisherManager();
                                    if (cassandraTopicPublisherManager2 != null && cassandraTopicPublisherManager2.isActive()) {
                                        cassandraTopicPublisherManager2.stop();
                                    }
                                    CassandraMessageStore.log.info("Stopping all global queue workers locally");
                                    ClusterManager clusterManager2 = ClusterResourceHolder.getInstance().getClusterManager();
                                    if (clusterManager2 != null && (globalQueueManager = clusterManager2.getGlobalQueueManager()) != null) {
                                        globalQueueManager.stopAllQueueWorkersLocally();
                                    }
                                    CassandraMessageStore.log.info("Stopping all message content writers");
                                    if (CassandraMessageStore.this.publishMessageContentWriter != null) {
                                        CassandraMessageStore.this.publishMessageWriter.stop();
                                    }
                                    CassandraMessageStore.log.info("Stopping message content deletion");
                                    if (CassandraMessageStore.this.messageContentRemovalTask != null && CassandraMessageStore.this.messageContentRemovalTask.isRunning()) {
                                        CassandraMessageStore.this.messageContentRemovalTask.setRunning(false);
                                    }
                                    CassandraMessageStore.log.info("Stopping pub-sub message removal task");
                                    if (CassandraMessageStore.this.pubSubMessageContentRemoverTask != null && CassandraMessageStore.this.pubSubMessageContentRemoverTask.isRunning()) {
                                        CassandraMessageStore.this.pubSubMessageContentRemoverTask.setRunning(false);
                                    }
                                }
                                CassandraMessageStore.log.info("Waiting for Cassandra connection configured to become live...");
                                if (i <= 10) {
                                    Thread.sleep(6000L);
                                } else {
                                    if (i == 120) {
                                        i = 10;
                                    }
                                    Thread.sleep(MessageOkBodyImpl.METHOD_ID * i);
                                }
                            }
                        } catch (InterruptedException e4) {
                        } catch (Exception e5) {
                            CassandraMessageStore.log.error("Error while checking if Cassandra Connection is alive.", e5);
                        }
                    }
                }
            }
        }).start();
    }

    @Override // org.wso2.andes.server.store.DurableConfigurationStore
    public void updateQueue(AMQQueue aMQQueue) throws AMQStoreException {
        if (!this.isCassandraConnectionLive) {
            log.error("Error in updating the queue. Message store is inaccessible.");
            return;
        }
        try {
            CassandraDataAccessHelper.addMappingToRaw(QUEUE_DETAILS_COLUMN_FAMILY, QUEUE_DETAILS_ROW, aMQQueue.getNameShortString().toString(), aMQQueue.getNameShortString().toString() + "|" + (aMQQueue.getOwner() == null ? null : aMQQueue.getOwner().toString()) + "|" + (aMQQueue.isExclusive() ? AMQSession.STRICT_AMQP_FATAL_DEFAULT : "false"), this.keyspace);
        } catch (CassandraDataAccessException e) {
            throw new AMQStoreException("Error in updating the queue", e);
        }
    }

    @Override // org.wso2.andes.server.store.TransactionLog
    public void configureTransactionLog(String str, TransactionLogRecoveryHandler transactionLogRecoveryHandler, Configuration configuration, LogSubject logSubject) throws Exception {
    }

    @Override // org.wso2.andes.server.store.TransactionLog
    public TransactionLog.Transaction newTransaction() {
        return new CassandraTransaction();
    }

    public boolean isConfigured() {
        return this.configured;
    }

    public void addContentDeletionTask(long j) {
        this.contentDeletionTasks.put(Long.valueOf(System.currentTimeMillis()), Long.valueOf(j));
    }
}
