import os
import shlex
import shutil
import subprocess
import time
from datetime import datetime

import pandas as pd
[docs]
def progress_bar(current, total, bar_length=20):
    """
    Display or update a single-line console progress bar.

    The bar is redrawn in place with a carriage return. Once ``current``
    reaches ``total``, a newline is printed so later output starts on a
    fresh line.

    Parameters
    ----------
    current : int
        The current progress. Expected to be between 0 and total; values
        outside that range are clamped for display.
    total : int
        The total steps for complete progress. A non-positive total is
        treated as already complete (avoids a division by zero).
    bar_length : int, optional
        The character length of the progress bar. Default is 20.
    """
    fraction = current / total if total > 0 else 1.0
    fraction = min(max(fraction, 0.0), 1.0)
    # Always draw at least one ">" so the bar is visible at 0%, but never
    # exceed bar_length so the bar has a fixed width.
    filled = min(bar_length, max(1, int(fraction * bar_length)))
    bar = ">" * filled + " " * (bar_length - filled)
    progress_percentage = round(fraction * 100, 1)
    print(f"\rProgress: [{bar}] {progress_percentage}%", end="")
    if current >= total:
        print()  # Move to the next line when progress is complete.
[docs]
def sbatchfile(
    mainParallel_path,
    bash_file_path,
    log_path=None,
    module="mne",
    time="1:00:00",
    memory="20GB",
    partition="normal",
    core=1,
    node=1,
    batch_file_name="batch_job",
    with_config=True,
):
    """
    Generate an executable SLURM batch script that runs ``mainParallel_path``.

    The script reads its inputs from positional arguments: ``$1`` source,
    ``$2`` target, ``$3`` subject and, when ``with_config`` is True, ``$4``
    config. It runs ``srun python <mainParallel_path> ...`` with them.
    All expansions are quoted, so paths containing spaces are passed
    through intact.

    Parameters
    ----------
    mainParallel_path : str
        Path to the `mainParallel.py` script that will be executed in the batch job.
    bash_file_path : str
        Directory where the generated batch job file will be saved.
    log_path : str, optional
        Directory in which SLURM writes the job's stdout/stderr files, named
        ``%x_%j.out`` / ``%x_%j.err`` (job name, job id). If None, no
        ``-o``/``-e`` directives are written. Default is None.
    module : str, optional
        Name passed to ``source activate`` in the job environment. Default is 'mne'.
    time : str, optional
        Maximum wall time for the job (format: HH:MM:SS). Default is '1:00:00'.
    memory : str, optional
        Amount of memory allocated for the job (e.g., '20GB'). Default is '20GB'.
    partition : str, optional
        The partition or queue to submit the job to. Default is 'normal'.
    core : int, optional
        Number of CPU cores per task (``#SBATCH -c``). Default is 1.
    node : int, optional
        Number of nodes to request for the job (``#SBATCH -N``). Default is 1.
    batch_file_name : str, optional
        File name (without ``.sh``) for the generated batch job file.
        Default is 'batch_job'.
    with_config : bool, optional
        If True, the script also reads ``$4`` and passes it to the Python
        script as ``--configs``. Default is True.

    Returns
    -------
    str
        Path of the written script, ``<bash_file_path>/<batch_file_name>.sh``.
    """
    lines = [
        "#!/bin/bash\n",
        f"#SBATCH -N {node}\n",
        f"#SBATCH -c {core}\n",
        f"#SBATCH -p {partition}\n",
        f"#SBATCH --time={time}\n",
        f"#SBATCH --mem={memory}\n",
    ]
    if log_path is not None:
        # %x / %j are SLURM filename patterns for the job name and job id.
        lines.append(f"#SBATCH -o {os.path.join(log_path, '%x_%j.out')}\n")
        lines.append(f"#SBATCH -e {os.path.join(log_path, '%x_%j.err')}\n")
    lines.append(f"source activate {module}\n")

    # Positional arguments given after the script path on the sbatch command line.
    lines += ["source=$1\n", "target=$2\n", "subject=$3\n"]
    # Quote expansions so each value reaches Python as one literal argument
    # (no word-splitting or globbing on paths with spaces/special characters).
    script_args = '"$source" "$target" "$subject"'
    if with_config:
        lines.append("config=$4\n")
        script_args += ' --configs "$config"'
    lines.append(f"srun python {shlex.quote(str(mainParallel_path))} {script_args}\n")

    job_path = os.path.join(bash_file_path, batch_file_name + ".sh")
    # writes bash file into processing dir
    with open(job_path, "w") as bash_file:
        bash_file.writelines(lines)
    # Owner and group may read/write/execute the script; others get no access.
    os.chmod(job_path, 0o770)
    return job_path
[docs]
def submit_jobs(
    mainParallel_path,
    bash_file_path,
    subjects,
    temp_path,
    config_file=None,
    job_configs=None,
    progress=False,
):
    """
    Submit one SLURM job per subject for parallel execution.

    A single batch script is generated with ``sbatchfile``. It is then
    submitted once per subject as
    ``sbatch --job-name=<subject> <script> <path> <temp_path> <subject> [<config_file>]``.
    Arguments are passed to ``sbatch`` directly, without a shell, so names
    and paths are never shell-interpreted (no word-splitting, globbing or
    ``~``/``$VAR`` expansion).

    Parameters
    ----------
    mainParallel_path : str
        Path to the `mainParallel.py` script that will be executed in the batch job.
    bash_file_path : str
        Directory where the generated batch job file will be saved.
    subjects : dict
        A dictionary of subject names (keys) and their corresponding paths (values).
        Each subject will have a job submitted to the cluster.
    temp_path : str
        Path where temporary files will be stored; created if missing.
    config_file : str, optional
        Path to a JSON configuration file. If provided, this will be passed to the batch job.
        Default is None.
    job_configs : dict, optional
        Job settings for ``sbatchfile``: log_path, module, time, memory,
        partition, core, node, batch_file_name. Any missing key falls back
        to its default (None, 'mne', '1:00:00', '20GB', 'normal', 1, 1,
        'batch_job'). Default is None (all defaults).
    progress : bool, optional
        Whether to show a progress bar during job submission. Default is False.

    Returns
    -------
    str
        The start time for the batch job submission, formatted as 'YYYY-MM-DDTHH:MM:SS'.

    Raises
    ------
    subprocess.CalledProcessError
        If ``sbatch`` exits with a non-zero status for any subject.
    """
    os.makedirs(temp_path, exist_ok=True)

    default_configs = {
        "log_path": None,
        "module": "mne",
        "time": "1:00:00",
        "memory": "20GB",
        "partition": "normal",
        "core": 1,
        "node": 1,
        "batch_file_name": "batch_job",
    }
    # Caller-supplied settings override the defaults; a partial dict is allowed.
    configs = {**default_configs, **(job_configs or {})}

    batch_file = sbatchfile(
        mainParallel_path,
        bash_file_path,
        log_path=configs["log_path"],
        module=configs["module"],
        time=configs["time"],
        memory=configs["memory"],
        partition=configs["partition"],
        core=configs["core"],
        node=configs["node"],
        batch_file_name=configs["batch_file_name"],
        with_config=config_file is not None,
    )
    start_time = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
    n_subjects = len(subjects)
    # Count submissions from 1 so the bar reaches 100% (and ends its line)
    # after the last subject.
    for n_done, (subject, fname) in enumerate(subjects.items(), start=1):
        cmd = [
            "sbatch",
            f"--job-name={subject}",
            str(batch_file),
            str(fname),
            str(temp_path),
            str(subject),
        ]
        if config_file is not None:
            cmd.append(str(config_file))
        subprocess.check_call(cmd)
        if progress:
            progress_bar(n_done, n_subjects)
    return start_time
[docs]
def check_jobs_status(username, start_time, delay=20, max_failures=5):
    """
    Poll SLURM until none of the user's jobs since ``start_time`` are active.

    Calls ``check_user_jobs`` repeatedly, printing the status counts (and
    any failed job names) after each successful query. Returns as soon as
    the PENDING and RUNNING counts are both zero.

    Parameters
    ----------
    username : str
        The SLURM username used to check the status of the jobs.
    start_time : str
        The start time for the batch job submission, formatted as 'YYYY-MM-DDTHH:MM:SS'.
        This is used to identify the specific set of jobs submitted in the `submit_jobs` function.
    delay : int, optional
        The delay, in seconds, between each check of job status. Default is 20 seconds.
    max_failures : int, optional
        Number of consecutive queries returning no data that are tolerated
        (e.g. a transient ``sacct`` error) before giving up. Default is 5.

    Returns
    -------
    list
        Names of jobs reported as failed by the final successful query.

    Raises
    ------
    RuntimeError
        If ``check_user_jobs`` returns no data ``max_failures`` times in a row.
    """
    # NOTE(review): if the scheduler's accounting has not registered freshly
    # submitted jobs yet, all counts may be zero and this returns
    # immediately — confirm on the target cluster.
    consecutive_failures = 0
    while True:
        result = check_user_jobs(username, start_time)
        if result is None:
            # check_user_jobs signals a failed query by returning None.
            consecutive_failures += 1
            print("No job data available.")
            if consecutive_failures >= max_failures:
                raise RuntimeError(
                    f"Could not query job status for user {username} "
                    f"{consecutive_failures} times in a row."
                )
            time.sleep(delay)
            continue

        consecutive_failures = 0
        job_counts, failed_job_names = result
        print(f"Status for user {username} from {start_time}: {job_counts}")
        if failed_job_names:
            print("Failed Jobs:", ", ".join(failed_job_names))
        active = job_counts.get("PENDING", 0) + job_counts.get("RUNNING", 0)
        if active == 0:
            # Nothing left to wait for: return without a final sleep.
            return failed_job_names
        time.sleep(delay)
[docs]
def check_user_jobs(username, start_time):
    """
    Count the states of a user's SLURM jobs within a time window.

    Runs ``sacct`` for ``username`` from ``start_time`` until now, with one
    line per job allocation (``-X``), and sorts each job into one of five
    status buckets.

    Parameters
    ----------
    username : str
        The SLURM username used to check the status of the jobs.
    start_time : str
        The start time for the batch job submission, formatted as 'YYYY-MM-DDTHH:MM:SS'.
        This is used to filter the jobs that were submitted after the specified start time.

    Returns
    -------
    tuple or None
        On success, a tuple containing:

        - status_counts : dict
            Counts for the keys PENDING, RUNNING, COMPLETED, FAILED and
            CANCELLED. Still-active states such as REQUEUED or SUSPENDED
            are folded into PENDING/RUNNING. Terminal error states such as
            TIMEOUT or OUT_OF_MEMORY are folded into FAILED.
        - failed_jobs : list
            Names of the jobs counted under FAILED.

        None if ``sacct`` could not be started or exited with a non-zero
        status; a message describing the problem is printed.
    """
    # Map sacct state codes onto the five reported buckets. Live-but-not-
    # running states count as active, so a caller waiting for completion
    # does not stop early. Abnormal terminal states count as failures.
    state_buckets = {
        "PENDING": "PENDING",
        "REQUEUED": "PENDING",
        "RUNNING": "RUNNING",
        "SUSPENDED": "RUNNING",
        "RESIZING": "RUNNING",
        "COMPLETING": "RUNNING",
        "CONFIGURING": "RUNNING",
        "COMPLETED": "COMPLETED",
        "FAILED": "FAILED",
        "TIMEOUT": "FAILED",
        "OUT_OF_MEMORY": "FAILED",
        "NODE_FAIL": "FAILED",
        "BOOT_FAIL": "FAILED",
        "DEADLINE": "FAILED",
        "PREEMPTED": "FAILED",
        "CANCELLED": "CANCELLED",
    }

    # Format the current datetime to match Slurm's expected format
    end_time = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
    cmd = [
        "sacct",
        "--noheader",
        "-X",
        "--parsable2",
        "-S",
        start_time,
        "-E",
        end_time,
        "-u",
        username,
        "--format=JobName,State",
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except (OSError, subprocess.SubprocessError) as e:
        # e.g. sacct is not installed / not on PATH.
        print("An error occurred while checking the job status:", str(e))
        return None
    if result.returncode != 0:
        print("Failed to query jobs:", result.stderr)
        return None

    status_counts = {
        "PENDING": 0,
        "RUNNING": 0,
        "COMPLETED": 0,
        "FAILED": 0,
        "CANCELLED": 0,
    }
    failed_jobs = []
    for line in result.stdout.splitlines():
        # --parsable2 output is "JobName|State". State never contains "|",
        # so split on the last one.
        job_name, sep, state = line.rpartition("|")
        if not sep:
            continue
        # sacct may append details to the state (e.g. "CANCELLED by 12345");
        # only the first word is the state code.
        words = state.split()
        bucket = state_buckets.get(words[0]) if words else None
        if bucket is None:
            continue
        status_counts[bucket] += 1
        if bucket == "FAILED":
            failed_jobs.append(job_name)
    return status_counts, failed_jobs
[docs]
def collect_results(target_dir, subjects, temp_path, file_name="features", clean=True):
    """
    Merge the per-subject result CSVs into a single CSV file.

    For every key in ``subjects``, reads ``<temp_path>/<subject>.csv`` with
    its first column as the index. All tables are concatenated row-wise
    into ``<target_dir>/<file_name>.csv``. Subjects whose file is missing
    or cannot be parsed are skipped, and their names are printed.

    Parameters
    ----------
    target_dir : str
        Path to the target directory where the merged results will be saved;
        created if missing.
    subjects : dict
        A dictionary with subject names as keys and their corresponding file paths as values.
        Only the keys are used here.
    temp_path : str
        Path to the temporary directory where individual subject result files are stored.
    file_name : str, optional
        The name (without ``.csv``) of the file where the merged results will be saved.
        Default is 'features'.
    clean : bool, optional
        Whether to remove ``temp_path`` after successfully writing the merged results.
        Default is True.

    Returns
    -------
    None
        Writes the merged results to a CSV file in the target directory.

    Raises
    ------
    ValueError
        If no subject result file could be read. Nothing is written and
        ``temp_path`` is left untouched.
    """
    os.makedirs(target_dir, exist_ok=True)
    all_features = []
    skipped = []
    for subject in subjects:
        csv_path = os.path.join(temp_path, f"{subject}.csv")
        try:
            all_features.append(pd.read_csv(csv_path, index_col=0))
        except (OSError, ValueError):
            # Missing, empty or malformed file (pandas' EmptyDataError and
            # ParserError subclass ValueError): skip this subject.
            skipped.append(subject)
    if skipped:
        print(
            "Skipped subjects without readable results:",
            ", ".join(str(s) for s in skipped),
        )
    if not all_features:
        raise ValueError(
            f"No readable result files found in {temp_path}; nothing to merge."
        )
    features = pd.concat(all_features)
    features.to_csv(os.path.join(target_dir, file_name + ".csv"))
    if clean:
        shutil.rmtree(temp_path)