import numpy as np
import importlib
from scipy.stats import norm
import pathlib
import sys
from tqdm import tqdm
from joblib import Parallel, delayed
[docs]
class OsipiBase:
"""
Comprehensive base class for OSIPI IVIM fitting algorithms.
The ``OsipiBase`` class defines a standardized framework for fitting
intravoxel incoherent motion (IVIM) models to diffusion-weighted MRI data.
It manages common attributes such as b-values, parameter bounds,
thresholds, and initial guesses; provides requirement-checking utilities;
and offers convenience methods for voxel-wise or full-volume fitting.
Subclasses can implement algorithm-specific behavior while inheriting
these shared tools.
Parameters
----------
bvalues : array-like, optional
Diffusion b-values (s/mm²) matching the last dimension of the input data.
thresholds : array-like, optional
Thresholds used by specific algorithms (e.g., signal cutoffs).
bounds : array-like, optional
Parameter bounds for constrained optimization.
initial_guess : array-like, optional
Initial parameter estimates for the IVIM fit.
algorithm : str, optional
Name of an algorithm module in ``src/standardized`` to load dynamically.
If supplied, the instance is immediately converted to that algorithm’s
subclass via :meth:`osipi_initiate_algorithm`.
**kwargs
Additional keyword arguments forwarded to the selected algorithm’s
initializer if ``algorithm`` is provided.
Attributes
----------
bvalues, thresholds, bounds, initial_guess : np.ndarray or None
Core fitting inputs stored as arrays when provided.
use_bounds, use_initial_guess : bool
Flags controlling whether bounds and initial guesses are applied.
deep_learning, supervised, stochastic : bool
Indicators for the algorithm type; subclasses may set these.
result_keys : list of str, optional
Names of the output parameters (e.g., ["f", "Dp", "D"]).
required_bvalues, required_thresholds, required_bounds,
required_bounds_optional, required_initial_guess,
required_initial_guess_optional : various, optional
Optional attributes used by requirement-checking helpers.
Key Methods
-----------
osipi_initiate_algorithm(algorithm, **kwargs)
Dynamically replace the current instance with the specified
algorithm subclass.
osipi_fit(data, bvalues=None, njobs=1, **kwargs)
Voxel-wise IVIM fitting with optional parallel processing and
automatic signal normalization.
osipi_fit_full_volume(data, bvalues=None, **kwargs)
Full-volume fitting for algorithms that support it.
osipi_print_requirements()
Display algorithm requirements such as needed b-values or bounds.
osipi_accepted_dimensions(), osipi_accepts_dimension(dim)
Query acceptable input dimensionalities.
osipi_check_required_*()
Validate that provided inputs meet algorithm requirements.
osipi_simple_bias_and_RMSE_test(SNR, bvalues, f, Dstar, D, noise_realizations)
Monte-Carlo bias/RMSE evaluation of the current fitting method.
D_and_Ds_swap(results)
Ensure consistency of D and D* estimates by swapping if necessary.
Notes
-----
* This class is typically used as a base or as a dynamic loader for
a specific algorithm implementation.
* Parallel voxel-wise fitting uses :mod:`joblib`.
* Subclasses must implement algorithm-specific methods such as
:meth:`ivim_fit` or :meth:`ivim_fit_full_volume`.
Examples
--------
# Dynamic algorithm loading
model = OsipiBase(bvalues=[0, 50, 200, 800],
algorithm="MyAlgorithm")
# Voxel-wise fitting
results = base.osipi_fit(dwi_data, njobs=4)
f_map = results["f"]
"""
def __init__(self, bvalues=None, thresholds=None, bounds=None, initial_guess=None, algorithm=None, **kwargs):
# Define the attributes as numpy arrays only if they are not None
self.bvalues = np.asarray(bvalues) if bvalues is not None else None
self.thresholds = np.asarray(thresholds) if thresholds is not None else None
self.bounds = np.asarray(bounds) if bounds is not None else None
self.initial_guess = np.asarray(initial_guess) if initial_guess is not None else None
self.use_bounds = True
self.use_initial_guess = True
self.deep_learning = False
self.supervised = False
self.stochastic = False
# If the user inputs an algorithm to OsipiBase, it is intereprete as initiating
# an algorithm object with that name.
if algorithm:
self.osipi_initiate_algorithm(algorithm, bvalues=bvalues, thresholds=thresholds, bounds=bounds, initial_guess=initial_guess, **kwargs)
[docs]
def osipi_initiate_algorithm(self, algorithm, **kwargs):
"""Turns the class into a specified one by the input.
This method can be used instead of specifically importing that class and initiating it.
WIP: Add args and kwargs!
Args:
algorithm (string): The name of the algorithm, should be the same as the file in the src/standardized folder without the .py extension.
"""
# Import the algorithm
root_path = pathlib.Path(__file__).resolve().parents[2]
if str(root_path) not in sys.path:
print("Root folder not in PYTHONPATH")
return False
import_base_path = "src.standardized"
import_path = import_base_path + "." + algorithm
#Algorithm = getattr(importlib.import_module(import_path), algorithm)
# Change the class from OsipiBase to the specified algorithm
self.__class__ = getattr(importlib.import_module(import_path), algorithm)
self.__init__(**kwargs)
[docs]
def initialize(**kwargs):
"""Placeholder for subclass initialization"""
pass
#def osipi_fit(self, data=None, bvalues=None, thresholds=None, bounds=None, initial_guess=None, **kwargs):
[docs]
def osipi_fit(self, data, bvalues=None, njobs=1, **kwargs):
"""
Fit multi-b-value diffusion MRI data using the IVIM model.
This function normalizes the input signal to the minimum b-value and fits each voxel's signal to the IVIM model to estimate parameters such as
perfusion fraction (f), pseudo-diffusion coefficient (D*), and tissue diffusion coefficient (D).
Fits can be computed in parallel across voxels using multiple jobs.
Parameters
----------
data : np.ndarray
Multi-dimensional array containing the signal intensities. The last dimension must correspond
to the b-values (diffusion weightings).
bvalues : array-like, optional
Array of b-values corresponding to the last dimension of `data`. If not provided, the method
uses `self.bvalues` set during object initialization.
njobs : int, optional, default=1
Number of parallel jobs to use for voxel-wise fitting. If `njobs` > 1, the fitting will be
distributed across multiple processes. -1 will use all available cpus
**kwargs : dict, optional
Additional keyword arguments to be passed to the underlying `ivim_fit` function.
Returns
-------
results : dict of np.ndarray
Dictionary containing voxel-wise parameter maps. Keys are parameter names ("f", "D", "Dp"),
and values are arrays with the same shape as `data` excluding the last dimension.
Notes
-----
- The signal is normalized to the minimum b-value before fitting.
- Handles NaN values by returning zeros for all parameters in those voxels.
- Parallelization is handled using joblib's `Parallel` and `delayed`.
- If `self.result_keys` is defined, it determines the output parameter names; otherwise, the default
keys are ["f", "Dp", "D"].
- The method swaps D and D* values after fitting using `self.D_and_Ds_swap` to maintain consistency.
Example
-------
>>> results = instance.osipi_fit(data, bvalues=[0, 50, 200, 800], njobs=4)
>>> f_map = results['f']
>>> D_map = results['D']
"""
# We should first check whether the attributes in the __init__ are not None
# Then check if they are input here, if they are, these should overwrite the attributes
use_bvalues = bvalues if bvalues is not None else self.bvalues
#use_thresholds = thresholds if self.thresholds is None else self.thresholds
#use_bounds = bounds if self.bounds is None else self.bounds
#use_initial_guess = initial_guess if self.initial_guess is None else self.initial_guess
# Make sure we don't make arrays of None's
if use_bvalues is not None: use_bvalues = np.asarray(use_bvalues)
#if use_thresholds is not None: use_thresholds = np.asarray(use_thresholds)
#if use_bounds is not None: use_bounds = np.asarray(use_bounds)
#if use_initial_guess is not None: use_initial_guess = np.asarray(use_initial_guess)
#kwargs["bvalues"] = use_bvalues
#args = [data, use_bvalues, use_thresholds]
#if self.required_bounds or self.required_bounds_optional:
#args.append(use_bounds)
#if self.required_initial_guess or self.required_initial_guess_optional:
#args.append(use_initial_guess)
# Run a check_requirements method that makes sure that these inputs fulfil the requirements
# Then we check which inputs are required for the algorithm using the requirements attributes in the template
# Then we pass everything into the ivim_fit method which performs the fit according to the algorithm
#args = [data, use_bvalues, use_initial_guess, use_bounds, use_thresholds]
#args = [arg for arg in args if arg is not None]
# Check if there is an attribute that defines the result dictionary keys
if hasattr(self, "result_keys"):
# result_keys is a list of strings of parameter names, e.g. "S0", "f1", "f2", etc.
result_keys = self.result_keys
else:
# Default is ["f", "Dp", "D"]
self.result_keys = ["f", "Dp", "D"]
results = {}
for key in self.result_keys:
results[key] = np.empty(list(data.shape[:-1]))
# Assuming the last dimension of the data is the signal values of each b-value
# results = np.empty(list(data.shape[:-1])+[3]) # Create an array with the voxel dimensions + the ones required for the fit
#for ijk in tqdm(np.ndindex(data.shape[:-1]), total=np.prod(data.shape[:-1])):
#args = [data[ijk], use_bvalues]
#fit = list(self.ivim_fit(*args, **kwargs))
#results[ijk] = fit
minimum_bvalue = np.min(use_bvalues) # We normalize the signal to the minimum bvalue. Should be 0 or very close to 0.
b0_indices = np.where(use_bvalues == minimum_bvalue)[0]
normalization_factor = np.mean(data[..., b0_indices],axis=-1)
data = data / np.repeat(normalization_factor[...,np.newaxis],np.shape(data)[-1],-1)
if np.shape(data.shape)[0] == 1:
njobs=1
if data.shape[0] < njobs:
njobs = 1
if njobs > 1:
# Flatten the indices first
all_indices = list(np.ndindex(data.shape[:-1]))
#np.save('data.npy', data)
#data = np.load('data.npy', mmap_mode='r')
# Define parallel function
def parfun(ijk):
single_voxel_data = np.array(data[ijk], copy=True)
if not np.isnan(single_voxel_data[0]):
fit = self.ivim_fit(single_voxel_data, use_bvalues, **kwargs)
else:
fit={'D':0,'f':0,'Dp':0}
return ijk, fit
# Run in parallel
results_list = Parallel(n_jobs=njobs)(
delayed(parfun)(ijk) for ijk in tqdm(all_indices, total=len(all_indices), mininterval=60) # updates every minute
)
# Initialize result arrays if not already done
# Example: results = {key: np.zeros(data.shape[:-1]) for key in expected_keys}
# Populate results after parallel loop
for ijk, fit in results_list:
for key in fit:
results[key][ijk] = fit[key]
else:
for ijk in tqdm(np.ndindex(data.shape[:-1]), total=np.prod(data.shape[:-1]), mininterval=60):
# Normalize array
single_voxel_data = data[ijk]
if not np.isnan(single_voxel_data[0]):
args = [single_voxel_data, use_bvalues]
fit = self.ivim_fit(*args, **kwargs)
else:
fit={'D':0,'f':0,'Dp':0}
for key in list(fit.keys()):
results[key][ijk] = fit[key]
#self.parameter_estimates = self.ivim_fit(data, bvalues)
return results
[docs]
def osipi_fit_full_volume(self, data, bvalues=None, **kwargs):
"""
Fit an entire volume of multi-b-value diffusion MRI data in a single call using the IVIM model.
Unlike `osipi_fit`, which processes one voxel at a time, this method sends the full volume
to the fitting algorithm. This can be more efficient for algorithms that support full-volume fitting.
Parameters
----------
data : np.ndarray
2D (data x b-values), 3D (single slice), or 4D (multi-slice) diffusion-weighted imaging (DWI) data.
The last dimension must correspond to the b-values.
bvalues : array-like, optional
Array of b-values corresponding to the last dimension of `data`. If not provided, the method
uses `self.bvalues` set during object initialization.
**kwargs : dict, optional
Additional keyword arguments to be passed to `ivim_fit_full_volume`.
Returns
-------
results : dict of np.ndarray or bool
Dictionary containing parametric maps for each IVIM parameter (keys from `self.result_keys`,
or defaults ["f", "Dp", "D"]). Each value is an array matching the spatial dimensions of `data`.
Returns `False` if full-volume fitting is not supported by the algorithm.
Notes
-----
- This method does not normalize the input signal to the minimum b-value, unlike `osipi_fit`.
Example
-------
# Standard usage:
results = instance.osipi_fit_full_volume(data, bvalues=[0, 50, 200, 800])
f_map = results['f']
D_map = results['D']
Dp_map = results['Dp']
# If the algorithm does not support full-volume fitting:
results = instance.osipi_fit_full_volume(data)
if results is False:
print("Full-volume fitting not supported.")
"""
try:
use_bvalues = bvalues if bvalues is not None else self.bvalues
if use_bvalues is not None: use_bvalues = np.asarray(use_bvalues)
# Check if there is an attribute that defines the result dictionary keys
if hasattr(self, "result_keys"):
# result_keys is a list of strings of parameter names, e.g. "S0", "f1", "f2", etc.
result_keys = self.result_keys
else:
# Default is ["f", "Dp", "D"]
self.result_keys = ["f", "Dp", "D"]
# Create the results dictionary
results = {}
for key in self.result_keys:
results[key] = np.empty(list(data.shape[:-1]))
# no normalisation as volume algorithms may not want normalized signals...
args = [data, use_bvalues]
fit = self.ivim_fit_full_volume(*args, **kwargs) # Assume this is a dict with an array per key representing the parametric maps
for key in list(fit.keys()):
results[key] = fit[key]
return results
except:
# Check if the problem is that full volume fitting is simply not supported in the standardized implementation
if not hasattr(self, "ivim_fit_full_volume"): #and callable(getattr(self, "ivim_fit_full_volume")):
print("Full volume fitting not supported for this algorithm")
return False
[docs]
def osipi_print_requirements(self):
"""
Prints the requirements of the algorithm.
Attributes that are currently checked for are:
required_bvalues (int):
The lowest number of b-values required.
required_thresholds (array-like)
1D array-like of two ints [least number of b-value thresholds, most number of b-value_thresholds].
required_bounds (bool):
Whether bounds are required or not.
required_bounds_optional (bool):
Whether bounds are optional or not
required_initial_guess (bool):
Whether an initial guess is required or not.
required_initial_guess_optional (bool):
Whether an initial guess is optional or not.
"""
print("\n### Algorithm requirements ###")
# Check if the attributes exist and print them
if hasattr(self, "required_bvalues"):
print(f"Number of b-values: {self.required_bvalues}")
if hasattr(self, "required_thresholds"):
print(f"Numer of b-value thresholds [at least, at most]: {self.required_thresholds}")
if hasattr(self, "required_bounds") and hasattr(self, "required_bounds_optional"):
if self.required_bounds:
print(f"Bounds required: {self.required_bounds}")
else:
# Bounds may not be required but can be optional
if self.required_bounds_optional:
print(f"Bounds required: {self.required_bounds} but is optional")
else:
print(f"Bounds required: {self.required_bounds} and is not optional")
if hasattr(self, "required_initial_guess") and hasattr(self, "required_initial_guess_optional"):
if self.required_initial_guess:
print(f"Initial guess required: {self.required_initial_guess}")
else:
# Bounds may not be required but can be optional
if self.required_initial_guess_optional:
print(f"Initial guess required: {self.required_initial_guess} but is optional")
else:
print(f"Initial guess required: {self.required_initial_guess} and is not optional")
[docs]
def osipi_accepted_dimensions(self):
"""The array of accepted dimensions
e.g.
(1D, 2D, 3D, 4D, 5D, 6D)
(True, True, False, False, False, False)
"""
#return (False,) * 6
return True
[docs]
def osipi_accepts_dimension(self, dim):
"""Query if the selection dimension is fittable"""
#accepted = self.accepted_dimensions()
#if dim < 0 or dim > len(accepted):
#return False
#return accepted[dim]
return True
def osipi_check_required_bvalues(self):
"""Checks if the input bvalues fulfil the algorithm requirements"""
#if self.bvalues.size < self.required_bvalues:
#print("Conformance error: Number of b-values.")
#return False
#else:
#return True
return True
[docs]
def osipi_check_required_thresholds(self):
"""Checks if the number of input thresholds fulfil the algorithm requirements"""
#if (len(self.thresholds) < self.required_thresholds[0]) or (len(self.thresholds) > self.required_thresholds[1]):
#print("Conformance error: Number of thresholds.")
#return False
#else:
#return True
return True
[docs]
def osipi_check_required_bounds(self):
"""Checks if input bounds fulfil the algorithm requirements"""
#if self.required_bounds is True and self.bounds is None:
#print("Conformance error: Bounds.")
#return False
#else:
#return True
return True
[docs]
def osipi_check_required_initial_guess(self):
"""Checks if input initial guess fulfil the algorithm requirements"""
#if self.required_initial_guess is True and self.initial_guess is None:
#print("Conformance error: Initial guess")
#return False
#else:
#return True
return True
[docs]
def osipi_check_required_bvalues(self):
"""Minimum number of b-values required"""
pass
[docs]
def osipi_author(self):
"""Author identification"""
return ''
[docs]
def osipi_simple_bias_and_RMSE_test(self, SNR, bvalues, f, Dstar, D, noise_realizations=100):
# Generate signal
bvalues = np.asarray(bvalues)
signals = f*np.exp(-bvalues*Dstar) + (1-f)*np.exp(-bvalues*D)
f_estimates = np.zeros(noise_realizations)
Dstar_estimates = np.zeros(noise_realizations)
D_estimates = np.zeros(noise_realizations)
for i in range(noise_realizations):
# Add some noise
sigma = signals[0]/SNR
noised_signal = np.array([norm.rvs(signal, sigma) for signal in signals])
# Perform fit with the noised signal
# f_estimates[i], Dstar_estimates[i], D_estimates[i] = self.D_and_Ds_swap(self.ivim_fit(noised_signal, bvalues))
result = self.ivim_fit(noised_signal, bvalues)
f_estimates[i], Dstar_estimates[i], D_estimates[i] = result['f'], result['Dp'], result['D']
# Calculate bias
f_bias = np.mean(f_estimates) - f
Dstar_bias = np.mean(Dstar_estimates) - Dstar
D_bias = np.mean(D_estimates) - D
# Calculate RMSE
f_RMSE = np.sqrt(np.var(f_estimates) + f_bias**2)
Dstar_RMSE = np.sqrt(np.var(Dstar_estimates) + Dstar_bias**2)
D_RMSE = np.sqrt(np.var(D_estimates) + D_bias**2)
print(f"f bias: {f_bias:.4g} \nf RMSE: {f_RMSE:.4g}")
print(f"Dstar bias: {Dstar_bias:.4g}\nDstar RMSE: {Dstar_RMSE:.4g}")
print(f"D bias: {D_bias:.4g} \nD RMSE: {D_RMSE:.4g}")
[docs]
def D_and_Ds_swap(self,results):
if results['D']>results['Dp']:
D=results['Dp']
results['Dp']=results['D']
results['D']=D
results['f']=1-results['f']
return results