You need to sign in or sign up before continuing.
Commit ac98241b authored by mohammad.salama's avatar mohammad.salama

GRPC and Distributed Search and Auto Recovery working good

parent 133d4005
...@@ -10,21 +10,26 @@ ...@@ -10,21 +10,26 @@
</component> </component>
<component name="ChangeListManager"> <component name="ChangeListManager">
<list default="true" id="b8bcd35c-99f3-44c8-a866-59f81c3c8bd0" name="Changes" comment=""> <list default="true" id="b8bcd35c-99f3-44c8-a866-59f81c3c8bd0" name="Changes" comment="">
<change afterPath="$PROJECT_DIR$/src/main/java/GRPCConnection/GRPCClient/GRPCClient.java" afterDir="false" /> <change afterPath="$PROJECT_DIR$/src/main/java/GRPCConnection/GRPCServiceStart.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/main/java/WebSide/WebClient.java" afterDir="false" /> <change afterPath="$PROJECT_DIR$/src/main/resources/application.properties" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/main/java/WebSide/WebServer.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" /> <change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/logs/app.log" beforeDir="false" afterPath="$PROJECT_DIR$/logs/app.log" afterDir="false" /> <change beforePath="$PROJECT_DIR$/logs/app.log" beforeDir="false" afterPath="$PROJECT_DIR$/logs/app.log" afterDir="false" />
<change beforePath="$PROJECT_DIR$/out/artifacts/Distributed_Search_jar/Distributed-Search.jar" beforeDir="false" /> <change beforePath="$PROJECT_DIR$/src/main/java/GRPCConnection/GRPCClient/GRPCClient.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/GRPCConnection/GRPCClient/GRPCClient.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/java/GRPCConnection/GRPCServer/GRPCServer.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/GRPCConnection/GRPCServer/GRPCServer.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/java/GRPCConnection/Main.java" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/java/WebSide/WebClient.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/WebSide/WebClient.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/java/WebSide/WebServer.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/WebSide/WebServer.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/java/org/AutoHealerAndClusterSearch/AutoHealerAndClusterSearch/Application.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/org/AutoHealerAndClusterSearch/AutoHealerAndClusterSearch/Application.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/java/org/AutoHealerAndClusterSearch/AutoHealerAndClusterSearch/Coordinator.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/org/AutoHealerAndClusterSearch/AutoHealerAndClusterSearch/Coordinator.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/java/org/AutoHealerAndClusterSearch/AutoHealerAndClusterSearch/ServiceRegistry.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/org/AutoHealerAndClusterSearch/AutoHealerAndClusterSearch/ServiceRegistry.java" afterDir="false" /> <change beforePath="$PROJECT_DIR$/src/main/java/org/AutoHealerAndClusterSearch/AutoHealerAndClusterSearch/ServiceRegistry.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/org/AutoHealerAndClusterSearch/AutoHealerAndClusterSearch/ServiceRegistry.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/java/org/AutoHealerAndClusterSearch/GRPCConnection/GRPCClient/GRPCClient.java" beforeDir="false" /> <change beforePath="$PROJECT_DIR$/target/classes/GRPCConnection/GRPCClient/GRPCClient.class" beforeDir="false" afterPath="$PROJECT_DIR$/target/classes/GRPCConnection/GRPCClient/GRPCClient.class" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/java/org/AutoHealerAndClusterSearch/GRPCConnection/GRPCServer/GRPCServer.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/GRPCConnection/GRPCServer/GRPCServer.java" afterDir="false" /> <change beforePath="$PROJECT_DIR$/target/classes/GRPCConnection/GRPCServer/GRPCServer.class" beforeDir="false" afterPath="$PROJECT_DIR$/target/classes/GRPCConnection/GRPCServer/GRPCServer.class" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/java/org/AutoHealerAndClusterSearch/GRPCConnection/Main.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/GRPCConnection/Main.java" afterDir="false" /> <change beforePath="$PROJECT_DIR$/target/classes/GRPCConnection/Main.class" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/target/classes/org/AutoHealerAndClusterSearch/AutoHealerAndClusterSearch/Application.class" beforeDir="false" afterPath="$PROJECT_DIR$/target/classes/org/AutoHealerAndClusterSearch/AutoHealerAndClusterSearch/Application.class" afterDir="false" />
<change beforePath="$PROJECT_DIR$/target/classes/org/AutoHealerAndClusterSearch/AutoHealerAndClusterSearch/Coordinator$1.class" beforeDir="false" afterPath="$PROJECT_DIR$/target/classes/org/AutoHealerAndClusterSearch/AutoHealerAndClusterSearch/Coordinator$1.class" afterDir="false" /> <change beforePath="$PROJECT_DIR$/target/classes/org/AutoHealerAndClusterSearch/AutoHealerAndClusterSearch/Coordinator$1.class" beforeDir="false" afterPath="$PROJECT_DIR$/target/classes/org/AutoHealerAndClusterSearch/AutoHealerAndClusterSearch/Coordinator$1.class" afterDir="false" />
<change beforePath="$PROJECT_DIR$/target/classes/org/AutoHealerAndClusterSearch/AutoHealerAndClusterSearch/Coordinator.class" beforeDir="false" afterPath="$PROJECT_DIR$/target/classes/org/AutoHealerAndClusterSearch/AutoHealerAndClusterSearch/Coordinator.class" afterDir="false" /> <change beforePath="$PROJECT_DIR$/target/classes/org/AutoHealerAndClusterSearch/AutoHealerAndClusterSearch/Coordinator.class" beforeDir="false" afterPath="$PROJECT_DIR$/target/classes/org/AutoHealerAndClusterSearch/AutoHealerAndClusterSearch/Coordinator.class" afterDir="false" />
<change beforePath="$PROJECT_DIR$/target/classes/org/AutoHealerAndClusterSearch/AutoHealerAndClusterSearch/ServiceRegistry$1.class" beforeDir="false" afterPath="$PROJECT_DIR$/target/classes/org/AutoHealerAndClusterSearch/AutoHealerAndClusterSearch/ServiceRegistry$1.class" afterDir="false" /> <change beforePath="$PROJECT_DIR$/target/classes/org/AutoHealerAndClusterSearch/AutoHealerAndClusterSearch/ServiceRegistry$1.class" beforeDir="false" afterPath="$PROJECT_DIR$/target/classes/org/AutoHealerAndClusterSearch/AutoHealerAndClusterSearch/ServiceRegistry$1.class" afterDir="false" />
<change beforePath="$PROJECT_DIR$/target/classes/org/AutoHealerAndClusterSearch/AutoHealerAndClusterSearch/ServiceRegistry.class" beforeDir="false" afterPath="$PROJECT_DIR$/target/classes/org/AutoHealerAndClusterSearch/AutoHealerAndClusterSearch/ServiceRegistry.class" afterDir="false" /> <change beforePath="$PROJECT_DIR$/target/classes/org/AutoHealerAndClusterSearch/AutoHealerAndClusterSearch/ServiceRegistry.class" beforeDir="false" afterPath="$PROJECT_DIR$/target/classes/org/AutoHealerAndClusterSearch/AutoHealerAndClusterSearch/ServiceRegistry.class" afterDir="false" />
<change beforePath="$PROJECT_DIR$/target/classes/org/AutoHealerAndClusterSearch/GRPCConnection/Main.class" beforeDir="false" />
</list> </list>
<option name="SHOW_DIALOG" value="false" /> <option name="SHOW_DIALOG" value="false" />
<option name="HIGHLIGHT_CONFLICTS" value="true" /> <option name="HIGHLIGHT_CONFLICTS" value="true" />
...@@ -66,7 +71,7 @@ ...@@ -66,7 +71,7 @@
&quot;SHARE_PROJECT_CONFIGURATION_FILES&quot;: &quot;true&quot;, &quot;SHARE_PROJECT_CONFIGURATION_FILES&quot;: &quot;true&quot;,
&quot;ToolWindowRun.ShowToolbar&quot;: &quot;false&quot;, &quot;ToolWindowRun.ShowToolbar&quot;: &quot;false&quot;,
&quot;jdk.selected.JAVA_MODULE&quot;: &quot;17 (2)&quot;, &quot;jdk.selected.JAVA_MODULE&quot;: &quot;17 (2)&quot;,
&quot;last_opened_file_path&quot;: &quot;D:/HIAST/FIY/FS/Distributed Systems/Lab/8/Distributed-Search/src/main/java&quot;, &quot;last_opened_file_path&quot;: &quot;D:/HIAST/FIY/FS/Distributed Systems/Lab/4/HomeWork&quot;,
&quot;project.structure.last.edited&quot;: &quot;Artifacts&quot;, &quot;project.structure.last.edited&quot;: &quot;Artifacts&quot;,
&quot;project.structure.proportion&quot;: &quot;0.15&quot;, &quot;project.structure.proportion&quot;: &quot;0.15&quot;,
&quot;project.structure.side.proportion&quot;: &quot;0.2&quot;, &quot;project.structure.side.proportion&quot;: &quot;0.2&quot;,
...@@ -95,20 +100,7 @@ ...@@ -95,20 +100,7 @@
<command value="mvn clean install" /> <command value="mvn clean install" />
</option> </option>
</component> </component>
<component name="RunManager" selected="Application.Node1"> <component name="RunManager">
<configuration name="Main" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
<option name="MAIN_CLASS_NAME" value="org.AutoHealerAndClusterSearch.GRPCConnection.Main" />
<module name="Distributed-Search" />
<extension name="coverage">
<pattern>
<option name="PATTERN" value="org.AutoHealerAndClusterSearch.GRPCConnection.*" />
<option name="ENABLED" value="true" />
</pattern>
</extension>
<method v="2">
<option name="Make" enabled="true" />
</method>
</configuration>
<configuration name="Node1" type="Application" factoryName="Application"> <configuration name="Node1" type="Application" factoryName="Application">
<option name="MAIN_CLASS_NAME" value="org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.Application" /> <option name="MAIN_CLASS_NAME" value="org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.Application" />
<module name="Distributed-Search" /> <module name="Distributed-Search" />
...@@ -130,13 +122,6 @@ ...@@ -130,13 +122,6 @@
<option name="Make" enabled="true" /> <option name="Make" enabled="true" />
</method> </method>
</configuration> </configuration>
<configuration name="Node4" type="Application" factoryName="Application">
<option name="MAIN_CLASS_NAME" value="org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.Application" />
<module name="Distributed-Search" />
<method v="2">
<option name="Make" enabled="true" />
</method>
</configuration>
<configuration name="Distributed-Search.jar" type="JarApplication" temporary="true"> <configuration name="Distributed-Search.jar" type="JarApplication" temporary="true">
<option name="JAR_PATH" value="$PROJECT_DIR$/out/artifacts/Distributed_Search_jar/Distributed-Search.jar" /> <option name="JAR_PATH" value="$PROJECT_DIR$/out/artifacts/Distributed_Search_jar/Distributed-Search.jar" />
<method v="2" /> <method v="2" />
...@@ -145,14 +130,11 @@ ...@@ -145,14 +130,11 @@
<item itemvalue="Application.Node1" /> <item itemvalue="Application.Node1" />
<item itemvalue="Application.Node2" /> <item itemvalue="Application.Node2" />
<item itemvalue="Application.Node3" /> <item itemvalue="Application.Node3" />
<item itemvalue="Application.Node4" />
<item itemvalue="Application.Main" />
<item itemvalue="JAR Application.Distributed-Search.jar" /> <item itemvalue="JAR Application.Distributed-Search.jar" />
</list> </list>
<recent_temporary> <recent_temporary>
<list> <list>
<item itemvalue="JAR Application.Distributed-Search.jar" /> <item itemvalue="JAR Application.Distributed-Search.jar" />
<item itemvalue="Application.Main" />
</list> </list>
</recent_temporary> </recent_temporary>
</component> </component>
...@@ -181,4 +163,20 @@ ...@@ -181,4 +163,20 @@
</map> </map>
</option> </option>
</component> </component>
<component name="XDebuggerManager">
<breakpoint-manager>
<breakpoints>
<line-breakpoint enabled="true" type="java-line">
<url>file://$PROJECT_DIR$/src/main/java/GRPCConnection/GRPCServer/GRPCServer.java</url>
<line>41</line>
<option name="timeStamp" value="1" />
</line-breakpoint>
<line-breakpoint enabled="true" type="java-line">
<url>file://$PROJECT_DIR$/src/main/java/org/AutoHealerAndClusterSearch/AutoHealerAndClusterSearch/ServiceRegistry.java</url>
<line>122</line>
<option name="timeStamp" value="10" />
</line-breakpoint>
</breakpoints>
</breakpoint-manager>
</component>
</project> </project>
\ No newline at end of file
This diff is collapsed.
package GRPCConnection.GRPCClient; package GRPCConnection.GRPCClient;
import io.grpc.ManagedChannel; import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder; import io.grpc.ManagedChannelBuilder;
import org.AutoHealerAndClusterSearch.SearchReply; import org.AutoHealerAndClusterSearch.SearchReply;
...@@ -21,7 +22,7 @@ public class GRPCClient ...@@ -21,7 +22,7 @@ public class GRPCClient
forAddress("localhost" , 6565). forAddress("localhost" , 6565).
usePlaintext().build(); usePlaintext().build();
SearchServiceGrpc.SearchServiceBlockingStub stub = SearchServiceGrpc.newBlockingStub(channel); SearchServiceGrpc.SearchServiceBlockingStub stub = SearchServiceGrpc.newBlockingStub(channel);
SearchRequest request = SearchRequest.newBuilder().setQuery("Hello There !!").build(); SearchRequest request = SearchRequest.newBuilder().setQuery("There is a file to be read be full of kindness").build();
SearchReply reply = stub.search(request); SearchReply reply = stub.search(request);
List<String> stringList = reply.getFilesList(); List<String> stringList = reply.getFilesList();
for (String s: stringList) for (String s: stringList)
......
...@@ -6,13 +6,17 @@ import org.AutoHealerAndClusterSearch.SearchReply; ...@@ -6,13 +6,17 @@ import org.AutoHealerAndClusterSearch.SearchReply;
import org.AutoHealerAndClusterSearch.SearchRequest; import org.AutoHealerAndClusterSearch.SearchRequest;
import org.AutoHealerAndClusterSearch.SearchServiceGrpc; import org.AutoHealerAndClusterSearch.SearchServiceGrpc;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.lognet.springboot.grpc.GRpcService; import org.lognet.springboot.grpc.GRpcService;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutionException;
/** /**
...@@ -20,36 +24,25 @@ import java.util.List; ...@@ -20,36 +24,25 @@ import java.util.List;
* it has threads embedded , we must configure the Coordinator only yo * it has threads embedded , we must configure the Coordinator only yo
* handle requests sent from here , no need to act like a Server !! * handle requests sent from here , no need to act like a Server !!
* */ * */
@GRpcService @GRpcService
public class GRPCServer extends SearchServiceGrpc.SearchServiceImplBase public class GRPCServer extends SearchServiceGrpc.SearchServiceImplBase
{ {
private Coordinator coordinator;
/* public GRPCServer(Coordinator coordinator)
{
this.coordinator = coordinator;
}*/
//Logger logger = LoggerFactory.getLogger(GRPCServer.class);
@Override @Override
public void search(SearchRequest request, StreamObserver<SearchReply> responseObserver) public void search(SearchRequest request, StreamObserver<SearchReply> responseObserver)
{ {
//logger.info("Request Has Arrived , Query is :" + request.getQuery()); //logger.info("Request Has Arrived , Query is :" + request.getQuery());
List<String> ans = new ArrayList<>(); List<String> ans = new ArrayList<>();
String x = request.getQuery(); String x = request.getQuery();
ans.add("Sorry Something Went Wrong " + x); ans.add("replying to : " + x);
/*try try {
{ ans = Coordinator.search(x);
ans = coordinator.search(request.getQuery()); } catch (IOException | InterruptedException | KeeperException | ExecutionException e) {
throw new RuntimeException(e);
} }
catch (IOException | InterruptedException | KeeperException | ExecutionException e)
{
logger.error("Error In GRPC While Getting answers for Query");
//throw new RuntimeException(e);
}*/
SearchReply reply = SearchReply.newBuilder().addAllFiles(ans).build(); SearchReply reply = SearchReply.newBuilder().addAllFiles(ans).build();
responseObserver.onNext(reply); responseObserver.onNext(reply);
......
...@@ -3,11 +3,12 @@ package GRPCConnection; ...@@ -3,11 +3,12 @@ package GRPCConnection;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication @SpringBootApplication
public class Main public class GRPCServiceStart
{ {
public static void main(String[] args) public static void start()
{ {
SpringApplication.run(Main.class); SpringApplication.run(GRPCServiceStart.class);
} }
} }
package WebSide; package WebSide;
public class WebServer { public class WebServer
{
private final String GRPC_SERVER = "localhost";
private final String Search_End_Point = "/search";
} }
...@@ -15,7 +15,7 @@ public class Application implements Watcher ...@@ -15,7 +15,7 @@ public class Application implements Watcher
private static final String address = "192.168.184.10:2181"; private static final String address = "192.168.184.10:2181";
private static final int SESSION_TIMEOUT = 3000; //dead client private static final int SESSION_TIMEOUT = 3000; //dead client
private static final int DEFAULT_PORT = 54321; //private static final int DEFAULT_PORT = 54321;
private ZooKeeper zooKeeper; private ZooKeeper zooKeeper;
private final Logger logger = LoggerFactory.getLogger(Application.class); private final Logger logger = LoggerFactory.getLogger(Application.class);
......
...@@ -19,25 +19,12 @@ import java.util.stream.Stream; ...@@ -19,25 +19,12 @@ import java.util.stream.Stream;
public class Coordinator public class Coordinator
{ {
private ZooKeeper zooKeeper;
private static final String PHYSICAL_ZNODES_PATH = "/physical_nodes"; private static final String PHYSICAL_ZNODES_PATH = "/physical_nodes";
private int COORDINATOR_PORT; private static String FILES_DIRECTORY = System.getProperty("user.dir") + "/SearchFiles/";;
private static final Logger logger = LoggerFactory.getLogger(Coordinator.class);
private String FILES_DIRECTORY = "";
private final Logger logger = LoggerFactory.getLogger(Coordinator.class);
public Coordinator(ZooKeeper zooKeeper , String SOCKET)
{
this.zooKeeper = zooKeeper;
COORDINATOR_PORT = Integer.parseInt(SOCKET.split(":")[1]);
FILES_DIRECTORY = System.getProperty("user.dir") + "/SearchFiles/";
}
public List<String> search(String query) throws IOException, InterruptedException, KeeperException, ExecutionException {
public static List<String> search(String query) throws IOException, InterruptedException, KeeperException, ExecutionException {
List<SearchQueryResponse> responses = spreadQuery(query); List<SearchQueryResponse> responses = spreadQuery(query);
if (responses == null) if (responses == null)
{ {
...@@ -57,9 +44,7 @@ public class Coordinator ...@@ -57,9 +44,7 @@ public class Coordinator
} }
private static SearchQueryResponse sendRequestToNode(SearchQueryRequest searchQueryRequest, String ipAddress)
private SearchQueryResponse sendRequestToNode(SearchQueryRequest searchQueryRequest, String ipAddress)
{ {
SearchQueryResponse searchQueryResponse = null; SearchQueryResponse searchQueryResponse = null;
String ip = ipAddress.split(":")[0]; String ip = ipAddress.split(":")[0];
...@@ -81,7 +66,7 @@ public class Coordinator ...@@ -81,7 +66,7 @@ public class Coordinator
return searchQueryResponse; return searchQueryResponse;
} }
private void printResponses(List<String> filesAnswer) private static void printResponses(List<String> filesAnswer)
{ {
///GRPC Connection ///GRPC Connection
if (filesAnswer.size() == 0) if (filesAnswer.size() == 0)
...@@ -96,8 +81,7 @@ public class Coordinator ...@@ -96,8 +81,7 @@ public class Coordinator
} }
private static void handleClient(String query)
private void handleClient(String query)
{ {
try try
{ {
...@@ -136,7 +120,7 @@ public class Coordinator ...@@ -136,7 +120,7 @@ public class Coordinator
* Each slave will Send the set of words with their frequency percentage in each file * Each slave will Send the set of words with their frequency percentage in each file
* and EACH SLAVE HAS A UNIQUE SET OF FILES !! * and EACH SLAVE HAS A UNIQUE SET OF FILES !!
* */ * */
private Map<String , Double> getFilesScore(List<SearchQueryResponse> respons , String query) private static Map<String , Double> getFilesScore(List<SearchQueryResponse> respons , String query)
{ {
if (respons == null) return null; if (respons == null) return null;
if (query.isEmpty()) return null; if (query.isEmpty()) return null;
...@@ -158,7 +142,7 @@ public class Coordinator ...@@ -158,7 +142,7 @@ public class Coordinator
return fileScore; return fileScore;
} }
private List<String> getFilesInOrder(Map<String , Double> filesScore) private static List<String> getFilesInOrder(Map<String , Double> filesScore)
{ {
List<String> files = new ArrayList<>(); List<String> files = new ArrayList<>();
if(filesScore == null) if(filesScore == null)
...@@ -186,7 +170,7 @@ public class Coordinator ...@@ -186,7 +170,7 @@ public class Coordinator
* Each slave will Send the set of words with their frequency percentage in each file * Each slave will Send the set of words with their frequency percentage in each file
* and EACH SLAVE HAS A UNIQUE SET OF FILES !! * and EACH SLAVE HAS A UNIQUE SET OF FILES !!
*/ */
private Map<String , Double> calculateIDF(List<SearchQueryResponse> respons , String query) private static Map<String , Double> calculateIDF(List<SearchQueryResponse> respons , String query)
{ {
Map<String , Double> wordsIDF = new HashMap<>(); Map<String , Double> wordsIDF = new HashMap<>();
int totalNumberOfFiles = countFilesInDirectory(); int totalNumberOfFiles = countFilesInDirectory();
...@@ -206,7 +190,7 @@ public class Coordinator ...@@ -206,7 +190,7 @@ public class Coordinator
} }
return wordsIDF; return wordsIDF;
} }
private int countFilesInDirectory() private static int countFilesInDirectory()
{ {
try (Stream<Path> files = Files.list(Paths.get(FILES_DIRECTORY))) try (Stream<Path> files = Files.list(Paths.get(FILES_DIRECTORY)))
{ {
...@@ -225,45 +209,33 @@ public class Coordinator ...@@ -225,45 +209,33 @@ public class Coordinator
* response is map here the keys are words * response is map here the keys are words
* and values are list of pair<fileName,Number Of Appearance of the word in it> * and values are list of pair<fileName,Number Of Appearance of the word in it>
* */ * */
public List<SearchQueryResponse> spreadQuery(String query) throws InterruptedException, KeeperException, IOException, ExecutionException public static List<SearchQueryResponse> spreadQuery(String query) throws InterruptedException, KeeperException, IOException, ExecutionException
{ {
List<String> physicalZnodes = zooKeeper.getChildren(PHYSICAL_ZNODES_PATH, false); List<String> physicalZnodesAddresses = ServiceRegistry.getAllServiceAddresses();
if (physicalZnodes.isEmpty()) if (physicalZnodesAddresses.isEmpty())
{ {
return null; return null;
} }
int totalFilesNumber = countFilesInDirectory(); int totalFilesNumber = countFilesInDirectory();
//System.out.println("Files Number = " + totalFilesNumber); //System.out.println("Files Number = " + totalFilesNumber);
int filesNumberforNode = (totalFilesNumber + physicalZnodes.size()-1)/physicalZnodes.size(); int filesNumberforNode = (totalFilesNumber + physicalZnodesAddresses.size()-1)/physicalZnodesAddresses.size();
int remaining = totalFilesNumber; int remaining = totalFilesNumber;
int index = 0; int index = 0;
int filesOffset=0; int filesOffset=0;
ExecutorService executorService = Executors.newFixedThreadPool(physicalZnodes.size()); ExecutorService executorService = Executors.newFixedThreadPool(physicalZnodesAddresses.size());
List<Callable<SearchQueryResponse>> tasks = new ArrayList<>(); List<Callable<SearchQueryResponse>> tasks = new ArrayList<>();
///distributing Files for NODES!!! ///distributing Files for NODES!!!
while (remaining > 0) while (remaining > 0)
{ {
String physicalZnode = physicalZnodes.get(index);
Stat stat = zooKeeper.exists(PHYSICAL_ZNODES_PATH + "/" + physicalZnodes.get(index), false); String ipAddress = physicalZnodesAddresses.get(index);
if (stat == null) {
logger.warn("Physical Node : " + physicalZnode + " is dowm!");
physicalZnodes.remove(index);
///nodes size ==0 ??
filesNumberforNode = (totalFilesNumber + physicalZnodes.size() - 1) / physicalZnodes.size();
continue;
}
String ipAddress = new String(
zooKeeper.getData(
PHYSICAL_ZNODES_PATH + "/" + physicalZnodes.get(index),
false, stat)
);
SearchQueryRequest searchQueryRequest = new SearchQueryRequest(query , filesNumberforNode , filesOffset); SearchQueryRequest searchQueryRequest = new SearchQueryRequest(query , filesNumberforNode , filesOffset);
tasks.add(() -> sendRequestToNode(searchQueryRequest, ipAddress)); tasks.add(() -> sendRequestToNode(searchQueryRequest, ipAddress));
index = (1 + index) % physicalZnodes.size(); index = (1 + index) % physicalZnodesAddresses.size();
filesOffset = (filesOffset + filesNumberforNode) % totalFilesNumber; filesOffset = (filesOffset + filesNumberforNode) % totalFilesNumber;
remaining -= filesNumberforNode; remaining -= filesNumberforNode;
} }
......
package org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch; package org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch;
import GRPCConnection.GRPCServiceStart;
import org.apache.zookeeper.*; import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger; import org.slf4j.Logger;
...@@ -22,9 +23,8 @@ public class ServiceRegistry implements Watcher ...@@ -22,9 +23,8 @@ public class ServiceRegistry implements Watcher
private TransientWorker transientWorker = null; private TransientWorker transientWorker = null;
private String currentZnode = null; private String currentZnode = null;
private List<String> allServiceAddresses = null; private static List<String> allServiceAddresses = null;
private int numberOfInstances;
public ServiceRegistry(ZooKeeper zooKeeper) public ServiceRegistry(ZooKeeper zooKeeper)
{ {
this.zooKeeper = zooKeeper; this.zooKeeper = zooKeeper;
...@@ -66,15 +66,20 @@ public class ServiceRegistry implements Watcher ...@@ -66,15 +66,20 @@ public class ServiceRegistry implements Watcher
transientWorker = new TransientWorker(metadata); transientWorker = new TransientWorker(metadata);
transientWorker.start(); transientWorker.start();
} }
public void registerToCoordinator(String metadata) throws InterruptedException, KeeperException, IOException public void registerToCoordinator(String metadata) throws InterruptedException, KeeperException, IOException
{ {
this.currentZnode = zooKeeper.create(COORDINATOR_ZNODE_PATH + "/coordinator_", metadata.getBytes(), this.currentZnode = zooKeeper.create(COORDINATOR_ZNODE_PATH + "/coordinator_", metadata.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
logger.info("Registered to be Coordinator and I am Master !"); logger.info("Registered to be Coordinator and I am Master !");
coordinator = new Coordinator(this.zooKeeper , metadata);
//coordinator.start(); GRPCServiceStart.start();
//GRPCServer server = new GRPCServer(coordinator); /*Thread thread = new Thread(()-> new Runnable() {
@Override
public void run() {
GRPCServiceStart.start();
}
});
thread.start();*/
} }
public void registerForUpdates() { public void registerForUpdates() {
try try
...@@ -110,7 +115,7 @@ public class ServiceRegistry implements Watcher ...@@ -110,7 +115,7 @@ public class ServiceRegistry implements Watcher
private void updateAddresses() throws KeeperException, InterruptedException private void updateAddresses() throws KeeperException, InterruptedException
{ {
List<String> workerZnodes = zooKeeper.getChildren(PHYSICAL_ZNODES_PATH, false); List<String> workerZnodes = zooKeeper.getChildren(PHYSICAL_ZNODES_PATH, this);
List<String> addresses = new ArrayList<>(workerZnodes.size()); List<String> addresses = new ArrayList<>(workerZnodes.size());
...@@ -131,6 +136,11 @@ public class ServiceRegistry implements Watcher ...@@ -131,6 +136,11 @@ public class ServiceRegistry implements Watcher
} }
public static List<String> getAllServiceAddresses()
{
return allServiceAddresses;
}
@Override @Override
public void process(WatchedEvent watchedEvent) public void process(WatchedEvent watchedEvent)
{ {
...@@ -153,91 +163,4 @@ public class ServiceRegistry implements Watcher ...@@ -153,91 +163,4 @@ public class ServiceRegistry implements Watcher
} }
/*private void launchWorkersIfNecessary() throws KeeperException, InterruptedException, IOException
{
List<String> physicalZnodes = zooKeeper.getChildren(PHYSICAL_ZNODES_PATH, this);
List<String> workers = zooKeeper.getChildren(WORKERS_ZNODES_PATH, this);
for (String worker : workers)
{
Stat stat = zooKeeper.exists(WORKERS_ZNODES_PATH + "/" + worker, false);
if (stat == null) {
workers.remove(worker);
continue;
}
String node = new String(zooKeeper.getData(WORKERS_ZNODES_PATH + "/" + worker, false, stat));
if (!physicalZnodes.contains(node))
{
workers.remove(worker);
}
}
List<String> sortedWorkers = NodeSorting.sort(getOriginalNodes(workers), physicalZnodes);
while (workers.size() > numberOfInstances)
{
Stat stat = zooKeeper.exists(WORKERS_ZNODES_PATH + "/" + workers.get(0), false);
if (stat == null) {
workers.remove(0);
continue;
}
zooKeeper.delete(WORKERS_ZNODES_PATH + "/" + workers.get(0) , -1);
}
int neededInstances = numberOfInstances - workers.size();
if (neededInstances <= 0) return;
int index = 0;
int size = sortedWorkers.size();
while (neededInstances>0
&& size>0)
{
Stat stat = zooKeeper.exists(PHYSICAL_ZNODES_PATH + "/" + sortedWorkers.get(index), false);
if (stat == null)
{
sortedWorkers.remove(index);
size--;
continue;
}
startNewWorker(sortedWorkers.get(index));
neededInstances--;
index = (index + 1) % size;
}
}*/
/*private List<byte[]> getOriginalNodes(List<String> workers) throws InterruptedException, KeeperException {
List<byte[]> ans = new ArrayList<>();
for (String worker : workers)
{
Stat stat = zooKeeper.exists(WORKERS_ZNODES_PATH+"/"+worker ,false);
if (stat == null) continue;
ans.add(zooKeeper.getData(WORKERS_ZNODES_PATH+"/"+worker,false,stat ));
}
return ans;
}*/
/*private void startNewWorker(String physicalNode) throws IOException, InterruptedException, KeeperException
{
String remoteUser = new String(zooKeeper.getData(PHYSICAL_ZNODES_PATH+"/"+physicalNode , false, null));
String remoteJarFilePath = "/root/AutoHealer/Worker.jar"; //+ file.getName();
logger.info("Sending To : " + remoteUser);
String sshCommand = "ssh " + remoteUser + " \"java -jar " + remoteJarFilePath + " " +physicalNode+"\"";
System.out.println(sshCommand);
Runtime.getRuntime().exec(sshCommand);
//long pid=Runtime.getRuntime().exec(sshCommand).pid();
//System.out.println(pid);
}*/
} }
grpc.server.port=5678
\ No newline at end of file
grpc.server.port=5678
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment