Commit a86e4d16 authored by abdullh.alsoleman's avatar abdullh.alsoleman

Second

parent 0b9c3a4b
......@@ -60,17 +60,17 @@
<option name="hideEmptyMiddlePackages" value="true" />
<option name="showLibraryContents" value="true" />
</component>
<component name="PropertiesComponent"><![CDATA[{
"keyToString": {
"RunOnceActivity.OpenProjectViewOnStart": "true",
"RunOnceActivity.ShowReadmeOnStart": "true",
"SHARE_PROJECT_CONFIGURATION_FILES": "true",
"last_opened_file_path": "C:/Users/Abdullah/Downloads/network-communication-master/Cordinator_Worker/src/main/resources",
"project.structure.last.edited": "Artifacts",
"project.structure.proportion": "0.0",
"project.structure.side.proportion": "0.0"
<component name="PropertiesComponent">{
&quot;keyToString&quot;: {
&quot;RunOnceActivity.OpenProjectViewOnStart&quot;: &quot;true&quot;,
&quot;RunOnceActivity.ShowReadmeOnStart&quot;: &quot;true&quot;,
&quot;SHARE_PROJECT_CONFIGURATION_FILES&quot;: &quot;true&quot;,
&quot;last_opened_file_path&quot;: &quot;C:/Users/Abdullah/Downloads/network-communication-master/Cordinator_Worker/src/main/resources&quot;,
&quot;project.structure.last.edited&quot;: &quot;Artifacts&quot;,
&quot;project.structure.proportion&quot;: &quot;0.0&quot;,
&quot;project.structure.side.proportion&quot;: &quot;0.0&quot;
}
}]]></component>
}</component>
<component name="RecentsManager">
<key name="CopyFile.RECENT_KEYS">
<recent name="C:\Users\Abdullah\Downloads\network-communication-master\Cordinator_Worker\src\main\resources" />
......@@ -84,6 +84,28 @@
<recent name="" />
</key>
</component>
<component name="RunManager" selected="Application.Application">
<configuration name="Application" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
<option name="MAIN_CLASS_NAME" value="Application" />
<module name="Cordinator_Worker" />
<method v="2">
<option name="Make" enabled="true" />
</method>
</configuration>
<configuration name="Main" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
<option name="MAIN_CLASS_NAME" value="Main" />
<module name="Cordinator_Worker" />
<method v="2">
<option name="Make" enabled="true" />
</method>
</configuration>
<recent_temporary>
<list>
<item itemvalue="Application.Application" />
<item itemvalue="Application.Main" />
</list>
</recent_temporary>
</component>
<component name="SpellCheckerSettings" RuntimeDictionaries="0" Folders="0" CustomDictionaries="0" DefaultDictionary="application-level" UseSingleDictionary="true" transferred="true" />
<component name="TaskManager">
<task active="true" id="Default" summary="Default task">
......@@ -106,20 +128,4 @@
</map>
</option>
</component>
<component name="XDebuggerManager">
<breakpoint-manager>
<breakpoints>
<line-breakpoint enabled="true" type="java-line">
<url>file://$PROJECT_DIR$/src/main/java/Coordinator_Worker/Worker.java</url>
<line>40</line>
<option name="timeStamp" value="9" />
</line-breakpoint>
<line-breakpoint enabled="true" type="java-line">
<url>file://$PROJECT_DIR$/src/main/java/Coordinator_Worker/Coordinator.java</url>
<line>85</line>
<option name="timeStamp" value="12" />
</line-breakpoint>
</breakpoints>
</breakpoint-manager>
</component>
</project>
\ No newline at end of file
This diff is collapsed.
......@@ -55,7 +55,7 @@ public class Application implements Watcher {
}
}
private void close() throws InterruptedException {
void close() throws InterruptedException {
this.zooKeeper.close();
}
......
......@@ -4,22 +4,23 @@ import java.io.*;
import java.net.Socket;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
public class Communication {
public static void sendData(Socket socket, String dataString, List<String> fileNames) throws IOException {
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
objectOutputStream.writeObject(new DataObject(dataString, fileNames));
}
public static List<Document> receiveResult(Socket socket) throws IOException, ClassNotFoundException {
public static ConcurrentLinkedQueue<Document> receiveResult(Socket socket) throws IOException, ClassNotFoundException {
ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
return (List<Document>) objectInputStream.readObject();
return (ConcurrentLinkedQueue <Document>) objectInputStream.readObject();
}
public static DataObject receiveData(Socket socket) throws IOException, ClassNotFoundException {
ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
return (DataObject) objectInputStream.readObject();
}
public static void sendResult(Socket socket, List<Document> response) throws IOException {
public static void sendResult(Socket socket, ConcurrentLinkedQueue<Document> response) throws IOException {
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
objectOutputStream.writeObject(response);
}
......
......@@ -4,6 +4,7 @@ import Zookeeper.ServiceRegistry;
import java.io.IOException;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
......@@ -12,21 +13,20 @@ import java.util.*;
import java.util.concurrent.*;
public class Coordinator {
private static final int PORT = 4444;
private static final int PORT = 12363;
public static void startCoordinator() {
ConcurrentLinkedQueue<Document> workersResult = new ConcurrentLinkedQueue<>();
try {
List<Future<List<Document>>> futures = new ArrayList<>();
List<Document> resultFromAllWorkers=new ArrayList<>();
String dataString = "the car";
String [] parts = dataString.split("\\s");
String[] parts = dataString.split("\\s");
List<Term> terms = new ArrayList<>();
for (String part:parts) {
for (String part : parts) {
terms.add(new Term(part));
}
TreeMap<String,Double> response= new TreeMap<>();
TreeMap<String, Double> response = new TreeMap<>();
List<String> fileNames = getFileNamesInDirectory("src/main/resources");
......@@ -36,17 +36,7 @@ public class Coordinator {
int index = 0;
List<CoordinatorHandler> handlerList = new ArrayList<>();
ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
if(ServiceRegistry.getWorkers().size()==1){
// Connect to worker
Socket workerSocket = new Socket("127.0.0.1", PORT);
// Create a CoordinatorHandler instance for each worker
CoordinatorHandler handler = new CoordinatorHandler(dataString,workerSocket, fileNames);
handlerList.add(handler);
// Start the CoordinatorHandler thread
futures.add(executorService.submit(handler));
}else {
for (Worker worker : ServiceRegistry.getWorkers()) {
List<String> workerFilesList = new ArrayList<>();
for (int i = 0; i < eachWorkerFiles; i++) {
......@@ -59,48 +49,41 @@ public class Coordinator {
index++;
}
}
}
// Connect to worker
Socket workerSocket = new Socket("127.0.0.1", PORT);
// Create a CoordinatorHandler instance for each worker
CoordinatorHandler handler = new CoordinatorHandler(dataString, workerSocket, workerFilesList);
handlerList.add(handler);
Runnable task = () -> processRequest(workerSocket, workerFilesList, dataString, workersResult);
// Start the CoordinatorHandler thread
futures.add(executorService.submit(handler));
}
}
executorService.submit(task);
}
for (Future<List<Document>> future : futures) {
try {
List<Document> documents = future.get();
resultFromAllWorkers.addAll(documents);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
executorService.awaitTermination(5,TimeUnit.MICROSECONDS);
// Shut down the executor after all tasks are submitted
executorService.shutdown();
executorService.awaitTermination(5, TimeUnit.SECONDS);
for (int i=0 ; i<terms.size();i++) {
for (Document document:resultFromAllWorkers) {
Map<String,Double> tf = document.getTf();
if(tf.get(terms.get(i))>0);
for (int i = 0; i < terms.size(); i++) {
for (Document document : workersResult) {
Map<String, Double> tf = document.getTf();
if (tf.get(terms.get(i).getTermName()) > 0) ;
terms.get(i).increase();
}
}
TreeMap<String,Double> IDF = new TreeMap<>();
for (Document document: resultFromAllWorkers) {
TreeMap<String, Double> IDF = new TreeMap<>();
for (Document document : workersResult) {
document.setTerms(terms);
document.CalculateIDF(fileNames.size());
IDF.put(document.getName(),document.getIDF());
IDF.put(document.getName(), document.getIDF());
}
System.out.println("IDF Files: "+ IDF);
System.out.println("IDF Files: " + IDF);
executorService.shutdown();
} catch (UnknownHostException e) {
throw new RuntimeException(e);
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}finally {
}
}
......@@ -130,46 +113,23 @@ public class Coordinator {
}
}
static class CoordinatorHandler implements Callable<List<Document>> {
private Socket clientSocket;
private List<String> fileNames = new ArrayList<>();
// private List<Document> result = new ArrayList<>();
private static void processRequest(Socket clientSocket, List<String> fileNames,
String dataString, ConcurrentLinkedQueue<Document> documents) {
String dataString= "";
public CoordinatorHandler(String dataString,Socket socket, List<String> fileNames) {
this.clientSocket = socket;
this.fileNames.addAll(fileNames);
this.dataString=dataString;
}
@Override
public List<Document> call() {
List<Document> result=new ArrayList<>();
try {
// Assume the coordinator sends a string and a list of file names
// Send data to worker
Communication.sendData(clientSocket, dataString, fileNames);
// Receive results from worker
result = Communication.receiveResult(clientSocket);
System.out.println("Received result from worker: " + result);
ConcurrentLinkedQueue<Document> d = Communication.receiveResult(clientSocket);
documents.addAll(d);
clientSocket.close();
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
} finally {
return result;
}
}
// public List<Document> getResult() {
// return result;
// }
}
}
......@@ -8,12 +8,13 @@ import java.util.Map;
public class Document implements Serializable {
private String name;
private Map<String,Double> tf;
private Map<String,Double> tf=new HashMap<>();
private List<Term> terms;
private Double IDF=0.0;
public Document(String name)
{
this.name = name;
this.terms = new ArrayList<>(); // Initialize the terms list
}
public String getName() {
return name;
......
......@@ -5,31 +5,32 @@ import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
public class Worker {
private static final int PORT = 4444;
private static final int PORT = 12363;
private String nodeName;
private String ipAddress;
private String userName;
private static List<Document> documents = new ArrayList<>();
public Worker(String fullPathNode) {
this.nodeName = fullPathNode;
}
public static void startWorker() {
ServerSocket serverSocket = null;
try {
// Step 1: Set up server socket to listen for connections
serverSocket = new ServerSocket(PORT);
} catch (IOException e) {
throw new RuntimeException(e);
}
while (true) {
try {
ServerSocket serverSocket = new ServerSocket(PORT);
System.out.println("Waiting Request");
// Step 2: Accept connection from coordinator
Socket coordinatorSocket = serverSocket.accept();
......@@ -38,14 +39,14 @@ public class Worker {
System.out.println("Received data from coordinator: " + data.getDataString() + ", " + data.getFileNames());
// Step 4: Process the data using IDF algorithm
List<Document> response = IDF(data);
ConcurrentLinkedQueue<Document> response = new ConcurrentLinkedQueue<>();
response.addAll(IDF(data));
// Step 5: Send the result back to the coordinator
Communication.sendResult(coordinatorSocket, response);
// Step 6: Close sockets
coordinatorSocket.close();
serverSocket.close();
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
......@@ -56,48 +57,36 @@ public class Worker {
// Function to calculate IDF for each term in the search query across files
private static List<Document> IDF(DataObject data) {
private static ConcurrentLinkedQueue<Document> IDF(DataObject data) {
ConcurrentLinkedQueue<Document> documents=new ConcurrentLinkedQueue<>();
String searchQuery = data.getDataString();
List<Document> r=new ArrayList<>();
String[] terms = searchQuery.split("\\s+");
// Step 2: Use ExecutorService for parallel processing
ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
try {
List<Callable<Document>> tasks=new ArrayList<>();
List<Future<Document>> futures = new ArrayList<>(); // Step 3: Submit a task for each file to the ExecutorService
for (String fileName : data.getFileNames()) {
File file = new File("C:\\Users\\Abdullah\\Downloads\\network-communication-master\\Cordinator_Worker\\src\\main\\resources\\" + fileName);
Callable<Document> task = () -> processFile(file, terms);
tasks.add(task);
futures.add(executorService.submit(task));
}
executorService.awaitTermination(2, TimeUnit.MILLISECONDS);
// Step 4: Retrieve results from Future and merge into the 'r' list
for (Future<Document> future : futures) {
try {
Document document = future.get();
r.add(document);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
Runnable task = () -> processFile(file, terms,documents);
executorService.submit(task);
}
executorService.awaitTermination(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
throw new RuntimeException(e);
} finally {
// Step 5: Shutdown the ExecutorService
//Step 5: Shutdown the ExecutorService
executorService.shutdown();
}
// Step 6: Return the final IDF map
return r;
return documents;
}
// Function to process each file and calculate term frequencies
private static Document processFile(File file, String[] terms) {
private static void processFile(File file, String[] terms,ConcurrentLinkedQueue<Document>documents) {
Document document = new Document(file.getName());
int totalWords = 0;
double totalWords = 0;
try {
// Read content from the file
......@@ -106,27 +95,22 @@ public class Worker {
// Calculate term frequencies in the file
for (String term : terms) {
int frequency = 0;
double frequency = 0;
for (String word : words) {
if (word.equals(term)) {
frequency++;
}
totalWords++;
}
document.setTf(term, (double) (frequency / totalWords));
Double d= frequency/totalWords;
document.setTf(term, d);
documents.offer(document);
}
} catch (IOException e) {
e.printStackTrace();
// Handle file reading exception
}finally {
return document;
}
}
private static synchronized void addToDocuments(Document document) {
documents.add(document);
}
public void setIpAddress(String ipAddress) {
this.ipAddress = ipAddress;
}
......
......@@ -30,7 +30,9 @@ public class OnElectionAction implements OnElectionCallback {
String currentServerAddress =
"192.168.56.116"+","+username+"," +port;
serviceRegistry.registerToCluster(currentServerAddress);
Worker.startWorker();
Runnable task = ()->Worker.startWorker();
Thread thread =new Thread(task);
thread.start();
} catch (InterruptedException e) {
e.printStackTrace();
}catch (KeeperException e) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment