Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Sign in
Toggle navigation
D
Distributed_Search_System
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
areej.mohammad
Distributed_Search_System
Commits
cd1524e7
Commit
cd1524e7
authored
Jan 22, 2026
by
AreejMh57
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
finaal commit
parent
e84298cb
Changes
8
Hide whitespace changes
Inline
Side-by-side
Showing
8 changed files
with
505 additions
and
128 deletions
+505
-128
Coordinator.java
src/main/java/com/distributed/search/Coordinator.java
+88
-61
FrontEndServer.java
src/main/java/com/distributed/search/FrontEndServer.java
+135
-0
LeaderElectionManager.java
...in/java/com/distributed/search/LeaderElectionManager.java
+42
-0
SearchApplication.java
src/main/java/com/distributed/search/SearchApplication.java
+68
-35
SearchEngine.java
src/main/java/com/distributed/search/SearchEngine.java
+33
-6
ServiceRegistry.java
src/main/java/com/distributed/search/ServiceRegistry.java
+41
-20
WorkerServer.java
src/main/java/com/distributed/search/WorkerServer.java
+92
-0
search.proto
src/main/proto/search.proto
+6
-6
No files found.
src/main/java/com/distributed/search/Coordinator.java
View file @
cd1524e7
...
@@ -2,7 +2,8 @@ package com.distributed.search;
...
@@ -2,7 +2,8 @@ package com.distributed.search;
import
com.distributed.search.model.SearchTask
;
import
com.distributed.search.model.SearchTask
;
import
com.distributed.search.model.TaskResult
;
import
com.distributed.search.model.TaskResult
;
import
java.io.IOException
;
import
java.io.*
;
import
java.net.ServerSocket
;
import
java.net.Socket
;
import
java.net.Socket
;
import
java.nio.file.Files
;
import
java.nio.file.Files
;
import
java.nio.file.Path
;
import
java.nio.file.Path
;
...
@@ -14,69 +15,111 @@ import java.util.stream.Stream;
...
@@ -14,69 +15,111 @@ import java.util.stream.Stream;
public
class
Coordinator
{
public
class
Coordinator
{
private
final
ServiceRegistry
registry
;
private
final
ServiceRegistry
registry
;
private
final
String
dataDirectory
;
private
final
String
dataDirectory
;
private
final
int
coordinatorPort
;
// منفذ خاص لاستقبال طلبات الـ Front-End
public
Coordinator
(
ServiceRegistry
registry
,
String
dataDirectory
)
{
public
Coordinator
(
ServiceRegistry
registry
,
String
dataDirectory
,
int
coordinatorPort
)
{
this
.
registry
=
registry
;
this
.
registry
=
registry
;
this
.
dataDirectory
=
dataDirectory
;
this
.
dataDirectory
=
dataDirectory
;
this
.
coordinatorPort
=
coordinatorPort
;
}
}
public
void
start
()
throws
Exception
{
public
void
start
()
throws
Exception
{
Scanner
scanner
=
new
Scanner
(
System
.
in
);
// 1. تسجيل عنوان القائد في ZooKeeper لكي يراه الـ Front-End
while
(
true
)
{
registry
.
registerLeader
(
coordinatorPort
);
System
.
out
.
println
(
"\n[Coordinator] Enter search term (or 'exit'):"
);
if
(!
scanner
.
hasNextLine
())
break
;
String
query
=
scanner
.
nextLine
();
if
(
query
.
equalsIgnoreCase
(
"exit"
))
break
;
List
<
String
>
workers
=
registry
.
getActiveWorkers
();
List
<
String
>
allFiles
=
scanFiles
();
if
(
workers
.
isEmpty
())
{
System
.
out
.
println
(
"[Coordinator] No workers available!"
);
continue
;
}
if
(
allFiles
.
isEmpty
())
{
System
.
out
.
println
(
"[Coordinator] No files found in directory: "
+
dataDirectory
);
continue
;
}
Map
<
String
,
Double
>
globalResults
=
new
HashMap
<>();
int
filesPerWorker
=
(
int
)
Math
.
ceil
((
double
)
allFiles
.
size
()
/
workers
.
size
());
for
(
int
i
=
0
;
i
<
workers
.
size
();
i
++)
{
System
.
out
.
println
(
"[Coordinator] I am the leader. Listening for Front-End on port: "
+
coordinatorPort
);
int
start
=
i
*
filesPerWorker
;
int
end
=
Math
.
min
(
start
+
filesPerWorker
,
allFiles
.
size
());
if
(
start
>=
end
)
break
;
List
<
String
>
workerFiles
=
allFiles
.
subList
(
start
,
end
);
// 2. فتح سيرفر للاستماع لطلبات الـ Front-End
try
(
ServerSocket
serverSocket
=
new
ServerSocket
(
coordinatorPort
))
{
while
(
true
)
{
try
(
Socket
frontEndSocket
=
serverSocket
.
accept
();
BufferedReader
reader
=
new
BufferedReader
(
new
InputStreamReader
(
frontEndSocket
.
getInputStream
()));
PrintWriter
writer
=
new
PrintWriter
(
frontEndSocket
.
getOutputStream
(),
true
))
{
// Safe parsing of worker info from ZooKeeper
// استقبال كلمة البحث من الـ Front-End
String
nodeName
=
workers
.
get
(
i
);
String
query
=
reader
.
readLine
();
String
nodeData
=
nodeName
.
replace
(
"worker_"
,
""
);
if
(
query
!=
null
&&
!
query
.
isEmpty
())
{
System
.
out
.
println
(
"[Coordinator] Received web query: "
+
query
);
if
(!
nodeData
.
contains
(
":"
))
{
System
.
err
.
println
(
"[Coordinator] Skipping invalid worker node: "
+
nodeName
);
continue
;
}
try
{
// تنفيذ البحث الموزع والحصول على النتائج كـ String
String
[]
addrParts
=
nodeData
.
split
(
":"
);
String
results
=
performSearch
(
query
);
String
host
=
addrParts
[
0
];
int
port
=
Integer
.
parseInt
(
addrParts
[
1
]);
globalResults
.
putAll
(
sendTask
(
host
,
port
,
query
,
workerFiles
));
// إرسال النتائج النهائية للـ Front-End ليتم عرضها في المتصفح
writer
.
println
(
results
);
}
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
System
.
err
.
println
(
"[Coordinator]
Failed to parse worker address: "
+
nodeData
);
System
.
err
.
println
(
"[Coordinator]
Error handling Front-End request: "
+
e
.
getMessage
()
);
}
}
}
}
printRankedResults
(
globalResults
);
}
}
}
}
// قمنا بتعديل الدالة لتعيد String يحتوي على النتائج بدلاً من الطباعة فقط
public
String
performSearch
(
String
query
)
throws
Exception
{
List
<
String
>
allWorkers
=
registry
.
getActiveWorkers
();
int
myPort
=
WorkerServer
.
getLocalBoundPort
();
List
<
String
>
externalWorkers
=
allWorkers
.
stream
()
.
filter
(
worker
->
!
worker
.
contains
(
":"
+
myPort
))
.
collect
(
Collectors
.
toList
());
List
<
String
>
allFiles
=
scanFiles
();
if
(
externalWorkers
.
isEmpty
())
return
"Error: No external workers available."
;
if
(
allFiles
.
isEmpty
())
return
"Error: No files in directory."
;
Map
<
String
,
Double
>
globalTfResults
=
new
HashMap
<>();
int
filesPerWorker
=
(
int
)
Math
.
ceil
((
double
)
allFiles
.
size
()
/
externalWorkers
.
size
());
for
(
int
i
=
0
;
i
<
externalWorkers
.
size
();
i
++)
{
int
start
=
i
*
filesPerWorker
;
int
end
=
Math
.
min
(
start
+
filesPerWorker
,
allFiles
.
size
());
if
(
start
>=
end
)
break
;
List
<
String
>
workerFiles
=
allFiles
.
subList
(
start
,
end
);
String
nodeData
=
externalWorkers
.
get
(
i
).
replace
(
"worker_"
,
""
);
String
[]
addr
=
nodeData
.
split
(
":"
);
globalTfResults
.
putAll
(
sendTask
(
addr
[
0
],
Integer
.
parseInt
(
addr
[
1
]),
query
,
workerFiles
));
}
Map
<
String
,
Double
>
finalTfIdfResults
=
calculateFinalScores
(
globalTfResults
,
allFiles
.
size
());
// تحويل النتائج المرتبة إلى نص لإرساله للـ Front-End
return
formatResults
(
finalTfIdfResults
);
}
private
String
formatResults
(
Map
<
String
,
Double
>
results
)
{
StringBuilder
sb
=
new
StringBuilder
();
List
<
Map
.
Entry
<
String
,
Double
>>
sorted
=
results
.
entrySet
().
stream
()
.
filter
(
e
->
e
.
getValue
()
>
0
)
.
sorted
(
Map
.
Entry
.<
String
,
Double
>
comparingByValue
().
reversed
())
.
limit
(
10
)
.
collect
(
Collectors
.
toList
());
if
(
sorted
.
isEmpty
())
return
"No results found for this term."
;
for
(
Map
.
Entry
<
String
,
Double
>
entry
:
sorted
)
{
sb
.
append
(
String
.
format
(
"File: %s | Score: %.6f\n"
,
entry
.
getKey
(),
entry
.
getValue
()));
}
return
sb
.
toString
();
}
private
Map
<
String
,
Double
>
calculateFinalScores
(
Map
<
String
,
Double
>
tfResults
,
int
totalDocsCount
)
{
Map
<
String
,
Double
>
tfIdfResults
=
new
HashMap
<>();
long
docsWithTermCount
=
tfResults
.
values
().
stream
().
filter
(
score
->
score
>
0
).
count
();
if
(
docsWithTermCount
==
0
)
return
tfIdfResults
;
double
idf
=
Math
.
log10
((
double
)
totalDocsCount
/
docsWithTermCount
);
for
(
Map
.
Entry
<
String
,
Double
>
entry
:
tfResults
.
entrySet
())
{
tfIdfResults
.
put
(
entry
.
getKey
(),
entry
.
getValue
()
*
idf
);
}
return
tfIdfResults
;
}
private
List
<
String
>
scanFiles
()
throws
IOException
{
private
List
<
String
>
scanFiles
()
throws
IOException
{
Path
path
=
Paths
.
get
(
dataDirectory
);
Path
path
=
Paths
.
get
(
dataDirectory
);
if
(!
Files
.
exists
(
path
))
return
Collections
.
emptyList
();
if
(!
Files
.
exists
(
path
))
return
Collections
.
emptyList
();
try
(
Stream
<
Path
>
stream
=
Files
.
list
(
path
))
{
try
(
Stream
<
Path
>
stream
=
Files
.
list
(
path
))
{
return
stream
.
filter
(
Files:
:
isRegularFile
)
return
stream
.
filter
(
Files:
:
isRegularFile
)
.
map
(
p
->
p
.
getFileName
().
toString
())
.
map
(
p
->
p
.
getFileName
().
toString
())
...
@@ -87,29 +130,13 @@ public class Coordinator {
...
@@ -87,29 +130,13 @@ public class Coordinator {
private
Map
<
String
,
Double
>
sendTask
(
String
host
,
int
port
,
String
query
,
List
<
String
>
files
)
{
private
Map
<
String
,
Double
>
sendTask
(
String
host
,
int
port
,
String
query
,
List
<
String
>
files
)
{
try
(
Socket
socket
=
new
Socket
(
host
,
port
))
{
try
(
Socket
socket
=
new
Socket
(
host
,
port
))
{
socket
.
setSoTimeout
(
5000
);
// 5 seconds timeout
socket
.
setSoTimeout
(
5000
);
SearchTask
.
newBuilder
().
setQuery
(
query
).
addAllFilePaths
(
files
).
build
()
SearchTask
.
newBuilder
().
setQuery
(
query
).
addAllFilePaths
(
files
).
build
()
.
writeDelimitedTo
(
socket
.
getOutputStream
());
.
writeDelimitedTo
(
socket
.
getOutputStream
());
TaskResult
result
=
TaskResult
.
parseDelimitedFrom
(
socket
.
getInputStream
());
TaskResult
result
=
TaskResult
.
parseDelimitedFrom
(
socket
.
getInputStream
());
return
result
!=
null
?
result
.
getDocTfScoresMap
()
:
Collections
.
emptyMap
();
return
result
!=
null
?
result
.
getDocTfScoresMap
()
:
Collections
.
emptyMap
();
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
System
.
err
.
println
(
"[Coordinator] Error communicating with worker "
+
host
+
":"
+
port
);
return
Collections
.
emptyMap
();
return
Collections
.
emptyMap
();
}
}
}
}
private
void
printRankedResults
(
Map
<
String
,
Double
>
results
)
{
System
.
out
.
println
(
"\n--- Final Ranked Search Results ---"
);
List
<
Map
.
Entry
<
String
,
Double
>>
ranked
=
results
.
entrySet
().
stream
()
.
filter
(
e
->
e
.
getValue
()
>
0
)
.
sorted
(
Map
.
Entry
.<
String
,
Double
>
comparingByValue
().
reversed
())
.
collect
(
Collectors
.
toList
());
if
(
ranked
.
isEmpty
())
{
System
.
out
.
println
(
"No matching results found."
);
}
else
{
ranked
.
forEach
(
e
->
System
.
out
.
printf
(
"File: %-12s | Score: %.4f%n"
,
e
.
getKey
(),
e
.
getValue
()));
}
}
}
}
\ No newline at end of file
src/main/java/com/distributed/search/FrontEndServer.java
0 → 100644
View file @
cd1524e7
package
com
.
distributed
.
search
;
import
com.sun.net.httpserver.HttpServer
;
import
com.sun.net.httpserver.HttpHandler
;
import
com.sun.net.httpserver.HttpExchange
;
import
java.io.*
;
import
java.net.InetSocketAddress
;
import
java.net.Socket
;
import
java.nio.charset.StandardCharsets
;
public
class
FrontEndServer
{
private
final
int
port
;
private
final
ServiceRegistry
registry
;
// نمط CSS مشترك لجعل الواجهة تبدو احترافية
private
static
final
String
CSS
=
"<style>"
+
"body { font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif; background-color: #f8f9fa; display: flex; flex-direction: column; align-items: center; padding-top: 50px; color: #333; }"
+
".container { background: white; padding: 30px; border-radius: 12px; box-shadow: 0 4px 6px rgba(0,0,0,0.1); width: 80%; max-width: 600px; text-align: center; }"
+
"h1 { color: #4285F4; margin-bottom: 30px; font-size: 2.5em; }"
+
"input[type='text'] { width: 70%; padding: 12px 20px; margin: 8px 0; border: 1px solid #dfe1e5; border-radius: 24px; outline: none; font-size: 16px; transition: 0.3s; }"
+
"input[type='text']:focus { box-shadow: 0 1px 6px rgba(32,33,36,0.28); border-color: rgba(223,225,229,0); }"
+
"button { background-color: #f8f9fa; border: 1px solid #f8f9fa; border-radius: 4px; color: #3c4043; font-size: 14px; margin: 11px 4px; padding: 10px 24px; cursor: pointer; transition: 0.2s; }"
+
"button:hover { border: 1px solid #dadce0; box-shadow: 0 1px 1px rgba(0,0,0,0.1); color: #202124; }"
+
".results-container { text-align: left; width: 80%; max-width: 800px; margin-top: 20px; }"
+
".result-item { background: white; padding: 15px; border-radius: 8px; margin-bottom: 10px; border-left: 5px solid #4285F4; box-shadow: 0 2px 4px rgba(0,0,0,0.05); }"
+
".file-name { font-weight: bold; color: #1a0dab; font-size: 18px; text-decoration: none; }"
+
".score { color: #006621; font-size: 14px; margin-top: 5px; }"
+
".no-results { color: #70757a; font-style: italic; }"
+
"a.back-link { margin-top: 20px; text-decoration: none; color: #4285F4; font-weight: bold; }"
+
"</style>"
;
public
FrontEndServer
(
int
port
,
ServiceRegistry
registry
)
{
this
.
port
=
port
;
this
.
registry
=
registry
;
}
public
void
start
()
throws
IOException
{
HttpServer
server
=
HttpServer
.
create
(
new
InetSocketAddress
(
port
),
0
);
server
.
createContext
(
"/search"
,
new
SearchHandler
());
server
.
createContext
(
"/"
,
new
HomeHandler
());
server
.
setExecutor
(
null
);
System
.
out
.
println
(
"[FrontEnd] Modern Web interface ready at http://localhost:"
+
port
+
"/"
);
server
.
start
();
}
private
class
HomeHandler
implements
HttpHandler
{
@Override
public
void
handle
(
HttpExchange
exchange
)
throws
IOException
{
String
html
=
"<html><head><title>Distributed Search</title>"
+
CSS
+
"</head>"
+
"<body>"
+
"<div class='container'>"
+
"<h1>🔍 Distributed Search</h1>"
+
"<form action='/search' method='get'>"
+
"<input type='text' name='query' placeholder='What are you looking for?' required autofocus>"
+
"<br><button type='submit'>Cluster Search</button>"
+
"</form>"
+
"</div>"
+
"</body></html>"
;
sendResponse
(
exchange
,
html
);
}
}
private
class
SearchHandler
implements
HttpHandler
{
@Override
public
void
handle
(
HttpExchange
exchange
)
throws
IOException
{
String
uri
=
exchange
.
getRequestURI
().
toString
();
String
query
=
""
;
if
(
uri
.
contains
(
"query="
))
{
query
=
uri
.
substring
(
uri
.
indexOf
(
"query="
)
+
6
);
query
=
java
.
net
.
URLDecoder
.
decode
(
query
,
StandardCharsets
.
UTF_8
);
}
String
rawResults
;
try
{
String
leaderAddress
=
registry
.
getLeaderAddress
();
rawResults
=
(
leaderAddress
==
null
)
?
"Error: No Leader found."
:
contactLeader
(
leaderAddress
,
query
);
}
catch
(
Exception
e
)
{
rawResults
=
"System Error: "
+
e
.
getMessage
();
}
// تحويل النتائج الخام إلى بطاقات HTML
StringBuilder
resultsHtml
=
new
StringBuilder
();
if
(
rawResults
.
trim
().
isEmpty
()
||
rawResults
.
contains
(
"No results"
))
{
resultsHtml
.
append
(
"<p class='no-results'>No documents matched your query.</p>"
);
}
else
{
for
(
String
line
:
rawResults
.
split
(
"\n"
))
{
if
(
line
.
contains
(
"|"
))
{
String
[]
parts
=
line
.
split
(
"\\|"
);
resultsHtml
.
append
(
"<div class='result-item'>"
)
.
append
(
"<div class='file-name'>📄 "
).
append
(
parts
[
0
].
replace
(
"File:"
,
""
).
trim
()).
append
(
"</div>"
)
.
append
(
"<div class='score'>"
).
append
(
parts
[
1
].
trim
()).
append
(
"</div>"
)
.
append
(
"</div>"
);
}
}
}
String
finalHtml
=
"<html><head><title>Search Results</title>"
+
CSS
+
"</head>"
+
"<body>"
+
"<div class='results-container'>"
+
"<h2>Results for: <span style='color:#4285F4'>"
+
query
+
"</span></h2>"
+
resultsHtml
.
toString
()
+
"<br><a href='/' class='back-link'>← Back to Search</a>"
+
"</div>"
+
"</body></html>"
;
sendResponse
(
exchange
,
finalHtml
);
}
}
private
String
contactLeader
(
String
leaderAddr
,
String
query
)
{
String
[]
parts
=
leaderAddr
.
split
(
":"
);
try
(
Socket
socket
=
new
Socket
(
parts
[
0
],
Integer
.
parseInt
(
parts
[
1
]));
PrintWriter
writer
=
new
PrintWriter
(
socket
.
getOutputStream
(),
true
);
BufferedReader
reader
=
new
BufferedReader
(
new
InputStreamReader
(
socket
.
getInputStream
())))
{
writer
.
println
(
query
);
StringBuilder
response
=
new
StringBuilder
();
String
line
;
while
((
line
=
reader
.
readLine
())
!=
null
)
response
.
append
(
line
).
append
(
"\n"
);
return
response
.
toString
();
}
catch
(
IOException
e
)
{
return
"Failed to connect to Leader."
;
}
}
private
void
sendResponse
(
HttpExchange
exchange
,
String
response
)
throws
IOException
{
byte
[]
bytes
=
response
.
getBytes
(
StandardCharsets
.
UTF_8
);
exchange
.
getResponseHeaders
().
set
(
"Content-Type"
,
"text/html; charset=UTF-8"
);
exchange
.
sendResponseHeaders
(
200
,
bytes
.
length
);
try
(
OutputStream
os
=
exchange
.
getResponseBody
())
{
os
.
write
(
bytes
);
}
}
}
\ No newline at end of file
src/main/java/com/distributed/search/LeaderElectionManager.java
0 → 100644
View file @
cd1524e7
package
com
.
distributed
.
search
;
import
org.apache.curator.framework.CuratorFramework
;
import
org.apache.curator.framework.recipes.leader.LeaderSelector
;
import
org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter
;
public
class
LeaderElectionManager
{
private
static
final
String
ELECTION_PATH
=
"/election"
;
private
final
LeaderSelector
leaderSelector
;
public
LeaderElectionManager
(
CuratorFramework
client
,
Runnable
onLeadershipGained
)
{
this
.
leaderSelector
=
new
LeaderSelector
(
client
,
ELECTION_PATH
,
new
LeaderSelectorListenerAdapter
()
{
@Override
public
void
takeLeadership
(
CuratorFramework
client
)
throws
Exception
{
System
.
out
.
println
(
"[Election] Leadership gained! Starting Coordinator..."
);
// التعديل الجوهري: تشغيل القائد في خيط جديد
// لكي لا يتوقف خيط Curator عند سطر الـ Scanner في المنسق
Thread
leaderThread
=
new
Thread
(
onLeadershipGained
);
leaderThread
.
setName
(
"Leader-Coordinator-Thread"
);
leaderThread
.
start
();
try
{
// مراقبة خيط القائد. طالما هو يعمل، نحن القادة.
while
(
leaderThread
.
isAlive
()
&&
!
Thread
.
currentThread
().
isInterrupted
())
{
Thread
.
sleep
(
2000
);
// تفقد الحالة كل ثانيتين
}
}
catch
(
InterruptedException
e
)
{
System
.
out
.
println
(
"[Election] Leadership interrupted."
);
}
finally
{
System
.
out
.
println
(
"[Election] Relinquishing leadership."
);
}
}
});
this
.
leaderSelector
.
autoRequeue
();
}
public
void
start
()
{
System
.
out
.
println
(
"[Election] Connecting to remote ZooKeeper and joining election..."
);
leaderSelector
.
start
();
}
}
\ No newline at end of file
src/main/java/com/distributed/search/SearchApplication.java
View file @
cd1524e7
package
com
.
distributed
.
search
;
package
com
.
distributed
.
search
;
import
com.distributed.search.model.SearchTask
;
import
com.distributed.search.model.TaskResult
;
import
org.apache.curator.framework.CuratorFramework
;
import
org.apache.curator.framework.CuratorFramework
;
import
org.apache.curator.framework.CuratorFrameworkFactory
;
import
org.apache.curator.framework.CuratorFrameworkFactory
;
import
org.apache.curator.retry.ExponentialBackoffRetry
;
import
org.apache.curator.retry.ExponentialBackoffRetry
;
import
java.net.ServerSocket
;
import
java.net.Socket
;
import
java.util.Map
;
/**
* النسخة النهائية المدمجة مع واجهة FrontEnd ونظام التنسيق الموزع
*/
public
class
SearchApplication
{
public
class
SearchApplication
{
// عنوان سيرفر ZooKeeper في المعهد
private
static
final
String
ZK_ADDRESS
=
"172.29.3.101:2181"
;
private
static
final
String
ZK_ADDRESS
=
"172.29.3.101:2181"
;
// المجلد المحلي للبيانات
private
static
final
String
DATA_DIR
=
"D:\\search_data"
;
private
static
final
String
DATA_DIR
=
"D:\\search_data"
;
public
static
void
main
(
String
[]
args
)
throws
Exception
{
// المنافذ الثابتة للقائد
CuratorFramework
client
=
CuratorFrameworkFactory
.
newClient
(
ZK_ADDRESS
,
new
ExponentialBackoffRetry
(
1000
,
3
));
private
static
final
int
FRONT_END_PORT
=
8080
;
// للمتصفح
client
.
start
();
private
static
final
int
COORDINATOR_PORT
=
9090
;
// للتواصل بين الفرونت والقائد
ServiceRegistry
registry
=
new
ServiceRegistry
(
client
);
public
static
void
main
(
String
[]
args
)
{
SearchEngine
engine
=
new
SearchEngine
(
DATA_DIR
);
try
{
System
.
out
.
println
(
"[System] Initializing connection to remote ZooKeeper: "
+
ZK_ADDRESS
);
// Leader Election Setup
// 1.create client
registry
.
startLeaderElection
(()
->
{
CuratorFramework
client
=
CuratorFrameworkFactory
.
builder
()
System
.
out
.
println
(
"I am now the Leader (COORDINATOR)"
);
.
connectString
(
ZK_ADDRESS
)
new
Thread
(()
->
{
.
retryPolicy
(
new
ExponentialBackoffRetry
(
1000
,
3
))
.
sessionTimeoutMs
(
15000
)
.
connectionTimeoutMs
(
10000
)
.
build
();
client
.
start
();
client
.
blockUntilConnected
();
System
.
out
.
println
(
"[System] Successfully connected to ZooKeeper."
);
//2.Incial registery
ServiceRegistry
registry
=
new
ServiceRegistry
(
client
);
SearchEngine
engine
=
new
SearchEngine
(
DATA_DIR
);
// 3. (WorkerServer) for each node
WorkerServer
workerServer
=
new
WorkerServer
(
engine
);
int
myPort
=
workerServer
.
getPort
();
// 4.register worker |(active)
registry
.
registerWorker
(
myPort
);
// 5. start leader election
LeaderElectionManager
electionManager
=
new
LeaderElectionManager
(
client
,
()
->
{
try
{
try
{
new
Coordinator
(
registry
,
DATA_DIR
).
start
();
System
.
out
.
println
(
"\n[Leader] *** I am the new Leader of the cluster! ***"
);
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
}).
start
();
// turn coordinator
});
Coordinator
coordinator
=
new
Coordinator
(
registry
,
DATA_DIR
,
COORDINATOR_PORT
);
// Worker Server Setup
//turn frontEndServer on browser
ServerSocket
serverSocket
=
new
ServerSocket
(
0
);
FrontEndServer
frontEnd
=
new
FrontEndServer
(
FRONT_END_PORT
,
registry
);
int
port
=
serverSocket
.
getLocalPort
();
frontEnd
.
start
();
new
Thread
(()
->
{
System
.
out
.
println
(
"[Worker] Listening on port: "
+
port
);
System
.
out
.
println
(
"[Leader] FrontEnd is live at http://localhost:"
+
FRONT_END_PORT
);
//
coordinator
.
start
();
}
catch
(
Exception
e
)
{
System
.
err
.
println
(
"[Critical] Leader initialization failed: "
+
e
.
getMessage
());
e
.
printStackTrace
();
}
});
System
.
out
.
println
(
"[System] Entering Leader Election queue..."
);
electionManager
.
start
();
// 6. إبقاء العقدة تعمل لمراقبة حالة المجموعة
System
.
out
.
println
(
"[System] Node is active and waiting. Monitoring cluster status..."
);
while
(
true
)
{
while
(
true
)
{
try
(
Socket
socket
=
serverSocket
.
accept
())
{
Thread
.
sleep
(
10000
);
SearchTask
task
=
SearchTask
.
parseDelimitedFrom
(
socket
.
getInputStream
());
if
(
task
!=
null
)
{
Map
<
String
,
Double
>
tf
=
engine
.
calculateTFForFiles
(
task
.
getFilePathsList
(),
task
.
getQuery
());
TaskResult
.
newBuilder
().
putAllDocTfScores
(
tf
).
build
().
writeDelimitedTo
(
socket
.
getOutputStream
());
}
}
catch
(
Exception
e
)
{
}
}
}
}).
start
();
// Node Registration
}
catch
(
Exception
e
)
{
registry
.
registerWorker
(
port
);
System
.
err
.
println
(
"[System] Application Error: "
+
e
.
getMessage
());
Thread
.
sleep
(
Long
.
MAX_VALUE
);
e
.
printStackTrace
();
System
.
exit
(
1
);
}
}
}
}
}
\ No newline at end of file
src/main/java/com/distributed/search/SearchEngine.java
View file @
cd1524e7
...
@@ -12,19 +12,46 @@ public class SearchEngine {
...
@@ -12,19 +12,46 @@ public class SearchEngine {
this
.
dataDirectory
=
dataDirectory
;
this
.
dataDirectory
=
dataDirectory
;
}
}
public
Map
<
String
,
Double
>
calculateTFForFiles
(
List
<
String
>
fileNames
,
String
term
)
{
public
Map
<
String
,
Double
>
calculateTFForFiles
(
List
<
String
>
fileNames
,
String
query
)
{
Map
<
String
,
Double
>
results
=
new
HashMap
<>();
Map
<
String
,
Double
>
results
=
new
HashMap
<>();
if
(
query
==
null
||
query
.
isEmpty
())
return
results
;
// 1. تقسيم جملة البحث إلى كلمات (Tokens)
String
[]
queryWords
=
query
.
toLowerCase
().
trim
().
split
(
"\\s+"
);
for
(
String
fileName
:
fileNames
)
{
for
(
String
fileName
:
fileNames
)
{
try
{
try
{
Path
path
=
Paths
.
get
(
dataDirectory
,
fileName
);
Path
path
=
Paths
.
get
(
dataDirectory
,
fileName
);
if
(!
Files
.
exists
(
path
))
continue
;
if
(!
Files
.
exists
(
path
))
continue
;
String
content
=
Files
.
readString
(
path
).
toLowerCase
();
String
content
=
Files
.
readString
(
path
).
toLowerCase
();
String
[]
words
=
content
.
split
(
"\\W+"
);
long
count
=
Arrays
.
stream
(
words
).
filter
(
w
->
w
.
equals
(
term
.
toLowerCase
())).
count
();
// 2. تقسيم محتوى الملف مع دعم الحروف العربية والإنجليزية والرموز
double
tf
=
(
words
.
length
>
0
)
?
(
double
)
count
/
words
.
length
:
0
;
String
[]
allWords
=
content
.
split
(
"[\\s\\p{Punct}]+"
);
results
.
put
(
fileName
,
tf
);
if
(
allWords
.
length
==
0
)
{
results
.
put
(
fileName
,
0.0
);
continue
;
}
// 3. حساب التكرار التراكمي لجميع كلمات الاستعلام
double
cumulativeTf
=
0
;
for
(
String
qWord
:
queryWords
)
{
long
count
=
Arrays
.
stream
(
allWords
)
.
filter
(
word
->
word
.
equals
(
qWord
))
.
count
();
// إضافة TF الكلمة الحالية إلى المجموع
cumulativeTf
+=
(
double
)
count
/
allWords
.
length
;
}
// نضع النتيجة فقط إذا كان هناك تطابق لواحدة على الأقل من الكلمات
if
(
cumulativeTf
>
0
)
{
results
.
put
(
fileName
,
cumulativeTf
);
}
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
System
.
err
.
println
(
"
Error reading file: "
+
fileName
);
System
.
err
.
println
(
"
[SearchEngine] Error processing "
+
fileName
+
": "
+
e
.
getMessage
()
);
}
}
}
}
return
results
;
return
results
;
...
...
src/main/java/com/distributed/search/ServiceRegistry.java
View file @
cd1524e7
package
com
.
distributed
.
search
;
package
com
.
distributed
.
search
;
import
org.apache.curator.framework.CuratorFramework
;
import
org.apache.curator.framework.CuratorFramework
;
import
org.apache.curator.framework.recipes.leader.LeaderSelector
;
import
org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter
;
import
org.apache.zookeeper.CreateMode
;
import
org.apache.zookeeper.CreateMode
;
import
java.net.InetAddress
;
import
java.net.InetAddress
;
import
java.util.List
;
import
java.util.List
;
public
class
ServiceRegistry
{
public
class
ServiceRegistry
{
private
static
final
String
ELECTION_PATH
=
"/election"
;
private
static
final
String
WORKERS_REGISTRY_PATH
=
"/workers"
;
private
static
final
String
WORKERS_REGISTRY_PATH
=
"/workers"
;
private
static
final
String
LEADER_REGISTRY_PATH
=
"/leader"
;
private
final
CuratorFramework
client
;
private
final
CuratorFramework
client
;
public
ServiceRegistry
(
CuratorFramework
client
)
{
public
ServiceRegistry
(
CuratorFramework
client
)
{
this
.
client
=
client
;
this
.
client
=
client
;
}
}
// Leader Election Logic
// register worker active
public
void
startLeaderElection
(
Runnable
onLeadershipGained
)
{
LeaderSelector
leaderSelector
=
new
LeaderSelector
(
client
,
ELECTION_PATH
,
new
LeaderSelectorListenerAdapter
()
{
@Override
public
void
takeLeadership
(
CuratorFramework
client
)
throws
Exception
{
onLeadershipGained
.
run
();
while
(!
Thread
.
currentThread
().
isInterrupted
())
{
Thread
.
sleep
(
Long
.
MAX_VALUE
);
}
}
});
leaderSelector
.
autoRequeue
();
leaderSelector
.
start
();
}
// Service Registration Logic
public
void
registerWorker
(
int
port
)
throws
Exception
{
public
void
registerWorker
(
int
port
)
throws
Exception
{
if
(
client
.
checkExists
().
forPath
(
WORKERS_REGISTRY_PATH
)
==
null
)
{
if
(
client
.
checkExists
().
forPath
(
WORKERS_REGISTRY_PATH
)
==
null
)
{
client
.
create
().
creatingParentsIfNeeded
().
forPath
(
WORKERS_REGISTRY_PATH
);
client
.
create
().
creatingParentsIfNeeded
().
forPath
(
WORKERS_REGISTRY_PATH
);
}
}
String
ipAddress
=
InetAddress
.
getLocalHost
().
getHostAddress
();
String
ipAddress
=
InetAddress
.
getLocalHost
().
getHostAddress
();
String
workerPath
=
WORKERS_REGISTRY_PATH
+
"/worker_"
+
ipAddress
+
":"
+
port
;
String
workerPath
=
WORKERS_REGISTRY_PATH
+
"/worker_"
+
ipAddress
+
":"
+
port
;
//node (EPHEMERAL)
if
(
client
.
checkExists
().
forPath
(
workerPath
)
!=
null
)
{
client
.
delete
().
forPath
(
workerPath
);
}
client
.
create
().
withMode
(
CreateMode
.
EPHEMERAL
).
forPath
(
workerPath
);
client
.
create
().
withMode
(
CreateMode
.
EPHEMERAL
).
forPath
(
workerPath
);
System
.
out
.
println
(
"[Registry] Worker registered at: "
+
ipAddress
+
":"
+
port
);
System
.
out
.
println
(
"[Registry] Worker registered at: "
+
ipAddress
+
":"
+
port
);
}
}
// Service Discovery Logic
// --- الوظائف الجديدة ---
/**
* تسجيل القائد الحالي (يستدعيها الـ Coordinator فقط بعد الفوز بالانتخاب)
*/
public
void
registerLeader
(
int
port
)
throws
Exception
{
String
ipAddress
=
InetAddress
.
getLocalHost
().
getHostAddress
();
String
leaderAddress
=
ipAddress
+
":"
+
port
;
// إذا كان هناك قائد قديم مسجل، نحذفه (لضمان التحديث)
if
(
client
.
checkExists
().
forPath
(
LEADER_REGISTRY_PATH
)
!=
null
)
{
client
.
delete
().
forPath
(
LEADER_REGISTRY_PATH
);
}
// إنشاء عقدة القائد كعقدة مؤقتة (EPHEMERAL)
client
.
create
().
withMode
(
CreateMode
.
EPHEMERAL
).
forPath
(
LEADER_REGISTRY_PATH
,
leaderAddress
.
getBytes
());
System
.
out
.
println
(
"[Registry] Leader registered globally at: "
+
leaderAddress
);
}
/**
* جلب عنوان القائد (يستخدمها الـ Front-End ليعرف لمن يرسل الـ Query)
*/
public
String
getLeaderAddress
()
throws
Exception
{
if
(
client
.
checkExists
().
forPath
(
LEADER_REGISTRY_PATH
)
!=
null
)
{
byte
[]
data
=
client
.
getData
().
forPath
(
LEADER_REGISTRY_PATH
);
return
new
String
(
data
);
}
return
null
;
// لا يوجد قائد حالياً
}
// اكتشاف العمال النشطين (يستخدمه القائد لتوزيع المهام)
public
List
<
String
>
getActiveWorkers
()
throws
Exception
{
public
List
<
String
>
getActiveWorkers
()
throws
Exception
{
if
(
client
.
checkExists
().
forPath
(
WORKERS_REGISTRY_PATH
)
==
null
)
{
return
List
.
of
();
}
return
client
.
getChildren
().
forPath
(
WORKERS_REGISTRY_PATH
);
return
client
.
getChildren
().
forPath
(
WORKERS_REGISTRY_PATH
);
}
}
}
}
\ No newline at end of file
src/main/java/com/distributed/search/WorkerServer.java
0 → 100644
View file @
cd1524e7
package
com
.
distributed
.
search
;
import
com.distributed.search.model.SearchTask
;
import
com.distributed.search.model.TaskResult
;
import
java.io.IOException
;
import
java.net.ServerSocket
;
import
java.net.Socket
;
import
java.util.Map
;
import
java.util.concurrent.ExecutorService
;
import
java.util.concurrent.Executors
;
/**
* خادم العامل المعدل: يسمح للقائد بمعرفة المنفذ الخاص به لاستثنائه من العمل.
*/
public
class
WorkerServer
{
private
final
SearchEngine
searchEngine
;
private
final
int
port
;
private
final
ExecutorService
threadPool
;
// (Static) for current port
private
static
int
localBoundPort
;
public
WorkerServer
(
SearchEngine
searchEngine
)
throws
IOException
{
this
.
searchEngine
=
searchEngine
;
// oper random port
ServerSocket
serverSocket
=
new
ServerSocket
(
0
);
this
.
port
=
serverSocket
.
getLocalPort
();
// save port in static variable localboundport
localBoundPort
=
this
.
port
;
this
.
threadPool
=
Executors
.
newCachedThreadPool
();
startListening
(
serverSocket
);
}
/**
* دالة ساكنة يستدعيها القائد (Coordinator) لمعرفة المنفذ الخاص به.
*/
public
static
int
getLocalBoundPort
()
{
return
localBoundPort
;
}
private
void
startListening
(
ServerSocket
serverSocket
)
{
Thread
serverThread
=
new
Thread
(()
->
{
System
.
out
.
println
(
"[WorkerServer] Started and listening on port: "
+
port
);
try
{
while
(!
Thread
.
currentThread
().
isInterrupted
())
{
Socket
clientSocket
=
serverSocket
.
accept
();
// معالجة كل طلب قادم في خيط (Thread) منفصل
threadPool
.
submit
(()
->
handleClientRequest
(
clientSocket
));
}
}
catch
(
IOException
e
)
{
if
(!
serverSocket
.
isClosed
())
{
System
.
err
.
println
(
"[WorkerServer] Server error: "
+
e
.
getMessage
());
}
}
});
serverThread
.
setDaemon
(
true
);
serverThread
.
start
();
}
private
void
handleClientRequest
(
Socket
socket
)
{
try
(
socket
)
{
// استقبال المهمة (الكلمة والملفات) من القائد
SearchTask
task
=
SearchTask
.
parseDelimitedFrom
(
socket
.
getInputStream
());
if
(
task
!=
null
)
{
// ملاحظة: بما أن القائد لن يرسل لنفسه، هذه الرسالة ستظهر فقط عند العمال الآخرين
System
.
out
.
println
(
"[WorkerServer] Node "
+
port
+
" is processing "
+
task
.
getFilePathsCount
()
+
" files..."
);
// تنفيذ حساب التردد (TF)
Map
<
String
,
Double
>
results
=
searchEngine
.
calculateTFForFiles
(
task
.
getFilePathsList
(),
task
.
getQuery
()
);
// إرسال النتائج (TF scores) للقائد
TaskResult
response
=
TaskResult
.
newBuilder
()
.
putAllDocTfScores
(
results
)
.
build
();
response
.
writeDelimitedTo
(
socket
.
getOutputStream
());
}
}
catch
(
IOException
e
)
{
System
.
err
.
println
(
"[WorkerServer] Error handling request: "
+
e
.
getMessage
());
}
}
public
int
getPort
()
{
return
port
;
}
}
\ No newline at end of file
src/main/proto/search.proto
View file @
cd1524e7
syntax
=
"proto3"
;
syntax
=
"proto3"
;
// هذا السطر يحدد الحزمة التي سيتم توليد كود الجافا فيها
option
java_package
=
"com.distributed.search.model"
;
option
java_package
=
"com.distributed.search.model"
;
option
java_multiple_files
=
true
;
option
java_multiple_files
=
true
;
// 1. الرسالة التي يرسلها القائد (Coordinator) للعامل
message
SearchTask
{
message
SearchTask
{
string
query
=
1
;
// جملة البحث
string
query
=
1
;
repeated
string
file_paths
=
2
;
//
قائمة مسارات الملفات المخصصة لهذا العامل
repeated
string
file_paths
=
2
;
//
}
}
// 2. الرسالة التي يعيدها العامل (Worker) للقائد
message
TaskResult
{
message
TaskResult
{
// خريطة تربط اسم الملف بمجموع الـ TF للكلمات الموجودة فيه
map
<
string
,
double
>
doc_tf_scores
=
1
;
map
<
string
,
double
>
doc_tf_scores
=
1
;
}
}
\ No newline at end of file
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment