package org.wso2.carbon.broker.core.internal.brokers.agent;

import com.google.gson.Gson;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.ResourceBundle;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.axis2.engine.AxisConfiguration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.broker.core.BrokerConfiguration;
import org.wso2.carbon.broker.core.BrokerListener;
import org.wso2.carbon.broker.core.BrokerTypeDto;
import org.wso2.carbon.broker.core.Property;
import org.wso2.carbon.broker.core.exception.BrokerEventProcessingException;
import org.wso2.carbon.broker.core.internal.BrokerType;
import org.wso2.carbon.broker.core.internal.ds.BrokerServiceValueHolder;
import org.wso2.carbon.broker.core.internal.util.BrokerConstants;
import org.wso2.carbon.context.CarbonContext;
import org.wso2.carbon.databridge.agent.thrift.Agent;
import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher;
import org.wso2.carbon.databridge.agent.thrift.exception.AgentException;
import org.wso2.carbon.databridge.commons.Credentials;
import org.wso2.carbon.databridge.commons.Event;
import org.wso2.carbon.databridge.commons.StreamDefinition;
import org.wso2.carbon.databridge.core.AgentCallback;
import org.wso2.carbon.databridge.core.exception.StreamDefinitionNotFoundException;
import org.wso2.carbon.databridge.core.exception.StreamDefinitionStoreException;

/* loaded from: input_file:org/wso2/carbon/broker/core/internal/brokers/agent/AgentBrokerType.class */
public final class AgentBrokerType implements BrokerType {
    private BrokerTypeDto brokerTypeDto;
    private Agent agent;
    private static final Log log = LogFactory.getLog(AgentBrokerType.class);
    private static AgentBrokerType agentBrokerType = new AgentBrokerType();
    private Gson gson = new Gson();
    private Map<String, Map<String, BrokerListener>> topicBrokerListenerMap = new ConcurrentHashMap();
    private Map<String, StreamDefinition> topicStreamDefinitionMap = new ConcurrentHashMap();
    private Map<String, Map<String, BrokerListener>> streamIdBrokerListenerMap = new ConcurrentHashMap();
    private ConcurrentHashMap<Integer, ConcurrentHashMap<BrokerConfiguration, AsyncDataPublisher>> dataPublisherMap = new ConcurrentHashMap<>();

    /* loaded from: input_file:org/wso2/carbon/broker/core/internal/brokers/agent/AgentBrokerType$AgentBrokerCallback.class */
    private class AgentBrokerCallback implements AgentCallback {
        private AgentBrokerCallback() {
        }

        public void definedStream(StreamDefinition streamDefinition, Credentials credentials) {
            AgentBrokerType.this.topicStreamDefinitionMap.put(createTopic(streamDefinition), streamDefinition);
            Map map = (Map) AgentBrokerType.this.topicBrokerListenerMap.get(createTopic(streamDefinition));
            if (map == null) {
                map = new HashMap();
                AgentBrokerType.this.topicBrokerListenerMap.put(createTopic(streamDefinition), map);
            }
            Iterator it = map.values().iterator();
            while (it.hasNext()) {
                try {
                    ((BrokerListener) it.next()).onEventDefinition(streamDefinition);
                } catch (BrokerEventProcessingException e) {
                    AgentBrokerType.log.error("Cannot send Stream Definition to a brokerListener subscribed to " + streamDefinition.getStreamId(), e);
                }
            }
            AgentBrokerType.this.streamIdBrokerListenerMap.put(streamDefinition.getStreamId(), AgentBrokerType.this.topicBrokerListenerMap.get(createTopic(streamDefinition)));
        }

        private String createTopic(StreamDefinition streamDefinition) {
            return streamDefinition.getName() + "/" + streamDefinition.getVersion();
        }

        public void receive(List<Event> list, Credentials credentials) {
            for (Event event : list) {
                Map map = (Map) AgentBrokerType.this.streamIdBrokerListenerMap.get(event.getStreamId());
                if (map == null) {
                    try {
                        definedStream(BrokerServiceValueHolder.getDataBridgeSubscriberService().getStreamDefinition(credentials, event.getStreamId()), credentials);
                        map = (Map) AgentBrokerType.this.streamIdBrokerListenerMap.get(event.getStreamId());
                        if (map == null) {
                            AgentBrokerType.log.error("No broker listeners for  " + event.getStreamId());
                            return;
                        }
                    } catch (StreamDefinitionNotFoundException e) {
                        AgentBrokerType.log.error("No Stream definition store found for the event " + event.getStreamId(), e);
                        return;
                    } catch (StreamDefinitionStoreException e2) {
                        AgentBrokerType.log.error("No Stream definition store found when checking stream definition for " + event.getStreamId(), e2);
                        return;
                    }
                }
                Iterator it = map.values().iterator();
                while (it.hasNext()) {
                    try {
                        ((BrokerListener) it.next()).onEvent(event);
                    } catch (BrokerEventProcessingException e3) {
                        AgentBrokerType.log.error("Cannot send event to a brokerListener subscribed to " + event.getStreamId(), e3);
                    }
                }
            }
        }
    }

    private AgentBrokerType() {
        this.brokerTypeDto = null;
        this.brokerTypeDto = new BrokerTypeDto();
        this.brokerTypeDto.setName(BrokerConstants.BROKER_TYPE_AGENT);
        ResourceBundle bundle = ResourceBundle.getBundle("org.wso2.carbon.broker.core.i18n.Resources", Locale.getDefault());
        Property property = new Property(BrokerConstants.BROKER_CONF_AGENT_PROP_RECEIVER_URL);
        property.setDisplayName(bundle.getString(BrokerConstants.BROKER_CONF_AGENT_PROP_RECEIVER_URL));
        property.setRequired(true);
        this.brokerTypeDto.addProperty(property);
        Property property2 = new Property(BrokerConstants.BROKER_CONF_AGENT_PROP_AUTHENTICATOR_URL);
        property2.setDisplayName(bundle.getString(BrokerConstants.BROKER_CONF_AGENT_PROP_AUTHENTICATOR_URL));
        property2.setRequired(false);
        this.brokerTypeDto.addProperty(property2);
        Property property3 = new Property("username");
        property3.setRequired(true);
        property3.setDisplayName(bundle.getString("username"));
        this.brokerTypeDto.addProperty(property3);
        Property property4 = new Property("password");
        property4.setRequired(true);
        property4.setSecured(true);
        property4.setDisplayName(bundle.getString("password"));
        this.brokerTypeDto.addProperty(property4);
        BrokerServiceValueHolder.getDataBridgeSubscriberService().subscribe(new AgentBrokerCallback());
    }

    public static AgentBrokerType getInstance() {
        return agentBrokerType;
    }

    @Override // org.wso2.carbon.broker.core.internal.BrokerType
    public String subscribe(String str, BrokerListener brokerListener, BrokerConfiguration brokerConfiguration, AxisConfiguration axisConfiguration) throws BrokerEventProcessingException {
        String uuid = UUID.randomUUID().toString();
        if (this.topicBrokerListenerMap.containsKey(str)) {
            this.topicBrokerListenerMap.get(str).put(uuid, brokerListener);
            StreamDefinition streamDefinition = this.topicStreamDefinitionMap.get(str);
            if (streamDefinition != null) {
                brokerListener.onEventDefinition(streamDefinition);
            }
        } else {
            HashMap hashMap = new HashMap();
            hashMap.put(uuid, brokerListener);
            this.topicBrokerListenerMap.put(str, hashMap);
        }
        return uuid;
    }

