Commit a7e6bfd6 authored by mohammad.salama's avatar mohammad.salama

Added Documentaion with some variables names changes

parent 03e28384
...@@ -15,12 +15,11 @@ namespace GrpcMessageNode.DataBaseAccess ...@@ -15,12 +15,11 @@ namespace GrpcMessageNode.DataBaseAccess
/// <summary> /// <summary>
/// check message for tag , Authentication , Quota perhaps from service validator with address /// Validate Message by its MetaData (quota - api key ...) by a Validator Node
/// passed as a parameter to the function
/// </summary> /// </summary>
/// <param name="message" > the message we received to check</param> /// <param name="message" > the message we received to check</param>
/// <param name="validatorAddress"> The validator service address which as a mongodb service </param> /// <param name="validatorAddress"> The validator service address which as a mongodb service </param>
/// <returns> validator reply which is ok if we can proceed with account priority</returns> /// <returns> string : Success or an Error</returns>
public static string getPriority(ref Message message , string validatorAddress) public static string getPriority(ref Message message , string validatorAddress)
{ {
Console.WriteLine("Validating NOW IN GRPC !! "); Console.WriteLine("Validating NOW IN GRPC !! ");
...@@ -39,6 +38,12 @@ namespace GrpcMessageNode.DataBaseAccess ...@@ -39,6 +38,12 @@ namespace GrpcMessageNode.DataBaseAccess
return OK; return OK;
} }
/// <summary>
/// Calls the Validator Service and Gets the Reply
/// </summary>
/// <param name="message"></param>
/// <param name="validatorAddress"></param>
/// <returns>Validator Reply</returns>
private static ValidatorReply ValidateAsync(Message message, string validatorAddress) private static ValidatorReply ValidateAsync(Message message, string validatorAddress)
{ {
using var channel = GrpcChannel.ForAddress(validatorAddress); using var channel = GrpcChannel.ForAddress(validatorAddress);
...@@ -66,6 +71,12 @@ namespace GrpcMessageNode.DataBaseAccess ...@@ -66,6 +71,12 @@ namespace GrpcMessageNode.DataBaseAccess
} }
/// <summary>
/// Extracts Message MetaData from a Message that Concerns us with the Validation Process
/// </summary>
/// <param name="message"></param>
/// <returns>Message Meta Data</returns>
private static MessageMetaData extractMetaData(Message message) private static MessageMetaData extractMetaData(Message message)
{ {
MessageMetaData metaData = new MessageMetaData(); MessageMetaData metaData = new MessageMetaData();
......
...@@ -8,6 +8,14 @@ namespace GrpcMessageNode.LoadBalancer ...@@ -8,6 +8,14 @@ namespace GrpcMessageNode.LoadBalancer
private static Random random = new Random(); private static Random random = new Random();
private static int offset = random.Next(200000,int.MaxValue); private static int offset = random.Next(200000,int.MaxValue);
/// <summary>
/// Utilizes the Eureka Service to get Address of A Node by its name
/// with client-side load-balancing
/// </summary>
/// <param name="instanceName">name of service (node) we are looking for</param>
/// <param name="discoveryClient"></param>
/// <returns>string : address of the node or error message</returns>
public static string getAddressOfInstance(string instanceName , ref IDiscoveryClient discoveryClient) public static string getAddressOfInstance(string instanceName , ref IDiscoveryClient discoveryClient)
{ {
string address = ""; string address = "";
......
...@@ -8,7 +8,13 @@ ...@@ -8,7 +8,13 @@
private static int MAX_PRIRITY = 5; private static int MAX_PRIRITY = 5;
private static int MIN_PRIRITY = 1; private static int MIN_PRIRITY = 1;
/// <summary>
/// Send the Message to A Valuidtor Node wich validate the message and gets the
/// Account Priority then sets the final priority of the message
/// </summary>
/// <param name="message"></param>
/// <param name="validatorAddress"></param>
/// <returns>String : Status of the Validation Request</returns>
public static string setFinalPriority(ref Message message , string validatorAddress) public static string setFinalPriority(ref Message message , string validatorAddress)
{ {
string res = DataBaseAccess.DBAccess.getPriority(ref message , validatorAddress); string res = DataBaseAccess.DBAccess.getPriority(ref message , validatorAddress);
......
...@@ -59,6 +59,12 @@ namespace GrpcMessageNode.Services ...@@ -59,6 +59,12 @@ namespace GrpcMessageNode.Services
return Task.FromResult(new Acknowledgement(reply)); return Task.FromResult(new Acknowledgement(reply));
} }
/// <summary>
/// Selects a Coordinator (Scheduler Node) with client side
/// load-balancing and send the message to it
/// </summary>
/// <param name="message"></param>
/// <returns>Acknowledgment of the Message containing the status (Success + Failure)</returns>
private Acknowledgement sendToCoordinator(Message message) private Acknowledgement sendToCoordinator(Message message)
{ {
string address = LoadBalancer.AddressResolver.getAddressOfInstance(Scheduler , ref discoveryClient); string address = LoadBalancer.AddressResolver.getAddressOfInstance(Scheduler , ref discoveryClient);
...@@ -93,16 +99,6 @@ namespace GrpcMessageNode.Services ...@@ -93,16 +99,6 @@ namespace GrpcMessageNode.Services
} }
} }
//not working -- causing unknown exception with nullable parameter http2
/*private Queue.QueueClient getQueueClient()
{
string address = LoadBalancer.AddressResolver.getAddressOfInstance(Scheduler , ref discoveryClient);
using var channel = GrpcChannel.ForAddress(address);
//Console.WriteLine("Scheduler Address = " + address);
var client = new Queue.QueueClient(channel);
return client;
}*/
private Message2 copyMessage(Message message) private Message2 copyMessage(Message message)
{ {
Message2 message2 = new Message2(); Message2 message2 = new Message2();
...@@ -115,24 +111,5 @@ namespace GrpcMessageNode.Services ...@@ -115,24 +111,5 @@ namespace GrpcMessageNode.Services
message2.Tag = message.Tag; message2.Tag = message.Tag;
return message2; return message2;
} }
/*private string getAddressOfInstance(string instanceName)
{
string address = "";
try
{
// instanceName = "Validator" or "Scheduler" ... etc
var y = discoveryClient.GetInstances(instanceName); /// write names to config file
address = y[0].Uri.ToString();
return address;
}
catch (Exception ex)
{
return ErrorConnection;
}
}*/
} }
} }
...@@ -25,7 +25,6 @@ namespace HTTPMessageNode.Controllers ...@@ -25,7 +25,6 @@ namespace HTTPMessageNode.Controllers
//Console.WriteLine("Conf = " + configuration.GetValue<string>("database:conn")); //Console.WriteLine("Conf = " + configuration.GetValue<string>("database:conn"));
} }
//[HttpGet(Name = "GetWeatherForecast")]
[Route("/queue-msg")] [Route("/queue-msg")]
[HttpPost] [HttpPost]
public Acknowledgement SendMessage([FromBody] MessageDTO messageDTO) public Acknowledgement SendMessage([FromBody] MessageDTO messageDTO)
...@@ -98,6 +97,11 @@ namespace HTTPMessageNode.Controllers ...@@ -98,6 +97,11 @@ namespace HTTPMessageNode.Controllers
} }
/// <summary>
/// Cast MessageDTO to A Message and Return it to Send it Through GRPC
/// </summary>
/// <param name="messageDTO"></param>
/// <returns>Message with Members Values identical to the members of MessageDTO</returns>
private Message copyMessage(MessageDTO messageDTO) private Message copyMessage(MessageDTO messageDTO)
{ {
Message message = new Message(); Message message = new Message();
...@@ -116,23 +120,5 @@ namespace HTTPMessageNode.Controllers ...@@ -116,23 +120,5 @@ namespace HTTPMessageNode.Controllers
return message; return message;
} }
/*private string getAddressOfInstance(string instanceName)
{
string address = "";
try
{
// instanceName = "Validator" or "Scheduler" ... etc
var y = discoveryClient.GetInstances(instanceName); /// write names to config file
address = y[0].Uri.ToString();
return address;
}
catch (Exception ex)
{
return ErrorConnection;
}
}*/
} }
} }
\ No newline at end of file
...@@ -20,12 +20,11 @@ namespace HTTPMessageNode.DataBaseAccess ...@@ -20,12 +20,11 @@ namespace HTTPMessageNode.DataBaseAccess
/// <summary> /// <summary>
/// check message for tag , Authentication , Quota perhaps from service validator with address /// Validate Message by its MetaData (quota - api key ...) by a Validator Node
/// passed as a parameter to the function
/// </summary> /// </summary>
/// <param name="message" > the message we received to check</param> /// <param name="message" > the message we received to check</param>
/// <param name="validatorAddress"> The validator service address which as a mongodb service </param> /// <param name="validatorAddress"> The validator service address which as a mongodb service </param>
/// <returns> validator reply which is ok if we can proceed with account priority</returns> /// <returns> string : Success or an Error</returns>
public static string getPriority(ref Message message, string validatorAddress) public static string getPriority(ref Message message, string validatorAddress)
{ {
//Console.WriteLine("Validating NOW IN GRPC !! "); //Console.WriteLine("Validating NOW IN GRPC !! ");
...@@ -45,6 +44,12 @@ namespace HTTPMessageNode.DataBaseAccess ...@@ -45,6 +44,12 @@ namespace HTTPMessageNode.DataBaseAccess
return OK; return OK;
} }
/// <summary>
/// Calls the Validator Service and Gets the Reply
/// </summary>
/// <param name="message"></param>
/// <param name="validatorAddress"></param>
/// <returns>Validator Reply</returns>
private static ValidatorReply ValidateAsync(Message message, string validatorAddress) private static ValidatorReply ValidateAsync(Message message, string validatorAddress)
{ {
using var channel = GrpcChannel.ForAddress(validatorAddress); using var channel = GrpcChannel.ForAddress(validatorAddress);
...@@ -68,6 +73,11 @@ namespace HTTPMessageNode.DataBaseAccess ...@@ -68,6 +73,11 @@ namespace HTTPMessageNode.DataBaseAccess
} }
/// <summary>
/// Extracts Message MetaData from a Message that Concerns us with the Validation Process
/// </summary>
/// <param name="message"></param>
/// <returns>Message Meta Data</returns>
private static MessageMetaData extractMetaData(Message message) private static MessageMetaData extractMetaData(Message message)
{ {
MessageMetaData metaData = new MessageMetaData(); MessageMetaData metaData = new MessageMetaData();
......
...@@ -9,6 +9,14 @@ namespace HTTPMessageNode.LoadBalancer ...@@ -9,6 +9,14 @@ namespace HTTPMessageNode.LoadBalancer
private static Random random = new Random(); private static Random random = new Random();
private static int offset = random.Next(200000,int.MaxValue); private static int offset = random.Next(200000,int.MaxValue);
/// <summary>
/// Utilizes the Eureka Service to get Address of A Node by its name
/// with client-side load-balancing
/// </summary>
/// <param name="instanceName">name of service (node) we are looking for</param>
/// <param name="discoveryClient"></param>
/// <returns>string : address of the node or error message</returns>
public static string getAddressOfInstance(string instanceName , ref IDiscoveryClient discoveryClient) public static string getAddressOfInstance(string instanceName , ref IDiscoveryClient discoveryClient)
{ {
string address = ""; string address = "";
......
...@@ -7,6 +7,13 @@ ...@@ -7,6 +7,13 @@
*/ */
private static int MAX_PRIRITY = 5; private static int MAX_PRIRITY = 5;
private static int MIN_PRIRITY = 1; private static int MIN_PRIRITY = 1;
/// <summary>
/// Send the Message to A Valuidtor Node wich validate the message and gets the
/// Account Priority then sets the final priority of the message
/// </summary>
/// <param name="message"></param>
/// <param name="validatorAddress"></param>
/// <returns>String : Status of the Validation Request</returns>
public static string setFinalPriority(ref Message message, string validatorAddress) public static string setFinalPriority(ref Message message, string validatorAddress)
{ {
string res = DataBaseAccess.DBAccess.getPriority(ref message, validatorAddress); string res = DataBaseAccess.DBAccess.getPriority(ref message, validatorAddress);
......
...@@ -12,18 +12,17 @@ namespace MessagesConsumer.Extractor ...@@ -12,18 +12,17 @@ namespace MessagesConsumer.Extractor
{ {
private static string myConsumerID = "cons-1"; private static string myConsumerID = "cons-1";
private static int VeryLow = 1;
private static int Low = 2;
private static int Medium = 3;
private static int High = 4;
private static int VeryHigh = 5;
private static int[] shares;
private static int sms_rate = 100; private static int sms_rate = 100;
private static IDatabase db = null; private static IDatabase db = null;
/// <summary>
/// Gets Redis DataBase of which we shall consume messages while read
/// by sms_rate messages per second
/// </summary>
/// <param name="REDIS"></param>
/// <param name="_sms_rate"></param>
/// <returns>boolean : if we could connect to database or not</returns>
public static bool setDatabase(string REDIS , int _sms_rate = 100) public static bool setDatabase(string REDIS , int _sms_rate = 100)
{ {
try try
...@@ -31,8 +30,6 @@ namespace MessagesConsumer.Extractor ...@@ -31,8 +30,6 @@ namespace MessagesConsumer.Extractor
var muxer = ConnectionMultiplexer.Connect(REDIS); var muxer = ConnectionMultiplexer.Connect(REDIS);
db = muxer.GetDatabase(); db = muxer.GetDatabase();
sms_rate = _sms_rate; sms_rate = _sms_rate;
//Console.WriteLine("Got DB");
setShares();
return true; return true;
} }
catch (Exception ex) catch (Exception ex)
...@@ -42,20 +39,14 @@ namespace MessagesConsumer.Extractor ...@@ -42,20 +39,14 @@ namespace MessagesConsumer.Extractor
} }
} }
private static void setShares() /// <summary>
{ /// Reads Messages from Stream starting with the id provided (its track is kept by the TotalWorker)
shares = new int[VeryHigh + 1]; /// After Extracting Messages they are prcocessed and Written throgh the WRITER
int sum = 0; /// then Acked
sum += shares[VeryHigh] = (35 * sms_rate) / 100; /// </summary>
sum += shares[High] = (30 * sms_rate) / 100; /// <param name="stream"></param>
sum += shares[Medium] = (20 * sms_rate) / 100; /// <param name="id"></param>
sum += shares[Low] = (10 * sms_rate) / 100; /// <returns></returns>
sum += shares[VeryLow] = (5 * sms_rate) / 100;
shares[VeryHigh] += ((sms_rate - sum) > 0 ? sms_rate - sum : 0);
}
public static async Task<RedisValue> ProcessMessagesAsync(string stream, RedisValue id) public static async Task<RedisValue> ProcessMessagesAsync(string stream, RedisValue id)
{ {
try try
...@@ -106,5 +97,6 @@ namespace MessagesConsumer.Extractor ...@@ -106,5 +97,6 @@ namespace MessagesConsumer.Extractor
} }
} }
} }
} }
...@@ -9,7 +9,20 @@ namespace MessagesConsumer.StreamsHandler ...@@ -9,7 +9,20 @@ namespace MessagesConsumer.StreamsHandler
{ {
public class TotalWorker public class TotalWorker
{ {
/// <summary>
/// holds the last id we Acked for each stream
/// </summary>
public static Dictionary<string, RedisValue> lastId = new Dictionary<string, RedisValue>(); public static Dictionary<string, RedisValue> lastId = new Dictionary<string, RedisValue>();
/// <summary>
/// Sets the database from which we will extract messages.
/// this database the redis database that Priority Stream Service Writes To.
/// Here are the Messages all sorted and set and we only read (extract them)
/// then write them to whatever interface we want.
/// </summary>
/// <param name="RedisRead"></param>
/// <exception cref="throws exeception if could not connect to redis database"> </exception>
public static void setAll(string RedisRead) public static void setAll(string RedisRead)
{ {
if (!Extractor.Extractor.setDatabase(RedisRead)) if (!Extractor.Extractor.setDatabase(RedisRead))
...@@ -18,6 +31,13 @@ namespace MessagesConsumer.StreamsHandler ...@@ -18,6 +31,13 @@ namespace MessagesConsumer.StreamsHandler
throw new Exception("ERROR : " + RedisRead); throw new Exception("ERROR : " + RedisRead);
} }
} }
/// <summary>
/// Extract Messages from stream while ensuring ALL MESSAGES ARE READ
/// Keeps Track of last Acked ID in each stream
/// </summary>
/// <param name="stream"></param>
public static void work(string stream) public static void work(string stream)
{ {
RedisValue id = (lastId.ContainsKey(stream) ? lastId[stream] : "0-0"); RedisValue id = (lastId.ContainsKey(stream) ? lastId[stream] : "0-0");
......
...@@ -25,6 +25,14 @@ namespace PriorityStream.Extractor ...@@ -25,6 +25,14 @@ namespace PriorityStream.Extractor
private static IDatabase db = null; private static IDatabase db = null;
/// <summary>
/// Gets Redis DataBase of which we shall consume messages while read and it is the same redis that
/// Scheduler Nodes Writes To
/// by associating number of messages limit to read to each priority which are precentage of sms_rate
/// </summary>
/// <param name="REDIS"></param>
/// <param name="_sms_rate"></param>
/// <returns>boolean : if we could connect to database or not</returns>
public static bool setDatabase(string REDIS , int _sms_rate = 100) public static bool setDatabase(string REDIS , int _sms_rate = 100)
{ {
try try
...@@ -56,7 +64,14 @@ namespace PriorityStream.Extractor ...@@ -56,7 +64,14 @@ namespace PriorityStream.Extractor
shares[VeryHigh] += ((sms_rate - sum) > 0 ? sms_rate - sum : 0); shares[VeryHigh] += ((sms_rate - sum) > 0 ? sms_rate - sum : 0);
} }
/// <summary>
/// Reads Messages from Stream starting with the id provided (its track is kept by the StreamsHandler.TotalWorker)
/// After Extracting Messages they are prcocessed and Written through the WRITER to
/// another Redis where MessageConsumer Services Reads them, then Ack Messages after Writing Them
/// </summary>
/// <param name="stream"></param>
/// <param name="id"></param>
/// <returns></returns>
public static async Task<RedisValue> ProcessMessagesAsync(string stream, RedisValue id) public static async Task<RedisValue> ProcessMessagesAsync(string stream, RedisValue id)
{ {
try try
......
...@@ -9,7 +9,19 @@ namespace PriorityStream.StreamsHandler ...@@ -9,7 +9,19 @@ namespace PriorityStream.StreamsHandler
{ {
public class TotalWorker public class TotalWorker
{ {
/// <summary>
/// holds the last id we Acked for each stream
/// </summary>
public static Dictionary<string, RedisValue> lastId = new Dictionary<string, RedisValue>(); public static Dictionary<string, RedisValue> lastId = new Dictionary<string, RedisValue>();
/// <summary>
/// Sets the database from which we will extract messages.
/// this database the redis database that Priority Stream Service Writes To.
/// Here are the Messages all sorted and set and we only read (extract them)
/// then write them to whatever interface we want.
/// </summary>
/// <param name="RedisRead"></param>
/// <exception cref="throws exeception if could not connect to redis database"> </exception>
public static void setAll(string RedisRead, string RedisWrite) public static void setAll(string RedisRead, string RedisWrite)
{ {
if (!Extractor.Extractor.setDatabase(RedisRead)) if (!Extractor.Extractor.setDatabase(RedisRead))
...@@ -25,6 +37,12 @@ namespace PriorityStream.StreamsHandler ...@@ -25,6 +37,12 @@ namespace PriorityStream.StreamsHandler
} }
} }
/// <summary>
/// Extract Messages from stream while ensuring ALL MESSAGES ARE READ
/// Keeps Track of last Acked ID in each stream
/// </summary>
/// <param name="stream"></param>
public static void work(string stream) public static void work(string stream)
{ {
RedisValue id = (lastId.ContainsKey(stream) ? lastId[stream] : "0-0"); RedisValue id = (lastId.ContainsKey(stream) ? lastId[stream] : "0-0");
......
...@@ -12,10 +12,21 @@ namespace PriorityStream.Writer ...@@ -12,10 +12,21 @@ namespace PriorityStream.Writer
{ {
private static string Provider = "SYR"; private static string Provider = "SYR";
private static string Provider2 = "MTN"; private static string Provider2 = "MTN";
/// <summary>
/// Stream Max Length Set to One Hundred Million which means stores only one
/// Hundred Million Messages in a stream, if more messages came , old ones are erased
/// </summary>
private static int StreamMaxLength = 100000000; private static int StreamMaxLength = 100000000;
private static IDatabase db = null; private static IDatabase db = null;
/// <summary>
/// Get Redis DataBase To Which We Write Messages
/// in Order to Exrtact Them Later Easily, And Creates the Consumer Groups
/// for Each Provider so that consumers can work properly
/// </summary>
/// <param name="REDIS"></param>
/// <returns>boolean: true if we can get database and all consumer groups are created</returns>
public static bool setDatabase(string REDIS) public static bool setDatabase(string REDIS)
{ {
try try
...@@ -59,6 +70,16 @@ namespace PriorityStream.Writer ...@@ -59,6 +70,16 @@ namespace PriorityStream.Writer
} }
} }
/// <summary>
/// Writes A Message after Casting the MessageDTO to the coherent stream which matches
/// the name of the provider (MTN , SYR ..)
/// this method keeps the streams sizes limited so no over storage happens
/// </summary>
/// <param name="message"></param>
/// <param name="provider"></param>
/// <returns>async task (bool): true if we could write the messages</returns>
public static async Task<bool> writeMessageAsync(MessageDTO message, string provider) public static async Task<bool> writeMessageAsync(MessageDTO message, string provider)
{ {
try try
...@@ -95,10 +116,5 @@ namespace PriorityStream.Writer ...@@ -95,10 +116,5 @@ namespace PriorityStream.Writer
} }
public static void trimStream(string stream , RedisValue id)
{
}
} }
} }
...@@ -9,6 +9,14 @@ namespace Scheduler.LoadBalancer ...@@ -9,6 +9,14 @@ namespace Scheduler.LoadBalancer
private static Random random = new Random(); private static Random random = new Random();
private static int offset = random.Next(200000,int.MaxValue); private static int offset = random.Next(200000,int.MaxValue);
/// <summary>
/// Utilizes the Eureka Service to get Address of A Node by its name
/// with client-side load-balancing
/// </summary>
/// <param name="instanceName">name of service (node) we are looking for</param>
/// <param name="discoveryClient"></param>
/// <returns>string : address of the node or error message</returns>
public static string getAddressOfInstance(string instanceName , ref IDiscoveryClient discoveryClient) public static string getAddressOfInstance(string instanceName , ref IDiscoveryClient discoveryClient)
{ {
string address = ""; string address = "";
......
...@@ -16,6 +16,11 @@ namespace Scheduler.MongoMessages ...@@ -16,6 +16,11 @@ namespace Scheduler.MongoMessages
private static string NotTaken = "Not-Taken"; private static string NotTaken = "Not-Taken";
/// <summary>
/// Connects to MongoDB to Store Messges and Set Things Up
/// </summary>
/// <param name="MyId"></param>
/// <returns>string : ok if all goes well , otherwise something else</returns>
public static string init(string MyId) public static string init(string MyId)
{ {
try try
...@@ -35,6 +40,11 @@ namespace Scheduler.MongoMessages ...@@ -35,6 +40,11 @@ namespace Scheduler.MongoMessages
} }
/// <summary>
/// insert a scheduled message in the mongodb
/// </summary>
/// <param name="message"></param>
/// <returns>string: ok or else</returns>
public static string insertMessage(ref Message message) public static string insertMessage(ref Message message)
{ {
try try
...@@ -62,74 +72,5 @@ namespace Scheduler.MongoMessages ...@@ -62,74 +72,5 @@ namespace Scheduler.MongoMessages
} }
} }
/*
//getBulkOfMessages where their Oks is MyId (not Acked)
//queue them
//then ack them (ok = Acked)
//when finished get Oks = 0 and put their Ok as MyId
public static void getDuedMessagesAndQueue()
{
bool thereArePending = true;
while (true)
{
if (thereArePending)
{
thereArePending = getBulkMessages(Pending);
}
if (! thereArePending)
{
thereArePending = getBulkMessages(NotTaken);
}
Task.Delay(5000);
}
}
private static bool getBulkMessages(string status)
{
var specificTime = DateTime.UtcNow;
var filter = Builders<BsonDocument>.Filter.And(
Builders<BsonDocument>.Filter.Lte("timestamp", specificTime),
Builders<BsonDocument>.Filter.Eq("status", status)
);
var docs = collection.Find(filter).Limit(limit).ToList<BsonDocument>();
if (docs.Count == 0)
{
return false;
}
UpdateDefinition<BsonDocument> updatePending = Builders<BsonDocument>.Update.Set("status", Pending);
UpdateDefinition<BsonDocument> updateAcj = Builders<BsonDocument>.Update.Set("status", Acked);
foreach (var doc in docs)
{
Message message = getMessage(doc);
var _filter = Builders<BsonDocument>.Filter.Eq("_id", doc["_id"]);
collection.UpdateOne(_filter, updatePending);
}
return true;
}
private static Message getMessage(BsonDocument doc)
{
Message message = new Message();
message.ClientID = (string)doc["sender"];
message.Text = (string)doc["content"];
message.Tag = (string)doc["tag"];
message.LocalPriority = (int)doc["priority"];
message.PhoneNumber = (string)doc["phone-number"];
message.MsgId = (string)doc["msg-id"];
message.ApiKey = (string)doc["api-key"];
message.Year = message.Month = message.Day = message.Hour = message.Minute = 0;
return message;
}
*/
} }
} }
...@@ -20,6 +20,11 @@ namespace SchedulerNode.RedisQueuer ...@@ -20,6 +20,11 @@ namespace SchedulerNode.RedisQueuer
private static int LEVELS = 6; private static int LEVELS = 6;
private static int StreamMaxLength = 100000000; private static int StreamMaxLength = 100000000;
private static IDatabase db = null; private static IDatabase db = null;
/// <summary>
/// Connects to Redis Stream With Streams for Each Priority and create consuming groups
/// if not created
/// </summary>
public static void init() public static void init()
{ {
...@@ -52,6 +57,13 @@ namespace SchedulerNode.RedisQueuer ...@@ -52,6 +57,13 @@ namespace SchedulerNode.RedisQueuer
} }
} }
/// <summary>
/// Add a Message to the According Redis Stream
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public static string addMessage(Message message) public static string addMessage(Message message)
{ {
string id = "Error"; string id = "Error";
......
...@@ -23,9 +23,11 @@ namespace SchedulerNode.Services ...@@ -23,9 +23,11 @@ namespace SchedulerNode.Services
public override Task<Acknowledgement> QueueMessage(Message message, ServerCallContext context) public override Task<Acknowledgement> QueueMessage(Message message, ServerCallContext context)
{ {
DateTime current = DateTime.Now;
DateTime des = new DateTime(message.Year, message.Month, message.Day, message.Hour, message.Minute, 0);
//Console.WriteLine("Message Receieved to Queuer !!"); //Console.WriteLine("Message Receieved to Queuer !!");
if (message.Year == 0) if (message.Year == 0 || DateTime.Compare(des,current)<=0)
{ {
return Task.FromResult(SendAsap(ref message)); return Task.FromResult(SendAsap(ref message));
} }
...@@ -37,7 +39,12 @@ namespace SchedulerNode.Services ...@@ -37,7 +39,12 @@ namespace SchedulerNode.Services
} }
public Acknowledgement SendAsap(ref Message message) /// <summary>
/// Send Message to Redis Stream ASAP
/// </summary>
/// <param name="message"></param>
/// <returns>Acknowledgment: status of request and id if all goes well</returns>
private Acknowledgement SendAsap(ref Message message)
{ {
string reqId = MessageQueues.addMessage(message); string reqId = MessageQueues.addMessage(message);
...@@ -60,6 +67,11 @@ namespace SchedulerNode.Services ...@@ -60,6 +67,11 @@ namespace SchedulerNode.Services
} }
/// <summary>
/// Schedule Message and Write it to MongoDB to Send it When Dued
/// </summary>
/// <param name="message"></param>
/// <returns>Acknowledgment: status of request and id if all goes well</returns>
private Acknowledgement Schedule(ref Message message) private Acknowledgement Schedule(ref Message message)
{ {
string res = MongoMessagesShceduler.insertMessage(ref message); string res = MongoMessagesShceduler.insertMessage(ref message);
......
...@@ -17,6 +17,10 @@ namespace ScheduledMessagesHandler.MongoMessages ...@@ -17,6 +17,10 @@ namespace ScheduledMessagesHandler.MongoMessages
private static string NotTaken = "Not-Taken"; private static string NotTaken = "Not-Taken";
private static int limit = 100000; private static int limit = 100000;
/// <summary>
/// Connects to MongoDB Where Scheduled Messages Are Stored by The Scheduler Service
/// </summary>
/// <returns>string : ok if every thing goes fine</returns>
public static string init() public static string init()
{ {
try try
...@@ -36,6 +40,9 @@ namespace ScheduledMessagesHandler.MongoMessages ...@@ -36,6 +40,9 @@ namespace ScheduledMessagesHandler.MongoMessages
} }
/// <summary>
/// extract dued scheduled messages cautiously and write them to Redis Stream by Priority
/// </summary>
public static void getDuedMessagesAndQueue() public static void getDuedMessagesAndQueue()
{ {
bool thereArePending = true; bool thereArePending = true;
...@@ -50,11 +57,16 @@ namespace ScheduledMessagesHandler.MongoMessages ...@@ -50,11 +57,16 @@ namespace ScheduledMessagesHandler.MongoMessages
{ {
thereArePending = getBulkMessages(NotTaken); thereArePending = getBulkMessages(NotTaken);
} }
//Task.Delay(5000);
} }
} }
/// <summary>
/// extract dued messages with status as passed and process them
/// </summary>
/// <param name="status"></param>
/// <returns></returns>
private static bool getBulkMessages(string status) private static bool getBulkMessages(string status)
{ {
var specificTime = DateTime.UtcNow; var specificTime = DateTime.UtcNow;
......
...@@ -17,6 +17,10 @@ namespace ScheduledMessagesHandler.RedisQueuer ...@@ -17,6 +17,10 @@ namespace ScheduledMessagesHandler.RedisQueuer
private static int StreamMaxLength = 100000000; private static int StreamMaxLength = 100000000;
private static IDatabase db = null; private static IDatabase db = null;
/// <summary>
/// Connects to Redis Stream With Streams for Each Priority and create consuming groups
/// if not created
/// </summary>
public static void init() public static void init()
{ {
...@@ -50,6 +54,11 @@ namespace ScheduledMessagesHandler.RedisQueuer ...@@ -50,6 +54,11 @@ namespace ScheduledMessagesHandler.RedisQueuer
} }
/// <summary>
/// Add a Message by Casting the MessageDTO to According Redis Stream
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public static string addMessage(MessageDTO message) public static string addMessage(MessageDTO message)
{ {
string id = "Error"; string id = "Error";
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment