"""
Helper functions for loading data and managing arrays across ranks with MPI.
"""
import h5py
import numpy as np
try:
    from mpi4py import MPI
    # Map NumPy dtypes to MPI datatypes for the buffer-based (uppercase)
    # mpi4py communication routines.
    _np2mpi = {np.dtype(np.float32): MPI.FLOAT,
               np.dtype(np.float64): MPI.DOUBLE,
               np.dtype(int): MPI.LONG,
               np.dtype(np.intc): MPI.INT}
except ImportError:
    pass
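# For reference, the mapping is keyed on canonical dtypes, e.g.
# _np2mpi[np.dtype(np.float64)] is MPI.DOUBLE; the buffer datatypes below
# are looked up as _np2mpi[np.dtype(arr.dtype)].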
def check_valid_ndarray(X):
    """Check whether `X` is a numpy ndarray and return a C-contiguous
    version of it.
    """
    if X is None:
        return X
    if not isinstance(X, np.ndarray):
        raise ValueError('Must be a numpy ndarray.')
    return np.ascontiguousarray(X)
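# Usage sketch (hypothetical values): a strided view is copied so the result
# is safe to pass as an MPI buffer, and non-array inputs are rejected.
#
#     a = np.arange(10)[::2]            # non-contiguous view
#     b = check_valid_ndarray(a)        # C-contiguous copy
#     assert b.flags['C_CONTIGUOUS']
#     check_valid_ndarray([1, 2, 3])    # raises ValueError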
def load_data_MPI(h5_name, X_key='X', y_key='y', comm=None, root=0):
    """Load data from an HDF5 file on the root rank and broadcast it to
    all MPI ranks.

    This is a convenience helper; the data can also be loaded and
    broadcast without it.

    Parameters
    ----------
    h5_name : str
        Path to the HDF5 file.
    X_key : str
        Key for the features dataset. (default: 'X')
    y_key : str
        Key for the targets dataset. (default: 'y')
    comm : MPI.Comm
        MPI communicator. Defaults to MPI.COMM_WORLD.
    root : int
        Rank that will load the data from file.

    Returns
    -------
    X : ndarray
        Features on all MPI ranks.
    y : ndarray
        Targets on all MPI ranks.
    """
    if comm is None:
        comm = MPI.COMM_WORLD
    rank = comm.rank
    Xshape = None
    Xdtype = None
    yshape = None
    ydtype = None
    # Only the root rank reads the file; shapes and dtypes are broadcast
    # first so the other ranks can allocate matching buffers.
    if rank == root:
        with h5py.File(h5_name, 'r') as f:
            X = f[X_key][()]
            Xshape = X.shape
            Xdtype = X.dtype
            y = f[y_key][()]
            yshape = y.shape
            ydtype = y.dtype
    Xshape = comm.bcast(Xshape, root=root)
    Xdtype = comm.bcast(Xdtype, root=root)
    yshape = comm.bcast(yshape, root=root)
    ydtype = comm.bcast(ydtype, root=root)
    if rank != root:
        X = np.empty(Xshape, dtype=Xdtype)
        y = np.empty(yshape, dtype=ydtype)
    # Buffer-based broadcast of the array contents.
    comm.Bcast([X, _np2mpi[np.dtype(X.dtype)]], root=root)
    comm.Bcast([y, _np2mpi[np.dtype(y.dtype)]], root=root)
    return X, y
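# Usage sketch ('data.h5' and its dataset keys are assumptions; run under
# MPI, e.g. `mpiexec -n 4 python script.py`):
#
#     X, y = load_data_MPI('data.h5', X_key='X', y_key='y')
#     # Every rank now holds identical copies of X and y.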
def Bcast_from_root(send, comm=None, root=0):
    """Broadcast an array from the root rank to all MPI ranks.

    Parameters
    ----------
    send : ndarray or None
        Array to send from root to all ranks. The value of `send` on
        non-root ranks is ignored.
    comm : MPI.Comm
        MPI communicator. Defaults to MPI.COMM_WORLD.
    root : int
        Rank that contains the array to send.

    Returns
    -------
    send : ndarray
        Each rank will have a copy of the array from root.
    """
    send = check_valid_ndarray(send)
    if comm is None:
        comm = MPI.COMM_WORLD
    rank = comm.rank
    # Broadcast the metadata needed to allocate receive buffers.
    if rank == root:
        dtype = send.dtype
        shape = send.shape
    else:
        dtype = None
        shape = None
    shape = comm.bcast(shape, root=root)
    dtype = comm.bcast(dtype, root=root)
    if rank != root:
        send = np.empty(shape, dtype=dtype)
    comm.Bcast([send, _np2mpi[np.dtype(dtype)]], root=root)
    return send
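# Usage sketch: only root needs a real array; other ranks may pass None.
#
#     comm = MPI.COMM_WORLD
#     arr = np.arange(6, dtype=np.float64) if comm.rank == 0 else None
#     arr = Bcast_from_root(arr, comm=comm)
#     # All ranks now hold the same array.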
def Gatherv_rows(send, comm=None, root=0):
    """Concatenate arrays along the first axis using Gatherv on root.

    Parameters
    ----------
    send : ndarray
        The array contributed by this rank. All dimensions must be equal
        across ranks except for the first.
    comm : MPI.Comm
        MPI communicator. Defaults to MPI.COMM_WORLD.
    root : int
        Rank that will contain the gathered array.

    Returns
    -------
    rec : ndarray or None
        Gathered array on root, or None on other ranks.
    """
    send = check_valid_ndarray(send)
    if comm is None:
        comm = MPI.COMM_WORLD
    rank = comm.rank
    dtype = send.dtype
    shape = send.shape
    tot = np.zeros(1, dtype=int)
    # Gather the sizes of the first dimension on root.
    rank_sizes = comm.gather(shape[0], root=root)
    # Total number of rows across all ranks.
    comm.Reduce(np.array(shape[0], dtype=int),
                [tot, _np2mpi[tot.dtype]], op=MPI.SUM, root=root)
    if rank == root:
        rec_shape = (tot[0],) + shape[1:]
        rec = np.empty(rec_shape, dtype=dtype)
        # Element counts and displacements (in elements) for each rank.
        sizes = [size * int(np.prod(rec_shape[1:])) for size in rank_sizes]
        disps = np.insert(np.cumsum(sizes), 0, 0)[:-1]
    else:
        rec = None
        sizes = None
        disps = None
    comm.Gatherv(send, [rec, sizes, disps, _np2mpi[dtype]], root=root)
    return rec
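# Usage sketch: ranks may contribute different numbers of rows.
#
#     comm = MPI.COMM_WORLD
#     local = np.full((comm.rank + 1, 3), comm.rank, dtype=np.float64)
#     stacked = Gatherv_rows(local, comm=comm)
#     # On root, stacked holds each rank's rows concatenated in rank order.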