Commit ca46bd62 authored by hasan.bahjat

Update prefixSum.py

parent 0b9ad6d4
@@ -4,39 +4,57 @@ import numpy as np

# Perform parallel prefix sum on the given local array.
def prefix_mpi(local_array, rank, size):
    comm = MPI.COMM_WORLD

    # Start timers for computation and communication
    computation_time = 0.0
    communication_time = 0.0

    # [Step 1]: Compute local prefix sums (computation)
    comp_start = MPI.Wtime()
    for i in range(1, len(local_array)):
        local_array[i] += local_array[i - 1]
    comp_end = MPI.Wtime()
    computation_time += comp_end - comp_start
    # Print each worker's local prefix sum for inspection
    print(f"[ worker #{rank} ] local prefix sum: {local_array}")
    # [Step 2]: Gather the last element of each local array to compute offsets (communication)
    local_sum = np.array([local_array[-1]], dtype=int)
    all_local_sums = None
    if rank == 0:
        all_local_sums = np.zeros(size, dtype=int)

    comm_start = MPI.Wtime()
    comm.Gather(local_sum, all_local_sums, root=0)
    comm_end = MPI.Wtime()
    communication_time += comm_end - comm_start

    if rank == 0:
        # Compute offsets (computation)
        comp_start = MPI.Wtime()
        offsets = np.zeros(size, dtype=int)
        for i in range(1, size):
            offsets[i] = offsets[i - 1] + all_local_sums[i - 1]
        comp_end = MPI.Wtime()
        computation_time += comp_end - comp_start
    else:
        offsets = None
    # Scatter each block's offset to its process (communication)
    local_offset = np.zeros(1, dtype=int)
    comm_start = MPI.Wtime()
    comm.Scatter(offsets, local_offset, root=0)
    comm_end = MPI.Wtime()
    communication_time += comm_end - comm_start
    # [Step 3]: Add offsets to local prefix sums (computation)
    comp_start = MPI.Wtime()
    local_array += local_offset[0]
    comp_end = MPI.Wtime()
    computation_time += comp_end - comp_start

    return computation_time, communication_time
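
For intuition, here is a small worked example of the three steps above, simulated with plain NumPy on a single process; the input [3, 1, 4, 1] and the 2-block split are illustrative and not taken from the commit:

import numpy as np

blocks = [np.array([3, 1]), np.array([4, 1])]                  # the scattered blocks
blocks = [np.cumsum(b) for b in blocks]                        # Step 1: local prefix sums -> [3, 4], [4, 5]
block_totals = np.array([b[-1] for b in blocks])               # gathered last elements    -> [4, 5]
offsets = np.concatenate(([0], np.cumsum(block_totals)[:-1]))  # exclusive scan of totals  -> [0, 4]
result = np.concatenate([b + off for b, off in zip(blocks, offsets)])
print(result)                                                  # [3 4 8 9] == np.cumsum([3, 1, 4, 1])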

def main():
@@ -48,18 +66,12 @@ def main():

    total_size = 8  # Total number of elements
    block_size = (total_size + size - 1) // size  # Block size for each process

    if rank == 0:
        # Generate an array of random integers
        global_array = np.random.randint(0, 100, total_size, dtype=int)
        print("[ master ++ ] Input array:", global_array)

        # Pad the array if it's not divisible by the number of processes
        padded_size = block_size * size
        if total_size != padded_size:
            global_array = np.pad(global_array, (0, padded_size - total_size), constant_values=0)
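
For reference (not part of the commit): the padding makes the buffer length a multiple of the process count so that comm.Scatter can hand every process an equal-sized chunk. With illustrative numbers, 3 elements split across 2 processes are padded to 4:

import numpy as np
print(np.pad(np.array([7, 2, 9]), (0, 1), constant_values=0))  # [7 2 9 0]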
@@ -72,11 +84,11 @@ def main():

    # Distribute chunks of the array to all processes
    comm.Scatter(global_array, local_array, root=0)

    # Start measuring total execution time
    start_time = MPI.Wtime()

    # Perform parallel prefix sum and measure computation and communication times
    computation_time, communication_time = prefix_mpi(local_array, rank, size)

    # Gather results back to the root process
    if rank == 0:
@@ -86,25 +98,25 @@ def main():

    comm.Gather(local_array, result, root=0)

    # Stop measuring total execution time
    end_time = MPI.Wtime()

    if rank == 0:
        # Remove padding before validation
        result = result[:total_size]
        print(f"[ master ++ ] prefix sum: {result}")

        # Sequential prefix sum
        sequential_result = np.zeros_like(result, dtype=int)
        sequential_result[0] = global_array[0]
        for i in range(1, total_size):
            sequential_result[i] = sequential_result[i - 1] + global_array[i]
        print(f"[ ++++++ ++ ] Sequential result: {sequential_result}")

        print(f"[ ++++++ ++ ] Total execution time: {end_time - start_time:.6f} seconds")
        print(f"[ ++++++ ++ ] Total computation time: {computation_time:.6f} seconds")
        print(f"[ ++++++ ++ ] Total communication time: {communication_time:.6f} seconds")

        # Validate the results
        if np.array_equal(result, sequential_result):
...
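
As a side note (not part of this commit): the gather/offset/scatter exchange in Step 2 is effectively an exclusive scan over the per-block totals, so the same result could be sketched with MPI's built-in Exscan. The snippet below is a minimal sketch under that assumption; the toy input and the names block_total and offset are illustrative:

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

local_array = np.arange(1, 5, dtype=int) + rank        # toy per-process block
np.cumsum(local_array, out=local_array)                # Step 1: local inclusive prefix sum

block_total = np.array([local_array[-1]], dtype=int)
offset = np.zeros(1, dtype=int)
comm.Exscan(block_total, offset, op=MPI.SUM)           # sum of block totals of lower ranks
if rank == 0:
    offset[0] = 0                                      # Exscan leaves rank 0's result undefined

local_array += offset[0]                               # Step 3: add the block offset
print(f"[ worker #{rank} ] prefix sum block: {local_array}")

Like prefixSum.py itself, such a script would typically be launched with something along the lines of mpiexec -n 4 python prefixSum.py.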