Commit 68553e5c authored by abdullh.alsoleman's avatar abdullh.alsoleman

Leader Election

parent b86192ac
<component name="ArtifactManager">
<artifact type="jar" name="zookeeper:jar">
<output-path>$PROJECT_DIR$/out/artifacts/zookeeper_jar</output-path>
<root id="archive" name="zookeeper.jar">
<element id="module-output" name="zookeeper" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-common/4.1.94.Final/netty-common-4.1.94.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-resolver/4.1.94.Final/netty-resolver-4.1.94.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-tcnative-boringssl-static/2.0.61.Final/netty-tcnative-boringssl-static-2.0.61.Final-windows-x86_64.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/apache/yetus/audience-annotations/0.12.0/audience-annotations-0.12.0.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-tcnative-boringssl-static/2.0.61.Final/netty-tcnative-boringssl-static-2.0.61.Final-osx-aarch_64.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-tcnative-boringssl-static/2.0.61.Final/netty-tcnative-boringssl-static-2.0.61.Final-linux-x86_64.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-tcnative-boringssl-static/2.0.61.Final/netty-tcnative-boringssl-static-2.0.61.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/apache/zookeeper/zookeeper-jute/3.9.1/zookeeper-jute-3.9.1.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/slf4j/slf4j-api/1.7.30/slf4j-api-1.7.30.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/commons-io/commons-io/2.11.0/commons-io-2.11.0.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-tcnative-classes/2.0.61.Final/netty-tcnative-classes-2.0.61.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/ch/qos/logback/logback-core/1.2.10/logback-core-1.2.10.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-buffer/4.1.94.Final/netty-buffer-4.1.94.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/ch/qos/logback/logback-classic/1.2.10/logback-classic-1.2.10.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-transport/4.1.94.Final/netty-transport-4.1.94.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/apache/zookeeper/zookeeper/3.9.1/zookeeper-3.9.1.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-codec/4.1.94.Final/netty-codec-4.1.94.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-transport-native-unix-common/4.1.94.Final/netty-transport-native-unix-common-4.1.94.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-transport-classes-epoll/4.1.94.Final/netty-transport-classes-epoll-4.1.94.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-tcnative-boringssl-static/2.0.61.Final/netty-tcnative-boringssl-static-2.0.61.Final-linux-aarch_64.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-handler/4.1.94.Final/netty-handler-4.1.94.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-tcnative-boringssl-static/2.0.61.Final/netty-tcnative-boringssl-static-2.0.61.Final-osx-x86_64.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-transport-native-epoll/4.1.94.Final/netty-transport-native-epoll-4.1.94.Final-linux-x86_64.jar" path-in-jar="/" />
</root>
</artifact>
</component>
\ No newline at end of file
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
</list> </list>
</option> </option>
</component> </component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_17" default="true" project-jdk-name="17" project-jdk-type="JavaSDK"> <component name="ProjectRootManager" version="2" languageLevel="JDK_1_8" default="true" project-jdk-name="1.8" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" /> <output url="file://$PROJECT_DIR$/out" />
</component> </component>
</project> </project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<project version="4"> <project version="4">
<component name="ArtifactsWorkspaceSettings">
<artifacts-to-build>
<artifact name="zookeeper:jar" />
</artifacts-to-build>
</component>
<component name="AutoImportSettings"> <component name="AutoImportSettings">
<option name="autoReloadType" value="SELECTIVE" /> <option name="autoReloadType" value="SELECTIVE" />
</component> </component>
...@@ -34,7 +39,11 @@ ...@@ -34,7 +39,11 @@
"keyToString": { "keyToString": {
"RunOnceActivity.OpenProjectViewOnStart": "true", "RunOnceActivity.OpenProjectViewOnStart": "true",
"RunOnceActivity.ShowReadmeOnStart": "true", "RunOnceActivity.ShowReadmeOnStart": "true",
"SHARE_PROJECT_CONFIGURATION_FILES": "true" "SHARE_PROJECT_CONFIGURATION_FILES": "true",
"last_opened_file_path": "C:/Users/Abdullah/Desktop/Java/zookeeper",
"project.structure.last.edited": "Artifacts",
"project.structure.proportion": "0.0",
"project.structure.side.proportion": "0.0"
} }
}]]></component> }]]></component>
<component name="SpellCheckerSettings" RuntimeDictionaries="0" Folders="0" CustomDictionaries="0" DefaultDictionary="application-level" UseSingleDictionary="true" transferred="true" /> <component name="SpellCheckerSettings" RuntimeDictionaries="0" Folders="0" CustomDictionaries="0" DefaultDictionary="application-level" UseSingleDictionary="true" transferred="true" />
......
...@@ -2,29 +2,64 @@ package org.example; ...@@ -2,29 +2,64 @@ package org.example;
import org.apache.zookeeper.*; import org.apache.zookeeper.*;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import java.io.IOException; import java.io.IOException;
import java.net.ConnectException;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
public class LeaderElection implements Watcher { public class LeaderElection implements Watcher {
private static final String address = "192.168.56.115:2181"; private static final String ADDRESS = "192.168.56.115:2181";
private static final int SESSION_TIMEOUT = 3000; //dead client private static final int SESSION_TIMEOUT = 3000;
private static final String ELECTION_NAMESPACE = "/election";
private String currentZnodeName;
private String leader;
private ZooKeeper zooKeeper = null; private ZooKeeper zooKeeper = null;
private void volunteerForLeadership() throws InterruptedException, KeeperException {
String znodePrefix = ELECTION_NAMESPACE + "/c_";
String path = zooKeeper.create(znodePrefix, new byte[]{}, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
this.currentZnodeName = path.replace(ELECTION_NAMESPACE + "/", "");
System.out.println("Created znode: " + path);
}
private void electLeader() throws InterruptedException, KeeperException {
List<String> children = zooKeeper.getChildren(ELECTION_NAMESPACE, false);
Collections.sort(children);
leader = children.get(0);
if (currentZnodeName.equals(leader)) {
System.out.println("I'm the leader");
//set data in leader Znode.
setLeaderData(currentZnodeName);
} else {
System.out.println("I'm not the leader, the leader is " + leader);
// Set a watch on the znode with one less sequence number
int currentZnodeIndex = children.indexOf(currentZnodeName);
String previousZnode = ELECTION_NAMESPACE + "/" + children.get(currentZnodeIndex - 1);
zooKeeper.exists(previousZnode, this);
}
}
private void setLeaderData(String znodeName) {
String znodePath = ELECTION_NAMESPACE + "/" + znodeName;
try {
zooKeeper.setData(znodePath, ("The leader is " + znodeName).getBytes(), -1);
} catch (KeeperException | InterruptedException e) {
throw new RuntimeException("Error setting leader data", e);
}
}
public static void main(String[] args) throws IOException, InterruptedException, KeeperException { public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
LeaderElection leaderElection = new LeaderElection(); LeaderElection leaderElection = new LeaderElection();
leaderElection.connectToZookeeper(); leaderElection.connectToZookeeper();
leaderElection.volunteerForLeadership();
leaderElection.electLeader();
leaderElection.run(); leaderElection.run();
} }
public void connectToZookeeper() throws IOException { public void connectToZookeeper() throws IOException {
this.zooKeeper = new ZooKeeper(ADDRESS, SESSION_TIMEOUT, this);
this.zooKeeper = new ZooKeeper(address, SESSION_TIMEOUT, this);
} }
public void run() throws InterruptedException { public void run() throws InterruptedException {
...@@ -38,20 +73,71 @@ public class LeaderElection implements Watcher { ...@@ -38,20 +73,71 @@ public class LeaderElection implements Watcher {
public void process(WatchedEvent watchedEvent) { public void process(WatchedEvent watchedEvent) {
switch (watchedEvent.getType()) { switch (watchedEvent.getType()) {
case None: case None:
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { handleNoneEvent(watchedEvent);
System.out.println("Successfully connected to ZooKeeper"); break;
} else if (watchedEvent.getState() == Event.KeeperState.Expired) { case NodeDeleted:
System.out.println("ZooKeeper session expired. Shutting down the program."); handleNodeDeletedEvent(watchedEvent);
synchronized (zooKeeper) { break;
zooKeeper.notifyAll(); case NodeDataChanged:
} handleNodeDataChangedEvent(watchedEvent);
} else if (watchedEvent.getState() == Event.KeeperState.Disconnected) {
System.out.println("Disconnected from ZooKeeper. Shutting down the program.");
synchronized (zooKeeper) {
zooKeeper.notifyAll();
}
}
break; break;
} }
} }
private void handleNodeDeletedEvent(WatchedEvent watchedEvent) {
String deletedZnodePath = watchedEvent.getPath();
System.out.println("Znode deleted: " + deletedZnodePath);
// Check if the deleted znode is the leader
if (deletedZnodePath.equals(ELECTION_NAMESPACE + "/" + leader)) {
System.out.println("The deleted znode was the leader. Re-electing leader...");
try {
// Re-elect leader
electLeader();
} catch (InterruptedException | KeeperException e) {
throw new RuntimeException("Error re-electing leader", e);
}
}
}
private void handleNodeDataChangedEvent(WatchedEvent watchedEvent) {
String znodePath = watchedEvent.getPath();
System.out.println("Znode data changed: " + znodePath);
byte[] znodePathBytes = new byte[]{};
try {
//get data of the znode which changed its data
znodePathBytes = zooKeeper.getData(znodePath, false, null);
// Set the data of the current znode to the combined data
zooKeeper.setData(ELECTION_NAMESPACE+"/"+currentZnodeName, znodePathBytes, -1);
String data = new String(znodePathBytes);
System.out.println(data);
} catch (KeeperException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private void handleNoneEvent(WatchedEvent watchedEvent) {
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
System.out.println("Successfully connected to ZooKeeper");
} else if (watchedEvent.getState() == Event.KeeperState.Expired) {
System.out.println("ZooKeeper session expired. Shutting down the program.");
synchronized (zooKeeper) {
zooKeeper.notifyAll();
}
} else if (watchedEvent.getState() == Event.KeeperState.Disconnected) {
System.out.println("Disconnected from ZooKeeper. Shutting down the program.");
synchronized (zooKeeper) {
zooKeeper.notifyAll();
}
}
}
} }
Manifest-Version: 1.0
Main-Class: org.example.LeaderElection
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment