/*
 * Decompiled with CFR 0.152.
 */
package org.example;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.example.DataBaseHandler;
import org.example.TransactionInfo;
import org.example.TransactionInfoDeserializer;
import org.example.WritingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Command-line Kafka consumer that reads {@link TransactionInfo} records from
 * {@code VALID_TOPIC} and, for each record, writes it out through a
 * {@link WritingHelper} and applies it via
 * {@link DataBaseHandler#editAmount}.
 *
 * <p>Auto-commit is disabled; offsets are committed asynchronously after each
 * non-empty batch has been iterated, regardless of whether individual records
 * in that batch failed to process.
 */
public class AccountManagerKafkaConsumer {
    private static final String TOPIC = "VALID_TOPIC";
    private static final String BOOTSTRAP_SERVERS = "192.168.181.136:9092,localhost:9093,localhost:9094";
    public static final Logger log = LoggerFactory.getLogger(AccountManagerKafkaConsumer.class);

    /**
     * Entry point. Creates a consumer and polls {@code VALID_TOPIC} until the
     * process is terminated or polling throws.
     *
     * @param args when exactly one argument is supplied it is used as the
     *             consumer group id; otherwise {@code defaultConsumerGroup} is used
     */
    public static void main(String[] args) {
        String consumerGroup = "defaultConsumerGroup";
        if (args.length == 1) {
            consumerGroup = args[0];
        }
        System.out.println("Consumer is part of consumer group " + consumerGroup);
        // try-with-resources: if the poll loop dies with an exception the consumer
        // is still closed instead of being leaked.
        try (Consumer<Long, TransactionInfo> kafkaConsumer = createKafkaConsumer(BOOTSTRAP_SERVERS, consumerGroup)) {
            consumeMessages(TOPIC, kafkaConsumer);
        }
    }

    /**
     * Subscribes {@code kafkaConsumer} to {@code topic} and processes records in
     * an endless loop. This method only returns by throwing.
     *
     * <p>Each poll waits up to one second. Empty polls are skipped without
     * committing. After a non-empty batch has been iterated, offsets are
     * committed with {@code commitAsync()}.
     *
     * @param topic         topic to subscribe to
     * @param kafkaConsumer consumer to poll; it is not closed by this method
     */
    public static void consumeMessages(String topic, Consumer<Long, TransactionInfo> kafkaConsumer) {
        kafkaConsumer.subscribe(Collections.singletonList(topic));
        while (true) {
            ConsumerRecords<Long, TransactionInfo> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1L));
            if (consumerRecords.isEmpty()) {
                continue;
            }
            for (ConsumerRecord<Long, TransactionInfo> consumerRecord : consumerRecords) {
                processRecord(consumerRecord);
            }
            kafkaConsumer.commitAsync();
        }
    }

    /**
     * Handles a single record: prints it, writes it via {@link WritingHelper},
     * and applies the amount through {@link DataBaseHandler#editAmount}.
     * Any exception raised while doing so is reported and swallowed so that one
     * bad record does not stop the poll loop.
     */
    private static void processRecord(ConsumerRecord<Long, TransactionInfo> consumerRecord) {
        TransactionInfo transactionInfo = consumerRecord.value();
        System.out.println("Key: " + consumerRecord.key());
        if (transactionInfo == null) {
            // A record value may be null; dereferencing it here would throw outside
            // any try block and terminate the whole consumer loop.
            log.warn("Skipping record with null value at {}-{} offset {}",
                    consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset());
            return;
        }
        System.out.println("Received: " + transactionInfo);
        try {
            WritingHelper writingHelper = new WritingHelper(transactionInfo);
            writingHelper.writeToLog();
            boolean success = DataBaseHandler.editAmount(transactionInfo.getName(), transactionInfo.getAmount());
            if (!success) {
                log.warn("{}: Insufficient balance to withdraw amount", transactionInfo.getName());
            }
            System.out.println(writingHelper);
        } catch (Exception e) {
            System.out.println("Error processing transaction");
            System.out.println(e.getMessage());
            // Trailing Throwable argument is logged by SLF4J with its stack trace.
            log.error("Failed to process transaction: {}", transactionInfo, e);
        }
    }

    /**
     * Builds a {@link KafkaConsumer} with {@code Long} keys and
     * {@link TransactionInfo} values, with auto-commit disabled.
     *
     * @param bootstrapServers value for {@code bootstrap.servers}
     * @param consumerGroup    value for {@code group.id}
     * @return a new, unsubscribed consumer; the caller is responsible for closing it
     */
    public static Consumer<Long, TransactionInfo> createKafkaConsumer(String bootstrapServers, String consumerGroup) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);
        properties.put("key.deserializer", LongDeserializer.class.getName());
        properties.put("value.deserializer", TransactionInfoDeserializer.class.getName());
        properties.put("group.id", consumerGroup);
        // Offsets are committed manually in consumeMessages.
        properties.put("enable.auto.commit", false);
        return new KafkaConsumer<>(properties);
    }
}

