Source code for src.original.fitting.OGC_AmsterdamUMC.LSQ_fitting

"""
September 2020 by Oliver Gurney-Champion
oliver.gurney.champion@gmail.com / o.j.gurney-champion@amsterdamumc.nl
https://www.github.com/ochampion

requirements:
numpy
tqdm
matplotlib
scipy
joblib
"""

# load relevant libraries
from scipy.optimize import curve_fit, minimize
import numpy as np
from scipy import stats
from joblib import Parallel, delayed
import sys
if sys.stderr.isatty():
    from tqdm import tqdm
else:
[docs] def tqdm(iterable, **kwargs): return iterable
import warnings
[docs] def ivimN(bvalues, Dt, Fp, Dp, S0): # IVIM function in which we try to have equal variance in the different IVIM parameters; equal variance helps with certain fitting algorithms return S0 * ivimN_noS0(bvalues, Dt, Fp, Dp)
[docs] def ivimN_noS0(bvalues, Dt, Fp, Dp): # IVIM function in which we try to have equal variance in the different IVIM parameters and S0=1 return (Fp / 10 * np.exp(-bvalues * Dp / 10) + (1 - Fp / 10) * np.exp(-bvalues * Dt / 1000))
[docs] def ivim(bvalues, Dt, Fp, Dp, S0): # regular IVIM function return (S0 * (Fp * np.exp(-bvalues * Dp) + (1 - Fp) * np.exp(-bvalues * Dt)))
[docs] def tri_expN(bvalues, Fp0, Dt, Fp1, Dp1, Fp2, Dp2): # tri-exp function in which we try to have equal variance in the different IVIM parameters; equal variance helps with certain fitting algorithms return (Fp1 / 10 * np.exp(-bvalues * Dp1 / 100) + Fp2 / 10 * np.exp(-bvalues * Dp2 / 10) + (Fp0 / 10) * np.exp(-bvalues * Dt / 1000))
[docs] def tri_expN_noS0(bvalues, Dt, Fp1, Dp1, Fp2, Dp2): # tri-exp function in which we try to have equal variance in the different IVIM parameters and S0=1 return (Fp1 / 10 * np.exp(-bvalues * Dp1 / 100) + Fp2 / 10 * np.exp(-bvalues * Dp2 / 10) + (1 - Fp1 / 10 - Fp2 / 10) * np.exp(-bvalues * Dt / 1000))
[docs] def tri_exp(bvalues, Fp0, Dt, Fp1, Dp1, Fp2, Dp2): # tri-exp function in which we try to have equal variance in the different IVIM parameters; equal variance helps with certain fitting algorithms return (Fp1 * np.exp(-bvalues * Dp1) + Fp2 * np.exp(-bvalues * Dp2) + (Fp0) * np.exp(-bvalues * Dt))
[docs] def order(Dt, Fp, Dp, S0=None): # function to reorder D* and D in case they were swapped during unconstraint fitting. Forces D* > D (Dp>Dt) if Dp < Dt: Dp, Dt = Dt, Dp Fp = 1 - Fp if S0 is None: return Dt, Fp, Dp else: return Dt, Fp, Dp, S0
[docs] def fit_segmented_array(bvalues, dw_data, njobs=4, bounds=([0, 0, 0.005],[0.005, 0.7, 0.2]), cutoff=75,p0=[0.001, 0.1, 0.01,1]): """ This is an implementation of the segmented fit, in which we first estimate D using a curve fit to b-values>cutoff; then estimate f from the fitted S0 and the measured S0 and finally estimate D* while fixing D and f. This fit is done on an array. :param bvalues: 1D Array with the b-values :param dw_data: 2D Array with diffusion-weighted signal in different voxels at different b-values :param njobs: Integer determining the number of parallel processes; default = 4 :param bounds: 2D Array with fit bounds ([Dtmin, Fpmin, Dpmin, S0min],[Dtmax, Fpmax, Dpmax, S0max]). Default: ([0.005, 0, 0, 0.8], [0.2, 0.7, 0.005, 1.2]) :param cutoff: cutoff value for determining which data is taken along in fitting D :return Dt: 1D Array with D in each voxel :return Fp: 1D Array with f in each voxel :return Dp: 1D Array with Dp in each voxel :return S0: 1D Array with S0 in each voxel """ # first we normalise the signal to S0 S0 = np.mean(dw_data[:, bvalues == 0], axis=1) dw_data = dw_data / S0[:, None] # here we try parallel computing, but if fails, go back to computing one single core. single = False if njobs > 2: try: # define the parallel function def parfun(i): return fit_segmented(bvalues, dw_data[i, :], bounds=bounds, cutoff=cutoff,p0=p0) output = Parallel(n_jobs=njobs)(delayed(parfun)(i) for i in tqdm(range(len(dw_data)), position=0, leave=True)) Dt, Fp, Dp = np.transpose(output) except Exception: # if fails, retry using single core single = True else: # or, if specified, immediately go to single core single = True if single: # initialize empty arrays Dp = np.zeros(len(dw_data)) Dt = np.zeros(len(dw_data)) Fp = np.zeros(len(dw_data)) for i in tqdm(range(len(dw_data)), position=0, leave=True): # fill arrays with fit results on a per voxel base: Dt[i], Fp[i], Dp[i] = fit_segmented(bvalues, dw_data[i, :], bounds=bounds, cutoff=cutoff,p0=p0) return [Dt, Fp, Dp, S0]
[docs] def fit_segmented(bvalues, dw_data, bounds=([0, 0, 0.005],[0.005, 0.7, 0.2]), cutoff=75,p0=[0.001, 0.1, 0.01,1]): """ This is an implementation of the segmented fit, in which we first estimate D using a curve fit to b-values>cutoff; then estimate f from the fitted S0 and the measured S0 and finally estimate D* while fixing D and f. :param bvalues: Array with the b-values :param dw_data: Array with diffusion-weighted signal at different b-values :param bounds: Array with fit bounds ([Dtmin, Fpmin, Dpmin, S0min],[Dtmax, Fpmax, Dpmax, S0max]). Default: ([0.005, 0, 0, 0.8], [0.2, 0.7, 0.005, 1.2]) :param cutoff: cutoff value for determining which data is taken along in fitting D :return Dt: Fitted D :return Fp: Fitted f :return Dp: Fitted Dp :return S0: Fitted S0 """ try: # determine high b-values and data for D dw_data=dw_data/np.mean(dw_data[bvalues==0]) high_b = bvalues[bvalues >= cutoff] high_dw_data = dw_data[bvalues >= cutoff] # correct the bounds. Note that S0 bounds determine the max and min of f assert bounds[0][1] < p0[1] assert bounds[1][1] > p0[1] bounds1 = ([bounds[0][0], 0], [bounds[1][0], 10000000000]) params, _ = curve_fit(lambda b, Dt, int: int * np.exp(-b * Dt ), high_b, high_dw_data, p0=(p0[0], p0[3]-p0[1]), bounds=bounds1) Dt, Fp = params[0], 1 - params[1] if Fp < bounds[0][1] : Fp = np.float64(bounds[0][1]) if Fp > bounds[1][1] : Fp = np.float64(bounds[1][1]) # remove the diffusion part to only keep the pseudo-diffusion dw_data_remaining = dw_data - (1 - Fp) * np.exp(-bvalues * Dt) bounds2 = (bounds[0][2], bounds[1][2]) # fit for D* params, _ = curve_fit(lambda b, Dp: Fp * np.exp(-b * Dp), bvalues, dw_data_remaining, p0=(p0[2]), bounds=bounds2) Dp = params[0] return Dt, Fp, Dp except (RuntimeError, ValueError, FloatingPointError): # if fit fails, return zeros # print('segmented fit failed') return 0., 0., 0.
[docs] def fit_least_squares_array(bvalues, dw_data, S0_output=True, fitS0=True, njobs=4, bounds=([0, 0, 0.005, 0.7],[0.005, 0.7, 0.2, 1.3]),p0=[0.001, 0.1, 0.01, 1]): """ This is an implementation of the conventional IVIM fit. It is fitted in array form. :param bvalues: 1D Array with the b-values :param dw_data: 2D Array with diffusion-weighted signal in different voxels at different b-values :param S0_output: Boolean determining whether to output (often a dummy) variable S0; default = True :param fix_S0: Boolean determining whether to fix S0 to 1; default = False :param njobs: Integer determining the number of parallel processes; default = 4 :param bounds: Array with fit bounds ([Dtmin, Fpmin, Dpmin, S0min],[Dtmax, Fpmax, Dpmax, S0max]). Default: ([0.005, 0, 0, 0.8], [0.2, 0.7, 0.005, 1.2]) :return Dt: 1D Array with D in each voxel :return Fp: 1D Array with f in each voxel :return Dp: 1D Array with Dp in each voxel :return S0: 1D Array with S0 in each voxel """ # normalise the data to S(value=0) S0 = np.mean(dw_data[:, bvalues == 0], axis=1) dw_data = dw_data / S0[:, None] single = False # split up on whether we want S0 as output if S0_output: # check if parallel is desired if njobs > 1: try: # defining parallel function def parfun(i): return fit_least_squares(bvalues, dw_data[i, :], S0_output=S0_output, fitS0=fitS0, bounds=bounds,p0=p0) output = Parallel(n_jobs=njobs)(delayed(parfun)(i) for i in tqdm(range(len(dw_data)), position=0, leave=True)) Dt, Fp, Dp, S0 = np.transpose(output) except Exception: single = True else: single = True if single: # run on single core, instead. Defining empty arrays Dp = np.zeros(len(dw_data)) Dt = np.zeros(len(dw_data)) Fp = np.zeros(len(dw_data)) S0 = np.zeros(len(dw_data)) # running in a single loop and filling arrays for i in tqdm(range(len(dw_data)), position=0, leave=True): Dt[i], Fp[i], Dp[i], S0[i] = fit_least_squares(bvalues, dw_data[i, :], S0_output=S0_output, fitS0=fitS0, bounds=bounds,p0=p0) return [Dt, Fp, Dp, S0] else: # if S0 is not exported if njobs > 1: try: def parfun(i): return fit_least_squares(bvalues, dw_data[i, :], fitS0=fitS0, bounds=bounds,p0=p0) output = Parallel(n_jobs=njobs)(delayed(parfun)(i) for i in tqdm(range(len(dw_data)), position=0, leave=True)) Dt, Fp, Dp = np.transpose(output) except Exception: single = True else: single = True if single: Dp = np.zeros(len(dw_data)) Dt = np.zeros(len(dw_data)) Fp = np.zeros(len(dw_data)) for i in range(len(dw_data)): Dt[i], Fp[i], Dp[i] = fit_least_squares(bvalues, dw_data[i, :], S0_output=S0_output, fitS0=fitS0, bounds=bounds,p0=p0) return [Dt, Fp, Dp]
[docs] def fit_least_squares(bvalues, dw_data, S0_output=False, fitS0=True, bounds=([0, 0, 0.005, 0.7],[0.005, 0.7, 0.2, 1.3]), p0=[0.001, 0.1, 0.01, 1]): """ This is an implementation of the conventional IVIM fit. It fits a single curve :param bvalues: Array with the b-values :param dw_data: Array with diffusion-weighted signal at different b-values :param S0_output: Boolean determining whether to output (often a dummy) variable S0; default = True :param fix_S0: Boolean determining whether to fix S0 to 1; default = False :param bounds: Array with fit bounds ([Dtmin, Fpmin, Dpmin, S0min],[Dtmax, Fpmax, Dpmax, S0max]). Default: ([0.005, 0, 0, 0.8], [0.2, 0.7, 0.005, 1.2]) :return Dt: Array with D in each voxel :return Fp: Array with f in each voxel :return Dp: Array with Dp in each voxel :return S0: Array with S0 in each voxel """ try: if not fitS0: # bounds are rescaled such that each parameter changes at roughly the same rate to help fitting. bounds2 = ([bounds[0][0] * 1000, bounds[0][1] * 10, bounds[0][2] * 10], [bounds[1][0] * 1000, bounds[1][1] * 10, bounds[1][2] * 10]) p1=[p0[0]*1000,p0[1]*10,p0[2]*10] params, _ = curve_fit(ivimN_noS0, bvalues, dw_data, p0=p1, bounds=bounds2) S0 = 1 else: # bounds are rescaled such that each parameter changes at roughly the same rate to help fitting. bounds2 = ([bounds[0][0] * 1000, bounds[0][1] * 10, bounds[0][2] * 10, bounds[0][3]], [bounds[1][0] * 1000, bounds[1][1] * 10, bounds[1][2] * 10, bounds[1][3]]) p1=[p0[0]*1000,p0[1]*10,p0[2]*10,p0[3]] params, _ = curve_fit(ivimN, bvalues, dw_data, p0=p1, bounds=bounds2) S0 = params[3] # correct for the rescaling of parameters Dt, Fp, Dp = params[0] / 1000, params[1] / 10, params[2] / 10 # reorder output in case Dp<Dt if S0_output: return order(Dt, Fp, Dp, S0) else: return order(Dt, Fp, Dp) except (RuntimeError, ValueError, FloatingPointError): # if fit fails, then do a segmented fit instead # print('lsq fit failed, trying segmented') if S0_output: Dt, Fp, Dp = fit_segmented(bvalues, dw_data, bounds=bounds,p0=p0) return Dt, Fp, Dp, 1 else: return fit_segmented(bvalues, dw_data, bounds=bounds,p0=p0)
[docs] def fit_least_squares_array_tri_exp(bvalues, dw_data, S0_output=True, fitS0=True, njobs=4, bounds=([0, 0, 0, 0.005, 0, 0.06], [2.5, 0.005, 1, 0.06, 1, 0.5])): """ This is an implementation of a tri-exponential fit. It is fitted in array form. :param bvalues: 1D Array with the b-values :param dw_data: 2D Array with diffusion-weighted signal in different voxels at different b-values :param S0_output: Boolean determining whether to output (often a dummy) variable S0; default = True :param fix_S0: Boolean determining whether to fix S0 to 1; default = False :param njobs: Integer determining the number of parallel processes; default = 4 :param bounds: Array with fit bounds ([fp0min, Dtmin, Fp1min, Dp1min, Fp2min, Dp2min],[fp0max, Dtmax, Fp1max, Dp1max, Fp2max, Dp2max]). Default: ([0, 0, 0, 0.005, 0, 0.06], [2.5, 0.005, 1, 0.06, 1, 0.5]) :return S0: optional 1D Array with S0 in each voxel :return Dt: 1D Array with D in each voxel :return Fp1: 1D Array with Fp1 in each voxel :return Dp1: 1D Array with Dp1 in each voxel :return Fp2: 1D Array with Fp2 in each voxel :return Dp2: 1D Array with Dp2 in each voxel """ # normalise the data to S(value=0) S0 = np.mean(dw_data[:, bvalues == 0], axis=1) dw_data = dw_data / S0[:, None] single = False # check if parallel is desired if njobs > 1: try: # defining parallel function def parfun(i): return fit_least_squares_tri_exp(bvalues, dw_data[i, :], S0_output=S0_output, fitS0=fitS0, bounds=bounds) output = Parallel(n_jobs=njobs)(delayed(parfun)(i) for i in tqdm(range(len(dw_data)), position=0, leave=True)) Fp0, Dt, Fp1, Dp1, Fp2, Dp2 = np.transpose(output) except Exception: single = True else: single = True if single: # run on single core, instead. Defining empty arrays Dp1 = np.zeros(len(dw_data)) Dt = np.zeros(len(dw_data)) Fp1 = np.zeros(len(dw_data)) Fp0 = np.zeros(len(dw_data)) Fp2 = np.zeros(len(dw_data)) Dp2 = np.zeros(len(dw_data)) # running in a single loop and filling arrays for i in tqdm(range(len(dw_data)), position=0, leave=True): Fp0[i], Dt[i], Fp1[i], Dp1[i], Fp2[i], Dp2[i] = fit_least_squares_tri_exp(bvalues, dw_data[i, :], S0_output=S0_output, fitS0=fitS0, bounds=bounds) if S0_output: return [Fp0+Fp1+Fp2, Dt, Fp1/(Fp0+Fp1+Fp2), Dp1, Fp2/(Fp0+Fp1+Fp2), Dp2] else: return [Dt, Fp1/(Fp0+Fp1+Fp2), Dp1, Fp2/(Fp0+Fp1+Fp2), Dp2]
[docs] def fit_least_squares_tri_exp(bvalues, dw_data, S0_output=False, fitS0=True, bounds=([0, 0, 0, 0.005, 0, 0.06], [2.5, 0.005, 1, 0.06, 1, 0.5])): """ This is an implementation of the tri-exponential fit. It fits a single curve :param bvalues: 1D Array with the b-values :param dw_data: 2D Array with diffusion-weighted signal in different voxels at different b-values :param S0_output: Boolean determining whether to output (often a dummy) variable S0; default = True :param fix_S0: Boolean determining whether to fix S0 to 1; default = False :param bounds: Array with fit bounds ([fp0min, Dtmin, Fp1min, Dp1min, Fp2min, Dp2min],[fp0max, Dtmax, Fp1max, Dp1max, Fp2max, Dp2max]). Default: ([0, 0, 0, 0.005, 0, 0.06], [2.5, 0.005, 1, 0.06, 1, 0.5]) :return Fp0: optional 1D Array with f0 in each voxel :return Dt: 1D Array with D in each voxel :return Fp1: 1D Array with fp1 in each voxel :return Dp1: 1D Array with Dp1 in each voxel :return Fp2: 1D Array with the fraciton of signal for Dp2 in each voxel :return Dp2: 1D Array with Dp2 in each voxel """ try: if not fitS0: # bounds are rescaled such that each parameter changes at roughly the same rate to help fitting. bounds = ([bounds[0][1] * 1000, bounds[0][2] * 10, bounds[0][3] * 100, bounds[0][4] * 10, bounds[0][5] * 10], [bounds[1][1] * 1000, bounds[1][2] * 10, bounds[1][3] * 100, bounds[1][4] * 10, bounds[1][5] * 10]) params, _ = curve_fit(tri_expN_noS0, bvalues, dw_data, p0=[1.5, 1, 3, 1, 1.5], bounds=bounds) Fp0 = 1 - params[1] / 10 - params[3] / 10 Dt, Fp1, Dp1, Fp2, Dp2 = params[0] / 1000, params[1] / 10, params[2] / 100, params[3] / 10, params[4] / 10 else: # bounds are rescaled such that each parameter changes at roughly the same rate to help fitting. bounds = ([bounds[0][0] * 10, bounds[0][1] * 1000, bounds[0][2] * 10, bounds[0][3] * 100, bounds[0][4] * 10, bounds[0][5] * 10], [bounds[1][0] * 10, bounds[1][1] * 1000, bounds[1][2] * 10, bounds[1][3] * 100, bounds[1][4] * 10, bounds[1][5] * 10]) params, _ = curve_fit(tri_expN, bvalues, dw_data, p0=[8, 1.0, 1, 3, 1, 1.5], bounds=bounds) Fp0 = params[0]/10 Dt, Fp1, Dp1, Fp2, Dp2 = params[1] / 1000, params[2] / 10, params[3] / 100, params[4] / 10, params[5] / 10 # correct for the rescaling of parameters # reorder output in case Dp<Dt if S0_output: #Dt, Fp, Dp, Fp2, Dp2 = order_tri(Dt, Fp, Dp, Fp2, Dp2) return Fp0, Dt, Fp1, Dp1, Fp2, Dp2 else: return Dt, Fp1, Dp1, Fp2, Dp2 except (RuntimeError, ValueError, FloatingPointError): # if fit fails, then do a segmented fit instead # print('lsq fit failed, trying segmented') if S0_output: return 0, 0, 0, 0, 0, 0 else: return 0, 0, 0, 0, 0
[docs] def fit_segmented_array_tri_exp(bvalues, dw_data, njobs=4, bounds=([0, 0, 0, 0.005, 0, 0.06], [2.5, 0.005, 1, 0.06, 1, 0.5]), cutoff=[15, 120]): """ This is an implementation of the segmented fit for a tri-exp model, in which we first estimate D using a curve fit to b-values>cutoff; then estimate f from the fitted S0 and the measured S0 and finally estimate D* while fixing D and f. This fit is done on an array. :param bvalues: 1D Array with the b-values :param dw_data: 2D Array with diffusion-weighted signal in different voxels at different b-values :param njobs: Integer determining the number of parallel processes; default = 4 :param bounds: Array with fit bounds ([fp0min, Dtmin, Fp1min, Dp1min, Fp2min, Dp2min],[fp0max, Dtmax, Fp1max, Dp1max, Fp2max, Dp2max]). Default: ([0, 0, 0, 0.005, 0, 0.06], [2.5, 0.005, 1, 0.06, 1, 0.5]) :param cutoff: 2 cutoff values for determining which data is taken along in fitting D, and subsequently D* and F :return S0: 1D Array with S0 in each voxel :return Dt: 1D Array with D in each voxel :return Fp1: 1D Array with Fp1 in each voxel :return Dp1: 1D Array with Dp1 in each voxel :return Fp2: 1D Array with Fp2 in each voxel :return Dp2: 1D Array with Dp2 in each voxel """ # first we normalise the signal to S0 S0 = np.mean(dw_data[:, bvalues == 0], axis=1) dw_data = dw_data / S0[:, None] # here we try parallel computing, but if fails, go back to computing one single core. single = False if njobs > 2: try: # define the parallel function def parfun(i): return fit_segmented(bvalues, dw_data[i, :], bounds=bounds, cutoff=cutoff) output = Parallel(n_jobs=njobs)(delayed(parfun)(i) for i in tqdm(range(len(dw_data)), position=0, leave=True)) Dt, Fp, Dp, Fp0, Fp2, Dp2 = np.transpose(output) except Exception: # if fails, retry using single core single = True else: # or, if specified, immediately go to single core single = True if single: # initialize empty arrays Dp1 = np.zeros(len(dw_data)) Dt = np.zeros(len(dw_data)) Fp0 = np.zeros(len(dw_data)) Fp1 = np.zeros(len(dw_data)) Dp2 = np.zeros(len(dw_data)) Fp2 = np.zeros(len(dw_data)) for i in tqdm(range(len(dw_data)), position=0, leave=True): # fill arrays with fit results on a per voxel base: Fp0[i], Dt[i], Fp1[i], Dp1[i], Fp2[i], Dp2[i] = fit_segmented_tri_exp(bvalues, dw_data[i, :], bounds=bounds, cutoff=cutoff) return [Fp0+Fp1+Fp2, Dt, Fp1/(Fp0+Fp1+Fp2), Dp1, Fp2/(Fp0+Fp1+Fp2), Dp2]
[docs] def fit_segmented_tri_exp(bvalues, dw_data, bounds=([0, 0, 0, 0.005, 0, 0.06], [2.5, 0.005, 1, 0.06, 1, 0.5]), cutoff=[15, 120]): """ This is an implementation of the segmented fit, in which we first estimate D using a curve fit to b-values>cutoff; then estimate f from the fitted S0 and the measured S0 and finally estimate D* while fixing D and f. :param bvalues: Array with the b-values :param dw_data: Array with diffusion-weighted signal at different b-values :param bounds: Array with fit bounds ([fp0min, Dtmin, Fp1min, Dp1min, Fp2min, Dp2min],[fp0max, Dtmax, Fp1max, Dp1max, Fp2max, Dp2max]). Default: ([0, 0, 0, 0.005, 0, 0.06], [2.5, 0.005, 1, 0.06, 1, 0.5]) :param cutoff: 2 cutoff values for determining which data is taken along in fitting D, and subsequently D* and F :return Fp0: 1D Array with Fp1 in each voxel :return Dt: Fitted D :return Fp1: Fitted f :return Dp1: Fitted Dp :return Fp2: Fitted Fp2 :return Dp2: Fitted Dp2 """ try: # determine high b-values and data for D high_b = bvalues[bvalues >= cutoff[1]] high_dw_data = dw_data[bvalues >= cutoff[1]] bounds1 = ([bounds[0][1] * 1000., 0], [bounds[1][1] * 1000., 1]) # fit for S0' and D params, _ = curve_fit(lambda b, Dt, int: int * np.exp(-b * Dt / 1000), high_b, high_dw_data, p0=(1, 1), bounds=bounds1) Dt, Fp0 = params[0] / 1000, params[1] # remove the diffusion part to only keep the pseudo-diffusion dw_data = dw_data - Fp0 * np.exp(-bvalues * Dt) # for another round: high_b = bvalues[bvalues >= cutoff[0]] high_dw_data = dw_data[bvalues >= cutoff[0]] high_b2 = high_b[high_b <= cutoff[1]*1.5] high_dw_data = high_dw_data[high_b <= cutoff[1]*1.5] bounds1 = ([bounds[0][3] * 10., bounds[0][2]], [bounds[1][3] * 10., bounds[1][2]]) # fit for f0' and Dp1 params, _ = curve_fit(lambda b, Dt, int: int * np.exp(-b * Dt / 10), high_b2, high_dw_data, p0=(0.1, min(0.1)), bounds=bounds1) Dp, Fp = params[0] / 10, params[1] # remove the diffusion part to only keep the pseudo-diffusion dw_data = dw_data - Fp * np.exp(-bvalues * Dp) dw_data = dw_data[bvalues <= cutoff[0]*2] bvalueslow = bvalues[bvalues <= cutoff[0]*2] bounds1 = (bounds[0][5], bounds[1][5]) # fit for D* Fp2 = 1 - Fp0 - Fp params, _ = curve_fit(lambda b, Dp: Fp2 * np.exp(-b * Dp), bvalueslow, dw_data, p0=(0.1), bounds=bounds1) Dp2 = params[0] return Fp0, Dt, Fp, Dp, Fp2, Dp2 except (RuntimeError, ValueError, FloatingPointError, TypeError): # if fit fails, return zeros # print('segnetned fit failed') return 0., 0., 0., 0., 0., 0.
[docs] def neg_log_likelihood(p, bvalues, dw_data): """ This function determines the negative of the log of the likelihood of parameters p, given the data dw_data for the Bayesian fit :param p: 1D Array with the estimates of D, f, D* and (optionally) S0 :param bvalues: 1D array with b-values :param dw_data: 1D Array diffusion-weighted data :returns: the log-likelihood of the parameters given the data """ if len(p) == 4: return 0.5 * (len(bvalues) + 1) * np.log( np.sum((ivim(bvalues, p[0], p[1], p[2], p[3]) - dw_data) ** 2)) # 0.5*sum simplified else: return 0.5 * (len(bvalues) + 1) * np.log( np.sum((ivim(bvalues, p[0], p[1], p[2], 1) - dw_data) ** 2)) # 0.5*sum simplified
[docs] def empirical_neg_log_prior(Dt0, Fp0, Dp0, S00=None): """ This function determines the negative of the log of the empirical prior probability of the IVIM parameters :param Dt0: 1D Array with the initial D estimates :param Dt0: 1D Array with the initial f estimates :param Dt0: 1D Array with the initial D* estimates :param Dt0: 1D Array with the initial S0 estimates (optional) """ # Dp0, Dt0, Fp0 are flattened arrays # only take valid voxels along, in which the initial estimates were sensible and successful Dp_valid = (1e-8 < np.nan_to_num(Dp0)) & (np.nan_to_num(Dp0) < 1 - 1e-8) Dt_valid = (1e-8 < np.nan_to_num(Dt0)) & (np.nan_to_num(Dt0) < 1 - 1e-8) Fp_valid = (1e-8 < np.nan_to_num(Fp0)) & (np.nan_to_num(Fp0) < 1 - 1e-8) # determine whether we fit S0 if S00 is not None: S0_valid = (1e-8 < np.nan_to_num(S00)) & (np.nan_to_num(S00) < 2 - 1e-8) valid = Dp_valid & Dt_valid & Fp_valid & S0_valid Dp0, Dt0, Fp0, S00 = Dp0[valid], Dt0[valid], Fp0[valid], S00[valid] else: valid = Dp_valid & Dt_valid & Fp_valid Dp0, Dt0, Fp0 = Dp0[valid], Dt0[valid], Fp0[valid] # determine prior's shape. Note that D, D* and S0 are shaped as lognorm distributions whereas f is a beta distribution Dp_shape, _, Dp_scale = stats.lognorm.fit(Dp0, floc=0) Dt_shape, _, Dt_scale = stats.lognorm.fit(Dt0, floc=0) Fp_a, Fp_b, _, _ = stats.beta.fit(Fp0, floc=0, fscale=1) if S00 is not None: S0_a, S0_b, _, _ = stats.beta.fit(S00, floc=0, fscale=2) # define the prior def neg_log_prior(p): # depends on whether S0 is fitted or not if len(p) == 4: Dt, Fp, Dp, S0 = p[0], p[1], p[2], p[3] else: Dt, Fp, Dp = p[0], p[1], p[2] # make D*<D very unlikely if (Dp < Dt): return 1e8 else: eps = 1e-8 Dp_prior = stats.lognorm.pdf(Dp, Dp_shape, scale=Dp_scale) Dt_prior = stats.lognorm.pdf(Dt, Dt_shape, scale=Dt_scale) Fp_prior = stats.beta.pdf(Fp, Fp_a, Fp_b) # determine and return the prior for D, f and D* (and S0) if len(p) == 4: S0_prior = stats.beta.pdf(S0 / 2, S0_a, S0_b) return -np.log(Dp_prior + eps) - np.log(Dt_prior + eps) - np.log(Fp_prior + eps) - np.log( S0_prior + eps) else: return -np.log(Dp_prior + eps) - np.log(Dt_prior + eps) - np.log(Fp_prior + eps) return neg_log_prior
[docs] def neg_log_posterior(p, bvalues, dw_data, neg_log_prior): """ This function determines the negative of the log of the likelihood of parameters p, given the prior likelihood and the data :param p: 1D Array with the estimates of D, f, D* and (optionally) S0 :param bvalues: 1D array with b-values :param dw_data: 1D Array diffusion-weighted data :param neg_log_prior: prior likelihood function (created with empirical_neg_log_prior) :returns: the posterior probability given the data and the prior """ return neg_log_likelihood(p, bvalues, dw_data) + neg_log_prior(p)
[docs] def flat_neg_log_prior(Dt_range, Fp_range, Dp_range, S0_range=None): """ This function determines the negative of the log of the empirical prior probability of the IVIM parameters :param Dt0: 1D Array with the initial D estimates :param Dt0: 1D Array with the initial f estimates :param Dt0: 1D Array with the initial D* estimates :param Dt0: 1D Array with the initial S0 estimates (optional) """ def neg_log_prior(p): # depends on whether S0 is fitted or not if len(p) == 4: Dt, Fp, Dp, S0 = p[0], p[1], p[2], p[3] else: Dt, Fp, Dp = p[0], p[1], p[2] # make D*<D very unlikely if (Dp < Dt): return 1e10 else: # determine and return the prior for D, f and D* (and S0) if len(p) == 4: if Dt_range[0] < Dt < Dt_range[1] and Fp_range[0] < Fp < Fp_range[1] and Dp_range[0] < Dp < Dp_range[1] and S0_range[0] < S0 < S0_range[1]: #<< not sure whether this helps. Technically it should be here return 0 else: return 1e10 else: if Dt_range[0] < Dt < Dt_range[1] and Fp_range[0] < Fp < Fp_range[1] and Dp_range[0] < Dp < Dp_range[1]: return 0 else: return 1e10 return neg_log_prior
[docs] def fit_bayesian_array(bvalues, dw_data, paramslsq, arg): """ This is an implementation of the Bayesian IVIM fit for arrays. The fit is taken from Barbieri et al. which was initially introduced in http://arxiv.org/10.1002/mrm.25765 and later further improved in http://arxiv.org/abs/1903.00095. If found useful, please cite those papers. :param bvalues: Array with the b-values :param dw_data: 2D Array with diffusion-weighted signal in different voxels at different b-values :param paramslsq: 2D Array with initial estimates for the parameters. These form the base for the Bayesian prior distribution and are typically obtained by least squares fitting of the data :param arg: an object with fit options, with attributes: arg.fitS0 --> Boolean; False fixes S0 to 1, True fits S0 arg.jobs --> Integer specifying the number of parallel processes used in fitting. If <2, regular fitting is used instead arg.bounds --> 2D Array of fit bounds ([Dtmin, Fpmin, Dpmin, S0min],[Dtmax, Fpmax, Dpmax, S0max]) :return Dt: Array with D in each voxel :return Fp: Array with f in each voxel :return Dp: Array with Dp in each voxel :return S0: Array with S0 in each voxel """ # fill out missing args Dt0, Fp0, Dp0, S00 = paramslsq # determine prior if arg.fitS0: neg_log_prior = empirical_neg_log_prior(Dt0, Fp0, Dp0, S00) else: neg_log_prior = empirical_neg_log_prior(Dt0, Fp0, Dp0) single = False # determine whether we fit parallel or not if arg.jobs > 1: try: # do parallel bayesian fit def parfun(i): # starting point x0 = [Dt0[i], Fp0[i], Dp0[i], S00[i]] return fit_bayesian(bvalues, dw_data[i, :], neg_log_prior, x0, fitS0=arg.fitS0) output = Parallel(n_jobs=arg.jobs)(delayed(parfun)(i) for i in tqdm(range(len(dw_data)), position=0, leave=True)) Dt_pred, Fp_pred, Dp_pred, S0_pred = np.transpose(output) except Exception: single = True else: single = True if single: # do serial; intialising arrays Dp_pred = np.zeros(len(dw_data)) Dt_pred = np.zeros(len(dw_data)) Fp_pred = np.zeros(len(dw_data)) S0_pred = np.zeros(len(dw_data)) # fill in array while looping over voxels for i in tqdm(range(len(dw_data)), position=0, leave=True): # starting point x0 = [Dt0[i], Fp0[i], Dp0[i], S00[i]] Dt, Fp, Dp, S0 = fit_bayesian(bvalues, dw_data[i, :], neg_log_prior, x0, fitS0=arg.fitS0) Dp_pred[i] = Dp Dt_pred[i] = Dt Fp_pred[i] = Fp S0_pred[i] = S0 return Dt_pred, Fp_pred, Dp_pred, S0_pred
[docs] def fit_bayesian(bvalues, dw_data, neg_log_prior, x0=[0.001, 0.2, 0.05, 1], fitS0=True, bounds=([0,0,0,0],[0.005,1.5,2,2.5])): ''' This is an implementation of the Bayesian IVIM fit. It returns the Maximum a posterior probability. The fit is taken from Barbieri et al. which was initially introduced in http://arxiv.org/10.1002/mrm.25765 and later further improved in http://arxiv.org/abs/1903.00095. If found useful, please cite those papers. :param bvalues: Array with the b-values :param dw_data: 1D Array with diffusion-weighted signal at different b-values :param neg_log_prior: the prior :param x0: 1D array with initial parameter guess :param fitS0: boolean, if set to False, S0 is not fitted :return Dt: estimated D :return Fp: estimated f :return Dp: estimated D* :return S0: estimated S0 (optional) ''' try: # define fit bounds bounds = [(bounds[0][0], bounds[1][0]), (bounds[0][1], bounds[1][1]), (bounds[0][2], bounds[1][2]), (bounds[0][3], bounds[1][3])] # Find the Maximum a posterior probability (MAP) by minimising the negative log of the posterior if fitS0: params = minimize(neg_log_posterior, x0=x0, args=(bvalues, dw_data, neg_log_prior), bounds=bounds) else: params = minimize(neg_log_posterior, x0=x0[:3], args=(bvalues, dw_data, neg_log_prior), bounds=bounds[:3]) if not params.success: raise RuntimeError(params.message) if fitS0: Dt, Fp, Dp, S0 = params.x[0], params.x[1], params.x[2], params.x[3] else: Dt, Fp, Dp = params.x[0], params.x[1], params.x[2] S0 = 1 return order(Dt, Fp, Dp, S0) except (RuntimeError, ValueError, TypeError): # if fit fails, return regular lsq-fit result # print('a bayes fit fialed') return fit_least_squares(bvalues, dw_data, S0_output=True)
[docs] def goodness_of_fit(bvalues, Dt, Fp, Dp, S0, dw_data, Fp2=None, Dp2=None): """ Calculates the R-squared as a measure for goodness of fit. input parameters are :param b: 1D Array b-values :param Dt: 1D Array with fitted D :param Fp: 1D Array with fitted f :param Dp: 1D Array with fitted D* :param S0: 1D Array with fitted S0 (or ones) :param dw_data: 2D array containing data, as voxels x b-values :return R2: 1D Array with the R-squared for each voxel """ # simulate the IVIM signal given the D, f, D* and S0 try: if Fp2 is None: datasim = ivim(np.tile(np.expand_dims(bvalues, axis=0), (len(Dt), 1)), np.tile(np.expand_dims(Dt, axis=1), (1, len(bvalues))), np.tile(np.expand_dims(Fp, axis=1), (1, len(bvalues))), np.tile(np.expand_dims(Dp, axis=1), (1, len(bvalues))), np.tile(np.expand_dims(S0, axis=1), (1, len(bvalues)))).astype('f') else: datasim = tri_exp(np.tile(np.expand_dims(bvalues, axis=0), (len(Dt), 1)), np.tile(np.expand_dims(S0 * (1 - Fp - Fp2), axis=1), (1, len(bvalues))), np.tile(np.expand_dims(Dt, axis=1), (1, len(bvalues))), np.tile(np.expand_dims(Fp * S0, axis=1), (1, len(bvalues))), np.tile(np.expand_dims(Dp, axis=1), (1, len(bvalues))), np.tile(np.expand_dims(Fp2 * S0, axis=1), (1, len(bvalues))), np.tile(np.expand_dims(Dp2, axis=1), (1, len(bvalues))), ).astype('f') # calculate R-squared given the estimated IVIM signal and the data norm = np.mean(dw_data, axis=1) ss_tot = np.sum(np.square(dw_data - norm[:, None]), axis=1) ss_res = np.sum(np.square(dw_data - datasim), axis=1) R2 = 1 - (ss_res / ss_tot) # R-squared if Fp2 is None: adjusted_R2 = 1 - ((1 - R2) * (len(bvalues)) / (len(bvalues) - 4 - 1)) else: adjusted_R2 = 1 - ((1 - R2) * (len(bvalues)) / (len(bvalues) - 6 - 1)) R2[R2 < 0] = 0 adjusted_R2[adjusted_R2 < 0] = 0 except (IndexError, ValueError, TypeError): if Fp2 is None: datasim = ivim(bvalues, Dt, Fp, Dp, S0) else: datasim = tri_exp(bvalues, S0 * (1 - Fp - Fp2), Dt, Fp * S0, Dp, Fp2 * S0, Dp2) norm = np.mean(dw_data) ss_tot = np.sum(np.square(dw_data - norm)) ss_res = np.sum(np.square(dw_data - datasim)) R2 = 1 - (ss_res / ss_tot) # R-squared if Fp2 is None: adjusted_R2 = 1 - ((1 - R2) * (len(bvalues)) / (len(bvalues) - 4 - 1)) else: adjusted_R2 = 1 - ((1 - R2) * (len(bvalues)) / (len(bvalues) - 6 - 1)) # from matplotlib import pyplot as plt # plt.figure(1) # vox=58885 # plt.clf() # plt.plot(bvalues, datasim[vox], 'rx', markersize=5) # plt.plot(bvalues, dw_data[vox], 'bx', markersize=5) # plt.ion() # plt.show() # print(R2[vox]) return R2, adjusted_R2
# ed_R2
[docs] def MSE(bvalues, Dt, Fp, Dp, S0, dw_data): """ Calculates the MSE as a measure for goodness of fit. input parameters are :param b: 1D Array b-values :param Dt: 1D Array with fitted D :param Fp: 1D Array with fitted f :param Dp: 1D Array with fitted D* :param S0: 1D Array with fitted S0 (or ones) :param dw_data: 2D array containing data, as voxels x b-values :return MSError: 1D Array with the R-squared for each voxel """ # simulate the IVIM signal given the D, f, D* and S0 datasim = ivim(np.tile(np.expand_dims(bvalues, axis=0), (len(Dt), 1)), np.tile(np.expand_dims(Dt, axis=1), (1, len(bvalues))), np.tile(np.expand_dims(Fp, axis=1), (1, len(bvalues))), np.tile(np.expand_dims(Dp, axis=1), (1, len(bvalues))), np.tile(np.expand_dims(S0, axis=1), (1, len(bvalues)))).astype('f') # calculate R-squared given the estimated IVIM signal and the data MSError = np.mean(np.square(dw_data - datasim), axis=1) # R-squared return MSError