Commit 73622b32 authored by mohammad.salama's avatar mohammad.salama

Doublicating Problem Solved - Registery and Nodes

parent 08925d8d
......@@ -26,7 +26,7 @@ public class Application implements Watcher {
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
int currentServerPort = args.length == 3 ? Integer.parseInt(args[0]) : DEFAULT_PORT;
numberOfInstances = args.length == 3 ? Integer.parseInt(args[1]) : 10;
numberOfInstances = args.length == 3 ? Integer.parseInt(args[1]) : 4;
pathToFile = args.length == 3 ? args[2] : "TransientWorker_jar/Registration&Discovery-AutoHealer.jar";
pathToFile = System.getProperty("user.dir") + RELATIVE_PATH_TO_JARS + pathToFile;
......@@ -39,7 +39,7 @@ public class Application implements Watcher {
OnElectionAction onElectionAction = new OnElectionAction(serviceRegistry, currentServerPort);
LeaderElection leaderElection = new LeaderElection(zooKeeper, onElectionAction);
leaderElection.volunteerForLeadership();
String ss = leaderElection.volunteerForLeadership();
leaderElection.reelectLeader();
application.run();
......
......@@ -8,13 +8,14 @@ import java.util.List;
public class LeaderElection implements Watcher {
private static final String PHYSICAL_ZNODES_PATH = "/physical_nodes";
private static final String ELECTION_NAMESPACE = "/election";
private String currentZnodeName;
private ZooKeeper zooKeeper;
private OnElectionCallback onElectionCallback;
public LeaderElection(ZooKeeper zooKeeper, OnElectionCallback onElectionCallback) {
public LeaderElection(ZooKeeper zooKeeper, OnElectionCallback onElectionCallback)
{
this.zooKeeper = zooKeeper;
this.onElectionCallback = onElectionCallback;
......@@ -22,13 +23,13 @@ public class LeaderElection implements Watcher {
public void volunteerForLeadership() throws InterruptedException, KeeperException, UnknownHostException {
String znodePrefix = PHYSICAL_ZNODES_PATH + "/node_";
String znodePrefix = ELECTION_NAMESPACE + "/node_";
String IP = InetAddress.getLocalHost().getHostAddress();
String username = System.getProperty("user.name")+"@"+IP;
String znodeFullPath = zooKeeper.create(znodePrefix, username.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(znodeFullPath);
this.currentZnodeName = znodeFullPath.replace(PHYSICAL_ZNODES_PATH + "/", "");
this.currentZnodeName = znodeFullPath.replace(ELECTION_NAMESPACE + "/", "");
}
public void reelectLeader() throws InterruptedException, KeeperException {
......@@ -38,7 +39,7 @@ public class LeaderElection implements Watcher {
//this while to guarantee get predecessor even if it deleted just before zookeeper.exist
while (predecessorStat == null)
{
List<String> children = zooKeeper.getChildren(PHYSICAL_ZNODES_PATH, false);
List<String> children = zooKeeper.getChildren(ELECTION_NAMESPACE, false);
Collections.sort(children);
String smallestChild = children.get(0); //the first element
......@@ -51,7 +52,7 @@ public class LeaderElection implements Watcher {
System.out.println("I'm not a leader");
int predecessorIndex = children.indexOf(currentZnodeName) - 1;
predecessorName = children.get(predecessorIndex);
predecessorStat = zooKeeper.exists(PHYSICAL_ZNODES_PATH + "/" + predecessorName, this);
predecessorStat = zooKeeper.exists(ELECTION_NAMESPACE + "/" + predecessorName, this);
}
}
onElectionCallback.onWorker();
......
......@@ -25,7 +25,7 @@ public class NodeSorting
for (String node : Nodes)
{
int x = map.getOrDefault(node , 0);
map.put(node , x+1);
map.put(node , x);
}
List<Pair> temp = new ArrayList<>();
for (Map.Entry<String , Integer> el : map.entrySet())
......@@ -48,7 +48,7 @@ public class NodeSorting
{
ans.add(pair.node);
}
//System.out.println(ans);
return ans;
}
}
......@@ -44,16 +44,13 @@ public class ServiceRegistry implements Watcher
}
public void registerToCluster(String metadata) throws KeeperException, InterruptedException {
System.out.println(metadata);
if (this.currentZnode != null)
{
zooKeeper.setData(PHYSICAL_ZNODES_PATH + "/"+currentZnode , metadata.getBytes() , -1);
if (this.currentZnode != null) {
System.out.println("Already registered to service registry");
return;
}
this.currentZnode = zooKeeper.create(PHYSICAL_ZNODES_PATH + "/node_", metadata.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("Registered to service registry");
}
......@@ -109,34 +106,56 @@ public class ServiceRegistry implements Watcher
}
private void launchWorkersIfNecessary() throws KeeperException, InterruptedException, IOException {
List<String> physicalZnodes = zooKeeper.getChildren(PHYSICAL_ZNODES_PATH, false);
List<String> workers = zooKeeper.getChildren(WORKERS_ZNODES_PATH , this);
int neededInstances = numberOfInstances - workers.size();
synchronized (zooKeeper)
{
List<String> physicalZnodes = zooKeeper.getChildren(PHYSICAL_ZNODES_PATH, false);
List<String> workers = zooKeeper.getChildren(WORKERS_ZNODES_PATH, this);
for (String worker : workers) {
Stat stat = zooKeeper.exists(WORKERS_ZNODES_PATH + "/" + worker, false);
if (stat == null) {
workers.remove(worker);
continue;
}
int index= 0;
String node = new String(zooKeeper.getData(WORKERS_ZNODES_PATH + "/" + worker, false, stat));
if (physicalZnodes.contains(node)) {
System.out.println("NODE ALREADY EXIST");
continue;
}
workers.remove(worker);
}
int code = 1;
int neededInstances = numberOfInstances - workers.size();
System.out.println("Needed Instances is : " + neededInstances);
List<String> sortedWorkers = NodeSorting.sort(getOriginalNodes(workers) , physicalZnodes);
int index = 0;
while (neededInstances > 0 && sortedWorkers.size()>0)
{
Stat stat = zooKeeper.exists(PHYSICAL_ZNODES_PATH+"/"+sortedWorkers.get(index) ,false);
if (stat == null)
int code = 1;
List<String> sortedWorkers = NodeSorting.sort(getOriginalNodes(workers), physicalZnodes);
while (neededInstances > 0 && sortedWorkers.size() > 0)
{
code++;
if (code > sortedWorkers.size())
{
break;
Stat stat = zooKeeper.exists(PHYSICAL_ZNODES_PATH + "/" + sortedWorkers.get(index), false);
if (stat == null) {
code++;
if (code > sortedWorkers.size()) {
break;
}
continue;
}
continue;
System.out.println("I am In the While !!" + " Turn for " + sortedWorkers.get(index));
startNewWorker(sortedWorkers.get(index));
neededInstances--;
index = (index + 1) % sortedWorkers.size();
}
startNewWorker(sortedWorkers.get(index));
neededInstances--;
index = (index+1)%sortedWorkers.size();
}
}
}
private List<byte[]> getOriginalNodes(List<String> workers) throws InterruptedException, KeeperException {
......@@ -156,7 +175,7 @@ public class ServiceRegistry implements Watcher
String remoteUser = new String(zooKeeper.getData(PHYSICAL_ZNODES_PATH+"/"+physicalNode , false, null));
String remoteDirectory = "/JavaJars/";
/**String remoteDirectory = "/JavaJars/";
String remoteJarFilePath = remoteDirectory + file.getName();
......@@ -171,13 +190,11 @@ public class ServiceRegistry implements Watcher
scpProcess.waitFor();
if (scpProcess.exitValue() == 0) {
Process sshProcess = Runtime.getRuntime().exec(sshCommand);
}
}*/
String command = "java -Dorg.slf4j.simpleLogger.defaultLogLevel=off -jar " + file.getName() + " " + physicalNode;
System.out.println("Sending job to " + remoteUser + " of node : " + physicalNode);
Runtime.getRuntime().exec(command, null, file.getParentFile());
/**
*String command = "java -Dorg.slf4j.simpleLogger.defaultLogLevel=off -jar " + file.getName() + " " + physicalNode;
* System.out.println("Sendig job to " + ip + " of node : " + physicalNode);
* Runtime.getRuntime().exec(command, null, file.getParentFile());
*/
}
@Override
......
......@@ -3,6 +3,8 @@ import ch.qos.logback.core.FileAppender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
public class TestLogging {
private static final Logger logger = LoggerFactory.getLogger(TestLogging.class);
......@@ -22,13 +24,23 @@ public class TestLogging {
logger.error("Error log message");
}
public static void main(String[] args) {
//tryLogging();
//changeLogFile("logs/MyClassLogs.logs");
// someMethod();
String workingDir = System.getProperty("user.dir");
System.out.println("Current working directory: " + workingDir);
/// out put : D:\HIAST\FIY\FS\Distributed Systems\Lab\6\DS-06\Registration&Discovery-AutoHealer
/// out/artifacts/TransientWorker_jar/Registration&Discovery-AutoHealer.jar
public static void main(String[] args) throws InterruptedException, IOException {
/*
String remoteDirectory = "/JavaJars/";
String pathToProgram = System.getProperty("user.dir") + RELATIVE_PATH_TO_JARS + pathToFile;
String remoteJarFilePath = remoteDirectory + file.getName();
String scpCommand = "scp " + pathToProgram + " " + remoteUser + ":" + remoteDirectory;
String sshCommand = "ssh " + remoteUser +
" 'java -Dorg.slf4j.simpleLogger.defaultLogLevel=off -jar " +
remoteJarFilePath + " ' ";
Process scpProcess = Runtime.getRuntime().exec(scpCommand);
scpProcess.waitFor();
if (scpProcess.exitValue() == 0) {
Process sshProcess = Runtime.getRuntime().exec(sshCommand);
}*/
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment