Commit 87f37e70 authored by Ali's avatar Ali

add logger configuration

parent 1d1d41f3
FROM 172.29.3.41:5000/openjdk:17-jdk
FROM openjdk:17-jdk
WORKDIR /app
......
Timestamp: 2024-08-17 21:18:09.399 | Thread: [main] | Level: INFO | Trace: [traceId=, spanId=, parentId=] | Logger: c.a.n.NotificationsvcApplicationTests | Message: Starting NotificationsvcApplicationTests using Java 17.0.5 with PID 1716 (started by Ali in C:\microservices-git\NotificationSVC\notificationsvc)
Timestamp: 2024-08-17 21:18:09.406 | Thread: [main] | Level: INFO | Trace: [traceId=, spanId=, parentId=] | Logger: c.a.n.NotificationsvcApplicationTests | Message: No active profile set, falling back to 1 default profile: "default"
Timestamp: 2024-08-17 21:18:12.642 | Thread: [main] | Level: INFO | Trace: [traceId=, spanId=, parentId=] | Logger: o.s.d.r.c.RepositoryConfigurationDelegate | Message: Bootstrapping Spring Data JPA repositories in DEFAULT mode.
Timestamp: 2024-08-17 21:18:12.793 | Thread: [main] | Level: INFO | Trace: [traceId=, spanId=, parentId=] | Logger: o.s.d.r.c.RepositoryConfigurationDelegate | Message: Finished Spring Data repository scanning in 131 ms. Found 1 JPA repository interface.
Timestamp: 2024-08-17 21:18:13.472 | Thread: [main] | Level: INFO | Trace: [traceId=, spanId=, parentId=] | Logger: o.s.cloud.context.scope.GenericScope | Message: BeanFactory id=1a3e203c-9d51-3437-a2dc-4270e603adeb
Timestamp: 2024-08-17 21:18:15.483 | Thread: [main] | Level: INFO | Trace: [traceId=, spanId=, parentId=] | Logger: o.h.jpa.internal.util.LogHelper | Message: HHH000204: Processing PersistenceUnitInfo [name: default]
Timestamp: 2024-08-17 21:18:15.730 | Thread: [main] | Level: INFO | Trace: [traceId=, spanId=, parentId=] | Logger: org.hibernate.Version | Message: HHH000412: Hibernate ORM core version 6.5.2.Final
Timestamp: 2024-08-17 21:18:15.870 | Thread: [main] | Level: INFO | Trace: [traceId=, spanId=, parentId=] | Logger: o.h.c.i.RegionFactoryInitiator | Message: HHH000026: Second-level cache disabled
Timestamp: 2024-08-17 21:18:17.216 | Thread: [main] | Level: INFO | Trace: [traceId=, spanId=, parentId=] | Logger: o.s.o.j.p.SpringPersistenceUnitInfo | Message: No LoadTimeWeaver setup: ignoring JPA class transformer
Timestamp: 2024-08-17 21:18:17.305 | Thread: [main] | Level: INFO | Trace: [traceId=, spanId=, parentId=] | Logger: com.zaxxer.hikari.HikariDataSource | Message: HikariPool-1 - Starting...
Timestamp: 2024-08-17 21:18:18.375 | Thread: [main] | Level: INFO | Trace: [traceId=, spanId=, parentId=] | Logger: com.zaxxer.hikari.pool.HikariPool | Message: HikariPool-1 - Added connection com.mysql.cj.jdbc.ConnectionImpl@5f4df55e
Timestamp: 2024-08-17 21:18:18.381 | Thread: [main] | Level: INFO | Trace: [traceId=, spanId=, parentId=] | Logger: com.zaxxer.hikari.HikariDataSource | Message: HikariPool-1 - Start completed.
Timestamp: 2024-08-17 21:18:20.994 | Thread: [main] | Level: INFO | Trace: [traceId=, spanId=, parentId=] | Logger: o.h.e.t.j.p.i.JtaPlatformInitiator | Message: HHH000489: No JTA platform available (set 'hibernate.transaction.jta.platform' to enable JTA platform integration)
Timestamp: 2024-08-17 21:18:21.092 | Thread: [main] | Level: INFO | Trace: [traceId=, spanId=, parentId=] | Logger: o.s.o.j.LocalContainerEntityManagerFactoryBean | Message: Initialized JPA EntityManagerFactory for persistence unit 'default'
Timestamp: 2024-08-17 21:18:22.303 | Thread: [main] | Level: INFO | Trace: [traceId=, spanId=, parentId=] | Logger: o.s.d.j.r.query.QueryEnhancerFactory | Message: Hibernate is in classpath; If applicable, HQL parser will be used.
Timestamp: 2024-08-17 21:18:24.347 | Thread: [main] | Level: WARN | Trace: [traceId=, spanId=, parentId=] | Logger: o.s.b.a.o.j.JpaBaseConfiguration$JpaWebConfiguration | Message: spring.jpa.open-in-view is enabled by default. Therefore, database queries may be performed during view rendering. Explicitly configure spring.jpa.open-in-view to disable this warning
Timestamp: 2024-08-17 21:18:29.654 | Thread: [main] | Level: INFO | Trace: [traceId=, spanId=, parentId=] | Logger: o.s.cloud.commons.util.InetUtils | Message: Cannot determine local hostname
Timestamp: 2024-08-17 21:18:29.829 | Thread: [main] | Level: INFO | Trace: [traceId=, spanId=, parentId=] | Logger: o.s.b.a.e.web.EndpointLinksResolver | Message: Exposing 0 endpoints beneath base path '/actuator'
Timestamp: 2024-08-17 21:18:30.402 | Thread: [main] | Level: INFO | Trace: [traceId=, spanId=, parentId=] | Logger: o.a.k.c.consumer.ConsumerConfig | Message: ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.include.jmx.reporter = true
auto.offset.reset = latest
bootstrap.servers = [192.168.159.129:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = consumer-notification-group-1
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
enable.metrics.push = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = notification-group
group.instance.id = null
group.protocol = classic
group.remote.assignor = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.LongDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.max.ms = 1000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.connect.timeout.ms = null
sasl.login.read.timeout.ms = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.login.retry.backoff.max.ms = 10000
sasl.login.retry.backoff.ms = 100
sasl.mechanism = PLAIN
sasl.oauthbearer.clock.skew.seconds = 30
sasl.oauthbearer.expected.audience = null
sasl.oauthbearer.expected.issuer = null
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
sasl.oauthbearer.jwks.endpoint.url = null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url = null
security.protocol = SASL_PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 45000
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class com.ali.notificationsvc.deserialization.NotificationDeserializer
Timestamp: 2024-08-17 21:18:30.825 | Thread: [main] | Level: INFO | Trace: [traceId=, spanId=, parentId=] | Logger: o.a.k.c.t.i.KafkaMetricsCollector | Message: initializing Kafka metrics collector
Timestamp: 2024-08-17 21:18:31.129 | Thread: [main] | Level: INFO | Trace: [traceId=, spanId=, parentId=] | Logger: o.a.k.c.s.a.AbstractLogin | Message: Successfully logged in.
Timestamp: 2024-08-17 21:18:31.285 | Thread: [main] | Level: INFO | Trace: [traceId=, spanId=, parentId=] | Logger: o.a.kafka.common.utils.AppInfoParser | Message: Kafka version: 3.7.0
Timestamp: 2024-08-17 21:18:31.285 | Thread: [main] | Level: INFO | Trace: [traceId=, spanId=, parentId=] | Logger: o.a.kafka.common.utils.AppInfoParser | Message: Kafka commitId: 2ae524ed625438c5
Timestamp: 2024-08-17 21:18:31.286 | Thread: [main] | Level: INFO | Trace: [traceId=, spanId=, parentId=] | Logger: o.a.kafka.common.utils.AppInfoParser | Message: Kafka startTimeMs: 1723918711280
Timestamp: 2024-08-17 21:18:31.293 | Thread: [main] | Level: INFO | Trace: [traceId=, spanId=, parentId=] | Logger: o.a.k.c.c.i.LegacyKafkaConsumer | Message: [Consumer clientId=consumer-notification-group-1, groupId=notification-group] Subscribed to topic(s): Notification_Topic
Timestamp: 2024-08-17 21:18:31.405 | Thread: [main] | Level: INFO | Trace: [traceId=, spanId=, parentId=] | Logger: c.a.n.NotificationsvcApplicationTests | Message: Started NotificationsvcApplicationTests in 24.308 seconds (process running for 27.568)
Timestamp: 2024-08-17 21:18:32.623 | Thread: [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] | Level: INFO | Trace: [traceId=, spanId=, parentId=] | Logger: o.a.kafka.common.network.Selector | Message: [Consumer clientId=consumer-notification-group-1, groupId=notification-group] Failed authentication with aliubuntuserver/192.168.159.129 (channelId=-1) (Unexpected handshake request with client mechanism PLAIN, enabled mechanisms are [])
Timestamp: 2024-08-17 21:18:32.628 | Thread: [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] | Level: INFO | Trace: [traceId=, spanId=, parentId=] | Logger: o.apache.kafka.clients.NetworkClient | Message: [Consumer clientId=consumer-notification-group-1, groupId=notification-group] Node -1 disconnected.
Timestamp: 2024-08-17 21:18:32.634 | Thread: [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] | Level: ERROR | Trace: [traceId=, spanId=, parentId=] | Logger: o.apache.kafka.clients.NetworkClient | Message: [Consumer clientId=consumer-notification-group-1, groupId=notification-group] Connection to node -1 (aliubuntuserver/192.168.159.129:9092) failed authentication due to: Unexpected handshake request with client mechanism PLAIN, enabled mechanisms are []
Timestamp: 2024-08-17 21:18:32.635 | Thread: [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] | Level: WARN | Trace: [traceId=, spanId=, parentId=] | Logger: o.apache.kafka.clients.NetworkClient | Message: [Consumer clientId=consumer-notification-group-1, groupId=notification-group] Bootstrap broker 192.168.159.129:9092 (id: -1 rack: null) disconnected
Timestamp: 2024-08-17 21:18:32.638 | Thread: [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] | Level: ERROR | Trace: [traceId=, spanId=, parentId=] | Logger: o.s.k.l.KafkaMessageListenerContainer | Message: Authentication/Authorization Exception and no authExceptionRetryInterval set
org.apache.kafka.common.errors.IllegalSaslStateException: Unexpected handshake request with client mechanism PLAIN, enabled mechanisms are []
Timestamp: 2024-08-17 21:18:32.647 | Thread: [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] | Level: ERROR | Trace: [traceId=, spanId=, parentId=] | Logger: o.s.k.l.KafkaMessageListenerContainer | Message: Fatal consumer exception; stopping container
Timestamp: 2024-08-17 21:18:32.657 | Thread: [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] | Level: INFO | Trace: [traceId=, spanId=, parentId=] | Logger: o.a.k.c.c.i.ConsumerCoordinator | Message: [Consumer clientId=consumer-notification-group-1, groupId=notification-group] Resetting generation and member id due to: consumer pro-actively leaving the group
Timestamp: 2024-08-17 21:18:32.658 | Thread: [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] | Level: INFO | Trace: [traceId=, spanId=, parentId=] | Logger: o.a.k.c.c.i.ConsumerCoordinator | Message: [Consumer clientId=consumer-notification-group-1, groupId=notification-group] Request joining group due to: consumer pro-actively leaving the group
Timestamp: 2024-08-17 21:18:32.680 | Thread: [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] | Level: INFO | Trace: [traceId=, spanId=, parentId=] | Logger: o.a.kafka.common.metrics.Metrics | Message: Metrics scheduler closed
Timestamp: 2024-08-17 21:18:32.681 | Thread: [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] | Level: INFO | Trace: [traceId=, spanId=, parentId=] | Logger: o.a.kafka.common.metrics.Metrics | Message: Closing reporter org.apache.kafka.common.metrics.JmxReporter
Timestamp: 2024-08-17 21:18:32.684 | Thread: [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] | Level: INFO | Trace: [traceId=, spanId=, parentId=] | Logger: o.a.kafka.common.metrics.Metrics | Message: Closing reporter org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter
Timestamp: 2024-08-17 21:18:32.686 | Thread: [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] | Level: INFO | Trace: [traceId=, spanId=, parentId=] | Logger: o.a.kafka.common.metrics.Metrics | Message: Metrics reporters closed
Timestamp: 2024-08-17 21:18:32.718 | Thread: [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] | Level: INFO | Trace: [traceId=, spanId=, parentId=] | Logger: o.a.kafka.common.utils.AppInfoParser | Message: App info kafka.consumer for consumer-notification-group-1 unregistered
Timestamp: 2024-08-17 21:18:32.719 | Thread: [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] | Level: INFO | Trace: [traceId=, spanId=, parentId=] | Logger: o.s.k.l.KafkaMessageListenerContainer | Message: notification-group: Consumer stopped
Timestamp: 2024-08-17 21:18:33.222 | Thread: [SpringApplicationShutdownHook] | Level: INFO | Trace: [traceId=, spanId=, parentId=] | Logger: o.s.o.j.LocalContainerEntityManagerFactoryBean | Message: Closing JPA EntityManagerFactory for persistence unit 'default'
Timestamp: 2024-08-17 21:18:33.227 | Thread: [SpringApplicationShutdownHook] | Level: INFO | Trace: [traceId=, spanId=, parentId=] | Logger: com.zaxxer.hikari.HikariDataSource | Message: HikariPool-1 - Shutdown initiated...
Timestamp: 2024-08-17 21:18:33.255 | Thread: [SpringApplicationShutdownHook] | Level: INFO | Trace: [traceId=, spanId=, parentId=] | Logger: com.zaxxer.hikari.HikariDataSource | Message: HikariPool-1 - Shutdown completed.
......@@ -83,6 +83,11 @@
<artifactId>jackson-datatype-jsr310</artifactId>
<version>2.13.4</version>
</dependency>
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>7.3</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-observation-test</artifactId>
......
......@@ -14,6 +14,12 @@ import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.backoff.FixedBackOff;
import java.util.HashMap;
import java.util.Map;
......@@ -55,6 +61,13 @@ public class NotificationKafkaConsumer {
ConcurrentKafkaListenerContainerFactory<Long,CreateNotificationDto> notificationFactory
= new ConcurrentKafkaListenerContainerFactory<>();
notificationFactory.setConsumerFactory(notificationConsumerFactory);
// Set acknowledgment mode to manual
notificationFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
// Set the error handler with retry functionality
notificationFactory.setCommonErrorHandler(new DefaultErrorHandler(
new FixedBackOff(1000L, 5)));
return notificationFactory;
}
}
......@@ -4,6 +4,8 @@ import com.ali.notificationsvc.dtos.CreateNotificationDto;
import com.ali.notificationsvc.dtos.NotificationDto;
import com.ali.notificationsvc.services.INotificationService;
import com.ali.notificationsvc.status.RequestStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
......@@ -15,6 +17,8 @@ import java.util.List;
@RequestMapping("/api/notifications")
public class NotificationController {
private static final Logger LOGGER = LoggerFactory.getLogger(NotificationController.class);
@Autowired
INotificationService notificationService;
......@@ -23,43 +27,59 @@ public class NotificationController {
}
@GetMapping("getById/{id}")
public ResponseEntity<NotificationDto> getById(@PathVariable("id") int id) {
LOGGER.info(String.format("get notification details by %s id",id));
NotificationDto notificationDto = new NotificationDto();
notificationDto = notificationService.getById(id);
return ResponseEntity.ok(notificationDto);
}
@GetMapping("getAll")
public ResponseEntity<List<NotificationDto>> getAllNotifications() {
LOGGER.info(String.format("get all notification"));
List<NotificationDto> notificationDtos = notificationService.getAll();
return ResponseEntity.ok(notificationDtos);
}
@GetMapping("getAllByCompanyId/{id}")
public ResponseEntity<List<NotificationDto>> getAllByCompanyId(@PathVariable("id") int companyId) {
LOGGER.info(String.format("get all notification by company id: %s", companyId));
List<NotificationDto> notificationDtos = notificationService.getNotificationByCompanyId(companyId);
return ResponseEntity.ok(notificationDtos);
}
@GetMapping("getAllByJobseekerId/{id}")
public ResponseEntity<List<NotificationDto>> getAllByJobseekerId(@PathVariable("id") int jobseekerId) {
LOGGER.info(String.format("get all notification by jobseeker id: %s", jobseekerId));
List<NotificationDto> notificationDtos = notificationService.getNotificationByJobseekerId(jobseekerId);
return ResponseEntity.ok(notificationDtos);
}
@GetMapping("isExist/{id}")
public ResponseEntity<Boolean> isExist(@PathVariable("id") int notificationId) {
LOGGER.info(String.format("check if notification is exist where id: %s", notificationId));
boolean isExist = notificationService.existById(notificationId);
return ResponseEntity.ok(isExist);
}
@GetMapping("getUnReadNotificationNumForJobseeker/{id}")
public ResponseEntity<Integer> getUnReadNotificationNumForJobseeker(@PathVariable("id") int jobseekerId) {
LOGGER.info(String.format("get all un read notifications for jobseeker id: %s", jobseekerId));
int num = notificationService.getUnReadNotificationsNumberByJobseekerId(jobseekerId);
return ResponseEntity.ok(num);
}
@GetMapping("getUnReadNotificationNumForCompany/{id}")
public ResponseEntity<Integer> getUnReadNotificationNumForCompany(@PathVariable("id") int companyId) {
LOGGER.info(String.format("get all un read notifications for company id: %s", companyId));
int num = notificationService.getUnReadNotificationsNumberByCompanyId(companyId);
return ResponseEntity.ok(num);
}
@PostMapping("create")
public ResponseEntity<RequestStatus> createNotification(@RequestBody CreateNotificationDto createNotificationDto) {
LOGGER.info(String.format("create new notification in synchronized way"));
RequestStatus status = notificationService.create(createNotificationDto);
if (status.isStatus()) {
return ResponseEntity.status(HttpStatus.CREATED).body(status);
......@@ -69,6 +89,8 @@ public class NotificationController {
}
@PutMapping("update")
public ResponseEntity<RequestStatus> updateNotification(@RequestBody NotificationDto notificationDto) {
LOGGER.info(String.format("update notification where id is: %s", notificationDto.getId()));
RequestStatus status = notificationService.update(notificationDto);
if (status.isStatus()) {
return ResponseEntity.ok(status);
......@@ -78,6 +100,8 @@ public class NotificationController {
}
@DeleteMapping("delete/{id}")
public ResponseEntity<RequestStatus> deleteNotification(@PathVariable("id") int id) {
LOGGER.info(String.format("delete notification where id is: %s", id));
RequestStatus status = notificationService.delete(id);
if (status.isStatus()) {
return ResponseEntity.ok(status);
......
......@@ -4,13 +4,18 @@ import com.ali.notificationsvc.dtos.CreateNotificationDto;
import com.ali.notificationsvc.services.INotificationService;
import com.ali.notificationsvc.status.RequestStatus;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
@Component
public class NotificationKafkaListener {
private static final Logger LOGGER = LoggerFactory.getLogger(NotificationKafkaListener.class);
@Autowired
private INotificationService notificationService;
......@@ -22,12 +27,23 @@ public class NotificationKafkaListener {
groupId ="${spring.kafka.consumer.group-id}",
containerFactory = "notificationFactory"
)
void listener(ConsumerRecord<Long, CreateNotificationDto> data){
void listener(ConsumerRecord<Long, CreateNotificationDto> data, Acknowledgment acknowledgment) {
CreateNotificationDto createNotificationDto = data.value();
System.out.println("Key: " + data.key());
System.out.println("Received: " + createNotificationDto.toString());
RequestStatus requestStatus = notificationService.create(createNotificationDto);
System.out.println(requestStatus.isStatus());
System.out.println(requestStatus.getDescribtion());
LOGGER.info("Received: " + createNotificationDto.toString()+ "Key: " + data.key());
try {
RequestStatus requestStatus = notificationService.create(createNotificationDto);
if (requestStatus.isStatus()) {
// Acknowledge the message if successfully processed
acknowledgment.acknowledge();
LOGGER.info("Message processed successfully. kafka acknowledge");
} else {
// If processing fails, throw an exception to trigger retry
throw new RuntimeException("Failed to process message, will retry.");
}
} catch (Exception e) {
// Log the failure and let the retry mechanism handle the retry
LOGGER.error("Error processing message: " + e.getMessage());
throw e; // Propagate the exception to trigger a retry
}
}
}
<!-- Logback configuration for the notification service.
     Both appenders share a pipe-delimited pattern carrying timestamp, thread,
     level, MDC trace fields (traceId/spanId/parentId), logger name, and message. -->
<configuration>
<!-- Console output, used during local development and container stdout logging. -->
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>
Timestamp: %d{yyyy-MM-dd HH:mm:ss.SSS} | Thread: [%thread] | Level: %-5level | Trace: [traceId=%X{traceId}, spanId=%X{spanId}, parentId=%X{parentId}] | Logger: %logger{36} | Message: %msg%n
</pattern>
</encoder>
</appender>
<!-- Rolling file output: one file per day, at most 10 days kept, capped at 1GB total. -->
<appender name="FILE"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>logs/notification-service.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>logs/notification-service.%d{yyyy-MM-dd}.log</fileNamePattern>
<maxHistory>10</maxHistory>
<totalSizeCap>1GB</totalSizeCap>
</rollingPolicy>
<encoder>
<pattern>
Timestamp: %d{yyyy-MM-dd HH:mm:ss.SSS} | Thread: [%thread] | Level: %-5level | Trace: [traceId=%X{traceId}, spanId=%X{spanId}, parentId=%X{parentId}] | Logger: %logger{36} | Message: %msg%n
</pattern>
</encoder>
</appender>
<!-- Root logger at INFO, fanned out to both appenders. -->
<root level="INFO">
<appender-ref ref="CONSOLE" />
<appender-ref ref="FILE" />
</root>
</configuration>
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment