Commit 45f333cb authored by tammam.alsoleman's avatar tammam.alsoleman

Edit onElectedToBeLeader

parent c6b4dc19
package com.distributed.search.cluster; package com.distributed.search.cluster;
import com.distributed.search.grpc.SearchClient;
import com.distributed.search.grpc.SearchServiceImpl; import com.distributed.search.grpc.SearchServiceImpl;
import com.distributed.search.web.CoordinatorWebServer; // New Import
import io.grpc.Server; import io.grpc.Server;
import io.grpc.ServerBuilder; import io.grpc.ServerBuilder;
import org.apache.zookeeper.KeeperException;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
/** /**
* Defines the actions taken when a node's role changes in the cluster. * Defines the actions taken when a node's role changes in the cluster.
...@@ -12,12 +17,20 @@ import java.io.IOException; ...@@ -12,12 +17,20 @@ import java.io.IOException;
public class OnElectionAction implements OnElectionCallback { public class OnElectionAction implements OnElectionCallback {
private final ServiceRegistry serviceRegistry; private final ServiceRegistry serviceRegistry;
private final LeaderHttpRegistry leaderHttpRegistry; // For discovery
private final SearchClient searchClient; // To perform search
private final int port; private final int port;
private Server grpcServer; // The gRPC server instance for Workers private Server grpcServer; // The gRPC server instance for Workers
private CoordinatorWebServer leaderInternalWebServer; // The Internal API
private final String sharedDirectoryPath = "storage"; // The central storage folder private final String sharedDirectoryPath = "storage"; // The central storage folder
public OnElectionAction(ServiceRegistry serviceRegistry, int port) { public OnElectionAction(ServiceRegistry serviceRegistry,
LeaderHttpRegistry leaderHttpRegistry,
SearchClient searchClient,
int port) {
this.serviceRegistry = serviceRegistry; this.serviceRegistry = serviceRegistry;
this.leaderHttpRegistry = leaderHttpRegistry;
this.searchClient = searchClient;
this.port = port; this.port = port;
} }
...@@ -26,16 +39,25 @@ public class OnElectionAction implements OnElectionCallback { ...@@ -26,16 +39,25 @@ public class OnElectionAction implements OnElectionCallback {
*/ */
@Override @Override
public void onElectedToBeLeader() { public void onElectedToBeLeader() {
// If it was a worker, unregister itself from the worker list
serviceRegistry.unregisterFromCluster(); serviceRegistry.unregisterFromCluster();
// Start watching for other active workers in the cluster
serviceRegistry.registerForUpdates(); serviceRegistry.registerForUpdates();
System.out.println("Node elected as Leader. Monitoring the cluster..."); // 1. Start the Internal HTTP API (Running on port + 1000)
int internalApiPort = port + 1000;
leaderInternalWebServer = new CoordinatorWebServer(internalApiPort, searchClient, sharedDirectoryPath);
leaderInternalWebServer.startServer();
// 2. Register the Internal API address in Zookeeper for the Frontend to see
try {
String leaderApiAddress = String.format("http://localhost:%d/search", internalApiPort);
leaderHttpRegistry.registerLeader(leaderApiAddress);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
System.out.println("Node promoted to Leader. Internal API is live.");
// Note: The Coordinator doesn't need to run a gRPC server for searching. // Stop gRPC server if it was acting as a worker
// It acts as a client that sends requests to workers.
if (grpcServer != null) { if (grpcServer != null) {
grpcServer.shutdown(); grpcServer.shutdown();
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment