Commit d212e5e4 authored by amir.yosef's avatar amir.yosef

final ( before cleaning)

parent 03aa0c1a
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="ConstantValue" enabled="true" level="WARNING" enabled_by_default="true">
<option name="REPORT_CONSTANT_REFERENCE_VALUES" value="false" />
</inspection_tool>
</profile>
</component>
\ No newline at end of file
Manifest-Version: 1.0
Main-Class: Main
import director.Director;
import server.ReplicaConnectionService;
import server.Server;
import server.ServerBuilder;
import java.util.Arrays;
import java.io.IOException;
public class Main {
public static void main(String[] args) {
public static void main(String[] args) throws IOException {
Director director = new Director();
// System.out.println(Arrays.toString(args));
ServerBuilder builder = new ServerBuilder(args);
director.buildMaster(builder);
try (Server server = builder.build()) {
server.start();
}
// try (Server server = builder.build()) {
// server.start();
// }
// builder.setPort(16379);
ReplicaConnectionService service = null;
director.buildReplica(builder, service);
}
}
\ No newline at end of file
......@@ -7,11 +7,14 @@ import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.Socket;
import java.util.logging.Level;
import java.util.logging.Logger;
public abstract class Client {
public abstract class Client extends Thread {
protected final CommandParser commandParser;
protected final Socket socket;
protected BufferedReader reader;
private static final Logger logger = Logger.getLogger(Client.class.getName());
protected Client(Socket socket) {
this.socket = socket;
......@@ -24,6 +27,7 @@ public abstract class Client {
this.socket = socket;
}
@Override
public void run() {
try (OutputStream outputStream = socket.getOutputStream()) {
if (reader == null) {
......@@ -31,7 +35,7 @@ public abstract class Client {
}
handleClient(reader, outputStream);
} catch (IOException e) {
System.out.println("IOException while handling client: " + e.getMessage());
logger.log(Level.SEVERE, e.getMessage());
}
}
......
package client.replica;
import client.Client;
import command.CommandProcessable;
import handlers.replica.ReplicaCommandHandler;
import util.CommandByteCounter;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ReplicaClient extends Client {
private final CommandByteCounter commandByteCounter;
private final ExecutorService executorService;
public ReplicaClient(BufferedReader reader, Socket socket) {
super(reader, socket);
System.out.println("Client");
this.commandByteCounter = CommandByteCounter.getInstance();
this.executorService = Executors.newSingleThreadExecutor();
}
@Override
protected void handleClient(BufferedReader bufferedReader, OutputStream outputStream) throws IOException {
String line;
System.out.println("Handling");
while ((line = bufferedReader.readLine()) != null) {
if (line.isEmpty()) continue;
List<String> parsedCommands = commandParser.parseCommand(bufferedReader, line);
System.out.println("ReplicaClient parse: " + parsedCommands);
addBytes(parsedCommands);
CommandProcessable commandHandler = new ReplicaCommandHandler(parsedCommands);
Thread thread = new Thread(commandHandler);
thread.start();
}
}
private void addBytes(List<String> parsedCommands) {
if (commandByteCounter.isFirst()) {
commandByteCounter.setIsFirst(false);
} else {
String response = commandParser.getResponseFromCommandArray(parsedCommands);
commandByteCounter.addBytes(response.getBytes().length);
}
}
}
package command;
import java.io.IOException;
public interface CommandExecutable<T> {
T execute() throws IOException;
}
package command;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
public class CommandInvoker {
public static byte[] invoke(CommandHandler command) throws IOException {
return command.execute();
private static final Logger logger = Logger.getLogger(CommandInvoker.class.getName());
public static byte[] invoke(CommandExecutable<byte[]> command) throws IOException {
String commandClassName = command.getClass().getName();
String threadName = Thread.currentThread().getName();
logger.log(Level.INFO, "Executing command: " + commandClassName + " on thread: " + threadName);
byte[] result = command.execute();
logger.log(Level.INFO, "Command executed successfully: " + commandClassName);
return result;
}
}
package command;
public interface CommandProcessable extends Runnable {
void process();
}
......@@ -4,7 +4,7 @@ import util.Response;
import java.util.List;
public class EchoCommand implements CommandHandler {
public class EchoCommand implements CommandExecutable<byte[]> {
private final List<String> args;
public EchoCommand(List<String> args) {
......
package command;
import model.Command;
import server.SendToReplica;
import server.ServerInfo;
import util.RdbFileInfo;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
public class FullRsyncCommand implements CommandExecutable<byte[]> {
private final SendToReplica replicaSender;
private final RdbFileInfo rdbFileInfo;
private final OutputStream outputStream;
private final ServerInfo serverInfo;
public FullRsyncCommand(SendToReplica replicaSender, OutputStream outputStream) {
this.outputStream = outputStream;
this.replicaSender = replicaSender;
rdbFileInfo = RdbFileInfo.getInstance();
this.serverInfo = ServerInfo.getInstance();
}
@Override
public byte[] execute() {
synchronized (this) {
replicaSender.addConnection(outputStream);
System.out.println("hi");
byte[] decode = rdbFileInfo.getContent();
try {
return createCommandBytes(Command.FULLRESYNC, decode, serverInfo);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
public byte[] createCommandBytes(Command command, byte[] decode, ServerInfo serverInfo) throws IOException {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
byteArrayOutputStream.write(("+" + command.getValue() + " " + serverInfo.getInfo().get("master_replid") + " 0\r\n").getBytes());
byteArrayOutputStream.write(("$" + decode.length + "\r\n").getBytes());
byteArrayOutputStream.write(decode);
return byteArrayOutputStream.toByteArray();
}
}
......@@ -6,7 +6,7 @@ import util.Response;
import java.util.List;
public class GetCommand implements CommandHandler {
public class GetCommand implements CommandExecutable<byte[]> {
private final Storage storage = Storage.getInstance();
private final List<String> args;
......
......@@ -8,7 +8,7 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class InfoCommand implements CommandHandler {
public class InfoCommand implements CommandExecutable<byte[]> {
private final ServerInfo configuration = ServerInfo.getInstance();
List<String> args;
......
......@@ -3,7 +3,7 @@ package command;
import model.Command;
public class PingCommand implements CommandHandler {
public class PingCommand implements CommandExecutable<byte[]> {
@Override
public byte[] execute() {
......
package command;
import model.Command;
import server.ReceivedFromReplica;
import java.util.List;
public class ReplConfCommand implements CommandExecutable<byte[]> {
private final List<String> args;
public ReplConfCommand(List<String> args) {
this.args = args;
}
@Override
public byte[] execute() {
if (args.getFirst().equalsIgnoreCase(Command.ACK.getValue())) {
return null;
}
return ("+OK\r\n".getBytes());
}
}
......@@ -3,11 +3,12 @@ package command;
import model.Command;
import storage.Storage;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class SetCommand implements CommandHandler {
public class SetCommand implements CommandExecutable<byte[]> {
private final Storage storage = Storage.getInstance();
private final List<String> args;
......@@ -17,16 +18,9 @@ public class SetCommand implements CommandHandler {
@Override
public byte[] execute() {
return getBytes(args, storage);
}
private byte[] getBytes(List<String> args, Storage storage) {
Map<String, String> commandsMap = new HashMap<>();
for (int i = 2; i < args.size(); i += 2) {
commandsMap.put(args.get(i).toLowerCase(), args.get(i + 1));
}
Map<String, String> commandsMap = IntStream.iterate(2, i -> i < args.size(), i -> i + 2)
.boxed()
.collect(Collectors.toMap(i -> args.get(i).toLowerCase(), i -> args.get(i + 1), (a, b) -> b));
String value = args.get(1);
String expiration = commandsMap.get(Command.PX.getValue().toLowerCase());
......
package command;
public class UnknownCommand implements CommandHandler {
public class UnknownCommand implements CommandExecutable<byte[]> {
@Override
public byte[] execute() {
return (("-" + "Unknown Command" + "\r\n").getBytes());
......
package command.handshake;
import parser.CommandParser;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.OutputStream;
public abstract class CommandSender {
protected final CommandParser commandParser;
protected CommandSender(CommandParser commandParser) {
this.commandParser = commandParser;
}
public abstract void sendCommand(BufferedReader bufferedReader, OutputStream outputStream) throws IOException;
}
package command.handshake;
import model.Command;
import parser.CommandParser;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
public class PingCommandSender extends CommandSender {
public PingCommandSender(CommandParser commandParser) {
super(commandParser);
}
@Override
public void sendCommand(BufferedReader bufferedReader, OutputStream outputStream) throws IOException {
byte[] bytes = commandParser.getResponseFromCommandArray(List.of(Command.PING.getValue().toLowerCase())).getBytes();
outputStream.write(bytes);
outputStream.flush();
}
}
package command.handshake;
import model.Command;
import parser.CommandParser;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
public class ReplConfCommandSender extends CommandSender {
private final String port;
public ReplConfCommandSender(String port, CommandParser commandParser) {
super(commandParser);
this.port = port;
}
@Override
public void sendCommand(BufferedReader bufferedReader, OutputStream outputStream) throws IOException {
String replconfResp = "";
replconfResp = getReplConfResp(bufferedReader, outputStream);
if (!replconfResp.isEmpty()) {
replconfResp = sendReplConfCapa(bufferedReader, outputStream);
}
if (!replconfResp.isEmpty()) {
outputStream.write(commandParser.getResponseFromCommandArray(List.of(Command.PSYNC.getValue(), "?", "-1")).getBytes());
}
}
private String sendReplConfCapa(BufferedReader bufferedReader, OutputStream outputStream) throws IOException {
outputStream.write(commandParser.getResponseFromCommandArray(List.of(
Command.REPLCONF.getValue(),
"capa",
"npsync2")
).getBytes()
);
outputStream.flush();
return bufferedReader.readLine();
}
private String getReplConfResp(BufferedReader bufferedReader, OutputStream outputStream) throws IOException {
outputStream.write(commandParser.getResponseFromCommandArray(List.of(
Command.REPLCONF.getValue(),
"listening-port",
String.valueOf(port))
).getBytes()
);
outputStream.flush();
return bufferedReader.readLine();
}
}
package command.replica;
import command.CommandExecutable;
import model.Command;
import parser.CommandParser;
import java.io.IOException;
import java.util.List;
public class ReplicaFullRsyncCommand implements CommandExecutable<byte[]> {
private final CommandParser commandParser;
public ReplicaFullRsyncCommand() {
commandParser = new CommandParser();
}
@Override
public byte[] execute() throws IOException {
return commandParser.getResponseFromCommandArray(List.of(Command.REPLCONF.getValue(), "ACK", "0")).getBytes();
}
}
package command.replica;
import command.CommandExecutable;
import model.Command;
import parser.CommandParser;
import util.CommandByteCounter;
import java.util.List;
public class ReplicaReplConfCommand implements CommandExecutable<byte[]> {
private final CommandParser commandParser;
private final CommandByteCounter commandByteCounter;
public ReplicaReplConfCommand() {
commandParser = new CommandParser();
this.commandByteCounter = CommandByteCounter.getInstance();
}
@Override
public byte[] execute() {
System.out.println("ReplicaReplConfCommand processed command: ");
return commandParser.getResponseFromCommandArray(List.of(Command.REPLCONF.getValue(), "ACK", commandByteCounter.getBytes().toString())).getBytes();
}
}
package command.replica;
import command.CommandExecutable;
import model.Command;
import storage.Storage;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class ReplicaSetCommand implements CommandExecutable<Void> {
private final Storage storage = Storage.getInstance();
private final List<String> args;
public ReplicaSetCommand(List<String> args) {
this.args = args;
}
@Override
public Void execute() {
Map<String, String> commandsMap = IntStream.iterate(2, i -> i < args.size(), i -> i + 2).boxed()
.collect(Collectors.toMap(i -> args.get(i).toLowerCase(), i -> args.get(i + 1), (a, b) -> b));
String value = args.get(1);
String expiration = commandsMap.get(Command.PX.getValue().toLowerCase());
if (expiration != null) {
try {
Long expirationTime = Long.parseLong(expiration);
storage.save(args.getFirst().toLowerCase(), value, expirationTime);
} catch (NumberFormatException e) {
System.out.println("NumberFormatException: " + e.getMessage());
}
} else {
storage.save(args.getFirst().toLowerCase(), value);
}
return null;
}
}
package director;
import server.ReplicaConnectionService;
import server.ServerBuilder;
import java.io.IOException;
public class Director {
public void buildMaster(ServerBuilder builder) {
builder.setRole("Master");
}
public void buildReplica(ServerBuilder builder) {
builder.setRole("Replica");
public void buildReplica(ServerBuilder builder, ReplicaConnectionService replicaConnectionService) throws IOException {
replicaConnectionService = new ReplicaConnectionService(builder.getMasterPortAndHost(), 16378);
replicaConnectionService.checkConnection();
}
}
......@@ -2,7 +2,9 @@ package factory;
import command.*;
import model.Command;
import server.SendToReplica;
import java.io.OutputStream;
import java.util.List;
public class CommandFactory implements Factory {
......@@ -19,28 +21,29 @@ public class CommandFactory implements Factory {
}
@Override
public CommandHandler getCommand(Command command, List<String> args) {
public CommandExecutable<byte[]> getCommand(Command command, List<String> args) {
return switch (command) {
case PING -> new PingCommand();
case ECHO -> new EchoCommand(args);
case SET -> new SetCommand(args);
case GET -> new GetCommand(args);
case INFO -> new InfoCommand(args);
// case REPLCONF -> new ReplConfCommand(replicaReceiver);
// case PSYNC -> new FullResyncCommandProcessor(replicaSender);
// case WAIT -> new WaitCommandProcessor(replicaSender, replicaReceiver);
// case CONFIG -> new ConfigCommandProcessor();
// case KEYS -> new KeysCommandProcessor();
// case TYPE -> new TypeCommandProcessor();
// case XADD -> new XaddCommandProcessor();
// case XRANGE -> new XrangeCommandProcessor();
// case XREAD -> new XreadCommandProcessor();
// case INCR -> new IncrCommandProcessor();
// case MULTI -> new MultiCommandProcessor(transactionMultiCommandService);
// case EXEC -> new ExecCommandProcessor(transactionMultiCommandService);
// case DISCARD -> new DiscardCommandProcessor(transactionMultiCommandService);
case REPLCONF -> new ReplConfCommand(args);
default -> new UnknownCommand();
};
}
// Overloaded method for additional dependencies
public CommandExecutable<byte[]> getCommand(Command command, List<String> args, SendToReplica sender, OutputStream os) {
return switch (command) {
case PING -> new PingCommand();
case ECHO -> new EchoCommand(args);
case SET -> new SetCommand(args);
case GET -> new GetCommand(args);
case INFO -> new InfoCommand(args);
case REPLCONF -> new ReplConfCommand(args);
case PSYNC -> new FullRsyncCommand(sender, os);
default -> new UnknownCommand();
};
}
}
package factory;
import command.CommandHandler;
import command.CommandExecutable;
import model.Command;
import java.util.List;
public interface Factory {
CommandHandler getCommand(Command command, List<String> args);
CommandExecutable<?> getCommand(Command command, List<String> args);
}
package factory.replica;
import command.CommandExecutable;
import command.replica.ReplicaFullRsyncCommand;
import command.replica.ReplicaReplConfCommand;
import command.replica.ReplicaSetCommand;
import factory.Factory;
import model.Command;
import java.util.List;
public class ReplicaCommandFactory implements Factory {
private static final class ReplicaFactoryHolder {
private static final ReplicaCommandFactory factory = new ReplicaCommandFactory();
}
public ReplicaCommandFactory() {
}
public static ReplicaCommandFactory getInstance() {
return ReplicaFactoryHolder.factory;
}
@Override
public CommandExecutable<?> getCommand(Command command, List<String> args) {
System.out.println("Factory get instance from " + command.getValue());
return switch (command) {
case SET -> new ReplicaSetCommand(args);
case REPLCONF -> new ReplicaReplConfCommand();
case PSYNC, FULLRESYNC -> new ReplicaFullRsyncCommand();
default -> null;
};
}
}
package handlers;
import command.CommandHandler;
import command.CommandExecutable;
import command.CommandInvoker;
import factory.CommandFactory;
import model.Command;
import parser.CommandParser;
import server.SendToReplica;
import util.CommandUtil;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collections;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
public class ClientCommandHandler {
private static final Logger logger = Logger.getLogger(ClientCommandHandler.class.getName());
private final List<String> commands;
private final OutputStream os;
private final CommandParser commandParser;
private final CommandFactory factory = CommandFactory.getInstance();
private final SendToReplica sendToReplica;
public ClientCommandHandler(List<String> commands, OutputStream outputStream) {
this.commands = commands;
this.commands = Collections.unmodifiableList(commands);
this.os = outputStream;
this.commandParser = new CommandParser();
this.sendToReplica = SendToReplica.getInstance();
}
public void execute() {
Command command = CommandUtil.getCommand(commands.getFirst());
commands.removeFirst();
CommandHandler commandProcessor = factory.getCommand(command, commands);
if (commands.isEmpty()) {
logger.warning("No commands to execute.");
return;
}
try {
byte[] result = CommandInvoker.invoke(commandProcessor);
os.write(result);
os.flush();
Command command = CommandUtil.getCommand(commands.getFirst());
List<String> replicaCommand = createReplicaCommand(commands);
byte[] result = executeCommand(command);
sendReplicaCommand(replicaCommand);
writeResponse(result);
} catch (IOException e) {
logger.log(Level.SEVERE, "Command execution failed", e);
throw new RuntimeException(e);
}
}
private byte[] executeCommand(Command command) throws IOException {
CommandExecutable<byte[]> commandProcessor = factory.getCommand(command, commands.subList(1, commands.size()), sendToReplica, os);
return CommandInvoker.invoke(commandProcessor);
}
private void sendReplicaCommand(List<String> replicaCommand) {
sendToReplica.addCommand(commandParser.getResponseFromCommandArray(replicaCommand));
}
private void writeResponse(byte[] result) throws IOException {
os.write(result);
os.flush();
}
private List<String> createReplicaCommand(List<String> commands) {
return List.copyOf(commands);
}
}
package handlers.replica;
import command.handshake.CommandSender;
import command.handshake.PingCommandSender;
import command.handshake.ReplConfCommandSender;
import model.Command;
import parser.CommandParser;
import util.RdbBytesReader;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.Socket;
public class ConnectionHandler {
private final Socket socket;
private final RdbBytesReader reader;
private final int port;
private final CommandParser commandParser;
private CommandSender commandSender;
public ConnectionHandler(Socket socket, CommandParser commandParser, int port) {
this.socket = socket;
commandSender = new PingCommandSender(commandParser);
this.port = port;
this.reader = new RdbBytesReader();
this.commandParser = commandParser;
System.out.println("Connection handler");
}
public BufferedReader handleConnection() {
try {
OutputStream outputStream = socket.getOutputStream();
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
commandSender.sendCommand(bufferedReader, outputStream);
String line;
while ((line = bufferedReader.readLine()) != null) {
if (line.isEmpty()) continue;
System.out.println("GET CONNECTION : process line: " + line);
if (line.equalsIgnoreCase("+" + Command.PONG.getValue())) {
this.commandSender = new ReplConfCommandSender(String.valueOf(port), commandParser);
commandSender.sendCommand(bufferedReader, outputStream);
} else if (line.startsWith("+FULLRESYNC")) {
return reader.read(bufferedReader);
}
}
} catch (Exception e) {
System.out.println(e.getMessage());
}
return null;
}
}
package handlers.replica;
import command.CommandExecutable;
import command.CommandProcessable;
import factory.replica.ReplicaCommandFactory;
import model.Command;
import util.CommandUtil;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
public class ReplicaCommandHandler implements CommandProcessable {
private static final Logger logger = Logger.getLogger(ReplicaCommandHandler.class.getName());
private final List<String> commands;
private final ReplicaCommandFactory factory = ReplicaCommandFactory.getInstance();
public ReplicaCommandHandler(List<String> commands) {
logger.info("replica command handler");
this.commands = Collections.unmodifiableList(commands);
}
@Override
public void process() {
if (commands.isEmpty()) {
logger.warning("No commands to process.");
return;
}
try {
Command command = extractCommand();
if (command == null) {
logger.warning("Invalid command detected.");
return;
}
logger.info("Executing");
CommandExecutable<?> commandProcessor = factory.getCommand(command, commands.subList(1, commands.size()));
executeCommand(commandProcessor);
} catch (Exception e) {
logger.log(Level.SEVERE, "Failed to process command.", e);
throw new RuntimeException(e);
}
}
private Command extractCommand() {
String firstCommand = commands.getFirst();
return CommandUtil.getCommand(firstCommand);
}
private void executeCommand(CommandExecutable<?> commandProcessor) throws IOException {
if (commandProcessor != null) {
commandProcessor.execute();
} else {
logger.warning("Command processor is null.");
}
}
@Override
public void run() {
process();
}
}
......@@ -7,9 +7,7 @@ public enum Command {
ECHO("ECHO"),
FULLRESYNC("FULLRESYNC"),
GET("GET"),
INCR("INCR"),
INFO("INFO"),
KEYS("KEYS"),
PING("PING"),
PONG("PONG"),
PSYNC("PSYNC"),
......@@ -17,11 +15,7 @@ public enum Command {
REPLCONF("REPLCONF"),
REPLICATION("REPLICATION"),
SET("SET"),
TYPE("TYPE"),
WAIT("WAIT"),
XADD("XADD"),
XRANGE("XRANGE"),
XREAD("XREAD"), MULTI("MULTI"), EXEC("EXEC"), DISCARD("DISCARD"),
UNKNOWN("UNKNOWN");
private final String value;
......
package model;
import java.io.OutputStream;
public record ConnectedReplica(OutputStream os) {
}
package model;
public record RdbFile(String path, String fileName) {
}
package server;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
public class ReceivedFromReplica {
private final AtomicInteger atomicInteger;
private final AtomicBoolean atomicBoolean;
public ReceivedFromReplica() {
synchronized (this) {
atomicBoolean = new AtomicBoolean(false);
atomicInteger = new AtomicInteger(0);
}
}
public void receive() {
atomicInteger.getAndIncrement();
}
public int getReceivedCount() {
return atomicInteger.get();
}
public void reset() {
System.out.println("reset values");
synchronized (this) {
atomicBoolean.set(false);
atomicInteger.set(0);
}
}
}
package server;
import client.Client;
import client.replica.ReplicaClient;
import handlers.replica.ConnectionHandler;
import parser.CommandParser;
import java.io.BufferedReader;
import java.io.IOException;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
public class ReplicaConnectionService {
private static final Logger logger = Logger.getLogger(ReplicaConnectionService.class.getName());
private final CommandParser commandParser;
private final int port;
private Socket socket;
private final ExecutorService executorService;
public ReplicaConnectionService(String[] masterPortAndHost, int port) throws IOException {
this.port = port;
this.commandParser = new CommandParser();
this.executorService = Executors.newCachedThreadPool();
if (masterPortAndHost.length > 0) {
this.socket = new Socket(masterPortAndHost[0], Integer.parseInt(masterPortAndHost[1]));
}
}
public void getConnection() {
if (socket == null) {
logger.warning("Socket is not initialized.");
return;
}
ConnectionHandler connectionHandler = new ConnectionHandler(socket, commandParser, port);
BufferedReader bufferedReader = connectionHandler.handleConnection();
try {
logger.info("Ending handshake");
Client client = new ReplicaClient(bufferedReader, socket);
executorService.submit(client);
logger.info("ReplicaConnectionService: CONNECTION SUCCESS");
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public void checkConnection() {
if (socket != null && !socket.isClosed()) {
getConnection();
} else {
logger.warning("Socket is either null or closed.");
}
}
public void close() {
executorService.shutdown();
if (socket != null && !socket.isClosed()) {
try {
socket.close();
} catch (IOException e) {
logger.log(Level.SEVERE, "Failed to close socket", e);
}
}
}
}
package server;
import model.Command;
import model.ConnectedReplica;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
public class SendToReplica implements Closeable {
private static final Logger logger = Logger.getLogger(SendToReplica.class.getName());
private final Queue<ConnectedReplica> connectedReplicas;
private final Queue<String> commands;
private final ExecutorService executorService;
private final AtomicInteger countCommands;
private SendToReplica() {
this.connectedReplicas = new ConcurrentLinkedQueue<>();
this.commands = new ConcurrentLinkedQueue<>();
this.executorService = Executors.newSingleThreadExecutor();
this.countCommands = new AtomicInteger(0);
}
public static SendToReplica getInstance() {
return SendToReplicaHolder.INSTANCE;
}
public String getCountConnectedReplicas() {
return String.valueOf(connectedReplicas.size());
}
public void start() {
executorService.submit(this::processCommands);
}
private void processCommands() {
while (true) {
String command = commands.poll();
if (command == null) {
continue;
}
byte[] commandBytes = command.getBytes();
connectedReplicas.forEach(replica -> {
try {
OutputStream outputStream = replica.os();
outputStream.write(commandBytes);
outputStream.flush();
} catch (IOException e) {
logger.severe("Failed to send command to replica ");
}
});
}
}
public void addCommand(String command) {
String lowerCaseCommand = command.toLowerCase();
if (lowerCaseCommand.contains(Command.SET.getValue().toLowerCase()) || lowerCaseCommand.contains("getack")) {
countCommands.incrementAndGet();
commands.add(command);
}
}
public int getCountCommands() {
return countCommands.getAndSet(0);
}
public void addConnection(OutputStream outputStream) {
connectedReplicas.add(new ConnectedReplica(outputStream));
logger.info("SIZE CONNECTED REPLICAS = " + connectedReplicas.size());
}
@Override
public void close() {
executorService.shutdown();
logger.info("Closing replicas");
}
private static final class SendToReplicaHolder {
private static final SendToReplica INSTANCE = new SendToReplica();
}
}
......@@ -9,43 +9,65 @@ import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
public class Server implements AutoCloseable {
private static final Logger logger = Logger.getLogger(Server.class.getName());
private final int PORT;
private final ExecutorService executor;
StorageManager manager = new StorageManager();
private final StorageManager manager = new StorageManager();
Server(int port, String role) {
PORT = port;
public Server(int port) {
this.PORT = port;
this.executor = Executors.newVirtualThreadPerTaskExecutor();
}
public void start() {
try (ServerSocket serverSocket = new ServerSocket(PORT)) {
System.out.println("Server started on Port " + PORT);
serverSocket.setReuseAddress(true);
logger.info("Server started on Port " + PORT);
while (true) {
serverSocket.setReuseAddress(true);
Socket clientSocket = serverSocket.accept();
System.out.println("New client connected: " + clientSocket.getRemoteSocketAddress());
handleClient(clientSocket);
while (!executor.isShutdown()) {
try {
Socket clientSocket = serverSocket.accept();
logger.info("New client connected: " + clientSocket.getRemoteSocketAddress());
handleClient(clientSocket);
} catch (IOException e) {
logger.log(Level.SEVERE, "Failed to accept client connection", e);
}
}
} catch (IOException e) {
e.getStackTrace();
System.out.println("Server failed to handle: " + e.getMessage());
logger.log(Level.SEVERE, "Server failed to start", e);
}
}
private void handleClient(Socket clientSocket) {
executor.submit(() -> {
Client task = new MasterClient(clientSocket);
task.run();
try {
Client task = new MasterClient(clientSocket);
task.run();
} catch (Exception e) {
logger.log(Level.SEVERE, "Error handling client", e);
} finally {
try {
clientSocket.close();
} catch (IOException e) {
logger.log(Level.WARNING, "Failed to close client socket", e);
}
}
});
}
@Override
public void close() {
try {
if (!executor.isShutdown()) {
executor.shutdown();
}
} catch (Exception e) {
logger.log(Level.SEVERE, "Failed to shutdown executor", e);
}
manager.shutdown();
}
}
package server;
import util.RdbFileInfo;
import util.Settings;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
public class ServerBuilder {
private int port = 6379;
private int port = 16380;
private String role;
String[] masterPortAndHost;
private final String[] masterPortAndHost;
private final static ServerInfo info = ServerInfo.getInstance();
Map<String, String> parameters;
public void setPort(int port) {
this.port = Settings.extractPort(parameters, port);
}
private final Map<String, String> parameters;
private static final RdbFileInfo rdbfileInfo = RdbFileInfo.getInstance();
public ServerBuilder(String[] args) {
this.parameters = Settings.extractArgs(args);
this.port = Settings.extractPort(parameters, port);
this.masterPortAndHost = info.findRole(parameters);
this.masterPortAndHost = initializeMasterPortAndHost();
}
public ServerBuilder setPort(int port) {
this.port = port;
return this;
}
public void setRole(String role) {
this.role = role;
}
public Server build() {
return new Server(port, role);
public String[] getMasterPortAndHost() {
return Optional.ofNullable(masterPortAndHost).orElseGet(this::initializeMasterPortAndHost);
}
public Server build() throws IOException {
if (role == null) {
role = "master";
}
return new Server(port);
}
private String[] initializeMasterPortAndHost() {
rdbfileInfo.setFile(parameters);
return info.findRole(parameters);
}
}
......@@ -12,6 +12,8 @@ public class LRUCachePolicy<K, V> implements CachePolicy<K, V> {
public LRUCachePolicy(int maxCapacity, int threshold) {
this.cacheMap = new ConcurrentHashMap<>(maxCapacity);
RdbFileReader<String,String> reader = new RdbFileReader<>();
cacheMap.putAll((Map<? extends K, ? extends V>) reader.readFile());
this.threshold = threshold;
this.accessOrder = new ConcurrentLinkedQueue<>();
}
......
package storage;
import util.RdbFileInfo;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import static java.lang.System.out;
public class RdbFileReader<K, V> {
private final RdbFileInfo rdbFileInfo;
private final Map<String, Long> keysExpiration;
public RdbFileReader() {
rdbFileInfo = RdbFileInfo.getInstance();
keysExpiration = new HashMap<>();
}
private static String getKey(InputStream fis, int b) throws IOException {
String key;
int strLength = lengthEncoding(fis, b);
b = fis.read();
out.println("strLength == " + strLength);
if (strLength == 0) {
strLength = b;
}
byte[] bytes = fis.readNBytes(strLength);
key = new String(bytes);
return key;
}
private static String getValue(InputStream fis) throws IOException {
int strLength;
int b;
b = fis.read();
strLength = lengthEncoding(fis, b);
if (strLength == 0) {
strLength = b;
}
byte[] bytesValue = fis.readNBytes(strLength);
String value = new String(bytesValue);
out.println(value);
return value;
}
private static int lengthEncoding(InputStream is, int b) throws IOException {
int length = 100;
int first2bits = b & 11000000;
out.println("first2bits = " + first2bits);
if (first2bits == 0) {
out.println("00");
length = 0;
} else if (first2bits == 128) {
out.println("01");
length = 2;
} else if (first2bits == 256) {
out.println("10");
ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES);
buffer.put(is.readNBytes(4));
buffer.rewind();
length = 1 + buffer.getInt();
} else if (first2bits == 256 + 128) {
out.println("11");
length = 1;
}
return length;
}
public Map<K, V> readFile() {
String key = "";
Long expiration = null;
Map<String, String> storage = new HashMap<>();
try (
InputStream fis =
new FileInputStream(new File(rdbFileInfo.getPath(), rdbFileInfo.getFileName()))) {
byte[] redis = new byte[5];
byte[] version = new byte[4];
fis.read(redis);
fis.read(version);
out.println("Magic String = " +
new String(redis, StandardCharsets.UTF_8));
out.println("Version = " +
new String(version, StandardCharsets.UTF_8));
int b;
header:
while ((b = fis.read()) != -1) {
switch (b) {
case 0xFF:
out.println("EOF");
break;
case 0xFE:
out.println("SELECTDB");
break;
case 0xFD:
out.println("EXPIRETIME");
break;
case 0xFC:
out.println("EXPIRETIMEMS");
break;
case 0xFB:
out.println("RESIZEDB");
b = fis.read();
fis.readNBytes(lengthEncoding(fis, b));
fis.readNBytes(lengthEncoding(fis, b));
break header;
case 0xFA:
out.println("AUX");
break;
}
}
out.println("header done");
b = fis.read();
while ((b = fis.read()) != -1) {
out.println("value-type = " + b);
out.println("value-type = " + b);
if (b == 0xFC) {
expiration = getExpiration(fis);
out.println("expiration = " + expiration);
b = fis.read();
}
out.println(" b = " + Integer.toBinaryString(b));
if (!Integer.toBinaryString(b).equals("0")) {
break;
}
out.println("reading keys");
key = getKey(fis, b);
out.println(key);
String value = getValue(fis);
storage.put(key, value);
if (expiration != null && expiration != 0) {
keysExpiration.put(key, expiration);
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
out.flush();
return (Map<K, V>) storage;
}
private Long getExpiration(InputStream fis) {
try {
byte[] bytes = fis.readNBytes(8);
ByteBuffer wrap = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
return wrap.getLong();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public Map<String, Long> getKeysExpiration() {
return keysExpiration;
}
}
......@@ -11,6 +11,9 @@ public class Storage {
private Storage(int threshold) {
this.storage = new LRUCachePolicy<>(capacity, threshold);
RdbFileReader<String, String> reader = new RdbFileReader<>();
Map<String, Long> stringStringMap = reader.getKeysExpiration();
this.timeToExpiration.putAll(stringStringMap);
}
private static final class StorageHolder {
......
package util;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
public class CommandByteCounter {
private static volatile CommandByteCounter instance;
private final AtomicInteger bytes;
private final AtomicBoolean isFirst;
private CommandByteCounter() {
bytes = new AtomicInteger(0);
isFirst = new AtomicBoolean(true);
}
public static CommandByteCounter getInstance() {
if (instance == null) {
synchronized (CommandByteCounter.class) {
if (instance == null) {
instance = new CommandByteCounter();
}
}
}
return instance;
}
public void setIsFirst(boolean b) {
isFirst.set(b);
}
public boolean isFirst() {
return isFirst.get();
}
public void addBytes(int bytesLength) {
bytes.addAndGet(bytesLength);
}
public void renewBytes(int bytes) {
this.bytes.set(this.bytes.get() + bytes);
}
public Integer getBytes() {
return bytes.get();
}
}
\ No newline at end of file
......@@ -10,12 +10,16 @@ public class CommandUtil {
"ping", Command.PING,
"set", Command.SET,
"get", Command.GET,
"info", Command.INFO
"info", Command.INFO,
"fullrsync", Command.FULLRESYNC,
"psync", Command.PSYNC,
"wait", Command.WAIT,
"replconf", Command.REPLCONF
);
public static Command getCommand(String command) {
return COMMAND_MAP.entrySet().stream()
.filter(entry -> command.toLowerCase().contains(entry.getKey().toLowerCase()))
.filter(entry -> command.toLowerCase().contains(entry.getKey()))
.map(Map.Entry::getValue)
.findFirst()
.orElse(Command.UNKNOWN);
......
package util;
import java.io.BufferedReader;
import java.io.IOException;
public class RdbBytesReader {
public BufferedReader read(BufferedReader bf) {
try {
String s = bf.readLine();
String substring = s.substring(1);
int charsToSkip = Integer.parseInt(substring) - 1;
long skip = bf.skip(charsToSkip);
System.out.println("skipped bytes : " + skip);
return bf;
} catch (IOException e) {
System.out.println(e.getMessage());
}
return null;
}
}
package util;
import model.RdbFile;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Base64;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.lang.System.out;
public class RdbFileInfo {
private static RdbFileInfo instance;
private RdbFile rdbFile;
private RdbFileInfo() {
}
public static synchronized RdbFileInfo getInstance() {
if (instance == null) {
instance = new RdbFileInfo();
}
return instance;
}
public byte[] getContent() {
byte[] decode;
try (Stream<String> stringStream = Files.lines(Path.of(rdbFile.path() + "/" + rdbFile.fileName()))) {
String rdbFile = stringStream.collect(Collectors.joining());
decode = Base64.getDecoder().decode(rdbFile);
} catch (IOException e) {
throw new RuntimeException(e);
}
return decode;
}
public String getFileName() {
return rdbFile.fileName();
}
public String getPath() {
return rdbFile.path();
}
public void setFile(Map<String, String> parameters) {
String path = "C:\\Users\\Amir\\Desktop\\Projects\\Redis\\tmp";
String fileName = "rdb.rdb";
out.println(parameters);
if (parameters.containsKey("--dir")) {
path = parameters.get("--dir");
}
if (parameters.containsKey("--dbfilename")) {
fileName = parameters.get("--dbfilename");
}
if (path.isEmpty() || fileName.isEmpty()) {
return;
}
this.rdbFile = new RdbFile(path, fileName);
out.println(fileName);
String pathname = rdbFile.path() + "/" + rdbFile.fileName();
File file1 = new File(pathname);
if (!file1.exists()) {
boolean mkdir = file1.getParentFile().mkdirs();
out.println(mkdir);
try (FileWriter writer = new FileWriter(file1)) {
writer.write("UkVESVMwMDEx+glyZWRpcy12ZXIFNy4yLjD6CnJlZGlzLWJpdHPAQPoFY3RpbWXCbQi8ZfoIdXNlZC1tZW3CsMQQAPoIYW9mLWJhc2XAAP/wbjv+wP9aog==");
writer.flush();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment