package org.wso2.carbon.broker.core.internal.brokers.agent;

import com.google.gson.Gson;
import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.ResourceBundle;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.axiom.om.OMElement;
import org.apache.axis2.engine.AxisConfiguration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.broker.core.BrokerConfiguration;
import org.wso2.carbon.broker.core.BrokerListener;
import org.wso2.carbon.broker.core.BrokerTypeDto;
import org.wso2.carbon.broker.core.Property;
import org.wso2.carbon.broker.core.exception.BrokerEventProcessingException;
import org.wso2.carbon.broker.core.internal.BrokerType;
import org.wso2.carbon.broker.core.internal.ds.BrokerServiceValueHolder;
import org.wso2.carbon.broker.core.internal.util.BrokerConstants;
import org.wso2.carbon.databridge.agent.thrift.Agent;
import org.wso2.carbon.databridge.agent.thrift.DataPublisher;
import org.wso2.carbon.databridge.agent.thrift.exception.AgentException;
import org.wso2.carbon.databridge.commons.Attribute;
import org.wso2.carbon.databridge.commons.AttributeType;
import org.wso2.carbon.databridge.commons.Credentials;
import org.wso2.carbon.databridge.commons.Event;
import org.wso2.carbon.databridge.commons.StreamDefinition;
import org.wso2.carbon.databridge.commons.exception.AuthenticationException;
import org.wso2.carbon.databridge.commons.exception.TransportException;
import org.wso2.carbon.databridge.core.AgentCallback;

/* loaded from: input_file:org/wso2/carbon/broker/core/internal/brokers/agent/AgentBrokerType.class */
/**
 * Broker type backed by the data-bridge Thrift agent.
 *
 * <p>Inbound: an {@link AgentCallback} registered with the data-bridge subscriber service forwards
 * stream definitions and events to the {@link BrokerListener}s subscribed under the stream name.
 *
 * <p>Outbound: {@link #publish} turns an {@link OMElement} into a data-bridge {@link Event}. The
 * element's local name is used as the stream name and the text of every child element becomes a
 * {@code STRING} attribute value.
 *
 * <p>The class is a singleton; obtain it through {@link #getInstance()}.
 */
public final class AgentBrokerType implements BrokerType {
    private static final Log log = LogFactory.getLog(AgentBrokerType.class);
    private static final AgentBrokerType agentBrokerType = new AgentBrokerType();

    private final BrokerTypeDto brokerTypeDto;
    /** Fetched from the value holder the first time a publisher is created. */
    private Agent agent;
    private final Gson gson = new Gson();
    /** Listeners keyed by stream name, then by the broker configuration that registered them. */
    private final ConcurrentHashMap<String, Map<BrokerConfiguration, BrokerListener>> brokerListenerMap = new ConcurrentHashMap<String, Map<BrokerConfiguration, BrokerListener>>();
    /** Stream id to the same listener map instance that {@link #brokerListenerMap} holds for the stream name. */
    private final Map<String, Map<BrokerConfiguration, BrokerListener>> brokerListenerStreamIdMap = new ConcurrentHashMap<String, Map<BrokerConfiguration, BrokerListener>>();
    /**
     * Definitions already sent through a publisher, keyed by element local name.
     * NOTE(review): the key ignores the broker configuration, so a second configuration publishing
     * the same element name never defines the stream on its own publisher - confirm this is intended.
     */
    private final Map<String, StreamDefinition> outputTypeDefMap = new ConcurrentHashMap<String, StreamDefinition>();
    private final Map<BrokerConfiguration, DataPublisher> dataPublisherMap = new ConcurrentHashMap<BrokerConfiguration, DataPublisher>();

    /** Receives stream definitions and events from the data bridge and fans them out to listeners. */
    private class AgentBrokerCallback implements AgentCallback {
        private AgentBrokerCallback() {
        }

        /**
         * Notifies every listener subscribed under the stream's name and then records the
         * stream id so that later events can be routed to the same listeners.
         */
        public void definedStream(StreamDefinition streamDefinition, Credentials credentials) {
            Map<BrokerConfiguration, BrokerListener> listeners = listenersFor(streamDefinition.getName());
            for (BrokerListener listener : listeners.values()) {
                try {
                    listener.onEventDefinition(streamDefinition);
                } catch (BrokerEventProcessingException e) {
                    // One failing listener must not stop the others from being notified.
                    log.error("Cannot send Stream Definition to a brokerListener subscribed to " + streamDefinition.getStreamId(), e);
                }
            }
            brokerListenerStreamIdMap.put(streamDefinition.getStreamId(), listeners);
        }

        /**
         * Delivers each event to the listeners recorded for its stream id. Events whose stream id
         * has no recorded definition are skipped instead of aborting the whole batch.
         */
        public void receive(List<Event> list, Credentials credentials) {
            for (Event event : list) {
                String streamId = event.getStreamId();
                Map<BrokerConfiguration, BrokerListener> listeners = (streamId == null) ? null : brokerListenerStreamIdMap.get(streamId);
                if (listeners == null) {
                    // Logged at debug level because this sits on the per-event path.
                    if (log.isDebugEnabled()) {
                        log.debug("Skipping event: no stream definition has been recorded for stream id " + streamId);
                    }
                    continue;
                }
                for (BrokerListener listener : listeners.values()) {
                    try {
                        listener.onEvent(event);
                    } catch (BrokerEventProcessingException e) {
                        log.error("Cannot send event to a brokerListener subscribed to " + event.getStreamId(), e);
                    }
                }
            }
        }
    }

    /**
     * Builds the descriptor listing this broker type's configuration properties and registers
     * the callback with the data-bridge subscriber service.
     */
    private AgentBrokerType() {
        this.brokerTypeDto = new BrokerTypeDto();
        this.brokerTypeDto.setName(BrokerConstants.BROKER_TYPE_AGENT);
        ResourceBundle bundle = ResourceBundle.getBundle("org.wso2.carbon.broker.core.i18n.Resources", Locale.getDefault());

        this.brokerTypeDto.addProperty(newProperty(bundle, BrokerConstants.BROKER_CONF_AGENT_PROP_RECEIVER_URL, true));
        this.brokerTypeDto.addProperty(newProperty(bundle, BrokerConstants.BROKER_CONF_AGENT_PROP_AUTHENTICATOR_URL, false));
        this.brokerTypeDto.addProperty(newProperty(bundle, "username", true));
        Property passwordProperty = newProperty(bundle, "password", true);
        passwordProperty.setSecured(true);
        this.brokerTypeDto.addProperty(passwordProperty);

        BrokerServiceValueHolder.getDataBridgeSubscriberService().subscribe(new AgentBrokerCallback());
    }

    /** Creates a property whose display name is looked up in the bundle under the property's own name. */
    private static Property newProperty(ResourceBundle bundle, String name, boolean required) {
        Property property = new Property(name);
        property.setDisplayName(bundle.getString(name));
        property.setRequired(required);
        return property;
    }

    /** Returns the single instance of this broker type. */
    public static AgentBrokerType getInstance() {
        return agentBrokerType;
    }

    /**
     * Returns the listener map for a stream name, creating it atomically when absent so that
     * concurrent subscriptions and stream-definition callbacks always share one map.
     */
    private Map<BrokerConfiguration, BrokerListener> listenersFor(String streamName) {
        Map<BrokerConfiguration, BrokerListener> listeners = this.brokerListenerMap.get(streamName);
        if (listeners == null) {
            // The inner map is concurrent too: it is iterated by the callback while
            // subscribe/unsubscribe may modify it.
            Map<BrokerConfiguration, BrokerListener> created = new ConcurrentHashMap<BrokerConfiguration, BrokerListener>();
            listeners = this.brokerListenerMap.putIfAbsent(streamName, created);
            if (listeners == null) {
                listeners = created;
            }
        }
        return listeners;
    }

    /**
     * Registers a listener for the given stream name. A listener previously registered for the
     * same broker configuration and stream name is replaced.
     *
     * @param str                 stream name to subscribe to
     * @param brokerListener      listener that receives definitions and events
     * @param brokerConfiguration configuration the subscription belongs to
     * @param axisConfiguration   not used by this broker type
     */
    @Override // org.wso2.carbon.broker.core.internal.BrokerType
    public void subscribe(String str, BrokerListener brokerListener, BrokerConfiguration brokerConfiguration, AxisConfiguration axisConfiguration) throws BrokerEventProcessingException {
        listenersFor(str).put(brokerConfiguration, brokerListener);
    }

    /**
     * Publishes an {@link OMElement} through the data publisher of the given configuration.
     * The first time an element name is seen, a stream definition with one {@code STRING}
     * attribute per child element is sent before the event itself.
     *
     * @param str                 not used; the stream name is taken from the element's local name
     * @param obj                 message to publish; must be an {@link OMElement}
     * @param brokerConfiguration configuration that supplies the publisher connection settings
     * @throws BrokerEventProcessingException if the message is not an {@link OMElement}, or the
     *                                        publisher cannot be created, the stream cannot be
     *                                        defined, or the event cannot be sent
     */
    @Override // org.wso2.carbon.broker.core.internal.BrokerType
    public void publish(String str, Object obj, BrokerConfiguration brokerConfiguration) throws BrokerEventProcessingException {
        if (!(obj instanceof OMElement)) {
            String actualType = (obj == null) ? "null" : obj.getClass().getName();
            throw new BrokerEventProcessingException("Cannot publish data via DataPublisher for the broker configuration:" + brokerConfiguration.getName() + " because the message is not an OMElement", new IllegalArgumentException("Unsupported message type: " + actualType));
        }
        OMElement message = (OMElement) obj;

        DataPublisher dataPublisher = this.dataPublisherMap.get(brokerConfiguration);
        if (dataPublisher == null) {
            dataPublisher = createDataPublisher(brokerConfiguration);
        }

        String streamName = message.getLocalName();
        StreamDefinition knownDefinition = this.outputTypeDefMap.get(streamName);
        if (knownDefinition != null) {
            sendEvent(brokerConfiguration, dataPublisher, streamName, buildPayloadData(message, knownDefinition));
            return;
        }

        List<Attribute> attributes = new ArrayList<Attribute>();
        List<String> values = new ArrayList<String>();
        buildPayLoadDataAndAttributes(message, attributes, values);
        StreamDefinition newDefinition = new StreamDefinition(streamName);
        newDefinition.setPayloadData(attributes);
        try {
            dataPublisher.defineStream(this.gson.toJson(newDefinition));
        } catch (Exception e) {
            throw new BrokerEventProcessingException("Cannot define type via DataPublisher for the broker configuration:" + brokerConfiguration.getName() + " on the eventStreamDefinition " + newDefinition, e);
        }
        // Cache only after the definition was accepted; otherwise a failed attempt would
        // make every later publish skip the definition step.
        this.outputTypeDefMap.put(streamName, newDefinition);
        sendEvent(brokerConfiguration, dataPublisher, streamName, values.toArray());
    }

    /**
     * Derives one {@code STRING} attribute and one value from each child element, in document order.
     *
     * @param oMElement element whose children are read
     * @param list      receives an attribute named after each child
     * @param list2     receives the text of each child
     */
    private void buildPayLoadDataAndAttributes(OMElement oMElement, List<Attribute> list, List<String> list2) {
        Iterator<?> children = oMElement.getChildElements();
        while (children.hasNext()) {
            OMElement child = (OMElement) children.next();
            list.add(new Attribute(child.getLocalName(), AttributeType.STRING));
            list2.add(child.getText());
        }
    }

    /**
     * Extracts values for an already defined stream, in the attribute order of the definition.
     * For each attribute the text of the first child element with that local name is used, which
     * matches the values sent with the first event of the stream.
     *
     * @throws BrokerEventProcessingException if the element has no child for one of the attributes
     */
    private Object[] buildPayloadData(OMElement oMElement, StreamDefinition streamDefinition) throws BrokerEventProcessingException {
        List<Attribute> attributes = streamDefinition.getPayloadData();
        Object[] values = new Object[attributes.size()];
        for (int i = 0; i < values.length; i++) {
            String attributeName = attributes.get(i).getName();
            Iterator<?> matches = oMElement.getChildrenWithLocalName(attributeName);
            if (!matches.hasNext()) {
                throw new BrokerEventProcessingException("Cannot build event data for " + oMElement.getLocalName() + ": missing child element " + attributeName, new IllegalArgumentException("Missing child element: " + attributeName));
            }
            values[i] = ((OMElement) matches.next()).getText();
        }
        return values;
    }

    /** Wraps the values in an {@link Event} for the given stream and publishes it. */
    private void sendEvent(BrokerConfiguration brokerConfiguration, DataPublisher dataPublisher, String str, Object[] objArr) throws BrokerEventProcessingException {
        Event event = new Event();
        // NOTE(review): the stream id is the element local name and the value returned by
        // defineStream is ignored - confirm the receiver accepts this as a stream id.
        event.setStreamId(str);
        // NOTE(review): the stream definition declares payload attributes, yet the values are
        // sent as correlation data - confirm whether setPayloadData was intended.
        event.setCorrelationData(objArr);
        publishEvent(brokerConfiguration, dataPublisher, event);
    }

    /**
     * Creates a publisher from the configuration's properties and caches it. The authenticator
     * URL is passed to the publisher only when the property is present and non-empty.
     * NOTE(review): two threads publishing concurrently for a new configuration can each create a
     * publisher; the last one stored wins.
     *
     * @throws BrokerEventProcessingException if the publisher cannot be created
     */
    private DataPublisher createDataPublisher(BrokerConfiguration brokerConfiguration) throws BrokerEventProcessingException {
        if (this.agent == null) {
            this.agent = BrokerServiceValueHolder.getAgent();
        }
        Map<String, String> properties = brokerConfiguration.getProperties();
        String authenticatorUrl = properties.get(BrokerConstants.BROKER_CONF_AGENT_PROP_AUTHENTICATOR_URL);
        String receiverUrl = properties.get(BrokerConstants.BROKER_CONF_AGENT_PROP_RECEIVER_URL);
        String username = properties.get("username");
        String password = properties.get("password");

        DataPublisher dataPublisher;
        try {
            if (authenticatorUrl == null || authenticatorUrl.length() <= 0) {
                dataPublisher = new DataPublisher(receiverUrl, username, password, this.agent);
            } else {
                dataPublisher = new DataPublisher(authenticatorUrl, receiverUrl, username, password, this.agent);
            }
        } catch (AgentException e) {
            throw dataPublisherCreationFailure(brokerConfiguration, e);
        } catch (AuthenticationException e) {
            throw dataPublisherCreationFailure(brokerConfiguration, e);
        } catch (MalformedURLException e) {
            throw dataPublisherCreationFailure(brokerConfiguration, e);
        } catch (TransportException e) {
            throw dataPublisherCreationFailure(brokerConfiguration, e);
        }
        this.dataPublisherMap.put(brokerConfiguration, dataPublisher);
        return dataPublisher;
    }

    /** Builds the exception reported when a publisher cannot be created; the caller throws it. */
    private BrokerEventProcessingException dataPublisherCreationFailure(BrokerConfiguration brokerConfiguration, Exception exc) {
        return new BrokerEventProcessingException("Cannot create DataPublisher for the broker configuration:" + brokerConfiguration.getName(), exc);
    }

    /** Publishes the event, converting any failure into a {@link BrokerEventProcessingException}. */
    private void publishEvent(BrokerConfiguration brokerConfiguration, DataPublisher dataPublisher, Event event) throws BrokerEventProcessingException {
        try {
            dataPublisher.publish(event);
        } catch (Exception e) {
            throw new BrokerEventProcessingException("Cannot publish data via DataPublisher for the broker configuration:" + brokerConfiguration.getName() + " for the  event " + event, e);
        }
    }

    /** Returns the descriptor listing this broker type's name and configuration properties. */
    @Override // org.wso2.carbon.broker.core.internal.BrokerType
    public BrokerTypeDto getBrokerTypeDto() {
        return this.brokerTypeDto;
    }

    /**
     * Removes the listener registered for the given stream name and broker configuration.
     * Does nothing when no listener map exists for the stream name.
     *
     * @param str                 stream name the listener was subscribed to
     * @param brokerConfiguration configuration whose listener is removed
     * @param axisConfiguration   not used by this broker type
     */
    @Override // org.wso2.carbon.broker.core.internal.BrokerType
    public void unsubscribe(String str, BrokerConfiguration brokerConfiguration, AxisConfiguration axisConfiguration) throws BrokerEventProcessingException {
        Map<BrokerConfiguration, BrokerListener> listeners = this.brokerListenerMap.get(str);
        if (listeners != null) {
            listeners.remove(brokerConfiguration);
        }
    }
}
