import pickle
import pandas as pd
import json
import os
import re
import glob
from pathlib import Path
import mne
import numpy as np
[docs]
def make_config(path=None):
"""
Create a configuration dictionary for a neuroimaging preprocessing pipeline.
This function generates configuration settings for preprocessing, feature extraction,
spectral analysis, and other relevant parameters used in processing EEG/MEG data.
Optionally, it saves the generated configuration to a JSON file in the specified path.
Parameters
----------
path : str, optional
The directory path where the configuration file should be saved. If not provided,
the configuration is not saved to a file.
Returns
-------
config : dict
The configuration dictionary containing settings for preprocessing, feature extraction,
and analysis.
Notes
-----
- The generated configuration includes settings for ICA preprocessing, spectral
estimation, and feature extraction for EEG/MEG data.
- Default values are provided for the majority of settings.
- If `path` is provided, a `.json` file containing the configuration will be saved.
"""
# preprocess configurations =================================================
# downsample data
config = dict()
# You could also set layout to None to have high
# choices: all, lobe, None
config["which_layout"] = "all"
# which sensor type should be used
# choices: meg, mag, grad, eeg, opm
config["which_sensor"] = "meg"
# config['fs'] = 1000
# ICA configuration
config["ica_n_component"] = 30
config["ica_max_iter"] = 800
config["ica_method"] = "fastica"
# lower and upper cutoff frequencies in a bandpass filter
config["cutoffFreqLow"] = 1
config["cutoffFreqHigh"] = 45
config["resampling_rate"] = 1000
config["digital_filter"] = True
config["notch_filter"] = False
config["apply_ica"] = True
config["auto_ica_corr_thr"] = 0.9
# options are "average", "REST", and None
config["rereference_method"] = "average"
# variance threshold across time
config["mag_var_threshold"] = 4e-12
config["grad_var_threshold"] = 4000e-13
config["eeg_var_threshold"] = 40e-6
# flatness threshold across time
config["mag_flat_threshold"] = 10e-15
config["grad_flat_threshold"] = 10e-15
config["eeg_flat_threshold"] = 40e-6
# variance thershold across channels
config["zscore_std_thresh"] = 15 # change this
# segmentation ==============================================
# start time of the raw data to use in seconds, this is to avoid possible eye blinks in close-eyed resting state.
config["segments_tmin"] = 20
# end time of the raw data to use in seconds, this is to avoid possible eye blinks in close-eyed resting state.
config["segments_tmax"] = -20
# length of MEG segments in seconds
config["segments_length"] = 10
# amount of overlap between MEG sigals in seconds
config["segments_overlap"] = 2
# PSD ==============================================
# Spectral estimation method
config["psd_method"] = "welch"
# amount of overlap between windows in Welch's method
config["psd_n_overlap"] = 1
config["psd_n_fft"] = 2
# number of samples in psd
config["psd_n_per_seg"] = 2
# fooof analysis configurations ==============================================
# Desired frequency range to run FOOOF
config["fooof_freq_range_low"] = 3
config["fooof_freq_range_high"] = 40
config["fooof_freq_range_low"] = 3
config["fooof_freq_range_high"] = 40
# which mode should be used for fitting; choices (knee, fixed)
config["aperiodic_mode"] = "knee"
# minimum acceptable peak width in fooof analysis
config["fooof_peak_width_limits"] = [1.0, 12.0]
# Absolute threshold for detecting peaks
config["fooof_min_peak_height"] = 0
# Relative threshold for detecting peaks
config["fooof_peak_threshold"] = 2
# feature extraction ==========================================================
# Define frequency bands
config["freq_bands"] = {
"Theta": (3, 8),
"Alpha": (8, 13),
"Beta": (13, 30),
"Gamma": (30, 40),
# 'Broadband': (3, 40)
}
# Define individualized frequency range over main peaks in each freq band
config["individualized_band_ranges"] = {
"Theta": (-2, 3),
"Alpha": (-2, 3), # change to (-4,2)
"Beta": (-8, 9),
"Gamma": (-5, 5),
}
# least acceptable R squred of fitted models
config["min_r_squared"] = 0.9
config["feature_categories"] = {
"Offset": False,
"Exponent": False,
"Peak_Center": False,
"Peak_Power": False,
"Peak_Width": False,
"Adjusted_Canonical_Relative_Power": True,
"Adjusted_Canonical_Absolute_Power": False,
"Adjusted_Individualized_Relative_Power": False,
"Adjusted_Individualized_Absolute_Power": False,
"OriginalPSD_Canonical_Relative_Power": False,
"OriginalPSD_Canonical_Absolute_Power": False,
"OriginalPSD_Individualized_Relative_Power": False,
"OriginalPSD_Individualized_Absolute_Power": False,
}
config["fooof_res_save_path"] = None
config["random_state"] = 42
if path is not None:
out_file = open(os.path.join(path, "configuration.json"), "w")
json.dump(config, out_file, indent=6)
out_file.close()
return config
[docs]
def storeFooofModels(path, subjId, fooofModels, psds, freqs) -> None:
"""
Stores the periodic and aperiodic results from FOOOF analysis in a pickle file.
This function saves the FOOOF models, the power spectral densities (PSDs),
and the associated frequency data for a given subject into a `.pickle` file.
The data is appended to the file for each subject.
Parameters
----------
path : str
Directory path where the results will be saved.
subjId : str
The subject ID for which the results are saved.
fooofModels : object
The FOOOF model object containing the periodic and aperiodic components.
psds : ndarray
Power Spectral Densities (PSDs) calculated for the subject.
freqs : ndarray
Frequency values corresponding to the PSDs.
Returns
-------
None
This function does not return any value; it writes the results to a file.
"""
with open(os.path.join(path, subjId + ".pickle"), "wb") as file:
pickle.dump([fooofModels, psds, freqs], file)
[docs]
def separate_eyes_open_close_eeglab(
input_base_path,
output_base_path,
annotation_description_open,
annotation_description_close,
trim_before=5,
trim_after=5,
):
# Ensure output directory exists
if not os.path.exists(output_base_path):
os.makedirs(output_base_path)
search_pattern = os.path.join(input_base_path, "*/eeg/*_task-rest_eeg.set")
raw_set_paths = glob.glob(
search_pattern, recursive=True
) # Use glob to find all .set files in the input directory
# Loop through all found .set files
for set_path in raw_set_paths:
subject_id = Path(set_path).parts[
-3
] # Extract subject number from the file path
subject_output_path = os.path.join(
output_base_path, subject_id, "eeg"
) # Create the subject-specific output path
# Ensure output directory for the subject exists
if not os.path.exists(subject_output_path):
os.makedirs(subject_output_path)
# Load the raw .set file (EEGLAB format)
raw = mne.io.read_raw(set_path, preload=True)
# Extract annotations
annotations = raw.annotations
# Separate eyes open and eyes closed events
eyes_open_events = annotations[
annotations.description == annotation_description_open
]
eyes_closed_events = annotations[
annotations.description == annotation_description_close
]
# Extract and concatenate eyes open segments
eyes_open_data = []
for onset, duration in zip(eyes_open_events.onset, eyes_open_events.duration):
if duration <= trim_before + trim_after:
print(
f"Skipping event with onset {onset} and duration {duration} (invalid after trimming)"
)
continue
# Trim the first 5s and last 5s from each event
trimmed_onset = onset + trim_before
trimmed_duration = duration - trim_before - trim_after
start_sample = int(trimmed_onset * raw.info["sfreq"])
stop_sample = int((trimmed_onset + trimmed_duration) * raw.info["sfreq"])
eyes_open_data.append(raw[:, start_sample:stop_sample][0])
if eyes_open_data:
eyes_open_data_concat = np.concatenate(eyes_open_data, axis=1)
raw_eyes_open = mne.io.RawArray(eyes_open_data_concat, raw.info)
# Save eyes open data as a new .set file
eyes_open_file_path = os.path.join(
subject_output_path, f"{subject_id}_task-eyesopen_eeg.set"
)
mne.export.export_raw(
eyes_open_file_path, raw_eyes_open, fmt="eeglab", overwrite=True
)
# Extract and concatenate eyes closed segments
eyes_closed_data = []
for onset, duration in zip(
eyes_closed_events.onset, eyes_closed_events.duration
):
if duration <= trim_before + trim_after:
print(
f"Skipping event with onset {onset} and duration {duration} (invalid after trimming)"
)
continue
trimmed_onset = onset + trim_before
trimmed_duration = duration - trim_before - trim_after
start_sample = int(trimmed_onset * raw.info["sfreq"])
stop_sample = int((trimmed_onset + trimmed_duration) * raw.info["sfreq"])
eyes_closed_data.append(raw[:, start_sample:stop_sample][0])
if eyes_closed_data:
eyes_closed_data_concat = np.concatenate(eyes_closed_data, axis=1)
raw_eyes_closed = mne.io.RawArray(eyes_closed_data_concat, raw.info)
# Save eyes closed data as a new .set file
eyes_closed_file_path = os.path.join(
subject_output_path, f"{subject_id}_task-eyesclosed_eeg.set"
)
mne.export.export_raw(
eyes_closed_file_path, raw_eyes_closed, fmt="eeglab", overwrite=True
)
[docs]
def merge_fidp_demo(
datasets_paths: list,
features_dir: str,
dataset_names: list,
drop_columns: list = ["eyes"],
):
"""
Merge demographic metadata and extracted features into a single DataFrame.
This function loads demographic data and feature data,
assigns a site label to each participant if missing, removes unnecessary columns,
and merges demographic information with corresponding extracted features.
Parameters
----------
datasets_paths : list
List of paths to the dataset directories containing demographic files
('participants_bids.tsv').
features_dir : str
Path to the directory containing the extracted features ('all_features.csv').
dataset_names : list of str
List of dataset names corresponding to each dataset path. Used to populate
missing 'site' information if necessary.
drop_columns : list of str, optional
Columns to drop from the demographic data before merging. Default is ["eyes"].
Returns
-------
data : pandas.DataFrame
Merged DataFrame containing both demographic information and feature data,
with participants indexed as strings.
Raises
------
FileNotFoundError
If the 'participants_bids.tsv' file is missing in any of the dataset paths or
the 'all_features.csv' file is missing in the provided features directory.
"""
# Initialize empty DataFrame
demographic_df = pd.DataFrame()
# Loop through dataset paths
for counter, dataset_path in enumerate(datasets_paths):
demo_path = os.path.join(dataset_path, "participants_bids.tsv")
if not os.path.exists(demo_path):
raise FileNotFoundError(
f"The file 'participants_bids.tsv' is missing from the directory: {dataset_path}. "
"This file must be created using the 'make_demo_file_bids' function and placed in "
"the corresponding dataset directory."
)
demo = pd.read_csv(demo_path, sep="\t", index_col=0)
demo.index = demo.index.astype(str)
if "site" not in demo.columns:
demo["site"] = dataset_names[counter]
demographic_df = pd.concat([demographic_df, demo], axis=0)
# Drop unnecessary columns
demographic_df.drop(columns=drop_columns, errors="ignore", inplace=True)
# Load features
feature_path = os.path.join(features_dir, "all_features.csv")
if not os.path.exists(feature_path):
raise FileNotFoundError(
f"The file 'all_features.csv' is missing in the directory: {features_dir}."
)
features_df = pd.read_csv(feature_path, index_col=0)
features_df.index = features_df.index.astype(str)
# Merge demographic and features
data = demographic_df.join(features_df, how="inner")
data.index.name = None
return data
[docs]
def factorize_columns(df: pd.DataFrame, columns: list):
"""
Factorizes specified columns in the DataFrame.
For the 'diagnosis' column, it assigns 0 to 'control' and factorizes the rest.
Parameters
----------
df : pandas.DataFrame
The DataFrame containing the columns to be factorized.
columns : list
List of column names to be factorized.
Returns
-------
df : pandas.DataFrame
DataFrame with factorized columns.
"""
for col in columns:
if col in df.columns:
if col == "diagnosis":
# Drop rows where diagnosis is NaN
df = df.dropna(subset=["diagnosis"])
# Assign 0 to 'control' and factorize the rest
df["diagnosis"] = np.where(
df["diagnosis"] == "control",
0,
pd.factorize(df["diagnosis"])[0] + 1,
)
else:
# Factorize other columns
df[col] = pd.factorize(df[col])[0]
return df
[docs]
def normalize_column(df, column="age", normalizer=100):
"""
Normalizes a specified column in the DataFrame by dividing its values by the given normalizer.
Parameters
----------
df : pandas.DataFrame
The DataFrame containing the column to be normalized.
column : str, optional
The column to be normalized (default is "age").
normalizer : float or None, optional
The value by which the column will be divided. If None, the column will not be normalized.
Returns
-------
df : pandas.DataFrame
DataFrame with the normalized column.
Raises
------
KeyError
If the specified column does not exist in the DataFrame.
ValueError
If the normalizer is not a positive numeric value.
"""
# Check if column exists in DataFrame
if column not in df.columns:
raise KeyError(f"Column '{column}' not found in DataFrame.")
# Check if the normalizer is a valid positive number
if normalizer is not None and (
not isinstance(normalizer, (int, float)) or normalizer <= 0
):
raise ValueError(
f"Normalizer should be a positive numeric value, got {normalizer}."
)
# Normalize the column if a valid normalizer is provided
if normalizer:
df[column] = df[column] / normalizer
return df
[docs]
def separate_patient_data(df, diagnosis: list):
"""
Separates patients' data from control data based on the diagnosis column.
Parameters
----------
df : pandas.DataFrame
The DataFrame containing the patient data.
diagnosis : list of str
A list of diagnosis values used to separate patients' data.
Returns
-------
df : pandas.DataFrame
The DataFrame containing only control data (after dropping the 'diagnosis' column).
df_patient : pandas.DataFrame
The DataFrame containing the patient data.
Raises
------
KeyError
If the 'diagnosis' column is not found in the DataFrame.
"""
# Ensure the 'diagnosis' column exists in the DataFrame
if "diagnosis" not in df.columns:
raise KeyError("The 'diagnosis' column is missing in the DataFrame.")
# Separate the patient data based on the 'diagnosis' column
df_patient = df[df["diagnosis"].isin(diagnosis)]
# Filter the control data and drop the 'diagnosis' column
df = df[df["diagnosis"] == "control"].drop(columns="diagnosis", errors="ignore")
return df, df_patient
[docs]
def merge_datasets_with_glob(datasets):
"""
Merges file paths across multiple datasets using glob pattern matching.
This function walks through the provided datasets' base directories to find
subject folders and file paths matching a specified task and file ending. It
creates a dictionary mapping each subject to a glob pattern that can be used
to aggregate files across multiple runs or sessions.
Parameters
----------
datasets : dict
Dictionary where each key is a dataset name, and each value is a dictionary
with the following keys:
- "base_dir" (str): Base directory containing subject subdirectories.
- "task" (str): Task keyword to search for in filenames.
- "ending" (str): File ending (e.g., '.nii.gz') to filter relevant files.
Returns
-------
dict
A dictionary mapping subject IDs to a glob-style path string that aggregates
all matching files for that subject. Only subjects with at least one matched
file are included.
Notes
-----
This function is designed to assist in scenarios where each subject may have
multiple files (e.g., different runs or sessions), and the goal is to create
a single pattern that can be used to load all related files for a subject.
"""
subjects = {}
for dataset_name, dataset_info in datasets.items():
base_dir = dataset_info["base_dir"]
dirs = [
d for d in os.listdir(base_dir) if os.path.isdir(os.path.join(base_dir, d))
]
subjects.update({subj: [] for subj in dirs})
paths = glob.glob(
f"{datasets[dataset_name]["base_dir"]}/**/*{datasets[dataset_name]["task"]}*{datasets[dataset_name]["ending"]}",
recursive=True,
)
# Walk through the base directory to find subject directories
for subject_dir in dirs:
pattern = os.path.join(datasets[dataset_name]["base_dir"], subject_dir)
subjects[subject_dir].extend(
list(filter(lambda path: path.startswith(pattern), paths))
)
def join_with_star(lst):
if len(lst) == 1:
return lst[0] + "*"
return "*".join(lst)
# add this part to main parallel when you want to concatenate
# different run
subjects = dict(filter(lambda item: item[1], subjects.items()))
subjects = {key: join_with_star(value) for key, value in subjects.items()}
return subjects
[docs]
def make_demo_file_bids(
file_dir: str, save_dir: str, id_col: int, age_col: int, *columns
) -> None:
"""
Convert formats of demographic data into a single format so it can be used
in later stages.
Parameters
----------
file_dir : str
Path to the input demographic file (supports CSV, TSV, or XLSX).
save_dir : str
Path where the BIDS-formatted demographic file will be saved (as TSV).
id_col : int
Column index containing the participant ID.
age_col : int
Column index containing participant age.
*extra_columns : dict
Additional column definitions. While age and participants id were defined
using positional arguments, extra coulmn modification (e.g., sex and eyes
condition) can be revised and converted to a single format across dataset
using this function. Each dict can contain:
- 'col_name': str, required name for the output column. This does not
necessarly match the column name before being passed to this function.
- 'col_id': int, index of the column that the revision should be applied to.
- 'single_value': value to assign to all rows if no col_id and mapping are given.
This can be helpful when all subjects in a dataset have the same properties
e.g., eyes open condition.
- 'mapping': dict, if single value is not defined, value mapping can be passed
to map the initial values to the target values.
Returns
-------
None
"""
for col in columns:
if col.get("single_value") and col.get("mapping"):
raise ValueError(
"'single_value' and 'mapping' can not be both defined. One of them must be None; see the documentation!"
)
# Load input file based on extension
if file_dir.endswith(".xlsx"):
df = pd.read_excel(file_dir)
elif file_dir.endswith(".csv"):
df = pd.read_csv(file_dir)
elif file_dir.endswith(".tsv"):
df = pd.read_csv(file_dir, sep="\t")
else:
raise ValueError(f"Unsupported file type for: {file_dir}")
# Initialize new dataframe with required fields
new_df = pd.DataFrame(
{"participant_id": df.iloc[:, id_col], "age": df.iloc[:, age_col]}
)
for col in columns:
col_name = col.get("col_name")
col_id = col.get("col_id")
mapping = col.get("mapping")
single_value = col.get("single_value")
if col_name is None:
raise ValueError("Each column dictionary must contain a 'col_name'.")
if col_id is not None:
new_df[col_name] = df.iloc[:, col_id]
if mapping:
new_df[col_name] = new_df[col_name].map(mapping)
elif single_value is not None:
new_df[col_name] = single_value
else:
raise ValueError(
f"Column '{col_name}' must have either 'col_id' or 'single_value'."
)
# Special case handling
if col_name == "diagnosis":
new_df[col_name] = new_df[col_name].fillna("nan")
# Remove duplicate participants
new_df = new_df.drop_duplicates(subset="participant_id", keep="first")
# Save as BIDS-compatible TSV
new_df.to_csv(save_dir, sep="\t", index=False)