    @Override // org.wso2.carbon.broker.core.internal.BrokerType
    public void publish(String str, Object obj, BrokerConfiguration brokerConfiguration) throws BrokerEventProcessingException {
        Integer valueOf = Integer.valueOf(CarbonContext.getCurrentContext().getTenantId());
        ConcurrentHashMap<BrokerConfiguration, AsyncDataPublisher> concurrentHashMap = this.dataPublisherMap.get(valueOf);
        if (concurrentHashMap == null) {
            this.dataPublisherMap.putIfAbsent(valueOf, new ConcurrentHashMap<>());
            concurrentHashMap = this.dataPublisherMap.get(valueOf);
        }
        AsyncDataPublisher asyncDataPublisher = concurrentHashMap.get(brokerConfiguration);
        if (asyncDataPublisher == null) {
            synchronized (this) {
                asyncDataPublisher = concurrentHashMap.get(brokerConfiguration);
                if (asyncDataPublisher == null) {
                    asyncDataPublisher = createDataPublisher(brokerConfiguration);
                    concurrentHashMap.putIfAbsent(brokerConfiguration, asyncDataPublisher);
                }
            }
        }
        try {
            Event event = (Event) ((Object[]) obj)[0];
            StreamDefinition streamDefinition = (StreamDefinition) ((Object[]) obj)[1];
            if (asyncDataPublisher.isStreamDefinitionAdded(streamDefinition)) {
                publishEvent(brokerConfiguration, asyncDataPublisher, event, streamDefinition);
            } else {
                asyncDataPublisher.addStreamDefinition(streamDefinition);
                publishEvent(brokerConfiguration, asyncDataPublisher, event, streamDefinition);
            }
        } catch (Exception e) {
            throw new BrokerEventProcessingException(e.getMessage() + " Error Occurred When Publishing Events", e);
        }
    }

    private AsyncDataPublisher createDataPublisher(BrokerConfiguration brokerConfiguration) {
        if (this.agent == null) {
            this.agent = BrokerServiceValueHolder.getAgent();
        }
        Map<String, String> properties = brokerConfiguration.getProperties();
        return (null == properties.get(BrokerConstants.BROKER_CONF_AGENT_PROP_AUTHENTICATOR_URL) || properties.get(BrokerConstants.BROKER_CONF_AGENT_PROP_AUTHENTICATOR_URL).length() <= 0) ? new AsyncDataPublisher(properties.get(BrokerConstants.BROKER_CONF_AGENT_PROP_RECEIVER_URL), properties.get("username"), properties.get("password"), this.agent) : new AsyncDataPublisher(properties.get(BrokerConstants.BROKER_CONF_AGENT_PROP_AUTHENTICATOR_URL), properties.get(BrokerConstants.BROKER_CONF_AGENT_PROP_RECEIVER_URL), properties.get("username"), properties.get("password"), this.agent);
    }

    private void throwBrokerEventProcessingException(BrokerConfiguration brokerConfiguration, Exception exc) throws BrokerEventProcessingException {
        throw new BrokerEventProcessingException("Cannot create DataPublisher for the broker configuration:" + brokerConfiguration.getName(), exc);
    }

    private void publishEvent(BrokerConfiguration brokerConfiguration, AsyncDataPublisher asyncDataPublisher, Event event, StreamDefinition streamDefinition) throws BrokerEventProcessingException {
        try {
            asyncDataPublisher.publish(streamDefinition.getName(), streamDefinition.getVersion(), event);
        } catch (AgentException e) {
            throw new BrokerEventProcessingException("Cannot publish data via DataPublisher for the broker configuration:" + brokerConfiguration.getName() + " for the  event " + event, e);
        }
    }

    @Override // org.wso2.carbon.broker.core.internal.BrokerType
    public BrokerTypeDto getBrokerTypeDto() {
        return this.brokerTypeDto;
    }

    @Override // org.wso2.carbon.broker.core.internal.BrokerType
    public void unsubscribe(String str, BrokerConfiguration brokerConfiguration, AxisConfiguration axisConfiguration, String str2) throws BrokerEventProcessingException {
        Map<String, BrokerListener> map = this.topicBrokerListenerMap.get(str);
        if (map != null) {
            map.remove(str2);
        }
    }
}
