Commit c71a1e58 authored by mohammad.salama's avatar mohammad.salama

Coordinator and Slaves are Working Fine - no jar run main

parent 8a53ba69
......@@ -11,12 +11,14 @@
<component name="ChangeListManager">
<list default="true" id="b8bcd35c-99f3-44c8-a866-59f81c3c8bd0" name="Changes" comment="">
<change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/Distributed-Search.jar" beforeDir="false" afterPath="$PROJECT_DIR$/Distributed-Search.jar" afterDir="false" />
<change beforePath="$PROJECT_DIR$/logs/app.log" beforeDir="false" afterPath="$PROJECT_DIR$/logs/app.log" afterDir="false" />
<change beforePath="$PROJECT_DIR$/out/artifacts/Distributed_Search_jar/Distributed-Search.jar" beforeDir="false" afterPath="$PROJECT_DIR$/out/artifacts/Distributed_Search_jar/Distributed-Search.jar" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/java/AutoHealerAndClusterSearch/Application.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/AutoHealerAndClusterSearch/Application.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/java/AutoHealerAndClusterSearch/Coordinator.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/AutoHealerAndClusterSearch/Coordinator.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/target/classes/AutoHealerAndClusterSearch/Coordinator.class" beforeDir="false" afterPath="$PROJECT_DIR$/target/classes/AutoHealerAndClusterSearch/Coordinator.class" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/java/AutoHealerAndClusterSearch/TransientWorker.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/AutoHealerAndClusterSearch/TransientWorker.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/java/ObjectExchangeInCluster/FileWordPair.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/ObjectExchangeInCluster/FileWordPair.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/java/ObjectExchangeInCluster/SearchQueryResponse.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/ObjectExchangeInCluster/SearchQueryResponse.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/java/WordsCountingInFiles/WordsCountingInFiles.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/WordsCountingInFiles/WordsCountingInFiles.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/java/generalTesting/test.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/generalTesting/test.java" afterDir="false" />
</list>
<option name="SHOW_DIALOG" value="false" />
<option name="HIGHLIGHT_CONFLICTS" value="true" />
......@@ -75,7 +77,7 @@
<recent name="" />
</key>
</component>
<component name="RunManager" selected="Application.n1">
<component name="RunManager" selected="Application.n4">
<configuration name="n1" type="Application" factoryName="Application">
<option name="MAIN_CLASS_NAME" value="AutoHealerAndClusterSearch.Application" />
<module name="Distributed-Search" />
......
......@@ -50,3 +50,48 @@
2024-01-25 23:38:42,604 WARN AutoHealerAndClusterSearch.Coordinator [Thread-7] No Nodes Are Working , Search Cannot Be Done
2024-01-25 23:38:53,707 WARN AutoHealerAndClusterSearch.Coordinator [Thread-8] No Nodes Are Working , Search Cannot Be Done
2024-01-25 23:38:59,031 WARN AutoHealerAndClusterSearch.Coordinator [Thread-9] No Nodes Are Working , Search Cannot Be Done
2024-01-26 21:23:07,344 INFO AutoHealerAndClusterSearch.Application [main-EventThread] Successfully connected to Zookeeper
2024-01-26 21:23:07,377 INFO AutoHealerAndClusterSearch.LeaderElection [main] My Node under Election is/election/c_0000000135
2024-01-26 21:23:07,385 INFO AutoHealerAndClusterSearch.LeaderElection [main] I am LEADER
2024-01-26 21:23:07,389 INFO AutoHealerAndClusterSearch.ServiceRegistry [main] The cluster addresses are: []
2024-01-26 21:23:07,394 INFO AutoHealerAndClusterSearch.ServiceRegistry [main] Registered to be Coordinator and I am Master !
2024-01-26 21:23:07,396 INFO AutoHealerAndClusterSearch.Coordinator [main] Server started on port 123
2024-01-26 21:23:10,526 INFO AutoHealerAndClusterSearch.Application [main-EventThread] Successfully connected to Zookeeper
2024-01-26 21:23:10,545 INFO AutoHealerAndClusterSearch.LeaderElection [main] My Node under Election is/election/c_0000000136
2024-01-26 21:23:10,549 INFO AutoHealerAndClusterSearch.LeaderElection [main] I am NOT LEADER
2024-01-26 21:23:10,557 INFO AutoHealerAndClusterSearch.ServiceRegistry [main] Registered to service registry
2024-01-26 21:23:10,562 INFO AutoHealerAndClusterSearch.TransientWorker [main] Server started on port 456
2024-01-26 21:23:14,031 INFO AutoHealerAndClusterSearch.Application [main-EventThread] Successfully connected to Zookeeper
2024-01-26 21:23:14,058 INFO AutoHealerAndClusterSearch.LeaderElection [main] My Node under Election is/election/c_0000000137
2024-01-26 21:23:14,067 INFO AutoHealerAndClusterSearch.LeaderElection [main] I am NOT LEADER
2024-01-26 21:23:14,075 INFO AutoHealerAndClusterSearch.ServiceRegistry [main] Registered to service registry
2024-01-26 21:23:14,082 INFO AutoHealerAndClusterSearch.TransientWorker [main] Server started on port 787
2024-01-26 21:23:17,243 INFO AutoHealerAndClusterSearch.Application [main-EventThread] Successfully connected to Zookeeper
2024-01-26 21:23:17,265 INFO AutoHealerAndClusterSearch.LeaderElection [main] My Node under Election is/election/c_0000000138
2024-01-26 21:23:17,273 INFO AutoHealerAndClusterSearch.LeaderElection [main] I am NOT LEADER
2024-01-26 21:23:17,281 INFO AutoHealerAndClusterSearch.ServiceRegistry [main] Registered to service registry
2024-01-26 21:23:17,289 INFO AutoHealerAndClusterSearch.TransientWorker [main] Server started on port 5487
2024-01-26 21:23:44,400 INFO AutoHealerAndClusterSearch.TransientWorker [main] Coordinator connected: /127.0.0.1
2024-01-26 21:23:44,400 INFO AutoHealerAndClusterSearch.TransientWorker [main] Coordinator connected: /127.0.0.1
2024-01-26 21:23:44,400 INFO AutoHealerAndClusterSearch.TransientWorker [main] Coordinator connected: /127.0.0.1
2024-01-26 21:23:59,798 INFO AutoHealerAndClusterSearch.TransientWorker [main] Coordinator connected: /127.0.0.1
2024-01-26 21:23:59,798 INFO AutoHealerAndClusterSearch.TransientWorker [main] Coordinator connected: /127.0.0.1
2024-01-26 21:23:59,798 INFO AutoHealerAndClusterSearch.TransientWorker [main] Coordinator connected: /127.0.0.1
2024-01-26 21:24:02,608 INFO AutoHealerAndClusterSearch.TransientWorker [main] Coordinator connected: /127.0.0.1
2024-01-26 21:24:02,608 INFO AutoHealerAndClusterSearch.TransientWorker [main] Coordinator connected: /127.0.0.1
2024-01-26 21:24:02,608 INFO AutoHealerAndClusterSearch.TransientWorker [main] Coordinator connected: /127.0.0.1
2024-01-26 21:24:06,861 INFO AutoHealerAndClusterSearch.TransientWorker [main] Coordinator connected: /127.0.0.1
2024-01-26 21:24:06,862 INFO AutoHealerAndClusterSearch.TransientWorker [main] Coordinator connected: /127.0.0.1
2024-01-26 21:24:06,864 INFO AutoHealerAndClusterSearch.TransientWorker [main] Coordinator connected: /127.0.0.1
2024-01-26 21:25:41,977 INFO AutoHealerAndClusterSearch.LeaderElection [main-EventThread] I am LEADER
2024-01-26 21:25:42,020 INFO AutoHealerAndClusterSearch.ServiceRegistry [main-EventThread] The cluster addresses are: [127.0.0.1:5487, 127.0.0.1:787]
2024-01-26 21:25:42,029 INFO AutoHealerAndClusterSearch.ServiceRegistry [main-EventThread] Registered to be Coordinator and I am Master !
2024-01-26 21:25:42,031 INFO AutoHealerAndClusterSearch.Coordinator [main-EventThread] Server started on port 456
2024-01-26 21:25:51,383 INFO AutoHealerAndClusterSearch.TransientWorker [main] Coordinator connected: /127.0.0.1
2024-01-26 21:25:51,385 INFO AutoHealerAndClusterSearch.TransientWorker [main] Coordinator connected: /127.0.0.1
2024-01-26 21:27:00,107 INFO AutoHealerAndClusterSearch.TransientWorker [main] Coordinator connected: /127.0.0.1
2024-01-26 21:27:00,107 INFO AutoHealerAndClusterSearch.TransientWorker [main] Coordinator connected: /127.0.0.1
2024-01-26 21:27:23,961 INFO AutoHealerAndClusterSearch.TransientWorker [main] Coordinator connected: /127.0.0.1
2024-01-26 21:27:23,961 INFO AutoHealerAndClusterSearch.TransientWorker [main] Coordinator connected: /127.0.0.1
2024-01-26 21:27:36,645 WARN AutoHealerAndClusterSearch.Coordinator [Thread-7] No Nodes Are Working , Search Cannot Be Done
2024-01-26 21:27:40,981 WARN AutoHealerAndClusterSearch.Coordinator [Thread-8] No Nodes Are Working , Search Cannot Be Done
......@@ -22,16 +22,16 @@ public class Application implements Watcher
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
String IP = args.length == 1 ? args[0] : "127.0.0.1:55451";
int port = Integer.parseInt(IP.split(":")[1]);
/*String IP = args.length == 1 ? args[0] : "127.0.0.1:55451";
int port = Integer.parseInt(IP.split(":")[1]);*/
/* Scanner scanner = new Scanner(System.in);
Scanner scanner = new Scanner(System.in);
System.out.println("Enter Port !!");
String IP = "127.0.0.1";
int port = scanner.nextInt();
System.out.println("IP is : " + IP );
System.out.println("Port is : " + port);*/
System.out.println("Port is : " + port);
Application application = new Application();
ZooKeeper zooKeeper = application.connectToZookeeper();
......
......@@ -3,6 +3,7 @@ package AutoHealerAndClusterSearch;
import ObjectExchangeInCluster.FileWordPair;
import ObjectExchangeInCluster.SearchQueryRequest;
import ObjectExchangeInCluster.SearchQueryResponse;
import io.grpc.netty.shaded.io.netty.handler.codec.socks.SocksRequestType;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
......@@ -22,15 +23,13 @@ import java.util.stream.Stream;
public class Coordinator
{
private ZooKeeper zooKeeper;
///private static final String WORKERS_ZNODES_PATH = "/workers";
private static final String PHYSICAL_ZNODES_PATH = "/physical_nodes";
///private static final String COORDINATOR_ZNODE_PATH = "/coordinator_node";
private int COORDINATOR_PORT;
private int CLUSTER_PORT = 54321;
private String FILES_DIRECTORY = "";
private final Logger logger = LoggerFactory.getLogger(Coordinator.class);
public Coordinator(ZooKeeper zooKeeper , String SOCKET)
{
this.zooKeeper = zooKeeper;
......@@ -67,11 +66,7 @@ public class Coordinator
System.out.println("Query = " + query);
Thread clientThread = new Thread(() ->
{
SearchQueryResponse searchQueryResponseMap = handleClient(query);
printResponsr(searchQueryResponseMap);
//sendResponsesToClient(clientSocket, responseMap);
handleClient(query);
});
clientThread.start();
......@@ -79,10 +74,7 @@ public class Coordinator
}
private void printResponsr(SearchQueryResponse searchQueryResponseMap)
{
if (searchQueryResponseMap == null)return;
}
private SearchQueryResponse sendRequestToNode(SearchQueryRequest searchQueryRequest, String ipAddress)
{
......@@ -103,27 +95,22 @@ public class Coordinator
} catch (IOException | ClassNotFoundException e) {
e.printStackTrace();
}
System.out.println("Printing Search Query Response");
System.out.println(searchQueryResponse);
return searchQueryResponse;
}
private void sendResponsesToClient(Socket clientSocket, SearchQueryResponse searchQueryResponse)
private void sendResponsesToClient(List<String> filesAnswer)
{
try {
ObjectOutputStream objectOutputStream = new ObjectOutputStream(clientSocket.getOutputStream());
/*for (Map.Entry<Serializable, Serializable> entry : responseMap.entrySet()) {
objectOutputStream.writeObject(entry.getValue());
}*/
clientSocket.close();
} catch (IOException e) {
e.printStackTrace();
///GRPC Connection
for (String file : filesAnswer)
{
System.out.println(file);
}
}
private SearchQueryResponse handleClient(String query)
private void handleClient(String query)
{
try
{
......@@ -133,7 +120,12 @@ public class Coordinator
System.out.println("No Nodes Are Working , Search Cannot Be Done");
logger.warn("No Nodes Are Working , Search Cannot Be Done");
}
processResultsAndPrepareAnswer(respons);
Map<String,Double> filesScore = getFilesScore(respons , query);
List<String> answer = getFilesInOrder(filesScore);
sendResponsesToClient(answer);
}
catch (IOException | InterruptedException | KeeperException e)
{
......@@ -147,7 +139,6 @@ public class Coordinator
throw new RuntimeException(e);
}
return null;
}
......@@ -155,26 +146,79 @@ public class Coordinator
* response from cluster node is map where the keys are words
* and values are list of pair<fileName,Number Of Appearance of the word in it>
* we now must calculate the importance of each file for every word we got
* Each slave will Send the set of words with their frequency percentage in each file
* and EACH SLAVE HAS A UNIQUE SET OF FILES !!
* */
private void processResultsAndPrepareAnswer(List<SearchQueryResponse> respons)
private Map<String , Double> getFilesScore(List<SearchQueryResponse> respons , String query)
{
if (respons == null) return;
for (SearchQueryResponse searchQueryResponse : respons)
if (respons == null) return null;
if (query.isEmpty()) return null;
Map<String , Double> wordsIDF = calculateIDF(respons , query);
Map<String , Double> fileScore = new HashMap<>();
String[] words= query.split(" ");
for (String word : words)
{
Map<String, List<FileWordPair>> treeMap = searchQueryResponse.getWordFrequencies();
for(Map.Entry<String, List<FileWordPair>> mapEntry : treeMap.entrySet())
for (SearchQueryResponse response : respons)
{
System.out.println("The word " + mapEntry.getKey() + " is :");
for(FileWordPair fileWordPair : mapEntry.getValue())
Map<String , Double> temp = response.calcScoreForFilesContainingWord(word , wordsIDF.get(word));
for(Map.Entry<String , Double> x : temp.entrySet())
{
System.out.println(fileWordPair);
fileScore.put(x.getKey() , fileScore.getOrDefault(x.getKey() , 0.0) + x.getValue());
}
}
}
///send via GRPC
return;
return fileScore;
}
private List<String> getFilesInOrder(Map<String , Double> filesScore)
{
List<String> files = new ArrayList<>();
if(filesScore == null)
{
files.add("No Answer !");
return files;
}
List<Map.Entry<String, Double>> entryList = new ArrayList<>(filesScore.entrySet());
Collections.sort(entryList, new Comparator<Map.Entry<String, Double>>() {
@Override
public int compare(Map.Entry<String, Double> entry1, Map.Entry<String, Double> entry2) {
// Sort in descending order by comparing the values
return entry2.getValue().compareTo(entry1.getValue());
}
});
for (Map.Entry<String, Double> entry : entryList)
{
files.add(entry.getKey());
}
return files;
}
/**
* Each slave will Send the set of words with their frequency percentage in each file
* and EACH SLAVE HAS A UNIQUE SET OF FILES !!
*/
private Map<String , Double> calculateIDF(List<SearchQueryResponse> respons , String query)
{
Map<String , Double> wordsIDF = new HashMap<>();
int totalNumberOfFiles = countFilesInDirectory();
String[] words = query.split(" ");
for (String word : words)
{
int wordFreq = 0;
for (SearchQueryResponse slaveAns : respons)
{
Map<String , List<FileWordPair>> temp = slaveAns.getWordFrequencies();
wordFreq += temp.getOrDefault(word , new ArrayList<>()).size();
}
Double d = wordFreq > 0 ? Math.log(1.0*totalNumberOfFiles/wordFreq) : 0;
wordsIDF.put(word , d);
}
return wordsIDF;
}
private int countFilesInDirectory()
{
try (Stream<Path> files = Files.list(Paths.get(FILES_DIRECTORY)))
......@@ -204,7 +248,7 @@ public class Coordinator
}
int totalFilesNumber = countFilesInDirectory();
System.out.println("Files Number = " + totalFilesNumber);
//System.out.println("Files Number = " + totalFilesNumber);
int filesNumberforNode = (totalFilesNumber + physicalZnodes.size()-1)/physicalZnodes.size();
int remaining = totalFilesNumber;
int index = 0;
......@@ -250,25 +294,4 @@ public class Coordinator
return respons;
}
/*private List<String> getIPAdressesInCluster() throws InterruptedException, KeeperException
{
List<String> nodes = zooKeeper.getChildren(PHYSICAL_ZNODES_PATH , false);
List<String> ipList = new ArrayList<>();
for (String node : nodes)
{
String fullName = PHYSICAL_ZNODES_PATH + "/" + node;
Stat stat = zooKeeper.exists(fullName , false);
if (stat == null)
{
nodes.remove(node);
continue;
}
String data = new String(zooKeeper.getData(fullName , false , stat));
data = data.split("@")[1];
ipList.add(data);
}
return ipList;
}*/
}
......@@ -139,10 +139,6 @@ public class TransientWorker
searchQueryResponse.setWordFrequencies
(WordsCountingInFiles.countWordsInFiles(words , myFiles));
if(searchQueryResponse.getWordFrequencies().size() == 0)
{
System.out.println("Problem in Counting Words in Files");
}
return searchQueryResponse;
}
}
......
......@@ -5,9 +5,9 @@ import java.io.Serializable;
public class FileWordPair implements Serializable
{
public String fileName;
public long freq;
public double freq;
public FileWordPair (String fileName , long freq)
public FileWordPair (String fileName , double freq)
{
this.fileName = fileName;
this.freq = freq;
......
......@@ -21,6 +21,19 @@ public class SearchQueryResponse implements Serializable {
{
return wordFrequencies;
}
public Map<String , Double> calcScoreForFilesContainingWord(String word , Double idf)
{
List<FileWordPair> temp = wordFrequencies.getOrDefault(word , new ArrayList<>());
Map<String , Double> ans = new HashMap<>();
if (temp.size() == 0) return ans;
for (FileWordPair fileWordPair : temp)
{
ans.put(fileWordPair.fileName , fileWordPair.freq*idf);
}
return ans;
}
}
......@@ -26,12 +26,18 @@ public class WordsCountingInFiles
}
String text = textBuilder.toString();
int size = text.length();
size = Math.max(size,1);
// Count occurrences of words in the text
for (String word : words)
{
int count = countWordOccurrencesInText(word, text);
if (count == 0) continue;
double freq = 1.0*(count)/size;
List<FileWordPair> FileWordPair = wordOccurrences.getOrDefault(word, new ArrayList<>());
FileWordPair.add(new FileWordPair(filePath, count));
FileWordPair.add(new FileWordPair(filePath, freq));
wordOccurrences.put(word, FileWordPair);
}
} catch (IOException e) {
......@@ -41,6 +47,17 @@ public class WordsCountingInFiles
return wordOccurrences;
}
public static Map<String, List<FileWordPair>> countWordsInFiles(String query, List<String> filePaths)
{
String[] temp = query.split(" ");
List<String> words = new ArrayList<>();
for (String s : temp)
{
words.add(s);
}
return countWordsInFiles(words , filePaths);
}
public static int countWordOccurrencesInText(String word, String text)
{
int count = 0;
......
package generalTesting;
import ObjectExchangeInCluster.FileWordPair;
import WordsCountingInFiles.WordsCountingInFiles;
import java.io.*;
import java.net.Socket;
......@@ -114,12 +115,35 @@ public class test
return count;
}
public static void rubbish(String path)
{
String query = "The File and Kindness are superstitious for arranging Pcs";
List<String> f1 = new ArrayList<>();
List<String> f2 = new ArrayList<>();
f1.add(path+"f (1).txt");
f1.add(path+"f (3).txt");
f1.add(path+"f (5).txt");
f1.add(path+"f (6).txt");
f2.add(path+"f (2).txt");
f2.add(path+"f (4).txt");
f2.add(path+"f (7).txt");
f2.add(path+"f (8).txt");
Map<String,List<FileWordPair>> m1 = WordsCountingInFiles.countWordsInFiles(query , f1);
Map<String,List<FileWordPair>> m2 = WordsCountingInFiles.countWordsInFiles(query , f2);
}
public static void main(String[] args)
{
String path = System.getProperty("user.dir") + "/SearchFiles/";
///listFilesForFolder(System.getProperty("user.dir") + "/SearchFiles");
///countWordsInFiles();
System.out.println(countFilesInDirectory());
//System.out.println(countFilesInDirectory());
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment