package org.wso2.carbon.bam.agent.publish;

import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.transport.THttpClient;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.wso2.carbon.bam.agent.conf.AgentConfiguration;
import org.wso2.carbon.bam.agent.pool.TFramedTransportPool;
import org.wso2.carbon.bam.agent.pool.TFramedTransportPoolFactory;
import org.wso2.carbon.bam.agent.queue.EventReceiverComposite;
import org.wso2.carbon.bam.data.publisher.util.stats.AtomicIntSingleton;
import org.wso2.carbon.bam.service.Event;
import org.wso2.carbon.bam.service.ReceiverService;
import org.wso2.carbon.bam.service.SessionTimeOutException;

/* loaded from: input_file:org/wso2/carbon/bam/agent/publish/DataPublisher.class */
/**
 * Publishes batches of BAM {@link Event}s to their configured event receivers, either over a
 * pooled Thrift socket transport or over a per-call Thrift HTTP client.
 *
 * <p>Failures are logged rather than propagated. A session timeout (and, for the socket
 * transport, a transport failure) triggers up to {@link #MAX_RETRIES} further attempts, each
 * preceded by a {@link #RETRY_DELAY_MILLIS} pause; retrying stops as soon as one attempt
 * succeeds so that a batch is not published more than once.
 */
public class DataPublisher implements EventPublisher {
    private static final Log log = LogFactory.getLog(DataPublisher.class);

    /** Maximum number of additional attempts after a retryable failure. */
    private static final int MAX_RETRIES = 3;

    /** Pause before each retry attempt, in milliseconds. */
    private static final long RETRY_DELAY_MILLIS = 1000L;

    /** Suffix appended to the receiver URL to address the HTTP Thrift endpoint. */
    private static final String HTTP_RECEIVER_PATH = "thriftReceiver";

    /** Keyed pool of socket transports; keys have the form {@code host:port}. */
    private final GenericKeyedObjectPool transportPool;

    /** Thrift clients already created for pooled transports, so each transport gets one client. */
    private final Map<TTransport, ReceiverService.Client> receiverClientCache =
            new ConcurrentHashMap<TTransport, ReceiverService.Client>();

    private final ThriftAuthenticationClient authenticationClient = new ThriftAuthenticationClient();

    /**
     * Creates a publisher whose socket transport pool is sized from the given configuration.
     *
     * @param agentConfiguration source of the pool size, idle and eviction settings
     */
    public DataPublisher(AgentConfiguration agentConfiguration) {
        this.transportPool = new TFramedTransportPool().getClientPool(
                new TFramedTransportPoolFactory(),
                agentConfiguration.getMaxPoolSize(),
                agentConfiguration.getMaxIdleConnections(),
                true,
                agentConfiguration.getEvictionTimePeriod(),
                agentConfiguration.getMinIdleTimeInPool());
    }

    /**
     * Currently a no-op.
     *
     * <p>NOTE(review): the transport pool is not closed here; whether the pool is shared with
     * other publishers cannot be determined from this class - confirm before adding a close.
     */
    @Override // org.wso2.carbon.bam.agent.publish.EventPublisher
    public void shutdown() {
    }

    /**
     * Publishes every batch in the list using the transport enabled on its receiver. The socket
     * transport takes precedence when both are enabled; a batch whose receiver has neither
     * transport enabled is skipped.
     *
     * @param arrayList batches to publish, each paired with its target receiver
     */
    @Override // org.wso2.carbon.bam.agent.publish.EventPublisher
    public void publish(ArrayList<EventReceiverComposite> arrayList) {
        for (EventReceiverComposite composite : arrayList) {
            if (composite.getEventReceiver().isSocketTransportEnabled()) {
                publishUsingTSocketTransport(composite);
            } else if (composite.getEventReceiver().isHttpTransportEnabled()) {
                publishUsingHttp(composite);
            }
        }
    }

    /**
     * Replaces any {@code null} correlation, event or meta map on the event with an empty map.
     *
     * @param event the event to normalise in place
     * @return the same event instance
     */
    private Event fixEvent(Event event) {
        if (event.getCorrelation() == null) {
            event.setCorrelation(new HashMap());
        }
        if (event.getEvent() == null) {
            event.setEvent(new HashMap());
        }
        if (event.getMeta() == null) {
            event.setMeta(new HashMap());
        }
        return event;
    }

    /**
     * Makes the first attempt to publish a batch over the pooled socket transport and hands over
     * to {@link #publishRetryUsingTSocket} on a session timeout or transport failure.
     */
    private void publishUsingTSocketTransport(EventReceiverComposite eventReceiverComposite) {
        try {
            sendOverSocket(eventReceiverComposite, this.transportPool);
            return;
        } catch (MalformedURLException e) {
            log.error("BAM url is not correct", e);
            return;
        } catch (SessionTimeOutException e) {
            log.warn("Session Timeout, retrying .........");
        } catch (TTransportException e) {
            // Must precede the TException handler, otherwise this retry path is unreachable.
            log.warn("TransportException, retrying to publish again..", e);
        } catch (TException e) {
            log.error("Unable to publish event to BAM", e);
            return;
        } catch (Throwable th) {
            // Deliberately broad, as before: a publish failure must not escape to the caller.
            log.error("Unable to publish event to BAM", th);
            return;
        }
        // The transport used above has already been released, so the retry cannot starve the pool.
        publishRetryUsingTSocket(eventReceiverComposite, this.transportPool);
    }

    /**
     * Returns the cached Thrift client for the transport, creating and caching one on first use.
     */
    private ReceiverService.Client getReceiverClient(TTransport tTransport) {
        ReceiverService.Client client = this.receiverClientCache.get(tTransport);
        if (client == null) {
            client = new ReceiverService.Client(new TCompactProtocol(tTransport));
            this.receiverClientCache.put(tTransport, client);
        }
        return client;
    }

    /**
     * Discards the cached session and re-attempts the socket publish up to {@link #MAX_RETRIES}
     * times, stopping at the first success.
     */
    private void publishRetryUsingTSocket(EventReceiverComposite eventReceiverComposite, GenericKeyedObjectPool genericKeyedObjectPool) {
        this.authenticationClient.removeSessionId(eventReceiverComposite.getEventReceiver());
        for (int attempt = 0; attempt < MAX_RETRIES; attempt++) {
            pauseBeforeRetry();
            try {
                sendOverSocket(eventReceiverComposite, genericKeyedObjectPool);
                return; // published once; further attempts would duplicate the batch
            } catch (MalformedURLException e) {
                log.error("BAM url is not correct", e);
                return; // the URL will not change between attempts
            } catch (SessionTimeOutException e) {
                log.warn("Session Timeout, retrying .........");
                // Drop the stale session so the next attempt authenticates again.
                this.authenticationClient.removeSessionId(eventReceiverComposite.getEventReceiver());
            } catch (Exception e) {
                log.error("Unable to publish event to BAM", e);
            }
        }
    }

    /**
     * Performs a single publish attempt over a pooled socket transport. The transport is borrowed
     * under the key {@code host:port} and is always handed back (or invalidated, after a
     * transport failure) exactly once before this method returns or throws.
     *
     * @throws MalformedURLException if the receiver URL cannot be parsed
     * @throws Exception any failure raised while borrowing the transport or publishing
     */
    private void sendOverSocket(EventReceiverComposite composite, GenericKeyedObjectPool pool) throws Exception {
        EventReceiver receiver = composite.getEventReceiver();
        String sessionId = this.authenticationClient.getSessionId(receiver);
        String poolKey = new URL(receiver.getUrl()).getHost() + ":" + receiver.getPort();

        TTransport transport = null;
        boolean transportFailed = false;
        try {
            transport = (TTransport) pool.borrowObject(poolKey);
            ReceiverService.Client client = getReceiverClient(transport);
            for (Event event : composite.getEvent()) {
                client.publish(fixEvent(event), sessionId);
                if (log.isTraceEnabled()) {
                    log.trace(event + " event published to url : " + receiver.getUrl());
                }
            }
            if (log.isDebugEnabled()) {
                // Counter is only maintained while debug logging is on (unchanged behaviour).
                AtomicIntSingleton.getAtomicInteger().incrementAndGet();
                log.debug("No of active connections in pool : " + pool.getNumActive());
            }
        } catch (TTransportException e) {
            transportFailed = true;
            throw e;
        } finally {
            releaseTransport(pool, poolKey, transport, transportFailed);
        }
    }

    /**
     * Hands a borrowed transport back to the pool. After a transport failure the transport is
     * invalidated instead of returned, its cached client is dropped, and the idle transports
     * for the same key are cleared. Never throws.
     */
    private void releaseTransport(GenericKeyedObjectPool pool, String poolKey, TTransport transport, boolean transportFailed) {
        if (transport != null) {
            try {
                if (transportFailed) {
                    this.receiverClientCache.remove(transport);
                    pool.invalidateObject(poolKey, transport);
                } else {
                    pool.returnObject(poolKey, transport);
                }
            } catch (Exception e) {
                log.warn("Error occurred while returning object to connection pool", e);
            }
        }
        if (transportFailed) {
            try {
                pool.clear(poolKey);
            } catch (Exception e) {
                log.warn("Error occurred while clearing connection pool for " + poolKey, e);
            }
        }
    }

    /**
     * Makes the first attempt to publish a batch over HTTP and hands over to
     * {@link #publishRetryUsingHttp} on a session timeout. Other Thrift failures are logged.
     */
    private void publishUsingHttp(EventReceiverComposite eventReceiverComposite) {
        try {
            sendOverHttp(eventReceiverComposite);
        } catch (SessionTimeOutException e) {
            log.warn("Session Timeout, retrying .........");
            publishRetryUsingHttp(eventReceiverComposite);
        } catch (TException e) {
            log.error("Unable to publish event to BAM", e);
        }
    }

    /**
     * Discards the cached session and re-attempts the HTTP publish up to {@link #MAX_RETRIES}
     * times, stopping at the first success.
     */
    private void publishRetryUsingHttp(EventReceiverComposite eventReceiverComposite) {
        EventReceiver eventReceiver = eventReceiverComposite.getEventReceiver();
        this.authenticationClient.removeSessionId(eventReceiver);
        for (int attempt = 0; attempt < MAX_RETRIES; attempt++) {
            pauseBeforeRetry();
            try {
                sendOverHttp(eventReceiverComposite);
                return; // published once; further attempts would duplicate the batch
            } catch (SessionTimeOutException e) {
                log.warn("Session Timeout, retrying .........");
                // Drop the stale session so the next attempt authenticates again.
                this.authenticationClient.removeSessionId(eventReceiver);
            } catch (TException e) {
                log.error("Unable to publish event to BAM", e);
            }
        }
    }

    /**
     * Performs a single publish attempt over a freshly created HTTP client, which is closed
     * before this method returns or throws. The client is only closed if it was created, so a
     * construction failure surfaces as itself rather than as a NullPointerException.
     *
     * @throws SessionTimeOutException if the receiver rejects the session
     * @throws TException on any other Thrift failure, including client construction
     */
    private void sendOverHttp(EventReceiverComposite composite) throws SessionTimeOutException, TException {
        EventReceiver receiver = composite.getEventReceiver();
        String sessionId = this.authenticationClient.getSessionId(receiver);
        THttpClient httpClient = new THttpClient(receiver.getUrl() + HTTP_RECEIVER_PATH);
        try {
            ReceiverService.Client client = new ReceiverService.Client(new TCompactProtocol(httpClient));
            httpClient.open();
            for (Event event : composite.getEvent()) {
                client.publish(fixEvent(event), sessionId);
            }
            if (log.isDebugEnabled()) {
                // Counter is only maintained while debug logging is on (unchanged behaviour).
                AtomicIntSingleton.getAtomicInteger().incrementAndGet();
            }
        } finally {
            httpClient.close();
        }
    }

    /** Sleeps for the retry delay, restoring the interrupt flag if the sleep is interrupted. */
    private static void pauseBeforeRetry() {
        try {
            Thread.sleep(RETRY_DELAY_MILLIS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
