Commit b0d07fef authored by Mohamad Bashar Desoki

Prediction Service - Application

parent 24a00f24
# prediction-python
Install the requirements:
```
$ pip install -r requirements.txt
```
Regenerate the gRPC stubs (only needed when `prediction.proto` changes):
```
$ python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. prediction.proto
```
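The command above regenerates `prediction_pb2.py` and `prediction_pb2_grpc.py`. To try the service locally, start the server module (the entry-point file name is an assumption here; e.g. if the server code in this commit is saved as `main.py`):
```
$ python main.py
```
The server then listens for `ProcessData` calls on port 50051.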
from scipy.spatial.distance import euclidean
from fastdtw import fastdtw
import numpy as np


def manhattan_distance(point1, point2):
    return np.sum(np.abs(point1 - point2))


def cosine_similarity(vec1, vec2):
    dot_product = np.dot(vec1, vec2)
    norm_vec1 = np.linalg.norm(vec1)
    norm_vec2 = np.linalg.norm(vec2)
    if norm_vec1 == 0 or norm_vec2 == 0:
        # Handle the case where one of the vectors has zero magnitude
        similarity = 0.0  # or another appropriate value
    else:
        similarity = dot_product / (norm_vec1 * norm_vec2)
    return similarity


def euclidean_distance(vec1, vec2):
    return euclidean(vec1, vec2)


# DTW (Dynamic Time Warping)
# https://ealizadeh.com/blog/introduction-to-dynamic-time-warping/
# https://www.databricks.com/blog/2019/04/30/understanding-dynamic-time-warping.html
def fast_dtw(vec1, vec2):
    return fastdtw(vec1, vec2)[0]
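A quick usage sketch of these distance helpers on two short, arbitrary series (values chosen only for illustration; the import path follows how the rest of this commit imports the module):
```
import numpy as np
from lib.distance.distance import (manhattan_distance, euclidean_distance,
                                   cosine_similarity, fast_dtw)

a = np.array([1.0, 2.0, 3.0])
b = np.array([2.0, 4.0, 6.0])

print(manhattan_distance(a, b))  # 6.0  -> sum of absolute differences
print(euclidean_distance(a, b))  # ~3.74
print(cosine_similarity(a, b))   # 1.0  -> same direction, so maximally similar
print(fast_dtw(a, b))            # DTW distance; smaller means more similar
```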
syntax = "proto3";
package prediction;
message PredictionRequest {
string micorservice_name = 1;
repeated double measurements = 2;
string history = 3;
string stepDuration = 4;
int32 predictVerticalWindow = 5;
int32 predictHorizontalWindow = 6;
}
message PredictionResponse {
int32 result = 1;
}
service PredictionService {
rpc ProcessData(PredictionRequest) returns (PredictionResponse);
}
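For orientation, a `PredictionRequest` might be populated as in the sketch below. The field names come straight from the `.proto` (including the `micorservice_name` spelling); the `"4d"`/`"15m"` formats are an assumption, since the server only extracts the leading digits from `history` and `stepDuration`:
```
from lib.grpc.prediction_pb2 import PredictionRequest

request = PredictionRequest(
    micorservice_name="cart-service",       # hypothetical microservice name
    measurements=[0.42, 0.55, 0.61, 0.58],  # illustrative metric samples
    history="4d",                           # assumed format: number of days kept
    stepDuration="15m",                     # assumed format: sampling interval
    predictVerticalWindow=2,
    predictHorizontalWindow=3,
)
```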
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: prediction.proto
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x10prediction.proto\x12\nprediction\"\xab\x01\n\x11PredictionRequest\x12\x19\n\x11micorservice_name\x18\x01 \x01(\t\x12\x14\n\x0cmeasurements\x18\x02 \x03(\x01\x12\x0f\n\x07history\x18\x03 \x01(\t\x12\x14\n\x0cstepDuration\x18\x04 \x01(\t\x12\x1d\n\x15predictVerticalWindow\x18\x05 \x01(\x05\x12\x1f\n\x17predictHorizontalWindow\x18\x06 \x01(\x05\"$\n\x12PredictionResponse\x12\x0e\n\x06result\x18\x01 \x01(\x05\x32\x61\n\x11PredictionService\x12L\n\x0bProcessData\x12\x1d.prediction.PredictionRequest\x1a\x1e.prediction.PredictionResponseb\x06proto3')
_PREDICTIONREQUEST = DESCRIPTOR.message_types_by_name['PredictionRequest']
_PREDICTIONRESPONSE = DESCRIPTOR.message_types_by_name['PredictionResponse']
PredictionRequest = _reflection.GeneratedProtocolMessageType('PredictionRequest', (_message.Message,), {
  'DESCRIPTOR' : _PREDICTIONREQUEST,
  '__module__' : 'prediction_pb2'
  # @@protoc_insertion_point(class_scope:prediction.PredictionRequest)
  })
_sym_db.RegisterMessage(PredictionRequest)

PredictionResponse = _reflection.GeneratedProtocolMessageType('PredictionResponse', (_message.Message,), {
  'DESCRIPTOR' : _PREDICTIONRESPONSE,
  '__module__' : 'prediction_pb2'
  # @@protoc_insertion_point(class_scope:prediction.PredictionResponse)
  })
_sym_db.RegisterMessage(PredictionResponse)
_PREDICTIONSERVICE = DESCRIPTOR.services_by_name['PredictionService']
if _descriptor._USE_C_DESCRIPTORS == False:

  DESCRIPTOR._options = None
  _PREDICTIONREQUEST._serialized_start=33
  _PREDICTIONREQUEST._serialized_end=204
  _PREDICTIONRESPONSE._serialized_start=206
  _PREDICTIONRESPONSE._serialized_end=242
  _PREDICTIONSERVICE._serialized_start=244
  _PREDICTIONSERVICE._serialized_end=341
# @@protoc_insertion_point(module_scope)
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc

import lib.grpc.prediction_pb2 as prediction__pb2


class PredictionServiceStub(object):
    """Missing associated documentation comment in .proto file."""

    def __init__(self, channel):
        """Constructor.

        Args:
            channel: A grpc.Channel.
        """
        self.ProcessData = channel.unary_unary(
                '/prediction.PredictionService/ProcessData',
                request_serializer=prediction__pb2.PredictionRequest.SerializeToString,
                response_deserializer=prediction__pb2.PredictionResponse.FromString,
                )


class PredictionServiceServicer(object):
    """Missing associated documentation comment in .proto file."""

    def ProcessData(self, request, context):
        """Missing associated documentation comment in .proto file."""
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details('Method not implemented!')
        raise NotImplementedError('Method not implemented!')


def add_PredictionServiceServicer_to_server(servicer, server):
    rpc_method_handlers = {
            'ProcessData': grpc.unary_unary_rpc_method_handler(
                    servicer.ProcessData,
                    request_deserializer=prediction__pb2.PredictionRequest.FromString,
                    response_serializer=prediction__pb2.PredictionResponse.SerializeToString,
            ),
    }
    generic_handler = grpc.method_handlers_generic_handler(
            'prediction.PredictionService', rpc_method_handlers)
    server.add_generic_rpc_handlers((generic_handler,))


# This class is part of an EXPERIMENTAL API.
class PredictionService(object):
    """Missing associated documentation comment in .proto file."""

    @staticmethod
    def ProcessData(request,
            target,
            options=(),
            channel_credentials=None,
            call_credentials=None,
            insecure=False,
            compression=None,
            wait_for_ready=None,
            timeout=None,
            metadata=None):
        return grpc.experimental.unary_unary(request, target, '/prediction.PredictionService/ProcessData',
            prediction__pb2.PredictionRequest.SerializeToString,
            prediction__pb2.PredictionResponse.FromString,
            options, channel_credentials,
            insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
import re

orig_r = 4
orig_c = 7
# two-hour window in the three days before
wind_r = 2
wind_c = 3

# Latest time series and parsed configuration, keyed by microservice name.
microservices_ts = {}
microservices_cfg = {}


def update_microservices_ts(key, value):
    # if key in microservices_ts:
    #     # diff = list(set(microservices_ts[key]) - set(value))
    #     microservices_ts[key].extend(diff)  # If the key exists, append to the existing array
    # else:
    microservices_ts[key] = value  # If the key doesn't exist, create a new array with the value


def update_microservices_cfg(key, history=orig_r, stepDuration=15,
                             predictVerticalWindow=wind_r, predictHorizontalWindow=wind_c):
    # history and stepDuration normally arrive as strings (e.g. "4d", "15m");
    # str() keeps the numeric defaults usable as well.
    microservices_cfg[key] = {
        "orig_r": int(re.search(r'\d+', str(history)).group()),                      # orig_r: rows of the history matrix
        "orig_c": int(60 * 24 / int(re.search(r'\d+', str(stepDuration)).group())),  # orig_c: measurement slots per day
        "wind_r": int(predictVerticalWindow),    # wind_r
        "wind_c": int(predictHorizontalWindow)   # wind_c
    }
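As a sketch of what gets stored, calling the helper with the illustrative values from the `.proto` example above yields:
```
update_microservices_cfg("cart-service", history="4d", stepDuration="15m",
                         predictVerticalWindow=2, predictHorizontalWindow=3)

# microservices_cfg["cart-service"] is now:
# {"orig_r": 4, "orig_c": 96, "wind_r": 2, "wind_c": 3}
# where 96 = 60 * 24 / 15, i.e. the number of 15-minute steps in one day.
```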
import numpy as np


def update_matrix(ground_ts, new_element):
    # Drop the oldest measurement and append the new one (sliding window).
    ground_ts = np.append(ground_ts[1:], new_element)
    return ground_ts


def extract_submatrices(matrix, k, m):
    # Slide a k x m window over the n x d matrix; a window that overruns the
    # end of a row continues at the start of the following row.
    n, d = matrix.shape
    submatrices = []
    for i in range(n - k + 1):
        for j in range(d):
            submatrix = np.zeros((k, m))
            if i == n - k and j > d - m:
                break
            for r in range(k):
                for c in range(m):
                    if j + c < d:
                        row_idx = i + r
                        col_idx = j + c
                    elif i + r < n - 1:
                        row_idx = i + r + 1
                        col_idx = (j + c) % d
                    submatrix[r, c] = matrix[row_idx, col_idx]
            submatrices.append(submatrix.flatten())
    return submatrices
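A small worked example of `extract_submatrices` (arbitrary 3x4 matrix, 2x2 window) showing how the window slides within a row and wraps onto the following row near a row boundary:
```
import numpy as np
from lib.matrixtrans.matrixtrans import extract_submatrices

matrix = np.arange(1, 13).reshape(3, 4)   # [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12]]
for window in extract_submatrices(matrix, 2, 2):
    print(window.tolist())

# [1.0, 2.0, 5.0, 6.0]
# [2.0, 3.0, 6.0, 7.0]
# [3.0, 4.0, 7.0, 8.0]
# [4.0, 5.0, 8.0, 9.0]    <- wrapped: the window continues on the next row
# [5.0, 6.0, 9.0, 10.0]
# [6.0, 7.0, 10.0, 11.0]
# [7.0, 8.0, 11.0, 12.0]
```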
from lib.matrixtrans.matrixtrans import *
from lib.distance.distance import *
import numpy as np
import sys


def prediction(time_series, orig_r, orig_c, wind_r, wind_c, distance_func):
    if len(time_series) == 1:
        return time_series[0]

    # Prepare and shape the time series
    if len(time_series) > orig_r * orig_c:
        time_series = time_series[-(orig_r * orig_c):]
    else:
        max_integer = sys.maxsize
        sparse = np.full((orig_r * orig_c) - len(time_series), max_integer)
        time_series = np.concatenate((sparse, time_series))
    time_series = update_matrix(time_series, 0).reshape(orig_r, orig_c)
    # print(time_series)

    # Extract submatrices
    submatrices = extract_submatrices(time_series, wind_r, wind_c)
    # print(submatrices)

    similarity_distances = {
        manhattan_distance: [],
        euclidean_distance: [],
        cosine_similarity: [],
        fast_dtw: []
    }

    # Calculate the similarity between the main series (i.e. the last submatrix) and every other submatrix
    main_series = submatrices[-1]
    for submatrix in submatrices[:-1]:
        for key in similarity_distances:
            distance = key(main_series[:-1], submatrix[:-1])
            similarity_distances[key].append(distance)

    mst_similar = {
        "manhattan_distance": None,
        "euclidean_distance": None,
        "cosine_similarity": None,
        "fast_dtw": None
    }
    for key in similarity_distances:
        if key != cosine_similarity:
            opt_dist = min(similarity_distances[key])   # smaller distance = more similar
        else:
            opt_dist = max(similarity_distances[key])   # larger cosine similarity = more similar
        idx_of_mst_similar = similarity_distances[key].index(opt_dist)
        mst_similar[key.__name__] = submatrices[idx_of_mst_similar][-1]

    # Debug
    # print(similarity_distances)
    # print("the index is", idx_of_mst_similar)
    # print("the main array", main_series)
    # print("the most similar array", mst_similar)
    # print("the forecast is", mst_similar[-1])
    return mst_similar[distance_func]
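A self-contained sketch of calling `prediction` directly on a synthetic, strongly repetitive series (shape and values are purely illustrative):
```
import numpy as np
from lib.prediction import prediction

# 28 samples arranged as a 4 x 7 history matrix, repeating the same daily pattern.
series = np.array([1, 2, 3, 4, 3, 2, 1] * 4, dtype=float)

forecast = prediction(series, orig_r=4, orig_c=7, wind_r=2, wind_c=3,
                      distance_func="fast_dtw")
print(forecast)  # 1.0 here: the value that followed the window most similar to the latest one
```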
# import pandas as pd
import numpy as np
from lib.prediction import prediction
from lib.helper import *

# grpc
import grpc
from concurrent.futures import ThreadPoolExecutor
from lib.grpc.prediction_pb2 import PredictionRequest, PredictionResponse
from lib.grpc.prediction_pb2_grpc import PredictionServiceServicer, add_PredictionServiceServicer_to_server


class PredictionServiceImpl(PredictionServiceServicer):
    def ProcessData(self, request, context):
        # Process the received arrays and strings
        microservice = request.micorservice_name
        update_microservices_ts(microservice, request.measurements)
        update_microservices_cfg(microservice, request.history, request.stepDuration,
                                 request.predictVerticalWindow, request.predictHorizontalWindow)
        print(microservices_ts)
        # print(microservices_cfg)
        response = prediction(microservices_ts[microservice],
                              microservices_cfg[microservice]["orig_r"],
                              microservices_cfg[microservice]["orig_c"],
                              microservices_cfg[microservice]["wind_r"],
                              microservices_cfg[microservice]["wind_c"],
                              "fast_dtw")
        print(int(response))
        return PredictionResponse(result=int(response))


def serve():
    server = grpc.server(ThreadPoolExecutor())
    add_PredictionServiceServicer_to_server(PredictionServiceImpl(), server)
    server.add_insecure_port("[::]:50051")
    server.start()
    server.wait_for_termination()


if __name__ == "__main__":
    serve()
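A minimal client sketch for exercising the running server locally (address, service name, and values are illustrative; the request mirrors the `.proto` example above):
```
import grpc
from lib.grpc.prediction_pb2 import PredictionRequest
from lib.grpc.prediction_pb2_grpc import PredictionServiceStub

with grpc.insecure_channel("localhost:50051") as channel:
    stub = PredictionServiceStub(channel)
    reply = stub.ProcessData(PredictionRequest(
        micorservice_name="cart-service",       # hypothetical name
        measurements=[0.42, 0.55, 0.61, 0.58],
        history="4d", stepDuration="15m",       # assumed string formats
        predictVerticalWindow=2, predictHorizontalWindow=3,
    ))
    print(reply.result)
```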
fastdtw==0.3.4
grpcio==1.48.2
grpcio-tools==1.48.2
numpy==1.19.5
protobuf==3.19.6
scipy==1.5.4
six==1.16.0