Commit bc422824 authored by rawan's avatar rawan

Initial commit of the Producer-Consumer Pipeline System

parents
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### IntelliJ IDEA ###
.idea/modules.xml
.idea/jarRepositories.xml
.idea/compiler.xml
.idea/libraries/
*.iws
*.iml
*.ipr
### Eclipse ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/
### Mac OS ###
.DS_Store
\ No newline at end of file
# Default ignored files
/shelf/
/workspace.xml
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Encoding">
<file url="file://$PROJECT_DIR$/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/src/main/resources" charset="UTF-8" />
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ExternalStorageConfigurationManager" enabled="true" />
<component name="MavenProjectsManager">
<option name="originalFiles">
<list>
<option value="$PROJECT_DIR$/pom.xml" />
</list>
</option>
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_17" default="true" project-jdk-name="17" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" />
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>
\ No newline at end of file
The quick brown foxes are jumping over the lazy dogs
Running in the beautiful gardens and playing with balls
Programming computers and learning new technologies
Artificial intelligence and machine learning algorithms
Natural language processing is an amazing field of study
Students are studying hard for their final examinations
The workers are working on important projects and developments
Beautiful flowers are growing in the wonderful garden
Scientists are researching new discoveries and innovations
Engineers are building amazing structures and constructions
the quick brown foxe are jump over the lazy dog
runn in the beautiful garden and play with ball
programm computer and learn new technologie
artificial intelligence and machine learn algorithm
natural language process i an amaz field of study
the worker are work on important project and development
beautiful flower are grow in the wonderful garden
student are study hard for their final examination
scientist are research new discoverie and innovation
engineer are build amaz structure and construction
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>TextProcessingSystem</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.opennlp</groupId>
<artifactId>opennlp-tools</artifactId>
<version>2.0.0</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package org.example;
import java.io.*;
import java.nio.file.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class Main {
// Variables shared by all threads
private static volatile boolean producerFinished = false;
private static final AtomicInteger activeWorkers = new AtomicInteger(0);
public static void main(String[] args) {
System.out.println("Starting word processing system...");
createSampleInputFile();
// Set up queues - using LinkedBlockingQueue as in the appendix
BlockingQueue<String> rawLinesQueue = new LinkedBlockingQueue<>(100);
BlockingQueue<String> processedLinesQueue = new LinkedBlockingQueue<>(100);
ExecutorService executor = Executors.newFixedThreadPool(6);
long startTime = System.currentTimeMillis();
executor.submit(new Producer(rawLinesQueue, "input.txt"));
for (int i = 1; i <= 3; i++) {
executor.submit(new WorkerConsumer(rawLinesQueue, processedLinesQueue, i));
}
executor.submit(new WriterConsumer(processedLinesQueue, "output.txt"));
executor.shutdown();
try {
if (executor.awaitTermination(2, TimeUnit.MINUTES)) {
long endTime = System.currentTimeMillis();
System.out.println("Processing completed successfully " + (endTime - startTime) + " ms");
} else {
System.err.println("️ Time runs out before all tasks are completed");
executor.shutdownNow();
}
} catch (InterruptedException e) {
System.err.println("The wait is interrupted");
executor.shutdownNow();
Thread.currentThread().interrupt();
}
displayResults();
}
public static boolean isProducerFinished() {
return producerFinished;
}
public static void setProducerFinished(boolean finished) {
producerFinished = finished;
}
public static AtomicInteger getActiveWorkers() {
return activeWorkers;
}
private static void createSampleInputFile() {
try (BufferedWriter writer = Files.newBufferedWriter(Paths.get("input.txt"))) {
String[] sampleLines = {
"The quick brown foxes are jumping over the lazy dogs",
"Running in the beautiful gardens and playing with balls",
"Programming computers and learning new technologies",
"Artificial intelligence and machine learning algorithms",
"Natural language processing is an amazing field of study",
"Students are studying hard for their final examinations",
"The workers are working on important projects and developments",
"Beautiful flowers are growing in the wonderful garden",
"Scientists are researching new discoveries and innovations",
"Engineers are building amazing structures and constructions"
};
for (String line : sampleLines) {
writer.write(line);
writer.newLine();
}
System.out.println("The input.txt file was created successfully");
} catch (IOException e) {
System.err.println("Error creating file: " + e.getMessage());
}
}
private static void displayResults() {
try {
System.out.println("\n" + "=".repeat(50));
System.out.println("final Resault");
System.out.println("=".repeat(50));
System.out.println("\n Original file (input.txt):");
System.out.println("-".repeat(30));
Files.lines(Paths.get("input.txt")).forEach(System.out::println);
System.out.println("\n Processed file (output.txt):");
System.out.println("-".repeat(30));
Files.lines(Paths.get("output.txt")).forEach(System.out::println);
} catch (IOException e) {
System.err.println("Error displaying results: " + e.getMessage());
}
}
}
\ No newline at end of file
package org.example;
import java.io.*;
import java.nio.file.*;
import java.util.concurrent.BlockingQueue;
public class Producer implements Runnable {
private final BlockingQueue<String> queue;
private final String inputFile;
public Producer(BlockingQueue<String> queue, String inputFile) {
this.queue = queue;
this.inputFile = inputFile;
}
@Override
public void run() {
System.out.println(" Product: Started reading the file " + inputFile);
int lineCount = 0;
try (BufferedReader reader = Files.newBufferedReader(Paths.get(inputFile))) {
String line;
while ((line = reader.readLine()) != null) {
if (!line.trim().isEmpty()) {
queue.put(line);
lineCount++;
System.out.println("Producer: added the line" + lineCount + " - " +
shortenText(line, 40));
Thread.sleep(50);
}
}
} catch (IOException e) {
System.err.println("Product error while reading: " + e.getMessage());
} catch (InterruptedException e) {
System.err.println(" Product: interrupted");
Thread.currentThread().interrupt();
} finally {
Main.setProducerFinished(true);
System.out.println("Product: finished reading" + lineCount + " line");
}
}
private String shortenText(String text, int maxLength) {
if (text.length() <= maxLength) {
return text;
}
return text.substring(0, maxLength) + "...";
}
}
\ No newline at end of file
package org.example;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class WorkerConsumer implements Runnable {
private final BlockingQueue<String> inputQueue;
private final BlockingQueue<String> outputQueue;
private final int workerId;
private final AtomicInteger processedCount = new AtomicInteger(0);
public WorkerConsumer(BlockingQueue<String> inputQueue,
BlockingQueue<String> outputQueue,
int workerId) {
this.inputQueue = inputQueue;
this.outputQueue = outputQueue;
this.workerId = workerId;
}
@Override
public void run() {
Main.getActiveWorkers().incrementAndGet();
System.out.println(" the worker " + workerId + ": started");
try {
while (true) {
String line = inputQueue.poll(1, TimeUnit.SECONDS);
if (line != null) {
processLine(line);
} else if (Main.isProducerFinished() && inputQueue.isEmpty()) {
break;
}
}
} catch (InterruptedException e) {
System.err.println("️ the worker " + workerId + ": has interrupted");
Thread.currentThread().interrupt();
} finally {
Main.getActiveWorkers().decrementAndGet();
System.out.println(" the worker " + workerId + ": finished after the processing " +
processedCount.get() + "line");
}
}
private void processLine(String line) throws InterruptedException {
System.out.println(" the worker " + workerId + " is processing: " + shortenText(line, 30));
Thread.sleep(100 + (int)(Math.random() * 100));
String processedLine = applyStemming(line);
outputQueue.put(processedLine);
int count = processedCount.incrementAndGet();
System.out.println("the worker " + workerId + ": finished the line " + count + " - " +
shortenText(processedLine, 30));
}
private String applyStemming(String line) {
String[] words = line.split("\\s+");
StringBuilder result = new StringBuilder();
for (String word : words) {
if (!word.isEmpty()) {
String stemmedWord = stemWord(word.toLowerCase());
result.append(stemmedWord).append(" ");
}
}
return result.toString().trim();
}
private String stemWord(String word) {
if (word.endsWith("ing")) {
return word.substring(0, word.length() - 3);
} else if (word.endsWith("ed")) {
return word.substring(0, word.length() - 2);
} else if (word.endsWith("s")) {
return word.substring(0, word.length() - 1);
} else if (word.endsWith("ly")) {
return word.substring(0, word.length() - 2);
}
return word;
}
private String shortenText(String text, int maxLength) {
if (text.length() <= maxLength) {
return text;
}
return text.substring(0, maxLength) + "...";
}
}
\ No newline at end of file
package org.example;
import java.io.*;
import java.nio.file.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
public class WriterConsumer implements Runnable {
private final BlockingQueue<String> queue;
private final String outputFile;
private int writtenCount = 0;
public WriterConsumer(BlockingQueue<String> queue, String outputFile) {
this.queue = queue;
this.outputFile = outputFile;
}
@Override
public void run() {
System.out.println("Writer: He started writing in " + outputFile);
try (BufferedWriter writer = Files.newBufferedWriter(Paths.get(outputFile))) {
while (true) {
String processedLine = queue.poll(1, TimeUnit.SECONDS);
if (processedLine != null) {
writer.write(processedLine);
writer.newLine();
writer.flush();
writtenCount++;
System.out.println(" Writer: He wrote the line " + writtenCount + " - " +
shortenText(processedLine, 30));
Thread.sleep(50);
} else if (Main.isProducerFinished() &&
Main.getActiveWorkers().get() == 0 &&
queue.isEmpty()) {
break;
}
}
} catch (IOException e) {
System.err.println("Writer error: " + e.getMessage());
} catch (InterruptedException e) {
System.err.println("Writer: He was interrupted");
Thread.currentThread().interrupt();
}
System.out.println("Writer: He finished writing " + writtenCount + " line in " + outputFile);
}
private String shortenText(String text, int maxLength) {
if (text.length() <= maxLength) {
return text;
}
return text.substring(0, maxLength) + "...";
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment