8. Two-Level GLM (from Nilearn)#
In this tutorial, we demonstrate how to write Pydra tasks for the first-level (subject-level) GLM and the second-level (group-level) GLM in Nilearn. We use data from the Balloon Analog Risk-taking Task. Basic information about this dataset:
16 subjects
3 runs per subject
functional scan TR: 2.3 s
number of functional scans per run: 300
# nest_asyncio lets pydra's asyncio event loop run inside the notebook's own event loop
import nest_asyncio
nest_asyncio.apply()
8.1. Preparation#
Import the packages that will be used globally and set up the output directory.
import warnings
import sys
if not sys.warnoptions:
warnings.simplefilter("ignore")
import os, glob
import datetime
import random
import pydra
from pydra import Workflow
from pydra.engine.specs import File, MultiInputFile, MultiOutputFile
import typing as ty
from pathlib import Path
import datalad.api as dl
import numpy as np
import pandas as pd
import nibabel as nib
from scipy.stats import norm
from nilearn.interfaces.fmriprep import load_confounds_strategy
from nilearn.image import load_img, get_data, math_img, threshold_img
from nilearn.glm.first_level import make_first_level_design_matrix, FirstLevelModel
from nilearn.glm.second_level import SecondLevelModel, non_parametric_inference
from nilearn.glm.contrasts import compute_fixed_effects
from nilearn.plotting import plot_stat_map, plot_glass_brain
# get current directory
pydra_tutorial_dir = os.path.dirname(os.getcwd())
# set up output directory
workflow_dir = Path(pydra_tutorial_dir) / 'outputs'
workflow_out_dir = workflow_dir / '9_glm' / 'results'
# create the output folder if it does not exist
os.makedirs(workflow_out_dir, exist_ok=True)
8.1.1. Download the data#
DataLad is commonly used to download neuroimaging datasets such as this one. Here we use its Python API.
We need the following data:
event information (raw data)
preprocessed image data (fmriprep)
masks (fmriprep)
confounds (fmriprep)
fmriprep_path = workflow_dir / '7_glm'/ 'data'
rawdata_path = workflow_dir / '7_glm' / 'raw_data'
os.makedirs(fmriprep_path, exist_ok=True)
os.makedirs(rawdata_path, exist_ok=True)
# Install datasets to specific datapaths
fmriprep_url = 'https://github.com/OpenNeuroDerivatives/ds000001-fmriprep.git'
rawdata_url = 'https://github.com/OpenNeuroDatasets/ds000001.git'
dl.install(source=rawdata_url, path=rawdata_path)
dl.install(source=fmriprep_url, path=fmriprep_path)
8.1.2. Get data for each subject#
datalad.api.install downloads the dataset structure (symlinks) without storing the actual file content locally. We can then use datalad.api.get to fetch only the files we need for our analysis.
We need to get four types of data from two folders:
event_info: *events.tsv from rawdata_path
bold: *space-MNI152NLin2009cAsym_res-2_desc-preproc_bold.nii.gz from fmriprep_path
mask: *space-MNI152NLin2009cAsym_res-2_desc-brain_mask.nii.gz from fmriprep_path
confounds: *desc-confounds_timeseries.tsv from fmriprep_path (implicitly needed by load_confounds_strategy)
@pydra.mark.task
@pydra.mark.annotate(
{
'subj_id': int,
'return': {'subj_id': int, 'subj_events': list, 'subj_imgs':list, 'subj_masks':list},
}
)
def get_subjdata(subj_id):
print(f"\nDownload data for subject-{subj_id}")
# get events.tsv
subj_events = glob.glob(os.path.join(rawdata_path, 'sub-%02d' % subj_id, 'func', '*events.tsv'))
subj_events.sort()
for i in subj_events:
dl.get(i, dataset=rawdata_path)
# get bold
subj_imgs = glob.glob(os.path.join(fmriprep_path, 'sub-%02d' % subj_id, 'func', '*space-MNI152NLin2009cAsym_res-2_desc-preproc_bold.nii.gz'))
subj_imgs.sort()
for i in subj_imgs:
dl.get(i, dataset=fmriprep_path)
# get mask
subj_masks = glob.glob(os.path.join(fmriprep_path, 'sub-%02d' % subj_id, 'func', '*space-MNI152NLin2009cAsym_res-2_desc-brain_mask.nii.gz'))
subj_masks.sort()
for i in subj_masks:
dl.get(i, dataset=fmriprep_path)
# get confounds list
subj_confounds = glob.glob(os.path.join(fmriprep_path, 'sub-%02d' % subj_id, 'func', '*desc-confounds_timeseries.tsv'))
subj_confounds.sort()
for i in subj_confounds:
dl.get(i, dataset=fmriprep_path)
return subj_id, subj_events, subj_imgs, subj_masks
8.2. First-Level GLM#
The first-level GLM has two parts:
conduct the GLM for each run of every subject
average across runs for each subject with a fixed-effects model
8.2.1. Get the first-level design matrix#
The design matrix is an M (rows) x N (columns) matrix, where M is the number of TRs (scans) and N is the number of event conditions plus the confound, drift, and constant regressors.
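For a quick illustration of this M x N shape, here is a minimal, self-contained sketch with three synthetic events (hypothetical values, not the BART data); the drift and constant columns are added automatically by Nilearn:
import numpy as np
import pandas as pd
from nilearn.glm.first_level import make_first_level_design_matrix
tr, n_scans = 2.3, 300
frame_times = np.arange(n_scans) * tr  # one row per scan -> M = 300
toy_events = pd.DataFrame({            # synthetic events, for illustration only
    'onset': [10.0, 40.0, 70.0],
    'duration': [2.0, 2.0, 2.0],
    'trial_type': ['pumps', 'cash', 'explode'],
})
dm = make_first_level_design_matrix(frame_times, toy_events, hrf_model='glover')
print(dm.shape)  # (300, 3 conditions + drift regressors + constant)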
@pydra.mark.task
@pydra.mark.annotate(
{
'tr': float,
'n_scans': int,
'hrf_model': str,
'subj_id': int,
'run_id': int,
'subj_imgs': list,
'subj_events':list,
'return': {'dm_path': str, 'run_id': int},
}
)
def get_firstlevel_dm(tr, n_scans, hrf_model, subj_id, run_id, subj_imgs, subj_events):
print(f"\nGet subject-{subj_id}, run-{run_id} firstlevel GLM design matrix...\n")
# read event file
run_img = subj_imgs[run_id-1]
run_event = subj_events[run_id-1]
event = pd.read_csv(run_event, sep='\t').fillna(0)
event = event[['onset', 'duration', 'trial_type']]
# get list of confounds directly from fmriprepped bold
confounds = load_confounds_strategy(run_img, denoise_strategy='simple')[0]
frame_times = np.arange(n_scans) * tr
design_matrix = make_first_level_design_matrix(frame_times, event,
hrf_model=hrf_model,
add_regs=confounds)
# make sure all design matrices have the same length of column
# if you have a block design, this is not needed.
# 52 = 4 (events) + 34 (confounds) + 13 (drift) + 1 (constant)
assert design_matrix.shape[1] == 52, "This design matrix has the wrong column number"
# sort the column order alphabetical for contrasts
design_matrix = design_matrix.reindex(sorted(design_matrix.columns), axis=1)
dm_path = os.path.join(workflow_out_dir, 'sub-%s_run-%s_designmatrix.csv' % (subj_id, run_id))
design_matrix.to_csv(dm_path, index=None)
return dm_path, run_id
8.2.2. Set up the first level contrasts#
@pydra.mark.task
@pydra.mark.annotate(
{
'subj_id': int,
'run_id': int,
'dm_path': str,
'return': {'contrasts': dict},
}
)
def set_contrast(subj_id, run_id, dm_path):
print(f"\nSet firstlevel contrast for subject-{subj_id}, run-{run_id} ...\n")
design_matrix = pd.read_csv(dm_path)
contrast_matrix = np.eye(design_matrix.shape[1])
basic_contrasts = dict([(column, contrast_matrix[i])
for i, column in enumerate(design_matrix.columns)])
contrasts = {
'pumps-control': basic_contrasts['pumps_demean'] - basic_contrasts['control_pumps_demean'],
'control-pumps': basic_contrasts['control_pumps_demean'] - basic_contrasts['pumps_demean'],
'pumps-baseline': basic_contrasts['pumps_demean'],
'cash-baseline': basic_contrasts['cash_demean'],
'explode-baseline': basic_contrasts['explode_demean']
}
return contrasts
8.2.3. Fit the first level GLM#
@pydra.mark.task
@pydra.mark.annotate(
{
'subj_id': int,
'run_id': int,
'subj_imgs': list,
'subj_masks': list,
'smoothing_fwhm': float,
'dm_path': str,
'contrasts':dict,
'return': {'effect_size_path_dict': dict, 'effect_variance_path_dict': dict},
}
)
def firstlevel_estimation(subj_id, run_id, subj_imgs, subj_masks, smoothing_fwhm, dm_path, contrasts):
print(f"\nStart firstlevel estimation for subject-{subj_id}, run-{run_id} ...\n")
# subsample img to reduce memory
run_img = subj_imgs[run_id-1]
img = load_img(run_img)
img_data = get_data(run_img)[::2,::2,::2]
new_img = nib.Nifti1Image(img_data, img.affine)
run_mask = subj_masks[run_id-1]
print('Fit the firstlevel model...')
first_level_model = FirstLevelModel(mask_img=run_mask, smoothing_fwhm=smoothing_fwhm)
dm= pd.read_csv(dm_path)
first_level_model = first_level_model.fit(new_img, design_matrices=dm)
print('Computing contrasts...')
effect_size_path_dict = dict.fromkeys(contrasts.keys())
effect_variance_path_dict = dict.fromkeys(contrasts.keys())
for index, (contrast_id, contrast_val) in enumerate(contrasts.items()):
print(' Contrast % 2i out of %i: %s' % (
index + 1, len(contrasts), contrast_id))
# Estimate the contrasts for this run
res = first_level_model.compute_contrast(contrast_val, output_type='all')
# write the resulting stat images to file
effect_size_path = os.path.join(workflow_out_dir, 'sub-%s_run-%s_contrast-%s_effect_size.nii.gz' % (subj_id, run_id, contrast_id))
effect_variance_path = os.path.join(workflow_out_dir, 'sub-%s_run-%s_contrast-%s_effect_variance.nii.gz' % (subj_id, run_id, contrast_id))
effect_size_path_dict[contrast_id] = effect_size_path
effect_variance_path_dict[contrast_id] = effect_variance_path
res['effect_size'].to_filename(effect_size_path)
res['effect_variance'].to_filename(effect_variance_path)
return effect_size_path_dict, effect_variance_path_dict
8.2.4. Create the first level GLM workflow#
This workflow runs the GLM for each run: it is split over run_id, the design-matrix, contrast, and estimation tasks are applied to every run, and the per-run results are combined back into lists (see the toy sketch below).
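As a toy illustration of pydra's split/combine pattern (a hypothetical doubling task, not part of the analysis, written against the pydra 0.x API used throughout this tutorial): the workflow is split over run_id, so the task runs once per run, and combine gathers the per-run outputs back into a list.
import pydra
from pydra import Workflow
@pydra.mark.task
def double(run_id: int) -> int:
    return run_id * 2
wf_toy = Workflow(name='wf_toy', input_spec=['run_id'])
wf_toy.split('run_id', run_id=wf_toy.lzin.run_id)
wf_toy.add(double(name='double', run_id=wf_toy.lzin.run_id))
wf_toy.combine('run_id')
wf_toy.set_output([('doubled', wf_toy.double.lzout.out)])
# submitting wf_toy with run_id=[1, 2, 3] should yield doubled == [2, 4, 6]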
# initiate the first-level GLM workflow
wf_firstlevel = Workflow(
name='wf_firstlevel',
input_spec=[
'subj_id',
'run_id',
'subj_imgs',
'subj_events',
'subj_masks',
'tr',
'n_scans',
'hrf_model',
'smoothing_fwhm'
],
)
wf_firstlevel.split('run_id', run_id = wf_firstlevel.lzin.run_id)
# add task - get_firstlevel_dm
wf_firstlevel.add(
get_firstlevel_dm(
name = "get_firstlevel_dm",
tr = wf_firstlevel.lzin.tr,
n_scans = wf_firstlevel.lzin.n_scans,
hrf_model = wf_firstlevel.lzin.hrf_model,
subj_id = wf_firstlevel.lzin.subj_id,
run_id = wf_firstlevel.lzin.run_id,
subj_imgs = wf_firstlevel.lzin.subj_imgs,
subj_events = wf_firstlevel.lzin.subj_events,
)
)
# add task - set_contrast
wf_firstlevel.add(
set_contrast(
name = "set_contrast",
subj_id = wf_firstlevel.lzin.subj_id,
run_id = wf_firstlevel.get_firstlevel_dm.lzout.run_id,
dm_path = wf_firstlevel.get_firstlevel_dm.lzout.dm_path
)
)
# add task - firstlevel_estimation
wf_firstlevel.add(
firstlevel_estimation(
name = "firstlevel_estimation",
subj_id = wf_firstlevel.lzin.subj_id,
run_id = wf_firstlevel.get_firstlevel_dm.lzout.run_id,
subj_imgs = wf_firstlevel.lzin.subj_imgs,
subj_masks = wf_firstlevel.lzin.subj_masks,
smoothing_fwhm = wf_firstlevel.lzin.smoothing_fwhm,
dm_path = wf_firstlevel.get_firstlevel_dm.lzout.dm_path,
contrasts = wf_firstlevel.set_contrast.lzout.contrasts
)
)
wf_firstlevel.combine('run_id')
# specify output
wf_firstlevel.set_output(
[
('first_level_contrast', wf_firstlevel.set_contrast.lzout.contrasts),
('first_level_effect_size_list', wf_firstlevel.firstlevel_estimation.lzout.effect_size_path_dict),
('first_level_effect_variance_list', wf_firstlevel.firstlevel_estimation.lzout.effect_variance_path_dict),
]
)
8.2.5. Compute fixed effects#
Before we move to the second (group) level, we need to average the results across the runs of each subject with a fixed-effects model.
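For intuition, here is a rough single-voxel sketch of precision-weighted fixed-effects averaging with made-up numbers; the actual computation in the task below is delegated to Nilearn's compute_fixed_effects, whose default weighting may differ.
import numpy as np
effects = np.array([0.8, 1.1, 0.9])       # hypothetical per-run effect sizes for one voxel
variances = np.array([0.20, 0.25, 0.15])  # hypothetical per-run effect variances
weights = 1.0 / variances                 # precision weights
fx_effect = np.sum(weights * effects) / np.sum(weights)
fx_variance = 1.0 / np.sum(weights)
fx_t = fx_effect / np.sqrt(fx_variance)
print(fx_effect, fx_variance, fx_t)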
@pydra.mark.task
@pydra.mark.annotate(
{'subj_id': int,
'subj_masks': list,
'contrasts': list,
'effect_size_path_dict_list': list,
'effect_variance_path_dict_list': list,
'return': {'fixed_fx_contrast_path_dict': dict, 'fixed_fx_variance_path_dict': dict, 'fixed_fx_ttest_path_dict': dict},
}
)
def get_fixed_effcts(subj_id, subj_masks, contrasts, effect_size_path_dict_list, effect_variance_path_dict_list):
print(f"contrast:{contrast}")
print(f'Compute fixed effects for subject-{subj_id}...')
# average mask across three runs
mean_mask = math_img('np.mean(img, axis=-1)', img=subj_masks)
# binarize the mean mask
mask = math_img('img > 0', img=mean_mask)
fixed_fx_contrast_path_dict =dict.fromkeys(contrasts[0].keys())
fixed_fx_variance_path_dict = dict.fromkeys(contrasts[0].keys())
fixed_fx_ttest_path_dict = dict.fromkeys(contrasts[0].keys())
for index, (contrast_id, contrast_val) in enumerate(contrasts[0].items()):
print(' Contrast % 2i out of %i: %s' % (index + 1, len(contrasts[0]), contrast_id))
contrast_imgs = [nib.load(img_dict[contrast_id]) for img_dict in effect_size_path_dict_list]
variance_imgs = [nib.load(img_dict[contrast_id]) for img_dict in effect_variance_path_dict_list]
fixed_fx_contrast, fixed_fx_variance, fixed_fx_ttest = compute_fixed_effects(contrast_imgs, variance_imgs, mask)
effect_size_path = os.path.join(workflow_out_dir, 'sub-%s_contrast-%s_fx_effect_size.nii.gz' % (subj_id, contrast_id))
variance_path = os.path.join(workflow_out_dir, 'sub-%s_contrast-%s_fx_effect_variance.nii.gz' % (subj_id, contrast_id))
ttest_path = os.path.join(workflow_out_dir, 'sub-%s_contrast-%s_ttest_map.nii.gz' % (subj_id, contrast_id))
fixed_fx_contrast_path_dict[contrast_id] = effect_size_path
fixed_fx_variance_path_dict[contrast_id] = variance_path
fixed_fx_ttest_path_dict[contrast_id] = ttest_path
fixed_fx_contrast.to_filename(effect_size_path)
fixed_fx_variance.to_filename(variance_path)
fixed_fx_ttest.to_filename(ttest_path)
return fixed_fx_contrast_path_dict, fixed_fx_variance_path_dict, fixed_fx_ttest_path_dict
8.2.6. Create the fixed effect workflow#
# initiate the fixed effect GLM workflow
wf_fixed_effect = Workflow(
name='wf_fixed_effect',
input_spec=[
'subj_id',
'run_id',
'tr',
'n_scans',
'hrf_model',
'smoothing_fwhm'
],
)
wf_fixed_effect.split('subj_id', subj_id = wf_fixed_effect.lzin.subj_id)
# add task - get_subj_file
wf_fixed_effect.add(
get_subjdata(
name = "get_subjdata",
subj_id = wf_fixed_effect.lzin.subj_id,
)
)
wf_firstlevel.inputs.subj_id = wf_fixed_effect.get_subjdata.lzout.subj_id
wf_firstlevel.inputs.run_id = wf_fixed_effect.lzin.run_id
wf_firstlevel.inputs.tr = wf_fixed_effect.lzin.tr
wf_firstlevel.inputs.n_scans = wf_fixed_effect.lzin.n_scans
wf_firstlevel.inputs.hrf_model = wf_fixed_effect.lzin.hrf_model
wf_firstlevel.inputs.smoothing_fwhm = wf_fixed_effect.lzin.smoothing_fwhm
wf_firstlevel.inputs.subj_imgs = wf_fixed_effect.get_subjdata.lzout.subj_imgs
wf_firstlevel.inputs.subj_events = wf_fixed_effect.get_subjdata.lzout.subj_events
wf_firstlevel.inputs.subj_masks = wf_fixed_effect.get_subjdata.lzout.subj_masks
wf_fixed_effect.add(wf_firstlevel)
wf_fixed_effect.add(
get_fixed_effcts(
name = "get_fixed_effcts",
subj_id = wf_fixed_effect.get_subjdata.lzout.subj_id,
subj_masks = wf_fixed_effect.get_subjdata.lzout.subj_masks,
contrasts = wf_fixed_effect.wf_firstlevel.lzout.first_level_contrast,
effect_size_path_dict_list = wf_fixed_effect.wf_firstlevel.lzout.first_level_effect_size_list,
effect_variance_path_dict_list = wf_fixed_effect.wf_firstlevel.lzout.first_level_effect_variance_list
)
)
wf_fixed_effect.combine('subj_id')
# specify output
wf_fixed_effect.set_output(
[
('first_level_contrast', wf_fixed_effect.wf_firstlevel.lzout.first_level_contrast),
('fx_effect_size_list', wf_fixed_effect.get_fixed_effcts.lzout.fixed_fx_contrast_path_dict),
('fx_effect_variance_list', wf_fixed_effect.get_fixed_effcts.lzout.fixed_fx_variance_path_dict),
('fx_t_test_list', wf_fixed_effect.get_fixed_effcts.lzout.fixed_fx_ttest_path_dict),
]
)
print(wf_fixed_effect.lzout.first_level_contrast)
LazyOutField(name='wf_fixed_effect', field='first_level_contrast', type=typing.List[typing.List[dict]], splits=frozenset(), cast_from=None)
8.3. Second-Level GLM#
The second-level GLM, also known as the group-level GLM, averages results across subjects. It contains the following steps:
construct design matrix
fit the second-level GLM
statistical testing
8.3.1. Get the second level design matrix#
This is a one-group design, so we need a design matrix for a one-sample test.
The design matrix is a single column of ones, corresponding to the model intercept.
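To see why a column of ones is enough, here is a small sketch with hypothetical subject-level effects: regressing the effects on an intercept-only design estimates the group mean, and testing that coefficient against zero is exactly a one-sample t-test.
import numpy as np
from scipy import stats
y = np.array([0.6, 0.9, 0.4, 1.1, 0.7])         # hypothetical subject-level effects
X = np.ones((y.size, 1))                        # intercept-only design matrix
beta = np.linalg.lstsq(X, y, rcond=None)[0][0]  # equals y.mean()
t_stat, p_val = stats.ttest_1samp(y, 0.0)
print(beta, t_stat, p_val)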
@pydra.mark.task
@pydra.mark.annotate(
{'n_subj': int, 'return': {'design_matrix': ty.Any}}
)
def get_secondlevel_dm(n_subj):
t1 = datetime.datetime.now()
print(f"\nGet secondlevel design matrix ...\n")
design_matrix = pd.DataFrame([1] * n_subj,columns=['intercept'])
return design_matrix
8.3.2. Fit the second level GLM#
Here, we use the list of first-level fixed-effects maps (the t-maps produced in the previous step) as the input for the SecondLevelModel.
@pydra.mark.task
@pydra.mark.annotate(
{'firstlevel_stats_list': list, 'design_matrix': ty.Any, 'firstlevel_contrast':list,
'return': {'secondlevel_mask': ty.Any, 'stat_maps_dict': dict}}
)
def secondlevel_estimation(firstlevel_stats_list, design_matrix, firstlevel_contrast):
print(f"\nStart secondlevel estimation ...\n")
stat_maps_dict = dict.fromkeys(firstlevel_contrast[0][0].keys())
for index, (contrast_id, contrast_val) in enumerate(firstlevel_contrast[0][0].items()):
print(' Contrast % 2i out of %i: %s' % (
index + 1, len(firstlevel_contrast[0][0]), contrast_id))
second_level_input = [nib.load(stats_dict[contrast_id]) for stats_dict in firstlevel_stats_list]
second_level_model = SecondLevelModel()
second_level_model = second_level_model.fit(second_level_input, design_matrix=design_matrix)
secondlevel_mask = second_level_model.masker_.mask_img_
stats = second_level_model.compute_contrast(output_type='all')
# write the resulting stat images to file
z_image_path = os.path.join(workflow_out_dir, 'secondlevel_contrast-%s_z_map.nii.gz' % contrast_id)
stat_maps_dict[contrast_id] = stats
stats['z_score'].to_filename(z_image_path)
plot_path = os.path.join(workflow_out_dir, 'secondlevel_unthresholded_contrast-%s_zmap.jpg' % contrast_id)
plot_glass_brain(stats['z_score'],
colorbar=True,
threshold=norm.isf(0.001),
title='Unthresholded z map',
output_file=plot_path)
return secondlevel_mask, stat_maps_dict
8.3.3. Create the second level GLM workflow#
# initiate the second-level GLM workflow
wf_secondlevel = Workflow(
name='wf_secondlevel',
input_spec=[
'n_subj',
'firstlevel_stats_list',
'firstlevel_contrast',
'n_perm',
],
)
# add task - get_secondlevel_dm
wf_secondlevel.add(
get_secondlevel_dm(
name = "get_secondlevel_dm",
n_subj = wf_secondlevel.lzin.n_subj,
)
)
# add task - secondlevel_estimation
wf_secondlevel.add(
secondlevel_estimation(
name = "secondlevel_estimation",
firstlevel_stats_list = wf_secondlevel.lzin.firstlevel_stats_list,
design_matrix = wf_secondlevel.get_secondlevel_dm.lzout.design_matrix,
firstlevel_contrast = wf_secondlevel.lzin.firstlevel_contrast
)
)
# specify output
wf_secondlevel.set_output(
[
('second_level_designmatrix', wf_secondlevel.get_secondlevel_dm.lzout.design_matrix),
('second_level_mask', wf_secondlevel.secondlevel_estimation.lzout.secondlevel_mask),
('second_level_stats_map', wf_secondlevel.secondlevel_estimation.lzout.stat_maps_dict)
]
)
8.4. Statistical Testing#
In this section, we present different ways of doing statistical testing:
cluster thresholding without multiple-comparison correction
multiple-comparison correction using FDR
parametric testing
nonparametric testing
8.4.1. Cluster-thresholding and Plot without multiple-comparison correction#
Threshold the resulting maps without multiple-comparison correction: abs(z) > 3.29 (equivalent to two-sided p < 0.001) and cluster size > 10 voxels.
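A quick check of this threshold under the standard normal distribution: abs(z) > 3.29 corresponds to a two-sided p < 0.001.
from scipy.stats import norm
print(norm.isf(0.001 / 2))  # ~3.29, the two-sided z threshold for p < 0.001
print(2 * norm.sf(3.29))    # ~0.001, the two-sided p-value at z = 3.29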
@pydra.mark.task
@pydra.mark.annotate(
{'stat_maps_dict': dict, 'threshold': float, 'cluster_threshold': int,
'return': {'thresholded_map_dict': dict, 'plot_contrast_dict': dict}}
)
def cluster_thresholding(stat_maps_dict, threshold, cluster_threshold):
t1 = datetime.datetime.now()
print("\nStart cluster thresholding ...\n")
thresholded_map_dict = dict.fromkeys(stat_maps_dict.keys())
plot_contrast_dict = dict.fromkeys(stat_maps_dict.keys())
for index, (stats_id, stats_val) in enumerate(stat_maps_dict.items()):
print('Contrast % 2i out of %i: %s' % (
index + 1, len(stat_maps_dict), stats_id))
thresholded_map = threshold_img(
img = stats_val['z_score'],
threshold=threshold,
cluster_threshold=cluster_threshold,
two_sided=True,
)
thresholded_map_path = os.path.join(workflow_out_dir, 'secondlevel_cluster_thresholded_contrast-%s_z_map.nii.gz' % stats_id)
thresholded_map_dict[stats_id] = thresholded_map_path
thresholded_map.to_filename(thresholded_map_path)
plot_path = os.path.join(workflow_out_dir,
'secondlevel_cluster_thresholded_contrast-%s_zmap.jpg' % stats_id)
plot_contrast_dict[stats_id] = plot_path
plot_stat_map(thresholded_map,
title='Cluster Thresholded z map',
output_file=plot_path)
print("\nCluster thresholding is done")
return thresholded_map_dict, plot_contrast_dict
8.4.2. Multiple comparison and Plot#
We have the following choices for height_control:
fdr: False Discovery Rate (FDR < .05) and no cluster-level threshold
fpr: False Positive Rate
bonferroni: Bonferroni correction
More details can be found in the Nilearn documentation; a short sketch comparing the three options follows.
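Below is a small, self-contained sketch comparing the three height_control options on a synthetic z-map (random data, purely for illustration; with pure noise the FDR threshold may come back infinite, meaning nothing survives):
import numpy as np
import nibabel as nib
from nilearn.glm import threshold_stats_img
rng = np.random.default_rng(0)
z_map = nib.Nifti1Image(rng.standard_normal((10, 10, 10)), affine=np.eye(4))
for hc in ['fdr', 'fpr', 'bonferroni']:
    _, thr = threshold_stats_img(z_map, alpha=0.05, height_control=hc)
    print(hc, thr)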
@pydra.mark.task
@pydra.mark.annotate(
{'stat_maps_dict': dict, 'alpha': float, 'height_control': str,
'return': {'thresholded_map_dict': dict, 'plot_contrast_dict': dict}}
)
def multiple_comparison(stat_maps_dict, alpha, height_control):
print("\nStart multiple comparison ...\n")
from nilearn.glm import threshold_stats_img
from nilearn.plotting import plot_stat_map
thresholded_map_dict = dict.fromkeys(stat_maps_dict.keys())
plot_contrast_dict = dict.fromkeys(stat_maps_dict.keys())
for index, (stats_id, stats_val) in enumerate(stat_maps_dict.items()):
print('Contrast % 2i out of %i: %s' % (
index + 1, len(stat_maps_dict), stats_id))
thresholded_map, threshold = threshold_stats_img(
stat_img=stats_val['z_score'],
alpha=alpha,
height_control=height_control)
thresholded_map_path = os.path.join(workflow_out_dir,
'secondlevel_multiple_comp_corrected_contrast-%s_z_map.nii.gz' % stats_id)
thresholded_map_dict[stats_id] = thresholded_map_path
thresholded_map.to_filename(thresholded_map_path)
plot_path = os.path.join(workflow_out_dir,
'secondlevel_multiple_comp_corrected_contrast-%s_zmap.jpg' % stats_id)
plot_contrast_dict[stats_id] = plot_path
plot_stat_map(thresholded_map,
title='Thresholded z map, expected fdr = .05',
threshold=threshold,
output_file=plot_path)
print("\nMultiple comparison is done")
return thresholded_map_dict, plot_contrast_dict
8.4.3. Parametric test & Plot#
Here we compute the parametric (Bonferroni-corrected) negative log p-values for the second-level contrasts.
A Nilearn example is available here.
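The quantity that gets plotted is the negative log10 of the Bonferroni-corrected p-value (the p-value multiplied by the number of voxels, capped at 1), so any value above 1 means a corrected p below 0.1, i.e. FWER < 10%. A tiny worked example with made-up numbers:
import numpy as np
p_value, n_voxels = 1e-6, 50000           # hypothetical values
corrected = min(1.0, p_value * n_voxels)  # Bonferroni correction
print(-np.log10(corrected))               # ~1.3 > 1  ->  corrected p < 0.1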
@pydra.mark.task
@pydra.mark.annotate(
{'stat_maps_dict': dict,
'secondlevel_mask': ty.Any,
'return': {'thresholded_map_dict': dict, 'plot_contrast_dict': dict}}
)
def parametric_test(stat_maps_dict, secondlevel_mask):
print("\nStart parametric test ...\n")
thresholded_map_dict = dict.fromkeys(stat_maps_dict.keys())
plot_contrast_dict = dict.fromkeys(stat_maps_dict.keys())
for index, (stats_id, stats_val) in enumerate(stat_maps_dict.items()):
print('Contrast % 2i out of %i: %s' % (
index + 1, len(stat_maps_dict), stats_id))
p_val = stats_val['p_value']
n_voxels = np.sum(get_data(img=secondlevel_mask))
# Correcting the p-values for multiple testing and taking negative logarithm
neg_log_pval = math_img("-np.log10(np.minimum(1, img * {}))"
.format(str(n_voxels)),
img=p_val)
thresholded_map_path = os.path.join(workflow_out_dir, 'secondlevel_parametric_thresholded_contrast-%s_z_map.nii.gz' % stats_id)
thresholded_map_dict[stats_id] = thresholded_map_path
neg_log_pval.to_filename(thresholded_map_path)
# Since we are plotting negative log p-values and using a threshold equal to 1,
# it corresponds to corrected p-values lower than 10%, meaning that there is
# less than 10% probability to make a single false discovery (90% chance that
# we make no false discovery at all). This threshold is much more conservative
# than the previous one.
title = ('parametric test (FWER < 10%)')
plot_path = os.path.join(workflow_out_dir,
'secondlevel_parametric_thresholded_contrast-%s_zmap.jpg' % stats_id)
plot_contrast_dict[stats_id] = plot_path
plot_stat_map(
neg_log_pval, colorbar=True,
title=title, output_file=plot_path)
print("\nParametric test is done")
return thresholded_map_dict, plot_contrast_dict
8.4.4. Non-parametric test & Plot#
Here we compute the (corrected) negative log p-values with a permutation test.
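The idea behind the permutation test can be sketched for a single voxel with a sign-flipping null (made-up subject effects; the actual non_parametric_inference call below does this voxel-wise and controls the family-wise error rate):
import numpy as np
rng = np.random.default_rng(0)
subject_effects = np.array([0.6, 0.9, 0.4, 1.1, 0.7])  # hypothetical values
observed = subject_effects.mean()
n_perm = 1000
null = np.empty(n_perm)
for i in range(n_perm):
    signs = rng.choice([-1, 1], size=subject_effects.size)
    null[i] = (signs * subject_effects).mean()
p_value = (np.sum(null >= observed) + 1) / (n_perm + 1)
print(p_value, -np.log10(p_value))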
@pydra.mark.task
@pydra.mark.annotate(
{'firstlevel_stats_list': list, 'smoothing_fwhm':float,'design_matrix': ty.Any, 'firstlevel_contrast': list, 'n_perm': int,
'return': {'thresholded_map_dict': dict, 'plot_contrast_dict': dict}}
)
def nonparametric_test(firstlevel_stats_list, smoothing_fwhm, design_matrix, firstlevel_contrast, n_perm):
print(f"\nStart nonparametric test ...\n")
thresholded_map_dict = dict.fromkeys(firstlevel_contrast[0][0].keys())
plot_contrast_dict = dict.fromkeys(firstlevel_contrast[0][0].keys())
for index, (contrast_id, contrast_val) in enumerate(firstlevel_contrast[0][0].items()):
print(' Contrast % 2i out of %i: %s' % (
index + 1, len(firstlevel_contrast[0][0]), contrast_id))
# leave the threshold at its default (None) to perform voxel-level FWER correction.
second_level_input = [nib.load(stats_dict[contrast_id]) for stats_dict in firstlevel_stats_list]
neg_log_pvals_permuted_ols_unmasked = \
non_parametric_inference(second_level_input=second_level_input, design_matrix=design_matrix,
model_intercept=True, n_perm=n_perm,
two_sided_test=False, smoothing_fwhm=smoothing_fwhm, n_jobs=1)
thresholded_map_path = os.path.join(workflow_out_dir, 'secondlevel_permutation_contrast-%s_z_map.nii.gz' % contrast_id)
thresholded_map_dict[contrast_id] = thresholded_map_path
neg_log_pvals_permuted_ols_unmasked.to_filename(thresholded_map_path)
# note that we loop over more than one contrast here
title = ('permutation test (FWER < 10%)')
plot_path = os.path.join(workflow_out_dir, 'secondlevel_permutation_contrast-%s_zmap.jpg' % contrast_id)
plot_contrast_dict[contrast_id] = plot_path
plot_stat_map(
neg_log_pvals_permuted_ols_unmasked, colorbar=True,
title=title, output_file=plot_path)
print("\nPermutation is done")
return thresholded_map_dict, plot_contrast_dict
8.5. The Ultimate Workflow#
Now, let's connect all tasks and workflows together.
Here we randomly choose a small number of subjects (set by wf.inputs.n_subj below) to perform the analysis.
To keep the computation time manageable, we set n_perm=100.
wf = Workflow(
name='twolevel_glm',
input_spec=['n_subj'],
)
wf.inputs.n_subj = 2
# randomly choose subjects
wf_fixed_effect.inputs.subj_id = random.sample(range(1,17), wf.inputs.n_subj)
wf_fixed_effect.inputs.run_id =[1,2]
wf_fixed_effect.inputs.tr = 2.3
wf_fixed_effect.inputs.n_scans = 300
wf_fixed_effect.inputs.hrf_model = 'glover'
wf_fixed_effect.inputs.smoothing_fwhm = 5.0
wf.add(wf_fixed_effect)
wf_secondlevel.inputs.n_subj = wf.inputs.n_subj
wf_secondlevel.inputs.firstlevel_stats_list = wf.wf_fixed_effect.lzout.fx_t_test_list
wf_secondlevel.inputs.firstlevel_contrast = wf.wf_fixed_effect.lzout.first_level_contrast
wf.add(wf_secondlevel)
# add task - cluster_thresholding
wf.add(
cluster_thresholding(
name = "cluster_thresholding",
stat_maps_dict = wf.wf_secondlevel.lzout.second_level_stats_map,
threshold = 3.29,
cluster_threshold = 10
)
)
# add task - multiple_comparison
wf.add(
multiple_comparison(
name = "multiple_comparison",
stat_maps_dict = wf.wf_secondlevel.lzout.second_level_stats_map,
alpha = 0.05,
height_control = 'fdr'
)
)
# add task - parametric_test
wf.add(
parametric_test(
name = "parametric_test",
stat_maps_dict = wf.wf_secondlevel.lzout.second_level_stats_map,
secondlevel_mask = wf.wf_secondlevel.lzout.second_level_mask
)
)
# add task - nonparametric_test
wf.add(
nonparametric_test(
name = "nonparametric_test",
firstlevel_stats_list = wf.wf_fixed_effect.lzout.fx_t_test_list,
smoothing_fwhm = 5.0,
design_matrix = wf.wf_secondlevel.lzout.second_level_designmatrix,
firstlevel_contrast = wf.wf_fixed_effect.lzout.first_level_contrast,
n_perm = 100,
)
)
wf.set_output(
[
('second_level_stats_map', wf.wf_secondlevel.lzout.second_level_stats_map)
]
)
8.5.1. Run the Workflow#
from pydra import Submitter
with Submitter(plugin='cf', n_procs=1) as submitter:
submitter(wf)
results = wf.result()
print(results)
8.6. Let’s Plot!#
We only use a few subjects, so it is reasonable that nothing in the following plots survives the statistical tests.
8.6.1. Unthresholded#
Let’s plot the unthresholded image first.
from IPython.display import Image
ut_list = glob.glob(os.path.join(workflow_out_dir, "secondlevel_unthresholded*.jpg"))
Image(filename=ut_list[0])
8.6.2. Cluster Thresholding#
from IPython.display import Image
ct_list = glob.glob(os.path.join(workflow_out_dir, "secondlevel_cluster_thresholded*.jpg"))
Image(filename=ct_list[0])
8.6.3. Multiple Comparison#
mc_list = glob.glob(os.path.join(workflow_out_dir, "secondlevel_multiple_comp*.jpg"))
Image(filename=mc_list[0])
8.6.4. Parametric Test#
pt_list = glob.glob(os.path.join(workflow_out_dir, "secondlevel_parametric*.jpg"))
Image(filename=pt_list[0])
8.6.5. Nonparametric Test#
npt_list = glob.glob(os.path.join(workflow_out_dir, "secondlevel_permutation*.jpg"))
Image(filename=npt_list[0])
8.7. Exercise #1#
In this example, we conducted the GLM on each run of every subject separately and then used a fixed-effects model to average across runs.
Where did we set the splitter (.split()) and the combiner (.combine()), and why did we put them there?
8.8. Exercise #2#
Moreover, we chose this approach due to limited memory on GitHub. FirstLevelModel in Nilearn also allows computing multiple runs with a fixed-effects model simultaneously. Here is an example.
Would you like to give it a try on your own?