package org.wso2.carbon.event.processor.core.internal.ha;

import com.hazelcast.core.EntryAdapter;
import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ILock;
import com.hazelcast.core.IMap;
import com.hazelcast.core.MembershipEvent;
import com.hazelcast.core.MembershipListener;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.event.processor.core.internal.ha.thrift.HAServiceClientThriftImpl;
import org.wso2.carbon.event.processor.core.internal.util.EventProcessorConstants;
import org.wso2.siddhi.core.SiddhiManager;

/* loaded from: input_file:org/wso2/carbon/event/processor/core/internal/ha/HAManager.class */
/**
 * Coordinates the active/passive high-availability role of one execution plan of one tenant.
 *
 * <p>Two Hazelcast locks (an "active" and a "passive" lock, both named after the tenant and the
 * execution plan) decide the role of this node. A node first takes the passive lock and then
 * tries the active lock: if it gets both it becomes the active member and releases the passive
 * lock again; otherwise it becomes the passive member and synchronises its state from the active
 * member. The current role holders are published in a shared Hazelcast membership map.
 *
 * <p>State transitions may be triggered from the caller of {@link #init()}, from the internal
 * scheduler thread and from Hazelcast listener callbacks, which is why the role flags are
 * {@code volatile}. NOTE(review): transitions themselves are not serialised locally; mutual
 * exclusion relies on the Hazelcast locks.
 */
public class HAManager {
    private static final Log log = LogFactory.getLog(HAManager.class);
    private static final String HA_PREFIX = "org.wso2.cep.ha";

    /** Delay between two attempts of a non-active node to change its role. */
    private static final long STATE_CHANGE_RETRY_SECONDS = 15L;
    /** Maximum number of polls while waiting for the input processors to reach the barrier. */
    private static final int BARRIER_WAIT_ATTEMPTS = 1000;
    /** Pause between two polls of the barrier. */
    private static final long BARRIER_WAIT_INTERVAL_MILLIS = 10L;

    private final HazelcastInstance hazelcastInstance;
    private final String executionPlanName;
    private final int tenantId;
    private final SiddhiManager siddhiManager;
    private final int inputProcessors;
    private final CEPMembership currentCepMembershipInfo;
    // Membership-map keys are per tenant/execution plan, so they must be per-instance state:
    // as statics, a second HAManager would overwrite the keys used by the first one.
    private final String activeId;
    private final String passiveId;
    // Written and read by the scheduler thread, Hazelcast listener threads and callers.
    private volatile boolean activeLockAcquired;
    private volatile boolean passiveLockAcquired;
    private volatile boolean shutDown;
    private ILock activeLock;
    private ILock passiveLock;
    private IMap<String, CEPMembership> membershipMap;
    private ThreadPoolExecutor processThreadPoolExecutor;
    private final Map<String, SiddhiHAInputEventDispatcher> inputEventDispatcherMap =
            new HashMap<String, SiddhiHAInputEventDispatcher>();
    private List<SiddhiHAOutputStreamListener> streamCallbackList = new ArrayList<SiddhiHAOutputStreamListener>();
    private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
    private ThreadBarrier threadBarrier = new ThreadBarrier();
    private volatile Future<?> stateChanger = null;

    /**
     * Periodically retries {@link HAManager#tryChangeState()} until this node is the active
     * member or the manager has been shut down.
     */
    class PeriodicStateChanger implements Runnable {
        PeriodicStateChanger() {
        }

        @Override // java.lang.Runnable
        public void run() {
            HAManager.this.tryChangeState();
            if (HAManager.this.activeLockAcquired || HAManager.this.shutDown
                    || HAManager.this.scheduledThreadPoolExecutor.isShutdown()) {
                // Nothing left to retry: either promoted to active or being torn down.
                return;
            }
            HAManager.this.stateChanger = HAManager.this.scheduledThreadPoolExecutor.schedule(
                    this, STATE_CHANGE_RETRY_SECONDS, TimeUnit.SECONDS);
        }
    }

