Commit 313eb69d authored by drnull03's avatar drnull03

Finished reporting-service

parent 2c53725b
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<!-- <project xmlns="http://maven.apache.org/POM/4.0.0"
~ MIT License xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
~ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
~ Copyright (c) 2019 Michael Pogrebinsky - Distributed Systems & Cloud Computing with Java http://maven.apache.org/xsd/maven-4.0.0.xsd">
~
~ Permission is hereby granted, free of charge, to any person obtaining a copy
~ of this software and associated documentation files (the "Software"), to deal
~ in the Software without restriction, including without limitation the rights
~ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
~ copies of the Software, and to permit persons to whom the Software is
~ furnished to do so, subject to the following conditions:
~
~ The above copyright notice and this permission notice shall be included in all
~ copies or substantial portions of the Software.
~
~ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
~ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
~ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
~ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
~ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
~ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
~ SOFTWARE.
-->
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<groupId>distributed.systems</groupId> <groupId>distributed.systems</groupId>
...@@ -34,40 +12,52 @@ ...@@ -34,40 +12,52 @@
<build> <build>
<plugins> <plugins>
<!-- Java compiler -->
<plugin> <plugin>
<groupId>org.apache.maven.plugins</groupId> <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId> <artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version> <version>3.11.0</version>
<configuration> <configuration>
<release>11</release> <release>11</release>
</configuration> </configuration>
</plugin> </plugin>
<!-- Create runnable jar with dependencies -->
<plugin> <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId> <artifactId>maven-assembly-plugin</artifactId>
<version>3.6.0</version>
<executions> <executions>
<execution> <execution>
<id>make-assembly</id>
<phase>package</phase> <phase>package</phase>
<goals> <goals>
<goal>single</goal> <goal>single</goal>
</goals> </goals>
</execution> </execution>
</executions> </executions>
<configuration> <configuration>
<archive> <archive>
<manifest> <manifest>
<mainClass>Application</mainClass> <mainClass>Application</mainClass>
</manifest> </manifest>
</archive> </archive>
<descriptorRefs> <descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef> <descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs> </descriptorRefs>
</configuration> </configuration>
</plugin> </plugin>
</plugins> </plugins>
</build> </build>
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>org.apache.kafka</groupId> <groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId> <artifactId>kafka-clients</artifactId>
...@@ -80,6 +70,7 @@ ...@@ -80,6 +70,7 @@
<version>1.7.6</version> <version>1.7.6</version>
<scope>runtime</scope> <scope>runtime</scope>
</dependency> </dependency>
<dependency> <dependency>
<groupId>log4j</groupId> <groupId>log4j</groupId>
<artifactId>log4j</artifactId> <artifactId>log4j</artifactId>
...@@ -87,7 +78,6 @@ ...@@ -87,7 +78,6 @@
<scope>runtime</scope> <scope>runtime</scope>
</dependency> </dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
<dependency> <dependency>
<groupId>com.fasterxml.jackson.core</groupId> <groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId> <artifactId>jackson-core</artifactId>
...@@ -99,5 +89,7 @@ ...@@ -99,5 +89,7 @@
<artifactId>jackson-databind</artifactId> <artifactId>jackson-databind</artifactId>
<version>2.10.1</version> <version>2.10.1</version>
</dependency> </dependency>
</dependencies> </dependencies>
</project> </project>
\ No newline at end of file
/*
* MIT License
*
* Copyright (c) 2019 Michael Pogrebinsky - Distributed Systems & Cloud Computing with Java
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
import org.apache.kafka.clients.consumer.*; import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration; import java.time.Duration;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
...@@ -32,46 +13,111 @@ import java.util.List; ...@@ -32,46 +13,111 @@ import java.util.List;
import java.util.Properties; import java.util.Properties;
public class Application { public class Application {
private static final String VALID_TRANSACTIONS_TOPIC = "valid-transactions"; private static final String VALID_TRANSACTIONS_TOPIC = "valid-transactions";
private static final String SUSPICIOUS_TRANSACTIONS_TOPIC = "suspicious-transactions"; private static final String SUSPICIOUS_TRANSACTIONS_TOPIC = "suspicious-transactions";
private static final String BOOTSTRAP_SERVERS = "localhost:9092,localhost:9093,localhost:9094"; private static final String BOOTSTRAP_SERVERS = "localhost:9092,localhost:9093,localhost:9094";
public static void main(String[] args) { public static void main(String[] args) {
String consumerGroup = /** Decide on the name for the consumer group.**/
String consumerGroup = "transaction-reporting-service";
System.out.println("Consumer is part of consumer group " + consumerGroup); System.out.println("Consumer is part of consumer group " + consumerGroup);
Consumer<String, Transaction> kafkaConsumer = createKafkaConsumer(BOOTSTRAP_SERVERS, consumerGroup); Consumer<String, Transaction> kafkaConsumer =
createKafkaConsumer(BOOTSTRAP_SERVERS, consumerGroup);
consumeMessages(Collections.unmodifiableList(Arrays.asList(SUSPICIOUS_TRANSACTIONS_TOPIC, VALID_TRANSACTIONS_TOPIC)), kafkaConsumer); consumeMessages(
Collections.unmodifiableList(
Arrays.asList(SUSPICIOUS_TRANSACTIONS_TOPIC, VALID_TRANSACTIONS_TOPIC)
),
kafkaConsumer
);
} }
public static void consumeMessages(List<String> topics, Consumer<String, Transaction> kafkaConsumer) { public static void consumeMessages(List<String> topics, Consumer<String, Transaction> kafkaConsumer) {
/**
* Fill in the code here to subscribe to the provided topics // Subscribe to both topics
* Run in a loop and consume all the transactions kafkaConsumer.subscribe(topics);
* Record the transactions for reporting based on the topic
*/ while (true) {
ConsumerRecords<String, Transaction> records =
kafkaConsumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, Transaction> record : records) {
String topic = record.topic();
Transaction transaction = record.value();
recordTransactionForReporting(topic, transaction);
}
}
} }
public static Consumer<String, Transaction> createKafkaConsumer(String bootstrapServers, String consumerGroup) { public static Consumer<String, Transaction> createKafkaConsumer(String bootstrapServers, String consumerGroup) {
/**
* Configure all the Kafka client parameters here Properties properties = new Properties();
* Create and return new Kafka consumer
*/ properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
Transaction.TransactionDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
return new KafkaConsumer<>(properties);
} }
private static void recordTransactionForReporting(String topic, Transaction transaction) { private static void recordTransactionForReporting(String topic, Transaction transaction) {
String message;
if (topic.equals(SUSPICIOUS_TRANSACTIONS_TOPIC)) { if (topic.equals(SUSPICIOUS_TRANSACTIONS_TOPIC)) {
System.out.println(String.format("Recording suspicious transaction for user %s, amount of " +
"$%.2f originating in %s for further investigation", message = String.format(
transaction.getUser(), transaction.getAmount(), transaction.getTransactionLocation())); "Suspicious transaction for user %s, amount $%.2f originating in %s",
transaction.getUser(),
transaction.getAmount(),
transaction.getTransactionLocation());
System.out.println(message);
saveToFile("../reports/sus", transaction.getUser(), message);
} else if (topic.equals(VALID_TRANSACTIONS_TOPIC)) { } else if (topic.equals(VALID_TRANSACTIONS_TOPIC)) {
System.out.println(String.format("Recording transaction for user %s, amount $%.2f to show it on user's " +
"monthly statement", message = String.format(
transaction.getUser(), transaction.getAmount())); "Valid transaction for user %s, amount $%.2f",
transaction.getUser(),
transaction.getAmount());
System.out.println(message);
saveToFile("../reports/normal", transaction.getUser(), message);
} }
} }
} private static void saveToFile(String folderPath, String user, String message) {
try {
Path dir = Paths.get(folderPath);
// create folder if it doesn't exist
if (!Files.exists(dir)) {
Files.createDirectories(dir);
}
Path file = dir.resolve(user + ".txt");
try (FileWriter writer = new FileWriter(file.toFile(), true)) {
writer.write(message + System.lineSeparator());
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
\ No newline at end of file
#Generated by Maven
#Sat Mar 14 21:36:50 CET 2026
artifactId=reporting-service
groupId=distributed.systems
version=1.0-SNAPSHOT
Transaction.class
Transaction$TransactionDeserializer.class
Application.class
/home/drnull/UNI/adistr/OurBank/reporting-service/src/main/java/Transaction.java
/home/drnull/UNI/adistr/OurBank/reporting-service/src/main/java/Application.java
#Generated by Maven #Generated by Maven
#Sat Mar 14 21:11:42 CET 2026 #Sat Mar 14 21:26:11 CET 2026
artifactId=user-notification-service artifactId=user-notification-service
groupId=distributed.systems groupId=distributed.systems
version=1.0-SNAPSHOT version=1.0-SNAPSHOT
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment