You need to sign in or sign up before continuing.
Commit 48e127c2 authored by mohammad.salama's avatar mohammad.salama

Working as Strings, no json, problem in coordinator address

parent 4a919357
This diff is collapsed.
...@@ -646,3 +646,47 @@ java.lang.NullPointerException: Cannot invoke "java.util.List.isEmpty()" because ...@@ -646,3 +646,47 @@ java.lang.NullPointerException: Cannot invoke "java.util.List.isEmpty()" because
2024-01-31 15:23:59,172 INFO org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.ServiceRegistry [main] Registered to service registry 2024-01-31 15:23:59,172 INFO org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.ServiceRegistry [main] Registered to service registry
2024-01-31 15:23:59,177 INFO org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.TransientWorker [main] Server started on port 98 2024-01-31 15:23:59,177 INFO org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.TransientWorker [main] Server started on port 98
2024-01-31 15:23:59,181 INFO org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.ServiceRegistry [main-EventThread] The cluster addresses are: [127.0.0.1:65, 127.0.0.1:98] 2024-01-31 15:23:59,181 INFO org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.ServiceRegistry [main-EventThread] The cluster addresses are: [127.0.0.1:65, 127.0.0.1:98]
2024-01-31 22:04:09,441 INFO org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.Application [main-EventThread] Successfully connected to Zookeeper
2024-01-31 22:04:09,505 INFO org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.LeaderElection [main] My Node under Election is/election/c_0000000204
2024-01-31 22:04:09,512 INFO org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.LeaderElection [main] I am LEADER
2024-01-31 22:04:09,515 INFO org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.ServiceRegistry [main] The cluster addresses are: []
2024-01-31 22:04:09,518 INFO org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.ServiceRegistry [main] Registered to be Coordinator and I am Master !
2024-01-31 22:04:10,249 INFO org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.Application [main] Starting Application using Java 17.0.5 on M-Salameh with PID 8960 (D:\HIAST\FIY\FS\Distributed Systems\Lab\8\Distributed-Search\target\classes started by M_Salameh in D:\HIAST\FIY\FS\Distributed Systems\Lab\8\Distributed-Search)
2024-01-31 22:04:10,250 DEBUG org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.Application [main] Running with Spring Boot v2.6.8, Spring v5.3.20
2024-01-31 22:04:10,250 INFO org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.Application [main] No active profile set, falling back to 1 default profile: "default"
2024-01-31 22:04:12,149 INFO org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.Application [main] Started Application in 2.437 seconds (JVM running for 7.171)
2024-01-31 22:04:16,741 INFO org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.Application [main-EventThread] Successfully connected to Zookeeper
2024-01-31 22:04:16,763 INFO org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.LeaderElection [main] My Node under Election is/election/c_0000000205
2024-01-31 22:04:16,768 INFO org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.LeaderElection [main] I am NOT LEADER
2024-01-31 22:04:16,775 INFO org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.ServiceRegistry [main] Registered to service registry
2024-01-31 22:04:16,782 INFO org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.TransientWorker [main] Server started on port 35
2024-01-31 22:04:16,790 INFO org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.ServiceRegistry [main-EventThread] The cluster addresses are: [127.0.0.1:35]
2024-01-31 22:04:23,546 INFO org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.Application [main-EventThread] Successfully connected to Zookeeper
2024-01-31 22:04:23,566 INFO org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.LeaderElection [main] My Node under Election is/election/c_0000000206
2024-01-31 22:04:23,569 INFO org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.LeaderElection [main] I am NOT LEADER
2024-01-31 22:04:23,574 INFO org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.ServiceRegistry [main] Registered to service registry
2024-01-31 22:04:23,580 INFO org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.TransientWorker [main] Server started on port 68
2024-01-31 22:04:23,585 INFO org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.ServiceRegistry [main-EventThread] The cluster addresses are: [127.0.0.1:35, 127.0.0.1:68]
2024-01-31 22:17:37,794 INFO org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.Application [main-EventThread] Successfully connected to Zookeeper
2024-01-31 22:17:37,812 INFO org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.LeaderElection [main] My Node under Election is/election/c_0000000207
2024-01-31 22:17:37,815 INFO org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.LeaderElection [main] I am LEADER
2024-01-31 22:17:37,818 INFO org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.ServiceRegistry [main] The cluster addresses are: []
2024-01-31 22:17:37,820 INFO org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.ServiceRegistry [main] Registered to be Coordinator and I am Master !
2024-01-31 22:17:38,321 INFO org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.Application [main] Starting Application using Java 17.0.5 on M-Salameh with PID 10968 (D:\HIAST\FIY\FS\Distributed Systems\Lab\8\Distributed-Search\target\classes started by M_Salameh in D:\HIAST\FIY\FS\Distributed Systems\Lab\8\Distributed-Search)
2024-01-31 22:17:38,322 DEBUG org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.Application [main] Running with Spring Boot v2.6.8, Spring v5.3.20
2024-01-31 22:17:38,322 INFO org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.Application [main] No active profile set, falling back to 1 default profile: "default"
2024-01-31 22:17:39,698 INFO org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.Application [main] Started Application in 1.742 seconds (JVM running for 7.043)
2024-01-31 22:17:46,954 INFO org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.Application [main-EventThread] Successfully connected to Zookeeper
2024-01-31 22:17:46,972 INFO org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.LeaderElection [main] My Node under Election is/election/c_0000000208
2024-01-31 22:17:46,975 INFO org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.LeaderElection [main] I am NOT LEADER
2024-01-31 22:17:46,980 INFO org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.ServiceRegistry [main] Registered to service registry
2024-01-31 22:17:46,986 INFO org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.TransientWorker [main] Server started on port 36
2024-01-31 22:17:46,991 INFO org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.ServiceRegistry [main-EventThread] The cluster addresses are: [127.0.0.1:36]
2024-01-31 22:17:57,000 INFO org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.Application [main-EventThread] Successfully connected to Zookeeper
2024-01-31 22:17:57,015 INFO org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.LeaderElection [main] My Node under Election is/election/c_0000000209
2024-01-31 22:17:57,019 INFO org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.LeaderElection [main] I am NOT LEADER
2024-01-31 22:17:57,022 INFO org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.ServiceRegistry [main] Registered to service registry
2024-01-31 22:17:57,029 INFO org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.TransientWorker [main] Server started on port 68
2024-01-31 22:17:57,032 INFO org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.ServiceRegistry [main-EventThread] The cluster addresses are: [127.0.0.1:36, 127.0.0.1:68]
2024-01-31 22:18:21,823 INFO org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.TransientWorker [main] Coordinator connected: /127.0.0.1
2024-01-31 22:18:21,823 INFO org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.TransientWorker [main] Coordinator connected: /127.0.0.1
...@@ -4,6 +4,7 @@ package GRPCConnection.GRPCClient; ...@@ -4,6 +4,7 @@ package GRPCConnection.GRPCClient;
import io.grpc.ManagedChannel; import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder; import io.grpc.ManagedChannelBuilder;
import org.AutoHealerAndClusterSearch.AutoHealerAndClusterSearch.Coordinator;
import org.AutoHealerAndClusterSearch.SearchReply; import org.AutoHealerAndClusterSearch.SearchReply;
import org.AutoHealerAndClusterSearch.SearchRequest; import org.AutoHealerAndClusterSearch.SearchRequest;
import org.AutoHealerAndClusterSearch.SearchServiceGrpc; import org.AutoHealerAndClusterSearch.SearchServiceGrpc;
...@@ -20,29 +21,16 @@ import java.util.List; ...@@ -20,29 +21,16 @@ import java.util.List;
* the client here will be the web server * the client here will be the web server
* configure it later * configure it later
* */ * */
public class GRPCClient implements Watcher public class GRPCClient
{ {
private ZooKeeper zooKeeper;
private final String address = "192.168.184.10:2181";
private final int SESSION_TIMEOUT = 3000; //dead client
private final String COORDINATOR_ZNODE_PATH = "/coordinator_node";
private String COORDINATOR_ADDRESS; private String COORDINATOR_ADDRESS;
private String COORDINATOR_ZNODE;
public GRPCClient () throws IOException, InterruptedException
{
connectToZookeeper();
run();
close();
}
public List<String> search(String query) throws InterruptedException, KeeperException public List<String> search(String query) throws InterruptedException, KeeperException
{ {
getAddress(); COORDINATOR_ADDRESS = Coordinator.getIP();
ManagedChannel channel = ManagedChannelBuilder. ManagedChannel channel = ManagedChannelBuilder.
forAddress(COORDINATOR_ADDRESS , 6565). forAddress("localhost" , 6565).
usePlaintext().build(); usePlaintext().build();
SearchServiceGrpc.SearchServiceBlockingStub stub = SearchServiceGrpc.newBlockingStub(channel); SearchServiceGrpc.SearchServiceBlockingStub stub = SearchServiceGrpc.newBlockingStub(channel);
...@@ -54,50 +42,5 @@ public class GRPCClient implements Watcher ...@@ -54,50 +42,5 @@ public class GRPCClient implements Watcher
return stringList; return stringList;
} }
private synchronized void getAddress() throws InterruptedException, KeeperException
{
List<String> temp = zooKeeper.getChildren(COORDINATOR_ZNODE_PATH , this);
if (temp.isEmpty())
{
return;
}
COORDINATOR_ZNODE = temp.get(0);
String pth = COORDINATOR_ZNODE_PATH + "/"+ COORDINATOR_ZNODE;
Stat stat = zooKeeper.exists(pth , false);
if (stat == null) return;
COORDINATOR_ADDRESS = new String(zooKeeper.getData( pth, false , stat));
COORDINATOR_ADDRESS = COORDINATOR_ADDRESS.split(":")[0];
}
private void run() throws InterruptedException {
synchronized (zooKeeper) {
zooKeeper.wait();
}
}
private void close() throws InterruptedException {
this.zooKeeper.close();
}
private void connectToZookeeper() throws IOException
{
this.zooKeeper = new ZooKeeper(address, SESSION_TIMEOUT, this);
}
@Override
public void process(WatchedEvent watchedEvent)
{
switch (watchedEvent.getType())
{
case NodeChildrenChanged :
{
try
{
getAddress();
}
catch (InterruptedException | KeeperException e)
{
throw new RuntimeException(e);
}
}
}
}
} }
...@@ -9,7 +9,6 @@ import java.net.http.HttpClient; ...@@ -9,7 +9,6 @@ import java.net.http.HttpClient;
import java.net.http.HttpRequest; import java.net.http.HttpRequest;
import java.net.http.HttpResponse; import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
......
...@@ -13,6 +13,7 @@ import org.slf4j.LoggerFactory; ...@@ -13,6 +13,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
...@@ -28,7 +29,7 @@ public class WebServer ...@@ -28,7 +29,7 @@ public class WebServer
public WebServer(int port) throws IOException, InterruptedException public WebServer(int port) throws IOException, InterruptedException
{ {
this.port = port; this.port = port;
grpcClient = new GRPCClient();
} }
public static void main(String[] args) throws IOException, InterruptedException { public static void main(String[] args) throws IOException, InterruptedException {
int serverPort = 5566; int serverPort = 5566;
...@@ -74,7 +75,7 @@ public class WebServer ...@@ -74,7 +75,7 @@ public class WebServer
byte[] requestBytes = exchange.getRequestBody().readAllBytes(); byte[] requestBytes = exchange.getRequestBody().readAllBytes();
String query = new String(requestBytes); String query = new String(requestBytes);
List<String> files = null; List<String> files = new ArrayList<>();
try try
{ {
files = grpcClient.search(query); files = grpcClient.search(query);
......
...@@ -19,7 +19,7 @@ import java.util.stream.Stream; ...@@ -19,7 +19,7 @@ import java.util.stream.Stream;
public class Coordinator public class Coordinator
{ {
private static final String PHYSICAL_ZNODES_PATH = "/physical_nodes"; private static String ipAdress;
private static String FILES_DIRECTORY = System.getProperty("user.dir") + "/SearchFiles/";; private static String FILES_DIRECTORY = System.getProperty("user.dir") + "/SearchFiles/";;
private static final Logger logger = LoggerFactory.getLogger(Coordinator.class); private static final Logger logger = LoggerFactory.getLogger(Coordinator.class);
...@@ -42,7 +42,12 @@ public class Coordinator ...@@ -42,7 +42,12 @@ public class Coordinator
return answer; return answer;
} }
public static String getIP()
{
ipAdress = ServiceRegistry.getCoordinatorAddress();
return ipAdress;
}
private static SearchQueryResponse sendRequestToNode(SearchQueryRequest searchQueryRequest, String ipAddress) private static SearchQueryResponse sendRequestToNode(SearchQueryRequest searchQueryRequest, String ipAddress)
{ {
......
...@@ -24,6 +24,7 @@ public class ServiceRegistry implements Watcher ...@@ -24,6 +24,7 @@ public class ServiceRegistry implements Watcher
private TransientWorker transientWorker = null; private TransientWorker transientWorker = null;
private String currentZnode = null; private String currentZnode = null;
private static List<String> allServiceAddresses = null; private static List<String> allServiceAddresses = null;
private static String Coordinator_Address;
public ServiceRegistry(ZooKeeper zooKeeper) public ServiceRegistry(ZooKeeper zooKeeper)
{ {
...@@ -71,7 +72,7 @@ public class ServiceRegistry implements Watcher ...@@ -71,7 +72,7 @@ public class ServiceRegistry implements Watcher
this.currentZnode = zooKeeper.create(COORDINATOR_ZNODE_PATH + "/coordinator_", metadata.getBytes(), this.currentZnode = zooKeeper.create(COORDINATOR_ZNODE_PATH + "/coordinator_", metadata.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
logger.info("Registered to be Coordinator and I am Master !"); logger.info("Registered to be Coordinator and I am Master !");
Coordinator_Address = metadata.split(":")[0];
GRPCServiceStart.start(); GRPCServiceStart.start();
/*Thread thread = new Thread(()-> new Runnable() { /*Thread thread = new Thread(()-> new Runnable() {
@Override @Override
...@@ -110,7 +111,6 @@ public class ServiceRegistry implements Watcher ...@@ -110,7 +111,6 @@ public class ServiceRegistry implements Watcher
private synchronized void masterJob() throws InterruptedException, KeeperException, IOException { private synchronized void masterJob() throws InterruptedException, KeeperException, IOException {
updateAddresses(); updateAddresses();
//launchWorkersIfNecessary();
} }
private void updateAddresses() throws KeeperException, InterruptedException private void updateAddresses() throws KeeperException, InterruptedException
...@@ -133,6 +133,7 @@ public class ServiceRegistry implements Watcher ...@@ -133,6 +133,7 @@ public class ServiceRegistry implements Watcher
this.allServiceAddresses = Collections.unmodifiableList(addresses); this.allServiceAddresses = Collections.unmodifiableList(addresses);
logger.info("The cluster addresses are: " + this.allServiceAddresses); logger.info("The cluster addresses are: " + this.allServiceAddresses);
} }
...@@ -140,6 +141,10 @@ public class ServiceRegistry implements Watcher ...@@ -140,6 +141,10 @@ public class ServiceRegistry implements Watcher
{ {
return allServiceAddresses; return allServiceAddresses;
} }
public static String getCoordinatorAddress()
{
return Coordinator_Address;
}
@Override @Override
public void process(WatchedEvent watchedEvent) public void process(WatchedEvent watchedEvent)
......
package org.AutoHealerAndClusterSearch.SearchWebApp.WebClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class Aggregator {
private WebClient webClient;
static final Logger logger = LoggerFactory.getLogger(Aggregator.class);
public Aggregator() {
this.webClient = new WebClient();
}
/*send task to list of workers*/
public List<String> sendTasksToWorkers(List<String> workersAddresses, List<String> tasks) throws ExecutionException, InterruptedException {
List<String> responses = new ArrayList<>();
List<CompletableFuture<String>> temp = new ArrayList<>();
for (String workerAddr : workersAddresses)
{
for (String task : tasks)
{
try
{
temp.add(webClient.sendTask(workerAddr, task.getBytes()));
}
catch (Exception e)
{
logger.error("Cannot Connect To Server " + workerAddr);
break;
}
}
}
responses = joinFutures(temp);
return responses;
}
/*send task to list of workers*/
public List<String> sendTasksToWorkers(List<String> workersAddresses, List<String> tasks, String headers) throws ExecutionException, InterruptedException {
List<String> responses = new ArrayList<>();
List<CompletableFuture<String>> temp = new ArrayList<>();
for (String workerAddr : workersAddresses)
{
for (String task : tasks)
{
try
{
temp.add(webClient.sendTask(workerAddr , task.getBytes() , headers));
}
catch (Exception e)
{
logger.error("Cannot Connect To Server " + workerAddr);
break;
}
}
}
responses = joinFutures(temp);
return responses;
}
private List<String> joinFutures(List<CompletableFuture<String>> futures)
{
CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
try
{
allFutures.get();
}
catch (InterruptedException | ExecutionException e)
{
logger.error("Responses could not be got");
e.printStackTrace();
}
List<String> results = new ArrayList<>();
for (CompletableFuture<String> future : futures)
{
try
{
String result = future.get();
results.add(result);
}
catch (InterruptedException | ExecutionException e)
{
logger.error("Responses could not be got");
e.printStackTrace();
}
}
return results;
}
}
package org.AutoHealerAndClusterSearch.SearchWebApp.WebClient;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
public class Application
{
private static final String WORKER_ADDRESS_1 = "http://localhost:8080/task";
private static final String WORKER_ADDRESS_2 = "http://localhost:8081/task";
public static void main(String[] args) throws ExecutionException, InterruptedException {
Aggregator aggregator = new Aggregator();
String task1 = "10,200";
String task2 = "123456789,100000000000000,700000002342343";
String headers = "X-Debug: true";
List<String> results = aggregator.sendTasksToWorkers(Arrays.asList(WORKER_ADDRESS_1, WORKER_ADDRESS_2),
Arrays.asList(task1, task2));
for (String result : results) {
System.out.println(result);
}
List<String> resultsWithHeaders = aggregator.sendTasksToWorkers(Arrays.asList(WORKER_ADDRESS_1/*, WORKER_ADDRESS_2*/),
Arrays.asList(task1, task2), headers);
for (String result : resultsWithHeaders) {
System.out.println(result);
}
}
}
package org.AutoHealerAndClusterSearch.SearchWebApp.WebClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
public class WebClient {
private HttpClient client;
private String DEBUG_RESPONSE_KEY = "X-Debug-Info";
static final Logger logger = LoggerFactory.getLogger(WebClient.class);
/* instantiate web client */
/* Read more about Builder pattern https://en.wikipedia.org/wiki/Builder_pattern*/
public WebClient()
{
this.client = HttpClient.newBuilder().version(HttpClient.Version.HTTP_2).build();
}
/* send task (post http request) asynchronously */
public CompletableFuture<String> sendTask(String url, byte[] requestPayload)
{
CompletableFuture<String> response = new CompletableFuture<>();
HttpRequest httpRequest = HttpRequest.newBuilder()
.uri(URI.create(url))
.POST(HttpRequest.BodyPublishers.ofByteArray(requestPayload))
.build();
response = client.sendAsync
(httpRequest ,
HttpResponse.BodyHandlers.ofString(StandardCharsets.UTF_8)).
thenApply(HttpResponse::body);
return response;
}
/* send task (post http request) asynchronously with custom headers*/
public CompletableFuture<String> sendTask(String url, byte[] requestPayload, String headers)
{
CompletableFuture<String> reply = new CompletableFuture<>();
HttpRequest.Builder requestBuilder = HttpRequest.newBuilder()
.uri(URI.create(url))
.POST(HttpRequest.BodyPublishers.ofByteArray(requestPayload));
if (headers != null && !headers.isEmpty())
{
String[] headerLines = headers.split("\n");
for (String headerLine : headerLines)
{
String[] headerParts = headerLine.split(":");
String headerName = headerParts[0].trim();
String headerValue = headerParts[1].trim();
requestBuilder.header(headerName, headerValue);
}
}
HttpRequest request = requestBuilder.build();
reply = client.sendAsync
(request,
HttpResponse.BodyHandlers.ofString(StandardCharsets.UTF_8))
.thenApply(response ->
response.headers().map().get(DEBUG_RESPONSE_KEY).get(0) + response.body());
return reply;
}
}
package org.AutoHealerAndClusterSearch.SearchWebApp.WebServer;
import com.sun.net.httpserver.Headers;
import com.sun.net.httpserver.HttpContext;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.concurrent.Executors;
public class WebServer implements Watcher
{
static final Logger logger = LoggerFactory.getLogger(WebServer.class);
private static final String TASK_ENDPOINT = "/search-query";
private static final String COORDINATOR_ZNODE_PATH = "/coordinator_node";
private final String zooKeeperServerAddress = "192.168.184.10";
private static final int SESSION_TIMEOUT = 3000;
private final int port;
private HttpServer server;
private ZooKeeper zooKeeper;
private String CoordinatorAddress;
private String Coordinator_Full_Name;
public WebServer(int port) throws IOException, InterruptedException, KeeperException
{
this.port = port;
}
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
int serverPort = 8080;
if (args.length == 1)
{
serverPort = Integer.parseInt(args[0]);
}
WebServer webServer = new WebServer(serverPort);
webServer.connectToZookeeper();
webServer.startServer();
System.out.println("Server is listening on port " + serverPort);
logger.info("Server is listening on port " + serverPort);
webServer.run();
webServer.close();
}
public void connectToZookeeper() throws IOException, InterruptedException, KeeperException
{
this.zooKeeper = new ZooKeeper(zooKeeperServerAddress, SESSION_TIMEOUT, this);
setCoordinatorAddress();
}
public void run() throws InterruptedException {
synchronized (zooKeeper) {
zooKeeper.wait();
}
}
private void close() throws InterruptedException {
this.zooKeeper.close();
}
private synchronized void setCoordinatorAddress() throws InterruptedException, KeeperException {
String cord = zooKeeper.getChildren(COORDINATOR_ZNODE_PATH , this).get(0);
Coordinator_Full_Name = COORDINATOR_ZNODE_PATH + "/" + cord;
Stat stat = zooKeeper.exists(Coordinator_Full_Name , false);
if (stat == null)
{
logger.error("Coordinator Not Found");
throw new RuntimeException();
}
this.CoordinatorAddress = new String(zooKeeper.getData(Coordinator_Full_Name , false , stat)).split("@")[1];
}
public void startServer() {
try
{
this.server = HttpServer.create(new InetSocketAddress(port), 0);
logger.info("Server Started at : " + System.nanoTime());
}
catch (IOException e) {
logger.error("Server failed to Start at : " + System.nanoTime());
throw new RuntimeException(e);
}
HttpContext taskContext = server.createContext(TASK_ENDPOINT);
taskContext.setHandler(this::handleTaskRequest);
server.setExecutor(Executors.newFixedThreadPool(8));
server.start();
}
private void handleTaskRequest(HttpExchange exchange) throws IOException
{
logger.info("Query Request at " + System.nanoTime());
if (!exchange.getRequestMethod().equalsIgnoreCase("post"))
{
exchange.close();
return;
}
Headers headers = exchange.getRequestHeaders();
long startTime = System.nanoTime();
byte[] requestBytes = exchange.getRequestBody().readAllBytes();
byte[] responseBytes = prepareRsponse(requestBytes);
long finishTime = System.nanoTime();
String debugMessage = String.format("Operation took %d ns", finishTime - startTime);
exchange.getResponseHeaders().put("X-Debug-Info", Arrays.asList(debugMessage));
sendResponse(responseBytes, exchange);
}
private byte[] prepareRsponse(byte[] requestBytes)
{
String bodyString = new String(requestBytes);
String[] stringTerms = bodyString.split(" ");
/**
*
* must add code to send the stringTerms to the coordinator
* using grpc and wait for response !!
* */
return String.format("Result of the multiplication is \n").getBytes();
}
private void sendResponse(byte[] responseBytes, HttpExchange exchange) throws IOException {
exchange.sendResponseHeaders(200, responseBytes.length);
OutputStream outputStream = exchange.getResponseBody();
outputStream.write(responseBytes);
outputStream.flush();
outputStream.close();
exchange.close();
}
@Override
public void process(WatchedEvent watchedEvent)
{
switch (watchedEvent.getType())
{
case NodeChildrenChanged:
{
try
{
logger.warn("Coordinator Changed !!");
setCoordinatorAddress();
}
catch (InterruptedException | KeeperException e)
{
logger.error("Could NOT Handle Node Children Changed Event!");
throw new RuntimeException(e);
}
}
}
}
}
...@@ -151,7 +151,7 @@ public class test ...@@ -151,7 +151,7 @@ public class test
System.out.println(entry.getKey() + " : " + entry.getValue()); System.out.println(entry.getKey() + " : " + entry.getValue());
} }
} }
public static void main(String[] args) public static void doo()
{ {
String path = System.getProperty("user.dir") + "/SearchFiles/"; String path = System.getProperty("user.dir") + "/SearchFiles/";
///listFilesForFolder(System.getProperty("user.dir") + "/SearchFiles"); ///listFilesForFolder(System.getProperty("user.dir") + "/SearchFiles");
......
package org.AutoHealerAndClusterSearch.testTransmitting;
import java.io.Serializable;
public class MyClass implements Serializable
{
public String sth;
}
package org.AutoHealerAndClusterSearch.testTransmitting;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.net.ServerSocket;
import java.net.Socket;
public class Receiver {
public static void main(String[] args) {
int serverPort = 12345;
try {
// Create a ServerSocket and bind it to the specified port
ServerSocket serverSocket = new ServerSocket(serverPort);
System.out.println("Waiting for incoming connections...");
// Accept a client connection
Socket socket = serverSocket.accept();
System.out.println("Connection established with a client.");
// Get the InputStream from the socket
InputStream inputStream = socket.getInputStream();
// Create an ObjectInputStream to deserialize the object
ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);
// Read the serialized object and cast it back to the original object type
MyClass receivedObj = (MyClass) objectInputStream.readObject();
// Close the input stream, object input stream, and socket
objectInputStream.close();
inputStream.close();
socket.close();
System.out.println("Object received successfully : " + receivedObj.sth);
} catch (IOException | ClassNotFoundException e) {
e.printStackTrace();
}
}
}
\ No newline at end of file
package org.AutoHealerAndClusterSearch.testTransmitting;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.net.Socket;
public class Sender {
public static void main(String[] args) {
String serverHost = "localhost";
int serverPort = 12345;
try {
// Establish a TCP connection to the server
Socket socket = new Socket(serverHost, serverPort);
// Get the OutputStream from the socket
OutputStream outputStream = socket.getOutputStream();
// Create an ObjectOutputStream to serialize the object
ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream);
// Create and serialize the object
MyClass obj = new MyClass(); // The object you want to serialize
obj.sth = "hello Mr.Salameh";
objectOutputStream.writeObject(obj);
// Flush and close the output stream and socket
objectOutputStream.flush();
objectOutputStream.close();
socket.close();
System.out.println("Object sent successfully.");
} catch (IOException e) {
e.printStackTrace();
}
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment