Commit bbfde650 authored by tammam.alsoleman

write Scheduler class

parent e43c1d0a
package scheduler;
import common.LoggerUtil;
import common.ZkConfig;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
/**
* The Scheduler is the "Master" of the cluster.
* It monitors nodes and workers, ensuring N workers are always running.
*/
public class Scheduler implements Watcher {
private ZooKeeper zooKeeper;
private final int targetWorkerCount;
// To track the current state and detect what changed
private List<String> lastKnownNodes = new ArrayList<>();
private boolean isConnected = false;
public Scheduler(int targetWorkerCount) {
this.targetWorkerCount = targetWorkerCount;
}
public static void main(String[] args) throws Exception {
if (args.length < 1) {
System.out.println("Usage: java -jar Scheduler.jar <targetWorkerCount>");
System.exit(1);
}
int N = Integer.parseInt(args[0]);
Scheduler scheduler = new Scheduler(N);
scheduler.connectToZookeeper();
scheduler.initializeClusterPaths();
// Initial balancing
scheduler.rebalance(0);
// Start watching for changes in nodes and workers
scheduler.watchNodes();
scheduler.watchWorkers();
// Keep the scheduler running
synchronized (scheduler) {
scheduler.wait();
}
}
public void connectToZookeeper() throws IOException {
this.zooKeeper = new ZooKeeper(ZkConfig.ZOOKEEPER_ADDRESS, ZkConfig.SESSION_TIMEOUT, this);
}
private void initializeClusterPaths() throws KeeperException, InterruptedException {
createPathIfNotExist(ZkConfig.NODES_PARENT_PATH);
createPathIfNotExist(ZkConfig.WORKERS_PARENT_PATH);
createPathIfNotExist(ZkConfig.TASKS_PARENT_PATH);
}
private void createPathIfNotExist(String path) throws KeeperException, InterruptedException {
if (zooKeeper.exists(path, false) == null) {
zooKeeper.create(path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
/**
* Watches for changes in physical nodes (/nodes).
*/
public void watchNodes() throws KeeperException, InterruptedException {
List<String> currentNodes = zooKeeper.getChildren(ZkConfig.NODES_PARENT_PATH, this);
detectNodeChanges(currentNodes);
}
/**
* Watches for changes in running workers (/workers).
*/
public void watchWorkers() throws KeeperException, InterruptedException {
zooKeeper.getChildren(ZkConfig.WORKERS_PARENT_PATH, this);
}
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
if (!isConnected) {
LoggerUtil.log("Connected to ZooKeeper");
isConnected = true;
}
}
try {
if (event.getType() == Event.EventType.NodeChildrenChanged) {
if (event.getPath().equals(ZkConfig.NODES_PARENT_PATH)) {
// عند تغيير العقد، نعتبر أن الجميع يحتاج إعادة جدولة (مرر 0)
rebalance(0);
watchNodes();
} else if (event.getPath().equals(ZkConfig.WORKERS_PARENT_PATH)) {
healWorkers();
watchWorkers();
}
}
} catch (Exception e) {
// Log error
}
}
/**
* Rebalance logic: Distributes the target N workers across available nodes
* using the "Least Load" (Equal Distribution) algorithm.
*/
private synchronized void rebalance(int aliveCount) throws KeeperException, InterruptedException {
// 1. Get current nodes and their info for the log
List<String> nodeNames = zooKeeper.getChildren(ZkConfig.NODES_PARENT_PATH, false);
Map<String, String> nodeMap = new HashMap<>();
for (String name : nodeNames) {
byte[] data = zooKeeper.getData(ZkConfig.NODES_PARENT_PATH + "/" + name, false, null);
nodeMap.put(name, new String(data));
}
String reason = (aliveCount > 0) ? "to heal failed workers..." : "on node change...";
LoggerUtil.log("Rebalancing workers " + reason);
// 2. Prepare to track how many workers each node will receive
Map<String, Integer> nodeTaskCounts = new HashMap<>();
for (String name : nodeNames) {
nodeTaskCounts.put(name, 0);
}
// 3. Iterate through all N workers and assign them
for (int i = 1; i <= targetWorkerCount; i++) {
String workerName = String.format("worker-%04d", i);
if (!nodeNames.isEmpty()) {
String assignedNode = nodeNames.get((i - 1) % nodeNames.size());
nodeTaskCounts.put(assignedNode, nodeTaskCounts.get(assignedNode) + 1);
// التعديل الجوهري: اطبع السجل فقط إذا كان العامل يتجاوز عدد العمال الأحياء
// أي أنه العامل "الجديد" الذي يتم تعويضه
if (i > aliveCount) {
LoggerUtil.log("Assigned " + workerName + " to node " + assignedNode);
}
} else {
LoggerUtil.log("No nodes available for " + workerName);
}
}
// 4. Update the actual Znodes in /tasks so Nodes can react
for (String nodeName : nodeNames) {
updateNodeTask(nodeName, nodeTaskCounts.get(nodeName));
}
}
/**
* Self-healing logic for worker failure.
*/
private synchronized void healWorkers() throws KeeperException, InterruptedException {
List<String> aliveWorkers = zooKeeper.getChildren(ZkConfig.WORKERS_PARENT_PATH, false);
int aliveCount = aliveWorkers.size();
int missingWorkers = targetWorkerCount - aliveWorkers.size();
if (missingWorkers > 0) {
LoggerUtil.log("Healing: Detected " + missingWorkers + " missing worker(s). Triggering recovery.");
rebalance(aliveCount);
}
}
private void updateNodeTask(String nodeName, int count) throws KeeperException, InterruptedException {
String taskPath = ZkConfig.TASKS_PARENT_PATH + "/" + nodeName;
String data = String.valueOf(count);
if (zooKeeper.exists(taskPath, false) != null) {
zooKeeper.setData(taskPath, data.getBytes(), -1);
} else {
zooKeeper.create(taskPath, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}
LoggerUtil.log("Assigned Target: " + count + " workers to Node [" + nodeName + "].");
}
private void detectNodeChanges(List<String> currentNodes) {
for (String node : currentNodes) {
if (!lastKnownNodes.contains(node)) {
LoggerUtil.log("Node Joined: [" + node + "]");
}
}
for (String node : lastKnownNodes) {
if (!currentNodes.contains(node)) {
LoggerUtil.log("Node Left/Failed: [" + node + "]. Its workers will be rescheduled.");
}
}
lastKnownNodes = new ArrayList<>(currentNodes);
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment