Commit e0c31d51 authored by Bashar's avatar Bashar

Web Application Functions

parent d8cfab23
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="AutoImportSettings">
<option name="autoReloadType" value="SELECTIVE" />
</component>
<component name="ChangeListManager">
<list default="true" id="1ccc43e7-776d-4cb8-8cc6-364a30c76234" name="Changes" comment="" />
<option name="SHOW_DIALOG" value="false" />
<option name="HIGHLIGHT_CONFLICTS" value="true" />
<option name="HIGHLIGHT_NON_ACTIVE_CHANGELIST" value="false" />
<option name="LAST_RESOLUTION" value="IGNORE" />
</component>
<component name="FileTemplateManagerImpl">
<option name="RECENT_TEMPLATES">
<list>
<option value="Class" />
</list>
</option>
</component>
<component name="Git.Settings">
<option name="RECENT_GIT_ROOT_PATH" value="$PROJECT_DIR$/.." />
</component>
<component name="ProjectId" id="2SFLfJ0zAKPEnXhpqWizgRq7ZOP" />
<component name="ProjectViewState">
<option name="hideEmptyMiddlePackages" value="true" />
<option name="showLibraryContents" value="true" />
</component>
<component name="PropertiesComponent"><![CDATA[{
"keyToString": {
"RunOnceActivity.OpenProjectViewOnStart": "true",
"RunOnceActivity.ShowReadmeOnStart": "true",
"SHARE_PROJECT_CONFIGURATION_FILES": "true",
"last_opened_file_path": "D:/Graduation_Project/GitHub/ver4/Elastic_Configs/elasticJavaApi/src/main/java/com/example/elasticJavaApi/entities"
}
}]]></component>
<component name="RecentsManager">
<key name="CopyFile.RECENT_KEYS">
<recent name="D:\Graduation_Project\GitHub\ver4\Elastic_Configs\elasticJavaApi\src\main\java\com\example\elasticJavaApi\entities" />
</key>
<key name="CopyClassDialog.RECENTS_KEY">
<recent name="com.example.elasticJavaApi.entities" />
</key>
</component>
<component name="SpellCheckerSettings" RuntimeDictionaries="0" Folders="0" CustomDictionaries="0" DefaultDictionary="application-level" UseSingleDictionary="true" transferred="true" />
<component name="Vcs.Log.Tabs.Properties">
<option name="TAB_STATES">
<map>
<entry key="MAIN">
<value>
<State>
<option name="FILTERS">
<map>
<entry key="branch">
<value>
<list>
<option value="origin/master" />
</list>
</value>
</entry>
</map>
</option>
</State>
</value>
</entry>
</map>
</option>
</component>
</project>
\ No newline at end of file
......@@ -2,6 +2,7 @@
<project version="4">
<component name="CompilerConfiguration">
<annotationProcessing>
<profile default="true" name="Default" enabled="true" />
<profile name="Maven default annotation processors profile" enabled="true">
<sourceOutputDir name="target/generated-sources/annotations" />
<sourceTestOutputDir name="target/generated-test-sources/test-annotations" />
......
......@@ -20,7 +20,6 @@ import java.util.Map;
public class ConsumerApplication implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
private final RethinkDB r = RethinkDB.r;
......
package com.example.consumer.controller;
import com.example.consumer.entity.Trap;
import com.example.consumer.services.TrapService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.web.bind.annotation.*;
@RestController
@CrossOrigin(origins = {"http://localhost:5173"})
@RequestMapping("/api/elastic")
public class TrapController {
@Autowired
private TrapService trapService;
@GetMapping("/findAllTraps")
Iterable<Trap> findAll(){
return trapService.getTraps();
}
@PostMapping("/insertTraps")
public Trap insertTrap(@RequestBody Trap trap){
return trapService.insertTrap(trap);
}
@KafkaListener(topics = "Test")
public void handleKafkaMessage(String message) {
trapService.saveKafkaMessageToElastic(message);
}
}
//package com.example.consumer.controller;
//
//import com.example.consumer.entity.Trap;
//import com.example.consumer.services.TrapService;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.kafka.annotation.KafkaListener;
//import org.springframework.web.bind.annotation.*;
//
//@RestController
//@CrossOrigin(origins = {"http://localhost:5173"})
//@RequestMapping("/api/elastic")
//public class TrapController {
//
// @Autowired
// private TrapService trapService;
// @GetMapping("/findAllTraps")
// Iterable<Trap> findAll(){
// return trapService.getTraps();
//
// }
//
// @PostMapping("/insertTraps")
// public Trap insertTrap(@RequestBody Trap trap){
// return trapService.insertTrap(trap);
// }
// @KafkaListener(topics = "Test")
// public void handleKafkaMessage(String message) {
// trapService.saveKafkaMessageToElastic(message);
// }
//
//}
//
package com.example.consumer.repository;
import com.example.consumer.entity.Trap;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface TrapRepository extends ElasticsearchRepository<Trap,Integer> {
}
//package com.example.consumer.repository;
//
//
//import com.example.consumer.entity.Trap;
//import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
//import org.springframework.stereotype.Repository;
//
//@Repository
//public interface TrapRepository extends ElasticsearchRepository<Trap,Integer> {
//
//}
//
package com.example.consumer.services;
import com.example.consumer.repository.TrapRepository;
import com.example.consumer.entity.Trap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class TrapService {
@Autowired
private TrapRepository trapRepository;
private final Logger log = LoggerFactory.getLogger(TrapService.class);
public Iterable<Trap> getTraps() {
try{
return trapRepository.findAll();
} catch (Exception e) {
log.error("Error getting data from Elastic", e);
return null;
}
}
public Trap insertTrap(Trap trap) {
try {
trapRepository.save(trap);
log.info("Data saved successfully in Elastic");
return trap;
}
catch (Exception e) {
log.error("Error saving data in Elastic", e);
}
return null;
}
public Trap updateTrap(Trap trap, int id) {
Trap trap1 = trapRepository.findById(id).get();
trap1.setTrap(trap.getTrap());
return trap1;
}
public void deleteTrap(int id ) {
trapRepository.deleteById(id);
}
public void saveKafkaMessageToElastic(String message) {
try {
Trap trap = new Trap(message);
trapRepository.save(trap);
log.info("Data saved successfully in Elastic");
} catch (Exception e) {
System.out.println("error " + e);
}
}
}
//package com.example.consumer.services;
//
//import com.example.consumer.repository.TrapRepository;
//import com.example.consumer.entity.Trap;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.stereotype.Service;
//
//
//@Service
//public class TrapService {
// @Autowired
// private TrapRepository trapRepository;
//
// private final Logger log = LoggerFactory.getLogger(TrapService.class);
// public Iterable<Trap> getTraps() {
//
// try{
// return trapRepository.findAll();
// } catch (Exception e) {
// log.error("Error getting data from Elastic", e);
// return null;
// }
// }
//
// public Trap insertTrap(Trap trap) {
// try {
// trapRepository.save(trap);
//
// log.info("Data saved successfully in Elastic");
// return trap;
// }
// catch (Exception e) {
// log.error("Error saving data in Elastic", e);
// }
// return null;
// }
//
// public Trap updateTrap(Trap trap, int id) {
// Trap trap1 = trapRepository.findById(id).get();
// trap1.setTrap(trap.getTrap());
// return trap1;
// }
//
// public void deleteTrap(int id ) {
// trapRepository.deleteById(id);
// }
// public void saveKafkaMessageToElastic(String message) {
// try {
// Trap trap = new Trap(message);
// trapRepository.save(trap);
//
// log.info("Data saved successfully in Elastic");
//
// } catch (Exception e) {
//
// System.out.println("error " + e);
// }
// }
//}
server.port = 4164
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.bootstrap-servers=172.29.3.220:9092
spring.kafka.template.default-topic=notificationTopic
spring.kafka.consumer.group-id= notificationId
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
rethinkdb.host = localhost
rethinkdb.host = 172.29.3.220
rethinkdb.port = 28015
spring.data.elasticsearch.cluster-names=trapsService
spring.data.elasticsearch.cluster-node=localhost:9200
spring.data.elasticsearch.cluster-node=172.29.3.220:9200
server.port = 4164
spring.kafka.bootstrap-servers=172.29.3.220:9092
spring.kafka.template.default-topic=notificationTopic
spring.kafka.consumer.group-id= notificationId
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
rethinkdb.host = 172.29.3.220
rethinkdb.port = 28015
spring.data.elasticsearch.cluster-names=trapsService
spring.data.elasticsearch.cluster-node=172.29.3.220:9200
......@@ -52,7 +52,6 @@ public class ProcessingApplication {
};
// Schedule the task to run every 10 seconds
timer.schedule(task, 0, 3000);
}
public void myFunction(){
System.out.println("Hey");
......
......@@ -39,7 +39,7 @@ public class elasticConfig {
@Value("${workingPath}")
private String workingPath;
private RestClient restClient = RestClient.builder(
new HttpHost("192.168.25.254", 9200)).build();
new HttpHost("172.29.3.220", 9200)).build();
// Create the transport with a Jackson mapper
private ElasticsearchTransport transport = new RestClientTransport(
......
spring.kafka.bootstrap-servers=http://192.168.25.254:9092
spring.kafka.bootstrap-servers=http://172.29.3.220:9092
spring.kafka.template.default-topic=EnrichedTrap
spring.kafka.consumer.group-id= EnrichedTrapId
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
......
......@@ -24,11 +24,17 @@ public class SnmpListener implements CommandResponder {
@Override
public synchronized void processPdu(CommandResponderEvent event) {
PDUv1 pdu = (PDUv1) event.getPDU();
System.out.println("Received PDU...");
PDU x = event.getPDU();
// System.out.println(x);
// System.out.println(x.getType());
//
PDUv1 pdu = (PDUv1) x;
TrapData trapData = new TrapData(pdu);
if (pdu != null) {
try {
String pduJson = objectMapper.writeValueAsString(trapData);
// System.out.println(pduJson);
kafkaTemplate.send("TRAP", pduJson);
} catch (JsonProcessingException e) {
System.out.println(e);
......@@ -38,7 +44,7 @@ public class SnmpListener implements CommandResponder {
}
public void startTrapListener() {
try {
TransportMapping<?> transport = new DefaultUdpTransportMapping(new UdpAddress("localhost/1625"));
TransportMapping<?> transport = new DefaultUdpTransportMapping(new UdpAddress("192.168.26.46/1625"));
System.out.println("Listening to SNMP Trap");
Snmp snmp = new Snmp(transport);
snmp.addCommandResponder(this);
......
server.port = 0
spring.kafka.bootstrap-servers=http://192.168.25.254:9092
spring.kafka.bootstrap-servers=http://172.29.3.220:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
server.port = 0
spring.kafka.bootstrap-servers=http://192.168.25.254:9092
spring.kafka.bootstrap-servers=http://172.29.3.220:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
......@@ -21,6 +21,11 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.snmp4j</groupId>
<artifactId>snmp4j</artifactId>
<version>3.6.2</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
......
package com.example.WebApplication.configuration;
import com.example.WebApplication.controller.SocketTextHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Autowired
private SocketTextHandler socket_server;
@Value("${websocket.path}")
private String webSocketPath;
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
System.out.println(webSocketPath);
registry.addHandler(socket_server, webSocketPath).setAllowedOrigins("*");
}
}
//package com.example.WebApplication.controller;
//
//import com.example.WebApplication.controller.SocketTextHandler;
//import com.example.WebApplication.entities.RethinkChange;
//import com.example.WebApplication.services.RethinkDBService;
//import com.rethinkdb.net.Cursor;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.boot.CommandLineRunner;
//import org.springframework.web.bind.annotation.CrossOrigin;
//import org.springframework.web.bind.annotation.RequestMapping;
//import org.springframework.web.bind.annotation.RestController;
//
//import java.io.IOException;
//
//@RestController
//@RequestMapping("/api/rethink")
//public class RethinkController {
//
// @Override
// public void run(String... args) throws Exception {
// Cursor<RethinkChange> changeCursor = rethinkDBService.subscribe();
// System.out.println("I am Subscribing");
// //List<RethinkChange> result = new ArrayList<>();
// for (RethinkChange change : changeCursor){
//// System.out.println("Something Changed ");
//// System.out.println(change);
//// result.add(change);
// try {
// socket_server.broadcast(change);
// } catch (IOException e) {
// throw new RuntimeException(e);
// }
// }
//
// }
//}
package com.example.WebApplication.controller;
import com.example.WebApplication.entities.EnrichedTrap;
import com.example.WebApplication.entities.RethinkChange;
import com.example.WebApplication.services.RethinkDBService;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.rethinkdb.net.Cursor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
@Component
@RestController
@RequestMapping("/api/rethink")
public class SocketTextHandler extends TextWebSocketHandler implements CommandLineRunner {
private List<WebSocketSession> sessions = new CopyOnWriteArrayList<>();
@Autowired
private RethinkDBService rethinkDBService;
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
System.out.println("New connection from " + session.getRemoteAddress());
sessions.add(session);
this.sendAllToSession(session);
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
System.out.println("Received message from " + session.getRemoteAddress() + ": " + message.getPayload());
// Handle incoming message
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
System.out.println("Connection closed to " + session.getRemoteAddress() + " with status " + status);
sessions.remove(session);
}
public void broadcast(Object message) throws IOException {
System.out.println("Hey Broadcasting");
System.out.println(message);
ObjectWriter ow = new ObjectMapper().writer().withDefaultPrettyPrinter();
String json = ow.writeValueAsString(message);
System.out.println(json);
TextMessage s = new TextMessage(json);
//System.out.println(s);
for (WebSocketSession session : sessions) {
if (session.isOpen()) {
session.sendMessage(s);
}
}
}
public void sendMessageToSession(Object message,WebSocketSession session) {
ObjectWriter ow = new ObjectMapper().writer().withDefaultPrettyPrinter();
String json = null;
try {
json = ow.writeValueAsString(message);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
System.out.println(json);
TextMessage s = new TextMessage(json);
if (session.isOpen()) {
try {
session.sendMessage(s);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
public void sendAllToSession(WebSocketSession session){
List<Map<String, Object>> data = rethinkDBService.getData();
RethinkChange change = new RethinkChange();
change.setOld_val(null);
for (Map<String, Object> doc: data){
change.setNew_val(doc);
this.sendMessageToSession(change,session);
}
}
@Override
public void run(String... args) throws Exception {
Cursor<RethinkChange> changeCursor = rethinkDBService.subscribe();
System.out.println("I am Subscribing");
System.out.println(changeCursor.getClass());
System.out.println(changeCursor);
//List<RethinkChange> result = new ArrayList<>();
for (RethinkChange change : changeCursor){
System.out.println("Something Changed ");
// result.add(change);
try {
this.broadcast(change);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
@PostMapping("/delete")
public void delete(@RequestBody String id){
rethinkDBService.deleteById(id);
}
}
package com.example.WebApplication.entities;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import java.util.ArrayList;
import java.util.List;
@Setter
@Getter
@NoArgsConstructor
@AllArgsConstructor
public class EnrichedTrap {
@JsonProperty("enterprise")
public String enterprise;
@JsonProperty("agentAddress")
public String agentAddress;
@JsonProperty("genericTrap")
public int genericTrap;
@JsonProperty("specificTrap")
public int specificTrap;
@JsonProperty("timestamp")
public long timestamp;
@JsonProperty("severity")
public String severity;
@JsonProperty("variableBindings")
public List<VarBind> variableBindings = new ArrayList<VarBind>();
public EnrichedTrap(TrapData pdu) {
this.enterprise = pdu.getEnterprise().toString();
this.genericTrap = pdu.getGenericTrap();
this.specificTrap = pdu.getSpecificTrap();
this.timestamp = pdu.getTimestamp();
this.agentAddress = pdu.getAgentAddress().toString();
this.variableBindings = pdu.getVariableBindings();
/*if (this.variableBindings.size() > 5){
this.severity = SeverityLevel.WARNING;
}
else if (this.variableBindings.size() >= 3){
this.severity = SeverityLevel.ERROR;
}
else this.severity = SeverityLevel.INFO;*/
}
/*public EnrichedTrap(String enterprise, String agentAddress, int genericTrap, int specificTrap, long timestamp, String severity, List<VarBind> variableBindings) {
this.enterprise = enterprise;
this.agentAddress = agentAddress;
this.genericTrap = genericTrap;
this.specificTrap = specificTrap;
this.timestamp = timestamp;
this.severity = (severity);
this.variableBindings = variableBindings;
}*/
}
package com.example.WebApplication.entities;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Map;
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class RethinkChange {
private Map<String,Object> new_val;
private Map<String,Object> old_val;
}
package com.example.WebApplication.entities;
public class SeverityLevel {
/*INFO("INFO"),
WARNING("WARNING"),
ERROR("ERROR");
private final String stringValue;
SeverityLevel(String stringValue) {
this.stringValue = stringValue;
}
public String getStringValue() {
return stringValue;
}
public static SeverityLevel fromStringValue(String stringValue) {
for (SeverityLevel enumValue : SeverityLevel.values()) {
if (enumValue.stringValue.equals(stringValue)) {
return enumValue;
}
}
throw new IllegalArgumentException("Invalid string value for SeverityLevel: " + stringValue);
}
public static SeverityLevel fromString(String string) {
for (SeverityLevel enumValue : SeverityLevel.values()) {
if (enumValue.stringValue.equalsIgnoreCase(string)) {
return enumValue;
}
}
throw new IllegalArgumentException("Invalid string value for SeverityLevel: " + string);
}*/
}
package com.example.WebApplication.entities;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.snmp4j.PDUv1;
import org.snmp4j.smi.VariableBinding;
import java.util.ArrayList;
import java.util.List;
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class TrapData {
@JsonProperty("enterprise")
public String enterprise;
@JsonProperty("agentAddress")
public String agentAddress;
@JsonProperty("genericTrap")
public int genericTrap;
@JsonProperty("specificTrap")
public int specificTrap;
@JsonProperty("timestamp")
public long timestamp;
@JsonProperty("variableBindings")
public List<VarBind> variableBindings = new ArrayList<VarBind>();
public TrapData(PDUv1 pdu) {
this.enterprise = pdu.getEnterprise().toString();
this.genericTrap = pdu.getGenericTrap();
this.specificTrap = pdu.getSpecificTrap();
this.timestamp = pdu.getTimestamp();
this.agentAddress = pdu.getAgentAddress().toString();
List<VariableBinding> trapVariableBindings = pdu.getAll();
for (VariableBinding vb : trapVariableBindings) {
this.variableBindings.add(new VarBind(vb));
}
}
public void print(){
System.out.println("enterprise = " + this.getEnterprise());
System.out.println("enterprise = " + this.getAgentAddress());
System.out.println("genericTrap = " + this.getGenericTrap());
System.out.println("specificTrap = " + this.getSpecificTrap());
System.out.println("timestamp = " + this.getTimestamp());
System.out.println("variableBindings = {");
for (VarBind v : this.getVariableBindings()){
v.print();
}
System.out.println("}");
}
}
package com.example.WebApplication.entities;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.snmp4j.smi.VariableBinding;
@AllArgsConstructor
@NoArgsConstructor
@Setter
@Getter
public class VarBind {
@JsonProperty("oid")
public String oid;
@JsonProperty("value")
public String value;
public VarBind(VariableBinding vb){
this.oid = vb.getOid().toString();
this.value = vb.getVariable().toString();
}
public void print(){
System.out.println("oid = "+ this.getOid() + "," + "value = "+ this.getValue());
}
}
package com.example.WebApplication.factory;
import com.rethinkdb.RethinkDB;
import com.rethinkdb.net.Connection;
import jakarta.annotation.PostConstruct;
import lombok.Getter;
import lombok.Setter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
@Getter
@Setter
public class RethinkDBConnectionFactory {
private final Logger log = LoggerFactory.getLogger(RethinkDBConnectionFactory.class);
private final RethinkDB r = RethinkDB.r;
private Connection connection;
@Value("${rethinkdb.host}")
private String host;
@Value("${rethinkdb.port}")
private int port;
@Value("${rethinkDBName}")
private String dbName;
@Value("${rethinkDBTableName}")
private String dbTableName;
@PostConstruct
public Connection init() {
try {
connection = r.connection().hostname(host).port(port).connect();
log.info("RethinkDB connected successfully");
List<String> dbList = r.dbList().run(connection);
if (!dbList.contains(dbName)) {
// System.out.println("Creating DATABASE Heeeeeeeeeeeeeeeeeeeeeeeeeere");
r.dbCreate(dbName).run(connection);
}
List<String> tables = r.db(dbName).tableList().run(connection);
if (!tables.contains(dbTableName)) {
// System.out.println("Creating Table Heeeeeeeeeeeeeeeeeeeeeeeeeere");
r.db(dbName).tableCreate(dbTableName).run(connection);
//r.db(dbName).table(dbTableName).indexCreate("timestamp").run(connection);
//r.db("my_database").table("my_table").indexCreate("trap").run(connection);
}
} catch (Exception e) {
log.error("Error connecting to RethinkDB", e);
}
return null;
}
public Connection getConnection() {
return connection;
}
}
package com.example.WebApplication.services;
import com.example.WebApplication.entities.EnrichedTrap;
import com.example.WebApplication.entities.RethinkChange;
import com.example.WebApplication.factory.RethinkDBConnectionFactory;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rethinkdb.RethinkDB;
import com.rethinkdb.net.Connection;
import com.rethinkdb.net.Cursor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@Service
public class RethinkDBService{
private final Logger log = LoggerFactory.getLogger(RethinkDBService.class);
private final RethinkDB r = RethinkDB.r;
@Autowired
public RethinkDBConnectionFactory connectionFactory;
// @Autowired
// private SocketTextHandler socket_server;
public void saveKafkaMessageToRethink(EnrichedTrap rethinkTrap) {
try {
ObjectMapper objectMapper = new ObjectMapper();
String jsonString = objectMapper.writeValueAsString(rethinkTrap);
JsonNode jsonNode = objectMapper.readTree(jsonString);
Map<String,EnrichedTrap> document = objectMapper.convertValue(jsonNode, Map.class);
System.out.println(jsonString);
System.out.println(jsonNode);
System.out.println(document);
System.out.println("I am here reem " + connectionFactory.getDbName() + " " + connectionFactory.getDbTableName());
r.db(connectionFactory.getDbName()).table(connectionFactory.getDbTableName()).insert(document).run(connectionFactory.getConnection());
System.out.println("I am here now reem");
} catch (Exception e) {
System.out.println("error " + e);
}
}
public List<Map<String, Object> > getData() {
try {
Cursor<Map<String, Object>> cursor = r.db(connectionFactory.getDbName()).table(connectionFactory.getDbTableName()).run(connectionFactory.getConnection());
List<Map<String, Object>> result = new ArrayList<>();
while (cursor.hasNext()) {
result.add(cursor.next());
}
return result;
} catch (Exception e) {
log.error("Error getting data from RethinkDB", e);
return null;
}
}
public Cursor<RethinkChange> subscribe(){
Connection connection = connectionFactory.getConnection();
Cursor<RethinkChange> changeCursor = r.db(connectionFactory.getDbName()).table(connectionFactory.getDbTableName()).changes().optArg("include_initial",true).
run(connection, RethinkChange.class);
return changeCursor;
}
public void deleteById(String id){
Connection connection = connectionFactory.getConnection();
try{
r.db(connectionFactory.getDbTableName()).table(connectionFactory.getDbTableName()).get(id).delete();
}
catch (Exception e){
System.out.println("Couldn't Delete Data. Maybe the id is not there.");
}
return;
}
public Map<String,Object> getByID(String id){
Connection connection = connectionFactory.getConnection();
try{
Map<String,Object> data = (Map<String, Object>) r.db(connectionFactory.getDbTableName()).table(connectionFactory.getDbTableName()).get(id);
return data;
}
catch (Exception e){
System.out.println("Couldn't Fetch Data. Maybe the id is not there.");
return null;
}
}
//Connection conn = connectionFactory.getConnection();
//Cursor<Map<String, Object> > changeCursor = r.db(database).table(table).changes().run(conn);
/*for (Object change : changeCursor) {
result.add((Map<String, Object>) change);
}
try {
socket_server.broadcast(result);
} catch (IOException e) {
throw new RuntimeException(e);
}*/
}
server.port = 6647
rethinkdb.host = 172.29.3.220
rethinkdb.port = 28015
rethinkDBName = Traps
rethinkDBTableName = Raw-Traps
websocket.path = /fms-websocket
......@@ -32,11 +32,12 @@ public class KafkaConsumerSnmpApplication {
TrapData t = objectMapper.readValue(json,TrapData.class);
//Some Processing: Add this later
EnrichedTrap t2 = new EnrichedTrap(t);
System.out.println("Not sent yet");
sender.send(t2);
System.out.println("Sent now!");
//System.out.println("Not sent yet");
//sender.send(t2);
//System.out.println("Sent now!");
rethinkDBService.saveKafkaMessageToRethink(t2);
System.out.println(rethinkDBService.getData());
if (num == 1)
System.out.println(rethinkDBService.getData());
} catch (Exception e) {
System.out.println(e);
e.printStackTrace();
......
server.port = 0
spring.kafka.bootstrap-servers=http://192.168.25.254:9092
spring.kafka.bootstrap-servers=http://172.29.3.220:9092
spring.kafka.template.default-topic=TRAP
spring.kafka.consumer.group-id= TRAPId
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
......@@ -9,7 +9,7 @@ spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.S
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
rethinkdb.host = 192.168.25.254
rethinkdb.host = 172.29.3.220
rethinkdb.port = 28015
rethinkDBName = Traps
rethinkDBTableName = Raw-Traps
server.port = 0
spring.kafka.bootstrap-servers=http://192.168.25.254:9092
spring.kafka.bootstrap-servers=http://172.29.3.220:9092
spring.kafka.template.default-topic=TRAP
spring.kafka.consumer.group-id= TRAPId
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
......@@ -9,7 +9,7 @@ spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.S
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
rethinkdb.host = 192.168.25.254
rethinkdb.host = 172.29.3.220
rethinkdb.port = 28015
rethinkDBName = Traps
rethinkDBTableName = Raw-Traps
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment