package org.wso2.carbon.databridge.agent.thrift;

import com.google.gson.Gson;
import java.lang.reflect.Field;
import java.net.MalformedURLException;
import java.net.SocketException;
import java.util.Map;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.databridge.agent.thrift.conf.DataPublisherConfiguration;
import org.wso2.carbon.databridge.agent.thrift.conf.ReceiverConfiguration;
import org.wso2.carbon.databridge.agent.thrift.exception.AgentException;
import org.wso2.carbon.databridge.agent.thrift.internal.EventQueue;
import org.wso2.carbon.databridge.agent.thrift.internal.publisher.client.EventPublisher;
import org.wso2.carbon.databridge.agent.thrift.internal.publisher.client.EventPublisherFactory;
import org.wso2.carbon.databridge.agent.thrift.internal.utils.AgentServerURL;
import org.wso2.carbon.databridge.agent.thrift.lb.ReceiverStateObserver;
import org.wso2.carbon.databridge.commons.Event;
import org.wso2.carbon.databridge.commons.StreamDefinition;
import org.wso2.carbon.databridge.commons.exception.AuthenticationException;
import org.wso2.carbon.databridge.commons.exception.DifferentStreamDefinitionAlreadyDefinedException;
import org.wso2.carbon.databridge.commons.exception.MalformedStreamDefinitionException;
import org.wso2.carbon.databridge.commons.exception.NoStreamDefinitionExistException;
import org.wso2.carbon.databridge.commons.exception.StreamDefinitionException;
import org.wso2.carbon.databridge.commons.exception.TransportException;
import org.wso2.carbon.databridge.commons.thrift.utils.HostAddressFinder;

/* loaded from: input_file:org/wso2/carbon/databridge/agent/thrift/DataPublisher.class */
public class DataPublisher {
    private static Log log = LogFactory.getLog(DataPublisher.class);
    private DataPublisherConfiguration dataPublisherConfiguration;
    private EventPublisher eventPublisher;
    private EventQueue<Event> eventQueue;
    private Gson gson;
    private ThreadPoolExecutor threadPool;
    private ReceiverStateObserver receiverStateObserver;

    public DataPublisher(String str, String str2, String str3) throws MalformedURLException, AgentException, AuthenticationException, TransportException {
        this(str, str2, str3, AgentHolder.getOrCreateAgent());
    }

    public DataPublisher(String str, String str2, String str3, String str4) throws MalformedURLException, AgentException, AuthenticationException, TransportException {
        this(str, str2, str3, str4, AgentHolder.getOrCreateAgent());
    }

    public DataPublisher(String str, String str2, String str3, Agent agent) throws MalformedURLException, AgentException, AuthenticationException, TransportException {
        this.gson = new Gson();
        if (null == agent) {
            agent = AgentHolder.getOrCreateAgent();
        } else if (AgentHolder.getAgent() == null) {
            AgentHolder.setAgent(agent);
        }
        AgentServerURL agentServerURL = new AgentServerURL(str);
        checkHostAddress(agentServerURL.getHost());
        if (agentServerURL.isSecured()) {
            start(new ReceiverConfiguration(str2, str3, agentServerURL.getProtocol(), agentServerURL.getHost(), agentServerURL.getPort(), agentServerURL.getProtocol(), agentServerURL.getHost(), agentServerURL.getPort(), agentServerURL.isSecured()), agent);
        } else {
            if (agentServerURL.getProtocol() != ReceiverConfiguration.Protocol.TCP) {
                throw new AgentException("http not supported via this constructor use https, ssl or tcp ");
            }
            start(new ReceiverConfiguration(str2, str3, agentServerURL.getProtocol(), agentServerURL.getHost(), agentServerURL.getPort(), agentServerURL.getProtocol(), agentServerURL.getHost(), agentServerURL.getPort() + 100, agentServerURL.isSecured()), agent);
        }
    }

    public DataPublisher(String str, String str2, String str3, String str4, Agent agent) throws MalformedURLException, AgentException, AuthenticationException, TransportException {
        this.gson = new Gson();
        if (null == agent) {
            agent = AgentHolder.getOrCreateAgent();
        } else if (AgentHolder.getAgent() == null) {
            AgentHolder.setAgent(agent);
        }
        AgentServerURL agentServerURL = new AgentServerURL(str);
        if (!agentServerURL.isSecured()) {
            throw new MalformedURLException("Authentication url protocol is not ssl/https, expected = <ssl/https>://<HOST>:<PORT> but actual = " + str);
        }
        AgentServerURL agentServerURL2 = new AgentServerURL(str2);
        checkHostAddress(agentServerURL2.getHost());
        checkHostAddress(agentServerURL.getHost());
        start(new ReceiverConfiguration(str3, str4, agentServerURL2.getProtocol(), agentServerURL2.getHost(), agentServerURL2.getPort(), agentServerURL.getProtocol(), agentServerURL.getHost(), agentServerURL.getPort(), agentServerURL2.isSecured()), agent);
    }

    private void checkHostAddress(String str) throws AgentException {
        try {
            HostAddressFinder.findAddress(str);
        } catch (SocketException e) {
            throw new AgentException(str + " is malformed ", e);
        }
    }

    public void setAgent(Agent agent) throws AgentException, AuthenticationException, TransportException {
        AgentHolder.getAgent().shutdown(this);
        AgentHolder.setAgent(agent);
        start(this.dataPublisherConfiguration.getReceiverConfiguration(), agent);
        if (null != this.receiverStateObserver) {
            this.eventPublisher.registerReceiverStateObserver(this.receiverStateObserver);
        }
    }

    public Agent getAgent() {
        return AgentHolder.getAgent();
    }

    public void registerReceiverStateObserver(ReceiverStateObserver receiverStateObserver) {
        this.receiverStateObserver = receiverStateObserver;
        this.eventPublisher.registerReceiverStateObserver(this.receiverStateObserver);
    }

    private void start(ReceiverConfiguration receiverConfiguration, Agent agent) throws AgentException, AuthenticationException, TransportException {
        agent.addDataPublisher(this);
        this.dataPublisherConfiguration = new DataPublisherConfiguration(receiverConfiguration);
        this.eventQueue = new EventQueue<>();
        this.threadPool = agent.getThreadPool();
        if (receiverConfiguration.isDataTransferSecured()) {
            this.eventPublisher = EventPublisherFactory.getEventPublisher(this.dataPublisherConfiguration, this.eventQueue, agent, agent.getSecureTransportPool());
        } else {
            this.eventPublisher = EventPublisherFactory.getEventPublisher(this.dataPublisherConfiguration, this.eventQueue, agent, agent.getTransportPool());
        }
        this.dataPublisherConfiguration.setSessionId(agent.getAgentAuthenticator().connect(this.dataPublisherConfiguration));
    }

    public String defineStream(String str) throws AgentException, MalformedStreamDefinitionException, StreamDefinitionException, DifferentStreamDefinitionAlreadyDefinedException {
        return this.eventPublisher.defineStream(this.dataPublisherConfiguration.getSessionId(), str);
    }

    public String defineStream(StreamDefinition streamDefinition) throws AgentException, MalformedStreamDefinitionException, StreamDefinitionException, DifferentStreamDefinitionAlreadyDefinedException {
        String defineStream = this.eventPublisher.defineStream(this.dataPublisherConfiguration.getSessionId(), this.gson.toJson(streamDefinition));
        try {
            Field declaredField = StreamDefinition.class.getDeclaredField("streamId");
            declaredField.setAccessible(true);
            declaredField.set(streamDefinition, defineStream);
        } catch (IllegalAccessException e) {
        } catch (NoSuchFieldException e2) {
        }
        return defineStream;
    }

    @Deprecated
    public String findStream(String str, String str2) throws AgentException, StreamDefinitionException, NoStreamDefinitionExistException {
        String findStreamId = findStreamId(str, str2);
        if (findStreamId == null) {
            throw new NoStreamDefinitionExistException("Cannot find Stream Id for " + str + " " + str2);
        }
        return findStreamId;
    }

    public String findStreamId(String str, String str2) throws AgentException {
        return this.eventPublisher.findStreamId(this.dataPublisherConfiguration.getSessionId(), str, str2);
    }

    public boolean deleteStream(String str) throws AgentException {
        return this.eventPublisher.deleteStream(this.dataPublisherConfiguration.getSessionId(), str);
    }

    public boolean deleteStream(String str, String str2) throws AgentException {
        return this.eventPublisher.deleteStream(this.dataPublisherConfiguration.getSessionId(), str, str2);
    }

    public void publish(Event event) throws AgentException {
        try {
            AgentHolder.getAgent().getQueueSemaphore().acquire();
            if (!this.eventQueue.put(event)) {
                try {
                    this.threadPool.execute(this.eventPublisher);
                } catch (RejectedExecutionException e) {
                }
            }
        } catch (InterruptedException e2) {
            throw new AgentException("Cannot add " + event + " to event queue", e2);
        }
    }

    public void publish(String str, Object[] objArr, Object[] objArr2, Object[] objArr3) throws AgentException {
        publish(new Event(str, System.currentTimeMillis(), objArr, objArr2, objArr3));
    }

    public void publish(String str, Object[] objArr, Object[] objArr2, Object[] objArr3, Map<String, String> map) throws AgentException {
        publish(new Event(str, System.currentTimeMillis(), objArr, objArr2, objArr3, map));
    }

    public void publish(String str, long j, Object[] objArr, Object[] objArr2, Object[] objArr3) throws AgentException {
        publish(new Event(str, j, objArr, objArr2, objArr3));
    }

    public void publish(String str, long j, Object[] objArr, Object[] objArr2, Object[] objArr3, Map<String, String> map) throws AgentException {
        publish(new Event(str, j, objArr, objArr2, objArr3, map));
    }

    public void stop() {
        AgentHolder.getAgent().getAgentAuthenticator().disconnect(this.dataPublisherConfiguration);
        AgentHolder.getAgent().shutdown(this);
    }

    public void stopNow() {
        AgentHolder.getAgent().getAgentAuthenticator().disconnect(this.dataPublisherConfiguration);
        AgentHolder.getAgent().shutdownNow(this);
    }
}
