package org.wso2.carbon.databridge.agent.thrift.internal.publisher.client;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.wso2.carbon.databridge.agent.thrift.conf.DataPublisherConfiguration;
import org.wso2.carbon.databridge.agent.thrift.conf.ReceiverConfiguration;
import org.wso2.carbon.databridge.agent.thrift.exception.AgentException;
import org.wso2.carbon.databridge.agent.thrift.exception.EventPublisherException;
import org.wso2.carbon.databridge.agent.thrift.internal.EventQueue;
import org.wso2.carbon.databridge.agent.thrift.internal.publisher.authenticator.AgentAuthenticator;
import org.wso2.carbon.databridge.agent.thrift.internal.utils.AgentConstants;
import org.wso2.carbon.databridge.agent.thrift.lb.ReceiverStateObserver;
import org.wso2.carbon.databridge.commons.Event;
import org.wso2.carbon.databridge.commons.exception.AuthenticationException;
import org.wso2.carbon.databridge.commons.exception.DifferentStreamDefinitionAlreadyDefinedException;
import org.wso2.carbon.databridge.commons.exception.MalformedStreamDefinitionException;
import org.wso2.carbon.databridge.commons.exception.NoStreamDefinitionExistException;
import org.wso2.carbon.databridge.commons.exception.SessionTimeoutException;
import org.wso2.carbon.databridge.commons.exception.StreamDefinitionException;
import org.wso2.carbon.databridge.commons.exception.TransportException;
import org.wso2.carbon.databridge.commons.exception.UndefinedEventTypeException;
import org.wso2.carbon.databridge.commons.thrift.data.ThriftEventBundle;

/* loaded from: input_file:org/wso2/carbon/databridge/agent/thrift/internal/publisher/client/EventPublisher.class */
/**
 * Base task that drains an {@link EventQueue}, groups the polled events into bundles and
 * publishes each bundle through a client borrowed from a keyed transport pool.
 *
 * <p>Transport-specific work (building the bundle, sending it, defining and looking up streams)
 * is delegated to the abstract methods implemented by subclasses. When a
 * {@link ReceiverStateObserver} is registered, connection failures are reported to it and the
 * events that could not be delivered are handed back to it for resending.
 */
public abstract class EventPublisher implements Runnable {
    private static final Log log = LogFactory.getLog(EventPublisher.class);

    private final EventQueue<Event> eventQueue;
    private final GenericKeyedObjectPool transportPool;
    private final Semaphore queueSemaphore;
    private final int maxMessageBundleSize;
    private final DataPublisherConfiguration dataPublisherConfiguration;
    private final AgentAuthenticator agentAuthenticator;
    private final ThreadPoolExecutor threadPool;
    // Set at most once, after construction, through registerReceiverStateObserver().
    private ReceiverStateObserver receiverStateObserver;

    /**
     * Creates a publisher bound to the given queue, pool and configuration.
     *
     * @param eventQueue                 queue the events are polled from
     * @param genericKeyedObjectPool     pool the transport clients are borrowed from, keyed by the
     *                                   configuration's publisher key
     * @param semaphore                  semaphore that receives one {@code release()} per polled event
     * @param i                          number of events at which a bundle is published
     * @param dataPublisherConfiguration source of the publisher key, session id and receiver details
     * @param agentAuthenticator         authenticator used to obtain a new session id on reconnection
     * @param threadPoolExecutor         executor this task resubmits itself to, see {@link #run()}
     */
    public EventPublisher(EventQueue<Event> eventQueue, GenericKeyedObjectPool genericKeyedObjectPool, Semaphore semaphore, int i, DataPublisherConfiguration dataPublisherConfiguration, AgentAuthenticator agentAuthenticator, ThreadPoolExecutor threadPoolExecutor) {
        this.eventQueue = eventQueue;
        this.transportPool = genericKeyedObjectPool;
        this.queueSemaphore = semaphore;
        this.maxMessageBundleSize = i;
        this.dataPublisherConfiguration = dataPublisherConfiguration;
        this.agentAuthenticator = agentAuthenticator;
        this.threadPool = threadPoolExecutor;
    }

    /**
     * Registers the observer that is told about connection failures. Only the first registration
     * takes effect; later calls are ignored.
     *
     * @param receiverStateObserver observer to register
     */
    public void registerReceiverStateObserver(ReceiverStateObserver receiverStateObserver) {
        if (null == this.receiverStateObserver) {
            this.receiverStateObserver = receiverStateObserver;
        }
    }

    /**
     * Polls events until the queue is empty, publishing a bundle every time it reaches the
     * configured maximum size and once more for a trailing partial bundle.
     *
     * <p>After a full bundle has been published, if the executor's active thread count has reached
     * its core pool size, this task resubmits itself to the executor and returns instead of
     * continuing the loop (presumably to give other queued tasks a turn).
     */
    @Override
    public void run() {
        // Events of the bundle being built are only tracked when somebody can resend them.
        ArrayList<Event> pendingEvents = null;
        if (null != this.receiverStateObserver) {
            pendingEvents = new ArrayList<>();
        }
        ThriftEventBundle bundle = null;
        while (true) {
            Event event = this.eventQueue.poll();
            if (event == null) {
                // Queue drained: flush whatever is left in the current bundle and stop.
                if (bundle != null) {
                    publishEvent(bundle, pendingEvents);
                }
                return;
            }
            this.queueSemaphore.release();
            if (null != pendingEvents) {
                pendingEvents.add(event);
            }
            bundle = convertToEventBundle(bundle, event, this.dataPublisherConfiguration.getSessionId());
            if (getNumberOfEvents(bundle) >= this.maxMessageBundleSize) {
                publishEvent(bundle, pendingEvents);
                pendingEvents = new ArrayList<>();
                if (this.threadPool.getActiveCount() >= this.threadPool.getCorePoolSize()) {
                    this.threadPool.submit(this);
                    return;
                }
                bundle = null;
            }
        }
    }

    /**
     * Publishes one bundle using a pooled client, retrying once after a reconnection when the
     * session has timed out. Failures are logged (and reported to the observer where they indicate
     * a connection problem) rather than thrown.
     *
     * @param eventBundle   bundle to send
     * @param pendingEvents events contained in the bundle, handed to the observer for resending on
     *                      failure; may be {@code null}
     */
    private void publishEvent(Object eventBundle, ArrayList<Event> pendingEvents) {
        Object client = null;
        try {
            client = getClient(this.dataPublisherConfiguration.getPublisherKey());
            setSessionId(eventBundle, this.dataPublisherConfiguration.getSessionId());
            publish(client, eventBundle);
        } catch (SessionTimeoutException e) {
            log.info("Session timed out for " + this.dataPublisherConfiguration.getPublisherKey() + AgentConstants.SEPARATOR + e.getMessage());
            setSessionId(eventBundle, reconnect(getSessionId(eventBundle)));
            republish(client, eventBundle, pendingEvents);
        } catch (UndefinedEventTypeException e) {
            log.error("Wrongly typed event " + eventBundle + " sent to " + this.dataPublisherConfiguration.getPublisherKey(), e);
        } catch (AgentException e) {
            notifyConnectionFailure(pendingEvents);
            log.error("Cannot get a client to send events to " + this.dataPublisherConfiguration.getPublisherKey(), e);
            this.transportPool.clear(this.dataPublisherConfiguration.getPublisherKey());
        } catch (EventPublisherException e) {
            notifyConnectionFailure(pendingEvents);
            log.error("Cannot send events to " + this.dataPublisherConfiguration.getPublisherKey(), e);
        } finally {
            // Always give the client back, but never hand the pool a null when borrowing failed.
            if (client != null) {
                try {
                    this.transportPool.returnObject(this.dataPublisherConfiguration.getPublisherKey(), client);
                } catch (Exception e) {
                    notifyConnectionFailure(pendingEvents);
                    log.warn("Error occurred while returning object to connection pool", e);
                    this.transportPool.clear(this.dataPublisherConfiguration.getPublisherKey());
                }
            }
        }
    }

    /**
     * Reports a connection failure to the registered observer and hands it the events that should
     * be sent again. Does nothing when no observer is registered.
     *
     * <p>The resend queue is obtained from {@link EventQueue#getAndResetQueue()} and the given
     * pending events are appended to it before it is passed to the observer.
     *
     * @param pendingEvents events already taken off the queue that were not delivered; may be
     *                      {@code null}
     */
    private void notifyConnectionFailure(ArrayList<Event> pendingEvents) {
        if (null == this.receiverStateObserver) {
            return;
        }
        ReceiverConfiguration receiverConfiguration = this.dataPublisherConfiguration.getReceiverConfiguration();
        String receiverUrl = receiverConfiguration.getDataReceiverProtocol().toString() + "://" + receiverConfiguration.getDataReceiverIp() + AgentConstants.HOSTNAME_AND_PORT_SEPARATOR + receiverConfiguration.getDataReceiverPort();
        this.receiverStateObserver.notifyConnectionFailure(receiverUrl, receiverConfiguration.getUserName(), receiverConfiguration.getPassword());
        if (null == this.eventQueue) {
            return;
        }
        LinkedBlockingQueue<Event> resendQueue = this.eventQueue.getAndResetQueue();
        boolean interrupted = false;
        if (null != pendingEvents) {
            for (Event event : pendingEvents) {
                try {
                    resendQueue.put(event);
                } catch (InterruptedException e) {
                    // Remember the interruption; it is restored once the hand-off is done so the
                    // remaining events can still be queued.
                    interrupted = true;
                    log.error("Error while populating resend events list", e);
                }
            }
        }
        if (log.isDebugEnabled()) {
            log.debug("resending events size: " + resendQueue.size() + "in thread id :" + Thread.currentThread().getId());
        }
        this.receiverStateObserver.resendEvents(resendQueue);
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Sends a bundle again after a reconnection. All failures are logged; only a publishing
     * failure is additionally reported to the observer.
     *
     * @param client        client to publish with
     * @param eventBundle   bundle to send
     * @param pendingEvents events contained in the bundle; may be {@code null}
     */
    private void republish(Object client, Object eventBundle, ArrayList<Event> pendingEvents) {
        try {
            publish(client, eventBundle);
        } catch (EventPublisherException e) {
            log.error("Cannot send events to " + this.dataPublisherConfiguration.getPublisherKey() + " even after reconnecting ", e);
            notifyConnectionFailure(pendingEvents);
        } catch (SessionTimeoutException e) {
            log.error("Session timed out for " + this.dataPublisherConfiguration.getPublisherKey() + " even after reconnecting ", e);
        } catch (UndefinedEventTypeException e) {
            log.error("Wrongly typed event " + eventBundle.toString() + " sent  to " + this.dataPublisherConfiguration.getPublisherKey(), e);
        }
    }

    /**
     * Reports the size of a bundle; {@link #run()} publishes the bundle once this value reaches
     * the configured maximum.
     *
     * @param obj bundle returned by {@link #convertToEventBundle(Object, Event, String)}
     * @return number of events in the bundle (as interpreted by the implementation)
     */
    protected abstract int getNumberOfEvents(Object obj);

    /**
     * Produces the bundle that results from adding an event to the current bundle.
     *
     * @param obj   bundle built so far; {@link #run()} passes {@code null} when a new bundle starts
     * @param event event polled from the queue
     * @param str   session id currently held by the publisher configuration
     * @return bundle to keep building on and eventually publish
     */
    protected abstract ThriftEventBundle convertToEventBundle(Object obj, Event event, String str);

    /**
     * Stores a session id on a bundle before it is published.
     *
     * @param obj bundle to update
     * @param str session id to set
     */
    protected abstract void setSessionId(Object obj, String str);

    /**
     * Reads the session id a bundle was prepared with; used as the stale id when reconnecting.
     *
     * @param obj bundle to inspect
     * @return session id stored on the bundle
     */
    protected abstract String getSessionId(Object obj);

    /**
     * Sends a bundle through the given client.
     *
     * @param obj  client borrowed from the transport pool
     * @param obj2 bundle to send
     * @throws UndefinedEventTypeException when the receiver rejects the event type
     * @throws SessionTimeoutException     when the session used by the bundle has expired
     * @throws EventPublisherException     when the bundle could not be sent
     */
    abstract void publish(Object obj, Object obj2) throws UndefinedEventTypeException, SessionTimeoutException, EventPublisherException;

    /**
     * Defines a stream on the receiver, reconnecting and retrying once when the session has timed
     * out. The borrowed client is returned to the pool in every case.
     *
     * @param str  session id to use
     * @param str2 stream definition to send
     * @return value returned by the transport-specific {@code defineStream}, or {@code null} when
     *         the retry after reconnection failed with a session timeout or publishing error
     * @throws AgentException                                   when no client could be borrowed or
     *                                                          the definition could not be sent
     * @throws DifferentStreamDefinitionAlreadyDefinedException when the receiver reports a
     *                                                          conflicting definition
     * @throws MalformedStreamDefinitionException               when the receiver reports a malformed
     *                                                          definition
     * @throws StreamDefinitionException                        when the receiver reports an invalid
     *                                                          definition
     */
    public String defineStream(String str, String str2) throws AgentException, DifferentStreamDefinitionAlreadyDefinedException, MalformedStreamDefinitionException, StreamDefinitionException {
        Object client = getClient(this.dataPublisherConfiguration.getPublisherKey());
        try {
            return defineStream(client, str, str2);
        } catch (SessionTimeoutException e) {
            log.info("Session timed out for " + this.dataPublisherConfiguration.getPublisherKey() + AgentConstants.SEPARATOR + e.getMessage());
            return redefineStream(client, str2, reconnect(str));
        } catch (DifferentStreamDefinitionAlreadyDefinedException e) {
            throw new DifferentStreamDefinitionAlreadyDefinedException("Same stream id with different definition already defined before sending this event definitions to " + this.dataPublisherConfiguration.getPublisherKey(), e);
        } catch (StreamDefinitionException e) {
            throw new StreamDefinitionException("Invalid type definition for stream " + str2, e);
        } catch (MalformedStreamDefinitionException e) {
            throw new MalformedStreamDefinitionException("Malformed event definition :" + str2 + " send  to " + this.dataPublisherConfiguration.getPublisherKey(), e);
        } catch (EventPublisherException e) {
            notifyConnectionFailure(null);
            throw new AgentException("Cannot define type " + str2, e);
        } finally {
            returnClient(client);
        }
    }

    /**
     * Transport-specific stream definition call.
     *
     * @param obj  client borrowed from the transport pool
     * @param str  session id to use
     * @param str2 stream definition to send
     * @return result reported by the receiver (handed back to callers of the public
     *         {@code defineStream})
     * @throws DifferentStreamDefinitionAlreadyDefinedException when a conflicting definition exists
     * @throws MalformedStreamDefinitionException               when the definition is malformed
     * @throws EventPublisherException                          when the request could not be sent
     * @throws SessionTimeoutException                          when the session has expired
     * @throws StreamDefinitionException                        when the definition is invalid
     */
    protected abstract String defineStream(Object obj, String str, String str2) throws DifferentStreamDefinitionAlreadyDefinedException, MalformedStreamDefinitionException, EventPublisherException, SessionTimeoutException, StreamDefinitionException;

    /**
     * Repeats a stream definition after a reconnection.
     *
     * @param client           client to use
     * @param streamDefinition stream definition to send
     * @param sessionId        session id obtained from the reconnection
     * @return result of the definition, or {@code null} when the session timed out again or the
     *         request could not be sent (both are logged)
     */
    private String redefineStream(Object client, String streamDefinition, String sessionId) throws DifferentStreamDefinitionAlreadyDefinedException, StreamDefinitionException, MalformedStreamDefinitionException {
        try {
            return defineStream(client, sessionId, streamDefinition);
        } catch (SessionTimeoutException e) {
            log.error("Session timed out for " + this.dataPublisherConfiguration.getPublisherKey() + " even after reconnecting ", e);
            return null;
        } catch (StreamDefinitionException e) {
            throw new StreamDefinitionException("Wrongly defined event definition after reconnection  :" + streamDefinition + " sent to " + this.dataPublisherConfiguration.getPublisherKey(), e);
        } catch (DifferentStreamDefinitionAlreadyDefinedException e) {
            throw new DifferentStreamDefinitionAlreadyDefinedException("Type already defined when send event definitions to" + this.dataPublisherConfiguration.getPublisherKey(), e);
        } catch (EventPublisherException e) {
            log.error("Cannot send events to " + this.dataPublisherConfiguration.getPublisherKey() + " even after reconnecting ", e);
            notifyConnectionFailure(null);
            return null;
        } catch (MalformedStreamDefinitionException e) {
            throw new MalformedStreamDefinitionException("Malformed event definition after reconnection  :" + streamDefinition + " sent to " + this.dataPublisherConfiguration.getPublisherKey(), e);
        }
    }

    /**
     * Looks up a stream id on the receiver, reconnecting and retrying once when the session has
     * timed out. The borrowed client is returned to the pool in every case.
     *
     * @param str  session id to use
     * @param str2 first lookup argument (presumably the stream name; confirm against subclasses)
     * @param str3 second lookup argument (presumably the stream version; confirm against subclasses)
     * @return stream id reported by the receiver, or {@code null} when the retry after
     *         reconnection failed with a session timeout or publishing error
     * @throws AgentException                   when no client could be borrowed or the lookup could
     *                                          not be sent
     * @throws StreamDefinitionException        declared for callers; not raised by this method body
     * @throws NoStreamDefinitionExistException when the receiver knows no matching stream
     */
    public String findStreamId(String str, String str2, String str3) throws AgentException, StreamDefinitionException, NoStreamDefinitionExistException {
        Object client = getClient(this.dataPublisherConfiguration.getPublisherKey());
        try {
            return findStreamId(client, str, str2, str3);
        } catch (NoStreamDefinitionExistException e) {
            throw new NoStreamDefinitionExistException("No stream id found for : " + str2 + " " + str3, e);
        } catch (SessionTimeoutException e) {
            log.info("Session timed out for " + this.dataPublisherConfiguration.getPublisherKey() + AgentConstants.SEPARATOR + e.getMessage());
            return findStreamIdAfterReconnect(client, reconnect(str), str2, str3);
        } catch (EventPublisherException e) {
            notifyConnectionFailure(null);
            throw new AgentException("Error when finding event stream definition for : " + str2 + " " + str3, e);
        } finally {
            returnClient(client);
        }
    }

    /**
     * Repeats a stream id lookup after a reconnection.
     *
     * @param client    client to use
     * @param sessionId session id obtained from the reconnection
     * @param first     first lookup argument, passed through unchanged
     * @param second    second lookup argument, passed through unchanged
     * @return stream id, or {@code null} when the session timed out again or the request could not
     *         be sent (both are logged)
     */
    private String findStreamIdAfterReconnect(Object client, String sessionId, String first, String second) throws NoStreamDefinitionExistException {
        try {
            return findStreamId(client, sessionId, first, second);
        } catch (SessionTimeoutException e) {
            log.error("Session timed out for " + this.dataPublisherConfiguration.getPublisherKey() + " even after reconnecting ", e);
            return null;
        } catch (NoStreamDefinitionExistException e) {
            throw new NoStreamDefinitionExistException("No stream id found for : " + first + " " + second, e);
        } catch (EventPublisherException e) {
            log.error("Cannot send events to " + this.dataPublisherConfiguration.getPublisherKey() + " even after reconnecting ", e);
            notifyConnectionFailure(null);
            return null;
        }
    }

    /**
     * Transport-specific stream id lookup.
     *
     * @param obj  client borrowed from the transport pool
     * @param str  session id to use
     * @param str2 first lookup argument, as received by the public {@code findStreamId}
     * @param str3 second lookup argument, as received by the public {@code findStreamId}
     * @return stream id reported by the receiver
     * @throws NoStreamDefinitionExistException when no matching stream exists
     * @throws SessionTimeoutException          when the session has expired
     * @throws EventPublisherException          when the request could not be sent
     */
    protected abstract String findStreamId(Object obj, String str, String str2, String str3) throws NoStreamDefinitionExistException, SessionTimeoutException, EventPublisherException;

    /**
     * Tries to replace a stale session and reports the session id the configuration holds
     * afterwards (which is the old one if every attempt failed).
     *
     * @param staleSessionId session id that was rejected
     * @return session id currently stored in the publisher configuration
     */
    private String reconnect(String staleSessionId) {
        attemptReconnection(3, staleSessionId);
        return this.dataPublisherConfiguration.getSessionId();
    }

    /**
     * Authenticates again and stores the new session id, but only while the configuration still
     * holds the stale id, so a session already renewed by another caller is not replaced.
     * Transport and agent failures consume one attempt and are retried; an authentication failure
     * is logged and ends the attempts.
     *
     * @param i   number of attempts left; nothing happens when it is not positive
     * @param str session id that was rejected
     */
    public synchronized void attemptReconnection(int i, String str) {
        if (this.dataPublisherConfiguration.getSessionId().equals(str) && i > 0) {
            try {
                this.dataPublisherConfiguration.setSessionId(this.agentAuthenticator.connect(this.dataPublisherConfiguration));
            } catch (TransportException e) {
                attemptReconnection(i - 1, str);
            } catch (AgentException e) {
                attemptReconnection(i - 1, str);
            } catch (AuthenticationException e) {
                log.error(this.dataPublisherConfiguration.getReceiverConfiguration().getUserName() + " not authorised to access server at " + this.dataPublisherConfiguration.getPublisherKey(), e);
            }
        }
    }

    /**
     * Borrows a client from the transport pool.
     *
     * @param publisherKey pool key to borrow for
     * @return borrowed client
     * @throws AgentException when the pool could not supply a client; the observer is notified first
     */
    private Object getClient(String publisherKey) throws AgentException {
        try {
            return this.transportPool.borrowObject(publisherKey);
        } catch (Exception e) {
            notifyConnectionFailure(null);
            throw new AgentException("Cannot borrow client for " + publisherKey, e);
        }
    }

    /**
     * Returns a borrowed client to the transport pool; a failure to do so is reported to the
     * observer and logged instead of being thrown.
     *
     * @param client client to return; ignored when {@code null}
     */
    private void returnClient(Object client) {
        if (client == null) {
            return;
        }
        try {
            this.transportPool.returnObject(this.dataPublisherConfiguration.getPublisherKey(), client);
        } catch (Exception e) {
            notifyConnectionFailure(null);
            log.warn("Error occurred while returning object to connection pool", e);
        }
    }
}
