Commit ee2905c2 authored by mohammad.salama's avatar mohammad.salama

Fifth Commit - Sending and Receiving between Coordinator and Slaves

parent 49ec3d3f
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
<project version="4"> <project version="4">
<component name="ArtifactsWorkspaceSettings"> <component name="ArtifactsWorkspaceSettings">
<artifacts-to-build> <artifacts-to-build>
<artifact name="AutoHealer:jar" /> <artifact name="Distributed-Search:jar" />
</artifacts-to-build> </artifacts-to-build>
</component> </component>
<component name="AutoImportSettings"> <component name="AutoImportSettings">
...@@ -10,17 +10,15 @@ ...@@ -10,17 +10,15 @@
</component> </component>
<component name="ChangeListManager"> <component name="ChangeListManager">
<list default="true" id="b8bcd35c-99f3-44c8-a866-59f81c3c8bd0" name="Changes" comment=""> <list default="true" id="b8bcd35c-99f3-44c8-a866-59f81c3c8bd0" name="Changes" comment="">
<change afterPath="$PROJECT_DIR$/src/main/java/WordsCountingInFiles/WordsCountingInFiles.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/.idea/artifacts/AutoHealer_jar.xml" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/.idea/artifacts/TransientWorker_jar.xml" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" /> <change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/logs/app.log" beforeDir="false" afterPath="$PROJECT_DIR$/logs/app.log" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/java/AutoHealerAndClusterSearch/Application.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/AutoHealerAndClusterSearch/Application.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/java/AutoHealerAndClusterSearch/Coordinator.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/AutoHealerAndClusterSearch/Coordinator.java" afterDir="false" /> <change beforePath="$PROJECT_DIR$/src/main/java/AutoHealerAndClusterSearch/Coordinator.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/AutoHealerAndClusterSearch/Coordinator.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/java/AutoHealerAndClusterSearch/OnElectionAction.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/AutoHealerAndClusterSearch/OnElectionAction.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/java/AutoHealerAndClusterSearch/ServiceRegistry.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/AutoHealerAndClusterSearch/ServiceRegistry.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/java/AutoHealerAndClusterSearch/TransientWorker.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/AutoHealerAndClusterSearch/TransientWorker.java" afterDir="false" /> <change beforePath="$PROJECT_DIR$/src/main/java/AutoHealerAndClusterSearch/TransientWorker.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/AutoHealerAndClusterSearch/TransientWorker.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/java/AutoHealerAndClusterSearch/TransientWorkerApplication.java" beforeDir="false" /> <change beforePath="$PROJECT_DIR$/src/main/java/ObjectExchangeInCluster/FileWordPair.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/ObjectExchangeInCluster/FileWordPair.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/java/ObjectExchangeInCluster/SearchQueryRequest.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/ObjectExchangeInCluster/SearchQueryRequest.java" afterDir="false" /> <change beforePath="$PROJECT_DIR$/src/main/java/generalTesting/test.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/generalTesting/test.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/java/ObjectExchangeInCluster/SearchQueryResponse.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/ObjectExchangeInCluster/SearchQueryResponse.java" afterDir="false" /> <change beforePath="$PROJECT_DIR$/target/classes/AutoHealerAndClusterSearch/Coordinator.class" beforeDir="false" afterPath="$PROJECT_DIR$/target/classes/AutoHealerAndClusterSearch/Coordinator.class" afterDir="false" />
<change beforePath="$PROJECT_DIR$/target/classes/AutoHealerAndClusterSearch/TransientWorker.class" beforeDir="false" afterPath="$PROJECT_DIR$/target/classes/AutoHealerAndClusterSearch/TransientWorker.class" afterDir="false" />
</list> </list>
<option name="SHOW_DIALOG" value="false" /> <option name="SHOW_DIALOG" value="false" />
<option name="HIGHLIGHT_CONFLICTS" value="true" /> <option name="HIGHLIGHT_CONFLICTS" value="true" />
...@@ -79,7 +77,35 @@ ...@@ -79,7 +77,35 @@
<recent name="" /> <recent name="" />
</key> </key>
</component> </component>
<component name="RunManager"> <component name="RunManager" selected="Application.n4">
<configuration name="n1" type="Application" factoryName="Application">
<option name="MAIN_CLASS_NAME" value="AutoHealerAndClusterSearch.Application" />
<module name="Distributed-Search" />
<method v="2">
<option name="Make" enabled="true" />
</method>
</configuration>
<configuration name="n2" type="Application" factoryName="Application">
<option name="MAIN_CLASS_NAME" value="AutoHealerAndClusterSearch.Application" />
<module name="Distributed-Search" />
<method v="2">
<option name="Make" enabled="true" />
</method>
</configuration>
<configuration name="n3" type="Application" factoryName="Application">
<option name="MAIN_CLASS_NAME" value="AutoHealerAndClusterSearch.Application" />
<module name="Distributed-Search" />
<method v="2">
<option name="Make" enabled="true" />
</method>
</configuration>
<configuration name="n4" type="Application" factoryName="Application">
<option name="MAIN_CLASS_NAME" value="AutoHealerAndClusterSearch.Application" />
<module name="Distributed-Search" />
<method v="2">
<option name="Make" enabled="true" />
</method>
</configuration>
<configuration name="test" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true"> <configuration name="test" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
<option name="MAIN_CLASS_NAME" value="generalTesting.test" /> <option name="MAIN_CLASS_NAME" value="generalTesting.test" />
<module name="Distributed-Search" /> <module name="Distributed-Search" />
...@@ -93,6 +119,13 @@ ...@@ -93,6 +119,13 @@
<option name="Make" enabled="true" /> <option name="Make" enabled="true" />
</method> </method>
</configuration> </configuration>
<list>
<item itemvalue="Application.n1" />
<item itemvalue="Application.n2" />
<item itemvalue="Application.n3" />
<item itemvalue="Application.n4" />
<item itemvalue="Application.test" />
</list>
<recent_temporary> <recent_temporary>
<list> <list>
<item itemvalue="Application.test" /> <item itemvalue="Application.test" />
......
This diff is collapsed.
...@@ -8,6 +8,7 @@ import org.slf4j.Logger; ...@@ -8,6 +8,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.util.Scanner;
public class Application implements Watcher public class Application implements Watcher
{ {
...@@ -21,12 +22,16 @@ public class Application implements Watcher ...@@ -21,12 +22,16 @@ public class Application implements Watcher
public static void main(String[] args) throws IOException, InterruptedException, KeeperException { public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
/*int currentServerPort = args.length == 3 ? Integer.parseInt(args[0]) : DEFAULT_PORT; /*String IP = args.length == 1 ? args[0] : "127.0.0.1:55451";
numberOfInstances = args.length == 3 ? Integer.parseInt(args[1]) : 4; int port = Integer.parseInt(IP.split(":")[1]);*/
String IP = args.length == 3 ? args[2] : "M_Salameh@127.0.0.1";*/
String IP = args.length == 1 ? args[0] : "127.0.0.1:55451";
int port = Integer.parseInt(IP.split(":")[1]);
Scanner scanner = new Scanner(System.in);
System.out.println("Enter Port !!");
String IP = "127.0.0.1";
int port = scanner.nextInt();
System.out.println("IP is : " + IP );
System.out.println("Port is : " + port);
Application application = new Application(); Application application = new Application();
ZooKeeper zooKeeper = application.connectToZookeeper(); ZooKeeper zooKeeper = application.connectToZookeeper();
...@@ -35,7 +40,7 @@ public class Application implements Watcher ...@@ -35,7 +40,7 @@ public class Application implements Watcher
OnElectionAction onElectionAction = new OnElectionAction(serviceRegistry, port); OnElectionAction onElectionAction = new OnElectionAction(serviceRegistry, port);
LeaderElection leaderElection = new LeaderElection(zooKeeper, onElectionAction); LeaderElection leaderElection = new LeaderElection(zooKeeper, onElectionAction);
leaderElection.volunteerForLeadership(IP); leaderElection.volunteerForLeadership(IP+":"+port);
leaderElection.reelectLeader(); leaderElection.reelectLeader();
application.run(); application.run();
......
...@@ -12,8 +12,12 @@ import org.slf4j.LoggerFactory; ...@@ -12,8 +12,12 @@ import org.slf4j.LoggerFactory;
import java.io.*; import java.io.*;
import java.net.ServerSocket; import java.net.ServerSocket;
import java.net.Socket; import java.net.Socket;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.stream.Stream;
public class Coordinator public class Coordinator
{ {
...@@ -32,7 +36,7 @@ public class Coordinator ...@@ -32,7 +36,7 @@ public class Coordinator
this.zooKeeper = zooKeeper; this.zooKeeper = zooKeeper;
COORDINATOR_PORT = Integer.parseInt(SOCKET.split(":")[1]); COORDINATOR_PORT = Integer.parseInt(SOCKET.split(":")[1]);
FILES_DIRECTORY = System.getProperty(("user.dir") + "/SearchFiles"); FILES_DIRECTORY = System.getProperty("user.dir") + "/SearchFiles/";
} }
...@@ -60,6 +64,7 @@ public class Coordinator ...@@ -60,6 +64,7 @@ public class Coordinator
String query; String query;
System.out.println("Enter Query To Search : "); System.out.println("Enter Query To Search : ");
query = scanner.nextLine(); query = scanner.nextLine();
System.out.println("Query = " + query);
Thread clientThread = new Thread(() -> Thread clientThread = new Thread(() ->
{ {
SearchQueryResponse searchQueryResponseMap = handleClient(query); SearchQueryResponse searchQueryResponseMap = handleClient(query);
...@@ -97,7 +102,8 @@ public class Coordinator ...@@ -97,7 +102,8 @@ public class Coordinator
} catch (IOException | ClassNotFoundException e) { } catch (IOException | ClassNotFoundException e) {
e.printStackTrace(); e.printStackTrace();
} }
System.out.println("Printing Search Query Response");
System.out.println(searchQueryResponse);
return searchQueryResponse; return searchQueryResponse;
} }
...@@ -164,19 +170,12 @@ public class Coordinator ...@@ -164,19 +170,12 @@ public class Coordinator
private int countFilesInDirectory() private int countFilesInDirectory()
{ {
File directory = new File(FILES_DIRECTORY); try (Stream<Path> files = Files.list(Paths.get(FILES_DIRECTORY)))
File[] files = directory.listFiles(); {
return (int) files.count();
int fileCount = 0; } catch (IOException e) {
if (files != null) { throw new RuntimeException(e);
for (File file : files) {
if (file.isFile()) {
fileCount++;
}
}
} }
return fileCount;
} }
...@@ -194,6 +193,7 @@ public class Coordinator ...@@ -194,6 +193,7 @@ public class Coordinator
int totalFilesNumber = countFilesInDirectory(); int totalFilesNumber = countFilesInDirectory();
System.out.println("Files Number = " + totalFilesNumber);
int filesNumberforNode = (totalFilesNumber + physicalZnodes.size()-1)/physicalZnodes.size(); int filesNumberforNode = (totalFilesNumber + physicalZnodes.size()-1)/physicalZnodes.size();
int remaining = totalFilesNumber; int remaining = totalFilesNumber;
int index = 0; int index = 0;
......
...@@ -41,7 +41,8 @@ public class TransientWorker ...@@ -41,7 +41,8 @@ public class TransientWorker
public TransientWorker (String SOCKET) throws IOException { public TransientWorker (String SOCKET) throws IOException {
CLUSTER_PORT =Integer.parseInt(SOCKET.split(":")[1]); CLUSTER_PORT =Integer.parseInt(SOCKET.split(":")[1]);
filesLocation = System.getProperty("user.dir") + "/SearchFiles"; filesLocation = System.getProperty("user.dir") + "/SearchFiles/";
System.out.println("FL = " + filesLocation);
connectToZookeeper(); connectToZookeeper();
} }
public void connectToZookeeper() throws IOException { public void connectToZookeeper() throws IOException {
...@@ -117,7 +118,7 @@ public class TransientWorker ...@@ -117,7 +118,7 @@ public class TransientWorker
Files.newDirectoryStream(Paths.get(filesLocation))) { Files.newDirectoryStream(Paths.get(filesLocation))) {
for (Path file : stream) { for (Path file : stream) {
if (Files.isRegularFile(file)) { if (Files.isRegularFile(file)) {
fileNames.add(file.getFileName().toString()); fileNames.add(filesLocation+file.getFileName().toString());
} }
} }
} catch (IOException e) } catch (IOException e)
...@@ -138,6 +139,10 @@ public class TransientWorker ...@@ -138,6 +139,10 @@ public class TransientWorker
searchQueryResponse.setWordFrequencies searchQueryResponse.setWordFrequencies
(WordsCountingInFiles.countWordsInFiles(words , myFiles)); (WordsCountingInFiles.countWordsInFiles(words , myFiles));
if(searchQueryResponse.getWordFrequencies().size() == 0)
{
System.out.println("Problem in Counting Words in Files");
}
return searchQueryResponse; return searchQueryResponse;
} }
} }
......
package ObjectExchangeInCluster; package ObjectExchangeInCluster;
public class FileWordPair import java.io.Serializable;
public class FileWordPair implements Serializable
{ {
public String fileName; public String fileName;
public long freq; public long freq;
......
...@@ -9,6 +9,7 @@ import java.nio.file.Files; ...@@ -9,6 +9,7 @@ import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.*; import java.util.*;
import java.util.stream.Stream;
public class test public class test
{ {
...@@ -62,6 +63,15 @@ public class test ...@@ -62,6 +63,15 @@ public class test
} }
} }
private static int countFilesInDirectory()
{
try (Stream<Path> files = Files.list(Paths.get(System.getProperty("user.dir") + "/SearchFiles/")))
{
return (int) files.count();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public static Map<String, List<FileWordPair>> countWordOccurrences(List<String> words, List<String> filePaths) public static Map<String, List<FileWordPair>> countWordOccurrences(List<String> words, List<String> filePaths)
{ {
Map<String, List<FileWordPair>> wordOccurrences = new HashMap<>(); Map<String, List<FileWordPair>> wordOccurrences = new HashMap<>();
...@@ -109,6 +119,7 @@ public class test ...@@ -109,6 +119,7 @@ public class test
public static void main(String[] args) public static void main(String[] args)
{ {
///listFilesForFolder(System.getProperty("user.dir") + "/SearchFiles"); ///listFilesForFolder(System.getProperty("user.dir") + "/SearchFiles");
countWordsInFiles(); ///countWordsInFiles();
System.out.println(countFilesInDirectory());
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment