Commit b7615176 authored by amir.yosef's avatar amir.yosef

final

parent d212e5e4
......@@ -4,18 +4,35 @@ import server.Server;
import server.ServerBuilder;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main {
public static ExecutorService executor;
public static void main(String[] args) throws IOException {
public static void main(String[] args) {
executor = Executors.newFixedThreadPool(2);
Director director = new Director();
ServerBuilder builder = new ServerBuilder(args);
// try (Server server = builder.build()) {
// server.start();
// }
// builder.setPort(16379);
ReplicaConnectionService service = null;
director.buildReplica(builder, service);
executor.submit(() -> {
ReplicaConnectionService service = null;
try {
director.buildReplica(builder, service);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
executor.submit(() -> {
try (Server server = builder.build()) {
server.start();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
);
}
}
\ No newline at end of file
......@@ -35,7 +35,7 @@ public abstract class Client extends Thread {
}
handleClient(reader, outputStream);
} catch (IOException e) {
logger.log(Level.SEVERE, e.getMessage());
logger.log(Level.SEVERE, e.getMessage(), e);
}
}
......
......@@ -9,8 +9,11 @@ import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
public class MasterClient extends Client {
private static final Logger logger = Logger.getLogger(MasterClient.class.getName());
public MasterClient(Socket socket) {
super(socket);
......@@ -18,13 +21,17 @@ public class MasterClient extends Client {
}
@Override
protected void handleClient(BufferedReader bufferedReader, OutputStream outputStream) throws IOException {
protected void handleClient(BufferedReader bufferedReader, OutputStream outputStream) {
String line;
while ((line = bufferedReader.readLine()) != null) {
if (line.isEmpty()) continue;
List<String> parsedCommands = commandParser.parseCommand(bufferedReader, line);
ClientCommandHandler commandHandler = new ClientCommandHandler(parsedCommands, outputStream);
commandHandler.execute();
try {
while ((line = bufferedReader.readLine()) != null) {
if (line.isEmpty()) continue;
List<String> parsedCommands = commandParser.parseCommand(bufferedReader, line);
ClientCommandHandler commandHandler = new ClientCommandHandler(parsedCommands, outputStream);
commandHandler.execute();
}
} catch (IOException e) {
logger.log(Level.SEVERE, e.getMessage(),e);
}
}
}
......@@ -12,30 +12,33 @@ import java.net.Socket;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
public class ReplicaClient extends Client {
private final CommandByteCounter commandByteCounter;
private final ExecutorService executorService;
private static final Logger logger = Logger.getLogger(ReplicaClient.class.getName());
public ReplicaClient(BufferedReader reader, Socket socket) {
super(reader, socket);
System.out.println("Client");
this.commandByteCounter = CommandByteCounter.getInstance();
this.executorService = Executors.newSingleThreadExecutor();
}
@Override
protected void handleClient(BufferedReader bufferedReader, OutputStream outputStream) throws IOException {
protected void handleClient(BufferedReader bufferedReader, OutputStream outputStream) {
String line;
System.out.println("Handling");
while ((line = bufferedReader.readLine()) != null) {
if (line.isEmpty()) continue;
List<String> parsedCommands = commandParser.parseCommand(bufferedReader, line);
System.out.println("ReplicaClient parse: " + parsedCommands);
addBytes(parsedCommands);
CommandProcessable commandHandler = new ReplicaCommandHandler(parsedCommands);
Thread thread = new Thread(commandHandler);
thread.start();
try {
while ((line = bufferedReader.readLine()) != null) {
if (line.isEmpty()) continue;
List<String> parsedCommands = commandParser.parseCommand(bufferedReader, line);
addBytes(parsedCommands);
CommandProcessable command = new ReplicaCommandHandler(parsedCommands);
executorService.submit(command);
}
} catch (IOException e) {
logger.log(Level.SEVERE, e.getMessage(),e);
}
}
......
package command;
import java.io.IOException;
public interface CommandHandler {
byte[] execute() throws IOException;
}
......@@ -9,8 +9,7 @@ public class CommandInvoker {
public static byte[] invoke(CommandExecutable<byte[]> command) throws IOException {
String commandClassName = command.getClass().getName();
String threadName = Thread.currentThread().getName();
logger.log(Level.INFO, "Executing command: " + commandClassName + " on thread: " + threadName);
logger.log(Level.INFO, "Executing command: " + commandClassName);
byte[] result = command.execute();
logger.log(Level.INFO, "Command executed successfully: " + commandClassName);
......
......@@ -13,6 +13,10 @@ public class EchoCommand implements CommandExecutable<byte[]> {
@Override
public byte[] execute() {
return (Response.getResponse(args.getFirst()));
if (!args.isEmpty()) {
return (Response.getResponse(args.getFirst()));
} else {
return "".getBytes();
}
}
}
package command;
import model.Command;
import server.ReceivedFromReplica;
import java.util.List;
......
......@@ -18,22 +18,26 @@ public class SetCommand implements CommandExecutable<byte[]> {
@Override
public byte[] execute() {
Map<String, String> commandsMap = IntStream.iterate(2, i -> i < args.size(), i -> i + 2)
.boxed()
.collect(Collectors.toMap(i -> args.get(i).toLowerCase(), i -> args.get(i + 1), (a, b) -> b));
if (!args.isEmpty()) {
Map<String, String> commandsMap = IntStream.iterate(2, i -> i < args.size(), i -> i + 2)
.boxed()
.collect(Collectors.toMap(i -> args.get(i).toLowerCase(), i -> args.get(i + 1), (a, b) -> b));
String value = args.get(1);
String expiration = commandsMap.get(Command.PX.getValue().toLowerCase());
if (expiration != null) {
try {
Long expirationTime = Long.parseLong(expiration);
storage.save(args.getFirst().toLowerCase(), value, expirationTime);
} catch (NumberFormatException e) {
System.out.println("NumberFormatException: " + e.getMessage());
String value = args.get(1);
String expiration = commandsMap.get(Command.PX.getValue().toLowerCase());
if (expiration != null) {
try {
Long expirationTime = Long.parseLong(expiration);
storage.save(args.getFirst().toLowerCase(), value, expirationTime);
} catch (NumberFormatException e) {
System.out.println("NumberFormatException: " + e.getMessage());
}
} else {
storage.save(args.getFirst().toLowerCase(), value);
}
return ("+OK\r\n".getBytes());
} else {
storage.save(args.getFirst().toLowerCase(), value);
return "wrong args".getBytes();
}
return ("+OK\r\n".getBytes());
}
}
package command;
import java.util.Objects;
public class UnknownCommand implements CommandExecutable<byte[]> {
private static String message;
public UnknownCommand(String message) {
UnknownCommand.message = message;
}
public UnknownCommand() {
}
@Override
public byte[] execute() {
return (("-" + "Unknown Command" + "\r\n").getBytes());
return ((Objects.requireNonNullElse(message, "-" + "Unknown Command" + "\r\n")).getBytes());
}
}
package command.handshake;
import model.Command;
import parser.CommandParser;
......@@ -8,16 +7,30 @@ import java.io.BufferedReader;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
public class PingCommandSender extends CommandSender {
private static final Logger logger = Logger.getLogger(PingCommandSender.class.getName());
public PingCommandSender(CommandParser commandParser) {
super(commandParser);
}
@Override
public void sendCommand(BufferedReader bufferedReader, OutputStream outputStream) throws IOException {
byte[] bytes = commandParser.getResponseFromCommandArray(List.of(Command.PING.getValue().toLowerCase())).getBytes();
outputStream.write(bytes);
outputStream.flush();
try {
logger.info("Sending PING command.");
byte[] bytes = commandParser.getResponseFromCommandArray(
List.of(Command.PING.getValue().toLowerCase())).getBytes();
outputStream.write(bytes);
outputStream.flush();
logger.info("PING command sent successfully.");
} catch (IOException e) {
logger.log(Level.SEVERE, "Failed to send PING command: " + e.getMessage(), e);
throw new IOException("Failed to send PING command: " + e.getMessage(), e);
}
}
}
package command.handshake;
import model.Command;
import parser.CommandParser;
......@@ -8,8 +7,11 @@ import java.io.BufferedReader;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
public class ReplConfCommandSender extends CommandSender {
private static final Logger logger = Logger.getLogger(ReplConfCommandSender.class.getName());
private final String port;
public ReplConfCommandSender(String port, CommandParser commandParser) {
......@@ -19,36 +21,84 @@ public class ReplConfCommandSender extends CommandSender {
@Override
public void sendCommand(BufferedReader bufferedReader, OutputStream outputStream) throws IOException {
String replconfResp = "";
replconfResp = getReplConfResp(bufferedReader, outputStream);
try {
logger.info("Starting REPLCONF command sequence.");
if (sendReplConfResp(bufferedReader, outputStream)) {
logger.info("REPLCONF listening-port sent successfully.");
if (sendReplConfCapa(bufferedReader, outputStream)) {
logger.info("REPLCONF capa sent successfully.");
if (!replconfResp.isEmpty()) {
replconfResp = sendReplConfCapa(bufferedReader, outputStream);
sendPsyncCommand(outputStream);
logger.info("PSYNC command sent successfully.");
}
}
} catch (IOException e) {
logger.log(Level.SEVERE, "Failed to send REPLCONF command sequence: " + e.getMessage(), e);
throw new IOException("Failed to send REPLCONF command: " + e.getMessage(), e);
}
if (!replconfResp.isEmpty()) {
outputStream.write(commandParser.getResponseFromCommandArray(List.of(Command.PSYNC.getValue(), "?", "-1")).getBytes());
}
private boolean sendReplConfCapa(BufferedReader bufferedReader, OutputStream outputStream) throws IOException {
try {
outputStream.write(commandParser.getResponseFromCommandArray(List.of(
Command.REPLCONF.getValue(),
"capa",
"npsync2")
).getBytes()
);
outputStream.flush();
String response = bufferedReader.readLine();
if (response == null) {
throw new IOException("No response received after sending REPLCONF capa.");
}
logger.info("Received response after REPLCONF capa: " + response);
return !response.isEmpty();
} catch (IOException e) {
logger.log(Level.WARNING, "Error during sending REPLCONF capa: " + e.getMessage(), e);
throw e;
}
}
private String sendReplConfCapa(BufferedReader bufferedReader, OutputStream outputStream) throws IOException {
outputStream.write(commandParser.getResponseFromCommandArray(List.of(
Command.REPLCONF.getValue(),
"capa",
"npsync2")
).getBytes()
);
outputStream.flush();
return bufferedReader.readLine();
private boolean sendReplConfResp(BufferedReader bufferedReader, OutputStream outputStream) throws IOException {
try {
outputStream.write(commandParser.getResponseFromCommandArray(List.of(
Command.REPLCONF.getValue(),
"listening-port",
String.valueOf(port))
).getBytes()
);
outputStream.flush();
String response = bufferedReader.readLine();
if (response == null) {
throw new IOException("No response received after sending REPLCONF listening-port.");
}
logger.info("Received response after REPLCONF listening-port: " + response);
return !response.isEmpty();
} catch (IOException e) {
logger.log(Level.WARNING, "Error during sending REPLCONF listening-port: " + e.getMessage(), e);
throw e;
}
}
private String getReplConfResp(BufferedReader bufferedReader, OutputStream outputStream) throws IOException {
outputStream.write(commandParser.getResponseFromCommandArray(List.of(
Command.REPLCONF.getValue(),
"listening-port",
String.valueOf(port))
).getBytes()
);
outputStream.flush();
return bufferedReader.readLine();
private void sendPsyncCommand(OutputStream outputStream) throws IOException {
try {
outputStream.write(commandParser.getResponseFromCommandArray(List.of(
Command.PSYNC.getValue(),
"?",
"-1"))
.getBytes()
);
outputStream.flush();
} catch (IOException e) {
logger.log(Level.WARNING, "Error during sending PSYNC command: " + e.getMessage(), e);
throw e;
}
}
}
......@@ -6,12 +6,15 @@ import storage.Storage;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class ReplicaSetCommand implements CommandExecutable<Void> {
private final Storage storage = Storage.getInstance();
private final List<String> args;
private static final Logger logger = Logger.getLogger(ReplicaSetCommand.class.getName());
public ReplicaSetCommand(List<String> args) {
this.args = args;
......@@ -30,7 +33,7 @@ public class ReplicaSetCommand implements CommandExecutable<Void> {
Long expirationTime = Long.parseLong(expiration);
storage.save(args.getFirst().toLowerCase(), value, expirationTime);
} catch (NumberFormatException e) {
System.out.println("NumberFormatException: " + e.getMessage());
logger.log(Level.SEVERE, "NumberFormatException", e);
}
} else {
storage.save(args.getFirst().toLowerCase(), value);
......
......@@ -33,7 +33,6 @@ public class CommandFactory implements Factory {
};
}
// Overloaded method for additional dependencies
public CommandExecutable<byte[]> getCommand(Command command, List<String> args, SendToReplica sender, OutputStream os) {
return switch (command) {
case PING -> new PingCommand();
......
......@@ -23,7 +23,6 @@ public class ReplicaCommandFactory implements Factory {
@Override
public CommandExecutable<?> getCommand(Command command, List<String> args) {
System.out.println("Factory get instance from " + command.getValue());
return switch (command) {
case SET -> new ReplicaSetCommand(args);
case REPLCONF -> new ReplicaReplConfCommand();
......
......@@ -2,6 +2,7 @@ package handlers;
import command.CommandExecutable;
import command.CommandInvoker;
import command.UnknownCommand;
import factory.CommandFactory;
import model.Command;
import parser.CommandParser;
......@@ -28,6 +29,7 @@ public class ClientCommandHandler {
this.os = outputStream;
this.commandParser = new CommandParser();
this.sendToReplica = SendToReplica.getInstance();
sendToReplica.start();
}
public void execute() {
......@@ -38,6 +40,12 @@ public class ClientCommandHandler {
try {
Command command = CommandUtil.getCommand(commands.getFirst());
if (command == null) {
logger.log(Level.SEVERE, "The client send UnRecognized command.");
CommandExecutable<byte[]> result = new UnknownCommand("UnRecognized command");
byte[] response = result.execute();
writeResponse(response);
}
List<String> replicaCommand = createReplicaCommand(commands);
byte[] result = executeCommand(command);
sendReplicaCommand(replicaCommand);
......
......@@ -8,11 +8,16 @@ import parser.CommandParser;
import util.RdbBytesReader;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.Socket;
import java.util.logging.Level;
import java.util.logging.Logger;
public class ConnectionHandler {
private static final Logger logger = Logger.getLogger(ConnectionHandler.class.getName());
private final Socket socket;
private final RdbBytesReader reader;
private final int port;
......@@ -21,22 +26,25 @@ public class ConnectionHandler {
public ConnectionHandler(Socket socket, CommandParser commandParser, int port) {
this.socket = socket;
commandSender = new PingCommandSender(commandParser);
this.commandSender = new PingCommandSender(commandParser);
this.port = port;
this.reader = new RdbBytesReader();
this.commandParser = commandParser;
System.out.println("Connection handler");
}
public BufferedReader handleConnection() {
try {
OutputStream outputStream = socket.getOutputStream();
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
try (OutputStream outputStream = socket.getOutputStream();
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
commandSender.sendCommand(bufferedReader, outputStream);
String line;
while ((line = bufferedReader.readLine()) != null) {
if (line.isEmpty()) continue;
System.out.println("GET CONNECTION : process line: " + line);
if (line.isEmpty()) {
continue;
}
logger.info("Processing line: " + line);
if (line.equalsIgnoreCase("+" + Command.PONG.getValue())) {
this.commandSender = new ReplConfCommandSender(String.valueOf(port), commandParser);
commandSender.sendCommand(bufferedReader, outputStream);
......@@ -44,10 +52,11 @@ public class ConnectionHandler {
return reader.read(bufferedReader);
}
}
} catch (IOException e) {
logger.log(Level.SEVERE, "I/O error during connection handling: " + e.getMessage(), e);
} catch (Exception e) {
System.out.println(e.getMessage());
logger.log(Level.SEVERE, "Unexpected error during connection handling: " + e.getMessage(), e);
}
return null;
}
}
......@@ -18,7 +18,6 @@ public class ReplicaCommandHandler implements CommandProcessable {
private final ReplicaCommandFactory factory = ReplicaCommandFactory.getInstance();
public ReplicaCommandHandler(List<String> commands) {
logger.info("replica command handler");
this.commands = Collections.unmodifiableList(commands);
}
......@@ -35,7 +34,6 @@ public class ReplicaCommandHandler implements CommandProcessable {
logger.warning("Invalid command detected.");
return;
}
logger.info("Executing");
CommandExecutable<?> commandProcessor = factory.getCommand(command, commands.subList(1, commands.size()));
executeCommand(commandProcessor);
} catch (Exception e) {
......
package server;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
public class ReceivedFromReplica {
private final AtomicInteger atomicInteger;
private final AtomicBoolean atomicBoolean;
public ReceivedFromReplica() {
synchronized (this) {
atomicBoolean = new AtomicBoolean(false);
atomicInteger = new AtomicInteger(0);
}
}
public void receive() {
atomicInteger.getAndIncrement();
}
public int getReceivedCount() {
return atomicInteger.get();
}
public void reset() {
System.out.println("reset values");
synchronized (this) {
atomicBoolean.set(false);
atomicInteger.set(0);
}
}
}
......@@ -11,6 +11,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
public class SendToReplica implements Closeable {
......@@ -31,9 +32,6 @@ public class SendToReplica implements Closeable {
return SendToReplicaHolder.INSTANCE;
}
public String getCountConnectedReplicas() {
return String.valueOf(connectedReplicas.size());
}
public void start() {
executorService.submit(this::processCommands);
......@@ -49,11 +47,12 @@ public class SendToReplica implements Closeable {
byte[] commandBytes = command.getBytes();
connectedReplicas.forEach(replica -> {
try {
OutputStream outputStream = replica.os();
outputStream.write(commandBytes);
outputStream.flush();
try (OutputStream outputStream = replica.os()) {
outputStream.write(commandBytes);
outputStream.flush();
}
} catch (IOException e) {
logger.severe("Failed to send command to replica ");
logger.log(Level.SEVERE, "Failed to send command to replica ", e);
}
});
}
......@@ -67,9 +66,7 @@ public class SendToReplica implements Closeable {
}
}
public int getCountCommands() {
return countCommands.getAndSet(0);
}
public void addConnection(OutputStream outputStream) {
connectedReplicas.add(new ConnectedReplica(outputStream));
......
......@@ -12,7 +12,7 @@ public class LRUCachePolicy<K, V> implements CachePolicy<K, V> {
public LRUCachePolicy(int maxCapacity, int threshold) {
this.cacheMap = new ConcurrentHashMap<>(maxCapacity);
RdbFileReader<String,String> reader = new RdbFileReader<>();
RdbFileReader<String, String> reader = new RdbFileReader<>();
cacheMap.putAll((Map<? extends K, ? extends V>) reader.readFile());
this.threshold = threshold;
this.accessOrder = new ConcurrentLinkedQueue<>();
......
package storage;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
public class StorageManager {
private final Storage storage;
private final ScheduledExecutorService scheduler;
private static final Logger logger = Logger.getLogger(StorageManager.class.getName());
public StorageManager() {
this.storage = Storage.getInstance();
......@@ -17,7 +19,7 @@ public class StorageManager {
private void performMaintenance() {
storage.runCachePolicy();
System.out.println("Maintenance performed at: " + Date.from(java.time.Clock.systemUTC().instant()));
logger.log(Level.INFO, "Maintenance Started");
}
public void shutdown() {
......
......@@ -5,7 +5,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
public class CommandByteCounter {
private static volatile CommandByteCounter instance;
private final AtomicInteger bytes;
private final AtomicBoolean isFirst;
......@@ -14,15 +13,12 @@ public class CommandByteCounter {
isFirst = new AtomicBoolean(true);
}
private static final class InstanceHolder {
private static final CommandByteCounter instance = new CommandByteCounter();
}
public static CommandByteCounter getInstance() {
if (instance == null) {
synchronized (CommandByteCounter.class) {
if (instance == null) {
instance = new CommandByteCounter();
}
}
}
return instance;
return InstanceHolder.instance;
}
public void setIsFirst(boolean b) {
......@@ -38,9 +34,6 @@ public class CommandByteCounter {
bytes.addAndGet(bytesLength);
}
public void renewBytes(int bytes) {
this.bytes.set(this.bytes.get() + bytes);
}
public Integer getBytes() {
return bytes.get();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment