Commit 66b4dfc9 authored by Ali

add some endpoints

parent d38482db
@@ -74,6 +74,15 @@
            <artifactId>modelmapper</artifactId>
            <version>3.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.datatype</groupId>
            <artifactId>jackson-datatype-jsr310</artifactId>
            <version>2.13.4</version>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
...
package com.ali.notificationsvc.config;

import com.ali.notificationsvc.deserialization.NotificationDeserializer;
import com.ali.notificationsvc.dtos.CreateNotificationDto;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class NotificationKafkaConsumer {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.properties.security.protocol}")
    private String securityProtocol;

    @Value("${spring.kafka.properties.sasl.mechanism}")
    private String saslMechanism;

    @Value("${spring.kafka.properties.sasl.jaas.config}")
    private String saslJaasConfig;

    // Base consumer properties: broker address, key/value deserializers, and SASL settings.
    @Bean
    public Map<String, Object> notificationConsumerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NotificationDeserializer.class);
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
        props.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
        props.put(SaslConfigs.SASL_JAAS_CONFIG, saslJaasConfig);
        return props;
    }

    @Bean
    public ConsumerFactory<Long, CreateNotificationDto> notificationConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(notificationConsumerConfig());
    }

    // Container factory referenced by name from @KafkaListener(containerFactory = "notificationFactory").
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Long, CreateNotificationDto>> notificationFactory(
            ConsumerFactory<Long, CreateNotificationDto> notificationConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Long, CreateNotificationDto> notificationFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        notificationFactory.setConsumerFactory(notificationConsumerFactory);
        return notificationFactory;
    }
}
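Because this consumer factory is built by hand, Spring Boot's spring.kafka.consumer.* properties from application.properties (auto-offset-reset, group-id) are not applied to it automatically; the group id only takes effect because the @KafkaListener below sets it explicitly. A minimal sketch of how the offset-reset property added later in this commit could also be injected into the same config map (a delta against the class above, not part of the commit):

    // Sketch only: honour spring.kafka.consumer.auto-offset-reset from application.properties
    // in the hand-built consumer config (property key assumed from this commit).
    @Value("${spring.kafka.consumer.auto-offset-reset:earliest}")
    private String autoOffsetReset;

    @Bean
    public Map<String, Object> notificationConsumerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NotificationDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
        props.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
        props.put(SaslConfigs.SASL_JAAS_CONFIG, saslJaasConfig);
        return props;
    }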
package com.ali.notificationsvc.deserialization;

import com.ali.notificationsvc.dtos.CreateNotificationDto;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.kafka.common.serialization.Deserializer;

import java.io.IOException;

public class NotificationDeserializer implements Deserializer<CreateNotificationDto> {

    private final ObjectMapper objectMapper;

    public NotificationDeserializer() {
        // JavaTimeModule is needed so Jackson can read java.time fields in the DTO.
        this.objectMapper = new ObjectMapper();
        objectMapper.registerModule(new JavaTimeModule());
    }

    @Override
    public CreateNotificationDto deserialize(String topic, byte[] bytes) {
        // Null payloads (tombstone records) are passed through as null.
        if (bytes == null) {
            return null;
        }
        try {
            return objectMapper.readValue(bytes, CreateNotificationDto.class);
        } catch (IOException e) {
            throw new RuntimeException("Failed to deserialize CreateNotificationDto", e);
        }
    }
}
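The producing side has to write the payload with a matching JSON serializer that also registers JavaTimeModule. A minimal sketch of such a counterpart; the class name NotificationSerializer and its location are assumptions, not part of this commit:

package com.ali.notificationsvc.deserialization;

import com.ali.notificationsvc.dtos.CreateNotificationDto;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.kafka.common.serialization.Serializer;

// Hypothetical counterpart to NotificationDeserializer, shown only to illustrate that
// both sides must register JavaTimeModule for java.time fields in the DTO.
public class NotificationSerializer implements Serializer<CreateNotificationDto> {

    private final ObjectMapper objectMapper = new ObjectMapper()
            .registerModule(new JavaTimeModule());

    @Override
    public byte[] serialize(String topic, CreateNotificationDto data) {
        if (data == null) {
            return null;
        }
        try {
            return objectMapper.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Failed to serialize CreateNotificationDto", e);
        }
    }
}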
package com.ali.notificationsvc.kafkalisteners;

import com.ali.notificationsvc.dtos.CreateNotificationDto;
import com.ali.notificationsvc.services.INotificationService;
import com.ali.notificationsvc.status.RequestStatus;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class NotificationKafkaListener {

    private final INotificationService notificationService;

    public NotificationKafkaListener(INotificationService notificationService) {
        this.notificationService = notificationService;
    }

    // Consumes CreateNotificationDto records from the notification topic and hands them
    // to the notification service, logging the key, payload, and request outcome.
    @KafkaListener(
            topics = "${spring.kafka.topic.name}",
            groupId = "${spring.kafka.consumer.group-id}",
            containerFactory = "notificationFactory"
    )
    void listener(ConsumerRecord<Long, CreateNotificationDto> data) {
        CreateNotificationDto createNotificationDto = data.value();
        System.out.println("Key: " + data.key());
        System.out.println("Received: " + createNotificationDto);
        RequestStatus requestStatus = notificationService.create(createNotificationDto);
        System.out.println(requestStatus.isStatus());
        System.out.println(requestStatus.getDescribtion());
    }
}
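For completeness, the service that publishes to Notification_Topic (not part of this commit) would need a producer configuration keyed the same way. A minimal sketch, assuming the same broker and the hypothetical NotificationSerializer sketched above; all names here are illustrative:

package com.ali.notificationsvc.config;

import com.ali.notificationsvc.deserialization.NotificationSerializer;
import com.ali.notificationsvc.dtos.CreateNotificationDto;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.LongSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

// Hypothetical producer-side configuration; broker address and any SASL settings
// would mirror the consumer configuration above.
@Configuration
public class NotificationKafkaProducerConfig {

    @Bean
    public ProducerFactory<Long, CreateNotificationDto> notificationProducerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.159.129:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, NotificationSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<Long, CreateNotificationDto> notificationKafkaTemplate() {
        return new KafkaTemplate<>(notificationProducerFactory());
    }
}

A caller would then publish with notificationKafkaTemplate.send("Notification_Topic", key, dto), matching the Long key and JSON value the listener expects.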
@@ -4,4 +4,14 @@ server.port=8090
spring.datasource.url=jdbc:mysql://localhost:3306/notificationSVCDB
spring.datasource.username=root
spring.datasource.password=ali123
spring.jpa.hibernate.ddl-auto=update
spring.kafka.bootstrap-servers=192.168.159.129:9092
spring.kafka.properties.security.protocol=SASL_PLAINTEXT
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="user1" password="r48ZrwWBmu";
spring.kafka.consumer.group-id=notification-group
spring.kafka.topic.name=Notification_Topic
spring.kafka.consumer.auto-offset-reset=earliest
management.endpoints.web.exposure.include=prometheus
management.endpoint.prometheus.enabled=true
management.endpoint.health.show-details=always
\ No newline at end of file