package org.wso2.carbon.broker.core.internal.jms;

import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.ResourceBundle;
import java.util.concurrent.ConcurrentHashMap;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import org.apache.axiom.om.OMElement;
import org.apache.axis2.engine.AxisConfiguration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.CarbonConstants;
import org.wso2.carbon.broker.core.BrokerConfiguration;
import org.wso2.carbon.broker.core.BrokerListener;
import org.wso2.carbon.broker.core.BrokerTypeDto;
import org.wso2.carbon.broker.core.Property;
import org.wso2.carbon.broker.core.exception.BrokerEventProcessingException;
import org.wso2.carbon.broker.core.internal.BrokerType;
import org.wso2.carbon.broker.core.internal.util.BrokerConstants;

/* loaded from: input_file:org/wso2/carbon/broker/core/internal/jms/JMSBrokerType.class */
/**
 * {@link BrokerType} implementation that publishes to and subscribes from JMS topics.
 *
 * <p>The class is a singleton (see {@link #getInstance()}). Active subscriptions are tracked per
 * broker configuration name and per topic name so that they can be released again by
 * {@link #unsubscribe(String, BrokerConfiguration, AxisConfiguration)}.
 */
public class JMSBrokerType implements BrokerType {
    private static final Log log = LogFactory.getLog(JMSBrokerType.class);
    private static final JMSBrokerType instance = new JMSBrokerType();

    /** Describes this broker type and the configuration properties it accepts. */
    private final BrokerTypeDto brokerTypeDto;

    /** Broker configuration name -> (topic name -> live subscription resources). */
    private final ConcurrentHashMap<String, Map<String, SubscriptionDetails>> brokerSubscriptionsMap =
            new ConcurrentHashMap<String, Map<String, SubscriptionDetails>>();

    /**
     * Builds the broker type descriptor with its six required properties; display names are read
     * from the {@code org.wso2.carbon.broker.core.i18n.Resources} bundle for the default locale.
     */
    private JMSBrokerType() {
        this.brokerTypeDto = new BrokerTypeDto();
        this.brokerTypeDto.setName(BrokerConstants.BROKER_TYPE_JMS_QPID);
        ResourceBundle bundle = ResourceBundle.getBundle("org.wso2.carbon.broker.core.i18n.Resources", Locale.getDefault());
        addRequiredProperty(bundle, BrokerConstants.BROKER_CONF_JMS_PROP_JNDI_NAME, false);
        addRequiredProperty(bundle, BrokerConstants.BROKER_CONF_JMS_PROP_PRINCIPAL, false);
        // The credentials are the only property flagged as secured.
        addRequiredProperty(bundle, BrokerConstants.BROKER_CONF_JMS_PROP_CREDENTIALS, true);
        addRequiredProperty(bundle, BrokerConstants.BROKER_CONF_JMS_PROP_IP_ADDRESS, false);
        addRequiredProperty(bundle, BrokerConstants.BROKER_CONF_JMS_PROP_PORT, false);
        addRequiredProperty(bundle, BrokerConstants.BROKER_CONF_JMS_PROP_VIRTURAL_HOST_NAME, false);
    }

    /**
     * Registers one required property on the descriptor, using the property name as the key of
     * its display name in the resource bundle.
     *
     * @param bundle  bundle holding the display names
     * @param name    property name, also used as the bundle key
     * @param secured whether the property is to be flagged as secured
     */
    private void addRequiredProperty(ResourceBundle bundle, String name, boolean secured) {
        Property property = new Property(name);
        property.setRequired(true);
        if (secured) {
            property.setSecured(true);
        }
        property.setDisplayName(bundle.getString(name));
        this.brokerTypeDto.addProperty(property);
    }

    /**
     * @return the single shared instance of this broker type
     */
    public static JMSBrokerType getInstance() {
        return instance;
    }

    /**
     * @return the descriptor listing this broker type's name and configuration properties
     */
    @Override // org.wso2.carbon.broker.core.internal.BrokerType
    public BrokerTypeDto getBrokerTypeDto() {
        return this.brokerTypeDto;
    }

    /**
     * Subscribes the given listener to a topic and records the subscription so it can later be
     * released through {@link #unsubscribe(String, BrokerConfiguration, AxisConfiguration)}.
     *
     * <p>A dedicated connection is opened per subscription. If any step fails, that connection is
     * closed again before the failure is reported, so a failed subscribe does not leak it.
     *
     * @param str                 name of the topic to subscribe to
     * @param brokerListener      listener wrapped in a {@code JMSMessageListener}
     * @param brokerConfiguration broker configuration supplying the connection properties and name
     * @param axisConfiguration   not used by this implementation
     * @throws BrokerEventProcessingException if the connection cannot be created or the
     *                                        subscription cannot be set up
     */
    @Override // org.wso2.carbon.broker.core.internal.BrokerType
    public void subscribe(String str, BrokerListener brokerListener, BrokerConfiguration brokerConfiguration, AxisConfiguration axisConfiguration) throws BrokerEventProcessingException {
        TopicConnection topicConnection = getTopicConnection(brokerConfiguration);
        boolean registered = false;
        try {
            TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            TopicSubscriber topicSubscriber = topicSession.createSubscriber(topicSession.createTopic(str));
            topicSubscriber.setMessageListener(new JMSMessageListener(brokerListener));
            topicConnection.start();

            String brokerName = brokerConfiguration.getName();
            Map<String, SubscriptionDetails> topicSubscriptions = this.brokerSubscriptionsMap.get(brokerName);
            if (topicSubscriptions == null) {
                // putIfAbsent keeps a map registered concurrently by another thread instead of
                // overwriting it (and losing the subscriptions it already holds).
                Map<String, SubscriptionDetails> created = new ConcurrentHashMap<String, SubscriptionDetails>();
                Map<String, SubscriptionDetails> existing = this.brokerSubscriptionsMap.putIfAbsent(brokerName, created);
                topicSubscriptions = existing != null ? existing : created;
            }
            // NOTE(review): a second subscribe for the same broker and topic replaces the stored
            // details without closing the previous ones - confirm whether callers rely on this.
            topicSubscriptions.put(str, new SubscriptionDetails(topicConnection, topicSession, topicSubscriber));
            registered = true;
        } catch (JMSException e) {
            String message = "Failed to subscribe to topic:" + str;
            log.error(message, e);
            throw new BrokerEventProcessingException(message, e);
        } finally {
            if (!registered) {
                closeQuietly(topicConnection);
            }
        }
    }

    /**
     * Looks up the topic connection factory through JNDI and opens a new topic connection.
     *
     * @param brokerConfiguration broker configuration supplying the connection properties
     * @return a newly created (not yet started) topic connection
     * @throws BrokerEventProcessingException if the JNDI lookup or the connection creation fails
     */
    private TopicConnection getTopicConnection(BrokerConfiguration brokerConfiguration) throws BrokerEventProcessingException {
        InitialContext initialContext = createInitialContext(brokerConfiguration);
        try {
            TopicConnectionFactory connectionFactory = (TopicConnectionFactory) initialContext.lookup(BrokerConstants.BROKER_CONF_JMS_QPID_PROP_CONNECTION_FACTORY_LOOK_UP_NAME);
            return connectionFactory.createTopicConnection();
        } catch (JMSException e) {
            throw new BrokerEventProcessingException("Can not create topic connection .", e);
        } catch (NamingException e2) {
            throw new BrokerEventProcessingException("Can not create topic connection factory.", e2);
        } finally {
            // The context is only needed for the lookup; release it on a best-effort basis.
            try {
                initialContext.close();
            } catch (NamingException e3) {
                log.warn("Failed to close the JNDI initial context.", e3);
            }
        }
    }

    /**
     * Creates the JNDI initial context from the broker configuration's properties.
     *
     * @param brokerConfiguration broker configuration supplying the JNDI factory name, principal,
     *                            credentials, virtual host, IP address and port
     * @return the initial context used to look up the connection factory
     * @throws BrokerEventProcessingException if the JNDI factory name is missing or the context
     *                                        cannot be created
     */
    private InitialContext createInitialContext(BrokerConfiguration brokerConfiguration) throws BrokerEventProcessingException {
        Map<String, String> properties = brokerConfiguration.getProperties();
        String jndiFactoryName = properties.get(BrokerConstants.BROKER_CONF_JMS_PROP_JNDI_NAME);
        if (jndiFactoryName == null) {
            // Properties rejects null values with a bare NullPointerException; report the
            // misconfiguration through the declared exception type instead.
            throw new BrokerEventProcessingException("Can not create initial context: property " + BrokerConstants.BROKER_CONF_JMS_PROP_JNDI_NAME + " is not set for broker " + brokerConfiguration.getName());
        }
        String connectionUrl = createConnectionUrl(
                properties.get(BrokerConstants.BROKER_CONF_JMS_PROP_PRINCIPAL),
                properties.get(BrokerConstants.BROKER_CONF_JMS_PROP_CREDENTIALS),
                properties.get(BrokerConstants.BROKER_CONF_JMS_PROP_VIRTURAL_HOST_NAME),
                properties.get(BrokerConstants.BROKER_CONF_JMS_PROP_IP_ADDRESS),
                properties.get(BrokerConstants.BROKER_CONF_JMS_PROP_PORT));
        Properties environment = new Properties();
        environment.put("java.naming.factory.initial", jndiFactoryName);
        environment.put("connectionfactory.qpidConnectionfactory", connectionUrl);
        environment.put(CarbonConstants.REQUEST_BASE_CONTEXT, "true");
        try {
            return new InitialContext(environment);
        } catch (NamingException e) {
            throw new BrokerEventProcessingException("Can not create initial context with given parameters.", e);
        }
    }

    /**
     * Builds the {@code amqp://} connection URL of the form
     * {@code amqp://<user>:<password>@clientId/<virtualHost>?brokerlist='tcp://<host>:<port>'}.
     * Values are inserted verbatim (no escaping is applied).
     *
     * @param str  user name (principal)
     * @param str2 password (credentials)
     * @param str3 virtual host name
     * @param str4 broker host or IP address
     * @param str5 broker port
     * @return the assembled connection URL
     */
    private String createConnectionUrl(String str, String str2, String str3, String str4, String str5) {
        StringBuilder url = new StringBuilder("amqp://");
        url.append(str).append(":").append(str2)
                .append("@").append("clientId")
                .append("/").append(str3)
                .append("?brokerlist='tcp://").append(str4).append(":").append(str5).append("'");
        return url.toString();
    }

    /**
     * Publishes the string form of the given element as a text message to a topic, using a
     * connection and session that are opened for this call and closed again before returning.
     *
     * <p>Failures while closing the session or connection are logged as warnings and never mask
     * the outcome of the publish itself.
     *
     * @param str                 name of the topic to publish to
     * @param oMElement           payload; its {@code toString()} form is sent as the message text
     * @param brokerConfiguration broker configuration supplying the connection properties
     * @throws BrokerEventProcessingException if the connection cannot be created or sending fails
     */
    @Override // org.wso2.carbon.broker.core.internal.BrokerType
    public void publish(String str, OMElement oMElement, BrokerConfiguration brokerConfiguration) throws BrokerEventProcessingException {
        TopicConnection topicConnection = getTopicConnection(brokerConfiguration);
        Session session = null;
        try {
            session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            session.createProducer(session.createTopic(str)).send(session.createTextMessage(oMElement.toString()));
        } catch (JMSException e) {
            String message = "Failed to publish to topic:" + str;
            log.error(message, e);
            throw new BrokerEventProcessingException(message, e);
        } finally {
            // Close the session and the connection independently so that a failure to close the
            // session can never leave the connection open.
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e2) {
                    log.warn("Failed to reallocate resources.", e2);
                }
            }
            closeQuietly(topicConnection);
        }
    }

    /**
     * Closes the connection, logging (rather than propagating) any failure.
     *
     * @param topicConnection connection to close; {@code null} is ignored
     */
    private void closeQuietly(TopicConnection topicConnection) {
        if (topicConnection == null) {
            return;
        }
        try {
            topicConnection.close();
        } catch (JMSException e) {
            log.warn("Failed to reallocate resources.", e);
        }
    }

    /**
     * Removes and closes the subscription recorded for the given broker configuration and topic.
     *
     * @param str                 name of the topic to unsubscribe from
     * @param brokerConfiguration broker configuration whose name identifies the subscription set
     * @param axisConfiguration   not used by this implementation
     * @throws BrokerEventProcessingException if no subscription is recorded for the broker or the
     *                                        topic, or if closing the subscription fails
     */
    @Override // org.wso2.carbon.broker.core.internal.BrokerType
    public void unsubscribe(String str, BrokerConfiguration brokerConfiguration, AxisConfiguration axisConfiguration) throws BrokerEventProcessingException {
        Map<String, SubscriptionDetails> topicSubscriptions = this.brokerSubscriptionsMap.get(brokerConfiguration.getName());
        if (topicSubscriptions == null) {
            throw new BrokerEventProcessingException("There is no subscription for broker " + brokerConfiguration.getName());
        }
        SubscriptionDetails subscription = topicSubscriptions.remove(str);
        if (subscription == null) {
            throw new BrokerEventProcessingException("There is no subscriptions for this topic " + str);
        }
        try {
            subscription.close();
        } catch (JMSException e) {
            // Keep the JMS failure as the cause so the root problem is not lost.
            throw new BrokerEventProcessingException("Can not unsubscribe from the broker with configuration " + brokerConfiguration.getName(), e);
        }
    }
}
