import numpy as np
from mne.stats import permutation_cluster_test, permutation_cluster_1samp_test
from scipy.sparse import coo_matrix
import scipy
from mne.stats import ttest_ind_no_p, ttest_1samp_no_p
from mne.viz.topomap import _get_pos_outlines
from mne.utils.check import _check_sphere
import matplotlib.pyplot as plt
import seaborn as sns
import mne
from mne.viz import plot_topomap
from functools import partial
import pandas as pd
from scipy.stats import ttest_ind, ttest_rel
from numpy.random import permutation
from scipy.stats import permutation_test
import os
import matplotlib
from fooof import FOOOF
from fooof.analysis import get_band_peak_fm
from .misc import _fmt
from .misc import df_to_array
# TODO:
# Statistics over sessions in session_wise studies
def _dft(x, plot_sine=False):
n_bins = len(x)
phases = np.linspace(0, 2*np.pi, n_bins, endpoint=False)
c = (x*np.exp(-1j*phases)).sum()*2/n_bins
amp, phase = np.abs(c), np.angle(c)
if plot_sine:
n_bins = len(x)
xs = np.linspace(-0.5, n_bins-0.5, 50)
phases_xs = xs*2*np.pi/n_bins
dy = np.nanmean(x)
phases = np.linspace(phases_xs[0], phases_xs[-1],50)
ys = amp*np.cos(phases + phase)+dy
plt.plot(xs, ys, c='k', linewidth=3, zorder=2)
return amp, _wrap(-phase)
# Each arg in args should be of shape (n_samples, n_features)
# This function averages over n_samples and computes DFT amplitude for each feature
from joblib import Parallel, delayed
def _vectorized_dft_amp(*args):
if args[0].ndim == 1:
args = [arg[:, None] for arg in args]
# Compute mean across samples for each feature
means = np.array([np.nanmean(arg, axis=0) for arg in args])
# Use parallel processing to compute DFT amplitudes
amps = Parallel(n_jobs=-1)(delayed(_dft)(means[:, ix_feature]) for ix_feature in range(means.shape[1]))
# Extract amplitude from the result
amps = np.array([amp[0] for amp in amps])
return amps
def _vectorized_dft_amp_group_same(arr):
"""
Compute DFT amplitude over the phase axis for each participant and connection in parallel.
Parameters
----------
arr : array-like
Array with shape (n_phases, n_participants, n_conns).
Returns
-------
amps : ndarray
Amplitudes with shape (n_participants, n_conns).
"""
arr = np.asarray(arr)
if arr.ndim != 3:
raise ValueError("arr must have shape (n_phases, n_participants, n_conns)")
n_phases, n_participants, n_conns = arr.shape
# helper that computes _dft amplitude for a single (participant, conn) pair
def _proc(pair):
p, c = pair
amp, _ = _dft(arr[:, p, c])
return amp
pairs = [(p, c) for p in range(n_participants) for c in range(n_conns)]
amps_flat = Parallel(n_jobs=-1)(delayed(_proc)(pc) for pc in pairs)
amps = np.array(amps_flat).reshape(n_participants, n_conns)
return amps
def _dft_amp_stat(*args):
return _vectorized_dft_amp(*args)
# For use with cluster/network-based permutation tests with "group_same" option (same optimal phases for all participants)
# This performs permutations within each subject, hacky solution
### orig_args is an array of shape (n_phases, n_participants, n_conns)
def _dft_amp_stat_group_same(*args, orig_args=None):
global first_pass_done
if first_pass_done:
permuted_args = permutation(orig_args)
dft_amps = _vectorized_dft_amp_group_same(permuted_args)
else:
first_pass_done = True
dft_amps = _vectorized_dft_amp_group_same(orig_args)
return np.mean(dft_amps, axis=0)
# For use with cluster/network-based permutation tests with "group_different" option (different optimal phases for each participant)
# This is a hacky solution to make permutation_cluster_test compatible with the SINE FIT BINNED procedure outlined in [1].
# Unfortunately, permutation_cluster_test can't handle trial-level data for multiple participants.
# Therefore, args are ignored and orig_args are used here.
# This function permutes trials across phase bins within each participant.
# Trials are averaged within each phase bin and the DFT amplitude is computed for each participant.
# The group-level test statistic is then the DFT amplitude averaged over participants.
# [1] Zoefel, Benedikt, et al. "How to test for phasic modulation of neural and behavioural responses." Neuroimage 202 (2019): 116175.
### orig_args is a list of arrays, one per participant i, each of shape (n_phases, n_epochs_i, n_conns)
def _dft_amp_stat_group_different(*args, orig_args=None):
global first_pass_done
# Shuffle if original test statistic was computed (first pass done)
if first_pass_done:
for ix_arg in range(len(orig_args)):
n_phases = orig_args[ix_arg].shape[0]
n_epochs = orig_args[ix_arg].shape[1]
ix0, ix1 = np.unravel_index(np.random.permutation(n_phases * n_epochs),
(n_phases, n_epochs))
orig_args[ix_arg] = orig_args[ix_arg][ix0, ix1, :]
orig_args[ix_arg] = orig_args[ix_arg].reshape(n_phases, n_epochs, -1)
else:
first_pass_done = True
all_dft_amps = []
for orig_arg in orig_args:
dft_amps = _vectorized_dft_amp(*orig_arg)
all_dft_amps.append(dft_amps)
return np.nanmean(all_dft_amps, axis=0)
def _wrap(phases):
return np.angle(np.exp(1j*phases))
[docs]
def test_sensor_network_modulation(
df_data,
info,
test_level='participant',
measure='phase_lag_index',
threshold_percentile=95):
"""Test phase-dependent modulation of functional connectivity at the sensor level.
This function performs a network-based permutation test to identify networks whose
connectivity is modulated by CLAM-NIBS, and plots the results.
Parameters:
-----------
df_data : pandas.DataFrame
The DataFrame containing connectivity matrices per participant, or per epoch
for each participant.
info : mne.Info
The Info object for topographic plotting.
test_level : str, optional (default='participant')
The level at which the test should be performed ('participant', 'group_same', or 'group_different').
If test_level='participant', a separate test for phase-dependent modulation
is performed for each participant, such that a sinusoidal function is fit to the data
averaged over trials within each phase bin.
If test_level='group_same', a group-level test is performed, where the optimal target phase is
assumed to be the same for all participants, such that a sinusoidal function is fit to the data
averaged over trials and participants within each phase bin.
If test_level='group_different', a group-level test is performed, where the optimal target phase
is assumed to be different for each participant, such that the fitting is performed at the
participant level, but the test statistic (modulation amplitude) is averaged over participants.
measure : str, optional (default='phase_lag_index')
The connectivity measure employed.
plot : str or bool, optional (default=False)
If string, the plot(s) will be saved at that path.
If True, a figure will be created but not shown or saved.
If False, no figure will be created.
Raises:
-------
Exception:
If the test level is not \'participant\' or \'group\'
"""
df_data = df_data[df_data['measure'] == measure]
if test_level == 'participant':
df_results = _test_sensor_network_modulation_participant(df_data,
info,
measure,
threshold_percentile)
return df_results
elif test_level == 'group_same':
df_data = df_data.groupby([col for col in df_data.columns if col != 'value']).agg({'value' : np.mean}).reset_index()
df_results = _test_sensor_network_modulation_group_same(df_data,
info,
measure,
threshold_percentile)
return df_results
elif test_level == 'group_different':
df_results = _test_sensor_network_modulation_group_different(df_data,
info,
measure,
threshold_percentile)
return df_results
else:
raise Exception(
'Test level should be either \'participant\', \'group_same\', or \'group_different\'')
def _test_sensor_network_modulation_participant(df_data, info, measure, threshold_percentile):
gb_participant = df_data.sort_values('participant').groupby('participant')
df_results = pd.DataFrame()
for participant, df_participant in gb_participant:
gb_target_phases = df_participant.sort_values(
'target_phase').groupby('target_phase')
target_phases = []
data = []
for target_phase, df_target_phase in gb_target_phases:
target_phases.append(target_phase)
data.append(np.stack(df_target_phase['value']))
n_obs = data[0].shape[0]
n_chs = data[0].shape[1]
n_conns = n_chs**2
data = [d.reshape(-1, n_conns) for d in data]
adjacency = np.zeros((n_conns, n_conns))
for ix_conn_1 in range(n_conns):
for ix_conn_2 in range(ix_conn_1 + 1, n_conns):
ix_ch_1 = ix_conn_1 // n_chs
ix_ch_2 = ix_conn_1 % n_chs
ix_ch_3 = ix_conn_2 // n_chs
ix_ch_4 = ix_conn_2 % n_chs
if not {ix_ch_1, ix_ch_2}.isdisjoint({ix_ch_3, ix_ch_4}):
adjacency[ix_conn_1, ix_conn_2] = 1
adjacency += adjacency.T
adjacency = coo_matrix(adjacency)
if len(data) == 2:
# use t-statistic
threshold = scipy.stats.t.ppf(1 - 0.05 / 2, df=n_obs * 2 - 2)
tvals, clusters, pvals, _ = permutation_cluster_test(data,
threshold=threshold,
adjacency=adjacency,
out_type='indices',
step_down_p=0,
t_power=1,
stat_fun=ttest_ind_no_p,
tail=0,
n_jobs=1,
verbose=True)
tvals_unit = 'ttest_ind'
else:
stat_fun = _dft_amp_stat
threshold = np.nanpercentile(
[stat_fun(*[d[:, ix] for d in data]) for ix in range(n_conns)], threshold_percentile)
tvals, clusters, pvals, _ = permutation_cluster_test(data,
threshold=threshold,
adjacency=adjacency,
out_type='indices',
step_down_p=0,
t_power=1,
stat_fun=stat_fun,
tail=0,
n_jobs=1,
verbose=True)
tvals_unit = 'dft_amp'
print('Found {:d} clusters'.format(len(clusters)))
print('p-values: ', pvals)
tvals_sig = []
pvals_sig = []
conns_sig = []
for ix_cluster, (cluster, pval) in enumerate(zip(clusters, pvals)):
if pval < 0.05:
tvals_sig.append(tvals[cluster])
pvals_sig.append(pval)
conns_cluster = []
for ix_conn in cluster[0]:
ix_ch_1 = ix_conn // n_chs
ix_ch_2 = ix_conn % n_chs
conns_cluster.append([ix_ch_1, ix_ch_2])
conns_sig.append(conns_cluster)
df_append = pd.DataFrame(
{
'participant': [participant] *
len(tvals_sig),
't_unit': [tvals_unit] *
len(tvals_sig),
't_values': tvals_sig,
'p_value': pvals_sig,
'connections': conns_sig})
df_results = pd.concat([df_results, df_append])
return df_results
def _test_sensor_network_modulation_group_same(df_data, info, measure, threshold_percentile):
gb_target_phases = df_data.sort_values(
'target_phase').groupby('target_phase')
target_phases = []
data = []
for target_phase, df_target_phase in gb_target_phases:
target_phases.append(target_phase)
data.append(np.stack(df_target_phase['value']))
n_obs = data[0].shape[0]
n_chs = data[0].shape[1]
n_conns = n_chs**2
data = [d.reshape(-1, n_conns) for d in data]
adjacency = np.zeros((n_conns, n_conns))
for ix_conn_1 in range(n_conns):
for ix_conn_2 in range(ix_conn_1 + 1, n_conns):
ix_ch_1 = ix_conn_1 // n_chs
ix_ch_2 = ix_conn_1 % n_chs
ix_ch_3 = ix_conn_2 // n_chs
ix_ch_4 = ix_conn_2 % n_chs
if not {ix_ch_1, ix_ch_2}.isdisjoint({ix_ch_3, ix_ch_4}):
adjacency[ix_conn_1, ix_conn_2] = 1
adjacency += adjacency.T
adjacency = coo_matrix(adjacency)
if len(data) == 2:
data_diff = data[0] - data[1]
# use t-statistic
stat_fun = ttest_1samp_no_p
threshold = np.nanpercentile(
[stat_fun(data_diff[:, ix]) for ix in range(n_conns)], threshold_percentile)
tvals, clusters, pvals, _ = permutation_cluster_1samp_test(data_diff,
threshold=threshold,
adjacency=adjacency,
out_type='indices',
step_down_p=0,
t_power=1,
tail=0,
n_jobs=1,
verbose=True)
tvals_unit = 'ttest_dep'
else:
data = np.array(data)
global first_pass_done
first_pass_done = False
stat_fun = partial(
_dft_amp_stat_group_same,
orig_args=data)
dummy_data = np.empty_like(data)
threshold = np.nanpercentile(stat_fun(dummy_data), threshold_percentile) # this takes the threshold for the real data, not dummy data, because that's how stat_fun is constructed
print(f'Threshold at {threshold_percentile}th percentile: {threshold}')
# Hacky, have to set this to false again, because it is used in permutation_cluster_test
first_pass_done = False
tvals, clusters, pvals, _ = permutation_cluster_test(list(dummy_data),
threshold=threshold,
n_permutations=100,
adjacency=adjacency,
out_type='indices',
step_down_p=0,
t_power=1,
stat_fun=stat_fun,
tail=0,
n_jobs=1,
verbose=True,
buffer_size=n_conns)
tvals_unit = 'dft_amp'
print('Found {:d} clusters'.format(len(clusters)))
print('p-values: ', pvals)
tvals_sig = []
pvals_sig = []
conns_sig = []
for ix_cluster, (cluster, pval) in enumerate(zip(clusters, pvals)):
if pval < 0.05:
tvals_sig.append(tvals[cluster].mean())
pvals_sig.append(pval)
conns_cluster = []
for ix_conn in cluster[0]:
ix_ch_1 = ix_conn // n_chs
ix_ch_2 = ix_conn % n_chs
conns_cluster.append([ix_ch_1, ix_ch_2])
conns_sig.append(conns_cluster)
df_results = pd.DataFrame({'participant': ['all'] * len(tvals_sig),
't_unit': [tvals_unit] * len(tvals_sig),
't_value': tvals_sig,
'p_value': pvals_sig,
'connections': conns_sig})
return df_results
def _test_sensor_network_modulation_group_different(df_data, info, measure, threshold_percentile):
all_data = []
gb_participants = df_data.sort_values('participant').groupby('participant')
for participant, df_participant in gb_participants:
gb_target_phases = df_participant.sort_values(
'target_phase').groupby('target_phase')
target_phases = []
data = []
for target_phase, df_target_phase in gb_target_phases:
target_phases.append(target_phase)
data.append(np.stack(df_target_phase['value']))
n_epochs = np.min([d.shape[0] for d in data])
data = np.array([d[:n_epochs] for d in data])
# data is now (n_phases, n_epochs, n_chs, n_chs)
all_data.append(data)
n_participants = len(all_data)
n_phases = data[0].shape[0]
n_chs = data[0].shape[-1]
n_conns = n_chs**2
all_data = [d.reshape(n_phases, -1, n_conns) for d in all_data]
# each data in all_data is now (n_phases, n_epochs, n_conns)
adjacency = np.zeros((n_conns, n_conns))
for ix_conn_1 in range(n_conns):
for ix_conn_2 in range(ix_conn_1 + 1, n_conns):
ix_ch_1 = ix_conn_1 // n_chs
ix_ch_2 = ix_conn_1 % n_chs
ix_ch_3 = ix_conn_2 // n_chs
ix_ch_4 = ix_conn_2 % n_chs
if not {ix_ch_1, ix_ch_2}.isdisjoint({ix_ch_3, ix_ch_4}):
adjacency[ix_conn_1, ix_conn_2] = 1
adjacency += adjacency.T
adjacency = coo_matrix(adjacency)
# Unfortunately, permutation_cluster_test can't handle single-trial data for multiple participants.
# Therefore, we will pass dummy data of the expected shape here and implement the permutation and
# test statistic computation in _dft_amp_stat_group_different. See _dft_amp_stat_group_different for more information.
global first_pass_done
first_pass_done = False
stat_fun = partial(
_dft_amp_stat_group_different,
orig_args=all_data)
dummy_data = [np.empty((n_participants, n_conns)) for ix in range(n_phases)]
threshold = np.nanpercentile(stat_fun(dummy_data), threshold_percentile) # this takes the threshold for the real data, not dummy data, because that's how stat_fun is constructed
# Hacky, have to set this to false again, because it is used in permutation_cluster_test
first_pass_done = False
tvals, clusters, pvals, _ = permutation_cluster_test(dummy_data,
threshold=threshold,
n_permutations=100,
adjacency=adjacency,
out_type='indices',
step_down_p=0,
t_power=1,
stat_fun=stat_fun,
tail=0,
n_jobs=1,
verbose=True,
buffer_size=n_conns)
print('Found {:d} clusters'.format(len(clusters)))
print('p-values: ', pvals)
tvals_unit = 'dft_amp'
tvals_sig = []
pvals_sig = []
conns_sig = []
for ix_cluster, (cluster, pval) in enumerate(zip(clusters, pvals)):
if pval < 0.05:
tvals_sig.append(tvals[cluster])
pvals_sig.append(pval)
conns_cluster = []
for ix_conn in cluster[0]:
ix_ch_1 = ix_conn // n_chs
ix_ch_2 = ix_conn % n_chs
conns_cluster.append([ix_ch_1, ix_ch_2])
conns_sig.append(conns_cluster)
df_results = pd.DataFrame({'participant': ['group'] * len(tvals_sig),
't_unit': [tvals_unit] * len(tvals_sig),
't_values': tvals_sig,
'p_value': pvals_sig,
'connections': conns_sig})
return df_results
[docs]
def test_sensor_cluster_modulation(
df_data,
info,
test_level='participant',
measure='power',
threshold_percentile=95,
plot=False):
"""Test phase-dependent modulation of arbitrary outcome measure (e.g. power) at the sensor level.
This function performs a cluster-based permutation test to identify clusters of sensors whose
value of the outcome measure is modulated by CLAM-NIBS, and plots the results.
Parameters:
-----------
df_data : pandas.DataFrame
The DataFrame containing outcome measure (for all sensors) per participant, or per epoch
for each participant.
info : mne.Info
The Info object for topographic plotting.
test_level : str, optional (default='participant')
The level at which the test should be performed ('participant', 'group_same', or 'group_different').
If test_level='participant', a separate test for phase-dependent modulation
is performed for each participant, such that a sinusoidal function is fit to the data
averaged over trials within each phase bin.
If test_level='group_same', a group-level test is performed, where the optimal target phase is
assumed to be the same for all participants, such that a sinusoidal function is fit to the data
averaged over trials and participants within each phase bin.
If test_level='group_different', a group-level test is performed, where the optimal target phase
is assumed to be different for each participant, such that the fitting is performed at the
participant level, but the test statistic (modulation amplitude) is averaged over participants.
measure : str, optional (default='power')
The outcome measure employed.
plot : str or bool, optional (default=False)
If string, the plot(s) will be saved at that path.
If True, a figure will be created but not shown or saved.
If False, no figure will be created.
Raises:
-------
Exception:
If the test level is not \'participant\' or \'group\'
"""
df_data = df_data[df_data['measure'] == measure]
if test_level == 'participant':
df_results = _test_sensor_cluster_modulation_participant(df_data,
info,
measure,
threshold_percentile,
plot)
return df_results
elif test_level == 'group_same':
raise Exception('test_sensor_cluster_modulation does not yet support test_level=\'group_same\'')
elif test_level == 'group_different':
df_results = _test_sensor_cluster_modulation_group_different(df_data,
info,
measure,
threshold_percentile,
plot)
return df_results
else:
raise Exception(
'Test level should be either \'participant\', \'group_same\', or \'group_different\'')
def _test_sensor_cluster_modulation_participant(df_data, info, measure, threshold_percentile, plot):
gb_participant = df_data.sort_values('participant').groupby('participant')
df_results = pd.DataFrame()
for participant, df_participant in gb_participant:
gb_target_phases = df_participant.sort_values(
'target_phase').groupby('target_phase')
target_phases = []
data = []
for target_phase, df_target_phase in gb_target_phases:
target_phases.append(target_phase)
data.append(np.stack(df_target_phase['value']))
n_obs = data[0].shape[0]
n_chs = data[0].shape[1]
adjacency = mne.channels.find_ch_adjacency(info, None)[0]
if len(data) == 2:
# use t-statistic
threshold = scipy.stats.t.ppf(1 - 0.05 / 2, df=n_obs * 2 - 2)
tvals, clusters, pvals, _ = permutation_cluster_test(data,
threshold=threshold,
adjacency=adjacency,
out_type='indices',
step_down_p=0,
t_power=1,
stat_fun=ttest_ind_no_p,
tail=0,
n_jobs=1,
verbose=True)
tvals_unit = 'ttest_ind'
else:
stat_fun = _dft_amp_stat
threshold = np.nanpercentile(
[stat_fun(*[d[:, ix] for d in data]) for ix in range(n_chs)], threshold_percentile)
tvals, clusters, pvals, _ = permutation_cluster_test(data,
threshold=threshold,
adjacency=adjacency,
out_type='indices',
step_down_p=0,
t_power=1,
stat_fun=stat_fun,
tail=0,
n_jobs=1,
verbose=True)
tvals_unit = 'dft_amp'
print('Found {:d} clusters'.format(len(clusters)))
print('p-values: ', pvals)
tvals_sig = []
pvals_sig = []
channels_sig = []
mask_sig = np.zeros(n_chs).astype(bool)
for ix_cluster, (cluster, pval) in enumerate(zip(clusters, pvals)):
if pval < 0.05:
tvals_sig.append(tvals[cluster])
pvals_sig.append(pval)
channels_sig.append(info.ch_names[cluster])
mask_sig[cluster] = True
if plot:
im = plot_topomap(
tvals,
info,
sensors=False,
mask=mask_sig,
contours=0)[0]
cbar = plt.colorbar(im)
cbar.set_label(
'Modulation of {} \n ({})'.format(
_fmt(measure), _fmt(tvals_unit)))
plt.title(
'{}, Modulation of {}'.format(
participant,
_fmt(measure)))
plt.tight_layout()
df_append = pd.DataFrame(
{
'participant': [participant] *
len(tvals_sig),
't_unit': [tvals_unit] *
len(tvals_sig),
't_values': tvals_sig,
'p_value': pvals_sig,
'channels': channels_sig})
df_results = pd.concat([df_results, df_append])
return df_results
def _test_sensor_cluster_modulation_group_different(df_data, info, measure, threshold_percentile, plot):
all_data = []
gb_participants = df_data.sort_values('participant').groupby('participant')
for participant, df_participant in gb_participants:
gb_target_phases = df_participant.sort_values(
'target_phase').groupby('target_phase')
target_phases = []
data = []
for target_phase, df_target_phase in gb_target_phases:
target_phases.append(target_phase)
data.append(np.stack(df_target_phase['value']))
n_epochs = np.min([d.shape[0] for d in data])
data = np.array([d[:n_epochs] for d in data])
# data is now (n_phases, n_epochs, n_chs)
all_data.append(data)
adjacency = mne.channels.find_ch_adjacency(info, None)[0]
n_participants = len(all_data)
n_phases = all_data[0].shape[0]
n_chs = all_data[0].shape[2]
# Unfortunately, permutation_cluster_test can't handle single-trial data for multiple participants.
# Therefore, we will pass dummy data of the expected shape here and implement the permutation and
# test statistic computation in _dft_amp_stat_group_different. See _dft_amp_stat_group_different for more information.
global first_pass_done
first_pass_done = False
stat_fun = partial(
_dft_amp_stat_group_different,
orig_args=all_data)
dummy_data = [np.empty((n_participants, n_chs)) for ix in range(n_phases)]
threshold = np.nanpercentile(stat_fun(dummy_data), threshold_percentile)
# Hacky, have to set this to false again, because it is used in permutation_cluster_test
first_pass_done = False
tvals, clusters, pvals, _ = permutation_cluster_test(dummy_data,
threshold=threshold,
n_permutations=100,
adjacency=adjacency,
out_type='indices',
step_down_p=0,
t_power=1,
stat_fun=stat_fun,
tail=0,
n_jobs=1,
verbose=True,
buffer_size=n_chs)
print('Found {:d} clusters'.format(len(clusters)))
print('p-values: ', pvals)
tvals_unit = 'dft_amp'
tvals_sig = []
pvals_sig = []
channels_sig = []
mask_sig = np.zeros(n_chs).astype(bool)
for ix_cluster, (cluster, pval) in enumerate(zip(clusters, pvals)):
if pval < 0.05:
tvals_sig.append(tvals[cluster])
pvals_sig.append(pval)
channels_sig.append(info.ch_names[cluster])
mask_sig[cluster] = True
if plot:
im = plot_topomap(
tvals,
info,
sensors=False,
mask=mask_sig,
contours=0)[0]
cbar = plt.colorbar(im)
cbar.set_label(
'Modulation of {} \n ({})'.format(
_fmt(measure),
_fmt(tvals_unit)))
plt.title('Group, Modulation of {}'.format(_fmt(measure)))
plt.tight_layout()
df_results = pd.DataFrame({'participant': ['group'] * len(tvals_sig),
't_unit': [tvals_unit] * len(tvals_sig),
't_values': tvals_sig,
'p_value': pvals_sig,
'channels': channels_sig})
return df_results
[docs]
def test_modulation(
df_data,
test_level='participant',
measure='power',
agg_func=np.nanmean,
plot=False,
plot_mode='box_strip'):
"""Test phase-dependent modulation of arbitrary outcome measure (e.g. power).
This function performs a permutation test to identify phase-dependent modulation
of the outcome measure by CLAM-NIBS, and plots the results.
Parameters:
-----------
df_data : pandas.DataFrame
The DataFrame containing outcome measure per participant, or per epoch
for each participant.
test_level : str, optional (default='participant')
The level at which the test should be performed ('participant', 'group_same', or 'group_different').
If test_level='participant', a separate test for phase-dependent modulation
is performed for each participant, such that a sinusoidal function is fit to the data
averaged over trials within each phase bin.
If test_level='group_same', a group-level test is performed, where the optimal target phase is
assumed to be the same for all participants, such that a sinusoidal function is fit to the data
averaged over trials and participants within each phase bin.
If test_level='group_different', a group-level test is performed, where the optimal target phase
is assumed to be different for each participant, such that the fitting is performed at the
participant level, but the test statistic (modulation amplitude) is averaged over participants.
measure : str, optional (default='power')
The outcome measure employed.
agg_func : callable, optimal (default=np.nanmean)
The function used to aggregate across trials within each phase bin (e.g. np.nanmean or np.nanstd).
plot : str or bool, optional (default=False)
If string, the plot(s) will be saved at that path.
If True, a figure will be created but not shown or saved.
If False, no figure will be created.
plot_mode : str, optional (default='box_strip')
The plotting mode. If all data points should be shown, use 'box_strip' for a boxplot and stripplot.
If data points should only be visualized as aggregated measures (e.g. accuracy across all trials or
variability across all ECG RR intervals), use 'bar' for a barplot.
Raises:
-------
Exception:
If the test level is not \'participant\' or \'group\'
"""
if np.any(np.isin(['open-loop', 'no-stim'], df_data['target_phase'])):
raise Exception('test_modulation tests for phase-dependent modulation, it does not '
'support open-loop or no-stim conditions')
df_data = df_data[df_data['measure'] == measure]
if test_level == 'participant':
df_results = _test_modulation_participant(df_data=df_data, measure=measure, agg_func=agg_func, plot=plot, plot_mode=plot_mode)
return df_results
elif test_level == 'group_same':
df_data = average_over_trials(df_data, agg_func=agg_func)
df_results = _test_modulation_group_same(df_data=df_data, measure=measure, agg_func=np.nanmean, plot=plot, plot_mode=plot_mode)
return df_results
elif test_level == 'group_different':
if np.any(np.isin(['optimal', 'suboptimal'], df_data['target_phase'])):
raise Exception("test_level='group_different' is not supported for 'optimal'/'suboptimal' conditions, "
"because these conditions are comparable across participants by design. "
"Use test_level='group_same' instead.")
df_results = _test_modulation_group_different(df_data=df_data, measure=measure, agg_func=agg_func, plot=plot, plot_mode=plot_mode)
return df_results
else:
raise Exception(
'Test level should be either \'participant\', \'group_same\', or \'group_different\'')
def _test_modulation_group_different(df_data, measure, agg_func, plot, plot_mode):
# Collect per-participant data: list of (list of per-phase trial arrays)
gb_participants = df_data.sort_values('participant').groupby('participant')
participants = []
all_participant_data = [] # outer: participants, inner: phases, each: 1-D array of trial values
target_phases_global = None
for participant, df_participant in gb_participants:
participants.append(participant)
gb_target_phase = df_participant.sort_values('target_phase').groupby('target_phase')
target_phases = []
participant_data = []
for target_phase, df_target_phase in gb_target_phase:
target_phases.append(target_phase)
participant_data.append(df_target_phase['value'].to_numpy())
all_participant_data.append(participant_data)
if target_phases_global is None:
target_phases_global = target_phases
def _participant_dft_amp(participant_data):
avgs = np.array([agg_func(phase_data) for phase_data in participant_data])
amp, _ = _dft(avgs)
return amp
def _group_stat(data_list):
return np.nanmean([_participant_dft_amp(pd) for pd in data_list])
observed_stat = _group_stat(all_participant_data)
# Permutation: shuffle trials across phase bins within each participant independently
n_resamples = 1000
null_distribution = np.empty(n_resamples)
for i in range(n_resamples):
permuted_data_list = []
for participant_data in all_participant_data:
all_trials = np.concatenate(participant_data)
np.random.shuffle(all_trials)
sizes = [len(phase_data) for phase_data in participant_data]
permuted_participant_data = []
idx = 0
for size in sizes:
permuted_participant_data.append(all_trials[idx:idx + size])
idx += size
permuted_data_list.append(permuted_participant_data)
null_distribution[i] = _group_stat(permuted_data_list)
pval = np.mean(null_distribution >= observed_stat)
tval = observed_stat
tval_unit = 'dft_amp'
if plot:
per_participant_amps = [_participant_dft_amp(pd) for pd in all_participant_data]
plt.figure()
df_plot = pd.DataFrame({'Participant': participants, measure: per_participant_amps})
if plot_mode == 'bar':
sns.barplot(df_plot, x='Participant', y=measure, color='k', alpha=0.5, errorbar=None)
else:
sns.barplot(df_plot, x='Participant', y=measure, color='k', alpha=0.5, errorbar=None)
sns.stripplot(df_plot, x='Participant', y=measure, color='r', alpha=0.8)
plt.ylabel('DFT Amplitude ({})'.format(measure))
plt.title('dft_amp = {:.3e}, p = {:.3e}'.format(tval, pval))
sns.despine()
if isinstance(plot, str):
matplotlib.rcParams['pdf.fonttype'] = 42
matplotlib.rcParams['ps.fonttype'] = 42
sns.set_context('paper')
if not os.path.exists(plot):
os.makedirs(plot, exist_ok=True)
plt.savefig('{}_modulation_group_different.pdf'.format(measure))
plt.close()
df_results = pd.DataFrame({'participant': ['group'],
't_unit': [tval_unit],
't_value': [tval],
'p_value': [pval]})
return df_results
def _test_modulation_participant(df_data, measure, agg_func, plot, plot_mode):
df_results = pd.DataFrame()
gb_participants = df_data.sort_values('participant').groupby('participant')
for participant, df_participant in gb_participants:
gb_target_phase = df_participant.sort_values('target_phase').groupby('target_phase')
target_phases = []
target_measures = []
for target_phase, df_target_phase in gb_target_phase:
target_phases.append(target_phase)
target_measures.append(df_target_phase['value'].to_numpy())
if len(target_phases) == 2:
tval, pval = ttest_ind(target_measures[0], target_measures[1])
tval_unit = 'ttest_ind'
else:
res = permutation_test(target_measures,
lambda *x: _dft(np.array([agg_func(x_) for x_ in x]))[0],
permutation_type='independent',
alternative='greater',
n_resamples=1000)
tval = res.statistic
pval = res.pvalue
tval_unit = 'dft_amp'
if plot:
plt.figure()
has_string_phases = any(isinstance(ph, str) for ph in target_phases)
x_label = 'Condition' if has_string_phases else 'Target Phase (°)'
x = np.concatenate([[target_phases[ix]] * len(target_measures[ix])
for ix in range(len(target_phases))])
x = [ph if isinstance(ph, str) else round(np.rad2deg(ph)) for ph in x]
y = np.concatenate(target_measures)
df_plot = pd.DataFrame(
{x_label: x, '{}'.format(measure): y})
df_plot_agg = df_plot.sort_values(x_label).groupby(x_label) \
.agg({'{}'.format(measure) : agg_func}).reset_index()
if plot_mode == 'box_strip':
sns.boxplot(
df_plot,
x=x_label,
y='{}'.format(measure),
color='k',
boxprops=dict(
alpha=0.5),
showmeans=True,
zorder=0,
showfliers=False)
sns.stripplot(
df_plot,
x=x_label,
y='{}'.format(measure),
color='r',
alpha=0.8,
zorder=1)
elif plot_mode == 'bar':
sns.barplot(
df_plot_agg,
x=x_label,
y='{}'.format(measure),
color='k',
alpha=0.5,
errorbar=None,
zorder=0)
if len(target_phases) == 2:
plt.title('t = {:.3e}, p = {:.3e}'.format(tval, pval))
else:
avgs = df_plot_agg['{}'.format(measure)].to_numpy()
_dft(avgs, plot_sine=True)
plt.title('dft_amp = {:.3e}, p = {:.3e}'.format(tval, pval))
sns.despine()
if isinstance(plot, str):
matplotlib.rcParams['pdf.fonttype'] = 42
matplotlib.rcParams['ps.fonttype'] = 42
sns.set_context('paper')
if not os.path.exists(plot):
os.makedirs(plot,exist_ok=True)
plt.savefig('{}_modulation_{}.pdf'.format(measure, participant))
plt.close()
df_append = pd.DataFrame({'participant': [participant],
't_unit': [tval_unit],
't_value': [tval],
'p_value': [pval]})
df_results = pd.concat([df_results, df_append])
return df_results
def _test_modulation_group_same(df_data, measure, agg_func, plot, plot_mode):
gb_target_phase = df_data.sort_values('target_phase').groupby('target_phase')
target_phases = []
target_measures = []
for target_phase, df_target_phase in gb_target_phase:
target_phases.append(target_phase)
target_measures.append(df_target_phase.sort_values('participant')['value'].to_numpy())
if len(target_phases) == 2:
tval, pval = ttest_rel(target_measures[0], target_measures[1])
tval_unit = 'ttest_rel'
else:
res = permutation_test(target_measures,
lambda *x: _dft(np.array([agg_func(x_) for x_ in x]))[0],
permutation_type='samples',
alternative='greater',
n_resamples=1000)
tval = res.statistic
pval = res.pvalue
tval_unit = 'dft_amp'
if plot:
has_string_phases = any(isinstance(ph, str) for ph in target_phases)
x_label = 'Condition' if has_string_phases else 'Target Phase (°)'
x = np.concatenate([[target_phases[ix]] * len(target_measures[ix])
for ix in range(len(target_phases))])
x = [ph if isinstance(ph, str) else round(np.rad2deg(ph)) for ph in x]
y = np.concatenate(target_measures)
df_plot = pd.DataFrame(
{x_label: x, '{}'.format(measure): y})
df_plot_agg = df_plot.sort_values(x_label).groupby(x_label) \
.agg({'{}'.format(measure) : agg_func}).reset_index()
if plot_mode == 'box_strip':
sns.boxplot(
df_plot,
x=x_label,
y='{}'.format(measure),
color='k',
boxprops=dict(
alpha=.5),
showmeans=True,
zorder=0,
showfliers=False)
sns.stripplot(
df_plot,
x=x_label,
y='{}'.format(measure),
color='r',
alpha=0.8,
zorder=1)
elif plot_mode == 'bar':
sns.barplot(
df_plot_agg,
x=x_label,
y='{}'.format(measure),
color='k',
alpha=0.5,
errorbar=None,
zorder=0)
if len(target_phases) == 2:
plt.title('t = {:.3e}, p = {:.3e}'.format(tval, pval))
else:
avgs = df_plot_agg['{}'.format(measure)].to_numpy()
_dft(avgs, plot_sine=True)
plt.title('dft_amp = {:.3e}, p = {:.3e}'.format(tval, pval))
sns.despine()
if isinstance(plot, str):
matplotlib.rcParams['pdf.fonttype'] = 42
matplotlib.rcParams['ps.fonttype'] = 42
sns.set_context('paper')
if not os.path.exists(plot):
os.makedirs(plot,exist_ok=True)
plt.savefig('{}_modulation_group.pdf'.format(measure))
plt.close()
df_results = pd.DataFrame({'participant': ['group'],
't_unit': [tval_unit],
't_value': [tval],
'p_value': [pval]})
return df_results
def _group_mean_psds(group):
stacked = np.vstack(group['value'])
mean = np.nanmean(stacked, axis=0)
return pd.Series([mean])
[docs]
def average_over_trials(df, agg_func=np.nanmean):
"""Average over trials for each participant and target phase.
Parameters:
-----------
df : pandas.DataFrame
The DataFrame containing the data to be averaged.
agg_func : callable
The function used to average the data (e.g. np.nanmean or np.nanmedian).
Returns:
--------
pandas.DataFrame
The DataFrame with averaged values.
"""
df_avg = df.groupby([col for col in df.columns if col != 'value']).agg({'value' : agg_func}).reset_index()
return df_avg
[docs]
def test_modulation_psd(
df_data,
test_level='participant',
measure='power',
freq_lim_tol=0,
plot=False,
plot_mode='box_strip'):
"""Test phase-dependent modulation of power, frequency, or aperiodic exponent from power spectral density.
This function performs a permutation test to identify phase-dependent modulation
of the outcome measure by CLAM-NIBS, and plots the results.
Parameters:
-----------
df_data : pandas.DataFrame
The DataFrame containing outcome measure per participant, or per epoch
for each participant.
test_level : str, optional (default='participant')
The level at which the test should be performed ('participant', 'group_same', or 'group_different').
If test_level='participant', a separate test for phase-dependent modulation
is performed for each participant, such that a sinusoidal function is fit to the data
averaged over trials within each phase bin.
If test_level='group_same', a group-level test is performed, where the optimal target phase is
assumed to be the same for all participants, such that a sinusoidal function is fit to the data
averaged over trials and participants within each phase bin.
If test_level='group_different', a group-level test is performed, where the optimal target phase
is assumed to be different for each participant, such that the fitting is performed at the
participant level, but the test statistic (modulation amplitude) is averaged over participants.
measure : str, optional (default='power')
The outcome measure employed ('power', 'frequency', or 'aperiodic').
freq_lim_tol : float, optional (default=0)
The tolerance by which assessed peaks in the power spectrum can deviate in
frequency from the target frequency range set in the data.
plot : str or bool, optional (default=False)
If string, the plot(s) will be saved at that path.
If True, a figure will be created but not shown or saved.
If False, no figure will be created.
plot_mode : str, optional (default='box_strip')
The plotting mode. If all data points should be shown, use 'box_strip' for a boxplot and stripplot.
If data points should only be visualized as aggregated measures (e.g. accuracy across all trials or
variability across all ECG RR intervals), use 'bar' for a barplot.
Raises:
-------
Exception:
If the test level is not \'participant\' or \'group\'
"""
if np.any(np.isin(['open-loop', 'no-stim'], df_data['target_phase'])):
raise Exception('test_modulation_psd tests for phase-dependent modulation, it does not '
'support open-loop or no-stim conditions')
if measure not in ['power', 'frequency', 'aperiodic']:
raise Exception('Only power, frequency, and aperiodic exponent are supported')
df_data = df_data[df_data['measure'] == 'psd']
if test_level == 'participant':
df_results = _test_modulation_psd_participant(df_data=df_data, measure=measure, freq_lim_tol=freq_lim_tol, plot=plot)
return df_results
elif test_level == 'group_same':
agg_func = partial(_fooof_agg,
measure=measure,
freqs=df_data.attrs['freqs'],
l_freq_target=df_data.attrs['l_freq_target'] - freq_lim_tol,
h_freq_target=df_data.attrs['h_freq_target'] + freq_lim_tol)
df_data = average_over_trials(df_data, agg_func=agg_func)
df_results = _test_modulation_group_same(df_data=df_data, measure=measure, agg_func=np.nanmean, plot=plot, plot_mode=plot_mode)
return df_results
elif test_level == 'group_different':
if np.any(np.isin(['optimal', 'suboptimal'], df_data['target_phase'])):
raise Exception("test_level='group_different' is not supported for 'optimal'/'suboptimal' conditions, "
"because these conditions are comparable across participants by design. "
"Use test_level='group_same' instead.")
df_results = _test_modulation_group_different(df_data=df_data, measure=measure, agg_func=agg_func, plot=plot, plot_mode=plot_mode)
return df_results
else:
raise Exception(
'Test level should be either \'participant\', \'group_same\', or \'group_different\'')
def _fooof_agg(x, measure, freqs, l_freq_target, h_freq_target):
# If we are aggregating over a single column of PSDs in a DataFrame
if isinstance(x, pd.Series):
return_scalar = True
x = [np.mean(x, axis=0)]
# If we are aggregating separately over multiple lists of PSDs (one per target phase)
else:
return_scalar = False
x = [x_.mean(axis=0) for x_ in x]
agg_measures = []
for x_ in x:
agg_measures.append(extract_psd_measure(freqs, x_, l_freq_target, h_freq_target, measure=measure))
# fm.plot()
# plt.axvline(freq, color='r', linestyle='--')
# plt.title('{:.2f} Hz, {:.2f} a.u.'.format(freq, pow))
# plt.show()
if return_scalar:
return agg_measures[0]
else:
return agg_measures
def _fooof_stat(x, measure, freqs, l_freq_target, h_freq_target, stat):
agg_measures = _fooof_agg(x, measure, freqs, l_freq_target, h_freq_target)
if stat == 'ttest':
return agg_measures[0] - agg_measures[1]
elif stat == 'dft':
return _dft(agg_measures)[0]
def _test_modulation_psd_participant(df_data, measure, freq_lim_tol, plot):
df_results = pd.DataFrame()
gb_participants = df_data.sort_values('participant').groupby('participant')
for participant, df_participant in gb_participants:
gb_target_phase = df_participant.sort_values('target_phase').groupby('target_phase')
target_phases = []
target_psds = []
for target_phase, df_target_phase in gb_target_phase:
target_phases.append(target_phase)
target_psds.append(df_target_phase['value'].to_numpy())
if len(target_phases) == 2:
stat = 'ttest'
alternative = 'two-sided'
else:
stat = 'dft'
alternative = 'greater'
stat_fun = partial(_fooof_stat,
measure=measure,
freqs=df_data.attrs['freqs'],
l_freq_target=df_data.attrs['l_freq_target'] - freq_lim_tol,
h_freq_target=df_data.attrs['h_freq_target'] + freq_lim_tol,
stat=stat)
res = permutation_test(target_psds,
lambda *x: stat_fun(x),
permutation_type='independent',
alternative=alternative,
n_resamples=1000)
tval = res.statistic
pval = res.pvalue
tval_unit = stat
if plot:
plt.figure()
has_string_phases = any(isinstance(ph, str) for ph in target_phases)
x_label = 'Condition' if has_string_phases else 'Target Phase (°)'
x = target_phases
x = [ph if isinstance(ph, str) else round(np.rad2deg(ph)) for ph in x]
y = _fooof_agg(target_psds,
measure=measure,
freqs=df_data.attrs['freqs'],
l_freq_target=df_data.attrs['l_freq_target'] - freq_lim_tol,
h_freq_target=df_data.attrs['h_freq_target'] + freq_lim_tol)
df_plot = pd.DataFrame(
{x_label: x, '{}'.format(measure): y})
sns.barplot(
df_plot,
x=x_label,
y='{}'.format(measure),
color='k',
alpha=0.5,
errorbar=None,
zorder=0)
if len(target_phases) == 2:
plt.title('t = {:.3e}, p = {:.3e}'.format(tval, pval))
else:
avgs = df_plot['{}'.format(measure)].to_numpy()
_dft(avgs, plot_sine=True)
plt.title('dft_amp = {:.3e}, p = {:.3e}'.format(tval, pval))
sns.despine()
if isinstance(plot, str):
matplotlib.rcParams['pdf.fonttype'] = 42
matplotlib.rcParams['ps.fonttype'] = 42
sns.set_context('paper')
if not os.path.exists(plot):
os.makedirs(plot,exist_ok=True)
plt.savefig('{}_modulation_{}.pdf'.format(measure, participant))
plt.close()
df_append = pd.DataFrame({'participant': [participant],
't_unit': [tval_unit],
't_value': [tval],
'p_value': [pval]})
df_results = pd.concat([df_results, df_append])
return df_results
# This needs to be fixed
# def _test_modulation_psd_group_different(df_data, measure, freq_lim_tol, plot):
# gb_target_phase = df_data.sort_values('target_phase').groupby('target_phase')
# target_phases = []
# target_psds = []
# for target_phase, df_target_phase in gb_target_phase:
# target_phases.append(target_phase)
# target_psds.append(df_target_phase.sort_values('participant')['value'].to_numpy())
# if len(target_phases) == 2:
# stat = 'ttest'
# alternative = 'two-sided'
# else:
# stat = 'dft'
# alternative = 'greater'
# stat_fun = partial(_fooof_stat,
# measure=measure,
# freqs=df_data.attrs['freqs'],
# l_freq_target=df_data.attrs['l_freq_target'] - freq_lim_tol,
# h_freq_target=df_data.attrs['h_freq_target'] + freq_lim_tol,
# stat=stat)
# res = permutation_test(target_psds,
# lambda *x: stat_fun(x),
# permutation_type='independent',
# alternative=alternative,
# n_resamples=1000)
# tval = res.statistic
# pval = res.pvalue
# tval_unit = stat
# if plot:
# plt.figure()
# x = target_phases
# x = [round(np.rad2deg(ph)) for ph in x]
# y = _fooof_agg(target_psds,
# measure=measure,
# freqs=df_data.attrs['freqs'],
# l_freq_target=df_data.attrs['l_freq_target'] - freq_lim_tol,
# h_freq_target=df_data.attrs['h_freq_target'] + freq_lim_tol)
# df_plot = pd.DataFrame(
# {'Target Phase (°)': x, '{}'.format(measure): y})
# sns.barplot(
# df_plot,
# x='Target Phase (°)',
# y='{}'.format(measure),
# color='k',
# alpha=0.5,
# errorbar=None,
# zorder=0)
# if len(target_phases) == 2:
# plt.title('t = {:.3e}, p = {:.3e}'.format(tval, pval))
# else:
# avgs = df_plot['{}'.format(measure)].to_numpy()
# _dft(avgs, plot_sine=True)
# plt.title('dft_amp = {:.3e}, p = {:.3e}'.format(tval, pval))
# sns.despine()
# if isinstance(plot, str):
# matplotlib.rcParams['pdf.fonttype'] = 42
# matplotlib.rcParams['ps.fonttype'] = 42
# sns.set_context('paper')
# if not os.path.exists(plot):
# os.makedirs(plot,exist_ok=True)
# plt.savefig('{}_modulation_group.pdf'.format(measure))
# plt.close()
# df_results = pd.DataFrame({'participant': ['group'],
# 't_unit': [tval_unit],
# 't_value': [tval],
# 'p_value': [pval]})
# return df_results
[docs]
def get_network_modulation_amp_phase(df_network_results, df_network_data):
"""Get amplitude (depth) and optimal phase of modulation of connectivity
in a network.
Parameters:
-----------
df_network_results: pandas.DataFrame
The dataframe containing the connections which were modulated at the
participant- or group-level.
df_network_data: pandas.DataFrame
The dataframe containing the connectivity matrix for each trial
and participant.
"""
participants = np.unique(df_network_data['participant'])
measure = df_network_data['measure'].iloc[0]
df_results = pd.DataFrame()
for participant in participants:
df_network_data_participant = df_network_data[df_network_data['participant'] == participant]
df_network_results_participant = df_network_results[df_network_results['participant'] == participant]
network_data_participant, target_phases = df_to_array(df_network_data_participant)
# this is now (n_phases, n_epochs, n_chs, n_chs)
for ix_cluster, cluster in df_network_results_participant.iterrows():
t_value = np.nanmean(cluster['t_values']) # average over t-values per connection in cluster
p_value = cluster['p_value'] # this is one p-value for the cluster
ixs_row, ixs_col = np.array(cluster['connections']).T # this contains the indices marking connections between sensors in cluster
cluster_data = network_data_participant[:, :, ixs_row, ixs_col].mean(-1)
avgs = cluster_data.mean(-1)
amp, phase = _dft(avgs)
df_append = pd.DataFrame({'participant':[participant],
'measure':[measure],
'cluster':[ix_cluster],
'amplitude_raw':[amp],
'amplitude_perc':[2*100*amp/np.nanmean(avgs)],
'phase_rad':[phase],
'phase_deg':[round(np.rad2deg(phase))]})
df_results = pd.concat([df_results, df_append])
return df_results