Commit 91f6480a authored by mohammad.salama's avatar mohammad.salama

Consumer Groups Added, Consuming Messages Correctly , no Replication or Missed...

Consumer Groups Added, Consuming Messages Correctly , no Replication or Missed Messages , All Connected
parent 89f551b2
...@@ -34,10 +34,17 @@ add redis: ...@@ -34,10 +34,17 @@ add redis:
dotnet add package StackExchange.Redis dotnet add package StackExchange.Redis
docker run -d --name redis-test -p 6377(port on host):6379 (port for container) redis
maps the 6377 from host to 6379 to container
-enter redis cli: -enter redis cli:
docker exec -it <container redis name> redis-cli docker exec -it <container redis name> redis-cli
docker exec -it redis redis-cli docker exec -it redis redis-cli
-delete stream : -delete stream :
del <stream_name> del <stream_name>
......
...@@ -24,6 +24,7 @@ namespace GrpcMessageNode.DataBaseAccess ...@@ -24,6 +24,7 @@ namespace GrpcMessageNode.DataBaseAccess
public static string getPriority(ref Message message , string validatorAddress) public static string getPriority(ref Message message , string validatorAddress)
{ {
Console.WriteLine("Validating NOW IN GRPC !! "); Console.WriteLine("Validating NOW IN GRPC !! ");
int x = message.LocalPriority;
ValidatorReply reply = ValidateAsync(message , validatorAddress); ValidatorReply reply = ValidateAsync(message , validatorAddress);
if (reply.ReplyCode == ValidatorConnectionAndValidationError) if (reply.ReplyCode == ValidatorConnectionAndValidationError)
...@@ -34,7 +35,7 @@ namespace GrpcMessageNode.DataBaseAccess ...@@ -34,7 +35,7 @@ namespace GrpcMessageNode.DataBaseAccess
{ {
return reply.ReplyCode; return reply.ReplyCode;
} }
message.LocalPriority = reply.AccountPriority; message.LocalPriority = x+reply.AccountPriority;
return OK; return OK;
} }
......
...@@ -24,7 +24,7 @@ ...@@ -24,7 +24,7 @@
int account_p = message.LocalPriority; int account_p = message.LocalPriority;
int x = account_p + message.LocalPriority; int x = message.LocalPriority;
if (x > MAX_PRIRITY) x = MAX_PRIRITY; if (x > MAX_PRIRITY) x = MAX_PRIRITY;
if (x < MIN_PRIRITY && x!=-1) x = MIN_PRIRITY; if (x < MIN_PRIRITY && x!=-1) x = MIN_PRIRITY;
message.LocalPriority = x; message.LocalPriority = x;
......
...@@ -51,3 +51,4 @@ ...@@ -51,3 +51,4 @@
2.0 2.0
2.0 2.0
2.0 2.0
2.0
...@@ -29,6 +29,7 @@ namespace HTTPMessageNode.DataBaseAccess ...@@ -29,6 +29,7 @@ namespace HTTPMessageNode.DataBaseAccess
public static string getPriority(ref Message message, string validatorAddress) public static string getPriority(ref Message message, string validatorAddress)
{ {
//Console.WriteLine("Validating NOW IN GRPC !! "); //Console.WriteLine("Validating NOW IN GRPC !! ");
int x = message.LocalPriority;
ValidatorReply reply = ValidateAsync(message, validatorAddress); ValidatorReply reply = ValidateAsync(message, validatorAddress);
if (reply.ReplyCode == ValidatorConnectionAndValidationError) if (reply.ReplyCode == ValidatorConnectionAndValidationError)
...@@ -40,7 +41,7 @@ namespace HTTPMessageNode.DataBaseAccess ...@@ -40,7 +41,7 @@ namespace HTTPMessageNode.DataBaseAccess
return reply.ReplyCode; return reply.ReplyCode;
} }
Console.WriteLine("Prio from validator = " + reply.AccountPriority); Console.WriteLine("Prio from validator = " + reply.AccountPriority);
message.LocalPriority = reply.AccountPriority; message.LocalPriority = x+reply.AccountPriority;
return OK; return OK;
} }
......
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
int account_p = message.LocalPriority; int account_p = message.LocalPriority;
int x = account_p + message.LocalPriority; int x = message.LocalPriority;
if (x > MAX_PRIRITY) x = MAX_PRIRITY; if (x > MAX_PRIRITY) x = MAX_PRIRITY;
if (x < MIN_PRIRITY && x != -1) x = MIN_PRIRITY; if (x < MIN_PRIRITY && x != -1) x = MIN_PRIRITY;
message.LocalPriority = x; message.LocalPriority = x;
......
...@@ -13,3 +13,5 @@ ...@@ -13,3 +13,5 @@
2.0 2.0
2.0 2.0
2.0 2.0
2.0
2.0
...@@ -11,14 +11,41 @@ namespace QueuerNode.RedisQueuer ...@@ -11,14 +11,41 @@ namespace QueuerNode.RedisQueuer
{ {
public class MessageQueues public class MessageQueues
{ {
private static string SYRRedisURL = "localhost:6379"; private static string MTNRedisURL = "localhost:6373";
private static string MTNRedisURL = "localhost:48485"; private static string SYRRedisURL = "localhost:6374";
public static string RedisConnectionError = "Error Writing to Redis"; public static string RedisConnectionError = "Error Writing to Redis";
private static string Syriatel = "SYR"; private static string Syriatel = "SYR";
private static string MTN = "MTN"; private static string MTN = "MTN";
private static int LEVELS = 6;
public static void init()
{
Console.WriteLine("Initing");
/* var redis = ConnectionMultiplexer.Connect(SYRRedisURL);
var db = redis.GetDatabase();
for (int i=0; i < LEVELS; i++)
{
bool k = db.StreamCreateConsumerGroup(i.ToString(),
"SYS_MSGS",
"$",
true);
if(k)
{
Console.WriteLine("OK");
}
}
redis = ConnectionMultiplexer.Connect(MTNRedisURL);
db = redis.GetDatabase();
for (int i = 0; i < LEVELS; i++)
{
bool k = db.StreamCreateConsumerGroup(i.ToString(),
"SYS_MSGS",
"$",
true);
}*/
}
public static string addMessage(Message message , IDiscoveryClient discoveryClient) public static string addMessage(Message message , IDiscoveryClient discoveryClient)
{ {
string id = "Error"; string id = "Error";
......
...@@ -16,6 +16,7 @@ namespace QueuerNode.Services ...@@ -16,6 +16,7 @@ namespace QueuerNode.Services
{ {
_logger = logger; _logger = logger;
_client = client; _client = client;
MessageQueues.init();
} }
public override Task<Acknowledgement> QueueMessage(Message message, ServerCallContext context) public override Task<Acknowledgement> QueueMessage(Message message, ServerCallContext context)
...@@ -25,6 +26,8 @@ namespace QueuerNode.Services ...@@ -25,6 +26,8 @@ namespace QueuerNode.Services
string reqId = MessageQueues.addMessage(message, _client); string reqId = MessageQueues.addMessage(message, _client);
Console.WriteLine("req id grcp que = " + reqId);
if (reqId.Equals(MessageQueues.RedisConnectionError)) if (reqId.Equals(MessageQueues.RedisConnectionError))
{ {
return Task.FromResult(new Acknowledgement return Task.FromResult(new Acknowledgement
......
...@@ -27,3 +27,9 @@ ...@@ -27,3 +27,9 @@
2.0 2.0
2.0 2.0
2.0 2.0
2.0
2.0
2.0
2.0
2.0
2.0
...@@ -6,17 +6,90 @@ using Newtonsoft.Json; ...@@ -6,17 +6,90 @@ using Newtonsoft.Json;
var tokenSource = new CancellationTokenSource(); var tokenSource = new CancellationTokenSource();
var token = tokenSource.Token; var token = tokenSource.Token;
var muxer = ConnectionMultiplexer.Connect("localhost"); string SYR = "localhost:6374";
//string MTN = "localhost:6373";
var muxer = ConnectionMultiplexer.Connect(SYR);
var db = muxer.GetDatabase(); var db = muxer.GetDatabase();
const string streamName = "4"; const string streamName5 = "5";
const string streamName2 = "2";
const string groupName = "SYS_MSGS";
const string myConsumerID = "some-id";
const int count = 100055566; // at most reads (count) messages from a stream
/*var readTask = Task.Run(async () =>
{
string id = string.Empty;
while (!token.IsCancellationRequested)
{
var messages = await db.StreamRangeAsync(streamName, "-", "+", count, Order.Descending);
Console.WriteLine(messages.Length);
foreach (var entry in messages)
{
// Get the message ID
var messageId = entry.Id;
Console.WriteLine(messageId);
// Access the message data (serialized JSON)
string? serializedMessage = entry.Values[0].Value.ToString();
Console.WriteLine(serializedMessage);
if (serializedMessage == null) continue;
// Deserialize the JSON back to a Message object (if needed)
MessageDTO? message = JsonConvert.DeserializeObject<MessageDTO>(serializedMessage);
if (message == null) continue;
// Process the message data (message.Text, message.Timestamp, etc.)
Console.WriteLine($"Message ID: {messageId}, Text: {message.msgId}, tag: {message.tag}");
}
await Task.Delay(1000);
}
});*/
var readGroupTask2 = Task.Run(async () =>
{
string id = string.Empty;
while (!token.IsCancellationRequested)
{
var messages = await db.StreamReadGroupAsync(streamName2, groupName, myConsumerID, ">", count);
var readTask = Task.Run(async () => //Console.WriteLine(messages.Length);
foreach (var entry in messages)
{
// Get the message ID
var messageId = entry.Id;
Console.WriteLine(messageId);
// Access the message data (serialized JSON)
string? serializedMessage = entry.Values[0].Value.ToString();
Console.WriteLine(serializedMessage);
if (serializedMessage == null) continue;
// Deserialize the JSON back to a Message object (if needed)
MessageDTO? message = JsonConvert.DeserializeObject<MessageDTO>(serializedMessage);
if (message == null) continue;
// Process the message data (message.Text, message.Timestamp, etc.)
Console.WriteLine($"Message ID: {messageId}, Text: {message.msgId}, tag: {message.tag}");
}
await Task.Delay(1000);
}
});
var readGroupTask5 = Task.Run(async () =>
{ {
string id = string.Empty; string id = string.Empty;
while (!token.IsCancellationRequested) while (!token.IsCancellationRequested)
{ {
var messages = await db.StreamRangeAsync(streamName, "-", "+", 1, Order.Descending); var messages = await db.StreamReadGroupAsync(streamName5, groupName, myConsumerID, ">", count);
//Console.WriteLine(messages.Length);
foreach (var entry in messages) foreach (var entry in messages)
{ {
// Get the message ID // Get the message ID
...@@ -41,9 +114,9 @@ var readTask = Task.Run(async () => ...@@ -41,9 +114,9 @@ var readTask = Task.Run(async () =>
//tokenSource.CancelAfter(TimeSpan(20)); tokenSource.CancelAfter(TimeSpan.FromSeconds(300));
await Task.WhenAll(readTask); await Task.WhenAll(readGroupTask5, readGroupTask2);
......
namespace Validator.MongoDBAccess
{
public class InformationHolder
{
private static string MongoDB = "localhost:6565";
public static void checkMessage(MessageMetaData metaData)
{
//check for auther + authen + quota
}
}
}
3eb880e02d565665785d37493649f94f8244b728 be62c07eb34e9046d5b376f78a753186f703647c
...@@ -2,3 +2,4 @@ ...@@ -2,3 +2,4 @@
2.0 2.0
2.0 2.0
2.0 2.0
2.0
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment