Commit d1b92d05 authored by tammam.alsoleman's avatar tammam.alsoleman

Edit Application to perform the new constructor onElectedToBeLeader

parent 45f333cb
...@@ -9,7 +9,8 @@ import org.apache.zookeeper.KeeperException; ...@@ -9,7 +9,8 @@ import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooKeeper;
import com.distributed.search.cluster.LeaderHttpRegistry;
import com.distributed.search.model.SearchResponse;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
...@@ -19,7 +20,7 @@ import java.util.Scanner; ...@@ -19,7 +20,7 @@ import java.util.Scanner;
* Main Entry point for the Distributed Search Engine. * Main Entry point for the Distributed Search Engine.
*/ */
public class Application implements Watcher { public class Application implements Watcher {
private static final String ZOOKEEPER_ADDRESS = "192.168.96.198:2181"; // Change if ZK is on another machine private static final String ZOOKEEPER_ADDRESS = "192.168.73.250:2181"; // Change if ZK is on another machine
private static final int SESSION_TIMEOUT = 3000; private static final int SESSION_TIMEOUT = 3000;
private static final int DEFAULT_PORT = 8080; private static final int DEFAULT_PORT = 8080;
private static final String STORAGE_DIR = "storage"; // Folder containing .txt files private static final String STORAGE_DIR = "storage"; // Folder containing .txt files
...@@ -39,14 +40,19 @@ public class Application implements Watcher { ...@@ -39,14 +40,19 @@ public class Application implements Watcher {
ZooKeeper zooKeeper = application.connectToZookeeper(); ZooKeeper zooKeeper = application.connectToZookeeper();
ServiceRegistry serviceRegistry = new ServiceRegistry(zooKeeper); ServiceRegistry serviceRegistry = new ServiceRegistry(zooKeeper);
LeaderHttpRegistry leaderHttpRegistry = new LeaderHttpRegistry(zooKeeper); // Discovery Bridge
SearchClient searchClient = new SearchClient(); SearchClient searchClient = new SearchClient();
// Action to take when election completes // Action to take when election completes
OnElectionAction onElectionAction = new OnElectionAction(serviceRegistry, currentServerPort) { OnElectionAction onElectionAction = new OnElectionAction(
serviceRegistry,
leaderHttpRegistry,
searchClient,
currentServerPort) {
@Override @Override
public void onElectedToBeLeader() { public void onElectedToBeLeader() {
super.onElectedToBeLeader(); super.onElectedToBeLeader();
isLeader = true; // Mark this node as the leader isLeader = true;
} }
}; };
...@@ -68,38 +74,34 @@ public class Application implements Watcher { ...@@ -68,38 +74,34 @@ public class Application implements Watcher {
while (true) { while (true) {
if (isLeader) { if (isLeader) {
System.out.println("\n[Coordinator] Enter search query (or 'exit' to quit):"); // If Leader: Keep providing the Console UI for local testing
System.out.println("\n[Leader Mode] Enter query (Internal API is also listening...):");
if (!scanner.hasNextLine()) break;
String input = scanner.nextLine(); String input = scanner.nextLine();
if (input.equalsIgnoreCase("exit")) break; if (input.equalsIgnoreCase("exit")) break;
if (input.trim().isEmpty()) continue; if (input.trim().isEmpty()) continue;
// 1. Get current active workers from Zookeeper
List<String> workers = serviceRegistry.getAllServiceAddresses(); List<String> workers = serviceRegistry.getAllServiceAddresses();
if (workers.isEmpty()) { if (workers.isEmpty()) {
System.out.println("No workers registered yet. Please wait..."); System.out.println("Wait for workers to join...");
continue; continue;
} }
// 2. Prepare search data
List<String> terms = Arrays.asList(input.toLowerCase().split("\\s+")); List<String> terms = Arrays.asList(input.toLowerCase().split("\\s+"));
List<String> allFiles = FileManager.getSortedDocumentNames(STORAGE_DIR); List<String> allFiles = FileManager.getSortedDocumentNames(STORAGE_DIR);
if (allFiles.isEmpty()) {
System.out.println("No documents found in 'storage' directory.");
continue;
}
// 3. Update gRPC channels and perform distributed search
System.out.println("Searching in " + allFiles.size() + " files across " + workers.size() + " workers...");
searchClient.updateWorkers(workers); searchClient.updateWorkers(workers);
searchClient.performSearch(terms, allFiles); // Perform search and get results back
List<SearchResponse.DocumentResult> results = searchClient.performSearch(terms, allFiles);
// Print top results in console
results.stream().limit(10).forEach(r ->
System.out.println(r.getDocumentName() + " (Score: " + r.getScore() + ")"));
} else { } else {
// If Worker: Just wait and keep the connection alive // If Worker: Just sleep and keep connection alive (HTTP and gRPC servers run in background threads)
synchronized (zooKeeper) { Thread.sleep(10000);
zooKeeper.wait(5000);
}
} }
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment