Commit 0c540ce5 authored by tammam.alsoleman's avatar tammam.alsoleman

create client gateway in python

parent 51e2f508
import grpc
import generated.calculator_pb2 as calculator_pb2
import generated.calculator_pb2_grpc as calculator_pb2_grpc
import threading
class ClientGateway:
def __init__(self):
self.addition_channel = grpc.insecure_channel('localhost:50052')
self.multiplication_channel = grpc.insecure_channel('localhost:50053')
self.addition_stub = calculator_pb2_grpc.CalculatorStub(self.addition_channel)
self.multiplication_stub = calculator_pb2_grpc.CalculatorStub(self.multiplication_channel)
def perform_operations(self, a, b):
"""Send numbers to both addition and multiplication services"""
try:
addition_thread = threading.Thread(target=self._call_addition, args=(a, b))
multiplication_thread = threading.Thread(target=self._call_multiplication, args=(a, b))
addition_thread.start()
multiplication_thread.start()
addition_thread.join()
multiplication_thread.join()
except grpc.RpcError as e:
print(f"Error: {e.code()} - {e.details()}")
def _call_addition(self, a, b):
"""Call addition service"""
try:
numbers = calculator_pb2.Numbers(a=a, b=b)
response = self.addition_stub.Add(numbers)
print(f" Addition: {a} + {b} = {response.value}")
except grpc.RpcError as e:
print(f"Addition Service Error: {e.details()}")
def _call_multiplication(self, a, b):
"""Call multiplication service"""
try:
numbers = calculator_pb2.Numbers(a=a, b=b)
response = self.multiplication_stub.Multiply(numbers)
print(f" Multiplication: {a} × {b} = {response.value}")
except grpc.RpcError as e:
print(f"Multiplication Service Error: {e.details()}")
def get_history(self):
"""Request operation history from both services"""
print("\n Fetching operation history...")
self._get_addition_history()
self._get_multiplication_history()
def _get_addition_history(self):
"""Get addition operation history"""
try:
response_stream = self.addition_stub.GetOperationHistory(calculator_pb2.Empty())
print("\nAddition Operation History:")
for history in response_stream:
for op in history.operations:
print(f" {op.timestamp}: {op.a} + {op.b} = {op.result}")
except grpc.RpcError as e:
print(f"Error getting addition history: {e.details()}")
def _get_multiplication_history(self):
"""Get multiplication operation history"""
try:
response_stream = self.multiplication_stub.GetOperationHistory(calculator_pb2.Empty())
print("\nMultiplication Operation History:")
for history in response_stream:
for op in history.operations:
print(f" {op.timestamp}: {op.a} × {op.b} = {op.result}")
except grpc.RpcError as e:
print(f"Error getting multiplication history: {e.details()}")
def main():
client = ClientGateway()
while True:
print("\n" + "="*50)
print(" Distributed Calculator System")
print("1. Perform calculations")
print("2. Show operation history")
print("3. Exit")
choice = input("Choose option: ")
if choice == "1":
try:
a = int(input("Enter first number: "))
b = int(input("Enter second number: "))
client.perform_operations(a, b)
except ValueError:
print(" Please enter valid integers only!")
elif choice == "2":
client.get_history()
elif choice == "3":
print("👋 Goodbye!")
break
else:
print(" Invalid choice!")
if __name__ == "__main__":
main()
\ No newline at end of file
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# NO CHECKED-IN PROTOBUF GENCODE
# source: calculator.proto
# Protobuf Python Version: 6.31.1
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import runtime_version as _runtime_version
from google.protobuf import symbol_database as _symbol_database
from google.protobuf.internal import builder as _builder
_runtime_version.ValidateProtobufRuntimeVersion(
_runtime_version.Domain.PUBLIC,
6,
31,
1,
'',
'calculator.proto'
)
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x10\x63\x61lculator.proto\x12\ncalculator\"\x1f\n\x07Numbers\x12\t\n\x01\x61\x18\x01 \x01(\x05\x12\t\n\x01\x62\x18\x02 \x01(\x05\"*\n\x06Result\x12\r\n\x05value\x18\x01 \x01(\x05\x12\x11\n\toperation\x18\x02 \x01(\t\"S\n\x10OperationHistory\x12\x14\n\x0cservice_name\x18\x01 \x01(\t\x12)\n\noperations\x18\x02 \x03(\x0b\x32\x15.calculator.Operation\"R\n\tOperation\x12\x0c\n\x04type\x18\x01 \x01(\t\x12\t\n\x01\x61\x18\x02 \x01(\x05\x12\t\n\x01\x62\x18\x03 \x01(\x05\x12\x0e\n\x06result\x18\x04 \x01(\x05\x12\x11\n\ttimestamp\x18\x05 \x01(\t\"\x07\n\x05\x45mpty2\xbb\x01\n\nCalculator\x12.\n\x03\x41\x64\x64\x12\x13.calculator.Numbers\x1a\x12.calculator.Result\x12\x33\n\x08Multiply\x12\x13.calculator.Numbers\x1a\x12.calculator.Result\x12H\n\x13GetOperationHistory\x12\x11.calculator.Empty\x1a\x1c.calculator.OperationHistory0\x01\x42+\n\'com.calculator.multiplication.generatedP\x01\x62\x06proto3')
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'calculator_pb2', _globals)
if not _descriptor._USE_C_DESCRIPTORS:
_globals['DESCRIPTOR']._loaded_options = None
_globals['DESCRIPTOR']._serialized_options = b'\n\'com.calculator.multiplication.generatedP\001'
_globals['_NUMBERS']._serialized_start=32
_globals['_NUMBERS']._serialized_end=63
_globals['_RESULT']._serialized_start=65
_globals['_RESULT']._serialized_end=107
_globals['_OPERATIONHISTORY']._serialized_start=109
_globals['_OPERATIONHISTORY']._serialized_end=192
_globals['_OPERATION']._serialized_start=194
_globals['_OPERATION']._serialized_end=276
_globals['_EMPTY']._serialized_start=278
_globals['_EMPTY']._serialized_end=285
_globals['_CALCULATOR']._serialized_start=288
_globals['_CALCULATOR']._serialized_end=475
# @@protoc_insertion_point(module_scope)
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc
import warnings
import calculator_pb2 as calculator__pb2
GRPC_GENERATED_VERSION = '1.76.0'
GRPC_VERSION = grpc.__version__
_version_not_supported = False
try:
from grpc._utilities import first_version_is_lower
_version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION)
except ImportError:
_version_not_supported = True
if _version_not_supported:
raise RuntimeError(
f'The grpc package installed is at version {GRPC_VERSION},'
+ ' but the generated code in calculator_pb2_grpc.py depends on'
+ f' grpcio>={GRPC_GENERATED_VERSION}.'
+ f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}'
+ f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.'
)
class CalculatorStub(object):
"""Missing associated documentation comment in .proto file."""
def __init__(self, channel):
"""Constructor.
Args:
channel: A grpc.Channel.
"""
self.Add = channel.unary_unary(
'/calculator.Calculator/Add',
request_serializer=calculator__pb2.Numbers.SerializeToString,
response_deserializer=calculator__pb2.Result.FromString,
_registered_method=True)
self.Multiply = channel.unary_unary(
'/calculator.Calculator/Multiply',
request_serializer=calculator__pb2.Numbers.SerializeToString,
response_deserializer=calculator__pb2.Result.FromString,
_registered_method=True)
self.GetOperationHistory = channel.unary_stream(
'/calculator.Calculator/GetOperationHistory',
request_serializer=calculator__pb2.Empty.SerializeToString,
response_deserializer=calculator__pb2.OperationHistory.FromString,
_registered_method=True)
class CalculatorServicer(object):
"""Missing associated documentation comment in .proto file."""
def Add(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def Multiply(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def GetOperationHistory(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def add_CalculatorServicer_to_server(servicer, server):
rpc_method_handlers = {
'Add': grpc.unary_unary_rpc_method_handler(
servicer.Add,
request_deserializer=calculator__pb2.Numbers.FromString,
response_serializer=calculator__pb2.Result.SerializeToString,
),
'Multiply': grpc.unary_unary_rpc_method_handler(
servicer.Multiply,
request_deserializer=calculator__pb2.Numbers.FromString,
response_serializer=calculator__pb2.Result.SerializeToString,
),
'GetOperationHistory': grpc.unary_stream_rpc_method_handler(
servicer.GetOperationHistory,
request_deserializer=calculator__pb2.Empty.FromString,
response_serializer=calculator__pb2.OperationHistory.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'calculator.Calculator', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))
server.add_registered_method_handlers('calculator.Calculator', rpc_method_handlers)
# This class is part of an EXPERIMENTAL API.
class Calculator(object):
"""Missing associated documentation comment in .proto file."""
@staticmethod
def Add(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(
request,
target,
'/calculator.Calculator/Add',
calculator__pb2.Numbers.SerializeToString,
calculator__pb2.Result.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)
@staticmethod
def Multiply(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(
request,
target,
'/calculator.Calculator/Multiply',
calculator__pb2.Numbers.SerializeToString,
calculator__pb2.Result.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)
@staticmethod
def GetOperationHistory(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_stream(
request,
target,
'/calculator.Calculator/GetOperationHistory',
calculator__pb2.Empty.SerializeToString,
calculator__pb2.OperationHistory.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)
grpcio==1.60.0
grpcio-tools==1.60.0
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment