Source code for src.wrappers.OsipiBase

import numpy as np
import importlib
from scipy.stats import norm
import pathlib
import sys
import warnings
from tqdm import tqdm
from joblib import Parallel, delayed


[docs] class OsipiBase: """ Comprehensive base class for OSIPI IVIM fitting algorithms. The ``OsipiBase`` class defines a standardized framework for fitting intravoxel incoherent motion (IVIM) models to diffusion-weighted MRI data. It manages common attributes such as b-values, parameter bounds, thresholds, and initial guesses; provides requirement-checking utilities; and offers convenience methods for voxel-wise or full-volume fitting. Subclasses can implement algorithm-specific behavior while inheriting these shared tools. Parameters ---------- bvalues : array-like, optional Diffusion b-values (s/mm²) matching the last dimension of the input data. thresholds : array-like, optional Thresholds used by specific algorithms (e.g., signal cutoffs). bounds : dict, optional Parameter bounds for constrained optimization. Should be a dict with keys like "S0", "f", "Dp", "D" and values as [lower, upper] lists or arrays. E.g. {"S0" : [0.7, 1.3], "f" : [0, 1], "Dp" : [0.005, 0.2], "D" : [0, 0.005]}. initial_guess : dict or str, optional Initial parameter estimates for the IVIM fit. Can be: - A dict with keys like "S0", "f", "Dp", "D" and float values. E.g. {"S0" : 1, "f" : 0.1, "Dp" : 0.01, "D" : 0.001}. - A string naming a body part (e.g., "brain", "liver", "kidney"). The string is looked up in the body-part defaults table and replaced with the corresponding dict. If bounds are not provided, body-part-specific bounds are also applied. algorithm : str, optional Name of an algorithm module in ``src/standardized`` to load dynamically. If supplied, the instance is immediately converted to that algorithm’s subclass via :meth:`osipi_initiate_algorithm`. force_default_settings : bool, optional If bounds and initial guesses are not provided, the wrapper will set them to reasonable physical ones, i.e. {"S0":[0.7, 1.3], "f":[0, 1], "Dp":[0.005, 0.2], "D":[0, 0.005]}. To prevent this, set this bool to False. Default initial guess {"S0" : 1, "f": 0.1, "Dp": 0.01, "D": 0.001}. body_part : str, optional Name of the anatomical region being scanned (e.g., "brain", "liver", "kidney", "prostate", "pancreas", "head_and_neck", "breast", "placenta"). When provided, body-part-specific initial guesses, bounds, and thresholds are used as defaults instead of the generic ones. User-provided bounds/initial_guess always take priority. See :mod:`src.wrappers.ivim_body_part_defaults` for available body parts and their literature-sourced parameter values. **kwargs Additional keyword arguments forwarded to the selected algorithm’s initializer if ``algorithm`` is provided. Attributes ---------- bvalues, thresholds, bounds, initial_guess : np.ndarray or None Core fitting inputs stored as arrays when provided. use_bounds, use_initial_guess : bool Flags controlling whether bounds and initial guesses are applied. deep_learning, supervised, stochastic : bool Indicators for the algorithm type; subclasses may set these. result_keys : list of str, optional Names of the output parameters (e.g., ["f", "Dp", "D"]). required_bvalues, required_thresholds, required_bounds, required_bounds_optional, required_initial_guess, required_initial_guess_optional : various, optional Optional attributes used by requirement-checking helpers. Key Methods ----------- osipi_initiate_algorithm(algorithm, **kwargs) Dynamically replace the current instance with the specified algorithm subclass. osipi_fit(data, njobs=1, **kwargs) Voxel-wise IVIM fitting with optional parallel processing and automatic signal normalization. osipi_fit_full_volume(data, **kwargs) Full-volume fitting for algorithms that support it. osipi_print_requirements() Display algorithm requirements such as needed b-values or bounds. osipi_accepted_dimensions(), osipi_accepts_dimension(dim) Query acceptable input dimensionalities. osipi_check_required_*() Validate that provided inputs meet algorithm requirements. osipi_simple_bias_and_RMSE_test(SNR, f, Dstar, D, noise_realizations) Monte-Carlo bias/RMSE evaluation of the current fitting method. D_and_Ds_swap(results) Ensure consistency of D and D* estimates by swapping if necessary. Notes ----- * This class is typically used as a base or as a dynamic loader for a specific algorithm implementation. * Parallel voxel-wise fitting uses :mod:`joblib`. * Subclasses must implement algorithm-specific methods such as :meth:`ivim_fit` or :meth:`ivim_fit_full_volume`. Examples -------- # Dynamic algorithm loading model = OsipiBase(bvalues=[0, 50, 200, 800], algorithm="MyAlgorithm") # Voxel-wise fitting results = base.osipi_fit(dwi_data, njobs=4) f_map = results["f"] """ def __init__(self, bvalues=None, thresholds=None, bounds=None, initial_guess=None, algorithm=None, force_default_settings=True, body_part=None, **kwargs): from src.wrappers.ivim_body_part_defaults import get_body_part_defaults # If initial_guess is a string, treat it as a body part name if isinstance(initial_guess, str): body_part = initial_guess initial_guess = None # Define the attributes as numpy arrays only if they are not None self.bvalues = np.asarray(bvalues) if bvalues is not None else None self.thresholds = np.asarray(thresholds) if thresholds is not None else None self.bounds = bounds if bounds is not None else None self.initial_guess = initial_guess if initial_guess is not None else None self.use_bounds = {"f" : False, "D" : False, "Dp" : False, "S0" : False} # Default to False. These are more specifically set by each algorithm subclass self.use_initial_guess = {"f" : False, "D" : False, "Dp" : False, "S0" : False} # Default to False. These are more specifically set by each algorithm subclass self.deep_learning = False self.supervised = False self.stochastic = False self.body_part = body_part # Store for reference if force_default_settings: if body_part is not None: # Use body-part-specific defaults from the literature-sourced lookup table bp_defaults = get_body_part_defaults(body_part) if self.bounds is None: self.bounds = bp_defaults["bounds"] self.forced_default_bounds = True if self.initial_guess is None: self.initial_guess = bp_defaults["initial_guess"] self.forced_default_initial_guess = True if self.thresholds is None: self.thresholds = np.array(bp_defaults["thresholds"]) else: # Generic defaults (original behavior) if self.bounds is None: warnings.warn('No bounds were defined, so default bounds are used of [0, 0, 0.005, 0.7],[0.005, 1.0, 0.2, 1.3]', UserWarning, stacklevel=2) self.bounds = {"S0" : [0.7, 1.3], "f" : [0, 1.0], "Dp" : [0.005, 0.2], "D" : [0, 0.005]} # These are defined as [lower, upper] self.forced_default_bounds = True if self.initial_guess is None: warnings.warn('No initial guesses were defined, so default initial guesses are used of [0.001, 0.001, 0.01, 1]', UserWarning, stacklevel=2) self.initial_guess = {"S0" : 1, "f" : 0.1, "Dp" : 0.01, "D" : 0.001} self.forced_default_initial_guess = True if self.thresholds is None: self.thresholds = np.array([200]) self.osipi_bounds = self.bounds # Variable that stores the original bounds before they are passed to the algorithm self.osipi_initial_guess = self.initial_guess # Variable that stores the original initial guesses before they are passed to the algorithm self.id_ref = None # If the user inputs an algorithm to OsipiBase, it is intereprete as initiating # an algorithm object with that name. if algorithm: self.osipi_initiate_algorithm(algorithm, bvalues=self.bvalues, thresholds=self.thresholds, bounds=self.bounds, initial_guess=self.initial_guess, **kwargs)
[docs] def osipi_initiate_algorithm(self, algorithm, **kwargs): """Turns the class into a specified one by the input. This method can be used instead of specifically importing that class and initiating it. WIP: Add args and kwargs! Args: algorithm (string): The name of the algorithm, should be the same as the file in the src/standardized folder without the .py extension. Raises: ValueError: If the algorithm name does not correspond to any .py file in ``src/standardized/``. """ # Import the algorithm root_path = pathlib.Path(__file__).resolve().parents[2] if str(root_path) not in sys.path: raise RuntimeError( "Root folder not found in PYTHONPATH. " "Please ensure the project root is in sys.path." ) # ------------------------------------------------------------------ # Validate algorithm name against available modules in src/standardized/ # ------------------------------------------------------------------ standardized_dir = root_path / "src" / "standardized" available_algorithms = sorted( p.stem for p in standardized_dir.glob("*.py") if p.stem != "__init__" ) if algorithm not in available_algorithms: raise ValueError( f"Algorithm '{algorithm}' not found. " f"Available algorithms are:\n " + "\n ".join(available_algorithms) ) import_base_path = "src.standardized" import_path = import_base_path + "." + algorithm # Secondary safety net: catch import / attribute errors that could # occur if the file exists but is broken or missing the class. try: module = importlib.import_module(import_path) except ImportError as exc: raise ImportError( f"Failed to import module for algorithm '{algorithm}': {exc}" ) from exc try: algorithm_class = getattr(module, algorithm) except AttributeError as exc: raise AttributeError( f"Module '{import_path}' was imported but does not contain " f"a class named '{algorithm}'. " f"Available names: {[n for n in dir(module) if not n.startswith('_')]}" ) from exc # Change the class from OsipiBase to the specified algorithm self.__class__ = algorithm_class self.__init__(**kwargs)
[docs] def initialize(**kwargs): """Placeholder for subclass initialization""" pass
[docs] def osipi_fit(self, data, njobs=1, **kwargs): """ Fit multi-b-value diffusion MRI data using the IVIM model. This function normalizes the input signal to the minimum b-value and fits each voxel's signal to the IVIM model to estimate parameters such as perfusion fraction (f), pseudo-diffusion coefficient (D*), and tissue diffusion coefficient (D). Fits can be computed in parallel across voxels using multiple jobs. Parameters ---------- data : np.ndarray Multi-dimensional array containing the signal intensities. The last dimension must correspond to the b-values (diffusion weightings). njobs : int, optional, default=1 Number of parallel jobs to use for voxel-wise fitting. If `njobs` > 1, the fitting will be distributed across multiple processes. -1 will use all available cpus **kwargs : dict, optional Additional keyword arguments to be passed to the underlying `ivim_fit` function. Returns ------- results : dict of np.ndarray Dictionary containing voxel-wise parameter maps. Keys are parameter names ("f", "D", "Dp"), and values are arrays with the same shape as `data` excluding the last dimension. Notes ----- - The signal is normalized to the minimum b-value before fitting. - Handles NaN values by returning zeros for all parameters in those voxels. - Parallelization is handled using joblib's `Parallel` and `delayed`. - If `self.result_keys` is defined, it determines the output parameter names; otherwise, the default keys are ["f", "Dp", "D"]. - The method swaps D and D* values after fitting using `self.D_and_Ds_swap` to maintain consistency. Example ------- >>> results = instance.osipi_fit(data, njobs=4) >>> f_map = results['f'] >>> D_map = results['D'] """ # b-values must be provided at initialization, not at fit-time. if self.bvalues is None: raise ValueError( "b-values must be provided at initialization (__init__), not at fit-time. " "Pass bvalues to the constructor: OsipiBase(bvalues=..., algorithm=...)" ) # Check if there is an attribute that defines the result dictionary keys if hasattr(self, "result_keys"): # result_keys is a list of strings of parameter names, e.g. "S0", "f1", "f2", etc. result_keys = self.result_keys else: # Default is ["f", "Dp", "D"] self.result_keys = ["f", "Dp", "D"] results = {} for key in self.result_keys: results[key] = np.empty(list(data.shape[:-1])) # Assuming the last dimension of the data is the signal values of each b-value # results = np.empty(list(data.shape[:-1])+[3]) # Create an array with the voxel dimensions + the ones required for the fit #for ijk in tqdm(np.ndindex(data.shape[:-1]), total=np.prod(data.shape[:-1])): #args = [data[ijk], self.bvalues] #fit = list(self.ivim_fit(*args, **kwargs)) #results[ijk] = fit minimum_bvalue = np.min(self.bvalues) # We normalize the signal to the minimum bvalue. Should be 0 or very close to 0. b0_indices = np.where(self.bvalues == minimum_bvalue)[0] normalization_factor = np.mean(data[..., b0_indices],axis=-1) data = data / np.repeat(normalization_factor[...,np.newaxis],np.shape(data)[-1],-1) if np.shape(data.shape)[0] == 1: njobs=1 if data.shape[0] < njobs: njobs = 1 if njobs > 1: # Flatten the indices first all_indices = list(np.ndindex(data.shape[:-1])) #np.save('data.npy', data) #data = np.load('data.npy', mmap_mode='r') # Define parallel function def parfun(ijk): single_voxel_data = np.array(data[ijk], copy=True) if not np.isnan(single_voxel_data[0]): fit = self.ivim_fit(single_voxel_data, **kwargs) else: fit={'D':0,'f':0,'Dp':0} return ijk, fit # Run in parallel results_list = Parallel(n_jobs=njobs)( delayed(parfun)(ijk) for ijk in tqdm(all_indices, total=len(all_indices), mininterval=60) # updates every minute ) # Initialize result arrays if not already done # Example: results = {key: np.zeros(data.shape[:-1]) for key in expected_keys} # Populate results after parallel loop for ijk, fit in results_list: for key in fit: results[key][ijk] = fit[key] else: for ijk in tqdm(np.ndindex(data.shape[:-1]), total=np.prod(data.shape[:-1]), mininterval=60): # Normalize array single_voxel_data = data[ijk] if not np.isnan(single_voxel_data[0]): fit = self.ivim_fit(single_voxel_data, **kwargs) else: fit={'D':0,'f':0,'Dp':0} for key in list(fit.keys()): results[key][ijk] = fit[key] #self.parameter_estimates = self.ivim_fit(data, bvalues) return results
[docs] def osipi_fit_full_volume(self, data, **kwargs): """ Fit an entire volume of multi-b-value diffusion MRI data in a single call using the IVIM model. Unlike `osipi_fit`, which processes one voxel at a time, this method sends the full volume to the fitting algorithm. This can be more efficient for algorithms that support full-volume fitting. Parameters ---------- data : np.ndarray 2D (data x b-values), 3D (single slice), or 4D (multi-slice) diffusion-weighted imaging (DWI) data. The last dimension must correspond to the b-values. **kwargs : dict, optional Additional keyword arguments to be passed to `ivim_fit_full_volume`. Returns ------- results : dict of np.ndarray or bool Dictionary containing parametric maps for each IVIM parameter (keys from `self.result_keys`, or defaults ["f", "Dp", "D"]). Each value is an array matching the spatial dimensions of `data`. Returns `False` if full-volume fitting is not supported by the algorithm. Notes ----- - This method does not normalize the input signal to the minimum b-value, unlike `osipi_fit`. Example ------- # Standard usage: results = instance.osipi_fit_full_volume(data) f_map = results['f'] D_map = results['D'] Dp_map = results['Dp'] # If the algorithm does not support full-volume fitting: results = instance.osipi_fit_full_volume(data) if results is False: print("Full-volume fitting not supported.") """ try: # b-values must be provided at initialization, not at fit-time. if self.bvalues is None: raise ValueError( "b-values must be provided at initialization (__init__), not at fit-time. " "Pass bvalues to the constructor: OsipiBase(bvalues=..., algorithm=...)" ) # Check if there is an attribute that defines the result dictionary keys if hasattr(self, "result_keys"): # result_keys is a list of strings of parameter names, e.g. "S0", "f1", "f2", etc. result_keys = self.result_keys else: # Default is ["f", "Dp", "D"] self.result_keys = ["f", "Dp", "D"] # Create the results dictionary results = {} for key in self.result_keys: results[key] = np.empty(list(data.shape[:-1])) # no normalisation as volume algorithms may not want normalized signals... fit = self.ivim_fit_full_volume(data, **kwargs) # Assume this is a dict with an array per key representing the parametric maps for key in list(fit.keys()): results[key] = fit[key] return results except Exception as e: # Check if the problem is that full volume fitting is simply not supported in the standardized implementation if not hasattr(self, "ivim_fit_full_volume"): #and callable(getattr(self, "ivim_fit_full_volume")): print("Full volume fitting not supported for this algorithm") else: print(f"Full volume fitting failed: {type(e).__name__}: {e}") return False
[docs] def osipi_print_requirements(self): """ Prints the requirements of the algorithm. Attributes that are currently checked for are: required_bvalues (int): The lowest number of b-values required. required_thresholds (array-like) 1D array-like of two ints [least number of b-value thresholds, most number of b-value_thresholds]. required_bounds (bool): Whether bounds are required or not. required_bounds_optional (bool): Whether bounds are optional or not required_initial_guess (bool): Whether an initial guess is required or not. required_initial_guess_optional (bool): Whether an initial guess is optional or not. """ print("\n### Algorithm requirements ###") # Check if the attributes exist and print them if hasattr(self, "required_bvalues"): print(f"Number of b-values: {self.required_bvalues}") if hasattr(self, "required_thresholds"): print(f"Numer of b-value thresholds [at least, at most]: {self.required_thresholds}") if hasattr(self, "required_bounds") and hasattr(self, "required_bounds_optional"): if self.required_bounds: print(f"Bounds required: {self.required_bounds}") else: # Bounds may not be required but can be optional if self.required_bounds_optional: print(f"Bounds required: {self.required_bounds} but is optional") else: print(f"Bounds required: {self.required_bounds} and is not optional") if hasattr(self, "required_initial_guess") and hasattr(self, "required_initial_guess_optional"): if self.required_initial_guess: print(f"Initial guess required: {self.required_initial_guess}") else: # Bounds may not be required but can be optional if self.required_initial_guess_optional: print(f"Initial guess required: {self.required_initial_guess} but is optional") else: print(f"Initial guess required: {self.required_initial_guess} and is not optional")
[docs] def osipi_accepted_dimensions(self): """The array of accepted dimensions e.g. (1D, 2D, 3D, 4D, 5D, 6D) (True, True, False, False, False, False) """ #return (False,) * 6 return True
[docs] def osipi_accepts_dimension(self, dim): """Query if the selection dimension is fittable""" #accepted = self.accepted_dimensions() #if dim < 0 or dim > len(accepted): #return False #return accepted[dim] return True
def osipi_check_required_bvalues(self): """Checks if the input bvalues fulfil the algorithm requirements""" #if self.bvalues.size < self.required_bvalues: #print("Conformance error: Number of b-values.") #return False #else: #return True return True
[docs] def osipi_check_required_thresholds(self): """Checks if the number of input thresholds fulfil the algorithm requirements""" #if (len(self.thresholds) < self.required_thresholds[0]) or (len(self.thresholds) > self.required_thresholds[1]): #print("Conformance error: Number of thresholds.") #return False #else: #return True return True
[docs] def osipi_check_required_bounds(self): """Checks if input bounds fulfil the algorithm requirements""" #if self.required_bounds is True and self.bounds is None: #print("Conformance error: Bounds.") #return False #else: #return True return True
[docs] def osipi_check_required_initial_guess(self): """Checks if input initial guess fulfil the algorithm requirements""" #if self.required_initial_guess is True and self.initial_guess is None: #print("Conformance error: Initial guess") #return False #else: #return True return True
[docs] def osipi_check_required_bvalues(self): """Minimum number of b-values required""" pass
[docs] def osipi_author(self): """Author identification""" return ''
[docs] def osipi_simple_bias_and_RMSE_test(self, SNR, f, Dstar, D, noise_realizations=100): # Generate signal using b-values from initialization signals = f*np.exp(-self.bvalues*Dstar) + (1-f)*np.exp(-self.bvalues*D) f_estimates = np.zeros(noise_realizations) Dstar_estimates = np.zeros(noise_realizations) D_estimates = np.zeros(noise_realizations) for i in range(noise_realizations): # Add some noise sigma = signals[0]/SNR noised_signal = np.array([norm.rvs(signal, sigma) for signal in signals]) # Perform fit with the noised signal # f_estimates[i], Dstar_estimates[i], D_estimates[i] = self.D_and_Ds_swap(self.ivim_fit(noised_signal, bvalues)) result = self.ivim_fit(noised_signal) f_estimates[i], Dstar_estimates[i], D_estimates[i] = result['f'], result['Dp'], result['D'] # Calculate bias f_bias = np.mean(f_estimates) - f Dstar_bias = np.mean(Dstar_estimates) - Dstar D_bias = np.mean(D_estimates) - D # Calculate RMSE f_RMSE = np.sqrt(np.var(f_estimates) + f_bias**2) Dstar_RMSE = np.sqrt(np.var(Dstar_estimates) + Dstar_bias**2) D_RMSE = np.sqrt(np.var(D_estimates) + D_bias**2) print(f"f bias: {f_bias:.4g} \nf RMSE: {f_RMSE:.4g}") print(f"Dstar bias: {Dstar_bias:.4g}\nDstar RMSE: {Dstar_RMSE:.4g}") print(f"D bias: {D_bias:.4g} \nD RMSE: {D_RMSE:.4g}")
[docs] def D_and_Ds_swap(self,results): if results['D']>results['Dp'] and results['Dp'] < 0.05: D=results['Dp'] results['Dp']=results['D'] results['D']=D results['f']=1-results['f'] return results
[docs] def cite(self): print("thank you for using our repository. If it was useful for your paper, please cite our upcoming paper in MRM.") if self.id_ref is not None: print("The current fit method you have loaded should be cited with: " + self.id_ref) else: print("The used fit code has not yet provided a code-speicific reference.")