Commit 0a1f7bde authored by amir.yosef's avatar amir.yosef

fixing expiration and command parsing

parent 8abed585
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<project version="4"> <project version="4">
<component name="JsBuildToolPackageJson" sorting="DEFINITION_ORDER" /> <component name="JsBuildToolPackageJson" sorting="DEFINITION_ORDER" />
<component name="ProjectRootManager" version="2" languageLevel="JDK_21" default="true" project-jdk-name="21 (5)" project-jdk-type="JavaSDK"> <component name="ProjectRootManager" version="2" languageLevel="JDK_22" default="true" project-jdk-name="22" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" /> <output url="file://$PROJECT_DIR$/out" />
</component> </component>
</project> </project>
\ No newline at end of file
...@@ -10,10 +10,10 @@ import java.io.OutputStream; ...@@ -10,10 +10,10 @@ import java.io.OutputStream;
import java.net.Socket; import java.net.Socket;
import java.util.List; import java.util.List;
public class PrimaryClient extends Client { public class MasterClient extends Client {
public PrimaryClient(Socket socket) { public MasterClient(Socket socket) {
super(socket); super(socket);
} }
......
...@@ -5,5 +5,5 @@ import java.io.OutputStream; ...@@ -5,5 +5,5 @@ import java.io.OutputStream;
import java.util.List; import java.util.List;
public interface CommandHandler { public interface CommandHandler {
void execute(List<String> command, OutputStream os) throws IOException; void execute(List<String> args, OutputStream os) throws IOException;
} }
...@@ -8,9 +8,9 @@ import java.util.List; ...@@ -8,9 +8,9 @@ import java.util.List;
public class EchoCommand implements CommandHandler { public class EchoCommand implements CommandHandler {
@Override @Override
public void execute(List<String> command, OutputStream os) { public void execute(List<String> args, OutputStream os) {
try { try {
os.write(Response.getResponse(command.get(1))); os.write(Response.getResponse(args.getFirst()));
os.flush(); os.flush();
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
......
...@@ -13,10 +13,10 @@ public class GetCommand implements CommandHandler { ...@@ -13,10 +13,10 @@ public class GetCommand implements CommandHandler {
@Override @Override
public void execute(List<String> commands, OutputStream os) { public void execute(List<String> args, OutputStream os) {
try { try {
String response = storage.get(commands.get(1).toLowerCase()); String response = storage.get(args.getFirst().toLowerCase());
if (response == null || response.isEmpty() || response.isBlank()) { if (response == null || response.isEmpty() || response.isBlank()) {
os.write("$-1\r\n".getBytes()); os.write("$-1\r\n".getBytes());
os.flush(); os.flush();
......
package command; package command;
import model.Command; import model.Command;
import server.Configuration;
import util.Response; import util.Response;
import java.io.IOException; import java.io.IOException;
...@@ -14,9 +13,9 @@ public class InfoCommand implements CommandHandler { ...@@ -14,9 +13,9 @@ public class InfoCommand implements CommandHandler {
private final Configuration configuration = Configuration.getInstance(); private final Configuration configuration = Configuration.getInstance();
@Override @Override
public void execute(List<String> commands, OutputStream os) { public void execute(List<String> args, OutputStream os) {
System.out.println(configuration.getInfo()); System.out.println(configuration.getInfo());
String command = commands.getFirst(); String command = args.getFirst();
if (command.equalsIgnoreCase(Command.REPLICATION.getValue())) { if (command.equalsIgnoreCase(Command.REPLICATION.getValue())) {
Map<String, String> info = configuration.getInfo(); Map<String, String> info = configuration.getInfo();
String response = info.entrySet() String response = info.entrySet()
......
...@@ -10,8 +10,8 @@ import java.util.List; ...@@ -10,8 +10,8 @@ import java.util.List;
public class PingCommand implements CommandHandler { public class PingCommand implements CommandHandler {
@Override @Override
public void execute(List<String> command, OutputStream os) { public void execute(List<String> args, OutputStream os) {
System.out.printf("Processing PING command %s%n", command); System.out.printf("Processing PING args %s%n", args);
try { try {
os.write(("+" + Command.PONG.getValue() + "\r\n").getBytes()); os.write(("+" + Command.PONG.getValue() + "\r\n").getBytes());
os.flush(); os.flush();
......
...@@ -13,26 +13,26 @@ public class SetCommand implements CommandHandler { ...@@ -13,26 +13,26 @@ public class SetCommand implements CommandHandler {
private final Storage redisStorage = Storage.getInstance(); private final Storage redisStorage = Storage.getInstance();
@Override @Override
public void execute(List<String> commands, OutputStream os) { public void execute(List<String> args, OutputStream os) {
Map<String, String> commandsMap = new HashMap<>(); Map<String, String> commandsMap = new HashMap<>();
for (int i = 2; i < commands.size(); i += 2) { for (int i = 2; i < args.size(); i += 2) {
commandsMap.put(commands.get(i).toLowerCase(), commands.get(i + 1)); commandsMap.put(args.get(i).toLowerCase(), args.get(i + 1));
} }
String value = commands.get(2); String value = args.get(1);
String expiration = commandsMap.get(Command.PX.getValue().toLowerCase()); String expiration = commandsMap.get(Command.PX.getValue().toLowerCase());
if (expiration != null) { if (expiration != null) {
try { try {
Long expirationTime = Long.parseLong(expiration); Long expirationTime = Long.parseLong(expiration);
redisStorage.save(commands.getFirst().toLowerCase(), value, expirationTime); redisStorage.save(args.getFirst().toLowerCase(), value, expirationTime);
} catch (NumberFormatException e) { } catch (NumberFormatException e) {
System.out.println("NumberFormatException: " + e.getMessage()); System.out.println("NumberFormatException: " + e.getMessage());
} }
} else { } else {
redisStorage.save(commands.get(1).toLowerCase(), value); redisStorage.save(args.getFirst().toLowerCase(), value);
} }
try { try {
......
...@@ -6,8 +6,7 @@ import java.util.List; ...@@ -6,8 +6,7 @@ import java.util.List;
public class UnknownCommand implements CommandHandler { public class UnknownCommand implements CommandHandler {
@Override @Override
public void execute(List<String> command, OutputStream os) { public void execute(List<String> args, OutputStream os) {
System.out.printf("Received unknown command %s%n", command);
try { try {
os.write(("-" + "Unknown Command" + "\r\n").getBytes()); os.write(("-" + "Unknown Command" + "\r\n").getBytes());
os.flush(); os.flush();
......
...@@ -24,8 +24,7 @@ public class ClientCommandHandler { ...@@ -24,8 +24,7 @@ public class ClientCommandHandler {
public boolean execute() { public boolean execute() {
Command command = CommandUtil.getCommand(commands.getFirst()); Command command = CommandUtil.getCommand(commands.getFirst());
System.out.println(command); commands.removeFirst();
System.out.println("Command " + commands);
CommandHandler commandProcessor = new CommandFactory(command).getInstance(); CommandHandler commandProcessor = new CommandFactory(command).getInstance();
try { try {
CommandInvoker.invoke(commandProcessor, commands, os); CommandInvoker.invoke(commandProcessor, commands, os);
......
package command.replica.handshake; package handshake;
import model.Command; import model.Command;
import parser.CommandParser; import parser.CommandParser;
......
package command.replica.handshake; package handshake;
import model.Command; import model.Command;
...@@ -20,7 +20,7 @@ public class ReplConfCommand extends Sender { ...@@ -20,7 +20,7 @@ public class ReplConfCommand extends Sender {
@Override @Override
public void send(BufferedReader bufferedReader, OutputStream outputStream) throws IOException { public void send(BufferedReader bufferedReader, OutputStream outputStream) throws IOException {
String replconfResp = ""; String replconfResp = "";
replconfResp = getReplconfResp(bufferedReader, outputStream); replconfResp = getReplConfResp(bufferedReader, outputStream);
if (!replconfResp.isEmpty()) { if (!replconfResp.isEmpty()) {
replconfResp = sendReplConfCapa(bufferedReader, outputStream); replconfResp = sendReplConfCapa(bufferedReader, outputStream);
...@@ -41,7 +41,7 @@ public class ReplConfCommand extends Sender { ...@@ -41,7 +41,7 @@ public class ReplConfCommand extends Sender {
return bufferedReader.readLine(); return bufferedReader.readLine();
} }
private String getReplconfResp(BufferedReader bufferedReader, OutputStream outputStream) throws IOException { private String getReplConfResp(BufferedReader bufferedReader, OutputStream outputStream) throws IOException {
outputStream.write(commandParser.getResponseFromCommandArray(List.of( outputStream.write(commandParser.getResponseFromCommandArray(List.of(
Command.REPLCONF.getValue(), Command.REPLCONF.getValue(),
"listening-port", "listening-port",
......
package command.replica.handshake; package handshake;
import parser.CommandParser; import parser.CommandParser;
......
package parser; package parser;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map;
public class CommandParser { public class CommandParser {
...@@ -19,7 +18,6 @@ public class CommandParser { ...@@ -19,7 +18,6 @@ public class CommandParser {
commands.add(nextCommand); commands.add(nextCommand);
} }
} else commands.add(line); } else commands.add(line);
//System.out.println("Parsing commands: " + commands);
return commands; return commands;
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
......
package server;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class Configuration {
private final Map<String, String> info = new ConcurrentHashMap<>();
private Configuration() {
info.put("role", "master");
}
private static final class ConfigurationHolder {
private static final Configuration configuration = new Configuration();
}
public static Configuration getInstance() {
return ConfigurationHolder.configuration;
}
public String getRole() {
return ConfigurationHolder.configuration.info.get("role");
}
public String[] findRole(Map<String, String> parameters) {
String[] masterPortAndHost = new String[]{};
String role = "master";
if (parameters.containsKey("--replicaof")) {
masterPortAndHost = parameters.get("--replicaof").split(" ");
role = "slave";
} else {
info.put("master_repl_offset", "0");
info.put("master_replid", "8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb");
}
info.put("role", role);
return masterPortAndHost;
}
public Map<String, String> getInfo() {
return this.info;
}
}
package server; package server;
import client.Client; import client.Client;
import client.primary.PrimaryClient; import client.primary.MasterClient;
import java.io.IOException; import java.io.IOException;
import java.net.ServerSocket; import java.net.ServerSocket;
...@@ -44,7 +44,7 @@ public class Server { ...@@ -44,7 +44,7 @@ public class Server {
private void handleClient(Socket clientSocket) { private void handleClient(Socket clientSocket) {
executor.submit(() -> { executor.submit(() -> {
Client task = new PrimaryClient(clientSocket); Client task = new MasterClient(clientSocket);
task.run(); task.run();
}); });
} }
......
package storage;
public record RdbFile(String path, String fileName) {
}
package storage;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Base64;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.lang.System.out;
public class RdbFileInfo {
private static RdbFileInfo instance;
private RdbFile rdbFile;
private RdbFileInfo() {
}
public static synchronized RdbFileInfo getInstance() {
if (instance == null) {
instance = new RdbFileInfo();
}
return instance;
}
public byte[] getContent() {
byte[] decode;
try (Stream<String> stringStream = Files.lines(Path.of(rdbFile.path() + "/" + rdbFile.fileName()))) {
String rdbFile = stringStream.collect(Collectors.joining());
decode = Base64.getDecoder().decode(rdbFile);
} catch (IOException e) {
throw new RuntimeException(e);
}
return decode;
}
public String getFileName() {
return rdbFile.fileName();
}
public String getPath() {
return rdbFile.path();
}
public void setFile(Map<String, String> parameters) {
String path = "C:\\Users\\Amir\\Desktop\\Redis\\tmp";
String fileName = "rdb.rdb";
out.println(parameters);
if (parameters.containsKey("--dir")) {
path = parameters.get("--dir");
}
if (parameters.containsKey("--dbfilename")) {
fileName = parameters.get("--dbfilename");
}
if (path.isEmpty() || fileName.isEmpty()) {
return;
}
this.rdbFile = new RdbFile(path, fileName);
out.println(fileName);
String pathname = rdbFile.path() + "/" + rdbFile.fileName();
File file1 = new File(pathname);
if (!file1.exists()) {
boolean mkdir = file1.getParentFile().mkdirs();
out.println(mkdir);
try (FileWriter writer = new FileWriter(file1)) {
writer.write("UkVESVMwMDEx+glyZWRpcy12ZXIFNy4yLjD6CnJlZGlzLWJpdHPAQPoFY3RpbWXCbQi8ZfoIdXNlZC1tZW3CsMQQAPoIYW9mLWJhc2XAAP/wbjv+wP9aog==");
writer.flush();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
package storage;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import static java.lang.System.out;
public class RdbFileReader {
private final RdbFileInfo rdbFileInfo;
private final Map<String, Long> keysExpiration;
public RdbFileReader() {
rdbFileInfo = RdbFileInfo.getInstance();
keysExpiration = new HashMap<>();
}
private static String getKey(InputStream fis, int b) throws IOException {
String key;
int strLength = lengthEncoding(fis, b);
b = fis.read();
out.println("strLength == " + strLength);
if (strLength == 0) {
strLength = b;
}
byte[] bytes = fis.readNBytes(strLength);
key = new String(bytes);
return key;
}
private static String getValue(InputStream fis) throws IOException {
int strLength;
int b;
b = fis.read();
strLength = lengthEncoding(fis, b);
if (strLength == 0) {
strLength = b;
}
byte[] bytesValue = fis.readNBytes(strLength);
String value = new String(bytesValue);
out.println(value);
return value;
}
private static int lengthEncoding(InputStream is, int b) throws IOException {
int length = 100;
int first2bits = b & 11000000;
out.println("first2bits = " + first2bits);
if (first2bits == 0) {
out.println("00");
length = 0;
} else if (first2bits == 128) {
out.println("01");
length = 2;
} else if (first2bits == 256) {
out.println("10");
ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES);
buffer.put(is.readNBytes(4));
buffer.rewind();
length = 1 + buffer.getInt();
} else if (first2bits == 256 + 128) {
out.println("11");
length = 1;
}
return length;
}
public Map<String, String> readFile() {
String key = "";
Long expiration = null;
Map<String, String> storage = new HashMap<>();
try (
InputStream fis =
new FileInputStream(new File(rdbFileInfo.getPath(), rdbFileInfo.getFileName()))) {
byte[] redis = new byte[5];
byte[] version = new byte[4];
fis.read(redis);
fis.read(version);
out.println("Magic String = " +
new String(redis, StandardCharsets.UTF_8));
out.println("Version = " +
new String(version, StandardCharsets.UTF_8));
int b;
header:
while ((b = fis.read()) != -1) {
switch (b) {
case 0xFF:
out.println("EOF");
break;
case 0xFE:
out.println("SELECTDB");
break;
case 0xFD:
out.println("EXPIRETIME");
break;
case 0xFC:
out.println("EXPIRETIMEMS");
break;
case 0xFB:
out.println("RESIZEDB");
b = fis.read();
fis.readNBytes(lengthEncoding(fis, b));
fis.readNBytes(lengthEncoding(fis, b));
break header;
case 0xFA:
out.println("AUX");
break;
}
}
out.println("header done");
b = fis.read();
while ((b = fis.read()) != -1) {
out.println("value-type = " + b);
out.println("value-type = " + b);
if (b == 0xFC) {
expiration = getExpiration(fis);
out.println("expiration = " + expiration);
b = fis.read();
}
out.println(" b = " + Integer.toBinaryString(b));
if (!Integer.toBinaryString(b).equals("0")) {
break;
}
out.println("reading keys");
key = getKey(fis, b);
out.println(key);
String value = getValue(fis);
storage.put(key, value);
if (expiration != null && expiration != 0) {
keysExpiration.put(key, expiration);
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
out.flush();
return storage;
}
private Long getExpiration(InputStream fis) {
try {
byte[] bytes = fis.readNBytes(8);
ByteBuffer wrap = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
return wrap.getLong();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public Map<String, Long> getKeysExpiration() {
return keysExpiration;
}
}
package storage; package storage;
//
//import jdk.incubator.vector.ByteVector;
//import jdk.incubator.vector.VectorMask;
//import jdk.incubator.vector.VectorOperators;
//import jdk.incubator.vector.VectorSpecies;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
...@@ -13,7 +9,6 @@ public class Storage { ...@@ -13,7 +9,6 @@ public class Storage {
private final Map<String, Long> timeToExpiration = new ConcurrentHashMap<>(10000); private final Map<String, Long> timeToExpiration = new ConcurrentHashMap<>(10000);
private final Map<String, Long> currentTimeForKey = new ConcurrentHashMap<>(10000); private final Map<String, Long> currentTimeForKey = new ConcurrentHashMap<>(10000);
//private static final VectorSpecies<Byte> SPECIES = ByteVector.SPECIES_128;
private Storage() { private Storage() {
// RdbFileReader reader = new RdbFileReader(); // RdbFileReader reader = new RdbFileReader();
...@@ -27,6 +22,7 @@ public class Storage { ...@@ -27,6 +22,7 @@ public class Storage {
private static final Storage storage = new Storage(); private static final Storage storage = new Storage();
} }
public static Storage getInstance() { public static Storage getInstance() {
return StorageHolder.storage; return StorageHolder.storage;
} }
...@@ -43,54 +39,27 @@ public class Storage { ...@@ -43,54 +39,27 @@ public class Storage {
} }
public String get(String key) { public String get(String key) {
System.out.println("GET Storage: " + key); if (isExpired(key)) return "";
// if (isExpired(key)) {
// System.out.println("Key expired: " + key);
// return "";
// }
return storage.get(key); return storage.get(key);
} }
//
// private boolean isExpired(String key) {
// long currentTime = System.currentTimeMillis();
//
// if (timeToExpiration.containsKey(key)) {
// long expirationTime = timeToExpiration.get(key);
//
// if (currentTimeForKey.containsKey(key)) {
// long currExpiration = currentTimeForKey.get(key);
// long timeDiff = currentTime - currExpiration;
// return vectorizedComparison(timeDiff, expirationTime);
// } else {
// return vectorizedComparison(currentTime, expirationTime);
// }
// }
// return false;
// }
// private boolean vectorizedComparison(long timeDiff, long expirationTime) { private boolean isExpired(String key) {
// // Convert timeDiff and expirationTime to byte arrays for SIMD processing if (timeToExpiration.containsKey(key) && !currentTimeForKey.containsKey(key)) {
// byte[] timeDiffBytes = longToBytes(timeDiff); long currentTime = System.currentTimeMillis();
// byte[] expirationTimeBytes = longToBytes(expirationTime); long expirationTime = timeToExpiration.get(key);
// if (currentTime > expirationTime) {
// ByteVector timeDiffVector = ByteVector.fromArray(SPECIES, timeDiffBytes, 0); System.out.println("Key expired: " + key);
// ByteVector expirationTimeVector = ByteVector.fromArray(SPECIES, expirationTimeBytes, 0); return true;
// }
// }
// VectorMask<Byte> mask = timeDiffVector.compare(VectorOperators.GT, expirationTimeVector); if (currentTimeForKey.containsKey(key)) {
// long currTime = System.currentTimeMillis();
// return mask.anyTrue(); long expirationTime = timeToExpiration.get(key);
// } long currExpiration = currentTimeForKey.get(key);
return currTime - currExpiration > expirationTime;
private byte[] longToBytes(long value) {
byte[] bytes = new byte[8];
for (int i = 7; i >= 0; i--) {
bytes[i] = (byte) (value & 0xFF);
value >>= 8;
} }
return bytes; return false;
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment