/*
 * Decompiled with CFR 0.152.
 */
package org.example;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.example.TransactionInfo;
import org.example.TransactionInfoDeserializer;
import org.example.WritingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Command-line Kafka consumer that subscribes to the reporting topics and hands every
 * received {@link TransactionInfo} to a {@link WritingHelper}.
 *
 * <p>Auto-commit is disabled; offsets are committed asynchronously after each non-empty
 * batch has been iterated, whether or not individual records failed to process.
 */
public class ReportingKafkaConsumer {
    /** Topics subscribed to by {@link #main(String[])}. */
    private static final List<String> TOPICS = Arrays.asList("VALID_TOPIC", "SUSPICIOUS_TOPIC");
    private static final String BOOTSTRAP_SERVERS = "192.168.181.136:9092,localhost:9093,localhost:9094";
    static final Logger log = LoggerFactory.getLogger(ReportingKafkaConsumer.class);

    /**
     * Entry point. Creates a consumer and runs the poll loop until it fails.
     *
     * @param args when exactly one argument is given it is used as the consumer group id;
     *             otherwise {@code "defaultAliConsumerGroup"} is used
     */
    public static void main(String[] args) {
        String consumerGroup = "defaultAliConsumerGroup";
        if (args.length == 1) {
            consumerGroup = args[0];
        }
        System.out.println("Consumer is part of consumer group " + consumerGroup);
        // try-with-resources: the poll loop only exits by throwing, and the consumer must
        // be closed in that case so its network resources are released.
        try (Consumer<Long, TransactionInfo> kafkaConsumer =
                ReportingKafkaConsumer.createKafkaConsumer(BOOTSTRAP_SERVERS, consumerGroup)) {
            System.out.println("created successfully");
            ReportingKafkaConsumer.consumeMessages(TOPICS, kafkaConsumer);
        }
    }

    /**
     * Subscribes {@code kafkaConsumer} to {@code topics} and polls forever, processing each
     * record and committing offsets asynchronously after every non-empty batch.
     *
     * <p>This method does not return normally; it only exits if the consumer throws.
     * The consumer is not closed here — that is left to the caller that created it.
     *
     * @param topics        topic names to subscribe to
     * @param kafkaConsumer consumer used for subscribing, polling and committing
     */
    public static void consumeMessages(List<String> topics, Consumer<Long, TransactionInfo> kafkaConsumer) {
        kafkaConsumer.subscribe(topics);
        System.out.println("subscribed successfully");
        while (true) {
            ConsumerRecords<Long, TransactionInfo> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1L));
            if (consumerRecords.isEmpty()) {
                continue;
            }
            for (ConsumerRecord<Long, TransactionInfo> consumerRecord : consumerRecords) {
                ReportingKafkaConsumer.processRecord(consumerRecord);
            }
            // Surface commit failures instead of dropping them silently.
            kafkaConsumer.commitAsync((offsets, exception) -> {
                if (exception != null) {
                    log.error("Async offset commit failed for {}", offsets, exception);
                }
            });
        }
    }

    /**
     * Prints one record and writes it through a {@link WritingHelper}. Any failure is
     * reported and logged so that a single bad record cannot stop the poll loop.
     */
    private static void processRecord(ConsumerRecord<Long, TransactionInfo> consumerRecord) {
        TransactionInfo transactionInfo = consumerRecord.value();
        String topic = consumerRecord.topic();
        System.out.println("key : " + consumerRecord.key());
        try {
            // String concatenation is null-safe, so a record with a null value is reported
            // rather than throwing a NullPointerException.
            System.out.println("Received : " + transactionInfo);
            WritingHelper writingHelper = new WritingHelper(transactionInfo, topic);
            System.out.println(writingHelper);
            writingHelper.writeToLog();
        }
        catch (Exception e) {
            System.out.println("Error Parsing");
            System.out.println(e.getMessage());
            // Parameterized and null-safe; passing e last keeps the stack trace in the log.
            log.error("Could Not Process Transaction: {}", transactionInfo, e);
        }
    }

    /**
     * Builds a consumer with {@code Long} keys and {@link TransactionInfo} values and
     * auto-commit disabled.
     *
     * @param bootstrapServers value for the {@code bootstrap.servers} setting
     * @param consumerGroup    value for the {@code group.id} setting
     * @return a new, unsubscribed consumer; the caller is responsible for closing it
     */
    public static Consumer<Long, TransactionInfo> createKafkaConsumer(String bootstrapServers, String consumerGroup) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);
        properties.put("key.deserializer", LongDeserializer.class.getName());
        properties.put("value.deserializer", TransactionInfoDeserializer.class.getName());
        properties.put("group.id", consumerGroup);
        properties.put("enable.auto.commit", Boolean.FALSE);
        return new KafkaConsumer<Long, TransactionInfo>(properties);
    }
}

