Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Sign in
Toggle navigation
D
Distributed-Search-Engine
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
tammam.alsoleman
Distributed-Search-Engine
Commits
cab03e2e
Commit
cab03e2e
authored
Jan 21, 2026
by
tammam.alsoleman
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Clean the Code
parent
c170d470
Changes
5
Hide whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
35 additions
and
28 deletions
+35
-28
LeaderElection.java
...n/java/com/distributed/search/cluster/LeaderElection.java
+2
-1
OnElectionAction.java
...java/com/distributed/search/cluster/OnElectionAction.java
+0
-1
SearchClient.java
src/main/java/com/distributed/search/grpc/SearchClient.java
+13
-10
SearchServiceImpl.java
...n/java/com/distributed/search/grpc/SearchServiceImpl.java
+10
-4
TFIDFCalculator.java
...in/java/com/distributed/search/logic/TFIDFCalculator.java
+10
-12
No files found.
src/main/java/com/distributed/search/cluster/LeaderElection.java
View file @
cab03e2e
...
@@ -6,7 +6,8 @@ import java.util.Collections;
...
@@ -6,7 +6,8 @@ import java.util.Collections;
import
java.util.List
;
import
java.util.List
;
/**
/**
* Handles the Leader Election logic using Zookeeper's ephemeral sequential nodes.
* Implements the Fault-Tolerant Leader Election using Zookeeper.
* Uses Ephemeral Sequential nodes to avoid 'Herd Effect'.
*/
*/
public
class
LeaderElection
implements
Watcher
{
public
class
LeaderElection
implements
Watcher
{
private
static
final
String
ELECTION_NAMESPACE
=
"/election"
;
private
static
final
String
ELECTION_NAMESPACE
=
"/election"
;
...
...
src/main/java/com/distributed/search/cluster/OnElectionAction.java
View file @
cab03e2e
...
@@ -47,7 +47,6 @@ public class OnElectionAction implements OnElectionCallback {
...
@@ -47,7 +47,6 @@ public class OnElectionAction implements OnElectionCallback {
@Override
@Override
public
void
onWorker
()
{
public
void
onWorker
()
{
try
{
try
{
// نغير الترتيب: نشغل السيرفر أولاً
boolean
started
=
startGrpcServer
();
boolean
started
=
startGrpcServer
();
if
(
started
)
{
if
(
started
)
{
...
...
src/main/java/com/distributed/search/grpc/SearchClient.java
View file @
cab03e2e
package
com
.
distributed
.
search
.
grpc
;
package
com
.
distributed
.
search
.
grpc
;
import
java.net.InetSocketAddress
;
import
java.net.InetSocketAddress
;
import
com.distributed.search.model.*
;
import
com.distributed.search.model.*
;
import
io.grpc.ManagedChannel
;
import
io.grpc.ManagedChannel
;
// انتبه: هذا الـ Import المظلل هو الذي يحل مشكلة الـ NameResolver في الـ Fat JAR
import
io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder
;
import
io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder
;
import
java.util.*
;
import
java.util.*
;
/**
* Acts as the Coordinator Client that manages connections to Workers
* and aggregates search results.
*/
public
class
SearchClient
{
public
class
SearchClient
{
private
final
Map
<
String
,
SearchServiceGrpc
.
SearchServiceBlockingStub
>
stubs
=
new
HashMap
<>();
private
final
Map
<
String
,
SearchServiceGrpc
.
SearchServiceBlockingStub
>
stubs
=
new
HashMap
<>();
private
final
List
<
ManagedChannel
>
channels
=
new
ArrayList
<>();
private
final
List
<
ManagedChannel
>
channels
=
new
ArrayList
<>();
/**
* Updates the active worker list and establishes gRPC channels.
*/
public
void
updateWorkers
(
List
<
String
>
workerAddresses
)
{
public
void
updateWorkers
(
List
<
String
>
workerAddresses
)
{
// إغلاق القنوات القديمة
for
(
ManagedChannel
channel
:
channels
)
{
for
(
ManagedChannel
channel
:
channels
)
{
channel
.
shutdownNow
();
channel
.
shutdownNow
();
}
}
...
@@ -21,13 +25,10 @@ public class SearchClient {
...
@@ -21,13 +25,10 @@ public class SearchClient {
for
(
String
address
:
workerAddresses
)
{
for
(
String
address
:
workerAddresses
)
{
try
{
try
{
// تقسيم العنوان يدويًا
String
[]
parts
=
address
.
split
(
":"
);
String
[]
parts
=
address
.
split
(
":"
);
String
host
=
parts
[
0
];
String
host
=
parts
[
0
];
int
port
=
Integer
.
parseInt
(
parts
[
1
]);
int
port
=
Integer
.
parseInt
(
parts
[
1
]);
// استخدام NettyChannelBuilder.forAddress حصراً
// هذا يتجاوز نظام الـ NameResolver تماماً ويمنع خطأ الـ 'unix'
ManagedChannel
channel
=
NettyChannelBuilder
.
forAddress
(
new
InetSocketAddress
(
host
,
port
))
ManagedChannel
channel
=
NettyChannelBuilder
.
forAddress
(
new
InetSocketAddress
(
host
,
port
))
.
usePlaintext
()
.
usePlaintext
()
.
build
();
.
build
();
...
@@ -38,18 +39,20 @@ public class SearchClient {
...
@@ -38,18 +39,20 @@ public class SearchClient {
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
System
.
err
.
println
(
"Failed to connect to "
+
address
+
": "
+
e
.
getMessage
());
System
.
err
.
println
(
"Failed to connect to "
+
address
+
": "
+
e
.
getMessage
());
e
.
printStackTrace
();
// مهم: سيظهر مكان رمي الاستثناء بالضبط
e
.
printStackTrace
();
}
}
}
}
}
}
/**
* Orchestrates the 2-Phase Distributed Search.
*/
public
void
performSearch
(
List
<
String
>
terms
,
List
<
String
>
allFiles
)
{
public
void
performSearch
(
List
<
String
>
terms
,
List
<
String
>
allFiles
)
{
if
(
stubs
.
isEmpty
())
{
if
(
stubs
.
isEmpty
())
{
System
.
out
.
println
(
"No workers available."
);
System
.
out
.
println
(
"No workers available."
);
return
;
return
;
}
}
//
--- Phase 1: Global Stats ---
//
Phase 1: Aggregate Global Counts
Map
<
String
,
Integer
>
globalTermCounts
=
new
HashMap
<>();
Map
<
String
,
Integer
>
globalTermCounts
=
new
HashMap
<>();
int
filesPerWorker
=
(
int
)
Math
.
ceil
((
double
)
allFiles
.
size
()
/
stubs
.
size
());
int
filesPerWorker
=
(
int
)
Math
.
ceil
((
double
)
allFiles
.
size
()
/
stubs
.
size
());
int
currentFileIndex
=
0
;
int
currentFileIndex
=
0
;
...
...
src/main/java/com/distributed/search/grpc/SearchServiceImpl.java
View file @
cab03e2e
...
@@ -9,6 +9,10 @@ import java.util.HashMap;
...
@@ -9,6 +9,10 @@ import java.util.HashMap;
import
java.util.List
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Map
;
/**
* Implements gRPC Search Service on the Worker nodes.
*/
public
class
SearchServiceImpl
extends
SearchServiceGrpc
.
SearchServiceImplBase
{
public
class
SearchServiceImpl
extends
SearchServiceGrpc
.
SearchServiceImplBase
{
private
final
String
sharedDirectoryPath
;
private
final
String
sharedDirectoryPath
;
...
@@ -16,7 +20,9 @@ public class SearchServiceImpl extends SearchServiceGrpc.SearchServiceImplBase {
...
@@ -16,7 +20,9 @@ public class SearchServiceImpl extends SearchServiceGrpc.SearchServiceImplBase {
public
SearchServiceImpl
(
String
sharedDirectoryPath
)
{
public
SearchServiceImpl
(
String
sharedDirectoryPath
)
{
this
.
sharedDirectoryPath
=
sharedDirectoryPath
;
this
.
sharedDirectoryPath
=
sharedDirectoryPath
;
}
}
/**
* Local Document Frequency Statistics.
*/
@Override
@Override
public
void
getDocumentStats
(
StatRequest
request
,
StreamObserver
<
StatResponse
>
responseObserver
)
{
public
void
getDocumentStats
(
StatRequest
request
,
StreamObserver
<
StatResponse
>
responseObserver
)
{
List
<
String
>
terms
=
request
.
getTermsList
();
List
<
String
>
terms
=
request
.
getTermsList
();
...
@@ -32,7 +38,6 @@ public class SearchServiceImpl extends SearchServiceGrpc.SearchServiceImplBase {
...
@@ -32,7 +38,6 @@ public class SearchServiceImpl extends SearchServiceGrpc.SearchServiceImplBase {
for
(
String
term
:
terms
)
{
for
(
String
term
:
terms
)
{
int
docsWithTerm
=
0
;
int
docsWithTerm
=
0
;
for
(
String
docName
:
assignedFiles
)
{
for
(
String
docName
:
assignedFiles
)
{
// التعديل هنا: نمرر الـ Path مباشرة كما اقترح الحل الجديد
Path
filePath
=
Paths
.
get
(
sharedDirectoryPath
,
docName
);
Path
filePath
=
Paths
.
get
(
sharedDirectoryPath
,
docName
);
List
<
String
>
words
=
TFIDFCalculator
.
getWordsFromDocument
(
filePath
);
List
<
String
>
words
=
TFIDFCalculator
.
getWordsFromDocument
(
filePath
);
...
@@ -50,7 +55,9 @@ public class SearchServiceImpl extends SearchServiceGrpc.SearchServiceImplBase {
...
@@ -50,7 +55,9 @@ public class SearchServiceImpl extends SearchServiceGrpc.SearchServiceImplBase {
responseObserver
.
onNext
(
response
);
responseObserver
.
onNext
(
response
);
responseObserver
.
onCompleted
();
responseObserver
.
onCompleted
();
}
}
/**
* Final Scoring using Global IDF received from Coordinator.
*/
@Override
@Override
public
void
getFinalScores
(
CalculationRequest
request
,
StreamObserver
<
SearchResponse
>
responseObserver
)
{
public
void
getFinalScores
(
CalculationRequest
request
,
StreamObserver
<
SearchResponse
>
responseObserver
)
{
List
<
String
>
terms
=
request
.
getTermsList
();
List
<
String
>
terms
=
request
.
getTermsList
();
...
@@ -67,7 +74,6 @@ public class SearchServiceImpl extends SearchServiceGrpc.SearchServiceImplBase {
...
@@ -67,7 +74,6 @@ public class SearchServiceImpl extends SearchServiceGrpc.SearchServiceImplBase {
for
(
String
docName
:
assignedFiles
)
{
for
(
String
docName
:
assignedFiles
)
{
double
docScore
=
0.0
;
double
docScore
=
0.0
;
// التعديل هنا أيضاً: نمرر الـ Path
Path
filePath
=
Paths
.
get
(
sharedDirectoryPath
,
docName
);
Path
filePath
=
Paths
.
get
(
sharedDirectoryPath
,
docName
);
List
<
String
>
words
=
TFIDFCalculator
.
getWordsFromDocument
(
filePath
);
List
<
String
>
words
=
TFIDFCalculator
.
getWordsFromDocument
(
filePath
);
...
...
src/main/java/com/distributed/search/logic/TFIDFCalculator.java
View file @
cab03e2e
...
@@ -8,28 +8,26 @@ import java.io.IOException;
...
@@ -8,28 +8,26 @@ import java.io.IOException;
import
java.util.*
;
import
java.util.*
;
import
java.util.stream.Collectors
;
import
java.util.stream.Collectors
;
/**
* Handles the mathematical part of the TF-IDF algorithm.
* Optimized for performance and cross-language support.
*/
public
class
TFIDFCalculator
{
public
class
TFIDFCalculator
{
/**
/**
* تقرأ محتوى الملف وتحوّله إلى قائمة كلمات بشكل آمن
* Reads a file and extracts words (tokens).
* - بدون regex خاطئ
* Uses ISO_8859_1 encoding to support various legacy text formats.
* - يدعم Unicode (عربي / إنجليزي)
* - لا يرمي Exceptions تكسر gRPC
*/
*/
public
static
List
<
String
>
getWordsFromDocument
(
Path
filePath
)
{
public
static
List
<
String
>
getWordsFromDocument
(
Path
filePath
)
{
try
{
try
{
// قراءة الملف كنص UTF-8
String
content
=
Files
.
readString
(
filePath
,
StandardCharsets
.
ISO_8859_1
);
String
content
=
Files
.
readString
(
filePath
,
StandardCharsets
.
ISO_8859_1
);
// أحرف صغيرة
content
=
content
.
toLowerCase
(
Locale
.
ROOT
);
content
=
content
.
toLowerCase
(
Locale
.
ROOT
);
// استبدال أي شيء ليس حرفًا أو رقمًا بمسافة
// Regex: keep only letters and numbers (Unicode supported)
// \p{L} = أي حرف (أي لغة)
// \p{N} = أي رقم
String
cleaned
=
content
.
replaceAll
(
"[^\\p{L}\\p{N}]+"
,
" "
);
String
cleaned
=
content
.
replaceAll
(
"[^\\p{L}\\p{N}]+"
,
" "
);
// تقسيم على المسافات
String
[]
tokens
=
cleaned
.
trim
().
split
(
"\\s+"
);
String
[]
tokens
=
cleaned
.
trim
().
split
(
"\\s+"
);
return
Arrays
.
stream
(
tokens
)
return
Arrays
.
stream
(
tokens
)
...
@@ -50,7 +48,7 @@ public class TFIDFCalculator {
...
@@ -50,7 +48,7 @@ public class TFIDFCalculator {
}
}
/**
/**
*
Term Frequency
*
Calculates Term Frequency (TF): (Count of term in doc) / (Total words in doc)
*/
*/
public
static
double
calculateTermFrequency
(
List
<
String
>
words
,
String
term
)
{
public
static
double
calculateTermFrequency
(
List
<
String
>
words
,
String
term
)
{
if
(
words
==
null
||
words
.
isEmpty
())
return
0.0
;
if
(
words
==
null
||
words
.
isEmpty
())
return
0.0
;
...
@@ -63,7 +61,7 @@ public class TFIDFCalculator {
...
@@ -63,7 +61,7 @@ public class TFIDFCalculator {
}
}
/**
/**
*
Inverse Document Frequency
*
Calculates Inverse Document Frequency (IDF): log10(Total Docs / Docs containing term)
*/
*/
public
static
double
calculateIdf
(
int
totalDocuments
,
int
documentsWithTerm
)
{
public
static
double
calculateIdf
(
int
totalDocuments
,
int
documentsWithTerm
)
{
if
(
documentsWithTerm
<=
0
||
totalDocuments
<=
0
)
return
0.0
;
if
(
documentsWithTerm
<=
0
||
totalDocuments
<=
0
)
return
0.0
;
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment