package org.wso2.carbon.ntask.core.impl.clustered;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.coordination.common.CoordinationException;
import org.wso2.carbon.coordination.core.sync.Group;
import org.wso2.carbon.coordination.core.sync.GroupEventListener;
import org.wso2.carbon.ntask.common.TaskException;
import org.wso2.carbon.ntask.core.TaskManager;
import org.wso2.carbon.ntask.core.internal.TasksDSComponent;
import org.wso2.carbon.ntask.core.service.TaskService;

/* loaded from: input_file:org/wso2/carbon/ntask/core/impl/clustered/ClusterGroupCommunicator.class */
public class ClusterGroupCommunicator {
    public static final String TASK_SERVER_COUNT_SYS_PROP = "task.server.count";
    public static final String CARBON_TASK_GROUP_BASE = "__CARBON_TASK_GROUP_";
    public static final String CARBON_TASK_SERVER_STARTUP_GROUP = "__CARBON_TASK_GROUP___SERVER_STARTUP_GROUP__";
    private static final Log log = LogFactory.getLog(ClusterGroupCommunicator.class);
    private static ClusterGroupCommunicator instance;
    private Map<String, ClusterGroup> clusterGroupMap = new HashMap();
    private TaskService taskService;

    /* loaded from: input_file:org/wso2/carbon/ntask/core/impl/clustered/ClusterGroupCommunicator$ClusterGroup.class */
    public class ClusterGroup implements GroupEventListener {
        private String taskType;
        private Group group;
        private boolean leader;

        public ClusterGroup(String str) throws TaskException {
            this.taskType = str;
            try {
                this.group = TasksDSComponent.getCoordinationService().createGroup(ClusterGroupCommunicator.CARBON_TASK_GROUP_BASE + getTaskType());
                this.group.setGroupEventListener(this);
                try {
                    this.leader = getGroup().getLeaderId().equals(getGroup().getMemberId());
                } catch (CoordinationException e) {
                    throw new TaskException("Error in creating cluster group: " + e.getMessage(), TaskException.Code.UNKNOWN, e);
                }
            } catch (CoordinationException e2) {
                throw new TaskException(e2.getMessage(), TaskException.Code.UNKNOWN, e2);
            }
        }

        public List<String> getMemberIds() throws CoordinationException {
            return getGroup().getMemberIds();
        }

        public String getMemberId() throws CoordinationException {
            return getGroup().getMemberId();
        }

        public String getLeaderId() throws CoordinationException {
            return getGroup().getLeaderId();
        }

        public Group getGroup() {
            return this.group;
        }

        public byte[] sendReceive(int i, String str, String str2, byte[] bArr) throws Exception {
            return ((OperationResponse) ClusterGroupCommunicator.bytesToObject(getGroup().sendReceive(str, ClusterGroupCommunicator.objectToBytes(new OperationRequest(i, str2, bArr))))).getPayload();
        }

        public void onLeaderChange(String str) {
            if (ClusterGroupCommunicator.log.isDebugEnabled()) {
                ClusterGroupCommunicator.log.info("Task server leader changed [" + getTaskType() + "]: " + str);
            }
            this.leader = str.equals(getGroup().getMemberId());
            try {
                if (isLeader()) {
                    ClusterGroupCommunicator.log.info("Task server leader changed [" + getTaskType() + "], rescheduling missing tasks...");
                    scheduleAllMissingTasks();
                }
            } catch (TaskException e) {
                ClusterGroupCommunicator.log.error("Error in scheduling missing tasks: " + e.getMessage(), e);
            }
        }

        public boolean isLeader() {
            return this.leader;
        }

        public String getTaskType() {
            return this.taskType;
        }

        public void onGroupMessage(byte[] bArr) {
        }

        public void onMemberArrival(String str) {
            if (ClusterGroupCommunicator.log.isDebugEnabled()) {
                ClusterGroupCommunicator.log.debug("Task member arrived: " + str);
            }
        }

        private void scheduleAllMissingTasks() throws TaskException {
            for (TaskManager taskManager : ClusterGroupCommunicator.this.getTaskService().getAllTenantTaskManagersForType(getTaskType())) {
                if (taskManager instanceof ClusteredTaskManager) {
                    ((ClusteredTaskManager) taskManager).scheduleMissingTasks();
                }
            }
        }

        public void onMemberDeparture(String str) {
            if (ClusterGroupCommunicator.log.isDebugEnabled()) {
                ClusterGroupCommunicator.log.debug("Task member departed: " + str);
            }
            try {
                if (isLeader()) {
                    ClusterGroupCommunicator.log.info("Task member departed [" + getTaskType() + "], rescheduling missing tasks...");
                    scheduleAllMissingTasks();
                }
            } catch (TaskException e) {
                ClusterGroupCommunicator.log.error("Error in scheduling missing tasks: " + e.getMessage(), e);
            }
        }

