Commit 2c53725b authored by drnull03's avatar drnull03

Finished user-notification-service

parent c9c0b0df
#
# MIT License
#
# Copyright (c) 2019 Michael Pogrebinsky - Distributed Systems & Cloud Computing with Java
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# Root logger option
log4j.rootLogger=ERROR, stdout
# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss} %-5p %c{1}:%L - %m%n
#
# MIT License
#
# Copyright (c) 2019 Michael Pogrebinsky - Distributed Systems & Cloud Computing with Java
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# Root logger option
log4j.rootLogger=ERROR, stdout
# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss} %-5p %c{1}:%L - %m%n
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ MIT License
~
~ Copyright (c) 2019 Michael Pogrebinsky - Distributed Systems & Cloud Computing with Java
~
~ Permission is hereby granted, free of charge, to any person obtaining a copy
~ of this software and associated documentation files (the "Software"), to deal
~ in the Software without restriction, including without limitation the rights
~ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
~ copies of the Software, and to permit persons to whom the Software is
~ furnished to do so, subject to the following conditions:
~
~ The above copyright notice and this permission notice shall be included in all
~ copies or substantial portions of the Software.
~
~ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
~ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
~ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
~ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
~ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
~ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
~ SOFTWARE.
-->
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>distributed.systems</groupId>
......@@ -34,40 +12,52 @@
<build>
<plugins>
<!-- Java compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<version>3.11.0</version>
<configuration>
<release>11</release>
</configuration>
</plugin>
<!-- Create runnable jar with dependencies -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.6.0</version>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<archive>
<manifest>
<mainClass>Application</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
......@@ -80,6 +70,7 @@
<version>1.7.6</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
......@@ -87,7 +78,6 @@
<scope>runtime</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
......@@ -99,5 +89,7 @@
<artifactId>jackson-databind</artifactId>
<version>2.10.1</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/*
* MIT License
*
* Copyright (c) 2019 Michael Pogrebinsky - Distributed Systems & Cloud Computing with Java
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
......@@ -31,6 +16,7 @@ import java.util.Collections;
import java.util.Properties;
public class Application {
private static final String SUSPICIOUS_TRANSACTIONS_TOPIC = "suspicious-transactions";
private static final String BOOTSTRAP_SERVERS = "localhost:9092,localhost:9093,localhost:9094";
......@@ -45,30 +31,69 @@ public class Application {
}
public static void consumeMessages(String topic, Consumer<String, Transaction> kafkaConsumer) {
/**
* Fill in the code here to subscribe to the right topic.
* Run in a loop and read incoming transactions
* For each new transaction, send a notification to the user
*/
kafkaConsumer.subscribe(Collections.singletonList(topic));
while (true) {
ConsumerRecords<String, Transaction> records = kafkaConsumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, Transaction> record : records) {
Transaction transaction = record.value();
sendUserNotification(transaction);
}
}
}
public static Consumer<String, Transaction> createKafkaConsumer(String bootstrapServers, String consumerGroup) {
Properties properties = new Properties();
/**
* Fill in the code here to configure the rest of the Kafka client parameters
*/
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Transaction.TransactionDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
return new KafkaConsumer<>(properties);
}
private static void sendUserNotification(Transaction transaction) {
System.out.println(
String.format("Sending user %s notification about a suspicious transaction of $%.2f in their account " +
"originating in %s",
String message = String.format(
"Sending user %s notification about a suspicious transaction of $%.2f in their account originating in %s",
transaction.getUser(),
transaction.getAmount(),
transaction.getTransactionLocation()));
transaction.getTransactionLocation()
);
System.out.println(message);
try {
Path notificationsDir = Paths.get("..", "notifications");
if (!Files.exists(notificationsDir)) {
Files.createDirectories(notificationsDir);
}
String fileName = transaction.getUser() + ".txt";
Path userFile = notificationsDir.resolve(fileName);
try (FileWriter writer = new FileWriter(userFile.toFile(), true)) {
writer.write(message + System.lineSeparator());
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
\ No newline at end of file
#Generated by Maven
#Sat Mar 14 21:11:42 CET 2026
artifactId=user-notification-service
groupId=distributed.systems
version=1.0-SNAPSHOT
Transaction.class
Transaction$TransactionDeserializer.class
Application.class
/home/drnull/UNI/adistr/OurBank/user-notification-service/src/main/java/Transaction.java
/home/drnull/UNI/adistr/OurBank/user-notification-service/src/main/java/Application.java
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment