Commit 1775d918 authored by mohammad.salama's avatar mohammad.salama

extracted by priority , send to intermediate stream which its last consumer...

extracted by priority , send to intermediate stream which its last consumer extracts data correctly, all working
parent dfd3ec46
...@@ -11,45 +11,15 @@ string SYR = "localhost:6374"; ...@@ -11,45 +11,15 @@ string SYR = "localhost:6374";
var muxer = ConnectionMultiplexer.Connect(SYR); var muxer = ConnectionMultiplexer.Connect(SYR);
var db = muxer.GetDatabase(); var db = muxer.GetDatabase();
const string streamName3 = "3"; const int sms_rate = 10;
const string groupName = "SYS_MSGS"; const string streamName = "SYR";
const string groupName = "SYS";
const string myConsumerID = "some-id"; const string myConsumerID = "some-id";
const int count = 1554; // at most reads (count) messages from a stream const int count = sms_rate; // at most reads (count) messages from a stream
/*
var readManual = Task.Run(async () => var readManual = Task.Run(async () =>
{ {
/*TimeSpan blockTime = TimeSpan.FromSeconds(0.2);
string rest = "GROUP " + groupName + " id "
+ "BLOCK " + blockTime.TotalMilliseconds + " "
+ "COUNT " + count
+ " STREAMS " + streamName3 + " >";
string[] rest2 = { "GROUP "
, groupName
, myConsumerID
, "COUNT "
, count.ToString()
, "BLOCK "
, blockTime.TotalMilliseconds.ToString()
, "STREAMS "
, streamName3
,">"};
string[] rest3 = {groupName
, myConsumerID
, count.ToString()
, blockTime.TotalMilliseconds.ToString()
, streamName3
,">"};*/
string cmd = "XREADGROUP";
/*
items = r.xreadgroup("GROUP",GroupName,ConsumerName,"BLOCK","2000","COUNT","10","STREAMS",:my_stream_key,myid)
*/
// var res = db.Execute(cmd , rest3);
while (!token.IsCancellationRequested) while (!token.IsCancellationRequested)
{ {
var res = await db.ExecuteAsync(cmd, "GROUP", groupName, myConsumerID, "BLOCK", 2000, "COUNT", 10, "STREAMS", streamName3, ">"); var res = await db.ExecuteAsync(cmd, "GROUP", groupName, myConsumerID, "BLOCK", 2000, "COUNT", 10, "STREAMS", streamName3, ">");
...@@ -68,6 +38,7 @@ var readManual = Task.Run(async () => ...@@ -68,6 +38,7 @@ var readManual = Task.Run(async () =>
/// ///
/// mesg -> Value -> [0] -> [1] -> [1] /// mesg -> Value -> [0] -> [1] -> [1]
/// ///
/*
var messages = res[0].ToDictionary(); var messages = res[0].ToDictionary();
...@@ -93,7 +64,7 @@ tokenSource.CancelAfter(TimeSpan.FromSeconds(300)); ...@@ -93,7 +64,7 @@ tokenSource.CancelAfter(TimeSpan.FromSeconds(300));
await Task.WhenAll(readManual); await Task.WhenAll(readManual);
*/
......
using Newtonsoft.Json;
using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace PriorityStream.Extractor
{
public class Extractor
{
private static string Provider = "SYR";
private static string groupName = "SYS_MSGS";
private static string myConsumerID = "syr-1";
private static int LOW = 1;
private static int Medium = 2;
private static int High = 3;
private static int sms_rate = 10;
private static IDatabase db = null;
public static bool setDatabase(string REDIS , int _sms_rate)
{
try
{
var muxer = ConnectionMultiplexer.Connect(REDIS);
db = muxer.GetDatabase();
sms_rate = _sms_rate;
//Console.WriteLine("Got DB");
return true;
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
return false;
}
}
public static List<MessageDTO> ExtractMessages(int priority)
{
string stream = Provider + "_" + priority;
if (priority == LOW)
{
return GetMessagesAsync(stream, 2).Result;
}
else if (priority == Medium)
{
return GetMessagesAsync(stream, 3).Result;
}
else if (priority == High)
{
return GetMessagesAsync(stream, 5).Result;
}
else
{
return new List<MessageDTO>();
}
}
private static async Task<List<MessageDTO>> GetMessagesAsync(string stream , int count)
{
try
{
List<RedisValue> msgsID = new List<RedisValue>();
List<MessageDTO> msgs = new List<MessageDTO>();
var messages = await db.StreamReadGroupAsync(stream, groupName, myConsumerID, ">", count, true);
foreach (var entry in messages)
{
// Get the message ID
var messageId = entry.Id;
msgsID.Add(messageId);
Console.WriteLine(messageId);
// Access the message data (serialized JSON)
string? serializedMessage = entry.Values[0].Value.ToString();
Console.WriteLine(serializedMessage);
if (serializedMessage == null) continue;
// Deserialize the JSON back to a Message object (if needed)
MessageDTO? message = JsonConvert.DeserializeObject<MessageDTO>(serializedMessage);
if (message == null) continue;
msgs.Add(message);
// Process the message data
Console.WriteLine($"Message ID: {messageId}, Text: {message.msgId}, tag: {message.tag}");
}
try
{
await db.StreamDeleteAsync(stream, msgsID.ToArray());
}
catch (Exception ex)
{
return await Task.FromResult(new List<MessageDTO>());
}
return await Task.FromResult(msgs);
}
catch (Exception ex)
{
return await Task.FromResult(new List<MessageDTO>());
}
}
}
}
...@@ -2,32 +2,50 @@ ...@@ -2,32 +2,50 @@
using StackExchange.Redis; using StackExchange.Redis;
using Newtonsoft.Json; using Newtonsoft.Json;
using PriorityStream; using PriorityStream;
using PriorityStream.Extractor;
using PriorityStream.Writer;
var tokenSource = new CancellationTokenSource(); var tokenSource = new CancellationTokenSource();
var token = tokenSource.Token; var token = tokenSource.Token;
string REDIS = "localhost:6379"; string RedisRead = "localhost:6379";
string RedisWrite = "localhost:6379";
var muxer = ConnectionMultiplexer.Connect(REDIS); int[] levels = new int[] { 1, 2, 3 };
var db = muxer.GetDatabase();
var server = muxer.GetServers(); int sms_rate = 10;
const string streamName = "SYR_3"; if (!Extractor.setDatabase(RedisRead , sms_rate) )
const string groupName = "SYS_MSGS"; {
const string myConsumerID = "some-id"; Console.WriteLine("Error Connecting to Redis Read on host : " + RedisRead);
return;
}
if (!Writer.setDatabase(RedisWrite))
{
Console.WriteLine("Error Connecting to Redis Write on host : " + RedisWrite);
return;
}
while (!token.IsCancellationRequested)
{
for (int level = 0; level < levels.Length; level++)
{
List<MessageDTO> messages = Extractor.ExtractMessages(level);
Writer.writeMessages(messages);
}
}
const int count = 1554; // at most reads (count) messages from a stream
Console.WriteLine(server[0].DatabaseSize());
/* /*
const int counttt = 1554; // at most reads (counttt) messages from a stream
var readManual = Task.Run(async () => var readManual = Task.Run(async () =>
{ {
List<RedisValue> msgsID = new List<RedisValue>(); List<RedisValue> msgsID = new List<RedisValue>();
while (!token.IsCancellationRequested) while (!token.IsCancellationRequested)
{ {
var messages = await db.StreamReadGroupAsync(streamName, groupName, myConsumerID, ">", count, true); ; var messages = await db.StreamReadGroupAsync(streamName, groupName, myConsumerID, ">", counttt, true);
//db.StreamDeleteAsync(streamName, msgsID); //db.StreamDeleteAsync(streamName, msgsID);
foreach (var entry in messages) foreach (var entry in messages)
{ {
...@@ -80,10 +98,10 @@ await Task.WhenAll(readManual); ...@@ -80,10 +98,10 @@ await Task.WhenAll(readManual);
Console.WriteLine($"length: {res32.Length}, radix-tree-keys: {res32.RadixTreeKeys}, radix-tree-nodes: {res32.RadixTreeNodes}, last-generated-id: {res32.LastGeneratedId}, first-entry: {$"{res32.FirstEntry.Id}: [{string.Join(", ", res32.FirstEntry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"}, last-entry: {$"{res32.LastEntry.Id}: [{string.Join(", ", res32.LastEntry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"}"); Console.WriteLine($"length: {res32.Length}, radix-tree-keys: {res32.RadixTreeKeys}, radix-tree-nodes: {res32.RadixTreeNodes}, last-generated-id: {res32.LastGeneratedId}, first-entry: {$"{res32.FirstEntry.Id}: [{string.Join(", ", res32.FirstEntry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"}, last-entry: {$"{res32.LastEntry.Id}: [{string.Join(", ", res32.LastEntry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"}");
var x = res32.FirstEntry.Id; var x = res32.FirstEntry.Id;
var y = res32.LastEntry.Id; var y = res32.LastEntry.Id;
RedisValue[] redisValues = new RedisValue[2]; RedisValue[] RedisValues = new RedisValue[2];
redisValues[0] = x; RedisValues[0] = x;
redisValues[1] = y; RedisValues[1] = y;
await db.StreamDeleteAsync(streamName , redisValues);*/ await db.StreamDeleteAsync(streamName , RedisValues);*/
......
using Newtonsoft.Json;
using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace PriorityStream.Writer
{
public class Writer
{
private static string Provider = "SYR";
private static IDatabase db = null;
public static bool setDatabase(string REDIS)
{
try
{
var muxer = ConnectionMultiplexer.Connect(REDIS);
db = muxer.GetDatabase();
return
createConsumerGroup();
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
return false;
}
}
private static bool createConsumerGroup()
{
try
{
bool k1 = db.StreamCreateConsumerGroup(Provider,
Provider,
"$",
true);
return true;
}
catch (Exception ex)
{
//Console.WriteLine(ex.Message);
return (ex.Message.Contains("already exists" , StringComparison.OrdinalIgnoreCase));
}
}
public static void writeMessages(List<MessageDTO> messages)
{
foreach (var message in messages)
{
var serializedMessage = JsonConvert.SerializeObject(message);
db.StreamAddAsync
(Provider,
new NameValueEntry[]
{
new NameValueEntry("message", serializedMessage)
});
}
}
}
}
75f67055e6d032e8bfe1cf8f12dbd5849f8b6ffe aaa077d8068e1f4d87f0231f32a17390df0a0505
...@@ -11,13 +11,13 @@ string SYR = "localhost:6379"; ...@@ -11,13 +11,13 @@ string SYR = "localhost:6379";
var muxer = ConnectionMultiplexer.Connect(SYR); var muxer = ConnectionMultiplexer.Connect(SYR);
var db = muxer.GetDatabase(); var db = muxer.GetDatabase();
const string streamName5 = "5"; const int sms_rate = 10;
const string streamName = "SYR_3"; const string streamName = "SYR";
const string groupName = "SYS_MSGS"; const string groupName = "SYR";
const string myConsumerID = "some-id-2"; const string myConsumerID = "some-id";
const int count = 100055566; // at most reads (count) messages from a stream
const int count = sms_rate; // at most reads (count) messages from a stream
/*
var readGroupTask2 = Task.Run(async () => var readGroupTask2 = Task.Run(async () =>
{ {
string id = string.Empty; string id = string.Empty;
...@@ -47,19 +47,53 @@ var readGroupTask2 = Task.Run(async () => ...@@ -47,19 +47,53 @@ var readGroupTask2 = Task.Run(async () =>
if (message == null) continue; if (message == null) continue;
// Process the message data // Process the message data
Console.WriteLine($"Message ID: {messageId}, Text: {message.msgId}, tag: {message.tag}"); Console.WriteLine($"Message ID: {messageId}, Text: {message.msgId}, tag: {message.tag}");
*/
} }
await Task.Delay(1000); await Task.Delay(1000);
} }
}); });*/
var readMessages = Task.Run(async () =>
{
string id = string.Empty;
while (!token.IsCancellationRequested)
{
///var messages = await db.StreamReadGroupAsync(streamName, groupName, myConsumerID, "$", count);
var messages = await db.StreamReadGroupAsync(streamName, groupName, myConsumerID, ">" , count);
//Console.WriteLine(messages.Length);
foreach (var entry in messages)
{
Console.WriteLine(entry);
// Get the message ID
var messageId = entry.Id;
Console.WriteLine(messageId);
// Access the message data (serialized JSON)
string? serializedMessage = entry.Values[0].Value.ToString();
Console.WriteLine(serializedMessage);
if (serializedMessage == null) continue;
// Deserialize the JSON back to a Message object (if needed)
MessageDTO? message = JsonConvert.DeserializeObject<MessageDTO>(serializedMessage);
if (message == null) continue;
// Process the message data
Console.WriteLine($"Message ID: {messageId}, Text: {message.msgId}, tag: {message.tag}");
}
await Task.Delay(1000);
}
});
tokenSource.CancelAfter(TimeSpan.FromSeconds(300)); tokenSource.CancelAfter(TimeSpan.FromSeconds(300));
await Task.WhenAll(readGroupTask2); await Task.WhenAll(readMessages);
......
...@@ -17,7 +17,7 @@ namespace Validator.Services ...@@ -17,7 +17,7 @@ namespace Validator.Services
return Task.FromResult(new Reply return Task.FromResult(new Reply
{ {
ReplyCode = "OK ok 200 + validated !! ", ReplyCode = "OK ok 200 + validated !! ",
AccountPriority = 3 AccountPriority = 0
}) ; }) ;
} }
......
...@@ -4,3 +4,4 @@ ...@@ -4,3 +4,4 @@
2.0 2.0
2.0 2.0
2.0 2.0
2.0
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment