Computational Neuroscience: fMRI Signal Processing and Brain Network Analysis
Advanced computational methods for analyzing fMRI data, including signal processing techniques, connectivity analysis, and machine learning applications in neuroscience research.
Computational Neuroscience: fMRI Signal Processing and Brain Network Analysis
Functional magnetic resonance imaging (fMRI) has revolutionized our understanding of brain function. This comprehensive guide explores the mathematical foundations and computational techniques used to analyze brain signals and networks.
Mathematical Foundations of fMRI Signal Processing
BOLD Signal and Hemodynamic Response
The Blood Oxygen Level Dependent (BOLD) signal follows the hemodynamic response function:
Where:
- , , , , (canonical parameters)
- is the gamma function
Signal-to-Noise Ratio in fMRI
The temporal SNR is defined as:
Where is the mean signal and is the temporal standard deviation.
General Linear Model for fMRI
The GLM framework for fMRI analysis:
Where:
- is the observed BOLD signal
- is the design matrix
- are the regression coefficients
- is the error term
Understanding fMRI signal processing and analysis techniques
fMRI Data Preprocessing Pipeline
Motion Correction and Registration
pythonimport numpy as np import nibabel as nib from nilearn import datasets, image, plotting from nilearn.maskers import NiftiLabelsMasker, NiftiMasker from scipy import ndimage from sklearn.decomposition import PCA, FastICA import matplotlib.pyplot as plt import seaborn as sns class fMRIPreprocessor: def __init__(self, tr=2.0): self.tr = tr # Repetition time self.preprocessing_steps = [] def slice_timing_correction(self, data, slice_order='ascending'): """Apply slice timing correction""" n_slices = data.shape[2] corrected_data = np.zeros_like(data) if slice_order == 'ascending': slice_times = np.linspace(0, self.tr, n_slices, endpoint=False) elif slice_order == 'interleaved': slice_times = np.concatenate([ np.linspace(0, self.tr/2, n_slices//2, endpoint=False), np.linspace(self.tr/2, self.tr, n_slices//2, endpoint=False) ]) # Interpolate to common time grid for t in range(data.shape[3]): for z in range(n_slices): # Simple linear interpolation for demonstration shift = slice_times[z] / self.tr corrected_data[:, :, z, t] = ndimage.shift( data[:, :, z, t], shift, mode='nearest' ) self.preprocessing_steps.append("Slice timing correction") return corrected_data def motion_correction(self, data, reference_volume=0): """Rigid body motion correction""" from scipy.optimize import minimize def cost_function(params, vol1, vol2): """Cost function for registration""" # Simplified rigid body transformation tx, ty, tz, rx, ry, rz = params # Apply transformation (simplified) transformed = ndimage.shift(vol2, [tx, ty, tz]) # Mutual information as cost function hist_2d, _, _ = np.histogram2d( vol1.ravel(), transformed.ravel(), bins=50 ) # Normalize hist_2d = hist_2d / np.sum(hist_2d) # Calculate mutual information marginal_1 = np.sum(hist_2d, axis=1) marginal_2 = np.sum(hist_2d, axis=0) mi = 0 for i in range(len(marginal_1)): for j in range(len(marginal_2)): if hist_2d[i, j] > 0: mi += hist_2d[i, j] * np.log( hist_2d[i, j] / (marginal_1[i] * marginal_2[j]) ) return -mi # Minimize negative MI reference = data[:, :, :, reference_volume] motion_params = [] corrected_data = np.zeros_like(data) corrected_data[:, :, :, reference_volume] = reference for t in range(data.shape[3]): if t == reference_volume: motion_params.append([0, 0, 0, 0, 0, 0]) continue # Optimize registration parameters result = minimize( cost_function, x0=[0, 0, 0, 0, 0, 0], args=(reference, data[:, :, :, t]), method='Powell' ) motion_params.append(result.x) # Apply correction tx, ty, tz = result.x[:3] corrected_data[:, :, :, t] = ndimage.shift( data[:, :, :, t], [tx, ty, tz] ) self.preprocessing_steps.append("Motion correction") return corrected_data, np.array(motion_params) def temporal_filtering(self, data, low_pass=0.1, high_pass=0.01): """Apply temporal filtering""" from scipy.signal import butter, filtfilt nyquist = 0.5 / self.tr low = high_pass / nyquist high = low_pass / nyquist b, a = butter(4, [low, high], btype='band') filtered_data = np.zeros_like(data) for x in range(data.shape[0]): for y in range(data.shape[1]): for z in range(data.shape[2]): filtered_data[x, y, z, :] = filtfilt( b, a, data[x, y, z, :] ) self.preprocessing_steps.append("Temporal filtering") return filtered_data def spatial_smoothing(self, data, fwhm=6.0): """Apply spatial smoothing with Gaussian kernel""" sigma = fwhm / (2 * np.sqrt(2 * np.log(2))) smoothed_data = np.zeros_like(data) for t in range(data.shape[3]): smoothed_data[:, :, :, t] = ndimage.gaussian_filter( data[:, :, :, t], sigma=sigma ) self.preprocessing_steps.append(f"Spatial smoothing (FWHM={fwhm}mm)") return smoothed_data # Example usage preprocessor = fMRIPreprocessor(tr=2.0) print("fMRI Preprocessing Pipeline Initialized") print("Available steps:", preprocessor.preprocessing_steps)
Physiological Noise Correction
pythondef physiological_noise_correction(data, cardiac_rate=60, resp_rate=15): """Remove physiological noise using RETROICOR""" # Simulate physiological signals n_volumes = data.shape[3] time_points = np.arange(n_volumes) * 2.0 # TR = 2s # Cardiac phase cardiac_phase = 2 * np.pi * cardiac_rate * time_points / 60 cardiac_regressors = np.column_stack([ np.cos(cardiac_phase), np.sin(cardiac_phase), np.cos(2 * cardiac_phase), np.sin(2 * cardiac_phase) ]) # Respiratory phase resp_phase = 2 * np.pi * resp_rate * time_points / 60 resp_regressors = np.column_stack([ np.cos(resp_phase), np.sin(resp_phase), np.cos(2 * resp_phase), np.sin(2 * resp_phase) ]) # Combine regressors physio_regressors = np.column_stack([cardiac_regressors, resp_regressors]) # Add constant term design_matrix = np.column_stack([ np.ones(n_volumes), physio_regressors ]) # Remove physiological noise voxel-wise corrected_data = np.zeros_like(data) for x in range(data.shape[0]): for y in range(data.shape[1]): for z in range(data.shape[2]): voxel_ts = data[x, y, z, :] # Fit GLM beta = np.linalg.lstsq(design_matrix, voxel_ts, rcond=None)[0] # Remove physiological components (keep only constant) corrected_ts = voxel_ts - design_matrix[:, 1:] @ beta[1:] corrected_data[x, y, z, :] = corrected_ts return corrected_data, design_matrix # Apply physiological noise correction print("Applying physiological noise correction...") # corrected_data, physio_design = physiological_noise_correction(sample_data)
Brain Network Analysis
Functional Connectivity Analysis
pythonclass BrainNetworkAnalyzer: def __init__(self, atlas='aal'): self.atlas = atlas self.connectivity_matrices = [] def extract_roi_timeseries(self, fmri_data, atlas_img): """Extract ROI time series using atlas""" masker = NiftiLabelsMasker( labels_img=atlas_img, standardize=True, memory='nilearn_cache', verbose=0 ) roi_timeseries = masker.fit_transform(fmri_data) return roi_timeseries, masker def compute_connectivity_matrix(self, roi_timeseries, method='correlation'): """Compute functional connectivity matrix""" n_rois = roi_timeseries.shape[1] if method == 'correlation': connectivity = np.corrcoef(roi_timeseries.T) elif method == 'partial_correlation': from sklearn.covariance import GraphicalLasso # Estimate precision matrix gl = GraphicalLasso(alpha=0.1) gl.fit(roi_timeseries) # Partial correlation = -precision / sqrt(diag) precision = gl.precision_ connectivity = -precision / np.sqrt( np.outer(np.diag(precision), np.diag(precision)) ) np.fill_diagonal(connectivity, 1) elif method == 'mutual_information': from sklearn.feature_selection import mutual_info_regression connectivity = np.zeros((n_rois, n_rois)) for i in range(n_rois): for j in range(i+1, n_rois): mi = mutual_info_regression( roi_timeseries[:, [i]], roi_timeseries[:, j] )[0] connectivity[i, j] = connectivity[j, i] = mi np.fill_diagonal(connectivity, 1) self.connectivity_matrices.append(connectivity) return connectivity def graph_theory_metrics(self, connectivity_matrix, threshold=0.3): """Compute graph theory metrics""" import networkx as nx # Threshold connectivity matrix binary_matrix = (np.abs(connectivity_matrix) > threshold).astype(int) np.fill_diagonal(binary_matrix, 0) # Create NetworkX graph G = nx.from_numpy_array(binary_matrix) metrics = {} # Basic metrics metrics['density'] = nx.density(G) metrics['transitivity'] = nx.transitivity(G) metrics['average_clustering'] = nx.average_clustering(G) # Centrality measures metrics['degree_centrality'] = nx.degree_centrality(G) metrics['betweenness_centrality'] = nx.betweenness_centrality(G) metrics['eigenvector_centrality'] = nx.eigenvector_centrality(G) # Path metrics if nx.is_connected(G): metrics['average_path_length'] = nx.average_shortest_path_length(G) metrics['diameter'] = nx.diameter(G) else: # For disconnected graphs, compute for largest component largest_cc = max(nx.connected_components(G), key=len) subgraph = G.subgraph(largest_cc) metrics['average_path_length'] = nx.average_shortest_path_length(subgraph) metrics['diameter'] = nx.diameter(subgraph) # Small-world metrics # Random graph comparison random_G = nx.erdos_renyi_graph(G.number_of_nodes(), metrics['density']) random_clustering = nx.average_clustering(random_G) random_path_length = nx.average_shortest_path_length(random_G) metrics['small_worldness'] = ( (metrics['average_clustering'] / random_clustering) / (metrics['average_path_length'] / random_path_length) ) return metrics def network_visualization(self, connectivity_matrix, roi_labels=None, threshold=0.3): """Visualize brain network""" import networkx as nx from matplotlib.patches import Circle # Threshold matrix thresholded = connectivity_matrix.copy() thresholded[np.abs(thresholded) < threshold] = 0 # Create graph G = nx.from_numpy_array(thresholded) fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6)) # Plot 1: Connectivity matrix im = ax1.imshow(connectivity_matrix, cmap='RdBu_r', vmin=-1, vmax=1) ax1.set_title('Functional Connectivity Matrix') ax1.set_xlabel('ROI Index') ax1.set_ylabel('ROI Index') plt.colorbar(im, ax=ax1, label='Correlation') # Plot 2: Network graph pos = nx.spring_layout(G, k=1, iterations=50) # Node sizes based on degree node_sizes = [300 * G.degree(node) for node in G.nodes()] # Edge weights for visualization edges = G.edges() weights = [G[u][v]['weight'] for u, v in edges] nx.draw_networkx_nodes(G, pos, node_size=node_sizes, node_color='lightblue', ax=ax2) nx.draw_networkx_edges(G, pos, width=np.abs(weights)*2, alpha=0.6, ax=ax2) if roi_labels is not None: labels = {i: roi_labels[i] for i in range(len(roi_labels))} nx.draw_networkx_labels(G, pos, labels, font_size=8, ax=ax2) ax2.set_title('Brain Network Graph') ax2.axis('off') plt.tight_layout() return fig # Example network analysis analyzer = BrainNetworkAnalyzer() print("Brain Network Analyzer initialized")
Independent Component Analysis
pythondef ica_analysis(fmri_data, n_components=20): """Perform ICA to identify brain networks""" # Reshape data for ICA n_voxels = np.prod(fmri_data.shape[:3]) n_timepoints = fmri_data.shape[3] data_2d = fmri_data.reshape(n_voxels, n_timepoints).T # Remove zero variance voxels voxel_std = np.std(data_2d, axis=0) valid_voxels = voxel_std > 0 data_clean = data_2d[:, valid_voxels] # Apply ICA ica = FastICA(n_components=n_components, random_state=42, max_iter=1000) ica_components = ica.fit_transform(data_clean) ica_mixing = ica.mixing_ # Reconstruct spatial maps spatial_maps = np.zeros((n_voxels, n_components)) spatial_maps[valid_voxels, :] = ica.components_.T # Reshape back to 3D spatial_maps_3d = spatial_maps.reshape( fmri_data.shape[:3] + (n_components,) ) # Identify default mode network (DMN) # Look for component with high values in typical DMN regions # This is simplified - in practice, would use template matching results = { 'spatial_maps': spatial_maps_3d, 'time_courses': ica_components, 'mixing_matrix': ica_mixing, 'components_': ica.components_ } return results def visualize_ica_components(ica_results, component_idx=0): """Visualize ICA components""" spatial_map = ica_results['spatial_maps'][:, :, :, component_idx] time_course = ica_results['time_courses'][:, component_idx] fig, axes = plt.subplots(2, 2, figsize=(12, 10)) # Spatial map - axial slice slice_idx = spatial_map.shape[2] // 2 im1 = axes[0, 0].imshow(spatial_map[:, :, slice_idx], cmap='RdBu_r') axes[0, 0].set_title(f'IC {component_idx} - Axial Slice {slice_idx}') plt.colorbar(im1, ax=axes[0, 0]) # Spatial map - sagittal slice slice_idx = spatial_map.shape[0] // 2 im2 = axes[0, 1].imshow(spatial_map[slice_idx, :, :], cmap='RdBu_r') axes[0, 1].set_title(f'IC {component_idx} - Sagittal Slice {slice_idx}') plt.colorbar(im2, ax=axes[0, 1]) # Time course axes[1, 0].plot(time_course) axes[1, 0].set_title(f'IC {component_idx} - Time Course') axes[1, 0].set_xlabel('Time (volumes)') axes[1, 0].set_ylabel('Signal') # Power spectrum from scipy.signal import periodogram freqs, psd = periodogram(time_course, fs=1/2.0) # TR = 2s axes[1, 1].loglog(freqs, psd) axes[1, 1].set_title(f'IC {component_idx} - Power Spectrum') axes[1, 1].set_xlabel('Frequency (Hz)') axes[1, 1].set_ylabel('Power') plt.tight_layout() return fig print("ICA analysis functions defined")
Machine Learning Applications in Neuroscience
Classification of Brain States
pythonfrom sklearn.ensemble import RandomForestClassifier from sklearn.svm import SVC from sklearn.model_selection import cross_val_score, StratifiedKFold from sklearn.preprocessing import StandardScaler from sklearn.pipeline import Pipeline class BrainStateClassifier: def __init__(self): self.models = { 'random_forest': RandomForestClassifier(n_estimators=100, random_state=42), 'svm': SVC(kernel='rbf', gamma='scale', random_state=42), } self.fitted_models = {} self.feature_importances = {} def extract_features(self, fmri_data, method='connectivity'): """Extract features from fMRI data""" if method == 'connectivity': # Use upper triangle of connectivity matrix as features n_rois = fmri_data.shape[1] connectivity = np.corrcoef(fmri_data.T) # Extract upper triangle (excluding diagonal) triu_indices = np.triu_indices(n_rois, k=1) features = connectivity[triu_indices] elif method == 'spectral': # Power spectral density features from scipy.signal import welch features = [] for roi in range(fmri_data.shape[1]): freqs, psd = welch(fmri_data[:, roi], fs=1/2.0, nperseg=64) features.extend(psd) features = np.array(features) elif method == 'graph_metrics': # Graph theory metrics as features connectivity = np.corrcoef(fmri_data.T) metrics = analyzer.graph_theory_metrics(connectivity) # Convert centrality dicts to arrays features = [] for key, value in metrics.items(): if isinstance(value, dict): features.extend(list(value.values())) else: features.append(value) features = np.array(features) return features def train_classifiers(self, X, y, cv_folds=5): """Train multiple classifiers with cross-validation""" results = {} for name, model in self.models.items(): # Create pipeline with standardization pipeline = Pipeline([ ('scaler', StandardScaler()), ('classifier', model) ]) # Cross-validation cv = StratifiedKFold(n_splits=cv_folds, shuffle=True, random_state=42) cv_scores = cross_val_score(pipeline, X, y, cv=cv, scoring='accuracy') # Fit on full dataset pipeline.fit(X, y) self.fitted_models[name] = pipeline # Extract feature importances (for tree-based models) if hasattr(model, 'feature_importances_'): self.feature_importances[name] = model.feature_importances_ results[name] = { 'cv_scores': cv_scores, 'mean_accuracy': np.mean(cv_scores), 'std_accuracy': np.std(cv_scores) } return results def predict_brain_state(self, fmri_data, model_name='random_forest'): """Predict brain state from new fMRI data""" if model_name not in self.fitted_models: raise ValueError(f"Model {model_name} not trained") features = self.extract_features(fmri_data) prediction = self.fitted_models[model_name].predict(features.reshape(1, -1)) # Get prediction probabilities if available if hasattr(self.fitted_models[model_name].named_steps['classifier'], 'predict_proba'): probabilities = self.fitted_models[model_name].predict_proba( features.reshape(1, -1) )[0] else: probabilities = None return prediction[0], probabilities # Example usage classifier = BrainStateClassifier() # Simulate data for different brain states n_subjects = 50 n_timepoints = 200 n_rois = 90 # Generate synthetic data for different conditions resting_state_data = [] task_state_data = [] labels = [] for subject in range(n_subjects): # Resting state: lower connectivity, more random rest_data = np.random.randn(n_timepoints, n_rois) * 0.5 rest_connectivity = np.random.rand(n_rois, n_rois) * 0.3 rest_data += np.random.multivariate_normal( np.zeros(n_rois), rest_connectivity, n_timepoints ) # Task state: higher connectivity in specific networks task_data = np.random.randn(n_timepoints, n_rois) * 0.3 task_connectivity = np.random.rand(n_rois, n_rois) * 0.5 # Increase connectivity in "task-relevant" regions (first 20 ROIs) task_connectivity[:20, :20] += 0.5 task_data += np.random.multivariate_normal( np.zeros(n_rois), task_connectivity, n_timepoints ) resting_state_data.append(rest_data) task_state_data.append(task_data) # Extract features print("Extracting features for classification...") X = [] y = [] for i in range(n_subjects): # Resting state features rest_features = classifier.extract_features(resting_state_data[i]) X.append(rest_features) y.append(0) # Resting state label # Task state features task_features = classifier.extract_features(task_state_data[i]) X.append(task_features) y.append(1) # Task state label X = np.array(X) y = np.array(y) # Train classifiers print(f"Training classifiers on {len(X)} samples...") classification_results = classifier.train_classifiers(X, y) print("\nClassification Results:") for model_name, results in classification_results.items(): print(f"{model_name}:") print(f" Mean accuracy: {results['mean_accuracy']:.3f} ± {results['std_accuracy']:.3f}")
Dynamic Functional Connectivity
Time-Varying Connectivity Analysis
pythondef sliding_window_connectivity(fmri_data, window_length=50, step_size=1): """Compute time-varying connectivity using sliding windows""" n_timepoints = fmri_data.shape[0] n_rois = fmri_data.shape[1] n_windows = (n_timepoints - window_length) // step_size + 1 dynamic_connectivity = np.zeros((n_windows, n_rois, n_rois)) for w in range(n_windows): start_idx = w * step_size end_idx = start_idx + window_length window_data = fmri_data[start_idx:end_idx, :] connectivity = np.corrcoef(window_data.T) # Handle NaN values connectivity = np.nan_to_num(connectivity) dynamic_connectivity[w, :, :] = connectivity return dynamic_connectivity def connectivity_state_analysis(dynamic_connectivity, n_states=4): """Identify discrete connectivity states using clustering""" from sklearn.cluster import KMeans from sklearn.decomposition import PCA n_windows, n_rois, _ = dynamic_connectivity.shape # Vectorize connectivity matrices (upper triangle) triu_indices = np.triu_indices(n_rois, k=1) connectivity_vectors = np.zeros((n_windows, len(triu_indices[0]))) for w in range(n_windows): connectivity_vectors[w, :] = dynamic_connectivity[w, :, :][triu_indices] # Apply PCA for dimensionality reduction pca = PCA(n_components=min(20, connectivity_vectors.shape[1])) connectivity_pca = pca.fit_transform(connectivity_vectors) # K-means clustering kmeans = KMeans(n_clusters=n_states, random_state=42, n_init=10) state_labels = kmeans.fit_predict(connectivity_pca) # Compute state centroids in original space state_centroids = np.zeros((n_states, n_rois, n_rois)) for state in range(n_states): state_windows = dynamic_connectivity[state_labels == state] state_centroids[state, :, :] = np.mean(state_windows, axis=0) results = { 'state_labels': state_labels, 'state_centroids': state_centroids, 'pca_data': connectivity_pca, 'pca_explained_variance': pca.explained_variance_ratio_, 'kmeans_model': kmeans } return results def visualize_dynamic_connectivity(dynamic_connectivity, state_results, roi_labels=None): """Visualize dynamic connectivity results""" n_states = len(np.unique(state_results['state_labels'])) fig, axes = plt.subplots(2, n_states, figsize=(4*n_states, 8)) # Plot state centroids for state in range(n_states): centroid = state_results['state_centroids'][state, :, :] im = axes[0, state].imshow(centroid, cmap='RdBu_r', vmin=-1, vmax=1) axes[0, state].set_title(f'State {state + 1}') if state == n_states - 1: plt.colorbar(im, ax=axes[0, state]) # Plot state time course state_labels = state_results['state_labels'] time_points = np.arange(len(state_labels)) for state in range(n_states): state_times = time_points[state_labels == state] axes[1, state].scatter(state_times, [state] * len(state_times), alpha=0.6, s=20) axes[1, state].set_ylim(-0.5, n_states - 0.5) axes[1, state].set_xlabel('Time (windows)') if state == 0: axes[1, state].set_ylabel('Connectivity State') plt.tight_layout() return fig # Example dynamic connectivity analysis print("Computing dynamic functional connectivity...") # Use synthetic data for demonstration sample_timeseries = np.random.randn(200, 50) # 200 timepoints, 50 ROIs # Add some time-varying structure for t in range(200): if t < 50: # State 1: high connectivity in first 25 ROIs sample_timeseries[t, :25] += np.random.multivariate_normal( np.zeros(25), np.eye(25) * 0.5 ) elif t < 100: # State 2: high connectivity in last 25 ROIs sample_timeseries[t, 25:] += np.random.multivariate_normal( np.zeros(25), np.eye(25) * 0.5 ) elif t < 150: # State 3: global connectivity sample_timeseries[t, :] += np.random.multivariate_normal( np.zeros(50), np.eye(50) * 0.3 ) # State 4: baseline (no additional structure) # Compute dynamic connectivity dfc = sliding_window_connectivity(sample_timeseries, window_length=30, step_size=5) print(f"Dynamic connectivity shape: {dfc.shape}") # Identify connectivity states state_results = connectivity_state_analysis(dfc, n_states=4) print(f"Identified {len(np.unique(state_results['state_labels']))} connectivity states") # Visualize results dfc_fig = visualize_dynamic_connectivity(dfc, state_results) plt.show()
Advanced techniques for analyzing dynamic brain networks and connectivity
Advanced Signal Processing Techniques
Wavelet Analysis for Non-Stationary Signals
pythonimport pywt from scipy.signal import hilbert def wavelet_coherence_analysis(signal1, signal2, fs=0.5, scales=None): """Compute wavelet coherence between two signals""" if scales is None: scales = np.logspace(0, 3, 50) # Frequency range # Continuous wavelet transform cwt1, _ = pywt.cwt(signal1, scales, 'cmor1.5-1.0', sampling_period=1/fs) cwt2, _ = pywt.cwt(signal2, scales, 'cmor1.5-1.0', sampling_period=1/fs) # Cross-wavelet transform cross_wavelet = cwt1 * np.conj(cwt2) # Compute coherence coherence = np.abs(cross_wavelet)**2 / (np.abs(cwt1)**2 * np.abs(cwt2)**2) # Convert scales to frequencies central_freq = pywt.central_frequency('cmor1.5-1.0') frequencies = central_freq / (scales * (1/fs)) return coherence, frequencies, cross_wavelet def phase_locking_value(signal1, signal2, freq_band=(0.01, 0.1), fs=0.5): """Compute phase locking value between signals""" from scipy.signal import butter, filtfilt # Filter signals in frequency band nyquist = 0.5 * fs low = freq_band[0] / nyquist high = freq_band[1] / nyquist b, a = butter(4, [low, high], btype='band') filtered1 = filtfilt(b, a, signal1) filtered2 = filtfilt(b, a, signal2) # Extract phases using Hilbert transform analytic1 = hilbert(filtered1) analytic2 = hilbert(filtered2) phase1 = np.angle(analytic1) phase2 = np.angle(analytic2) # Phase difference phase_diff = phase1 - phase2 # Phase locking value plv = np.abs(np.mean(np.exp(1j * phase_diff))) return plv, phase_diff # Example wavelet analysis print("Performing wavelet coherence analysis...") # Generate example signals with time-varying coupling t = np.linspace(0, 100, 1000) freq1 = 0.05 # 0.05 Hz freq2 = 0.05 # Time-varying coupling strength coupling_strength = np.exp(-((t - 50)**2) / (2 * 15**2)) # Gaussian window signal1 = np.sin(2 * np.pi * freq1 * t) + 0.5 * np.random.randn(len(t)) signal2 = (coupling_strength * np.sin(2 * np.pi * freq2 * t + np.pi/4) + (1 - coupling_strength) * np.random.randn(len(t)) * 0.5) # Compute wavelet coherence coherence, frequencies, cross_wavelet = wavelet_coherence_analysis(signal1, signal2) print(f"Coherence matrix shape: {coherence.shape}") print(f"Frequency range: {frequencies.min():.4f} - {frequencies.max():.4f} Hz") # Compute PLV plv, phase_diff = phase_locking_value(signal1, signal2, freq_band=(0.04, 0.06)) print(f"Phase Locking Value: {plv:.3f}")
Clinical Applications and Case Studies
Alzheimer's Disease Classification
pythonclass ADClassifier: """Classifier for Alzheimer's Disease using fMRI connectivity""" def __init__(self): self.feature_names = [] self.model = None self.scaler = None def extract_ad_features(self, connectivity_matrix, roi_labels): """Extract AD-relevant features from connectivity matrix""" # Define key brain networks for AD dmn_regions = ['PCC', 'MPFC', 'ANG', 'PHIP'] # Default Mode Network memory_regions = ['HIPP', 'PHIP', 'ENT', 'PREC'] # Memory network executive_regions = ['DLPFC', 'ACC', 'IPL'] # Executive network features = [] feature_names = [] # 1. Within-network connectivity for network_name, regions in [ ('DMN', dmn_regions), ('Memory', memory_regions), ('Executive', executive_regions) ]: network_indices = [i for i, label in enumerate(roi_labels) if any(region in label for region in regions)] if len(network_indices) > 1: network_connectivity = connectivity_matrix[ np.ix_(network_indices, network_indices) ] # Mean within-network connectivity triu_indices = np.triu_indices(len(network_indices), k=1) mean_connectivity = np.mean(network_connectivity[triu_indices]) features.append(mean_connectivity) feature_names.append(f'{network_name}_within_connectivity') # 2. Between-network connectivity dmn_indices = [i for i, label in enumerate(roi_labels) if any(region in label for region in dmn_regions)] exec_indices = [i for i, label in enumerate(roi_labels) if any(region in label for region in executive_regions)] if len(dmn_indices) > 0 and len(exec_indices) > 0: dmn_exec_connectivity = connectivity_matrix[ np.ix_(dmn_indices, exec_indices) ] mean_between = np.mean(dmn_exec_connectivity) features.append(mean_between) feature_names.append('DMN_Executive_between_connectivity') # 3. Global efficiency (graph theory metric) import networkx as nx binary_matrix = (np.abs(connectivity_matrix) > 0.3).astype(int) np.fill_diagonal(binary_matrix, 0) G = nx.from_numpy_array(binary_matrix) if nx.is_connected(G): global_efficiency = nx.global_efficiency(G) else: # Use largest connected component largest_cc = max(nx.connected_components(G), key=len) subgraph = G.subgraph(largest_cc) global_efficiency = nx.global_efficiency(subgraph) features.append(global_efficiency) feature_names.append('global_efficiency') self.feature_names = feature_names return np.array(features) def train_ad_classifier(self, connectivity_matrices, labels, roi_labels): """Train classifier for AD detection""" # Extract features X = [] for conn_matrix in connectivity_matrices: features = self.extract_ad_features(conn_matrix, roi_labels) X.append(features) X = np.array(X) y = np.array(labels) # 0: Healthy Control, 1: AD # Preprocessing self.scaler = StandardScaler() X_scaled = self.scaler.fit_transform(X) # Train classifier from sklearn.ensemble import RandomForestClassifier self.model = RandomForestClassifier( n_estimators=100, random_state=42, class_weight='balanced' ) self.model.fit(X_scaled, y) # Feature importance importance_df = pd.DataFrame({ 'feature': self.feature_names, 'importance': self.model.feature_importances_ }).sort_values('importance', ascending=False) return importance_df def predict_ad(self, connectivity_matrix, roi_labels): """Predict AD probability for new subject""" features = self.extract_ad_features(connectivity_matrix, roi_labels) features_scaled = self.scaler.transform(features.reshape(1, -1)) prediction = self.model.predict(features_scaled)[0] probability = self.model.predict_proba(features_scaled)[0] return prediction, probability # Example AD classification print("Setting up Alzheimer's Disease classification...") # Simulate data for demo n_subjects = 100 n_rois = 90 roi_labels = [f'ROI_{i}' for i in range(n_rois)] # Add some realistic ROI names roi_labels[0] = 'PCC' # Posterior Cingulate Cortex roi_labels[1] = 'MPFC' # Medial Prefrontal Cortex roi_labels[10] = 'HIPP_L' # Left Hippocampus roi_labels[11] = 'HIPP_R' # Right Hippocampus ad_classifier = ADClassifier() # Generate synthetic connectivity matrices healthy_matrices = [] ad_matrices = [] for i in range(n_subjects // 2): # Healthy controls: higher connectivity in DMN healthy_conn = np.random.rand(n_rois, n_rois) * 0.5 healthy_conn[0, 1] = 0.8 # Strong PCC-MPFC connection healthy_conn[1, 0] = 0.8 healthy_matrices.append(healthy_conn) # AD patients: reduced connectivity ad_conn = np.random.rand(n_rois, n_rois) * 0.3 ad_conn[0, 1] = 0.3 # Reduced PCC-MPFC connection ad_conn[1, 0] = 0.3 ad_conn[10, 11] = 0.2 # Reduced hippocampal connectivity ad_conn[11, 10] = 0.2 ad_matrices.append(ad_conn) # Combine data all_matrices = healthy_matrices + ad_matrices all_labels = [0] * (n_subjects // 2) + [1] * (n_subjects // 2) # Train classifier importance_results = ad_classifier.train_ad_classifier( all_matrices, all_labels, roi_labels ) print("Feature Importance for AD Classification:") print(importance_results)
Conclusion
This comprehensive exploration of computational neuroscience demonstrates the power of mathematical modeling and advanced signal processing techniques in understanding brain function. From basic fMRI preprocessing to sophisticated network analysis and machine learning applications, these methods provide crucial insights into neural mechanisms and clinical applications.
Key Takeaways
- Signal Processing Foundations: Understanding the BOLD signal and hemodynamic response is crucial for proper fMRI analysis
- Network Analysis: Graph theory metrics reveal important properties of brain organization
- Machine Learning: Classification and prediction methods enable clinical applications
- Dynamic Connectivity: Time-varying analysis captures the temporal dynamics of brain networks
- Clinical Translation: These methods have direct applications in neurological and psychiatric disorders
The future of computational neuroscience and brain-computer interfaces
Future Directions
- Integration of multimodal neuroimaging (fMRI, EEG, MEG)
- Real-time neurofeedback applications
- Personalized brain stimulation protocols
- Large-scale brain simulation models
- Applications in brain-computer interfaces
For complete code examples and datasets, visit our Computational Neuroscience Repository.
Comments
Related Articles
View all →Astronomical Data Analysis: From Exoplanet Detection to Gravitational Waves
Comprehensive guide to modern astronomical data analysis techniques, including exoplanet detection algorithms, stellar classification using machine learning, and gravitational wave signal processing.
Climate Data Analysis: Modeling Global Temperature Trends with Machine Learning
Comprehensive analysis of global climate data using machine learning techniques, featuring interactive visualizations, time-series forecasting, and statistical modeling of temperature anomalies.
Deep Learning for Medical Image Analysis: A Comprehensive Review
Exploring the latest advances in deep learning techniques for medical image analysis, including CNN architectures, attention mechanisms, and transformer models for radiology, pathology, and clinical diagnosis.
Last updated: 2025-05-17 17:35:55 by linhduongtuan