Commit a3c58c6d authored by tammam.alsoleman's avatar tammam.alsoleman

create EncryptionConsumer class

parent 32c70f87
package consumer;
public class EncryptionConsumer {
}
import model.SequencedLine;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
public class EncryptionConsumer implements Runnable {
private final int consumerId;
private final BlockingQueue<SequencedLine> inputQueue;
private final BlockingQueue<SequencedLine> outputQueue;
private final AtomicBoolean productionFinished;
private final AtomicInteger activeConsumers;
private final EncryptionProcessor processor;
public EncryptionConsumer(int consumerId,
BlockingQueue<SequencedLine> inputQueue,
BlockingQueue<SequencedLine> outputQueue,
AtomicBoolean productionFinished,
AtomicInteger activeConsumers,
EncryptionProcessor processor) {
this.consumerId = consumerId;
this.inputQueue = inputQueue;
this.outputQueue = outputQueue;
this.productionFinished = productionFinished;
this.activeConsumers = activeConsumers;
this.processor = processor;
// Increment active consumers counter
this.activeConsumers.incrementAndGet();
}
@Override
public void run() {
System.out.println("Consumer " + consumerId + " started work");
int processedCount = 0;
try {
// Continue processing until no more work
while (shouldContinueWorking()) {
// Get SequencedLine from input queue
SequencedLine inputLine = inputQueue.poll(1, java.util.concurrent.TimeUnit.SECONDS);
if (inputLine != null) {
// Process the line
String encryptedContent = processor.processLine(inputLine.getLine());
// Create new SequencedLine with SAME sequence number
SequencedLine outputLine = new SequencedLine(
inputLine.getSequenceNumber(),
encryptedContent
);
// Put result in output queue (may wait if queue is full)
outputQueue.put(outputLine);
processedCount++;
// Log progress occasionally
if (processedCount % 25 == 0) {
System.out.println("Consumer " + consumerId + " processed " + processedCount + " lines");
}
}
}
} catch (InterruptedException e) {
System.err.println("Consumer " + consumerId + " was interrupted");
Thread.currentThread().interrupt();
} finally {
// Decrement active consumers counter
int remainingConsumers = activeConsumers.decrementAndGet();
System.out.println("Consumer " + consumerId + " finished - processed " +
processedCount + " lines, " + remainingConsumers + " consumers remaining");
}
}
private boolean shouldContinueWorking() {
// Continue if:
// 1. We haven't been interrupted, AND
// 2. Either: production isn't finished, OR input queue isn't empty
return !Thread.currentThread().isInterrupted() &&
(!productionFinished.get() || !inputQueue.isEmpty());
}
public int getConsumerId() {
return consumerId;
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment