Commit cef61db6 authored by Bashar's avatar Bashar

Configured ElasticSearch

parent 03ea8ffa
......@@ -27,6 +27,7 @@ import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import com.example.elasticJavaApi.entities.ProcessedTrap;
import com.example.elasticJavaApi.entities.SeverityLevel;
import com.example.elasticJavaApi.entities.VarBind;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
......@@ -132,7 +133,11 @@ public class elasticConfig {
addComponent(workingPath+"/settings.json","bashar_setting");
addComponent(workingPath+"/mappings.json","bashar_mapping");
addIndexTemplate(workingPath+"/index_template.json","bashar_template");
ProcessedTrap p = new ProcessedTrap("1.3.2.5.4.6.7","192.168.25.254",6,201,1569845288, SeverityLevel.ERROR, new ArrayList<>(), new GeoPoint(Math.random(),Math.random()));
ArrayList arrayList = new ArrayList<VarBind>();
VarBind v = new VarBind("1.1.1.1","3232");
arrayList.add(v);
arrayList.add(v);
ProcessedTrap p = new ProcessedTrap("1.3.2.5.4.6.7","192.168.25.254",6,201,1569845288, SeverityLevel.ERROR,arrayList , new GeoPoint(Math.random(),Math.random()));
ObjectMapper ob = new ObjectMapper();
//addDataStream("bashar-data-stream-3");
try {
......
......@@ -3,19 +3,28 @@ package com.example.Processing;
import com.example.Processing.entities.EnrichedTrap;
import com.example.Processing.entities.ProcessedTrap;
import com.example.Processing.entities.SeverityLevel;
import com.example.Processing.services.ElasticService;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
@SpringBootApplication
public class ProcessingApplication {
public static void main(String[] args) {
SpringApplication.run(ProcessingApplication.class, args);
}
public static int num = 0;
@Autowired
private ElasticService elasticService;
@KafkaListener(topics = "EnrichedTrap")
public void handleKafkaMessage(String pduJson) {
String json = pduJson; // Replace with your JSON string
......@@ -25,11 +34,28 @@ public class ProcessingApplication {
EnrichedTrap t = objectMapper.readValue(json,EnrichedTrap.class);
//Some Processing: Add this later (Filtering, Prioritizing, Correlation)
ProcessedTrap t2 = new ProcessedTrap(t);
elasticService.addToBulk(t2);
} catch (Exception e) {
System.out.println(e);
e.printStackTrace();
}
//System.out.println(num);
}
@PostConstruct
public void magic(){
Timer timer = new Timer();
TimerTask task = new TimerTask() {
public void run() {
// Call your function here
myFunction();
}
};
// Schedule the task to run every 10 seconds
timer.schedule(task, 0, 3000);
}
public void myFunction(){
System.out.println("Hey");
elasticService.sendBulk();
}
}
package com.example.Processing.entities;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.springframework.data.elasticsearch.core.geo.GeoPoint;
import java.util.ArrayList;
import java.util.List;
@AllArgsConstructor
@NoArgsConstructor
public class ProcessedTrap {
@JsonProperty("enterprise")
public String enterprise;
......@@ -14,7 +19,7 @@ public class ProcessedTrap {
public int genericTrap;
@JsonProperty("specificTrap")
public int specificTrap;
@JsonProperty("timestamp")
@JsonProperty("@timestamp")
public long timestamp;
@JsonProperty("severity")
......@@ -23,11 +28,8 @@ public class ProcessedTrap {
@JsonProperty("variableBindings")
public List<VarBind> variableBindings = new ArrayList<VarBind>();
@JsonProperty("location_X")
public double xPoint;
@JsonProperty("location_Y")
public double yPoint;
@JsonProperty("location")
public GeoPoint Point;
public ProcessedTrap(EnrichedTrap pdu) {
......@@ -38,7 +40,6 @@ public class ProcessedTrap {
this.agentAddress = pdu.getAgentAddress().toString();
this.variableBindings = pdu.getVariableBindings();
this.severity = pdu.getSeverity();
this.xPoint = Math.random();
this.yPoint = Math.random();
this.Point = new GeoPoint(Math.random(),Math.random());
}
}
package com.example.Processing.services;
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.IndexRequest;
import co.elastic.clients.elasticsearch.core.IndexResponse;
import com.example.Processing.configuration.elasticConfig;
import com.example.Processing.entities.ProcessedTrap;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
@Service
public class ElasticService {
@Autowired
private elasticConfig elConf;
private List<ProcessedTrap> bulk = new ArrayList<>();
public void addDoc(ProcessedTrap processedTrap){
ObjectMapper objectMapper = new ObjectMapper();
try {
String json = objectMapper.writeValueAsString(processedTrap);
Reader input = new StringReader(json);
System.out.println(json);
IndexRequest request = IndexRequest.of(i -> i
.index("traps-data-stream")
.withJson(input)
);
IndexResponse response = null;
response = elConf.getElasticClient().index(request);
System.out.println("Indexed with version " + response.version());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public void addToBulk(ProcessedTrap processedTrap){
bulk.add(processedTrap);
}
public void sendBulk(){
if (bulk.size() == 0) {
System.out.println("Nothing in the bulk");
return;
}
BulkRequest.Builder br = new BulkRequest.Builder();
for (ProcessedTrap s:bulk){
br.operations(op -> op
.create(cr -> cr.index("traps-data-stream").document(s))
);
}
System.out.println("Sent "+bulk.size());
bulk.clear();
BulkResponse result = null;
System.out.println("Hey I am here");
try {
System.out.println("Hey I am here");
result = elConf.getElasticClient().bulk(br.build());
System.out.println(result);
} catch (IOException e) {
System.out.println(e);
throw new RuntimeException(e);
}
}
}
......@@ -3,3 +3,5 @@ spring.kafka.template.default-topic=EnrichedTrap
spring.kafka.consumer.group-id= EnrichedTrapId
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
server.port = 0
workingPath = ./Processing/src/main/resources/utils
{
"index_patterns": ["bashar-data-stream*"],
"index_patterns": ["traps-data-stream*"],
"data_stream": { },
"composed_of": [ "bashar_setting", "bashar_mapping" ],
"composed_of": [ "traps_setting", "traps_mapping"],
"priority": 670,
"_meta": {
"description": "Template for my time series data",
......
{
"settings": {
"index.lifecycle.name": "Bashar",
"index.lifecycle.name": "Traps_Policy",
"index.number_of_shards": 1,
"index.number_of_replicas": 0
}
......
......@@ -49,7 +49,7 @@ public class TrapData {
this.enterprise = pdu.getEnterprise().toString();
this.genericTrap = pdu.getGenericTrap();
this.specificTrap = pdu.getSpecificTrap();
this.timestamp = pdu.getTimestamp();
this.timestamp = pdu.getTimestamp()*1000;
this.agentAddress = pdu.getAgentAddress().toString();
List<VariableBinding> trapVariableBindings = pdu.getAll();
......
{
"index_patterns": ["bashar-data-stream*"],
"data_stream": { },
"composed_of": [ "bashar_setting", "bashar_mapping" ],
"priority": 670,
"_meta": {
"description": "Template for my time series data",
"my-custom-meta-field": "More arbitrary metadata"
}
}
\ No newline at end of file
{
"mappings": {
"properties": {
"@timestamp": {
"type": "date",
"format": "date_optional_time||epoch_millis"
},
"message": {
"type": "wildcard"
}
}
}
}
\ No newline at end of file
{
"phases": {
"hot": {
"min_age": "0ms",
"actions": {
"set_priority": {
"priority": 100
},
"rollover": {
"max_primary_shard_size": "50gb",
"max_age": "2m"
}
}
},
"warm": {
"min_age": "2m",
"actions": {
"set_priority": {
"priority": 50
}
}
},
"cold": {
"min_age": "4m",
"actions": {
"set_priority": {
"priority": 0
}
}
},
"delete": {
"min_age": "6m",
"actions": {
"delete": {
}
}
}
}
}
\ No newline at end of file
{
"settings": {
"index.lifecycle.name": "Bashar",
"index.number_of_shards": 1,
"index.number_of_replicas": 0
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment