Commit 4357b7ba authored by ReemyHasan's avatar ReemyHasan

Add SSE components

parent 4ec1a670
package com.example.WebApplication.configuration;
import com.example.WebApplication.controller.SocketTextHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Autowired
private SocketTextHandler socket_server;
@Value("${websocket.path}")
private String webSocketPath;
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
// System.out.println(webSocketPath);
registry.addHandler(socket_server, webSocketPath).setAllowedOrigins("*");
}
}
//package com.example.WebApplication.configuration;
//
//import com.example.WebApplication.controller.SocketTextHandler;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.beans.factory.annotation.Value;
//import org.springframework.context.annotation.Configuration;
//import org.springframework.web.socket.config.annotation.EnableWebSocket;
//import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
//import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
//
//@Configuration
//@EnableWebSocket
//public class WebSocketConfig implements WebSocketConfigurer {
//
// @Autowired
// private SocketTextHandler socket_server;
// @Value("${websocket.path}")
// private String webSocketPath;
//
// @Override
// public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
//// System.out.println(webSocketPath);
// registry.addHandler(socket_server, webSocketPath).setAllowedOrigins("*");
// }
//}
package com.example.WebApplication.controller;
import com.example.WebApplication.handlers.RethinkAppChange;
import com.example.WebApplication.handlers.SseEmitters;
import com.example.WebApplication.services.RethinkDBService;
import jakarta.annotation.PostConstruct;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.core.Application;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import com.example.WebApplication.entities.RethinkChange;
import com.rethinkdb.net.Cursor;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@RestController
@CrossOrigin("*")
@RequestMapping("/api/notifications")
public class NotificationController implements ApplicationListener<RethinkAppChange> {
@Autowired
private RethinkDBService rethinkDBService;
private final SseEmitters emitters = new SseEmitters();
@PostConstruct
void init() {
rethinkDBService.subscribe();
}
@GetMapping("/sub")
public SseEmitter emitData() {
return emitters.add(new SseEmitter(Long.MAX_VALUE));
}
@Override
public void onApplicationEvent(RethinkAppChange event) {
emitters.send(event.getRethinkChange());
}
}
\ No newline at end of file
package com.example.WebApplication.controller;
import com.example.WebApplication.controller.SocketTextHandler;
//import com.example.WebApplication.controller.SocketTextHandler;
import com.example.WebApplication.entities.RethinkChange;
import com.example.WebApplication.services.RethinkDBService;
import com.rethinkdb.net.Cursor;
......@@ -27,7 +27,6 @@ public class RethinkController {
@GetMapping("/data")
public ResponseEntity<List<Map<String, Object>>> getData() {
List<Map<String, Object>> result = rethinkDBService.getData();
// System.out.println(result);
if (result != null) {
return ResponseEntity.ok(result);
} else {
......
package com.example.WebApplication.controller;
import com.example.WebApplication.entities.EnrichedTrap;
import com.example.WebApplication.entities.RethinkChange;
import com.example.WebApplication.services.RethinkDBService;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.rethinkdb.net.Cursor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
@Component
public class SocketTextHandler extends TextWebSocketHandler implements CommandLineRunner {
private List<WebSocketSession> sessions = new CopyOnWriteArrayList<>();
@Autowired
private RethinkDBService rethinkDBService;
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
System.out.println("New connection from " + session.getRemoteAddress());
sessions.add(session);
this.sendAllToSession(session);
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
System.out.println("Received message from " + session.getRemoteAddress() + ": " + message.getPayload());
// Handle incoming message
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
System.out.println("Connection closed to " + session.getRemoteAddress() + " with status " + status);
sessions.remove(session);
}
public void broadcast(Object message) throws IOException {
System.out.println("Hey Broadcasting");
System.out.println(message);
ObjectWriter ow = new ObjectMapper().writer().withDefaultPrettyPrinter();
String json = ow.writeValueAsString(message);
// System.out.println(json);
TextMessage s = new TextMessage(json);
//System.out.println(s);
for (WebSocketSession session : sessions) {
if (session.isOpen()) {
session.sendMessage(s);
}
}
}
public void sendMessageToSession(Object message,WebSocketSession session) {
ObjectWriter ow = new ObjectMapper().writer().withDefaultPrettyPrinter();
String json = null;
try {
json = ow.writeValueAsString(message);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
// System.out.println(json);
TextMessage s = new TextMessage(json);
if (session.isOpen()) {
try {
session.sendMessage(s);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
public void sendAllToSession(WebSocketSession session){
List<Map<String, Object>> data = rethinkDBService.getData();
RethinkChange change = new RethinkChange();
change.setOld_val(null);
for (Map<String, Object> doc: data){
change.setNew_val(doc);
this.sendMessageToSession(change,session);
}
}
@Override
public void run(String... args) throws Exception {
Cursor<RethinkChange> changeCursor = rethinkDBService.subscribe();
System.out.println("I am Subscribing");
// System.out.println(changeCursor.getClass());
// System.out.println(changeCursor);
//List<RethinkChange> result = new ArrayList<>();
for (RethinkChange change : changeCursor){
System.out.println("Something Changed ");
// result.add(change);
try {
this.broadcast(change);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
//package com.example.WebApplication.controller;
//
//import com.example.WebApplication.entities.EnrichedTrap;
//import com.example.WebApplication.entities.RethinkChange;
//import com.example.WebApplication.services.RethinkDBService;
//import com.fasterxml.jackson.core.JsonProcessingException;
//import com.fasterxml.jackson.databind.ObjectMapper;
//import com.fasterxml.jackson.databind.ObjectWriter;
//import com.rethinkdb.net.Cursor;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.boot.CommandLineRunner;
//import org.springframework.stereotype.Component;
//import org.springframework.web.bind.annotation.PostMapping;
//import org.springframework.web.bind.annotation.RequestBody;
//import org.springframework.web.bind.annotation.RequestMapping;
//import org.springframework.web.bind.annotation.RestController;
//import org.springframework.web.socket.CloseStatus;
//import org.springframework.web.socket.TextMessage;
//import org.springframework.web.socket.WebSocketSession;
//import org.springframework.web.socket.handler.TextWebSocketHandler;
//
//import java.io.IOException;
//import java.util.List;
//import java.util.Map;
//import java.util.concurrent.CopyOnWriteArrayList;
//
//@Component
//public class SocketTextHandler extends TextWebSocketHandler implements CommandLineRunner {
// private List<WebSocketSession> sessions = new CopyOnWriteArrayList<>();
//
// @Autowired
// private RethinkDBService rethinkDBService;
//
// @Override
// public void afterConnectionEstablished(WebSocketSession session) throws Exception {
// System.out.println("New connection from " + session.getRemoteAddress());
// sessions.add(session);
// this.sendAllToSession(session);
// }
//
// @Override
// protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
// System.out.println("Received message from " + session.getRemoteAddress() + ": " + message.getPayload());
// // Handle incoming message
// }
//
// @Override
// public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
// System.out.println("Connection closed to " + session.getRemoteAddress() + " with status " + status);
// sessions.remove(session);
// }
//
// public void broadcast(Object message) throws IOException {
// System.out.println("Hey Broadcasting");
// System.out.println(message);
// ObjectWriter ow = new ObjectMapper().writer().withDefaultPrettyPrinter();
// String json = ow.writeValueAsString(message);
//// System.out.println(json);
// TextMessage s = new TextMessage(json);
// //System.out.println(s);
// for (WebSocketSession session : sessions) {
// if (session.isOpen()) {
// session.sendMessage(s);
// }
// }
// }
// public void sendMessageToSession(Object message,WebSocketSession session) {
// ObjectWriter ow = new ObjectMapper().writer().withDefaultPrettyPrinter();
// String json = null;
// try {
// json = ow.writeValueAsString(message);
// } catch (JsonProcessingException e) {
// throw new RuntimeException(e);
// }
//// System.out.println(json);
// TextMessage s = new TextMessage(json);
// if (session.isOpen()) {
// try {
// session.sendMessage(s);
// } catch (IOException e) {
// throw new RuntimeException(e);
// }
// }
// }
// public void sendAllToSession(WebSocketSession session){
// List<Map<String, Object>> data = rethinkDBService.getData();
// RethinkChange change = new RethinkChange();
// change.setOld_val(null);
// for (Map<String, Object> doc: data){
// change.setNew_val(doc);
// this.sendMessageToSession(change,session);
// }
// }
//
// @Override
// public void run(String... args) throws Exception {
// Cursor<RethinkChange> changeCursor = rethinkDBService.subscribe();
// System.out.println("I am Subscribing");
//// System.out.println(changeCursor.getClass());
//// System.out.println(changeCursor);
// //List<RethinkChange> result = new ArrayList<>();
// for (RethinkChange change : changeCursor){
// System.out.println("Something Changed ");
//// result.add(change);
// try {
// this.broadcast(change);
// } catch (IOException e) {
// throw new RuntimeException(e);
// }
// }
// }
//
//}
package com.example.WebApplication.entities;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.*;
import org.springframework.boot.context.properties.bind.ConstructorBinding;
import org.springframework.context.ApplicationEvent;
import java.time.Clock;
import java.util.Map;
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class RethinkChange {
public class RethinkChange{
private Map<String,Object> new_val;
private Map<String,Object> old_val;
}
package com.example.WebApplication.handlers;
import com.example.WebApplication.entities.RethinkChange;
import lombok.Builder;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import org.springframework.context.ApplicationEvent;
import java.time.Clock;
import java.util.Map;
@Getter
@Setter
public class RethinkAppChange extends ApplicationEvent {
private RethinkChange rethinkChange;
public RethinkAppChange(Object source,Map<String,Object> oldVal,Map<String,Object> newVal)
{
super(source);
this.rethinkChange = new RethinkChange(newVal,oldVal);
}
}
package com.example.WebApplication.handlers;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
public class SseEmitters {
private final List<SseEmitter> emitters = new CopyOnWriteArrayList<>();
public SseEmitter add(SseEmitter emitter) {
this.emitters.add(emitter);
emitter.onCompletion(() -> {
this.emitters.remove(emitter);
});
emitter.onTimeout(() -> {
emitter.complete();
this.emitters.remove(emitter);
});
return emitter;
}
public void send(Object obj) {
List<SseEmitter> failedEmitters = new ArrayList<>();
this.emitters.forEach(emitter -> {
System.out.println("Kill me");
try {
emitter.send(obj);
} catch (Exception e) {
emitter.completeWithError(e);
failedEmitters.add(emitter);
}
});
this.emitters.removeAll(failedEmitters);
}
}
......@@ -3,41 +3,53 @@ package com.example.WebApplication.services;
import com.example.WebApplication.entities.EnrichedTrap;
import com.example.WebApplication.entities.RethinkChange;
import com.example.WebApplication.factory.RethinkDBConnectionFactory;
import com.example.WebApplication.handlers.RethinkAppChange;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rethinkdb.RethinkDB;
import com.rethinkdb.net.Connection;
import com.rethinkdb.net.Cursor;
import lombok.RequiredArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Bean;
import org.springframework.context.event.ApplicationEventMulticaster;
import org.springframework.context.event.SimpleApplicationEventMulticaster;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.scheduling.support.TaskUtils;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Service
@RequiredArgsConstructor
public class RethinkDBService{
private final Logger log = LoggerFactory.getLogger(RethinkDBService.class);
private final RethinkDB r = RethinkDB.r;
private final ApplicationEventPublisher eventPublisher;
@Autowired
public RethinkDBConnectionFactory connectionFactory;
// @Autowired
// private SocketTextHandler socket_server;
@Bean
ApplicationEventMulticaster applicationEventMulticaster() {
SimpleApplicationEventMulticaster eventMulticaster = new SimpleApplicationEventMulticaster();
eventMulticaster.setTaskExecutor(new SimpleAsyncTaskExecutor());
eventMulticaster.setErrorHandler(TaskUtils.LOG_AND_SUPPRESS_ERROR_HANDLER);
return eventMulticaster;
}
public void saveKafkaMessageToRethink(EnrichedTrap rethinkTrap) {
try {
ObjectMapper objectMapper = new ObjectMapper();
String jsonString = objectMapper.writeValueAsString(rethinkTrap);
JsonNode jsonNode = objectMapper.readTree(jsonString);
Map<String,EnrichedTrap> document = objectMapper.convertValue(jsonNode, Map.class);
// System.out.println(jsonString);
// System.out.println(jsonNode);
// System.out.println(document);
// System.out.println("I am here reem " + connectionFactory.getDbName() + " " + connectionFactory.getDbTableName());
r.db(connectionFactory.getDbName()).table(connectionFactory.getDbTableName()).insert(document).run(connectionFactory.getConnection());
System.out.println("I am here now reem");
} catch (Exception e) {
......@@ -58,11 +70,20 @@ public class RethinkDBService{
return null;
}
}
public Cursor<RethinkChange> subscribe(){
Connection connection = connectionFactory.getConnection();
Cursor<RethinkChange> changeCursor = r.db(connectionFactory.getDbName()).table(connectionFactory.getDbTableName()).changes().optArg("include_initial",true).
run(connection, RethinkChange.class);
return changeCursor;
public void subscribe() {
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
singleThreadExecutor.execute(() -> {
try {
Connection connection = connectionFactory.getConnection();
Cursor<RethinkChange> changeCursor = r.db(connectionFactory.getDbName()).table(connectionFactory.getDbTableName()).changes().optArg("include_initial", true).
run(connection, RethinkChange.class);
while (changeCursor.hasNext()) {
RethinkChange changedData = changeCursor.next();
eventPublisher.publishEvent(new RethinkAppChange(this, changedData.getOld_val(), changedData.getNew_val()));
}
} catch (Exception e) {
System.out.println("Bkh");
}});
}
public void deleteById(String id){
Connection connection = connectionFactory.getConnection();
......@@ -85,19 +106,5 @@ public class RethinkDBService{
return null;
}
}
//Connection conn = connectionFactory.getConnection();
//Cursor<Map<String, Object> > changeCursor = r.db(database).table(table).changes().run(conn);
/*for (Object change : changeCursor) {
result.add((Map<String, Object>) change);
}
try {
socket_server.broadcast(result);
} catch (IOException e) {
throw new RuntimeException(e);
}*/
}
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.jpa.properties.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl
spring.jpa.hibernate.ddl-auto=update
#spring.jpa.hibernate.ddl-auto=update
spring.datasource.url=jdbc:mysql://172.29.3.220:3306/USER_APP
spring.datasource.username=root
spring.datasource.password=password
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment