Source code for clamnibs.misc

from mne.preprocessing import find_ecg_events
import mne
from scipy.stats import ttest_ind
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from math import degrees
from statistics import mode
from .base import RawCLAM, EpochsCLAM, EpochsCLAMVariable
from scipy.io import savemat
import warnings
from scipy.stats import zscore

[docs] def concat_dfs(dfs): attrs = dfs[-1].attrs for df in dfs: df.attrs = {} df = pd.concat(dfs) df.attrs = attrs return df
def _get_main_target_phase(marker_definition, events): target_codes = marker_definition.keys() events = events[np.isin(events[:, 2], target_codes)] trial_codes = events[:-1, 2] trial_durations = np.diff(events[:, 0]) total_durations = [] for target_code in target_codes: total_durations.append(trial_durations[trial_codes == target_code].sum()) main_code = target_codes[np.argmax(total_durations)] return marker_definition[main_code] def _get_ixs_goods(obj): types = obj.get_channel_types() if not np.all(np.array(types[:obj.n_chs]) == 'eeg'): raise Exception('Data should contain all EEG channels, including bads') return [ix for ix, ch in enumerate( obj.ch_names[:obj.n_chs]) if ch not in obj.info['bads']] def _wrap(phase): return np.angle(np.exp(1j * phase)) def _circmean(phase): return np.angle(np.mean(np.exp(1j * phase))) def _pli(phase1, phase2): return np.abs(np.mean(np.sign(_wrap(phase1 - phase2)))) def _fmt(string): return string.replace( '_', ' ').title().replace( ' Of ', ' of ').replace( ' And ', ' and ') def _get_trial_target_codes_cwm_error(raw): events = mne.events_from_annotations(raw)[0] n_events = len(events) target_codes = raw.marker_definition.keys() trial_target_codes = [] trial_errors = [] for ix_event in range(n_events): if ix_event < n_events - 3: event = events[ix_event] if event[2] in target_codes: # assert np.all(events[ix_event+1:ix_event+13,2]>=50) true_color = np.sum(events[ix_event + 1:ix_event + 7, 2]) - 300 # assert -1 <= true_color <= 366 est_color = np.sum(events[ix_event + 7:ix_event + 13, 2]) - 300 # assert -1 <= est_color <= 366 error = true_color - est_color if error < -180: error += 360 if error > 180: error -= 360 assert -180 <= error <= 180 trial_target_codes.append(event[2]) trial_errors.append(error) trial_target_codes, trial_errors = np.array( trial_target_codes), np.array(trial_errors) return trial_target_codes, trial_errors def _get_trial_target_codes_binary_accuracy(raw, correct_codes, incorrect_codes): events = mne.events_from_annotations(raw)[0] n_events = len(events) target_codes = raw.marker_definition.keys() trial_target_codes = [] trial_accuracies = [] for ix_event in range(n_events): event = events[ix_event] if event[2] in target_codes: for ix_event_post in range(ix_event+1, n_events): event_post = events[ix_event_post] if event_post[2] in correct_codes: trial_target_codes.append(event[2]) trial_accuracies.append(1) break if event_post[2] in incorrect_codes: trial_target_codes.append(event[2]) trial_accuracies.append(0) break if event_post[2] in target_codes: warnings.warn("A target event code marking a trial was found \ without a following correct/incorrect marker", UserWarning) break trial_target_codes, trial_accuracies = np.array( trial_target_codes), np.array(trial_accuracies) return trial_target_codes, trial_accuracies
[docs] def compute_single_trial_behavior(raw, measure='binary_accuracy', correct_codes=[10], incorrect_codes=[11, 12]): """ Compute single-trial task performance and assign it to CLAM-NIBS target phase. Applicable to any task with trial-by-trial responses. Parameters: ----------- raw : RawCLAM The RawCLAM object containing the raw data to analyze. measure : str, optional The method used to compute task performance. Either 'binary' (default) or 'cwm'. Returns: -------- pandas.DataFrame A DataFrame containing the computed performance values and CLAM-NIBS target phase for each trial. Raises: ------- Exception If the input raw data is not of type RawCLAM. If the modulation of task performance is attempted on data without CLAM-tACS stimulation. Notes: ------ - This function only works if the data contains a target phase marker at the beginning of each trial. """ if measure not in ['binary_accuracy', 'cwm_error']: raise Exception( 'Method to compute working memory error must be \'binary_accuracy\' or \'cwm_error\'') if not isinstance(raw, RawCLAM): raise Exception('compute_single_trial_behavior can only be applied to RawCLAM objects') marker_definition = raw.marker_definition participant = raw.participant session = raw.session design = raw.design if not raw.is_stim: raise Exception( 'Single-trial performance with target phases can only be computed on data with CLAM-NIBS') match measure: case 'binary_accuracy': trial_target_codes, trial_values = _get_trial_target_codes_binary_accuracy(raw, correct_codes, incorrect_codes) case 'cwm_error': trial_target_codes, trial_values = _get_trial_target_codes_cwm_error(raw) trial_values = np.abs(trial_values) trial_target_phases = [marker_definition.get(x) for x in trial_target_codes] if design == 'session_wise': main_trial_code = mode(trial_target_codes) mask = trial_target_codes == main_trial_code trial_target_codes = trial_target_codes[mask] trial_values = trial_values[mask] df_result = pd.DataFrame({'participant': [participant] * len(trial_values), 'session': [session] * len(trial_values), 'design': [design] * len(trial_values), 'target_phase': trial_target_phases, 'measure': [measure] * len(trial_values), 'value': trial_values}) return df_result
[docs] def compute_single_trial_rr(raw, end_codes=None): """ Compute single-trial RR-intervals and assign them to CLAM-NIBS target phase. Parameters: ----------- raw : RawCLAM The RawCLAM object containing the raw data to analyze. end_codes : list of int or None, optional If provided, use variable-length epochs defined by start markers (from marker_definition) to end markers (end_codes). If None, use fixed-length epochs (default). Returns: -------- pandas.DataFrame A DataFrame containing the computed RR intervals and CLAM-NIBS target phases. Raises: ------- Exception If the input raw data is not of type RawCLAM. If the method is attempted on data without CLAM-tACS stimulation. """ if not isinstance(raw, RawCLAM): raise Exception('compute_single_trial_rr can only be applied to RawCLAM objects') marker_definition = raw.marker_definition target_codes = list(marker_definition.keys()) sfreq = raw.info['sfreq'] participant = raw.participant session = raw.session design = raw.design events = mne.events_from_annotations(raw)[0] if not raw.is_stim: raise Exception( 'Single-trial RR intervals can only be computed on data with CLAM-tACS') if design == 'trial_wise': n_targets = len(target_codes) events_ecg = find_ecg_events(raw, ch_name='ecg')[0] if end_codes is not None: epochs = EpochsCLAMVariable(raw, end_codes=end_codes, apply_hil=False) trial_target_codes = [] trial_rrs = [] for ix_ep in range(len(epochs)): ev = epochs.events[ix_ep] ep_start = ev[0] ep_duration_samples = epochs._data[ix_ep].shape[1] ep_end = ep_start + ep_duration_samples this_trial_rpeaks = [] for ev_ecg in events_ecg: if ev_ecg[0] > ep_start and ev_ecg[0] < ep_end: this_trial_rpeaks.append(ev_ecg[0]) if len(this_trial_rpeaks) >= 2: this_trial_rrs = np.diff(this_trial_rpeaks) / sfreq trial_target_codes.extend([ev[2]] * len(this_trial_rrs)) trial_rrs.extend(this_trial_rrs) else: tmin = raw.tmin tmax = raw.tmax events_trials = mne.events_from_annotations(raw)[0] events_trials = events_trials[np.isin(events_trials[:, 2], target_codes)] trial_target_codes = [] trial_rrs = [] for ev_trial in events_trials: this_trial_rpeaks = [] for ev_ecg in events_ecg: if ev_ecg[0] > ev_trial[0] + tmin * \ sfreq and ev_ecg[0] < ev_trial[0] + tmax * sfreq: this_trial_rpeaks.append(ev_ecg[0]) if len(this_trial_rpeaks) >= 2: this_trial_rrs = np.diff(this_trial_rpeaks) / sfreq trial_target_codes.extend([ev_trial[2]] * len(this_trial_rrs)) trial_rrs.extend(this_trial_rrs) trial_target_phases = [marker_definition.get(x) for x in trial_target_codes] else: events_ecg = find_ecg_events(raw, ch_name='ecg')[0] trial_rrs = np.diff(events_ecg[:,0]) / sfreq main_target_phase = _get_main_target_phase(marker_definition, events) trial_target_phases = [main_target_phase] * len(trial_rrs) trial_rrs = np.array(trial_rrs) trial_target_phases = np.array(trial_target_phases, dtype=object) zscores = zscore(trial_rrs) mask = (zscores > -1.6) & (zscores < 1.6) trial_rrs = trial_rrs[mask] trial_target_phases = trial_target_phases[mask] df_result = pd.DataFrame({'participant': [participant] * len(trial_rrs), 'session': [session] * len(trial_rrs), 'design': [design] * len(trial_rrs), 'target_phase': trial_target_phases, 'measure': ['rr_interval'] * len(trial_rrs), 'value': trial_rrs}) return df_result
[docs] def compute_single_trial_scr(raw, end_codes=None): """ Compute single-trial area under the curve (AUC) of the skin conductance response (SCR) [1], and assign it to CLAM-NIBS target phase. Parameters: ----------- raw : RawCLAM The RawCLAM object containing the raw data to analyze. end_codes : list of int or None, optional If provided, use variable-length epochs defined by start markers (from marker_definition) to end markers (end_codes). If None, use fixed-length epochs (default). Returns: -------- pandas.DataFrame A DataFrame containing the computed SCR AUC and CLAM-NIBS target phases. Raises: ------- Exception If the input raw data is not of type RawCLAM. If the method is attempted on data without CLAM-tACS stimulation. [1] Bach, Dominik R., Karl J. Friston, and Raymond J. Dolan. "Analytic measures for quantification of arousal from spontaneous skin conductance fluctuations." International journal of psychophysiology 76.1 (2010): 52-55. """ if not isinstance(raw, RawCLAM): raise Exception('compute_single_trial_scr can only be applied to RawCLAM objects') marker_definition = raw.marker_definition target_codes = list(marker_definition.keys()) sfreq = raw.info['sfreq'] participant = raw.participant session = raw.session design = raw.design events = mne.events_from_annotations(raw)[0] if not raw.is_stim: raise Exception( 'Single-trial SCR AUC can only be computed on data with CLAM-tACS') raw = raw.copy() raw.pick_channels(['eda']) raw.filter(0.1, 5, picks='all') raw._data -= raw._data.min() if design == 'trial_wise': if end_codes is not None: epochs = EpochsCLAMVariable(raw, end_codes=end_codes, apply_hil=False) epoch_aucs = [ep.squeeze().sum(-1) for ep in epochs.get_data()] epoch_aucs = np.array(epoch_aucs) else: epochs = EpochsCLAM(raw, apply_hil=False) epoch_aucs = epochs.get_data().squeeze().sum(-1) else: epoch_aucs = raw.get_data().squeeze().sum(-1) if design == 'trial_wise': epoch_target_phases = [marker_definition.get(x) for x in epochs.events[:, 2]] else: epoch_target_phases = [_get_main_target_phase(marker_definition, events)] df_result = pd.DataFrame({'participant': [participant] * len(epoch_aucs), 'session': [session] * len(epoch_aucs), 'design': [design] * len(epoch_aucs), 'target_phase': epoch_target_phases, 'measure': ['scr_auc'] * len(epoch_aucs), 'value': epoch_aucs}) return df_result
[docs] def save_calibration_data(obj, folder_path, phase_delay=None): exclude_idx_file_path = '{}\\exclude_idx.mat'.format(folder_path) p_target_file_path = '{}\\P_TARGET_{:d}.mat'.format(folder_path, int(obj.n_chs)) flip_file_path = '{}\\flip.mat'.format(folder_path) exclude_idx = np.sort([obj.ch_names.index(ch) + 1 for ch in obj.info["bads"]]) data_dict = {"exclude_idx": exclude_idx} savemat(exclude_idx_file_path, data_dict) data_dict = {'P_TARGET_%i'%obj.n_chs:obj.forward_full} savemat(p_target_file_path, data_dict) data_dict = {'flip':obj.flip} savemat(flip_file_path, data_dict) if phase_delay is not None: phase_delay_file_path = '{}\\phase_delay.mat'.format(folder_path) data_dict = {'phase_delay':phase_delay} savemat(phase_delay_file_path, data_dict)
[docs] def df_to_array(df_data): """ Convert a DataFrame containing one data point (trial or participant) per row to an ndarray. Parameters: ----------- df_data : pandas.DataFrame The DataFrame object containing the data to convert. Returns: -------- numpy.ndarray An ndarray of shape (n_phases, n_datapoints, ...). """ gb_target_phases = df_data.sort_values( 'target_phase').groupby('target_phase') target_phases = [] data = [] for target_phase, df_target_phase in gb_target_phases: target_phases.append(target_phase) data.append(np.stack(df_target_phase['value'])) n_epochs = np.min([d.shape[0] for d in data]) data = np.array([d[:n_epochs] for d in data]) # data is now (n_phases, n_epochs, ...) return data, target_phases