Commit ddb0001a authored by saad.aswad

docs: add headers, Makefile, and improve README

parent 13616b3b
.PHONY: all p1 p2 clean

# Default number of processes
NP ?= 4

all: p1 p2

p1:
	@echo "Running Problem 1 (Prefix Sum) with NP=$(NP)..."
	mpiexec -n $(NP) python3 src/problem1/prefix_sum.py

p2:
	@echo "Running Problem 2 (Benchmark) with NP=$(NP)..."
	mpiexec -n $(NP) python3 src/problem2/benchmark.py

clean:
	find . -name "*.pyc" -delete
	find . -name "__pycache__" -delete
# MPI Distributed Programming Project

This repository contains solutions for the Distributed Programming assignment using `mpi4py`. It implements a parallel prefix sum algorithm and a custom tree-based reduction, along with performance benchmarking.

## 📂 Project Structure

- **`src/problem1/`**: Contains `prefix_sum.py` (Parallel Prefix Sum implementation).
- **`src/problem2/`**: Contains `manual_reduce.py` (Tree-based Reduce) and `benchmark.py` (Performance comparison).
- **`src/common/`**: Shared utility functions for data generation and verification.
- **`simgrid/`**: XML configuration files for SimGrid simulations.
- **`Makefile`**: Shortcuts for running tests and benchmarks.
## 🚀 How to Run

### Prerequisites

- Python 3.x
- MPI implementation (OpenMPI, MPICH)
- `mpi4py` library (`pip install mpi4py`)
- `numpy` library (`pip install numpy`)

### Using Make (Recommended)
Run Problem 1 (Prefix Sum):
```bash
make p1
```

Run Problem 2 (Benchmark):
```bash
make p2
```

Run both:
```bash
make all
```

Change the number of processes (e.g., to 8):
```bash
make p1 NP=8
```
### Manual Execution

**Problem 1:**
```bash
mpiexec -n 4 python3 src/problem1/prefix_sum.py
```

**Problem 2:**
```bash
mpiexec -n 4 python3 src/problem2/benchmark.py
```

### SimGrid Simulation

SimGrid configurations are provided in `simgrid/`.
To run with SimGrid (if SMPI/Python bindings are configured):
```bash
smpirun -platform simgrid/platform.xml -hostfile ... python3 src/problem2/benchmark.py
```

## Running Tests

Execute the helper script:
```bash
bash run_tests.sh
```
## 🧪 Testing on University Server (CentOS 7.7)

1. **Connect to the Cluster**:
   Use MobaXterm or your terminal to SSH into the head node.
   ```bash
   ssh username@cluster-address
   ```
2. **Clone the Repository**:
   ```bash
   git clone git@github.com:LeadstarlingX/MPI_Distributed_Programming.git
   cd MPI_Distributed_Programming
   ```
3. **Load Environment (If Required)**:
   Some university clusters require loading modules. Check whether you need to run:
   ```bash
   module load mpi/openmpi-x.x
   module load python/3.x
   ```
4. **Install/Check Dependencies**:
   Ensure `mpi4py` and `numpy` are installed in your user environment:
   ```bash
   pip3 install --user mpi4py numpy
   ```
5. **Run**:
   ```bash
   make all
   ```
"""
Common Utilities Package.
This file is empty to explicitly marks this directory as a Python package,
allowing `src.common` imports from other directories.
"""
# Init
"""
Common Utilities
Helper functions for data generation and verification shared across problems.
"""
def generate_data(size):
import numpy as np
return np.random.randint(0, 11, size)
......
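# --- Illustrative sketch (editor's note, not part of this commit) ---------------
# The remainder of utils.py is collapsed in this diff. prefix_sum.py calls
# utils.verify_prefix_sum(); a minimal, plausible implementation of such a helper
# could look like the sketch below. The committed version may differ.
def verify_prefix_sum(original, parallel_result):
    """Return True if parallel_result equals the sequential prefix sum of original."""
    import numpy as np
    return np.array_equal(np.cumsum(original), parallel_result)
# ---------------------------------------------------------------------------------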
"""
Problem 1: Parallel Prefix Sum
This file implements a distributed prefix sum algorithm and checks its correctness on a generated dataset.
"""
# problem1/prefix_sum.py
from mpi4py import MPI
import sys
import os
# Add common directory to path to import utils
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
from common import utils
def prefix_mpi(local_data, comm):
    """
    Computes a distributed prefix sum using a 3-step algorithm:
    local prefix sums, exchange of block sums, and offset adjustment.
    """
    import numpy as np

    size = comm.Get_size()
    rank = comm.Get_rank()

    # Step 1: Local prefix sum.
    # local_data is a numpy array.
    local_prefix = np.cumsum(local_data)

    # Step 2: Exchange block sums.
    # Each block's total is needed by the later blocks. Following the assignment
    # ("a sequential prefix sum computed over the array of block sums"), the block
    # sums are gathered to Rank 0, the offsets are computed sequentially there, and
    # each offset is scattered back to its process. (Allgather would also work.)
    local_sum = local_prefix[-1]

    # Gather local sums (each process sends a single value);
    # block_sums is only meaningful on Rank 0.
    block_sums = comm.gather(local_sum, root=0)

    block_offset = 0
    if rank == 0:
        # Offset for block 0 is 0; offset for block i is sum(block_sums[0..i-1]).
        offsets = np.zeros(size, dtype=int)
        current_acc = 0
        for i in range(size):
......@@ -49,58 +34,47 @@ def prefix_mpi(local_data, comm):
        block_offset = offsets

    # Scatter the offsets back: Rank 0 sends offsets[i] to rank i.
    local_offset = comm.scatter(block_offset, root=0)

    # Step 3: Add the block offset to every element of the local prefix sum.
    final_prefix = local_prefix + local_offset
    return final_prefix
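# --- Illustrative walk-through (editor's note, not part of this commit) ---------
# A single-process NumPy version of the same three steps, to make the offset
# computation concrete. The input values and the block count of 4 are arbitrary;
# call _prefix_sum_walkthrough() manually to run the check.
def _prefix_sum_walkthrough():
    import numpy as np
    data = np.array([3, 1, 4, 1, 5, 9, 2, 6])            # full input
    blocks = np.array_split(data, 4)                      # what scatter would hand to 4 ranks
    local_prefixes = [np.cumsum(b) for b in blocks]       # Step 1: local prefix sums
    block_sums = [p[-1] for p in local_prefixes]          # Step 2: per-block totals
    offsets = np.concatenate(([0], np.cumsum(block_sums)[:-1]))  # exclusive prefix of block sums
    result = np.concatenate([p + off for p, off in zip(local_prefixes, offsets)])  # Step 3
    assert np.array_equal(result, np.cumsum(data))        # matches the sequential prefix sum
# ---------------------------------------------------------------------------------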
def main():
    """
    Main execution function for the parallel prefix sum.
    """
    import numpy as np

    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()

    # Small test size, divisible by both 3 and 4 so common process counts get equal chunks.
    # Ideally N would be larger and passed as a command-line argument.
    N = 12

    local_data = None
    if rank == 0:
        # Generate the full dataset on Rank 0.
        full_data = utils.generate_data(N)
        print(f"Rank 0: Generated data: {full_data}")
        # The object-based scatter below handles a list of numpy chunks directly;
        # for simplicity the assignment assumes N is divisible by size.
        chunks = np.array_split(full_data, size)
    else:
        chunks = None

    # Scatter one chunk to each process.
    local_data = comm.scatter(chunks, root=0)
    print(f"Rank {rank}: Received chunk {local_data}")

    # Run the distributed prefix sum.
    result_local = prefix_mpi(local_data, comm)
    print(f"Rank {rank}: Local prefix result (adjusted) {result_local}")

    # Gather the adjusted chunks back to Rank 0.
    final_result = comm.gather(result_local, root=0)

    if rank == 0:
        # Flatten and verify against the sequential prefix sum.
        final_flat = np.concatenate(final_result)
        print(f"Final Parallel Result: {final_flat}")
        is_correct = utils.verify_prefix_sum(full_data, final_flat)
        print(f"Verification: {'SUCCESS' if is_correct else 'FAILURE'}")
if __name__ == "__main__":
    main()
"""
Problem 2: Benchmark
This file measures and compares execution time between Sequential Sum and the Manual Tree Reduce.
"""
# problem2/benchmark.py
from mpi4py import MPI
import numpy as np
......@@ -5,17 +9,18 @@ import time
import sys
import os
# Add common directory
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
from common import utils
from problem2.manual_reduce import manual_reduce
def run_benchmark():
    """
    Executes the benchmarking logic comparing Sequential Sum vs Manual Tree Reduce.
    """
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()

    # Test cases: Array Size N
    sizes = [1000, 10000, 100000, 1000000]

    if rank == 0:
......@@ -23,7 +28,6 @@ def run_benchmark():
print("-" * 80)
for N in sizes:
# Generate data
local_vec = np.random.randint(0, 10, N // size if N >= size else N).astype(np.int32)
full_data = None
if rank == 0:
......@@ -31,7 +35,6 @@ def run_benchmark():
        comm.Barrier()

        # 1. Sequential Time (Only Rank 0 measures this for baseline)
        seq_time = 0.0
        if rank == 0:
            start = MPI.Wtime()
......@@ -39,44 +42,11 @@ def run_benchmark():
            end = MPI.Wtime()
            seq_time = end - start
        # seq_time could be broadcast to all ranks if needed; here it stays on Rank 0.

        # 2. Parallel Manual Reduce time.
        # For a fully fair comparison the data would be scattered from Rank 0,
        # but here each process simply generates its own random chunk.
        comm.Barrier()
        start_par = MPI.Wtime()

        recvbuf = np.zeros(1, dtype=np.int32) if rank == 0 else None

        # The assignment describes the reduce as "receives arrays ... adds ... sends the
        # resulting array" with MPI_INT data, i.e. an element-wise vector reduction.
        # N is therefore treated as the vector length: every process contributes an
        # N-element array, and the root receives the element-wise sum.
        local_arr = np.random.randint(0, 10, N).astype(np.int32)
        recv_arr = np.zeros(N, dtype=np.int32) if rank == 0 else None
......
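# --- Illustrative sketch (editor's note, not part of this commit) ---------------
# The timing loop of benchmark.py is collapsed above. The sketch below shows one
# common way to time the built-in Reduce against the manual tree reduce for a
# single vector size; the name time_reductions is illustrative, and the import
# path mirrors the one used at the top of benchmark.py.
from mpi4py import MPI
import numpy as np
from problem2.manual_reduce import manual_reduce

def time_reductions(N, comm):
    rank = comm.Get_rank()
    # Every process contributes an N-element MPI_INT vector.
    local_arr = np.random.randint(0, 10, N).astype(np.int32)

    # Built-in MPI Reduce (element-wise sum on the root).
    recv_builtin = np.zeros(N, dtype=np.int32) if rank == 0 else None
    comm.Barrier()
    t0 = MPI.Wtime()
    comm.Reduce(local_arr, recv_builtin, op=MPI.SUM, root=0)
    builtin_time = MPI.Wtime() - t0

    # Manual tree-based reduce from manual_reduce.py.
    recv_manual = np.zeros(N, dtype=np.int32) if rank == 0 else None
    comm.Barrier()
    t0 = MPI.Wtime()
    manual_reduce(local_arr, recv_manual, op=MPI.SUM, root=0, comm=comm)
    manual_time = MPI.Wtime() - t0

    if rank == 0:
        print(f"N={N:>8}  builtin={builtin_time:.6f}s  manual={manual_time:.6f}s")
# Usage (under mpiexec): for N in [1000, 10000, 100000, 1000000]: time_reductions(N, MPI.COMM_WORLD)
# ---------------------------------------------------------------------------------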
"""
Problem 2: Manual Tree Reduce
This file contains the implementation of a custom tree-based reduction function mimicking MPI_Reduce.
"""
# problem2/manual_reduce.py
from mpi4py import MPI
import numpy as np
......@@ -12,63 +16,44 @@ def manual_reduce(sendbuf, recvbuf, op=MPI.SUM, root=0, comm=MPI.COMM_WORLD):
    rank = comm.Get_rank()
    size = comm.Get_size()

    # sendbuf is assumed to be a numpy array (the assignment uses MPI_INT vectors);
    # recvbuf is the output buffer on the root and may be None on other ranks.
    import numpy as np

    # Local accumulator, initialized with this process's own data.
    acc = np.copy(sendbuf)

    # Children in the implicit binary tree (heap-style indexing, rooted at rank 0).
    left = 2 * rank + 1
    right = 2 * rank + 2

    # Receive and accumulate from the left child, if it exists.
    # In MPI_Reduce the count/datatype are known; here the buffer shape is taken from sendbuf.
    if left < size:
        temp_recv = np.empty_like(sendbuf)
        comm.Recv(temp_recv, source=left, tag=111)
        acc += temp_recv

    # Receive and accumulate from the right child, if it exists.
    if right < size:
        temp_recv = np.empty_like(sendbuf)
        comm.Recv(temp_recv, source=right, tag=111)
        acc += temp_recv

    # Either store the result (root) or forward the partial sum to the parent.
    if rank == root:
        if recvbuf is not None:
            recvbuf[:] = acc[:]
    else:
        parent = (rank - 1) // 2
        comm.Send(acc, dest=parent, tag=111)
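# --- Illustrative check (editor's note, not part of this commit) ----------------
# The reduction above uses the implicit binary-heap layout: rank r receives from
# children 2r+1 and 2r+2 and sends to parent (r-1)//2, so the tree is rooted at
# rank 0. The helper name below is illustrative; call it to print the topology.
def _print_reduce_tree(size=7):
    for r in range(size):
        children = [c for c in (2 * r + 1, 2 * r + 2) if c < size]
        parent = None if r == 0 else (r - 1) // 2
        print(f"rank {r}: children={children} parent={parent}")
# ---------------------------------------------------------------------------------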
def main():
    """
    Main execution function / test for Manual Reduce.
    """
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()

    # Generate a random vector on every process.
    N = 10
    local_vec = np.random.randint(0, 10, N).astype(np.int32)
    # print(f"Rank {rank}: {local_vec}")

    # Verify with the standard Reduce to check correctness.
    std_result = np.zeros(N, dtype=np.int32) if rank == 0 else None
    comm.Reduce(local_vec, std_result, op=MPI.SUM, root=0)

    # Manual Reduce.
    manual_result = np.zeros(N, dtype=np.int32) if rank == 0 else None
    manual_reduce(local_vec, manual_result, op=MPI.SUM, root=0, comm=comm)
......