Commit 0310c291 authored by mohammad.salama's avatar mohammad.salama

Queuer Node Accepts GRPC+HTTP Correctly and Send Messages to streams in redis

parent f580fa93
...@@ -35,7 +35,7 @@ namespace GrpcMessageNode.DataBaseAccess ...@@ -35,7 +35,7 @@ namespace GrpcMessageNode.DataBaseAccess
using var channel = GrpcChannel.ForAddress(validatorAddress); using var channel = GrpcChannel.ForAddress(validatorAddress);
var client = new Validate.ValidateClient(channel); var client = new Validate.ValidateClient(channel);
MessageMetaData messageMeta = extractMetaData(message); MessageMetaData messageMeta = extractMetaData(message);
//Console.WriteLine("Calling GRPC for address = " + validatorAddress); Console.WriteLine("Calling GRPC for address = " + validatorAddress);
var reply = client.ValidateMessageAsync(messageMeta); var reply = client.ValidateMessageAsync(messageMeta);
var ans = reply.GetAwaiter().GetResult(); var ans = reply.GetAwaiter().GetResult();
......
...@@ -33,19 +33,16 @@ namespace GrpcMessageNode.Services ...@@ -33,19 +33,16 @@ namespace GrpcMessageNode.Services
}); });
} }
//Console.WriteLine("new Pr = " + message.LocalPriority); Console.WriteLine("new Pr = " + message.LocalPriority);
//Console.WriteLine("Account Checker Passed "); //Console.WriteLine("Account Checker Passed ");
string reply = sendToCoordinator(message); Acknowledgement reply = sendToCoordinator(message);
return Task.FromResult(new Acknowledgement return Task.FromResult(new Acknowledgement(reply));
{
ReplyCode = reply //"OK on Send " + message.MsgId
});
} }
private string sendToCoordinator(Message message) private Acknowledgement sendToCoordinator(Message message)
{ {
string address = getCoordinatorAddress(); string address = getCoordinatorAddress();
Message2 message2 = copyMessage(message); Message2 message2 = copyMessage(message);
...@@ -58,7 +55,7 @@ namespace GrpcMessageNode.Services ...@@ -58,7 +55,7 @@ namespace GrpcMessageNode.Services
// Console.WriteLine(reply.ReplyCode); // Console.WriteLine(reply.ReplyCode);
return reply.ReplyCode; return new Acknowledgement() { ReplyCode = reply.ReplyCode , RequestID = reply.RequestID};
} }
//not working -- causing unknown exception with nullable parameter http2 //not working -- causing unknown exception with nullable parameter http2
......
...@@ -43,3 +43,6 @@ ...@@ -43,3 +43,6 @@
2.0 2.0
2.0 2.0
2.0 2.0
2.0
2.0
2.0
...@@ -13,7 +13,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "HTTPMessageNode", "HTTPMess ...@@ -13,7 +13,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "HTTPMessageNode", "HTTPMess
EndProject EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "HTTPMessageGenerator", "HTTPMessageGenerator\HTTPMessageGenerator.csproj", "{C008AEEB-FD9F-4D59-B047-A77BD058C12F}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "HTTPMessageGenerator", "HTTPMessageGenerator\HTTPMessageGenerator.csproj", "{C008AEEB-FD9F-4D59-B047-A77BD058C12F}"
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Validator", "Validator\Validator.csproj", "{D2C3ADDD-BA47-473C-A761-1CE5B1EE6407}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Validator", "Validator\Validator.csproj", "{D2C3ADDD-BA47-473C-A761-1CE5B1EE6407}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SimpleStreamConsumerTest", "SimpleStreamConsumerTest\SimpleStreamConsumerTest.csproj", "{8F51B680-F3B6-4892-B854-2E2AF840CD71}"
EndProject EndProject
Global Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution GlobalSection(SolutionConfigurationPlatforms) = preSolution
...@@ -45,6 +47,10 @@ Global ...@@ -45,6 +47,10 @@ Global
{D2C3ADDD-BA47-473C-A761-1CE5B1EE6407}.Debug|Any CPU.Build.0 = Debug|Any CPU {D2C3ADDD-BA47-473C-A761-1CE5B1EE6407}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D2C3ADDD-BA47-473C-A761-1CE5B1EE6407}.Release|Any CPU.ActiveCfg = Release|Any CPU {D2C3ADDD-BA47-473C-A761-1CE5B1EE6407}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D2C3ADDD-BA47-473C-A761-1CE5B1EE6407}.Release|Any CPU.Build.0 = Release|Any CPU {D2C3ADDD-BA47-473C-A761-1CE5B1EE6407}.Release|Any CPU.Build.0 = Release|Any CPU
{8F51B680-F3B6-4892-B854-2E2AF840CD71}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{8F51B680-F3B6-4892-B854-2E2AF840CD71}.Debug|Any CPU.Build.0 = Debug|Any CPU
{8F51B680-F3B6-4892-B854-2E2AF840CD71}.Release|Any CPU.ActiveCfg = Release|Any CPU
{8F51B680-F3B6-4892-B854-2E2AF840CD71}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection EndGlobalSection
GlobalSection(SolutionProperties) = preSolution GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE HideSolutionNode = FALSE
......
...@@ -9,3 +9,4 @@ ...@@ -9,3 +9,4 @@
2.0 2.0
2.0 2.0
2.0 2.0
2.0
...@@ -7,10 +7,6 @@ ...@@ -7,10 +7,6 @@
<UserSecretsId>dotnet-MessageGeneratorGRPC-685F4B88-9109-41BE-B200-D1916D3E7EAB</UserSecretsId> <UserSecretsId>dotnet-MessageGeneratorGRPC-685F4B88-9109-41BE-B200-D1916D3E7EAB</UserSecretsId>
</PropertyGroup> </PropertyGroup>
<ItemGroup>
<None Remove="Protos\schemaQ.proto" />
</ItemGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.Extensions.Hosting" Version="6.0.0" /> <PackageReference Include="Microsoft.Extensions.Hosting" Version="6.0.0" />
</ItemGroup> </ItemGroup>
...@@ -38,7 +34,6 @@ ...@@ -38,7 +34,6 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<Protobuf Include="Protos\schemaQ.proto" GrpcServices="Client" />
<Protobuf Include="Protos\schema.proto" GrpcServices="Client" /> <Protobuf Include="Protos\schema.proto" GrpcServices="Client" />
</ItemGroup> </ItemGroup>
......
syntax = "proto3";
option csharp_namespace = "MessageGeneratorGRPC";
package Tranmitter;
message Message2 {
string clientID = 1;
string apiKey = 2;
string msgId = 3;
string phoneNumber = 4;
int32 localPriority = 5;
string text = 6;
string tag = 7;
}
message Acknowledgement2
{
string replyCode = 1;
string requestID = 2;
}
service Queue {
rpc QueueMessage(Message2) returns (Acknowledgement2);
}
...@@ -33,7 +33,7 @@ namespace MessageGeneratorGRPC ...@@ -33,7 +33,7 @@ namespace MessageGeneratorGRPC
var reply = client.SendMessage(message); var reply = client.SendMessage(message);
Console.WriteLine(reply.ReplyCode); Console.WriteLine(reply.ReplyCode + "\n"+reply.RequestID);
Console.WriteLine("Press any key to exit..."); Console.WriteLine("Press any key to exit...");
Console.ReadKey(); Console.ReadKey();
...@@ -51,26 +51,5 @@ namespace MessageGeneratorGRPC ...@@ -51,26 +51,5 @@ namespace MessageGeneratorGRPC
return address; return address;
} }
public void test()
{
string address = "https://localhost:9090"; //getCoordinatorAddress();
using var channel = GrpcChannel.ForAddress(address);
Console.WriteLine("QueuerNode Address = " + address);
var client = new Queue.QueueClient(channel);
Message2 message = new Message2();
message.Text = "Hello World !";
message.ApiKey = "Api-Key";
message.ClientID = "m-salameh";
message.LocalPriority = 1;
message.MsgId = "msg-id=1";
message.PhoneNumber = "043 33 00 83";
Console.WriteLine("Sending to " + address);
var reply = client.QueueMessage(message);
Console.WriteLine(reply.ReplyCode);
}
} }
} }
\ No newline at end of file
1adcf206652457d8a65bbead798dedc4b7da29b0 d9da9c7acde4daeadcd4d04a0d1ede75f20f0917
...@@ -11,12 +11,13 @@ namespace QueuerNode.RedisQueuer ...@@ -11,12 +11,13 @@ namespace QueuerNode.RedisQueuer
{ {
public class MessageQueues public class MessageQueues
{ {
private static string SYRRedisURL = "localhost:48484"; private static string SYRRedisURL = "localhost:6379";
private static string MTNRedisURL = "localhost:48485"; private static string MTNRedisURL = "localhost:48485";
private static string Syriatel = "SYR"; private static string Syriatel = "SYR";
private static string MTN = "MTN"; private static string MTN = "MTN";
public static string addMessage(Message message , IDiscoveryClient discoveryClient) public static string addMessage(Message message , IDiscoveryClient discoveryClient)
{ {
string id = "Error"; string id = "Error";
...@@ -25,15 +26,14 @@ namespace QueuerNode.RedisQueuer ...@@ -25,15 +26,14 @@ namespace QueuerNode.RedisQueuer
{ {
// get url using discovery client // get url using discovery client
var resid = addMessageRedisAsync(message, SYRRedisURL); var resid = addMessageRedisAsync(message, SYRRedisURL);
Console.WriteLine(resid);
id = message.Tag + ":" + message.LocalPriority + ":" + resid; id = message.Tag + ":" + message.LocalPriority + ":" + resid.Result;
} }
else if (message.Tag.Contains(MTN, StringComparison.OrdinalIgnoreCase)) else if (message.Tag.Contains(MTN, StringComparison.OrdinalIgnoreCase))
{ {
var resid = addMessageRedisAsync(message, MTNRedisURL); var resid = addMessageRedisAsync(message, MTNRedisURL);
Console.WriteLine(resid);
id = message.Tag + ":" + message.LocalPriority + ":" + resid.Result;
id = message.Tag + ":" + message.LocalPriority + ":" + resid;
} }
return id; return id;
...@@ -42,17 +42,28 @@ namespace QueuerNode.RedisQueuer ...@@ -42,17 +42,28 @@ namespace QueuerNode.RedisQueuer
private static async Task<string> addMessageRedisAsync(Message message, string URL) private static async Task<string> addMessageRedisAsync(Message message, string URL)
{ {
/*var redis = ConnectionMultiplexer.Connect(URL); var redis = ConnectionMultiplexer.Connect(URL);
string streamName = message.LocalPriority.ToString(); string streamName = message.LocalPriority.ToString();
Console.WriteLine("stream name = " + streamName);
var db = redis.GetDatabase(); var db = redis.GetDatabase();
var serializedMessage = JsonConvert.SerializeObject(message); var serializedMessage = JsonConvert.SerializeObject(message);
var messageId = await db.StreamAddAsync(streamName, new NameValueEntry[] { }, serializedMessage); Console.WriteLine("Sending to stream : " + streamName);
*/
var messageId = "YES"; //var messageId = await db.StreamAddAsync(streamName, new NameValueEntry[] { }, serializedMessage);
var messageId = await db.StreamAddAsync(streamName,new NameValueEntry[]
{ new("tag", "SYR"), new NameValueEntry("message", serializedMessage) });
Console.WriteLine("Done Sending to stream : " + streamName);
Console.WriteLine("Stream msg id = " + messageId);
//var messageId = "YES";
return messageId.ToString(); return messageId.ToString();
} }
} }
......
...@@ -8,10 +8,12 @@ ...@@ -8,10 +8,12 @@
".NETCoreApp,Version=v6.0": { ".NETCoreApp,Version=v6.0": {
"QueuerNode/1.0.0": { "QueuerNode/1.0.0": {
"dependencies": { "dependencies": {
"CSRedisCore": "3.8.803",
"Google.Protobuf": "3.27.1", "Google.Protobuf": "3.27.1",
"Grpc.AspNetCore": "2.40.0", "Grpc.AspNetCore": "2.40.0",
"Grpc.Net.Client": "2.63.0", "Grpc.Net.Client": "2.63.0",
"Grpc.Tools": "2.64.0", "Grpc.Tools": "2.64.0",
"StackExchange.Redis": "2.8.0",
"Steeltoe.Connector.ConnectorCore": "3.2.6", "Steeltoe.Connector.ConnectorCore": "3.2.6",
"Steeltoe.Discovery.Eureka": "3.2.6", "Steeltoe.Discovery.Eureka": "3.2.6",
"Swashbuckle.AspNetCore": "6.4.0" "Swashbuckle.AspNetCore": "6.4.0"
...@@ -20,6 +22,18 @@ ...@@ -20,6 +22,18 @@
"QueuerNode.dll": {} "QueuerNode.dll": {}
} }
}, },
"CSRedisCore/3.8.803": {
"dependencies": {
"Newtonsoft.Json": "13.0.1",
"System.ValueTuple": "4.5.0"
},
"runtime": {
"lib/netstandard2.0/CSRedisCore.dll": {
"assemblyVersion": "3.8.803.0",
"fileVersion": "3.8.803.0"
}
}
},
"Google.Protobuf/3.27.1": { "Google.Protobuf/3.27.1": {
"runtime": { "runtime": {
"lib/net5.0/Google.Protobuf.dll": { "lib/net5.0/Google.Protobuf.dll": {
...@@ -316,6 +330,37 @@ ...@@ -316,6 +330,37 @@
} }
} }
}, },
"Newtonsoft.Json/13.0.1": {
"runtime": {
"lib/netstandard2.0/Newtonsoft.Json.dll": {
"assemblyVersion": "13.0.0.0",
"fileVersion": "13.0.1.25517"
}
}
},
"Pipelines.Sockets.Unofficial/2.2.8": {
"dependencies": {
"System.IO.Pipelines": "5.0.1"
},
"runtime": {
"lib/net5.0/Pipelines.Sockets.Unofficial.dll": {
"assemblyVersion": "1.0.0.0",
"fileVersion": "2.2.8.1080"
}
}
},
"StackExchange.Redis/2.8.0": {
"dependencies": {
"Microsoft.Extensions.Logging.Abstractions": "6.0.0",
"Pipelines.Sockets.Unofficial": "2.2.8"
},
"runtime": {
"lib/net6.0/StackExchange.Redis.dll": {
"assemblyVersion": "2.0.0.0",
"fileVersion": "2.8.0.27420"
}
}
},
"Steeltoe.Common/3.2.6": { "Steeltoe.Common/3.2.6": {
"dependencies": { "dependencies": {
"Microsoft.Extensions.Caching.Abstractions": "3.1.0", "Microsoft.Extensions.Caching.Abstractions": "3.1.0",
...@@ -498,6 +543,7 @@ ...@@ -498,6 +543,7 @@
} }
}, },
"System.Diagnostics.EventLog/6.0.0": {}, "System.Diagnostics.EventLog/6.0.0": {},
"System.IO.Pipelines/5.0.1": {},
"System.Net.Http.Json/3.2.1": { "System.Net.Http.Json/3.2.1": {
"dependencies": { "dependencies": {
"System.Text.Json": "6.0.0" "System.Text.Json": "6.0.0"
...@@ -522,7 +568,8 @@ ...@@ -522,7 +568,8 @@
"System.Runtime.CompilerServices.Unsafe": "6.0.0", "System.Runtime.CompilerServices.Unsafe": "6.0.0",
"System.Text.Encodings.Web": "6.0.0" "System.Text.Encodings.Web": "6.0.0"
} }
} },
"System.ValueTuple/4.5.0": {}
} }
}, },
"libraries": { "libraries": {
...@@ -531,6 +578,13 @@ ...@@ -531,6 +578,13 @@
"serviceable": false, "serviceable": false,
"sha512": "" "sha512": ""
}, },
"CSRedisCore/3.8.803": {
"type": "package",
"serviceable": true,
"sha512": "sha512-+tCmvsJy0f69WMARgRT9nmTtIiwJDkTl9g5H32r7mJ003IxeZKFhLkvjO6MIm/6o79wM+s0lLW4fYlBvWP8C8Q==",
"path": "csrediscore/3.8.803",
"hashPath": "csrediscore.3.8.803.nupkg.sha512"
},
"Google.Protobuf/3.27.1": { "Google.Protobuf/3.27.1": {
"type": "package", "type": "package",
"serviceable": true, "serviceable": true,
...@@ -811,6 +865,27 @@ ...@@ -811,6 +865,27 @@
"path": "microsoft.openapi/1.2.3", "path": "microsoft.openapi/1.2.3",
"hashPath": "microsoft.openapi.1.2.3.nupkg.sha512" "hashPath": "microsoft.openapi.1.2.3.nupkg.sha512"
}, },
"Newtonsoft.Json/13.0.1": {
"type": "package",
"serviceable": true,
"sha512": "sha512-ppPFpBcvxdsfUonNcvITKqLl3bqxWbDCZIzDWHzjpdAHRFfZe0Dw9HmA0+za13IdyrgJwpkDTDA9fHaxOrt20A==",
"path": "newtonsoft.json/13.0.1",
"hashPath": "newtonsoft.json.13.0.1.nupkg.sha512"
},
"Pipelines.Sockets.Unofficial/2.2.8": {
"type": "package",
"serviceable": true,
"sha512": "sha512-zG2FApP5zxSx6OcdJQLbZDk2AVlN2BNQD6MorwIfV6gVj0RRxWPEp2LXAxqDGZqeNV1Zp0BNPcNaey/GXmTdvQ==",
"path": "pipelines.sockets.unofficial/2.2.8",
"hashPath": "pipelines.sockets.unofficial.2.2.8.nupkg.sha512"
},
"StackExchange.Redis/2.8.0": {
"type": "package",
"serviceable": true,
"sha512": "sha512-MjAJ0ejH8zLhtuN5+Z+/I07NmPGdVuGEvE2+4xONQoFwgl+7vbQ/A6jlUgH9UkZb4s9Mu9QDyBq1TkRqQcOgTQ==",
"path": "stackexchange.redis/2.8.0",
"hashPath": "stackexchange.redis.2.8.0.nupkg.sha512"
},
"Steeltoe.Common/3.2.6": { "Steeltoe.Common/3.2.6": {
"type": "package", "type": "package",
"serviceable": true, "serviceable": true,
...@@ -923,6 +998,13 @@ ...@@ -923,6 +998,13 @@
"path": "system.diagnostics.eventlog/6.0.0", "path": "system.diagnostics.eventlog/6.0.0",
"hashPath": "system.diagnostics.eventlog.6.0.0.nupkg.sha512" "hashPath": "system.diagnostics.eventlog.6.0.0.nupkg.sha512"
}, },
"System.IO.Pipelines/5.0.1": {
"type": "package",
"serviceable": true,
"sha512": "sha512-qEePWsaq9LoEEIqhbGe6D5J8c9IqQOUuTzzV6wn1POlfdLkJliZY3OlB0j0f17uMWlqZYjH7txj+2YbyrIA8Yg==",
"path": "system.io.pipelines/5.0.1",
"hashPath": "system.io.pipelines.5.0.1.nupkg.sha512"
},
"System.Net.Http.Json/3.2.1": { "System.Net.Http.Json/3.2.1": {
"type": "package", "type": "package",
"serviceable": true, "serviceable": true,
...@@ -957,6 +1039,13 @@ ...@@ -957,6 +1039,13 @@
"sha512": "sha512-zaJsHfESQvJ11vbXnNlkrR46IaMULk/gHxYsJphzSF+07kTjPHv+Oc14w6QEOfo3Q4hqLJgStUaYB9DBl0TmWg==", "sha512": "sha512-zaJsHfESQvJ11vbXnNlkrR46IaMULk/gHxYsJphzSF+07kTjPHv+Oc14w6QEOfo3Q4hqLJgStUaYB9DBl0TmWg==",
"path": "system.text.json/6.0.0", "path": "system.text.json/6.0.0",
"hashPath": "system.text.json.6.0.0.nupkg.sha512" "hashPath": "system.text.json.6.0.0.nupkg.sha512"
},
"System.ValueTuple/4.5.0": {
"type": "package",
"serviceable": true,
"sha512": "sha512-okurQJO6NRE/apDIP23ajJ0hpiNmJ+f0BwOlB/cSqTLQlw5upkf+5+96+iG2Jw40G1fCVCyPz/FhIABUjMR+RQ==",
"path": "system.valuetuple/4.5.0",
"hashPath": "system.valuetuple.4.5.0.nupkg.sha512"
} }
} }
} }
\ No newline at end of file
eb7499a3413359eab53d407a4c68c9ffbf2fb0a3 3ce302a8ec0f15a09a7724ecf0de1acbf1214a96
...@@ -88,3 +88,7 @@ D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\QueuerNode\ ...@@ -88,3 +88,7 @@ D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\QueuerNode\
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\QueuerNode\obj\Debug\net6.0\ref\QueuerNode.dll D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\QueuerNode\obj\Debug\net6.0\ref\QueuerNode.dll
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\QueuerNode\obj\Debug\net6.0\QueuerNode.pdb D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\QueuerNode\obj\Debug\net6.0\QueuerNode.pdb
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\QueuerNode\obj\Debug\net6.0\QueuerNode.genruntimeconfig.cache D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\QueuerNode\obj\Debug\net6.0\QueuerNode.genruntimeconfig.cache
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\QueuerNode\bin\Debug\net6.0\CSRedisCore.dll
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\QueuerNode\bin\Debug\net6.0\Newtonsoft.Json.dll
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\QueuerNode\bin\Debug\net6.0\Pipelines.Sockets.Unofficial.dll
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\QueuerNode\bin\Debug\net6.0\StackExchange.Redis.dll
...@@ -18,3 +18,10 @@ ...@@ -18,3 +18,10 @@
2.0 2.0
2.0 2.0
2.0 2.0
2.0
2.0
2.0
2.0
2.0
2.0
2.0
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace SimpleStreamConsumerTest
{
public class MessageDTO
{
public string? clientID { get; set; }
public string? apiKey { get; set; }
public string? msgId { get; set; }
public string? phoneNumber { get; set; }
public int localPriority { get; set; }
public string? text { get; set; }
public string? tag { get; set; }
}
}
using CSRedis;
using SimpleStreamConsumerTest;
using StackExchange.Redis;
using Newtonsoft.Json;
var tokenSource = new CancellationTokenSource();
var token = tokenSource.Token;
var muxer = ConnectionMultiplexer.Connect("localhost");
var db = muxer.GetDatabase();
const string streamName = "5";
Dictionary<string, string> ParseResult(StreamEntry entry)
=> entry.Values.ToDictionary(x => x.Name.ToString(), x => x.Value.ToString());
int read = 0;
var readTask = Task.Run(async () =>
{
string id = string.Empty;
while (!token.IsCancellationRequested)
{
var messages = await db.StreamRangeAsync(streamName, "-", "+", 1, Order.Descending);
foreach (var entry in messages)
{
// Get the message ID
var messageId = entry.Id;
// Access the message data (serialized JSON)
string? serializedMessage = entry.Values.ToString();
if (serializedMessage == null) continue;
// Deserialize the JSON back to a Message object (if needed)
MessageDTO? message = JsonConvert.DeserializeObject<MessageDTO>(serializedMessage);
if (message == null) continue;
// Process the message data (message.Text, message.Timestamp, etc.)
Console.WriteLine($"Message ID: {messageId}, Text: {message.msgId}, tag: {message.tag}");
}
await Task.Delay(1000);
}
});
tokenSource.CancelAfter(TimeSpan.FromSeconds(20));
await Task.WhenAll(readTask);
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="CSRedisCore" Version="3.8.803" />
<PackageReference Include="StackExchange.Redis" Version="2.8.0" />
</ItemGroup>
</Project>
{
"runtimeTarget": {
"name": ".NETCoreApp,Version=v6.0",
"signature": ""
},
"compilationOptions": {},
"targets": {
".NETCoreApp,Version=v6.0": {
"SimpleStreamConsumerTest/1.0.0": {
"dependencies": {
"CSRedisCore": "3.8.803",
"StackExchange.Redis": "2.8.0"
},
"runtime": {
"SimpleStreamConsumerTest.dll": {}
}
},
"CSRedisCore/3.8.803": {
"dependencies": {
"Newtonsoft.Json": "13.0.1",
"System.ValueTuple": "4.5.0"
},
"runtime": {
"lib/netstandard2.0/CSRedisCore.dll": {
"assemblyVersion": "3.8.803.0",
"fileVersion": "3.8.803.0"
}
}
},
"Microsoft.Extensions.Logging.Abstractions/6.0.0": {
"runtime": {
"lib/net6.0/Microsoft.Extensions.Logging.Abstractions.dll": {
"assemblyVersion": "6.0.0.0",
"fileVersion": "6.0.21.52210"
}
}
},
"Newtonsoft.Json/13.0.1": {
"runtime": {
"lib/netstandard2.0/Newtonsoft.Json.dll": {
"assemblyVersion": "13.0.0.0",
"fileVersion": "13.0.1.25517"
}
}
},
"Pipelines.Sockets.Unofficial/2.2.8": {
"dependencies": {
"System.IO.Pipelines": "5.0.1"
},
"runtime": {
"lib/net5.0/Pipelines.Sockets.Unofficial.dll": {
"assemblyVersion": "1.0.0.0",
"fileVersion": "2.2.8.1080"
}
}
},
"StackExchange.Redis/2.8.0": {
"dependencies": {
"Microsoft.Extensions.Logging.Abstractions": "6.0.0",
"Pipelines.Sockets.Unofficial": "2.2.8"
},
"runtime": {
"lib/net6.0/StackExchange.Redis.dll": {
"assemblyVersion": "2.0.0.0",
"fileVersion": "2.8.0.27420"
}
}
},
"System.IO.Pipelines/5.0.1": {
"runtime": {
"lib/netcoreapp3.0/System.IO.Pipelines.dll": {
"assemblyVersion": "5.0.0.1",
"fileVersion": "5.0.120.57516"
}
}
},
"System.ValueTuple/4.5.0": {}
}
},
"libraries": {
"SimpleStreamConsumerTest/1.0.0": {
"type": "project",
"serviceable": false,
"sha512": ""
},
"CSRedisCore/3.8.803": {
"type": "package",
"serviceable": true,
"sha512": "sha512-+tCmvsJy0f69WMARgRT9nmTtIiwJDkTl9g5H32r7mJ003IxeZKFhLkvjO6MIm/6o79wM+s0lLW4fYlBvWP8C8Q==",
"path": "csrediscore/3.8.803",
"hashPath": "csrediscore.3.8.803.nupkg.sha512"
},
"Microsoft.Extensions.Logging.Abstractions/6.0.0": {
"type": "package",
"serviceable": true,
"sha512": "sha512-/HggWBbTwy8TgebGSX5DBZ24ndhzi93sHUBDvP1IxbZD7FDokYzdAr6+vbWGjw2XAfR2EJ1sfKUotpjHnFWPxA==",
"path": "microsoft.extensions.logging.abstractions/6.0.0",
"hashPath": "microsoft.extensions.logging.abstractions.6.0.0.nupkg.sha512"
},
"Newtonsoft.Json/13.0.1": {
"type": "package",
"serviceable": true,
"sha512": "sha512-ppPFpBcvxdsfUonNcvITKqLl3bqxWbDCZIzDWHzjpdAHRFfZe0Dw9HmA0+za13IdyrgJwpkDTDA9fHaxOrt20A==",
"path": "newtonsoft.json/13.0.1",
"hashPath": "newtonsoft.json.13.0.1.nupkg.sha512"
},
"Pipelines.Sockets.Unofficial/2.2.8": {
"type": "package",
"serviceable": true,
"sha512": "sha512-zG2FApP5zxSx6OcdJQLbZDk2AVlN2BNQD6MorwIfV6gVj0RRxWPEp2LXAxqDGZqeNV1Zp0BNPcNaey/GXmTdvQ==",
"path": "pipelines.sockets.unofficial/2.2.8",
"hashPath": "pipelines.sockets.unofficial.2.2.8.nupkg.sha512"
},
"StackExchange.Redis/2.8.0": {
"type": "package",
"serviceable": true,
"sha512": "sha512-MjAJ0ejH8zLhtuN5+Z+/I07NmPGdVuGEvE2+4xONQoFwgl+7vbQ/A6jlUgH9UkZb4s9Mu9QDyBq1TkRqQcOgTQ==",
"path": "stackexchange.redis/2.8.0",
"hashPath": "stackexchange.redis.2.8.0.nupkg.sha512"
},
"System.IO.Pipelines/5.0.1": {
"type": "package",
"serviceable": true,
"sha512": "sha512-qEePWsaq9LoEEIqhbGe6D5J8c9IqQOUuTzzV6wn1POlfdLkJliZY3OlB0j0f17uMWlqZYjH7txj+2YbyrIA8Yg==",
"path": "system.io.pipelines/5.0.1",
"hashPath": "system.io.pipelines.5.0.1.nupkg.sha512"
},
"System.ValueTuple/4.5.0": {
"type": "package",
"serviceable": true,
"sha512": "sha512-okurQJO6NRE/apDIP23ajJ0hpiNmJ+f0BwOlB/cSqTLQlw5upkf+5+96+iG2Jw40G1fCVCyPz/FhIABUjMR+RQ==",
"path": "system.valuetuple/4.5.0",
"hashPath": "system.valuetuple.4.5.0.nupkg.sha512"
}
}
}
\ No newline at end of file
{
"runtimeOptions": {
"tfm": "net6.0",
"framework": {
"name": "Microsoft.NETCore.App",
"version": "6.0.0"
}
}
}
\ No newline at end of file
// <autogenerated />
using System;
using System.Reflection;
[assembly: global::System.Runtime.Versioning.TargetFrameworkAttribute(".NETCoreApp,Version=v6.0", FrameworkDisplayName = "")]
//------------------------------------------------------------------------------
// <auto-generated>
// This code was generated by a tool.
// Runtime Version:4.0.30319.42000
//
// Changes to this file may cause incorrect behavior and will be lost if
// the code is regenerated.
// </auto-generated>
//------------------------------------------------------------------------------
using System;
using System.Reflection;
[assembly: System.Reflection.AssemblyCompanyAttribute("SimpleStreamConsumerTest")]
[assembly: System.Reflection.AssemblyConfigurationAttribute("Debug")]
[assembly: System.Reflection.AssemblyFileVersionAttribute("1.0.0.0")]
[assembly: System.Reflection.AssemblyInformationalVersionAttribute("1.0.0")]
[assembly: System.Reflection.AssemblyProductAttribute("SimpleStreamConsumerTest")]
[assembly: System.Reflection.AssemblyTitleAttribute("SimpleStreamConsumerTest")]
[assembly: System.Reflection.AssemblyVersionAttribute("1.0.0.0")]
// Generated by the MSBuild WriteCodeFragment class.
is_global = true
build_property.TargetFramework = net6.0
build_property.TargetPlatformMinVersion =
build_property.UsingMicrosoftNETSdkWeb =
build_property.ProjectTypeGuids =
build_property.InvariantGlobalization =
build_property.PlatformNeutralAssembly =
build_property._SupportedPlatformList = Linux,macOS,Windows
build_property.RootNamespace = SimpleStreamConsumerTest
build_property.ProjectDir = D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\SimpleStreamConsumerTest\
// <auto-generated/>
global using global::System;
global using global::System.Collections.Generic;
global using global::System.IO;
global using global::System.Linq;
global using global::System.Net.Http;
global using global::System.Threading;
global using global::System.Threading.Tasks;
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\SimpleStreamConsumerTest\bin\Debug\net6.0\SimpleStreamConsumerTest.exe
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\SimpleStreamConsumerTest\bin\Debug\net6.0\SimpleStreamConsumerTest.deps.json
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\SimpleStreamConsumerTest\bin\Debug\net6.0\SimpleStreamConsumerTest.runtimeconfig.json
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\SimpleStreamConsumerTest\bin\Debug\net6.0\SimpleStreamConsumerTest.dll
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\SimpleStreamConsumerTest\bin\Debug\net6.0\ref\SimpleStreamConsumerTest.dll
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\SimpleStreamConsumerTest\bin\Debug\net6.0\SimpleStreamConsumerTest.pdb
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\SimpleStreamConsumerTest\bin\Debug\net6.0\CSRedisCore.dll
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\SimpleStreamConsumerTest\bin\Debug\net6.0\Microsoft.Extensions.Logging.Abstractions.dll
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\SimpleStreamConsumerTest\bin\Debug\net6.0\Newtonsoft.Json.dll
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\SimpleStreamConsumerTest\bin\Debug\net6.0\Pipelines.Sockets.Unofficial.dll
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\SimpleStreamConsumerTest\bin\Debug\net6.0\StackExchange.Redis.dll
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\SimpleStreamConsumerTest\bin\Debug\net6.0\System.IO.Pipelines.dll
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\SimpleStreamConsumerTest\obj\Debug\net6.0\SimpleStreamConsumerTest.csproj.AssemblyReference.cache
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\SimpleStreamConsumerTest\obj\Debug\net6.0\SimpleStreamConsumerTest.GeneratedMSBuildEditorConfig.editorconfig
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\SimpleStreamConsumerTest\obj\Debug\net6.0\SimpleStreamConsumerTest.AssemblyInfoInputs.cache
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\SimpleStreamConsumerTest\obj\Debug\net6.0\SimpleStreamConsumerTest.AssemblyInfo.cs
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\SimpleStreamConsumerTest\obj\Debug\net6.0\SimpleStreamConsumerTest.csproj.CoreCompileInputs.cache
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\SimpleStreamConsumerTest\obj\Debug\net6.0\SimpleStreamConsumerTest.csproj.CopyComplete
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\SimpleStreamConsumerTest\obj\Debug\net6.0\SimpleStreamConsumerTest.dll
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\SimpleStreamConsumerTest\obj\Debug\net6.0\ref\SimpleStreamConsumerTest.dll
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\SimpleStreamConsumerTest\obj\Debug\net6.0\SimpleStreamConsumerTest.pdb
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\SimpleStreamConsumerTest\obj\Debug\net6.0\SimpleStreamConsumerTest.genruntimeconfig.cache
{
"format": 1,
"restore": {
"D:\\HIAST\\FIY\\Project-MSGPriorityQ\\GrpcMessage\\message-priority-queue\\SimpleStreamConsumerTest\\SimpleStreamConsumerTest.csproj": {}
},
"projects": {
"D:\\HIAST\\FIY\\Project-MSGPriorityQ\\GrpcMessage\\message-priority-queue\\SimpleStreamConsumerTest\\SimpleStreamConsumerTest.csproj": {
"version": "1.0.0",
"restore": {
"projectUniqueName": "D:\\HIAST\\FIY\\Project-MSGPriorityQ\\GrpcMessage\\message-priority-queue\\SimpleStreamConsumerTest\\SimpleStreamConsumerTest.csproj",
"projectName": "SimpleStreamConsumerTest",
"projectPath": "D:\\HIAST\\FIY\\Project-MSGPriorityQ\\GrpcMessage\\message-priority-queue\\SimpleStreamConsumerTest\\SimpleStreamConsumerTest.csproj",
"packagesPath": "C:\\Users\\moham\\.nuget\\packages\\",
"outputPath": "D:\\HIAST\\FIY\\Project-MSGPriorityQ\\GrpcMessage\\message-priority-queue\\SimpleStreamConsumerTest\\obj\\",
"projectStyle": "PackageReference",
"fallbackFolders": [
"C:\\Program Files (x86)\\Microsoft Visual Studio\\Shared\\NuGetPackages"
],
"configFilePaths": [
"C:\\Users\\moham\\AppData\\Roaming\\NuGet\\NuGet.Config",
"C:\\Program Files (x86)\\NuGet\\Config\\Microsoft.VisualStudio.FallbackLocation.config",
"C:\\Program Files (x86)\\NuGet\\Config\\Microsoft.VisualStudio.Offline.config"
],
"originalTargetFrameworks": [
"net6.0"
],
"sources": {
"C:\\Program Files (x86)\\Microsoft SDKs\\NuGetPackages\\": {},
"https://api.nuget.org/v3/index.json": {}
},
"frameworks": {
"net6.0": {
"targetAlias": "net6.0",
"projectReferences": {}
}
},
"warningProperties": {
"warnAsError": [
"NU1605"
]
}
},
"frameworks": {
"net6.0": {
"targetAlias": "net6.0",
"dependencies": {
"CSRedisCore": {
"target": "Package",
"version": "[3.8.803, )"
},
"StackExchange.Redis": {
"target": "Package",
"version": "[2.8.0, )"
}
},
"imports": [
"net461",
"net462",
"net47",
"net471",
"net472",
"net48"
],
"assetTargetFallback": true,
"warn": true,
"frameworkReferences": {
"Microsoft.NETCore.App": {
"privateAssets": "all"
}
},
"runtimeIdentifierGraphPath": "C:\\Program Files\\dotnet\\sdk\\6.0.100\\RuntimeIdentifierGraph.json"
}
}
}
}
}
\ No newline at end of file
<?xml version="1.0" encoding="utf-8" standalone="no"?>
<Project ToolsVersion="14.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup Condition=" '$(ExcludeRestorePackageImports)' != 'true' ">
<RestoreSuccess Condition=" '$(RestoreSuccess)' == '' ">True</RestoreSuccess>
<RestoreTool Condition=" '$(RestoreTool)' == '' ">NuGet</RestoreTool>
<ProjectAssetsFile Condition=" '$(ProjectAssetsFile)' == '' ">$(MSBuildThisFileDirectory)project.assets.json</ProjectAssetsFile>
<NuGetPackageRoot Condition=" '$(NuGetPackageRoot)' == '' ">$(UserProfile)\.nuget\packages\</NuGetPackageRoot>
<NuGetPackageFolders Condition=" '$(NuGetPackageFolders)' == '' ">C:\Users\moham\.nuget\packages\;C:\Program Files (x86)\Microsoft Visual Studio\Shared\NuGetPackages</NuGetPackageFolders>
<NuGetProjectStyle Condition=" '$(NuGetProjectStyle)' == '' ">PackageReference</NuGetProjectStyle>
<NuGetToolVersion Condition=" '$(NuGetToolVersion)' == '' ">6.0.0</NuGetToolVersion>
</PropertyGroup>
<ItemGroup Condition=" '$(ExcludeRestorePackageImports)' != 'true' ">
<SourceRoot Include="C:\Users\moham\.nuget\packages\" />
<SourceRoot Include="C:\Program Files (x86)\Microsoft Visual Studio\Shared\NuGetPackages\" />
</ItemGroup>
</Project>
\ No newline at end of file
<?xml version="1.0" encoding="utf-8" standalone="no"?>
<Project ToolsVersion="14.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003" />
\ No newline at end of file
This diff is collapsed.
{
"version": 2,
"dgSpecHash": "SQLQ1eCYushT6o61WXMGvqZBK1hEAZn6cXEVbwNxClF+++otB/xs0pydNnI0hrsERsl5pb17pJHct4wEnhd3dA==",
"success": true,
"projectFilePath": "D:\\HIAST\\FIY\\Project-MSGPriorityQ\\GrpcMessage\\message-priority-queue\\SimpleStreamConsumerTest\\SimpleStreamConsumerTest.csproj",
"expectedPackageFiles": [
"C:\\Users\\moham\\.nuget\\packages\\csrediscore\\3.8.803\\csrediscore.3.8.803.nupkg.sha512",
"C:\\Users\\moham\\.nuget\\packages\\microsoft.extensions.logging.abstractions\\6.0.0\\microsoft.extensions.logging.abstractions.6.0.0.nupkg.sha512",
"C:\\Users\\moham\\.nuget\\packages\\newtonsoft.json\\13.0.1\\newtonsoft.json.13.0.1.nupkg.sha512",
"C:\\Users\\moham\\.nuget\\packages\\pipelines.sockets.unofficial\\2.2.8\\pipelines.sockets.unofficial.2.2.8.nupkg.sha512",
"C:\\Users\\moham\\.nuget\\packages\\stackexchange.redis\\2.8.0\\stackexchange.redis.2.8.0.nupkg.sha512",
"C:\\Users\\moham\\.nuget\\packages\\system.io.pipelines\\5.0.1\\system.io.pipelines.5.0.1.nupkg.sha512",
"C:\\Users\\moham\\.nuget\\packages\\system.valuetuple\\4.5.0\\system.valuetuple.4.5.0.nupkg.sha512"
],
"logs": []
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment