Commit 07d77621 authored by hasan khaddour

Add first question solution.

parent 45078d68
from mpi4py import MPI
import numpy as np
# Perform parallel prefix sum on the given local array.
def prefix_mpi(local_array, rank, size):
    comm = MPI.COMM_WORLD
    # [Step 1]: Compute the inclusive prefix sum of the local block in place
    for i in range(1, len(local_array)):
        local_array[i] += local_array[i - 1]
    # Print each rank's local prefix sum for inspection
    print(f"[ worker #{rank} ] local prefix sum: {local_array} ")
    # [Step 2]: Gather the last element of each local block so the root can
    # compute the per-rank offsets
    local_sum = np.array([local_array[-1]], dtype=int)
    all_local_sums = None
    if rank == 0:
        all_local_sums = np.zeros(size, dtype=int)
    # Gather the block totals on the root
    comm.Gather(local_sum, all_local_sums, root=0)
    if rank == 0:
        # offsets[i] is the exclusive prefix sum of the block totals
        offsets = np.zeros(size, dtype=int)
        for i in range(1, size):
            offsets[i] = offsets[i - 1] + all_local_sums[i - 1]
    else:
        offsets = None
    # Scatter one offset to each process
    local_offset = np.zeros(1, dtype=int)
    comm.Scatter(offsets, local_offset, root=0)
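    # Worked example (hypothetical values): with size == 2 and block totals
    # [9, 7], the root computes offsets == [0, 9]; rank 1 receives 9 and
    # shifts its local scan by that amount in Step 3.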
    # [Step 3]: Add the offset to the local prefix sums
    local_array += local_offset[0]
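# Design note: the Gather / offset loop / Scatter round-trip above implements
# an exclusive scan of the block totals by hand. MPI offers this as a single
# collective; the sketch below is an alternative (not part of this commit,
# assuming mpi4py's Comm.Exscan with op=MPI.SUM), shown for comparison.
def prefix_mpi_exscan(local_array):
    comm = MPI.COMM_WORLD
    # Inclusive scan of the local block, as in Step 1 above
    for i in range(1, len(local_array)):
        local_array[i] += local_array[i - 1]
    local_sum = np.array([local_array[-1]], dtype=int)
    local_offset = np.zeros(1, dtype=int)
    # Exclusive prefix sum of the block totals across ranks
    comm.Exscan(local_sum, local_offset, op=MPI.SUM)
    if comm.Get_rank() == 0:
        # The MPI standard leaves rank 0's receive buffer undefined; force 0
        local_offset[0] = 0
    local_array += local_offset[0]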
def main():
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()
    # Configuration
    total_size = 8  # Total number of elements
    block_size = (total_size + size - 1) // size  # Block size per process (ceiling division)
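    # Example: with total_size == 8 and size == 3, block_size == 3, so the
    # input is padded to 9 elements and each rank receives a 3-element chunk.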
    # The root process generates the array, scatters it, gathers the
    # block prefix sums, and finally assembles the result
    if rank == 0:
        # Generate an array of random integers
        global_array = np.random.randint(0, 100, total_size, dtype=int)
        print("[ master ++ ] Input array:", global_array)
        # Pad the array if it is not evenly divisible by the number of processes
        padded_size = block_size * size
        if total_size != padded_size:
            global_array = np.pad(global_array, (0, padded_size - total_size), constant_values=0)
    else:
        global_array = None
    # Allocate space for the local block
    local_array = np.zeros(block_size, dtype=int)
    # Distribute chunks of the array to all processes
    comm.Scatter(global_array, local_array, root=0)
    # Start measuring time
    start_time = MPI.Wtime()
    # Perform the parallel prefix sum
    prefix_mpi(local_array, rank, size)
    # Gather results back to the root process
    if rank == 0:
        result = np.zeros_like(global_array, dtype=int)
    else:
        result = None
    comm.Gather(local_array, result, root=0)
    # Stop measuring time
    end_time = MPI.Wtime()
    # Validate the result on the root process
    if rank == 0:
        # Remove padding before validation
        result = result[:total_size]
        print(f"[ master ++ ] prefix sum: {result} ")
        # Sequential prefix sum for reference
        sequential_result = np.zeros_like(result, dtype=int)
        sequential_result[0] = global_array[0]
        for i in range(1, total_size):
            sequential_result[i] = sequential_result[i - 1] + global_array[i]
        print(f"[ ++++++ ++ ] Sequential result: {sequential_result} ")
        # Output the execution time
        print(f"[ ++++++ ++ ] Execution time: {end_time - start_time:.6f} seconds")
        # Compare the parallel and sequential results
        if np.array_equal(result, sequential_result):
            print("[ ++++++ ++ ] Validation successful: Results match!")
        else:
            print("[ ++++++ ++ ] Validation failed: Results do not match!")
if __name__ == "__main__":
    main()
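# Usage sketch (the filename prefix_sum.py is an assumption; it is not shown
# in this commit view):
#   mpiexec -n 4 python prefix_sum.py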