/*
 * Decompiled with CFR 0.152.
 */
package org.example;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.example.DataBaseHandler;
import org.example.TransactionInfo;
import org.example.WritingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Command-line Kafka consumer that reads JSON-encoded transactions from
 * {@code VALID_TOPIC}, writes each one out through {@link WritingHelper} and
 * applies its amount through {@link DataBaseHandler#editAmount}.
 *
 * <p>Auto-commit is disabled; offsets are committed asynchronously after each
 * non-empty batch has been processed.
 */
public class AccountManagerKafkaConsumer {
    private static final String TOPIC = "VALID_TOPIC";
    private static final String BOOTSTRAP_SERVERS = "192.168.181.136:9092,localhost:9093,localhost:9094";
    private static final String DEFAULT_CONSUMER_GROUP = "ValidTransactionsGroup";
    /** Maximum time a single {@code poll} call blocks waiting for records. */
    private static final Duration POLL_TIMEOUT = Duration.ofSeconds(1L);
    /** Shared mapper; building one per call is unnecessary work. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    static final Logger log = LoggerFactory.getLogger(AccountManagerKafkaConsumer.class);

    /**
     * Entry point: creates a consumer and processes messages until the
     * consume loop terminates with an exception.
     *
     * @param args when exactly one argument is given it is used as the
     *             consumer group id; otherwise {@code ValidTransactionsGroup}
     *             is used
     */
    public static void main(String[] args) {
        String consumerGroup = DEFAULT_CONSUMER_GROUP;
        if (args.length == 1) {
            consumerGroup = args[0];
        }
        System.out.println("Consumer is part of consumer group " + consumerGroup);
        // try-with-resources so the consumer is closed (and leaves its group
        // promptly) when the consume loop exits with an exception.
        try (Consumer<Long, String> kafkaConsumer =
                AccountManagerKafkaConsumer.createKafkaConsumer(BOOTSTRAP_SERVERS, consumerGroup)) {
            AccountManagerKafkaConsumer.consumeMessages(TOPIC, kafkaConsumer);
        }
    }

    /**
     * Subscribes to {@code topic} and processes records in an endless poll
     * loop. This method only returns by throwing; the caller owns the
     * consumer and is responsible for closing it.
     *
     * @param topic         topic to subscribe to
     * @param kafkaConsumer consumer to poll; offsets are committed
     *                      asynchronously after every non-empty batch
     */
    public static void consumeMessages(String topic, Consumer<Long, String> kafkaConsumer) {
        kafkaConsumer.subscribe(Collections.singletonList(topic));
        // NOTE(review): the result is discarded; presumably this is meant as an
        // early broker connectivity check - confirm, or remove the call.
        kafkaConsumer.listTopics();
        while (true) {
            ConsumerRecords<Long, String> consumerRecords = kafkaConsumer.poll(POLL_TIMEOUT);
            if (consumerRecords.isEmpty()) {
                System.out.println("empty");
                continue;
            }
            for (ConsumerRecord<Long, String> consumerRecord : consumerRecords) {
                AccountManagerKafkaConsumer.processRecord(consumerRecord);
            }
            // Commit only after the whole batch has been handled.
            kafkaConsumer.commitAsync();
        }
    }

    /**
     * Handles one record: parses its JSON value and applies the transaction.
     * Records that cannot be parsed, or that carry no payload, are logged and
     * skipped so that a single bad record does not stop the consumer.
     */
    private static void processRecord(ConsumerRecord<Long, String> consumerRecord) {
        String stringJson = consumerRecord.value();
        System.out.println("key : " + consumerRecord.key());
        System.out.println("Received : " + stringJson);
        if (stringJson == null) {
            // A null value would make readValue throw an unchecked exception
            // that the catch below does not cover, killing the consume loop.
            log.warn("Skipping record with null value (topic={}, partition={}, offset={})",
                    consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset());
            return;
        }
        try {
            TransactionInfo transactionInfo = OBJECT_MAPPER.readValue(stringJson, TransactionInfo.class);
            if (transactionInfo == null) {
                // The JSON literal "null" deserializes to a null object.
                log.warn("Skipping record whose JSON value is null (topic={}, partition={}, offset={})",
                        consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset());
                return;
            }
            WritingHelper writingHelper = new WritingHelper(transactionInfo);
            writingHelper.writeToLog();
            boolean amountEdited = DataBaseHandler.editAmount(transactionInfo.getName(), transactionInfo.getAmount());
            if (!amountEdited) {
                log.warn("{} : Balance not Enough to Withdraw Amount", transactionInfo.getName());
            }
            System.out.println(writingHelper);
        }
        catch (JsonProcessingException e) {
            System.out.println("Error Parsing");
            System.out.println(e.getMessage());
            // Pass the exception so the cause and stack trace are not lost.
            log.error("Could Not Parse Json in Transaction", e);
        }
    }

    /**
     * Builds a consumer with {@code Long} keys and {@code String} values.
     * Auto-commit is disabled.
     *
     * @param bootstrapServers comma-separated {@code host:port} broker list
     * @param consumerGroup    value for the {@code group.id} setting
     * @return a new, not yet subscribed consumer; the caller must close it
     */
    public static Consumer<Long, String> createKafkaConsumer(String bootstrapServers, String consumerGroup) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);
        properties.put("key.deserializer", LongDeserializer.class.getName());
        properties.put("value.deserializer", StringDeserializer.class.getName());
        properties.put("group.id", consumerGroup);
        properties.put("enable.auto.commit", false);
        return new KafkaConsumer<Long, String>(properties);
    }
}

