Commit c91795eb authored by tammam.alsoleman's avatar tammam.alsoleman

write Node agent class

parent baa6879d
package node;
import common.LoggerUtil;
import common.ZkConfig;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* Node represents a physical machine in the cluster.
* It registers itself in ZooKeeper and waits for task assignments (number of workers to run).
*/
public class Node implements Watcher {
private final String nodeId;
private final String pathToWorkerJar;
private ZooKeeper zooKeeper;
private final List<Process> localWorkers = new ArrayList<>();
public Node(String nodeId, String pathToWorkerJar) {
this.nodeId = nodeId;
this.pathToWorkerJar = pathToWorkerJar;
}
public static void main(String[] args) throws Exception {
if (args.length < 2) {
System.out.println("Usage: java -jar Node.jar <nodeId> <pathToWorkerJar>");
System.exit(1);
}
Node node = new Node(args[0], args[1]);
node.connectToZookeeper();
node.registerNode();
node.watchTaskPath();
// Keep the node running
synchronized (node) {
node.wait();
}
}
public void connectToZookeeper() throws IOException {
this.zooKeeper = new ZooKeeper(ZkConfig.ZOOKEEPER_ADDRESS, ZkConfig.SESSION_TIMEOUT, this);
}
/**
* Register this node in the cluster by creating an ephemeral Znode.
*/
private void registerNode() throws KeeperException, InterruptedException {
String nodePath = ZkConfig.NODES_PARENT_PATH + "/" + nodeId;
// Assuming local simulation, we use 127.0.0.1 or actual IP
String ipAddress = "127.0.0.1";
if (zooKeeper.exists(ZkConfig.NODES_PARENT_PATH, false) == null) {
zooKeeper.create(ZkConfig.NODES_PARENT_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
zooKeeper.create(nodePath, ipAddress.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}
/**
* Watch the task path specifically for this node (e.g., /tasks/node_8081).
*/
private void watchTaskPath() throws KeeperException, InterruptedException, IOException {
String taskPath = ZkConfig.TASKS_PARENT_PATH + "/" + nodeId;
// Ensure the task path exists
if (zooKeeper.exists(ZkConfig.TASKS_PARENT_PATH, false) == null) {
zooKeeper.create(ZkConfig.TASKS_PARENT_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
if (zooKeeper.exists(taskPath, false) == null) {
// Initially, 0 workers assigned
zooKeeper.create(taskPath, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}
// Set the watcher
byte[] data = zooKeeper.getData(taskPath, this, new Stat());
syncWorkersWithTarget(new String(data));
}
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeDataChanged) {
try {
// Re-fetch data and set a new watch
byte[] data = zooKeeper.getData(event.getPath(), this, new Stat());
String targetCountStr = new String(data);
LoggerUtil.log("Node [" + nodeId + "] received updated target: " + targetCountStr + " workers.");
syncWorkersWithTarget(targetCountStr);
} catch (Exception e) {
LoggerUtil.log("Error handling task update on node [" + nodeId + "]: " + e.getMessage());
}
}
}
/**
* Adjusts the number of running worker processes to match the target count.
*/
private synchronized void syncWorkersWithTarget(String targetCountStr) throws IOException {
localWorkers.removeIf(process -> !process.isAlive());
int targetCount = Integer.parseInt(targetCountStr);
int currentCount = localWorkers.size();
if (targetCount > currentCount) {
int toStart = targetCount - currentCount;
for (int i = 0; i < toStart; i++) {
LoggerUtil.log("Node [" + nodeId + "] is restarting a failed worker to reach target [" + targetCount + "]");
startWorker();
}
} else if (targetCount < currentCount) {
int toStop = currentCount - targetCount;
for (int i = 0; i < toStop; i++) {
Process p = localWorkers.remove(0);
p.destroy();
}
}
}
/**
* Launches a new TransientWorker process.
*/
private void startWorker() throws IOException {
File jarFile = new File(pathToWorkerJar);
ProcessBuilder pb = new ProcessBuilder("java", "-jar", jarFile.getName(), nodeId);
pb.directory(jarFile.getParentFile());
// Redirect worker output to inherited to see it in the Node's console
pb.inheritIO();
Process process = pb.start();
localWorkers.add(process);
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment