Commit d5f76f36 authored by tammam.alsoleman's avatar tammam.alsoleman

final Edit

parent e55a9da1
...@@ -20,7 +20,7 @@ import java.util.Scanner; ...@@ -20,7 +20,7 @@ import java.util.Scanner;
* Main Entry point for the Distributed Search Engine. * Main Entry point for the Distributed Search Engine.
*/ */
public class Application implements Watcher { public class Application implements Watcher {
private static final String ZOOKEEPER_ADDRESS = "192.168.39.250:2181"; // Change if ZK is on another machine private static final String ZOOKEEPER_ADDRESS = "192.168.233.250:2181"; // Change if ZK is on another machine
private static final int SESSION_TIMEOUT = 3000; private static final int SESSION_TIMEOUT = 3000;
private static final int DEFAULT_PORT = 8080; private static final int DEFAULT_PORT = 8080;
private static final String STORAGE_DIR = "storage"; // Folder containing .txt files private static final String STORAGE_DIR = "storage"; // Folder containing .txt files
......
...@@ -21,7 +21,7 @@ import java.nio.file.Paths; ...@@ -21,7 +21,7 @@ import java.nio.file.Paths;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
public class FrontendApplication implements Watcher { public class FrontendApplication implements Watcher {
private static final String ZOOKEEPER_ADDRESS = "192.168.39.250:2181"; // Use your ZK IP private static final String ZOOKEEPER_ADDRESS = "192.168.233.250:2181"; // Use your ZK IP
private static final int PORT = 8080; // Main Web Port private static final int PORT = 8080; // Main Web Port
private final LeaderHttpRegistry leaderHttpRegistry; private final LeaderHttpRegistry leaderHttpRegistry;
private final HttpClient httpClient; private final HttpClient httpClient;
...@@ -112,7 +112,7 @@ public class FrontendApplication implements Watcher { ...@@ -112,7 +112,7 @@ public class FrontendApplication implements Watcher {
// Send headers first (200 OK, 0 means chunked/unknown length) // Send headers first (200 OK, 0 means chunked/unknown length)
exchange.sendResponseHeaders(200, 0); exchange.sendResponseHeaders(200, 0);
is.transferTo(os); // Copy data directly is.transferTo(os); // send data directly
os.flush(); os.flush();
} }
......
...@@ -50,7 +50,9 @@ public class SearchClient { ...@@ -50,7 +50,9 @@ public class SearchClient {
if (stubs.isEmpty()) { if (stubs.isEmpty()) {
return Collections.emptyList(); return Collections.emptyList();
} }
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
// Phase 1: Aggregate Global Counts // Phase 1: Aggregate Global Counts
Map<String, Integer> globalTermCounts = new HashMap<>(); Map<String, Integer> globalTermCounts = new HashMap<>();
int filesPerWorker = (int) Math.ceil((double) allFiles.size() / stubs.size()); int filesPerWorker = (int) Math.ceil((double) allFiles.size() / stubs.size());
...@@ -62,10 +64,7 @@ public class SearchClient { ...@@ -62,10 +64,7 @@ public class SearchClient {
if (count <= 0) break; if (count <= 0) break;
StatRequest request = StatRequest.newBuilder() StatRequest request = StatRequest.newBuilder()
.addAllTerms(terms) .addAllTerms(terms).setStartIndex(currentFileIndex).setCount(count).build();
.setStartIndex(currentFileIndex)
.setCount(count)
.build();
try { try {
StatResponse response = stubs.get(address).getDocumentStats(request); StatResponse response = stubs.get(address).getDocumentStats(request);
response.getTermToDocumentCountMap().forEach((term, docCount) -> response.getTermToDocumentCountMap().forEach((term, docCount) ->
...@@ -85,26 +84,40 @@ public class SearchClient { ...@@ -85,26 +84,40 @@ public class SearchClient {
// --- Phase 2: Final Scoring --- // --- Phase 2: Final Scoring ---
List<SearchResponse.DocumentResult> finalResults = new ArrayList<>(); List<SearchResponse.DocumentResult> finalResults = new ArrayList<>();
currentFileIndex = 0; currentFileIndex = 0;
// Helper variables to track the winning node (for the single log line)
String bestWorkerNode = "None";
double highestScoreFound = -1.0;
for (String address : workerList) { for (String address : workerList) {
int count = Math.min(filesPerWorker, allFiles.size() - currentFileIndex); int count = Math.min(filesPerWorker, allFiles.size() - currentFileIndex);
if (count <= 0) break; if (count <= 0) break;
CalculationRequest request = CalculationRequest.newBuilder() CalculationRequest request = CalculationRequest.newBuilder()
.addAllTerms(terms) .addAllTerms(terms).putAllGlobalIdfs(globalIdfs)
.putAllGlobalIdfs(globalIdfs) .setStartIndex(currentFileIndex).setCount(count).build();
.setStartIndex(currentFileIndex)
.setCount(count)
.build();
try { try {
SearchResponse response = stubs.get(address).getFinalScores(request); SearchResponse response = stubs.get(address).getFinalScores(request);
finalResults.addAll(response.getResultsList()); List<SearchResponse.DocumentResult> resultsFromWorker = response.getResultsList();
// Logic to identify the node with the top result
for (SearchResponse.DocumentResult res : resultsFromWorker) {
if (res.getScore() > highestScoreFound) {
highestScoreFound = res.getScore();
bestWorkerNode = address;
}
}
finalResults.addAll(resultsFromWorker);
} catch (Exception e) { System.err.println("Worker " + address + " Phase 2 error"); } } catch (Exception e) { System.err.println("Worker " + address + " Phase 2 error"); }
currentFileIndex += count; currentFileIndex += count;
} }
long endTime = System.currentTimeMillis(); long endTime = System.currentTimeMillis();
System.out.println(">>> Distributed Search took: " + (endTime - startTime) + " ms");
// --- return the sorted list ---
finalResults.sort((a, b) -> Double.compare(b.getScore(), a.getScore())); finalResults.sort((a, b) -> Double.compare(b.getScore(), a.getScore()));
System.out.println(String.format(">>> SEARCH LOG: Best result from [%s] | Score: [%.6f] | Cluster Latency: [%d ms]",
bestWorkerNode, highestScoreFound, (endTime - startTime)));
return finalResults; return finalResults;
} }
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment