Commit 5ad35526 authored by Bashar's avatar Bashar

Added Threads Pool and Dispatcher

parent c4887f20
package com.example.SnmpReciever; package com.example.SnmpReciever;
import org.snmp4j.PDU; import com.example.SnmpReciever.component.SnmpListener;
import org.snmp4j.smi.*;
import org.springframework.boot.CommandLineRunner; import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import java.io.IOException;
@SpringBootApplication @SpringBootApplication
public class SnmpReceiverApplication implements CommandLineRunner { public class SnmpReceiverApplication implements CommandLineRunner {
private final SnmpListener trapReceiver; private final SnmpListener trapReceiver;
...@@ -21,6 +18,6 @@ public class SnmpReceiverApplication implements CommandLineRunner { ...@@ -21,6 +18,6 @@ public class SnmpReceiverApplication implements CommandLineRunner {
@Override @Override
public void run(String... args) throws Exception { public void run(String... args) throws Exception {
trapReceiver.startTrapListener(); trapReceiver.listen();
} }
} }
\ No newline at end of file
package com.example.SnmpReciever; package com.example.SnmpReciever.component;
import com.example.SnmpReciever.enitites.TrapData; import com.example.SnmpReciever.enitites.TrapData;
//import com.example.SnmpReciever.services.KafkaService; //import com.example.SnmpReciever.services.KafkaService;
import com.example.SnmpReciever.services.KafkaService;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import org.snmp4j.*; import org.snmp4j.*;
import org.snmp4j.mp.MPv1;
import org.snmp4j.mp.MPv2c;
import org.snmp4j.security.Priv3DES;
import org.snmp4j.security.SecurityProtocols;
import org.snmp4j.smi.OctetString;
import org.snmp4j.smi.TcpAddress;
import org.snmp4j.smi.TransportIpAddress;
import org.snmp4j.smi.UdpAddress; import org.snmp4j.smi.UdpAddress;
import org.snmp4j.transport.AbstractTransportMapping;
import org.snmp4j.transport.DefaultTcpTransportMapping;
import org.snmp4j.transport.DefaultUdpTransportMapping; import org.snmp4j.transport.DefaultUdpTransportMapping;
import org.snmp4j.util.MultiThreadedMessageDispatcher;
import org.snmp4j.util.ThreadPool;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
...@@ -15,27 +30,32 @@ import java.io.IOException; ...@@ -15,27 +30,32 @@ import java.io.IOException;
/*@RestController /*@RestController
@RequestMapping("/api/product")*/ @RequestMapping("/api/product")*/
@Component @Component
@AllArgsConstructor
public class SnmpListener implements CommandResponder { public class SnmpListener implements CommandResponder {
private final KafkaTemplate<String, String> kafkaTemplate; @Autowired
private final ObjectMapper objectMapper; private KafkaService kafkaService;
private ObjectMapper objectMapper = new ObjectMapper();
@Value("${spring.listener.address}")
private String addressString;
@Value("${spring.listener.threads}")
private Integer threadsNum;
@Value("${spring.listener.community}")
private String community;
@Override @Override
public synchronized void processPdu(CommandResponderEvent event) { public synchronized void processPdu(CommandResponderEvent event) {
System.out.println("Received PDU..."); System.out.println("Received PDU...");
PDU x = event.getPDU(); PDU x = event.getPDU();
// System.out.println(x);
// System.out.println(x.getType());
//
PDUv1 pdu = (PDUv1) x; PDUv1 pdu = (PDUv1) x;
TrapData trapData = new TrapData(pdu); TrapData trapData = new TrapData(pdu);
if (pdu != null) { if (pdu != null) {
try { try {
String pduJson = objectMapper.writeValueAsString(trapData); String pduJson = objectMapper.writeValueAsString(trapData);
// System.out.println(pduJson); kafkaService.send("TRAP", pduJson);
kafkaTemplate.send("TRAP", pduJson);
} catch (JsonProcessingException e) { } catch (JsonProcessingException e) {
System.out.println(e); System.out.println(e);
throw new RuntimeException(e); throw new RuntimeException(e);
...@@ -44,7 +64,7 @@ public class SnmpListener implements CommandResponder { ...@@ -44,7 +64,7 @@ public class SnmpListener implements CommandResponder {
} }
public void startTrapListener() { public void startTrapListener() {
try { try {
TransportMapping<?> transport = new DefaultUdpTransportMapping(new UdpAddress("192.168.26.46/1625")); TransportMapping<?> transport = new DefaultUdpTransportMapping(new UdpAddress(addressString));
System.out.println("Listening to SNMP Trap"); System.out.println("Listening to SNMP Trap");
Snmp snmp = new Snmp(transport); Snmp snmp = new Snmp(transport);
snmp.addCommandResponder(this); snmp.addCommandResponder(this);
...@@ -53,5 +73,42 @@ public class SnmpListener implements CommandResponder { ...@@ -53,5 +73,42 @@ public class SnmpListener implements CommandResponder {
e.printStackTrace(); e.printStackTrace();
} }
} }
public synchronized void listen()
throws IOException {
AbstractTransportMapping transport;
TransportIpAddress address = new UdpAddress(addressString);
if (address instanceof TcpAddress) {
transport = new DefaultTcpTransportMapping((TcpAddress) address);
} else {
transport = new DefaultUdpTransportMapping((UdpAddress) address);
}
ThreadPool threadPool = ThreadPool.create("DispatcherPool", threadsNum);
MessageDispatcher mDispatcher = new MultiThreadedMessageDispatcher(
threadPool, new MessageDispatcherImpl());
// add message processing models
mDispatcher.addMessageProcessingModel(new MPv1());
mDispatcher.addMessageProcessingModel(new MPv2c());
// add all security protocols
SecurityProtocols.getInstance().addDefaultProtocols();
SecurityProtocols.getInstance().addPrivacyProtocol(new Priv3DES());
// Create Target
CommunityTarget target = new CommunityTarget();
target.setCommunity(new OctetString(community));
Snmp snmp = new Snmp(mDispatcher, transport);
snmp.addCommandResponder(this);
transport.listen();
System.out.println("Listening on " + address);
try {
this.wait();
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
} }
//package com.example.SnmpReciever.services; package com.example.SnmpReciever.services;
//
//import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
//import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
//import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
//import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.KafkaTemplate;
//import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
//
//@Service @Service
//@AllArgsConstructor @AllArgsConstructor
//public class KafkaService { public class KafkaService {
// private final KafkaTemplate<String, String> kafkaTemplate; private final KafkaTemplate<String, String> kafkaTemplate;
//
// public void send(String topic,String message){ public void send(String topic,String message){
// kafkaTemplate.send(topic,message); kafkaTemplate.send(topic,message);
// } }
//} }
...@@ -2,4 +2,6 @@ server.port = 0 ...@@ -2,4 +2,6 @@ server.port = 0
spring.kafka.bootstrap-servers=http://172.29.3.220:9092 spring.kafka.bootstrap-servers=http://172.29.3.220:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.listener.address=192.168.26.46/1625
spring.listener.threads=20
spring.listener.community=public
...@@ -2,4 +2,6 @@ server.port = 0 ...@@ -2,4 +2,6 @@ server.port = 0
spring.kafka.bootstrap-servers=http://172.29.3.220:9092 spring.kafka.bootstrap-servers=http://172.29.3.220:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.listener.address=192.168.26.46/1625
spring.listener.threads=20
spring.listener.community=public
...@@ -25,19 +25,18 @@ public class KafkaConsumerSnmpApplication { ...@@ -25,19 +25,18 @@ public class KafkaConsumerSnmpApplication {
@KafkaListener(topics = "TRAP") @KafkaListener(topics = "TRAP")
public void handleKafkaMessage(String pduJson) { public void handleKafkaMessage(String pduJson) {
String json = pduJson; // Replace with your JSON string String json = pduJson;// Replace with your JSON string
num++; num++;
try { try {
ObjectMapper objectMapper = new ObjectMapper(); ObjectMapper objectMapper = new ObjectMapper();
TrapData t = objectMapper.readValue(json,TrapData.class); TrapData t = objectMapper.readValue(json,TrapData.class);
//Some Processing: Add this later //Some Processing: Add this later
EnrichedTrap t2 = new EnrichedTrap(t); EnrichedTrap t2 = new EnrichedTrap(t);
System.out.println("Not sent yet"); //System.out.println("Not sent yet");
sender.send(t2); sender.send(t2);
System.out.println("Sent now!"); //System.out.println("Sent now!");
rethinkDBService.saveKafkaMessageToRethink(t2); rethinkDBService.saveKafkaMessageToRethink(t2);
if (num == 1) System.out.println(num);
System.out.println(rethinkDBService.getData());
} catch (Exception e) { } catch (Exception e) {
System.out.println(e); System.out.println(e);
e.printStackTrace(); e.printStackTrace();
......
...@@ -32,9 +32,9 @@ public class RethinkDBService { ...@@ -32,9 +32,9 @@ public class RethinkDBService {
String jsonString = objectMapper.writeValueAsString(rethinkTrap); String jsonString = objectMapper.writeValueAsString(rethinkTrap);
JsonNode jsonNode = objectMapper.readTree(jsonString); JsonNode jsonNode = objectMapper.readTree(jsonString);
Map<String,EnrichedTrap> document = objectMapper.convertValue(jsonNode, Map.class); Map<String,EnrichedTrap> document = objectMapper.convertValue(jsonNode, Map.class);
System.out.println(jsonString); // System.out.println(jsonString);
System.out.println(jsonNode); // System.out.println(jsonNode);
System.out.println(document); // System.out.println(document);
r.db(connectionFactory.getDbName()).table(connectionFactory.getDbTableName()).insert(document).run(connectionFactory.getConnection()); r.db(connectionFactory.getDbName()).table(connectionFactory.getDbTableName()).insert(document).run(connectionFactory.getConnection());
} catch (Exception e) { } catch (Exception e) {
System.out.println("error " + e); System.out.println("error " + e);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment