Commit 94d078f8 authored by mohammad.salama's avatar mohammad.salama

Removed Statics for performance, and Tested

parent 46087515
This diff is collapsed.
......@@ -11,8 +11,6 @@ IHost host = Host.CreateDefaultBuilder(args)
IConfiguration config = host.Services.GetRequiredService<IConfiguration>();
Initializer.init(ref config);
string redis_read = ReadRedisInfoParser.connection;
try
{
TotalWorker.setAll();
......
FROM mcr.microsoft.com/dotnet/aspnet:6.0 AS base
WORKDIR /app
EXPOSE 80
EXPOSE 443
EXPOSE 5783
EXPOSE 9091
......@@ -20,4 +18,8 @@ RUN dotnet publish "GrpcMessageNode.csproj" -c Release -o /app/publish
FROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
ENV ASPNETCORE_URLS=http://localhost:9091;http://localhost:5273
# ENV ASPNETCORE_ENVIRONMENT=Development
EXPOSE 9091
EXPOSE 5273
ENTRYPOINT ["dotnet", "GrpcMessageNode.dll"]
\ No newline at end of file
using GrpcMessageNode.Initializer;
using GrpcMessageNode.Services;
using Microsoft.AspNetCore.Server.Kestrel.Https;
using OpenTelemetry.Logs;
using OpenTelemetry.Metrics;
using OpenTelemetry.Resources;
......@@ -13,30 +14,17 @@ public class Program
public static void Main(string[] args)
{
var builder = WebApplication.CreateBuilder(args);
// Additional configuration is required to successfully run gRPC on macOS.
// For instructions on how to configure Kestrel and gRPC clients on macOS, visit https://go.microsoft.com/fwlink/?linkid=2099682
// Add services to the container.
builder.Services.AddGrpc();
builder.Services.AddDiscoveryClient(builder.Configuration);
builder.Services.AddDiscoveryClient();
IConfiguration config = builder.Configuration;
Initializer.init(ref config);
string serviceName = ServiceNameParser.serviceName;
/*builder.Logging.AddOpenTelemetry(options =>
{
options
.SetResourceBuilder(
ResourceBuilder.CreateDefault()
.AddService(serviceName))
.AddConsoleExporter();
});*/
//;https://localhost:9091
builder.Services.AddOpenTelemetry()
.ConfigureResource(resource => resource.AddService(serviceName))
.WithTracing(tracing => tracing
......@@ -50,9 +38,10 @@ public class Program
var app = builder.Build();
// Configure the HTTP request pipeline.
// app.UseHttpsRedirection();
app.UseRouting();
app.UseHttpsRedirection();
app.MapGrpcService<SendMessageService>();
......
......@@ -5,14 +5,8 @@
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
},
"applicationUrl": "http://localhost:5273;https://localhost:9091",
"applicationUrl": "https://localhost:5273",
"dotnetRunMessages": true
},
"Docker": {
"commandName": "Docker",
"launchUrl": "{Scheme}://{ServiceHost}:{ServicePort}",
"publishAllPorts": true,
"useSSL": true
}
}
}
\ No newline at end of file
......@@ -21,7 +21,7 @@
"AllowedHosts": "*",
"Kestrel": {
"EndpointDefaults": {
"Protocols": "Http2"
"Protocols": "Http2,HTTP"
}
}
},
......
......@@ -21,7 +21,7 @@
"AllowedHosts": "*",
"Kestrel": {
"EndpointDefaults": {
"Protocols": "Http2"
"Protocols": "Http2,HTTP"
}
}
},
......
docker build -t grpc-msg-node .
docker build -t grpc-msgs-node .
docker run --name <container-name> -d -v (absolute path to new appsettings.json):/app/appsettings.json <image-name>
\ No newline at end of file
......@@ -114,3 +114,4 @@ D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\GrpcMessage
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\GrpcMessage\obj\Debug\net6.0\ref\GrpcMessageNode.dll
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\GrpcMessage\obj\Debug\net6.0\GrpcMessageNode.pdb
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\GrpcMessage\obj\Debug\net6.0\GrpcMessageNode.genruntimeconfig.cache
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\GrpcMessage\obj\Debug\net6.0\GrpcMessageNode.csproj.AssemblyReference.cache
......@@ -65,3 +65,16 @@
2.0
2.0
2.0
2.0
2.0
2.0
2.0
2.0
2.0
2.0
2.0
2.0
2.0
2.0
2.0
2.0
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
......@@ -7,6 +7,11 @@
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<None Remove="docker-instructions.txt" />
<None Remove="Dockerfile.txt" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Grpc.AspNetCore" Version="2.40.0" />
</ItemGroup>
......@@ -21,7 +26,6 @@
<PropertyGroup>
<SteeltoeVersion>3.2.6</SteeltoeVersion>
<DockerDefaultTargetOS>Linux</DockerDefaultTargetOS>
</PropertyGroup>
<ItemGroup>
......
// See https://aka.ms/new-console-template for more information

using HTTPMessageGenerator;
using System.Net;
using System.Net.Http.Headers;
using System.Text;
using System.Text.Json;
......@@ -13,17 +12,23 @@ message.clientID = "m-salameh";
message.localPriority = 1;
message.msgId = "msg-id=1";
message.phoneNumber = "043 33 00 83";
message.tag = "SYYR";
message.tag = "SYR";
//message.year = message.month = message.day = message.minute = 0;
message.year = 2024;
message.month = 8;
message.day = 16;
message.hour = 23;
message.minute = 59;
CancellationTokenSource cancel = new CancellationTokenSource();
/*
var client = new HttpClient();
StringContent payload = new (JsonSerializer.Serialize(message) , Encoding.UTF8 , "application/json");
StringContent payload = new(JsonSerializer.Serialize(message), Encoding.UTF8, "application/json");
try
{
HttpResponseMessage reply = await client.PostAsync("https://localhost:7095/queue-msg", payload);
HttpResponseMessage reply = await client.PostAsync("http://localhost:7095/queue-msg", payload);
Console.WriteLine(reply.Content.ToString());
}
......@@ -31,4 +36,55 @@ catch (Exception ex)
{
Console.Error.WriteLine("Error Processing - Connection Problem");
}
//Console.WriteLine(reply.Result);
*/
/*
var task1 = Task.Run(async () =>
{
while (!cancel.IsCancellationRequested)
{
var client = new HttpClient();
StringContent payload = new(JsonSerializer.Serialize(message), Encoding.UTF8, "application/json");
try
{
HttpResponseMessage reply = await client.PostAsync("http://localhost:7095/queue-msg", payload);
Console.WriteLine(reply.Content.ToString());
}
catch (Exception ex)
{
Console.Error.WriteLine("Error Processing - Connection Problem");
}
await Task.Delay(1000);
}
});*/
Task[] tsks = new Task[6];
for (int i=0; i < tsks.Length; i++)
{
tsks[i] = Task.Run(async () =>
{
while (!cancel.IsCancellationRequested)
{
var client = new HttpClient();
StringContent payload = new(JsonSerializer.Serialize(message), Encoding.UTF8, "application/json");
try
{
HttpResponseMessage reply = await client.PostAsync("http://localhost:7095/queue-msg", payload);
Console.WriteLine(reply.Content.ToString());
}
catch (Exception ex)
{
Console.Error.WriteLine("Error Processing - Connection Problem");
}
await Task.Delay(1000);
}
});
}
Task.WaitAll(tsks);
\ No newline at end of file
......@@ -2,9 +2,6 @@
"profiles": {
"HTTPMessageGenerator": {
"commandName": "Project"
},
"Docker": {
"commandName": "Docker"
}
}
}
\ No newline at end of file
......@@ -26,7 +26,6 @@ D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\HTTPMessage
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\HTTPMessageGenerator\bin\Debug\net6.0\Swashbuckle.AspNetCore.SwaggerGen.dll
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\HTTPMessageGenerator\bin\Debug\net6.0\Swashbuckle.AspNetCore.SwaggerUI.dll
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\HTTPMessageGenerator\bin\Debug\net6.0\System.Reflection.MetadataLoadContext.dll
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\HTTPMessageGenerator\obj\Debug\net6.0\HTTPMessageGenerator.csproj.AssemblyReference.cache
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\HTTPMessageGenerator\obj\Debug\net6.0\HTTPMessageGenerator.GeneratedMSBuildEditorConfig.editorconfig
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\HTTPMessageGenerator\obj\Debug\net6.0\HTTPMessageGenerator.AssemblyInfoInputs.cache
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\HTTPMessageGenerator\obj\Debug\net6.0\HTTPMessageGenerator.AssemblyInfo.cs
......@@ -36,3 +35,4 @@ D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\HTTPMessage
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\HTTPMessageGenerator\obj\Debug\net6.0\ref\HTTPMessageGenerator.dll
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\HTTPMessageGenerator\obj\Debug\net6.0\HTTPMessageGenerator.pdb
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\HTTPMessageGenerator\obj\Debug\net6.0\HTTPMessageGenerator.genruntimeconfig.cache
D:\HIAST\FIY\Project-MSGPriorityQ\GrpcMessage\message-priority-queue\HTTPMessageGenerator\obj\Debug\net6.0\HTTPMessageGenerator.csproj.AssemblyReference.cache
......@@ -17,4 +17,8 @@ RUN dotnet publish "HTTPMessageNode.csproj" -c Release -o /app/publish
FROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
ENV ASPNETCORE_URLS=http://localhost:7095;http://localhost:5095
# ENV ASPNETCORE_ENVIRONMENT=Development
EXPOSE 7095
EXPOSE 5095
ENTRYPOINT ["dotnet", "HTTPMessageNode.dll"]
\ No newline at end of file
......@@ -53,9 +53,10 @@ if (app.Environment.IsDevelopment())
app.UseSwaggerUI();
}
app.UseHttpsRedirection();
/*app.UseHttpsRedirection();
app.UseAuthorization();
app.UseAuthorization();*/
app.UseRouting();
app.MapControllers();
......
......@@ -16,7 +16,7 @@
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
},
"applicationUrl": "https://localhost:7095;http://localhost:5095",
"applicationUrl": "http://localhost:7095;http://localhost:5095",
"dotnetRunMessages": true
},
"IIS Express": {
......
......@@ -4,7 +4,6 @@
<TargetFramework>net6.0</TargetFramework>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
<UserSecretsId>dotnet-MessageGeneratorGRPC-685F4B88-9109-41BE-B200-D1916D3E7EAB</UserSecretsId>
</PropertyGroup>
<ItemGroup>
......
......@@ -6,9 +6,6 @@
"DOTNET_ENVIRONMENT": "Development"
},
"dotnetRunMessages": true
},
"Docker": {
"commandName": "Docker"
}
}
}
\ No newline at end of file
......@@ -17,10 +17,17 @@ namespace GGRPCMessageGenerator
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
string address = getAddress();
Console.WriteLine(address);
if(address == ServersNotAvail)
{
return;
}
// address = "http://localhost:9091";
/**using var channel = GrpcChannel.ForAddress(address , new GrpcChannelOptions
{
Credentials = Grpc.Core.ChannelCredentials.Insecure
});*/
using var channel = GrpcChannel.ForAddress(address);
try
{
......@@ -47,6 +54,7 @@ namespace GGRPCMessageGenerator
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
return;
//return Task.something;
}
......
......@@ -11,7 +11,6 @@
using System;
using System.Reflection;
[assembly: Microsoft.Extensions.Configuration.UserSecrets.UserSecretsIdAttribute("dotnet-MessageGeneratorGRPC-685F4B88-9109-41BE-B200-D1916D3E7EAB")]
[assembly: System.Reflection.AssemblyCompanyAttribute("GRPCMessageGenerator")]
[assembly: System.Reflection.AssemblyConfigurationAttribute("Debug")]
[assembly: System.Reflection.AssemblyFileVersionAttribute("1.0.0.0")]
......
5d03022be1a5f013e0383ee19885499bd2115593
4982793b406bef5b7c63b630795a153c7e9ff212
......@@ -17,4 +17,6 @@ RUN dotnet publish "Scheduler.csproj" -c Release -o /app/publish
FROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
ENV ASPNETCORE_URLS=http://localhost:5004;http://localhost:9090
# ENV ASPNETCORE_ENVIRONMENT=Development
ENTRYPOINT ["dotnet", "Scheduler.dll"]
\ No newline at end of file
......@@ -9,20 +9,20 @@ namespace Scheduler.MongoMessages
{
public class MongoMessagesShceduler
{
private static string MongoURL = MessagesMongoDBParser.connection;
private static string DataBaseName = MessagesMongoDBParser.database;
private static string myCollection = MessagesMongoDBParser.collection;
private string MongoURL = MessagesMongoDBParser.connection;
private string DataBaseName = MessagesMongoDBParser.database;
private string myCollection = MessagesMongoDBParser.collection;
private static IMongoCollection<BsonDocument> collection;
public static string ConnectionError = "Error Connecting to MongoDB on : " + MongoURL;
private static string NotTaken = "Not-Taken";
//private IMongoCollection<BsonDocument> collection;
public string ConnectionError = "Error Connecting to MongoDB on : " + MessagesMongoDBParser.connection;
private string NotTaken = "Not-Taken";
/// <summary>
/// Connects to MongoDB to Store Messges and Set Things Up
/// </summary>
/// <param name="MyId"></param>
/// <returns>string : ok if all goes well , otherwise something else</returns>
public static string init()
/*public string init()
{
try
{
......@@ -39,19 +39,21 @@ namespace Scheduler.MongoMessages
return ex.Message;
}
}
}*/
/// <summary>
/// insert a scheduled message in the mongodb
/// </summary>
/// <param name="message"></param>
/// <returns>string: ok or else</returns>
public static string insertMessage(ref Message message)
public string insertMessage(ref Message message)
{
try
{
IMongoCollection<BsonDocument> collection = MongoSettingsInitializer.collection;
DateTime date = new DateTime(message.Year, message.Month, message.Day, message.Hour, message.Minute, 2);
Console.WriteLine(date);
var document = new BsonDocument
{
{ "sender", message.ClientID},
......
using MongoDB.Bson;
using MongoDB.Driver;
using Scheduler.Initializer;
namespace Scheduler.MongoMessages
{
public class MongoSettingsInitializer
{
private static string MongoURL = MessagesMongoDBParser.connection;
private static string DataBaseName = MessagesMongoDBParser.database;
private static string myCollection = MessagesMongoDBParser.collection;
public static IMongoCollection<BsonDocument> collection;
public static string ConnectionError = "Error Connecting to MongoDB on : " + MongoURL;
private static string NotTaken = "Not-Taken";
/// <summary>
/// Connects to MongoDB to Store Messges and Set Things Up
/// </summary>
/// <param name="MyId"></param>
/// <returns>string : ok if all goes well , otherwise something else</returns>
public static string init()
{
try
{
var client = new MongoClient(MongoURL);
var database = client.GetDatabase(DataBaseName);
collection = database.GetCollection<BsonDocument>(myCollection);
return "ok";
}
catch (Exception ex)
{
return ex.Message;
}
}
}
}
using Scheduler.MongoMessages;
using SchedulerNode.RedisQueuer;
using SchedulerNode.Services;
using Steeltoe.Discovery.Client;
using OpenTelemetry.Logs;
using OpenTelemetry.Metrics;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;
using Scheduler.Initializer;
using Scheduler.RedisQueuer;
var builder = WebApplication.CreateBuilder(args);
......@@ -19,9 +18,11 @@ builder.Services.AddDiscoveryClient();
IConfiguration config = builder.Configuration;
Initializer.init(ref config);
MessageQueues.init();
//MessageQueues.init();
RedisSettingsInitializer.init();
MongoMessagesShceduler.init();
//MongoMessagesShceduler.init();
MongoSettingsInitializer.init();
string serviceName = ServiceNameParser.serviceName;
/*
......
using Newtonsoft.Json;
using Scheduler.Initializer;
using Scheduler.RedisQueuer;
using StackExchange.Redis;
using Steeltoe.Discovery;
using System;
......@@ -12,21 +13,21 @@ namespace SchedulerNode.RedisQueuer
{
public class MessageQueues
{
private readonly static string RedisURL = RedisInfoParser.connection;
private readonly string RedisURL = RedisInfoParser.connection;
private readonly static string Syriatel = RedisInfoParser.Syriatel;
private readonly static string MTN = RedisInfoParser.MTN;
private readonly string Syriatel = RedisInfoParser.Syriatel;
private readonly string MTN = RedisInfoParser.MTN;
private readonly static int LEVELS = 6;
private static int StreamMaxLength = 100000000;
private static IDatabase db = null;
public readonly static string RedisConnectionError = "Error Writing to Redis";
private readonly int LEVELS = 6;
private int StreamMaxLength = 100000000;
///private IDatabase db = null;
public readonly string RedisConnectionError = "Error Writing to Redis";
/// <summary>
/// Connects to Redis Stream With Streams for Each Priority and create consuming groups
/// if not created
/// </summary>
public static void init()
/* public void init()
{
var redis = ConnectionMultiplexer.Connect(RedisURL);
......@@ -57,7 +58,7 @@ namespace SchedulerNode.RedisQueuer
}
}
}
}*/
/// <summary>
......@@ -65,7 +66,7 @@ namespace SchedulerNode.RedisQueuer
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public static string addMessage(Message message)
public string addMessage(Message message)
{
string id = "Error";
string temp = string.Empty;
......@@ -83,10 +84,11 @@ namespace SchedulerNode.RedisQueuer
}
private static async Task<string> addMessageRedisAsync(Message message, string URL)
private async Task<string> addMessageRedisAsync(Message message, string URL)
{
try
{
IDatabase db = RedisSettingsInitializer.db;
string tag = getTag(ref message);
string streamName = tag + "_" + message.LocalPriority.ToString();
......@@ -121,7 +123,7 @@ namespace SchedulerNode.RedisQueuer
}
}
private static string getTag (ref Message message)
private string getTag (ref Message message)
{
if(message.Tag.Contains(Syriatel, StringComparison.OrdinalIgnoreCase))
......
using Scheduler.Initializer;
using StackExchange.Redis;
namespace Scheduler.RedisQueuer
{
public class RedisSettingsInitializer
{
private static readonly string RedisURL = RedisInfoParser.connection;
private static readonly string Syriatel = RedisInfoParser.Syriatel;
private static readonly string MTN = RedisInfoParser.MTN;
private static readonly int LEVELS = 6;
private static int StreamMaxLength = 100000000;
public static IDatabase db = null;
public readonly string RedisConnectionError = "Error Writing to Redis";
/// <summary>
/// Connects to Redis Stream With Streams for Each Priority and create consuming groups
/// if not created
/// </summary>
public static void init()
{
var redis = ConnectionMultiplexer.Connect(RedisURL);
db = redis.GetDatabase();
for (int i = 1; i < LEVELS; i++)
{
try
{
bool k1 = db.StreamCreateConsumerGroup(Syriatel + "_" + i.ToString(),
"SYS_MSGS",
0,
true);
bool k2 = db.StreamCreateConsumerGroup(MTN + "_" + i.ToString(),
"SYS_MSGS",
0,
true);
if (k1 && k2)
{
Console.WriteLine("OK");
}
}
catch (Exception ex)
{
continue;
}
}
}
}
}
......@@ -52,11 +52,12 @@ namespace SchedulerNode.Services
/// <returns>Acknowledgment: status of request and id if all goes well</returns>
private Acknowledgement SendAsap(ref Message message)
{
string reqId = MessageQueues.addMessage(message);
MessageQueues Mq = new MessageQueues();
string reqId = Mq.addMessage(message);
Console.WriteLine("req id ASAP que = " + reqId);
if (reqId.Equals(MessageQueues.RedisConnectionError))
if (reqId.Equals(Mq.RedisConnectionError))
{
return (new Acknowledgement
{
......@@ -80,9 +81,10 @@ namespace SchedulerNode.Services
/// <returns>Acknowledgment: status of request and id if all goes well</returns>
private Acknowledgement Schedule(ref Message message)
{
string res = MongoMessagesShceduler.insertMessage(ref message);
MongoMessagesShceduler mongoMessagesShceduler = new MongoMessagesShceduler();
string res = mongoMessagesShceduler.insertMessage(ref message);
Console.WriteLine("req id SCHEDULED que = " + res);
if (res.Equals (MongoMessagesShceduler.ConnectionError))
if (res.Equals (mongoMessagesShceduler.ConnectionError))
{
return new Acknowledgement
{
......
44aa8ac0f519284d33bbd830c129987feb0de392
dddf50d50e3bc6fd0864359277eba69d849ac441
......@@ -61,3 +61,7 @@
2.0
2.0
2.0
2.0
2.0
2.0
2.0
using MongoDB.Bson;
using MongoDB.Driver;
using ScheduledMessagesHandler.Initializer;
namespace ScheduledMessagesHandler.MongoMessages
{
public class MongoSettingsInitializer
{
private static string MongoURL = MessagesMongoDBParser.connection;
private static string DataBaseName = MessagesMongoDBParser.database;
private static string myCollection = MessagesMongoDBParser.collection;
public static IMongoCollection<BsonDocument> collection;
public static string ConnectionError = "Error Connecting to MongoDB on : " + MongoURL;
private static string NotTaken = "Not-Taken";
/// <summary>
/// Connects to MongoDB to Store Messges and Set Things Up
/// </summary>
/// <param name="MyId"></param>
/// <returns>string : ok if all goes well , otherwise something else</returns>
public static string init()
{
try
{
var client = new MongoClient(MongoURL);
var database = client.GetDatabase(DataBaseName);
collection = database.GetCollection<BsonDocument>(myCollection);
return "ok";
}
catch (Exception ex)
{
return ex.Message;
}
}
}
}
using MongoDB.Bson;
using MongoDB.Driver;
using ScheduledMessagesHandler;
using ScheduledMessagesHandler.Initializer;
using ScheduledMessagesHandler.RedisQueuer;
namespace ScheduledMessagesHandler.MongoMessages
{
public class MongoMessagesShceduler
public class ScheduledMessagesHandler
{
private static string MongoURL = MessagesMongoDBParser.connection;
private static string DataBaseName = MessagesMongoDBParser.database;
private static string myCollection = MessagesMongoDBParser.collection;
public static string ConnectionError = "Error Connecting to MongoDB on : " + MongoURL;
private static IMongoCollection<BsonDocument> collection;
private static string Pending = "Pending";
private static string Acked = "Acked";
//this NotTaken here must be the same of NotTaken in scheduler
private static string NotTaken = "Not-Taken";
private static int limit = 100000;
private string MongoURL = MessagesMongoDBParser.connection;
private string DataBaseName = MessagesMongoDBParser.database;
private string myCollection = MessagesMongoDBParser.collection;
//private IMongoCollection<BsonDocument> collection;
public string ConnectionError = "Error Connecting to MongoDB on : " + MessagesMongoDBParser.connection;
private string NotTaken = "Not-Taken";
private string Pending = "Pending";
private string Acked = "Acked";
private int limit = 100000;
/// <summary>
/// Connects to MongoDB Where Scheduled Messages Are Stored by The Scheduler Service
/// Connects to MongoDB to Store Messges and Set Things Up
/// </summary>
/// <returns>string : ok if every thing goes fine</returns>
public static string init()
/// <param name="MyId"></param>
/// <returns>string : ok if all goes well , otherwise something else</returns>
/*public string init()
{
try
{
var client = new MongoClient(MongoURL);
var database = client.GetDatabase(DataBaseName);
collection = database.GetCollection<BsonDocument>(myCollection);
var keys = Builders<BsonDocument>.IndexKeys.Ascending("timestamp").Ascending("status");
var indexOptions = new CreateIndexOptions { Background = true };
var indexModel = new CreateIndexModel<BsonDocument>(keys, indexOptions);
collection.Indexes.CreateOne(indexModel);
collection = database.GetCollection<BsonDocument>(myCollection);
return "ok";
}
......@@ -49,28 +40,28 @@ namespace ScheduledMessagesHandler.MongoMessages
return ex.Message;
}
}
}*/
/// <summary>
/// extract dued scheduled messages cautiously and write them to Redis Stream by Priority
/// </summary>
public static void getDuedMessagesAndQueue()
public void getDuedMessagesAndQueue()
{
bool thereArePending = true;
while (true)
{
if (thereArePending)
{
thereArePending = getBulkMessages(Pending);
thereArePending = getBulkMessages(Pending);
}
if (! thereArePending)
if (!thereArePending)
{
thereArePending = getBulkMessages(NotTaken);
}
}
}
/// <summary>
......@@ -78,8 +69,10 @@ namespace ScheduledMessagesHandler.MongoMessages
/// </summary>
/// <param name="status"></param>
/// <returns></returns>
private static bool getBulkMessages(string status)
private bool getBulkMessages(string status)
{
IMongoCollection<BsonDocument> collection = MongoSettingsInitializer.collection;
MessageQueues messageQueues = new MessageQueues();
var specificTime = DateTime.UtcNow;
var filter = Builders<BsonDocument>.Filter.And(
......@@ -93,7 +86,7 @@ namespace ScheduledMessagesHandler.MongoMessages
.Limit(limit)
.Sort(sort)
.ToList<BsonDocument>();
if (docs.Count == 0)
{
return false;
......@@ -110,20 +103,20 @@ namespace ScheduledMessagesHandler.MongoMessages
{
collection.UpdateOne(_filter, updatePending);
}
string result = MessageQueues.addMessage(message);
if (!result.Equals (MessageQueues.RedisConnectionError))
string result = messageQueues.addMessage(message);
if (!result.Equals(messageQueues.RedisConnectionError))
{
collection.UpdateOne(_filter, updateAck);
//Console.WriteLine(message);
}
}
return true;
}
private static MessageDTO getMessage(BsonDocument doc)
private MessageDTO getMessage(BsonDocument doc)
{
MessageDTO message = new MessageDTO();
message.clientID = (string)doc["sender"];
......@@ -133,7 +126,14 @@ namespace ScheduledMessagesHandler.MongoMessages
message.phoneNumber = (string)doc["phone-number"];
message.msgId = (string)doc["msg-id"];
message.apiKey = (string)doc["api-key"];
message.year = message.month = message.day = message.hour = message.minute = 0;
DateTime dd = (DateTime)doc["timestamp"];
Console.WriteLine(dd);
message.year = dd.Year;
message.month = dd.Month;
message.hour = dd.Hour;
message.day = dd.Day;
message.minute = dd.Minute;
//message.year = message.month = message.day = message.hour = message.minute = 0;
return message;
}
}
......
......@@ -19,8 +19,9 @@ config = host.Services.GetRequiredService<IConfiguration>();
Initializer.init(ref config);
MessageQueues.init();
MongoMessagesShceduler.init();
RedisSettingsInitializer.init();
MongoSettingsInitializer.init();
await host.RunAsync();
using Newtonsoft.Json;
using ScheduledMessagesHandler;
using ScheduledMessagesHandler.Initializer;
using StackExchange.Redis;
using Steeltoe.Discovery;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ScheduledMessagesHandler.RedisQueuer
{
public class MessageQueues
{
private readonly static string RedisURL = RedisInfoParser.connection;
private readonly string RedisURL = RedisInfoParser.connection;
private readonly static string Syriatel = RedisInfoParser.Syriatel;
private readonly static string MTN = RedisInfoParser.MTN;
public readonly static string RedisConnectionError = "Error Writing to Redis";
private readonly string Syriatel = RedisInfoParser.Syriatel;
private readonly string MTN = RedisInfoParser.MTN;
private readonly int LEVELS = 6;
private int StreamMaxLength = 100000000;
///private IDatabase db = null;
public readonly string RedisConnectionError = "Error Writing to Redis";
private static int LEVELS = 6;
private static int StreamMaxLength = 100000000;
private static IDatabase db = null;
/// <summary>
/// Connects to Redis Stream With Streams for Each Priority and create consuming groups
/// if not created
/// </summary>
public static void init()
/* public void init()
{
var redis = ConnectionMultiplexer.Connect(RedisURL);
......@@ -53,44 +57,46 @@ namespace ScheduledMessagesHandler.RedisQueuer
}
}
}
}*/
/// <summary>
/// Add a Message by Casting the MessageDTO to According Redis Stream
/// Add a Message to the According Redis Stream
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public static string addMessage(MessageDTO message)
public string addMessage(MessageDTO message)
{
string id = "Error";
string temp = string.Empty;
var resid = addMessageRedisAsync(message, RedisURL);
temp = resid.Result;
if (temp.Equals(RedisConnectionError))
{
return RedisConnectionError;
}
id = message.tag + ":" + message.localPriority + ":" + temp;
//id = Guid.NewGuid().ToString();
return id;
}
private static async Task<string> addMessageRedisAsync(MessageDTO message, string URL)
private async Task<string> addMessageRedisAsync(MessageDTO message, string URL)
{
try
{
IDatabase db = RedisSettingsInitializer.db;
string tag = getTag(ref message);
string streamName = tag + "_" + message.localPriority.ToString();
//Console.WriteLine("stream name = " + streamName);
Console.WriteLine("stream name = " + streamName);
var serializedMessage = JsonConvert.SerializeObject(message);
//Console.WriteLine("Sending to stream : " + streamName);
Console.WriteLine("Sending to stream : " + streamName);
var messageId = await db.StreamAddAsync
(streamName,
......@@ -98,13 +104,14 @@ namespace ScheduledMessagesHandler.RedisQueuer
{
new NameValueEntry("message", serializedMessage)
},
null,
StreamMaxLength,
true);
null,
StreamMaxLength,
true
);
//Console.WriteLine("Done Sending to stream : " + streamName);
//Console.WriteLine("Stream msg id = " + messageId);
Console.WriteLine("Done Sending to stream : " + streamName);
Console.WriteLine("Stream msg id = " + messageId);
//var messageId = "YES";
return messageId.ToString();
}
......@@ -115,7 +122,7 @@ namespace ScheduledMessagesHandler.RedisQueuer
}
}
private static string getTag (ref MessageDTO message)
private string getTag (ref MessageDTO message)
{
if(message.tag.Contains(Syriatel, StringComparison.OrdinalIgnoreCase))
......
using ScheduledMessagesHandler.Initializer;
using StackExchange.Redis;
namespace ScheduledMessagesHandler.RedisQueuer
{
public class RedisSettingsInitializer
{
private static readonly string RedisURL = RedisInfoParser.connection;
private static readonly string Syriatel = RedisInfoParser.Syriatel;
private static readonly string MTN = RedisInfoParser.MTN;
private static readonly int LEVELS = 6;
private static int StreamMaxLength = 100000000;
public static IDatabase db = null;
public readonly string RedisConnectionError = "Error Writing to Redis";
/// <summary>
/// Connects to Redis Stream With Streams for Each Priority and create consuming groups
/// if not created
/// </summary>
public static void init()
{
var redis = ConnectionMultiplexer.Connect(RedisURL);
db = redis.GetDatabase();
for (int i = 1; i < LEVELS; i++)
{
try
{
bool k1 = db.StreamCreateConsumerGroup(Syriatel + "_" + i.ToString(),
"SYS_MSGS",
0,
true);
bool k2 = db.StreamCreateConsumerGroup(MTN + "_" + i.ToString(),
"SYS_MSGS",
0,
true);
if (k1 && k2)
{
Console.WriteLine("OK");
}
}
catch (Exception ex)
{
continue;
}
}
}
}
}
......@@ -20,7 +20,8 @@ namespace ScheduledMessagesHandler
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
MongoMessagesShceduler.getDuedMessagesAndQueue();
MongoMessages.ScheduledMessagesHandler mongoMessagesHandler = new MongoMessages.ScheduledMessagesHandler();
mongoMessagesHandler.getDuedMessagesAndQueue();
}
}
}
\ No newline at end of file
0f1a2ce0938f1ef7cfc97270642d94a615e90ff5
a8dd004c9a106628a47121fe875b77b6ccc9f6db
FROM mcr.microsoft.com/dotnet/aspnet:6.0 AS base
WORKDIR /app
EXPOSE 80
EXPOSE 443
FROM mcr.microsoft.com/dotnet/sdk:6.0 AS build
WORKDIR /src
......@@ -17,4 +15,6 @@ RUN dotnet publish "Validator.csproj" -c Release -o /app/publish
FROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
ENV ASPNETCORE_URLS=http://localhost:5205;http://localhost:9095
# ENV ASPNETCORE_ENVIRONMENT=Development
ENTRYPOINT ["dotnet", "Validator.dll"]
\ No newline at end of file
......@@ -4,11 +4,11 @@ namespace Validator.MongoDBAccess
{
public class InformationHolder
{
private static readonly string URL = AccountsDBParser.connection;
private static readonly string DBName = AccountsDBParser.DBName;
private static readonly string collection = AccountsDBParser.collection;
private readonly string URL = AccountsDBParser.connection;
private readonly string DBName = AccountsDBParser.DBName;
private readonly string collection = AccountsDBParser.collection;
public static void checkMessage(MessageMetaData metaData)
public void checkMessage(MessageMetaData metaData)
{
//check for auther + authen + quota
}
......
......@@ -24,6 +24,7 @@
"Protocols": "Http2"
}
},
"OpenTelemetry": {
"ServiceName": "Validator-1",
"Tracing": {
......
......@@ -24,6 +24,7 @@
"Protocols": "Http2"
}
},
"OpenTelemetry": {
"ServiceName": "Validator-1",
"Tracing": {
......
......@@ -29,3 +29,9 @@
2.0
2.0
2.0
2.0
2.0
2.0
2.0
2.0
2.0
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment