Commit 89f551b2 authored by mohammad.salama's avatar mohammad.salama

Connection Exceptions Added , some improvements for logic were done , for now...

Connection Exceptions Added , some improvements for logic were done , for now it is fool-proof , unique IDs , all working
parent 0310c291
...@@ -65,7 +65,12 @@ ...@@ -65,7 +65,12 @@
"ProjectGuid": "d2c3addd-ba47-473c-a761-1ce5b1ee6407", "ProjectGuid": "d2c3addd-ba47-473c-a761-1ce5b1ee6407",
"DisplayName": "Validator", "DisplayName": "Validator",
"ColorIndex": 11 "ColorIndex": 11
},
"8f51b680-f3b6-4892-b854-2e2af840cd71": {
"ProjectGuid": "8f51b680-f3b6-4892-b854-2e2af840cd71",
"DisplayName": "SimpleStreamConsumerTest",
"ColorIndex": 12
} }
}, },
"NextColorIndex": 12 "NextColorIndex": 13
} }
\ No newline at end of file
...@@ -31,4 +31,31 @@ ...@@ -31,4 +31,31 @@
--------------------------------------------------------------------------------------------------------------------------------------------------------------- ---------------------------------------------------------------------------------------------------------------------------------------------------------------
add redis: add redis:
dotnet add package StackExchange.Redis dotnet add package StackExchange.Redis
\ No newline at end of file
-enter redis cli:
docker exec -it <container redis name> redis-cli
docker exec -it redis redis-cli
-delete stream :
del <stream_name>
-list streams :
SCAN 0 TYPE stream
...@@ -5,6 +5,14 @@ namespace GrpcMessageNode.DataBaseAccess ...@@ -5,6 +5,14 @@ namespace GrpcMessageNode.DataBaseAccess
{ {
public class DBAccess public class DBAccess
{ {
public static readonly string ValidatorConnectionAndValidationError = "Error On Validating";
/// <summary>
/// this OK must be in the reply from validation , so reply code should
/// have the same string in it
/// </summary>
public static readonly string OK = "ok";
/// <summary> /// <summary>
/// check message for tag , Authentication , Quota perhaps from service validator with address /// check message for tag , Authentication , Quota perhaps from service validator with address
...@@ -13,21 +21,21 @@ namespace GrpcMessageNode.DataBaseAccess ...@@ -13,21 +21,21 @@ namespace GrpcMessageNode.DataBaseAccess
/// <param name="message" > the message we received to check</param> /// <param name="message" > the message we received to check</param>
/// <param name="validatorAddress"> The validator service address which as a mongodb service </param> /// <param name="validatorAddress"> The validator service address which as a mongodb service </param>
/// <returns> validator reply which is ok if we can proceed with account priority</returns> /// <returns> validator reply which is ok if we can proceed with account priority</returns>
public static int getPriority(ref Message message , string validatorAddress) public static string getPriority(ref Message message , string validatorAddress)
{ {
//Console.WriteLine("Validating NOW IN GRPC !! "); Console.WriteLine("Validating NOW IN GRPC !! ");
ValidatorReply reply = ValidateAsync(message , validatorAddress); ValidatorReply reply = ValidateAsync(message , validatorAddress);
if (reply == null) if (reply.ReplyCode == ValidatorConnectionAndValidationError)
{ {
return -1; return ValidatorConnectionAndValidationError;
} }
if (!reply.ReplyCode.Contains("ok")) if (!reply.ReplyCode.Contains(OK, StringComparison.OrdinalIgnoreCase))
{ {
return -1; return reply.ReplyCode;
} }
message.LocalPriority = reply.AccountPriority;
return reply.AccountPriority; return OK;
} }
private static ValidatorReply ValidateAsync(Message message, string validatorAddress) private static ValidatorReply ValidateAsync(Message message, string validatorAddress)
...@@ -35,12 +43,25 @@ namespace GrpcMessageNode.DataBaseAccess ...@@ -35,12 +43,25 @@ namespace GrpcMessageNode.DataBaseAccess
using var channel = GrpcChannel.ForAddress(validatorAddress); using var channel = GrpcChannel.ForAddress(validatorAddress);
var client = new Validate.ValidateClient(channel); var client = new Validate.ValidateClient(channel);
MessageMetaData messageMeta = extractMetaData(message); MessageMetaData messageMeta = extractMetaData(message);
Console.WriteLine("Calling GRPC for address = " + validatorAddress); Console.WriteLine("Calling GRPC for address = " + validatorAddress);
var reply = client.ValidateMessageAsync(messageMeta);
var ans = reply.GetAwaiter().GetResult(); try
{
return ans; var reply = client.ValidateMessageAsync(messageMeta);
var ans = reply.GetAwaiter().GetResult();
Console.WriteLine("OK , Validator DONE !!");
return ans;
}
catch (Exception ex)
{
Console.WriteLine("Error connecting to validator - address = " + validatorAddress);
return new ValidatorReply() { AccountPriority = -1, ReplyCode = ValidatorConnectionAndValidationError };
}
} }
......
...@@ -7,15 +7,29 @@ ...@@ -7,15 +7,29 @@
*/ */
private static int MAX_PRIRITY = 5; private static int MAX_PRIRITY = 5;
private static int MIN_PRIRITY = 1; private static int MIN_PRIRITY = 1;
public static bool setFinalPriority(ref Message message , string validatorAddress)
public static string setFinalPriority(ref Message message , string validatorAddress)
{ {
int account_p = DataBaseAccess.DBAccess.getPriority(ref message , validatorAddress); string res = DataBaseAccess.DBAccess.getPriority(ref message , validatorAddress);
if (account_p == -1) return false;
if (res == DataBaseAccess.DBAccess.ValidatorConnectionAndValidationError)
{
return res;
}
if (!res.Contains(DataBaseAccess.DBAccess.OK, StringComparison.OrdinalIgnoreCase))
{
return res;
}
int account_p = message.LocalPriority;
int x = account_p + message.LocalPriority; int x = account_p + message.LocalPriority;
if (x > MAX_PRIRITY) x = MAX_PRIRITY; if (x > MAX_PRIRITY) x = MAX_PRIRITY;
if (x < MIN_PRIRITY && x!=-1) x = MIN_PRIRITY; if (x < MIN_PRIRITY && x!=-1) x = MIN_PRIRITY;
message.LocalPriority = x; message.LocalPriority = x;
return true;
return DataBaseAccess.DBAccess.OK;
} }
/// we can complicate the mechanism as much as we want /// we can complicate the mechanism as much as we want
} }
......
...@@ -10,6 +10,14 @@ namespace GrpcMessageNode.Services ...@@ -10,6 +10,14 @@ namespace GrpcMessageNode.Services
{ {
private readonly ILogger<SendMessageService> _logger; private readonly ILogger<SendMessageService> _logger;
private readonly IDiscoveryClient discoveryClient; private readonly IDiscoveryClient discoveryClient;
private static readonly string ErrorValidation = "Error When Validating Request";
private static readonly string ErrorDBConnection = "Error Connecting to DataBase";
private static readonly string ErrorConnection = "Error Connecting to Servers";
private static readonly string ErrorGRPCConnection = "Error Connecting to GRPC Servers";
private static readonly string QueuerNode = "QueuerNode";
private static readonly string Validator = "Validator";
public SendMessageService(ILogger<SendMessageService> logger , IDiscoveryClient client) public SendMessageService(ILogger<SendMessageService> logger , IDiscoveryClient client)
{ {
_logger = logger; _logger = logger;
...@@ -18,18 +26,27 @@ namespace GrpcMessageNode.Services ...@@ -18,18 +26,27 @@ namespace GrpcMessageNode.Services
public override Task<Acknowledgement> SendMessage(Message message, ServerCallContext context) public override Task<Acknowledgement> SendMessage(Message message, ServerCallContext context)
{ {
string validator = getValidatorAddress(); string validator = getAddressOfInstance(Validator);
//Console.WriteLine("Pr = " + message.LocalPriority); if (validator == ErrorConnection)
{
bool res = true; return Task.FromResult(new Acknowledgement
{
ReplyCode = ErrorConnection,
RequestID = ErrorDBConnection
}) ;
}
Console.WriteLine("Pr = " + message.LocalPriority);
res = PriorityHandling.SetPriority.setFinalPriority(ref message , validator);
if (res == false) // something went wrong string res = PriorityHandling.SetPriority.setFinalPriority(ref message , validator);
if (!res.Equals(DataBaseAccess.DBAccess.OK)) // something went wrong
{ {
Console.WriteLine("Error when set Final setFinalPriority is called");
return Task.FromResult(new Acknowledgement return Task.FromResult(new Acknowledgement
{ {
ReplyCode = "ERRORROROR on Send " + message.MsgId ReplyCode = res,
RequestID = ErrorValidation + " : " + res
}); });
} }
...@@ -44,24 +61,42 @@ namespace GrpcMessageNode.Services ...@@ -44,24 +61,42 @@ namespace GrpcMessageNode.Services
private Acknowledgement sendToCoordinator(Message message) private Acknowledgement sendToCoordinator(Message message)
{ {
string address = getCoordinatorAddress(); string address = getAddressOfInstance(QueuerNode);
if (address == ErrorConnection)
{
return (new Acknowledgement
{
ReplyCode = ErrorConnection,
RequestID = ErrorConnection
});
}
Message2 message2 = copyMessage(message); Message2 message2 = copyMessage(message);
using var channel = GrpcChannel.ForAddress(address); using var channel = GrpcChannel.ForAddress(address);
var queue_client = new Queue.QueueClient(channel); var queue_client = new Queue.QueueClient(channel);
try
{
var reply = queue_client.QueueMessage(message2);
var reply = queue_client.QueueMessage(message2); // Console.WriteLine(reply.ReplyCode);
// Console.WriteLine(reply.ReplyCode);
return new Acknowledgement() { ReplyCode = reply.ReplyCode , RequestID = reply.RequestID}; return new Acknowledgement() { ReplyCode = reply.ReplyCode, RequestID = reply.RequestID };
}
catch (Exception e)
{
return new Acknowledgement()
{
ReplyCode = ErrorConnection,
RequestID = ErrorGRPCConnection
};
}
} }
//not working -- causing unknown exception with nullable parameter http2 //not working -- causing unknown exception with nullable parameter http2
private Queue.QueueClient getQueueClient() private Queue.QueueClient getQueueClient()
{ {
string address = getCoordinatorAddress(); string address = getAddressOfInstance(QueuerNode);
using var channel = GrpcChannel.ForAddress(address); using var channel = GrpcChannel.ForAddress(address);
//Console.WriteLine("QueuerNode Address = " + address); //Console.WriteLine("QueuerNode Address = " + address);
var client = new Queue.QueueClient(channel); var client = new Queue.QueueClient(channel);
...@@ -81,26 +116,23 @@ namespace GrpcMessageNode.Services ...@@ -81,26 +116,23 @@ namespace GrpcMessageNode.Services
return message2; return message2;
} }
private string getCoordinatorAddress()
{
string address = "";
var y = discoveryClient.GetInstances("QueuerNode"); /// write names to config file
address = y[0].Uri.ToString();
return address; private string getAddressOfInstance(string instanceName)
}
private string getValidatorAddress()
{ {
string address = ""; string address = "";
try
{
// instanceName = "Validator" or "QueuerNode" ... etc
var y = discoveryClient.GetInstances(instanceName); /// write names to config file
var y = discoveryClient.GetInstances("Validator"); /// write names to config file address = y[0].Uri.ToString();
address = y[0].Uri.ToString();
return address; return address;
}
catch (Exception ex)
{
return ErrorConnection;
}
} }
} }
} }
...@@ -46,3 +46,8 @@ ...@@ -46,3 +46,8 @@
2.0 2.0
2.0 2.0
2.0 2.0
2.0
2.0
2.0
2.0
2.0
...@@ -6,7 +6,6 @@ using System.Net; ...@@ -6,7 +6,6 @@ using System.Net;
using System.Text; using System.Text;
using System.Text.Json; using System.Text.Json;
Console.WriteLine("Hello, World!");
MessageDTO message = new MessageDTO(); MessageDTO message = new MessageDTO();
message.text = "Hello World !"; message.text = "Hello World !";
message.apiKey = "Api-Key"; message.apiKey = "Api-Key";
...@@ -21,10 +20,15 @@ var client = new HttpClient(); ...@@ -21,10 +20,15 @@ var client = new HttpClient();
StringContent payload = new (JsonSerializer.Serialize(message) , Encoding.UTF8 , "application/json"); StringContent payload = new (JsonSerializer.Serialize(message) , Encoding.UTF8 , "application/json");
try
{
HttpResponseMessage reply = await client.PostAsync("https://localhost:7095/queue-msg", payload);
HttpResponseMessage reply = await client.PostAsync("https://localhost:7095/queue-msg", payload);
Console.WriteLine(reply.Content.ToString());
Console.WriteLine(reply.Content.ToString());
}
catch (Exception ex)
{
Console.Error.WriteLine("Error Processing - Connection Problem");
}
//Console.WriteLine(reply.Result); //Console.WriteLine(reply.Result);
...@@ -11,6 +11,13 @@ namespace HTTPMessageNode.Controllers ...@@ -11,6 +11,13 @@ namespace HTTPMessageNode.Controllers
private readonly ILogger<QueueMessageController> _logger; private readonly ILogger<QueueMessageController> _logger;
private readonly IDiscoveryClient discoveryClient; private readonly IDiscoveryClient discoveryClient;
private static readonly string ErrorConnection = "Error Connecting to Servers";
private static readonly string QueuerNode = "QueuerNode"; //put them in files!
private static readonly string Validator = "Validator";
private static readonly string ErrorDBConnection = "Error Connecting to DataBase";
private static readonly string ErrorValidation = "Error When Validating Request";
private static readonly string ErrorGRPCConnection = "Error Connecting to GRPC Servers";
public QueueMessageController(ILogger<QueueMessageController> logger , IDiscoveryClient discovery) public QueueMessageController(ILogger<QueueMessageController> logger , IDiscoveryClient discovery)
{ {
_logger = logger; _logger = logger;
...@@ -24,38 +31,69 @@ namespace HTTPMessageNode.Controllers ...@@ -24,38 +31,69 @@ namespace HTTPMessageNode.Controllers
{ {
//Console.WriteLine("Msg from : " + messageDTO.clientID + " pr = " + messageDTO.localPriority); //Console.WriteLine("Msg from : " + messageDTO.clientID + " pr = " + messageDTO.localPriority);
string validator = getValidatorAddress(); string validator = getAddressOfInstance(Validator);
if (validator == ErrorConnection)
{
return (new Acknowledgement
{
ReplyCode = ErrorConnection,
RequestID = ErrorDBConnection
});
}
Message message = copyMessage(messageDTO); Message message = copyMessage(messageDTO);
bool res = PriorityHandling.SetPriority.setFinalPriority(ref message , validator); Console.WriteLine("old prio = " + message.LocalPriority);
string res = PriorityHandling.SetPriority.setFinalPriority(ref message , validator);
if (res == false)
if (!res.Equals(DataBaseAccess.DBAccess.OK)) // something went wrong
{ {
return new Acknowledgement(){ return (new Acknowledgement
ReplyCode = "Error", {
RequestID = "-1" ReplyCode = res,
}; RequestID = ErrorValidation + " : " + res
});
} }
Console.WriteLine("new prio = " + message.LocalPriority);
string address = getAddress();
string address = getAddressOfInstance(QueuerNode);
if (address == ErrorConnection)
{
return (new Acknowledgement
{
ReplyCode = ErrorConnection,
RequestID = ErrorConnection
});
}
using var channel = GrpcChannel.ForAddress(address); using var channel = GrpcChannel.ForAddress(address);
var client = new Queue.QueueClient(channel); var client = new Queue.QueueClient(channel);
//Console.WriteLine("Sending to " + address); //Console.WriteLine("Sending to " + address);
//Console.WriteLine(message.Tag + " , " + message.ClientID + " new pr = " + message.LocalPriority); ; //Console.WriteLine(message.Tag + " , " + message.ClientID + " new pr = " + message.LocalPriority); ;
var reply = client.QueueMessage(message);
//Console.WriteLine(reply.ReplyCode); try
{
var reply = client.QueueMessage(message);
// Console.WriteLine(reply.ReplyCode);
return new Acknowledgement() { ReplyCode = reply.ReplyCode, RequestID = reply.RequestID };
}
catch (Exception e)
{
return new Acknowledgement()
{
ReplyCode = ErrorConnection,
RequestID = ErrorGRPCConnection
};
}
return reply;
} }
...@@ -71,26 +109,24 @@ namespace HTTPMessageNode.Controllers ...@@ -71,26 +109,24 @@ namespace HTTPMessageNode.Controllers
message.Tag = messageDTO.tag; message.Tag = messageDTO.tag;
return message; return message;
} }
private string getAddress()
{
string address = "";
var y = discoveryClient.GetInstances("QueuerNode"); /// write names to config file
address = y[0].Uri.ToString();
return address; private string getAddressOfInstance(string instanceName)
}
private string getValidatorAddress()
{ {
string address = ""; string address = "";
try
{
// instanceName = "Validator" or "QueuerNode" ... etc
var y = discoveryClient.GetInstances(instanceName); /// write names to config file
var y = discoveryClient.GetInstances("Validator"); /// write names to config file address = y[0].Uri.ToString();
address = y[0].Uri.ToString();
return address; return address;
}
catch (Exception ex)
{
return ErrorConnection;
}
} }
} }
} }
\ No newline at end of file
...@@ -10,6 +10,14 @@ namespace HTTPMessageNode.DataBaseAccess ...@@ -10,6 +10,14 @@ namespace HTTPMessageNode.DataBaseAccess
{ {
public class DBAccess public class DBAccess
{ {
public static readonly string ValidatorConnectionAndValidationError = "Error On Validating";
/// <summary>
/// this OK must be in the reply from validation , so reply code should
/// have the same string in it
/// </summary>
public static readonly string OK = "ok";
/// <summary> /// <summary>
/// check message for tag , Authentication , Quota perhaps from service validator with address /// check message for tag , Authentication , Quota perhaps from service validator with address
...@@ -18,20 +26,22 @@ namespace HTTPMessageNode.DataBaseAccess ...@@ -18,20 +26,22 @@ namespace HTTPMessageNode.DataBaseAccess
/// <param name="message" > the message we received to check</param> /// <param name="message" > the message we received to check</param>
/// <param name="validatorAddress"> The validator service address which as a mongodb service </param> /// <param name="validatorAddress"> The validator service address which as a mongodb service </param>
/// <returns> validator reply which is ok if we can proceed with account priority</returns> /// <returns> validator reply which is ok if we can proceed with account priority</returns>
public static int getPriority(ref Message message , string validatorAddress) public static string getPriority(ref Message message, string validatorAddress)
{ {
ValidatorReply reply = ValidateAsync(message , validatorAddress); //Console.WriteLine("Validating NOW IN GRPC !! ");
ValidatorReply reply = ValidateAsync(message, validatorAddress);
if (reply == null) if (reply.ReplyCode == ValidatorConnectionAndValidationError)
{ {
return -1; return ValidatorConnectionAndValidationError;
} }
if (!reply.ReplyCode.Contains("ok")) if (!reply.ReplyCode.Contains(OK, StringComparison.OrdinalIgnoreCase))
{ {
return -1; return reply.ReplyCode;
} }
Console.WriteLine(reply.ReplyCode); Console.WriteLine("Prio from validator = " + reply.AccountPriority);
return reply.AccountPriority; message.LocalPriority = reply.AccountPriority;
return OK;
} }
private static ValidatorReply ValidateAsync(Message message, string validatorAddress) private static ValidatorReply ValidateAsync(Message message, string validatorAddress)
...@@ -39,11 +49,21 @@ namespace HTTPMessageNode.DataBaseAccess ...@@ -39,11 +49,21 @@ namespace HTTPMessageNode.DataBaseAccess
using var channel = GrpcChannel.ForAddress(validatorAddress); using var channel = GrpcChannel.ForAddress(validatorAddress);
var client = new Validate.ValidateClient(channel); var client = new Validate.ValidateClient(channel);
MessageMetaData messageMeta = extractMetaData(message); MessageMetaData messageMeta = extractMetaData(message);
var reply = client.ValidateMessageAsync(messageMeta);
//Console.WriteLine("Calling GRPC for address = " + validatorAddress);
var ans = reply.GetAwaiter().GetResult();
try
return ans; {
var reply = client.ValidateMessageAsync(messageMeta);
var ans = reply.GetAwaiter().GetResult();
return ans;
}
catch (Exception ex)
{
return new ValidatorReply() { AccountPriority = -1, ReplyCode = ValidatorConnectionAndValidationError };
}
} }
......
...@@ -7,16 +7,27 @@ ...@@ -7,16 +7,27 @@
*/ */
private static int MAX_PRIRITY = 5; private static int MAX_PRIRITY = 5;
private static int MIN_PRIRITY = 1; private static int MIN_PRIRITY = 1;
public static bool setFinalPriority(ref Message message , string validatorAddress) public static string setFinalPriority(ref Message message, string validatorAddress)
{ {
int account_p = DataBaseAccess.DBAccess.getPriority(ref message , validatorAddress); string res = DataBaseAccess.DBAccess.getPriority(ref message, validatorAddress);
if (account_p == -1) return false;
if (res == DataBaseAccess.DBAccess.ValidatorConnectionAndValidationError)
{
return res;
}
if (!res.Contains(DataBaseAccess.DBAccess.OK, StringComparison.OrdinalIgnoreCase))
{
return res;
}
int account_p = message.LocalPriority;
int x = account_p + message.LocalPriority; int x = account_p + message.LocalPriority;
if (x > MAX_PRIRITY) x = MAX_PRIRITY; if (x > MAX_PRIRITY) x = MAX_PRIRITY;
if (x < MIN_PRIRITY && x!=-1) x = MIN_PRIRITY; if (x < MIN_PRIRITY && x != -1) x = MIN_PRIRITY;
message.LocalPriority = x; message.LocalPriority = x;
return true; return DataBaseAccess.DBAccess.OK;
} }
/// we can complicate the mechanism as much as we want /// we can complicate the mechanism as much as we want
} }
......
...@@ -10,3 +10,6 @@ ...@@ -10,3 +10,6 @@
2.0 2.0
2.0 2.0
2.0 2.0
2.0
2.0
2.0
...@@ -7,6 +7,7 @@ namespace MessageGeneratorGRPC ...@@ -7,6 +7,7 @@ namespace MessageGeneratorGRPC
{ {
private readonly ILogger<Worker> _logger; private readonly ILogger<Worker> _logger;
private readonly IDiscoveryClient discoveryClient; private readonly IDiscoveryClient discoveryClient;
private readonly string ServersNotAvail = "Error, Servers not Available";
public Worker(ILogger<Worker> logger, IDiscoveryClient client) public Worker(ILogger<Worker> logger, IDiscoveryClient client)
{ {
_logger = logger; _logger = logger;
...@@ -15,39 +16,54 @@ namespace MessageGeneratorGRPC ...@@ -15,39 +16,54 @@ namespace MessageGeneratorGRPC
protected override async Task ExecuteAsync(CancellationToken stoppingToken) protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{ {
//test();
string address = getAddress(); string address = getAddress();
if(address == ServersNotAvail)
{
return;
}
using var channel = GrpcChannel.ForAddress(address); using var channel = GrpcChannel.ForAddress(address);
var client = new Send.SendClient(channel); try
{
Message message = new Message();
message.Text = "Hello World !";
message.ApiKey = "Api-Key";
message.ClientID = "m-salameh";
message.LocalPriority = 1;
message.MsgId = "msg-id=1";
message.PhoneNumber = "043 33 00 83";
message.Tag = "SYR";
Console.WriteLine("Sending to " + address);
var client = new Send.SendClient(channel);
var reply = client.SendMessage(message); Message message = new Message();
message.Text = "Hello World !";
message.ApiKey = "Api-Key";
message.ClientID = "m-salameh";
message.LocalPriority = 1;
message.MsgId = "msg-id=1";
message.PhoneNumber = "043 33 00 83";
message.Tag = "SYR";
Console.WriteLine("Sending to " + address);
Console.WriteLine(reply.ReplyCode + "\n"+reply.RequestID);
Console.WriteLine("Press any key to exit..."); var reply = client.SendMessage(message);
Console.ReadKey(); Console.WriteLine("Reply Arrived : ");
Console.WriteLine(reply.ReplyCode + "\n" + reply.RequestID);
//Console.WriteLine("OK");
}
catch (Exception ex)
{
return;
//return Task.something;
}
} }
private string getAddress() private string getAddress()
{ {
string address = ""; string address = "";
try
{
var y = discoveryClient.GetInstances("grpc-message-node"); /// write names to config file
var y = discoveryClient.GetInstances("grpc-message-node"); /// write names to config file address = y[0].Uri.ToString();
}
address = y[0].Uri.ToString(); catch (Exception ex)
{
address = ServersNotAvail;
}
return address; return address;
} }
......
...@@ -13,6 +13,7 @@ namespace QueuerNode.RedisQueuer ...@@ -13,6 +13,7 @@ namespace QueuerNode.RedisQueuer
{ {
private static string SYRRedisURL = "localhost:6379"; private static string SYRRedisURL = "localhost:6379";
private static string MTNRedisURL = "localhost:48485"; private static string MTNRedisURL = "localhost:48485";
public static string RedisConnectionError = "Error Writing to Redis";
private static string Syriatel = "SYR"; private static string Syriatel = "SYR";
private static string MTN = "MTN"; private static string MTN = "MTN";
...@@ -21,50 +22,64 @@ namespace QueuerNode.RedisQueuer ...@@ -21,50 +22,64 @@ namespace QueuerNode.RedisQueuer
public static string addMessage(Message message , IDiscoveryClient discoveryClient) public static string addMessage(Message message , IDiscoveryClient discoveryClient)
{ {
string id = "Error"; string id = "Error";
string temp = string.Empty;
if (message.Tag.Contains(Syriatel, StringComparison.OrdinalIgnoreCase)) if (message.Tag.Contains(Syriatel, StringComparison.OrdinalIgnoreCase))
{ {
// get url using discovery client // get url using discovery client
var resid = addMessageRedisAsync(message, SYRRedisURL); var resid = addMessageRedisAsync(message, SYRRedisURL);
temp = resid.Result;
id = message.Tag + ":" + message.LocalPriority + ":" + resid.Result;
} }
else if (message.Tag.Contains(MTN, StringComparison.OrdinalIgnoreCase)) else if (message.Tag.Contains(MTN, StringComparison.OrdinalIgnoreCase))
{ {
var resid = addMessageRedisAsync(message, MTNRedisURL); var resid = addMessageRedisAsync(message, MTNRedisURL);
temp = resid.Result;
id = message.Tag + ":" + message.LocalPriority + ":" + resid.Result;
} }
if (temp.Equals(RedisConnectionError))
{
return RedisConnectionError;
}
id = message.Tag + ":" + message.LocalPriority + ":" + temp;
return id; return id;
} }
private static async Task<string> addMessageRedisAsync(Message message, string URL) private static async Task<string> addMessageRedisAsync(Message message, string URL)
{ {
var redis = ConnectionMultiplexer.Connect(URL); try
{
var redis = ConnectionMultiplexer.Connect(URL);
string streamName = message.LocalPriority.ToString(); string streamName = message.LocalPriority.ToString();
Console.WriteLine("stream name = " + streamName); Console.WriteLine("stream name = " + streamName);
var db = redis.GetDatabase(); var db = redis.GetDatabase();
var serializedMessage = JsonConvert.SerializeObject(message); var serializedMessage = JsonConvert.SerializeObject(message);
Console.WriteLine("Sending to stream : " + streamName); Console.WriteLine("Sending to stream : " + streamName);
//var messageId = await db.StreamAddAsync(streamName, new NameValueEntry[] { }, serializedMessage); //var messageId = await db.StreamAddAsync(streamName, new NameValueEntry[] { }, serializedMessage);
var messageId = await db.StreamAddAsync(streamName,new NameValueEntry[] var messageId = await db.StreamAddAsync
(streamName,
new NameValueEntry[]
{
new NameValueEntry("message", serializedMessage)
});
{ new("tag", "SYR"), new NameValueEntry("message", serializedMessage) });
Console.WriteLine("Done Sending to stream : " + streamName);
Console.WriteLine("Stream msg id = " + messageId);
//var messageId = "YES";
return messageId.ToString();
}
Console.WriteLine("Done Sending to stream : " + streamName); catch (Exception ex)
Console.WriteLine("Stream msg id = " + messageId); {
//var messageId = "YES"; return await Task.FromResult(RedisConnectionError);
return messageId.ToString(); }
} }
} }
} }
...@@ -10,6 +10,8 @@ namespace QueuerNode.Services ...@@ -10,6 +10,8 @@ namespace QueuerNode.Services
{ {
private readonly ILogger<QueueMessageService> _logger; private readonly ILogger<QueueMessageService> _logger;
private readonly IDiscoveryClient _client; private readonly IDiscoveryClient _client;
private readonly static string ErrorQueuing = "Error";
public QueueMessageService(ILogger<QueueMessageService> logger , IDiscoveryClient client) public QueueMessageService(ILogger<QueueMessageService> logger , IDiscoveryClient client)
{ {
_logger = logger; _logger = logger;
...@@ -23,12 +25,19 @@ namespace QueuerNode.Services ...@@ -23,12 +25,19 @@ namespace QueuerNode.Services
string reqId = MessageQueues.addMessage(message, _client); string reqId = MessageQueues.addMessage(message, _client);
if (reqId.Equals(MessageQueues.RedisConnectionError))
{
return Task.FromResult(new Acknowledgement
{
ReplyCode = ErrorQueuing,
RequestID = reqId,
});
}
return Task.FromResult(new Acknowledgement return Task.FromResult(new Acknowledgement
{ {
ReplyCode = "OK on Send : id = " + message.MsgId + " ==> Message Reached Queue" ReplyCode = "OK on Send : id = " + message.MsgId + " ==> Message Reached Queue"
+ " with priority : " + message.LocalPriority + " with priority : " + message.LocalPriority ,
,
RequestID = reqId RequestID = reqId
}); });
......
...@@ -25,3 +25,5 @@ ...@@ -25,3 +25,5 @@
2.0 2.0
2.0 2.0
2.0 2.0
2.0
2.0
...@@ -9,14 +9,8 @@ var token = tokenSource.Token; ...@@ -9,14 +9,8 @@ var token = tokenSource.Token;
var muxer = ConnectionMultiplexer.Connect("localhost"); var muxer = ConnectionMultiplexer.Connect("localhost");
var db = muxer.GetDatabase(); var db = muxer.GetDatabase();
const string streamName = "5"; const string streamName = "4";
Dictionary<string, string> ParseResult(StreamEntry entry)
=> entry.Values.ToDictionary(x => x.Name.ToString(), x => x.Value.ToString());
int read = 0;
var readTask = Task.Run(async () => var readTask = Task.Run(async () =>
{ {
string id = string.Empty; string id = string.Empty;
...@@ -27,9 +21,10 @@ var readTask = Task.Run(async () => ...@@ -27,9 +21,10 @@ var readTask = Task.Run(async () =>
{ {
// Get the message ID // Get the message ID
var messageId = entry.Id; var messageId = entry.Id;
Console.WriteLine(messageId);
// Access the message data (serialized JSON) // Access the message data (serialized JSON)
string? serializedMessage = entry.Values.ToString(); string? serializedMessage = entry.Values[0].Value.ToString();
Console.WriteLine(serializedMessage);
if (serializedMessage == null) continue; if (serializedMessage == null) continue;
// Deserialize the JSON back to a Message object (if needed) // Deserialize the JSON back to a Message object (if needed)
...@@ -46,7 +41,7 @@ var readTask = Task.Run(async () => ...@@ -46,7 +41,7 @@ var readTask = Task.Run(async () =>
tokenSource.CancelAfter(TimeSpan.FromSeconds(20)); //tokenSource.CancelAfter(TimeSpan(20));
await Task.WhenAll(readTask); await Task.WhenAll(readTask);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment