Commit 49e510a1 authored by amir.yosef's avatar amir.yosef

Decoupling classes

parent 1d9fb337
package command;
import command.validation.Validatable;
import java.io.IOException;
import java.util.List;
public interface CommandExecutable<T> {
T execute() throws IOException;
class GetCommandValidator implements Validatable {
private static final GetCommandValidator INSTANCE = new GetCommandValidator();
private GetCommandValidator() {
}
public static GetCommandValidator getInstance() {
return INSTANCE;
}
@Override
public boolean isValid(List<String> args) {
return args != null && args.size() == 1;
}
}
}
package command;
import command.validation.EchoCommandValidator;
import command.validation.Validatable;
import util.Response;
import java.util.List;
public final class EchoCommand implements CommandExecutable<byte[]> {
private final List<String> args;
private final CommandValidator validator = CommandValidator.getInstance();
private final Validatable validator = EchoCommandValidator.getInstance();
public EchoCommand(List<String> args) {
this.args = args;
......@@ -14,7 +16,7 @@ public final class EchoCommand implements CommandExecutable<byte[]> {
@Override
public byte[] execute() {
if (validator.validateEchoCommand(args)) {
if (validator.isValid(args)) {
return (Response.getResponse(args.getFirst()));
} else {
return "".getBytes();
......
......@@ -3,7 +3,7 @@ package command;
import model.Command;
import server.SendToReplica;
import server.ServerInfo;
import server.ServiceInfo;
import util.RdbFileInfo;
import java.io.ByteArrayOutputStream;
......@@ -14,13 +14,13 @@ public final class FullRsyncCommand implements CommandExecutable<byte[]> {
private final SendToReplica replicaSender;
private final RdbFileInfo rdbFileInfo;
private final OutputStream outputStream;
private final ServerInfo serverInfo;
private final ServiceInfo serviceInfo;
public FullRsyncCommand(SendToReplica replicaSender, OutputStream outputStream) {
this.outputStream = outputStream;
this.replicaSender = replicaSender;
rdbFileInfo = RdbFileInfo.getInstance();
this.serverInfo = ServerInfo.getInstance();
this.serviceInfo = ServiceInfo.getInstance();
}
@Override
......@@ -29,7 +29,7 @@ public final class FullRsyncCommand implements CommandExecutable<byte[]> {
replicaSender.addConnection(outputStream);
byte[] decode = rdbFileInfo.getContent();
try {
return createCommandBytes(Command.FULLRESYNC, decode, serverInfo);
return createCommandBytes(Command.FULLRESYNC, decode, serviceInfo);
} catch (IOException e) {
throw new RuntimeException(e);
}
......@@ -37,9 +37,9 @@ public final class FullRsyncCommand implements CommandExecutable<byte[]> {
}
public byte[] createCommandBytes(Command command, byte[] decode, ServerInfo serverInfo) throws IOException {
public byte[] createCommandBytes(Command command, byte[] decode, ServiceInfo serviceInfo) throws IOException {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
byteArrayOutputStream.write(("+" + command.getValue() + " " + serverInfo.getInfo().get("master_replid") + " 0\r\n").getBytes());
byteArrayOutputStream.write(("+" + command.getValue() + " " + serviceInfo.getInfo().get("master_replid") + " 0\r\n").getBytes());
byteArrayOutputStream.write(("$" + decode.length + "\r\n").getBytes());
byteArrayOutputStream.write(decode);
return byteArrayOutputStream.toByteArray();
......
package command;
import command.validation.Validatable;
import storage.Cacheable;
import storage.Storage;
import util.Response;
......@@ -7,9 +9,10 @@ import java.util.List;
public final class GetCommand implements CommandExecutable<byte[]> {
private final Storage storage = Storage.getInstance();
private final Cacheable<String, String> storage = Storage.getInstance();
private static final byte[] ERROR_RESPONSE = Response.getResponse("wrong args");
private final List<String> args;
private final CommandValidator validator = CommandValidator.getInstance();
private final Validatable validator = GetCommandValidator.getInstance();
public GetCommand(List<String> args) {
this.args = args;
......@@ -17,8 +20,8 @@ public final class GetCommand implements CommandExecutable<byte[]> {
@Override
public byte[] execute() {
if (!validator.validateGetCommand(args)) {
return Response.getResponse("wrong args");
if (!validator.isValid(args)) {
return ERROR_RESPONSE;
}
String response = storage.get(args.getFirst().toLowerCase());
if (response == null || response.isEmpty() || response.isBlank()) {
......
package command;
import server.ServerInfo;
import command.validation.InfoCommandValidator;
import command.validation.Validatable;
import server.ServiceInfo;
import util.Response;
import java.util.List;
......@@ -8,8 +10,8 @@ import java.util.Map;
import java.util.stream.Collectors;
public final class InfoCommand implements CommandExecutable<byte[]> {
private final ServerInfo configuration = ServerInfo.getInstance();
private final CommandValidator validator = CommandValidator.getInstance();
private final ServiceInfo configuration = ServiceInfo.getInstance();
private final Validatable validator = InfoCommandValidator.getInstance();
List<String> args;
public InfoCommand(List<String> args) {
......@@ -18,7 +20,7 @@ public final class InfoCommand implements CommandExecutable<byte[]> {
@Override
public byte[] execute() {
if (!validator.validateInfoCommand(args)) {
if (!validator.isValid(args)) {
return Response.getResponse("unsupported args");
}
String command = args.getFirst();
......
package command;
import command.validation.SetCommandValidator;
import command.validation.Validatable;
import storage.Cacheable;
import storage.Storage;
import util.Response;
......@@ -8,10 +11,9 @@ import java.util.List;
public final class SetCommand implements CommandExecutable<byte[]> {
private static final byte[] OK_RESPONSE = "+OK\r\n".getBytes();
private static final byte[] ERROR_RESPONSE = Response.getResponse("wrong args");
private final Storage storage = Storage.getInstance();
private final Cacheable<String, String> storage = Storage.getInstance();
private final List<String> args;
private final CommandValidator validator = CommandValidator.getInstance();
private final Validatable validator = SetCommandValidator.getInstance();
public SetCommand(List<String> args) {
this.args = args;
......@@ -19,7 +21,7 @@ public final class SetCommand implements CommandExecutable<byte[]> {
@Override
public byte[] execute() {
if (!validator.validateSetCommand(args)) {
if (!validator.isValid(args)) {
return ERROR_RESPONSE;
}
String key = args.get(0).toLowerCase();
......
package command.replica;
import command.CommandExecutable;
import storage.Cacheable;
import storage.Storage;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
public final class ReplicaSetCommand implements CommandExecutable<Void> {
private final Storage storage = Storage.getInstance();
private final Cacheable<String, String> storage = Storage.getInstance();
private final List<String> args;
private static final Logger logger = Logger.getLogger(ReplicaSetCommand.class.getName());
public ReplicaSetCommand(List<String> args) {
this.args = args;
......@@ -21,20 +19,19 @@ public final class ReplicaSetCommand implements CommandExecutable<Void> {
String key = args.get(0).toLowerCase();
String value = args.get(1);
String expiration = null;
for (int i = 2; i < args.size() - 1; i += 2) {
if ("px".equalsIgnoreCase(args.get(i))) {
expiration = args.get(i + 1);
break;
}
}
if (expiration != null) {
try {
long expirationTime = Long.parseLong(expiration);
int size = args.size();
if (size > 2) {
String lastArg = args.get(size - 1);
if (size % 2 == 0) {
if ("px".equalsIgnoreCase(args.get(size - 2))) {
long expirationTime = Long.parseLong(lastArg);
storage.save(key, value, expirationTime);
} else {
storage.save(key, value);
}
} else {
long expirationTime = Long.parseLong(lastArg);
storage.save(key, value, expirationTime);
} catch (NumberFormatException e) {
logger.log(Level.SEVERE, "NumberFormatException", e);
}
} else {
storage.save(key, value);
......
package command.validation;
import java.util.List;
public final class EchoCommandValidator implements Validatable {
private static final EchoCommandValidator INSTANCE = new EchoCommandValidator();
private EchoCommandValidator() {
}
public static EchoCommandValidator getInstance() {
return INSTANCE;
}
@Override
public boolean isValid(List<String> args) {
return !args.isEmpty() && args.getFirst() != null;
}
}
package command.validation;
import java.util.List;
public final class InfoCommandValidator implements Validatable {
private static final InfoCommandValidator INSTANCE = new InfoCommandValidator();
private InfoCommandValidator() {
}
public static InfoCommandValidator getInstance() {
return INSTANCE;
}
@Override
public boolean isValid(List<String> args) {
return args != null && args.size() == 1 &&
"replication".equalsIgnoreCase(args.getFirst());
}
}
package command;
package command.validation;
import java.util.List;
public final class CommandValidator {
public final class SetCommandValidator implements Validatable {
private static final CommandValidator INSTANCE = new CommandValidator();
private static final SetCommandValidator INSTANCE = new SetCommandValidator();
private CommandValidator() {
private SetCommandValidator() {
}
public static CommandValidator getInstance() {
public static SetCommandValidator getInstance() {
return INSTANCE;
}
public boolean validateSetCommand(List<String> args) {
@Override
public boolean isValid(List<String> args) {
if (args == null || args.size() < 2) {
return false;
}
......@@ -63,16 +64,4 @@ public final class CommandValidator {
return true;
}
public boolean validateGetCommand(List<String> args) {
return args != null && args.size() == 1;
}
public boolean validateInfoCommand(List<String> args) {
return args != null && args.size() == 1 &&
"replication".equalsIgnoreCase(args.getFirst());
}
public boolean validateEchoCommand(List<String> args) {
return !args.isEmpty() && args.getFirst() != null;
}
}
\ No newline at end of file
package command.validation;
import java.util.List;
public interface Validatable {
boolean isValid(List<String> args);
}
......@@ -7,7 +7,7 @@ import server.SendToReplica;
import java.io.OutputStream;
import java.util.List;
public final class CommandFactory implements Factory {
public final class CommandFactory implements ExecutablesFactory {
private static final class FactoryHolder {
private static final CommandFactory factory = new CommandFactory();
......
......@@ -5,6 +5,6 @@ import model.Command;
import java.util.List;
public interface Factory {
public interface ExecutablesFactory {
CommandExecutable<?> getCommand(Command command, List<String> args);
}
......@@ -4,12 +4,12 @@ import command.CommandExecutable;
import command.replica.ReplicaFullRsyncCommand;
import command.replica.ReplicaReplConfCommand;
import command.replica.ReplicaSetCommand;
import factory.Factory;
import factory.ExecutablesFactory;
import model.Command;
import java.util.List;
public final class ReplicaCommandFactory implements Factory {
public final class ReplicaCommandFactory implements ExecutablesFactory {
private static final class ReplicaFactoryHolder {
private static final ReplicaCommandFactory factory = new ReplicaCommandFactory();
}
......
......@@ -3,6 +3,7 @@ package handlers;
import command.CommandExecutable;
import command.CommandInvoker;
import command.UnknownCommand;
import command.validation.Validatable;
import factory.CommandFactory;
import model.Command;
import parser.CommandParser;
......@@ -46,6 +47,7 @@ public final class ClientCommandHandler {
byte[] response = result.execute();
writeResponse(response);
}
List<String> replicaCommand = createReplicaCommand(commands);
byte[] result = executeCommand(command);
sendReplicaCommand(replicaCommand);
......@@ -57,8 +59,8 @@ public final class ClientCommandHandler {
}
private byte[] executeCommand(Command command) throws IOException {
CommandExecutable<byte[]> commandProcessor = factory.getCommand(command, commands.subList(1, commands.size()), sendToReplica, os);
return CommandInvoker.invoke(commandProcessor);
CommandExecutable<byte[]> executable = factory.getCommand(command, commands.subList(1, commands.size()), sendToReplica, os);
return CommandInvoker.invoke(executable);
}
private void sendReplicaCommand(List<String> replicaCommand) {
......
package model;
public record RdbFile(String path, String fileName) {
}
......@@ -37,8 +37,7 @@ public final class ReplicaConnectionService implements AutoCloseable {
}
ConnectionHandler connectionHandler = new ConnectionHandler(socket, commandParser, port);
BufferedReader bufferedReader = connectionHandler.handleConnection();
try {
try (BufferedReader bufferedReader = connectionHandler.handleConnection()) {
logger.info("Ending handshake");
Client client = new ReplicaClient(bufferedReader, socket);
executorService.submit(client::run);
......
......@@ -39,22 +39,23 @@ public final class SendToReplica implements Closeable {
private void processCommands() {
while (true) {
String command = commands.poll();
if (command == null) {
continue;
}
byte[] commandBytes = command.getBytes();
connectedReplicas.forEach(replica -> {
try {
OutputStream outputStream = replica.os();
outputStream.write(commandBytes);
outputStream.flush();
} catch (IOException e) {
logger.log(Level.SEVERE, "Failed to send command to replica ", e);
connectedReplicas.remove(replica);
String command = commands.poll();
if (command == null) {
continue;
}
});
byte[] commandBytes = command.getBytes();
connectedReplicas.forEach(replica -> {
try {
OutputStream outputStream = replica.os();
outputStream.write(commandBytes);
outputStream.flush();
} catch (IOException e) {
logger.log(Level.SEVERE, "Failed to send command to replica ", e);
connectedReplicas.remove(replica);
}
});
}
}
......
......@@ -12,7 +12,7 @@ public final class ServerBuilder {
private int port = 6379;
private String role = "master";
private String[] masterPortAndHost;
private final ServerInfo serverInfo;
private final ServiceInfo serviceInfo;
private final RdbFileInfo rdbFileInfo;
private final Map<String, String> parameters;
private ReplicaConnectionService replicaConnectionService;
......@@ -21,7 +21,7 @@ public final class ServerBuilder {
logger.info("Initializing ServerBuilder");
this.parameters = Settings.extractArgs(args);
this.port = Settings.extractPort(parameters, port);
this.serverInfo = ServerInfo.getInstance();
this.serviceInfo = ServiceInfo.getInstance();
this.rdbFileInfo = RdbFileInfo.getInstance();
initializeFromParameters();
}
......@@ -53,8 +53,8 @@ public final class ServerBuilder {
private void initializeFromParameters() {
logger.info("Initializing from parameters");
rdbFileInfo.setFile(parameters);
this.masterPortAndHost = serverInfo.findRole(parameters);
this.role = serverInfo.getRole();
this.masterPortAndHost = serviceInfo.findRole(parameters);
this.role = serviceInfo.getRole();
logger.info("Initialized with role: " + role);
}
}
......@@ -4,23 +4,24 @@ package server;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public final class ServerInfo {
private static ServerInfo serverInfo;
public final class ServiceInfo {
private static ServiceInfo serviceInfo;
private final Map<String, String> info = new ConcurrentHashMap<>();
private ServerInfo() {
private ServiceInfo() {
info.put("role", "master");
info.put("cachePolicyName", "caffeine");
}
public static ServerInfo getInstance() {
if (serverInfo == null) {
serverInfo = new ServerInfo();
public static ServiceInfo getInstance() {
if (serviceInfo == null) {
serviceInfo = new ServiceInfo();
}
return serverInfo;
return serviceInfo;
}
public String getRole() {
return serverInfo.info.get("role");
return serviceInfo.info.get("role");
}
public String[] findRole(Map<String, String> parameters) {
......
......@@ -5,4 +5,5 @@ public interface Cacheable<K, V> {
void save(K key, V value, Long expirationTime);
V get(K key);
void delete(K key);
void runCachePolicy();
}
package storage;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
......@@ -16,7 +15,7 @@ public class CaffeineCachePolicy<K, V> implements CachePolicy<K, V> {
public CaffeineCachePolicy(int maxCapacity) {
this.cache = Caffeine.newBuilder()
.maximumSize(maxCapacity)
.expireAfterAccess(60, TimeUnit.MINUTES)
.expireAfterAccess(60, TimeUnit.SECONDS)
.evictionListener((K key, V value, RemovalCause cause) ->
logger.info("Evicted key: " + key + ", Value: " + value + ", Reason: " + cause))
.build();
......
......@@ -8,7 +8,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
......@@ -85,10 +84,6 @@ public class RdbFileReader<K, V> {
byte[] version = new byte[4];
fis.read(redis);
fis.read(version);
out.println("Magic String = " +
new String(redis, StandardCharsets.UTF_8));
out.println("Version = " +
new String(version, StandardCharsets.UTF_8));
int b;
header:
while ((b = fis.read()) != -1) {
......@@ -116,8 +111,6 @@ public class RdbFileReader<K, V> {
break;
}
}
out.println("header done");
b = fis.read();
while ((b = fis.read()) != -1) {
out.println("value-type = " + b);
......@@ -132,7 +125,6 @@ public class RdbFileReader<K, V> {
if (!Integer.toBinaryString(b).equals("0")) {
break;
}
out.println("reading keys");
key = getKey(fis, b);
out.println(key);
String value = getValue(fis);
......
......@@ -3,14 +3,14 @@ package storage;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class Storage {
public class Storage implements Cacheable<String, String> {
private final int capacity = 1000000;
private final CachePolicy<String, String> storage;
private final Map<String, Long> timeToExpiration = new ConcurrentHashMap<>(capacity);
private final Map<String, Long> currentTimeForKey = new ConcurrentHashMap<>(capacity);
private Storage() {
this.storage = new CaffeineCachePolicy<>(capacity);
this.storage =CachePolicyFactory.getPolicy(capacity);
RdbFileReader<String, String> reader = new RdbFileReader<>();
Map<String, Long> stringStringMap = reader.getKeysExpiration();
this.timeToExpiration.putAll(stringStringMap);
......@@ -20,23 +20,26 @@ public class Storage {
private static final Storage instance = new Storage();
}
public static Storage getInstance() {
public static Cacheable<String,String> getInstance() {
return StorageHolder.instance;
}
@Override
public void save(String key, String value) {
storage.add(key, value);
}
@Override
public void save(String key, String value, Long expirationTime) {
currentTimeForKey.put(key, System.currentTimeMillis());
timeToExpiration.put(key, expirationTime);
save(key, value);
}
@Override
public String get(String key) {
if (isExpired(key)) {
remove(key);
delete(key);
return "";
}
return storage.retrieve(key);
......@@ -52,13 +55,14 @@ public class Storage {
return false;
}
private void remove(String key) {
@Override
public void delete(String key) {
storage.delete(key);
timeToExpiration.remove(key);
currentTimeForKey.remove(key);
}
void runCachePolicy() {
@Override
public void runCachePolicy() {
storage.runMaintenance();
}
}
\ No newline at end of file
......@@ -7,7 +7,7 @@ import java.util.logging.Level;
import java.util.logging.Logger;
public class StorageManager {
private final Storage storage;
private final Cacheable<String,String> storage;
private final ScheduledExecutorService scheduler;
private static final Logger logger = Logger.getLogger(StorageManager.class.getName());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment