Commit 650addc4 authored by mohammad.salama's avatar mohammad.salama

Serializing Transactions

parent 74ec3501
<component name="ArtifactManager"> <component name="ArtifactManager">
<artifact type="jar" name="Consumer:jar"> <artifact type="jar" name="Consumer-Account-Manager:jar">
<output-path>$PROJECT_DIR$/out/artifacts/Consumer_jar</output-path> <output-path>$PROJECT_DIR$/out/artifacts/Consumer_Account_Manager_jar</output-path>
<root id="archive" name="Consumer.jar"> <root id="archive" name="Consumer.jar">
<element id="module-output" name="Consumer" /> <element id="module-output" name="Consumer" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/com/fasterxml/jackson/core/jackson-databind/2.12.1/jackson-databind-2.12.1.jar" path-in-jar="/" /> <element id="extracted-dir" path="$MAVEN_REPOSITORY$/com/fasterxml/jackson/core/jackson-databind/2.12.1/jackson-databind-2.12.1.jar" path-in-jar="/" />
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
<project version="4"> <project version="4">
<component name="ArtifactsWorkspaceSettings"> <component name="ArtifactsWorkspaceSettings">
<artifacts-to-build> <artifacts-to-build>
<artifact name="Consumer:jar" /> <artifact name="Consumer-Account-Manager:jar" />
</artifacts-to-build> </artifacts-to-build>
</component> </component>
<component name="AutoImportSettings"> <component name="AutoImportSettings">
...@@ -10,11 +10,29 @@ ...@@ -10,11 +10,29 @@
</component> </component>
<component name="ChangeListManager"> <component name="ChangeListManager">
<list default="true" id="68795978-b1a2-4702-95fa-e8e7afae617a" name="Changes" comment=""> <list default="true" id="68795978-b1a2-4702-95fa-e8e7afae617a" name="Changes" comment="">
<change afterPath="$PROJECT_DIR$/../Consumer-User-Notification/src/main/java/org/example/UserNotificationKafkaConsumer.java" afterDir="false" /> <change beforePath="$PROJECT_DIR$/../Consumer-Reporting/.idea/artifacts/Consumer_jar.xml" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/../Consumer-Reporting/out/artifacts/Consumer_jar/Consumer.jar" beforeDir="false" /> <change beforePath="$PROJECT_DIR$/../Consumer-Reporting/.idea/compiler.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../Consumer-Reporting/.idea/compiler.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Consumer-Reporting/.idea/vcs.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../Consumer-Reporting/.idea/vcs.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Consumer-Reporting/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../Consumer-Reporting/.idea/workspace.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Consumer-Reporting/src/main/java/org/example/ReportingKafkaConsumer.java" beforeDir="false" afterPath="$PROJECT_DIR$/../Consumer-Reporting/src/main/java/org/example/ReportingKafkaConsumer.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Consumer-Reporting/src/main/java/org/example/TransactionInfo.java" beforeDir="false" afterPath="$PROJECT_DIR$/../Consumer-Reporting/src/main/java/org/example/TransactionInfo.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Consumer-Reporting/target/classes/org/example/ReportingKafkaConsumer.class" beforeDir="false" afterPath="$PROJECT_DIR$/../Consumer-Reporting/target/classes/org/example/ReportingKafkaConsumer.class" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Consumer-Reporting/target/classes/org/example/TransactionInfo.class" beforeDir="false" afterPath="$PROJECT_DIR$/../Consumer-Reporting/target/classes/org/example/TransactionInfo.class" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Consumer-User-Notification/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../Consumer-User-Notification/.idea/workspace.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Consumer-User-Notification/src/main/java/org/example/DataBaseHandler.java" beforeDir="false" afterPath="$PROJECT_DIR$/../Consumer-User-Notification/src/main/java/org/example/DataBaseHandler.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Consumer-User-Notification/src/main/java/org/example/TransactionInfo.java" beforeDir="false" afterPath="$PROJECT_DIR$/../Consumer-User-Notification/src/main/java/org/example/TransactionInfo.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Consumer-User-Notification/src/main/java/org/example/UserNotificationKafkaConsumer.java" beforeDir="false" afterPath="$PROJECT_DIR$/../Consumer-User-Notification/src/main/java/org/example/UserNotificationKafkaConsumer.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Producer/.idea/artifacts/Producer_jar.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../Producer/.idea/artifacts/Producer_jar.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Producer/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../Producer/.idea/workspace.xml" afterDir="false" /> <change beforePath="$PROJECT_DIR$/../Producer/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../Producer/.idea/workspace.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Producer/out/artifacts/Producer_jar/Producer.jar" beforeDir="false" /> <change beforePath="$PROJECT_DIR$/../Producer/src/main/java/org/example/BankingAPI.java" beforeDir="false" afterPath="$PROJECT_DIR$/../Producer/src/main/java/org/example/BankingAPI.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Producer/pom.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../Producer/pom.xml" afterDir="false" /> <change beforePath="$PROJECT_DIR$/../Producer/src/main/java/org/example/Customer.java" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/../Producer/src/main/java/org/example/MyProducer.java" beforeDir="false" afterPath="$PROJECT_DIR$/../Producer/src/main/java/org/example/MyProducer.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Producer/src/main/java/org/example/TransactionInfo.java" beforeDir="false" afterPath="$PROJECT_DIR$/../Producer/src/main/java/org/example/TransactionInfo.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Producer/src/main/resources/META-INF/MANIFEST.MF" beforeDir="false" afterPath="$PROJECT_DIR$/../Producer/src/main/resources/META-INF/MANIFEST.MF" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Producer/target/classes/META-INF/MANIFEST.MF" beforeDir="false" afterPath="$PROJECT_DIR$/../Producer/target/classes/META-INF/MANIFEST.MF" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Producer/target/classes/org/example/Customer.class" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/../Producer/target/classes/org/example/MyProducer.class" beforeDir="false" afterPath="$PROJECT_DIR$/../Producer/target/classes/org/example/MyProducer.class" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Producer/target/classes/org/example/TransactionInfo.class" beforeDir="false" afterPath="$PROJECT_DIR$/../Producer/target/classes/org/example/TransactionInfo.class" afterDir="false" />
</list> </list>
<option name="SHOW_DIALOG" value="false" /> <option name="SHOW_DIALOG" value="false" />
<option name="HIGHLIGHT_CONFLICTS" value="true" /> <option name="HIGHLIGHT_CONFLICTS" value="true" />
...@@ -40,17 +58,17 @@ ...@@ -40,17 +58,17 @@
<option name="hideEmptyMiddlePackages" value="true" /> <option name="hideEmptyMiddlePackages" value="true" />
<option name="showLibraryContents" value="true" /> <option name="showLibraryContents" value="true" />
</component> </component>
<component name="PropertiesComponent">{ <component name="PropertiesComponent"><![CDATA[{
&quot;keyToString&quot;: { "keyToString": {
&quot;RunOnceActivity.OpenProjectViewOnStart&quot;: &quot;true&quot;, "RunOnceActivity.OpenProjectViewOnStart": "true",
&quot;RunOnceActivity.ShowReadmeOnStart&quot;: &quot;true&quot;, "RunOnceActivity.ShowReadmeOnStart": "true",
&quot;SHARE_PROJECT_CONFIGURATION_FILES&quot;: &quot;true&quot;, "SHARE_PROJECT_CONFIGURATION_FILES": "true",
&quot;last_opened_file_path&quot;: &quot;D:/HIAST/FIY/SS/Advanced Distributed Systems/Lab/4/java-kafka-consumer/Consumer/src/main/resources&quot;, "last_opened_file_path": "D:/HIAST/FIY/SS/Advanced Distributed Systems/Lab/4/java-kafka-consumer/Consumer/src/main/resources",
&quot;project.structure.last.edited&quot;: &quot;Modules&quot;, "project.structure.last.edited": "Artifacts",
&quot;project.structure.proportion&quot;: &quot;0.0&quot;, "project.structure.proportion": "0.15",
&quot;project.structure.side.proportion&quot;: &quot;0.2&quot; "project.structure.side.proportion": "0.2"
} }
}</component> }]]></component>
<component name="RecentsManager"> <component name="RecentsManager">
<key name="CopyFile.RECENT_KEYS"> <key name="CopyFile.RECENT_KEYS">
<recent name="D:\HIAST\FIY\SS\Advanced Distributed Systems\Lab\4\java-kafka-consumer\Consumer\src\main\resources" /> <recent name="D:\HIAST\FIY\SS\Advanced Distributed Systems\Lab\4\java-kafka-consumer\Consumer\src\main\resources" />
......
...@@ -29,12 +29,12 @@ public class AccountManagerKafkaConsumer ...@@ -29,12 +29,12 @@ public class AccountManagerKafkaConsumer
System.out.println("Consumer is part of consumer group " + consumerGroup); System.out.println("Consumer is part of consumer group " + consumerGroup);
Consumer<Long, String> kafkaConsumer = createKafkaConsumer(BOOTSTRAP_SERVERS, consumerGroup); Consumer<Long, TransactionInfo> kafkaConsumer = createKafkaConsumer(BOOTSTRAP_SERVERS, consumerGroup);
consumeMessages(TOPIC, kafkaConsumer); consumeMessages(TOPIC, kafkaConsumer);
} }
public static void consumeMessages(String topic, Consumer<Long, String> kafkaConsumer) public static void consumeMessages(String topic, Consumer<Long, TransactionInfo> kafkaConsumer)
{ {
ObjectMapper objectMapper = new ObjectMapper(); ObjectMapper objectMapper = new ObjectMapper();
...@@ -42,18 +42,16 @@ public class AccountManagerKafkaConsumer ...@@ -42,18 +42,16 @@ public class AccountManagerKafkaConsumer
while (true) while (true)
{ {
ConsumerRecords<Long, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1)); ConsumerRecords<Long, TransactionInfo> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
if (consumerRecords.isEmpty()) continue; if (consumerRecords.isEmpty()) continue;
for (ConsumerRecord<Long , String> record : consumerRecords) for (ConsumerRecord<Long , TransactionInfo> record : consumerRecords)
{ {
String stringJson = record.value(); ////String stringJson = record.value();
TransactionInfo transactionInfo = record.value();
System.out.println("key : " + record.key()); System.out.println("key : " + record.key());
System.out.println("Received : " + stringJson); System.out.println("Received : " + transactionInfo);
try
{
TransactionInfo transactionInfo = objectMapper.readValue(stringJson , TransactionInfo.class);
WritingHelper writingHelper = new WritingHelper(transactionInfo); WritingHelper writingHelper = new WritingHelper(transactionInfo);
writingHelper.writeToLog(); writingHelper.writeToLog();
boolean x = DataBaseHandler.editAmount(transactionInfo.getName() , transactionInfo.getAmount()); boolean x = DataBaseHandler.editAmount(transactionInfo.getName() , transactionInfo.getAmount());
...@@ -63,24 +61,18 @@ public class AccountManagerKafkaConsumer ...@@ -63,24 +61,18 @@ public class AccountManagerKafkaConsumer
} }
System.out.println(writingHelper); System.out.println(writingHelper);
} }
catch (JsonProcessingException e)
{
System.out.println("Error Parsing");
System.out.println(e.getMessage());
log.error("Could Not Parse Json in Transaction");
}
}
kafkaConsumer.commitAsync(); kafkaConsumer.commitAsync();
} }
} }
public static Consumer<Long, String> createKafkaConsumer(String bootstrapServers, String consumerGroup) public static Consumer<Long, TransactionInfo> createKafkaConsumer(String bootstrapServers, String consumerGroup)
{ {
Properties properties = new Properties(); Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName()); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); //properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, TransactionInfo.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup); properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
......
...@@ -12,12 +12,18 @@ public class DataBaseHandler ...@@ -12,12 +12,18 @@ public class DataBaseHandler
public static boolean editAmount(String name , double amount) public static boolean editAmount(String name , double amount)
{ {
String _balance = jedis.hget(name, BALANCE_FIELD); String jsonData = jedis.get(name);
Double balance = Double.parseDouble(_balance);
if (amount > balance) return false; MyRecord myRecord = new MyRecord();
balance -= amount; boolean result = myRecord.modifyBalance(jsonData , amount);
jedis.hset(name , BALANCE_FIELD , balance.toString());
return true; jsonData = myRecord.toString();
if (jsonData == null) return false;
jedis.set(name , jsonData);
return result;
} }
......
package org.example;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
public class MyRecord
{
/*SET JohnDoe "{\"id\": 1, \"name\": \"John Doe\", \"email\": \"johndoe@example.com\"}"*/
private String country;
private Double balance;
private Integer warnings;
public MyRecord(){}
public MyRecord(String country , double balance , int warnings)
{
this.balance = balance;
this.country = country;
this.warnings = warnings;
}
public Integer getWarnings() {
return warnings;
}
public void setWarnings(Integer warnings) {
this.warnings = warnings;
}
public Double getBalance() {
return balance;
}
public void setBalance(Double balance) {
this.balance = balance;
}
public String getCountry() {
return country;
}
public void setCountry(String country) {
this.country = country;
}
@Override
public String toString()
{
String ans = null;
ObjectMapper objectMapper = new ObjectMapper();
try
{
ans = objectMapper.writeValueAsString(this);
}
catch (JsonProcessingException e)
{
throw new RuntimeException(e);
}
finally {return ans;}
}
public boolean modifyBalance(String jsonData , double amount)
{
try
{
MyRecord myRecord = reverseJson(jsonData);
if (myRecord.balance >= amount)
{
myRecord.balance -= amount;
return true;
}
return false;
}
catch (JsonProcessingException e)
{
throw new RuntimeException(e);
}
}
private MyRecord reverseJson(String json) throws JsonProcessingException
{
ObjectMapper objectMapper = new ObjectMapper();
MyRecord myRecord = objectMapper.readValue(json , MyRecord.class);
return myRecord;
}
}
package org.example; package org.example;
public class TransactionInfo import java.io.Serializable;
public class TransactionInfo implements Serializable
{ {
private String name; private String name;
private String location; private String location;
private double amount; private double amount;
public TransactionInfo() public TransactionInfo()
{} {}
public TransactionInfo(String name , String location , double amount) public TransactionInfo(String name , String location , double amount)
...@@ -40,4 +41,9 @@ public class TransactionInfo ...@@ -40,4 +41,9 @@ public class TransactionInfo
this.amount = amount; this.amount = amount;
} }
@Override
public String toString()
{
return name+" , amount = " + amount;
}
} }
\ No newline at end of file
<component name="ArtifactManager"> <component name="ArtifactManager">
<artifact type="jar" name="Consumer:jar"> <artifact type="jar" name="Consumer-Reporting:jar">
<output-path>$PROJECT_DIR$/out/artifacts/Consumer_jar</output-path> <output-path>$PROJECT_DIR$/out/artifacts/Consumer_Reporting_jar</output-path>
<root id="archive" name="Consumer.jar"> <root id="archive" name="Consumer-Reporting.jar">
<element id="module-output" name="Consumer" /> <element id="module-output" name="Consumer-Reporting" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/com/fasterxml/jackson/core/jackson-databind/2.12.1/jackson-databind-2.12.1.jar" path-in-jar="/" /> <element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/apache/kafka/kafka-clients/3.7.0/kafka-clients-3.7.0.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/xerial/snappy/snappy-java/1.1.10.5/snappy-java-1.1.10.5.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/ch/qos/logback/logback-classic/1.2.10/logback-classic-1.2.10.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/com/fasterxml/jackson/core/jackson-annotations/2.12.1/jackson-annotations-2.12.1.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/com/github/luben/zstd-jni/1.5.5-6/zstd-jni-1.5.5-6.jar" path-in-jar="/" /> <element id="extracted-dir" path="$MAVEN_REPOSITORY$/com/github/luben/zstd-jni/1.5.5-6/zstd-jni-1.5.5-6.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/com/fasterxml/jackson/core/jackson-core/2.12.1/jackson-core-2.12.1.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/lz4/lz4-java/1.8.0/lz4-java-1.8.0.jar" path-in-jar="/" /> <element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/lz4/lz4-java/1.8.0/lz4-java-1.8.0.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/com/fasterxml/jackson/core/jackson-annotations/2.12.1/jackson-annotations-2.12.1.jar" path-in-jar="/" /> <element id="extracted-dir" path="$MAVEN_REPOSITORY$/ch/qos/logback/logback-core/1.2.10/logback-core-1.2.10.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/com/fasterxml/jackson/core/jackson-databind/2.12.1/jackson-databind-2.12.1.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/com/fasterxml/jackson/core/jackson-core/2.12.1/jackson-core-2.12.1.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/slf4j/slf4j-api/1.7.36/slf4j-api-1.7.36.jar" path-in-jar="/" /> <element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/slf4j/slf4j-api/1.7.36/slf4j-api-1.7.36.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/apache/kafka/kafka-clients/3.7.0/kafka-clients-3.7.0.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/xerial/snappy/snappy-java/1.1.10.5/snappy-java-1.1.10.5.jar" path-in-jar="/" />
</root> </root>
</artifact> </artifact>
</component> </component>
\ No newline at end of file
...@@ -7,6 +7,7 @@ ...@@ -7,6 +7,7 @@
<sourceTestOutputDir name="target/generated-test-sources/test-annotations" /> <sourceTestOutputDir name="target/generated-test-sources/test-annotations" />
<outputRelativeToContentRoot value="true" /> <outputRelativeToContentRoot value="true" />
<module name="Consumer" /> <module name="Consumer" />
<module name="Consumer-Reporting" />
</profile> </profile>
</annotationProcessing> </annotationProcessing>
</component> </component>
......
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<project version="4"> <project version="4">
<component name="VcsDirectoryMappings"> <component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$/.." vcs="Git" />
<mapping directory="$PROJECT_DIR$" vcs="Git" /> <mapping directory="$PROJECT_DIR$" vcs="Git" />
</component> </component>
</project> </project>
\ No newline at end of file
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
<project version="4"> <project version="4">
<component name="ArtifactsWorkspaceSettings"> <component name="ArtifactsWorkspaceSettings">
<artifacts-to-build> <artifacts-to-build>
<artifact name="Consumer:jar" /> <artifact name="Consumer-Reporting:jar" />
</artifacts-to-build> </artifacts-to-build>
</component> </component>
<component name="AutoImportSettings"> <component name="AutoImportSettings">
...@@ -10,15 +10,25 @@ ...@@ -10,15 +10,25 @@
</component> </component>
<component name="ChangeListManager"> <component name="ChangeListManager">
<list default="true" id="68795978-b1a2-4702-95fa-e8e7afae617a" name="Changes" comment=""> <list default="true" id="68795978-b1a2-4702-95fa-e8e7afae617a" name="Changes" comment="">
<change afterPath="$PROJECT_DIR$/.gitignore" afterDir="false" /> <change beforePath="$PROJECT_DIR$/../Consumer-Account-Manager/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../Consumer-Account-Manager/.idea/workspace.xml" afterDir="false" />
<change afterPath="$PROJECT_DIR$/.idea/artifacts/Consumer_jar.xml" afterDir="false" /> <change beforePath="$PROJECT_DIR$/../Consumer-Account-Manager/src/main/java/org/example/AccountManagerKafkaConsumer.java" beforeDir="false" afterPath="$PROJECT_DIR$/../Consumer-Account-Manager/src/main/java/org/example/AccountManagerKafkaConsumer.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/.idea/encodings.xml" afterDir="false" /> <change beforePath="$PROJECT_DIR$/../Consumer-Account-Manager/src/main/java/org/example/DataBaseHandler.java" beforeDir="false" afterPath="$PROJECT_DIR$/../Consumer-Account-Manager/src/main/java/org/example/DataBaseHandler.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/.idea/misc.xml" afterDir="false" /> <change beforePath="$PROJECT_DIR$/../Consumer-Account-Manager/src/main/java/org/example/TransactionInfo.java" beforeDir="false" afterPath="$PROJECT_DIR$/../Consumer-Account-Manager/src/main/java/org/example/TransactionInfo.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/.idea/uiDesigner.xml" afterDir="false" /> <change beforePath="$PROJECT_DIR$/../Consumer-User-Notification/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../Consumer-User-Notification/.idea/workspace.xml" afterDir="false" />
<change afterPath="$PROJECT_DIR$/.idea/vcs.xml" afterDir="false" /> <change beforePath="$PROJECT_DIR$/../Consumer-User-Notification/src/main/java/org/example/DataBaseHandler.java" beforeDir="false" afterPath="$PROJECT_DIR$/../Consumer-User-Notification/src/main/java/org/example/DataBaseHandler.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" /> <change beforePath="$PROJECT_DIR$/../Consumer-User-Notification/src/main/java/org/example/TransactionInfo.java" beforeDir="false" afterPath="$PROJECT_DIR$/../Consumer-User-Notification/src/main/java/org/example/TransactionInfo.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/pom.xml" afterDir="false" /> <change beforePath="$PROJECT_DIR$/../Consumer-User-Notification/src/main/java/org/example/UserNotificationKafkaConsumer.java" beforeDir="false" afterPath="$PROJECT_DIR$/../Consumer-User-Notification/src/main/java/org/example/UserNotificationKafkaConsumer.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/main/java/org/example/ReportingKafkaConsumer.java" afterDir="false" /> <change beforePath="$PROJECT_DIR$/../Producer/.idea/artifacts/Producer_jar.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../Producer/.idea/artifacts/Producer_jar.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Producer/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../Producer/.idea/workspace.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Producer/src/main/java/org/example/BankingAPI.java" beforeDir="false" afterPath="$PROJECT_DIR$/../Producer/src/main/java/org/example/BankingAPI.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Producer/src/main/java/org/example/Customer.java" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/../Producer/src/main/java/org/example/MyProducer.java" beforeDir="false" afterPath="$PROJECT_DIR$/../Producer/src/main/java/org/example/MyProducer.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Producer/src/main/java/org/example/TransactionInfo.java" beforeDir="false" afterPath="$PROJECT_DIR$/../Producer/src/main/java/org/example/TransactionInfo.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Producer/src/main/resources/META-INF/MANIFEST.MF" beforeDir="false" afterPath="$PROJECT_DIR$/../Producer/src/main/resources/META-INF/MANIFEST.MF" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Producer/target/classes/META-INF/MANIFEST.MF" beforeDir="false" afterPath="$PROJECT_DIR$/../Producer/target/classes/META-INF/MANIFEST.MF" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Producer/target/classes/org/example/Customer.class" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/../Producer/target/classes/org/example/MyProducer.class" beforeDir="false" afterPath="$PROJECT_DIR$/../Producer/target/classes/org/example/MyProducer.class" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Producer/target/classes/org/example/TransactionInfo.class" beforeDir="false" afterPath="$PROJECT_DIR$/../Producer/target/classes/org/example/TransactionInfo.class" afterDir="false" />
</list> </list>
<option name="SHOW_DIALOG" value="false" /> <option name="SHOW_DIALOG" value="false" />
<option name="HIGHLIGHT_CONFLICTS" value="true" /> <option name="HIGHLIGHT_CONFLICTS" value="true" />
...@@ -33,7 +43,7 @@ ...@@ -33,7 +43,7 @@
</option> </option>
</component> </component>
<component name="Git.Settings"> <component name="Git.Settings">
<option name="RECENT_GIT_ROOT_PATH" value="$PROJECT_DIR$" /> <option name="RECENT_GIT_ROOT_PATH" value="$PROJECT_DIR$/.." />
</component> </component>
<component name="MarkdownSettingsMigration"> <component name="MarkdownSettingsMigration">
<option name="stateVersion" value="1" /> <option name="stateVersion" value="1" />
...@@ -44,17 +54,17 @@ ...@@ -44,17 +54,17 @@
<option name="hideEmptyMiddlePackages" value="true" /> <option name="hideEmptyMiddlePackages" value="true" />
<option name="showLibraryContents" value="true" /> <option name="showLibraryContents" value="true" />
</component> </component>
<component name="PropertiesComponent"><![CDATA[{ <component name="PropertiesComponent">{
"keyToString": { &quot;keyToString&quot;: {
"RunOnceActivity.OpenProjectViewOnStart": "true", &quot;RunOnceActivity.OpenProjectViewOnStart&quot;: &quot;true&quot;,
"RunOnceActivity.ShowReadmeOnStart": "true", &quot;RunOnceActivity.ShowReadmeOnStart&quot;: &quot;true&quot;,
"SHARE_PROJECT_CONFIGURATION_FILES": "true", &quot;SHARE_PROJECT_CONFIGURATION_FILES&quot;: &quot;true&quot;,
"last_opened_file_path": "D:/HIAST/FIY/SS/Advanced Distributed Systems/Lab/4/java-kafka-consumer/Consumer/src/main/resources", &quot;last_opened_file_path&quot;: &quot;D:/HIAST/FIY/SS/Advanced Distributed Systems/Lab/4/java-kafka-consumer/Consumer/src/main/resources&quot;,
"project.structure.last.edited": "Artifacts", &quot;project.structure.last.edited&quot;: &quot;Artifacts&quot;,
"project.structure.proportion": "0.0", &quot;project.structure.proportion&quot;: &quot;0.15&quot;,
"project.structure.side.proportion": "0.0" &quot;project.structure.side.proportion&quot;: &quot;0.2&quot;
} }
}]]></component> }</component>
<component name="RecentsManager"> <component name="RecentsManager">
<key name="CopyFile.RECENT_KEYS"> <key name="CopyFile.RECENT_KEYS">
<recent name="D:\HIAST\FIY\SS\Advanced Distributed Systems\Lab\4\java-kafka-consumer\Consumer\src\main\resources" /> <recent name="D:\HIAST\FIY\SS\Advanced Distributed Systems\Lab\4\java-kafka-consumer\Consumer\src\main\resources" />
......
package org.example;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
public class MyRecord
{
private String country;
private Double balance;
public MyRecord(){}
public MyRecord(String country , Double balance )
{
this.balance = balance;
this.country = country;
//this.warnings = warnings;
}
@Override
public String toString()
{
String ans = null;
ObjectMapper objectMapper = new ObjectMapper();
try
{
ans = objectMapper.writeValueAsString(this);
}
catch (JsonProcessingException e)
{
throw new RuntimeException(e);
}
finally {return ans;}
}
public String getCountry ()
{
return this.country;
}
public void setCountry(String country) {
this.country = country;
}
public Double getBalance() {
return balance;
}
public void setBalance(Double balance) {
this.balance = balance;
}
}
package org.example; package org.example;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.clients.consumer.*; import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -16,7 +13,7 @@ import java.util.Properties; ...@@ -16,7 +13,7 @@ import java.util.Properties;
public class ReportingKafkaConsumer public class ReportingKafkaConsumer
{ {
private static final String TOPIC = "events"; private static final String TOPIC = "events";
private static final String BOOTSTRAP_SERVERS = "localhost:9092,localhost:9093,localhost:9094"; private static final String BOOTSTRAP_SERVERS = "192.168.184.11:9092";
static final Logger log = LoggerFactory.getLogger(ReportingKafkaConsumer.class); static final Logger log = LoggerFactory.getLogger(ReportingKafkaConsumer.class);
...@@ -30,53 +27,42 @@ public class ReportingKafkaConsumer ...@@ -30,53 +27,42 @@ public class ReportingKafkaConsumer
System.out.println("Consumer is part of consumer group " + consumerGroup); System.out.println("Consumer is part of consumer group " + consumerGroup);
Consumer<Long, String> kafkaConsumer = createKafkaConsumer(BOOTSTRAP_SERVERS, consumerGroup); Consumer<Long, TransactionInfo> kafkaConsumer = createKafkaConsumer(BOOTSTRAP_SERVERS, consumerGroup);
consumeMessages(TOPIC, kafkaConsumer); consumeMessages(TOPIC, kafkaConsumer);
} }
public static void consumeMessages(String topic, Consumer<Long, String> kafkaConsumer) public static void consumeMessages(String topic, Consumer<Long, TransactionInfo> kafkaConsumer)
{ {
ObjectMapper objectMapper = new ObjectMapper();
kafkaConsumer.subscribe(Collections.singletonList(topic)); kafkaConsumer.subscribe(Collections.singletonList(topic));
while (true) while (true)
{ {
ConsumerRecords<Long, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1)); ConsumerRecords<Long, TransactionInfo> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
if (consumerRecords.isEmpty()) continue; if (consumerRecords.isEmpty()) continue;
for (ConsumerRecord<Long , String> record : consumerRecords) for (ConsumerRecord<Long , TransactionInfo> record : consumerRecords)
{ {
String stringJson = record.value(); TransactionInfo transactionInfo = record.value();
System.out.println("key : " + record.key()); System.out.println("key : " + record.key());
System.out.println("Received : " + stringJson); System.out.println("Received : " + transactionInfo);
try
{
TransactionInfo transactionInfo = objectMapper.readValue(stringJson , TransactionInfo.class);
WritingHelper writingHelper = new WritingHelper(transactionInfo); WritingHelper writingHelper = new WritingHelper(transactionInfo);
writingHelper.writeToLog(); writingHelper.writeToLog();
System.out.println(writingHelper); System.out.println(writingHelper);
} }
catch (JsonProcessingException e)
{
System.out.println("Error Parsing");
System.out.println(e.getMessage());
log.error("Could Not Parse Json in Transaction");
}
}
kafkaConsumer.commitAsync(); kafkaConsumer.commitAsync();
} }
} }
public static Consumer<Long, String> createKafkaConsumer(String bootstrapServers, String consumerGroup) public static Consumer<Long, TransactionInfo> createKafkaConsumer(String bootstrapServers, String consumerGroup)
{ {
Properties properties = new Properties(); Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName()); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); //properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, TransactionInfo.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup); properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
......
package org.example; package org.example;
public class TransactionInfo import java.io.Serializable;
public class TransactionInfo implements Serializable
{ {
private String name; private String name;
private String location; private String location;
private double amount; private double amount;
public TransactionInfo() public TransactionInfo()
{} {}
public TransactionInfo(String name , String location , double amount) public TransactionInfo(String name , String location , double amount)
...@@ -40,4 +41,9 @@ public class TransactionInfo ...@@ -40,4 +41,9 @@ public class TransactionInfo
this.amount = amount; this.amount = amount;
} }
@Override
public String toString()
{
return name+" , amount = " + amount;
}
} }
\ No newline at end of file
...@@ -10,10 +10,28 @@ ...@@ -10,10 +10,28 @@
</component> </component>
<component name="ChangeListManager"> <component name="ChangeListManager">
<list default="true" id="68795978-b1a2-4702-95fa-e8e7afae617a" name="Changes" comment=""> <list default="true" id="68795978-b1a2-4702-95fa-e8e7afae617a" name="Changes" comment="">
<change beforePath="$PROJECT_DIR$/../Consumer-Reporting/out/artifacts/Consumer_jar/Consumer.jar" beforeDir="false" /> <change beforePath="$PROJECT_DIR$/../Consumer-Account-Manager/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../Consumer-Account-Manager/.idea/workspace.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Consumer-Account-Manager/src/main/java/org/example/AccountManagerKafkaConsumer.java" beforeDir="false" afterPath="$PROJECT_DIR$/../Consumer-Account-Manager/src/main/java/org/example/AccountManagerKafkaConsumer.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Consumer-Account-Manager/src/main/java/org/example/DataBaseHandler.java" beforeDir="false" afterPath="$PROJECT_DIR$/../Consumer-Account-Manager/src/main/java/org/example/DataBaseHandler.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Consumer-Account-Manager/src/main/java/org/example/TransactionInfo.java" beforeDir="false" afterPath="$PROJECT_DIR$/../Consumer-Account-Manager/src/main/java/org/example/TransactionInfo.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Consumer-Reporting/.idea/artifacts/Consumer_jar.xml" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/../Consumer-Reporting/.idea/compiler.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../Consumer-Reporting/.idea/compiler.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Consumer-Reporting/.idea/vcs.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../Consumer-Reporting/.idea/vcs.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Consumer-Reporting/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../Consumer-Reporting/.idea/workspace.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Consumer-Reporting/src/main/java/org/example/ReportingKafkaConsumer.java" beforeDir="false" afterPath="$PROJECT_DIR$/../Consumer-Reporting/src/main/java/org/example/ReportingKafkaConsumer.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Consumer-Reporting/src/main/java/org/example/TransactionInfo.java" beforeDir="false" afterPath="$PROJECT_DIR$/../Consumer-Reporting/src/main/java/org/example/TransactionInfo.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Consumer-Reporting/target/classes/org/example/ReportingKafkaConsumer.class" beforeDir="false" afterPath="$PROJECT_DIR$/../Consumer-Reporting/target/classes/org/example/ReportingKafkaConsumer.class" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Producer/.idea/artifacts/Producer_jar.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../Producer/.idea/artifacts/Producer_jar.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Producer/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../Producer/.idea/workspace.xml" afterDir="false" /> <change beforePath="$PROJECT_DIR$/../Producer/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../Producer/.idea/workspace.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Producer/out/artifacts/Producer_jar/Producer.jar" beforeDir="false" /> <change beforePath="$PROJECT_DIR$/../Producer/src/main/java/org/example/BankingAPI.java" beforeDir="false" afterPath="$PROJECT_DIR$/../Producer/src/main/java/org/example/BankingAPI.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Producer/pom.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../Producer/pom.xml" afterDir="false" /> <change beforePath="$PROJECT_DIR$/../Producer/src/main/java/org/example/Customer.java" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/../Producer/src/main/java/org/example/MyProducer.java" beforeDir="false" afterPath="$PROJECT_DIR$/../Producer/src/main/java/org/example/MyProducer.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Producer/src/main/java/org/example/TransactionInfo.java" beforeDir="false" afterPath="$PROJECT_DIR$/../Producer/src/main/java/org/example/TransactionInfo.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Producer/src/main/resources/META-INF/MANIFEST.MF" beforeDir="false" afterPath="$PROJECT_DIR$/../Producer/src/main/resources/META-INF/MANIFEST.MF" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Producer/target/classes/META-INF/MANIFEST.MF" beforeDir="false" afterPath="$PROJECT_DIR$/../Producer/target/classes/META-INF/MANIFEST.MF" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Producer/target/classes/org/example/Customer.class" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/../Producer/target/classes/org/example/MyProducer.class" beforeDir="false" afterPath="$PROJECT_DIR$/../Producer/target/classes/org/example/MyProducer.class" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Producer/target/classes/org/example/TransactionInfo.class" beforeDir="false" afterPath="$PROJECT_DIR$/../Producer/target/classes/org/example/TransactionInfo.class" afterDir="false" />
</list> </list>
<option name="SHOW_DIALOG" value="false" /> <option name="SHOW_DIALOG" value="false" />
<option name="HIGHLIGHT_CONFLICTS" value="true" /> <option name="HIGHLIGHT_CONFLICTS" value="true" />
...@@ -39,19 +57,20 @@ ...@@ -39,19 +57,20 @@
<option name="hideEmptyMiddlePackages" value="true" /> <option name="hideEmptyMiddlePackages" value="true" />
<option name="showLibraryContents" value="true" /> <option name="showLibraryContents" value="true" />
</component> </component>
<component name="PropertiesComponent"><![CDATA[{ <component name="PropertiesComponent">{
"keyToString": { &quot;keyToString&quot;: {
"RunOnceActivity.OpenProjectViewOnStart": "true", &quot;RunOnceActivity.OpenProjectViewOnStart&quot;: &quot;true&quot;,
"RunOnceActivity.ShowReadmeOnStart": "true", &quot;RunOnceActivity.ShowReadmeOnStart&quot;: &quot;true&quot;,
"SHARE_PROJECT_CONFIGURATION_FILES": "true", &quot;SHARE_PROJECT_CONFIGURATION_FILES&quot;: &quot;true&quot;,
"last_opened_file_path": "D:/HIAST/FIY/SS/Advanced Distributed Systems/Lab/4/java-kafka-consumer/Consumer/src/main/resources", &quot;last_opened_file_path&quot;: &quot;D:/HIAST/FIY/SS/Advanced Distributed Systems/Lab/4/java-kafka-producer/Producer/Consumer-User-Notification/src/main/java/org/example&quot;,
"project.structure.last.edited": "Modules", &quot;project.structure.last.edited&quot;: &quot;Modules&quot;,
"project.structure.proportion": "0.0", &quot;project.structure.proportion&quot;: &quot;0.0&quot;,
"project.structure.side.proportion": "0.2" &quot;project.structure.side.proportion&quot;: &quot;0.2&quot;
} }
}]]></component> }</component>
<component name="RecentsManager"> <component name="RecentsManager">
<key name="CopyFile.RECENT_KEYS"> <key name="CopyFile.RECENT_KEYS">
<recent name="D:\HIAST\FIY\SS\Advanced Distributed Systems\Lab\4\java-kafka-producer\Producer\Consumer-User-Notification\src\main\java\org\example" />
<recent name="D:\HIAST\FIY\SS\Advanced Distributed Systems\Lab\4\java-kafka-consumer\Consumer\src\main\resources" /> <recent name="D:\HIAST\FIY\SS\Advanced Distributed Systems\Lab\4\java-kafka-consumer\Consumer\src\main\resources" />
</key> </key>
<key name="CopyClassDialog.RECENTS_KEY"> <key name="CopyClassDialog.RECENTS_KEY">
......
package org.example; package org.example;
import com.fasterxml.jackson.databind.ObjectMapper;
import redis.clients.jedis.Jedis; import redis.clients.jedis.Jedis;
public class DataBaseHandler public class DataBaseHandler
...@@ -12,11 +13,17 @@ public class DataBaseHandler ...@@ -12,11 +13,17 @@ public class DataBaseHandler
public static void notifyUser(String name) public static void notifyUser(String name)
{ {
String _notify_num = jedis.hget(name, SUSPICIOUS_FIELD);
Integer notify_num = Integer.parseInt(_notify_num);
notify_num++;
jedis.hset(name , SUSPICIOUS_FIELD , notify_num.toString());
}
String jsonData = jedis.get(name);
MyRecord myRecord = new MyRecord();
myRecord.increaseNotification(jsonData);
jsonData = myRecord.toString();
if ((jsonData == null) ) return;
jedis.set(name , jsonData);
}
} }
package org.example;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.protocol.types.Field;
public class MyRecord
{
/*SET JohnDoe "{\"id\": 1, \"name\": \"John Doe\", \"email\": \"johndoe@example.com\"}"*/
private String country;
private Double balance;
private Integer warnings;
public MyRecord(){}
/*
data base key = our user name
*/
public MyRecord (String country , double balance , int warnings)
{
this.balance = balance;
this.country = country;
this.warnings = warnings;
}
public Integer getWarnings() {
return warnings;
}
public void setWarnings(Integer warnings) {
this.warnings = warnings;
}
public Double getBalance() {
return balance;
}
public void setBalance(Double balance) {
this.balance = balance;
}
public String getCountry() {
return country;
}
public void setCountry(String country) {
this.country = country;
}
@Override
public String toString()
{
String ans = null;
ObjectMapper objectMapper = new ObjectMapper();
try
{
ans = objectMapper.writeValueAsString(this);
}
catch (JsonProcessingException e)
{
throw new RuntimeException(e);
}
finally {return ans;}
}
public void increaseNotification(String jsonData)
{
try
{
MyRecord myRecord = reverseJson(jsonData);
myRecord.warnings++;
}
catch (JsonProcessingException e)
{
throw new RuntimeException(e);
}
}
private MyRecord reverseJson(String json) throws JsonProcessingException
{
ObjectMapper objectMapper = new ObjectMapper();
MyRecord myRecord = objectMapper.readValue(json , MyRecord.class);
return myRecord;
}
}
package org.example; package org.example;
public class TransactionInfo import java.io.Serializable;
public class TransactionInfo implements Serializable
{ {
private String name; private String name;
private String location; private String location;
private double amount; private double amount;
public TransactionInfo() public TransactionInfo()
{} {}
public TransactionInfo(String name , String location , double amount) public TransactionInfo(String name , String location , double amount)
...@@ -40,4 +41,9 @@ public class TransactionInfo ...@@ -40,4 +41,9 @@ public class TransactionInfo
this.amount = amount; this.amount = amount;
} }
@Override
public String toString()
{
return name+" , amount = " + amount;
}
} }
\ No newline at end of file
package org.example; package org.example;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.clients.consumer.*; import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -15,7 +12,7 @@ import java.util.Properties; ...@@ -15,7 +12,7 @@ import java.util.Properties;
public class UserNotificationKafkaConsumer public class UserNotificationKafkaConsumer
{ {
private static final String TOPIC = "SUSPICIOUS_TOPIC"; private static final String TOPIC = "SUSPICIOUS_TOPIC";
private static final String BOOTSTRAP_SERVERS = "localhost:9092,localhost:9093,localhost:9094"; private static final String BOOTSTRAP_SERVERS = "localhost:9092";
static final Logger log = LoggerFactory.getLogger(UserNotificationKafkaConsumer.class); static final Logger log = LoggerFactory.getLogger(UserNotificationKafkaConsumer.class);
...@@ -29,54 +26,43 @@ public class UserNotificationKafkaConsumer ...@@ -29,54 +26,43 @@ public class UserNotificationKafkaConsumer
System.out.println("Consumer is part of consumer group " + consumerGroup); System.out.println("Consumer is part of consumer group " + consumerGroup);
Consumer<Long, String> kafkaConsumer = createKafkaConsumer(BOOTSTRAP_SERVERS, consumerGroup); Consumer<Long, TransactionInfo> kafkaConsumer = createKafkaConsumer(BOOTSTRAP_SERVERS, consumerGroup);
consumeMessages(TOPIC, kafkaConsumer); consumeMessages(TOPIC, kafkaConsumer);
} }
public static void consumeMessages(String topic, Consumer<Long, String> kafkaConsumer) public static void consumeMessages(String topic, Consumer<Long, TransactionInfo> kafkaConsumer)
{ {
ObjectMapper objectMapper = new ObjectMapper();
kafkaConsumer.subscribe(Collections.singletonList(topic)); kafkaConsumer.subscribe(Collections.singletonList(topic));
while (true) while (true)
{ {
ConsumerRecords<Long, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1)); ConsumerRecords<Long, TransactionInfo> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
if (consumerRecords.isEmpty()) continue; if (consumerRecords.isEmpty()) continue;
for (ConsumerRecord<Long , String> record : consumerRecords) for (ConsumerRecord<Long , TransactionInfo> record : consumerRecords)
{ {
String stringJson = record.value(); TransactionInfo transactionInfo = record.value();
System.out.println("key : " + record.key()); System.out.println("key : " + record.key());
System.out.println("Received : " + stringJson); System.out.println("Received : " + transactionInfo);
try
{
TransactionInfo transactionInfo = objectMapper.readValue(stringJson , TransactionInfo.class);
WritingHelper writingHelper = new WritingHelper(transactionInfo); WritingHelper writingHelper = new WritingHelper(transactionInfo);
writingHelper.writeToLog(); writingHelper.writeToLog();
DataBaseHandler.notifyUser(transactionInfo.getName()); DataBaseHandler.notifyUser(transactionInfo.getName());
System.out.println(writingHelper); System.out.println(writingHelper);
} }
catch (JsonProcessingException e)
{
System.out.println("Error Parsing");
System.out.println(e.getMessage());
log.error("Could Not Parse Json in Transaction");
}
}
kafkaConsumer.commitAsync(); kafkaConsumer.commitAsync();
} }
} }
public static Consumer<Long, String> createKafkaConsumer(String bootstrapServers, String consumerGroup) public static Consumer<Long, TransactionInfo> createKafkaConsumer(String bootstrapServers, String consumerGroup)
{ {
Properties properties = new Properties(); Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName()); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); //properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, TransactionInfo.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup); properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
......
File added
...@@ -3,14 +3,18 @@ ...@@ -3,14 +3,18 @@
<output-path>$PROJECT_DIR$/out/artifacts/Producer_jar</output-path> <output-path>$PROJECT_DIR$/out/artifacts/Producer_jar</output-path>
<root id="archive" name="Producer.jar"> <root id="archive" name="Producer.jar">
<element id="module-output" name="Producer" /> <element id="module-output" name="Producer" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/com/github/luben/zstd-jni/1.5.5-6/zstd-jni-1.5.5-6.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/com/fasterxml/jackson/core/jackson-core/2.15.3/jackson-core-2.15.3.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/apache/kafka/kafka-clients/3.7.0/kafka-clients-3.7.0.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/com/fasterxml/jackson/core/jackson-annotations/2.15.3/jackson-annotations-2.15.3.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/redis/clients/jedis/3.7.0/jedis-3.7.0.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/apache/commons/commons-pool2/2.10.0/commons-pool2-2.10.0.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/lz4/lz4-java/1.8.0/lz4-java-1.8.0.jar" path-in-jar="/" /> <element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/lz4/lz4-java/1.8.0/lz4-java-1.8.0.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/xerial/snappy/snappy-java/1.1.10.5/snappy-java-1.1.10.5.jar" path-in-jar="/" /> <element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/xerial/snappy/snappy-java/1.1.10.5/snappy-java-1.1.10.5.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/com/fasterxml/jackson/core/jackson-annotations/2.12.1/jackson-annotations-2.12.1.jar" path-in-jar="/" /> <element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/slf4j/slf4j-api/1.7.36/slf4j-api-1.7.36.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/apache/kafka/kafka-clients/3.7.0/kafka-clients-3.7.0.jar" path-in-jar="/" /> <element id="extracted-dir" path="$MAVEN_REPOSITORY$/ch/qos/logback/logback-core/1.2.10/logback-core-1.2.10.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/com/fasterxml/jackson/core/jackson-core/2.12.1/jackson-core-2.12.1.jar" path-in-jar="/" /> <element id="extracted-dir" path="$MAVEN_REPOSITORY$/com/fasterxml/jackson/core/jackson-databind/2.15.3/jackson-databind-2.15.3.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/slf4j/slf4j-api/1.7.32/slf4j-api-1.7.32.jar" path-in-jar="/" /> <element id="extracted-dir" path="$MAVEN_REPOSITORY$/ch/qos/logback/logback-classic/1.2.10/logback-classic-1.2.10.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/com/fasterxml/jackson/core/jackson-databind/2.12.1/jackson-databind-2.12.1.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/com/github/luben/zstd-jni/1.5.5-6/zstd-jni-1.5.5-6.jar" path-in-jar="/" />
</root> </root>
</artifact> </artifact>
</component> </component>
\ No newline at end of file
...@@ -10,7 +10,21 @@ ...@@ -10,7 +10,21 @@
</component> </component>
<component name="ChangeListManager"> <component name="ChangeListManager">
<list default="true" id="9704b422-94d9-4d0a-8879-40f13e57ea0e" name="Changes" comment=""> <list default="true" id="9704b422-94d9-4d0a-8879-40f13e57ea0e" name="Changes" comment="">
<change beforePath="$PROJECT_DIR$/../Consumer-Reporting/out/artifacts/Consumer_jar/Consumer.jar" beforeDir="false" /> <change beforePath="$PROJECT_DIR$/../Consumer-Account-Manager/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../Consumer-Account-Manager/.idea/workspace.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Consumer-Account-Manager/src/main/java/org/example/AccountManagerKafkaConsumer.java" beforeDir="false" afterPath="$PROJECT_DIR$/../Consumer-Account-Manager/src/main/java/org/example/AccountManagerKafkaConsumer.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Consumer-Account-Manager/src/main/java/org/example/DataBaseHandler.java" beforeDir="false" afterPath="$PROJECT_DIR$/../Consumer-Account-Manager/src/main/java/org/example/DataBaseHandler.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Consumer-Account-Manager/src/main/java/org/example/TransactionInfo.java" beforeDir="false" afterPath="$PROJECT_DIR$/../Consumer-Account-Manager/src/main/java/org/example/TransactionInfo.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Consumer-Reporting/.idea/artifacts/Consumer_jar.xml" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/../Consumer-Reporting/.idea/compiler.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../Consumer-Reporting/.idea/compiler.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Consumer-Reporting/.idea/vcs.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../Consumer-Reporting/.idea/vcs.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Consumer-Reporting/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../Consumer-Reporting/.idea/workspace.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Consumer-Reporting/src/main/java/org/example/ReportingKafkaConsumer.java" beforeDir="false" afterPath="$PROJECT_DIR$/../Consumer-Reporting/src/main/java/org/example/ReportingKafkaConsumer.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Consumer-Reporting/src/main/java/org/example/TransactionInfo.java" beforeDir="false" afterPath="$PROJECT_DIR$/../Consumer-Reporting/src/main/java/org/example/TransactionInfo.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Consumer-Reporting/target/classes/org/example/ReportingKafkaConsumer.class" beforeDir="false" afterPath="$PROJECT_DIR$/../Consumer-Reporting/target/classes/org/example/ReportingKafkaConsumer.class" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Consumer-User-Notification/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../Consumer-User-Notification/.idea/workspace.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Consumer-User-Notification/src/main/java/org/example/DataBaseHandler.java" beforeDir="false" afterPath="$PROJECT_DIR$/../Consumer-User-Notification/src/main/java/org/example/DataBaseHandler.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Consumer-User-Notification/src/main/java/org/example/TransactionInfo.java" beforeDir="false" afterPath="$PROJECT_DIR$/../Consumer-User-Notification/src/main/java/org/example/TransactionInfo.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../Consumer-User-Notification/src/main/java/org/example/UserNotificationKafkaConsumer.java" beforeDir="false" afterPath="$PROJECT_DIR$/../Consumer-User-Notification/src/main/java/org/example/UserNotificationKafkaConsumer.java" afterDir="false" />
</list> </list>
<option name="SHOW_DIALOG" value="false" /> <option name="SHOW_DIALOG" value="false" />
<option name="HIGHLIGHT_CONFLICTS" value="true" /> <option name="HIGHLIGHT_CONFLICTS" value="true" />
...@@ -35,7 +49,7 @@ ...@@ -35,7 +49,7 @@
"RunOnceActivity.ShowReadmeOnStart": "true", "RunOnceActivity.ShowReadmeOnStart": "true",
"SHARE_PROJECT_CONFIGURATION_FILES": "true", "SHARE_PROJECT_CONFIGURATION_FILES": "true",
"last_opened_file_path": "D:/HIAST/FIY/SS/Advanced Distributed Systems/Lab/4/java-kafka-producer/Producer/Producer/src/main/java/org/example", "last_opened_file_path": "D:/HIAST/FIY/SS/Advanced Distributed Systems/Lab/4/java-kafka-producer/Producer/Producer/src/main/java/org/example",
"project.structure.last.edited": "Libraries", "project.structure.last.edited": "Project",
"project.structure.proportion": "0.15", "project.structure.proportion": "0.15",
"project.structure.side.proportion": "0.2", "project.structure.side.proportion": "0.2",
"run.code.analysis.last.selected.profile": "pProject Default", "run.code.analysis.last.selected.profile": "pProject Default",
...@@ -46,6 +60,9 @@ ...@@ -46,6 +60,9 @@
<key name="CopyFile.RECENT_KEYS"> <key name="CopyFile.RECENT_KEYS">
<recent name="D:\HIAST\FIY\SS\Advanced Distributed Systems\Lab\4\java-kafka-producer\Producer\Producer\src\main\java\org\example" /> <recent name="D:\HIAST\FIY\SS\Advanced Distributed Systems\Lab\4\java-kafka-producer\Producer\Producer\src\main\java\org\example" />
</key> </key>
<key name="CopyClassDialog.RECENTS_KEY">
<recent name="org.example" />
</key>
</component> </component>
<component name="SpellCheckerSettings" RuntimeDictionaries="0" Folders="0" CustomDictionaries="0" DefaultDictionary="application-level" UseSingleDictionary="true" transferred="true" /> <component name="SpellCheckerSettings" RuntimeDictionaries="0" Folders="0" CustomDictionaries="0" DefaultDictionary="application-level" UseSingleDictionary="true" transferred="true" />
<component name="TaskManager"> <component name="TaskManager">
......
2024-03-24 00:20:55,909 INFO org.example.MyProducer [main] Registering A Transaction for Mr/Ms : Abd
2024-03-24 00:21:56,393 ERROR org.example.MyProducer [main] Could Not Send Data to Kafka , Or Could Not Get Meta-Data
2024-03-24 00:25:02,236 INFO org.example.MyProducer [main] Registering A Transaction for Mr/Ms : Abd
2024-03-24 00:26:02,655 ERROR org.example.MyProducer [main] Could Not Send Data to Kafka , Or Could Not Get Meta-Data
2024-03-24 00:28:25,348 INFO org.example.MyProducer [main] Registering A Transaction for Mr/Ms : Mohammed
2024-03-24 00:28:56,783 INFO org.example.MyProducer [main] Registering A Transaction for Mr/Ms : Abd
2024-03-24 00:29:57,252 ERROR org.example.MyProducer [main] Could Not Send Data to Kafka , Or Could Not Get Meta-Data
2024-03-24 00:32:30,966 INFO org.example.MyProducer [main] Registering A Transaction for Mr/Ms : Mohammed
2024-03-24 00:32:45,233 INFO org.example.MyProducer [main] Registering A Transaction for Mr/Ms : Abd
2024-03-24 00:33:45,649 ERROR org.example.MyProducer [main] Could Not Send Data to Kafka , Or Could Not Get Meta-Data
2024-03-24 00:35:35,496 INFO org.example.MyProducer [main] Registering A Transaction for Mr/Ms : Abd
2024-03-24 00:36:35,925 ERROR org.example.MyProducer [main] Could Not Send Data to Kafka , Or Could Not Get Meta-Data
2024-03-24 00:37:28,690 INFO org.example.MyProducer [main] Registering A Transaction for Mr/Ms : Abd
2024-03-24 00:38:29,154 ERROR org.example.MyProducer [main] Could Not Send Data to Kafka , Or Could Not Get Meta-Data
2024-03-24 01:06:19,508 INFO org.example.MyProducer [main] Registering A Transaction for Mr/Ms : Abd
2024-03-24 01:07:20,030 ERROR org.example.MyProducer [main] Could Not Send Data to Kafka , Or Could Not Get Meta-Data
2024-03-24 01:08:23,020 INFO org.example.MyProducer [main] Registering A Transaction for Mr/Ms : Abd
2024-03-24 01:09:23,452 ERROR org.example.MyProducer [main] Could Not Send Data to Kafka , Or Could Not Get Meta-Data
2024-03-24 01:12:51,541 INFO org.example.MyProducer [main] Registering A Transaction for Mr/Ms : Abd
2024-03-24 12:22:49,672 INFO org.example.MyProducer [main] Registering A Transaction for Mr/Ms : Ali
2024-03-24 12:23:50,148 ERROR org.example.MyProducer [main] Could Not Send Data to Kafka , Or Could Not Get Meta-Data
2024-03-24 12:28:18,336 INFO org.example.MyProducer [main] Registering A Transaction for Mr/Ms : Mohammed
2024-03-24 12:30:44,947 INFO org.example.MyProducer [main] Registering A Transaction for Mr/Ms : Abd
package org.example; package org.example;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import redis.clients.jedis.Jedis; import redis.clients.jedis.Jedis;
import java.util.Scanner; import java.util.Scanner;
public class BankingAPI { public class BankingAPI {
private static final String REDIS_HOST = "localhost"; private static final String REDIS_HOST = "192.168.184.1";
private static final int REDIS_PORT = 6379; private static final int REDIS_PORT = 6379;
private static Jedis jedis;
public static void main(String[] args) public static void main(String[] args)
{ {
// Connect to Redis jedis = new Jedis(REDIS_HOST, REDIS_PORT);
Jedis jedis = new Jedis(REDIS_HOST, REDIS_PORT);
// Read user input
String name = getUserInput("Enter your name: "); String name = getUserInput("Enter your name: ");
String location = getUserInput("Enter your location: "); String location = getUserInput("Enter your location: ");
double amount = Double.parseDouble(getUserInput("Enter the transaction amount: ")); double amount = Double.parseDouble(getUserInput("Enter the transaction amount: "));
// Search for the user in the Redis database
String country = jedis.hget("users", name);
if (country == null) { String country = "";
String jsonData = jedis.get(name);
System.out.println(jsonData);
ObjectMapper objectMapper = new ObjectMapper();
try
{
MyRecord myRecord = objectMapper.readValue(jsonData , MyRecord.class);
System.out.println(myRecord.getCountry());
country = myRecord.getCountry();
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
if (country == null)
{
System.out.println("User not found in the database."); System.out.println("User not found in the database.");
} else { } else
if (country.equals(location)) { {
if (country.equals(location))
{
System.out.println("Transaction is valid."); System.out.println("Transaction is valid.");
// Perform further processing for valid transaction TransactionInfo transactionInfo = new TransactionInfo(name , location , amount);
} else { System.out.println(transactionInfo.getAmount());
boolean x = MyProducer.logTransaction(transactionInfo , "VALID_TOPIC");
if (x)
{
System.out.println("Success !!");
}
else
{
System.out.println("ERROR");
}
}
else
{
TransactionInfo transactionInfo = new TransactionInfo(name , location , amount);
System.out.println(transactionInfo.getAmount());
System.out.println("Transaction is suspicious."); System.out.println("Transaction is suspicious.");
// Perform further processing for suspicious transaction boolean x = MyProducer.logTransaction(transactionInfo , "SUSPICIOUS_TOPIC");
if (x)
{
System.out.println("Success !!");
}
else
{
System.out.println("ERRORR");
}
} }
} }
// Close the Redis connection
jedis.close(); jedis.close();
} }
...@@ -42,4 +79,23 @@ public class BankingAPI { ...@@ -42,4 +79,23 @@ public class BankingAPI {
Scanner scanner = new Scanner(System.in); Scanner scanner = new Scanner(System.in);
return scanner.nextLine(); return scanner.nextLine();
} }
private static String getCountry(String name)
{
String jsonData = jedis.get(name);
System.out.println(jsonData);
ObjectMapper objectMapper = new ObjectMapper();
try
{
System.out.println("Trying");
MyRecord myRecord = objectMapper.readValue(jsonData , MyRecord.class);
System.out.println("returning nourw");
System.out.println(myRecord.getCountry());
return myRecord.getCountry();
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
} }
\ No newline at end of file
package org.example;
import java.nio.DoubleBuffer;
public class Customer
{
private String name;
private String originalCountry;
private double cash;
public Customer (String name , String originalCountry , double cash)
{
this.name = name;
this.originalCountry = originalCountry;
this.cash = cash;
}
public double getCash()
{
return cash;
}
public String getName()
{
return name;
}
public String getOriginalCountry()
{
return originalCountry;
}
public boolean withdrawMoney (double amount)
{
cash -= amount;
if (cash < 0)
{
cash += amount;
return false;
}
return true;
}
public boolean depositMoney (double amount)
{
if (cash + amount < cash) return false;
cash += amount;
return true;
}
}
...@@ -14,39 +14,26 @@ import java.util.concurrent.ExecutionException; ...@@ -14,39 +14,26 @@ import java.util.concurrent.ExecutionException;
public class MyProducer public class MyProducer
{ {
private static final String TOPIC = "events"; //private static final String TOPIC = "events";
private static final String BOOTSTRAP_SERVERS = "localhost:9092,localhost:9093,localhost:9094"; private static final String BOOTSTRAP_SERVERS = "192.168.184.11:9092";
private static final Logger log = LoggerFactory.getLogger(MyProducer.class); private static final Logger log = LoggerFactory.getLogger(MyProducer.class);
public static void main(String[] args) throws JsonProcessingException
{
TransactionInfo transactionInfo = new TransactionInfo("Mohammed" , "Tartous" , 150.24);
boolean x = logTransaction(transactionInfo , TOPIC);
}
public static boolean logTransaction(TransactionInfo transactionInfo , String topic) public static boolean logTransaction(TransactionInfo transactionInfo , String topic)
{ {
log.info("Registering A Transaction for Mr/Ms : " + transactionInfo.getName()); log.info("Registering A Transaction for Mr/Ms : " + transactionInfo.getName());
ObjectMapper objectMapper = new ObjectMapper();
String value = null; TransactionInfo value = transactionInfo;
try
{
value = objectMapper.writeValueAsString(transactionInfo);
}
catch (JsonProcessingException e)
{
log.error("Transaction for Mr/Ms : " + transactionInfo.getName() + " Could Not Be Processed To Json");
return false;
}
Producer<Long, String> kafkaProducer = createKafkaProducer(BOOTSTRAP_SERVERS); Producer<Long, TransactionInfo> kafkaProducer = createKafkaProducer(BOOTSTRAP_SERVERS);
long timeStamp = System.currentTimeMillis(); long timeStamp = System.currentTimeMillis();
ProducerRecord<Long , String> record = new ProducerRecord<>(topic , timeStamp ,value); System.out.println("topic : " + topic);
System.out.println("value : " + value);
ProducerRecord<Long , TransactionInfo> record = new ProducerRecord<>(topic , timeStamp ,value);
RecordMetadata recordMetadata = null; RecordMetadata recordMetadata = null;
try try
...@@ -56,7 +43,8 @@ public class MyProducer ...@@ -56,7 +43,8 @@ public class MyProducer
catch (InterruptedException | ExecutionException e) catch (InterruptedException | ExecutionException e)
{ {
log.error("Could Not Send Data to Kafka , Or Could Not Get Meta-Data"); log.error("Could Not Send Data to Kafka , Or Could Not Get Meta-Data");
return false; throw new RuntimeException(e);
///return false;
} }
...@@ -79,12 +67,13 @@ public class MyProducer ...@@ -79,12 +67,13 @@ public class MyProducer
} }
public static Producer<Long, String> createKafkaProducer(String bootstrapServers) { public static Producer<Long, TransactionInfo> createKafkaProducer(String bootstrapServers) {
Properties properties = new Properties(); Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "events-producer"); properties.put(ProducerConfig.CLIENT_ID_CONFIG, "events-producer");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName()); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, TransactionInfo.class.getName());
return new KafkaProducer<>(properties); return new KafkaProducer<>(properties);
} }
......
package org.example;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
public class MyRecord
{
/*SET JohnDoe "{\"id\": 1, \"name\": \"John Doe\", \"email\": \"johndoe@example.com\"}"*/
private String country;
private Double balance;
private Integer warnings;
public MyRecord(){}
/*
data base key = our user name
*/
public MyRecord(String country , double balance , int warnings)
{
this.balance = balance;
this.country = country;
this.warnings = warnings;
}
@Override
public String toString()
{
String ans = null;
ObjectMapper objectMapper = new ObjectMapper();
try
{
ans = objectMapper.writeValueAsString(this);
}
catch (JsonProcessingException e)
{
throw new RuntimeException(e);
}
finally {return ans;}
}
public boolean modifyBalance(String jsonData , double amount)
{
try
{
MyRecord myRecord = reverseJson(jsonData);
if (myRecord.balance >= amount)
{
myRecord.balance -= amount;
return true;
}
return false;
}
catch (JsonProcessingException e)
{
throw new RuntimeException(e);
}
}
public String getCountry() {
return country;
}
public void setCountry(String country) {
this.country = country;
}
public Double getBalance() {
return balance;
}
public void setBalance(Double balance) {
this.balance = balance;
}
public Integer getWarnings() {
return warnings;
}
public void setWarnings(Integer warnings) {
this.warnings = warnings;
}
private MyRecord reverseJson(String json) throws JsonProcessingException
{
ObjectMapper objectMapper = new ObjectMapper();
MyRecord myRecord = objectMapper.readValue(json , MyRecord.class);
return myRecord;
}
}
package org.example; package org.example;
import org.apache.kafka.common.protocol.types.Field; import java.io.Serializable;
import java.security.PublicKey; public class TransactionInfo implements Serializable
import java.security.SecureRandom;
public class TransactionInfo
{ {
private String name; private String name;
private String location; private String location;
...@@ -13,7 +10,7 @@ public class TransactionInfo ...@@ -13,7 +10,7 @@ public class TransactionInfo
public TransactionInfo() public TransactionInfo()
{} {}
public TransactionInfo (String name , String location , double amount) public TransactionInfo(String name , String location , double amount)
{ {
this.amount = amount; this.amount = amount;
this.name = name; this.name = name;
...@@ -43,4 +40,10 @@ public class TransactionInfo ...@@ -43,4 +40,10 @@ public class TransactionInfo
{ {
this.amount = amount; this.amount = amount;
} }
@Override
public String toString()
{
return name+" , amount = " + amount;
}
} }
\ No newline at end of file
Manifest-Version: 1.0 Manifest-Version: 1.0
Main-Class: org.example.MyProducer Main-Class: org.example.BankingAPI
Manifest-Version: 1.0 Manifest-Version: 1.0
Main-Class: org.example.MyProducer Main-Class: org.example.BankingAPI
Run Redis First of All on port 6379
the Run Producer and consumers on kafka VM as jars.
no Parameters are Required.
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment