Source code for meganorm.src.preprocess

import os
import mne
from mne_icalabel import label_components
import json
import numpy as np
import glob
from typing import Any, Dict
import pandas as pd

import warnings

warnings.filterwarnings("ignore")


[docs] def find_ica_component(ica, data, physiological_signal, auto_ica_corr_thr): """ Identifies independent components that their correlation with physiological signals (ECG or EOG) is higher than a threshold. Parameters ---------- ica : object The fitted ICA object using MNE. data : mne.io.Raw The raw MEG/EEG data used to extract independent components. physiological_signal : np.ndarray The physiological signal (ECG or EOG) to compare with independent componentss. auto_ica_corr_thr : float Pearson correlation threshold (between 0 and 1) for accepting a component as noise. Returns ------- list Index of the component with the highest correlation if it exceeds the threshold. Returns an empty list if no component meets the criterion. """ components = ica.get_sources(data.copy()).get_data() if components.shape[1] != len(physiological_signal): raise ValueError( "Length of physiological signal must match the number of time points in the data." ) corr = np.corrcoef(components, physiological_signal)[-1, :-1] if np.max(corr) >= auto_ica_corr_thr: componentIndx = [int(np.argmax(corr))] else: componentIndx = [] return componentIndx
[docs] def auto_ica( data, physiological_sensor, n_components=30, ica_max_iter=1000, IcaMethod="fastica", which_sensor={"meg": True, "eeg": True}, auto_ica_corr_thr=0.9, ): """ Performs automated ICA for artifact removal by identifying components that correlate highly with physiological signals (ECG or EOG) which is determined by 'auto_ica_corr_thr'. Parameters ---------- data : mne.io.Raw Raw MEG/EEG data. physiological_sensor : str Name of the physiological sensor ('ECG' or 'EOG'). n_components : int or float Number of ICA components to retain. ica_max_iter : int Maximum number of iterations for the ICA algorithm. IcaMethod : str ICA algorithm to use (e.g., 'fastica', 'picard', 'infomax'). which_sensor : dict Dictionary indicating sensor types to include (e.g., {'meg': True, 'eeg': True}). auto_ica_corr_thr : float Threshold for accepting independent component as noisy based on correlation with the corresponding physiological recording (ECG or EOG). Returns ------- data : mne.io.Raw Raw data with bad ICA components removed (in-place modification). ICA_flag : bool True if no bad components were found, False otherwise. """ # Get physiological signal physiological_signal = data.copy().pick(picks=physiological_sensor).get_data() # Pick MEG/EEG for ICA data = data.pick_types( meg=which_sensor.get("meg", False), eeg=which_sensor.get("eeg", False), ref_meg=False, eog=True, ecg=True, ) # ICA initialization ica = mne.preprocessing.ICA( n_components=n_components, max_iter=ica_max_iter, method=IcaMethod, random_state=42, verbose=False, ) ica.fit(data, verbose=False, picks=["eeg", "meg"]) # Detect components correlated with physiological signal bad_components = [] for sensor in physiological_signal: bad_components.extend( find_ica_component( ica=ica, data=data, physiological_signal=sensor, auto_ica_corr_thr=auto_ica_corr_thr, ) ) print("Bad Components identified by auto ICA:", bad_components) if bad_components: ica.exclude = bad_components.copy() ica.apply(data, verbose=False) ICA_flag = False else: ICA_flag = True return data, ICA_flag
[docs] def auto_ica_with_mean( data, n_components=30, ica_max_iter=1000, IcaMethod="fastica", which_sensor={"meg": True, "eeg": True}, auto_ica_corr_thr=0.9, ): """ Performs ICA-based artifact rejection using MNE’s built-in ECG correlation method. This function creates a synthetic ECG signal (by avergaing across magnetometers or Gradiometers) and use it to find and remove the noisy independent component. Parameters ---------- data : mne.io.Raw Raw MEG/EEG data. n_components : int, optional Number of ICA components to retain, by default 30. ica_max_iter : int, optional Maximum number of iterations for the ICA algorithm, by default 1000. IcaMethod : str, optional ICA algorithm to use (e.g., 'fastica', 'picard', 'infomax'), by default "fastica". which_sensor : dict, optional Dictionary specifying sensor types to include (e.g., {"meg": True, "eeg": True}), by default {"meg": True, "eeg": True}. auto_ica_corr_thr : float, optional Correlation threshold for detecting ECG-related components, by default 0.9. Returns ------- mne.io.Raw Raw data with ECG-related ICA components removed. """ data = data.pick_types( meg=which_sensor["meg"] | which_sensor["mag"] | which_sensor["grad"], eeg=which_sensor["eeg"], ref_meg=False, eog=True, ecg=True, ) ica = mne.preprocessing.ICA( n_components=n_components, max_iter=ica_max_iter, method=IcaMethod, random_state=42, verbose=False, ) ica.fit(data, verbose=False, picks=["eeg", "meg"]) ecg_indices, _ = ica.find_bads_ecg( data, method="correlation", threshold=auto_ica_corr_thr, measure="correlation" ) ica.exclude = ecg_indices ica.apply(data, verbose=False) return data
[docs] def AutoIca_with_IcaLabel( data, physiological_noise_type, n_components=30, ica_max_iter=1000, IcaMethod="infomax", iclabel_thr=0.8, ): if physiological_noise_type == "ecg": physiological_noise_type = "heart beat" if physiological_noise_type == "eog": physiological_noise_type = "eye blink" # fit ICA ica = mne.preprocessing.ICA( n_components=n_components, max_iter=ica_max_iter, method=IcaMethod, random_state=42, fit_params=dict(extended=True), verbose=False, ) # fit_params=dict(extended=True) bc icalabel is trained with this ica.fit(data, verbose=False, picks=["eeg"]) # apply ICLabel labels = label_components(data, ica, method="iclabel") # Identify and exclude artifact components based on probability threshold of being an artifact bad_components = [] for idx, label in enumerate(labels["labels"]): if ( label == physiological_noise_type and labels["y_pred_proba"][idx] > iclabel_thr ): bad_components.append(idx) print("Bad Components identified by ICALabel:", bad_components) ica.exclude = bad_components.copy() ica.apply(data, verbose=False) return data
[docs] def prepare_eeg_data(data, path): """ Prepare EEG data by setting channel types and electrode montage when they are not in the data yet Parameters ---------- data : mne.io.Raw The raw EEG data. path : str Path to the EEG recording file. Returns ------- mne.io.Raw The EEG data with updated channel types and montage (if available). """ task = path.split("/")[-1].split("_")[-2] base_dir = os.path.dirname(path) # Set channel types search_pattern = os.path.join(base_dir, f"**_{task}_channels.tsv") channel_files = glob.glob(search_pattern, recursive=True) if channel_files: channels_df = pd.read_csv(channel_files[0], sep="\t") channels_types = channels_df.set_index("name")["type"].str.lower().to_dict() data.set_channel_types(channels_types) # Set montage if not already set montage = data.get_montage() if montage is None: try: search_pattern_montage = os.path.join(base_dir, "*_montage.csv") montage_files = glob.glob(search_pattern_montage, recursive=True) if not montage_files: raise FileNotFoundError("No montage CSV file found!") montage_df = pd.read_csv(montage_files[0]) ch_positions = { row["Channel"]: [row["X"], row["Y"], row["Z"]] for _, row in montage_df.iterrows() } eeg_montage = mne.channels.make_dig_montage( ch_pos=ch_positions, coord_frame="head" ) data.set_montage(eeg_montage) except Exception as e: print(f"Error setting montage: {e}") print( "Continuing without a montage. This may raise issues for ICA labeling." ) return data
[docs] def segment_epoch( data: mne.io.Raw, tmin: float, tmax: float, sampling_rate: float, segmentsLength: float, overlap: float, ): """ Segments continuous raw data into epochs of fixed length. Parameters ---------- data : mne.io.Raw MEG/EEG recording. tmin : float Start time (in seconds) for cropping the raw data. tmax : float End time (in seconds) for cropping the raw data. 'tmax' must be a negative number, indicating the time difference between the crop end point and the total recording duration. sampling_rate : float Sampling rate of the data (Hz). segmentsLength : float Length of each epoch in seconds. overlap : float Overlap between successive epochs in seconds. Returns ------- mne.Epochs Segmented data with fixed-length segments. """ if tmax > 0: raise ValueError("The 'tmax' must be a negative number") # Calculate absolute tmax based on data duration and trim beginning/end tmax = int(np.shape(data.get_data())[1] / sampling_rate + tmax) # Crop 20 seconds from both ends to avoid eye-open/close artifacts data.crop(tmin=tmin, tmax=tmax) # Create fixed-length overlapping epochs segments = mne.make_fixed_length_epochs( data, duration=segmentsLength, overlap=overlap, reject_by_annotation=True, verbose=False, ) return segments
[docs] def preprocess( data, which_sensor: dict, resampling_rate: int = 1000, digital_filter=True, rereference_method="average", n_component: int = 30, ica_max_iter: int = 800, IcaMethod: str = "fastica", cutoffFreqLow: int = 1, cutoffFreqHigh: int = 45, apply_ica=True, power_line_freq: int = 60, auto_ica_corr_thr: float = 0.9, ): """ Applies a preprocessing pipeline on MEG/EEG data, including filtering, re-referencing (for EEG), ICA for artifact removal, and optional downsampling. Parameters ---------- data : mne.io.Raw Raw MEG/EEG data. which_sensor : dict Dictionary specifying which sensor types to include (e.g., {'meg': True, 'eeg': True}). resampling_rate : int, optional Target sampling rate for resampling. If None, resampling is skipped; by default 1000. digital_filter : bool, optional Whether to apply a bandpass FIR filter to the data; by default True. rereference_method : str, optional EEG re-referencing method. Supported: "average", "REST"; by default "average". n_component : int, optional Number of independent component to retain in ICA; by default 30. ica_max_iter : int, optional Maximum number of iterations for ICA; by default 800. IcaMethod : str, optional ICA algorithm to use. Supported: 'fastica', 'picard', 'infomax'; by default "fastica". cutoffFreqLow : int, optional Low cutoff frequency for bandpass filtering; by default 1. cutoffFreqHigh : int, optional High cutoff frequency for bandpass filtering; by default 45. apply_ica : bool, optional Whether to apply ICA to remove artifacts; by default True. power_line_freq : int, optional Power line frequency (for notch filtering); by default 60. auto_ica_corr_thr : float, optional Correlation threshold for automatic ICA artifact rejection; by default 0.9. That is, the correlation between identified independent components and physiological signals (ECG and EOG) must be higher than 'auto_ica_corr_thr' Returns ------- mne.io.Raw Preprocessed MEG/EEG data. Raises ------ ValueError auto_ica_corr_thr must be between 0 and 1. ValueError ICA method must be one of: 'fastica', 'picard', 'infomax'. """ if not 0 < auto_ica_corr_thr <= 1: raise ValueError("auto_ica_corr_thr must be between 0 and 1.") if IcaMethod not in ["fastica", "picard", "infomax"]: raise ValueError("ICA method must be one of: 'fastica', 'picard', 'infomax'.") # since pick_channels can not seperate mag and grad signals if not (which_sensor["meg"] or which_sensor["eeg"]): if not which_sensor["mag"]: mag_channels = [ ch for ch, ch_type in zip(data.ch_names, data.get_channel_types()) if ch_type == "mag" ] elif not which_sensor["grad"]: mag_channels = [ ch for ch, ch_type in zip(data.ch_names, data.get_channel_types()) if ch_type == "grad" ] data.drop_channels(mag_channels) channel_types = set(data.get_channel_types()) sampling_rate = data.info["sfreq"] # resample & band pass filter if resampling_rate and resampling_rate != sampling_rate: data.resample(int(resampling_rate), verbose=False, n_jobs=-1) sampling_rate = data.info["sfreq"] data.notch_filter( freqs=np.arange( int(power_line_freq), 4 * int(power_line_freq) + 1, int(power_line_freq) ), n_jobs=-1, ) if digital_filter: data.filter( l_freq=int(cutoffFreqLow), h_freq=int(cutoffFreqHigh), n_jobs=-1, verbose=False, ) # rereference if which_sensor["eeg"] and rereference_method: data = data.set_eeg_reference(rereference_method) ICA_flag = True # initialize flag physiological_electrods = { channel: channel in channel_types for channel in ["ecg", "eog"] } for phys_activity_type, if_elec_exist in physiological_electrods.items(): if which_sensor[ "meg" ]: # ====================================================================== # 1 if if_elec_exist and apply_ica: data, _ = auto_ica( data=data, n_components=n_component, ica_max_iter=ica_max_iter, IcaMethod=IcaMethod, which_sensor=which_sensor, physiological_sensor=phys_activity_type, auto_ica_corr_thr=auto_ica_corr_thr, ) # 2 elif not if_elec_exist and apply_ica and phys_activity_type == "ecg": data = auto_ica_with_mean( data=data, n_components=n_component, ica_max_iter=ica_max_iter, IcaMethod=IcaMethod, which_sensor=which_sensor, auto_ica_corr_thr=auto_ica_corr_thr, ) if which_sensor[ "eeg" ]: # ====================================================================== # 1 if if_elec_exist and apply_ica: data, ICA_flag = auto_ica( data=data, n_components=n_component, ica_max_iter=ica_max_iter, IcaMethod=IcaMethod, which_sensor=which_sensor, physiological_sensor=phys_activity_type, auto_ica_corr_thr=auto_ica_corr_thr, ) # 2 elif not if_elec_exist and apply_ica and ICA_flag: data = AutoIca_with_IcaLabel( data=data, n_components=n_component, ica_max_iter=ica_max_iter, IcaMethod=IcaMethod, iclabel_thr=auto_ica_corr_thr, physiological_noise_type=phys_activity_type, ) data = data.pick_types( meg=which_sensor["meg"] | which_sensor["mag"] | which_sensor["grad"], eeg=which_sensor["eeg"], ref_meg=False, eog=False, ecg=False, ) return data, data.info["ch_names"], int(sampling_rate)
[docs] def drop_noisy_meg_channels( data: Any, subID: str, args: Any, configs: Dict[str, str] ) -> Any: """ Identifies and removes noisy or flat MEG/EEG channels using Maxwell filtering, and logs the number of dropped channels for each subject. Parameters ---------- data : instance of `mne.io.Raw` The MEG/EEG recording to process. subID : str Identifier for the subject, used in naming the log file. args : argparse.Namespace or similar Object containing runtime arguments, including 'saveDir'. configs : dict Configuration dictionary containing: - 'which_sensor': one of {"meg", "mag", "grad", "eeg", "opm"} Returns ------- data_cleaned : instance of `mne.io.Raw` The cleaned data with noisy/flat channels removed. Notes ----- If Maxwell filtering has already been applied (e.g., SSS step), the function will skip bad channel detection and proceed to drop previously marked bad channels. The number of dropped channels is saved to a JSON log file in a directory derived from `args.saveDir`, replacing 'temp' with 'log_droped_channels'. """ which_sensor = dict.fromkeys(["meg", "mag", "grad", "eeg", "opm"], False) which_sensor[configs.get("which_sensor")] = True try: auto_noisy_chs, auto_flat_chs = mne.preprocessing.find_bad_channels_maxwell( data, return_scores=False, verbose=True, coord_frame="meg" ) data.info["bads"] += auto_noisy_chs + auto_flat_chs except RuntimeError as e: if "Maxwell filtering SSS step has already been applied" in str(e): print("Skipping: SSS already applied.") else: raise # Always proceed to log and drop marked bads droped_ch_len = len(data.info["bads"]) log_path = args.saveDir.replace("temp", "log_droped_channels") os.makedirs(log_path, exist_ok=True) with open(os.path.join(log_path, f"{subID}.json"), "w") as file: json.dump(droped_ch_len, file) return data.copy().drop_channels(data.info["bads"])