# Perform parallel prefix sum on the given local array.
def prefix_mpi(local_array, rank, size):
    """Compute a distributed prefix sum of `local_array` in place.

    Each rank first scans its own chunk, then the chunk totals are gathered
    on rank 0, turned into per-rank offsets, scattered back, and added to the
    local scan so the concatenation of all chunks is the global prefix sum.

    Parameters
    ----------
    local_array : numpy int array, this rank's chunk; mutated in place.
    rank : int, this process's rank in MPI.COMM_WORLD.
    size : int, total number of ranks.

    Returns
    -------
    (computation_time, communication_time) : tuple of float
        Seconds spent in local arithmetic vs. in MPI collectives on this rank.
    """
    # BUG FIX: the previous revision dropped this binding while the body
    # still calls comm.Gather / comm.Scatter, causing a NameError on every
    # rank. Restore the communicator.
    comm = MPI.COMM_WORLD

    # Accumulators for the two timing categories returned to the caller.
    computation_time = 0.0
    communication_time = 0.0

    # [Step 1]: Compute local prefix sums (computation)
    comp_start = MPI.Wtime()
    for i in range(1, len(local_array)):
        local_array[i] += local_array[i - 1]
    comp_end = MPI.Wtime()
    computation_time += comp_end - comp_start

    print(f"[ worker #{rank} ] local prefix sum: {local_array} ")

    # [Step 2]: Gather the last element of each local array to compute offsets (communication)
    # The last element of a scanned chunk is that chunk's total.
    local_sum = np.array([local_array[-1]], dtype=int)
    all_local_sums = None
    if rank == 0:
        all_local_sums = np.zeros(size, dtype=int)
    comm_start = MPI.Wtime()
    comm.Gather(local_sum, all_local_sums, root=0)
    comm_end = MPI.Wtime()
    communication_time += comm_end - comm_start

    if rank == 0:
        # Compute offsets (computation): exclusive scan of the chunk totals,
        # i.e. rank i's offset is the sum of all totals before it.
        comp_start = MPI.Wtime()
        offsets = np.zeros(size, dtype=int)
        for i in range(1, size):
            offsets[i] = offsets[i - 1] + all_local_sums[i - 1]
        comp_end = MPI.Wtime()
        computation_time += comp_end - comp_start
    else:
        # Non-root ranks pass None as the scatter send buffer.
        offsets = None

    # Broadcast offsets to all processes (communication) — one offset per rank.
    local_offset = np.zeros(1, dtype=int)
    comm_start = MPI.Wtime()
    comm.Scatter(offsets, local_offset, root=0)
    comm_end = MPI.Wtime()
    communication_time += comm_end - comm_start

    # [Step 3]: Add offsets to local prefix sums (computation)
    comp_start = MPI.Wtime()
    local_array += local_offset[0]
    comp_end = MPI.Wtime()
    computation_time += comp_end - comp_start

    return computation_time, communication_time
sequential_result[0] = global_array[0] for i in range(1, total_size): sequential_result[i] = sequential_result[i - 1] + global_array[i] - print(f"[ ++++++ ++ ] Sequential result: {sequential_result} ") - # Output the execution time - print(f"[ ++++++ ++ ] Execution time: {end_time - start_time :.6f} seconds") + print(f"[ ++++++ ++ ] Sequential result: {sequential_result}") + print(f"[ ++++++ ++ ] Total execution time: {end_time - start_time:.6f} seconds") + print(f"[ ++++++ ++ ] Total computation time: {computation_time:.6f} seconds") + print(f"[ ++++++ ++ ] Total communication time: {communication_time:.6f} seconds") # Validate the results if np.array_equal(result, sequential_result):