        public byte[] onPeerMessage(byte[] bArr) throws CoordinationException {
            try {
                OperationRequest operationRequest = (OperationRequest) ClusterGroupCommunicator.bytesToObject(bArr);
                try {
                    PrivilegedCarbonContext.startTenantFlow();
                    PrivilegedCarbonContext.getCurrentContext().setTenantId(operationRequest.getTenantId());
                    TaskManager taskManager = ClusterGroupCommunicator.this.getTaskService().getTaskManager(getTaskType());
                    if (!(taskManager instanceof ClusteredTaskManager)) {
                        throw new CoordinationException("Invalid task manager type, expected 'clustered' type, got: " + taskManager, CoordinationException.ExceptionCode.GENERIC_ERROR);
                    }
                    byte[] objectToBytes = ClusterGroupCommunicator.objectToBytes(((ClusteredTaskManager) taskManager).onOperationRequest(operationRequest));
                    PrivilegedCarbonContext.endTenantFlow();
                    return objectToBytes;
                } catch (Throwable th) {
                    PrivilegedCarbonContext.endTenantFlow();
                    throw th;
                }
            } catch (Exception e) {
                throw new CoordinationException(e.getMessage(), CoordinationException.ExceptionCode.GENERIC_ERROR, e);
            }
        }
    }

    /* loaded from: input_file:org/wso2/carbon/ntask/core/impl/clustered/ClusterGroupCommunicator$OperationRequest.class */
    public static class OperationRequest implements Serializable {
        private static final long serialVersionUID = 1;
        private int tenantId;
        private String opName;
        private byte[] payload;

        public OperationRequest(int i, String str, byte[] bArr) {
            this.tenantId = i;
            this.opName = str;
            this.payload = bArr;
        }

        public int getTenantId() {
            return this.tenantId;
        }

        public String getOpName() {
            return this.opName;
        }

        public byte[] getPayload() {
            return this.payload;
        }
    }

    /* loaded from: input_file:org/wso2/carbon/ntask/core/impl/clustered/ClusterGroupCommunicator$OperationResponse.class */
    public static class OperationResponse implements Serializable {
        private static final long serialVersionUID = 1;
        private byte[] payload;

        public OperationResponse(byte[] bArr) {
            this.payload = bArr;
        }

        public byte[] getPayload() {
            return this.payload;
        }
    }

    public static ClusterGroupCommunicator getInstance() throws TaskException {
        if (instance == null) {
            synchronized (ClusterGroupCommunicator.class) {
                if (instance == null) {
                    instance = new ClusterGroupCommunicator(TasksDSComponent.getTaskService());
                }
            }
        }
        return instance;
    }

    private ClusterGroupCommunicator(TaskService taskService) throws TaskException {
        this.taskService = taskService;
    }

    public void checkServers() throws CoordinationException {
        int taskServerCount = getTaskService().getServerConfiguration().getTaskServerCount();
        if (taskServerCount != -1) {
            log.info("Waiting for " + taskServerCount + " task servers...");
            TasksDSComponent.getCoordinationService().createGroup(CARBON_TASK_SERVER_STARTUP_GROUP).waitForMemberCount(taskServerCount);
            log.info("All task servers activated.");
        }
    }

    public void newTaskTypeAdded(String str) throws TaskException {
        this.clusterGroupMap.put(str, new ClusterGroup(str));
    }

    public TaskService getTaskService() {
        return this.taskService;
    }

    public ClusterGroup getClusterGroup(String str) {
        return this.clusterGroupMap.get(str);
    }

    public String getLeaderId(String str) throws CoordinationException {
        return getClusterGroup(str).getLeaderId();
    }

    public List<String> getMemberIds(String str) throws CoordinationException {
        return getClusterGroup(str).getMemberIds();
    }

    public String getMemberId(String str) throws CoordinationException {
        return getClusterGroup(str).getMemberId();
    }

    public boolean isLeader(String str) throws TaskException {
        return getClusterGroup(str).isLeader();
    }

    public byte[] sendReceive(int i, String str, String str2, String str3, byte[] bArr) throws Exception {
        return getClusterGroup(str).sendReceive(i, str2, str3, bArr);
    }

    public static byte[] objectToBytes(Object obj) throws Exception {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);
        objectOutputStream.writeObject(obj);
        objectOutputStream.close();
        return byteArrayOutputStream.toByteArray();
    }

    public static Object bytesToObject(byte[] bArr) throws Exception {
        ObjectInputStream objectInputStream = new ObjectInputStream(new ByteArrayInputStream(bArr));
        Object readObject = objectInputStream.readObject();
        objectInputStream.close();
        return readObject;
    }
}