    /**
     * Creates the manager, obtains the Hazelcast locks and membership map, and registers the
     * listeners that trigger a role change when a cluster member leaves or when the active
     * entry is removed from the membership map.
     *
     * @param hazelcastInstance        Hazelcast instance providing locks, the membership map and
     *                                 cluster events
     * @param executionPlanName        name of the execution plan this manager belongs to
     * @param tenantId                 id of the owning tenant
     * @param siddhiManager            Siddhi runtime whose state is snapshotted and restored
     * @param inputProcessors          size of the processing thread pool; also the number of
     *                                 blocked threads awaited before restoring a snapshot
     * @param currentCepMembershipInfo membership information of this node, published in the
     *                                 membership map under the role it holds
     */
    public HAManager(HazelcastInstance hazelcastInstance, String executionPlanName, int tenantId,
                     SiddhiManager siddhiManager, int inputProcessors, CEPMembership currentCepMembershipInfo) {
        this.hazelcastInstance = hazelcastInstance;
        this.executionPlanName = executionPlanName;
        this.tenantId = tenantId;
        this.siddhiManager = siddhiManager;
        this.inputProcessors = inputProcessors;
        this.currentCepMembershipInfo = currentCepMembershipInfo;
        this.activeId = "Active:" + tenantId + EventProcessorConstants.STREAM_SEPARATOR + executionPlanName;
        this.passiveId = "Passive:" + tenantId + EventProcessorConstants.STREAM_SEPARATOR + executionPlanName;
        this.activeLock = hazelcastInstance.getLock(HA_PREFIX + ":" + tenantId
                + EventProcessorConstants.STREAM_SEPARATOR + executionPlanName + ":ActiveLock");
        this.passiveLock = hazelcastInstance.getLock(HA_PREFIX + ":" + tenantId
                + EventProcessorConstants.STREAM_SEPARATOR + executionPlanName + ":PassiveLock");
        this.processThreadPoolExecutor = new ThreadPoolExecutor(inputProcessors, inputProcessors, 0L,
                TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        // Obtain the map BEFORE registering any listener: a listener callback may run
        // immediately and ends up in becomeActive()/becomePassive(), which use the map.
        this.membershipMap = hazelcastInstance.getMap(HA_PREFIX + ":MembershipMap");
        hazelcastInstance.getCluster().addMembershipListener(new MembershipListener() { // from class: org.wso2.carbon.event.processor.core.internal.ha.HAManager.1
            public void memberAdded(MembershipEvent membershipEvent) {
            }

            public void memberRemoved(MembershipEvent membershipEvent) {
                if (HAManager.this.activeLockAcquired) {
                    return;
                }
                HAManager.this.tryChangeState();
            }
        });
        this.membershipMap.addEntryListener(new EntryAdapter<String, CEPMembership>() { // from class: org.wso2.carbon.event.processor.core.internal.ha.HAManager.2
            public void entryRemoved(EntryEvent<String, CEPMembership> entryEvent) {
                HAManager.this.tryChangeState();
            }
        }, this.activeId, false);
    }

    /**
     * Makes a first attempt to take a role and, if this node did not become the active member,
     * starts the periodic retry task.
     */
    public void init() {
        tryChangeState();
        if (this.activeLockAcquired) {
            return;
        }
        this.scheduledThreadPoolExecutor.execute(new PeriodicStateChanger());
    }

    /**
     * Attempts to move this node one step up: from no role to passive (or directly to active
     * when both locks are free), or from passive to active. Does nothing when the node is
     * already active or the manager has been shut down.
     */
    public void tryChangeState() {
        if (this.shutDown || this.activeLockAcquired) {
            return;
        }
        if (!this.passiveLockAcquired) {
            if (!this.passiveLock.tryLock()) {
                return;
            }
            this.passiveLockAcquired = true;
            if (!this.activeLock.tryLock()) {
                becomePassive();
                return;
            }
        } else if (!this.activeLock.tryLock()) {
            return;
        }
        this.activeLockAcquired = true;
        becomeActive();
    }

    /**
     * Registers this node as the passive member, silences its output listeners and synchronises
     * the Siddhi state and input queues with a snapshot fetched from the active member. The
     * thread barrier is closed for the duration of the synchronisation and is always reopened,
     * whether or not the synchronisation succeeds.
     */
    private void becomePassive() {
        this.membershipMap.put(this.passiveId, this.currentCepMembershipInfo);
        this.threadBarrier.close();
        try {
            for (SiddhiHAOutputStreamListener listener : this.streamCallbackList) {
                listener.setDrop(true);
            }
            SnapshotData snapshot = new HAServiceClientThriftImpl().getSnapshot(this.tenantId,
                    this.executionPlanName, (CEPMembership) this.membershipMap.get(this.activeId),
                    this.currentCepMembershipInfo);
            awaitInputProcessorsBlocked();
            try {
                this.siddhiManager.restore(snapshot.getStates());
                @SuppressWarnings("unchecked") // the active node serialises a map of stream id -> next event
                Map<String, Object[]> nextEvents =
                        (Map<String, Object[]>) ByteSerializer.BToO(snapshot.getNextEventData());
                alignInputQueues(nextEvents);
            } catch (Throwable th) {
                log.error("Syncing failed when becoming a Passive Node for tenant:" + this.tenantId + " on:" + this.executionPlanName + " execution plan", th);
            }
            log.info("Became Passive Member for tenant:" + this.tenantId + " on:" + this.executionPlanName);
        } catch (Exception e) {
            log.error("Error in becoming the passive member for " + this.executionPlanName + " on tenant:" + this.tenantId + ", " + e.getMessage(), e);
        } finally {
            // Never leave the input processors parked behind a closed barrier.
            this.threadBarrier.open();
        }
    }

    /**
     * Polls the barrier until the number of blocked threads equals {@code inputProcessors} or
     * the attempt budget is exhausted. Stops waiting early, with the interrupt flag restored,
     * if the current thread is interrupted.
     */
    private void awaitInputProcessorsBlocked() {
        for (int attempt = 0; attempt < BARRIER_WAIT_ATTEMPTS
                && this.threadBarrier.getBlockedThreads().longValue() != this.inputProcessors; attempt++) {
            try {
                Thread.sleep(BARRIER_WAIT_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.warn("Interrupted while waiting for input processors to block for tenant:" + this.tenantId
                        + " on:" + this.executionPlanName, e);
                return;
            }
        }
    }

    /**
     * For every stream in the snapshot, drops events from the head of the local input queue
     * until the head equals the event the active member reported as its next one.
     *
     * @param nextEvents stream id mapped to the active member's next queued event
     * @throws IllegalStateException if the snapshot names a stream with no local dispatcher
     */
    private void alignInputQueues(Map<String, Object[]> nextEvents) {
        for (Map.Entry<String, Object[]> entry : nextEvents.entrySet()) {
            SiddhiHAInputEventDispatcher dispatcher = this.inputEventDispatcherMap.get(entry.getKey());
            if (dispatcher == null) {
                throw new IllegalStateException(entry.getKey() + " stream mismatched with the Active Node " + this.executionPlanName + " execution plan for tenant:" + this.tenantId);
            }
            BlockingQueue<Object[]> eventQueue = dispatcher.getEventQueue();
            Object[] expectedHead = entry.getValue();
            Object[] head = eventQueue.peek();
            while (!Arrays.equals(head, expectedHead)) {
                if (head == null) {
                    // Queue ran dry without meeting the expected event; nothing more to drop.
                    // (Calling remove() here would throw and abort syncing of the other streams.)
                    log.warn("Event queue of stream " + entry.getKey() + " drained before reaching the Active Node's next event for tenant:" + this.tenantId + " on:" + this.executionPlanName);
                    break;
                }
                eventQueue.poll();
                head = eventQueue.peek();
            }
        }
    }

    /**
     * Publishes this node as the active member, lets its output listeners emit events and
     * releases the passive lock so another node can take the passive role.
     */
    private void becomeActive() {
        this.membershipMap.remove(this.passiveId);
        this.membershipMap.put(this.activeId, this.currentCepMembershipInfo);
        for (SiddhiHAOutputStreamListener listener : this.streamCallbackList) {
            listener.setDrop(false);
        }
        this.passiveLock.forceUnlock();
        log.info("Became Active Member for tenant:" + this.tenantId + " on:" + this.executionPlanName);
    }

    /** @return the executor created for processing, sized by {@code inputProcessors} */
    public ExecutorService getProcessThreadPoolExecutor() {
        return this.processThreadPoolExecutor;
    }

    /** @return the barrier that is closed while state is being snapshotted or restored */
    public ThreadBarrier getThreadBarrier() {
        return this.threadBarrier;
    }

    /**
     * Registers an output listener whose drop flag is switched on role changes.
     *
     * @param siddhiHAOutputStreamListener listener to register
     */
    public void addStreamCallback(SiddhiHAOutputStreamListener siddhiHAOutputStreamListener) {
        this.streamCallbackList.add(siddhiHAOutputStreamListener);
    }

    /**
     * Registers the input dispatcher of a stream so its queue takes part in snapshots.
     *
     * @param str                          stream id the dispatcher belongs to
     * @param siddhiHAInputEventDispatcher dispatcher owning the stream's event queue
     */
    public void addInputEventDispatcher(String str, SiddhiHAInputEventDispatcher siddhiHAInputEventDispatcher) {
        this.inputEventDispatcherMap.put(str, siddhiHAInputEventDispatcher);
    }

    /** @return {@code true} once this node has acquired the active lock */
    public boolean isActiveMember() {
        return this.activeLockAcquired;
    }

    /**
     * Stops the periodic retry task and gives up the role currently held by this node.
     *
     * <p>An active node only releases the active lock and entry: its passive lock and entry
     * were already released in {@link #becomeActive()} and may by now belong to another node.
     */
    public void shutdown() {
        // Prevents listener callbacks and the retry task from re-acquiring a role afterwards.
        this.shutDown = true;
        Future<?> pendingRetry = this.stateChanger;
        if (pendingRetry != null) {
            pendingRetry.cancel(false);
        }
        this.scheduledThreadPoolExecutor.shutdown();
        if (this.activeLockAcquired) {
            this.activeLock.forceUnlock();
            this.membershipMap.remove(this.activeId);
        } else if (this.passiveLockAcquired) {
            this.membershipMap.remove(this.passiveId);
            this.passiveLock.forceUnlock();
        }
    }

    /**
     * Captures the current Siddhi state together with the head event of every registered input
     * queue. The thread barrier is closed while the snapshot is taken and is reopened even if
     * taking the snapshot fails.
     *
     * @return the snapshot holding the serialised next-event map and the Siddhi state
     */
    public SnapshotData getActiveSnapshotData() {
        this.threadBarrier.close();
        try {
            SnapshotData snapshotData = new SnapshotData();
            HashMap<String, Object[]> nextEvents = new HashMap<String, Object[]>();
            for (Map.Entry<String, SiddhiHAInputEventDispatcher> entry : this.inputEventDispatcherMap.entrySet()) {
                nextEvents.put(entry.getKey(), entry.getValue().getEventQueue().peek());
            }
            snapshotData.setNextEventData(ByteSerializer.OToB(nextEvents));
            snapshotData.setStates(this.siddhiManager.snapshot());
            return snapshotData;
        } finally {
            this.threadBarrier.open();
        }
    }
}
