diff --git a/brainiak/utils/fmrisim.py b/brainiak/utils/fmrisim.py index 1cce375a9..a1c93d8fd 100644 --- a/brainiak/utils/fmrisim.py +++ b/brainiak/utils/fmrisim.py @@ -27,12 +27,18 @@ generate_stimfunction Create a function to describe the stimulus onsets/durations +export_stimfunction: +Generate a three column timing file that can be used with software like FSL + double_gamma_hrf Convolve the stimulus function with the HRF to model when events are expected. apply_signal Combine the volume and the HRF +calc_noise +Estimate the noise properties of a given volume + generate_noise Create the noise for this run. This creates temporal, task and white noise. Various parameters can be tuned depending on need @@ -48,17 +54,21 @@ """ import logging -import numpy as np -import scipy.ndimage as ndimage +from itertools import product +import nitime.algorithms.autoregressive as ar import math -from scipy import stats +import numpy as np from pkg_resources import resource_stream +from scipy import stats +import scipy.ndimage as ndimage __all__ = [ "generate_signal", "generate_stimfunction", + "export_stimfunction", "double_gamma_hrf", "apply_signal", + "calc_noise", "generate_noise", "mask_brain", "plot_brain", @@ -67,7 +77,10 @@ logger = logging.getLogger(__name__) -def _generate_feature(feature_type, feature_size, signal_magnitude): +def _generate_feature(feature_type, + feature_size, + signal_magnitude, + thickness=1): """Generate features corresponding to signal Generate signal in specific regions of the brain with for a single @@ -84,7 +97,11 @@ def _generate_feature(feature_type, feature_size, signal_magnitude): How big is the signal? signal_magnitude : float - What is the value of the signal voxels in the feature + Set the signal size, a value of 1 means the signal is one standard + deviation of the noise + + thickness : int + How thick is the surface of the loop/cavity Returns ---------- @@ -94,6 +111,10 @@ def _generate_feature(feature_type, feature_size, signal_magnitude): """ + # If the size is equal to or less than 2 then all features are the same + if feature_size <= 2: + feature_type = 'cube' + # What kind of signal is it? if feature_type == 'cube': @@ -119,14 +140,19 @@ def _generate_feature(feature_type, feature_size, signal_magnitude): xx, yy = np.meshgrid(seq, seq) # Make a disk corresponding to the whole mesh grid - disk = ((xx - ((feature_size - 1) / 2)) ** 2 - + (yy - ((feature_size - 1) / 2)) ** 2) + xxmesh = (xx - ((feature_size - 1) / 2)) ** 2 + yymesh = (yy - ((feature_size - 1) / 2)) ** 2 + disk = xxmesh + yymesh + + # What are the limits of the rings being made + outer_lim = disk[int((feature_size - 1) / 2), 0] + inner_lim = disk[int((feature_size - 1) / 2), thickness] # What is the outer disk - outer = disk < (feature_size - 1) + outer = disk <= outer_lim # What is the inner disk - inner = disk < (feature_size - 1) / 2 + inner = disk <= inner_lim # Subtract the two disks to get a loop loop = outer != inner @@ -152,14 +178,21 @@ def _generate_feature(feature_type, feature_size, signal_magnitude): (yy - ((feature_size - 1) / 2)) ** 2 + (zz - ((feature_size - 1) / 2)) ** 2) + # What are the limits of the rings being made + outer_lim = signal[int((feature_size - 1) / 2), int((feature_size - + 1) / 2), 0] + inner_lim = signal[int((feature_size - 1) / 2), int((feature_size - + 1) / 2), + thickness] + # Is the signal a sphere or a cavity? if feature_type == 'sphere': - signal = signal < (feature_size - 1) + signal = signal <= outer_lim else: # Get the inner and outer sphere - outer = signal < (feature_size - 1) - inner = signal < (feature_size - 1) / 2 + outer = signal <= outer_lim + inner = signal <= inner_lim # Subtract the two disks to get a loop signal = outer != inner @@ -206,15 +239,15 @@ def _insert_idxs(feature_centre, feature_size, dimensions): """ # Set up the indexes within which to insert the signal - x_idx = [int(feature_centre[0] - (feature_size / 2)), + x_idx = [int(feature_centre[0] - (feature_size / 2)) + 1, int(feature_centre[0] - (feature_size / 2) + - feature_size)] - y_idx = [int(feature_centre[1] - (feature_size / 2)), + feature_size) + 1] + y_idx = [int(feature_centre[1] - (feature_size / 2)) + 1, int(feature_centre[1] - (feature_size / 2) + - feature_size)] - z_idx = [int(feature_centre[2] - (feature_size / 2)), + feature_size) + 1] + z_idx = [int(feature_centre[2] - (feature_size / 2)) + 1, int(feature_centre[2] - (feature_size / 2) + - feature_size)] + feature_size) + 1] # Check for out of bounds # Min Boundary @@ -241,7 +274,7 @@ def generate_signal(dimensions, feature_coordinates, feature_size, feature_type, - signal_magnitude, + signal_magnitude=[1], signal_constant=1, ): """Generate volume containing signal @@ -274,7 +307,9 @@ def generate_signal(dimensions, must be supplied or m values signal_magnitude : list, float - What is the (average) magnitude of the signal being generated? + What is the (average) magnitude of the signal being generated? A + value of 1 means that the signal is one standard deviation from the + noise signal_constant : list, bool Is the signal constant or is it a random pattern (with the same @@ -341,12 +376,15 @@ def generate_signal(dimensions, def generate_stimfunction(onsets, event_durations, total_time, - tr_duration, + weights=[1], + timing_file=None, ): """Return the function for the onset of events When do stimuli onset, how long for and to what extent should you - resolve the fMRI time course + resolve the fMRI time course. There are two ways to create this, either + by supplying onset, duration and weight information or by supplying a + timing file (in the three column format) Parameters ---------- @@ -362,58 +400,144 @@ def generate_stimfunction(onsets, total_time : int How long is the experiment in total. - tr_duration : float - What is the TR duration, related to the precision of the boxcar. + weights : list, float + How large is the box car for each of the onsets. If there is + only one value then this will be assigned to all onsets + + timing_file : string + The filename (with path) to a three column timing file (FSL) to + make the events. Still requires tr_duration and total_time Returns ---------- - Iterable[bool] + Iterable[float] The time course of stimulus related activation """ + # If the timing file is supplied then use this to acquire the + if timing_file is not None: + + # Read in text file line by line + with open(timing_file) as f: + text = f.readlines() # Pull out file as a an array + + # Preset + onsets = list() + event_durations = list() + weights = list() + + # Pull out the onsets, weights and durations, set as a float + for line in text: + onset, duration, weight = line.strip().split() + onsets.append(float(onset)) + event_durations.append(float(duration)) + weights.append(float(weight)) + # If only one duration is supplied then duplicate it for the length of # the onset variable if len(event_durations) == 1: event_durations = event_durations * len(onsets) - if (event_durations[0] / tr_duration).is_integer() is False: - logging.warning('Event durations are not a multiple of the TR ' - 'duration, rounding down.') + if len(weights) == 1: + weights = weights * len(onsets) - if (onsets[0] / tr_duration).is_integer() is False: - logging.warning('Onsets are not a multiple of the TR duration, ' - 'rounding down.') + # How many elements per second are you modeling + temporal_resolution = 1000 - # Generate the time course as empty - stimfunction = [0] * round(total_time / tr_duration) + # Generate the time course as empty, each element is a millisecond + stimfunction = [0] * round(total_time * temporal_resolution) # Cycle through the onsets - for onset_counter in list(range(0, len(onsets))): + for onset_counter in list(range(len(onsets))): # Adjust for the resolution - onset_idx = int(np.floor(onsets[onset_counter] / tr_duration)) + onset_idx = int(np.floor(onsets[onset_counter] * temporal_resolution)) # Adjust for the resolution offset_idx = int(np.floor((onsets[onset_counter] + event_durations[ - onset_counter]) / tr_duration)) + onset_counter])) * temporal_resolution) - # For the appropriate indexes and duration, make this value 1 - idx_number = round(event_durations[onset_counter] / tr_duration) - stimfunction[onset_idx:offset_idx] = idx_number * [1] + # For the appropriate number of indexes and duration, make this value 1 + idx_n = round(event_durations[onset_counter] * temporal_resolution) + stimfunction[onset_idx:offset_idx] = [weights[onset_counter]] * idx_n - # Remove any indexes that are too long - if len(stimfunction) > total_time / tr_duration: - stimfunction = stimfunction[0:int(total_time / tr_duration)] + # Shorten the data if it's too long + if len(stimfunction) > total_time * temporal_resolution: + stimfunction = stimfunction[0:int(total_time * temporal_resolution)] return stimfunction +def export_stimfunction(stimfunction, + filename, + ): + """ Output a tab separated timing file + + This produces a three column tab separated text file, with the three + columns representing onset time, event duration and weight, respectively + + Useful if you want to run the simulated data through FEAT analyses + + Parameters + ---------- + stimfunction : list + The stimulus function describing the time course of events + + filename : str + The name of the three column text file to be output + + """ + + # Iterate through the stim function + stim_counter = 0 + event_counter = 0 + while stim_counter < len(stimfunction): + + # Is it an event? + if stimfunction[stim_counter] != 0: + + # When did the event start? + event_onset = str(stim_counter / 1000) + + # The weight of the stimulus + weight = str(stimfunction[stim_counter]) + + # Reset + event_duration = 0 + + # Is the event still ongoing? + while stimfunction[stim_counter] != 0 & stim_counter <= len( + stimfunction): + + # Add one millisecond to each duration + event_duration = event_duration + 1 + + # Increment + stim_counter = stim_counter + 1 + + # How long was the event in seconds + event_duration = str(event_duration / 1000) + + # Append this row to the data file + with open(filename, "a") as file: + file.write(event_onset + '\t' + event_duration + '\t' + + weight + '\n') + + # Increment the number of events + event_counter = event_counter + 1 + + # Increment + stim_counter = stim_counter + 1 + + def double_gamma_hrf(stimfunction, + tr_duration, response_delay=6, undershoot_delay=12, response_dispersion=0.9, undershoot_dispersion=0.9, + response_scale=1, undershoot_scale=0.035): """Return a double gamma HRF @@ -435,8 +559,11 @@ def double_gamma_hrf(stimfunction, undershoot_dispersion : float How wide is the undershoot dispersion + response_scale : float + How big is the response relative to the peak + undershoot_scale :float - How big is the undershoot relative to the peak + How big is the undershoot relative to the trough Returns ---------- @@ -461,29 +588,36 @@ def double_gamma_hrf(stimfunction, resp_exp = math.exp(-(hrf_counter - response_peak) / response_dispersion) - response_model = resp_pow * resp_exp + response_model = response_scale * resp_pow * resp_exp undershoot_pow = math.pow(hrf_counter / undershoot_peak, undershoot_delay) - undershoot_exp = math.exp(-(hrf_counter - undershoot_peak - / undershoot_dispersion)) + undershoot_exp = math.exp(-(hrf_counter - undershoot_peak / + undershoot_dispersion)) undershoot_model = undershoot_scale * undershoot_pow * undershoot_exp # For this time point find the value of the HRF hrf[hrf_counter] = response_model - undershoot_model + # Decimate the stim function so that it only has one element per TR + stimfunction = stimfunction[0::tr_duration * 1000] + # Convolve the hrf that was created with the boxcar input signal_function = np.convolve(stimfunction, hrf) # Cut off the HRF signal_function = signal_function[0:len(stimfunction)] + # Scale the function so that the peak response is 1 + signal_function = signal_function / np.max(signal_function) + return signal_function def apply_signal(signal_function, - volume_static): + volume_static, + ): """Apply the convolution and stimfunction Apply the convolution of the HRF and stimulus time course to the @@ -501,9 +635,7 @@ def apply_signal(signal_function, Returns ---------- multidimensional array, float - Generates the spatial noise volume for these parameters - - """ + Generates the spatial noise volume for these parameters """ # Preset volume signal = np.ndarray([volume_static.shape[0], volume_static.shape[ @@ -519,8 +651,239 @@ def apply_signal(signal_function, return signal -def _generate_noise_system(dimensions, - sigma=1.5, +def _calc_fwhm(volume, + mask, + ): + """ Calculate the FWHM of a volume + Takes in a volume and mask and outputs the FWHM of each TR of this + volume for the non-masked voxels + + Parameters + ---------- + volume : multidimensional array + Functional data to have the FWHM measured. Can be 3d or 4d data + + mask : 4 dimensional array + A mask of the voxels to have the FWHM measured from + + Returns + ------- + + float, list + Returns the FWHM of each TR """ + + # What are the dimensions of the volume + dimensions = volume.shape + + # Identify the number of TRs + if len(dimensions) == 3: + trs = 1 + else: + trs = dimensions[3] + dimensions = dimensions[0:3] + volume_all = volume # Store for later + + # Preset size + fwhm = np.zeros(trs) + + # Iterate through the TRs, creating a FWHM for each TR + + for tr_counter in list(range(0, trs)): + + # Preset + sum_sq_der = [0.0, 0.0, 0.0] + countSqDer = [0, 0, 0] + + # If there is more than one TR, just pull out that volume + if trs > 1: + volume = volume_all[:, :, :, tr_counter] + + # Pull out all the voxel coordinates + coordinates = list(product(range(dimensions[0]), + range(dimensions[1]), + range(dimensions[2]))) + + # Find the sum of squared error for the non-masked voxels in the brain + for i in list(range(len(coordinates))): + + # Pull out this coordinate + x, y, z = coordinates[i] + + # Is this within the mask? + if mask[x, y, z, tr_counter] > 0: + + # For each xyz dimension calculate the squared + # difference of this voxel and the next + + if x < dimensions[0] - 1 and mask[x + 1, y, z, + tr_counter] > 0: + temp = (volume[x, y, z] - volume[x + 1, y, z]) ** 2 + sum_sq_der[0] += temp + countSqDer[0] += + 1 + + if y < dimensions[1] - 1 and mask[x, y + 1, z, + tr_counter] > 0: + temp = (volume[x, y, z] - volume[x, y + 1, z]) ** 2 + sum_sq_der[1] += temp + countSqDer[1] += + 1 + + if z < dimensions[2] - 1 and mask[x, y, z + 1, + tr_counter] > 0: + temp = (volume[x, y, z] - volume[x, y, z + 1]) ** 2 + sum_sq_der[2] += temp + countSqDer[2] += 1 + + # What is the FWHM for each dimension + fwhm3 = np.sqrt(np.divide(4 * np.log(2), np.divide(sum_sq_der, + countSqDer))) + fwhm[tr_counter] = np.prod(fwhm3) ** (1 / 3) # Calculate the average + + return fwhm + + +def _calc_snr(volume, + mask, + ): + """ Calculate the SNR of a volume + This takes the middle of the volume and averages the signal within the + brain and compares to the std in the voxels outside the brain. + + Parameters + ---------- + volume : 4d array, float + Take a volume time series to extract the middle slice from the middle TR + + mask : 4d array, float + A mask the same size as the volume, specifying the mask (values=0 -> 1) + + Returns + ------- + + float + The snr of the volume (how much greater in activity the brain is from + the background) + + """ + + # What are the midpoints to be extracted + mid_x_idx = int(np.ceil(volume.shape[0] / 2)) + mid_tr_idx = int(np.ceil(volume.shape[3] / 2)) + + # Pull out the slices + slice_volume = volume[mid_x_idx, :, :, mid_tr_idx] + slice_mask = mask[mid_x_idx, :, :, mid_tr_idx] + + # Divide by the mask (if the middle slice was low in grey matter mass + # then you would have a lower mean signal by default) + mean_signal = (slice_volume[slice_mask > 0] / slice_mask[slice_mask > + 0]).mean() + + # What is the overall noise + overall_noise = float(slice_volume[slice_mask == 0].std()) + + # Calculate snr + snr = mean_signal / overall_noise + + # Convert from memmap + snr = float(snr) + + return overall_noise, snr + + +def _calc_autoregression(volume, + mask, + ar_order=2, + ): + """ Calculate the autoregressive sigma of the data. + + Parameters + ---------- + volume : 4d masarray, float + Input data to be calculate the autoregression + + mask : 4d array, float + What voxels of the input are within the brain + + ar_order : int + What order of the autoregression do you want to pull out + + Returns + ------- + float + A sigma of the autoregression in the data + + """ + + # Calculate the time course + timecourse = np.mean(volume[mask[:, :, :, 0] > 0], 0) + + # Pull out the AR values (depends on order) + auto_reg_sigma = ar.AR_est_YW(timecourse, ar_order)[0][1] + + # What is the size of the change in the time course + drift_sigma = timecourse.std().tolist() + + return auto_reg_sigma, drift_sigma + + +def calc_noise(volume, + mask=None, + ): + """ Calculates the noise properties of the volume supplied. + This estimates what noise properties the volume has. For instance it + determines the spatial smoothness, the autoregressive noise, system + noise etc. Read the doc string for generate_noise to understand how + these different types of noise interact. + + Parameters + ---------- + volume : 4d numpy array, float + Take in a functional volume (either the file name or the numpy + array) to be used to estimate the noise properties of this + + mask : 4d numpy array, float + The mask to be used, the same size as the volume + + Returns + ------- + + dict + Return a dictionary of the calculated noise parameters of the provided + dataset + + """ + + # Preset + noise_dict = {} + + # Create the mask + if mask is None: + mask = np.ones(volume.shape) + + # Since you are deriving the 'true' values then you want your noise to + # be set to that level + + # Calculate the overall noise and SNR of the volume + noise_dict['overall'], noise_dict['snr'] = _calc_snr(volume, mask) + + # Calculate the fwhm of the data and use the average as your spatial sigma + noise_dict['spatial_sigma'] = _calc_fwhm(volume, mask).mean() + + # Calculate the autoregressive and drift noise + auto_reg_sigma, drift_sigma = _calc_autoregression(volume, mask) + + noise_dict['auto_reg_sigma'] = auto_reg_sigma / noise_dict['overall'] + noise_dict['drift_sigma'] = drift_sigma / noise_dict['overall'] + + # Calculate the white noise + noise_dict['system_sigma'] = float(volume[mask == 0].std() / noise_dict[ + 'overall']) + + # Return the noise dictionary + return noise_dict + + +def _generate_noise_system(dimensions_tr, ): """Generate the scanner noise @@ -529,12 +892,9 @@ def _generate_noise_system(dimensions, Parameters ---------- - dimensions : n length array, int - What are the dimensions of the volume you wish to insert - noise into. This can be a volume of any size - - sigma : float - What is the standard deviation of this noise? + dimensions_tr : n length array, int + What are the dimensions of the volume you wish to insert + noise into. This can be a volume of any size Returns ---------- @@ -545,19 +905,21 @@ def _generate_noise_system(dimensions, """ # Generate the Rician noise - noise_rician = stats.rice.rvs(1, sigma, size=dimensions) + noise_rician = stats.rice.rvs(1, 1, size=dimensions_tr) # Apply the gaussian noise - noise_gaussian = np.random.normal(0, sigma, size=dimensions) + noise_gaussian = np.random.normal(0, 1, size=dimensions_tr) # Combine these two noise types noise_system = noise_rician + noise_gaussian + # Normalize + noise_system = stats.zscore(noise_system) + return noise_system -def _generate_noise_temporal_task(stimfunction, - motion_sigma, +def _generate_noise_temporal_task(stimfunction_tr, motion_noise='gaussian', ): """Generate the signal dependent noise @@ -568,13 +930,9 @@ def _generate_noise_temporal_task(stimfunction, Parameters ---------- - stimfunction : 1 Dimensional array - This is the timecourse of the stimuli in this experiment - - motion_sigma : float - - How much noise is left over after pre-processing has been - done. This is noise specifically on the task events + stimfunction_tr : 1 Dimensional array + This is the timecourse of the stimuli in this experiment, + each element represents a TR motion_noise : str What type of noise will you generate? Can be gaussian or rician @@ -587,25 +945,24 @@ def _generate_noise_temporal_task(stimfunction, """ - noise_task = [] + # Make the noise to be added if motion_noise == 'gaussian': - noise_task = stimfunction + (stimfunction * - np.random.normal(0, - motion_sigma, - size=len(stimfunction))) + noise = stimfunction_tr * np.random.normal(0, 1, size=len( + stimfunction_tr)) elif motion_noise == 'rician': - noise_task = stimfunction + (stimfunction * - stats.rice.rvs(0, - motion_sigma, - size=len(stimfunction))) + noise = stimfunction_tr * stats.rice.rvs(0, 1, size=len( + stimfunction_tr)) + + noise_task = stimfunction_tr + noise + + # Normalize + noise_task = stats.zscore(noise_task) return noise_task def _generate_noise_temporal_drift(trs, tr_duration, - drift_sigma, - timepoints, ): """Generate the drift noise @@ -623,11 +980,6 @@ def _generate_noise_temporal_drift(trs, tr_duration : float How long is each TR - drift_sigma : float - How large are the coefficients controlling drift - - timepoints : 1 Dimensional array - What time points are sampled by a TR Returns ---------- @@ -636,21 +988,26 @@ def _generate_noise_temporal_drift(trs, """ + # What time points are sampled by a TR? + timepoints = list(range(0, trs * tr_duration))[::tr_duration] + # Calculate the coefficients of the drift for a given function degree = round(trs * tr_duration / 150) + 1 - coefficients = np.random.normal(0, drift_sigma, size=degree) + coefficients = np.random.normal(0, 1, size=degree) # What are the values of this drift noise_drift = np.polyval(coefficients, timepoints) + # Normalize + noise_drift = stats.zscore(noise_drift) + # Return noise return noise_drift -def _generate_noise_temporal_autoregression(auto_reg_rho, - auto_reg_order, - auto_seg_sigma, - timepoints, +def _generate_noise_temporal_autoregression(timepoints, + auto_reg_order=1, + auto_reg_rho=[1], ): """Generate the autoregression noise @@ -658,16 +1015,16 @@ def _generate_noise_temporal_autoregression(auto_reg_rho, Parameters ---------- - auto_reg_sigma : float - + timepoints : 1 Dimensional array + What time points are sampled by a TR auto_reg_order : float - + How many timepoints ought to be taken into consideration for the + autoregression function auto_reg_rho : float - - timepoints : 1 Dimensional array - What time points are sampled by a TR + What is the scaling factor on the predictiveness of the previous + time point Returns ---------- @@ -684,7 +1041,7 @@ def _generate_noise_temporal_autoregression(auto_reg_rho, for tr_counter in list(range(0, len(timepoints))): if tr_counter == 0: - noise_autoregression.append(np.random.normal(0, auto_seg_sigma)) + noise_autoregression.append(np.random.normal(0, 1)) else: @@ -693,16 +1050,18 @@ def _generate_noise_temporal_autoregression(auto_reg_rho, if tr_counter - pCounter >= 0: past_trs = noise_autoregression[int(tr_counter - pCounter)] past_reg = auto_reg_rho[pCounter - 1] - random = np.random.normal(0, auto_seg_sigma) + random = np.random.normal(0, 1) temp.append(past_trs * past_reg + random) noise_autoregression.append(np.mean(temp)) + # Normalize + noise_autoregression = stats.zscore(noise_autoregression) + return noise_autoregression def _generate_noise_temporal_phys(timepoints, - physiological_sigma, resp_freq=0.2, heart_freq=1.17, ): @@ -714,10 +1073,6 @@ def _generate_noise_temporal_phys(timepoints, timepoints : 1 Dimensional array What time points are sampled by a TR - physiological_sigma : float - How variable is the signal as a result of physiology, - like heart beat and breathing - resp_freq : float What is the frequency of respiration @@ -738,114 +1093,17 @@ def _generate_noise_temporal_phys(timepoints, # Combine the two types of noise and append noise_phys.append(np.cos(resp_radians) + np.sin(heart_radians) + - np.random.normal(0, physiological_sigma)) - - return noise_phys - - -def _generate_noise_temporal(stimfunction, - tr_duration, - z_score=1, - motion_sigma=15, - drift_sigma=15, - auto_seg_sigma=15, - auto_reg_order=1, - auto_reg_rho=[1], - physiological_sigma=15, - ): - """Generate the signal dependent noise - - This noise depends on things like the signal or the timing of the - experiment. - - Parameters - ---------- - - stimfunction : 1 Dimensional array - This is the timecourse of the stimuli in this experiment - - tr_duration : int - How long is a TR, in seconds - - z_score : bool - Is the data to be normalized - - motion_sigma : float - - How much noise is left over after pre-processing has been - done. This is noise specifically on the task events - - drift_sigma : float - - What is the sigma on the distribution that coefficients are - randomly sampled from - - auto_reg_sigma : float - - - auto_reg_order : float - - - auto_reg_rho : float - - - physiological_sigma : float - - How variable is the signal as a result of physiology, - like heart beat and breathing - - Returns - ---------- - one dimensional array, float - Generates the temporal noise timecourse for these parameters - - - """ - - # Set up common parameters - # How many TRs are there - trs = len(stimfunction) - - # What time points are sampled by a TR? - timepoints = list(range(0, trs * tr_duration))[::tr_duration] + np.random.normal(0, 1)) - # Make each noise type - noise_task = _generate_noise_temporal_task(stimfunction, - motion_sigma, - ) - - noise_drift = _generate_noise_temporal_drift(trs, - tr_duration, - drift_sigma, - timepoints, - ) - - noise_phys = _generate_noise_temporal_phys(timepoints, - physiological_sigma, - ) - - noise_autoregression = _generate_noise_temporal_autoregression( - auto_reg_rho, auto_reg_order, auto_seg_sigma, timepoints, ) - - # Do you want to z score it? - if z_score == 1: - noise_task = stats.zscore(noise_task) - noise_phys = stats.zscore(noise_phys) - noise_drift = stats.zscore(noise_drift) - noise_autoregression = stats.zscore(noise_autoregression) - - # add the noise (it would have been nice to just add all of them in a - # single line but this was causing formatting problems) - noise_temporal = noise_task - noise_temporal = noise_temporal + noise_phys - noise_temporal = noise_temporal + noise_drift - noise_temporal = noise_temporal + noise_autoregression + # Normalize + noise_phys = stats.zscore(noise_phys) - return noise_temporal + return noise_phys def _generate_noise_spatial(dimensions, - sigma=-4.0, + mask=None, + spatial_sigma=-4.0, ): """Generate code for Gaussian Random Fields. @@ -859,17 +1117,27 @@ def _generate_noise_spatial(dimensions, Parameters ---------- - dimensions : tuple + dimensions : 3 length array, int What is the shape of the volume to be generated - sigma : float - What is the size of the standard deviation in the gaussian random - fields to generated + mask : 3d array + The mask describing the boundaries of the brain + + spatial_sigma : float + Approximately, what is the FWHM of the gaussian fields being + created. Use calc_fwhm to check this. + This is approximate for two reasons: firstly, although the + distribution that is being drawn from is set to this value, + this will manifest differently on every draw. Secondly, because of + the masking and dimensions of the generated volume, this does not + behave simply. For instance, values over 10 do not work well because + the size of the volume. Moreover, wrapping effects matter (the + outputs are closer to this value if you have no mask). Returns ---------- - multidimensional array, float + 3d array, float Generates the spatial noise volume for these parameters """ @@ -888,9 +1156,9 @@ def Pk2(idxs): # If all the indexes are zero then set the out put to zero if np.all(idxs == 0): return 0.0 - return np.sqrt((np.sqrt(np.sum(idxs ** 2))) ** sigma) + return np.sqrt(np.sqrt(np.sum(idxs ** 2)) ** (-1 * spatial_sigma)) - noise = np.fft.fft2(np.random.normal(size=dimensions)) + noise = np.fft.fftn(np.random.normal(size=dimensions)) amplitude = np.zeros(dimensions) for x, fft_x in enumerate(fftIndgen(dimensions[0])): @@ -899,94 +1167,195 @@ def Pk2(idxs): amplitude[x, y, z] = Pk2(np.array([fft_x, fft_y, fft_z])) # The output - noise_spatial = np.fft.ifft2(noise * amplitude) + noise_spatial = np.fft.ifftn(noise * amplitude) - return noise_spatial.real + # Mask or not, then z score + if mask is not None: + # Mask the output + noise_spatial = noise_spatial.real * mask -def generate_noise(dimensions, - stimfunction, - tr_duration, - noise_strength=[1]): - """ Generate the noise to be added to the signal + # Z score the specific to the brain + noise_spatial[mask > 0] = stats.zscore(noise_spatial[mask > 0]) + else: + noise_spatial = stats.zscore(noise_spatial.real) + + return noise_spatial + + +def _generate_noise_temporal(stimfunction, + tr_duration, + dimensions, + mask, + spatial_sigma, + motion_sigma, + drift_sigma, + auto_reg_sigma, + physiological_sigma, + ): + """Generate the signal dependent noise + + This noise depends on things like the signal or the timing of the + experiment. Parameters ---------- - dimensions : tuple - What is the shape of the volume to be generated - stimfunction : Iterable, bool - When do the stimuli events occur + stimfunction : 1 Dimensional array + This is the timecourse of the stimuli in this experiment - tr_duration : float - What is the duration, in seconds, of each TR? + tr_duration : int + How long is a TR, in seconds + + motion_sigma : float + + How much noise is left over after pre-processing has been + done. This is noise specifically on the task events - noise_strength : list, float - Either a one element list or a list of length 3 which varies - the size of the noise for temporal noise, spatial noise and - system noise + drift_sigma : float + + What is the sigma on the distribution that coefficients are + randomly sampled from + + auto_reg_sigma : float, list + How large is the sigma on the autocorrelation. Higher means more + variable over time. If there are multiple entries then this is + inferred as higher orders of the autoregression + + physiological_sigma : float + + How variable is the signal as a result of physiology, + like heart beat and breathing Returns ---------- + one dimensional array, float + Generates the temporal noise timecourse for these parameters - multidimensional array, float - Generates the noise volume for these parameters + """ - """ + # Set up common parameters + # How many TRs are there + trs = int(len(stimfunction) / (tr_duration * 1000)) - # Duplicate the noise strength if only one is supplied - if len(noise_strength) == 1: - noise_strength = noise_strength * 3 + # What time points are sampled by a TR? + timepoints = list(range(0, trs * tr_duration))[::tr_duration] - # What are the dimensions of the volume, including time - dimensions_tr = (dimensions[0], - dimensions[1], - dimensions[2], - len(stimfunction)) + noise_drift = _generate_noise_temporal_drift(trs, + tr_duration, + ) - # Generate the noise - noise_temporal = _generate_noise_temporal(stimfunction=stimfunction, - tr_duration=tr_duration, - ) * noise_strength[0] + noise_phys = _generate_noise_temporal_phys(timepoints, + ) - noise_spatial = _generate_noise_spatial(dimensions=dimensions, - ) * noise_strength[1] + noise_autoregression = _generate_noise_temporal_autoregression(timepoints, + ) - noise_system = _generate_noise_system(dimensions=dimensions_tr, - ) * noise_strength[2] + # Generate the volumes that will differ depending on the type of noise + # that it will be used for + volume_drift = _generate_noise_spatial(dimensions=dimensions, + spatial_sigma=spatial_sigma, + ) - # Find the outer product and add white noise - noise = np.multiply.outer(noise_spatial, noise_temporal) + noise_system + volume_phys = _generate_noise_spatial(dimensions=dimensions, + mask=mask, + spatial_sigma=spatial_sigma, + ) - return noise + volume_autoreg = _generate_noise_spatial(dimensions=dimensions, + mask=mask, + spatial_sigma=spatial_sigma, + ) + + # Multiply the noise by the spatial volume + noise_drift_volume = np.multiply.outer(volume_drift, noise_drift) + noise_phys_volume = np.multiply.outer(volume_phys, noise_phys) + noise_autoregression_volume = np.multiply.outer(volume_autoreg, + noise_autoregression) + + # Sum the noise (it would have been nice to just add all of them in a + # single line but this was causing formatting problems) + noise_temporal = noise_drift_volume * drift_sigma + noise_temporal = noise_temporal + (noise_phys_volume * physiological_sigma) + noise_temporal = noise_temporal + (noise_autoregression_volume * + auto_reg_sigma) + + # Only do this if you are making motion variance + if motion_sigma != 0 and np.sum(stimfunction) > 0: + # Make each noise type + stimfunction_tr = stimfunction[::(tr_duration * 1000)] + noise_task = _generate_noise_temporal_task(stimfunction_tr, + ) + volume_task = _generate_noise_spatial(dimensions=dimensions, + mask=mask, + spatial_sigma=spatial_sigma, + ) + noise_task_volume = np.multiply.outer(volume_task, noise_task) + noise_temporal = noise_temporal + (noise_task_volume * motion_sigma) + + return noise_temporal def mask_brain(volume, - mask_name=None): + mask_name=None, + mask_threshold=0.1, + mask_self=0, + ): """ Mask the simulated volume + This takes in a volume and will output the masked volume. if a one + dimensional array is supplied then the output will be a volume of the + dimensions specified in the array. The mask can be created from the + volume by averaging it. All masks will be bounded to the range of 0 to 1. Parameters ---------- volume : multidimensional array - The volume that has been simulated + Either numpy array of the volume that has been simulated or a tuple + describing the dimensions of the mask to be created mask_name : str - What is the path to the mask to be loaded? + What is the path to the mask to be loaded? If empty then it defaults + to an MNI152 grey matter mask. + + mask_threshold : float + What is the threshold (0 -> 1) for including a voxel in the mask? + + mask_self : bool + If set to 1 then it makes a mask from the volume supplied (by + averaging across time points and changing the range). Returns ---------- - brain : multidimensional array, float + mask : multidimensional array, float The masked brain """ + # If the volume supplied is a 1d array then output a volume of the + # supplied dimensions + if len(volume.shape) == 1: + volume = np.ones(volume) + # Load in the mask if mask_name is None: mask_raw = np.load(resource_stream(__name__, "grey_matter_mask.npy")) else: mask_raw = np.load(mask_name) + # Is the mask based on the volume + if mask_self is 1: + mask_raw = np.zeros([volume.shape[0], volume.shape[1], volume.shape[ + 2], 1]) + + if len(volume.shape) == 4: + mask_raw[:, :, :, 0] = np.mean(volume, 3) + else: + mask_raw[:, :, :, 0] = np.array(volume) + + # Make sure the mask values range from 0 to 1 + mask_raw = mask_raw / mask_raw.max() + # If there is only one brain volume then make this a forth dimension if len(volume.shape) == 3: temp = np.zeros([volume.shape[0], volume.shape[1], volume.shape[2], 1]) @@ -1006,18 +1375,155 @@ def mask_brain(volume, # You might get a warning but ignore it mask = ndimage.zoom(mask_raw, zoom_factor, order=2) - # Anything that is in the bottom 1% (presumably the exterior of the - # brain, make zero) - mask[mask < 0.01] = 0 + # Any proportion that is below threshold (presumably the exterior of the + # brain), make zero + mask[mask < mask_threshold] = 0 + + # create the mask in 4d + mask = np.ones(volume.shape) * mask + + return mask - # Mask out the non grey matter regions - brain = volume * mask - return brain +def _noise_dict_update(noise_dict): + """ + Update the noise dictionary parameters, in case any were missing + + Parameters + ---------- + noise_dict : dict + + A dictionary specifying the types of noise in this experiment + + Returns + ------- + Updated dictionary + + """ + + # Check what noise is in the dictionary and add if necessary. Numbers + # determine relative proportion of noise + + if 'overall' not in noise_dict: + noise_dict['overall'] = 0.1 + if 'motion_sigma' not in noise_dict: + noise_dict['motion_sigma'] = 0.1 + if 'drift_sigma' not in noise_dict: + noise_dict['drift_sigma'] = 0.5 + if 'auto_reg_sigma' not in noise_dict: + noise_dict['auto_reg_sigma'] = 1 + if 'physiological_sigma' not in noise_dict: + noise_dict['physiological_sigma'] = 0.1 + if 'system_sigma' not in noise_dict: + noise_dict['system_sigma'] = 1 + if 'snr' not in noise_dict: + noise_dict['snr'] = 30 + if 'spatial_sigma' not in noise_dict: + if noise_dict['overall'] == 0: + noise_dict['spatial_sigma'] = 0.015 + else: + noise_dict['spatial_sigma'] = 0.015 / noise_dict['overall'] + + # Print the mixing of the noise + total_var = noise_dict['motion_sigma'] ** 2 + total_var += noise_dict['drift_sigma'] ** 2 + total_var += noise_dict['auto_reg_sigma'] ** 2 + total_var += noise_dict['physiological_sigma'] ** 2 + total_var += noise_dict['system_sigma'] ** 2 + + return noise_dict + + +def generate_noise(dimensions, + stimfunction, + tr_duration, + mask=None, + noise_dict=None, + ): + """ Generate the noise to be added to the signal. + Default noise parameters will create a noise volume with a standard + deviation of 0.1 (where the signal defaults to a value of 1). This has + built into estimates of how different types of noise mix. All noise + values can be set by the user + + Parameters + ---------- + dimensions : n length array, int + What is the shape of the volume to be generated + + stimfunction : Iterable, bool + When do the stimuli events occur + + tr_duration : float + What is the duration, in seconds, of each TR? + + mask : 4d array, float + The mask of the brain volume, using + + noise_dict : dictionary, float + This is a dictionary that must contain the key: overall. If there + are no other variables provided then it will default values + + Returns + ---------- + + multidimensional array, float + Generates the noise volume for these parameters + + """ + + # Change to be an empty dictionary if it is None + if noise_dict is None: + noise_dict = {} + + # Take in the noise dictionary and determine whether + noise_dict = _noise_dict_update(noise_dict) + + # What are the dimensions of the volume, including time + dimensions_tr = (dimensions[0], + dimensions[1], + dimensions[2], + int(len(stimfunction) / (tr_duration * 1000))) + + # Get the mask of the brain and set it to be 3d + if mask is None: + mask = np.ones(dimensions_tr) + + # Generate the noise + noise_temporal = _generate_noise_temporal(stimfunction=stimfunction, + tr_duration=tr_duration, + dimensions=dimensions, + mask=mask[:, :, :, 0], + spatial_sigma=noise_dict[ + 'spatial_sigma'], + motion_sigma=noise_dict[ + 'motion_sigma'], + drift_sigma=noise_dict[ + 'drift_sigma'], + auto_reg_sigma=noise_dict[ + 'auto_reg_sigma'], + physiological_sigma=noise_dict[ + 'physiological_sigma'], + ) + + noise_system = (_generate_noise_system(dimensions_tr=dimensions_tr) + * noise_dict['system_sigma']) + + # Find the outer product for the brain and nonbrain and add white noise + noise = noise_temporal + noise_system + + # Z score the volume and multiply it by the overall noise + noise = stats.zscore(noise) * noise_dict['overall'] + + # Set the SNR of the brain + noise = noise + (mask * noise_dict['snr'] * noise_dict['overall']) + + return noise def plot_brain(fig, brain, + mask=None, percentile=99, ): """ Display the brain that has been generated with a given threshold @@ -1029,8 +1535,11 @@ def plot_brain(fig, The figure to be displayed, generated from matplotlib. import matplotlib.pyplot as plt; fig = plt.figure() - brain : multidimensional array - This is a 3d or 4d array with the neural data + brain : 3d array + This is a 3d array with the neural data + + mask : 3d array + A binary mask describing the location that you want to specify as percentile : float What percentage of voxels will be included? Based on the values @@ -1052,8 +1561,6 @@ def plot_brain(fig, # How many voxels exceed a threshold brain_threshold = np.where(np.abs(brain) > threshold) - brain_all = np.where(np.abs(brain) > 0) - # Clear the way ax.clear() @@ -1061,14 +1568,18 @@ def plot_brain(fig, ax.set_ylim(0, brain.shape[1]) ax.set_zlim(0, brain.shape[2]) - ax.scatter(brain_all[0], - brain_all[1], - brain_all[2], - zdir='z', - c='black', - s=10, - alpha=0.01) - + # If a mask is provided then plot this + if mask is not None: + mask_threshold = np.where(np.abs(mask) > 0) + ax.scatter(mask_threshold[0], + mask_threshold[1], + mask_threshold[2], + zdir='z', + c='black', + s=10, + alpha=0.01) + + # Plot the volume ax.scatter(brain_threshold[0], brain_threshold[1], brain_threshold[2], diff --git a/examples/utils/fmrisim_example.py b/examples/utils/fmrisim_example.py index b1430f05f..74d60ed5b 100644 --- a/examples/utils/fmrisim_example.py +++ b/examples/utils/fmrisim_example.py @@ -20,23 +20,23 @@ Authors: Cameron Ellis (Princeton) 2016 """ import logging - import numpy as np from brainiak.utils import fmrisim as sim import matplotlib.pyplot as plt -from mpl_toolkits.mplot3d import Axes3D # noqa: F401 +from mpl_toolkits.mplot3d import Axes3D # noqa: F401 +import nibabel logger = logging.getLogger(__name__) # Inputs for generate_signal -dimensions = np.array([64, 64, 36]) # What is the size of the brain +dimensions = np.array([64, 64, 36]) # What is the size of the brain feature_size = [9, 4, 9, 9] feature_type = ['loop', 'cube', 'cavity', 'sphere'] -feature_coordinates_A = np.array( +coordinates_A = np.array( [[32, 32, 18], [26, 32, 18], [32, 26, 18], [32, 32, 12]]) -feature_coordinates_B = np.array( +coordinates_B = np.array( [[32, 32, 18], [38, 32, 18], [32, 38, 18], [32, 32, 24]]) -signal_magnitude = [30, 30, 30, 30] +signal_magnitude = [1, 0.5, 0.25, -1] # Inputs for generate_stimfunction @@ -46,16 +46,19 @@ tr_duration = 2 duration = 100 +# Specify a name to save this generated volume. +savename = 'examples/utils/example.nii' + # Generate a volume representing the location and quality of the signal volume_static_A = sim.generate_signal(dimensions=dimensions, - feature_coordinates=feature_coordinates_A, + feature_coordinates=coordinates_A, feature_type=feature_type, feature_size=feature_size, signal_magnitude=signal_magnitude, ) volume_static_B = sim.generate_signal(dimensions=dimensions, - feature_coordinates=feature_coordinates_B, + feature_coordinates=coordinates_B, feature_type=feature_type, feature_size=feature_size, signal_magnitude=signal_magnitude, @@ -72,20 +75,20 @@ stimfunction_A = sim.generate_stimfunction(onsets=onsets_A, event_durations=event_durations, total_time=duration, - tr_duration=tr_duration, ) stimfunction_B = sim.generate_stimfunction(onsets=onsets_B, event_durations=event_durations, total_time=duration, - tr_duration=tr_duration, ) # Convolve the HRF with the stimulus sequence signal_function_A = sim.double_gamma_hrf(stimfunction=stimfunction_A, + tr_duration=tr_duration, ) signal_function_B = sim.double_gamma_hrf(stimfunction=stimfunction_B, + tr_duration=tr_duration, ) # Multiply the HRF timecourse with the signal @@ -103,17 +106,22 @@ # Combine the stim functions stimfunction = list(np.add(stimfunction_A, stimfunction_B)) +# Generate the mask of the signal +mask = sim.mask_brain(signal) + +# Mask the signal to the shape of a brain (attenuates signal according to grey +# matter likelihood) +signal *= mask + # Create the noise volumes (using the default parameters noise = sim.generate_noise(dimensions=dimensions, stimfunction=stimfunction, tr_duration=tr_duration, + mask=mask, ) # Combine the signal and the noise -volume = signal + noise - -# Mask the volume to be the same shape as a brain -brain = sim.mask_brain(volume) +brain = signal + noise # Display the brain fig = plt.figure() @@ -122,8 +130,47 @@ # Get the axis to be plotted ax = sim.plot_brain(fig, brain[:, :, :, tr_counter], + mask=mask[:, :, :, tr_counter], percentile=99.9) # Wait for an input logging.info(tr_counter) - plt.pause(0.5) \ No newline at end of file + plt.pause(0.5) + +# Save the volume +affine_matrix = np.diag([-1, 1, 1, 1]) # LR gets flipped +brain_nifti = nibabel.Nifti1Image(brain, affine_matrix) # Create a nifti brain +nibabel.save(brain_nifti, savename) + +# Load in the test dataset and generate a random volume based on it + +# Pull out the data and associated data +volume = nibabel.load(savename).get_data() +dimensions = volume.shape[0:3] +tr_duration = tr_duration # Can't be pulled out of nibabel +stimfunction = sim.generate_stimfunction(onsets=[], + event_durations=[0], + total_time=volume.shape[3] * + tr_duration, + ) + +# Calculate the mask +mask = sim.mask_brain(volume=volume, + mask_name='self', + ) + +# Calculate the noise parameters +noise_dict = sim.calc_noise(volume=volume, + mask=mask, + ) + +# Create the noise volumes (using the default parameters +noise = sim.generate_noise(dimensions=dimensions, + tr_duration=tr_duration, + stimfunction=stimfunction, + mask=mask, + noise_dict=noise_dict, + ) + +brain_noise = nibabel.Nifti1Image(noise, affine_matrix) # Create a nifti brain +nibabel.save(brain_noise, 'examples/utils/example2.nii') # Save diff --git a/examples/utils/requirements.txt b/examples/utils/requirements.txt index 6ccafc3f9..2951e8bf2 100644 --- a/examples/utils/requirements.txt +++ b/examples/utils/requirements.txt @@ -1 +1,2 @@ matplotlib +nibabel diff --git a/setup.py b/setup.py index d3b219077..0b6aab7bf 100644 --- a/setup.py +++ b/setup.py @@ -111,6 +111,7 @@ def finalize_options(self): ]) + setup( name='brainiak', use_scm_version=True, @@ -123,6 +124,7 @@ def finalize_options(self): install_requires=[ 'cython', 'mpi4py', + 'nitime', 'numpy', 'scikit-learn[alldeps]>=0.18', 'scipy', diff --git a/tests/utils/test_fmrisim.py b/tests/utils/test_fmrisim.py index 7b2821a15..0cd44f287 100644 --- a/tests/utils/test_fmrisim.py +++ b/tests/utils/test_fmrisim.py @@ -26,40 +26,40 @@ def test_generate_signal(): # Inputs for generate_signal - dimensions = np.array([64, 64, 36]) # What is the size of the brain - feature_size = [2] + dimensions = np.array([10, 10, 10]) # What is the size of the brain + feature_size = [3] feature_type = ['cube'] feature_coordinates = np.array( - [[32, 32, 18]]) + [[5, 5, 5]]) signal_magnitude = [30] # Generate a volume representing the location and quality of the signal - volume_static = sim.generate_signal(dimensions=dimensions, - feature_coordinates=feature_coordinates, - feature_type=feature_type, - feature_size=feature_size, - signal_magnitude=signal_magnitude, - ) - - assert np.all(volume_static.shape == dimensions), "Check signal shape" - assert np.max(volume_static) == signal_magnitude, "Check signal magnitude" - assert np.sum(volume_static>0) == math.pow(feature_size[0], 3), "Check " \ - "feature size" - assert volume_static[32,32,18] == signal_magnitude, "Check signal location" - assert volume_static[32,32,10] == 0, "Check noise location" + volume = sim.generate_signal(dimensions=dimensions, + feature_coordinates=feature_coordinates, + feature_type=feature_type, + feature_size=feature_size, + signal_magnitude=signal_magnitude, + ) + + assert np.all(volume.shape == dimensions), "Check signal shape" + assert np.max(volume) == signal_magnitude, "Check signal magnitude" + assert np.sum(volume>0) == math.pow(feature_size[0], 3), "Check feature " \ + "size" + assert volume[5, 5, 5] == signal_magnitude, "Check signal location" + assert volume[5, 5, 1] == 0, "Check noise location" feature_coordinates = np.array( - [[32, 32, 18], [32, 28, 18], [28, 32, 18]]) + [[5, 5, 5], [3, 3, 3], [7, 7, 7]]) - volume_static = sim.generate_signal(dimensions=dimensions, - feature_coordinates=feature_coordinates, - feature_type=['loop', 'cavity', 'sphere'], - feature_size=[9], - signal_magnitude=signal_magnitude, - ) - assert volume_static[32, 32, 18] == 0, "Loop is empty" - assert volume_static[32, 28, 18] == 0, "Cavity is empty" - assert volume_static[28, 32, 18] != 0, "Sphere is not empty" + volume = sim.generate_signal(dimensions=dimensions, + feature_coordinates=feature_coordinates, + feature_type=['loop', 'cavity', 'sphere'], + feature_size=[3], + signal_magnitude=signal_magnitude, + ) + assert volume[5, 5, 5] == 0, "Loop is empty" + assert volume[3, 3, 3] == 0, "Cavity is empty" + assert volume[7, 7, 7] != 0, "Sphere is not empty" def test_generate_stimfunction(): @@ -74,48 +74,48 @@ def test_generate_stimfunction(): stimfunction = sim.generate_stimfunction(onsets=onsets, event_durations=event_durations, total_time=duration, - tr_duration=tr_duration, ) - assert len(stimfunction) == duration / tr_duration, "stimfunction incorrect " \ - "length" - assert np.sum(stimfunction) == np.sum(event_durations * len(onsets)) / \ - tr_duration, "Event number" - + assert len(stimfunction) == duration * 1000, "stimfunction incorrect " \ + "length" + eventNumber = np.sum(event_durations * len(onsets)) * 1000 + assert np.sum(stimfunction) == eventNumber, "Event number" # Create the signal function signal_function = sim.double_gamma_hrf(stimfunction=stimfunction, + tr_duration=tr_duration, ) - assert len(signal_function) == len(stimfunction), "The length did not change" + assert len(signal_function) == len(stimfunction) / (tr_duration * 1000), \ + "The length did not change" onsets = [10] stimfunction = sim.generate_stimfunction(onsets=onsets, event_durations=event_durations, total_time=duration, - tr_duration=tr_duration, ) signal_function = sim.double_gamma_hrf(stimfunction=stimfunction, + tr_duration=tr_duration, ) assert np.sum(signal_function < 0) > 0, "No values below zero" def test_apply_signal(): - dimensions = np.array([64, 64, 36]) # What is the size of the brain + dimensions = np.array([10, 10, 10]) # What is the size of the brain feature_size = [2] feature_type = ['cube'] feature_coordinates = np.array( - [[32, 32, 18]]) + [[5, 5, 5]]) signal_magnitude = [30] # Generate a volume representing the location and quality of the signal - volume_static = sim.generate_signal(dimensions=dimensions, - feature_coordinates=feature_coordinates, - feature_type=feature_type, - feature_size=feature_size, - signal_magnitude=signal_magnitude, - ) + volume = sim.generate_signal(dimensions=dimensions, + feature_coordinates=feature_coordinates, + feature_type=feature_type, + feature_size=feature_size, + signal_magnitude=signal_magnitude, + ) # Inputs for generate_stimfunction onsets = [10, 30, 50, 70, 90] @@ -127,22 +127,23 @@ def test_apply_signal(): stimfunction = sim.generate_stimfunction(onsets=onsets, event_durations=event_durations, total_time=duration, - tr_duration=tr_duration, ) signal_function = sim.double_gamma_hrf(stimfunction=stimfunction, + tr_duration=tr_duration, ) # Convolve the HRF with the stimulus sequence signal = sim.apply_signal(signal_function=signal_function, - volume_static=volume_static, + volume_static=volume, ) assert signal.shape == (dimensions[0], dimensions[1], dimensions[2], - duration / tr_duration), "The output is the wrong size" + duration / tr_duration), "The output is the " \ + "wrong size" signal = sim.apply_signal(signal_function=stimfunction, - volume_static=volume_static, + volume_static=volume, ) assert np.any(signal == signal_magnitude), "The stimfunction is not binary" @@ -150,21 +151,20 @@ def test_apply_signal(): def test_generate_noise(): - - dimensions = np.array([64, 64, 36]) # What is the size of the brain + dimensions = np.array([10, 10, 10]) # What is the size of the brain feature_size = [2] feature_type = ['cube'] feature_coordinates = np.array( - [[32, 32, 18]]) - signal_magnitude = [30] + [[5, 5, 5]]) + signal_magnitude = [1] # Generate a volume representing the location and quality of the signal - volume_static = sim.generate_signal(dimensions=dimensions, - feature_coordinates=feature_coordinates, - feature_type=feature_type, - feature_size=feature_size, - signal_magnitude=signal_magnitude, - ) + volume = sim.generate_signal(dimensions=dimensions, + feature_coordinates=feature_coordinates, + feature_type=feature_type, + feature_size=feature_size, + signal_magnitude=signal_magnitude, + ) # Inputs for generate_stimfunction onsets = [10, 30, 50, 70, 90] @@ -176,41 +176,39 @@ def test_generate_noise(): stimfunction = sim.generate_stimfunction(onsets=onsets, event_durations=event_durations, total_time=duration, - tr_duration=tr_duration, ) signal_function = sim.double_gamma_hrf(stimfunction=stimfunction, + tr_duration=tr_duration, ) # Convolve the HRF with the stimulus sequence signal = sim.apply_signal(signal_function=signal_function, - volume_static=volume_static, + volume_static=volume, ) - # Create the noise volumes (using the default parameters) + # Generate the mask of the signal + mask = sim.mask_brain(signal, mask_threshold=0.1) + assert min(mask[mask > 0]) > 0.1, "Mask thresholding did not work" + + # Create the noise volumes (using the default parameters) noise = sim.generate_noise(dimensions=dimensions, stimfunction=stimfunction, tr_duration=tr_duration, + mask=mask, ) - assert signal.shape == noise.shape, "The dimensions of signal " \ - "and noise the same" - - Z_noise = sim._generate_noise_temporal(stimfunction, tr_duration, 1) - noise = sim._generate_noise_temporal(stimfunction, tr_duration, 0) - - assert np.std(Z_noise) < np.std(noise), "Z scoring is not working" - - # Combine the signal and the noise - volume = signal + noise + assert signal.shape == noise.shape, "The dimensions of signal and noise " \ + "the same" assert np.std(signal) < np.std(noise), "Noise was not created" noise = sim.generate_noise(dimensions=dimensions, stimfunction=stimfunction, tr_duration=tr_duration, - noise_strength=[0, 0, 0] + mask=mask, + noise_dict={'overall': 0}, ) assert np.sum(noise) == 0, "Noise strength could not be manipulated" @@ -220,11 +218,11 @@ def test_generate_noise(): def test_mask_brain(): # Inputs for generate_signal - dimensions = np.array([64, 64, 36]) # What is the size of the brain + dimensions = np.array([10, 10, 10]) # What is the size of the brain feature_size = [2] feature_type = ['cube'] feature_coordinates = np.array( - [[32, 32, 18]]) + [[4, 4, 4]]) signal_magnitude = [30] # Generate a volume representing the location and quality of the signal @@ -236,14 +234,15 @@ def test_mask_brain(): ) # Mask the volume to be the same shape as a brain - brain = sim.mask_brain(volume) + mask = sim.mask_brain(volume)[:,:,:,0] + brain = volume * (mask > 0) assert np.sum(brain != 0) == np.sum(volume != 0), "Masking did not work" - assert brain[0,0,0,0] == 0, "Masking did not work" - assert brain[32,32,18,0] != 0, "Masking did not work" + assert brain[0, 0, 0] == 0, "Masking did not work" + assert brain[4, 4, 4] != 0, "Masking did not work" feature_coordinates = np.array( - [[3, 3, 3]]) + [[1, 1, 1]]) volume = sim.generate_signal(dimensions=dimensions, feature_coordinates=feature_coordinates, @@ -253,6 +252,55 @@ def test_mask_brain(): ) # Mask the volume to be the same shape as a brain - brain = sim.mask_brain(volume) + mask = sim.mask_brain(volume)[:,:,:,0] + brain = volume * (mask > 0) + + assert np.sum(brain != 0) < np.sum(volume != 0), "Masking did not work" + + +def test_calc_noise(): + + # Inputs for functions + onsets = [10, 30, 50, 70, 90] + event_durations = [6] + tr_duration = 2 + duration = 100 + tr_number = int(np.floor(duration / tr_duration)) + dimensions_tr = np.array([10, 10, 10, tr_number]) + + # Preset the noise dict + nd_orig = {'auto_reg_sigma': 1, + 'drift_sigma': 0.5, + 'overall': 0.1, + 'snr': 30, + 'spatial_sigma': 0.15, + 'system_sigma': 1, + } + + # Create the time course for the signal to be generated + stimfunction = sim.generate_stimfunction(onsets=onsets, + event_durations=event_durations, + total_time=duration, + ) + + # Mask the volume to be the same shape as a brain + mask = sim.mask_brain(dimensions_tr) + noise = sim.generate_noise(dimensions=dimensions_tr[0:3], + stimfunction=stimfunction, + tr_duration=tr_duration, + mask=mask, + noise_dict=nd_orig, + ) + + # Calculate the noise + nd_calc = sim.calc_noise(noise, mask) + + assert abs(nd_calc['overall'] - nd_orig['overall']) < 0.1, 'overall ' \ + 'calculated ' \ + 'incorrectly' + + assert abs(nd_calc['snr'] - nd_orig['snr']) < 10, 'snr calculated ' \ + 'incorrectly' - assert np.sum(brain != 0) < np.sum(volume != 0), "Masking did not work" \ No newline at end of file + assert abs(nd_calc['system_sigma'] - nd_orig['system_sigma']) < 1, \ + 'snr calculated incorrectly'