package org.wso2.carbon.coordination.core.sync.impl;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Marshaller;
import javax.xml.bind.Unmarshaller;
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.wso2.carbon.coordination.common.CoordinationException;
import org.wso2.carbon.coordination.core.services.impl.ZKCoordinationService;
import org.wso2.carbon.coordination.core.sync.Group;
import org.wso2.carbon.coordination.core.sync.GroupEventListener;
import org.wso2.carbon.coordination.core.utils.CoordinationUtils;

/* loaded from: input_file:org/wso2/carbon/coordination/core/sync/impl/ZKGroup.class */
public class ZKGroup extends ZKSyncPrimitive implements Group {
    private static final Log log = LogFactory.getLog(ZKGroup.class);
    public static final String COMM_BASE_NAME = "__COMM__";
    public static final String GROUP_COMM_NODE_ID = "__COMM__/GROUP_COMMUNICATION";
    public static final String PEER_RESULTS_BASE_NAME = "__COMM__/PEER_RESULTS";
    public static final String PEER_REQUESTS_BASE_NAME = "__COMM__/PEER_REQUESTS";
    private GroupEventListener groupEventListener;
    private String memberPath;
    private String memberId;
    private List<String> lastProcessedMemberIds;
    private String leaderId;
    private boolean active;
    private CommunicationChannel groupCommChannel;
    private CommunicationChannel myRequestChannel;
    private Map<String, CommunicationChannel> peerRequestChannels;
    private static Marshaller peerRequestMarshaller;
    private static Marshaller peerResponseMarshaller;
    private static Unmarshaller peerRequestUnmarshaller;
    private static Unmarshaller peerResponseUnmarshaller;
    private Object memberArrivalCountLock;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/wso2/carbon/coordination/core/sync/impl/ZKGroup$CommunicationChannel.class */
    public class CommunicationChannel implements Watcher {
        public static final int CHANNEL_TYPE_BROADCAST = 0;
        public static final int CHANNEL_TYPE_PEER = 1;
        private String commRootPath;
        private String lastProcessedMessageId;
        private int channelType;
        private boolean incoming;

        /**
         * Builds a channel rooted at the given znode path. The path is handed to
         * {@code createRecursive}, and for incoming channels a data watch on the root is
         * registered with this instance as the watcher.
         *
         * @param str znode path used as the channel root
         * @param i   channel type, {@link #CHANNEL_TYPE_BROADCAST} or {@link #CHANNEL_TYPE_PEER}
         * @param z   {@code true} if this instance should watch the channel for messages
         * @throws CoordinationException if the root cannot be prepared or the watch cannot be set
         */
        public CommunicationChannel(String str, int i, boolean z) throws CoordinationException {
            this.commRootPath = str;
            this.channelType = i;
            this.incoming = z;
            ZKGroup.this.createRecursive(str);
            if (!z) {
                // Send-only channels never read from the root, so no watch is registered.
                return;
            }
            try {
                ZKGroup.this.getZooKeeper().getData(str, this, (Stat) null);
            } catch (Exception e) {
                throw new CoordinationException(CoordinationException.ExceptionCode.GENERIC_ERROR, e);
            }
        }

        /**
         * Tells whether this channel was created to receive messages.
         *
         * @return the {@code incoming} flag supplied at construction
         */
        public boolean isIncoming() {
            return incoming;
        }

        /**
         * Returns the channel type given at construction.
         *
         * @return {@link #CHANNEL_TYPE_BROADCAST} or {@link #CHANNEL_TYPE_PEER}
         */
        public int getChannelType() {
            return channelType;
        }

        /**
         * Returns the znode path under which this channel's message nodes are created.
         *
         * @return the channel root path
         */
        public String getCommRootPath() {
            return commRootPath;
        }

        /**
         * Reads the payload of the message node with the given id under the channel root.
         * On peer channels the node is deleted after it has been read.
         *
         * @param messageId child node name of the message
         * @return the node data; {@code null} if the node had already vanished before it
         *         could be read
         * @throws Exception on any ZooKeeper failure other than a missing node
         */
        private byte[] validateAndReturnDataFromId(String messageId) throws Exception {
            final String messagePath = getCommRootPath() + "/" + messageId;
            byte[] payload = null;
            try {
                payload = ZKGroup.this.getZooKeeper().getData(messagePath, false, (Stat) null);
                if (getChannelType() == CHANNEL_TYPE_PEER) {
                    ZKGroup.this.getZooKeeper().delete(messagePath, -1);
                }
            } catch (KeeperException e) {
                // A missing node is tolerated: whatever was read so far (possibly nothing) is returned.
                if (e.code() != KeeperException.Code.NONODE) {
                    throw e;
                }
            }
            return payload;
        }

        /**
         * Picks the next unprocessed message from the channel and returns its payload.
         * <p>
         * Children of the channel root are sorted by name. If nothing has been processed yet,
         * only the last (greatest) child is taken and older ones are skipped; otherwise the
         * first child that sorts after the last processed id is taken.
         *
         * @return the payload of the selected message, or {@code null} if there is no newer
         *         message (or the selected node could not be read any more)
         * @throws Exception on ZooKeeper failures
         */
        private byte[] getNextMessage() throws Exception {
            List<String> messageIds = ZKGroup.this.getZooKeeper().getChildren(getCommRootPath(), false);
            Collections.sort(messageIds);
            String nextId = null;
            if (this.lastProcessedMessageId == null) {
                if (messageIds.size() > 0) {
                    nextId = messageIds.get(messageIds.size() - 1);
                }
            } else {
                for (String candidate : messageIds) {
                    if (candidate.compareTo(this.lastProcessedMessageId) > 0) {
                        nextId = candidate;
                        break;
                    }
                }
            }
            if (nextId == null) {
                return null;
            }
            this.lastProcessedMessageId = nextId;
            return validateAndReturnDataFromId(nextId);
        }

        /**
         * Routes a received payload to the handler matching this channel's type.
         * Payloads on channels of any other type are ignored.
         *
         * @param payload raw message bytes
         * @throws Exception declared for handler failures
         */
        private void processMessageData(byte[] payload) throws Exception {
            switch (getChannelType()) {
                case CHANNEL_TYPE_BROADCAST:
                    handleBroadcastMessage(payload);
                    break;
                case CHANNEL_TYPE_PEER:
                    handlePeerMessage(payload);
                    break;
                default:
                    break;
            }
        }

        /**
         * Passes a broadcast payload to the group's event listener, if one is registered.
         *
         * @param payload raw broadcast bytes
         */
        private void handleBroadcastMessage(byte[] payload) {
            GroupEventListener listener = ZKGroup.this.getGroupEventListener();
            if (listener == null) {
                return;
            }
            listener.onGroupMessage(payload);
        }

        /**
         * Serves a peer request: unmarshals it, lets the group event listener compute the
         * reply, stores the reply bytes in result nodes and finally publishes a
         * {@link PeerResponseMessage} under the request's correlation id.
         * <p>
         * Any failure is logged. If the correlation id is already known at that point, a
         * failure response ({@code success == false}, message set to the error text) is
         * published under that id so the requester can fail immediately instead of waiting
         * for its timeout.
         *
         * @param requestBytes marshalled {@link PeerRequestMessage}
         */
        private void handlePeerMessage(byte[] requestBytes) {
            // Only known once the request has been unmarshalled; needed to address any reply.
            String correlationId = null;
            try {
                PeerRequestMessage request;
                try {
                    request = (PeerRequestMessage) ZKGroup.getPeerRequestUnmarshaller().unmarshal(new ByteArrayInputStream(requestBytes));
                } catch (JAXBException e) {
                    throw new CoordinationException("Error in unmarshalling peer message", CoordinationException.ExceptionCode.GENERIC_ERROR, e);
                }
                correlationId = request.getCorrelationId();
                // Checked after unmarshalling so that a missing listener can be reported back.
                GroupEventListener listener = ZKGroup.this.getGroupEventListener();
                if (listener == null) {
                    throw new CoordinationException("No listener registered for peer requests");
                }
                List<String> resultNodeIds = createResultDataNodesFromData(listener.onPeerMessage(request.getData()));
                PeerResponseMessage response = new PeerResponseMessage();
                response.setMessageIds(resultNodeIds.toArray(new String[resultNodeIds.size()]));
                response.setSuccess(true);
                createResultDataNode(correlationId, marshalPeerResponse(response));
            } catch (Exception e) {
                ZKGroup.log.error("Error in processing peer request: " + e.getMessage(), e);
                if (correlationId != null) {
                    PeerResponseMessage failure = new PeerResponseMessage();
                    failure.setSuccess(false);
                    failure.setMessage(e.getMessage());
                    try {
                        createResultDataNode(correlationId, marshalPeerResponse(failure));
                    } catch (Exception e2) {
                        ZKGroup.log.error("Error in creating peer error result node: " + e2.getMessage(), e2);
                    }
                }
            }
        }

        /**
         * Serializes a peer response with the shared response marshaller.
         *
         * @param response the response to serialize
         * @return the marshalled bytes
         * @throws Exception if marshalling fails
         */
        private byte[] marshalPeerResponse(PeerResponseMessage response) throws Exception {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            Marshaller marshaller = ZKGroup.getPeerResponseMarshaller();
            marshaller.marshal(response, buffer);
            return buffer.toByteArray();
        }

        /**
         * Splits the payload into chunks of at most
         * {@code ZKCoordinationService.MAX_ZK_MESSAGE_SIZE} bytes and stores each chunk in its
         * own result node named by a random UUID.
         *
         * @param payload reply bytes to store; an empty array produces no nodes
         * @return the node ids in chunk order
         * @throws Exception if a result node cannot be created
         */
        private List<String> createResultDataNodesFromData(byte[] payload) throws Exception {
            final int chunkSize = ZKCoordinationService.MAX_ZK_MESSAGE_SIZE;
            // All chunks are cut first; nodes are only created afterwards.
            List<byte[]> chunks = new ArrayList<byte[]>();
            for (int offset = 0; offset < payload.length; offset += chunkSize) {
                int end = Math.min(payload.length, offset + chunkSize);
                chunks.add(Arrays.copyOfRange(payload, offset, end));
            }
            List<String> nodeIds = new ArrayList<String>();
            for (byte[] chunk : chunks) {
                String nodeId = UUID.randomUUID().toString();
                createResultDataNode(nodeId, chunk);
                nodeIds.add(nodeId);
            }
            return nodeIds;
        }

        /**
         * Creates an ephemeral node holding the given bytes under the group's peer-results
         * path and passes the created path to
         * {@code ZKCoordinationService.scheduleTimedZNodeDeletion}.
         *
         * @param nodeName name of the node below the peer-results path
         * @param payload  bytes to store in the node
         * @throws Exception if the node cannot be created
         */
        private void createResultDataNode(String nodeName, byte[] payload) throws Exception {
            final String resultPath = ZKGroup.this.getRootPath() + "/" + ZKGroup.PEER_RESULTS_BASE_NAME + "/" + nodeName;
            final String createdPath = ZKGroup.this.getZooKeeper().create(resultPath, payload, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            ZKCoordinationService.scheduleTimedZNodeDeletion(createdPath);
        }

        /**
         * Watcher callback for the channel root. For events accepted by
         * {@code isUsefulWatchedEvent}, the data watch is registered again and all pending
         * messages are handled one after another. Failures are logged unless the JVM is
         * shutting down.
         *
         * @param watchedEvent the ZooKeeper event
         */
        public void process(WatchedEvent watchedEvent) {
            if (ZKGroup.this.isUsefulWatchedEvent(watchedEvent)) {
                try {
                    // Register the watch again before draining the pending messages.
                    ZKGroup.this.getZooKeeper().getData(getCommRootPath(), this, (Stat) null);
                    for (byte[] message = getNextMessage(); message != null; message = getNextMessage()) {
                        processMessageData(message);
                    }
                } catch (Exception e) {
                    if (!CoordinationUtils.isJVMShuttingDown()) {
                        ZKGroup.log.error("Error in receiving group messages: " + e.getMessage(), e);
                    }
                }
            }
        }

        /**
         * Publishes a message on this channel: an ephemeral sequential node carrying the
         * bytes is created under the channel root and scheduled for timed deletion, then the
         * root's data is rewritten so that data watches on the root fire.
         *
         * @param bArr message payload
         * @throws CoordinationException if any ZooKeeper operation fails
         */
        public void sendMessage(byte[] bArr) throws CoordinationException {
            try {
                final String nodePrefix = getCommRootPath() + "/node";
                final String createdPath = ZKGroup.this.getZooKeeper().create(nodePrefix, bArr, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                ZKCoordinationService.scheduleTimedZNodeDeletion(createdPath);
                // Touch the root to notify watchers of the channel.
                ZKGroup.this.getZooKeeper().setData(getCommRootPath(), new byte[0], -1);
            } catch (Exception e) {
                throw new CoordinationException(CoordinationException.ExceptionCode.GENERIC_ERROR, e);
            }
        }

        /**
         * Deletes every message node currently under the channel root.
         * <p>
         * A node that has already disappeared by the time it is deleted is skipped; any other
         * failure aborts the operation.
         *
         * @throws CoordinationException if listing or deleting the message nodes fails
         */
        public void clearMessages() throws CoordinationException {
            final String rootPath = getCommRootPath();
            try {
                for (String messageId : ZKGroup.this.getZooKeeper().getChildren(rootPath, false)) {
                    try {
                        ZKGroup.this.getZooKeeper().delete(rootPath + "/" + messageId, -1);
                    } catch (KeeperException e) {
                        // The node may have been removed concurrently (e.g. by timed deletion).
                        if (e.code() != KeeperException.Code.NONODE) {
                            throw e;
                        }
                    }
                }
            } catch (Exception e) {
                throw new CoordinationException(CoordinationException.ExceptionCode.GENERIC_ERROR, e);
            }
        }
    }

    /**
     * JAXB-mapped request sent to a peer: an opaque payload plus the correlation id under
     * which the reply is expected.
     */
    @XmlRootElement(name = "peerRequestMessage")
    /* loaded from: input_file:org/wso2/carbon/coordination/core/sync/impl/ZKGroup$PeerRequestMessage.class */
    public static class PeerRequestMessage {
        /** Id used to match the reply to this request. */
        private String correlationId;
        /** Request payload. */
        private byte[] data;

        /** @return the correlation id, or {@code null} if none was set */
        public String getCorrelationId() {
            return correlationId;
        }

        /** @param str the correlation id to use */
        public void setCorrelationId(String str) {
            correlationId = str;
        }

        /** @return the payload array as stored (not copied) */
        public byte[] getData() {
            return data;
        }

        /** @param bArr the payload array to store (not copied) */
        public void setData(byte[] bArr) {
            data = bArr;
        }
    }

    /**
     * JAXB-mapped reply to a peer request: a success flag, an optional error text and the
     * ids of the result nodes that hold the reply payload.
     */
    @XmlRootElement(name = "peerResponseMessage")
    /* loaded from: input_file:org/wso2/carbon/coordination/core/sync/impl/ZKGroup$PeerResponseMessage.class */
    public static class PeerResponseMessage {
        /** Outcome flag; a new instance starts out as successful. */
        private boolean success = true;
        /** Error text for failed requests. */
        private String message;
        /** Ids of the result nodes carrying the reply payload. */
        private String[] messageIds;

        /** @return the result node ids as stored (not copied); may be {@code null} */
        public String[] getMessageIds() {
            return messageIds;
        }

        /** @param strArr the result node ids to store (not copied) */
        public void setMessageIds(String[] strArr) {
            messageIds = strArr;
        }

        /** @return {@code true} unless the flag was set to {@code false} */
        public boolean isSuccess() {
            return success;
        }

        /** @param z the outcome flag */
        public void setSuccess(boolean z) {
            success = z;
        }

        /** @return the error text, or {@code null} if none was set */
        public String getMessage() {
            return message;
        }

        /** @param str the error text */
        public void setMessage(String str) {
            message = str;
        }
    }

    /* loaded from: input_file:org/wso2/carbon/coordination/core/sync/impl/ZKGroup$PollingDataNode.class */
    public class PollingDataNode implements Watcher {
        public static final int TOTAL_TIMEOUT = 120000;
        public static final int CHECK_ALIVE_TIMEOUT = 5000;
        private Object lock = new Object();
        private byte[] data;
        private boolean ready;
        private String dataPath;
        private String testPath;

        /**
         * Starts polling for a data node. An exists-watch with this instance as watcher is
         * set on the data path; if the node is already there its data is read right away,
         * the instance is marked ready, and the node is deleted on a best-effort basis.
         *
         * @param str  path of the node whose data is awaited
         * @param str2 path whose existence is checked while waiting; may be {@code null}
         * @throws Exception if checking for or reading the node fails
         */
        public PollingDataNode(String str, String str2) throws Exception {
            this.dataPath = str;
            this.testPath = str2;
            if (ZKGroup.this.getZooKeeper().exists(getDataPath(), this) == null) {
                // Not there yet: the watch will deliver the data later.
                return;
            }
            try {
                this.data = ZKGroup.this.getZooKeeper().getData(str, false, (Stat) null);
                this.ready = true;
            } finally {
                try {
                    ZKGroup.this.getZooKeeper().delete(str, -1);
                } catch (Exception ignored) {
                    // Best-effort cleanup of the consumed node.
                }
            }
        }

        /**
         * @return path of the node whose data is awaited
         */
        public String getDataPath() {
            return dataPath;
        }

        /**
         * @return path checked for existence while waiting, or {@code null} if none was given
         */
        public String getTestPath() {
            return testPath;
        }

        /**
         * @return the data read from the node so far; {@code null} if nothing has been read
         */
        public byte[] getData() {
            return data;
        }

        /**
         * Checks whether the test path still exists.
         *
         * @return {@code true} if no test path is configured or the node exists;
         *         {@code false} if it is missing or the check itself fails
         */
        public boolean checkTestPath() {
            final String path = getTestPath();
            if (path == null) {
                return true;
            }
            try {
                Stat stat = ZKGroup.this.getZooKeeper().exists(path, false);
                return stat != null;
            } catch (Exception e) {
                // A failed check is treated the same as a missing node.
                return false;
            }
        }

        /**
         * Blocks until the data node has been delivered, the test path disappears, or the
         * overall budget is exhausted. Waiting happens in slices of
         * {@link #CHECK_ALIVE_TIMEOUT} ms; after each slice the test path is checked. When
         * the accumulated slices exceed {@link #TOTAL_TIMEOUT} ms, {@link #cleanupDataNode()}
         * is invoked and the wait gives up.
         *
         * @return {@code true} if this instance became ready, {@code false} otherwise
         * @throws Exception if the waiting thread is interrupted
         */
        public boolean waitForData() throws Exception {
            for (int waited = 0; waited <= TOTAL_TIMEOUT; waited += CHECK_ALIVE_TIMEOUT) {
                synchronized (this.lock) {
                    if (isReady()) {
                        return true;
                    }
                    this.lock.wait(CHECK_ALIVE_TIMEOUT);
                    if (isReady()) {
                        return true;
                    }
                    if (!checkTestPath()) {
                        return false;
                    }
                }
            }
            cleanupDataNode();
            return false;
        }

        /**
         * Creates an empty ephemeral node at the data path and then deletes the data path.
         * Both steps are best-effort; their failures are ignored.
         */
        private void cleanupDataNode() {
            // NOTE(review): presumably the create/delete pair exists to fire the pending
            // exists-watch and leave no node behind — confirm.
            try {
                ZKGroup.this.getZooKeeper().create(getDataPath(), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            } catch (Exception ignored) {
                // Ignored: the node may already exist.
            }
            try {
                ZKGroup.this.getZooKeeper().delete(getDataPath(), -1);
            } catch (Exception ignored) {
                // Ignored: the node may already be gone.
            }
        }

        /**
         * @return {@code true} once the watch has fired or the node was found at construction
         */
        public boolean isReady() {
            return ready;
        }

        /**
         * Watcher callback for the data path. For events accepted by
         * {@code isUsefulWatchedEvent}, the node is read and deleted (errors ignored), the
         * instance is marked ready, and all waiters are woken up.
         *
         * @param watchedEvent the ZooKeeper event
         */
        public void process(WatchedEvent watchedEvent) {
            if (!ZKGroup.this.isUsefulWatchedEvent(watchedEvent)) {
                return;
            }
            synchronized (this.lock) {
                try {
                    this.data = ZKGroup.this.getZooKeeper().getData(getDataPath(), false, (Stat) null);
                    ZKGroup.this.getZooKeeper().delete(this.dataPath, -1);
                } catch (Exception ignored) {
                    // Read/delete failures are ignored; readiness is signalled regardless.
                }
                this.ready = true;
                this.lock.notifyAll();
            }
        }
    }

    /**
     * Creates the group primitive for the given group id, prepares the shared JAXB
     * serializers and runs {@link #init(String)} with the group id.
     *
     * @param zooKeeper the ZooKeeper client to use
     * @param str       the group id
     * @throws CoordinationException if the serializers cannot be initialized
     */
    public ZKGroup(ZooKeeper zooKeeper, String str) throws CoordinationException {
        super(zooKeeper, ZKGroup.class.getCanonicalName(), str, -1);
        // Both fields must be in place before init(), which processes the member nodes.
        this.memberArrivalCountLock = new Object();
        this.lastProcessedMemberIds = new ArrayList<String>();
        initMessageSerializers();
        init(getGroupId());
    }

    /**
     * Initializes the group: runs the base initialization, sets up the broadcast channel
     * and the peer-results path, joins the group, opens this member's request channel and
     * finally marks the group active.
     * <p>
     * A failure is logged (with its cause) and not propagated; the group then stays
     * inactive.
     *
     * @param str the group id
     */
    @Override // org.wso2.carbon.coordination.core.sync.impl.ZKSyncPrimitive
    public void init(String str) {
        try {
            super.init(str);
            this.peerRequestChannels = new HashMap<String, CommunicationChannel>();
            initGroupCommChannel();
            initPeerResults();
            join(str);
            // The request channel is named after the member id, which join() assigns.
            initMyRequestCommChannel();
            this.active = true;
        } catch (CoordinationException e) {
            // Pass the exception along so the cause and stack trace are not lost.
            log.error("Error while creating the group : " + str, e);
        }
    }

    /**
     * Creates the shared JAXB marshallers and unmarshallers for peer requests and
     * responses, unless all four already exist.
     *
     * @throws CoordinationException if a JAXB context or (un)marshaller cannot be created
     */
    private void initMessageSerializers() throws CoordinationException {
        try {
            // Each of the four serializers is checked; if any is missing all are rebuilt.
            if (peerRequestMarshaller == null || peerRequestUnmarshaller == null
                    || peerResponseMarshaller == null || peerResponseUnmarshaller == null) {
                JAXBContext requestContext = JAXBContext.newInstance(new Class[]{PeerRequestMessage.class});
                JAXBContext responseContext = JAXBContext.newInstance(new Class[]{PeerResponseMessage.class});
                peerRequestMarshaller = requestContext.createMarshaller();
                peerRequestUnmarshaller = requestContext.createUnmarshaller();
                peerResponseMarshaller = responseContext.createMarshaller();
                peerResponseUnmarshaller = responseContext.createUnmarshaller();
            }
        } catch (JAXBException e) {
            throw new CoordinationException("Error in initializing peer message serializers", CoordinationException.ExceptionCode.GENERIC_ERROR, e);
        }
    }

    /**
     * @return the shared marshaller for {@link PeerRequestMessage}; {@code null} until the
     *         serializers have been initialized
     */
    public static Marshaller getPeerRequestMarshaller() {
        return ZKGroup.peerRequestMarshaller;
    }

    /**
     * @return the shared marshaller for {@link PeerResponseMessage}; {@code null} until the
     *         serializers have been initialized
     */
    public static Marshaller getPeerResponseMarshaller() {
        return ZKGroup.peerResponseMarshaller;
    }

    /**
     * @return the shared unmarshaller for {@link PeerRequestMessage}; {@code null} until
     *         the serializers have been initialized
     */
    public static Unmarshaller getPeerRequestUnmarshaller() {
        return ZKGroup.peerRequestUnmarshaller;
    }

    /**
     * @return the shared unmarshaller for {@link PeerResponseMessage}; {@code null} until
     *         the serializers have been initialized
     */
    public static Unmarshaller getPeerResponseUnmarshaller() {
        return ZKGroup.peerResponseUnmarshaller;
    }

    /**
     * @return the live map of outgoing request channels, keyed by peer member id
     */
    public Map<String, CommunicationChannel> getPeerRequestChannels() {
        return peerRequestChannels;
    }

    /**
     * Opens an outgoing (non-listening) peer channel on the request path of the given member.
     *
     * @param peerId member id of the target peer
     * @return the new channel
     * @throws CoordinationException if the channel cannot be created
     */
    private CommunicationChannel createRequestChannelForPeer(String peerId) throws CoordinationException {
        final String requestPath = getRootPath() + "/" + PEER_REQUESTS_BASE_NAME + "/" + peerId;
        return new CommunicationChannel(requestPath, CommunicationChannel.CHANNEL_TYPE_PEER, false);
    }

    /**
     * Returns the outgoing request channel for a peer, creating and caching it on first use.
     *
     * @param str member id of the target peer
     * @return the cached or newly created channel
     * @throws CoordinationException if a new channel cannot be created
     */
    public CommunicationChannel retrievePeerRequestChannel(String str) throws CoordinationException {
        CommunicationChannel cached = getPeerRequestChannels().get(str);
        if (cached != null) {
            return cached;
        }
        CommunicationChannel created = createRequestChannelForPeer(str);
        getPeerRequestChannels().put(str, created);
        return created;
    }

    /**
     * @return the broadcast channel of this group
     */
    public CommunicationChannel getGroupCommChannel() {
        return groupCommChannel;
    }

    /**
     * @return the incoming peer-request channel of this member
     */
    public CommunicationChannel getMyRequestChannel() {
        return myRequestChannel;
    }

    /**
     * Opens the group-wide broadcast channel as an incoming channel.
     *
     * @throws CoordinationException if the channel cannot be created
     */
    private void initGroupCommChannel() throws CoordinationException {
        final String broadcastPath = getRootPath() + "/" + GROUP_COMM_NODE_ID;
        this.groupCommChannel = new CommunicationChannel(broadcastPath, CommunicationChannel.CHANNEL_TYPE_BROADCAST, true);
    }

    /**
     * Passes the peer-results path of this group to {@code createRecursive}.
     *
     * @throws CoordinationException if the path cannot be prepared
     */
    private void initPeerResults() throws CoordinationException {
        final String resultsPath = getRootPath() + "/" + PEER_RESULTS_BASE_NAME;
        createRecursive(resultsPath);
    }

    /**
     * Opens this member's incoming peer-request channel and registers its root path with
     * {@code ZKCoordinationService.scheduleOnCloseZNodeDeletion}.
     *
     * @throws CoordinationException if the channel cannot be created
     */
    private void initMyRequestCommChannel() throws CoordinationException {
        final String ownRequestPath = getRootPath() + "/" + PEER_REQUESTS_BASE_NAME + "/" + getMemberId();
        this.myRequestChannel = new CommunicationChannel(ownRequestPath, CommunicationChannel.CHANNEL_TYPE_PEER, true);
        ZKCoordinationService.scheduleOnCloseZNodeDeletion(getMyRequestChannel().getCommRootPath());
    }

    /**
     * @return {@code true} after a successful {@link #init(String)} and until
     *         {@link #leave()} is called
     */
    public boolean isActive() {
        return active;
    }

    /**
     * Updates the active flag of this group.
     *
     * @param z the new flag value
     */
    private void setActive(boolean z) {
        active = z;
    }

    /**
     * @return the full path of this member's node as returned by ZooKeeper on join
     */
    public String getMemberPath() {
        return memberPath;
    }

    /**
     * Joins the group by creating an ephemeral sequential member node under the group
     * root, derives the member id from the created node's name and processes the current
     * membership.
     *
     * @param str the group id (not used by this method)
     * @throws CoordinationException if the member node cannot be created or the membership
     *                               cannot be processed; a CoordinationException raised
     *                               while processing is propagated unchanged
     */
    private void join(String str) throws CoordinationException {
        try {
            this.memberPath = getZooKeeper().create(getRootPath() + "/node", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            // The member id is the last path segment of the created node.
            this.memberId = getMemberPath().substring(getMemberPath().lastIndexOf("/") + 1);
            processMemberNodes();
        } catch (CoordinationException e) {
            // Must precede the generic handler, otherwise it is unreachable and the
            // exception would be wrapped a second time.
            throw e;
        } catch (Exception e) {
            throw new CoordinationException(CoordinationException.ExceptionCode.GENERIC_ERROR, e);
        }
    }

    /**
     * Looks up the current member ids from ZooKeeper without registering a watch.
     *
     * @return the sorted member ids
     * @throws CoordinationException if the lookup fails
     */
    @Override // org.wso2.carbon.coordination.core.sync.Group
    public List<String> getMemberIds() throws CoordinationException {
        final boolean registerWatch = false;
        return lookupMemberIds(registerWatch);
    }

    /**
     * @return the member ids captured by the most recent membership processing
     */
    public List<String> getLastProcessedMemberIds() {
        return lastProcessedMemberIds;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Refreshes the membership snapshot (registering a watch again) and reports the
     * differences against the previous snapshot: arrivals first, then a leader change,
     * then departures. Afterwards all threads blocked in {@link #waitForMemberCount(int)}
     * are woken up.
     *
     * @throws CoordinationException if the lookup or any notification step fails
     */
    public void processMemberNodes() throws CoordinationException {
        final List<String> previousMembers = getLastProcessedMemberIds();
        final String previousLeader = getLeaderId();
        try {
            this.lastProcessedMemberIds = lookupMemberIds(true);
            processArrivals(previousMembers);
            processLeader(previousLeader);
            processDepartures(previousMembers);
            synchronized (this.memberArrivalCountLock) {
                // Let waiters re-check the member count.
                this.memberArrivalCountLock.notifyAll();
            }
        } catch (Exception e) {
            throw new CoordinationException(CoordinationException.ExceptionCode.GENERIC_ERROR, e);
        }
    }

    /**
     * Takes the first id of the sorted member list as leader and notifies the listener if
     * it differs from the previous leader.
     * <p>
     * The member list is expected to be non-empty; an empty list makes {@code get(0)} throw.
     *
     * @param previousLeaderId the leader id before this update; may be {@code null}
     * @throws CoordinationException if the member lookup fails
     */
    private void processLeader(String previousLeaderId) throws CoordinationException {
        final String currentLeader = getMemberIds().get(0);
        this.leaderId = currentLeader;
        if (currentLeader.equals(previousLeaderId)) {
            return;
        }
        GroupEventListener listener = getGroupEventListener();
        if (listener != null) {
            listener.onLeaderChange(getLeaderId());
        }
    }

    /**
     * Notifies the listener about every current member that is not in the previous snapshot.
     *
     * @param previousMembers the previous snapshot; {@code null} means every current member
     *                        counts as an arrival
     * @throws CoordinationException if the member lookup fails
     */
    private void processArrivals(List<String> previousMembers) throws CoordinationException {
        List<String> arrivals = new ArrayList<String>(getMemberIds());
        if (previousMembers != null) {
            arrivals.removeAll(previousMembers);
        }
        if (getGroupEventListener() == null) {
            return;
        }
        for (String arrivedId : arrivals) {
            getGroupEventListener().onMemberArrival(arrivedId);
        }
    }

    /**
     * Notifies the listener about every member of the previous snapshot that is no longer
     * present.
     *
     * @param previousMembers the previous snapshot; {@code null} means nothing to report
     * @throws CoordinationException if the member lookup fails
     */
    private void processDepartures(List<String> previousMembers) throws CoordinationException {
        if (previousMembers != null) {
            List<String> departures = new ArrayList<String>(previousMembers);
            departures.removeAll(getMemberIds());
            if (getGroupEventListener() != null) {
                for (String departedId : departures) {
                    getGroupEventListener().onMemberDeparture(departedId);
                }
            }
        }
    }

    /**
     * Builds the znode path of a member from its id.
     *
     * @param memberNodeId the member id
     * @return the group root path, a slash, and the member id
     */
    private String getPathFromMemberId(String memberNodeId) {
        final String root = getRootPath();
        return root + "/" + memberNodeId;
    }

    /**
     * @return the leader id determined by the most recent membership processing
     */
    @Override // org.wso2.carbon.coordination.core.sync.Group
    public String getLeaderId() {
        return leaderId;
    }

    /**
     * Sends a message on the group's broadcast channel.
     *
     * @param bArr message payload
     * @throws CoordinationException if the message cannot be sent
     */
    @Override // org.wso2.carbon.coordination.core.sync.Group
    public void broadcast(byte[] bArr) throws CoordinationException {
        CommunicationChannel channel = getGroupCommChannel();
        channel.sendMessage(bArr);
    }

    /**
     * Sends a request to a group member and blocks until its reply is available.
     * <p>
     * A request addressed to this member itself is served locally by the registered
     * listener. Otherwise the request is marshalled with a fresh correlation id, written
     * to the peer's request channel, and the reply is awaited on the result node named
     * after that correlation id.
     *
     * @param str  member id of the target peer
     * @param bArr request payload
     * @return the reply payload
     * @throws CoordinationException if no listener is registered for a local request, the
     *                               request cannot be marshalled or sent, the peer does not
     *                               reply, the peer reports a failure, or the wait is
     *                               interrupted (the interrupt flag is restored)
     */
    @Override // org.wso2.carbon.coordination.core.sync.Group
    public byte[] sendReceive(String str, byte[] bArr) throws CoordinationException {
        if (str.equals(getMemberId())) {
            GroupEventListener groupEventListener = getGroupEventListener();
            if (groupEventListener == null) {
                throw new CoordinationException("No group event listener registerd to handle sendReceive");
            }
            return groupEventListener.onPeerMessage(bArr);
        }
        PeerRequestMessage peerRequestMessage = new PeerRequestMessage();
        String uuid = UUID.randomUUID().toString();
        peerRequestMessage.setCorrelationId(uuid);
        peerRequestMessage.setData(bArr);
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        try {
            getPeerRequestMarshaller().marshal(peerRequestMessage, byteArrayOutputStream);
        } catch (JAXBException e) {
            throw new CoordinationException("Error in marshalling the peer request message", CoordinationException.ExceptionCode.GENERIC_ERROR, e);
        }
        retrievePeerRequestChannel(str).sendMessage(byteArrayOutputStream.toByteArray());
        try {
            PollingDataNode pollingDataNode = new PollingDataNode(getRootPath() + "/" + PEER_RESULTS_BASE_NAME + "/" + uuid, getPathFromMemberId(str));
            if (pollingDataNode.waitForData()) {
                return processDataNodeReply(pollingDataNode.getData());
            }
        } catch (CoordinationException e) {
            // Already carries a precise message (e.g. a failure reported by the peer).
            throw e;
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller's thread.
            Thread.currentThread().interrupt();
            throw new CoordinationException("Error in retrieving data from polling data node", CoordinationException.ExceptionCode.GENERIC_ERROR, e);
        } catch (Exception e) {
            throw new CoordinationException("Error in retrieving data from polling data node", CoordinationException.ExceptionCode.GENERIC_ERROR, e);
        }
        // Thrown outside the try so it is not re-wrapped under a misleading message.
        throw new CoordinationException("sendReceive failed in retrieving a reply from member with id: " + str);
    }

    /**
     * Decodes a marshalled {@link PeerResponseMessage} and assembles the reply payload by
     * concatenating the data of the result nodes it lists, in order.
     *
     * @param responseBytes marshalled response read from the correlation node
     * @return the concatenated reply payload; empty if the response lists no result nodes
     * @throws CoordinationException if no response bytes are available, the response cannot
     *                               be unmarshalled, the peer reported a failure, or a
     *                               result node cannot be read
     */
    private byte[] processDataNodeReply(byte[] responseBytes) throws CoordinationException {
        if (responseBytes == null) {
            // The polling node may signal readiness without having been able to read data.
            throw new CoordinationException("No reply data received from peer", CoordinationException.ExceptionCode.GENERIC_ERROR);
        }
        try {
            PeerResponseMessage response = (PeerResponseMessage) getPeerResponseUnmarshaller().unmarshal(new ByteArrayInputStream(responseBytes));
            if (!response.isSuccess()) {
                throw new CoordinationException("Error occured in target peer processing: " + response.getMessage(), CoordinationException.ExceptionCode.GENERIC_ERROR);
            }
            ByteArrayOutputStream result = new ByteArrayOutputStream();
            // The id array may be null (e.g. when the response carried no ids for an
            // empty reply); treat that as "no payload" instead of failing.
            String[] resultNodeIds = response.getMessageIds();
            if (resultNodeIds != null) {
                for (String resultNodeId : resultNodeIds) {
                    try {
                        result.write(retrieveResultData(resultNodeId));
                    } catch (IOException e) {
                        throw new CoordinationException("Error creating result buffer: " + e.getMessage(), CoordinationException.ExceptionCode.GENERIC_ERROR, e);
                    }
                }
            }
            return result.toByteArray();
        } catch (JAXBException e) {
            throw new CoordinationException("Error in unmarshalling peer response message: " + e.getMessage(), CoordinationException.ExceptionCode.GENERIC_ERROR, e);
        }
    }

    /**
     * Reads the data of one peer result node and then deletes the node on a best-effort
     * basis.
     *
     * @param resultNodeId name of the node below the peer-results path
     * @return the node data
     * @throws CoordinationException if the node cannot be read
     */
    private byte[] retrieveResultData(String resultNodeId) throws CoordinationException {
        String resultPath = getRootPath() + "/" + PEER_RESULTS_BASE_NAME + "/" + resultNodeId;
        try {
            byte[] data = getZooKeeper().getData(resultPath, false, (Stat) null);
            try {
                getZooKeeper().delete(resultPath, -1);
            } catch (Exception ignored) {
                // Best-effort cleanup: the data has been read already.
            }
            return data;
        } catch (KeeperException e) {
            // Must precede the generic handler to be reachable.
            throw new CoordinationException("Coordination error in retrieving peer result data", CoordinationException.ExceptionCode.GENERIC_ERROR, e);
        } catch (Exception e) {
            throw new CoordinationException("Unknown error in retrieving peer result data", CoordinationException.ExceptionCode.GENERIC_ERROR, e);
        }
    }

    /**
     * Leaves the group: the group is marked inactive and this member's node is deleted.
     *
     * @throws CoordinationException if the member node cannot be deleted
     */
    @Override // org.wso2.carbon.coordination.core.sync.Group
    public void leave() throws CoordinationException {
        // Deactivate first so that the resulting watch event is not processed any more.
        setActive(false);
        final String ownNodePath = getMemberPath();
        try {
            getZooKeeper().delete(ownNodePath, -1);
        } catch (Exception e) {
            throw new CoordinationException(CoordinationException.ExceptionCode.GENERIC_ERROR, e);
        }
    }

    /**
     * @return the registered group event listener, or {@code null} if none is set
     */
    @Override // org.wso2.carbon.coordination.core.sync.Group
    public GroupEventListener getGroupEventListener() {
        return groupEventListener;
    }

    /**
     * Registers the listener that receives membership, leadership and message events.
     *
     * @param groupEventListener the listener; {@code null} removes the current one
     */
    @Override // org.wso2.carbon.coordination.core.sync.Group
    public void setGroupEventListener(GroupEventListener groupEventListener) {
        // Qualification is required here because the parameter shadows the field.
        ZKGroup.this.groupEventListener = groupEventListener;
    }

    /**
     * @return this member's id, i.e. the name of its member node
     */
    @Override // org.wso2.carbon.coordination.core.sync.Group
    public String getMemberId() {
        return memberId;
    }

    /**
     * @return the group id, which is the id of this sync primitive
     */
    @Override // org.wso2.carbon.coordination.core.sync.Group
    public String getGroupId() {
        final String primitiveId = getId();
        return primitiveId;
    }

    /**
     * Tells whether a path begins with the communication base name.
     *
     * @param path the path to test
     * @return {@code true} if the path starts with {@link #COMM_BASE_NAME}
     */
    private boolean isCommPath(String path) {
        final boolean startsWithCommBase = path.startsWith(COMM_BASE_NAME);
        return startsWithCommBase;
    }

    /**
     * Watcher callback for the group. While the group is active, an event accepted by
     * {@code isUsefulWatchedEvent} whose path is set and is not a communication path
     * triggers membership processing on a newly started thread. Failures there are logged
     * unless the JVM is shutting down.
     *
     * @param watchedEvent the ZooKeeper event
     */
    @Override // org.wso2.carbon.coordination.core.sync.impl.ZKSyncPrimitive
    public void process(WatchedEvent watchedEvent) {
        if (!isActive() || !isUsefulWatchedEvent(watchedEvent)) {
            return;
        }
        String path = watchedEvent.getPath();
        if (path == null || isCommPath(path)) {
            return;
        }
        Runnable membershipTask = new Runnable() { // from class: org.wso2.carbon.coordination.core.sync.impl.ZKGroup.1
            @Override // java.lang.Runnable
            public void run() {
                try {
                    ZKGroup.this.processMemberNodes();
                } catch (Exception e) {
                    if (!CoordinationUtils.isJVMShuttingDown()) {
                        ZKGroup.log.error("Error in processing WatchedEvent: " + e.getMessage(), e);
                    }
                }
            }
        };
        new Thread(membershipTask).start();
    }

    /**
     * Reads the children of the group root and returns the member ids among them, i.e.
     * every child name except the communication base node, sorted.
     *
     * @param registerWatch {@code true} to register this group as child watcher of the root
     * @return the sorted member ids
     * @throws CoordinationException if the children cannot be read
     */
    private List<String> lookupMemberIds(boolean registerWatch) throws CoordinationException {
        try {
            List<String> memberIds = new ArrayList<String>();
            final List<String> children;
            if (registerWatch) {
                children = getZooKeeper().getChildren(getRootPath(), this);
            } else {
                children = getZooKeeper().getChildren(getRootPath(), false);
            }
            for (String child : children) {
                // Keep only the last path segment of the child name.
                String name = child.substring(child.lastIndexOf("/") + 1);
                if (name.equals(COMM_BASE_NAME)) {
                    continue;
                }
                memberIds.add(name);
            }
            Collections.sort(memberIds);
            return memberIds;
        } catch (Exception e) {
            throw new CoordinationException(CoordinationException.ExceptionCode.GENERIC_ERROR, e);
        }
    }

    /**
     * Deletes all messages currently stored on the group's broadcast channel.
     *
     * @throws CoordinationException if the messages cannot be deleted
     */
    @Override // org.wso2.carbon.coordination.core.sync.Group
    public void clearGroupMessages() throws CoordinationException {
        CommunicationChannel channel = getGroupCommChannel();
        channel.clearMessages();
    }

    /**
     * Blocks until the group has at least the given number of members. The count is
     * re-checked each time membership processing signals a change.
     * <p>
     * The wait is not aborted by interruption; if the thread is interrupted while waiting,
     * its interrupt status is restored before this method returns or throws.
     *
     * @param i the minimum number of members to wait for
     * @throws CoordinationException if the member lookup fails
     */
    @Override // org.wso2.carbon.coordination.core.sync.Group
    public void waitForMemberCount(int i) throws CoordinationException {
        boolean interrupted = false;
        try {
            synchronized (this.memberArrivalCountLock) {
                while (getMemberIds().size() < i) {
                    try {
                        this.memberArrivalCountLock.wait();
                    } catch (InterruptedException e) {
                        // Remember the interrupt instead of dropping it; keep waiting.
                        interrupted = true;
                    }
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Forwards the expiry notification to the group event listener, if one is registered.
     */
    @Override // org.wso2.carbon.coordination.core.sync.impl.ZKSyncPrimitive
    public void onExpired() {
        if (getGroupEventListener() == null) {
            return;
        }
        getGroupEventListener().onExpired();
    }

    /**
     * Re-runs {@link #init(String)} with the given id and then forwards the connect
     * notification to the group event listener, if one is registered.
     *
     * @param str the id passed on to {@code init}
     */
    @Override // org.wso2.carbon.coordination.core.sync.impl.ZKSyncPrimitive
    public void onConnect(String str) {
        init(str);
        if (getGroupEventListener() == null) {
            return;
        }
        getGroupEventListener().onConnect();
    }
}
