Commit 750bb9e5 authored by mohammad.salama's avatar mohammad.salama

Fourth Commit : Wroking Fine All but NO SSH and No Logging - Dublicating...

Fourth Commit : Wroking Fine All but NO SSH and No Logging - Dublicating Physical Nodes Solved and Transient Workers are Steady and Stable
parent 73622b32
...@@ -27,7 +27,6 @@ public class Application implements Watcher { ...@@ -27,7 +27,6 @@ public class Application implements Watcher {
int currentServerPort = args.length == 3 ? Integer.parseInt(args[0]) : DEFAULT_PORT; int currentServerPort = args.length == 3 ? Integer.parseInt(args[0]) : DEFAULT_PORT;
numberOfInstances = args.length == 3 ? Integer.parseInt(args[1]) : 4; numberOfInstances = args.length == 3 ? Integer.parseInt(args[1]) : 4;
pathToFile = args.length == 3 ? args[2] : "TransientWorker_jar/Registration&Discovery-AutoHealer.jar"; pathToFile = args.length == 3 ? args[2] : "TransientWorker_jar/Registration&Discovery-AutoHealer.jar";
pathToFile = System.getProperty("user.dir") + RELATIVE_PATH_TO_JARS + pathToFile; pathToFile = System.getProperty("user.dir") + RELATIVE_PATH_TO_JARS + pathToFile;
...@@ -39,7 +38,7 @@ public class Application implements Watcher { ...@@ -39,7 +38,7 @@ public class Application implements Watcher {
OnElectionAction onElectionAction = new OnElectionAction(serviceRegistry, currentServerPort); OnElectionAction onElectionAction = new OnElectionAction(serviceRegistry, currentServerPort);
LeaderElection leaderElection = new LeaderElection(zooKeeper, onElectionAction); LeaderElection leaderElection = new LeaderElection(zooKeeper, onElectionAction);
String ss = leaderElection.volunteerForLeadership(); leaderElection.volunteerForLeadership();
leaderElection.reelectLeader(); leaderElection.reelectLeader();
application.run(); application.run();
......
...@@ -23,7 +23,7 @@ public class LeaderElection implements Watcher { ...@@ -23,7 +23,7 @@ public class LeaderElection implements Watcher {
public void volunteerForLeadership() throws InterruptedException, KeeperException, UnknownHostException { public void volunteerForLeadership() throws InterruptedException, KeeperException, UnknownHostException {
String znodePrefix = ELECTION_NAMESPACE + "/node_"; String znodePrefix = ELECTION_NAMESPACE + "/c_";
String IP = InetAddress.getLocalHost().getHostAddress(); String IP = InetAddress.getLocalHost().getHostAddress();
String username = System.getProperty("user.name")+"@"+IP; String username = System.getProperty("user.name")+"@"+IP;
String znodeFullPath = zooKeeper.create(znodePrefix, username.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); String znodeFullPath = zooKeeper.create(znodePrefix, username.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
...@@ -45,6 +45,7 @@ public class LeaderElection implements Watcher { ...@@ -45,6 +45,7 @@ public class LeaderElection implements Watcher {
String smallestChild = children.get(0); //the first element String smallestChild = children.get(0); //the first element
if (smallestChild.equals(currentZnodeName)) { if (smallestChild.equals(currentZnodeName)) {
System.out.println("I'm a leader"); System.out.println("I'm a leader");
onElectionCallback.onElectedToBeLeader(); onElectionCallback.onElectedToBeLeader();
return; return;
} }
......
...@@ -12,13 +12,22 @@ public class NodeSorting ...@@ -12,13 +12,22 @@ public class NodeSorting
this.node = x; this.node = x;
this.tasksNo = y; this.tasksNo = y;
} }
@Override
public String toString()
{
return "node : " + this.node +" has : " + tasksNo + " workers !!";
}
} }
public static List<String> sort(List<byte[]> workers , List<String> Nodes) public static List<String> sort(List<byte[]> workers , List<String> Nodes)
{ {
Map<String , Integer> map = new HashMap<>(); Map<String , Integer> map = new HashMap<>();
System.out.println("physical nodes are : " + Nodes);
System.out.println("jars running are : " + workers.size());
for (byte[] worker : workers) for (byte[] worker : workers)
{ {
String s = new String(worker); String s = new String(worker);
if (!Nodes.contains(s)) continue;
int x = map.getOrDefault(s , 0); int x = map.getOrDefault(s , 0);
map.put(s , x+1); map.put(s , x+1);
} }
...@@ -43,7 +52,7 @@ public class NodeSorting ...@@ -43,7 +52,7 @@ public class NodeSorting
}; };
List<String> ans = new ArrayList<>(); List<String> ans = new ArrayList<>();
Collections.sort(temp, comparator); Collections.sort(temp, comparator);
System.out.println(temp);
for (Pair pair : temp) for (Pair pair : temp)
{ {
ans.add(pair.node); ans.add(pair.node);
......
...@@ -49,7 +49,7 @@ public class ServiceRegistry implements Watcher ...@@ -49,7 +49,7 @@ public class ServiceRegistry implements Watcher
System.out.println("Already registered to service registry"); System.out.println("Already registered to service registry");
return; return;
} }
this.currentZnode = zooKeeper.create(PHYSICAL_ZNODES_PATH + "/node_", metadata.getBytes(), this.currentZnode = zooKeeper.create(PHYSICAL_ZNODES_PATH + "/physical_node_", metadata.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("Registered to service registry"); System.out.println("Registered to service registry");
} }
...@@ -78,14 +78,14 @@ public class ServiceRegistry implements Watcher ...@@ -78,14 +78,14 @@ public class ServiceRegistry implements Watcher
} }
} }
private void masterJob() throws InterruptedException, KeeperException, IOException { private synchronized void masterJob() throws InterruptedException, KeeperException, IOException {
updateAddresses(); updateAddresses();
launchWorkersIfNecessary(); launchWorkersIfNecessary();
} }
private synchronized void updateAddresses() throws KeeperException, InterruptedException private void updateAddresses() throws KeeperException, InterruptedException
{ {
List<String> workerZnodes = zooKeeper.getChildren(PHYSICAL_ZNODES_PATH, this); List<String> workerZnodes = zooKeeper.getChildren(PHYSICAL_ZNODES_PATH, false);
List<String> addresses = new ArrayList<>(workerZnodes.size()); List<String> addresses = new ArrayList<>(workerZnodes.size());
...@@ -105,12 +105,16 @@ public class ServiceRegistry implements Watcher ...@@ -105,12 +105,16 @@ public class ServiceRegistry implements Watcher
System.out.println("The cluster addresses are: " + this.allServiceAddresses); System.out.println("The cluster addresses are: " + this.allServiceAddresses);
} }
private void launchWorkersIfNecessary() throws KeeperException, InterruptedException, IOException { private void launchWorkersIfNecessary() throws KeeperException, InterruptedException, IOException
synchronized (zooKeeper)
{ {
List<String> physicalZnodes = zooKeeper.getChildren(PHYSICAL_ZNODES_PATH, false); List<String> physicalZnodes = zooKeeper.getChildren(PHYSICAL_ZNODES_PATH, this);
List<String> workers = zooKeeper.getChildren(WORKERS_ZNODES_PATH, this); List<String> workers = zooKeeper.getChildren(WORKERS_ZNODES_PATH, this);
for (String worker : workers) {
//System.out.println("Total workers (before editing) = " + workers.size());
for (String worker : workers)
{
Stat stat = zooKeeper.exists(WORKERS_ZNODES_PATH + "/" + worker, false); Stat stat = zooKeeper.exists(WORKERS_ZNODES_PATH + "/" + worker, false);
if (stat == null) { if (stat == null) {
workers.remove(worker); workers.remove(worker);
...@@ -118,45 +122,50 @@ public class ServiceRegistry implements Watcher ...@@ -118,45 +122,50 @@ public class ServiceRegistry implements Watcher
} }
String node = new String(zooKeeper.getData(WORKERS_ZNODES_PATH + "/" + worker, false, stat)); String node = new String(zooKeeper.getData(WORKERS_ZNODES_PATH + "/" + worker, false, stat));
if (physicalZnodes.contains(node)) { if (!physicalZnodes.contains(node))
System.out.println("NODE ALREADY EXIST"); {
//System.out.println("Physical Node is shut down !!");
workers.remove(worker);
}
}
List<String> sortedWorkers = NodeSorting.sort(getOriginalNodes(workers), physicalZnodes);
while (workers.size() > numberOfInstances)
{
Stat stat = zooKeeper.exists(WORKERS_ZNODES_PATH + "/" + workers.get(0), false);
if (stat == null) {
workers.remove(0);
continue; continue;
} }
workers.remove(worker); zooKeeper.delete(WORKERS_ZNODES_PATH + "/" + workers.get(0) , -1);
} }
int neededInstances = numberOfInstances - workers.size(); int neededInstances = numberOfInstances - workers.size();
if (neededInstances <= 0) return;
System.out.println("Needed Instances is : " + neededInstances); System.out.println("Needed Instances is : " + neededInstances);
int index = 0; int index = 0;
int code = 1; int size = sortedWorkers.size();
while (neededInstances>0
&& size>0)
List<String> sortedWorkers = NodeSorting.sort(getOriginalNodes(workers), physicalZnodes);
while (neededInstances > 0 && sortedWorkers.size() > 0)
{ {
Stat stat = zooKeeper.exists(PHYSICAL_ZNODES_PATH + "/" + sortedWorkers.get(index), false); Stat stat = zooKeeper.exists(PHYSICAL_ZNODES_PATH + "/" + sortedWorkers.get(index), false);
if (stat == null) { if (stat == null)
code++; {
if (code > sortedWorkers.size()) { sortedWorkers.remove(index);
break; size--;
}
continue; continue;
} }
System.out.println("I am In the While !!" + " Turn for " + sortedWorkers.get(index));
startNewWorker(sortedWorkers.get(index)); startNewWorker(sortedWorkers.get(index));
neededInstances--; neededInstances--;
index = (index + 1) % size;
index = (index + 1) % sortedWorkers.size();
} }
} }
}
private List<byte[]> getOriginalNodes(List<String> workers) throws InterruptedException, KeeperException { private List<byte[]> getOriginalNodes(List<String> workers) throws InterruptedException, KeeperException {
List<byte[]> ans = new ArrayList<>(); List<byte[]> ans = new ArrayList<>();
...@@ -165,6 +174,8 @@ public class ServiceRegistry implements Watcher ...@@ -165,6 +174,8 @@ public class ServiceRegistry implements Watcher
Stat stat = zooKeeper.exists(WORKERS_ZNODES_PATH+"/"+worker ,false); Stat stat = zooKeeper.exists(WORKERS_ZNODES_PATH+"/"+worker ,false);
if (stat == null) continue; if (stat == null) continue;
ans.add(zooKeeper.getData(WORKERS_ZNODES_PATH+"/"+worker,false,stat )); ans.add(zooKeeper.getData(WORKERS_ZNODES_PATH+"/"+worker,false,stat ));
// String jj = new String(ans.get(ans.size()-1));
// System.out.println("worker : " +worker + " belongs to : " + jj);
} }
return ans; return ans;
} }
...@@ -200,11 +211,20 @@ public class ServiceRegistry implements Watcher ...@@ -200,11 +211,20 @@ public class ServiceRegistry implements Watcher
@Override @Override
public void process(WatchedEvent watchedEvent) public void process(WatchedEvent watchedEvent)
{ {
try { switch (watchedEvent.getType())
{
case NodeChildrenChanged:
{
try
{
masterJob(); masterJob();
} catch (InterruptedException | KeeperException | IOException e) { }
catch (InterruptedException | KeeperException | IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
}
}
} }
...@@ -3,6 +3,7 @@ import ch.qos.logback.core.FileAppender; ...@@ -3,6 +3,7 @@ import ch.qos.logback.core.FileAppender;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException; import java.io.IOException;
public class TestLogging { public class TestLogging {
...@@ -25,9 +26,14 @@ public class TestLogging { ...@@ -25,9 +26,14 @@ public class TestLogging {
} }
public static void main(String[] args) throws InterruptedException, IOException { public static void main(String[] args) throws InterruptedException, IOException {
/*
String remoteDirectory = "/JavaJars/"; String remoteDirectory = "/JavaJars/";
String RELATIVE_PATH_TO_JARS = "/out/artifacts/";
String pathToFile = args.length == 3 ? args[2] : "TransientWorker_jar/Registration&Discovery-AutoHealer.jar";
pathToFile = System.getProperty("user.dir") + RELATIVE_PATH_TO_JARS + pathToFile;
File file = new File(pathToFile);
String remoteUser = "root@192.168.184.10";
String pathToProgram = System.getProperty("user.dir") + RELATIVE_PATH_TO_JARS + pathToFile; String pathToProgram = System.getProperty("user.dir") + RELATIVE_PATH_TO_JARS + pathToFile;
String remoteJarFilePath = remoteDirectory + file.getName(); String remoteJarFilePath = remoteDirectory + file.getName();
...@@ -41,6 +47,6 @@ public class TestLogging { ...@@ -41,6 +47,6 @@ public class TestLogging {
scpProcess.waitFor(); scpProcess.waitFor();
if (scpProcess.exitValue() == 0) { if (scpProcess.exitValue() == 0) {
Process sshProcess = Runtime.getRuntime().exec(sshCommand); Process sshProcess = Runtime.getRuntime().exec(sshCommand);
}*/ }
} }
} }
\ No newline at end of file
...@@ -29,7 +29,11 @@ public class TransientWorker ...@@ -29,7 +29,11 @@ public class TransientWorker
public void work(String nodeNum) throws KeeperException, InterruptedException { public void work(String nodeNum) throws KeeperException, InterruptedException {
addChildZnode(nodeNum); addChildZnode(nodeNum);
while (true) System.out.println(myName + " is Working...");
Thread.sleep(20000);
System.out.println(myName + " encountered Critical error happened");
throw new RuntimeException(myName + " : Oops");
/*while (true)
{ {
System.out.println(myName + " is Working..."); System.out.println(myName + " is Working...");
LockSupport.parkNanos(1000L); LockSupport.parkNanos(1000L);
...@@ -37,7 +41,7 @@ public class TransientWorker ...@@ -37,7 +41,7 @@ public class TransientWorker
System.out.println(myName + " encountered Critical error happened"); System.out.println(myName + " encountered Critical error happened");
throw new RuntimeException(myName + " : Oops"); throw new RuntimeException(myName + " : Oops");
} }
} }*/
} }
private void addChildZnode(String nodeNumber) throws KeeperException, InterruptedException { private void addChildZnode(String nodeNumber) throws KeeperException, InterruptedException {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment