My Project
Application Class Reference

Consumer service that manages and authorizes valid financial transactions.

Static Public Member Functions

static void main (String[] args)
 Application entry point.
 
static void consumeMessages (String topic, Consumer< String, Transaction > kafkaConsumer)
 Subscription and processing loop for Kafka messages.
 
static Consumer< String, Transaction > createKafkaConsumer (String bootstrapServers, String consumerGroup)
 Configures and initializes a Kafka Consumer.
 
static void main (String[] args)
 Main method to start the Banking API Service.
 
static void processTransactions (IncomingTransactionsReader reader, UserResidenceDatabase db, Producer< String, Transaction > producer)
 Orchestrates the flow from transaction ingestion to Kafka production.
 
static Producer< String, Transaction > createKafkaProducer (String bootstrapServers)
 Configures and initializes a high-reliability Kafka Producer.
 
static void main (String[] args)
 Application entry point.
 
static void consumeMessages (List< String > topics, Consumer< String, Transaction > kafkaConsumer)
 Consumes and processes records from multiple Kafka topics.
 
static Consumer< String, Transaction > createKafkaConsumer (String bootstrapServers, String consumerGroup)
 Configures a Kafka Consumer with shared reporting settings.
 
static void main (String[] args)
 Application entry point.
 
static void consumeMessages (String topic, Consumer< String, Transaction > kafkaConsumer)
 Main processing loop for incoming suspicious transactions.
 
static Consumer< String, Transaction > createKafkaConsumer (String bootstrapServers, String consumerGroup)
 Configures and initializes a Kafka Consumer for notifications.
 

Detailed Description

Consumer service that manages and authorizes valid financial transactions.

Consumer service responsible for alerting users of potential fraud.

Centralized reporting service for all bank transactions.

The central orchestrator of the distributed banking system.

The Account Manager service subscribes to the "valid-transactions" Kafka topic. Its primary responsibility is to simulate the final authorization of funds and ensure that transactions are processed reliably using manual offset commits.

The Banking API Service acts as the primary Producer. It receives incoming credit card transactions, performs a fraud check by querying a Redis-backed UserResidenceDatabase, and routes the data to either the 'valid' or 'suspicious' Kafka topics.

This service subscribes to both "valid-transactions" and "suspicious-transactions" topics. It acts as a logger that persists transaction history to the local disk, categorizing them into 'normal' and 'sus' folders for auditing and monthly statement generation.

This microservice monitors the "suspicious-transactions" Kafka topic. When a suspicious transaction is detected by the Banking API, this service notifies the user via console output and logs the notification to a persistent file system for audit purposes.

Member Function Documentation

◆ consumeMessages() [1/3]

static void Application.consumeMessages ( List< String >  topics,
Consumer< String, Transaction >  kafkaConsumer
)
static

Consumes and processes records from multiple Kafka topics.

Subscribes to the provided list of topics and enters a polling loop. Each record is delegated to the reporting logic based on its source topic.

Parameters
topics: List of topic names to monitor.
kafkaConsumer: The initialized Kafka consumer instance.
Note
The processing loop guards against deserialization errors so that a single malformed record does not stop consumption.
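The class description notes that this service files records into 'normal' and 'sus' folders by source topic. A minimal sketch of that per-topic dispatch, assuming the topic names from the detailed description; the class and method names here are illustrative, not part of the actual API:

```java
// Illustrative sketch: maps a source topic to the on-disk audit folder.
class ReportRouter {
    /** Chooses the audit folder for a record based on its source topic. */
    static String folderFor(String topic) {
        return topic.equals("suspicious-transactions") ? "sus" : "normal";
    }

    public static void main(String[] args) {
        System.out.println(folderFor("valid-transactions"));      // normal
        System.out.println(folderFor("suspicious-transactions")); // sus
    }
}
```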

◆ consumeMessages() [2/3]

static void Application.consumeMessages ( String  topic,
Consumer< String, Transaction >  kafkaConsumer
)
static

Subscription and processing loop for Kafka messages.

This method subscribes to the specified topic, polls Kafka for new records, processes them by calling approveTransaction, and manually commits offsets to ensure data consistency.

Parameters
topic: The name of the topic to subscribe to.
kafkaConsumer: The initialized Kafka consumer instance.
Note
Manual offset management: We commit offsets only after the batch has been successfully processed to prevent data loss.
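The process-then-commit pattern above can be sketched with the Kafka consumer replaced by a plain list, making the ordering guarantee visible: the committed offset only advances after the whole batch has been processed, so a failure mid-batch leaves the offset untouched and the batch is re-polled (at-least-once delivery). Class and method names are hypothetical:

```java
import java.util.List;

// Illustrative sketch of the manual-commit loop; not the real Kafka API.
class ManualCommitLoop {
    private long committedOffset = -1;  // offset of the last committed record

    /** Processes a whole batch, then commits; mirrors commitSync() after poll(). */
    void pollOnce(List<String> batch) {
        for (String record : batch) {
            process(record);            // may throw; offset has not moved yet
        }
        committedOffset += batch.size(); // the commitSync() equivalent
    }

    private void process(String record) { /* approveTransaction(...) here */ }

    long committedOffset() { return committedOffset; }
}
```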

◆ consumeMessages() [3/3]

static void Application.consumeMessages ( String  topic,
Consumer< String, Transaction >  kafkaConsumer
)
static

Main processing loop for incoming suspicious transactions.

Continuously polls the Kafka topic and triggers user notifications for every received transaction.

Parameters
topic: The name of the suspicious transactions topic.
kafkaConsumer: The initialized Kafka consumer instance.
Note
Null check ensures the application doesn't crash if a malformed message fails to deserialize.
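A minimal sketch of the null check described in the note: a record whose value failed deserialization arrives as null and is skipped rather than crashing the loop. The class and method names are illustrative:

```java
import java.util.List;

// Illustrative sketch: skip null values left behind by failed deserialization.
class NullSafeConsumer {
    /** Returns how many records were actually notified; nulls are skipped. */
    static int notifyAll(List<String> values) {
        int notified = 0;
        for (String value : values) {
            if (value == null) continue; // malformed message: skip, don't crash
            notified++;                  // notifyUser(value) in the real service
        }
        return notified;
    }
}
```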

◆ createKafkaConsumer() [1/3]

static Consumer< String, Transaction > Application.createKafkaConsumer ( String  bootstrapServers,
String  consumerGroup 
)
static

Configures and initializes a Kafka Consumer.

Sets up deserializers for Transaction objects and disables auto-commit to allow for manual offset management.

Parameters
bootstrapServers: The list of Kafka brokers.
consumerGroup: The ID of the consumer group this instance belongs to.
Returns
Consumer<String, Transaction> A configured Kafka consumer instance.

Auto-commit is disabled to allow manual offset management; 'earliest' ensures the consumer reads from the start of the topic if no committed offset exists.
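The configuration described here can be sketched with the standard Kafka property keys; broker address and group ID are placeholders, and the helper name is illustrative:

```java
import java.util.Properties;

// Illustrative sketch of the consumer configuration described above.
class ConsumerConfigSketch {
    static Properties consumerProps(String bootstrapServers, String consumerGroup) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", consumerGroup);
        // Disable auto-commit: offsets are committed manually after processing.
        props.put("enable.auto.commit", "false");
        // Read from the start of the topic when no committed offset exists.
        props.put("auto.offset.reset", "earliest");
        return props;
    }
}
```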

◆ createKafkaConsumer() [2/3]

static Consumer< String, Transaction > Application.createKafkaConsumer ( String  bootstrapServers,
String  consumerGroup 
)
static

Configures a Kafka Consumer with shared reporting settings.

Enables auto-commit for reporting purposes, assuming a "best-effort" logging requirement where slight data loss on crash is acceptable for higher throughput.

Parameters
bootstrapServers: The Kafka cluster addresses.
consumerGroup: The reporting service group ID.
Returns
Consumer<String, Transaction> A configured Kafka consumer.

◆ createKafkaConsumer() [3/3]

static Consumer< String, Transaction > Application.createKafkaConsumer ( String  bootstrapServers,
String  consumerGroup 
)
static

Configures and initializes a Kafka Consumer for notifications.

Sets the group ID and uses automatic offset committing for high throughput.

Parameters
bootstrapServers: The list of Kafka brokers.
consumerGroup: The ID for the notification service group.
Returns
Consumer<String, Transaction> A configured Kafka consumer instance.

Auto-commit is enabled here for simple "at-most-once" delivery semantics.

◆ createKafkaProducer()

static Producer< String, Transaction > Application.createKafkaProducer ( String  bootstrapServers)
static

Configures and initializes a high-reliability Kafka Producer.

Applies settings for Exactly-Once Semantics (EOS) to prevent double-charging or duplicate records in the event of network retries.

Parameters
bootstrapServers: The addresses of the Kafka cluster.
Returns
Producer<String, Transaction> A configured Kafka producer instance.

Exactly-Once Semantics (EOS):

  • Idempotence: Assigns sequence numbers to prevent duplicates.
  • Acks=all: Ensures all replicas acknowledge the write.

◆ main() [1/4]

static void Application.main ( String[]  args)
static

Application entry point.

Initializes the consumer group and starts the infinite message consumption loop.

Parameters
args: Command line arguments (not used).

◆ main() [2/4]

static void Application.main ( String[]  args)
static

Main method to start the Banking API Service.

Initializes the Kafka producer and begins processing the transaction stream. Ensures the producer is flushed and closed gracefully on shutdown.

Parameters
args: Command line arguments.

◆ main() [3/4]

static void Application.main ( String[]  args)
static

Application entry point.

Initializes the consumer and prepares an unmodifiable list of topics to subscribe to.

Parameters
args: Command line arguments.

◆ main() [4/4]

static void Application.main ( String[]  args)
static

Application entry point.

Sets up the consumer group and starts monitoring the suspicious transaction stream.

Parameters
args: Command line arguments.

◆ processTransactions()

static void Application.processTransactions ( IncomingTransactionsReader  reader,
UserResidenceDatabase  db,
Producer< String, Transaction >  producer
)
static

Orchestrates the flow from transaction ingestion to Kafka production.

This method implements the core fraud detection logic:

  1. Retrieve the user's home country from Redis.
  2. Compare it to the transaction's reported location.
  3. Route the message to the appropriate topic using the user ID as the Kafka Key.

    Robustness: If a user is missing from the database (e.g., 'Sara'), the method catches the RuntimeException, logs an error, and continues processing the next transaction to prevent service downtime.

Parameters
reader: The source providing incoming transaction objects.
db: The Redis-based residence database for fraud checks.
producer: The Kafka producer used for message routing.
Note
Key-Based Routing: Passing transaction.getUser() as the Key ensures all transactions for the same user land in the same Kafka partition, guaranteeing processing order.
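The routing decision in steps 1-3 can be sketched as a pure function. Topic names come from the detailed descriptions; the class and method names are illustrative:

```java
// Illustrative sketch of the fraud-check routing decision.
class FraudRouter {
    /** Chooses the destination topic by comparing the user's home country
     *  (retrieved from Redis) with the transaction's reported location. */
    static String routeTopic(String homeCountry, String transactionCountry) {
        return homeCountry.equals(transactionCountry)
                ? "valid-transactions"
                : "suspicious-transactions";
    }
}
```

In the real service the chosen topic is paired with the user ID as the Kafka key, which (per the note above) pins all of a user's transactions to one partition and preserves their order.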

The documentation for this class was generated from the following files: