Commit a8c794e9 authored by mohammad.salama's avatar mohammad.salama

all settings are in appsetting.json all working fine, some improvements were...

all settings are in appsetting.json all working fine, some improvements were made to separate sms_rate(s) , all exceptions so far handled, all subscribed to jaegar
parent 953e7cd3
......@@ -105,7 +105,17 @@
"ProjectGuid": "62d17724-6487-4586-9f2f-c2c0175280a5",
"DisplayName": "ScheduledMessagesHandler",
"ColorIndex": 3
},
"fd51e864-bf79-4d63-a31c-d017c0239c3a": {
"ProjectGuid": "fd51e864-bf79-4d63-a31c-d017c0239c3a",
"DisplayName": "PriorityStreamsExtractor",
"ColorIndex": 4
},
"8f0b7aed-41d9-4326-a17c-89057e166331": {
"ProjectGuid": "8f0b7aed-41d9-4326-a17c-89057e166331",
"DisplayName": "FinalMessagesConsumer",
"ColorIndex": 5
}
},
"NextColorIndex": 4
"NextColorIndex": 6
}
\ No newline at end of file
using Newtonsoft.Json;
using FinalMessagesConsumer.Initializer;
using Newtonsoft.Json;
using StackExchange.Redis;
using System;
using System.Collections.Generic;
......@@ -10,12 +11,11 @@ namespace FinalMessagesConsumer.Extractor
{
public class Extractor
{
private static string myConsumerID = "cons-1";
private static int sms_rate = 100;
private static string myConsumerID = ReadRedisInfoParser.consumer_id;
private static IDatabase db = null;
private static Dictionary<string, int> _sms_rate = new Dictionary<string, int>();
/// <summary>
/// Gets Redis DataBase of which we shall consume messages while read
/// by sms_rate messages per second
......@@ -23,13 +23,15 @@ namespace FinalMessagesConsumer.Extractor
/// <param name="REDIS"></param>
/// <param name="_sms_rate"></param>
/// <returns>boolean : if we could connect to database or not</returns>
public static bool setDatabase(string REDIS , int _sms_rate = 100)
public static bool setDatabase(string REDIS)
{
try
{
var muxer = ConnectionMultiplexer.Connect(REDIS);
db = muxer.GetDatabase();
sms_rate = _sms_rate;
_sms_rate[ProvidersInfoParser.Syriatel] = ProvidersInfoParser.syr_rate;
_sms_rate[ProvidersInfoParser.MTN] = ProvidersInfoParser.mtn_rate;
return true;
}
catch (Exception ex)
......@@ -51,7 +53,7 @@ namespace FinalMessagesConsumer.Extractor
{
try
{
int sms_rate = _sms_rate[stream];
var messages = await db.StreamReadGroupAsync(stream, stream, myConsumerID, id, sms_rate);
bool has_pending = true;
......@@ -93,6 +95,7 @@ namespace FinalMessagesConsumer.Extractor
catch (Exception ex)
{
Console.WriteLine("Error while Reading or Acking");
Console.WriteLine(ex.Message);
return await Task.FromResult(id);
}
......
using FinalMessagesConsumer;
using FinalMessagesConsumer.Initializer;
using FinalMessagesConsumer.StreamsHandler;
IHost host = Host.CreateDefaultBuilder(args)
......@@ -7,11 +8,14 @@ IHost host = Host.CreateDefaultBuilder(args)
services.AddHostedService<Worker>();
})
.Build();
string redis_read = "localhost:6400";
IConfiguration config = host.Services.GetRequiredService<IConfiguration>();
Initializer.init(ref config);
string redis_read = ReadRedisInfoParser.connection;
try
{
TotalWorker.setAll(redis_read);
TotalWorker.setAll();
}
catch (Exception ex)
......
using StackExchange.Redis;
using FinalMessagesConsumer.Initializer;
using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.Linq;
......@@ -14,6 +15,7 @@ namespace FinalMessagesConsumer.StreamsHandler
/// </summary>
public static Dictionary<string, RedisValue> lastId = new Dictionary<string, RedisValue>();
private static string RedisRead = ReadRedisInfoParser.connection;
/// <summary>
/// Sets the database from which we will extract messages.
......@@ -23,7 +25,7 @@ namespace FinalMessagesConsumer.StreamsHandler
/// </summary>
/// <param name="RedisRead"></param>
/// <exception cref="throws exeception if could not connect to redis database"> </exception>
public static void setAll(string RedisRead)
public static void setAll()
{
if (!Extractor.Extractor.setDatabase(RedisRead))
{
......
using FinalMessagesConsumer.Initializer;
using FinalMessagesConsumer.StreamsHandler;
namespace FinalMessagesConsumer
......@@ -13,25 +14,16 @@ namespace FinalMessagesConsumer
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
string[] provs = new string[] { "MTN", "SYR" };
string[] provs = new string[] { ProvidersInfoParser.Syriatel, ProvidersInfoParser.MTN };
int turn = 0;
while (true)
{
TotalWorker.work(provs[turn]);
turn ^= 1;
//Console.WriteLine("\n\n\n\n");
TotalWorker.work(provs[turn]);
turn ^= 1;
//Console.WriteLine("**************************************************");
await Task.Delay(1000);
await Task.Delay(100000);
}
}
}
......
......@@ -39,7 +39,6 @@ D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\FinalMessag
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\FinalMessagesConsumer\bin\Debug\net6.0\System.IO.Pipelines.dll
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\FinalMessagesConsumer\bin\Debug\net6.0\runtimes\win\lib\net6.0\System.Diagnostics.EventLog.Messages.dll
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\FinalMessagesConsumer\bin\Debug\net6.0\runtimes\win\lib\net6.0\System.Diagnostics.EventLog.dll
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\FinalMessagesConsumer\obj\Debug\net6.0\FinalMessagesConsumer.csproj.AssemblyReference.cache
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\FinalMessagesConsumer\obj\Debug\net6.0\FinalMessagesConsumer.GeneratedMSBuildEditorConfig.editorconfig
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\FinalMessagesConsumer\obj\Debug\net6.0\FinalMessagesConsumer.AssemblyInfoInputs.cache
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\FinalMessagesConsumer\obj\Debug\net6.0\FinalMessagesConsumer.AssemblyInfo.cs
......@@ -49,3 +48,4 @@ D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\FinalMessag
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\FinalMessagesConsumer\obj\Debug\net6.0\ref\FinalMessagesConsumer.dll
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\FinalMessagesConsumer\obj\Debug\net6.0\FinalMessagesConsumer.pdb
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\FinalMessagesConsumer\obj\Debug\net6.0\FinalMessagesConsumer.genruntimeconfig.cache
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\FinalMessagesConsumer\obj\Debug\net6.0\FinalMessagesConsumer.csproj.AssemblyReference.cache
using GrpcMessageNode.Initializer;
using GrpcMessageNode.Services;
using OpenTelemetry.Logs;
using OpenTelemetry.Metrics;
......@@ -20,7 +21,10 @@ public class Program
builder.Services.AddGrpc();
builder.Services.AddDiscoveryClient(builder.Configuration);
const string serviceName = "http_2-protocol-node";
IConfiguration config = builder.Configuration;
Initializer.init(ref config);
string serviceName = ServiceNameParser.serviceName;
/*builder.Logging.AddOpenTelemetry(options =>
{
......
......@@ -100,6 +100,7 @@ D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\GrpcMessage
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\GrpcMessage\bin\Debug\net6.0\Swashbuckle.AspNetCore.SwaggerUI.dll
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\GrpcMessage\bin\Debug\net6.0\System.Diagnostics.DiagnosticSource.dll
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\GrpcMessage\bin\Debug\net6.0\System.Reflection.MetadataLoadContext.dll
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\GrpcMessage\obj\Debug\net6.0\GrpcMessageNode.csproj.AssemblyReference.cache
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\GrpcMessage\obj\Debug\net6.0\GrpcMessageNode.GeneratedMSBuildEditorConfig.editorconfig
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\GrpcMessage\obj\Debug\net6.0\GrpcMessageNode.AssemblyInfoInputs.cache
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\GrpcMessage\obj\Debug\net6.0\GrpcMessageNode.AssemblyInfo.cs
......
......@@ -3,6 +3,7 @@ using OpenTelemetry.Logs;
using OpenTelemetry.Metrics;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;
using HTTPMessageNode.Initializer;
var builder = WebApplication.CreateBuilder(args);
......@@ -13,8 +14,11 @@ builder.Services.AddControllers();
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
builder.Services.AddDiscoveryClient();
//builder.Configuration.GetValue<string>("database:conn");
const string serviceName = "http-protocol-node";
IConfiguration conf = builder.Configuration;
Initializer.init(ref conf);
string serviceName = ServiceNameParser.serviceName;
/*
builder.Logging.AddOpenTelemetry(options =>
{
......
using Newtonsoft.Json;
using PriorityStreamsExtractor.Initializer;
using StackExchange.Redis;
using System;
using System.Collections.Generic;
......@@ -10,8 +11,8 @@ namespace PriorityStreamsExtractor.Extractor
{
public class Extractor
{
private static string groupName = "SYS_MSGS";
private static string myConsumerID = "cons-1";
private static string groupName = ReadRedisInfoParser.group_name;
private static string myConsumerID = ReadRedisInfoParser.consumer_id;
private static int VeryLow = 1;
private static int Low = 2;
......@@ -19,9 +20,9 @@ namespace PriorityStreamsExtractor.Extractor
private static int High = 4;
private static int VeryHigh = 5;
private static int[] shares;
private static Dictionary<string, int[]> shares = new Dictionary<string, int[]>();
private static int sms_rate = 100;
private static Dictionary<string, int> sms_rate = new Dictionary<string, int>();
private static IDatabase db = null;
......@@ -31,15 +32,15 @@ namespace PriorityStreamsExtractor.Extractor
/// by associating number of messages limit to read to each priority which are precentage of sms_rate
/// </summary>
/// <param name="REDIS"></param>
/// <param name="_sms_rate"></param>
/// <returns>boolean : if we could connect to database or not</returns>
public static bool setDatabase(string REDIS , int _sms_rate = 100)
public static bool setDatabase(string REDIS)
{
try
{
var muxer = ConnectionMultiplexer.Connect(REDIS);
db = muxer.GetDatabase();
sms_rate = _sms_rate;
sms_rate[ProvidersInfoParser.Syriatel] = ProvidersInfoParser.syr_rate;
sms_rate[ProvidersInfoParser.MTN] = ProvidersInfoParser.mtn_rate;
//Console.WriteLine("Got DB");
setShares();
return true;
......@@ -53,15 +54,19 @@ namespace PriorityStreamsExtractor.Extractor
private static void setShares()
{
shares = new int[VeryHigh + 1];
foreach (KeyValuePair <string,int> entry in sms_rate)
{
string provider = entry.Key;
int rate = entry.Value;
int sum = 0;
sum += shares[VeryHigh] = (35 * sms_rate) / 100;
sum += shares[High] = (30 * sms_rate) / 100;
sum += shares[Medium] = (20 * sms_rate) / 100;
sum += shares[Low] = (10 * sms_rate) / 100;
sum += shares[VeryLow] = (5 * sms_rate) / 100;
shares[VeryHigh] += ((sms_rate - sum) > 0 ? sms_rate - sum : 0);
shares[provider] = new int[VeryHigh + 1];
sum += shares[provider][VeryHigh] = (35 * rate) / 100;
sum += shares[provider][High] = (30 * rate) / 100;
sum += shares[provider][Medium] = (20 * rate) / 100;
sum += shares[provider][Low] = (10 * rate) / 100;
sum += shares[provider][VeryLow] = (5 * rate) / 100;
shares[provider][VeryHigh] += ((rate - sum) > 0 ? rate - sum : 0);
}
}
/// <summary>
......@@ -80,7 +85,7 @@ namespace PriorityStreamsExtractor.Extractor
string write_stream = getWriteStream(stream);
int count = shares[priority];
int count = shares[write_stream][priority];
var messages = await db.StreamReadGroupAsync(stream, groupName, myConsumerID, id, count);
......
......@@ -10,23 +10,28 @@ namespace PriorityStreamsExtractor.Initializer
{
public static string MTN = "";
public static string Syriatel = "";
public static int mtn_rate = 0;
public static int syr_rate = 0;
public static void setInfo(ref IConfiguration config)
{
var syr_sect = config.GetSection("RedisInfo").GetSection("Providors").GetSection("Syriatel");
var mtn_sect = config.GetSection("RedisInfo").GetSection("Providors").GetSection("MTN");
string? stag = syr_sect.GetSection("Tag").Value;
string? srate = syr_sect.GetSection("sms_rate").Value;
string? mtag = mtn_sect.GetSection("Tag").Value;
string? mrate = mtn_sect.GetSection("sms_rate").Value;
if (stag == null || mtag == null)
if (stag == null || srate == null || mtag == null || mrate == null)
{
throw new ArgumentException("Providers Full Info (Names(Tags) + Sms_Rate) Not Defined in appsettings.json");
}
MTN = mtag;
Syriatel = stag;
mtn_rate = int.Parse(mrate);
syr_rate = int.Parse(srate);
}
}
}
......@@ -11,12 +11,13 @@ IHost host = Host.CreateDefaultBuilder(args)
IConfiguration config = host.Services.GetRequiredService<IConfiguration>();
Initializer.init(ref config);
string redis_read = "localhost:6379";
string redis_wite = "localhost:6400";
string redis_read = ReadRedisInfoParser.connection;
string redis_wite = WriteRedisInfoParser.connection;
try
{
TotalWorker.setAll(redis_read, redis_wite);
TotalWorker.setAll();
}
catch (Exception ex)
......
using StackExchange.Redis;
using PriorityStreamsExtractor.Initializer;
using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.Linq;
......@@ -22,7 +23,12 @@ namespace PriorityStreamsExtractor.StreamsHandler
/// </summary>
/// <param name="RedisRead"></param>
/// <exception cref="throws exeception if could not connect to redis database"> </exception>
public static void setAll(string RedisRead, string RedisWrite)
private static readonly string RedisRead = ReadRedisInfoParser.connection;
private static readonly string RedisWrite = WriteRedisInfoParser.connection;
public static void setAll()
{
if (!Extractor.Extractor.setDatabase(RedisRead))
{
......
using PriorityStreamsExtractor.Initializer;
using PriorityStreamsExtractor.StreamsHandler;
namespace PriorityStreamsExtractor
......@@ -13,7 +14,7 @@ namespace PriorityStreamsExtractor
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
string[] provs = new string[] { "MTN", "SYR" };
string[] provs = new string[] { ProvidersInfoParser.Syriatel, ProvidersInfoParser.MTN };
int turn = 0;
int[] levels = new int[] { 1, 2, 3, 4, 5 };
......
using Newtonsoft.Json;
using PriorityStreamsExtractor.Initializer;
using StackExchange.Redis;
using System;
using System.Collections.Generic;
......@@ -10,8 +11,6 @@ namespace PriorityStreamsExtractor.Writer
{
public class Writer
{
private static string Provider = "SYR";
private static string Provider2 = "MTN";
/// <summary>
/// Stream Max Length Set to One Hundred Million which means stores only one
/// Hundred Million Messages in a stream, if more messages came , old ones are erased
......@@ -48,16 +47,16 @@ namespace PriorityStreamsExtractor.Writer
{
bool k1 = db.StreamCreateConsumerGroup
(
Provider,
Provider,
ProvidersInfoParser.Syriatel,
ProvidersInfoParser.Syriatel,
0,
true
);
bool k2 = db.StreamCreateConsumerGroup
(
Provider2,
Provider2,
ProvidersInfoParser.MTN,
ProvidersInfoParser.MTN,
0,
true
);
......
using MongoDB.Bson;
using MongoDB.Driver;
using Scheduler.Initializer;
using SchedulerNode;
using SchedulerNode.RedisQueuer;
using SchedulerNode.Services;
......@@ -8,12 +9,12 @@ namespace Scheduler.MongoMessages
{
public class MongoMessagesShceduler
{
private static string MONGODB = "mongodb://127.0.0.1:27017";
private static string DataBaseName = "scheduled-messages";
private static string myCollection = "messages";
public static string ConnectionError = "Error Connecting to MongoDB on : " + MONGODB;
private static IMongoCollection<BsonDocument> collection;
private static string MongoURL = MessagesMongoDBParser.connection;
private static string DataBaseName = MessagesMongoDBParser.database;
private static string myCollection = MessagesMongoDBParser.collection;
private static IMongoCollection<BsonDocument> collection;
public static string ConnectionError = "Error Connecting to MongoDB on : " + MongoURL;
private static string NotTaken = "Not-Taken";
/// <summary>
......@@ -25,7 +26,7 @@ namespace Scheduler.MongoMessages
{
try
{
var client = new MongoClient(MONGODB);
var client = new MongoClient(MongoURL);
var database = client.GetDatabase(DataBaseName);
......
......@@ -18,11 +18,12 @@ builder.Services.AddGrpc();
builder.Services.AddDiscoveryClient();
IConfiguration config = builder.Configuration;
Initializer.init(ref config);
MessageQueues.init();
MongoMessagesShceduler.init();
const string serviceName = "Scheduler-1";
string serviceName = ServiceNameParser.serviceName;
/*
builder.Logging.AddOpenTelemetry(options =>
{
......
using Newtonsoft.Json;
using Scheduler.Initializer;
using StackExchange.Redis;
using Steeltoe.Discovery;
using System;
......@@ -11,15 +12,15 @@ namespace SchedulerNode.RedisQueuer
{
public class MessageQueues
{
private static string RedisURL = "localhost:6379";
public static string RedisConnectionError = "Error Writing to Redis";
private readonly static string RedisURL = RedisInfoParser.connection;
private static string Syriatel = "SYR";
private static string MTN = "MTN";
private readonly static string Syriatel = RedisInfoParser.Syriatel;
private readonly static string MTN = RedisInfoParser.MTN;
private static int LEVELS = 6;
private readonly static int LEVELS = 6;
private static int StreamMaxLength = 100000000;
private static IDatabase db = null;
public readonly static string RedisConnectionError = "Error Writing to Redis";
/// <summary>
/// Connects to Redis Stream With Streams for Each Priority and create consuming groups
......
......@@ -5,6 +5,7 @@
public static void init(ref IConfiguration conf)
{
RedisInfoParser.setInfo(ref conf);
MessagesMongoDBParser.setStorageDB(ref conf);
}
}
}
namespace ScheduledMessagesHandler.Initializer
{
public class MessagesMongoDBParser
{
public static string connection = "";
public static string database = "";
public static string collection = "";
public static void setStorageDB(ref IConfiguration config)
{
string? conn = config.GetSection("MessageStorage").GetSection("Mongo").Value;
string? namedb = config.GetSection("MessageStorage").GetSection("DBName").Value;
string? colldb = config.GetSection("MessageStorage").GetSection("collection").Value;
if (conn == null || namedb == null || colldb == null)
{
throw new ArgumentException("Messages MongoDB DataBase Full Information (DBName + Collection + Not Defined in appsettings.json");
}
connection = conn;
database = namedb;
collection = colldb;
}
}
}
using MongoDB.Bson;
using MongoDB.Driver;
using ScheduledMessagesHandler.Initializer;
using ScheduledMessagesHandler.RedisQueuer;
namespace ScheduledMessagesHandler.MongoMessages
{
public class MongoMessagesShceduler
{
private static string MONGODB = "mongodb://127.0.0.1:27017";
private static string DataBaseName = "scheduled-messages";
private static string myCollection = "messages";
public static string ConnectionError = "Error Connecting to MongoDB on : " + MONGODB;
private static string MongoURL = MessagesMongoDBParser.connection;
private static string DataBaseName = MessagesMongoDBParser.database;
private static string myCollection = MessagesMongoDBParser.collection;
public static string ConnectionError = "Error Connecting to MongoDB on : " + MongoURL;
private static IMongoCollection<BsonDocument> collection;
private static string Pending = "Pending";
private static string Acked = "Acked";
......@@ -25,7 +28,7 @@ namespace ScheduledMessagesHandler.MongoMessages
{
try
{
var client = new MongoClient(MONGODB);
var client = new MongoClient(MongoURL);
var database = client.GetDatabase(DataBaseName);
......
......@@ -16,9 +16,9 @@ IHost host = Host.CreateDefaultBuilder(args)
.Build();
config = host.Services.GetRequiredService<IConfiguration>();
Initializer.init(ref config);
//host.InitRedis();
MessageQueues.init();
MongoMessagesShceduler.init();
......
using Newtonsoft.Json;
using ScheduledMessagesHandler;
using ScheduledMessagesHandler.Initializer;
using StackExchange.Redis;
using Steeltoe.Discovery;
......@@ -7,11 +8,11 @@ namespace ScheduledMessagesHandler.RedisQueuer
{
public class MessageQueues
{
private static string RedisURL = "localhost:6379";
public static string RedisConnectionError = "Error Writing to Redis";
private readonly static string RedisURL = RedisInfoParser.connection;
private static string Syriatel = "SYR";
private static string MTN = "MTN";
private readonly static string Syriatel = RedisInfoParser.Syriatel;
private readonly static string MTN = RedisInfoParser.MTN;
public readonly static string RedisConnectionError = "Error Writing to Redis";
private static int LEVELS = 6;
private static int StreamMaxLength = 100000000;
......
......@@ -11,5 +11,10 @@
"Syriatel": "SYR",
"MTN": "MTN"
}
},
"MessageStorage": {
"Mongo": "mongodb://127.0.0.1:27017",
"DBName": "scheduled-messages",
"collection": "messages"
}
}
......@@ -11,5 +11,10 @@
"Syriatel": "SYR",
"MTN": "MTN"
}
},
"MessageStorage": {
"Mongo": "mongodb://127.0.0.1:27017",
"DBName": "scheduled-messages",
"collection": "messages"
}
}
945f7764f755b8eee0042b69b0267d4e6f4266bb
0f1a2ce0938f1ef7cfc97270642d94a615e90ff5
namespace Validator.MongoDBAccess
using Validator.Initializer;
namespace Validator.MongoDBAccess
{
public class InformationHolder
{
private static string MongoDB = "localhost:6565";
private static readonly string URL = AccountsDBParser.connection;
private static readonly string DBName = AccountsDBParser.DBName;
private static readonly string collection = AccountsDBParser.collection;
public static void checkMessage(MessageMetaData metaData)
{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment