Commit e84298cb authored by AreejMh57's avatar AreejMh57

Initial commit

parents
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### IntelliJ IDEA ###
.idea/modules.xml
.idea/jarRepositories.xml
.idea/compiler.xml
.idea/libraries/
*.iws
*.iml
*.ipr
### Eclipse ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/
### Mac OS ###
.DS_Store
\ No newline at end of file
# Default ignored files
/shelf/
/workspace.xml
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Encoding">
<file url="file://$PROJECT_DIR$/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/src/main/resources" charset="UTF-8" />
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ExternalStorageConfigurationManager" enabled="true" />
<component name="MavenProjectsManager">
<option name="originalFiles">
<list>
<option value="$PROJECT_DIR$/pom.xml" />
</list>
</option>
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_17" default="true" project-jdk-name="17" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" />
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>Distributed_Search_System</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.7.0</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
<protoSourceRoot>${project.basedir}/src/main/proto</protoSourceRoot>
</configuration>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>17</source>
<target>17</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer>
<mainClass>com.distributed.search.SearchApplication</mainClass>
</transformer>
</transformers>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<properties>
<maven.compiler.target>17</maven.compiler.target>
<maven.compiler.source>17</maven.compiler.source>
<protobuf.version>3.25.1</protobuf.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>Distributed_Search_System</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<protobuf.version>3.25.1</protobuf.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.2.0</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.13.0</version>
</dependency>
</dependencies>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.7.0</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
<protoSourceRoot>${project.basedir}/src/main/proto</protoSourceRoot>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>17</source>
<target>17</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.distributed.search.SearchApplication</mainClass>
</transformer>
</transformers>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
package com.distributed.search;
import com.distributed.search.model.SearchTask;
import com.distributed.search.model.TaskResult;
import java.io.IOException;
import java.net.Socket;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
 * Interactive search coordinator run by the elected leader.
 *
 * <p>Reads search terms from standard input, splits the {@code .txt} files found in the data
 * directory into contiguous chunks (one per usable worker), sends each chunk to a worker over a
 * plain TCP socket using length-delimited protobuf messages, merges the returned term-frequency
 * scores and prints them in descending order.
 */
public class Coordinator {
    /** Prefix that precedes {@code host:port} in worker node names. */
    private static final String WORKER_NODE_PREFIX = "worker_";
    /** Upper bound for establishing the TCP connection to a worker. */
    private static final int CONNECT_TIMEOUT_MS = 5000;
    /** Upper bound for each blocking read while waiting for a worker's reply. */
    private static final int READ_TIMEOUT_MS = 5000;

    private final ServiceRegistry registry;
    private final String dataDirectory;

    /**
     * @param registry      source of the currently registered worker node names
     * @param dataDirectory directory whose {@code .txt} files are searched
     */
    public Coordinator(ServiceRegistry registry, String dataDirectory) {
        this.registry = registry;
        this.dataDirectory = dataDirectory;
    }

    /**
     * Runs the prompt loop until standard input ends or the user types {@code exit}.
     *
     * @throws Exception if the worker list or the data directory cannot be read
     */
    public void start() throws Exception {
        Scanner scanner = new Scanner(System.in);
        while (true) {
            System.out.println("\n[Coordinator] Enter search term (or 'exit'):");
            if (!scanner.hasNextLine()) break;
            String query = scanner.nextLine();
            if (query.equalsIgnoreCase("exit")) break;

            // Resolve usable workers BEFORE partitioning, so a malformed node name does not
            // swallow a slice of files that would then never be searched.
            List<java.net.InetSocketAddress> workers = parseWorkerAddresses(registry.getActiveWorkers());
            if (workers.isEmpty()) {
                System.out.println("[Coordinator] No workers available!");
                continue;
            }
            List<String> allFiles = scanFiles();
            if (allFiles.isEmpty()) {
                System.out.println("[Coordinator] No files found in directory: " + dataDirectory);
                continue;
            }

            Map<String, Double> globalResults = new HashMap<>();
            // Ceiling division: every file lands in exactly one chunk.
            int filesPerWorker = (allFiles.size() + workers.size() - 1) / workers.size();
            for (int i = 0; i < workers.size(); i++) {
                int start = i * filesPerWorker;
                if (start >= allFiles.size()) break; // more workers than chunks
                int end = Math.min(start + filesPerWorker, allFiles.size());
                java.net.InetSocketAddress worker = workers.get(i);
                globalResults.putAll(
                        sendTask(worker.getHostString(), worker.getPort(), query, allFiles.subList(start, end)));
            }
            printRankedResults(globalResults);
        }
    }

    /**
     * Converts worker node names of the form {@code worker_<host>:<port>} into addresses.
     * Names that cannot be parsed are reported on stderr and left out.
     */
    private List<java.net.InetSocketAddress> parseWorkerAddresses(List<String> nodeNames) {
        List<java.net.InetSocketAddress> addresses = new ArrayList<>();
        for (String nodeName : nodeNames) {
            String nodeData = nodeName.startsWith(WORKER_NODE_PREFIX)
                    ? nodeName.substring(WORKER_NODE_PREFIX.length())
                    : nodeName;
            // The port follows the LAST colon; the host part itself may contain colons (IPv6).
            int separator = nodeData.lastIndexOf(':');
            if (separator <= 0) {
                System.err.println("[Coordinator] Skipping invalid worker node: " + nodeName);
                continue;
            }
            try {
                int port = Integer.parseInt(nodeData.substring(separator + 1));
                addresses.add(java.net.InetSocketAddress.createUnresolved(nodeData.substring(0, separator), port));
            } catch (IllegalArgumentException e) {
                // Covers a non-numeric port (NumberFormatException) and an out-of-range port.
                System.err.println("[Coordinator] Failed to parse worker address: " + nodeData);
            }
        }
        return addresses;
    }

    /**
     * Lists the names of the regular {@code .txt} files directly inside the data directory,
     * sorted so that the file-to-worker assignment is deterministic.
     *
     * @return file names (not full paths); empty if the directory does not exist
     */
    private List<String> scanFiles() throws IOException {
        Path path = Paths.get(dataDirectory);
        if (!Files.isDirectory(path)) return Collections.emptyList();
        try (Stream<Path> stream = Files.list(path)) {
            return stream.filter(Files::isRegularFile)
                    .map(p -> p.getFileName().toString())
                    .filter(name -> name.endsWith(".txt"))
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    /**
     * Sends one search task to a worker and waits for its scores.
     *
     * @return the worker's file-name to score map, or an empty map if the worker could not be
     *         reached, timed out, or closed the connection without replying
     */
    private Map<String, Double> sendTask(String host, int port, String query, List<String> files) {
        try (Socket socket = new Socket()) {
            // Bound the connect as well as the read; an unreachable host would otherwise block
            // for the operating system's default connect timeout.
            socket.connect(new java.net.InetSocketAddress(host, port), CONNECT_TIMEOUT_MS);
            socket.setSoTimeout(READ_TIMEOUT_MS);
            SearchTask.newBuilder().setQuery(query).addAllFilePaths(files).build()
                    .writeDelimitedTo(socket.getOutputStream());
            TaskResult result = TaskResult.parseDelimitedFrom(socket.getInputStream());
            return result != null ? result.getDocTfScoresMap() : Collections.emptyMap();
        } catch (Exception e) {
            System.err.println("[Coordinator] Error communicating with worker " + host + ":" + port
                    + " (" + files.size() + " file(s) not searched): " + e);
            return Collections.emptyMap();
        }
    }

    /** Prints every file with a positive score, highest score first. */
    private void printRankedResults(Map<String, Double> results) {
        System.out.println("\n--- Final Ranked Search Results ---");
        List<Map.Entry<String, Double>> ranked = results.entrySet().stream()
                .filter(e -> e.getValue() > 0)
                .sorted(Map.Entry.<String, Double>comparingByValue().reversed())
                .collect(Collectors.toList());
        if (ranked.isEmpty()) {
            System.out.println("No matching results found.");
        } else {
            ranked.forEach(e -> System.out.printf("File: %-12s | Score: %.4f%n", e.getKey(), e.getValue()));
        }
    }
}
\ No newline at end of file
package com.distributed.search;
import com.distributed.search.model.SearchTask;
import com.distributed.search.model.TaskResult;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
/**
 * Process entry point. Every node starts a worker socket server, registers it in ZooKeeper and
 * takes part in leader election; the node that wins the election additionally runs the
 * interactive {@link Coordinator}.
 */
public class SearchApplication {
    /** Default ZooKeeper connect string, used when no command-line override is given. */
    private static final String ZK_ADDRESS = "172.29.3.101:2181";
    /** Default data directory, used when no command-line override is given. */
    private static final String DATA_DIR = "D:\\search_data";
    /** Maximum time a worker waits for a connected client to send its task. */
    private static final int WORKER_READ_TIMEOUT_MS = 5000;

    /**
     * @param args optional overrides: {@code args[0]} = ZooKeeper connect string,
     *             {@code args[1]} = data directory; the built-in defaults apply when omitted
     */
    public static void main(String[] args) throws Exception {
        String zkAddress = args.length > 0 ? args[0] : ZK_ADDRESS;
        String dataDir = args.length > 1 ? args[1] : DATA_DIR;

        CuratorFramework client = CuratorFrameworkFactory.newClient(zkAddress, new ExponentialBackoffRetry(1000, 3));
        client.start();
        ServiceRegistry registry = new ServiceRegistry(client);
        SearchEngine engine = new SearchEngine(dataDir);

        // Leader Election Setup
        // NOTE(review): the callback starts a new Coordinator thread each time it is invoked and
        // nothing stops a previously started one — confirm it cannot fire twice in one process.
        registry.startLeaderElection(() -> {
            System.out.println("I am now the Leader (COORDINATOR)");
            new Thread(() -> {
                try {
                    new Coordinator(registry, dataDir).start();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        });

        // Worker Server Setup: port 0 lets the OS pick a free port, which is then advertised.
        ServerSocket serverSocket = new ServerSocket(0);
        int port = serverSocket.getLocalPort();
        new Thread(() -> serveTasks(serverSocket, engine)).start();

        // Node Registration
        registry.registerWorker(port);
        Thread.sleep(Long.MAX_VALUE);
    }

    /**
     * Accepts connections one at a time; for each, reads a single delimited {@code SearchTask},
     * computes the term-frequency scores and writes back a delimited {@code TaskResult}.
     */
    private static void serveTasks(ServerSocket serverSocket, SearchEngine engine) {
        System.out.println("[Worker] Listening on port: " + serverSocket.getLocalPort());
        // Stop once the server socket is closed instead of spinning on accept() failures.
        while (!serverSocket.isClosed()) {
            try (Socket socket = serverSocket.accept()) {
                // Requests are served sequentially, so a client that connects and then stays
                // silent must not be allowed to block the worker indefinitely.
                socket.setSoTimeout(WORKER_READ_TIMEOUT_MS);
                SearchTask task = SearchTask.parseDelimitedFrom(socket.getInputStream());
                if (task != null) {
                    Map<String, Double> tf = engine.calculateTFForFiles(task.getFilePathsList(), task.getQuery());
                    TaskResult.newBuilder().putAllDocTfScores(tf).build().writeDelimitedTo(socket.getOutputStream());
                }
            } catch (Exception e) {
                // Keep serving further requests, but make the failure visible.
                System.err.println("[Worker] Failed to handle search task: " + e);
            }
        }
    }
}
\ No newline at end of file
package com.distributed.search;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
/**
 * Computes term-frequency (TF) scores for files stored in a single data directory.
 */
public class SearchEngine {
    /**
     * Token separator: one or more non-word characters. UNICODE_CHARACTER_CLASS makes
     * {@code \W} Unicode-aware, so letters outside ASCII are treated as word characters
     * instead of separators.
     */
    private static final java.util.regex.Pattern NON_WORD =
            java.util.regex.Pattern.compile("\\W+", java.util.regex.Pattern.UNICODE_CHARACTER_CLASS);

    private final String dataDirectory;

    /**
     * @param dataDirectory directory that the file names passed to
     *                      {@link #calculateTFForFiles} are resolved against
     */
    public SearchEngine(String dataDirectory) {
        this.dataDirectory = dataDirectory;
    }

    /**
     * Calculates, for each named file, the fraction of its words that equal {@code term}
     * (case-insensitive, whole-word match; a multi-word term therefore never matches).
     *
     * <p>Files that do not exist are left out of the result. Names that resolve to a location
     * outside the data directory (for example via {@code ..}) are rejected, because the names
     * may originate from a remote peer. Files that cannot be read are reported on stderr and
     * left out.
     *
     * @param fileNames file names relative to the data directory
     * @param term      the word to look for; {@code null} is treated as the empty string
     * @return map from file name (exactly as given) to its TF score in {@code [0, 1]};
     *         a file without any words scores {@code 0}
     */
    public Map<String, Double> calculateTFForFiles(List<String> fileNames, String term) {
        Map<String, Double> results = new HashMap<>();
        // Normalise the term once; Locale.ROOT keeps case folding independent of the JVM locale.
        String needle = (term == null) ? "" : term.toLowerCase(Locale.ROOT);
        Path baseDir = Paths.get(dataDirectory).toAbsolutePath().normalize();
        for (String fileName : fileNames) {
            try {
                Path path = baseDir.resolve(fileName).normalize();
                if (!path.startsWith(baseDir)) {
                    System.err.println("Skipping file outside data directory: " + fileName);
                    continue;
                }
                if (!Files.exists(path)) continue;
                String content = Files.readString(path).toLowerCase(Locale.ROOT);
                long totalWords = 0;
                long matches = 0;
                for (String word : NON_WORD.split(content)) {
                    // split() yields an empty first token when the text starts with a
                    // separator; it is not a word and must not inflate the denominator.
                    if (word.isEmpty()) continue;
                    totalWords++;
                    if (word.equals(needle)) matches++;
                }
                double tf = (totalWords > 0) ? (double) matches / totalWords : 0;
                results.put(fileName, tf);
            } catch (Exception e) {
                // Best effort: one unreadable file must not fail the whole batch.
                System.err.println("Error reading file: " + fileName + " (" + e + ")");
            }
        }
        return results;
    }
}
\ No newline at end of file
package com.distributed.search;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.zookeeper.CreateMode;
import java.net.InetAddress;
import java.util.List;
/**
 * ZooKeeper-backed (via Curator) leader election, worker registration and worker discovery.
 */
public class ServiceRegistry {
    private static final String ELECTION_PATH = "/election";
    private static final String WORKERS_REGISTRY_PATH = "/workers";
    private final CuratorFramework client;

    /**
     * @param client Curator client used for all ZooKeeper operations; this class never starts
     *               or closes it
     */
    public ServiceRegistry(CuratorFramework client) {
        this.client = client;
    }

    // Leader Election Logic
    /**
     * Joins the election under {@code /election} and invokes the callback each time this node
     * is granted leadership.
     *
     * @param onLeadershipGained run on the selector's thread when leadership is acquired; it
     *                           should return promptly (e.g. by starting its own thread)
     */
    public void startLeaderElection(Runnable onLeadershipGained) {
        LeaderSelector leaderSelector = new LeaderSelector(client, ELECTION_PATH, new LeaderSelectorListenerAdapter() {
            @Override
            public void takeLeadership(CuratorFramework client) throws Exception {
                onLeadershipGained.run();
                // Leadership is held only while this method has not returned, so park the
                // thread here; an interrupt ends the sleep with InterruptedException.
                while (!Thread.currentThread().isInterrupted()) {
                    Thread.sleep(Long.MAX_VALUE);
                }
            }
        });
        // Re-enter the election automatically after leadership is given up.
        leaderSelector.autoRequeue();
        leaderSelector.start();
    }

    // Service Registration Logic
    /**
     * Registers this node as an ephemeral child {@code worker_<ip>:<port>} of {@code /workers}.
     *
     * @param port TCP port on which this node's worker server is listening
     * @throws Exception if the local address cannot be determined or the node cannot be created
     */
    public void registerWorker(int port) throws Exception {
        // NOTE(review): getLocalHost() may resolve to a loopback address on some hosts, which
        // other machines cannot connect to — confirm for the deployment environment.
        String ipAddress = InetAddress.getLocalHost().getHostAddress();
        String workerPath = WORKERS_REGISTRY_PATH + "/worker_" + ipAddress + ":" + port;
        // Let Curator create the parent on demand rather than check-then-create: with the
        // separate existence check, two nodes starting together could both try to create
        // /workers and the loser would fail with NodeExistsException.
        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(workerPath);
        System.out.println("[Registry] Worker registered at: " + ipAddress + ":" + port);
    }

    // Service Discovery Logic
    /**
     * Lists the node names of all currently registered workers.
     *
     * @return child names of {@code /workers} (each {@code worker_<ip>:<port>}); empty if the
     *         registry path does not exist yet because no worker has registered
     */
    public List<String> getActiveWorkers() throws Exception {
        if (client.checkExists().forPath(WORKERS_REGISTRY_PATH) == null) {
            return List.of();
        }
        return client.getChildren().forPath(WORKERS_REGISTRY_PATH);
    }
}
\ No newline at end of file
package org.example;
/** Placeholder entry point that prints a fixed greeting to standard output. */
public class Main {
    /** Text written by {@link #main}. */
    private static final String GREETING = "Hello world!";

    /**
     * Prints the greeting followed by a line terminator.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        System.out.println(GREETING);
    }
}
\ No newline at end of file
syntax = "proto3";
// This line specifies the package in which the Java code will be generated
option java_package = "com.distributed.search.model";
option java_multiple_files = true;
// 1. The message the leader (Coordinator) sends to a worker
message SearchTask {
string query = 1; // the search phrase
repeated string file_paths = 2; // list of file paths assigned to this worker
}
// 2. The message the worker (Worker) returns to the leader
message TaskResult {
// Map linking a file name to the total TF of the words found in it
map<string, double> doc_tf_scores = 1;
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment