Commit a55e48b0 authored by mohammad.salama's avatar mohammad.salama

Device Sends Message to Queue node perfectly -all GRPC- with Discovery

parent 24960b2a
...@@ -35,7 +35,12 @@ ...@@ -35,7 +35,12 @@
"ProjectGuid": "494712ab-c5a0-4525-81ae-dd203f249cb9", "ProjectGuid": "494712ab-c5a0-4525-81ae-dd203f249cb9",
"DisplayName": "MessageGeneratorGRPC", "DisplayName": "MessageGeneratorGRPC",
"ColorIndex": 5 "ColorIndex": 5
},
"676d9604-4597-4935-826a-7f054fd5936a": {
"ProjectGuid": "676d9604-4597-4935-826a-7f054fd5936a",
"DisplayName": "QueuerNode",
"ColorIndex": 6
} }
}, },
"NextColorIndex": 6 "NextColorIndex": 7
} }
\ No newline at end of file

using GrpcMessageNode.DataBaseAccess;
namespace GrpcMessageNode.DataBaseAccess
{
public static class DBAccess
{
/// <summary>
/// this class encapsulates MessageStorage and ProcessMessage classed and through it
/// we access them as a type of layering
/// </summary>
private static string messageStorageURL = "db that stores messagees";
private static string accountStorageURL = "db that holds information about account + quota";
public static int getPriorityAndStoreMessage(Message message)
{
//authenticate and something else with logic and booleans to ensure all operation is done
MessageStorage.store(message, messageStorageURL);
int x = ProcessMessage.getPriority(message, accountStorageURL);
// if something wrong retrn -1
return x;
}
}
}
namespace GrpcMessageNode.DataBaseAccess
{
public class MessageStorage
{
//use this class to store messages in database
public static bool store(Message message , string DB)
{
Console.WriteLine(message.MsgId + " - is being stored in data base : " + DB);
return true;
}
}
}
namespace GrpcMessageNode.DataBaseAccess
{
public class ProcessMessage
{
/// use this class to authenticate and process logic on message receipt
/// get the account priority and use it to determine total priority before
/// sending the message to coordinator and queuing
///
public static int getPriority(Message message , string DB)
{
string accId = message.ClientID;
Console.WriteLine("Getting priority of account " + accId + " from DataBase : " + DB);
int x = 1;
return x;
}
}
}
namespace GrpcMessageNode.PriorityHandling
{
public class SetPriority
{
/*
* better to make priorities configurable from files !!
*/
private static int MAX_PRIRITY = 5;
private static int MIN_PRIRITY = 1;
public static bool setFinalPriority(Message message)
{
int account_p = DataBaseAccess.DBAccess.getPriorityAndStoreMessage(message);
int x = account_p + message.LocalPriority;
if (x > MAX_PRIRITY) x = MAX_PRIRITY;
if (x < MIN_PRIRITY && x!=-1) x = MIN_PRIRITY;
message.LocalPriority = x;
if (x == -1) return false;
return true;
}
/// we can complicate the mechanism as much as we want
}
}
...@@ -19,48 +19,52 @@ namespace GrpcMessageNode.Services ...@@ -19,48 +19,52 @@ namespace GrpcMessageNode.Services
public override Task<Acknowledgement> SendMessage(Message message, ServerCallContext context) public override Task<Acknowledgement> SendMessage(Message message, ServerCallContext context)
{ {
Console.WriteLine("Got to Receiver Node"); Console.WriteLine("Got to Receiver Node");
int priority = getPriority(message.ClientID);
modifyPriority(message, priority); bool res = PriorityHandling.SetPriority.setFinalPriority(message);
sendToCoordinator(message); if (res == false) // something went wrong
{
return Task.FromResult(new Acknowledgement
{
ReplyCode = "ERRORROROR on Send " + message.MsgId
});
}
Console.WriteLine("WE ARE AFTER THE CHECKING OF RES");
string reply = sendToCoordinator(message);
return Task.FromResult(new Acknowledgement return Task.FromResult(new Acknowledgement
{ {
ReplyCode = "OK on Send " + message.MsgId ReplyCode = reply //"OK on Send " + message.MsgId
}); });
} }
private void modifyPriority (Message message , int priority)
{
// apply some algorithm here
message.LocalPriority += priority;
}
private int getPriority(string clientID)
{
// look up in data base
int x = 5;
return x;
}
private string sendToCoordinator(Message message) private string sendToCoordinator(Message message)
{ {
string res = "k + "; string address = getCoordinatorAddress();
Message2 message2 = copyMessage(message);
string address = getAddress();
using var channel = GrpcChannel.ForAddress(address); using var channel = GrpcChannel.ForAddress(address);
var client = new Queue.QueueClient(channel);
Message2 message2 = copyMessage(message); Console.WriteLine("QueuerNode Address = " + address);
var reply = client.QueueMessage(message2); var queue_client = new Queue.QueueClient(channel);
var reply = queue_client.QueueMessage(message2);
Console.WriteLine(reply.ReplyCode); Console.WriteLine(reply.ReplyCode);
Console.WriteLine("Press any key to exit..."); return reply.ReplyCode;
Console.ReadKey(); }
return res + reply.ReplyCode; //not working -- causing unknown exception with nullable parameter http2
private Queue.QueueClient getQueueClient()
{
string address = getCoordinatorAddress();
using var channel = GrpcChannel.ForAddress(address);
Console.WriteLine("QueuerNode Address = " + address);
var client = new Queue.QueueClient(channel);
return client;
} }
private Message2 copyMessage(Message message) private Message2 copyMessage(Message message)
...@@ -75,11 +79,11 @@ namespace GrpcMessageNode.Services ...@@ -75,11 +79,11 @@ namespace GrpcMessageNode.Services
return message2; return message2;
} }
private string getAddress() private string getCoordinatorAddress()
{ {
string address = ""; string address = "";
var y = discoveryClient.GetInstances("Coordinator"); /// write names to config file var y = discoveryClient.GetInstances("QueuerNode"); /// write names to config file
address = y[0].Uri.ToString(); address = y[0].Uri.ToString();
......
622b3c876805fb931482a4d8677d1cf405e12a85 286ecfa468b3d36df6d23da6551204042dd6dca9
...@@ -29,3 +29,9 @@ ...@@ -29,3 +29,9 @@
2.0 2.0
2.0 2.0
2.0 2.0
2.0
2.0
2.0
2.0
2.0
2.0
...@@ -5,9 +5,11 @@ VisualStudioVersion = 17.0.31903.59 ...@@ -5,9 +5,11 @@ VisualStudioVersion = 17.0.31903.59
MinimumVisualStudioVersion = 10.0.40219.1 MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GrpcMessageNode", "GrpcMessage\GrpcMessageNode.csproj", "{E55ED87C-0EE4-4A4C-BBBB-76CBF482188D}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GrpcMessageNode", "GrpcMessage\GrpcMessageNode.csproj", "{E55ED87C-0EE4-4A4C-BBBB-76CBF482188D}"
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CoordinatorOne", "CoordinatorOne\CoordinatorOne.csproj", "{2B3382EE-564E-44BC-9B3A-2F67273B2802}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "CoordinatorOne", "CoordinatorOne\CoordinatorOne.csproj", "{2B3382EE-564E-44BC-9B3A-2F67273B2802}"
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MessageGeneratorGRPC", "MessageGeneratorGRPC\MessageGeneratorGRPC.csproj", "{494712AB-C5A0-4525-81AE-DD203F249CB9}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MessageGeneratorGRPC", "MessageGeneratorGRPC\MessageGeneratorGRPC.csproj", "{494712AB-C5A0-4525-81AE-DD203F249CB9}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "QueuerNode", "QueuerNode\QueuerNode.csproj", "{676D9604-4597-4935-826A-7F054FD5936A}"
EndProject EndProject
Global Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution GlobalSection(SolutionConfigurationPlatforms) = preSolution
...@@ -27,6 +29,10 @@ Global ...@@ -27,6 +29,10 @@ Global
{494712AB-C5A0-4525-81AE-DD203F249CB9}.Debug|Any CPU.Build.0 = Debug|Any CPU {494712AB-C5A0-4525-81AE-DD203F249CB9}.Debug|Any CPU.Build.0 = Debug|Any CPU
{494712AB-C5A0-4525-81AE-DD203F249CB9}.Release|Any CPU.ActiveCfg = Release|Any CPU {494712AB-C5A0-4525-81AE-DD203F249CB9}.Release|Any CPU.ActiveCfg = Release|Any CPU
{494712AB-C5A0-4525-81AE-DD203F249CB9}.Release|Any CPU.Build.0 = Release|Any CPU {494712AB-C5A0-4525-81AE-DD203F249CB9}.Release|Any CPU.Build.0 = Release|Any CPU
{676D9604-4597-4935-826A-7F054FD5936A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{676D9604-4597-4935-826A-7F054FD5936A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{676D9604-4597-4935-826A-7F054FD5936A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{676D9604-4597-4935-826A-7F054FD5936A}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection EndGlobalSection
GlobalSection(SolutionProperties) = preSolution GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE HideSolutionNode = FALSE
......
...@@ -7,6 +7,10 @@ ...@@ -7,6 +7,10 @@
<UserSecretsId>dotnet-MessageGeneratorGRPC-685F4B88-9109-41BE-B200-D1916D3E7EAB</UserSecretsId> <UserSecretsId>dotnet-MessageGeneratorGRPC-685F4B88-9109-41BE-B200-D1916D3E7EAB</UserSecretsId>
</PropertyGroup> </PropertyGroup>
<ItemGroup>
<None Remove="Protos\schemaQ.proto" />
</ItemGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.Extensions.Hosting" Version="6.0.0" /> <PackageReference Include="Microsoft.Extensions.Hosting" Version="6.0.0" />
</ItemGroup> </ItemGroup>
...@@ -34,6 +38,7 @@ ...@@ -34,6 +38,7 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<Protobuf Include="Protos\schemaQ.proto" GrpcServices="Client" />
<Protobuf Include="Protos\schema.proto" GrpcServices="Client" /> <Protobuf Include="Protos\schema.proto" GrpcServices="Client" />
</ItemGroup> </ItemGroup>
......
syntax = "proto3";
option csharp_namespace = "MessageGeneratorGRPC";
package Tranmitter;
message Message2 {
string clientID = 1;
string apiKey = 2;
string msgId = 3;
string phoneNumber = 4;
int32 localPriority = 5;
string text = 6;
}
message Acknowledgement2
{
string replyCode = 1;
}
service Queue {
rpc QueueMessage(Message2) returns (Acknowledgement2);
}
...@@ -15,10 +15,11 @@ namespace MessageGeneratorGRPC ...@@ -15,10 +15,11 @@ namespace MessageGeneratorGRPC
protected override async Task ExecuteAsync(CancellationToken stoppingToken) protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{ {
//test();
string address = getAddress(); string address = getAddress();
using var channel = GrpcChannel.ForAddress(address); using var channel = GrpcChannel.ForAddress(address);
var client = new Send.SendClient(channel); var client = new Send.SendClient(channel);
Message message = new Message(); Message message = new Message();
message.Text = "Hello World !"; message.Text = "Hello World !";
message.ApiKey = "Api-Key"; message.ApiKey = "Api-Key";
...@@ -27,6 +28,9 @@ namespace MessageGeneratorGRPC ...@@ -27,6 +28,9 @@ namespace MessageGeneratorGRPC
message.MsgId = "msg-id=1"; message.MsgId = "msg-id=1";
message.PhoneNumber = "043 33 00 83"; message.PhoneNumber = "043 33 00 83";
Console.WriteLine("Sending to " + address);
var reply = client.SendMessage(message); var reply = client.SendMessage(message);
Console.WriteLine(reply.ReplyCode); Console.WriteLine(reply.ReplyCode);
...@@ -46,5 +50,27 @@ namespace MessageGeneratorGRPC ...@@ -46,5 +50,27 @@ namespace MessageGeneratorGRPC
return address; return address;
} }
public void test()
{
string address = "https://localhost:9090"; //getCoordinatorAddress();
using var channel = GrpcChannel.ForAddress(address);
Console.WriteLine("QueuerNode Address = " + address);
var client = new Queue.QueueClient(channel);
Message2 message = new Message2();
message.Text = "Hello World !";
message.ApiKey = "Api-Key";
message.ClientID = "m-salameh";
message.LocalPriority = 1;
message.MsgId = "msg-id=1";
message.PhoneNumber = "043 33 00 83";
Console.WriteLine("Sending to " + address);
var reply = client.QueueMessage(message);
Console.WriteLine(reply.ReplyCode);
}
} }
} }
\ No newline at end of file
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
"client": { "client": {
"serviceUrl": "http://localhost:8761/eureka/", "serviceUrl": "http://localhost:8761/eureka/",
"shouldFetchRegistry": "true", "shouldFetchRegistry": "true",
"shouldRegisterWithEureka": true, "shouldRegisterWithEureka": false,
"validateCertificates": false "validateCertificates": false
}, },
"instance": {}, "instance": {},
......
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
"client": { "client": {
"serviceUrl": "http://localhost:8761/eureka/", "serviceUrl": "http://localhost:8761/eureka/",
"shouldFetchRegistry": "true", "shouldFetchRegistry": "true",
"shouldRegisterWithEureka": true, "shouldRegisterWithEureka": false,
"validateCertificates": false "validateCertificates": false
}, },
"instance": {}, "instance": {},
......
obj\Debug\net6.0\Protos/Schema-1Grpc.cs \
obj\Debug\net6.0\Protos/Schema1.cs: Protos/schema-1.proto
\ No newline at end of file
obj\Debug\net6.0\Protos/SchemaQ.cs \
obj\Debug\net6.0\Protos/SchemaQGrpc.cs: Protos/schemaQ.proto
\ No newline at end of file
d9da9c7acde4daeadcd4d04a0d1ede75f20f0917 1adcf206652457d8a65bbead798dedc4b7da29b0
// <auto-generated>
// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: Protos/schema-1.proto
// </auto-generated>
#pragma warning disable 0414, 1591, 8981, 0612
#region Designer generated code
using grpc = global::Grpc.Core;
namespace QueuerNode {
public static partial class Queue
{
static readonly string __ServiceName = "Tranmitter.Queue";
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static void __Helper_SerializeMessage(global::Google.Protobuf.IMessage message, grpc::SerializationContext context)
{
#if !GRPC_DISABLE_PROTOBUF_BUFFER_SERIALIZATION
if (message is global::Google.Protobuf.IBufferMessage)
{
context.SetPayloadLength(message.CalculateSize());
global::Google.Protobuf.MessageExtensions.WriteTo(message, context.GetBufferWriter());
context.Complete();
return;
}
#endif
context.Complete(global::Google.Protobuf.MessageExtensions.ToByteArray(message));
}
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static class __Helper_MessageCache<T>
{
public static readonly bool IsBufferMessage = global::System.Reflection.IntrospectionExtensions.GetTypeInfo(typeof(global::Google.Protobuf.IBufferMessage)).IsAssignableFrom(typeof(T));
}
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static T __Helper_DeserializeMessage<T>(grpc::DeserializationContext context, global::Google.Protobuf.MessageParser<T> parser) where T : global::Google.Protobuf.IMessage<T>
{
#if !GRPC_DISABLE_PROTOBUF_BUFFER_SERIALIZATION
if (__Helper_MessageCache<T>.IsBufferMessage)
{
return parser.ParseFrom(context.PayloadAsReadOnlySequence());
}
#endif
return parser.ParseFrom(context.PayloadAsNewBuffer());
}
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static readonly grpc::Marshaller<global::QueuerNode.Message> __Marshaller_Tranmitter_Message = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, global::QueuerNode.Message.Parser));
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static readonly grpc::Marshaller<global::QueuerNode.Acknowledgement> __Marshaller_Tranmitter_Acknowledgement = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, global::QueuerNode.Acknowledgement.Parser));
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static readonly grpc::Method<global::QueuerNode.Message, global::QueuerNode.Acknowledgement> __Method_QueueMessage = new grpc::Method<global::QueuerNode.Message, global::QueuerNode.Acknowledgement>(
grpc::MethodType.Unary,
__ServiceName,
"QueueMessage",
__Marshaller_Tranmitter_Message,
__Marshaller_Tranmitter_Acknowledgement);
/// <summary>Service descriptor</summary>
public static global::Google.Protobuf.Reflection.ServiceDescriptor Descriptor
{
get { return global::QueuerNode.Schema1Reflection.Descriptor.Services[0]; }
}
/// <summary>Base class for server-side implementations of Queue</summary>
[grpc::BindServiceMethod(typeof(Queue), "BindService")]
public abstract partial class QueueBase
{
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
public virtual global::System.Threading.Tasks.Task<global::QueuerNode.Acknowledgement> QueueMessage(global::QueuerNode.Message request, grpc::ServerCallContext context)
{
throw new grpc::RpcException(new grpc::Status(grpc::StatusCode.Unimplemented, ""));
}
}
/// <summary>Creates service definition that can be registered with a server</summary>
/// <param name="serviceImpl">An object implementing the server-side handling logic.</param>
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
public static grpc::ServerServiceDefinition BindService(QueueBase serviceImpl)
{
return grpc::ServerServiceDefinition.CreateBuilder()
.AddMethod(__Method_QueueMessage, serviceImpl.QueueMessage).Build();
}
/// <summary>Register service method with a service binder with or without implementation. Useful when customizing the service binding logic.
/// Note: this method is part of an experimental API that can change or be removed without any prior notice.</summary>
/// <param name="serviceBinder">Service methods will be bound by calling <c>AddMethod</c> on this object.</param>
/// <param name="serviceImpl">An object implementing the server-side handling logic.</param>
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
public static void BindService(grpc::ServiceBinderBase serviceBinder, QueueBase serviceImpl)
{
serviceBinder.AddMethod(__Method_QueueMessage, serviceImpl == null ? null : new grpc::UnaryServerMethod<global::QueuerNode.Message, global::QueuerNode.Acknowledgement>(serviceImpl.QueueMessage));
}
}
}
#endregion
This diff is collapsed.
This diff is collapsed.
// <auto-generated>
// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: Protos/schemaQ.proto
// </auto-generated>
#pragma warning disable 0414, 1591, 8981, 0612
#region Designer generated code
using grpc = global::Grpc.Core;
namespace MessageGeneratorGRPC {
public static partial class Queue
{
static readonly string __ServiceName = "Tranmitter.Queue";
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static void __Helper_SerializeMessage(global::Google.Protobuf.IMessage message, grpc::SerializationContext context)
{
#if !GRPC_DISABLE_PROTOBUF_BUFFER_SERIALIZATION
if (message is global::Google.Protobuf.IBufferMessage)
{
context.SetPayloadLength(message.CalculateSize());
global::Google.Protobuf.MessageExtensions.WriteTo(message, context.GetBufferWriter());
context.Complete();
return;
}
#endif
context.Complete(global::Google.Protobuf.MessageExtensions.ToByteArray(message));
}
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static class __Helper_MessageCache<T>
{
public static readonly bool IsBufferMessage = global::System.Reflection.IntrospectionExtensions.GetTypeInfo(typeof(global::Google.Protobuf.IBufferMessage)).IsAssignableFrom(typeof(T));
}
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static T __Helper_DeserializeMessage<T>(grpc::DeserializationContext context, global::Google.Protobuf.MessageParser<T> parser) where T : global::Google.Protobuf.IMessage<T>
{
#if !GRPC_DISABLE_PROTOBUF_BUFFER_SERIALIZATION
if (__Helper_MessageCache<T>.IsBufferMessage)
{
return parser.ParseFrom(context.PayloadAsReadOnlySequence());
}
#endif
return parser.ParseFrom(context.PayloadAsNewBuffer());
}
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static readonly grpc::Marshaller<global::MessageGeneratorGRPC.Message2> __Marshaller_Tranmitter_Message2 = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, global::MessageGeneratorGRPC.Message2.Parser));
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static readonly grpc::Marshaller<global::MessageGeneratorGRPC.Acknowledgement2> __Marshaller_Tranmitter_Acknowledgement2 = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, global::MessageGeneratorGRPC.Acknowledgement2.Parser));
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static readonly grpc::Method<global::MessageGeneratorGRPC.Message2, global::MessageGeneratorGRPC.Acknowledgement2> __Method_QueueMessage = new grpc::Method<global::MessageGeneratorGRPC.Message2, global::MessageGeneratorGRPC.Acknowledgement2>(
grpc::MethodType.Unary,
__ServiceName,
"QueueMessage",
__Marshaller_Tranmitter_Message2,
__Marshaller_Tranmitter_Acknowledgement2);
/// <summary>Service descriptor</summary>
public static global::Google.Protobuf.Reflection.ServiceDescriptor Descriptor
{
get { return global::MessageGeneratorGRPC.SchemaQReflection.Descriptor.Services[0]; }
}
/// <summary>Client for Queue</summary>
public partial class QueueClient : grpc::ClientBase<QueueClient>
{
/// <summary>Creates a new client for Queue</summary>
/// <param name="channel">The channel to use to make remote calls.</param>
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
public QueueClient(grpc::ChannelBase channel) : base(channel)
{
}
/// <summary>Creates a new client for Queue that uses a custom <c>CallInvoker</c>.</summary>
/// <param name="callInvoker">The callInvoker to use to make remote calls.</param>
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
public QueueClient(grpc::CallInvoker callInvoker) : base(callInvoker)
{
}
/// <summary>Protected parameterless constructor to allow creation of test doubles.</summary>
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
protected QueueClient() : base()
{
}
/// <summary>Protected constructor to allow creation of configured clients.</summary>
/// <param name="configuration">The client configuration.</param>
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
protected QueueClient(ClientBaseConfiguration configuration) : base(configuration)
{
}
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
public virtual global::MessageGeneratorGRPC.Acknowledgement2 QueueMessage(global::MessageGeneratorGRPC.Message2 request, grpc::Metadata headers = null, global::System.DateTime? deadline = null, global::System.Threading.CancellationToken cancellationToken = default(global::System.Threading.CancellationToken))
{
return QueueMessage(request, new grpc::CallOptions(headers, deadline, cancellationToken));
}
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
public virtual global::MessageGeneratorGRPC.Acknowledgement2 QueueMessage(global::MessageGeneratorGRPC.Message2 request, grpc::CallOptions options)
{
return CallInvoker.BlockingUnaryCall(__Method_QueueMessage, null, options, request);
}
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
public virtual grpc::AsyncUnaryCall<global::MessageGeneratorGRPC.Acknowledgement2> QueueMessageAsync(global::MessageGeneratorGRPC.Message2 request, grpc::Metadata headers = null, global::System.DateTime? deadline = null, global::System.Threading.CancellationToken cancellationToken = default(global::System.Threading.CancellationToken))
{
return QueueMessageAsync(request, new grpc::CallOptions(headers, deadline, cancellationToken));
}
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
public virtual grpc::AsyncUnaryCall<global::MessageGeneratorGRPC.Acknowledgement2> QueueMessageAsync(global::MessageGeneratorGRPC.Message2 request, grpc::CallOptions options)
{
return CallInvoker.AsyncUnaryCall(__Method_QueueMessage, null, options, request);
}
/// <summary>Creates a new instance of client from given <c>ClientBaseConfiguration</c>.</summary>
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
protected override QueueClient NewInstance(ClientBaseConfiguration configuration)
{
return new QueueClient(configuration);
}
}
}
}
#endregion
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace QueuerNode.Helper
{
public static class MessageQueues
{
private const int priority_levels = 6;
private const int sms_rates = 15;
private static Queue<Message>[] messageQueue = new Queue<Message>[priority_levels];
public static void addMessage(Message message)
{
int idx = message.LocalPriority;
idx = Math.Min(idx, priority_levels - 1);
idx = Math.Max(idx, 0);
Console.WriteLine("idx = " + idx);
messageQueue[idx].Enqueue(message);
Console.WriteLine("Message Queued: " + message.MsgId);
}
public static void sendMessages()
{
int x1 = 7, x2 = 5, x3 = 3;
while(x1>0 && messageQueue[1].Count>0)
{
Console.WriteLine(messageQueue[1].Dequeue());
x1--;
}
while (x2 > 0 && messageQueue[1].Count > 0)
{
Console.WriteLine(messageQueue[2].Dequeue());
x2--;
}
while (x3 > 0 && messageQueue[3].Count > 0)
{
Console.WriteLine(messageQueue[3].Dequeue());
x3--;
}
}
}
}
using QueuerNode.Services;
using Steeltoe.Discovery.Client;
var builder = WebApplication.CreateBuilder(args);
// Additional configuration is required to successfully run gRPC on macOS.
// For instructions on how to configure Kestrel and gRPC clients on macOS, visit https://go.microsoft.com/fwlink/?linkid=2099682
// Add services to the container.
builder.Services.AddGrpc();
builder.Services.AddDiscoveryClient();
var app = builder.Build();
// Configure the HTTP request pipeline.
app.MapGrpcService<QueueMessageService>();
app.MapGet("/", () => "Communication with gRPC endpoints must be made through a gRPC client. To learn how to create a client, visit: https://go.microsoft.com/fwlink/?linkid=2086909");
app.Run();
{
"profiles": {
"QueuerNode": {
"commandName": "Project",
"dotnetRunMessages": true,
"launchBrowser": false,
"applicationUrl": "http://localhost:5004;https://localhost:9090",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
}
}
}
}
syntax = "proto3";
option csharp_namespace = "QueuerNode";
package Tranmitter;
message Message {
string clientID = 1;
string apiKey = 2;
string msgId = 3;
string phoneNumber = 4;
int32 localPriority = 5;
string text = 6;
}
message Acknowledgement
{
string replyCode = 1;
}
service Queue {
rpc QueueMessage(Message) returns (Acknowledgement);
}
<Project Sdk="Microsoft.NET.Sdk.Web">
<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
</PropertyGroup>
<ItemGroup>
<None Remove="Protos\schema.proto" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Grpc.AspNetCore" Version="2.40.0" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Google.Protobuf" Version="3.27.1" />
<PackageReference Include="Grpc.Net.Client" Version="2.63.0" />
<PackageReference Include="Grpc.Tools" Version="2.64.0">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
</ItemGroup>
<PropertyGroup>
<SteeltoeVersion>3.2.6</SteeltoeVersion>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Swashbuckle.AspNetCore" Version="6.4.*" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Steeltoe.Connector.ConnectorCore" Version="$(SteeltoeVersion)" />
<PackageReference Include="Steeltoe.Discovery.Eureka" Version="$(SteeltoeVersion)" />
</ItemGroup>
<ItemGroup>
<Protobuf Include="Protos\schema.proto" GrpcServices="Server" />
</ItemGroup>
</Project>

using Grpc.Core;
using Steeltoe.Common.Discovery;
using Steeltoe.Discovery;
using QueuerNode.Helper;
namespace QueuerNode.Services
{
public class QueueMessageService : Queue.QueueBase
{
private readonly ILogger<QueueMessageService> _logger;
private readonly IDiscoveryClient _client;
public QueueMessageService(ILogger<QueueMessageService> logger , IDiscoveryClient client)
{
_logger = logger;
_client = client;
}
public override Task<Acknowledgement> QueueMessage(Message message, ServerCallContext context)
{
//Console.WriteLine("Message Receieved to Queuer !!");
//MessageQueues.addMessage(message);
return Task.FromResult(new Acknowledgement
{
ReplyCode = "OK on Send " + message.MsgId + " , "+ message.LocalPriority + " Message Reached Queue"
});
}
}
}
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
}
}
}
{
"spring": {
"application": {
"name": "QueuerNode"
}
},
"Logging": {
"LogLevel": {
"Default": "Error",
"Microsoft.Hosting.Lifetime": "Error"
}
},
"eureka": {
"client": {
"serviceUrl": "http://localhost:8761/eureka/",
"shouldFetchRegistry": "false",
"shouldRegisterWithEureka": true,
"validateCertificates": false
}
},
"instance": {},
"AllowedHosts": "*",
"Kestrel": {
"EndpointDefaults": {
"Protocols": "Http2"
}
}
}
\ No newline at end of file
This diff is collapsed.
{
"runtimeOptions": {
"tfm": "net6.0",
"frameworks": [
{
"name": "Microsoft.NETCore.App",
"version": "6.0.0"
},
{
"name": "Microsoft.AspNetCore.App",
"version": "6.0.0"
}
],
"configProperties": {
"System.GC.Server": true,
"System.Runtime.Serialization.EnableUnsafeBinaryFormatterSerialization": false
}
}
}
\ No newline at end of file
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
}
}
}
{
"spring": {
"application": {
"name": "QueuerNode"
}
},
"Logging": {
"LogLevel": {
"Default": "Error",
"Microsoft.Hosting.Lifetime": "Error"
}
},
"eureka": {
"client": {
"serviceUrl": "http://localhost:8761/eureka/",
"shouldFetchRegistry": "false",
"shouldRegisterWithEureka": true,
"validateCertificates": false
}
},
"instance": {},
"AllowedHosts": "*",
"Kestrel": {
"EndpointDefaults": {
"Protocols": "Http2"
}
}
}
\ No newline at end of file
// <autogenerated />
using System;
using System.Reflection;
[assembly: global::System.Runtime.Versioning.TargetFrameworkAttribute(".NETCoreApp,Version=v6.0", FrameworkDisplayName = "")]
obj\Debug\net6.0\Protos/Greet.cs \
obj\Debug\net6.0\Protos/GreetGrpc.cs: Protos/greet.proto
\ No newline at end of file
obj\Debug\net6.0\Protos/Schema-1Grpc.cs \
obj\Debug\net6.0\Protos/Schema1.cs: Protos/schema-1.proto
\ No newline at end of file
obj\Debug\net6.0\Protos/Schema.cs \
obj\Debug\net6.0\Protos/SchemaGrpc.cs: Protos/schema.proto
\ No newline at end of file
This diff is collapsed.
// <auto-generated>
// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: Protos/greet.proto
// </auto-generated>
#pragma warning disable 0414, 1591
#region Designer generated code
using grpc = global::Grpc.Core;
namespace QueuerNode {
/// <summary>
/// The greeting service definition.
/// </summary>
public static partial class Greeter
{
static readonly string __ServiceName = "greet.Greeter";
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static void __Helper_SerializeMessage(global::Google.Protobuf.IMessage message, grpc::SerializationContext context)
{
#if !GRPC_DISABLE_PROTOBUF_BUFFER_SERIALIZATION
if (message is global::Google.Protobuf.IBufferMessage)
{
context.SetPayloadLength(message.CalculateSize());
global::Google.Protobuf.MessageExtensions.WriteTo(message, context.GetBufferWriter());
context.Complete();
return;
}
#endif
context.Complete(global::Google.Protobuf.MessageExtensions.ToByteArray(message));
}
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static class __Helper_MessageCache<T>
{
public static readonly bool IsBufferMessage = global::System.Reflection.IntrospectionExtensions.GetTypeInfo(typeof(global::Google.Protobuf.IBufferMessage)).IsAssignableFrom(typeof(T));
}
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static T __Helper_DeserializeMessage<T>(grpc::DeserializationContext context, global::Google.Protobuf.MessageParser<T> parser) where T : global::Google.Protobuf.IMessage<T>
{
#if !GRPC_DISABLE_PROTOBUF_BUFFER_SERIALIZATION
if (__Helper_MessageCache<T>.IsBufferMessage)
{
return parser.ParseFrom(context.PayloadAsReadOnlySequence());
}
#endif
return parser.ParseFrom(context.PayloadAsNewBuffer());
}
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static readonly grpc::Marshaller<global::QueuerNode.HelloRequest> __Marshaller_greet_HelloRequest = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, global::QueuerNode.HelloRequest.Parser));
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static readonly grpc::Marshaller<global::QueuerNode.HelloReply> __Marshaller_greet_HelloReply = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, global::QueuerNode.HelloReply.Parser));
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static readonly grpc::Method<global::QueuerNode.HelloRequest, global::QueuerNode.HelloReply> __Method_SayHello = new grpc::Method<global::QueuerNode.HelloRequest, global::QueuerNode.HelloReply>(
grpc::MethodType.Unary,
__ServiceName,
"SayHello",
__Marshaller_greet_HelloRequest,
__Marshaller_greet_HelloReply);
/// <summary>Service descriptor</summary>
public static global::Google.Protobuf.Reflection.ServiceDescriptor Descriptor
{
get { return global::QueuerNode.GreetReflection.Descriptor.Services[0]; }
}
/// <summary>Base class for server-side implementations of Greeter</summary>
[grpc::BindServiceMethod(typeof(Greeter), "BindService")]
public abstract partial class GreeterBase
{
/// <summary>
/// Sends a greeting
/// </summary>
/// <param name="request">The request received from the client.</param>
/// <param name="context">The context of the server-side call handler being invoked.</param>
/// <returns>The response to send back to the client (wrapped by a task).</returns>
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
public virtual global::System.Threading.Tasks.Task<global::QueuerNode.HelloReply> SayHello(global::QueuerNode.HelloRequest request, grpc::ServerCallContext context)
{
throw new grpc::RpcException(new grpc::Status(grpc::StatusCode.Unimplemented, ""));
}
}
/// <summary>Creates service definition that can be registered with a server</summary>
/// <param name="serviceImpl">An object implementing the server-side handling logic.</param>
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
public static grpc::ServerServiceDefinition BindService(GreeterBase serviceImpl)
{
return grpc::ServerServiceDefinition.CreateBuilder()
.AddMethod(__Method_SayHello, serviceImpl.SayHello).Build();
}
/// <summary>Register service method with a service binder with or without implementation. Useful when customizing the service binding logic.
/// Note: this method is part of an experimental API that can change or be removed without any prior notice.</summary>
/// <param name="serviceBinder">Service methods will be bound by calling <c>AddMethod</c> on this object.</param>
/// <param name="serviceImpl">An object implementing the server-side handling logic.</param>
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
public static void BindService(grpc::ServiceBinderBase serviceBinder, GreeterBase serviceImpl)
{
serviceBinder.AddMethod(__Method_SayHello, serviceImpl == null ? null : new grpc::UnaryServerMethod<global::QueuerNode.HelloRequest, global::QueuerNode.HelloReply>(serviceImpl.SayHello));
}
}
}
#endregion
// <auto-generated>
// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: Protos/schema-1.proto
// </auto-generated>
#pragma warning disable 0414, 1591, 8981, 0612
#region Designer generated code
using grpc = global::Grpc.Core;
namespace QueuerNode {
public static partial class Queue
{
static readonly string __ServiceName = "Tranmitter.Queue";
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static void __Helper_SerializeMessage(global::Google.Protobuf.IMessage message, grpc::SerializationContext context)
{
#if !GRPC_DISABLE_PROTOBUF_BUFFER_SERIALIZATION
if (message is global::Google.Protobuf.IBufferMessage)
{
context.SetPayloadLength(message.CalculateSize());
global::Google.Protobuf.MessageExtensions.WriteTo(message, context.GetBufferWriter());
context.Complete();
return;
}
#endif
context.Complete(global::Google.Protobuf.MessageExtensions.ToByteArray(message));
}
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static class __Helper_MessageCache<T>
{
public static readonly bool IsBufferMessage = global::System.Reflection.IntrospectionExtensions.GetTypeInfo(typeof(global::Google.Protobuf.IBufferMessage)).IsAssignableFrom(typeof(T));
}
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static T __Helper_DeserializeMessage<T>(grpc::DeserializationContext context, global::Google.Protobuf.MessageParser<T> parser) where T : global::Google.Protobuf.IMessage<T>
{
#if !GRPC_DISABLE_PROTOBUF_BUFFER_SERIALIZATION
if (__Helper_MessageCache<T>.IsBufferMessage)
{
return parser.ParseFrom(context.PayloadAsReadOnlySequence());
}
#endif
return parser.ParseFrom(context.PayloadAsNewBuffer());
}
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static readonly grpc::Marshaller<global::QueuerNode.Message> __Marshaller_Tranmitter_Message = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, global::QueuerNode.Message.Parser));
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static readonly grpc::Marshaller<global::QueuerNode.Acknowledgement> __Marshaller_Tranmitter_Acknowledgement = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, global::QueuerNode.Acknowledgement.Parser));
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static readonly grpc::Method<global::QueuerNode.Message, global::QueuerNode.Acknowledgement> __Method_QueueMessage = new grpc::Method<global::QueuerNode.Message, global::QueuerNode.Acknowledgement>(
grpc::MethodType.Unary,
__ServiceName,
"QueueMessage",
__Marshaller_Tranmitter_Message,
__Marshaller_Tranmitter_Acknowledgement);
/// <summary>Service descriptor</summary>
public static global::Google.Protobuf.Reflection.ServiceDescriptor Descriptor
{
get { return global::QueuerNode.Schema1Reflection.Descriptor.Services[0]; }
}
/// <summary>Base class for server-side implementations of Queue</summary>
[grpc::BindServiceMethod(typeof(Queue), "BindService")]
public abstract partial class QueueBase
{
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
public virtual global::System.Threading.Tasks.Task<global::QueuerNode.Acknowledgement> QueueMessage(global::QueuerNode.Message request, grpc::ServerCallContext context)
{
throw new grpc::RpcException(new grpc::Status(grpc::StatusCode.Unimplemented, ""));
}
}
/// <summary>Creates service definition that can be registered with a server</summary>
/// <param name="serviceImpl">An object implementing the server-side handling logic.</param>
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
public static grpc::ServerServiceDefinition BindService(QueueBase serviceImpl)
{
return grpc::ServerServiceDefinition.CreateBuilder()
.AddMethod(__Method_QueueMessage, serviceImpl.QueueMessage).Build();
}
/// <summary>Register service method with a service binder with or without implementation. Useful when customizing the service binding logic.
/// Note: this method is part of an experimental API that can change or be removed without any prior notice.</summary>
/// <param name="serviceBinder">Service methods will be bound by calling <c>AddMethod</c> on this object.</param>
/// <param name="serviceImpl">An object implementing the server-side handling logic.</param>
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
public static void BindService(grpc::ServiceBinderBase serviceBinder, QueueBase serviceImpl)
{
serviceBinder.AddMethod(__Method_QueueMessage, serviceImpl == null ? null : new grpc::UnaryServerMethod<global::QueuerNode.Message, global::QueuerNode.Acknowledgement>(serviceImpl.QueueMessage));
}
}
}
#endregion
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
//------------------------------------------------------------------------------
// <auto-generated>
// This code was generated by a tool.
// Runtime Version:4.0.30319.42000
//
// Changes to this file may cause incorrect behavior and will be lost if
// the code is regenerated.
// </auto-generated>
//------------------------------------------------------------------------------
using System;
using System.Reflection;
[assembly: System.Reflection.AssemblyCompanyAttribute("QueuerNode")]
[assembly: System.Reflection.AssemblyConfigurationAttribute("Debug")]
[assembly: System.Reflection.AssemblyFileVersionAttribute("1.0.0.0")]
[assembly: System.Reflection.AssemblyInformationalVersionAttribute("1.0.0")]
[assembly: System.Reflection.AssemblyProductAttribute("QueuerNode")]
[assembly: System.Reflection.AssemblyTitleAttribute("QueuerNode")]
[assembly: System.Reflection.AssemblyVersionAttribute("1.0.0.0")]
// Generated by the MSBuild WriteCodeFragment class.
is_global = true
build_property.TargetFramework = net6.0
build_property.TargetPlatformMinVersion =
build_property.UsingMicrosoftNETSdkWeb = true
build_property.ProjectTypeGuids =
build_property.InvariantGlobalization =
build_property.PlatformNeutralAssembly =
build_property._SupportedPlatformList = Linux,macOS,Windows
build_property.RootNamespace = QueuerNode
build_property.RootNamespace = QueuerNode
build_property.ProjectDir = D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\QueuerNode\
build_property.RazorLangVersion = 6.0
build_property.SupportLocalizedComponentNames =
build_property.GenerateRazorMetadataSourceChecksumAttributes =
build_property.MSBuildProjectDirectory = D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\QueuerNode
build_property._RazorSourceGeneratorDebug =
// <auto-generated/>
global using global::Microsoft.AspNetCore.Builder;
global using global::Microsoft.AspNetCore.Hosting;
global using global::Microsoft.AspNetCore.Http;
global using global::Microsoft.AspNetCore.Routing;
global using global::Microsoft.Extensions.Configuration;
global using global::Microsoft.Extensions.DependencyInjection;
global using global::Microsoft.Extensions.Hosting;
global using global::Microsoft.Extensions.Logging;
global using global::System;
global using global::System.Collections.Generic;
global using global::System.IO;
global using global::System.Linq;
global using global::System.Net.Http;
global using global::System.Net.Http.Json;
global using global::System.Threading;
global using global::System.Threading.Tasks;
//------------------------------------------------------------------------------
// <auto-generated>
// This code was generated by a tool.
// Runtime Version:4.0.30319.42000
//
// Changes to this file may cause incorrect behavior and will be lost if
// the code is regenerated.
// </auto-generated>
//------------------------------------------------------------------------------
using System;
using System.Reflection;
[assembly: Microsoft.AspNetCore.Mvc.ApplicationParts.ApplicationPartAttribute("Swashbuckle.AspNetCore.SwaggerGen")]
// Generated by the MSBuild WriteCodeFragment class.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment