Commit ae53ceb5 authored by Bashar's avatar Bashar

Added Rethink Realtime Ability

parent 517f28ba
package com.example.consumer; package com.example.consumer;
import com.example.consumer.configuration.SocketTextHandler;
import com.example.consumer.factory.RethinkDBConnectionFactory;
import com.example.consumer.repository.RethinkChange;
import com.rethinkdb.RethinkDB;
import com.rethinkdb.net.Connection;
import com.rethinkdb.net.Cursor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.annotation.KafkaListener;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@SpringBootApplication @SpringBootApplication
public class ConsumerApplication { public class ConsumerApplication implements CommandLineRunner {
public static void main(String[] args) { public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args); SpringApplication.run(ConsumerApplication.class, args);
} }
private final RethinkDB r = RethinkDB.r;
@Autowired
public RethinkDBConnectionFactory connectionFactory;
@Autowired
private SocketTextHandler socket_server;
@Override
public void run(String... args) throws Exception {
Connection connection = connectionFactory.getConnection();
Cursor<RethinkChange> changeCursor = r.db("my_database").table("my_table").changes().optArg("include_initial",true).
run(connection, RethinkChange.class);
List<Map<String, Object>> result = new ArrayList<>();
for (RethinkChange change : changeCursor){
System.out.println("Something Changed");
result.add(change.getNew_val());
socket_server.broadcast(result);
}
}
// @KafkaListener(topics = "Test") // @KafkaListener(topics = "Test")
// public void handleNotification(String s) { // public void handleNotification(String s) {
// //
......
package com.example.consumer.configuration; package com.example.consumer.configuration;
import java.io.IOException; import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler; import org.springframework.web.socket.handler.TextWebSocketHandler;
@Component @Component
public class SocketTextHandler { public class SocketTextHandler extends TextWebSocketHandler {
// extends TextWebSocketHandler { private List<WebSocketSession> sessions = new CopyOnWriteArrayList<>();
//
// @Override
// public void handleTextMessage(WebSocketSession session, TextMessage message)
// throws InterruptedException, IOException {
//
// String payload = message.getPayload();
// JSONObject jsonObject = new JSONObject(payload);
// session.sendMessage(new TextMessage("Hi " + jsonObject.get("user") + " how may we help you?"));
// }
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
System.out.println("New connection from " + session.getRemoteAddress());
sessions.add(session);
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
System.out.println("Received message from " + session.getRemoteAddress() + ": " + message.getPayload());
// Handle incoming message
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
System.out.println("Connection closed to " + session.getRemoteAddress() + " with status " + status);
sessions.remove(session);
}
public void broadcast(List<Map<String, Object>> message) throws IOException {
System.out.println("Hey Broadcasting");
System.out.println(message);
ObjectWriter ow = new ObjectMapper().writer().withDefaultPrettyPrinter();
String json = ow.writeValueAsString(message);
System.out.println(json);
TextMessage s = new TextMessage(json);
System.out.println(s);
for (WebSocketSession session : sessions) {
if (session.isOpen()) {
session.sendMessage(s);
}
}
}
} }
package com.example.consumer.configuration; package com.example.consumer.configuration;
import com.example.consumer.services.RethinkDBService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry; import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.*; import org.springframework.web.socket.config.annotation.*;
@Configuration @Configuration
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { @EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Autowired
private SocketTextHandler socket_server;
@Override @Override
public void registerStompEndpoints(StompEndpointRegistry registry) { public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addEndpoint("/ws").setAllowedOriginPatterns("*").withSockJS(); System.out.println("HEYYYYYYYYYYYYYYYYYY");
} registry.addHandler(socket_server, "/my-websocket").setAllowedOrigins("*");
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.setApplicationDestinationPrefixes("/app");
registry.enableSimpleBroker("/traps","/changes");
registry.setUserDestinationPrefix("/changes");
} }
} }
...@@ -27,6 +27,7 @@ public class RethinkController { ...@@ -27,6 +27,7 @@ public class RethinkController {
@GetMapping("/data") @GetMapping("/data")
public ResponseEntity<List<Map<String, Object>>> getData() { public ResponseEntity<List<Map<String, Object>>> getData() {
List<Map<String, Object>> result = rethinkDBService.getData("my_database", "my_table"); List<Map<String, Object>> result = rethinkDBService.getData("my_database", "my_table");
System.out.println(result);
if (result != null) { if (result != null) {
return ResponseEntity.ok(result); return ResponseEntity.ok(result);
} else { } else {
......
...@@ -2,12 +2,15 @@ package com.example.consumer.factory; ...@@ -2,12 +2,15 @@ package com.example.consumer.factory;
import com.rethinkdb.RethinkDB; import com.rethinkdb.RethinkDB;
import com.rethinkdb.net.Connection; import com.rethinkdb.net.Connection;
import com.rethinkdb.net.Cursor;
import jakarta.annotation.PostConstruct; import jakarta.annotation.PostConstruct;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
@Component @Component
...@@ -28,6 +31,17 @@ public class RethinkDBConnectionFactory { ...@@ -28,6 +31,17 @@ public class RethinkDBConnectionFactory {
try { try {
connection = r.connection().hostname(host).port(port).connect(); connection = r.connection().hostname(host).port(port).connect();
log.info("RethinkDB connected successfully"); log.info("RethinkDB connected successfully");
List<String> dbList = r.dbList().run(connection);
if (!dbList.contains("my_database")) {
System.out.println("Creating DATABASE Heeeeeeeeeeeeeeeeeeeeeeeeeere");
r.dbCreate("my_database").run(connection);
}
List<String> tables = r.db("my_database").tableList().run(connection);
if (!tables.contains("my_table")) {
System.out.println("Creating Table Heeeeeeeeeeeeeeeeeeeeeeeeeere");
r.db("my_database").tableCreate("my_table").run(connection);
//r.db("my_database").table("my_table").indexCreate("trap").run(connection);
}
} catch (Exception e) { } catch (Exception e) {
log.error("Error connecting to RethinkDB", e); log.error("Error connecting to RethinkDB", e);
} }
......
package com.example.consumer.repository;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Map;
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class RethinkChange {
private Map<String,Object> new_val;
private Map<String,Object> old_val;
}
...@@ -20,17 +20,19 @@ public class DbInitializer implements InitializingBean { ...@@ -20,17 +20,19 @@ public class DbInitializer implements InitializingBean {
@Override @Override
public void afterPropertiesSet() throws Exception { public void afterPropertiesSet() throws Exception {
createDb(); createDb();
} }
private void createDb() { private void createDb() {
System.out.println("We are initializing heeeeeeeeeeeeeeeeeeeeeer");
Connection connection = connectionFactory.getConnection(); Connection connection = connectionFactory.getConnection();
List<String> dbList = r.dbList().run(connection); List<String> dbList = r.dbList().run(connection);
if (!dbList.contains("new_database")) { if (!dbList.contains("new_database")) {
System.out.println("Creating DATABASE Heeeeeeeeeeeeeeeeeeeeeeeeeere");
r.dbCreate("new_database").run(connection); r.dbCreate("new_database").run(connection);
} }
List<String> tables = r.db("new_database").tableList().run(connection); List<String> tables = r.db("new_database").tableList().run(connection);
if (!tables.contains("new_table")) { if (!tables.contains("new_table")) {
System.out.println("Creating Table Heeeeeeeeeeeeeeeeeeeeeeeeeere");
r.db("new_database").tableCreate("new_table").run(connection); r.db("new_database").tableCreate("new_table").run(connection);
r.db("new_database").table("new_table").indexCreate("trap").run(connection); r.db("new_database").table("new_table").indexCreate("trap").run(connection);
} }
......
package com.example.consumer.services; package com.example.consumer.services;
import com.example.consumer.configuration.SocketTextHandler;
import com.example.consumer.factory.RethinkDBConnectionFactory; import com.example.consumer.factory.RethinkDBConnectionFactory;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.rethinkdb.RethinkDB; import com.rethinkdb.RethinkDB;
import com.rethinkdb.gen.ast.Json; import com.rethinkdb.gen.ast.Json;
import com.rethinkdb.gen.ast.Table;
import com.rethinkdb.net.Cursor; import com.rethinkdb.net.Cursor;
import org.reactivestreams.Subscription; import org.reactivestreams.Subscription;
import org.slf4j.Logger; import org.slf4j.Logger;
...@@ -42,6 +44,7 @@ public class RethinkDBService { ...@@ -42,6 +44,7 @@ public class RethinkDBService {
String jsonString = "{\"trap\":\" "+message+"\"}"; String jsonString = "{\"trap\":\" "+message+"\"}";
JsonNode jsonNode = objectMapper.readTree(jsonString); JsonNode jsonNode = objectMapper.readTree(jsonString);
// Create a RethinkDB document object using the parsed JSON object // Create a RethinkDB document object using the parsed JSON object
Map<String, Object> document = objectMapper.convertValue(jsonNode, Map.class); Map<String, Object> document = objectMapper.convertValue(jsonNode, Map.class);
r.db("my_database").table("my_table").insert(document).run(connectionFactory.getConnection()); r.db("my_database").table("my_table").insert(document).run(connectionFactory.getConnection());
...@@ -71,7 +74,17 @@ public class RethinkDBService { ...@@ -71,7 +74,17 @@ public class RethinkDBService {
return null; return null;
} }
} }
//Connection conn = connectionFactory.getConnection();
//Cursor<Map<String, Object> > changeCursor = r.db(database).table(table).changes().run(conn);
/*for (Object change : changeCursor) {
result.add((Map<String, Object>) change);
}
try {
socket_server.broadcast(result);
} catch (IOException e) {
throw new RuntimeException(e);
}*/
} }
server.port = 2005 server.port = 4164
spring.kafka.bootstrap-servers=localhost:9092 spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.template.default-topic=notificationTopic spring.kafka.template.default-topic=notificationTopic
......
This diff is collapsed.
...@@ -3,7 +3,7 @@ version: "3" ...@@ -3,7 +3,7 @@ version: "3"
services: services:
elasticsearch: elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.3.3 image: elasticsearch:8.7.1
container_name: elasticsearch container_name: elasticsearch
environment: environment:
- bootstrap.memory_lock=true - bootstrap.memory_lock=true
...@@ -18,7 +18,7 @@ services: ...@@ -18,7 +18,7 @@ services:
- elastic - elastic
kibana: kibana:
image: docker.elastic.co/kibana/kibana:8.3.3 image: kibana:8.7.1
container_name: kibana container_name: kibana
ports: ports:
- "5601:5601" - "5601:5601"
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment