package org.wso2.carbon.databridge.agent.thrift;

import com.google.gson.Gson;
import java.net.MalformedURLException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.databridge.agent.thrift.exception.AgentException;
import org.wso2.carbon.databridge.agent.thrift.lb.ReceiverStateObserver;
import org.wso2.carbon.databridge.agent.thrift.util.DataPublisherUtil;
import org.wso2.carbon.databridge.agent.thrift.util.PublishData;
import org.wso2.carbon.databridge.commons.Event;
import org.wso2.carbon.databridge.commons.StreamDefinition;
import org.wso2.carbon.databridge.commons.exception.AuthenticationException;
import org.wso2.carbon.databridge.commons.exception.DifferentStreamDefinitionAlreadyDefinedException;
import org.wso2.carbon.databridge.commons.exception.MalformedStreamDefinitionException;
import org.wso2.carbon.databridge.commons.exception.NoStreamDefinitionExistException;
import org.wso2.carbon.databridge.commons.exception.StreamDefinitionException;
import org.wso2.carbon.databridge.commons.exception.TransportException;

/* loaded from: input_file:org/wso2/carbon/databridge/agent/thrift/AsyncDataPublisher.class */
public class AsyncDataPublisher {
    /** Class-wide logger; never reassigned, hence final. */
    private static final Log log = LogFactory.getLog(AsyncDataPublisher.class);

    /**
     * Underlying synchronous publisher. It is assigned (or reset to null) by the
     * connection worker thread and read by the threads calling {@code publish(...)},
     * so it is volatile to make each assignment visible across threads.
     */
    private volatile DataPublisher dataPublisher;

    /** Stream cache key (built by DataPublisherUtil.getStreamCacheKey) to stream id. */
    private ConcurrentHashMap<String, String> streamIdCache;

    /** Stream cache key to the stream definition registered through addStreamDefinition. */
    private ConcurrentHashMap<String, String> streamDefnCache;

    /**
     * Events waiting to be published by a DataPublishWorker. The reference is read by
     * worker threads and may be replaced by getQueuedEventsAndReset(), hence volatile.
     */
    private volatile LinkedBlockingQueue<PublishData> publishDataQueue;

    /** True while a DataPublishWorker is considered to be draining the queue. */
    private AtomicBoolean isPublisherAlive;

    /**
     * True while a connection attempt is pending or running. Left null by the
     * constructor that takes a ready DataPublisher.
     */
    private AtomicBoolean isConnectorAlive;

    /** Reusable connection task; left null by the constructor that takes a ready DataPublisher. */
    private ReceiverConnectionWorker receiverConnectionWorker;

    /** Executor that runs the initial connection task. */
    private ExecutorService connectorService;

    /** Executor that runs DataPublishWorker tasks (and reconnection attempts). */
    private ExecutorService publisherService;

    /**
     * Optional observer notified about connection success and failure. Set by caller
     * threads and read by the connection worker thread, hence volatile.
     */
    private volatile ReceiverStateObserver receiverStateObserver;

    /** Serialises StreamDefinition objects to JSON before they are cached. */
    private Gson gson;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/wso2/carbon/databridge/agent/thrift/AsyncDataPublisher$DataPublishWorker.class */
    public class DataPublishWorker implements Runnable {
        private DataPublishWorker() {
        }

        @Override // java.lang.Runnable
        public void run() {
            PublishData publishData;
            String streamId;
            try {
                publishData = (PublishData) AsyncDataPublisher.this.publishDataQueue.poll();
            } catch (Throwable th) {
                AsyncDataPublisher.log.error(th.getMessage(), th);
                AsyncDataPublisher.this.isPublisherAlive.set(false);
                if (null == AsyncDataPublisher.this.publishDataQueue.peek()) {
                    return;
                } else {
                    return;
                }
            }
            while (null != publishData) {
                String str = null;
                if (publishData.getStreamName() == null || publishData.getStreamVersion() == null) {
                    streamId = publishData.getEvent().getStreamId();
                } else {
                    str = DataPublisherUtil.getStreamCacheKey(publishData.getStreamName(), publishData.getStreamVersion());
                    streamId = (String) AsyncDataPublisher.this.streamIdCache.get(str);
                }
                if (null == streamId) {
                    try {
                        try {
                            Object obj = AsyncDataPublisher.this.streamDefnCache.get(str);
                            if (obj instanceof StreamDefinition) {
                                streamId = AsyncDataPublisher.this.dataPublisher.defineStream((StreamDefinition) obj);
                            } else if (obj instanceof String) {
                                streamId = AsyncDataPublisher.this.dataPublisher.defineStream(obj.toString());
                            } else {
                                AsyncDataPublisher.log.error("Not Supported stream definition type");
                            }
                            if (null != streamId) {
                                AsyncDataPublisher.this.streamIdCache.put(str, streamId);
                                publishData.getEvent().setStreamId(streamId);
                                AsyncDataPublisher.this.dataPublisher.publish(publishData.getEvent());
                            } else {
                                AsyncDataPublisher.log.error("Stream Id is null for stream definition :" + obj.toString());
                            }
                        } catch (DifferentStreamDefinitionAlreadyDefinedException e) {
                            AsyncDataPublisher.log.error("Stream definition already exist", e);
                        } catch (AgentException e2) {
                            AsyncDataPublisher.log.error("Error occurred while finding | defining the event", e2);
                        }
                    } catch (StreamDefinitionException e3) {
                        AsyncDataPublisher.log.error("Error occurred while defining the event", e3);
                    } catch (MalformedStreamDefinitionException e4) {
                        AsyncDataPublisher.log.error("Malformed stream definition", e4);
                    }
                    publishData = (PublishData) AsyncDataPublisher.this.publishDataQueue.poll();
                } else {
                    publishData.getEvent().setStreamId(streamId);
                    try {
                        AsyncDataPublisher.this.dataPublisher.publish(publishData.getEvent());
                    } catch (AgentException e5) {
                        AsyncDataPublisher.log.error("Error occurred while publishing the event", e5);
                    }
                    publishData = (PublishData) AsyncDataPublisher.this.publishDataQueue.poll();
                }
                AsyncDataPublisher.log.error(th.getMessage(), th);
                AsyncDataPublisher.this.isPublisherAlive.set(false);
                if (null == AsyncDataPublisher.this.publishDataQueue.peek() && !AsyncDataPublisher.this.isPublisherAlive.get() && AsyncDataPublisher.this.isPublisherAlive.compareAndSet(false, true)) {
                    AsyncDataPublisher.this.publisherService.submit(new DataPublishWorker());
                    return;
                }
                return;
            }
            AsyncDataPublisher.this.isPublisherAlive.set(false);
            if (null != AsyncDataPublisher.this.publishDataQueue.peek() && !AsyncDataPublisher.this.isPublisherAlive.get() && AsyncDataPublisher.this.isPublisherAlive.compareAndSet(false, true)) {
                AsyncDataPublisher.this.publisherService.submit(new DataPublishWorker());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/wso2/carbon/databridge/agent/thrift/AsyncDataPublisher$ReceiverConnectionWorker.class */
    public class ReceiverConnectionWorker implements Runnable {
        private String authenticationUrl;
        private String receiverUrl;
        private String username;
        private String password;
        private Agent agent;
        private boolean isReconnecting;

        private ReceiverConnectionWorker(String str, String str2, String str3, String str4, Agent agent) {
            this.authenticationUrl = str;
            this.receiverUrl = str2;
            this.username = str3;
            this.password = str4;
            this.agent = agent;
        }

        private ReceiverConnectionWorker(String str, String str2, String str3) {
            this.receiverUrl = str;
            this.username = str2;
            this.password = str3;
        }

        private ReceiverConnectionWorker(String str, String str2, String str3, Agent agent) {
            this.receiverUrl = str;
            this.username = str2;
            this.password = str3;
            this.agent = agent;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                try {
                    try {
                        if (null != this.authenticationUrl) {
                            if (null != this.agent) {
                                AsyncDataPublisher.this.dataPublisher = new DataPublisher(this.authenticationUrl, this.receiverUrl, this.username, this.password, this.agent);
                            } else {
                                AsyncDataPublisher.this.dataPublisher = new DataPublisher(this.authenticationUrl, this.receiverUrl, this.username, this.password);
                            }
                        } else if (null != this.agent) {
                            AsyncDataPublisher.this.dataPublisher = new DataPublisher(this.receiverUrl, this.username, this.password, this.agent);
                        } else {
                            AsyncDataPublisher.this.dataPublisher = new DataPublisher(this.receiverUrl, this.username, this.password);
                        }
                        if (null != AsyncDataPublisher.this.receiverStateObserver) {
                            AsyncDataPublisher.this.dataPublisher.registerReceiverStateObserver(AsyncDataPublisher.this.receiverStateObserver);
                            AsyncDataPublisher.this.receiverStateObserver.notifyConnectionSuccess(this.receiverUrl, this.username, this.password);
                        }
                        AsyncDataPublisher.this.isPublisherAlive.compareAndSet(false, true);
                        new Thread(new DataPublishWorker()).start();
                    } catch (AuthenticationException e) {
                        AsyncDataPublisher.this.dataPublisher = null;
                        if (this.isReconnecting) {
                            AsyncDataPublisher.log.error("Reconnection failed for" + this.receiverUrl);
                        } else {
                            AsyncDataPublisher.log.error("Error while connection to event receiver", e);
                        }
                        if (null != AsyncDataPublisher.this.receiverStateObserver) {
                            AsyncDataPublisher.this.receiverStateObserver.notifyConnectionFailure(this.receiverUrl, this.username, this.password);
                            AsyncDataPublisher.this.receiverStateObserver.resendPublishedData(AsyncDataPublisher.this.publishDataQueue);
                        }
                    } catch (MalformedURLException e2) {
                        AsyncDataPublisher.this.dataPublisher = null;
                        if (this.isReconnecting) {
                            AsyncDataPublisher.log.error("Reconnection failed for " + this.receiverUrl);
                        } else {
                            AsyncDataPublisher.log.error("Malformed url error when connecting to receiver", e2);
                        }
                        if (null != AsyncDataPublisher.this.receiverStateObserver) {
                            AsyncDataPublisher.this.receiverStateObserver.notifyConnectionFailure(this.receiverUrl, this.username, this.password);
                            AsyncDataPublisher.this.receiverStateObserver.resendPublishedData(AsyncDataPublisher.this.publishDataQueue);
                        }
                    }
                } catch (TransportException e3) {
                    AsyncDataPublisher.this.dataPublisher = null;
                    if (this.isReconnecting) {
                        AsyncDataPublisher.log.error("Reconnection failed for " + this.receiverUrl);
                    } else {
                        AsyncDataPublisher.log.error("Error while connection to event receiver", e3);
                    }
                    if (null != AsyncDataPublisher.this.receiverStateObserver) {
                        AsyncDataPublisher.this.receiverStateObserver.notifyConnectionFailure(this.receiverUrl, this.username, this.password);
                        AsyncDataPublisher.this.receiverStateObserver.resendPublishedData(AsyncDataPublisher.this.publishDataQueue);
                    }
                } catch (AgentException e4) {
                    AsyncDataPublisher.this.dataPublisher = null;
                    if (this.isReconnecting) {
                        AsyncDataPublisher.log.error("Reconnection failed for for " + this.receiverUrl);
                    } else {
                        AsyncDataPublisher.log.error("Error while connection to event receiver", e4);
                    }
                    if (null != AsyncDataPublisher.this.receiverStateObserver) {
                        AsyncDataPublisher.this.receiverStateObserver.notifyConnectionFailure(this.receiverUrl, this.username, this.password);
                        AsyncDataPublisher.this.receiverStateObserver.resendPublishedData(AsyncDataPublisher.this.publishDataQueue);
                    }
                }
                this.isReconnecting = false;
                AsyncDataPublisher.this.isConnectorAlive.set(false);
            } catch (Throwable th) {
                AsyncDataPublisher.log.error(th.getMessage(), th);
                this.isReconnecting = false;
                AsyncDataPublisher.this.isConnectorAlive.set(false);
            }
        }
    }

    /**
     * Creates a publisher that connects asynchronously using an explicit
     * authentication URL, the default agent and a queue bounded to 10000 events.
     *
     * @param str  authentication URL
     * @param str2 receiver URL
     * @param str3 user name
     * @param str4 password
     */
    public AsyncDataPublisher(String str, String str2, String str3, String str4) {
        // The general constructor produces exactly this state when it is given neither
        // an agent nor a pre-populated stream definition cache.
        this(str, str2, str3, str4, null, null);
    }

    public AsyncDataPublisher(String str, String str2, String str3) {
        this.streamIdCache = new ConcurrentHashMap<>();
        this.streamDefnCache = new ConcurrentHashMap<>();
        this.isPublisherAlive = new AtomicBoolean(false);
        this.connectorService = Executors.newSingleThreadExecutor();
        this.publisherService = Executors.newSingleThreadExecutor();
        this.gson = new Gson();
        this.isConnectorAlive = new AtomicBoolean(true);
        this.publishDataQueue = new LinkedBlockingQueue<>(10000);
        this.receiverConnectionWorker = new ReceiverConnectionWorker(str, str2, str3);
        this.connectorService.submit(this.receiverConnectionWorker);
    }

    public AsyncDataPublisher(String str, String str2, String str3, Agent agent) {
        this.streamIdCache = new ConcurrentHashMap<>();
        this.streamDefnCache = new ConcurrentHashMap<>();
        this.isPublisherAlive = new AtomicBoolean(false);
        this.connectorService = Executors.newSingleThreadExecutor();
        this.publisherService = Executors.newSingleThreadExecutor();
        this.gson = new Gson();
        this.isConnectorAlive = new AtomicBoolean(true);
        this.publishDataQueue = new LinkedBlockingQueue<>(agent.getAgentConfiguration().getAsyncDataPublisherBufferedEventSize());
        this.receiverConnectionWorker = new ReceiverConnectionWorker(str, str2, str3, agent);
        this.connectorService.submit(this.receiverConnectionWorker);
    }

    /**
     * Creates a publisher that connects asynchronously using an explicit
     * authentication URL and a caller-supplied agent. A null agent is accepted and
     * results in the default agent and a queue bounded to 10000 events.
     *
     * @param str   authentication URL
     * @param str2  receiver URL
     * @param str3  user name
     * @param str4  password
     * @param agent agent to share with the underlying publisher, or null for the default
     */
    public AsyncDataPublisher(String str, String str2, String str3, String str4, Agent agent) {
        // Delegating gives this overload the same null-agent handling as the general
        // constructor instead of failing with a NullPointerException.
        this(str, str2, str3, str4, agent, null);
    }

    public AsyncDataPublisher(String str, String str2, String str3, String str4, Agent agent, ConcurrentHashMap<String, String> concurrentHashMap) {
        this.streamIdCache = new ConcurrentHashMap<>();
        this.streamDefnCache = new ConcurrentHashMap<>();
        this.isPublisherAlive = new AtomicBoolean(false);
        this.connectorService = Executors.newSingleThreadExecutor();
        this.publisherService = Executors.newSingleThreadExecutor();
        this.gson = new Gson();
        if (concurrentHashMap != null) {
            this.streamDefnCache = concurrentHashMap;
        }
        this.isConnectorAlive = new AtomicBoolean(true);
        if (agent != null) {
            this.publishDataQueue = new LinkedBlockingQueue<>(agent.getAgentConfiguration().getAsyncDataPublisherBufferedEventSize());
            this.receiverConnectionWorker = new ReceiverConnectionWorker(str, str2, str3, str4, agent);
        } else {
            this.publishDataQueue = new LinkedBlockingQueue<>(10000);
            this.receiverConnectionWorker = new ReceiverConnectionWorker(str, str2, str3, str4, null);
        }
        this.connectorService.submit(this.receiverConnectionWorker);
    }

    public AsyncDataPublisher(DataPublisher dataPublisher) {
        this.streamIdCache = new ConcurrentHashMap<>();
        this.streamDefnCache = new ConcurrentHashMap<>();
        this.isPublisherAlive = new AtomicBoolean(false);
        this.connectorService = Executors.newSingleThreadExecutor();
        this.publisherService = Executors.newSingleThreadExecutor();
        this.gson = new Gson();
        this.dataPublisher = dataPublisher;
        this.publishDataQueue = new LinkedBlockingQueue<>(dataPublisher.getAgent().getAgentConfiguration().getAsyncDataPublisherBufferedEventSize());
    }

    /**
     * Schedules a new connection attempt unless one is already pending or running.
     *
     * <p>Does nothing when this instance was built around an existing
     * {@link DataPublisher}: in that case there is no connection worker to re-run
     * (and the connector flag was never created), so the call is a no-op instead of
     * failing with a {@link NullPointerException}.</p>
     */
    public void reconnect() {
        if (null == this.receiverConnectionWorker || null == this.isConnectorAlive) {
            return;
        }
        // compareAndSet lets exactly one caller win when several threads try to reconnect.
        if (this.isConnectorAlive.compareAndSet(false, true)) {
            this.receiverConnectionWorker.isReconnecting = true;
            // NOTE(review): the initial connection runs on connectorService, while the
            // reconnection is submitted to publisherService - confirm this is intended.
            this.publisherService.submit(this.receiverConnectionWorker);
        }
    }

    /**
     * Tells whether events can currently be accepted without triggering a reconnect.
     *
     * @return true when a publisher exists, or when a connection attempt is still
     *         pending or running
     */
    public boolean canPublish() {
        if (this.dataPublisher != null) {
            return true;
        }
        return this.isConnectorAlive.get();
    }

    /**
     * Replaces the agent used by the underlying publisher.
     *
     * <p>The publisher is dereferenced without a null check, so calling this before
     * a publisher exists fails with a {@link NullPointerException}.</p>
     *
     * @param agent agent handed to the underlying publisher
     * @throws AgentException          declared for the underlying publisher call
     * @throws AuthenticationException declared for the underlying publisher call
     * @throws TransportException      declared for the underlying publisher call
     */
    public void setAgent(Agent agent) throws AgentException, AuthenticationException, TransportException {
        final DataPublisher publisher = this.dataPublisher;
        publisher.setAgent(agent);
    }

    /**
     * Returns the agent of the underlying publisher.
     *
     * @return the publisher's agent, or null when no publisher exists yet
     */
    public Agent getAgent() {
        return (this.dataPublisher == null) ? null : this.dataPublisher.getAgent();
    }

    /**
     * Remembers the observer and, when a publisher already exists, registers the
     * observer with it straight away.
     *
     * @param receiverStateObserver observer to notify about receiver state changes
     */
    public void registerReceiverObserver(ReceiverStateObserver receiverStateObserver) {
        this.receiverStateObserver = receiverStateObserver;
        if (this.dataPublisher == null) {
            // Nothing to register with yet; the connection worker registers the stored observer itself.
            return;
        }
        this.dataPublisher.registerReceiverStateObserver(this.receiverStateObserver);
    }

    /**
     * Convenience overload of the seven-argument {@code publish} that passes no map.
     *
     * @param str     stream name
     * @param str2    stream version
     * @param j       forwarded unchanged (presumably the event timestamp - confirm)
     * @param objArr  forwarded unchanged (presumably meta data - confirm)
     * @param objArr2 forwarded unchanged (presumably correlation data - confirm)
     * @param objArr3 forwarded unchanged (presumably payload data - confirm)
     * @throws AgentException declared for the direct publish on the underlying publisher
     */
    public void publish(String str, String str2, long j, Object[] objArr, Object[] objArr2, Object[] objArr3) throws AgentException {
        final Map<String, String> noMap = null;
        publish(str, str2, j, objArr, objArr2, objArr3, noMap);
    }

    /**
     * Publishes an event described by its parts.
     *
     * <ul>
     *   <li>Not connected and no connection attempt running: the event is queued and a
     *       reconnect is requested.</li>
     *   <li>Connection attempt running but no publisher yet: the event is queued.</li>
     *   <li>Publisher available and stream id cached: the event is published directly.</li>
     *   <li>Publisher available but stream id unknown: the event is queued and a
     *       publish worker is started if none is alive.</li>
     * </ul>
     * A full queue drops the event; this is logged (at debug level in the first two
     * cases, at error level in the last).
     *
     * @param str     stream name
     * @param str2    stream version
     * @param j       forwarded unchanged (presumably the event timestamp - confirm)
     * @param objArr  forwarded unchanged (presumably meta data - confirm)
     * @param objArr2 forwarded unchanged (presumably correlation data - confirm)
     * @param objArr3 forwarded unchanged (presumably payload data - confirm)
     * @param map     forwarded unchanged (presumably arbitrary key/value data - confirm); may be null
     * @throws AgentException declared for the direct publish on the underlying publisher
     */
    public void publish(String str, String str2, long j, Object[] objArr, Object[] objArr2, Object[] objArr3, Map<String, String> map) throws AgentException {
        if (canPublish()) {
            if (this.dataPublisher != null) {
                String cachedStreamId = this.streamIdCache.get(DataPublisherUtil.getStreamCacheKey(str, str2));
                if (cachedStreamId == null) {
                    // Stream not resolved yet: let the worker define it and publish from the queue.
                    boolean queued = this.publishDataQueue.offer(new PublishData(str, str2, j, objArr, objArr2, objArr3, map));
                    if (this.isPublisherAlive.compareAndSet(false, true)) {
                        this.publisherService.submit(new DataPublishWorker());
                    }
                    if (!queued) {
                        log.error("Event queue is full, and Event is not added to the queue to publish");
                    }
                } else {
                    this.dataPublisher.publish(cachedStreamId, j, objArr, objArr2, objArr3, map);
                }
            } else {
                // A connection attempt is in flight; buffer until it finishes.
                boolean queued = this.publishDataQueue.offer(new PublishData(str, str2, j, objArr, objArr2, objArr3, map));
                if (!queued && log.isDebugEnabled()) {
                    log.debug("Event queue is full, and Event is not added to the queue to publish");
                }
            }
        } else {
            boolean queued = this.publishDataQueue.offer(new PublishData(str, str2, j, objArr, objArr2, objArr3, map));
            reconnect();
            if (!queued && log.isDebugEnabled()) {
                log.debug("Event queue is full, and Event is not added to the queue to publish");
            }
        }
    }

    /**
     * Convenience overload of the six-argument {@code publish} that passes no map.
     *
     * @param str     stream name
     * @param str2    stream version
     * @param objArr  forwarded unchanged (presumably meta data - confirm)
     * @param objArr2 forwarded unchanged (presumably correlation data - confirm)
     * @param objArr3 forwarded unchanged (presumably payload data - confirm)
     * @throws AgentException declared for the direct publish on the underlying publisher
     */
    public void publish(String str, String str2, Object[] objArr, Object[] objArr2, Object[] objArr3) throws AgentException {
        // The typed null selects the Map overload rather than the Event one.
        final Map<String, String> noMap = null;
        publish(str, str2, objArr, objArr2, objArr3, noMap);
    }

    /**
     * Publishes an event described by its parts, without an explicit timestamp.
     *
     * <ul>
     *   <li>Not connected and no connection attempt running: the event is queued and a
     *       reconnect is requested.</li>
     *   <li>Connection attempt running but no publisher yet: the event is queued.</li>
     *   <li>Publisher available and stream id cached: the event is published directly.</li>
     *   <li>Publisher available but stream id unknown: the event is queued and a
     *       publish worker is started if none is alive.</li>
     * </ul>
     * A full queue drops the event; this is logged (at debug level in the first two
     * cases, at error level in the last).
     *
     * @param str     stream name
     * @param str2    stream version
     * @param objArr  forwarded unchanged (presumably meta data - confirm)
     * @param objArr2 forwarded unchanged (presumably correlation data - confirm)
     * @param objArr3 forwarded unchanged (presumably payload data - confirm)
     * @param map     stored with queued events only; may be null
     * @throws AgentException declared for the direct publish on the underlying publisher
     */
    public void publish(String str, String str2, Object[] objArr, Object[] objArr2, Object[] objArr3, Map<String, String> map) throws AgentException {
        if (canPublish()) {
            if (this.dataPublisher != null) {
                String cachedStreamId = this.streamIdCache.get(DataPublisherUtil.getStreamCacheKey(str, str2));
                if (cachedStreamId == null) {
                    // Stream not resolved yet: let the worker define it and publish from the queue.
                    boolean queued = this.publishDataQueue.offer(new PublishData(str, str2, objArr, objArr2, objArr3, map));
                    if (this.isPublisherAlive.compareAndSet(false, true)) {
                        this.publisherService.submit(new DataPublishWorker());
                    }
                    if (!queued) {
                        log.error("Event queue is full, and Event is not added to the queue to publish");
                    }
                } else {
                    // NOTE(review): map is not forwarded on this direct path, unlike the
                    // timestamped overload - confirm whether that is intended.
                    this.dataPublisher.publish(cachedStreamId, objArr, objArr2, objArr3);
                }
            } else {
                // A connection attempt is in flight; buffer until it finishes.
                boolean queued = this.publishDataQueue.offer(new PublishData(str, str2, objArr, objArr2, objArr3, map));
                if (!queued && log.isDebugEnabled()) {
                    log.debug("Event queue is full, and Event is not added to the queue to publish");
                }
            }
        } else {
            boolean queued = this.publishDataQueue.offer(new PublishData(str, str2, objArr, objArr2, objArr3, map));
            reconnect();
            if (!queued && log.isDebugEnabled()) {
                log.debug("Event queue is full, and Event is not added to the queue to publish");
            }
        }
    }

    /**
     * Publishes a ready-made event for the stream identified by name and version.
     *
     * <ul>
     *   <li>Not connected and no connection attempt running: the event is queued and a
     *       reconnect is requested.</li>
     *   <li>Connection attempt running but no publisher yet: the event is queued.</li>
     *   <li>Publisher available and stream id cached: the id is set on the event and
     *       the event is published directly.</li>
     *   <li>Publisher available but stream id unknown: the event is queued and a
     *       publish worker is started if none is alive.</li>
     * </ul>
     * A full queue drops the event; this is logged (at debug level in the first two
     * cases, at error level in the last).
     *
     * @param str   stream name
     * @param str2  stream version
     * @param event event to publish; its stream id is overwritten on the direct path
     * @throws AgentException declared for the direct publish on the underlying publisher
     */
    public void publish(String str, String str2, Event event) throws AgentException {
        if (canPublish()) {
            if (this.dataPublisher != null) {
                String cachedStreamId = this.streamIdCache.get(DataPublisherUtil.getStreamCacheKey(str, str2));
                if (cachedStreamId == null) {
                    // Stream not resolved yet: let the worker define it and publish from the queue.
                    boolean queued = this.publishDataQueue.offer(new PublishData(str, str2, event));
                    if (this.isPublisherAlive.compareAndSet(false, true)) {
                        this.publisherService.submit(new DataPublishWorker());
                    }
                    if (!queued) {
                        log.error("Event queue is full, and Event is not added to the queue to publish");
                    }
                } else {
                    event.setStreamId(cachedStreamId);
                    this.dataPublisher.publish(event);
                }
            } else {
                // A connection attempt is in flight; buffer until it finishes.
                boolean queued = this.publishDataQueue.offer(new PublishData(str, str2, event));
                if (!queued && log.isDebugEnabled()) {
                    log.debug("Event queue is full, and Event is not added to the queue to publish");
                }
            }
        } else {
            boolean queued = this.publishDataQueue.offer(new PublishData(str, str2, event));
            reconnect();
            if (!queued && log.isDebugEnabled()) {
                log.debug("Event queue is full, and Event is not added to the queue to publish");
            }
        }
    }

    /**
     * Publishes an event as-is, without a stream name or version. The event is sent
     * directly when a publisher exists; otherwise it is queued with a null stream
     * name and version, and a reconnect is requested if no connection attempt is
     * running. A full queue drops the event, which is reported at debug level only.
     *
     * @param event event to publish
     * @throws AgentException declared for the direct publish on the underlying publisher
     */
    public void publish(Event event) throws AgentException {
        if (canPublish()) {
            if (this.dataPublisher != null) {
                this.dataPublisher.publish(event);
            } else {
                // A connection attempt is in flight; buffer until it finishes.
                boolean queued = this.publishDataQueue.offer(new PublishData(null, null, event));
                if (!queued && log.isDebugEnabled()) {
                    log.debug("Event queue is full, and Event is not added to the queue to publish");
                }
            }
        } else {
            boolean queued = this.publishDataQueue.offer(new PublishData(null, null, event));
            reconnect();
            if (!queued && log.isDebugEnabled()) {
                log.debug("Event queue is full, and Event is not added to the queue to publish");
            }
        }
    }

    /**
     * Removes all events currently waiting in the publish queue and returns them.
     *
     * <p>The internal queue is emptied but kept in place, so {@code publish(...)} and
     * the publish worker can keep using it afterwards. Nulling the field instead would
     * make every later queue access fail with a {@link NullPointerException}.</p>
     *
     * @return a new queue holding the events that were pending, in their original
     *         order; empty when nothing was pending
     */
    public synchronized LinkedBlockingQueue<PublishData> getQueuedEventsAndReset() {
        LinkedBlockingQueue<PublishData> pendingEvents = new LinkedBlockingQueue<>();
        LinkedBlockingQueue<PublishData> liveQueue = this.publishDataQueue;
        if (liveQueue != null) {
            liveQueue.drainTo(pendingEvents);
        }
        return pendingEvents;
    }

    /**
     * Registers a stream definition so that the stream can be defined on demand when
     * its first event is published.
     *
     * @param str  the stream definition
     * @param str2 stream name
     * @param str3 stream version
     */
    public void addStreamDefinition(String str, String str2, String str3) {
        final String cacheKey = DataPublisherUtil.getStreamCacheKey(str2, str3);
        this.streamDefnCache.put(cacheKey, str);
    }

    /**
     * Registers a stream definition object; it is stored as its Gson JSON rendering
     * under the key built from its name and version.
     *
     * @param streamDefinition definition to register
     */
    public void addStreamDefinition(StreamDefinition streamDefinition) {
        final String cacheKey = DataPublisherUtil.getStreamCacheKey(streamDefinition.getName(), streamDefinition.getVersion());
        final String definitionJson = this.gson.toJson(streamDefinition);
        this.streamDefnCache.put(cacheKey, definitionJson);
    }

    /**
     * Checks whether a definition has been registered for the given stream.
     *
     * @param str  stream name
     * @param str2 stream version
     * @return true when a definition is cached for that name and version
     */
    public boolean isStreamDefinitionAdded(String str, String str2) {
        final String cacheKey = DataPublisherUtil.getStreamCacheKey(str, str2);
        // The cache cannot hold null values, so key presence equals a non-null lookup.
        return this.streamDefnCache.containsKey(cacheKey);
    }

    /**
     * Checks whether a definition has been registered under the name and version of
     * the given stream definition.
     *
     * @param streamDefinition definition whose name and version are looked up
     * @return true when a definition is cached for that name and version
     */
    public boolean isStreamDefinitionAdded(StreamDefinition streamDefinition) {
        final String cacheKey = DataPublisherUtil.getStreamCacheKey(streamDefinition.getName(), streamDefinition.getVersion());
        // The cache cannot hold null values, so key presence equals a non-null lookup.
        return this.streamDefnCache.containsKey(cacheKey);
    }

    /**
     * Looks up the id of a stream, consulting the local id cache first and the
     * underlying publisher second. A non-null id obtained from the publisher is cached.
     *
     * @param str  stream name
     * @param str2 stream version
     * @return the stream id, or null when no publisher exists yet (or the publisher
     *         returned null)
     * @throws AgentException                   declared for the underlying publisher lookup
     * @throws StreamDefinitionException        declared for the underlying publisher lookup
     * @throws NoStreamDefinitionExistException declared for the underlying publisher lookup
     * @deprecated see {@link #findStreamId(String, String)}, which declares only AgentException
     */
    @Deprecated
    public String findStream(String str, String str2) throws AgentException, StreamDefinitionException, NoStreamDefinitionExistException {
        if (null == this.dataPublisher) {
            return null;
        }
        String streamCacheKey = DataPublisherUtil.getStreamCacheKey(str, str2);
        String streamId = this.streamIdCache.get(streamCacheKey);
        if (null == streamId) {
            streamId = this.dataPublisher.findStream(str, str2);
            // ConcurrentHashMap rejects null values; only cache an id that was actually found.
            if (null != streamId) {
                this.streamIdCache.put(streamCacheKey, streamId);
            }
        }
        return streamId;
    }

    /**
     * Looks up the id of a stream, consulting the local id cache first and the
     * underlying publisher second. A non-null id obtained from the publisher is cached.
     *
     * @param str  stream name
     * @param str2 stream version
     * @return the stream id, or null when no publisher exists yet or the publisher
     *         found no id for the stream
     * @throws AgentException declared for the underlying publisher lookup
     */
    public String findStreamId(String str, String str2) throws AgentException {
        if (null == this.dataPublisher) {
            return null;
        }
        String streamCacheKey = DataPublisherUtil.getStreamCacheKey(str, str2);
        String streamId = this.streamIdCache.get(streamCacheKey);
        if (null == streamId) {
            streamId = this.dataPublisher.findStreamId(str, str2);
            // ConcurrentHashMap rejects null values: caching a "not found" result would
            // throw a NullPointerException instead of returning null to the caller.
            if (null != streamId) {
                this.streamIdCache.put(streamCacheKey, streamId);
            }
        }
        return streamId;
    }

    /**
     * Shuts down both executors (already submitted tasks are allowed to finish) and
     * then stops the underlying publisher, if one exists.
     */
    public void stop() {
        this.publisherService.shutdown();
        this.connectorService.shutdown();
        if (this.dataPublisher == null) {
            return;
        }
        this.dataPublisher.stop();
    }
}
