Commit 59db520e authored by drnull03's avatar drnull03

Clearning part2 and finalizing

parent 35837c95
......@@ -26,7 +26,7 @@
<configuration>
<transformers>
<transformer>
<mainClass>Application</mainClass>
<mainClass>AutohealerApplication</mainClass>
</transformer>
</transformers>
</configuration>
......
......@@ -73,7 +73,7 @@
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>Application</mainClass>
<mainClass>AutohealerApplication</mainClass>
</transformer>
</transformers>
</configuration>
......
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
public class Application {
private static final Logger logger = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
if (args.length < 2) {
logger.error("Usage: java -jar autohealer.jar <number_of_workers> <path_to_worker_jar>");
System.exit(1);
}
int numWorkers = Integer.parseInt(args[0]);
String workerPath = args[1];
Autohealer autohealer = new Autohealer(numWorkers, workerPath);
autohealer.connectToZookeeper();
autohealer.startWatchingWorkers();
autohealer.run(); // blocks
}
}
......@@ -3,117 +3,108 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.*;
public class Autohealer implements Watcher {
private static final Logger logger = LoggerFactory.getLogger(Autohealer.class);
// Update to include all ensemble nodes
private static final String ZOOKEEPER_ADDRESS = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
private static final int SESSION_TIMEOUT = 3000;
private static final String WORKERS_PATH = "/workers";
private static final String ZK = "127.0.0.1:2181";
private static final int TIMEOUT = 3000;
private final String pathToWorkerJar;
private final int numberOfWorkers;
private ZooKeeper zooKeeper;
private static final String NODES = "/nodes";
private static final String WORKERS = "/workers";
// Simulate multiple physical nodes
private final List<String> physicalNodes = Arrays.asList("node1", "node2", "node3");
private final int desiredWorkers;
private final String workerJar;
private ZooKeeper zk;
public Autohealer(int numberOfWorkers, String pathToWorkerJar) {
this.numberOfWorkers = numberOfWorkers;
this.pathToWorkerJar = pathToWorkerJar;
public Autohealer(int desiredWorkers, String workerJar) {
this.desiredWorkers = desiredWorkers;
this.workerJar = workerJar;
}
public void connectToZookeeper() throws IOException {
this.zooKeeper = new ZooKeeper(ZOOKEEPER_ADDRESS, SESSION_TIMEOUT, this);
logger.info("Connecting to ZooKeeper ensemble: {}", ZOOKEEPER_ADDRESS);
public void connect() throws Exception {
zk = new ZooKeeper(ZK, TIMEOUT, this);
}
public void startWatchingWorkers() throws KeeperException, InterruptedException {
// Ensure parent znode exists
if (zooKeeper.exists(WORKERS_PATH, false) == null) {
zooKeeper.create(WORKERS_PATH, new byte[]{}, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
logger.info("Created parent znode: {}", WORKERS_PATH);
}
public void bootstrap() throws Exception {
ensure(NODES);
ensure(WORKERS);
checkAndLaunchWorkers();
zooKeeper.getChildren(WORKERS_PATH, this);
rebalance();
}
public void run() throws InterruptedException {
synchronized (zooKeeper) {
zooKeeper.wait(); // block main thread
private void ensure(String path) throws Exception {
if (zk.exists(path, false) == null) {
zk.create(path, new byte[]{}, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
public void close() throws InterruptedException {
zooKeeper.close();
logger.info("ZooKeeper connection closed");
}
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.None) {
if (event.getState() == Event.KeeperState.SyncConnected) {
logger.info("Successfully connected to ZooKeeper");
} else {
synchronized (zooKeeper) {
logger.warn("Disconnected from ZooKeeper");
zooKeeper.notifyAll();
}
public void run() throws InterruptedException {
synchronized (zk) {
zk.wait();
}
return;
}
if (event.getType() == Event.EventType.NodeChildrenChanged && event.getPath().equals(WORKERS_PATH)) {
@Override
public void process(WatchedEvent e) {
try {
logger.info("Workers changed, checking cluster health...");
checkAndLaunchWorkers();
} catch (Exception e) {
logger.error("Error checking/launching workers", e);
rebalance();
} catch (Exception ex) {
logger.error("Rebalance failed", ex);
}
}
// Re-set the watch
try {
zooKeeper.getChildren(WORKERS_PATH, this);
} catch (Exception e) {
logger.error("Failed to reset watch", e);
}
private void rebalance() throws Exception {
List<String> nodes = zk.getChildren(NODES, this);
List<String> workers = zk.getChildren(WORKERS, this);
if (nodes.isEmpty()) return;
Map<String, List<String>> byNode = new HashMap<>();
for (String n : nodes) byNode.put(n, new ArrayList<>());
for (String w : workers) {
byte[] data = zk.getData(WORKERS + "/" + w, false, null);
String node = new String(data);
byNode.computeIfAbsent(node, k -> new ArrayList<>()).add(w);
}
private void checkAndLaunchWorkers() throws KeeperException, InterruptedException {
List<String> children = zooKeeper.getChildren(WORKERS_PATH, false);
int currentWorkers = children.size();
int toLaunch = numberOfWorkers - currentWorkers;
// worker failure → restart on same node
while (workers.size() < desiredWorkers) {
String node = leastLoaded(byNode);
startWorker(node);
workers.add("new");
}
if (toLaunch > 0) {
logger.info("Need to launch {} new worker(s)", toLaunch);
for (int i = 0; i < toLaunch; i++) {
try {
String node = selectNodeForWorker(children);
startNewWorker(node);
} catch (IOException e) {
logger.error("Failed to start new worker", e);
// node failure → redistribute
for (String deadNode : new HashSet<>(byNode.keySet())) {
if (!nodes.contains(deadNode)) {
for (String w : byNode.get(deadNode)) {
String target = leastLoaded(byNode);
startWorker(target);
}
byNode.remove(deadNode);
}
} else {
logger.debug("All workers are running, no action needed");
}
}
// Simple round-robin assignment of worker to a physical node
private String selectNodeForWorker(List<String> currentWorkers) {
int idx = currentWorkers.size() % physicalNodes.size();
return physicalNodes.get(idx);
private String leastLoaded(Map<String, List<String>> map) {
return map.entrySet()
.stream()
.min(Comparator.comparingInt(e -> e.getValue().size()))
.get().getKey();
}
private void startNewWorker(String node) throws IOException {
File file = new File(pathToWorkerJar);
String command = String.format("ssh %s java -jar %s", node, file.getAbsolutePath());
logger.info("Launching worker on {}: {}", node, command);
Runtime.getRuntime().exec(command, null, file.getParentFile());
private void startWorker(String node) throws Exception {
File jar = new File(workerJar);
ProcessBuilder pb = new ProcessBuilder(
"java", "-jar", jar.getAbsolutePath()
);
pb.environment().put("NODE_ID", node);
pb.start();
logger.info("Started worker on {}", node);
}
}
public class AutohealerApplication {
public static void main(String[] args) throws Exception {
int desiredWorkers = Integer.parseInt(args[0]);
String workerJar = args[1];
Autohealer healer = new Autohealer(desiredWorkers, workerJar);
healer.connect();
healer.bootstrap();
healer.run();
}
}
<configuration>
<property name="NODE" value="${node:-leader}"/>
<!-- Console logger: minimal -->
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>logs/${NODE}.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>logs/${NODE}.%d{yyyy-MM-dd}.log</fileNamePattern>
</rollingPolicy>
<encoder>
<pattern>%d{HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n</pattern>
<pattern>%d %-5level [%thread] %logger - %msg%n</pattern>
</encoder>
</appender>
<!-- File logger -->
<appender name="FILE" class="ch.qos.logback.core.FileAppender">
<file>worker.log</file>
<append>true</append>
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss} %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<!-- Root logger -->
<root level="INFO">
<appender-ref ref="CONSOLE"/>
<appender-ref ref="FILE"/>
</root>
</configuration>
This source diff could not be displayed because it is too large. You can view the blob instead.
/opt/zookeeper/bin/zkServer.sh start /opt/zookeeper/conf/zoo1.cfg
/opt/zookeeper/bin/zkServer.sh start /opt/zookeeper/conf/zoo2.cfg
/opt/zookeeper/bin/zkServer.sh start /opt/zookeeper/conf/zoo3.cfg
# launch one physical node
/opt/zookeeper/bin/zkServer.sh start ./standalone.cfg
#!/bin/bash
/opt/zookeeper/bin/zkServer.sh create /nodes ""
/opt/zookeeper/bin/zkCli.sh create /nodes/nodeA ""
/opt/zookeeper/bin/zkCli.sh create /nodes/nodeB ""
/opt/zookeeper/bin/zkCli.sh create /nodes/nodeC ""
tickTime=2000
dataDir=/data/zookeeper1
clientPort=2181
initLimit=5
syncLimit=2
......@@ -2,8 +2,6 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
public class Application {
private static final Logger logger = LoggerFactory.getLogger(Application.class);
......@@ -14,11 +12,8 @@ public class Application {
Worker worker = new Worker();
worker.connectToZookeeper();
worker.work();
} catch (IOException | KeeperException | InterruptedException e) {
logger.error("Worker failed with exception", e);
System.exit(1);
} catch (RuntimeException e) {
logger.error("Critical failure, shutting down worker", e);
} catch (Exception e) {
logger.error("Worker crashed", e);
System.exit(1);
}
}
......
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -12,43 +9,36 @@ import java.util.concurrent.locks.LockSupport;
public class Worker {
private static final Logger logger = LoggerFactory.getLogger(Worker.class);
// Update this if you run a cluster
private static final String ZOOKEEPER_ADDRESS = "127.0.0.1:2181";
private static final int SESSION_TIMEOUT = 3000;
private static final String ZK = "127.0.0.1:2181";
private static final int TIMEOUT = 3000;
private static final String WORKERS = "/workers";
private static final String AUTOHEALER_ZNODES_PATH = "/workers";
private static final float CHANCE_TO_FAIL = 0.1F;
private static final float FAIL_RATE = 0.1f;
private ZooKeeper zk;
private final Random random = new Random();
private ZooKeeper zooKeeper;
public void connectToZookeeper() throws IOException {
this.zooKeeper = new ZooKeeper(ZOOKEEPER_ADDRESS, SESSION_TIMEOUT, event -> {});
logger.info("Connected to ZooKeeper at {}", ZOOKEEPER_ADDRESS);
zk = new ZooKeeper(ZK, TIMEOUT, e -> {});
}
public void work() throws KeeperException, InterruptedException {
addChildZnode();
logger.info("Worker node created, starting work loop...");
public void work() throws Exception {
String nodeId = System.getenv("NODE_ID");
while (true) {
// Minimal console output, detailed logs go to file
logger.debug("Working...");
zk.create(
WORKERS + "/worker-",
nodeId.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL
);
LockSupport.parkNanos(100_000_000); // ~0.1 second
logger.info("Worker running on node {}", nodeId);
if (random.nextFloat() < CHANCE_TO_FAIL) {
logger.error("Critical error happened, exiting...");
throw new RuntimeException("Worker simulated failure");
}
while (true) {
LockSupport.parkNanos(100_000_000);
if (random.nextFloat() < FAIL_RATE) {
throw new RuntimeException("Simulated failure");
}
}
private void addChildZnode() throws KeeperException, InterruptedException {
zooKeeper.create(AUTOHEALER_ZNODES_PATH + "/worker_",
new byte[]{},
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
logger.info("Registered ephemeral znode under {}", AUTOHEALER_ZNODES_PATH);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment