Commit f580fa93 authored by mohammad.salama's avatar mohammad.salama

fixed contract for queuer + started logic of streaming by tag-priority

parent 9dacc922
......@@ -21,6 +21,14 @@
<PackageReference Include="Steeltoe.Discovery.Eureka" Version="$(SteeltoeVersion)" />
</ItemGroup>
/**
if happened any problem , visit the official steeltoe homepage to get coherent versions
*/
\ No newline at end of file
<ItemGroup>
<PackageReference Include="CSRedisCore" Version="3.8.803" />
<PackageReference Include="StackExchange.Redis" Version="2.8.0" />
</ItemGroup>
---------------------------------------------------------------------------------------------------------------------------------------------------------------
add redis:
dotnet add package StackExchange.Redis
\ No newline at end of file
......@@ -20,9 +20,9 @@ namespace HTTPMessageNode.Controllers
//[HttpGet(Name = "GetWeatherForecast")]
[Route("/queue-msg")]
[HttpPost]
public string SendMessage([FromBody] MessageDTO messageDTO)
public Acknowledgement SendMessage([FromBody] MessageDTO messageDTO)
{
Console.WriteLine("Msg from : " + messageDTO.clientID);
//Console.WriteLine("Msg from : " + messageDTO.clientID + " pr = " + messageDTO.localPriority);
string validator = getValidatorAddress();
......@@ -33,23 +33,28 @@ namespace HTTPMessageNode.Controllers
if (res == false)
{
return "Error";
return new Acknowledgement(){
ReplyCode = "Error",
RequestID = "-1"
};
}
string address = getAddress();
using var channel = GrpcChannel.ForAddress(address);
var client = new Queue.QueueClient(channel);
Console.WriteLine("Sending to " + address);
//Console.WriteLine("Sending to " + address);
//Console.WriteLine(message.Tag + " , " + message.ClientID + " new pr = " + message.LocalPriority); ;
var reply = client.QueueMessage(message);
Console.WriteLine(reply.ReplyCode);
return (reply.ReplyCode );
//Console.WriteLine(reply.ReplyCode);
return reply;
}
......
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace QueuerNode.Helper
{
public class MessageQueues
{
private const int priority_levels = 6;
private const int sms_rates = 15;
private Queue<Message>[] messageQueue;
private object[] locks;
public MessageQueues()
{
messageQueue = new Queue<Message>[priority_levels];
locks = new object [priority_levels];
for (int i=0; i < priority_levels; i++)
{
messageQueue[i] = new Queue<Message>();
locks[i] = new object();
}
}
public void addMessage(Message message)
{
int idx = message.LocalPriority;
idx = Math.Min(idx, priority_levels - 1);
idx = Math.Max(idx, 0);
Console.WriteLine("idx = " + idx);
insert(message, idx);
Console.WriteLine("Message Queued: " + message.MsgId);
}
private void insert(Message message , int index)
{
lock (locks[index])
{
messageQueue[index].Enqueue(message);
Console.WriteLine("INserted in Q : " + index);
}
}
public void sendMessages()
{
int x1 = 7, x2 = 5, x3 = 3;
while(x1>0 && messageQueue[1].Count>0)
{
Console.WriteLine(messageQueue[1].Dequeue());
x1--;
}
while (x2 > 0 && messageQueue[1].Count > 0)
{
Console.WriteLine(messageQueue[2].Dequeue());
x2--;
}
while (x3 > 0 && messageQueue[3].Count > 0)
{
Console.WriteLine(messageQueue[3].Dequeue());
x3--;
}
}
}
}
......@@ -11,12 +11,14 @@ message Message {
string phoneNumber = 4;
int32 localPriority = 5;
string text = 6;
string tag = 7;
}
message Acknowledgement
{
string replyCode = 1;
string requestID = 2;
}
service Queue {
......
......@@ -39,4 +39,8 @@
<Protobuf Include="Protos\schema.proto" GrpcServices="Server" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="CSRedisCore" Version="3.8.803" />
<PackageReference Include="StackExchange.Redis" Version="2.8.0" />
</ItemGroup>
</Project>
using Newtonsoft.Json;
using StackExchange.Redis;
using Steeltoe.Discovery;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace QueuerNode.RedisQueuer
{
public class MessageQueues
{
private static string SYRRedisURL = "localhost:48484";
private static string MTNRedisURL = "localhost:48485";
private static string Syriatel = "SYR";
private static string MTN = "MTN";
public static string addMessage(Message message , IDiscoveryClient discoveryClient)
{
string id = "Error";
if (message.Tag.Contains(Syriatel, StringComparison.OrdinalIgnoreCase))
{
// get url using discovery client
var resid = addMessageRedisAsync(message, SYRRedisURL);
Console.WriteLine(resid);
id = message.Tag + ":" + message.LocalPriority + ":" + resid;
}
else if (message.Tag.Contains(MTN, StringComparison.OrdinalIgnoreCase))
{
var resid = addMessageRedisAsync(message, MTNRedisURL);
Console.WriteLine(resid);
id = message.Tag + ":" + message.LocalPriority + ":" + resid;
}
return id;
}
private static async Task<string> addMessageRedisAsync(Message message, string URL)
{
/*var redis = ConnectionMultiplexer.Connect(URL);
string streamName = message.LocalPriority.ToString();
var db = redis.GetDatabase();
var serializedMessage = JsonConvert.SerializeObject(message);
var messageId = await db.StreamAddAsync(streamName, new NameValueEntry[] { }, serializedMessage);
*/
var messageId = "YES";
return messageId.ToString();
}
}
}
......@@ -2,7 +2,7 @@
using Grpc.Core;
using Steeltoe.Common.Discovery;
using Steeltoe.Discovery;
using QueuerNode.Helper;
using QueuerNode.RedisQueuer;
namespace QueuerNode.Services
{
......@@ -10,12 +10,10 @@ namespace QueuerNode.Services
{
private readonly ILogger<QueueMessageService> _logger;
private readonly IDiscoveryClient _client;
private MessageQueues _messageQueues;
public QueueMessageService(ILogger<QueueMessageService> logger , IDiscoveryClient client)
{
_logger = logger;
_client = client;
_messageQueues = new MessageQueues();
}
public override Task<Acknowledgement> QueueMessage(Message message, ServerCallContext context)
......@@ -23,12 +21,18 @@ namespace QueuerNode.Services
//Console.WriteLine("Message Receieved to Queuer !!");
_messageQueues.addMessage(message);
string reqId = MessageQueues.addMessage(message, _client);
return Task.FromResult(new Acknowledgement
{
ReplyCode = "OK on Send : id = " + message.MsgId + " ==> Message Reached Queue"
+ " with priority : " + message.LocalPriority
,
RequestID = reqId
});
}
}
}
......@@ -24,18 +24,19 @@ namespace QueuerNode {
static SchemaReflection() {
byte[] descriptorData = global::System.Convert.FromBase64String(
string.Concat(
"ChNQcm90b3Mvc2NoZW1hLnByb3RvEgpUcmFubWl0dGVyInQKB01lc3NhZ2US",
"EAoIY2xpZW50SUQYASABKAkSDgoGYXBpS2V5GAIgASgJEg0KBW1zZ0lkGAMg",
"ASgJEhMKC3Bob25lTnVtYmVyGAQgASgJEhUKDWxvY2FsUHJpb3JpdHkYBSAB",
"KAUSDAoEdGV4dBgGIAEoCSIkCg9BY2tub3dsZWRnZW1lbnQSEQoJcmVwbHlD",
"b2RlGAEgASgJMkkKBVF1ZXVlEkAKDFF1ZXVlTWVzc2FnZRITLlRyYW5taXR0",
"ZXIuTWVzc2FnZRobLlRyYW5taXR0ZXIuQWNrbm93bGVkZ2VtZW50Qg2qAgpR",
"dWV1ZXJOb2RlYgZwcm90bzM="));
"ChNQcm90b3Mvc2NoZW1hLnByb3RvEgpUcmFubWl0dGVyIoEBCgdNZXNzYWdl",
"EhAKCGNsaWVudElEGAEgASgJEg4KBmFwaUtleRgCIAEoCRINCgVtc2dJZBgD",
"IAEoCRITCgtwaG9uZU51bWJlchgEIAEoCRIVCg1sb2NhbFByaW9yaXR5GAUg",
"ASgFEgwKBHRleHQYBiABKAkSCwoDdGFnGAcgASgJIjcKD0Fja25vd2xlZGdl",
"bWVudBIRCglyZXBseUNvZGUYASABKAkSEQoJcmVxdWVzdElEGAIgASgJMkkK",
"BVF1ZXVlEkAKDFF1ZXVlTWVzc2FnZRITLlRyYW5taXR0ZXIuTWVzc2FnZRob",
"LlRyYW5taXR0ZXIuQWNrbm93bGVkZ2VtZW50Qg2qAgpRdWV1ZXJOb2RlYgZw",
"cm90bzM="));
descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData,
new pbr::FileDescriptor[] { },
new pbr::GeneratedClrTypeInfo(null, null, new pbr::GeneratedClrTypeInfo[] {
new pbr::GeneratedClrTypeInfo(typeof(global::QueuerNode.Message), global::QueuerNode.Message.Parser, new[]{ "ClientID", "ApiKey", "MsgId", "PhoneNumber", "LocalPriority", "Text" }, null, null, null, null),
new pbr::GeneratedClrTypeInfo(typeof(global::QueuerNode.Acknowledgement), global::QueuerNode.Acknowledgement.Parser, new[]{ "ReplyCode" }, null, null, null, null)
new pbr::GeneratedClrTypeInfo(typeof(global::QueuerNode.Message), global::QueuerNode.Message.Parser, new[]{ "ClientID", "ApiKey", "MsgId", "PhoneNumber", "LocalPriority", "Text", "Tag" }, null, null, null, null),
new pbr::GeneratedClrTypeInfo(typeof(global::QueuerNode.Acknowledgement), global::QueuerNode.Acknowledgement.Parser, new[]{ "ReplyCode", "RequestID" }, null, null, null, null)
}));
}
#endregion
......@@ -83,6 +84,7 @@ namespace QueuerNode {
phoneNumber_ = other.phoneNumber_;
localPriority_ = other.localPriority_;
text_ = other.text_;
tag_ = other.tag_;
_unknownFields = pb::UnknownFieldSet.Clone(other._unknownFields);
}
......@@ -164,6 +166,18 @@ namespace QueuerNode {
}
}
/// <summary>Field number for the "tag" field.</summary>
public const int TagFieldNumber = 7;
private string tag_ = "";
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public string Tag {
get { return tag_; }
set {
tag_ = pb::ProtoPreconditions.CheckNotNull(value, "value");
}
}
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public override bool Equals(object other) {
......@@ -185,6 +199,7 @@ namespace QueuerNode {
if (PhoneNumber != other.PhoneNumber) return false;
if (LocalPriority != other.LocalPriority) return false;
if (Text != other.Text) return false;
if (Tag != other.Tag) return false;
return Equals(_unknownFields, other._unknownFields);
}
......@@ -198,6 +213,7 @@ namespace QueuerNode {
if (PhoneNumber.Length != 0) hash ^= PhoneNumber.GetHashCode();
if (LocalPriority != 0) hash ^= LocalPriority.GetHashCode();
if (Text.Length != 0) hash ^= Text.GetHashCode();
if (Tag.Length != 0) hash ^= Tag.GetHashCode();
if (_unknownFields != null) {
hash ^= _unknownFields.GetHashCode();
}
......@@ -240,6 +256,10 @@ namespace QueuerNode {
output.WriteRawTag(50);
output.WriteString(Text);
}
if (Tag.Length != 0) {
output.WriteRawTag(58);
output.WriteString(Tag);
}
if (_unknownFields != null) {
_unknownFields.WriteTo(output);
}
......@@ -274,6 +294,10 @@ namespace QueuerNode {
output.WriteRawTag(50);
output.WriteString(Text);
}
if (Tag.Length != 0) {
output.WriteRawTag(58);
output.WriteString(Tag);
}
if (_unknownFields != null) {
_unknownFields.WriteTo(ref output);
}
......@@ -302,6 +326,9 @@ namespace QueuerNode {
if (Text.Length != 0) {
size += 1 + pb::CodedOutputStream.ComputeStringSize(Text);
}
if (Tag.Length != 0) {
size += 1 + pb::CodedOutputStream.ComputeStringSize(Tag);
}
if (_unknownFields != null) {
size += _unknownFields.CalculateSize();
}
......@@ -332,6 +359,9 @@ namespace QueuerNode {
if (other.Text.Length != 0) {
Text = other.Text;
}
if (other.Tag.Length != 0) {
Tag = other.Tag;
}
_unknownFields = pb::UnknownFieldSet.MergeFrom(_unknownFields, other._unknownFields);
}
......@@ -371,6 +401,10 @@ namespace QueuerNode {
Text = input.ReadString();
break;
}
case 58: {
Tag = input.ReadString();
break;
}
}
}
#endif
......@@ -410,6 +444,10 @@ namespace QueuerNode {
Text = input.ReadString();
break;
}
case 58: {
Tag = input.ReadString();
break;
}
}
}
}
......@@ -453,6 +491,7 @@ namespace QueuerNode {
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public Acknowledgement(Acknowledgement other) : this() {
replyCode_ = other.replyCode_;
requestID_ = other.requestID_;
_unknownFields = pb::UnknownFieldSet.Clone(other._unknownFields);
}
......@@ -474,6 +513,18 @@ namespace QueuerNode {
}
}
/// <summary>Field number for the "requestID" field.</summary>
public const int RequestIDFieldNumber = 2;
private string requestID_ = "";
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public string RequestID {
get { return requestID_; }
set {
requestID_ = pb::ProtoPreconditions.CheckNotNull(value, "value");
}
}
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public override bool Equals(object other) {
......@@ -490,6 +541,7 @@ namespace QueuerNode {
return true;
}
if (ReplyCode != other.ReplyCode) return false;
if (RequestID != other.RequestID) return false;
return Equals(_unknownFields, other._unknownFields);
}
......@@ -498,6 +550,7 @@ namespace QueuerNode {
public override int GetHashCode() {
int hash = 1;
if (ReplyCode.Length != 0) hash ^= ReplyCode.GetHashCode();
if (RequestID.Length != 0) hash ^= RequestID.GetHashCode();
if (_unknownFields != null) {
hash ^= _unknownFields.GetHashCode();
}
......@@ -520,6 +573,10 @@ namespace QueuerNode {
output.WriteRawTag(10);
output.WriteString(ReplyCode);
}
if (RequestID.Length != 0) {
output.WriteRawTag(18);
output.WriteString(RequestID);
}
if (_unknownFields != null) {
_unknownFields.WriteTo(output);
}
......@@ -534,6 +591,10 @@ namespace QueuerNode {
output.WriteRawTag(10);
output.WriteString(ReplyCode);
}
if (RequestID.Length != 0) {
output.WriteRawTag(18);
output.WriteString(RequestID);
}
if (_unknownFields != null) {
_unknownFields.WriteTo(ref output);
}
......@@ -547,6 +608,9 @@ namespace QueuerNode {
if (ReplyCode.Length != 0) {
size += 1 + pb::CodedOutputStream.ComputeStringSize(ReplyCode);
}
if (RequestID.Length != 0) {
size += 1 + pb::CodedOutputStream.ComputeStringSize(RequestID);
}
if (_unknownFields != null) {
size += _unknownFields.CalculateSize();
}
......@@ -562,6 +626,9 @@ namespace QueuerNode {
if (other.ReplyCode.Length != 0) {
ReplyCode = other.ReplyCode;
}
if (other.RequestID.Length != 0) {
RequestID = other.RequestID;
}
_unknownFields = pb::UnknownFieldSet.MergeFrom(_unknownFields, other._unknownFields);
}
......@@ -581,6 +648,10 @@ namespace QueuerNode {
ReplyCode = input.ReadString();
break;
}
case 18: {
RequestID = input.ReadString();
break;
}
}
}
#endif
......@@ -600,6 +671,10 @@ namespace QueuerNode {
ReplyCode = input.ReadString();
break;
}
case 18: {
RequestID = input.ReadString();
break;
}
}
}
}
......
df4daa3dd952cd04a679792532393496ca1ee6fd
eb7499a3413359eab53d407a4c68c9ffbf2fb0a3
......@@ -44,6 +44,10 @@
"net6.0": {
"targetAlias": "net6.0",
"dependencies": {
"CSRedisCore": {
"target": "Package",
"version": "[3.8.803, )"
},
"Google.Protobuf": {
"target": "Package",
"version": "[3.27.1, )"
......@@ -62,6 +66,10 @@
"target": "Package",
"version": "[2.64.0, )"
},
"StackExchange.Redis": {
"target": "Package",
"version": "[2.8.0, )"
},
"Steeltoe.Connector.ConnectorCore": {
"target": "Package",
"version": "[3.2.6, )"
......
This diff is collapsed.
{
"version": 2,
"dgSpecHash": "l+pS2V6wbE6JC8adlQKjzr3Zk8W+wRwZjOArO+p5ubaEPShZLA7LpF/MPiyJO+zV1p3fdpJPA1c9EfPg67uQFg==",
"dgSpecHash": "7nJeQF9PaNYmuatMaEICtBg4szZlecxtGBDyfjhVcDpqXKH3DY3oyz1ewYuFVWCXCMp7yOmZV4wnPQpkQX5pDQ==",
"success": true,
"projectFilePath": "D:\\HIAST\\FIY\\Project-MSGPriorityQ\\GrpcMessage\\message-priority-queue\\QueuerNode\\QueuerNode.csproj",
"expectedPackageFiles": [
"C:\\Users\\moham\\.nuget\\packages\\csrediscore\\3.8.803\\csrediscore.3.8.803.nupkg.sha512",
"C:\\Users\\moham\\.nuget\\packages\\google.protobuf\\3.27.1\\google.protobuf.3.27.1.nupkg.sha512",
"C:\\Users\\moham\\.nuget\\packages\\grpc.aspnetcore\\2.40.0\\grpc.aspnetcore.2.40.0.nupkg.sha512",
"C:\\Users\\moham\\.nuget\\packages\\grpc.aspnetcore.server\\2.40.0\\grpc.aspnetcore.server.2.40.0.nupkg.sha512",
......@@ -44,6 +45,9 @@
"C:\\Users\\moham\\.nuget\\packages\\microsoft.extensions.options.configurationextensions\\6.0.0\\microsoft.extensions.options.configurationextensions.6.0.0.nupkg.sha512",
"C:\\Users\\moham\\.nuget\\packages\\microsoft.extensions.primitives\\6.0.0\\microsoft.extensions.primitives.6.0.0.nupkg.sha512",
"C:\\Users\\moham\\.nuget\\packages\\microsoft.openapi\\1.2.3\\microsoft.openapi.1.2.3.nupkg.sha512",
"C:\\Users\\moham\\.nuget\\packages\\newtonsoft.json\\13.0.1\\newtonsoft.json.13.0.1.nupkg.sha512",
"C:\\Users\\moham\\.nuget\\packages\\pipelines.sockets.unofficial\\2.2.8\\pipelines.sockets.unofficial.2.2.8.nupkg.sha512",
"C:\\Users\\moham\\.nuget\\packages\\stackexchange.redis\\2.8.0\\stackexchange.redis.2.8.0.nupkg.sha512",
"C:\\Users\\moham\\.nuget\\packages\\steeltoe.common\\3.2.6\\steeltoe.common.3.2.6.nupkg.sha512",
"C:\\Users\\moham\\.nuget\\packages\\steeltoe.common.abstractions\\3.2.6\\steeltoe.common.abstractions.3.2.6.nupkg.sha512",
"C:\\Users\\moham\\.nuget\\packages\\steeltoe.common.http\\3.2.6\\steeltoe.common.http.3.2.6.nupkg.sha512",
......@@ -60,11 +64,13 @@
"C:\\Users\\moham\\.nuget\\packages\\swashbuckle.aspnetcore.swaggerui\\6.4.0\\swashbuckle.aspnetcore.swaggerui.6.4.0.nupkg.sha512",
"C:\\Users\\moham\\.nuget\\packages\\system.diagnostics.diagnosticsource\\6.0.0\\system.diagnostics.diagnosticsource.6.0.0.nupkg.sha512",
"C:\\Users\\moham\\.nuget\\packages\\system.diagnostics.eventlog\\6.0.0\\system.diagnostics.eventlog.6.0.0.nupkg.sha512",
"C:\\Users\\moham\\.nuget\\packages\\system.io.pipelines\\5.0.1\\system.io.pipelines.5.0.1.nupkg.sha512",
"C:\\Users\\moham\\.nuget\\packages\\system.net.http.json\\3.2.1\\system.net.http.json.3.2.1.nupkg.sha512",
"C:\\Users\\moham\\.nuget\\packages\\system.reflection.metadataloadcontext\\4.6.0\\system.reflection.metadataloadcontext.4.6.0.nupkg.sha512",
"C:\\Users\\moham\\.nuget\\packages\\system.runtime.compilerservices.unsafe\\6.0.0\\system.runtime.compilerservices.unsafe.6.0.0.nupkg.sha512",
"C:\\Users\\moham\\.nuget\\packages\\system.text.encodings.web\\6.0.0\\system.text.encodings.web.6.0.0.nupkg.sha512",
"C:\\Users\\moham\\.nuget\\packages\\system.text.json\\6.0.0\\system.text.json.6.0.0.nupkg.sha512"
"C:\\Users\\moham\\.nuget\\packages\\system.text.json\\6.0.0\\system.text.json.6.0.0.nupkg.sha512",
"C:\\Users\\moham\\.nuget\\packages\\system.valuetuple\\4.5.0\\system.valuetuple.4.5.0.nupkg.sha512"
],
"logs": []
}
\ No newline at end of file
......@@ -17,7 +17,7 @@ namespace Validator.Services
return Task.FromResult(new Reply
{
ReplyCode = "OK ok 200 + validated !! ",
AccountPriority = 5
AccountPriority = 4
}) ;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment