Commit 56a7687b authored by tammam.alsoleman's avatar tammam.alsoleman

Add SearchServiceImpl: worker-side gRPC implementation of the distributed TF-IDF search service (document stats for global IDF, and final score calculation)

parent a8b39061
......@@ -2,6 +2,9 @@
<project version="4">
<component name="Encoding">
<file url="file://$PROJECT_DIR$/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/src/main/proto" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/src/main/resources" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/target/generated-sources/protobuf/grpc-java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/target/generated-sources/protobuf/java" charset="UTF-8" />
</component>
</project>
\ No newline at end of file
......@@ -9,21 +9,30 @@
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<grpc.version>1.58.0</grpc.version>
<protobuf.version>3.24.0</protobuf.version>
<protobuf.version>3.25.0</protobuf.version>
<grpc.version>1.65.0</grpc.version>
<zookeeper.version>3.9.1</zookeeper.version>
<!-- NOTE: os.detected.classifier is hardcoded to windows-x86_64, overriding the
     os-maven-plugin's automatic detection. Builds on Linux/macOS will download the
     wrong protoc binary — remove this property to restore per-platform detection. -->
<os.detected.classifier>windows-x86_64</os.detected.classifier>
</properties>
<dependencies>
<!-- ZooKeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>${zookeeper.version}</version>
</dependency>
<!-- Protobuf & gRPC -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
......@@ -45,10 +54,11 @@
<version>1.3.2</version>
</dependency>
<!-- Logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.7</version>
<version>2.0.9</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
......@@ -65,8 +75,8 @@
<version>1.7.1</version>
</extension>
</extensions>
<plugins>
<!-- Protobuf Compiler Plugin -->
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
......@@ -86,6 +96,18 @@
</executions>
</plugin>
<!-- Compiler Plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>17</source>
<target>17</target>
</configuration>
</plugin>
<!-- Assembly Plugin: packages the application and all dependencies into a single runnable JAR -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
......
package com.distributed.search.grpc;
public class SearchServiceImpl {
}
import com.distributed.search.logic.*; // Custom logic classes (FileManager, TFIDFCalculator)
import com.distributed.search.model.*; // Generated Protobuf classes

import io.grpc.stub.StreamObserver;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
/**
* Implementation of the SearchService gRPC service.
* This class handles the actual text processing logic on the Worker nodes.
*/
public class SearchServiceImpl extends SearchServiceGrpc.SearchServiceImplBase {

    /** Root directory of the document corpus shared by all worker nodes. */
    private final String sharedDirectoryPath;

    /**
     * @param sharedDirectoryPath path to the shared corpus directory this worker reads from
     */
    public SearchServiceImpl(String sharedDirectoryPath) {
        this.sharedDirectoryPath = sharedDirectoryPath;
    }

    /**
     * Resolves the contiguous slice of the globally sorted document list that this
     * worker is responsible for. Sorting the names first guarantees every node in
     * the cluster sees the same ordering, so the half-open range
     * [startIndex, startIndex + count) identifies the same files everywhere.
     *
     * @param startIndex index of the first document assigned to this worker
     * @param count      number of documents assigned (clamped to the list size)
     * @return the sub-list of document names assigned to this worker
     */
    private List<String> resolveAssignedFiles(int startIndex, int count) {
        List<String> allFiles = FileManager.getSortedDocumentNames(sharedDirectoryPath);
        int endIndex = Math.min(startIndex + count, allFiles.size());
        return allFiles.subList(startIndex, endIndex);
    }

    /**
     * Phase 1: for each search term, count how many of this worker's assigned
     * documents contain the term. The Coordinator aggregates these counts across
     * all workers to calculate the Global IDF.
     */
    @Override
    public void getDocumentStats(StatRequest request, StreamObserver<StatResponse> responseObserver) {
        List<String> terms = request.getTermsList();
        List<String> assignedFiles = resolveAssignedFiles(request.getStartIndex(), request.getCount());

        // Pre-seed every term with zero so terms absent from all assigned
        // documents still appear in the response map.
        Map<String, Integer> termToDocCount = new HashMap<>();
        for (String term : terms) {
            termToDocCount.put(term, 0);
        }

        for (String docName : assignedFiles) {
            try {
                // Read and tokenize each document exactly ONCE. (The previous
                // version re-read every file for every term, costing
                // O(terms * files) disk reads instead of O(files).)
                String content = FileManager.readDocumentContent(sharedDirectoryPath, docName);
                // Set membership makes each per-term check O(1) instead of a
                // linear scan of the word list.
                Set<String> words = new HashSet<>(TFIDFCalculator.getWordsFromDocument(content));
                for (String term : terms) {
                    // Locale.ROOT avoids locale-dependent lowercasing surprises
                    // (e.g. the Turkish dotless 'I').
                    if (words.contains(term.toLowerCase(Locale.ROOT))) {
                        termToDocCount.merge(term, 1, Integer::sum);
                    }
                }
            } catch (IOException e) {
                // Best-effort: a single unreadable file must not abort the shard.
                System.err.println("Error reading file: " + docName + " - " + e.getMessage());
            }
        }

        // Build the gRPC response, then send it and close the stream.
        StatResponse response = StatResponse.newBuilder()
                .putAllTermToDocumentCount(termToDocCount)
                .build();
        responseObserver.onNext(response);
        responseObserver.onCompleted();
    }

    /**
     * Phase 2: calculate the final TF-IDF score for each assigned document using
     * the Global IDF values supplied by the Coordinator. The same slicing logic
     * as Phase 1 guarantees this worker scores exactly the files it counted.
     */
    @Override
    public void getFinalScores(CalculationRequest request, StreamObserver<SearchResponse> responseObserver) {
        List<String> terms = request.getTermsList();
        Map<String, Double> globalIdfs = request.getGlobalIdfsMap();
        List<String> assignedFiles = resolveAssignedFiles(request.getStartIndex(), request.getCount());

        SearchResponse.Builder responseBuilder = SearchResponse.newBuilder();

        for (String docName : assignedFiles) {
            try {
                String content = FileManager.readDocumentContent(sharedDirectoryPath, docName);
                List<String> words = TFIDFCalculator.getWordsFromDocument(content);

                // score(doc) = sum over terms of TF(term, doc) * globalIDF(term)
                double docScore = 0.0;
                for (String term : terms) {
                    double tf = TFIDFCalculator.calculateTermFrequency(words, term);
                    // A term the Coordinator never sent an IDF for contributes nothing.
                    double idf = globalIdfs.getOrDefault(term, 0.0);
                    docScore += tf * idf;
                }

                responseBuilder.addResults(SearchResponse.DocumentResult.newBuilder()
                        .setDocumentName(docName)
                        .setScore(docScore)
                        .build());
            } catch (IOException e) {
                // Skip unreadable documents rather than failing the whole request.
                System.err.println("Error calculating score for: " + docName + " - " + e.getMessage());
            }
        }

        // Send the final results back to the Coordinator and close the stream.
        responseObserver.onNext(responseBuilder.build());
        responseObserver.onCompleted();
    }
}
\ No newline at end of file
syntax = "proto3";

// proto3 permits exactly ONE package statement per file; the duplicate
// "package com.distributed.search.model;" was removed. The Java package is
// controlled by the java_package option below, not by the proto package.
package distributed_search;

option java_package = "com.distributed.search.model";
option java_multiple_files = true;
option java_outer_classname = "SearchProto";
// The search service definition.
service SearchService {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment