Commit c6c0e76b authored by amir.yosef's avatar amir.yosef

Optimizing LRU policy

parent 35baeb5b
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<project version="4"> <project version="4">
<component name="JsBuildToolPackageJson" sorting="DEFINITION_ORDER" /> <component name="JsBuildToolPackageJson" sorting="DEFINITION_ORDER" />
<component name="ProjectRootManager" version="2" languageLevel="JDK_22" default="true" project-jdk-name="ibm-22" project-jdk-type="JavaSDK"> <component name="ProjectRootManager" version="2" languageLevel="JDK_22" default="true" project-jdk-name="graalvm-22" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" /> <output url="file://$PROJECT_DIR$/out" />
</component> </component>
</project> </project>
\ No newline at end of file
...@@ -4,35 +4,65 @@ import server.Server; ...@@ -4,35 +4,65 @@ import server.Server;
import server.ServerBuilder; import server.ServerBuilder;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class Main { public class Main {
public static ExecutorService executor; public static ExecutorService executor;
public static void main(String[] args) { public static void main(String[] args) {
executor = Executors.newFixedThreadPool(2); int availableProcessors = Runtime.getRuntime().availableProcessors();
executor = Executors.newFixedThreadPool(availableProcessors);
Director director = new Director(); Director director = new Director();
ServerBuilder builder = new ServerBuilder(args); ServerBuilder builder = new ServerBuilder(args);
// try (Server server = builder.build()) {
// server.start(); List<Callable<Void>> tasks = Arrays.asList(
// } () -> {
// builder.setPort(16379); try {
// executor.submit(() -> { ReplicaConnectionService service = null;
// ReplicaConnectionService service = null; director.buildReplica(builder, service);
// try { } catch (IOException e) {
// director.buildReplica(builder, service); throw new RuntimeException(e);
// } catch (IOException e) { }
// throw new RuntimeException(e); return null;
// } },
// }); () -> {
executor.submit(() -> {
try (Server server = builder.build()) { try (Server server = builder.build()) {
server.start(); server.start();
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
return null;
} }
); );
try {
executor.invokeAll(tasks);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Task execution was interrupted", e);
} finally {
shutdownExecutor(executor);
}
}
private static void shutdownExecutor(ExecutorService executor) {
executor.shutdown();
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("Executor did not terminate");
}
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
} }
} }
package storage; package storage;
import java.util.Map; import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.StampedLock;
public class LRUCachePolicy<K, V> implements CachePolicy<K, V> { public class LRUCachePolicy<K, V> implements CachePolicy<K, V> {
private final int maxCapacity;
private final int threshold; private final int threshold;
private final Map<K, V> cacheMap; private final Map<K, Node<K, V>> cacheMap;
private final Queue<K> accessOrder; private final StampedLock lock;
private final AtomicInteger size;
private final AtomicReference<Node<K, V>> head;
private final AtomicReference<Node<K, V>> tail;
public LRUCachePolicy(int maxCapacity, int threshold) { public LRUCachePolicy(int maxCapacity, int threshold) {
this.maxCapacity = maxCapacity;
this.threshold = threshold;
this.cacheMap = new ConcurrentHashMap<>(maxCapacity); this.cacheMap = new ConcurrentHashMap<>(maxCapacity);
this.lock = new StampedLock();
this.size = new AtomicInteger(0);
this.head = new AtomicReference<>(null);
this.tail = new AtomicReference<>(null);
RdbFileReader<String, String> reader = new RdbFileReader<>(); RdbFileReader<String, String> reader = new RdbFileReader<>();
cacheMap.putAll((Map<? extends K, ? extends V>) reader.readFile()); Map<? extends K, ? extends V> initialData = (Map<? extends K, ? extends V>) reader.readFile();
this.threshold = threshold; for (Map.Entry<? extends K, ? extends V> entry : initialData.entrySet()) {
this.accessOrder = new ConcurrentLinkedQueue<>(); add(entry.getKey(), entry.getValue());
}
} }
@Override @Override
public void add(K key, V value) { public void add(K key, V value) {
cacheMap.put(key, value); Node<K, V> newNode = new Node<>(key, value);
updateAccessOrder(key); Node<K, V> oldNode = cacheMap.put(key, newNode);
if (oldNode == null) {
if (size.incrementAndGet() > maxCapacity) {
evictLeastRecentlyUsed();
}
addToFrontOptimistic(newNode);
} else {
oldNode.setValue(value);
moveToFrontOptimistic(oldNode);
}
} }
@Override @Override
public V retrieve(K key) { public V retrieve(K key) {
V value = cacheMap.get(key); Node<K, V> node = cacheMap.get(key);
if (value != null) { if (node != null) {
updateAccessOrder(key); moveToFrontOptimistic(node);
return node.value;
} }
return value; return null;
} }
@Override @Override
public void delete(K key) { public void delete(K key) {
cacheMap.remove(key); Node<K, V> node = cacheMap.remove(key);
accessOrder.remove(key); if (node != null) {
removeNodeOptimistic(node);
size.decrementAndGet();
}
} }
@Override @Override
public void runMaintenance() { public void runMaintenance() {
while (cacheMap.entrySet().size() > threshold) { while (size.get() > threshold) {
evictLeastRecentlyUsed(); evictLeastRecentlyUsed();
} }
} }
private void updateAccessOrder(K key) { private void addToFrontOptimistic(Node<K, V> node) {
accessOrder.remove(key); long stamp = lock.tryOptimisticRead();
accessOrder.offer(key); Node<K, V> oldHead = head.get();
node.next = oldHead;
if (!lock.validate(stamp)) {
stamp = lock.writeLock();
try {
oldHead = head.get();
node.next = oldHead;
if (oldHead != null) {
oldHead.prev = node;
} else {
tail.set(node);
}
head.set(node);
} finally {
lock.unlockWrite(stamp);
}
} else {
if (oldHead != null) {
oldHead.prev = node;
} else {
tail.set(node);
}
head.set(node);
}
}
private void moveToFrontOptimistic(Node<K, V> node) {
long stamp = lock.tryOptimisticRead();
if (node != head.get()) {
Node<K, V> oldHead = head.get();
Node<K, V> oldTail = tail.get();
if (!lock.validate(stamp)) {
stamp = lock.writeLock();
try {
if (node != head.get()) {
if (node.prev != null) {
node.prev.next = node.next;
}
if (node.next != null) {
node.next.prev = node.prev;
} else {
tail.set(node.prev);
}
oldHead = head.get();
node.next = oldHead;
node.prev = null;
if (oldHead != null) {
oldHead.prev = node;
}
head.set(node);
if (tail.get() == null) {
tail.set(node);
}
}
} finally {
lock.unlockWrite(stamp);
}
} else {
if (node.prev != null) {
node.prev.next = node.next;
}
if (node.next != null) {
node.next.prev = node.prev;
} else if (node == oldTail) {
tail.compareAndSet(node, node.prev);
}
node.next = oldHead;
node.prev = null;
if (oldHead != null) {
oldHead.prev = node;
}
head.compareAndSet(oldHead, node);
}
}
}
private void removeNodeOptimistic(Node<K, V> node) {
long stamp = lock.tryOptimisticRead();
Node<K, V> prevNode = node.prev;
Node<K, V> nextNode = node.next;
if (!lock.validate(stamp)) {
stamp = lock.writeLock();
try {
if (node.prev != null) {
node.prev.next = node.next;
} else {
head.set(node.next);
}
if (node.next != null) {
node.next.prev = node.prev;
} else {
tail.set(node.prev);
}
} finally {
lock.unlockWrite(stamp);
}
} else {
if (prevNode != null) {
prevNode.next = nextNode;
} else {
head.compareAndSet(node, nextNode);
}
if (nextNode != null) {
nextNode.prev = prevNode;
} else {
tail.compareAndSet(node, prevNode);
}
}
} }
private void evictLeastRecentlyUsed() { private void evictLeastRecentlyUsed() {
while (!accessOrder.isEmpty()) { Node<K, V> nodeToEvict;
K leastRecentKey = accessOrder.poll(); long stamp = lock.writeLock();
if (cacheMap.containsKey(leastRecentKey)) { try {
delete(leastRecentKey); nodeToEvict = tail.get();
break; if (nodeToEvict != null) {
Node<K, V> newTail = nodeToEvict.prev;
tail.set(newTail);
if (newTail != null) {
newTail.next = null;
} else {
head.set(null);
}
}
} finally {
lock.unlockWrite(stamp);
}
if (nodeToEvict != null) {
cacheMap.remove(nodeToEvict.key);
size.decrementAndGet();
} }
} }
private static class Node<K, V> {
final K key;
volatile V value;
volatile Node<K, V> prev;
volatile Node<K, V> next;
Node(K key, V value) {
this.key = key;
this.value = value;
}
void setValue(V value) {
this.value = value;
}
} }
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment