package org.wso2.carbon.broker.core.internal.brokers.jms;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.apache.axiom.om.OMElement;
import org.apache.axis2.engine.AxisConfiguration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.broker.core.BrokerConfiguration;
import org.wso2.carbon.broker.core.BrokerListener;
import org.wso2.carbon.broker.core.BrokerType;
import org.wso2.carbon.broker.core.BrokerTypeDto;
import org.wso2.carbon.broker.core.exception.BrokerEventProcessingException;

/* loaded from: input_file:org/wso2/carbon/broker/core/internal/brokers/jms/JMSBrokerType.class */
/**
 * Base class for broker types that talk to a JMS topic provider.
 *
 * <p>Subclasses supply the provider specific {@link TopicConnection} through
 * {@link #getTopicConnection(BrokerConfiguration)}. This class keeps track of active
 * subscriptions (keyed by broker configuration name, then by subscription id) and caches
 * one publishing {@link JMSConnection} per broker configuration and topic name.
 *
 * <p>The subscription map has no default value: it must be provided through
 * {@link #setBrokerSubscriptionsMap(Map)} before {@code subscribe} or {@code unsubscribe}
 * is called.
 */
public abstract class JMSBrokerType implements BrokerType {
    private static final Log log = LogFactory.getLog(JMSBrokerType.class);

    /** Broker configuration name -> (subscription id -> subscription resources). */
    private Map<String, Map<String, SubscriptionDetails>> brokerSubscriptionsMap;
    private BrokerTypeDto brokerTypeDto = null;

    /**
     * Broker configuration -> (topic name -> cached publishing connection).
     * NOTE(review): lookups rely on BrokerConfiguration's equals/hashCode - confirm they are stable.
     */
    private final ConcurrentHashMap<BrokerConfiguration, ConcurrentHashMap<String, JMSConnection>> jmsConnectionMap = new ConcurrentHashMap<>();

    /**
     * Bundles the JMS resources that are used to publish to a single topic.
     */
    public class JMSConnection {
        private final TopicConnection topicConnection;
        private final Session session;
        private final MessageProducer messageProducer;

        JMSConnection(TopicConnection topicConnection, Session session, MessageProducer messageProducer) {
            this.topicConnection = topicConnection;
            this.session = session;
            this.messageProducer = messageProducer;
        }

        /** @return the session the producer was created from */
        public Session getSession() {
            return this.session;
        }

        /** @return the producer bound to the topic */
        public MessageProducer getMessageProducer() {
            return this.messageProducer;
        }

        /** @return the connection the session was created from */
        public TopicConnection getTopicConnection() {
            return this.topicConnection;
        }
    }

    /**
     * @return the descriptor set through {@link #setBrokerTypeDto(BrokerTypeDto)}, or {@code null} if none was set
     */
    @Override // org.wso2.carbon.broker.core.BrokerType
    public BrokerTypeDto getBrokerTypeDto() {
        return this.brokerTypeDto;
    }

    /**
     * Subscribes the listener to the given topic and records the subscription.
     *
     * @param str                 name of the topic to subscribe to
     * @param brokerListener      listener handed to the JMS message listener
     * @param brokerConfiguration configuration used to obtain the connection; its name keys the subscription map
     * @param axisConfiguration   not used by this implementation
     * @return a freshly generated subscription id to be passed to {@code unsubscribe}
     * @throws BrokerEventProcessingException if the connection cannot be obtained or any JMS call fails
     */
    @Override // org.wso2.carbon.broker.core.BrokerType
    public String subscribe(String str, BrokerListener brokerListener, BrokerConfiguration brokerConfiguration, AxisConfiguration axisConfiguration) throws BrokerEventProcessingException {
        String subscriptionId = UUID.randomUUID().toString();
        TopicConnection topicConnection = getTopicConnection(brokerConfiguration);
        try {
            TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            TopicSubscriber subscriber = topicSession.createSubscriber(topicSession.createTopic(str));
            subscriber.setMessageListener(new JMSMessageListener(brokerListener));
            topicConnection.start();
            Map<String, SubscriptionDetails> subscriptions = this.brokerSubscriptionsMap.get(brokerConfiguration.getName());
            if (subscriptions == null) {
                subscriptions = new ConcurrentHashMap<>();
                this.brokerSubscriptionsMap.put(brokerConfiguration.getName(), subscriptions);
            }
            subscriptions.put(subscriptionId, new SubscriptionDetails(topicConnection, topicSession, subscriber));
            return subscriptionId;
        } catch (JMSException e) {
            // The connection was obtained for this subscription only; do not leak it on failure.
            release(null, topicConnection);
            String message = "Failed to subscribe to topic:" + str;
            log.error(message, e);
            throw new BrokerEventProcessingException(message, e);
        }
    }

    /**
     * Provides the topic connection for the given broker configuration.
     *
     * @param brokerConfiguration configuration describing the broker to connect to
     * @return the connection to use; this class closes it when a subscribe or publish attempt fails
     * @throws BrokerEventProcessingException if the connection cannot be provided
     */
    protected abstract TopicConnection getTopicConnection(BrokerConfiguration brokerConfiguration) throws BrokerEventProcessingException;

    /**
     * Publishes a message to the given topic, reusing a cached connection when one exists.
     *
     * <p>An {@code OMElement} is sent as a text message holding its {@code toString()} value, a
     * {@code String} is sent as a text message and a {@code Map} is sent as a map message whose
     * keys must be strings. Any other payload (including {@code null}) is rejected.
     *
     * @param str                 name of the topic to publish to
     * @param obj                 payload to publish
     * @param brokerConfiguration configuration used to obtain and cache the connection
     * @throws BrokerEventProcessingException if the payload type is unsupported, the connection cannot
     *                                        be set up, or sending fails (the cached connection is then
     *                                        discarded and closed)
     */
    @Override // org.wso2.carbon.broker.core.BrokerType
    public void publish(String str, Object obj, BrokerConfiguration brokerConfiguration) throws BrokerEventProcessingException {
        ConcurrentHashMap<String, JMSConnection> topicConnections = connectionsFor(brokerConfiguration);
        JMSConnection jmsConnection = topicConnections.get(str);
        if (jmsConnection == null) {
            jmsConnection = openConnection(str, brokerConfiguration, topicConnections);
        }
        try {
            Message message = createMessage(jmsConnection.getSession(), str, obj);
            jmsConnection.getMessageProducer().send(message);
        } catch (JMSException e) {
            // Evict only the connection that failed; another thread may already have cached a new one.
            topicConnections.remove(str, jmsConnection);
            String errorMessage = "Failed to publish to topic:" + str;
            log.error(errorMessage, e);
            release(jmsConnection.getSession(), jmsConnection.getTopicConnection());
            throw new BrokerEventProcessingException(errorMessage, e);
        }
    }

    /**
     * Returns the per-topic connection cache of a broker configuration, creating it on first use.
     * The result of {@code putIfAbsent} is honoured so that concurrent callers end up with the same map.
     */
    private ConcurrentHashMap<String, JMSConnection> connectionsFor(BrokerConfiguration brokerConfiguration) {
        ConcurrentHashMap<String, JMSConnection> connections = this.jmsConnectionMap.get(brokerConfiguration);
        if (connections == null) {
            ConcurrentHashMap<String, JMSConnection> created = new ConcurrentHashMap<>();
            connections = this.jmsConnectionMap.putIfAbsent(brokerConfiguration, created);
            if (connections == null) {
                connections = created;
            }
        }
        return connections;
    }

    /**
     * Creates the connection, session and producer for a topic and caches them. If another caller
     * cached a connection for the same topic in the meantime, the new resources are closed and the
     * cached connection is returned instead.
     */
    private JMSConnection openConnection(String topicName, BrokerConfiguration brokerConfiguration, ConcurrentHashMap<String, JMSConnection> connections) throws BrokerEventProcessingException {
        TopicConnection topicConnection = getTopicConnection(brokerConfiguration);
        JMSConnection created;
        try {
            TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = topicSession.createProducer(topicSession.createTopic(topicName));
            created = new JMSConnection(topicConnection, topicSession, producer);
        } catch (JMSException e) {
            release(null, topicConnection);
            String errorMessage = "Failed to publish to topic:" + topicName;
            log.error(errorMessage, e);
            throw new BrokerEventProcessingException(errorMessage, e);
        }
        JMSConnection existing = connections.putIfAbsent(topicName, created);
        if (existing != null) {
            release(created.getSession(), created.getTopicConnection());
            return existing;
        }
        return created;
    }

    /**
     * Converts the payload into the JMS message to send.
     *
     * @throws BrokerEventProcessingException if the payload is neither an OMElement, a String nor a Map
     */
    private Message createMessage(Session session, String topicName, Object payload) throws JMSException, BrokerEventProcessingException {
        if (payload instanceof OMElement) {
            return session.createTextMessage(payload.toString());
        }
        if (payload instanceof String) {
            return session.createTextMessage((String) payload);
        }
        if (payload instanceof Map) {
            MapMessage mapMessage = session.createMapMessage();
            for (Map.Entry<?, ?> entry : ((Map<?, ?>) payload).entrySet()) {
                mapMessage.setObject((String) entry.getKey(), entry.getValue());
            }
            return mapMessage;
        }
        String payloadType = payload == null ? "null" : payload.getClass().getName();
        throw new BrokerEventProcessingException("Failed to publish to topic:" + topicName + ", unsupported message type " + payloadType);
    }

    /**
     * Closes the given resources on a best-effort basis. Each one is closed independently so that a
     * failure to close the session does not prevent the connection from being closed.
     */
    private void release(Session session, TopicConnection topicConnection) {
        if (session != null) {
            try {
                session.close();
            } catch (JMSException e) {
                log.warn("Failed to reallocate resources.", e);
            }
        }
        if (topicConnection != null) {
            try {
                topicConnection.close();
            } catch (JMSException e) {
                log.warn("Failed to reallocate resources.", e);
            }
        }
    }

    /**
     * Verifies the configuration by publishing a fixed test message to the topic named {@code test}.
     *
     * @param brokerConfiguration configuration to verify
     * @throws BrokerEventProcessingException if publishing the test message fails
     */
    @Override // org.wso2.carbon.broker.core.BrokerType
    public void testConnection(BrokerConfiguration brokerConfiguration) throws BrokerEventProcessingException {
        publish("test", " <brokerConfigurationTest>\n   <message>This is a test message.</message>\n   </brokerConfigurationTest>", brokerConfiguration);
    }

    /**
     * Removes a subscription created by {@code subscribe} and closes its resources.
     *
     * @param str                 topic name, used for the error message only
     * @param brokerConfiguration configuration whose name keys the subscription map
     * @param axisConfiguration   not used by this implementation
     * @param str2                subscription id returned by {@code subscribe}
     * @throws BrokerEventProcessingException if no matching subscription is recorded or closing it fails
     */
    @Override // org.wso2.carbon.broker.core.BrokerType
    public void unsubscribe(String str, BrokerConfiguration brokerConfiguration, AxisConfiguration axisConfiguration, String str2) throws BrokerEventProcessingException {
        Map<String, SubscriptionDetails> subscriptions = this.brokerSubscriptionsMap.get(brokerConfiguration.getName());
        if (subscriptions == null) {
            throw new BrokerEventProcessingException("There is no subscription for broker " + brokerConfiguration.getName());
        }
        SubscriptionDetails subscription = subscriptions.remove(str2);
        if (subscription == null) {
            throw new BrokerEventProcessingException("There is no subscription for topic " + str + " with subscriptionId " + str2);
        }
        try {
            subscription.close();
        } catch (JMSException e) {
            throw new BrokerEventProcessingException("Can not unsubscribe from the broker with configuration " + brokerConfiguration.getName(), e);
        }
    }

    /**
     * @return the live subscription map (not a copy), or {@code null} if none has been set
     */
    protected Map<String, Map<String, SubscriptionDetails>> getBrokerSubscriptionsMap() {
        return this.brokerSubscriptionsMap;
    }

    /**
     * Sets the map used to record subscriptions; required before subscribing or unsubscribing.
     *
     * @param map broker configuration name -> (subscription id -> subscription resources)
     */
    public void setBrokerSubscriptionsMap(Map<String, Map<String, SubscriptionDetails>> map) {
        this.brokerSubscriptionsMap = map;
    }

    /**
     * @param brokerTypeDto descriptor returned by {@link #getBrokerTypeDto()}
     */
    public void setBrokerTypeDto(BrokerTypeDto brokerTypeDto) {
        this.brokerTypeDto = brokerTypeDto;
    }
}
