Commit dd02f772 authored by tammam.alsoleman's avatar tammam.alsoleman

the first version

parent 56a7687b
...@@ -20,14 +20,25 @@ ...@@ -20,14 +20,25 @@
</properties> </properties>
<dependencies> <dependencies>
<!-- ZooKeeper --> <!-- ZooKeeper Client -->
<dependency> <dependency>
<groupId>org.apache.zookeeper</groupId> <groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId> <artifactId>zookeeper</artifactId>
<version>${zookeeper.version}</version> <version>${zookeeper.version}</version>
<!-- Exclude ZooKeeper's transitive logging bindings (slf4j-log4j12, log4j) to prevent classpath conflicts -->
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency> </dependency>
<!-- Protobuf & gRPC --> <!-- gRPC & Protobuf -->
<dependency> <dependency>
<groupId>com.google.protobuf</groupId> <groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId> <artifactId>protobuf-java</artifactId>
...@@ -54,17 +65,28 @@ ...@@ -54,17 +65,28 @@
<version>1.3.2</version> <version>1.3.2</version>
</dependency> </dependency>
<!-- Logging --> <!-- Logging - Force specific compatible versions -->
<dependency> <dependency>
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId> <artifactId>slf4j-api</artifactId>
<version>2.0.9</version> <version>2.0.7</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>ch.qos.logback</groupId> <groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId> <artifactId>logback-classic</artifactId>
<version>1.4.11</version> <version>1.4.7</version>
</dependency> </dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.4.7</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-services</artifactId>
<version>1.65.0</version>
</dependency>
</dependencies> </dependencies>
<build> <build>
...@@ -76,15 +98,20 @@ ...@@ -76,15 +98,20 @@
</extension> </extension>
</extensions> </extensions>
<plugins> <plugins>
<!-- Protobuf Compiler Plugin --> <!-- Protobuf Compiler Plugin -->
<plugin> <plugin>
<groupId>org.xolstice.maven.plugins</groupId> <groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId> <artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version> <version>0.6.1</version>
<configuration> <configuration>
<protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact> <protocArtifact>
com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
</protocArtifact>
<pluginId>grpc-java</pluginId> <pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact> <pluginArtifact>
io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
</pluginArtifact>
</configuration> </configuration>
<executions> <executions>
<execution> <execution>
...@@ -96,7 +123,7 @@ ...@@ -96,7 +123,7 @@
</executions> </executions>
</plugin> </plugin>
<!-- Compiler Plugin --> <!-- Java Compiler -->
<plugin> <plugin>
<groupId>org.apache.maven.plugins</groupId> <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId> <artifactId>maven-compiler-plugin</artifactId>
...@@ -107,33 +134,36 @@ ...@@ -107,33 +134,36 @@
</configuration> </configuration>
</plugin> </plugin>
<!-- Assembly Plugin (The Added Part) --> <!-- ✅ SHADE PLUGIN (بديل assembly) -->
<plugin> <plugin>
<groupId>org.apache.maven.plugins</groupId> <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId> <artifactId>maven-shade-plugin</artifactId>
<version>3.7.1</version> <version>3.4.1</version>
<configuration>
<archive>
<manifest>
<mainClass>com.distributed.search.Application</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<finalName>search-engine-1.0-jar-with-dependencies</finalName>
<appendAssemblyId>false</appendAssemblyId>
</configuration>
<executions> <executions>
<execution> <execution>
<id>make-assembly</id>
<phase>package</phase> <phase>package</phase>
<goals> <goals>
<goal>single</goal> <goal>shade</goal>
</goals> </goals>
<configuration>
<!-- Very important for gRPC -->
<transformers>
<!-- Merges META-INF/services entries -->
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<!-- Main-Class -->
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.distributed.search.Application</mainClass>
</transformer>
</transformers>
<createDependencyReducedPom>false</createDependencyReducedPom>
<finalName>search-engine-1.0-jar-with-dependencies</finalName>
</configuration>
</execution> </execution>
</executions> </executions>
</plugin> </plugin>
</plugins> </plugins>
</build> </build>
</project> </project>
\ No newline at end of file
package com.distributed.search; package com.distributed.search;
public class Application { import com.distributed.search.cluster.LeaderElection;
} import com.distributed.search.cluster.OnElectionAction;
import com.distributed.search.cluster.ServiceRegistry;
import com.distributed.search.grpc.SearchClient;
import com.distributed.search.logic.FileManager;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Scanner;
/**
* Main Entry point for the Distributed Search Engine.
*/
/**
 * Main entry point for the Distributed Search Engine.
 *
 * Connects to ZooKeeper, volunteers for leader election, and then either
 * drives the interactive search loop (leader/coordinator) or idles while
 * the worker's gRPC server handles search RPCs.
 */
public class Application implements Watcher {

    private static final String ZOOKEEPER_ADDRESS = "192.168.96.198:2181"; // Change if ZK is on another machine
    private static final int SESSION_TIMEOUT = 3000; // ms
    private static final int DEFAULT_PORT = 8080;
    private static final String STORAGE_DIR = "storage"; // Folder containing .txt files

    private ZooKeeper zooKeeper;

    // volatile: written by the ZooKeeper election-callback thread (onElectedToBeLeader),
    // read by the main thread's search loop — without volatile the promotion may never be seen.
    private static volatile boolean isLeader = false;

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        // 1. Force the system to use IPv4 and disable native transports that cause 'unix' errors
        System.setProperty("io.grpc.netty.shaded.io.netty.transport.noNative", "true");
        System.setProperty("java.net.preferIPv4Stack", "true");

        // Use port from arguments to allow multiple instances on the same machine
        int currentServerPort = args.length == 1 ? Integer.parseInt(args[0]) : DEFAULT_PORT;

        Application application = new Application();
        ZooKeeper zooKeeper = application.connectToZookeeper();

        ServiceRegistry serviceRegistry = new ServiceRegistry(zooKeeper);
        SearchClient searchClient = new SearchClient();

        // Action to take when election completes
        OnElectionAction onElectionAction = new OnElectionAction(serviceRegistry, currentServerPort) {
            @Override
            public void onElectedToBeLeader() {
                super.onElectedToBeLeader();
                isLeader = true; // Mark this node as the leader
            }
        };

        LeaderElection leaderElection = new LeaderElection(zooKeeper, onElectionAction);
        leaderElection.volunteerForLeadership();
        leaderElection.reelectLeader();

        // If this node is the Leader, enter the Search Loop
        application.enterSearchLoop(serviceRegistry, searchClient);
        application.close();
    }

    /**
     * The Main UI Loop: only active for the Leader/Coordinator node.
     * Worker nodes idle here (keeping the ZooKeeper session alive) and pick up
     * a promotion to leader within roughly one 5-second wait cycle.
     */
    private void enterSearchLoop(ServiceRegistry serviceRegistry, SearchClient searchClient) throws InterruptedException {
        Scanner scanner = new Scanner(System.in);
        while (true) {
            if (isLeader) {
                System.out.println("\n[Coordinator] Enter search query (or 'exit' to quit):");
                String input = scanner.nextLine();
                if (input.equalsIgnoreCase("exit")) break;
                if (input.trim().isEmpty()) continue;

                // 1. Get current active workers from Zookeeper
                List<String> workers = serviceRegistry.getAllServiceAddresses();
                if (workers.isEmpty()) {
                    System.out.println("No workers registered yet. Please wait...");
                    continue;
                }

                // 2. Prepare search data
                List<String> terms = Arrays.asList(input.toLowerCase().split("\\s+"));
                List<String> allFiles = FileManager.getSortedDocumentNames(STORAGE_DIR);
                if (allFiles.isEmpty()) {
                    System.out.println("No documents found in 'storage' directory.");
                    continue;
                }

                // 3. Update gRPC channels and perform distributed search
                System.out.println("Searching in " + allFiles.size() + " files across " + workers.size() + " workers...");
                searchClient.updateWorkers(workers);
                searchClient.performSearch(terms, allFiles);
            } else {
                // Worker: timed wait keeps the loop cheap while still noticing promotion.
                synchronized (zooKeeper) {
                    zooKeeper.wait(5000);
                }
            }
        }
    }

    /**
     * Opens the ZooKeeper session using this Application as the connection watcher.
     *
     * @return the connected ZooKeeper handle (also stored in this.zooKeeper)
     * @throws IOException if the client cannot be created
     */
    public ZooKeeper connectToZookeeper() throws IOException {
        this.zooKeeper = new ZooKeeper(ZOOKEEPER_ADDRESS, SESSION_TIMEOUT, this);
        return zooKeeper;
    }

    private void close() throws InterruptedException {
        this.zooKeeper.close();
    }

    /** ZooKeeper connection-state watcher: logs once the session is established. */
    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Event.EventType.None && event.getState() == Event.KeeperState.SyncConnected) {
            System.out.println("Successfully connected to Zookeeper");
        }
    }
}
\ No newline at end of file
package com.distributed.search.cluster; package com.distributed.search.cluster;
public class LeaderElection { import org.apache.zookeeper.*;
} import org.apache.zookeeper.data.Stat;
import java.util.Collections;
import java.util.List;
/**
* Handles the Leader Election logic using Zookeeper's ephemeral sequential nodes.
*/
public class LeaderElection implements Watcher {
private static final String ELECTION_NAMESPACE = "/election";
private final ZooKeeper zooKeeper;
private final OnElectionCallback onElectionCallback;
private String currentZnodeName;
public LeaderElection(ZooKeeper zooKeeper, OnElectionCallback onElectionCallback) {
this.zooKeeper = zooKeeper;
this.onElectionCallback = onElectionCallback;
ensureElectionNamespaceExists();
}
private void ensureElectionNamespaceExists() {
try {
if (zooKeeper.exists(ELECTION_NAMESPACE, false) == null) {
zooKeeper.create(ELECTION_NAMESPACE, new byte[]{}, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
/**
* Volunteers for leadership by creating an ephemeral sequential znode.
*/
public void volunteerForLeadership() throws KeeperException, InterruptedException {
String znodePrefix = ELECTION_NAMESPACE + "/c_";
String znodeFullPath = zooKeeper.create(znodePrefix, new byte[]{},
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("Znode created: " + znodeFullPath);
this.currentZnodeName = znodeFullPath.replace(ELECTION_NAMESPACE + "/", "");
}
/**
* Determines the leader based on the smallest sequence number.
* If not the leader, watches the predecessor node for failures.
*/
public void reelectLeader() throws KeeperException, InterruptedException {
Stat predecessorStat = null;
String predecessorName = "";
// Loop to ensure we get a valid predecessor to watch
while (predecessorStat == null) {
List<String> children = zooKeeper.getChildren(ELECTION_NAMESPACE, false);
Collections.sort(children);
String smallestChild = children.get(0);
if (smallestChild.equals(currentZnodeName)) {
System.out.println("I am the Leader (Coordinator)");
onElectionCallback.onElectedToBeLeader();
return;
} else {
System.out.println("I am a Worker");
int predecessorIndex = children.indexOf(currentZnodeName) - 1;
predecessorName = children.get(predecessorIndex);
// Watch the node that is exactly before us in the sequence
predecessorStat = zooKeeper.exists(ELECTION_NAMESPACE + "/" + predecessorName, this);
}
}
onElectionCallback.onWorker();
System.out.println("Watching predecessor znode: " + predecessorName);
System.out.println();
}
/**
* Zookeeper Watcher event handler.
* Re-triggers election if the watched predecessor node is deleted.
*/
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeDeleted) {
try {
reelectLeader();
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
}
}
\ No newline at end of file
package com.distributed.search.cluster; package com.distributed.search.cluster;
public class OnElectionAction { import com.distributed.search.grpc.SearchServiceImpl;
} import io.grpc.Server;
import io.grpc.ServerBuilder;
import java.io.IOException;
/**
* Defines the actions taken when a node's role changes in the cluster.
*/
/**
 * Defines the actions taken when a node's role changes in the cluster.
 *
 * Leader: stops serving searches and monitors the worker registry.
 * Worker: starts a gRPC server and registers its address in ZooKeeper.
 */
public class OnElectionAction implements OnElectionCallback {

    private final ServiceRegistry serviceRegistry;
    private final int port;
    private Server grpcServer; // The gRPC server instance for Workers
    private final String sharedDirectoryPath = "storage"; // The central storage folder

    public OnElectionAction(ServiceRegistry serviceRegistry, int port) {
        this.serviceRegistry = serviceRegistry;
        this.port = port;
    }

    /**
     * Called when this node is elected as the Coordinator (Leader).
     */
    @Override
    public void onElectedToBeLeader() {
        // If it was a worker, unregister itself from the worker list
        serviceRegistry.unregisterFromCluster();
        // Start watching for other active workers in the cluster
        serviceRegistry.registerForUpdates();
        System.out.println("Node elected as Leader. Monitoring the cluster...");
        // Note: The Coordinator doesn't need to run a gRPC server for searching.
        // It acts as a client that sends requests to workers.
        if (grpcServer != null) {
            grpcServer.shutdown();
        }
    }

    /**
     * Called when this node is a Worker.
     */
    @Override
    public void onWorker() {
        try {
            // Order matters: start the gRPC server first, then register, so the
            // cluster never sees a worker address it cannot reach yet.
            boolean started = startGrpcServer();
            if (started) {
                // NOTE(review): registers "localhost" — a coordinator on another machine
                // cannot reach this worker; consider advertising the LAN address instead.
                String currentServerAddress = "localhost:" + port;
                serviceRegistry.registerToCluster(currentServerAddress);
                System.out.println("Worker registered successfully on: " + currentServerAddress);
            } else {
                System.err.println("Worker failed to start gRPC server. Registration aborted.");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Helper method to start the gRPC server that runs SearchServiceImpl logic.
     * Returns true on success, false if the port could not be bound.
     */
    private boolean startGrpcServer() {
        try {
            grpcServer = ServerBuilder.forPort(port)
                    .addService(new SearchServiceImpl(sharedDirectoryPath))
                    .build();
            grpcServer.start();
            return true;
        } catch (IOException e) {
            System.err.println("gRPC Server Error: " + e.getMessage());
            return false;
        }
    }
}
\ No newline at end of file
package com.distributed.search.cluster;
/**
* Interface used to define actions to be triggered after the leader election process.
*/
/**
 * Interface used to define actions to be triggered after the leader election process.
 */
public interface OnElectionCallback {

    /** Triggered when this node becomes the Leader (Coordinator). */
    void onElectedToBeLeader();

    /** Triggered when this node is a Worker. */
    void onWorker();
}
\ No newline at end of file
package com.distributed.search.cluster; package com.distributed.search.cluster;
public class ServiceRegistry { import org.apache.zookeeper.*;
} import org.apache.zookeeper.data.Stat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class ServiceRegistry implements Watcher {
private static final String REGISTRY_ZNODE = "/service_registry";
private final ZooKeeper zooKeeper;
private String currentZnode = null;
private List<String> allServiceAddresses = null;
public ServiceRegistry(ZooKeeper zooKeeper) {
this.zooKeeper = zooKeeper;
createServiceRegistryZnode();
}
private void createServiceRegistryZnode() {
try {
if (zooKeeper.exists(REGISTRY_ZNODE, false) == null) {
zooKeeper.create(REGISTRY_ZNODE, new byte[]{}, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (KeeperException | InterruptedException e) { e.printStackTrace(); }
}
public void registerToCluster(String metadata) throws KeeperException, InterruptedException {
this.currentZnode = zooKeeper.create(REGISTRY_ZNODE + "/n_", metadata.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
}
public void registerForUpdates() {
try { updateAddresses(); } catch (KeeperException | InterruptedException e) { e.printStackTrace(); }
}
public void unregisterFromCluster() {
try {
if (currentZnode != null && zooKeeper.exists(currentZnode, false) != null) {
zooKeeper.delete(currentZnode, -1);
}
} catch (KeeperException | InterruptedException e) { e.printStackTrace(); }
}
public synchronized List<String> getAllServiceAddresses() {
if (allServiceAddresses == null) {
try { updateAddresses(); } catch (KeeperException | InterruptedException e) { e.printStackTrace(); }
}
return allServiceAddresses;
}
private synchronized void updateAddresses() throws KeeperException, InterruptedException {
List<String> workerZnodes = zooKeeper.getChildren(REGISTRY_ZNODE, this);
List<String> addresses = new ArrayList<>(workerZnodes.size());
for (String workerZnode : workerZnodes) {
String workerFullPath = REGISTRY_ZNODE + "/" + workerZnode;
Stat stat = zooKeeper.exists(workerFullPath, false);
if (stat == null) continue;
byte[] addressBytes = zooKeeper.getData(workerFullPath, false, stat);
addresses.add(new String(addressBytes));
}
this.allServiceAddresses = Collections.unmodifiableList(addresses);
System.out.println("Current active workers: " + this.allServiceAddresses);
}
@Override
public void process(WatchedEvent event) {
try { updateAddresses(); } catch (KeeperException | InterruptedException e) { e.printStackTrace(); }
}
}
\ No newline at end of file
package com.distributed.search.grpc; package com.distributed.search.grpc;
import java.net.InetSocketAddress;
import com.distributed.search.model.*;
import io.grpc.ManagedChannel;
// انتبه: هذا الـ Import المظلل هو الذي يحل مشكلة الـ NameResolver في الـ Fat JAR
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
import java.util.*;
public class SearchClient { public class SearchClient {
} private final Map<String, SearchServiceGrpc.SearchServiceBlockingStub> stubs = new HashMap<>();
private final List<ManagedChannel> channels = new ArrayList<>();
/**
 * Replaces the current worker connections with fresh gRPC channels for the
 * given addresses. Failures on individual addresses are logged and skipped
 * (best-effort), so one unreachable worker does not abort the refresh.
 *
 * @param workerAddresses "host:port" strings read from the service registry
 */
public void updateWorkers(List<String> workerAddresses) {
    // Close stale channels from the previous worker snapshot
    for (ManagedChannel channel : channels) {
        channel.shutdownNow();
    }
    channels.clear();
    stubs.clear();
    for (String address : workerAddresses) {
        try {
            // Split the "host:port" address manually
            String[] parts = address.split(":");
            String host = parts[0];
            int port = Integer.parseInt(parts[1]);
            // Use NettyChannelBuilder.forAddress exclusively: this bypasses the
            // NameResolver machinery entirely and avoids the 'unix' scheme error
            // seen when running from the fat JAR.
            ManagedChannel channel = NettyChannelBuilder.forAddress(new InetSocketAddress(host, port))
                    .usePlaintext()
                    .build();
            channels.add(channel);
            stubs.put(address, SearchServiceGrpc.newBlockingStub(channel));
            System.out.println("Successfully connected to Worker: " + host + ":" + port);
        } catch (Exception e) {
            System.err.println("Failed to connect to " + address + ": " + e.getMessage());
            e.printStackTrace(); // important: shows exactly where the exception was thrown
        }
    }
}
/**
 * Runs a two-phase distributed TF-IDF search across the registered workers.
 *
 * Phase 1 asks each worker for per-term document counts over its slice of the
 * corpus; the coordinator combines these into global IDF values. Phase 2 sends
 * the global IDFs back so each worker scores its slice. The same worker order
 * and slice boundaries are used in both phases so every file is scored with
 * consistent statistics.
 *
 * @param terms    lowercased query terms
 * @param allFiles sorted corpus file names (ordering must match the workers' view)
 */
public void performSearch(List<String> terms, List<String> allFiles) {
    if (stubs.isEmpty()) {
        System.out.println("No workers available.");
        return;
    }
    // --- Phase 1: Global Stats ---
    Map<String, Integer> globalTermCounts = new HashMap<>();
    // ceil so the final worker picks up the remainder of an uneven split
    int filesPerWorker = (int) Math.ceil((double) allFiles.size() / stubs.size());
    int currentFileIndex = 0;
    // Snapshot of worker order, reused in Phase 2 so each worker receives the same slice twice.
    List<String> workerList = new ArrayList<>(stubs.keySet());
    for (String address : workerList) {
        int count = Math.min(filesPerWorker, allFiles.size() - currentFileIndex);
        if (count <= 0) break; // all files already assigned
        StatRequest request = StatRequest.newBuilder()
                .addAllTerms(terms)
                .setStartIndex(currentFileIndex)
                .setCount(count)
                .build();
        try {
            StatResponse response = stubs.get(address).getDocumentStats(request);
            response.getTermToDocumentCountMap().forEach((term, docCount) ->
                    globalTermCounts.merge(term, docCount, Integer::sum));
        } catch (Exception e) { System.err.println("Worker " + address + " Phase 1 error"); }
        currentFileIndex += count;
    }
    // --- Calculate Global IDF ---
    // idf = log10(totalDocs / docsWithTerm); Math.max(1, ...) guards division by zero.
    Map<String, Double> globalIdfs = new HashMap<>();
    for (String term : terms) {
        int docsWithTerm = globalTermCounts.getOrDefault(term, 0);
        double idf = Math.log10((double) allFiles.size() / Math.max(1, docsWithTerm));
        globalIdfs.put(term, idf);
    }
    // --- Phase 2: Final Scoring ---
    // NOTE(review): if a worker fails in this phase its slice is skipped, so those
    // files silently go unscored — best-effort by design.
    List<SearchResponse.DocumentResult> finalResults = new ArrayList<>();
    currentFileIndex = 0;
    for (String address : workerList) {
        int count = Math.min(filesPerWorker, allFiles.size() - currentFileIndex);
        if (count <= 0) break;
        CalculationRequest request = CalculationRequest.newBuilder()
                .addAllTerms(terms)
                .putAllGlobalIdfs(globalIdfs)
                .setStartIndex(currentFileIndex)
                .setCount(count)
                .build();
        try {
            SearchResponse response = stubs.get(address).getFinalScores(request);
            finalResults.addAll(response.getResultsList());
        } catch (Exception e) { System.err.println("Worker " + address + " Phase 2 error"); }
        currentFileIndex += count;
    }
    // --- Sort and Show Top 10 ---
    finalResults.sort((a, b) -> Double.compare(b.getScore(), a.getScore()));
    System.out.println("\n--- Search Results ---");
    for (int i = 0; i < Math.min(10, finalResults.size()); i++) {
        System.out.println((i + 1) + ". " + finalResults.get(i).getDocumentName() + " (Score: " + finalResults.get(i).getScore() + ")");
    }
}
}
\ No newline at end of file
package com.distributed.search.grpc; package com.distributed.search.grpc;
import com.distributed.search.model.*; // Generated Protobuf classes import com.distributed.search.model.*;
import com.distributed.search.logic.*; // Custom logic classes (FileManager, TFIDFCalculator) import com.distributed.search.logic.*;
import io.grpc.stub.StreamObserver; import io.grpc.stub.StreamObserver;
import java.io.IOException; import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
/**
* Implementation of the SearchService gRPC service.
* This class handles the actual text processing logic on the Worker nodes.
*/
public class SearchServiceImpl extends SearchServiceGrpc.SearchServiceImplBase { public class SearchServiceImpl extends SearchServiceGrpc.SearchServiceImplBase {
private final String sharedDirectoryPath; private final String sharedDirectoryPath;
...@@ -20,58 +17,40 @@ public class SearchServiceImpl extends SearchServiceGrpc.SearchServiceImplBase { ...@@ -20,58 +17,40 @@ public class SearchServiceImpl extends SearchServiceGrpc.SearchServiceImplBase {
this.sharedDirectoryPath = sharedDirectoryPath; this.sharedDirectoryPath = sharedDirectoryPath;
} }
/**
* Phase 1: Count document occurrences for each search term.
* This information is sent back to the Coordinator to calculate Global IDF.
*/
@Override @Override
public void getDocumentStats(StatRequest request, StreamObserver<StatResponse> responseObserver) { public void getDocumentStats(StatRequest request, StreamObserver<StatResponse> responseObserver) {
List<String> terms = request.getTermsList(); List<String> terms = request.getTermsList();
int startIndex = request.getStartIndex(); int startIndex = request.getStartIndex();
int count = request.getCount(); int count = request.getCount();
// Retrieve sorted file names to ensure consistency across the cluster
List<String> allFiles = FileManager.getSortedDocumentNames(sharedDirectoryPath); List<String> allFiles = FileManager.getSortedDocumentNames(sharedDirectoryPath);
// Determine the sub-list of files assigned to this specific worker
int endIndex = Math.min(startIndex + count, allFiles.size()); int endIndex = Math.min(startIndex + count, allFiles.size());
List<String> assignedFiles = allFiles.subList(startIndex, endIndex); List<String> assignedFiles = allFiles.subList(startIndex, endIndex);
Map<String, Integer> termToDocCount = new HashMap<>(); Map<String, Integer> termToDocCount = new HashMap<>();
// Process each search term
for (String term : terms) { for (String term : terms) {
int docsWithTerm = 0; int docsWithTerm = 0;
for (String docName : assignedFiles) { for (String docName : assignedFiles) {
try { // التعديل هنا: نمرر الـ Path مباشرة كما اقترح الحل الجديد
String content = FileManager.readDocumentContent(sharedDirectoryPath, docName); Path filePath = Paths.get(sharedDirectoryPath, docName);
List<String> words = TFIDFCalculator.getWordsFromDocument(content); List<String> words = TFIDFCalculator.getWordsFromDocument(filePath);
// Check if the document contains the term (case-insensitive) if (words.contains(term.toLowerCase())) {
if (words.contains(term.toLowerCase())) { docsWithTerm++;
docsWithTerm++;
}
} catch (IOException e) {
System.err.println("Error reading file: " + docName + " - " + e.getMessage());
} }
} }
termToDocCount.put(term, docsWithTerm); termToDocCount.put(term, docsWithTerm);
} }
// Build the gRPC response
StatResponse response = StatResponse.newBuilder() StatResponse response = StatResponse.newBuilder()
.putAllTermToDocumentCount(termToDocCount) .putAllTermToDocumentCount(termToDocCount)
.build(); .build();
// Send the response and close the stream
responseObserver.onNext(response); responseObserver.onNext(response);
responseObserver.onCompleted(); responseObserver.onCompleted();
} }
/**
* Phase 2: Calculate final TF-IDF scores for the assigned documents.
* The scores are calculated using the Global IDF provided by the Coordinator.
*/
@Override @Override
public void getFinalScores(CalculationRequest request, StreamObserver<SearchResponse> responseObserver) { public void getFinalScores(CalculationRequest request, StreamObserver<SearchResponse> responseObserver) {
List<String> terms = request.getTermsList(); List<String> terms = request.getTermsList();
...@@ -79,43 +58,33 @@ public class SearchServiceImpl extends SearchServiceGrpc.SearchServiceImplBase { ...@@ -79,43 +58,33 @@ public class SearchServiceImpl extends SearchServiceGrpc.SearchServiceImplBase {
int startIndex = request.getStartIndex(); int startIndex = request.getStartIndex();
int count = request.getCount(); int count = request.getCount();
// Ensure the worker processes the exact same set of files as in Phase 1
List<String> allFiles = FileManager.getSortedDocumentNames(sharedDirectoryPath); List<String> allFiles = FileManager.getSortedDocumentNames(sharedDirectoryPath);
int endIndex = Math.min(startIndex + count, allFiles.size()); int endIndex = Math.min(startIndex + count, allFiles.size());
List<String> assignedFiles = allFiles.subList(startIndex, endIndex); List<String> assignedFiles = allFiles.subList(startIndex, endIndex);
SearchResponse.Builder responseBuilder = SearchResponse.newBuilder(); SearchResponse.Builder responseBuilder = SearchResponse.newBuilder();
// Calculate score for each assigned document
for (String docName : assignedFiles) { for (String docName : assignedFiles) {
double docScore = 0.0; double docScore = 0.0;
try {
String content = FileManager.readDocumentContent(sharedDirectoryPath, docName);
List<String> words = TFIDFCalculator.getWordsFromDocument(content);
// التعديل هنا أيضاً: نمرر الـ Path
Path filePath = Paths.get(sharedDirectoryPath, docName);
List<String> words = TFIDFCalculator.getWordsFromDocument(filePath);
if (!words.isEmpty()) {
for (String term : terms) { for (String term : terms) {
// Calculate local Term Frequency (TF)
double tf = TFIDFCalculator.calculateTermFrequency(words, term); double tf = TFIDFCalculator.calculateTermFrequency(words, term);
// Retrieve the Global IDF sent by the Coordinator
double idf = globalIdfs.getOrDefault(term, 0.0); double idf = globalIdfs.getOrDefault(term, 0.0);
// Accumulate the final score for the document
docScore += (tf * idf); docScore += (tf * idf);
} }
// Add document result to the response
responseBuilder.addResults(SearchResponse.DocumentResult.newBuilder()
.setDocumentName(docName)
.setScore(docScore)
.build());
} catch (IOException e) {
System.err.println("Error calculating score for: " + docName + " - " + e.getMessage());
} }
responseBuilder.addResults(SearchResponse.DocumentResult.newBuilder()
.setDocumentName(docName)
.setScore(docScore)
.build());
} }
// Send the final results back to the Coordinator
responseObserver.onNext(responseBuilder.build()); responseObserver.onNext(responseBuilder.build());
responseObserver.onCompleted(); responseObserver.onCompleted();
} }
......
...@@ -12,7 +12,7 @@ public class FileManager { ...@@ -12,7 +12,7 @@ public class FileManager {
public static List<String> getSortedDocumentNames(String directoryPath) { public static List<String> getSortedDocumentNames(String directoryPath) {
File directory = new File(directoryPath); File directory = new File(directoryPath);
File[] files = directory.listFiles((dir, name) -> name.endsWith(".txt")); File[] files = directory.listFiles(File::isFile);
if (files == null) return List.of(); if (files == null) return List.of();
......
package com.distributed.search.logic; package com.distributed.search.logic;
import java.util.Arrays; import java.nio.charset.CharacterCodingException;
import java.util.List; import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;
public class TFIDFCalculator {

    /**
     * Reads a document and tokenizes it into a lowercase word list.
     *
     * - No fragile regex splitting: any run of non-letter/non-digit characters
     *   becomes a separator.
     * - Unicode-aware (\p{L}/\p{N}), so Arabic and English text both tokenize.
     * - Never throws: on any read/decode failure it logs and returns an empty
     *   list, so a bad file cannot break a gRPC handler.
     *
     * @param filePath path of the UTF-8 text file to read
     * @return lowercase tokens in document order; empty list on any error
     */
    public static List<String> getWordsFromDocument(Path filePath) {
        try {
            // Read the file as UTF-8 text
            String content = Files.readString(filePath, StandardCharsets.UTF_8);
            // Locale.ROOT: locale-independent lowercasing
            content = content.toLowerCase(Locale.ROOT);
            // Replace anything that is not a letter (\p{L}, any script) or a
            // digit (\p{N}) with a single space.
            String cleaned = content.replaceAll("[^\\p{L}\\p{N}]+", " ");
            // Split on whitespace
            String[] tokens = cleaned.trim().split("\\s+");
            return Arrays.stream(tokens)
                    .filter(s -> !s.isEmpty())
                    .collect(Collectors.toList());
        } catch (CharacterCodingException e) {
            System.err.println("Character encoding error reading file: " + filePath);
            return Collections.emptyList();
        } catch (IOException e) {
            System.err.println("Error reading file: " + filePath + " - " + e.getMessage());
            return Collections.emptyList();
        } catch (Exception e) {
            System.err.println("Unexpected error processing file: " + filePath);
            e.printStackTrace();
            return Collections.emptyList();
        }
    }

    /**
     * Term Frequency: occurrences of {@code term} divided by total word count.
     * Returns 0.0 for a null or empty word list.
     */
    public static double calculateTermFrequency(List<String> words, String term) {
        if (words == null || words.isEmpty()) return 0.0;
        long count = words.stream()
                .filter(word -> word.equalsIgnoreCase(term))
                .count();
        return (double) count / words.size();
    }

    /**
     * Inverse Document Frequency: log10(totalDocuments / documentsWithTerm).
     * Returns 0.0 when either count is non-positive (undefined IDF).
     */
    public static double calculateIdf(int totalDocuments, int documentsWithTerm) {
        if (documentsWithTerm <= 0 || totalDocuments <= 0) return 0.0;
        return Math.log10((double) totalDocuments / documentsWithTerm);
    }
}
\ No newline at end of file
<configuration>
    <!-- Console appender: timestamp, thread, level, abbreviated logger name, message -->
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>
    <!-- Silence Netty and gRPC debug logs -->
    <logger name="io.grpc" level="INFO"/>
    <logger name="io.netty" level="INFO"/>
    <logger name="org.apache.zookeeper" level="INFO"/>
    <!-- Everything else logs at INFO to the console -->
    <root level="INFO">
        <appender-ref ref="STDOUT" />
    </root>
</configuration>
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment