Commit cee9396b authored by diana.alkateeb's avatar diana.alkateeb

Initial commit

parents
### IntelliJ IDEA ###
out/
!**/src/main/**/out/
!**/src/test/**/out/
### Eclipse ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
bin/
!**/src/main/**/bin/
!**/src/test/**/bin/
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
### VS Code ###
.vscode/
### Mac OS ###
.DS_Store
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" languageLevel="JDK_17" default="true" project-jdk-name="17" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" />
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/ThradsSynchronization.iml" filepath="$PROJECT_DIR$/ThradsSynchronization.iml" />
</modules>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="AutoImportSettings">
<option name="autoReloadType" value="SELECTIVE" />
</component>
<component name="ChangeListManager">
<list default="true" id="ed7c1a9b-30ba-4c74-8c06-e51dfcd96833" name="Changes" comment="">
<change afterPath="$PROJECT_DIR$/.gitignore" afterDir="false" />
<change afterPath="$PROJECT_DIR$/.idea/misc.xml" afterDir="false" />
<change afterPath="$PROJECT_DIR$/.idea/modules.xml" afterDir="false" />
<change afterPath="$PROJECT_DIR$/.idea/vcs.xml" afterDir="false" />
<change afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
<change afterPath="$PROJECT_DIR$/ThradsSynchronization.iml" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/Consumer.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/ConsumerWriter.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/Producer.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/ProducerConsumerTask.java" afterDir="false" />
</list>
<option name="SHOW_DIALOG" value="false" />
<option name="HIGHLIGHT_CONFLICTS" value="true" />
<option name="HIGHLIGHT_NON_ACTIVE_CHANGELIST" value="false" />
<option name="LAST_RESOLUTION" value="IGNORE" />
</component>
<component name="FileTemplateManagerImpl">
<option name="RECENT_TEMPLATES">
<list>
<option value="Class" />
</list>
</option>
</component>
<component name="Git.Settings">
<option name="RECENT_GIT_ROOT_PATH" value="$PROJECT_DIR$" />
</component>
<component name="MarkdownSettingsMigration">
<option name="stateVersion" value="1" />
</component>
<component name="ProblemsViewState">
<option name="selectedTabId" value="CurrentFile" />
</component>
<component name="ProjectId" id="2YfgFuGoVb6HlJmE5uYmVhMsTLd" />
<component name="ProjectLevelVcsManager" settingsEditedManually="true" />
<component name="ProjectViewState">
<option name="hideEmptyMiddlePackages" value="true" />
<option name="showLibraryContents" value="true" />
</component>
<component name="PropertiesComponent">{
&quot;keyToString&quot;: {
&quot;RunOnceActivity.OpenProjectViewOnStart&quot;: &quot;true&quot;,
&quot;RunOnceActivity.ShowReadmeOnStart&quot;: &quot;true&quot;
}
}</component>
<component name="SpellCheckerSettings" RuntimeDictionaries="0" Folders="0" CustomDictionaries="0" DefaultDictionary="application-level" UseSingleDictionary="true" transferred="true" />
<component name="TaskManager">
<task active="true" id="Default" summary="Default task">
<changelist id="ed7c1a9b-30ba-4c74-8c06-e51dfcd96833" name="Changes" comment="" />
<created>1700921952224</created>
<option name="number" value="Default" />
<option name="presentableId" value="Default" />
<updated>1700921952224</updated>
</task>
<servers />
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<module type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" inherit-compiler-output="true">
<exclude-output />
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/src" isTestSource="false" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>
\ No newline at end of file
import java.util.concurrent.BlockingQueue;
class Consumer implements Runnable {
private BlockingQueue<String> waitingLine;
private BlockingQueue<String> results;
public Consumer(BlockingQueue<String> waitingLine, BlockingQueue<String> results) {
this.waitingLine = waitingLine;
this.results = results;
}
@Override
public void run() {
try {
consumeAndProcess();
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
private void consumeAndProcess() throws InterruptedException {
while (true) {
String line = waitingLine.take(); // Blocking operation if the queue is empty
// split line on : that we have added in the producer
String[] parts = line.split(":", 2);
int lineNumber = Integer.parseInt(parts[0]);
String lineContent = parts[1];
// the process is converting the text to upper case
String result = lineNumber + ":" + lineContent.toUpperCase();
//after processing the line it stores it in the results
results.put(result);
System.out.println(Thread.currentThread().getName() + " consumer thread processed the line number: " + lineNumber);
}
}
}
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
class ConsumerWriter implements Runnable {
private BlockingQueue<String> results;
private String outputFilePath;
public ConsumerWriter(BlockingQueue<String> results, String outputFilePath) {
this.results = results;
this.outputFilePath = outputFilePath;
}
@Override
public void run() {
try {
writeResults();
} catch (IOException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
private void writeResults() throws IOException {
int expected=0;
try (BufferedWriter writer = new BufferedWriter(new FileWriter(outputFilePath))) {
while (true) {
String result = results.take();
// split result to line number and line lineContent
String[] parts = result.split(":", 2);
int lineNumber = Integer.parseInt(parts[0]);
String lineContent = parts[1];
//if the line number is as expected it goes to the file
if(lineNumber==expected){
writer.write(lineContent);
writer.newLine();
writer.flush();
System.out.println(Thread.currentThread().getName() + " consumer writer thread wrote the line num " + lineNumber);
expected++;}
//to know that all file was written on the output
else if(lineNumber==-1 && results.isEmpty())
{System.out.println(Thread.currentThread().getName() + " all are written " );
writer.close();
break;}
//if the line number is not as expected it goes back to the results queue
else {
try{results.put(result);
System.out.println(Thread.currentThread().getName() + " consumer writer thread returned the line num: " + lineNumber);
}catch (InterruptedException e){throw new RuntimeException(e);}
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
class Producer implements Runnable {
private BlockingQueue<String> waitingLine;
private String filePath;
public Producer(BlockingQueue<String> waitingLine, String filePath) {
this.waitingLine = waitingLine;
this.filePath = filePath;
}
@Override
public void run() {
try{try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) {
String line;
int lineNumber = 0;
while ((line = reader.readLine()) != null) {
try {
//here we are adding line number to preserve the order in the out but file because if we have to consumers both
// are processing lines, the second one may finish processing and add it to the results queue before the first thread which leads to loss the text order
// the number is used later to preserve the order
waitingLine.put(lineNumber + ":" + line);
//to know every time this thread is working
System.out.println(Thread.currentThread().getName() + " producer thread produced the line number " + lineNumber);
lineNumber++;
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
try{
//to define the end of file a line is added to the waiting list with number -1 and it will be used later
waitingLine.put("-1:EndOfFile");
System.out.println("reader reached end of file");
}catch (InterruptedException e){
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
}catch (IOException e){e.printStackTrace();}
}
}
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ProducerConsumerTask {
public static void main(String[] args) {
BlockingQueue<String> waitingLine = new ArrayBlockingQueue<>(5);
BlockingQueue<String> results = new ArrayBlockingQueue<>(100);
String filePath = "C:\\Users\\Diana\\Downloads\\input.txt";
String outputFilePath = "C:\\Users\\Diana\\Downloads\\output.txt";
Thread producerThread = new Thread(new Producer(waitingLine, filePath));
Thread consumerThread1 = new Thread(new Consumer(waitingLine, results));
Thread consumerThread2 = new Thread(new Consumer(waitingLine, results));
Thread fileWriterConsumerThread = new Thread(new ConsumerWriter(results, outputFilePath));
producerThread.start();
consumerThread1.start();
consumerThread2.start();
fileWriterConsumerThread.start();
try {
Thread.sleep(5000); // tp give some time for producers and consumers to work
producerThread.interrupt();
producerThread.join();
consumerThread1.join();
consumerThread2.join();
fileWriterConsumerThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment