Commit a9a99f8f authored by mohammad.salama's avatar mohammad.salama

Third Comment - Cluster only

parent edef04e6
......@@ -9,7 +9,21 @@
<option name="autoReloadType" value="SELECTIVE" />
</component>
<component name="ChangeListManager">
<list default="true" id="b8bcd35c-99f3-44c8-a866-59f81c3c8bd0" name="Changes" comment="" />
<list default="true" id="b8bcd35c-99f3-44c8-a866-59f81c3c8bd0" name="Changes" comment="">
<change afterPath="$PROJECT_DIR$/src/main/java/ObjectExchangeInCluster/FileWordPair.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/java/AutoHealerAndClusterSearch/Application.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/AutoHealerAndClusterSearch/Application.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/java/AutoHealerAndClusterSearch/Coordinator.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/AutoHealerAndClusterSearch/Coordinator.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/java/AutoHealerAndClusterSearch/LeaderElection.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/AutoHealerAndClusterSearch/LeaderElection.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/java/AutoHealerAndClusterSearch/OnElectionAction.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/AutoHealerAndClusterSearch/OnElectionAction.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/java/AutoHealerAndClusterSearch/ServiceRegistry.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/AutoHealerAndClusterSearch/ServiceRegistry.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/java/AutoHealerAndClusterSearch/TransientWorker.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/AutoHealerAndClusterSearch/TransientWorker.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/java/AutoHealerAndClusterSearch/TransientWorkerApplication.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/AutoHealerAndClusterSearch/TransientWorkerApplication.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/java/GRPC/grpcClient.java" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/java/GRPC/grpcServer.java" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/java/GRPC/service.proto" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/GRPC/service.proto" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/java/ObjectExchangeInCluster/Response.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/ObjectExchangeInCluster/Response.java" afterDir="false" />
</list>
<option name="SHOW_DIALOG" value="false" />
<option name="HIGHLIGHT_CONFLICTS" value="true" />
<option name="HIGHLIGHT_NON_ACTIVE_CHANGELIST" value="false" />
......@@ -43,19 +57,19 @@
<option name="hideEmptyMiddlePackages" value="true" />
<option name="showLibraryContents" value="true" />
</component>
<component name="PropertiesComponent">{
&quot;keyToString&quot;: {
&quot;RunOnceActivity.OpenProjectViewOnStart&quot;: &quot;true&quot;,
&quot;RunOnceActivity.ShowReadmeOnStart&quot;: &quot;true&quot;,
&quot;SHARE_PROJECT_CONFIGURATION_FILES&quot;: &quot;true&quot;,
&quot;jdk.selected.JAVA_MODULE&quot;: &quot;17 (2)&quot;,
&quot;last_opened_file_path&quot;: &quot;D:/HIAST/FIY/FS/Distributed Systems/Lab/8/apache-maven-3.9.5&quot;,
&quot;project.structure.last.edited&quot;: &quot;Modules&quot;,
&quot;project.structure.proportion&quot;: &quot;0.15&quot;,
&quot;project.structure.side.proportion&quot;: &quot;0.2&quot;,
&quot;settings.editor.selected.configurable&quot;: &quot;reference.settings.project.maven.repository.indices&quot;
<component name="PropertiesComponent"><![CDATA[{
"keyToString": {
"RunOnceActivity.OpenProjectViewOnStart": "true",
"RunOnceActivity.ShowReadmeOnStart": "true",
"SHARE_PROJECT_CONFIGURATION_FILES": "true",
"jdk.selected.JAVA_MODULE": "17 (2)",
"last_opened_file_path": "D:/HIAST/FIY/FS/Parallel Programming/Lab/2/pp-02-executorservice-master",
"project.structure.last.edited": "Modules",
"project.structure.proportion": "0.15",
"project.structure.side.proportion": "0.2",
"settings.editor.selected.configurable": "reference.settings.project.maven.repository.indices"
}
}</component>
}]]></component>
<component name="RecentsManager">
<key name="CopyFile.RECENT_KEYS">
<recent name="D:\HIAST\FIY\FS\Distributed Systems\Lab\8\Distributed-Search\src\main\java\GRPC" />
......
......@@ -17,8 +17,6 @@ public class Application implements Watcher
private static final int DEFAULT_PORT = 54321;
private ZooKeeper zooKeeper;
public static int numberOfInstances;
public static String pathToFile = "";
private final Logger logger = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
......@@ -26,16 +24,15 @@ public class Application implements Watcher
/*int currentServerPort = args.length == 3 ? Integer.parseInt(args[0]) : DEFAULT_PORT;
numberOfInstances = args.length == 3 ? Integer.parseInt(args[1]) : 4;
String IP = args.length == 3 ? args[2] : "M_Salameh@127.0.0.1";*/
String IP = args.length == 1 ? args[0] : "127.0.0.1";
pathToFile = System.getProperty("user.dir") + "Worker.jar";
String IP = args.length == 1 ? args[0] : "127.0.0.1:55451";
int port = Integer.parseInt(IP.split(":")[1]);
Application application = new Application();
ZooKeeper zooKeeper = application.connectToZookeeper();
ServiceRegistry serviceRegistry = new ServiceRegistry(zooKeeper , numberOfInstances , pathToFile);
ServiceRegistry serviceRegistry = new ServiceRegistry(zooKeeper);
OnElectionAction onElectionAction = new OnElectionAction(serviceRegistry, DEFAULT_PORT);
OnElectionAction onElectionAction = new OnElectionAction(serviceRegistry, port);
LeaderElection leaderElection = new LeaderElection(zooKeeper, onElectionAction);
leaderElection.volunteerForLeadership(IP);
......
package AutoHealerAndClusterSearch;
import ObjectExchangeInCluster.FileWordPair;
import ObjectExchangeInCluster.Request;
import ObjectExchangeInCluster.Response;
import org.apache.zookeeper.KeeperException;
......@@ -21,46 +22,68 @@ public class Coordinator
private static final String PHYSICAL_ZNODES_PATH = "/physical_nodes";
///private static final String COORDINATOR_ZNODE_PATH = "/coordinator_node";
private int REQUEST_RECEIVE_PORT = 12345;
private int COORDINATOR_PORT;
private int CLUSTER_PORT = 54321;
private String FILES_DIRECTORY = "";
private final Logger logger = LoggerFactory.getLogger(Coordinator.class);
public Coordinator(ZooKeeper zooKeeper)
public Coordinator(ZooKeeper zooKeeper , String SOCKET)
{
this.zooKeeper = zooKeeper;
COORDINATOR_PORT = Integer.parseInt(SOCKET.split(":")[1]);
FILES_DIRECTORY = System.getProperty(("user.dir") + "/SearchFiles");
}
public void start() throws IOException, InterruptedException, KeeperException
{
ServerSocket serverSocket = new ServerSocket(REQUEST_RECEIVE_PORT);
System.out.println("Server started on port " + REQUEST_RECEIVE_PORT);
logger.info("Server started on port " + REQUEST_RECEIVE_PORT);
ServerSocket serverSocket = new ServerSocket(COORDINATOR_PORT);
System.out.println("Server started on port " + COORDINATOR_PORT);
logger.info("Server started on port " + COORDINATOR_PORT);
/*
* Scanner scanner = new Scanner(System.in);
String query;
System.out.println("Enter Query To Search : ");
query = scanner.nextLine();
*
* */
while (true)
{
Socket clientSocket = serverSocket.accept();
/*Socket clientSocket = serverSocket.accept();
System.out.println("New client connected: " + clientSocket.getInetAddress());
logger.info("New client connected: " + clientSocket.getInetAddress());
*/
Scanner scanner = new Scanner(System.in);
String query;
System.out.println("Enter Query To Search : ");
query = scanner.nextLine();
Thread clientThread = new Thread(() ->
{
Response responseMap = handleClient(clientSocket);
Response responseMap = handleClient(query);
printResponsr(responseMap);
sendResponsesToClient(clientSocket, responseMap);
//sendResponsesToClient(clientSocket, responseMap);
});
clientThread.start();
}
}
private void printResponsr(Response responseMap)
{
}
private Response sendRequestToNode(Request request, String ipAddress)
{
Response response = null;
try (Socket socket = new Socket(ipAddress, CLUSTER_PORT))
String ip = ipAddress.split(":")[0];
int port = Integer.parseInt(ipAddress.split(":")[1]);
try (Socket socket = new Socket(ip, port))
{
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
......@@ -92,22 +115,15 @@ public class Coordinator
}
private String extractRequest(Socket clientSocket) throws IOException, ClassNotFoundException
{
ObjectInputStream objectInputStream = new ObjectInputStream(clientSocket.getInputStream());
String request = (String) objectInputStream.readObject();
return request;
}
private Response handleClient(Socket clientSocket)
private Response handleClient(String query)
{
try
try
{
String query = extractRequest(clientSocket);
List<Response> responses = spreadQuery(query);
processResultsAndPrepareAnswer(responses);
}
catch (IOException | InterruptedException | ClassNotFoundException | KeeperException e)
catch (IOException | InterruptedException | KeeperException e)
{
/**
* retry the responses ???
......@@ -130,7 +146,19 @@ public class Coordinator
* */
private void processResultsAndPrepareAnswer(List<Response> responses)
{
for (Response response : responses)
{
TreeMap<String , List<FileWordPair>> treeMap = response.getWordFrequencies();
for(Map.Entry<String, List<FileWordPair>> mapEntry : treeMap.entrySet())
{
System.out.println("The word " + mapEntry.getKey() + " is :");
for(FileWordPair fileWordPair : mapEntry.getValue())
{
System.out.println(fileWordPair);
}
}
}
///send via GRPC
return;
}
......
......@@ -22,7 +22,6 @@ public class LeaderElection implements Watcher {
{
this.zooKeeper = zooKeeper;
this.onElectionCallback = onElectionCallback;
}
public void volunteerForLeadership(String IP) throws InterruptedException, KeeperException, UnknownHostException {
......
......@@ -21,16 +21,17 @@ public class OnElectionAction implements OnElectionCallback
public void onElectedToBeLeader(String IP)
{
serviceRegistry.unregisterFromCluster();
serviceRegistry.registerForUpdates();
try
{
serviceRegistry.registerToCoordinator(IP);
//start the server
}
catch (InterruptedException | KeeperException e)
{
logger.error("Could Not Register to be Coordinator");
throw new RuntimeException(e);
}
serviceRegistry.registerForUpdates();
}
@Override
......
......@@ -17,19 +17,17 @@ public class ServiceRegistry implements Watcher
private static final String COORDINATOR_ZNODE_PATH = "/coordinator_node";
private final Logger logger = LoggerFactory.getLogger(ServiceRegistry.class);
private String pathToProgram = "";
private final ZooKeeper zooKeeper;
private Coordinator coordinator = null;
private TransientWorker transientWorker = null;
private String currentZnode = null;
private List<String> allServiceAddresses = null;
private int numberOfInstances;
public ServiceRegistry(ZooKeeper zooKeeper , int numberOfInstances , String pathToProgram)
public ServiceRegistry(ZooKeeper zooKeeper)
{
this.zooKeeper = zooKeeper;
this.numberOfInstances = numberOfInstances;
this.pathToProgram = pathToProgram;
createServiceRegistryZnode();
}
......@@ -59,11 +57,13 @@ public class ServiceRegistry implements Watcher
if (this.currentZnode != null)
{
logger.info("Already registered to service registry");
return;
}
this.currentZnode = zooKeeper.create(PHYSICAL_ZNODES_PATH + "/physical_node_", metadata.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
logger.info("Registered to service registry");
else {
this.currentZnode = zooKeeper.create(PHYSICAL_ZNODES_PATH + "/physical_node_", metadata.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
logger.info("Registered to service registry");
}
//transientWorker = new TransientWorker(metadata);
}
public void registerToCoordinator(String metadata) throws InterruptedException, KeeperException
......@@ -71,7 +71,7 @@ public class ServiceRegistry implements Watcher
this.currentZnode = zooKeeper.create(COORDINATOR_ZNODE_PATH + "/coordinator_", metadata.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
logger.info("Registered to be Coordinator and I am Master !");
coordinator = new Coordinator(this.zooKeeper);
coordinator = new Coordinator(this.zooKeeper , metadata);
}
public void registerForUpdates() {
......
package AutoHealerAndClusterSearch;
import ObjectExchangeInCluster.Request;
import ObjectExchangeInCluster.Response;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
......@@ -10,54 +12,91 @@ import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.math.BigInteger;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
public class TransientWorker {
public class TransientWorker
{
private static final String ZOOKEEPER_ADDRESS = "192.168.184.10:2181";
private static final int SESSION_TIMEOUT = 3000;
private final Logger logger = LoggerFactory.getLogger(TransientWorker.class);
// Parent Znode where each worker stores an ephemeral child to indicate it is alive
private static final String WORKERS_ZNODES_PATH = "/workers";
private static final float CHANCE_TO_FAIL = 0.001F;
private int CLUSTER_PORT;
private final Random random = new Random();
private ZooKeeper zooKeeper;
private String filesLocation;
private String myName = "";
/// regex is used to know how the information will be concatenated to encode them to onw string
/// like : nodenum + regex+somehting+regex....
private String regex = "^";
public TransientWorker (String SOCKET) throws IOException {
CLUSTER_PORT =Integer.parseInt(SOCKET.split(":")[1]);
filesLocation = System.getProperty("user.dir") + "/SearchFiles";
connectToZookeeper();
}
public void connectToZookeeper() throws IOException {
this.zooKeeper = new ZooKeeper(ZOOKEEPER_ADDRESS, SESSION_TIMEOUT, event -> {
});
}
public void work(String nodeNum, String query, int numberOfFilesToScan, int filesOffset) throws KeeperException, InterruptedException {
filesLocation = System.getProperty("user.dir") + "/SearchFiles";
addChildZnode(nodeNum, numberOfFilesToScan, filesOffset);
logger.info(myName + " is Working and responsible for " + numberOfFilesToScan + " Files");
List<String> myFiles = getMyFiles(numberOfFilesToScan, filesOffset);
double score = getScore(query, myFiles);
/// send them to coordinator
public void start() throws KeeperException, InterruptedException, IOException
{
ServerSocket serverSocket = new ServerSocket(CLUSTER_PORT);
System.out.println("Server started on port " + CLUSTER_PORT);
logger.info("Server started on port " + CLUSTER_PORT);
while (true)
{
Socket clientSocket = serverSocket.accept();
System.out.println("Coordinator connected: " + clientSocket.getInetAddress());
logger.info("Coordinator connected: " + clientSocket.getInetAddress());
Thread clientThread = new Thread(() ->
{
Response responseMap = null;
try
{
responseMap = handleCoordinator(clientSocket);
sendResponsesToClient(clientSocket, responseMap);
} catch (IOException | ClassNotFoundException e) {
throw new RuntimeException(e);
}
});
clientThread.start();
}
}
private void addChildZnode(String nodeNumber, int numberOfFilesToScan, int filesOffset) throws KeeperException, InterruptedException {
private void sendResponsesToClient(Socket clientSocket, Response responseMap)
{
String info = nodeNumber + regex + numberOfFilesToScan + regex + filesOffset;
myName = zooKeeper.create(WORKERS_ZNODES_PATH + "/worker_",
info.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
}
private Response handleCoordinator(Socket clientSocket) throws IOException, ClassNotFoundException
{
Request request = extractRequest(clientSocket);
double score = getScore(request , getMyFiles(request.getNumberOfFilesToScan() , request.getFilesOffset()));
return null;
}
private Request extractRequest(Socket clientSocket) throws IOException, ClassNotFoundException
{
ObjectInputStream objectInputStream = new ObjectInputStream(clientSocket.getInputStream());
Request request = (Request) objectInputStream.readObject();
return request;
}
private List<String> getMyFiles(int numberOfFilesToScan, int filesOffset)
......@@ -81,9 +120,9 @@ public class TransientWorker {
return fileNames.subList(startIndex, endIndex);
}
private double getScore(String query, List<String> myFiles)
private double getScore(Request query, List<String> myFiles)
{
String[] wordsToCount = query.split(".");
String[] wordsToCount = query.getQueryWords();
Map<String , BigInteger> numberOfWordsInFile = new HashMap<>();
Map<String, Integer> wordFrequency = new HashMap<>();
Map<String, Integer> fileCount = new HashMap<>();
......@@ -118,5 +157,25 @@ public class TransientWorker {
return 0.0;
}
}
private void addChildZnode(int numberOfFilesToScan, int filesOffset) throws KeeperException, InterruptedException {
String info = numberOfFilesToScan + regex + filesOffset;
myName = zooKeeper.create(WORKERS_ZNODES_PATH + "/worker_",
info.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
}
public void work(String nodeNum, String query, int numberOfFilesToScan, int filesOffset) throws KeeperException, InterruptedException
{
filesLocation = System.getProperty("user.dir") + "/SearchFiles";
//addChildZnode(numberOfFilesToScan, filesOffset);
logger.info(myName + " is Working and responsible for " + numberOfFilesToScan + " Files");
List<String> myFiles = getMyFiles(numberOfFilesToScan, filesOffset);
//double score = getScore(query, myFiles);
/// send them to coordinator
}
}
......@@ -18,8 +18,8 @@ public class TransientWorkerApplication
* modifiedQuery + " " + ///arg[1]
* numberOfFilesToScan + " "+ ///arg[2]
* filesOffset+"\""; ///arg[3]
* */
TransientWorker worker = new TransientWorker();
*
TransientWorker worker = new TransientWorker(args[0]);
String nodeNum = args[0];
String query = args[1];
int numberOfFilesToScan = Integer.parseInt(args[2]);
......@@ -33,7 +33,7 @@ public class TransientWorkerApplication
logger.warn("Cannot Connect To ZooKeeper");
throw new RuntimeException(e);
}
try
/* try
{
worker.work(nodeNum , query , numberOfFilesToScan , filesOffset);
}
......@@ -41,6 +41,6 @@ public class TransientWorkerApplication
{
logger.error("Worker Shut Down");
System.exit(1);
}
}*/
}
}
package GRPC;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.util.List;
import java.util.concurrent.TimeUnit;
/*
public class SearchClient {
private final ManagedChannel channel;
private final SearchServiceGrpc.SearchServiceBlockingStub blockingStub;
public SearchClient(String host, int port) {
channel = ManagedChannelBuilder.forAddress(host, port)
.usePlaintext()
.build();
blockingStub = SearchServiceGrpc.newBlockingStub(channel);
}
public void search(String query) {
SearchRequest request = SearchRequest.newBuilder()
.setQuery(query)
.build();
SearchResponse response = blockingStub.search(request);
List<String> results = response.getResultsList();
System.out.println("Received search results:");
for (String result : results) {
System.out.println(result);
}
}
public void shutdown() throws InterruptedException {
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
}
public static void main(String[] args) throws InterruptedException
{
SearchClient searchClient = new SearchClient("localhost", 50051);
searchClient.search("example query");
searchClient.shutdown();
}
}*/
\ No newline at end of file
package GRPC;
import io.grpc.Server;
import io.grpc.ServerBuilder;
/*import io.grpc.stub.StreamObserver;
import com.example.grpc.search_service.SearchRequest;
import com.example.grpc.search_service.SearchResponse;
import com.example.grpc.search_service.SearchServiceGrpc;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class SearchServer {
private Server server;
private void start() throws IOException {
int port = 50051;
server = ServerBuilder.forPort(port)
.addService(new SearchServiceImpl())
.build()
.start();
System.out.println("Server started on port " + port);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("Shutting down server...");
SearchServer.this.stop();
}));
}
private void stop() {
if (server != null) {
server.shutdown();
}
}
private void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}
public static void main(String[] args) throws IOException, InterruptedException {
SearchServer searchServer = new SearchServer();
searchServer.start();
searchServer.blockUntilShutdown();
}
private static class SearchServiceImpl extends SearchServiceGrpc.SearchServiceImplBase {
@Override
public void search(SearchRequest request, StreamObserver<SearchResponse> responseObserver) {
String query = request.getQuery();
System.out.println("Received search query: " + query);
// Perform search logic based on the query and generate results
List<String> results = new ArrayList<>();
results.add("Result 1");
results.add("Result 2");
results.add("Result 3");
SearchResponse response = SearchResponse.newBuilder()
.addAllResults(results)
.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
}
}*/
\ No newline at end of file
syntax = "proto3";
option java_multiple_files = true;
option java_package = "GRPCConnection";
option java_outer_classname = "SearchServiceProto";
package search_service;
service SearchService
{
rpc Search(SearchRequest) returns (SearchResponse) {}
}
message SearchRequest
{
string query = 1;
}
message SearchResponse
{
repeated string results = 1;
}
\ No newline at end of file
package ObjectExchangeInCluster;
public class FileWordPair
{
public String fileName;
public long freq;
@Override
public String toString()
{
return "appeared in : [ "+ fileName+" ] " + freq;
}
}
......@@ -6,10 +6,6 @@ import java.util.List;
import java.util.TreeMap;
public class Response implements Serializable {
private class FileWordPair {
public String fileName;
public long freq;
}
private TreeMap<String, List<FileWordPair>> wordFrequencies;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment