Commit 574de136 authored by rawan's avatar rawan

Final Project: Kafka Banking System with Performance Tests

parents
File added
# Default ignored files
/shelf/
/workspace.xml
<?xml version="1.0" encoding="UTF-8"?>
<module type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" inherit-compiler-output="true">
<exclude-output />
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="CompilerConfiguration">
<annotationProcessing>
<profile name="Maven default annotation processors profile" enabled="true">
<sourceOutputDir name="target/generated-sources/annotations" />
<sourceTestOutputDir name="target/generated-test-sources/test-annotations" />
<outputRelativeToContentRoot value="true" />
<module name="user-notification-service" />
<module name="reporting-service" />
<module name="account-manager" />
<module name="banking-api-service" />
</profile>
</annotationProcessing>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="RemoteRepositoriesConfiguration">
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Central Repository" />
<option name="url" value="https://repo.maven.apache.org/maven2" />
</remote-repository>
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Maven Central repository" />
<option name="url" value="https://repo1.maven.org/maven2" />
</remote-repository>
<remote-repository>
<option name="id" value="jboss.community" />
<option name="name" value="JBoss Community repository" />
<option name="url" value="https://repository.jboss.org/nexus/content/repositories/public/" />
</remote-repository>
</component>
</project>
\ No newline at end of file
<component name="libraryTable">
<library name="Maven: com.fasterxml.jackson.core:jackson-core:2.10.1">
<CLASSES>
<root url="jar://$MAVEN_REPOSITORY$/com/fasterxml/jackson/core/jackson-core/2.10.1/jackson-core-2.10.1.jar!/" />
</CLASSES>
<JAVADOC>
<root url="jar://$MAVEN_REPOSITORY$/com/fasterxml/jackson/core/jackson-core/2.10.1/jackson-core-2.10.1-javadoc.jar!/" />
</JAVADOC>
<SOURCES>
<root url="jar://$MAVEN_REPOSITORY$/com/fasterxml/jackson/core/jackson-core/2.10.1/jackson-core-2.10.1-sources.jar!/" />
</SOURCES>
</library>
</component>
\ No newline at end of file
<component name="libraryTable">
<library name="Maven: com.fasterxml.jackson.core:jackson-databind:2.10.1">
<CLASSES>
<root url="jar://$MAVEN_REPOSITORY$/com/fasterxml/jackson/core/jackson-databind/2.10.1/jackson-databind-2.10.1.jar!/" />
</CLASSES>
<JAVADOC>
<root url="jar://$MAVEN_REPOSITORY$/com/fasterxml/jackson/core/jackson-databind/2.10.1/jackson-databind-2.10.1-javadoc.jar!/" />
</JAVADOC>
<SOURCES>
<root url="jar://$MAVEN_REPOSITORY$/com/fasterxml/jackson/core/jackson-databind/2.10.1/jackson-databind-2.10.1-sources.jar!/" />
</SOURCES>
</library>
</component>
\ No newline at end of file
<component name="libraryTable">
<library name="Maven: com.github.luben:zstd-jni:1.5.5-6">
<CLASSES>
<root url="jar://$MAVEN_REPOSITORY$/com/github/luben/zstd-jni/1.5.5-6/zstd-jni-1.5.5-6.jar!/" />
</CLASSES>
<JAVADOC>
<root url="jar://$MAVEN_REPOSITORY$/com/github/luben/zstd-jni/1.5.5-6/zstd-jni-1.5.5-6-javadoc.jar!/" />
</JAVADOC>
<SOURCES>
<root url="jar://$MAVEN_REPOSITORY$/com/github/luben/zstd-jni/1.5.5-6/zstd-jni-1.5.5-6-sources.jar!/" />
</SOURCES>
</library>
</component>
\ No newline at end of file
<component name="libraryTable">
<library name="Maven: log4j:log4j:1.2.17">
<CLASSES>
<root url="jar://$MAVEN_REPOSITORY$/log4j/log4j/1.2.17/log4j-1.2.17.jar!/" />
</CLASSES>
<JAVADOC>
<root url="jar://$MAVEN_REPOSITORY$/log4j/log4j/1.2.17/log4j-1.2.17-javadoc.jar!/" />
</JAVADOC>
<SOURCES>
<root url="jar://$MAVEN_REPOSITORY$/log4j/log4j/1.2.17/log4j-1.2.17-sources.jar!/" />
</SOURCES>
</library>
</component>
\ No newline at end of file
<component name="libraryTable">
<library name="Maven: org.apache.kafka:kafka-clients:3.7.0">
<CLASSES>
<root url="jar://$MAVEN_REPOSITORY$/org/apache/kafka/kafka-clients/3.7.0/kafka-clients-3.7.0.jar!/" />
</CLASSES>
<JAVADOC>
<root url="jar://$MAVEN_REPOSITORY$/org/apache/kafka/kafka-clients/3.7.0/kafka-clients-3.7.0-javadoc.jar!/" />
</JAVADOC>
<SOURCES>
<root url="jar://$MAVEN_REPOSITORY$/org/apache/kafka/kafka-clients/3.7.0/kafka-clients-3.7.0-sources.jar!/" />
</SOURCES>
</library>
</component>
\ No newline at end of file
<component name="libraryTable">
<library name="Maven: org.lz4:lz4-java:1.8.0">
<CLASSES>
<root url="jar://$MAVEN_REPOSITORY$/org/lz4/lz4-java/1.8.0/lz4-java-1.8.0.jar!/" />
</CLASSES>
<JAVADOC>
<root url="jar://$MAVEN_REPOSITORY$/org/lz4/lz4-java/1.8.0/lz4-java-1.8.0-javadoc.jar!/" />
</JAVADOC>
<SOURCES>
<root url="jar://$MAVEN_REPOSITORY$/org/lz4/lz4-java/1.8.0/lz4-java-1.8.0-sources.jar!/" />
</SOURCES>
</library>
</component>
\ No newline at end of file
<component name="libraryTable">
<library name="Maven: org.slf4j:slf4j-api:1.7.36">
<CLASSES>
<root url="jar://$MAVEN_REPOSITORY$/org/slf4j/slf4j-api/1.7.36/slf4j-api-1.7.36.jar!/" />
</CLASSES>
<JAVADOC>
<root url="jar://$MAVEN_REPOSITORY$/org/slf4j/slf4j-api/1.7.36/slf4j-api-1.7.36-javadoc.jar!/" />
</JAVADOC>
<SOURCES>
<root url="jar://$MAVEN_REPOSITORY$/org/slf4j/slf4j-api/1.7.36/slf4j-api-1.7.36-sources.jar!/" />
</SOURCES>
</library>
</component>
\ No newline at end of file
<component name="libraryTable">
<library name="Maven: org.slf4j:slf4j-log4j12:1.7.6">
<CLASSES>
<root url="jar://$MAVEN_REPOSITORY$/org/slf4j/slf4j-log4j12/1.7.6/slf4j-log4j12-1.7.6.jar!/" />
</CLASSES>
<JAVADOC>
<root url="jar://$MAVEN_REPOSITORY$/org/slf4j/slf4j-log4j12/1.7.6/slf4j-log4j12-1.7.6-javadoc.jar!/" />
</JAVADOC>
<SOURCES>
<root url="jar://$MAVEN_REPOSITORY$/org/slf4j/slf4j-log4j12/1.7.6/slf4j-log4j12-1.7.6-sources.jar!/" />
</SOURCES>
</library>
</component>
\ No newline at end of file
<component name="libraryTable">
<library name="Maven: org.xerial.snappy:snappy-java:1.1.10.5">
<CLASSES>
<root url="jar://$MAVEN_REPOSITORY$/org/xerial/snappy/snappy-java/1.1.10.5/snappy-java-1.1.10.5.jar!/" />
</CLASSES>
<JAVADOC>
<root url="jar://$MAVEN_REPOSITORY$/org/xerial/snappy/snappy-java/1.1.10.5/snappy-java-1.1.10.5-javadoc.jar!/" />
</JAVADOC>
<SOURCES>
<root url="jar://$MAVEN_REPOSITORY$/org/xerial/snappy/snappy-java/1.1.10.5/snappy-java-1.1.10.5-sources.jar!/" />
</SOURCES>
</library>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ExternalStorageConfigurationManager" enabled="true" />
<component name="MavenProjectsManager">
<option name="originalFiles">
<list>
<option value="$PROJECT_DIR$/account-manager/pom.xml" />
<option value="$PROJECT_DIR$/bank-api-service/pom.xml" />
<option value="$PROJECT_DIR$/reporting-service/pom.xml" />
<option value="$PROJECT_DIR$/user-notification-service/pom.xml" />
</list>
</option>
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_17" default="true" project-jdk-name="17" project-jdk-type="JavaSDK" />
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/banking-system-exercise.iml" filepath="$PROJECT_DIR$/.idea/banking-system-exercise.iml" />
</modules>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="CompilerConfiguration">
<annotationProcessing>
<profile name="Maven default annotation processors profile" enabled="true">
<sourceOutputDir name="target/generated-sources/annotations" />
<sourceTestOutputDir name="target/generated-test-sources/test-annotations" />
<outputRelativeToContentRoot value="true" />
<module name="kafka.consumer" />
<module name="account-manager" />
<module name="kafka-consumer" />
</profile>
</annotationProcessing>
</component>
</project>
\ No newline at end of file
<component name="CopyrightManager">
<copyright>
<option name="notice" value=" MIT License&#10;&#10; Copyright (c) 2019 Michael Pogrebinsky - Distributed Systems &amp;amp; Cloud Computing with Java&#10;&#10; Permission is hereby granted, free of charge, to any person obtaining a copy&#10; of this software and associated documentation files (the &quot;Software&quot;), to deal&#10; in the Software without restriction, including without limitation the rights&#10; to use, copy, modify, merge, publish, distribute, sublicense, and/or sell&#10; copies of the Software, and to permit persons to whom the Software is&#10; furnished to do so, subject to the following conditions:&#10;&#10; The above copyright notice and this permission notice shall be included in all&#10; copies or substantial portions of the Software.&#10;&#10; THE SOFTWARE IS PROVIDED &quot;AS IS&quot;, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR&#10; IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,&#10; FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE&#10; AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER&#10; LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,&#10; OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE&#10; SOFTWARE." />
<option name="myName" value="MIT" />
</copyright>
</component>
\ No newline at end of file
<component name="CopyrightManager">
<settings default="MIT">
<module2copyright>
<element module="All" copyright="MIT" />
</module2copyright>
</settings>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ExternalStorageConfigurationManager" enabled="true" />
<component name="MavenProjectsManager">
<option name="originalFiles">
<list>
<option value="$PROJECT_DIR$/pom.xml" />
</list>
</option>
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_11" project-jdk-name="11" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" />
</component>
</project>
\ No newline at end of file
This diff is collapsed.
MIT License
Copyright (c) 2019 Michael Pogrebinsky - Distributed Systems & Cloud Computing with Java
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ MIT License
~
~ Copyright (c) 2019 Michael Pogrebinsky - Distributed Systems & Cloud Computing with Java
~
~ Permission is hereby granted, free of charge, to any person obtaining a copy
~ of this software and associated documentation files (the "Software"), to deal
~ in the Software without restriction, including without limitation the rights
~ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
~ copies of the Software, and to permit persons to whom the Software is
~ furnished to do so, subject to the following conditions:
~
~ The above copyright notice and this permission notice shall be included in all
~ copies or substantial portions of the Software.
~
~ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
~ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
~ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
~ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
~ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
~ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
~ SOFTWARE.
-->
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>distributed.systems</groupId>
<artifactId>account-manager</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<release>11</release>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<archive>
<manifest>
<mainClass>Application</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.7.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.6</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.10.1</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.10.1</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class Application {
private static final String VALID_TRANSACTIONS_TOPIC = "valid-transactions";
// تأكدي من مطابقة الـ IP والـ Port لجهازك (الـ Ubuntu)
private static final String BOOTSTRAP_SERVERS = "192.168.109.130:9092";
public static void main(String[] args) {
String consumerGroup = "account-manager";
System.out.println("Consumer is part of consumer group " + consumerGroup);
Consumer<String, Transaction> kafkaConsumer = createKafkaConsumer(BOOTSTRAP_SERVERS, consumerGroup);
consumeMessages(VALID_TRANSACTIONS_TOPIC, kafkaConsumer);
}
public static void consumeMessages(String topic, Consumer<String, Transaction> kafkaConsumer) {
kafkaConsumer.subscribe(Collections.singletonList(topic));
while (true) {
// قراءة الرسائل من كافكا (ننتظر لمدة ثانية واحدة كحد أقصى)
ConsumerRecords<String, Transaction> records = kafkaConsumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, Transaction> record : records) {
// معالجة كل معاملة (الموافقة عليها)
approveTransaction(record.value());
}
// تثبيت الـ Offsets يدوياً بعد المعالجة لضمان الدقة
kafkaConsumer.commitAsync();
}
}
public static Consumer<String, Transaction> createKafkaConsumer(String bootstrapServers, String consumerGroup) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
// استخدام الـ Deserializers المناسبة (String للمفتاح والـ Custom Transaction للقيمة)
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Transaction.TransactionDeserializer.class.getName());
// --- ميزة الـ Exactly-Once للـ Consumer ---
// لضمان قراءة المعاملات التي تم عمل Commit لها فقط وتجاهل الـ Aborted
properties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// البدء بالقراءة من أقدم رسالة متوفرة
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new KafkaConsumer<>(properties);
}
private static void approveTransaction(Transaction transaction) {
System.out.println(String.format("Authorizing transaction for user %s, in the amount of $%.2f",
transaction.getUser(), transaction.getAmount()));
}
}
/*
* MIT License
*
* Copyright (c) 2019 Michael Pogrebinsky - Distributed Systems & Cloud Computing with Java
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;
public class Transaction {
private String user;
private double amount;
private String transactionLocation;
public String getUser() {
return user;
}
public double getAmount() {
return amount;
}
public String getTransactionLocation() {
return transactionLocation;
}
public void setUser(String user) {
this.user = user;
}
public void setAmount(double amount) {
this.amount = amount;
}
public void setTransactionLocation(String transactionLocation) {
this.transactionLocation = transactionLocation;
}
@Override
public String toString() {
return "Transaction{" +
"user='" + user + '\'' +
", amount=" + amount +
", transactionLocation='" + transactionLocation + '\'' +
'}';
}
/**
* Kafka Deserializer implementation.
* Deserializes a Transaction from JSON to a {@link Transaction} object
*/
public static class TransactionDeserializer implements Deserializer<Transaction> {
@Override
public Transaction deserialize(String topic, byte[] data) {
ObjectMapper mapper = new ObjectMapper();
Transaction transaction = null;
try {
transaction = mapper.readValue(data, Transaction.class);
} catch (Exception e) {
e.printStackTrace();
}
return transaction;
}
}
}
#
# MIT License
#
# Copyright (c) 2019 Michael Pogrebinsky - Distributed Systems & Cloud Computing with Java
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# Root logger option
log4j.rootLogger=ERROR, stdout
# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss} %-5p %c{1}:%L - %m%n
#
# MIT License
#
# Copyright (c) 2019 Michael Pogrebinsky - Distributed Systems & Cloud Computing with Java
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# Root logger option
log4j.rootLogger=ERROR, stdout
# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss} %-5p %c{1}:%L - %m%n
D:\fifth year\DS\banking-system-exercise\account-manager\src\main\java\Transaction.java
D:\fifth year\DS\banking-system-exercise\account-manager\src\main\java\Application.java
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="CompilerConfiguration">
<annotationProcessing>
<profile name="Maven default annotation processors profile" enabled="true">
<sourceOutputDir name="target/generated-sources/annotations" />
<sourceTestOutputDir name="target/generated-test-sources/test-annotations" />
<outputRelativeToContentRoot value="true" />
<module name="kafka-producer" />
<module name="banking-api-service" />
<module name="kafka.producer" />
</profile>
</annotationProcessing>
</component>
</project>
\ No newline at end of file
<component name="CopyrightManager">
<copyright>
<option name="notice" value=" MIT License&#10;&#10; Copyright (c) 2019 Michael Pogrebinsky - Distributed Systems &amp;amp; Cloud Computing with Java&#10;&#10; Permission is hereby granted, free of charge, to any person obtaining a copy&#10; of this software and associated documentation files (the &quot;Software&quot;), to deal&#10; in the Software without restriction, including without limitation the rights&#10; to use, copy, modify, merge, publish, distribute, sublicense, and/or sell&#10; copies of the Software, and to permit persons to whom the Software is&#10; furnished to do so, subject to the following conditions:&#10;&#10; The above copyright notice and this permission notice shall be included in all&#10; copies or substantial portions of the Software.&#10;&#10; THE SOFTWARE IS PROVIDED &quot;AS IS&quot;, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR&#10; IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,&#10; FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE&#10; AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER&#10; LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,&#10; OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE&#10; SOFTWARE." />
<option name="myName" value="MIT" />
</copyright>
</component>
\ No newline at end of file
<component name="CopyrightManager">
<settings default="MIT">
<module2copyright>
<element module="All" copyright="MIT" />
</module2copyright>
</settings>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ExternalStorageConfigurationManager" enabled="true" />
<component name="MavenProjectsManager">
<option name="originalFiles">
<list>
<option value="$PROJECT_DIR$/pom.xml" />
</list>
</option>
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_11" project-jdk-name="11" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" />
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Palette2">
<group name="Swing">
<item class="com.intellij.uiDesigner.HSpacer" tooltip-text="Horizontal Spacer" icon="/com/intellij/uiDesigner/icons/hspacer.png" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="1" hsize-policy="6" anchor="0" fill="1" />
</item>
<item class="com.intellij.uiDesigner.VSpacer" tooltip-text="Vertical Spacer" icon="/com/intellij/uiDesigner/icons/vspacer.png" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="1" anchor="0" fill="2" />
</item>
<item class="javax.swing.JPanel" icon="/com/intellij/uiDesigner/icons/panel.png" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="3" hsize-policy="3" anchor="0" fill="3" />
</item>
<item class="javax.swing.JScrollPane" icon="/com/intellij/uiDesigner/icons/scrollPane.png" removable="false" auto-create-binding="false" can-attach-label="true">
<default-constraints vsize-policy="7" hsize-policy="7" anchor="0" fill="3" />
</item>
<item class="javax.swing.JButton" icon="/com/intellij/uiDesigner/icons/button.png" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="3" anchor="0" fill="1" />
<initial-values>
<property name="text" value="Button" />
</initial-values>
</item>
<item class="javax.swing.JRadioButton" icon="/com/intellij/uiDesigner/icons/radioButton.png" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="3" anchor="8" fill="0" />
<initial-values>
<property name="text" value="RadioButton" />
</initial-values>
</item>
<item class="javax.swing.JCheckBox" icon="/com/intellij/uiDesigner/icons/checkBox.png" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="3" anchor="8" fill="0" />
<initial-values>
<property name="text" value="CheckBox" />
</initial-values>
</item>
<item class="javax.swing.JLabel" icon="/com/intellij/uiDesigner/icons/label.png" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="0" anchor="8" fill="0" />
<initial-values>
<property name="text" value="Label" />
</initial-values>
</item>
<item class="javax.swing.JTextField" icon="/com/intellij/uiDesigner/icons/textField.png" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1">
<preferred-size width="150" height="-1" />
</default-constraints>
</item>
<item class="javax.swing.JPasswordField" icon="/com/intellij/uiDesigner/icons/passwordField.png" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1">
<preferred-size width="150" height="-1" />
</default-constraints>
</item>
<item class="javax.swing.JFormattedTextField" icon="/com/intellij/uiDesigner/icons/formattedTextField.png" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1">
<preferred-size width="150" height="-1" />
</default-constraints>
</item>
<item class="javax.swing.JTextArea" icon="/com/intellij/uiDesigner/icons/textArea.png" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JTextPane" icon="/com/intellij/uiDesigner/icons/textPane.png" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JEditorPane" icon="/com/intellij/uiDesigner/icons/editorPane.png" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JComboBox" icon="/com/intellij/uiDesigner/icons/comboBox.png" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="2" anchor="8" fill="1" />
</item>
<item class="javax.swing.JTable" icon="/com/intellij/uiDesigner/icons/table.png" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JList" icon="/com/intellij/uiDesigner/icons/list.png" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="2" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JTree" icon="/com/intellij/uiDesigner/icons/tree.png" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JTabbedPane" icon="/com/intellij/uiDesigner/icons/tabbedPane.png" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="3" hsize-policy="3" anchor="0" fill="3">
<preferred-size width="200" height="200" />
</default-constraints>
</item>
<item class="javax.swing.JSplitPane" icon="/com/intellij/uiDesigner/icons/splitPane.png" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="3" hsize-policy="3" anchor="0" fill="3">
<preferred-size width="200" height="200" />
</default-constraints>
</item>
<item class="javax.swing.JSpinner" icon="/com/intellij/uiDesigner/icons/spinner.png" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1" />
</item>
<item class="javax.swing.JSlider" icon="/com/intellij/uiDesigner/icons/slider.png" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1" />
</item>
<item class="javax.swing.JSeparator" icon="/com/intellij/uiDesigner/icons/separator.png" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3" />
</item>
<item class="javax.swing.JProgressBar" icon="/com/intellij/uiDesigner/icons/progressbar.png" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="0" fill="1" />
</item>
<item class="javax.swing.JToolBar" icon="/com/intellij/uiDesigner/icons/toolbar.png" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="0" fill="1">
<preferred-size width="-1" height="20" />
</default-constraints>
</item>
<item class="javax.swing.JToolBar$Separator" icon="/com/intellij/uiDesigner/icons/toolbarSeparator.png" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="0" anchor="0" fill="1" />
</item>
<item class="javax.swing.JScrollBar" icon="/com/intellij/uiDesigner/icons/scrollbar.png" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="0" anchor="0" fill="2" />
</item>
</group>
</component>
</project>
\ No newline at end of file
This diff is collapsed.
MIT License
Copyright (c) 2019 Michael Pogrebinsky - Distributed Systems & Cloud Computing with Java
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ MIT License
~
~ Copyright (c) 2019 Michael Pogrebinsky - Distributed Systems & Cloud Computing with Java
~
~ Permission is hereby granted, free of charge, to any person obtaining a copy
~ of this software and associated documentation files (the "Software"), to deal
~ in the Software without restriction, including without limitation the rights
~ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
~ copies of the Software, and to permit persons to whom the Software is
~ furnished to do so, subject to the following conditions:
~
~ The above copyright notice and this permission notice shall be included in all
~ copies or substantial portions of the Software.
~
~ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
~ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
~ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
~ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
~ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
~ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
~ SOFTWARE.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>distributed.systems</groupId>
<artifactId>banking-api-service</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<release>11</release>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<archive>
<manifest>
<mainClass>Application</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.7.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.10.1</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.10.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.6</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
</dependencies>
</project>
\ No newline at end of file
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
* Banking API Service
*/
public class Application {
private static final String SUSPICIOUS_TRANSACTIONS_TOPIC = "suspicious-transactions";
private static final String VALID_TRANSACTIONS_TOPIC = "valid-transactions";
private static final String BOOTSTRAP_SERVERS = "192.168.109.130:9092";
public static void main(String[] args) {
Producer<String, Transaction> kafkaProducer = createKafkaProducer(BOOTSTRAP_SERVERS);
// البدء بالعملية (Mandatory for Exactly-Once)
kafkaProducer.initTransactions();
try {
processTransactions(new IncomingTransactionsReader(), new UserResidenceDatabase(), kafkaProducer);
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
} finally {
kafkaProducer.flush();
kafkaProducer.close();
}
}
public static void processTransactions(IncomingTransactionsReader incomingTransactionsReader,
UserResidenceDatabase userResidenceDatabase,
Producer<String, Transaction> kafkaProducer) throws ExecutionException, InterruptedException {
try {
// 1. بدء معاملة Kafka (Transaction) - هي بداية ضمان الـ Exactly-Once
kafkaProducer.beginTransaction();
int count = 0;
System.out.println(">>> Starting a new Transactional Batch...");
while (incomingTransactionsReader.hasNext()) {
Transaction transaction = incomingTransactionsReader.next();
// استخراج بلد إقامة المستخدم لمقارنته بمكان العملية
String userHomeCountry = userResidenceDatabase.getUserResidence(transaction.getUser());
// تحديد الـ Topic المناسب (Valid أو Suspicious)
String topic = transaction.getTransactionLocation().equals(userHomeCountry)
? VALID_TRANSACTIONS_TOPIC : SUSPICIOUS_TRANSACTIONS_TOPIC;
// إرسال العملية مع تحديد الـ Key (userId) لضمان الـ Key-Based Routing
ProducerRecord<String, Transaction> record =
new ProducerRecord<>(topic, transaction.getUser(), transaction);
kafkaProducer.send(record);
// --- أسطر إضافية مخصصة للاختبار العملي (EOS Testing) ---
count++;
System.out.println("Processing transaction #" + count + " for user: " + transaction.getUser());
// تأخير 500ms (نصف ثانية) لكل عملية عشان تلحقي تضغطي زر الإيقاف الأحمر (Stop)
Thread.sleep(500);
// -------------------------------------------------------
}
// 2. إنهاء المعاملة بنجاح وتثبيت كل الرسائل بـ Kafka
kafkaProducer.commitTransaction();
System.out.println(">>> SUCCESS: All " + count + " transactions committed successfully!");
} catch (Exception e) {
// 3. في حال حدوث خطأ أو "إيقاف يدوي"، يتم إلغاء كل ما أُرسل ولم يُثبت بعد
System.err.println("!!! CRITICAL ERROR or MANUAL STOP: Aborting transaction...");
kafkaProducer.abortTransaction();
e.printStackTrace();
}
}
public static Producer<String, Transaction> createKafkaProducer(String bootstrapServers) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "banking-api-service");
// استخدام StringSerializer للمفتاح (userId) والـ TransactionSerializer للقيمة (Object)
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Transaction.TransactionSerializer.class.getName());
// --- إعدادات الـ Exactly-Once و الـ Idempotence ---
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // منع التكرار في حال إعادة الإرسال
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "banking-system-tx-id"); // معرف المعاملة الفريد
// زيادة الأمان عند الإرسال
properties.put(ProducerConfig.ACKS_CONFIG, "all");
return new KafkaProducer<>(properties);
}
}
\ No newline at end of file
/*
* MIT License
*
* Copyright (c) 2019 Michael Pogrebinsky - Distributed Systems & Cloud Computing with Java
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
import java.io.InputStream;
import java.util.*;
/**
* Mocks an HTTP server that receives purchase transactions in real time
*/
public class IncomingTransactionsReader implements Iterator<Transaction> {
private static final String INPUT_TRANSACTIONS_FILE = "user-transactions.txt";
private final List<Transaction> transactions;
private final Iterator<Transaction> transactionIterator;
public IncomingTransactionsReader(){
this.transactions = loadTransactions();
this.transactionIterator = transactions.iterator();
}
private List<Transaction> loadTransactions() {
InputStream inputStream = getClass().getClassLoader().getResourceAsStream(INPUT_TRANSACTIONS_FILE);
Scanner scanner = new Scanner(inputStream);
List<Transaction> transactions = new ArrayList<>();
while (scanner.hasNextLine()) {
String[] transaction = scanner.nextLine().split(" ");
String user = transaction[0];
String transactionLocation = transaction[1];
double amount = Double.valueOf(transaction[2]);
transactions.add(new Transaction(user, amount, transactionLocation));
}
return Collections.unmodifiableList(transactions);
}
@Override
public boolean hasNext() {
return transactionIterator.hasNext();
}
@Override
public Transaction next() {
return transactionIterator.next();
}
}
/*
* MIT License
*
* Copyright (c) 2019 Michael Pogrebinsky - Distributed Systems & Cloud Computing with Java
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serializer;
public class Transaction {
private String user;
private double amount;
private String transactionLocation;
public Transaction(String user, double amount, String transactionLocation) {
this.user = user;
this.amount = amount;
this.transactionLocation = transactionLocation;
}
public String getUser() {
return user;
}
public double getAmount() {
return amount;
}
public String getTransactionLocation() {
return transactionLocation;
}
@Override
public String toString() {
return "Transaction{" +
"user='" + user + '\'' +
", amount=" + amount +
", transactionLocation='" + transactionLocation + '\'' +
'}';
}
/**
* Kafka Serializer implementation.
* Serializes a Transaction to JSON so it can be sent to a Kafka Topic
*/
public static class TransactionSerializer implements Serializer<Transaction> {
@Override
public byte[] serialize(String topic, Transaction data) {
byte[] serializedData = null;
ObjectMapper objectMapper = new ObjectMapper();
try {
serializedData = objectMapper.writeValueAsString(data).getBytes();
} catch (Exception e) {
e.printStackTrace();
}
return serializedData;
}
}
}
/*
* MIT License
*
* Copyright (c) 2019 Michael Pogrebinsky - Distributed Systems & Cloud Computing with Java
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
import java.io.InputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;
/**
* Mock database that contains a map from a user to its country of residence
*/
public class UserResidenceDatabase {
private static final String USER_RESIDENCE_FILE = "user-residence.txt";
private final Map<String, String> userToResidenceMap;
public UserResidenceDatabase(){
this.userToResidenceMap = loadUsersResidenceFromFile();
}
/**
* Returns the user's country of residence
*/
public String getUserResidence(String user) {
if (!userToResidenceMap.containsKey(user)) {
throw new RuntimeException("user " + user + " doesn't exist");
}
return userToResidenceMap.get(user);
}
private Map<String, String> loadUsersResidenceFromFile() {
Map<String, String> userToResidence = new HashMap<>();
InputStream inputStream = getClass().getClassLoader().getResourceAsStream(USER_RESIDENCE_FILE);
Scanner scanner = new Scanner(inputStream);
while (scanner.hasNextLine()) {
String line = scanner.nextLine();
String []userResidencePair = line.split(" ");
userToResidence.put(userResidencePair[0], userResidencePair[1]);
}
return Collections.unmodifiableMap(userToResidence);
}
}
#
# MIT License
#
# Copyright (c) 2019 Michael Pogrebinsky - Distributed Systems & Cloud Computing with Java
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# Root logger option
log4j.rootLogger=ERROR, stdout
# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss} %-5p %c{1}:%L - %m%n
john1967 Germany
msmith2015 Oregon
tom_stevens12 England
jimliang1988 Ireland
arthur_wilson India
mary_johson98 Germany
smith_teresa Spain
andrewb1999 Turkey
cameron-dion Texas
arjun4444 Canada
rehansh_jk2001 Netherlands
mia_fisher Italy
annabecker_1997 Sweden
\ No newline at end of file
john1967 Germany 110
msmith2015 California 50
tom_stevens12 England 1000
jimliang1988 Ireland 70
arthur_wilson India 5.99
mary_johson98 Japan 44.50
smith_teresa Spain 800
andrewb1999 Turkey 745.70
cameron-dion Florida 100
arjun4444 Canada 99.95
rehansh_jk2001 Netherlands 7.50
mia_fisher Italy 150
annabecker_1997 Brazil 90
\ No newline at end of file
#
# MIT License
#
# Copyright (c) 2019 Michael Pogrebinsky - Distributed Systems & Cloud Computing with Java
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# Root logger option
log4j.rootLogger=ERROR, stdout
# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss} %-5p %c{1}:%L - %m%n
john1967 Germany
msmith2015 Oregon
tom_stevens12 England
jimliang1988 Ireland
arthur_wilson India
mary_johson98 Germany
smith_teresa Spain
andrewb1999 Turkey
cameron-dion Texas
arjun4444 Canada
rehansh_jk2001 Netherlands
mia_fisher Italy
annabecker_1997 Sweden
\ No newline at end of file
john1967 Germany 110
msmith2015 California 50
tom_stevens12 England 1000
jimliang1988 Ireland 70
arthur_wilson India 5.99
mary_johson98 Japan 44.50
smith_teresa Spain 800
andrewb1999 Turkey 745.70
cameron-dion Florida 100
arjun4444 Canada 99.95
rehansh_jk2001 Netherlands 7.50
mia_fisher Italy 150
annabecker_1997 Brazil 90
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="CompilerConfiguration">
<annotationProcessing>
<profile name="Maven default annotation processors profile" enabled="true">
<sourceOutputDir name="target/generated-sources/annotations" />
<sourceTestOutputDir name="target/generated-test-sources/test-annotations" />
<outputRelativeToContentRoot value="true" />
<module name="kafka.consumer" />
<module name="reporting-service" />
<module name="kafka-consumer" />
</profile>
</annotationProcessing>
</component>
</project>
\ No newline at end of file
<component name="CopyrightManager">
<copyright>
<option name="notice" value=" MIT License&#10;&#10; Copyright (c) 2019 Michael Pogrebinsky - Distributed Systems &amp;amp; Cloud Computing with Java&#10;&#10; Permission is hereby granted, free of charge, to any person obtaining a copy&#10; of this software and associated documentation files (the &quot;Software&quot;), to deal&#10; in the Software without restriction, including without limitation the rights&#10; to use, copy, modify, merge, publish, distribute, sublicense, and/or sell&#10; copies of the Software, and to permit persons to whom the Software is&#10; furnished to do so, subject to the following conditions:&#10;&#10; The above copyright notice and this permission notice shall be included in all&#10; copies or substantial portions of the Software.&#10;&#10; THE SOFTWARE IS PROVIDED &quot;AS IS&quot;, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR&#10; IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,&#10; FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE&#10; AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER&#10; LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,&#10; OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE&#10; SOFTWARE." />
<option name="myName" value="MIT" />
</copyright>
</component>
\ No newline at end of file
<component name="CopyrightManager">
<settings default="MIT">
<module2copyright>
<element module="All" copyright="MIT" />
</module2copyright>
</settings>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ExternalStorageConfigurationManager" enabled="true" />
<component name="MavenProjectsManager">
<option name="originalFiles">
<list>
<option value="$PROJECT_DIR$/pom.xml" />
</list>
</option>
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_11" project-jdk-name="11" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" />
</component>
</project>
\ No newline at end of file
This diff is collapsed.
MIT License
Copyright (c) 2019 Michael Pogrebinsky - Distributed Systems & Cloud Computing with Java
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ MIT License
~
~ Copyright (c) 2019 Michael Pogrebinsky - Distributed Systems & Cloud Computing with Java
~
~ Permission is hereby granted, free of charge, to any person obtaining a copy
~ of this software and associated documentation files (the "Software"), to deal
~ in the Software without restriction, including without limitation the rights
~ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
~ copies of the Software, and to permit persons to whom the Software is
~ furnished to do so, subject to the following conditions:
~
~ The above copyright notice and this permission notice shall be included in all
~ copies or substantial portions of the Software.
~
~ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
~ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
~ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
~ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
~ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
~ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
~ SOFTWARE.
-->
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>distributed.systems</groupId>
<artifactId>reporting-service</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<release>11</release>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<archive>
<manifest>
<mainClass>Application</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.7.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.6</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.10.1</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.10.1</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
public class Application {
private static final String VALID_TRANSACTIONS_TOPIC = "valid-transactions";
private static final String SUSPICIOUS_TRANSACTIONS_TOPIC = "suspicious-transactions";
// تأكدي من استخدام الـ IP الخاص بك
private static final String BOOTSTRAP_SERVERS = "192.168.109.130:9092";
public static void main(String[] args) {
// قررنا اسم المجموعة ليكون معبراً عن الخدمة
String consumerGroup = "reporting-service";
System.out.println("Consumer is part of consumer group " + consumerGroup);
Consumer<String, Transaction> kafkaConsumer = createKafkaConsumer(BOOTSTRAP_SERVERS, consumerGroup);
// الاشتراك في القائمة غير القابلة للتعديل من المواضيع (Valid و Suspicious)
consumeMessages(Collections.unmodifiableList(Arrays.asList(SUSPICIOUS_TRANSACTIONS_TOPIC, VALID_TRANSACTIONS_TOPIC)), kafkaConsumer);
}
public static void consumeMessages(List<String> topics, Consumer<String, Transaction> kafkaConsumer) {
// 1. الاشتراك في المواضيع المطلوبة
kafkaConsumer.subscribe(topics);
// 2. حلقة استهلاك الرسائل
while (true) {
ConsumerRecords<String, Transaction> records = kafkaConsumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, Transaction> record : records) {
// 3. تمرير اسم التوبيك مع الرسالة للتمييز بين نوع التقارير
recordTransactionForReporting(record.topic(), record.value());
}
}
}
public static Consumer<String, Transaction> createKafkaConsumer(String bootstrapServers, String consumerGroup) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
// ضبط الـ Deserializers
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Transaction.TransactionDeserializer.class.getName());
// ضمان قراءة البيانات المؤكدة فقط (Exactly-Once)
properties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
// البدء من أقدم رسالة لضمان عدم ضياع أي بيانات تاريخية في التقارير
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// تفعيل الـ Auto Commit للتبسيط في خدمة التقارير
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
return new KafkaConsumer<>(properties);
}
private static void recordTransactionForReporting(String topic, Transaction transaction) {
if (topic.equals(SUSPICIOUS_TRANSACTIONS_TOPIC)) {
System.out.println(String.format("Recording suspicious transaction for user %s, amount of " +
"$%.2f originating in %s for further investigation",
transaction.getUser(), transaction.getAmount(), transaction.getTransactionLocation()));
} else if (topic.equals(VALID_TRANSACTIONS_TOPIC)) {
System.out.println(String.format("Recording transaction for user %s, amount $%.2f to show it on user's " +
"monthly statement",
transaction.getUser(), transaction.getAmount()));
}
}
}
\ No newline at end of file
/*
* MIT License
*
* Copyright (c) 2019 Michael Pogrebinsky - Distributed Systems & Cloud Computing with Java
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;
public class Transaction {
private String user;
private double amount;
private String transactionLocation;
public String getUser() {
return user;
}
public double getAmount() {
return amount;
}
public String getTransactionLocation() {
return transactionLocation;
}
public void setUser(String user) {
this.user = user;
}
public void setAmount(double amount) {
this.amount = amount;
}
public void setTransactionLocation(String transactionLocation) {
this.transactionLocation = transactionLocation;
}
@Override
public String toString() {
return "Transaction{" +
"user='" + user + '\'' +
", amount=" + amount +
", transactionLocation='" + transactionLocation + '\'' +
'}';
}
/**
* Kafka Deserializer implementation.
* Deserializes a Transaction from JSON to a {@link Transaction} object
*/
public static class TransactionDeserializer implements Deserializer<Transaction> {
@Override
public Transaction deserialize(String topic, byte[] data) {
ObjectMapper mapper = new ObjectMapper();
Transaction transaction = null;
try {
transaction = mapper.readValue(data, Transaction.class);
} catch (Exception e) {
e.printStackTrace();
}
return transaction;
}
}
}
#
# MIT License
#
# Copyright (c) 2019 Michael Pogrebinsky - Distributed Systems & Cloud Computing with Java
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# Root logger option
log4j.rootLogger=ERROR, stdout
# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss} %-5p %c{1}:%L - %m%n
#
# MIT License
#
# Copyright (c) 2019 Michael Pogrebinsky - Distributed Systems & Cloud Computing with Java
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# Root logger option
log4j.rootLogger=ERROR, stdout
# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss} %-5p %c{1}:%L - %m%n
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="CompilerConfiguration">
<annotationProcessing>
<profile name="Maven default annotation processors profile" enabled="true">
<sourceOutputDir name="target/generated-sources/annotations" />
<sourceTestOutputDir name="target/generated-test-sources/test-annotations" />
<outputRelativeToContentRoot value="true" />
<module name="kafka.consumer" />
<module name="user-notification-service" />
<module name="notifications-service" />
<module name="kafka-consumer" />
</profile>
</annotationProcessing>
</component>
</project>
\ No newline at end of file
<component name="CopyrightManager">
<copyright>
<option name="notice" value=" MIT License&#10;&#10; Copyright (c) 2019 Michael Pogrebinsky - Distributed Systems &amp;amp; Cloud Computing with Java&#10;&#10; Permission is hereby granted, free of charge, to any person obtaining a copy&#10; of this software and associated documentation files (the &quot;Software&quot;), to deal&#10; in the Software without restriction, including without limitation the rights&#10; to use, copy, modify, merge, publish, distribute, sublicense, and/or sell&#10; copies of the Software, and to permit persons to whom the Software is&#10; furnished to do so, subject to the following conditions:&#10;&#10; The above copyright notice and this permission notice shall be included in all&#10; copies or substantial portions of the Software.&#10;&#10; THE SOFTWARE IS PROVIDED &quot;AS IS&quot;, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR&#10; IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,&#10; FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE&#10; AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER&#10; LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,&#10; OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE&#10; SOFTWARE." />
<option name="myName" value="MIT" />
</copyright>
</component>
\ No newline at end of file
<component name="CopyrightManager">
<settings default="MIT">
<module2copyright>
<element module="All" copyright="MIT" />
</module2copyright>
</settings>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ExternalStorageConfigurationManager" enabled="true" />
<component name="MavenProjectsManager">
<option name="originalFiles">
<list>
<option value="$PROJECT_DIR$/pom.xml" />
</list>
</option>
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_11" default="false" project-jdk-name="11" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" />
</component>
</project>
\ No newline at end of file
This diff is collapsed.
MIT License
Copyright (c) 2019 Michael Pogrebinsky - Distributed Systems & Cloud Computing with Java
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ MIT License
~
~ Copyright (c) 2019 Michael Pogrebinsky - Distributed Systems & Cloud Computing with Java
~
~ Permission is hereby granted, free of charge, to any person obtaining a copy
~ of this software and associated documentation files (the "Software"), to deal
~ in the Software without restriction, including without limitation the rights
~ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
~ copies of the Software, and to permit persons to whom the Software is
~ furnished to do so, subject to the following conditions:
~
~ The above copyright notice and this permission notice shall be included in all
~ copies or substantial portions of the Software.
~
~ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
~ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
~ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
~ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
~ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
~ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
~ SOFTWARE.
-->
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>distributed.systems</groupId>
<artifactId>user-notification-service</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<release>11</release>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<archive>
<manifest>
<mainClass>Application</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.7.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.6</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.10.1</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.10.1</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class Application {
private static final String SUSPICIOUS_TRANSACTIONS_TOPIC = "suspicious-transactions";
// تأكدي من وضع الـ IP الصحيح لـ Ubuntu الخاص بك
private static final String BOOTSTRAP_SERVERS = "192.168.109.130:9092";
public static void main(String[] args) {
String consumerGroup = "user-notification-service";
System.out.println("Consumer is part of consumer group " + consumerGroup);
Consumer<String, Transaction> kafkaConsumer = createKafkaConsumer(BOOTSTRAP_SERVERS, consumerGroup);
consumeMessages(SUSPICIOUS_TRANSACTIONS_TOPIC, kafkaConsumer);
}
public static void consumeMessages(String topic, Consumer<String, Transaction> kafkaConsumer) {
// 1. الاشتراك في التوبيك الخاص بالمعاملات المشبوهة
kafkaConsumer.subscribe(Collections.singletonList(topic));
// 2. حلقة القراءة المستمرة
while (true) {
ConsumerRecords<String, Transaction> records = kafkaConsumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, Transaction> record : records) {
// 3. إرسال إشعار للمستخدم عن كل معاملة مشبوهة
sendUserNotification(record.value());
}
// الـ Commit هون تلقائي لأن ENABLE_AUTO_COMMIT_CONFIG مضبوطة على true
}
}
public static Consumer<String, Transaction> createKafkaConsumer(String bootstrapServers, String consumerGroup) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "notification-group-final-test");
// ضبط الـ Deserializers (String للمفتاح والـ Custom Transaction للقيمة)
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Transaction.TransactionDeserializer.class.getName());
// لضمان قراءة البيانات المؤكدة فقط (EOS)
properties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
// البدء من أقدم رسالة في حال كان المستهلك جديداً
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
return new KafkaConsumer<>(properties);
}
private static void sendUserNotification(Transaction transaction) {
System.out.println(
String.format("Sending user %s notification about a suspicious transaction of $%.2f in their account " +
"originating in %s",
transaction.getUser(),
transaction.getAmount(),
transaction.getTransactionLocation()));
}
}
\ No newline at end of file
/*
* MIT License
*
* Copyright (c) 2019 Michael Pogrebinsky - Distributed Systems & Cloud Computing with Java
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;
public class Transaction {
private String user;
private double amount;
private String transactionLocation;
public String getUser() {
return user;
}
public double getAmount() {
return amount;
}
public String getTransactionLocation() {
return transactionLocation;
}
public void setUser(String user) {
this.user = user;
}
public void setAmount(double amount) {
this.amount = amount;
}
public void setTransactionLocation(String transactionLocation) {
this.transactionLocation = transactionLocation;
}
@Override
public String toString() {
return "Transaction{" +
"user='" + user + '\'' +
", amount=" + amount +
", transactionLocation='" + transactionLocation + '\'' +
'}';
}
/**
* Kafka Deserializer implementation.
* Deserializes a Transaction from JSON to a {@link Transaction} object
*/
public static class TransactionDeserializer implements Deserializer<Transaction> {
@Override
public Transaction deserialize(String topic, byte[] data) {
ObjectMapper mapper = new ObjectMapper();
Transaction transaction = null;
try {
transaction = mapper.readValue(data, Transaction.class);
} catch (Exception e) {
e.printStackTrace();
}
return transaction;
}
}
}
#
# MIT License
#
# Copyright (c) 2019 Michael Pogrebinsky - Distributed Systems & Cloud Computing with Java
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# Root logger option
log4j.rootLogger=ERROR, stdout
# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss} %-5p %c{1}:%L - %m%n
#
# MIT License
#
# Copyright (c) 2019 Michael Pogrebinsky - Distributed Systems & Cloud Computing with Java
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# Root logger option
log4j.rootLogger=ERROR, stdout
# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss} %-5p %c{1}:%L - %m%n
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment