Source code for sovabids.datasets

"""Module with dataset utilities.
"""
import os
from pandas import read_csv
import shutil
from sovabids.files import download,_get_files
from sovabids.misc import get_num_digits
from sovabids.parsers import parse_from_regex
import mne
import numpy as np
from mne_bids.write import _write_raw_brainvision
import fileinput


[docs]def lemon_prepare(): """Download and prepare a few files of the LEMON dataset. Notes ----- See the `LEMON dataset <http://fcon_1000.projects.nitrc.org/indi/retro/MPI_LEMON.html>`_ . """ # Path Configuration this_dir = os.path.dirname(__file__) data_dir = os.path.join(this_dir,'..','_data') root_path = os.path.abspath(os.path.join(data_dir,'lemon')) os.makedirs(data_dir,exist_ok=True) # Download lemon Database urls = ['https://fcp-indi.s3.amazonaws.com/data/Projects/INDI/MPI-LEMON/Compressed_tar/EEG_MPILMBB_LEMON/EEG_Raw_BIDS_ID/sub-032301.tar.gz', 'https://fcp-indi.s3.amazonaws.com/data/Projects/INDI/MPI-LEMON/Compressed_tar/EEG_MPILMBB_LEMON/EEG_Raw_BIDS_ID/sub-032302.tar.gz', 'https://fcp-indi.s3.amazonaws.com/data/Projects/INDI/MPI-LEMON/Compressed_tar/EEG_MPILMBB_LEMON/EEG_Raw_BIDS_ID/sub-032303.tar.gz', 'https://fcp-indi.s3.amazonaws.com/data/Projects/INDI/MPI-LEMON/name_match.csv'] for url in urls: download(url,os.path.join(data_dir,'lemon')) # Generate all filepaths filepaths = _get_files(root_path) # Label Correction name_match = read_csv(os.path.join(root_path,'name_match.csv')) # Unpack files # TAR FILES tars = [x for x in filepaths if 'tar.gz' in x ] # SUBJECTS # ignore field so that it doesnt get rid of - or _ old_ids = [parse_from_regex(x,'(sub-.*?).tar.gz',['ignore']) for x in tars] old_ids = [x['ignore'] for x in old_ids] new_ids = [name_match.loc[(name_match.INDI_ID==x),'Initial_ID']._values[0] for x in old_ids] # EEG FILES not_tars = [x for x in filepaths if '.vhdr' in x ] not_tars_ids = [parse_from_regex(x,'RSEEG\\/(sub-.*?).vhdr',['id']) for x in not_tars] not_tars_ids = [x['id'] for x in not_tars_ids] assert len(tars) == len(old_ids) == len(new_ids) if set(new_ids) == set(not_tars_ids): # all done return else: for file,old,new in zip(tars,old_ids,new_ids): if not new in not_tars_ids: # skip already prepared files shutil.unpack_archive(file,root_path) olddir = os.path.join(root_path,old) subject_files = _get_files(olddir) for subfile in subject_files: # fix sub-id new_path = subfile.replace(old,new) dir,_ = os.path.split(new_path) os.makedirs(dir,exist_ok=True) shutil.move(subfile,new_path) shutil.rmtree(olddir) print('LEMON PREPARE DONE!')
[docs]def lemon_bidscoin_prepare(src_path): """Download and prepare a few files of the LEMON dataset to be used with BIDSCOIN. Parameters ---------- src_path : str The path where the BIDSCOIN-ready LEMON files will be See Also -------- datasets.lemon_prepare """ lemon_prepare() this_dir = os.path.dirname(__file__) data_dir = os.path.join(this_dir,'..','_data') root_path = os.path.abspath(os.path.join(data_dir,'lemon')) bidscoin_input_path = src_path os.makedirs(bidscoin_input_path,exist_ok=True) files = _get_files(root_path) files = [x for x in files if x.split('.')[-1] in ['eeg','vmrk','vhdr'] ] files_out = [] for f in files: session = 'ses-001' task = 'resting' head,tail=os.path.split(f) sub = tail.split('.')[0] new_path = os.path.join(bidscoin_input_path,sub,session,task,tail) files_out.append(new_path) for old,new in zip(files,files_out): print(old,' to ',new) os.makedirs(os.path.split(new)[0], exist_ok=True) if not os.path.isfile(new): shutil.copy2(old,new) else: print('already done, skipping...') print('finish')
[docs]def make_dummy_dataset(EXAMPLE, PATTERN='T%task%/S%session%/sub%subject%_%acquisition%_%run%', DATASET = 'DUMMY', NSUBS = 2, NSESSIONS = 2, NTASKS = 2, NACQS = 2, NRUNS = 2, PREFIXES = {'subject':'SU','session':'SE','task':'TA','acquisition':'AC','run':'RU'}, ROOT=None, ): """Create a dummy dataset given some parameters. Parameters ---------- EXAMPLE : str,PathLike|list , required Path of the file to replicate as each file in the dummy dataset. If a list, it is assumed each item is a file. All of these items are replicated. PATTERN : str, optional The pattern in placeholder notation using the following fields: %dataset%, %task%, %session%, %subject%, %run%, %acquisition% DATASET : str, optional Name of the dataset. NSUBS : int, optional Number of subjects. NSESSIONS : int, optional Number of sessions. NTASKS : int, optional Number of tasks. NACQS : int, optional Number of acquisitions. NRUNS : int, optional Number of runs. PREFIXES : dict, optional Dictionary with the following keys:'subject', 'session', 'task' and 'acquisition'. The values are the corresponding prefix. RUN is not present because it has to be a number. ROOT : str, optional Path where the files will be generated. If None, the _data subdir will be used. """ if ROOT is None: this_dir = os.path.dirname(__file__) data_dir = os.path.abspath(os.path.join(this_dir,'..','_data')) else: data_dir = ROOT os.makedirs(data_dir,exist_ok=True) sub_zeros = get_num_digits(NSUBS) subs = [ PREFIXES['subject']+ str(x).zfill(sub_zeros) for x in range(NSUBS)] task_zeros = get_num_digits(NTASKS) tasks = [ PREFIXES['task']+str(x).zfill(task_zeros) for x in range(NTASKS)] run_zeros = get_num_digits(NRUNS) runs = [str(x).zfill(run_zeros) for x in range(NRUNS)] ses_zeros = get_num_digits(NSESSIONS) sessions = [ PREFIXES['session']+str(x).zfill(ses_zeros) for x in range(NSESSIONS)] acq_zeros = get_num_digits(NACQS) acquisitions = [ PREFIXES['acquisition']+str(x).zfill(acq_zeros) for x in range(NACQS)] for task in tasks: for session in sessions: for run in runs: for sub in subs: for acq in acquisitions: dummy = PATTERN.replace('%dataset%',DATASET) dummy = dummy.replace('%task%',task) dummy = dummy.replace('%session%',session) dummy = dummy.replace('%subject%',sub) dummy = dummy.replace('%run%',run) dummy = dummy.replace('%acquisition%',acq) path = [data_dir] +dummy.split('/') fpath = os.path.join(*path) dirpath = os.path.join(*path[:-1]) os.makedirs(dirpath,exist_ok=True) if isinstance(EXAMPLE,list): for ff in EXAMPLE: fname, ext = os.path.splitext(ff) shutil.copyfile(ff, fpath+ext) if 'vmrk' in ext or 'vhdr' in ext: replace_brainvision_filename(fpath+ext,path[-1]) else: fname, ext = os.path.splitext(EXAMPLE) shutil.copyfile(EXAMPLE, fpath+ext)
[docs]def get_dummy_raw(NCHANNELS = 5, SFREQ = 200, STOP = 10, NUMEVENTS = 10, ): """ Create a dummy MNE Raw file given some parameters. Parameters ---------- NCHANNELS : int, optional Number of channels. SFREQ : float, optional Sampling frequency of the data. STOP : float, optional Time duration of the data in seconds. NUMEVENTS : int, optional Number of events along the duration. """ # Create some dummy metadata n_channels = NCHANNELS sampling_freq = SFREQ # in Hertz info = mne.create_info(n_channels, sfreq=sampling_freq) times = np.linspace(0, STOP, STOP*sampling_freq, endpoint=False) data = np.zeros((NCHANNELS,times.shape[0])) raw = mne.io.RawArray(data, info) raw.set_channel_types({x:'eeg' for x in raw.ch_names}) new_events = mne.make_fixed_length_events(raw, duration=STOP//NUMEVENTS) return raw,new_events
[docs]def save_dummy_vhdr(fpath,dummy_args={} ): """ Save a dummy vhdr file. Parameters ---------- fpath : str, required Path where to save the file. kwargs : dict, optional Dictionary with the arguments of the get_dummy_raw function. Returns ------- List with the Paths of the desired vhdr file, if those were succesfully created, None otherwise. """ raw,new_events = get_dummy_raw(**dummy_args) _write_raw_brainvision(raw,fpath,new_events,overwrite=True) eegpath =fpath.replace('.vhdr','.eeg') vmrkpath = fpath.replace('.vhdr','.vmrk') if all(os.path.isfile(x) for x in [fpath,eegpath,vmrkpath]): return [fpath,eegpath,vmrkpath] else: return None
[docs]def save_dummy_cnt(fpath, ): """ Save a dummy cnt file. Parameters ---------- fpath : str, required Path where to save the file. Returns ------- Path of the desired file if the file was succesfully created, None otherwise. """ fname = 'scan41_short.cnt' cnt_dict={'dataset_name': 'cnt_sample', 'archive_name': 'scan41_short.cnt', 'hash': 'md5:7ab589254e83e001e52bee31eae859db', 'url': 'https://github.com/mne-tools/mne-testing-data/blob/master/CNT/scan41_short.cnt?raw=true', 'folder_name': 'cnt_sample', } data_path = mne.datasets.fetch_dataset(cnt_dict) shutil.copyfile(os.path.join(data_path,'scan41_short.cnt'), fpath) #copyfile overwrites by default if os.path.isfile(fpath): return fpath else: return None
[docs]def replace_brainvision_filename(fpath,newname): if '.eeg' in newname: newname = newname.replace('.eeg','') if '.vmrk' in newname: newname = newname.replace('.vmrk','') for line in fileinput.input(fpath, inplace=True): if 'DataFile' in line: print(f'DataFile={newname}.eeg'.format(fileinput.filelineno(), line)) elif 'MarkerFile' in line: print(f'MarkerFile={newname}.vmrk'.format(fileinput.filelineno(), line)) else: print('{}'.format(line), end='')