Commit 9614576c authored by yazan.halloul's avatar yazan.halloul

build main pipeline classes (Producer, multi Consumer for text processing, Consumer)

parent 4f54e8fa
package main_pipeline;
import benefit_classes.LineSequence;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
public class Consumer implements Runnable {
private final BlockingQueue<LineSequence> outputQueue;
private final String outputFilePath;
public Consumer(BlockingQueue<LineSequence> outputQueue, String outputFilePath) {
this.outputQueue = outputQueue;
this.outputFilePath = outputFilePath;
}
@Override
public void run() {
int sequence = 1;
try (BufferedWriter writer = new BufferedWriter(new FileWriter(outputFilePath))) {
while (true) {
LineSequence result = outputQueue.peek();
if (result != null && result.getSequence() == sequence) {
result = outputQueue.take(); // Retrieve processed line from the output queue
sequence += 1;
if (result.getLine().equals("EOF")) {
break;
} else {
writer.write(result.getLine()); // Write the result to the output file
writer.newLine();
writer.flush();
}
}
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
}
package main_pipeline;
import benefit_classes.LineSequence;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
public class Producer implements Runnable {
private final BlockingQueue<LineSequence> inputQueue;
private final String inputFilePath;
public Producer(BlockingQueue<LineSequence> inputQueue, String inputFilePath) {
this.inputQueue = inputQueue;
this.inputFilePath = inputFilePath;
}
@Override
public void run() {
int sequence = 1;
// Read lines from file and add to queue
try (BufferedReader reader = new BufferedReader(new FileReader(inputFilePath))) {
String line;
while ((line = reader.readLine()) != null) {
LineSequence lineSequence = new LineSequence(line, sequence);
inputQueue.put(lineSequence); // Put line into the input queue
sequence += 1;
}
inputQueue.put(new LineSequence("EOF", sequence));
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
}
package main_pipeline;
import benefit_classes.DESAlgorithm;
import benefit_classes.LineSequence;
import javax.crypto.BadPaddingException;
import javax.crypto.IllegalBlockSizeException;
import javax.crypto.NoSuchPaddingException;
import java.io.UnsupportedEncodingException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.BlockingQueue;
public class TextProcessorConsumer implements Runnable {
private final BlockingQueue<LineSequence> inputQueue;
private final BlockingQueue<LineSequence> outputQueue;
private static boolean fileEnded = false;
private final boolean typeOfProcess;
public TextProcessorConsumer(BlockingQueue<LineSequence> inputQueue, BlockingQueue<LineSequence> outputQueue, boolean typeOfProcess) {
this.inputQueue = inputQueue;
this.outputQueue = outputQueue;
this.typeOfProcess = typeOfProcess;
}
@Override
public void run() {
try {
DESAlgorithm des = new DESAlgorithm("fzI0SlHWAfc=");
while (!fileEnded) {
LineSequence lineSequence = inputQueue.take(); // Retrieve line from the input queue
if (lineSequence.getLine().equals("EOF")) {
fileEnded = true;
outputQueue.put(lineSequence);
break;
} else {
System.out.println(Thread.currentThread().getName() + " " + lineSequence.getLine() + " " + lineSequence.getSequence());
// Process the line
if (typeOfProcess)
lineSequence.setLine(des.encryption(lineSequence.getLine()));
else
lineSequence.setLine(des.decryption(lineSequence.getLine()));
outputQueue.put(lineSequence); // Put the processed line into the output queue
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (NoSuchPaddingException | UnsupportedEncodingException | IllegalBlockSizeException |
NoSuchAlgorithmException | BadPaddingException | InvalidKeyException e) {
throw new RuntimeException(e);
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment