Commit 1fb33502 authored by mohammad.salama's avatar mohammad.salama

initial

parents
# Default ignored files
/shelf/
/workspace.xml
<component name="ArtifactManager">
<artifact type="jar" name="BankingAPI:jar">
<output-path>$PROJECT_DIR$/out/artifacts/BankingAPI_jar</output-path>
<root id="archive" name="BankingAPI.jar">
<element id="module-output" name="BankingAPI" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/springframework/spring-jcl/6.0.13/spring-jcl-6.0.13.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/com/fasterxml/jackson/datatype/jackson-datatype-jdk8/2.15.3/jackson-datatype-jdk8-2.15.3.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/apache/logging/log4j/log4j-api/2.20.0/log4j-api-2.20.0.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/com/fasterxml/jackson/core/jackson-annotations/2.15.3/jackson-annotations-2.15.3.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/com/fasterxml/jackson/module/jackson-module-parameter-names/2.15.3/jackson-module-parameter-names-2.15.3.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/yaml/snakeyaml/1.33/snakeyaml-1.33.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/redis/clients/jedis/3.7.0/jedis-3.7.0.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/apache/kafka/kafka-clients/3.7.0/kafka-clients-3.7.0.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/xerial/snappy/snappy-java/1.1.10.5/snappy-java-1.1.10.5.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/com/fasterxml/jackson/core/jackson-core/2.15.3/jackson-core-2.15.3.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/springframework/boot/spring-boot-autoconfigure/3.1.5/spring-boot-autoconfigure-3.1.5.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/apache/tomcat/embed/tomcat-embed-core/10.1.15/tomcat-embed-core-10.1.15.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/springframework/spring-core/6.0.13/spring-core-6.0.13.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/micrometer/micrometer-observation/1.11.5/micrometer-observation-1.11.5.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/ch/qos/logback/logback-classic/1.4.11/logback-classic-1.4.11.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/springframework/boot/spring-boot-starter-logging/3.1.5/spring-boot-starter-logging-3.1.5.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/springframework/boot/spring-boot-starter/3.1.5/spring-boot-starter-3.1.5.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/jakarta/annotation/jakarta.annotation-api/2.1.1/jakarta.annotation-api-2.1.1.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/com/github/luben/zstd-jni/1.5.5-6/zstd-jni-1.5.5-6.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/slf4j/slf4j-api/2.0.9/slf4j-api-2.0.9.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/springframework/spring-beans/6.0.13/spring-beans-6.0.13.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/springframework/spring-expression/6.0.13/spring-expression-6.0.13.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/springframework/boot/spring-boot-starter-json/3.1.5/spring-boot-starter-json-3.1.5.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/springframework/boot/spring-boot-starter-web/3.1.5/spring-boot-starter-web-3.1.5.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/springframework/spring-web/6.0.13/spring-web-6.0.13.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/apache/logging/log4j/log4j-to-slf4j/2.20.0/log4j-to-slf4j-2.20.0.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/apache/tomcat/embed/tomcat-embed-websocket/10.1.15/tomcat-embed-websocket-10.1.15.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/springframework/spring-context/6.0.13/spring-context-6.0.13.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/slf4j/jul-to-slf4j/2.0.9/jul-to-slf4j-2.0.9.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/springframework/boot/spring-boot-devtools/3.1.5/spring-boot-devtools-3.1.5.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/springframework/boot/spring-boot/3.1.5/spring-boot-3.1.5.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/ch/qos/logback/logback-core/1.4.11/logback-core-1.4.11.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/springframework/spring-aop/6.0.13/spring-aop-6.0.13.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/com/fasterxml/jackson/core/jackson-databind/2.15.3/jackson-databind-2.15.3.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/micrometer/micrometer-commons/1.11.5/micrometer-commons-1.11.5.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/apache/tomcat/embed/tomcat-embed-el/10.1.15/tomcat-embed-el-10.1.15.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/springframework/boot/spring-boot-starter-tomcat/3.1.5/spring-boot-starter-tomcat-3.1.5.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/lz4/lz4-java/1.8.0/lz4-java-1.8.0.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/com/fasterxml/jackson/datatype/jackson-datatype-jsr310/2.15.3/jackson-datatype-jsr310-2.15.3.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/apache/commons/commons-pool2/2.11.1/commons-pool2-2.11.1.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/springframework/spring-webmvc/6.0.13/spring-webmvc-6.0.13.jar" path-in-jar="/" />
</root>
</artifact>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="CompilerConfiguration">
<annotationProcessing>
<profile name="Maven default annotation processors profile" enabled="true">
<sourceOutputDir name="target/generated-sources/annotations" />
<sourceTestOutputDir name="target/generated-test-sources/test-annotations" />
<outputRelativeToContentRoot value="true" />
<module name="BankingAPI" />
</profile>
</annotationProcessing>
</component>
<component name="JavacSettings">
<option name="ADDITIONAL_OPTIONS_OVERRIDE">
<module name="BankingAPI" options="-parameters" />
</option>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Encoding">
<file url="file://$PROJECT_DIR$/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/src/main/resources" charset="UTF-8" />
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="RemoteRepositoriesConfiguration">
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Central Repository" />
<option name="url" value="https://repo.maven.apache.org/maven2" />
</remote-repository>
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Maven Central repository" />
<option name="url" value="https://repo1.maven.org/maven2" />
</remote-repository>
<remote-repository>
<option name="id" value="jboss.community" />
<option name="name" value="JBoss Community repository" />
<option name="url" value="https://repository.jboss.org/nexus/content/repositories/public/" />
</remote-repository>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ExternalStorageConfigurationManager" enabled="true" />
<component name="MavenProjectsManager">
<option name="originalFiles">
<list>
<option value="$PROJECT_DIR$/pom.xml" />
</list>
</option>
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_17" default="true" project-jdk-name="17 (2)" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" />
</component>
</project>
\ No newline at end of file
2024-04-15 14:20:19.419 [restartedMain] INFO com.example.Application - Starting Application using Java 17 with PID 1348 (D:\Second Semester\Advanced DS\Practical\New folder\BankingAPI\target\classes started by dell in D:\Second Semester\Advanced DS\Practical\New folder\BankingAPI)
2024-04-15 14:20:19.424 [restartedMain] DEBUG com.example.Application - Running with Spring Boot v3.1.5, Spring v6.0.13
2024-04-15 14:20:19.426 [restartedMain] INFO com.example.Application - No active profile set, falling back to 1 default profile: "default"
2024-04-15 14:20:24.879 [restartedMain] INFO com.example.Application - Started Application in 6.272 seconds (process running for 7.98)
2024-04-15 14:20:28.271 [http-nio-8080-exec-1] INFO com.example.MyProducer - Registering a transaction for Mr/Ms: Ali
2024-04-15 14:20:29.491 [http-nio-8080-exec-1] INFO com.example.MyProducer - Record for Mr/Ms: Ali - with (key: 1713180028757, value: TransactionInfo{name='Ali', location='jableh', amount=1001.0}), was sent to (partition: 0, offset: 8)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.5</version>
<relativePath/>
</parent>
<groupId>org.example</groupId>
<artifactId>BankingAPI</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.7.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.15.3</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.3</version>
</dependency>
</dependencies>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
</project>
\ No newline at end of file
package com.example;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import redis.clients.jedis.Jedis;
@SpringBootApplication
public class Application
{
static String IP = "localhost";
public static void main(String[] args)
{
if (args.length > 0)IP = args[0];
SpringApplication.run(Application.class, args);
}
@Bean
public Jedis jedis()
{
return new Jedis(IP, 6379);
}
}
\ No newline at end of file
package com.example;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import redis.clients.jedis.Jedis;
@RestController
public class BankingAPI {
private static final String SUSPICIOUS_TOPIC = "SUSPICIOUS_TOPIC";
private static final String VALID_TOPIC = "VALID_TOPIC";
@Autowired
private Jedis jedis;
@PostMapping("/check-transaction")
public String checkTransaction(@RequestBody TransactionInfo transactionInfo) {
String name = transactionInfo.getName();
String location = transactionInfo.getLocation();
double amount = transactionInfo.getAmount();
String jsonObject = jedis.get(name);
ObjectMapper objectMapper = new ObjectMapper();
System.out.println(jsonObject);
String country = "";
try {
MyRecord jsonInfo = objectMapper.readValue(jsonObject, MyRecord.class);
country = jsonInfo.getCountry();
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
System.out.println("country: "+country);
System.out.println("location: "+location);
if (country == null) {
return "User not found in the database.";
} else {
if (country.equals(location)) {
MyProducer.logTransaction(transactionInfo,VALID_TOPIC);
return "Transaction is valid.";
} else {
MyProducer.logTransaction(transactionInfo,SUSPICIOUS_TOPIC);
return "Transaction is suspicious.";
}
}
}
}
\ No newline at end of file
package com.example;
//
//import com.fasterxml.jackson.core.JsonProcessingException;
//import com.fasterxml.jackson.databind.ObjectMapper;
//import org.apache.kafka.clients.producer.*;
//import org.apache.kafka.common.serialization.LongSerializer;
//import org.apache.kafka.common.serialization.StringSerializer;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
//
//import java.util.Properties;
//import java.util.concurrent.ExecutionException;
//
//public class MyProducer
//{
// //private static final String TOPIC = "";
// private static final String BOOTSTRAP_SERVERS = "192.168.181.136:9092";
// private static final Logger log = LoggerFactory.getLogger(MyProducer.class);
// public static boolean logTransaction(TransactionInfo transactionInfo , String topic)
// {
//
// log.info("Registering A Transaction for Mr/Ms : " + transactionInfo.getName());
// ObjectMapper objectMapper = new ObjectMapper();
//
// String value = null;
// try
// {
// value = objectMapper.writeValueAsString(transactionInfo);
// }
// catch (JsonProcessingException e)
// {
// log.error("Transaction for Mr/Ms : " + transactionInfo.getName() + " Could Not Be Processed To Json");
// return false;
// }
//
// Producer<Long, String> kafkaProducer = createKafkaProducer(BOOTSTRAP_SERVERS);
//
// long timeStamp = System.currentTimeMillis();
//
// ProducerRecord<Long , String> record = new ProducerRecord<>(topic , timeStamp ,value);
//
// RecordMetadata recordMetadata = null;
// try
// {
// recordMetadata = kafkaProducer.send(record).get();
// }
// catch (InterruptedException | ExecutionException e)
// {
// log.error("Could Not Send Data to Kafka , Or Could Not Get Meta-Data");
// return false;
// }
//
//
// log.info((String.format("Record for Mr/Ms : %s - with (key: %s, value: %s), " +
// "was sent to (partition: %d, offset: %d)",
// transactionInfo.getName(),record.key(),
// record.value(), recordMetadata.partition(),
// recordMetadata.offset())));
//
// System.out.println((String.format("Record for Mr/Ms : %s - with (key: %s, value: %s), " +
// "was sent to (partition: %d, offset: %d)",
// transactionInfo.getName(),record.key(),
// record.value(), recordMetadata.partition(),
// recordMetadata.offset())));
// kafkaProducer.flush();
// kafkaProducer.close();
// return true;
// }
//
// public static Producer<Long, String> createKafkaProducer(String bootstrapServers) {
// Properties properties = new Properties();
// properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// properties.put(ProducerConfig.CLIENT_ID_CONFIG, "events-producer");
// properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
// properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//
// return new KafkaProducer<>(properties);
// }
//
//}
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class MyProducer {
private static final String BOOTSTRAP_SERVERS = "192.168.181.136:9092";
private static final Logger log = LoggerFactory.getLogger(MyProducer.class);
public static final ObjectMapper objectMapper = new ObjectMapper();
public static boolean logTransaction(TransactionInfo transactionInfo, String topic) {
log.info("Registering a transaction for Mr/Ms: " + transactionInfo.getName());
Serializer<TransactionInfo> valueSerializer = new TransactionInfoSerializer();
byte[] valueBytes;
try {
valueBytes = valueSerializer.serialize(topic, transactionInfo);
} catch (SerializationException e) {
log.error("Transaction for Mr/Ms: " + transactionInfo.getName() + " could not be serialized");
return false;
}
Producer<Long, TransactionInfo> kafkaProducer = createKafkaProducer(BOOTSTRAP_SERVERS);
long timeStamp = System.currentTimeMillis();
ProducerRecord<Long, TransactionInfo> record = new ProducerRecord<>(topic, timeStamp, transactionInfo);
RecordMetadata recordMetadata;
try {
recordMetadata = kafkaProducer.send(record).get();
} catch (InterruptedException | ExecutionException e) {
log.error("Could not send data to Kafka or could not get metadata");
return false;
}
log.info(String.format("Record for Mr/Ms: %s - with (key: %s, value: %s), " +
"was sent to (partition: %d, offset: %d)",
transactionInfo.getName(), record.key(),
record.value().toString(), recordMetadata.partition(),
recordMetadata.offset()));
System.out.println(String.format("Record for Mr/Ms: %s - with (key: %s, value: %s), " +
"was sent to (partition: %d, offset: %d)",
transactionInfo.getName(), record.key(),
record.value().toString(), recordMetadata.partition(),
recordMetadata.offset()));
kafkaProducer.flush();
kafkaProducer.close();
return true;
}
public static Producer<Long, TransactionInfo> createKafkaProducer(String bootstrapServers) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "events-producer");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, TransactionInfoSerializer.class.getName());
return new KafkaProducer<>(properties);
}
}
package com.example;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
public class MyRecord
{
/*SET JohnDoe "{\"id\": 1, \"name\": \"John Doe\", \"email\": \"johndoe@example.com\"}"*/
private String country;
private Double balance;
private Integer warnings;
public MyRecord(){}
/*
data base key = our user name
*/
public MyRecord(String country , double balance , int warnings)
{
this.balance = balance;
this.country = country;
this.warnings = warnings;
}
@Override
public String toString()
{
String ans = null;
ObjectMapper objectMapper = new ObjectMapper();
try
{
ans = objectMapper.writeValueAsString(this);
}
catch (JsonProcessingException e)
{
throw new RuntimeException(e);
}
finally {return ans;}
}
public boolean modifyBalance(String jsonData , double amount)
{
try
{
MyRecord myRecord = reverseJson(jsonData);
if (myRecord.balance >= amount)
{
myRecord.balance -= amount;
return true;
}
return false;
}
catch (JsonProcessingException e)
{
throw new RuntimeException(e);
}
}
public String getCountry() {
return country;
}
public void setCountry(String country) {
this.country = country;
}
public Double getBalance() {
return balance;
}
public void setBalance(Double balance) {
this.balance = balance;
}
public Integer getWarnings() {
return warnings;
}
public void setWarnings(Integer warnings) {
this.warnings = warnings;
}
private MyRecord reverseJson(String json) throws JsonProcessingException
{
ObjectMapper objectMapper = new ObjectMapper();
MyRecord myRecord = objectMapper.readValue(json , MyRecord.class);
return myRecord;
}
}
package com.example;
import java.io.Serializable;
public class TransactionInfo implements Serializable {
private String name;
private String location;
private double amount;
public TransactionInfo()
{}
public TransactionInfo(String name, String location, double amount) {
this.name = name;
this.location = location;
this.amount = amount;
}
public TransactionInfo(String location, double amount) {
this.location = location;
this.amount = amount;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getLocation() {
return location;
}
public void setLocation(String location) {
this.location = location;
}
public double getAmount() {
return amount;
}
public void setAmount(double amount) {
this.amount = amount;
}
@Override
public String toString() {
return "TransactionInfo{" +
"name='" + name + '\'' +
", location='" + location + '\'' +
", amount=" + amount +
'}';
}
}
\ No newline at end of file
package com.example;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import java.nio.charset.StandardCharsets;
public class TransactionInfoSerializer implements Serializer<TransactionInfo> {
@Override
public byte[] serialize(String topic, TransactionInfo data) {
try {
return MyProducer.objectMapper.writeValueAsBytes(data);
} catch (JsonProcessingException e) {
throw new SerializationException("Error serializing TransactionInfo", e);
}
}
}
Manifest-Version: 1.0
Main-Class: com.example.Application
spring.redis.host=localhost
spring.redis.port=6379
spring.kafka.bootstrap-servers=localhost:9092
<configuration>
<appender name="FILE" class="ch.qos.logback.core.FileAppender">
<file>logs/application.log</file>
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<logger name="com.example" level="DEBUG" additivity="false">
<appender-ref ref="FILE"/>
</logger>
<root level="error">
<appender-ref ref="FILE" />
</root>
</configuration>
\ No newline at end of file
Manifest-Version: 1.0
Main-Class: com.example.Application
spring.redis.host=localhost
spring.redis.port=6379
spring.kafka.bootstrap-servers=localhost:9092
<configuration>
<appender name="FILE" class="ch.qos.logback.core.FileAppender">
<file>logs/application.log</file>
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<logger name="com.example" level="DEBUG" additivity="false">
<appender-ref ref="FILE"/>
</logger>
<root level="error">
<appender-ref ref="FILE" />
</root>
</configuration>
\ No newline at end of file
<component name="ArtifactManager">
<artifact type="jar" name="Consumer-Account-Manager:jar">
<output-path>$PROJECT_DIR$/out/artifacts/Consumer_Account_Manager_jar</output-path>
<root id="archive" name="Consumer-Account-Manager.jar">
<element id="module-output" name="Consumer-Account-Manager" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/xerial/snappy/snappy-java/1.1.10.5/snappy-java-1.1.10.5.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/lz4/lz4-java/1.8.0/lz4-java-1.8.0.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/slf4j/slf4j-api/1.7.36/slf4j-api-1.7.36.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/ch/qos/logback/logback-core/1.2.10/logback-core-1.2.10.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/apache/kafka/kafka-clients/3.7.0/kafka-clients-3.7.0.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/com/github/luben/zstd-jni/1.5.5-6/zstd-jni-1.5.5-6.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/redis/clients/jedis/3.7.0/jedis-3.7.0.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/ch/qos/logback/logback-classic/1.2.10/logback-classic-1.2.10.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/com/fasterxml/jackson/core/jackson-annotations/2.12.1/jackson-annotations-2.12.1.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/com/fasterxml/jackson/core/jackson-databind/2.12.1/jackson-databind-2.12.1.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/com/fasterxml/jackson/core/jackson-core/2.12.1/jackson-core-2.12.1.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/apache/commons/commons-pool2/2.10.0/commons-pool2-2.10.0.jar" path-in-jar="/" />
</root>
</artifact>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="CompilerConfiguration">
<annotationProcessing>
<profile name="Maven default annotation processors profile" enabled="true">
<sourceOutputDir name="target/generated-sources/annotations" />
<sourceTestOutputDir name="target/generated-test-sources/test-annotations" />
<outputRelativeToContentRoot value="true" />
<module name="Consumer" />
<module name="Consumer-Account-Manager" />
<module name="Consumer-ِAccount-Manager" />
</profile>
</annotationProcessing>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Encoding">
<file url="file://$PROJECT_DIR$/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/src/main/resources" charset="UTF-8" />
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="RemoteRepositoriesConfiguration">
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Central Repository" />
<option name="url" value="https://repo.maven.apache.org/maven2" />
</remote-repository>
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Maven Central repository" />
<option name="url" value="https://repo1.maven.org/maven2" />
</remote-repository>
<remote-repository>
<option name="id" value="jboss.community" />
<option name="name" value="JBoss Community repository" />
<option name="url" value="https://repository.jboss.org/nexus/content/repositories/public/" />
</remote-repository>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ExternalStorageConfigurationManager" enabled="true" />
<component name="MavenProjectsManager">
<option name="originalFiles">
<list>
<option value="$PROJECT_DIR$/pom.xml" />
</list>
</option>
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_17" default="true" project-jdk-name="17 (2)" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" />
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Palette2">
<group name="Swing">
<item class="com.intellij.uiDesigner.HSpacer" tooltip-text="Horizontal Spacer" icon="/com/intellij/uiDesigner/icons/hspacer.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="1" hsize-policy="6" anchor="0" fill="1" />
</item>
<item class="com.intellij.uiDesigner.VSpacer" tooltip-text="Vertical Spacer" icon="/com/intellij/uiDesigner/icons/vspacer.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="1" anchor="0" fill="2" />
</item>
<item class="javax.swing.JPanel" icon="/com/intellij/uiDesigner/icons/panel.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="3" hsize-policy="3" anchor="0" fill="3" />
</item>
<item class="javax.swing.JScrollPane" icon="/com/intellij/uiDesigner/icons/scrollPane.svg" removable="false" auto-create-binding="false" can-attach-label="true">
<default-constraints vsize-policy="7" hsize-policy="7" anchor="0" fill="3" />
</item>
<item class="javax.swing.JButton" icon="/com/intellij/uiDesigner/icons/button.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="3" anchor="0" fill="1" />
<initial-values>
<property name="text" value="Button" />
</initial-values>
</item>
<item class="javax.swing.JRadioButton" icon="/com/intellij/uiDesigner/icons/radioButton.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="3" anchor="8" fill="0" />
<initial-values>
<property name="text" value="RadioButton" />
</initial-values>
</item>
<item class="javax.swing.JCheckBox" icon="/com/intellij/uiDesigner/icons/checkBox.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="3" anchor="8" fill="0" />
<initial-values>
<property name="text" value="CheckBox" />
</initial-values>
</item>
<item class="javax.swing.JLabel" icon="/com/intellij/uiDesigner/icons/label.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="0" anchor="8" fill="0" />
<initial-values>
<property name="text" value="Label" />
</initial-values>
</item>
<item class="javax.swing.JTextField" icon="/com/intellij/uiDesigner/icons/textField.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1">
<preferred-size width="150" height="-1" />
</default-constraints>
</item>
<item class="javax.swing.JPasswordField" icon="/com/intellij/uiDesigner/icons/passwordField.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1">
<preferred-size width="150" height="-1" />
</default-constraints>
</item>
<item class="javax.swing.JFormattedTextField" icon="/com/intellij/uiDesigner/icons/formattedTextField.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1">
<preferred-size width="150" height="-1" />
</default-constraints>
</item>
<item class="javax.swing.JTextArea" icon="/com/intellij/uiDesigner/icons/textArea.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JTextPane" icon="/com/intellij/uiDesigner/icons/textPane.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JEditorPane" icon="/com/intellij/uiDesigner/icons/editorPane.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JComboBox" icon="/com/intellij/uiDesigner/icons/comboBox.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="2" anchor="8" fill="1" />
</item>
<item class="javax.swing.JTable" icon="/com/intellij/uiDesigner/icons/table.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JList" icon="/com/intellij/uiDesigner/icons/list.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="2" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JTree" icon="/com/intellij/uiDesigner/icons/tree.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JTabbedPane" icon="/com/intellij/uiDesigner/icons/tabbedPane.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="3" hsize-policy="3" anchor="0" fill="3">
<preferred-size width="200" height="200" />
</default-constraints>
</item>
<item class="javax.swing.JSplitPane" icon="/com/intellij/uiDesigner/icons/splitPane.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="3" hsize-policy="3" anchor="0" fill="3">
<preferred-size width="200" height="200" />
</default-constraints>
</item>
<item class="javax.swing.JSpinner" icon="/com/intellij/uiDesigner/icons/spinner.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1" />
</item>
<item class="javax.swing.JSlider" icon="/com/intellij/uiDesigner/icons/slider.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1" />
</item>
<item class="javax.swing.JSeparator" icon="/com/intellij/uiDesigner/icons/separator.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3" />
</item>
<item class="javax.swing.JProgressBar" icon="/com/intellij/uiDesigner/icons/progressbar.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="0" fill="1" />
</item>
<item class="javax.swing.JToolBar" icon="/com/intellij/uiDesigner/icons/toolbar.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="0" fill="1">
<preferred-size width="-1" height="20" />
</default-constraints>
</item>
<item class="javax.swing.JToolBar$Separator" icon="/com/intellij/uiDesigner/icons/toolbarSeparator.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="0" anchor="0" fill="1" />
</item>
<item class="javax.swing.JScrollBar" icon="/com/intellij/uiDesigner/icons/scrollbar.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="0" anchor="0" fill="2" />
</item>
</group>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$/.." vcs="Git" />
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ArtifactsWorkspaceSettings">
<artifacts-to-build>
<artifact name="Consumer-Account-Manager:jar" />
</artifacts-to-build>
</component>
<component name="AutoImportSettings">
<option name="autoReloadType" value="SELECTIVE" />
</component>
<component name="ChangeListManager">
<list default="true" id="68795978-b1a2-4702-95fa-e8e7afae617a" name="Changes" comment="" />
<option name="SHOW_DIALOG" value="false" />
<option name="HIGHLIGHT_CONFLICTS" value="true" />
<option name="HIGHLIGHT_NON_ACTIVE_CHANGELIST" value="false" />
<option name="LAST_RESOLUTION" value="IGNORE" />
</component>
<component name="FileTemplateManagerImpl">
<option name="RECENT_TEMPLATES">
<list>
<option value="Class" />
</list>
</option>
</component>
<component name="Git.Settings">
<option name="RECENT_GIT_ROOT_PATH" value="$PROJECT_DIR$/.." />
</component>
<component name="MarkdownSettingsMigration">
<option name="stateVersion" value="1" />
</component>
<component name="ProjectId" id="2e2xOWSyAwlrJgJsmMdML8arM85" />
<component name="ProjectLevelVcsManager" settingsEditedManually="true" />
<component name="ProjectViewState">
<option name="hideEmptyMiddlePackages" value="true" />
<option name="showLibraryContents" value="true" />
</component>
<component name="PropertiesComponent"><![CDATA[{
"keyToString": {
"RunOnceActivity.OpenProjectViewOnStart": "true",
"RunOnceActivity.ShowReadmeOnStart": "true",
"SHARE_PROJECT_CONFIGURATION_FILES": "true",
"last_opened_file_path": "D:/Second Semester/Advanced DS/Practical/New folder/Consumer-User-Notification",
"project.structure.last.edited": "Artifacts",
"project.structure.proportion": "0.0",
"project.structure.side.proportion": "0.2",
"settings.editor.selected.configurable": "project.propVCSSupport.DirectoryMappings"
}
}]]></component>
<component name="RecentsManager">
<key name="CopyFile.RECENT_KEYS">
<recent name="D:\HIAST\FIY\SS\Advanced Distributed Systems\Lab\4\java-kafka-consumer\Consumer\src\main\resources" />
</key>
<key name="CopyClassDialog.RECENTS_KEY">
<recent name="org.example" />
</key>
</component>
<component name="SpellCheckerSettings" RuntimeDictionaries="0" Folders="0" CustomDictionaries="0" DefaultDictionary="application-level" UseSingleDictionary="true" transferred="true" />
<component name="TaskManager">
<task active="true" id="Default" summary="Default task">
<changelist id="68795978-b1a2-4702-95fa-e8e7afae617a" name="Changes" comment="" />
<created>1711116444789</created>
<option name="number" value="Default" />
<option name="presentableId" value="Default" />
<updated>1711116444789</updated>
</task>
<servers />
</component>
<component name="Vcs.Log.Tabs.Properties">
<option name="TAB_STATES">
<map>
<entry key="MAIN">
<value>
<State />
</value>
</entry>
</map>
</option>
</component>
</project>
\ No newline at end of file
2024-03-24 13:03:47,981 INFO org.example.AccountManagerKafkaConsumer [main] Name : Abd --- Location of Transaction : hama --- amount of transaction : 2542.2
2024-03-24 13:05:54,114 INFO org.example.AccountManagerKafkaConsumer [main] Name : Abd --- Location of Transaction : hama --- amount of transaction : 2542.2
2024-03-24 13:08:37,822 INFO org.example.AccountManagerKafkaConsumer [main] Name : Ali --- Location of Transaction : jableh --- amount of transaction : 2542.2
2024-04-15 13:37:07,663 INFO org.example.AccountManagerKafkaConsumer [main] Name : Ali --- Location of Transaction : jableh --- amount of transaction : 1001.0
2024-04-15 13:37:09,803 ERROR org.example.AccountManagerKafkaConsumer [main] Failed to process transaction: org.example.TransactionInfo@15b986cd
redis.clients.jedis.exceptions.JedisConnectionException: Failed to create socket.
at redis.clients.jedis.DefaultJedisSocketFactory.createSocket(DefaultJedisSocketFactory.java:110)
at redis.clients.jedis.Connection.connect(Connection.java:226)
at redis.clients.jedis.BinaryClient.connect(BinaryClient.java:140)
at redis.clients.jedis.Connection.sendCommand(Connection.java:163)
at redis.clients.jedis.BinaryClient.get(BinaryClient.java:207)
at redis.clients.jedis.Client.get(Client.java:95)
at redis.clients.jedis.Jedis.get(Jedis.java:258)
at org.example.DataBaseHandler.editAmount(DataBaseHandler.java:15)
at org.example.AccountManagerKafkaConsumer.consumeMessages(AccountManagerKafkaConsumer.java:54)
at org.example.AccountManagerKafkaConsumer.main(AccountManagerKafkaConsumer.java:31)
Caused by: java.net.SocketTimeoutException: Connect timed out
at java.base/sun.nio.ch.NioSocketImpl.timedFinishConnect(NioSocketImpl.java:546)
at java.base/sun.nio.ch.NioSocketImpl.connect(NioSocketImpl.java:597)
at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:327)
at java.base/java.net.Socket.connect(Socket.java:633)
at redis.clients.jedis.DefaultJedisSocketFactory.createSocket(DefaultJedisSocketFactory.java:80)
... 9 common frames omitted
2024-04-15 13:38:20,594 INFO org.example.AccountManagerKafkaConsumer [main] Name : Ali --- Location of Transaction : jableh --- amount of transaction : 1001.0
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>Consumer</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.12.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.7.0</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.7.0</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.2.10</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.10</version>
</dependency>
</dependencies>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
</project>
\ No newline at end of file
package org.example;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class AccountManagerKafkaConsumer {
private static final String TOPIC = "VALID_TOPIC";
private static final String BOOTSTRAP_SERVERS = "192.168.181.136:9092,localhost:9093,localhost:9094";
public static final Logger log = LoggerFactory.getLogger(AccountManagerKafkaConsumer.class);
public static void main(String[] args) {
String consumerGroup = "defaultConsumerGroup";//"ValidTransactionsGroup";
if (args.length == 1) {
consumerGroup = args[0];
}
System.out.println("Consumer is part of consumer group " + consumerGroup);
Consumer<Long, TransactionInfo> kafkaConsumer = createKafkaConsumer(BOOTSTRAP_SERVERS, consumerGroup);
consumeMessages(TOPIC, kafkaConsumer);
}
public static void consumeMessages(String topic, Consumer<Long, TransactionInfo> kafkaConsumer) {
ObjectMapper objectMapper = new ObjectMapper();
kafkaConsumer.subscribe(Collections.singletonList(topic));
while (true) {
ConsumerRecords<Long, TransactionInfo> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
if (consumerRecords.isEmpty()) {
continue;
}
for (ConsumerRecord<Long, TransactionInfo> record : consumerRecords) {
TransactionInfo transactionInfo = record.value();
System.out.println("Key: " + record.key());
System.out.println("Received: " + transactionInfo.toString());
try {
WritingHelper writingHelper = new WritingHelper(transactionInfo);
writingHelper.writeToLog();
boolean success = DataBaseHandler.editAmount(transactionInfo.getName(), transactionInfo.getAmount());
if (!success) {
log.warn(transactionInfo.getName() + ": Insufficient balance to withdraw amount");
}
System.out.println(writingHelper);
} catch (Exception e) {
System.out.println("Error processing transaction");
System.out.println(e.getMessage());
log.error("Failed to process transaction: " + transactionInfo.toString(), e);
}
}
kafkaConsumer.commitAsync();
}
}
public static Consumer<Long, TransactionInfo> createKafkaConsumer(String bootstrapServers, String consumerGroup) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, TransactionInfoDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return new KafkaConsumer<>(properties);
}
}
\ No newline at end of file
package org.example;
import redis.clients.jedis.Jedis;
public class DataBaseHandler
{
private static final String REDIS_HOST = "localhost";
private static final int REDIS_PORT = 6379;
private static final String BALANCE_FIELD = "balance";
private static Jedis jedis = new Jedis(REDIS_HOST, REDIS_PORT);
public static boolean editAmount(String name , double amount)
{
String jsonData = jedis.get(name);
MyRecord myRecord = new MyRecord();
boolean result = myRecord.modifyBalance(jsonData , amount);
jsonData = myRecord.toString();
if (jsonData == null) return false;
jedis.set(name , jsonData);
return result;
}
}
package org.example;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
public class MyRecord
{
/*SET JohnDoe "{\"id\": 1, \"name\": \"John Doe\", \"email\": \"johndoe@example.com\"}"*/
private String country;
private Double balance;
private Integer warnings;
public MyRecord(){}
/*
data base key = our user name
*/
public MyRecord(String country , double balance , int warnings)
{
this.balance = balance;
this.country = country;
this.warnings = warnings;
}
@Override
public String toString()
{
String ans = null;
ObjectMapper objectMapper = new ObjectMapper();
try
{
ans = objectMapper.writeValueAsString(this);
}
catch (JsonProcessingException e)
{
throw new RuntimeException(e);
}
finally {return ans;}
}
public boolean modifyBalance(String jsonData , double amount)
{
try
{
MyRecord myRecord = reverseJson(jsonData);
if (myRecord.balance >= amount)
{
myRecord.balance -= amount;
return true;
}
return false;
}
catch (JsonProcessingException e)
{
throw new RuntimeException(e);
}
}
private MyRecord reverseJson(String json) throws JsonProcessingException
{
ObjectMapper objectMapper = new ObjectMapper();
MyRecord myRecord = objectMapper.readValue(json , MyRecord.class);
return myRecord;
}
public String getCountry() {
return country;
}
public void setCountry(String country) {
this.country = country;
}
public Double getBalance() {
return balance;
}
public void setBalance(Double balance) {
this.balance = balance;
}
public Integer getWarnings() {
return warnings;
}
public void setWarnings(Integer warnings) {
this.warnings = warnings;
}
}
package org.example;
import java.io.Serializable;
public class TransactionInfo implements Serializable
{
private String name;
private String location;
private double amount;
public TransactionInfo()
{}
public TransactionInfo(String name , String location , double amount)
{
this.amount = amount;
this.name = name;
this.location = location;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getLocation() {
return location;
}
public void setLocation(String location) {
this.location = location;
}
public double getAmount() {
return amount;
}
public void setAmount(double amount)
{
this.amount = amount;
}
@Override
public String toString() {
return "TransactionInfo{" +
"name='" + name + '\'' +
", location='" + location + '\'' +
", amount=" + amount +
'}';
}
}
\ No newline at end of file
package org.example;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;
import java.io.IOException;
public class TransactionInfoDeserializer implements Deserializer<TransactionInfo> {
private ObjectMapper objectMapper = new ObjectMapper();
@Override
public TransactionInfo deserialize(String topic, byte[] data) {
try {
return objectMapper.readValue(data, TransactionInfo.class);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
\ No newline at end of file
package org.example;
public class WritingHelper
{
String name;
String location;
Double amount;
public WritingHelper(TransactionInfo transactionInfo)
{
name = transactionInfo.getName();
location = transactionInfo.getLocation();
amount = transactionInfo.getAmount();
}
@Override
public String toString()
{
String ans = "";
Double x = amount;
ans += "Name : " + name + " --- Location of Transaction : " + location
+ " --- amount of transaction : " + x.toString();
return ans;
}
public void writeToLog()
{
String ans = toString();
AccountManagerKafkaConsumer.log.info(ans);
}
}
Manifest-Version: 1.0
Main-Class: org.example.AccountManagerKafkaConsumer
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property name="HOME_LOG" value="./myLogs/AccountManager-Log"/>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<layout class="ch.qos.logback.classic.PatternLayout">
<Pattern>
%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n
</Pattern>
</layout>
</appender>
<appender name="FILE" class="ch.qos.logback.core.FileAppender">
<file>${HOME_LOG}</file>
<append>true</append>
<immediateFlush>true</immediateFlush>
<encoder>
<pattern>%d %p %c{1.} [%t] %m%n</pattern>
</encoder>
</appender>
<logger name="org.example" level="DEBUG" additivity="false">
<appender-ref ref="CONSOLE"/>
<appender-ref ref="FILE"/>
</logger>
<root level="error">
<appender-ref ref="FILE"/>
</root>
</configuration>
\ No newline at end of file
Manifest-Version: 1.0
Main-Class: org.example.AccountManagerKafkaConsumer
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property name="HOME_LOG" value="./myLogs/AccountManager-Log"/>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<layout class="ch.qos.logback.classic.PatternLayout">
<Pattern>
%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n
</Pattern>
</layout>
</appender>
<appender name="FILE" class="ch.qos.logback.core.FileAppender">
<file>${HOME_LOG}</file>
<append>true</append>
<immediateFlush>true</immediateFlush>
<encoder>
<pattern>%d %p %c{1.} [%t] %m%n</pattern>
</encoder>
</appender>
<logger name="org.example" level="DEBUG" additivity="false">
<appender-ref ref="CONSOLE"/>
<appender-ref ref="FILE"/>
</logger>
<root level="error">
<appender-ref ref="FILE"/>
</root>
</configuration>
\ No newline at end of file
# Default ignored files
/shelf/
/workspace.xml
<component name="ArtifactManager">
<artifact type="jar" name="Consumer-Reporting:jar">
<output-path>$PROJECT_DIR$/out/artifacts/Consumer_Reporting_jar</output-path>
<root id="archive" name="Consumer-Reporting.jar">
<element id="module-output" name="Consumer-Reporting" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/com/github/luben/zstd-jni/1.5.5-6/zstd-jni-1.5.5-6.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/com/fasterxml/jackson/core/jackson-databind/2.12.1/jackson-databind-2.12.1.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/com/fasterxml/jackson/core/jackson-core/2.12.1/jackson-core-2.12.1.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/apache/kafka/kafka-clients/3.7.0/kafka-clients-3.7.0.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/xerial/snappy/snappy-java/1.1.10.5/snappy-java-1.1.10.5.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/slf4j/slf4j-api/1.7.36/slf4j-api-1.7.36.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/com/fasterxml/jackson/core/jackson-annotations/2.12.1/jackson-annotations-2.12.1.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/ch/qos/logback/logback-core/1.2.10/logback-core-1.2.10.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/lz4/lz4-java/1.8.0/lz4-java-1.8.0.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/ch/qos/logback/logback-classic/1.2.10/logback-classic-1.2.10.jar" path-in-jar="/" />
</root>
</artifact>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="CompilerConfiguration">
<annotationProcessing>
<profile name="Maven default annotation processors profile" enabled="true">
<sourceOutputDir name="target/generated-sources/annotations" />
<sourceTestOutputDir name="target/generated-test-sources/test-annotations" />
<outputRelativeToContentRoot value="true" />
<module name="Consumer-Reporting" />
</profile>
</annotationProcessing>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Encoding">
<file url="file://$PROJECT_DIR$/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/src/main/resources" charset="UTF-8" />
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="RemoteRepositoriesConfiguration">
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Central Repository" />
<option name="url" value="https://repo.maven.apache.org/maven2" />
</remote-repository>
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Maven Central repository" />
<option name="url" value="https://repo1.maven.org/maven2" />
</remote-repository>
<remote-repository>
<option name="id" value="jboss.community" />
<option name="name" value="JBoss Community repository" />
<option name="url" value="https://repository.jboss.org/nexus/content/repositories/public/" />
</remote-repository>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ExternalStorageConfigurationManager" enabled="true" />
<component name="MavenProjectsManager">
<option name="originalFiles">
<list>
<option value="$PROJECT_DIR$/pom.xml" />
</list>
</option>
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_17" default="true" project-jdk-name="17 (2)" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" />
</component>
</project>
\ No newline at end of file
2024-04-15 13:29:51,016 INFO org.example.ReportingKafkaConsumer [main] Topic: VALID_TOPIC --- Name : Ali --- Location of Transaction : jableh --- amount of transaction : 1001.0
2024-04-15 13:30:19,361 INFO org.example.ReportingKafkaConsumer [main] Topic: SUSPICIOUS_TOPIC --- Name : Abd --- Location of Transaction : jableh --- amount of transaction : 1001.0
2024-04-15 13:36:59,856 INFO org.example.ReportingKafkaConsumer [main] Topic: SUSPICIOUS_TOPIC --- Name : Abd --- Location of Transaction : jableh --- amount of transaction : 1001.0
2024-04-15 13:37:07,400 INFO org.example.ReportingKafkaConsumer [main] Topic: VALID_TOPIC --- Name : Ali --- Location of Transaction : jableh --- amount of transaction : 1001.0
2024-04-15 13:38:19,102 INFO org.example.ReportingKafkaConsumer [main] Topic: VALID_TOPIC --- Name : Ali --- Location of Transaction : jableh --- amount of transaction : 1001.0
2024-04-15 13:43:58,654 INFO org.example.ReportingKafkaConsumer [main] Topic: SUSPICIOUS_TOPIC --- Name : Abd --- Location of Transaction : jableh --- amount of transaction : 1001.0
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>Consumer-Reporting</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.12.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.7.0</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.2.10</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.10</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package org.example;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.clients.consumer.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
public class ReportingKafkaConsumer {
private static List<String> TOPICS = Arrays.asList("VALID_TOPIC", "SUSPICIOUS_TOPIC");
private static final String BOOTSTRAP_SERVERS = "192.168.181.136:9092,localhost:9093,localhost:9094";
static final Logger log = LoggerFactory.getLogger(ReportingKafkaConsumer.class);
public static void main(String[] args) {
String consumerGroup = "defaultAliConsumerGroup";
if (args.length == 1) {
consumerGroup = args[0];
}
System.out.println("Consumer is part of consumer group " + consumerGroup);
Consumer<Long, TransactionInfo> kafkaConsumer = createKafkaConsumer(BOOTSTRAP_SERVERS, consumerGroup);
System.out.println("created successfully");
consumeMessages(TOPICS, kafkaConsumer);
}
public static void consumeMessages(List<String> topics, Consumer<Long, TransactionInfo> kafkaConsumer) {
ObjectMapper objectMapper = new ObjectMapper();
kafkaConsumer.subscribe(topics);
System.out.println("subscribed successfully");
while (true) {
ConsumerRecords<Long, TransactionInfo> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
if (consumerRecords.isEmpty()) {
continue;
}
for (ConsumerRecord<Long, TransactionInfo> record : consumerRecords) {
TransactionInfo transactionInfo = record.value();
String topic = record.topic();
System.out.println("key : " + record.key());
System.out.println("Received : " + transactionInfo.toString());
try {
WritingHelper writingHelper = new WritingHelper(transactionInfo, topic);
System.out.println(writingHelper);
writingHelper.writeToLog();
} catch (Exception e) {
System.out.println("Error Parsing");
System.out.println(e.getMessage());
log.error("Could Not Process Transaction: " + transactionInfo.toString());
}
}
kafkaConsumer.commitAsync();
}
}
public static Consumer<Long, TransactionInfo> createKafkaConsumer(String bootstrapServers, String consumerGroup) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, TransactionInfoDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return new KafkaConsumer<>(properties);
}
}
package org.example;
import java.io.Serializable;
public class TransactionInfo implements Serializable
{
private String name;
private String location;
private double amount;
public TransactionInfo()
{}
public TransactionInfo(String name , String location , double amount)
{
this.amount = amount;
this.name = name;
this.location = location;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getLocation() {
return location;
}
public void setLocation(String location) {
this.location = location;
}
public double getAmount() {
return amount;
}
public void setAmount(double amount)
{
this.amount = amount;
}
}
\ No newline at end of file
package org.example;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;
import java.io.IOException;
public class TransactionInfoDeserializer implements Deserializer<TransactionInfo> {
private ObjectMapper objectMapper = new ObjectMapper();
@Override
public TransactionInfo deserialize(String topic, byte[] data) {
try {
return objectMapper.readValue(data, TransactionInfo.class);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
\ No newline at end of file
package org.example;
public class WritingHelper
{
String topic;
String name;
String location;
Double amount;
public WritingHelper(TransactionInfo transactionInfo, String topic)
{
this.topic = topic;
name = transactionInfo.getName();
location = transactionInfo.getLocation();
amount = transactionInfo.getAmount();
}
@Override
public String toString()
{
String ans = "";
Double x = amount;
ans += "Topic: " + topic + " --- Name : " + name + " --- Location of Transaction : " + location
+ " --- amount of transaction : " + x.toString();
return ans;
}
public void writeToLog()
{
String ans = toString();
ReportingKafkaConsumer.log.info(ans);
}
}
Manifest-Version: 1.0
Main-Class: org.example.ReportingKafkaConsumer
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property name="HOME_LOG" value="./myLogs/Reporting-Log"/>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<layout class="ch.qos.logback.classic.PatternLayout">
<Pattern>
%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n
</Pattern>
</layout>
</appender>
<appender name="FILE" class="ch.qos.logback.core.FileAppender">
<file>${HOME_LOG}</file>
<append>true</append>
<immediateFlush>true</immediateFlush>
<encoder>
<pattern>%d %p %c{1.} [%t] %m%n</pattern>
</encoder>
</appender>
<logger name="org.example" level="DEBUG" additivity="false">
<appender-ref ref="CONSOLE"/>
<appender-ref ref="FILE"/>
</logger>
<root level="error">
<appender-ref ref="FILE"/>
</root>
</configuration>
\ No newline at end of file
Manifest-Version: 1.0
Main-Class: org.example.ReportingKafkaConsumer
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property name="HOME_LOG" value="./myLogs/Reporting-Log"/>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<layout class="ch.qos.logback.classic.PatternLayout">
<Pattern>
%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n
</Pattern>
</layout>
</appender>
<appender name="FILE" class="ch.qos.logback.core.FileAppender">
<file>${HOME_LOG}</file>
<append>true</append>
<immediateFlush>true</immediateFlush>
<encoder>
<pattern>%d %p %c{1.} [%t] %m%n</pattern>
</encoder>
</appender>
<logger name="org.example" level="DEBUG" additivity="false">
<appender-ref ref="CONSOLE"/>
<appender-ref ref="FILE"/>
</logger>
<root level="error">
<appender-ref ref="FILE"/>
</root>
</configuration>
\ No newline at end of file
<component name="ArtifactManager">
<artifact type="jar" name="Consumer:jar">
<output-path>$PROJECT_DIR$/out/artifacts/Consumer_jar</output-path>
<root id="archive" name="Consumer.jar">
<element id="module-output" name="Consumer" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/com/fasterxml/jackson/core/jackson-databind/2.12.1/jackson-databind-2.12.1.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/com/github/luben/zstd-jni/1.5.5-6/zstd-jni-1.5.5-6.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/com/fasterxml/jackson/core/jackson-core/2.12.1/jackson-core-2.12.1.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/lz4/lz4-java/1.8.0/lz4-java-1.8.0.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/com/fasterxml/jackson/core/jackson-annotations/2.12.1/jackson-annotations-2.12.1.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/slf4j/slf4j-api/1.7.36/slf4j-api-1.7.36.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/apache/kafka/kafka-clients/3.7.0/kafka-clients-3.7.0.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/xerial/snappy/snappy-java/1.1.10.5/snappy-java-1.1.10.5.jar" path-in-jar="/" />
</root>
</artifact>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="CompilerConfiguration">
<annotationProcessing>
<profile name="Maven default annotation processors profile" enabled="true">
<sourceOutputDir name="target/generated-sources/annotations" />
<sourceTestOutputDir name="target/generated-test-sources/test-annotations" />
<outputRelativeToContentRoot value="true" />
<module name="Consumer" />
<module name="Consumer-Account-Manager" />
<module name="Consumer-User-Notification" />
<module name="Consumer-ِAccount-Manager" />
</profile>
</annotationProcessing>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Encoding">
<file url="file://$PROJECT_DIR$/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/src/main/resources" charset="UTF-8" />
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="RemoteRepositoriesConfiguration">
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Central Repository" />
<option name="url" value="https://repo.maven.apache.org/maven2" />
</remote-repository>
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Maven Central repository" />
<option name="url" value="https://repo1.maven.org/maven2" />
</remote-repository>
<remote-repository>
<option name="id" value="jboss.community" />
<option name="name" value="JBoss Community repository" />
<option name="url" value="https://repository.jboss.org/nexus/content/repositories/public/" />
</remote-repository>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ExternalStorageConfigurationManager" enabled="true" />
<component name="MavenProjectsManager">
<option name="originalFiles">
<list>
<option value="$PROJECT_DIR$/pom.xml" />
</list>
</option>
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_17" default="true" project-jdk-name="17 (2)" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" />
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Palette2">
<group name="Swing">
<item class="com.intellij.uiDesigner.HSpacer" tooltip-text="Horizontal Spacer" icon="/com/intellij/uiDesigner/icons/hspacer.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="1" hsize-policy="6" anchor="0" fill="1" />
</item>
<item class="com.intellij.uiDesigner.VSpacer" tooltip-text="Vertical Spacer" icon="/com/intellij/uiDesigner/icons/vspacer.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="1" anchor="0" fill="2" />
</item>
<item class="javax.swing.JPanel" icon="/com/intellij/uiDesigner/icons/panel.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="3" hsize-policy="3" anchor="0" fill="3" />
</item>
<item class="javax.swing.JScrollPane" icon="/com/intellij/uiDesigner/icons/scrollPane.svg" removable="false" auto-create-binding="false" can-attach-label="true">
<default-constraints vsize-policy="7" hsize-policy="7" anchor="0" fill="3" />
</item>
<item class="javax.swing.JButton" icon="/com/intellij/uiDesigner/icons/button.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="3" anchor="0" fill="1" />
<initial-values>
<property name="text" value="Button" />
</initial-values>
</item>
<item class="javax.swing.JRadioButton" icon="/com/intellij/uiDesigner/icons/radioButton.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="3" anchor="8" fill="0" />
<initial-values>
<property name="text" value="RadioButton" />
</initial-values>
</item>
<item class="javax.swing.JCheckBox" icon="/com/intellij/uiDesigner/icons/checkBox.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="3" anchor="8" fill="0" />
<initial-values>
<property name="text" value="CheckBox" />
</initial-values>
</item>
<item class="javax.swing.JLabel" icon="/com/intellij/uiDesigner/icons/label.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="0" anchor="8" fill="0" />
<initial-values>
<property name="text" value="Label" />
</initial-values>
</item>
<item class="javax.swing.JTextField" icon="/com/intellij/uiDesigner/icons/textField.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1">
<preferred-size width="150" height="-1" />
</default-constraints>
</item>
<item class="javax.swing.JPasswordField" icon="/com/intellij/uiDesigner/icons/passwordField.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1">
<preferred-size width="150" height="-1" />
</default-constraints>
</item>
<item class="javax.swing.JFormattedTextField" icon="/com/intellij/uiDesigner/icons/formattedTextField.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1">
<preferred-size width="150" height="-1" />
</default-constraints>
</item>
<item class="javax.swing.JTextArea" icon="/com/intellij/uiDesigner/icons/textArea.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JTextPane" icon="/com/intellij/uiDesigner/icons/textPane.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JEditorPane" icon="/com/intellij/uiDesigner/icons/editorPane.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JComboBox" icon="/com/intellij/uiDesigner/icons/comboBox.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="2" anchor="8" fill="1" />
</item>
<item class="javax.swing.JTable" icon="/com/intellij/uiDesigner/icons/table.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JList" icon="/com/intellij/uiDesigner/icons/list.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="2" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JTree" icon="/com/intellij/uiDesigner/icons/tree.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JTabbedPane" icon="/com/intellij/uiDesigner/icons/tabbedPane.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="3" hsize-policy="3" anchor="0" fill="3">
<preferred-size width="200" height="200" />
</default-constraints>
</item>
<item class="javax.swing.JSplitPane" icon="/com/intellij/uiDesigner/icons/splitPane.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="3" hsize-policy="3" anchor="0" fill="3">
<preferred-size width="200" height="200" />
</default-constraints>
</item>
<item class="javax.swing.JSpinner" icon="/com/intellij/uiDesigner/icons/spinner.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1" />
</item>
<item class="javax.swing.JSlider" icon="/com/intellij/uiDesigner/icons/slider.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1" />
</item>
<item class="javax.swing.JSeparator" icon="/com/intellij/uiDesigner/icons/separator.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3" />
</item>
<item class="javax.swing.JProgressBar" icon="/com/intellij/uiDesigner/icons/progressbar.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="0" fill="1" />
</item>
<item class="javax.swing.JToolBar" icon="/com/intellij/uiDesigner/icons/toolbar.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="0" fill="1">
<preferred-size width="-1" height="20" />
</default-constraints>
</item>
<item class="javax.swing.JToolBar$Separator" icon="/com/intellij/uiDesigner/icons/toolbarSeparator.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="0" anchor="0" fill="1" />
</item>
<item class="javax.swing.JScrollBar" icon="/com/intellij/uiDesigner/icons/scrollbar.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="0" anchor="0" fill="2" />
</item>
</group>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$/.." vcs="Git" />
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ArtifactsWorkspaceSettings">
<artifacts-to-build>
<artifact name="Consumer:jar" />
</artifacts-to-build>
</component>
<component name="AutoImportSettings">
<option name="autoReloadType" value="SELECTIVE" />
</component>
<component name="ChangeListManager">
<list default="true" id="68795978-b1a2-4702-95fa-e8e7afae617a" name="Changes" comment="" />
<option name="SHOW_DIALOG" value="false" />
<option name="HIGHLIGHT_CONFLICTS" value="true" />
<option name="HIGHLIGHT_NON_ACTIVE_CHANGELIST" value="false" />
<option name="LAST_RESOLUTION" value="IGNORE" />
</component>
<component name="FileTemplateManagerImpl">
<option name="RECENT_TEMPLATES">
<list>
<option value="Class" />
</list>
</option>
</component>
<component name="Git.Settings">
<option name="RECENT_GIT_ROOT_PATH" value="$PROJECT_DIR$/.." />
</component>
<component name="MarkdownSettingsMigration">
<option name="stateVersion" value="1" />
</component>
<component name="ProjectId" id="2e2xOWSyAwlrJgJsmMdML8arM85" />
<component name="ProjectLevelVcsManager" settingsEditedManually="true" />
<component name="ProjectViewState">
<option name="hideEmptyMiddlePackages" value="true" />
<option name="showLibraryContents" value="true" />
</component>
<component name="PropertiesComponent"><![CDATA[{
"keyToString": {
"RunOnceActivity.OpenProjectViewOnStart": "true",
"RunOnceActivity.ShowReadmeOnStart": "true",
"SHARE_PROJECT_CONFIGURATION_FILES": "true",
"last_opened_file_path": "D:/Second Semester/Advanced DS/Practical/New folder/Consumer-User-Notification",
"project.structure.last.edited": "Modules",
"project.structure.proportion": "0.0",
"project.structure.side.proportion": "0.2"
}
}]]></component>
<component name="RecentsManager">
<key name="CopyFile.RECENT_KEYS">
<recent name="D:\HIAST\FIY\SS\Advanced Distributed Systems\Lab\4\java-kafka-producer\Producer\Consumer-User-Notification\src\main\java\org\example" />
<recent name="D:\HIAST\FIY\SS\Advanced Distributed Systems\Lab\4\java-kafka-consumer\Consumer\src\main\resources" />
</key>
<key name="CopyClassDialog.RECENTS_KEY">
<recent name="org.example" />
</key>
</component>
<component name="SpellCheckerSettings" RuntimeDictionaries="0" Folders="0" CustomDictionaries="0" DefaultDictionary="application-level" UseSingleDictionary="true" transferred="true" />
<component name="TaskManager">
<task active="true" id="Default" summary="Default task">
<changelist id="68795978-b1a2-4702-95fa-e8e7afae617a" name="Changes" comment="" />
<created>1711116444789</created>
<option name="number" value="Default" />
<option name="presentableId" value="Default" />
<updated>1711116444789</updated>
</task>
<servers />
</component>
<component name="Vcs.Log.Tabs.Properties">
<option name="TAB_STATES">
<map>
<entry key="MAIN">
<value>
<State />
</value>
</entry>
</map>
</option>
</component>
</project>
\ No newline at end of file
2024-04-15 13:43:58,963 INFO org.example.UserNotificationKafkaConsumer [main] Name : Abd --- Location of Transaction : jableh --- amount of transaction : 1001.0
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>Consumer</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.12.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.7.0</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.7.0</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.2.10</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.10</version>
</dependency>
</dependencies>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
</project>
\ No newline at end of file
package org.example;
import com.fasterxml.jackson.databind.ObjectMapper;
import redis.clients.jedis.Jedis;
public class DataBaseHandler
{
private static final String REDIS_HOST = "localhost";
private static final int REDIS_PORT = 6379;
private static final String SUSPICIOUS_FIELD = "suspicious";
private static Jedis jedis = new Jedis(REDIS_HOST, REDIS_PORT);
public static void notifyUser(String name)
{
String jsonData = jedis.get(name);
MyRecord myRecord = new MyRecord();
myRecord.increaseNotification(jsonData);
jsonData = myRecord.toString();
if ((jsonData == null) ) return;
jedis.set(name , jsonData);
}
}
package org.example;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.protocol.types.Field;
public class MyRecord
{
/*SET JohnDoe "{\"id\": 1, \"name\": \"John Doe\", \"email\": \"johndoe@example.com\"}"*/
private String country;
private Double balance;
private Integer warnings;
public MyRecord(){}
/*
data base key = our user name
*/
public MyRecord (String country , double balance , int warnings)
{
this.balance = balance;
this.country = country;
this.warnings = warnings;
}
@Override
public String toString()
{
String ans = null;
ObjectMapper objectMapper = new ObjectMapper();
try
{
ans = objectMapper.writeValueAsString(this);
}
catch (JsonProcessingException e)
{
throw new RuntimeException(e);
}
finally {return ans;}
}
public String getCountry() {
return country;
}
public void setCountry(String country) {
this.country = country;
}
public Double getBalance() {
return balance;
}
public void setBalance(Double balance) {
this.balance = balance;
}
public Integer getWarnings() {
return warnings;
}
public void setWarnings(Integer warnings) {
this.warnings = warnings;
}
public void increaseNotification(String jsonData)
{
try
{
MyRecord myRecord = reverseJson(jsonData);
myRecord.warnings++;
}
catch (JsonProcessingException e)
{
throw new RuntimeException(e);
}
}
private MyRecord reverseJson(String json) throws JsonProcessingException
{
ObjectMapper objectMapper = new ObjectMapper();
MyRecord myRecord = objectMapper.readValue(json , MyRecord.class);
return myRecord;
}
}
package org.example;
import java.io.Serializable;
public class TransactionInfo implements Serializable
{
private String name;
private String location;
private double amount;
public TransactionInfo()
{}
public TransactionInfo(String name , String location , double amount)
{
this.amount = amount;
this.name = name;
this.location = location;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getLocation() {
return location;
}
public void setLocation(String location) {
this.location = location;
}
public double getAmount() {
return amount;
}
public void setAmount(double amount)
{
this.amount = amount;
}
@Override
public String toString() {
return "TransactionInfo{" +
"name='" + name + '\'' +
", location='" + location + '\'' +
", amount=" + amount +
'}';
}
}
\ No newline at end of file
package org.example;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import java.io.IOException;
public class TransactionInfoDeserializer implements Deserializer<TransactionInfo> {
private ObjectMapper objectMapper = new ObjectMapper();
@Override
public TransactionInfo deserialize(String topic, byte[] data) {
try {
return objectMapper.readValue(data, TransactionInfo.class);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
\ No newline at end of file
package org.example;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class UserNotificationKafkaConsumer {
private static final String TOPIC = "SUSPICIOUS_TOPIC";
private static final String BOOTSTRAP_SERVERS = "192.168.181.136:9092,localhost:9093,localhost:9094";
public static final Logger log = LoggerFactory.getLogger(UserNotificationKafkaConsumer.class);
public static void main(String[] args) {
String consumerGroup = "SuspiciousTransactionsGroup";
if (args.length == 1) {
consumerGroup = args[0];
}
System.out.println("Consumer is part of consumer group " + consumerGroup);
Consumer<Long, TransactionInfo> kafkaConsumer = createKafkaConsumer(BOOTSTRAP_SERVERS, consumerGroup);
consumeMessages(TOPIC, kafkaConsumer);
}
public static void consumeMessages(String topic, Consumer<Long, TransactionInfo> kafkaConsumer) {
ObjectMapper objectMapper = new ObjectMapper();
kafkaConsumer.subscribe(Collections.singletonList(topic));
while (true) {
ConsumerRecords<Long, TransactionInfo> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
if (consumerRecords.isEmpty()) {
continue;
}
for (ConsumerRecord<Long, TransactionInfo> record : consumerRecords) {
TransactionInfo transactionInfo = record.value();
System.out.println("Key: " + record.key());
System.out.println("Received: " + transactionInfo.toString());
try {
WritingHelper writingHelper = new WritingHelper(transactionInfo);
writingHelper.writeToLog();
DataBaseHandler.notifyUser(transactionInfo.getName());
System.out.println(writingHelper);
} catch (Exception e) {
System.out.println("Error processing transaction");
System.out.println(e.getMessage());
log.error("Failed to process transaction: " + transactionInfo.toString(), e);
}
}
kafkaConsumer.commitAsync();
}
}
public static Consumer<Long, TransactionInfo> createKafkaConsumer(String bootstrapServers, String consumerGroup) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, TransactionInfoDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return new KafkaConsumer<>(properties);
}
}
\ No newline at end of file
package org.example;
public class WritingHelper
{
String name;
String location;
Double amount;
public WritingHelper(TransactionInfo transactionInfo)
{
name = transactionInfo.getName();
location = transactionInfo.getLocation();
amount = transactionInfo.getAmount();
}
@Override
public String toString()
{
String ans = "";
Double x = amount;
ans += "Name : " + name + " --- Location of Transaction : " + location
+ " --- amount of transaction : " + x.toString();
return ans;
}
public void writeToLog()
{
String ans = toString();
UserNotificationKafkaConsumer.log.info(ans);
}
}
Manifest-Version: 1.0
Main-Class: org.example.UserNotificationKafkaConsumer
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property name="HOME_LOG" value="./myLogs/AccountManager-Log"/>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<layout class="ch.qos.logback.classic.PatternLayout">
<Pattern>
%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n
</Pattern>
</layout>
</appender>
<appender name="FILE" class="ch.qos.logback.core.FileAppender">
<file>${HOME_LOG}</file>
<append>true</append>
<immediateFlush>true</immediateFlush>
<encoder>
<pattern>%d %p %c{1.} [%t] %m%n</pattern>
</encoder>
</appender>
<logger name="org.example" level="DEBUG" additivity="false">
<appender-ref ref="CONSOLE"/>
<appender-ref ref="FILE"/>
</logger>
<root level="error">
<appender-ref ref="FILE"/>
</root>
</configuration>
\ No newline at end of file
Manifest-Version: 1.0
Main-Class: org.example.UserNotificationKafkaConsumer
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property name="HOME_LOG" value="./myLogs/AccountManager-Log"/>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<layout class="ch.qos.logback.classic.PatternLayout">
<Pattern>
%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n
</Pattern>
</layout>
</appender>
<appender name="FILE" class="ch.qos.logback.core.FileAppender">
<file>${HOME_LOG}</file>
<append>true</append>
<immediateFlush>true</immediateFlush>
<encoder>
<pattern>%d %p %c{1.} [%t] %m%n</pattern>
</encoder>
</appender>
<logger name="org.example" level="DEBUG" additivity="false">
<appender-ref ref="CONSOLE"/>
<appender-ref ref="FILE"/>
</logger>
<root level="error">
<appender-ref ref="FILE"/>
</root>
</configuration>
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment