Commit 030bdf6f authored by mohammad.salama's avatar mohammad.salama

All Ready , All Running Correctly - still validator is dummy, PipeLine Almost...

All Ready , All Running Correctly - still validator is dummy, PipeLine Almost finsished with no clustering or scheduling
parent 1775d918
......@@ -80,7 +80,12 @@
"ProjectGuid": "4f286608-7770-4511-bf37-2b94f5351d42",
"DisplayName": "PriorityStream",
"ColorIndex": 14
},
"4239ec44-4415-4369-8da3-809610735e98": {
"ProjectGuid": "4239ec44-4415-4369-8da3-809610735e98",
"DisplayName": "TestMongoDBValidator",
"ColorIndex": 15
}
},
"NextColorIndex": 15
"NextColorIndex": 0
}
\ No newline at end of file
......@@ -17,9 +17,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Validator", "Validator\Vali
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SimpleStreamConsumerTest", "SimpleStreamConsumerTest\SimpleStreamConsumerTest.csproj", "{8F51B680-F3B6-4892-B854-2E2AF840CD71}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ManualStreamConsumer", "ManualStreamConsumer\ManualStreamConsumer.csproj", "{1BF34FA7-E2A8-4FAC-A7A5-D57264B98B98}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "PriorityStream", "PriorityStream\PriorityStream.csproj", "{4F286608-7770-4511-BF37-2B94F5351D42}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "PriorityStream", "PriorityStream\PriorityStream.csproj", "{4F286608-7770-4511-BF37-2B94F5351D42}"
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MessagesConsumer", "MessagesConsumer\MessagesConsumer.csproj", "{EA05977F-A221-49AF-99EA-2086C63E8FB1}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
......@@ -55,14 +55,14 @@ Global
{8F51B680-F3B6-4892-B854-2E2AF840CD71}.Debug|Any CPU.Build.0 = Debug|Any CPU
{8F51B680-F3B6-4892-B854-2E2AF840CD71}.Release|Any CPU.ActiveCfg = Release|Any CPU
{8F51B680-F3B6-4892-B854-2E2AF840CD71}.Release|Any CPU.Build.0 = Release|Any CPU
{1BF34FA7-E2A8-4FAC-A7A5-D57264B98B98}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{1BF34FA7-E2A8-4FAC-A7A5-D57264B98B98}.Debug|Any CPU.Build.0 = Debug|Any CPU
{1BF34FA7-E2A8-4FAC-A7A5-D57264B98B98}.Release|Any CPU.ActiveCfg = Release|Any CPU
{1BF34FA7-E2A8-4FAC-A7A5-D57264B98B98}.Release|Any CPU.Build.0 = Release|Any CPU
{4F286608-7770-4511-BF37-2B94F5351D42}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{4F286608-7770-4511-BF37-2B94F5351D42}.Debug|Any CPU.Build.0 = Debug|Any CPU
{4F286608-7770-4511-BF37-2B94F5351D42}.Release|Any CPU.ActiveCfg = Release|Any CPU
{4F286608-7770-4511-BF37-2B94F5351D42}.Release|Any CPU.Build.0 = Release|Any CPU
{EA05977F-A221-49AF-99EA-2086C63E8FB1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{EA05977F-A221-49AF-99EA-2086C63E8FB1}.Debug|Any CPU.Build.0 = Debug|Any CPU
{EA05977F-A221-49AF-99EA-2086C63E8FB1}.Release|Any CPU.ActiveCfg = Release|Any CPU
{EA05977F-A221-49AF-99EA-2086C63E8FB1}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
......
......@@ -18,10 +18,11 @@ namespace HTTPMessageNode.Controllers
private static readonly string ErrorValidation = "Error When Validating Request";
private static readonly string ErrorGRPCConnection = "Error Connecting to GRPC Servers";
public QueueMessageController(ILogger<QueueMessageController> logger , IDiscoveryClient discovery)
public QueueMessageController(ILogger<QueueMessageController> logger , IDiscoveryClient discovery, IConfiguration configuration)
{
_logger = logger;
discoveryClient = discovery;
//Console.WriteLine("Conf = " + configuration.GetValue<string>("database:conn"));
}
//[HttpGet(Name = "GetWeatherForecast")]
......
......@@ -20,8 +20,6 @@
return res;
}
int account_p = message.LocalPriority;
int x = message.LocalPriority;
if (x > MAX_PRIRITY) x = MAX_PRIRITY;
if (x < MIN_PRIRITY && x != -1) x = MIN_PRIRITY;
......
......@@ -9,6 +9,9 @@ builder.Services.AddControllers();
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
builder.Services.AddDiscoveryClient();
builder.Configuration.GetValue<string>("database:conn");
var app = builder.Build();
......
......@@ -10,6 +10,9 @@
"Microsoft.AspNetCore": "Warning"
}
},
"database": {
"conn" : "localhost:27017"
},
"eureka": {
"client": {
"serviceUrl": "http://localhost:8761/eureka/",
......
......@@ -10,6 +10,9 @@
"Microsoft.AspNetCore": "Warning"
}
},
"database": {
"conn" : "localhost:27017"
},
"eureka": {
"client": {
"serviceUrl": "http://localhost:8761/eureka/",
......
......@@ -16,3 +16,8 @@
2.0
2.0
2.0
2.0
2.0
2.0
2.0
2.0
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="CSRedisCore" Version="3.8.803" />
<PackageReference Include="StackExchange.Redis" Version="2.8.0" />
</ItemGroup>
</Project>
using CSRedis;
using SimpleStreamConsumerTest;
using StackExchange.Redis;
using Newtonsoft.Json;
var tokenSource = new CancellationTokenSource();
var token = tokenSource.Token;
string SYR = "localhost:6374";
var muxer = ConnectionMultiplexer.Connect(SYR);
var db = muxer.GetDatabase();
const int sms_rate = 10;
const string streamName = "SYR";
const string groupName = "SYS";
const string myConsumerID = "some-id";
const int count = sms_rate; // at most reads (count) messages from a stream
/*
var readManual = Task.Run(async () =>
{
while (!token.IsCancellationRequested)
{
var res = await db.ExecuteAsync(cmd, "GROUP", groupName, myConsumerID, "BLOCK", 2000, "COUNT", 10, "STREAMS", streamName3, ">");
Console.WriteLine("length = " + res.Length);
if (res.Length <= 0) continue;
/*var temp = res[0].ToDictionary();
foreach (var r in temp)
{
Console.WriteLine(r.GetType());
Console.WriteLine(r.Value);
}*/
///
/// mesg -> Value -> [0] -> [1] -> [1]
///
/*
var messages = res[0].ToDictionary();
foreach (var message in messages)
{
Console.WriteLine(message.Key);
Console.WriteLine(message.Value.GetType());
//Console.WriteLine(message.Value[0][1][1]);
MessageDTO? messaged = JsonConvert.DeserializeObject<MessageDTO>(message.Value[0][1][1]);
//Console.WriteLine(messaged);
}
Console.WriteLine("*******************************************************");
await Task.Delay(1000);
}
});
tokenSource.CancelAfter(TimeSpan.FromSeconds(300));
await Task.WhenAll(readManual);
*/
using Newtonsoft.Json;
using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace MessagesConsumer.Extractor
{
public class Extractor
{
private static string myConsumerID = "cons-1";
private static int VeryLow = 1;
private static int Low = 2;
private static int Medium = 3;
private static int High = 4;
private static int VeryHigh = 5;
private static int[] shares;
private static int sms_rate = 100;
private static IDatabase db = null;
public static bool setDatabase(string REDIS , int _sms_rate = 100)
{
try
{
var muxer = ConnectionMultiplexer.Connect(REDIS);
db = muxer.GetDatabase();
sms_rate = _sms_rate;
//Console.WriteLine("Got DB");
setShares();
return true;
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
return false;
}
}
private static void setShares()
{
shares = new int[VeryHigh + 1];
int sum = 0;
sum += shares[VeryHigh] = (35 * sms_rate) / 100;
sum += shares[High] = (30 * sms_rate) / 100;
sum += shares[Medium] = (20 * sms_rate) / 100;
sum += shares[Low] = (10 * sms_rate) / 100;
sum += shares[VeryLow] = (5 * sms_rate) / 100;
shares[VeryHigh] += ((sms_rate - sum) > 0 ? sms_rate - sum : 0);
}
public static async Task<RedisValue> ProcessMessagesAsync(string stream, RedisValue id)
{
try
{
var messages = await db.StreamReadGroupAsync(stream, stream, myConsumerID, id, sms_rate);
bool has_pending = true;
if (messages.Length == 0)
{
id = ">";
messages = await db.StreamReadGroupAsync(stream, stream , myConsumerID, id, sms_rate);
has_pending = false;
}
foreach (var entry in messages)
{
Console.WriteLine("EXtraacting");
var messageId = entry.Id;
string? serializedMessage = entry.Values[0].Value.ToString();
Console.WriteLine(serializedMessage);
MessageDTO? message = JsonConvert.DeserializeObject<MessageDTO>(serializedMessage);
Console.WriteLine(message);
Writer.Writer.writeMessage(message);
db.StreamAcknowledge(stream, stream, messageId);
id = messageId;
}
if (!has_pending) id = ">";
return await Task.FromResult(id);
}
catch (Exception ex)
{
Console.WriteLine("Error while Reading or Acking");
return await Task.FromResult(id);
}
}
}
}
......@@ -4,7 +4,7 @@ using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace SimpleStreamConsumerTest
namespace MessagesConsumer
{
public class MessageDTO
{
......
using CSRedis;
using StackExchange.Redis;
using Newtonsoft.Json;
using MessagesConsumer.StreamsHandler;
var tokenSource = new CancellationTokenSource();
var token = tokenSource.Token;
string redis_read = "localhost:6379";
try
{
TotalWorker.setAll(redis_read);
}
catch (Exception ex)
{
return;
}
string[] provs = new string[] { "MTN", "SYR" };
int turn = 0;
while (!token.IsCancellationRequested)
{
TotalWorker.work(provs[turn]);
turn ^= 1;
Console.WriteLine("\n\n\n\n");
TotalWorker.work(provs[turn]);
turn ^= 1;
Console.WriteLine("**************************************************");
await Task.Delay(1000);
}
using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace MessagesConsumer.StreamsHandler
{
public class TotalWorker
{
public static Dictionary<string, RedisValue> lastId = new Dictionary<string, RedisValue>();
public static void setAll(string RedisRead)
{
if (!Extractor.Extractor.setDatabase(RedisRead))
{
Console.WriteLine("ERROR : " + RedisRead);
throw new Exception("ERROR : " + RedisRead);
}
}
public static void work(string stream)
{
RedisValue id = (lastId.ContainsKey(stream) ? lastId[stream] : "0-0");
RedisValue new_id = Extractor.Extractor.ProcessMessagesAsync(stream, id).Result;
lastId[stream] = new_id;
}
}
}
using Newtonsoft.Json;
using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace MessagesConsumer.Writer
{
public class Writer
{
public static void writeMessage(MessageDTO message)
{
Console.WriteLine("Received : ");
Console.WriteLine("Content " + message.text);
Console.WriteLine("Phone Number = " + message.phoneNumber);
}
}
}
......@@ -6,13 +6,13 @@
"compilationOptions": {},
"targets": {
".NETCoreApp,Version=v6.0": {
"ManualStreamConsumer/1.0.0": {
"MessagesConsumer/1.0.0": {
"dependencies": {
"CSRedisCore": "3.8.803",
"StackExchange.Redis": "2.8.0"
},
"runtime": {
"ManualStreamConsumer.dll": {}
"MessagesConsumer.dll": {}
}
},
"CSRedisCore/3.8.803": {
......@@ -78,7 +78,7 @@
}
},
"libraries": {
"ManualStreamConsumer/1.0.0": {
"MessagesConsumer/1.0.0": {
"type": "project",
"serviceable": false,
"sha512": ""
......
......@@ -11,12 +11,12 @@
using System;
using System.Reflection;
[assembly: System.Reflection.AssemblyCompanyAttribute("ManualStreamConsumer")]
[assembly: System.Reflection.AssemblyCompanyAttribute("MessagesConsumer")]
[assembly: System.Reflection.AssemblyConfigurationAttribute("Debug")]
[assembly: System.Reflection.AssemblyFileVersionAttribute("1.0.0.0")]
[assembly: System.Reflection.AssemblyInformationalVersionAttribute("1.0.0")]
[assembly: System.Reflection.AssemblyProductAttribute("ManualStreamConsumer")]
[assembly: System.Reflection.AssemblyTitleAttribute("ManualStreamConsumer")]
[assembly: System.Reflection.AssemblyProductAttribute("MessagesConsumer")]
[assembly: System.Reflection.AssemblyTitleAttribute("MessagesConsumer")]
[assembly: System.Reflection.AssemblyVersionAttribute("1.0.0.0")]
// Generated by the MSBuild WriteCodeFragment class.
......
......@@ -6,5 +6,5 @@ build_property.ProjectTypeGuids =
build_property.InvariantGlobalization =
build_property.PlatformNeutralAssembly =
build_property._SupportedPlatformList = Linux,macOS,Windows
build_property.RootNamespace = ManualStreamConsumer
build_property.ProjectDir = D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\ManualStreamConsumer\
build_property.RootNamespace = MessagesConsumer
build_property.ProjectDir = D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\MessagesConsumer\
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\ManualStreamConsumer\bin\Debug\net6.0\ManualStreamConsumer.exe
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\ManualStreamConsumer\bin\Debug\net6.0\ManualStreamConsumer.deps.json
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\ManualStreamConsumer\bin\Debug\net6.0\ManualStreamConsumer.runtimeconfig.json
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\ManualStreamConsumer\bin\Debug\net6.0\ManualStreamConsumer.dll
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\ManualStreamConsumer\bin\Debug\net6.0\ref\ManualStreamConsumer.dll
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\ManualStreamConsumer\bin\Debug\net6.0\ManualStreamConsumer.pdb
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\ManualStreamConsumer\bin\Debug\net6.0\CSRedisCore.dll
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\ManualStreamConsumer\bin\Debug\net6.0\Microsoft.Extensions.Logging.Abstractions.dll
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\ManualStreamConsumer\bin\Debug\net6.0\Newtonsoft.Json.dll
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\ManualStreamConsumer\bin\Debug\net6.0\Pipelines.Sockets.Unofficial.dll
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\ManualStreamConsumer\bin\Debug\net6.0\StackExchange.Redis.dll
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\ManualStreamConsumer\bin\Debug\net6.0\System.IO.Pipelines.dll
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\ManualStreamConsumer\obj\Debug\net6.0\ManualStreamConsumer.csproj.AssemblyReference.cache
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\ManualStreamConsumer\obj\Debug\net6.0\ManualStreamConsumer.GeneratedMSBuildEditorConfig.editorconfig
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\ManualStreamConsumer\obj\Debug\net6.0\ManualStreamConsumer.AssemblyInfoInputs.cache
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\ManualStreamConsumer\obj\Debug\net6.0\ManualStreamConsumer.AssemblyInfo.cs
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\ManualStreamConsumer\obj\Debug\net6.0\ManualStreamConsumer.csproj.CoreCompileInputs.cache
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\ManualStreamConsumer\obj\Debug\net6.0\ManualStreamConsumer.csproj.CopyComplete
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\ManualStreamConsumer\obj\Debug\net6.0\ManualStreamConsumer.dll
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\ManualStreamConsumer\obj\Debug\net6.0\ref\ManualStreamConsumer.dll
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\ManualStreamConsumer\obj\Debug\net6.0\ManualStreamConsumer.pdb
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\ManualStreamConsumer\obj\Debug\net6.0\ManualStreamConsumer.genruntimeconfig.cache
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\MessagesConsumer\bin\Debug\net6.0\MessagesConsumer.exe
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\MessagesConsumer\bin\Debug\net6.0\MessagesConsumer.deps.json
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\MessagesConsumer\bin\Debug\net6.0\MessagesConsumer.runtimeconfig.json
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\MessagesConsumer\bin\Debug\net6.0\MessagesConsumer.dll
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\MessagesConsumer\bin\Debug\net6.0\ref\MessagesConsumer.dll
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\MessagesConsumer\bin\Debug\net6.0\MessagesConsumer.pdb
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\MessagesConsumer\bin\Debug\net6.0\CSRedisCore.dll
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\MessagesConsumer\bin\Debug\net6.0\Microsoft.Extensions.Logging.Abstractions.dll
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\MessagesConsumer\bin\Debug\net6.0\Newtonsoft.Json.dll
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\MessagesConsumer\bin\Debug\net6.0\Pipelines.Sockets.Unofficial.dll
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\MessagesConsumer\bin\Debug\net6.0\StackExchange.Redis.dll
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\MessagesConsumer\bin\Debug\net6.0\System.IO.Pipelines.dll
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\MessagesConsumer\obj\Debug\net6.0\MessagesConsumer.csproj.AssemblyReference.cache
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\MessagesConsumer\obj\Debug\net6.0\MessagesConsumer.GeneratedMSBuildEditorConfig.editorconfig
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\MessagesConsumer\obj\Debug\net6.0\MessagesConsumer.AssemblyInfoInputs.cache
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\MessagesConsumer\obj\Debug\net6.0\MessagesConsumer.AssemblyInfo.cs
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\MessagesConsumer\obj\Debug\net6.0\MessagesConsumer.csproj.CoreCompileInputs.cache
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\MessagesConsumer\obj\Debug\net6.0\MessagesConsumer.csproj.CopyComplete
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\MessagesConsumer\obj\Debug\net6.0\MessagesConsumer.dll
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\MessagesConsumer\obj\Debug\net6.0\ref\MessagesConsumer.dll
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\MessagesConsumer\obj\Debug\net6.0\MessagesConsumer.pdb
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\MessagesConsumer\obj\Debug\net6.0\MessagesConsumer.genruntimeconfig.cache
{
"format": 1,
"restore": {
"D:\\HIAST\\FIY\\Project-MSGPriorityQ\\GrpcMessage\\message-priority-queue\\ManualStreamConsumer\\ManualStreamConsumer.csproj": {}
"D:\\HIAST\\FIY\\Project-MSGPriorityQ\\GrpcMessage\\message-priority-queue\\MessagesConsumer\\MessagesConsumer.csproj": {}
},
"projects": {
"D:\\HIAST\\FIY\\Project-MSGPriorityQ\\GrpcMessage\\message-priority-queue\\ManualStreamConsumer\\ManualStreamConsumer.csproj": {
"D:\\HIAST\\FIY\\Project-MSGPriorityQ\\GrpcMessage\\message-priority-queue\\MessagesConsumer\\MessagesConsumer.csproj": {
"version": "1.0.0",
"restore": {
"projectUniqueName": "D:\\HIAST\\FIY\\Project-MSGPriorityQ\\GrpcMessage\\message-priority-queue\\ManualStreamConsumer\\ManualStreamConsumer.csproj",
"projectName": "ManualStreamConsumer",
"projectPath": "D:\\HIAST\\FIY\\Project-MSGPriorityQ\\GrpcMessage\\message-priority-queue\\ManualStreamConsumer\\ManualStreamConsumer.csproj",
"projectUniqueName": "D:\\HIAST\\FIY\\Project-MSGPriorityQ\\GrpcMessage\\message-priority-queue\\MessagesConsumer\\MessagesConsumer.csproj",
"projectName": "MessagesConsumer",
"projectPath": "D:\\HIAST\\FIY\\Project-MSGPriorityQ\\GrpcMessage\\message-priority-queue\\MessagesConsumer\\MessagesConsumer.csproj",
"packagesPath": "C:\\Users\\moham\\.nuget\\packages\\",
"outputPath": "D:\\HIAST\\FIY\\Project-MSGPriorityQ\\GrpcMessage\\message-priority-queue\\ManualStreamConsumer\\obj\\",
"outputPath": "D:\\HIAST\\FIY\\Project-MSGPriorityQ\\GrpcMessage\\message-priority-queue\\MessagesConsumer\\obj\\",
"projectStyle": "PackageReference",
"fallbackFolders": [
"C:\\Program Files (x86)\\Microsoft Visual Studio\\Shared\\NuGetPackages"
......
......@@ -306,11 +306,11 @@
"project": {
"version": "1.0.0",
"restore": {
"projectUniqueName": "D:\\HIAST\\FIY\\Project-MSGPriorityQ\\GrpcMessage\\message-priority-queue\\ManualStreamConsumer\\ManualStreamConsumer.csproj",
"projectName": "ManualStreamConsumer",
"projectPath": "D:\\HIAST\\FIY\\Project-MSGPriorityQ\\GrpcMessage\\message-priority-queue\\ManualStreamConsumer\\ManualStreamConsumer.csproj",
"projectUniqueName": "D:\\HIAST\\FIY\\Project-MSGPriorityQ\\GrpcMessage\\message-priority-queue\\MessagesConsumer\\MessagesConsumer.csproj",
"projectName": "MessagesConsumer",
"projectPath": "D:\\HIAST\\FIY\\Project-MSGPriorityQ\\GrpcMessage\\message-priority-queue\\MessagesConsumer\\MessagesConsumer.csproj",
"packagesPath": "C:\\Users\\moham\\.nuget\\packages\\",
"outputPath": "D:\\HIAST\\FIY\\Project-MSGPriorityQ\\GrpcMessage\\message-priority-queue\\ManualStreamConsumer\\obj\\",
"outputPath": "D:\\HIAST\\FIY\\Project-MSGPriorityQ\\GrpcMessage\\message-priority-queue\\MessagesConsumer\\obj\\",
"projectStyle": "PackageReference",
"fallbackFolders": [
"C:\\Program Files (x86)\\Microsoft Visual Studio\\Shared\\NuGetPackages"
......
{
"version": 2,
"dgSpecHash": "6cENbdzNq8kT6/MlTWGw5Q0+MWX12aoV2sPjjfcu5+rOITIeeJ/byAYfge6HWDqZgh2RetSCaiWbDD9/Cjs8qA==",
"dgSpecHash": "GQ4ns18CgDP4y6o8cMaXGK95/thOYK3FsODffBLdndwbQv3Eiy8a02YzrREx+G3IdBo/BKK0bsWPtvss78X/NA==",
"success": true,
"projectFilePath": "D:\\HIAST\\FIY\\Project-MSGPriorityQ\\GrpcMessage\\message-priority-queue\\ManualStreamConsumer\\ManualStreamConsumer.csproj",
"projectFilePath": "D:\\HIAST\\FIY\\Project-MSGPriorityQ\\GrpcMessage\\message-priority-queue\\MessagesConsumer\\MessagesConsumer.csproj",
"expectedPackageFiles": [
"C:\\Users\\moham\\.nuget\\packages\\csrediscore\\3.8.803\\csrediscore.3.8.803.nupkg.sha512",
"C:\\Users\\moham\\.nuget\\packages\\microsoft.extensions.logging.abstractions\\6.0.0\\microsoft.extensions.logging.abstractions.6.0.0.nupkg.sha512",
......
......@@ -10,19 +10,22 @@ namespace PriorityStream.Extractor
{
public class Extractor
{
private static string Provider = "SYR";
private static string groupName = "SYS_MSGS";
private static string myConsumerID = "syr-1";
private static string myConsumerID = "cons-1";
private static int LOW = 1;
private static int Medium = 2;
private static int High = 3;
private static int VeryLow = 1;
private static int Low = 2;
private static int Medium = 3;
private static int High = 4;
private static int VeryHigh = 5;
private static int sms_rate = 10;
private static int[] shares;
private static int sms_rate = 100;
private static IDatabase db = null;
public static bool setDatabase(string REDIS , int _sms_rate)
public static bool setDatabase(string REDIS , int _sms_rate = 100)
{
try
{
......@@ -30,6 +33,7 @@ namespace PriorityStream.Extractor
db = muxer.GetDatabase();
sms_rate = _sms_rate;
//Console.WriteLine("Got DB");
setShares();
return true;
}
catch (Exception ex)
......@@ -39,74 +43,95 @@ namespace PriorityStream.Extractor
}
}
public static List<MessageDTO> ExtractMessages(int priority)
{
string stream = Provider + "_" + priority;
if (priority == LOW)
{
return GetMessagesAsync(stream, 2).Result;
}
else if (priority == Medium)
{
return GetMessagesAsync(stream, 3).Result;
}
else if (priority == High)
{
return GetMessagesAsync(stream, 5).Result;
}
else
private static void setShares()
{
return new List<MessageDTO>();
}
shares = new int[VeryHigh + 1];
int sum = 0;
sum += shares[VeryHigh] = (35 * sms_rate) / 100;
sum += shares[High] = (30 * sms_rate) / 100;
sum += shares[Medium] = (20 * sms_rate) / 100;
sum += shares[Low] = (10 * sms_rate) / 100;
sum += shares[VeryLow] = (5 * sms_rate) / 100;
shares[VeryHigh] += ((sms_rate - sum) > 0 ? sms_rate - sum : 0);
}
private static async Task<List<MessageDTO>> GetMessagesAsync(string stream , int count)
public static async Task<RedisValue> ProcessMessagesAsync(string stream, RedisValue id)
{
try
{
List<RedisValue> msgsID = new List<RedisValue>();
int priority = getPriority(stream);
string write_stream = getWriteStream(stream);
int count = shares[priority];
List<MessageDTO> msgs = new List<MessageDTO>();
var messages = await db.StreamReadGroupAsync(stream, groupName, myConsumerID, ">", count, true);
var messages = await db.StreamReadGroupAsync(stream, groupName, myConsumerID, id, count);
bool has_pending = true;
if (messages.Length == 0)
{
id = ">";
messages = await db.StreamReadGroupAsync(stream, groupName, myConsumerID, id, count);
has_pending = false;
}
foreach (var entry in messages)
{
// Get the message ID
Console.WriteLine("EXtraacting");
var messageId = entry.Id;
msgsID.Add(messageId);
Console.WriteLine(messageId);
// Access the message data (serialized JSON)
//Console.WriteLine(messageId);
string? serializedMessage = entry.Values[0].Value.ToString();
Console.WriteLine(serializedMessage);
if (serializedMessage == null) continue;
// Deserialize the JSON back to a Message object (if needed)
MessageDTO? message = JsonConvert.DeserializeObject<MessageDTO>(serializedMessage);
if (message == null) continue;
msgs.Add(message);
// Process the message data
Console.WriteLine($"Message ID: {messageId}, Text: {message.msgId}, tag: {message.tag}");
}
Console.WriteLine(message);
try
bool success = await Writer.Writer.writeMessageAsync(message, write_stream);
if (success)
{
db.StreamAcknowledge(stream, stream, messageId);
id = messageId;
}
else
{
await db.StreamDeleteAsync(stream, msgsID.ToArray());
has_pending = true;
break;
}
}
if (!has_pending) id = ">";
return await Task.FromResult(id);
}
catch (Exception ex)
{
return await Task.FromResult(new List<MessageDTO>());
Console.WriteLine("Error while Reading or Acking");
return await Task.FromResult(id);
}
return await Task.FromResult(msgs);
}
catch (Exception ex)
private static string getWriteStream(string stream)
{
return await Task.FromResult(new List<MessageDTO>());
string[] write_prio = stream.Split("_");
return write_prio[0];
}
private static int getPriority(string stream)
{
string[] write_prio = stream.Split("_");
return int.Parse(write_prio[1]);
}
}
}
......@@ -2,108 +2,42 @@
using StackExchange.Redis;
using Newtonsoft.Json;
using PriorityStream;
using PriorityStream.Extractor;
using PriorityStream.Writer;
using PriorityStream.StreamsHandler;
var tokenSource = new CancellationTokenSource();
var token = tokenSource.Token;
string RedisRead = "localhost:6379";
string RedisWrite = "localhost:6379";
string redis_read = "localhost:6379";
string redis_wite = "localhost:6379";
int[] levels = new int[] { 1, 2, 3 };
int sms_rate = 10;
if (!Extractor.setDatabase(RedisRead , sms_rate) )
try
{
Console.WriteLine("Error Connecting to Redis Read on host : " + RedisRead);
return;
TotalWorker.setAll(redis_read , redis_wite);
}
if (!Writer.setDatabase(RedisWrite))
catch (Exception ex)
{
Console.WriteLine("Error Connecting to Redis Write on host : " + RedisWrite);
return;
}
while (!token.IsCancellationRequested)
{
for (int level = 0; level < levels.Length; level++)
{
List<MessageDTO> messages = Extractor.ExtractMessages(level);
Writer.writeMessages(messages);
}
}
string[] provs = new string[] { "MTN", "SYR" };
int turn = 0;
int[] levels = new int[] { 1, 2, 3, 4, 5 };
/*
const int counttt = 1554; // at most reads (counttt) messages from a stream
var readManual = Task.Run(async () =>
while (!token.IsCancellationRequested)
{
List<RedisValue> msgsID = new List<RedisValue>();
while (!token.IsCancellationRequested)
{
var messages = await db.StreamReadGroupAsync(streamName, groupName, myConsumerID, ">", counttt, true);
//db.StreamDeleteAsync(streamName, msgsID);
foreach (var entry in messages)
{
// Get the message ID
var messageId = entry.Id;
msgsID.Add(messageId);
Console.WriteLine(messageId);
// Access the message data (serialized JSON)
string? serializedMessage = entry.Values[0].Value.ToString();
Console.WriteLine(serializedMessage);
if (serializedMessage == null) continue;
// Deserialize the JSON back to a Message object (if needed)
MessageDTO? message = JsonConvert.DeserializeObject<MessageDTO>(serializedMessage);
if (message == null) continue;
// Process the message data
Console.WriteLine($"Message ID: {messageId}, Text: {message.msgId}, tag: {message.tag}");
}
StreamInfo res32 = db.StreamInfo(streamName);
Console.WriteLine(res32.Length);
//Console.WriteLine($"length: {res32.Length}, radix-tree-keys: {res32.RadixTreeKeys}, radix-tree-nodes: {res32.RadixTreeNodes}, last-generated-id: {res32.LastGeneratedId}, first-entry: {$"{res32.FirstEntry.Id}: [{string.Join(", ", res32.FirstEntry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"}, last-entry: {$"{res32.LastEntry.Id}: [{string.Join(", ", res32.LastEntry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"}");
Console.WriteLine("*******************************************************\n\n\n\n\n\n");
try
{
var x = res32.FirstEntry.Id;
var y = res32.LastEntry.Id;
await db.StreamDeleteAsync(streamName, msgsID.ToArray());
}
catch (Exception ex)
{
}
finally
for (int i = 0; i < levels.Length ; i++)
{
await Task.Delay(1000);
}
TotalWorker.work(provs[turn]+"_"+levels[i]);
}
});
tokenSource.CancelAfter(TimeSpan.FromSeconds(300));
await Task.WhenAll(readManual);
/*StreamInfo res32 = db.StreamInfo(streamName);
Console.WriteLine($"length: {res32.Length}, radix-tree-keys: {res32.RadixTreeKeys}, radix-tree-nodes: {res32.RadixTreeNodes}, last-generated-id: {res32.LastGeneratedId}, first-entry: {$"{res32.FirstEntry.Id}: [{string.Join(", ", res32.FirstEntry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"}, last-entry: {$"{res32.LastEntry.Id}: [{string.Join(", ", res32.LastEntry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"}");
var x = res32.FirstEntry.Id;
var y = res32.LastEntry.Id;
RedisValue[] RedisValues = new RedisValue[2];
RedisValues[0] = x;
RedisValues[1] = y;
await db.StreamDeleteAsync(streamName , RedisValues);*/
turn ^= 1;
Console.WriteLine("**************************************************");
await Task.Delay(1000);
}
......
using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace PriorityStream.StreamsHandler
{
public class TotalWorker
{
public static Dictionary<string, RedisValue> lastId = new Dictionary<string, RedisValue>();
public static void setAll(string RedisRead, string RedisWrite)
{
if (!Extractor.Extractor.setDatabase(RedisRead))
{
Console.WriteLine("ERROR : " + RedisRead);
throw new Exception("ERROR : " + RedisRead);
}
if (!Writer.Writer.setDatabase(RedisWrite))
{
Console.WriteLine("ERROR: " + RedisWrite);
throw new Exception("ERROR: " + RedisWrite);
}
}
public static void work(string stream)
{
RedisValue id = (lastId.ContainsKey(stream) ? lastId[stream] : "0-0");
RedisValue new_id = Extractor.Extractor.ProcessMessagesAsync(stream, id).Result;
lastId[stream] = new_id;
}
}
}
......@@ -8,10 +8,10 @@ using System.Threading.Tasks;
namespace PriorityStream.Writer
{
public class Writer
{
private static string Provider = "SYR";
private static string Provider2 = "MTN";
private static IDatabase db = null;
......@@ -21,8 +21,7 @@ namespace PriorityStream.Writer
{
var muxer = ConnectionMultiplexer.Connect(REDIS);
db = muxer.GetDatabase();
return
createConsumerGroup();
return createConsumerGroup();
}
catch (Exception ex)
{
......@@ -35,31 +34,54 @@ namespace PriorityStream.Writer
{
try
{
bool k1 = db.StreamCreateConsumerGroup(Provider,
bool k1 = db.StreamCreateConsumerGroup
(
Provider,
Provider,
"$",
true);
0,
true
);
bool k2 = db.StreamCreateConsumerGroup
(
Provider2,
Provider2,
0,
true
);
return true;
}
catch (Exception ex)
{
//Console.WriteLine(ex.Message);
return (ex.Message.Contains("already exists" , StringComparison.OrdinalIgnoreCase));
return (ex.Message.Contains("already exists", StringComparison.OrdinalIgnoreCase));
}
}
public static void writeMessages(List<MessageDTO> messages)
public static async Task<bool> writeMessageAsync(MessageDTO message, string provider)
{
foreach (var message in messages)
try
{
var serializedMessage = JsonConvert.SerializeObject(message);
db.StreamAddAsync
(Provider,
Console.WriteLine("Writing : " + message);
var temp = await db.StreamAddAsync
(provider,
new NameValueEntry[]
{
new NameValueEntry("message", serializedMessage)
});
return await Task.FromResult(true);
}
catch (Exception ex)
{
Console.WriteLine("Error while Writing");
return await Task.FromResult(false);
}
}
}
......
aaa077d8068e1f4d87f0231f32a17390df0a0505
785960f5a46d2d444ac21205319f075410058891
......@@ -18,24 +18,24 @@ namespace SchedulerNode.RedisQueuer
private static string MTN = "MTN";
private static int LEVELS = 6;
private static IDatabase db = null;
public static void init()
{
var redis = ConnectionMultiplexer.Connect(RedisURL);
var db = redis.GetDatabase();
for (int i=0; i < LEVELS; i++)
db = redis.GetDatabase();
for (int i=1; i < LEVELS; i++)
{
try
{
bool k1 = db.StreamCreateConsumerGroup(Syriatel+"_"+i.ToString(),
"SYS_MSGS",
"$",
0,
true);
bool k2 = db.StreamCreateConsumerGroup(MTN+"_"+i.ToString(),
"SYS_MSGS",
"$",
0,
true);
......@@ -55,7 +55,7 @@ namespace SchedulerNode.RedisQueuer
{
string id = "Error";
string temp = string.Empty;
if (message.Tag.Contains(Syriatel, StringComparison.OrdinalIgnoreCase))
//if (message.Tag.Contains(Syriatel, StringComparison.OrdinalIgnoreCase))
{
// get url using discovery client
var resid = addMessageRedisAsync(message, RedisURL);
......@@ -75,16 +75,12 @@ namespace SchedulerNode.RedisQueuer
{
try
{
var redis = ConnectionMultiplexer.Connect(URL);
string tag = getTag(ref message);
string streamName = tag + "_" + message.LocalPriority.ToString();
Console.WriteLine("stream name = " + streamName);
var db = redis.GetDatabase();
var serializedMessage = JsonConvert.SerializeObject(message);
Console.WriteLine("Sending to stream : " + streamName);
......@@ -121,8 +117,6 @@ namespace SchedulerNode.RedisQueuer
{
return MTN;
}
}
}
}
......@@ -20,6 +20,7 @@
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="MongoDB.Driver" Version="2.27.0" />
</ItemGroup>
<PropertyGroup>
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment