/*
 * Decompiled with CFR 0.152.
 */
package com.ali.notificationsvc.kafkalisteners;

import com.ali.notificationsvc.dtos.CreateNotificationDto;
import com.ali.notificationsvc.services.INotificationService;
import com.ali.notificationsvc.status.RequestStatus;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

/**
 * Kafka listener that hands incoming {@link CreateNotificationDto} payloads to
 * {@link INotificationService#create} and acknowledges the record once the
 * service reports success.
 *
 * <p>On any failure the record is NOT acknowledged and the exception is
 * rethrown to the listener container. What happens next (retry, skip,
 * dead-letter) depends on the error handling configured for the
 * {@code notificationFactory} container factory, which is not visible here.
 */
@Component
public class NotificationKafkaListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(NotificationKafkaListener.class);

    /** Service that creates the notification described by each consumed record. */
    private final INotificationService notificationService;

    /**
     * Creates the listener with its single collaborator.
     *
     * @param notificationService service used to create notifications from consumed records
     */
    @Autowired
    public NotificationKafkaListener(INotificationService notificationService) {
        this.notificationService = notificationService;
    }

    /**
     * Processes one record from the configured topic.
     *
     * @param data           consumed record; its value is the notification to create
     * @param acknowledgment handle used to acknowledge the record after successful processing
     * @throws IllegalArgumentException if the record carries a null payload
     * @throws IllegalStateException    if the service returns no status or reports failure
     */
    @KafkaListener(topics={"${spring.kafka.topic.name}"}, groupId="${spring.kafka.consumer.group-id}", containerFactory="notificationFactory")
    void listener(ConsumerRecord<Long, CreateNotificationDto> data, Acknowledgment acknowledgment) {
        CreateNotificationDto createNotificationDto = data.value();
        // Parameterized logging is null-safe, so a record with a null value no
        // longer dies with a bare NullPointerException before anything is logged.
        LOGGER.info("Received: {} Key: {}", createNotificationDto, data.key());
        try {
            if (createNotificationDto == null) {
                // NOTE(review): a null value is presumably a tombstone or an
                // undeserializable payload - confirm whether such records should
                // be acknowledged and skipped instead of failed.
                throw new IllegalArgumentException("Received record with null payload, key=" + data.key());
            }
            RequestStatus requestStatus = this.notificationService.create(createNotificationDto);
            // A missing status is treated the same as an unsuccessful one.
            if (requestStatus == null || !requestStatus.isStatus()) {
                throw new IllegalStateException("Failed to process message, will retry.");
            }
            // Acknowledge only after the service confirmed success.
            acknowledgment.acknowledge();
            LOGGER.info("Message processed successfully. kafka acknowledge");
        }
        catch (Exception e) {
            // Pass the exception as the last argument so the stack trace and cause are logged.
            LOGGER.error("Error processing message (key={}): {}", data.key(), e.getMessage(), e);
            throw e;
        }
    }
}

