logo

pyleida.dynamics_metrics

The module 'pyleida.dynamics_metrics' provides functions to compute the metrics from dynamical systems theory

Expand source code
"""The module 'pyleida.dynamics_metrics' provides functions
to compute the metrics from dynamical systems theory"""

from ._dynamics_metrics import (
    compute_dynamics_metrics,
    fractional_occupancy,
    fractional_occupancy_group,
    dwell_times,
    dwell_times_group,
    transition_probabilities,
    transition_probabilities_group,
    group_transition_matrix,
    plot_patterns_k,
)

__all__ = [
    "compute_dynamics_metrics",
    "fractional_occupancy",
    "fractional_occupancy_group",
    "dwell_times",
    "dwell_times_group",
    "transition_probabilities",
    "transition_probabilities_group",
    "group_transition_matrix",
    "plot_patterns_k"
]

Functions

def compute_dynamics_metrics(clusters_labels, TR=None, save_results=False, path=None)

Execute the dynamics analysis using Dynamical Systems theory tools for each k explored: computes the fractional occupancies,dwell times, and transitions probabilities for each subject of each group/condition.

Params:

clusters_labels : pd.dataframe with shape (n_eigenvectors, 2 + range_of_K). Contains the predicted labels of each eigenvector across the K range explored. 1st column contains the subjects ids, and 2nd column contains the class/conditions labels. The remaining columns contain the predictions/labelling for each k explored.

TR : np.float or int. Specify the Repetition Time of the fMRI data.

save_results : bool. Whether to save results. If True, the results are saved in a folder called 'dynamics_metrics'.

path : str. Specify the path in which the 'dynamics_metrics' folder will be created if 'save_results' was set to True.

Returns:

metrics : dict. Contains 'dwell_times' (pd.DataFrame), 'fract_occupancies' (pd.DataFrame), 'transitions' (pd.DataFrame).

Expand source code
def compute_dynamics_metrics(clusters_labels,TR=None,save_results=False,path=None):
    """
    Execute the dynamics analysis using Dynamical Systems
    theory tools for each k explored: computes the fractional
    occupancies,dwell times, and transitions probabilities for 
    each subject of each group/condition.

    Params:
    -------
    clusters_labels : pd.dataframe with shape (n_eigenvectors, 2 + range_of_K).
        Contains the predicted labels of each eigenvector
        across the K range explored. 1st column contains
        the subjects ids, and 2nd column contains the 
        class/conditions labels. The remaining columns contain
        the predictions/labelling for each k explored.  
        
    TR : np.float or int.
        Specify the Repetition Time of the fMRI data.

    save_results : bool. 
        Whether to save results. If True, the results are
        saved in a folder called 'dynamics_metrics'.

    path : str.
        Specify the path in which the 'dynamics_metrics'
        folder will be created if 'save_results' was set
        to True.

    Returns:
    --------
    metrics : dict. 
        Contains 'dwell_times' (pd.DataFrame),
        'fract_occupancies' (pd.DataFrame),
        'transitions' (pd.DataFrame).
    """

    #create folder (if specified) to save the results
    if save_results:
        try:
            results_path = f'{path}/dynamics_metrics'
            if not os.path.exists(results_path): 
                os.makedirs(results_path)
            print(f"-Folder created to save results: './{results_path}'")
        except:
            raise Exception("PROCESS ABORTED: the folder to save the results could't be created.")
    
    #check the names of 1st and 2nd col, and change them to 'subject_id' and 'condition', if different.
    if 'subject_id' not in clusters_labels.columns: 
        clusters_labels.rename(columns={clusters_labels.columns[0]:'subject_id'},inplace=True)
    if 'condition' not in clusters_labels.columns: 
        clusters_labels.rename(columns={clusters_labels.columns[1]:'condition'},inplace=True)
    
    meta = clusters_labels[['subject_id','condition']] #keep only metadata
    Ks = [col for col in clusters_labels.columns if col.startswith('k_')]
    ys = clusters_labels[Ks].values #keep only clusters assignements for each k.

    dwell_times,occupancies,transitions = {},{},{}
    for k_idx,k in enumerate(Ks):
        _ = [int(s) for s in k.replace('_',' ').split() if s.isdigit()]  
        print(f'k = {_[0]}')
        if save_results: 
            k_path = f'{results_path}/{k}'
            if not os.path.exists(k_path):
                os.mkdir(k_path)    
        else:
            k_path = None

        #Compute the metrics
        occupancies[k] = fractional_occupancy_group(
            meta,
            ys[:,k_idx],
            save_results=save_results,
            path=k_path
            )
        dwell_times[k] = dwell_times_group(
            meta,
            ys[:,k_idx],
            TR=TR,
            save_results=save_results,
            path=k_path
            )
        transitions[k] = transition_probabilities_group(
            meta,
            ys[:,k_idx],
            save_results=save_results,
            path=k_path
            )

    print('\n*The metrics were succesfully computed.')

    return {
        'dwell_times':dwell_times,
        'occupancies':occupancies,
        'transitions':transitions,
        }
def dwell_times(labels, TR=None, plot=False)

Computes the dwell time (mean duration) of each phase-locking state. The dwell time represents the mean number of consecutive epochs spent in a particular state throughout the duration of a scan.

Params:

labels : numpy 1D array. Labels of PL states traversed across a scan.

TR : np.float | int | None Specify the Repetition Time of the fMRI data. If None, the dwell times are expressed in volume unit, instead of seconds.

plot : bool. Whether to create a barplot showing the computed dwell time of each phase-locking state.

Expand source code
def dwell_times(labels,TR=None,plot=False):
    """
    Computes the dwell time (mean duration) of each
    phase-locking state. The dwell time represents
    the mean number of consecutive epochs spent in
    a particular state throughout the duration of a scan.

    Params:
    --------
    labels : numpy 1D array.
        Labels of PL states traversed across a scan.

    TR : np.float | int | None
        Specify the Repetition Time of the fMRI data.
        If None, the dwell times are expressed in
        volume unit, instead of seconds.

    plot : bool.
        Whether to create a barplot showing
        the computed dwell time of each phase-locking
        state.
    """
    
    dwell = {}
    for cluster in np.unique(labels): #for each cluster (aka brain state)
        dwell[cluster] = []
        dwell[cluster].append([len(list(g[1])) for g in groupby(labels) if g[0]==cluster]) #get a sequence with the times it appears across time.
    
    #transforming list to array
    for cluster in dwell.keys(): 
        dwell[cluster] = np.concatenate([i for i in dwell[cluster]])
    
    #compute the average lifetime (TRs or seconds) of each state    
    if TR is None:
        mean_dwell = pd.DataFrame({'Cluster':[i for i in dwell.keys()],'Mean_lifetime':[np.mean(dwell[i]) for i in dwell.keys()]})
    else:
        mean_dwell = pd.DataFrame({'Cluster':[i for i in dwell.keys()],'Mean_lifetime':[np.mean(dwell[i])*TR for i in dwell.keys()]})
    
    if plot:
        plt.figure()
        plt.grid()
        sns.barplot(x=[f'State {i+1}' for i in range(len(mean_dwell))],y=mean_dwell.Mean_lifetime)
        if TR is None:
            plt.ylabel('Average lifetime (TRs)',fontsize=15,fontweight='regular') 
        else:
            plt.ylabel('Average lifetime (s)',fontsize=15,fontweight='regular')
        plt.xlabel('State',fontsize=15,fontweight='regular') 
        plt.tight_layout()
        plt.show()
                                          
    return dwell,mean_dwell
def dwell_times_group(metadata, labels, TR=None, save_results=False, path=None)

Compute the dwell times for each subject and optionally save the results.

Params:

metadata : pd.dataframe with shape (N_subjects x N_time_points, 2). Contains the metadata. 1st column contains the subjects ids, and 2nd column contains the class/conditions/sessions labels.

labels : numpy 1D array. Contains the clusters labels of each time point for each subject and condition contained in 'metadata'.

TR : np.float or int. Specify the Repetition Time of the fMRI data.

save_results : bool. Whether to save the computed dwell times in a .csv file, and the plot in a .png file.

path : str. Only provided if save_results=True. Define the path in which we want to save the results.

Returns:

results : pd.dataframe. Contains 'subject_id' in 1st col, 'condition' in 2nd col, and a column for each cluster [brain state] with the values indicating the mean consecutive appareances of each state.

Expand source code
def dwell_times_group(metadata,labels,TR=None,save_results=False,path=None):
    """
    Compute the dwell times for each subject and
    optionally save the results.

    Params:
    -------
    metadata : pd.dataframe with shape (N_subjects x N_time_points, 2).
        Contains the metadata. 1st column contains
        the subjects ids, and 2nd column contains the
        class/conditions/sessions labels.

    labels : numpy 1D array. 
        Contains the clusters labels of each time point
        for each subject and condition contained in 'metadata'.

    TR : np.float or int.
        Specify the Repetition Time of the fMRI data.

    save_results : bool. 
        Whether to save the computed dwell times in a .csv file,
        and the plot in a .png file.

    path : str. 
        Only provided if save_results=True. Define the path
        in which we want to save the results.

    Returns:
    --------
    results : pd.dataframe. 
        Contains 'subject_id' in 1st col, 'condition' in 2nd col,
        and a column for each cluster [brain state] with the
        values indicating the mean consecutive appareances of
        each state.
    """
    if save_results and path is None: 
        raise Exception('You must provide a path to save the results')
    assert metadata.shape[0] == labels.shape[0], \
        "The number of rows in 'metadata' must be the same as in 'labels'"
    
    results = []
    #N_clusters = len(np.unique(labels))
    for cond in np.unique(metadata.condition):
        subs = metadata[metadata.condition==cond].subject_id.values
        for s in np.unique(subs):
            idx = np.logical_and(metadata.subject_id==s,metadata.condition==cond)
            dw = dwell_times(labels[idx],TR=TR,plot=False)[-1]
            dw_ = {
                **{'subject_id':s,'condition':cond},
                **{f'PL_state_{k+1}':v for k,v in zip(dw.Cluster,dw.Mean_lifetime)}
                }
            results.append(dw_)
    
    results = pd.DataFrame(results).fillna(0.0)

    #reorder columns by PL_state (necessary because, when a subject do not traverse
    # a particular state, that state will be located at the end of the dataframe columns)
    N_states = len(results.columns)-2
    states_columns = [f'PL_state_{i+1}' for i in range(N_states)]
    results = results[['subject_id','condition']+states_columns]

    if save_results:
        try: 
            results.to_csv(f'{path}/dwell_times.csv',sep='\t',index=False)
        except:
            print("Warning: 'dwell_times.csv' was not saved in local folder.")

    return results
def fractional_occupancy(cluster_assignement)

Compute the fractional occupancy of each phase-locking state across time in a subject scan (or the complete dataset to determine the occupancies of each state), i.e, the temporal percentage of epochs assigned to a given cluster centroid Vc. Formally, it is defined as the number of time points in which a PL state is active during the scan, divided by the total number of time points in the scan.

Params:

cluster_assignement : ndarray with shape (N_time_points,). Contains the clusters labels for each time point.

Returns:

occupancies : pd.DataFrame with shape (N_clusters, 3). Contains the fractional occupancy of each cluster.

Expand source code
def fractional_occupancy(cluster_assignement):
    """
    Compute the fractional occupancy of each phase-locking state 
    across time in a subject scan (or the complete dataset to 
    determine the occupancies of each state), i.e, the temporal
    percentage of epochs assigned to a given cluster centroid Vc. 
    Formally, it is defined as the number of time points in which a
    PL state is active during the scan, divided by the total number
    of time points in the scan.

    Params:
    -------
    cluster_assignement : ndarray with shape (N_time_points,).
        Contains the clusters labels for each time point.

    Returns:
    ---------
    occupancies : pd.DataFrame with shape (N_clusters, 3).
        Contains the fractional occupancy of each cluster.
    """
    N_time_points = cluster_assignement.shape[0] #number of time points
    occupancies = pd.DataFrame(pd.Series(cluster_assignement).value_counts().reset_index())
    occupancies.columns = ['cluster','n_volumes']
    #clusters_count['occupancy'] = [(i/T)*100 for i in clusters_count.Count]
    occupancies['occupancy'] = [i/N_time_points for i in occupancies.n_volumes]

    return occupancies.sort_values(by='cluster')
def fractional_occupancy_group(metadata, labels, save_results=False, path=None)

Compute the fractional occupancy for each subject in 'metadata', and optionally and save the results.

Params:

metadata : pd.dataframe with shape (N_subjects x N_time_points, 2). Contains the metadata. 1st column contains the subjects ids, and 2nd column contains the class/conditions/sessions labels.

labels : numpy 1D array. Contains the clusters labels of each time point for each subject and condition contained in 'metadata'.

save_results : bool. Whether to save the computed fractional occupancies in a .csv file, and the plot in a .png file.

path : str. Define the path in which we want to save the results (necessary if 'save_results' was set to True).

Returns:

results : pd.dataframe. Contains 'subject_id' in 1st column, 'condition' in 2nd column, and a column for each cluster (i.e., phase-locking state) with the values indicating the fractional occupancy of each state.

Expand source code
def fractional_occupancy_group(metadata,labels,save_results=False,path=None):
    """
    Compute the fractional occupancy for each subject
    in 'metadata', and optionally and save the results.

    Params:
    -------
    metadata : pd.dataframe with shape (N_subjects x N_time_points, 2).
        Contains the metadata. 1st column contains
        the subjects ids, and 2nd column contains
        the class/conditions/sessions labels.

    labels : numpy 1D array. 
        Contains the clusters labels of each time
        point for each subject and condition contained
        in 'metadata'.

    save_results : bool. 
        Whether to save the computed fractional
        occupancies in a .csv file, and the plot
        in a .png file.

    path : str. 
        Define the path in which we want to save
        the results (necessary if 'save_results'
        was set to True).

    Returns:
    --------
    results : pd.dataframe. 
        Contains 'subject_id' in 1st column, 'condition'
        in 2nd column, and a column for each cluster (i.e.,
        phase-locking state) with the values indicating the
        fractional occupancy of each state.
    """
    assert metadata.shape[0] == labels.shape[0], \
        "The number of rows in 'metadata' must be the same as the number of provided 'labels'."
    if save_results and path is None: 
        raise ValueError('You must provide a path to save the results')

    results = []
    #N_clusters = len(np.unique(labels)) #get the number of clusters (brain states)
    for cond in np.unique(metadata.condition):
        subs_ids = metadata[metadata.condition==cond].subject_id.values #get the subject id's that belongs to the current condition.
        for sub in np.unique(subs_ids): #for each subject
            idx = np.logical_and(metadata.subject_id==sub,metadata.condition==cond) #get index of the current subject clusters labels
            occ = fractional_occupancy(labels[idx]) #compute the occupancies for current subject
            occ_ = {
                **{'subject_id':sub,'condition':cond},
                **{f'PL_state_{k+1}':v for k,v in zip(occ.cluster,occ.occupancy)}
                }
            results.append(occ_)
    
    results = pd.DataFrame(results).fillna(0.0) #convert results to dataframe and fill NaNs with 0.0

    #reorder columns by PL_state (necessary because, when a subject do not traverse
    # a particular state, that state will be located at the end of the dataframe columns)
    N_states = len(results.columns)-2
    states_columns = [f'PL_state_{i+1}' for i in range(N_states)]
    results = results[['subject_id','condition']+states_columns]

    #save results
    if save_results:
        try: 
            results.to_csv(f'{path}/occupancies.csv',sep='\t',index=False)
        except:
            print("Warning: 'occupancies.csv' was not saved in local folder.")

    return results
def group_transition_matrix(transitions_df, metric='mean', plot=True, cmap='inferno', darkstyle=False)

Compute and plot the mean or median transition matrix of each group (or a single group).

Params:

transitions_df : pd.DataFrame. Contains the transitions probabilities of each subject and group/condition. Output of the 'transition_probabilities_group' function.

metric : str. Select whether to compute the 'mean' or 'median' transition matrix for each group.

plot : bool. Whether to plot the matrices.

cmap : str. Select the cmap to use in the heatmap/s.

darkstyle : bool. Whether to use a darkstyle for the plot.

Returns:

mats : dict. Contains the groups names as keys, and a numpy 2D array (N_states,N_states) as values.

Expand source code
def group_transition_matrix(transitions_df,metric='mean',plot=True,cmap='inferno',darkstyle=False):
    """
    Compute and plot the mean or median transition matrix
    of each group (or a single group).

    Params:
    -------
    transitions_df : pd.DataFrame.
        Contains the transitions probabilities of each
        subject and group/condition. Output of the
        'transition_probabilities_group' function.

    metric : str.
       Select whether to compute the 'mean' or 'median'
       transition matrix for each group.

    plot : bool.
        Whether to plot the matrices.

    cmap : str.
        Select the cmap to use in the heatmap/s.

    darkstyle : bool.
        Whether to use a darkstyle for the plot.

    Returns:
    --------
    mats : dict. 
        Contains the groups names as keys, and a
        numpy 2D array (N_states,N_states) as values.
    """
    if not isinstance(metric,str):
        raise TypeError("'metric' must be a string!")
    else:
        if metric not in ['mean','median']:
            raise ValueError("Valid options for 'metric' are 'mean' or 'median'.")

    N_clusters = np.sqrt(len(transitions_df.columns[2:])).astype(int) #get the number of clusters/states.
    mats = {}

    conds = np.unique(transitions_df.condition)

    for cond in conds:
        if metric=='mean': 
            current_group_matrix = transitions_df[transitions_df.condition==cond].iloc[:,2:].mean().values.reshape(N_clusters,N_clusters)
        elif metric=='median':
            current_group_matrix = transitions_df[transitions_df.condition==cond].iloc[:,2:].median().values.reshape(N_clusters,N_clusters)
        mats[cond] = current_group_matrix

        if plot:
            #plotting
            with plt.style.context('dark_background' if darkstyle else 'default'):
                plt.figure(figsize=(5.5,4) if N_clusters<10 else (8,6) if 10<N_clusters<15 else (10,8))
                sns.heatmap(
                    current_group_matrix,
                    cmap=cmap,
                    linecolor='black' if not darkstyle else 'white',
                    linewidths=1,
                    annot=True,
                    square=True,
                    cbar_kws={"shrink": .5,"label":"Transition\nprobability"},
                    vmin=0,
                    vmax=1,
                    center=0.5,
                    fmt='.2f'
                    )
                plt.xlabel('To',fontsize=15,fontweight='regular',labelpad=20)
                plt.ylabel('From',fontsize=15,fontweight='regular',labelpad=20)
                plt.yticks(np.arange(0.5,N_clusters+.5,1),[f'State {i+1}' for i in range(N_clusters)],fontweight='regular',rotation='horizontal')
                if N_clusters<=5:
                    plt.xticks(np.arange(0.5,N_clusters+.5,1),[f'State {i+1}' for i in range(N_clusters)],fontweight='regular')
                else:
                    plt.xticks(np.arange(0.5,N_clusters+.5,1),[f'State {i+1}' for i in range(N_clusters)],fontweight='regular',rotation=30,horizontalalignment="right")
                plt.title(cond,fontweight='regular',fontsize=18)
                plt.tight_layout()
                plt.show()

    return mats
def plot_patterns_k(dynamics_metric, type='violin', metric=None, add_points=False, colors=None)

Create either a violinplot, barplot or boxplot showing the corresponding values of each phase-locking pattern of a particular k partition, provided in 'dynamics_metric' for each group/condition/session.

Params:

dynamics_metric : pd.DataFrame.
Contains the values of a dynamical system theory metric for each subject and PL pattern. 1st column contains 'subject_id', 2nd column 'condition', and the rest of the columns the values for each pattern.

type : str. Select the type of plot to create. Options are 'barplot', 'violinplot' or 'boxplot'.

metric : str. Optional. Select the metric whose values are in 'dynamics_metric'. Only used to insert y label.

add_points : str. Whether to show each observation as a point above the created plot. Valid options are False, 'swarm', or 'strip'.

color : list or None. Select the color to represent each group. If None, black and grey are used.

Expand source code
def plot_patterns_k(dynamics_metric,type='violin',metric=None,add_points=False,colors=None):
    """
    Create either a violinplot, barplot or boxplot showing
    the corresponding values of each phase-locking pattern
    of a particular k partition, provided in 'dynamics_metric'
    for each group/condition/session.

    Params:
    -------
    dynamics_metric : pd.DataFrame.  
        Contains the values of a dynamical system theory
        metric for each subject and PL pattern. 
        1st column contains 'subject_id', 2nd column 'condition',
        and the rest of the columns the values for each pattern.

    type : str.
        Select the type of plot to create. Options are 'barplot',
        'violinplot' or 'boxplot'.

    metric : str. Optional.
        Select the metric whose values are in 'dynamics_metric'.
        Only used to insert y label.

    add_points : str.
        Whether to show each observation as a point
        above the created plot.
        Valid options are False, 'swarm', or 'strip'.

    color : list or None.
        Select the color to represent each group.
        If None, black and grey are used.
    """

    options_type = ['violin','boxplot','barplot']
    options_addpoints = ['strip','swarm',False]

    n_groups = np.unique(dynamics_metric.condition).size

    if type not in options_type:
        raise ValueError("Valid type options are 'violin', 'boxplot', or 'barplot'.")
    elif type=='violin' and n_groups>2:
        raise ValueError("'violin' is not available when the number of groups/conditions > 2")

    if add_points not in options_addpoints:
        raise ValueError("Valid options are 'strip', 'swarm', or False")

    if colors is not None:
        if n_groups != len(colors):
            raise ValueError("The number of colors and the number of groups/condition must be the same.")

    data = pd.melt(dynamics_metric,id_vars=['condition'],value_vars=list(dynamics_metric.columns[2:]))
    data['variable'] = [str(i).replace('_',' ') for i in data.variable]
    
    n_patterns = np.unique(data.variable).size

    plt.figure(figsize=(7,4))
    if add_points=='swarm':
        sns.swarmplot(
            data=data,
            x='variable',
            y='value',
            hue = 'condition',
            #jitter=True,
            color='black',
            edgecolor=None,
            dodge = True,
            size=1,
            alpha=.6,
            )

    elif add_points=='strip':
        sns.stripplot(
            data=data,
            x='variable',
            y='value',
            hue = 'condition',
            jitter=True,
            color='black',
            edgecolor=None,
            dodge = True,
            size=1,
            alpha=.6
            )
        
    plt.legend([],[], frameon=False)
    
    if type=='boxplot':
        sns.boxplot(
            data=data,
            x='variable',
            y='value',
            hue='condition',
            palette=['black','grey'] if colors is None else colors,
            )

    elif type=='violin':
        sns.violinplot(
            data=data,
            x='variable',
            y='value',
            hue='condition',
            palette=['black','grey'] if colors is None else colors,
            scale='width',
            inner='box',
            linewidth=0.1,
            split=True,
            cut = 0,
            dodge = False
            )
        
    elif type=='barplot':
        sns.barplot(
            data=data,
            x='variable',
            y='value',
            hue='condition',
            palette=['black','grey'] if colors is None else colors
            )

    plt.xlabel('PL Pattern',fontsize=16,labelpad=15)
    plt.ylabel('' if metric is None else metric,fontsize=16,labelpad=15)
    if n_patterns>5:
        plt.xticks(
            range(n_patterns),
            [f'Pattern {i+1}' for i in range(n_patterns)],
            rotation=30,
            horizontalalignment="right"
            )
    else:
        plt.xticks(range(n_patterns),[f'Pattern {i+1}' for i in range(n_patterns)])
        
    plt.tight_layout()
def transition_probabilities(labels, k, norm=True, plot=False)

Compute the transitions between patterns across time for a single subject. The number of states ('k') defines the matrix shape, ensuring that, if a given subject don't traverse a state of the detected states for the whole group, the matrix will be constructed correctly, respecting the original number of states. If a subject don't traverse a particular state, then the corresponding row will contain all zeros.

Params:

labels : ndarray array with shape (N_time_points,). Contains the KMeans predicted label for each time point.

norm : bool. Whether to normalize the values of the transitions matrix.

plot : bool. Whether to plot the transitions matrix.

Returns:

transitions : ndarray with shape (N_states,N_states).

Expand source code
def transition_probabilities(labels,k,norm=True,plot=False):
    """
    Compute the transitions between patterns across time
    for a single subject. The number of states ('k')
    defines the matrix shape, ensuring that, if a given
    subject don't traverse a state of the detected states
    for the whole group, the matrix will be constructed
    correctly, respecting the original number of states.
    If a subject don't traverse a particular state,
    then the corresponding row will contain all zeros.
    
    Params:
    ------
    labels : ndarray with shape (N_time_points,). 
        Contains the KMeans predicted label
        for each time point.

    norm : bool. 
        Whether to normalize the values of the
        transitions matrix.

    plot : bool. 
        Whether to plot the transitions matrix.
    
    Returns:
    --------
    transitions : ndarray with shape (N_states,N_states).
    """

    N_states = k
    transitions = np.zeros((N_states, N_states)) #create empty matrix 
    N_volumes = labels.size 

    #compute the transitions
    for volume_idx in range(N_volumes-1):
        _from = labels[volume_idx]
        _to = _from + labels[volume_idx+1] - labels[volume_idx]
        transitions[_from,_to]+=1

    if norm:
        np.seterr(invalid='ignore') # hide potential warning when dividing 0/0 = NaN
        transitions = np.divide(
            transitions.astype(np.float_),
            np.sum(transitions,axis=1).reshape(transitions.shape[1],1)
            ) #normalize the values to probabilities

    #replace nan (if any) with 0. Can happen when a subject
    # doesn't traverse a state at all.
    transitions = np.nan_to_num(transitions,copy=True)
        
    if plot:
        plt.figure()
        sns.heatmap(
            transitions,
            annot=True,
            cmap='viridis', 
            square=True,
            linecolor='black',
            linewidths=0.5,
            xticklabels=[f'State {i+1}' for i in range(N_states)],
            yticklabels=[f'State {i+1}' for i in range(N_states)],
            cbar_kws={"shrink": 0.5}
            )
        plt.yticks(rotation='horizontal')
        plt.xlabel('To',fontsize=15,fontweight='regular')
        plt.ylabel('From',fontsize=15,fontweight='regular')
        plt.tight_layout()

    return transitions
def transition_probabilities_group(metadata, labels, save_results=False, path=None)

Compute the transition probabilities between phase-locking states for each subject in 'metadata'.

Params:

metadata : pd.dataframe (N_subjects x N_time_points, 2). Contains the metadata. 1st column contains the subjects ids, and 2nd column contains the class/conditions/sessions labels.

labels : ndarray with shape (N_samples,). Contains the clusters labels of each time point for each subject and condition contained in 'metadata'.

save_results: bool. Whether to save the computed transition probabilities in a .csv file.

path : str. Only provided if save_results=True. Define the path in which we want to save the results.

Returns:

results: pd.dataframe. Contains 'subject_id' in 1st col, 'condition' in 2nd col, and a column for each transition [e.g. From_1_to_2,From_1_to_3 …]

Expand source code
def transition_probabilities_group(metadata,labels,save_results=False,path=None):
    """
    Compute the transition probabilities between
    phase-locking states for each subject in 'metadata'.

    Params:
    -------
    metadata : pd.dataframe (N_subjects x N_time_points, 2).
        Contains the metadata. 1st column contains
        the subjects ids, and 2nd column contains
        the class/conditions/sessions labels.

    labels : ndarray with shape (N_samples,). 
        Contains the clusters labels of each time
        point for each subject and condition contained
        in 'metadata'.

    save_results: bool. 
        Whether to save the computed transition
        probabilities in a .csv file.

    path : str. 
        Only provided if save_results=True. Define
        the path in which we want to save the results.

    Returns:
    --------
    results: pd.dataframe. 
        Contains 'subject_id' in 1st col, 'condition' in 2nd col,
        and a column for each transition [e.g. From_1_to_2,From_1_to_3 ...]
    """
    assert metadata.shape[0] == labels.shape[0], \
        "The number of rows in 'metadata' must be the same as in 'labels'"
    if save_results and path is None: 
        raise Exception('You must provide a path to save the results')

    results = []
    subjects_list,cond_list = [],[]
    N_clusters = len(np.unique(labels))
    for cond in np.unique(metadata.condition):
        subs_ids = metadata[metadata.condition==cond].subject_id.values #get an array with the subjects ids that belongs to a given condition.
        for subject in np.unique(subs_ids):
            idx = np.logical_and(metadata.subject_id==subject,metadata.condition==cond)
            tr = transition_probabilities(labels[idx],k=N_clusters,norm=True,plot=False)
            results.append(np.ravel(tr)) #vectorize the transitions matrix.
            subjects_list.append(subject)
            cond_list.append(cond)

    results = pd.DataFrame(np.vstack(results))
    columns_names = []
    for i in range(N_clusters):
        for j in range(N_clusters):
            columns_names.append(f'From_{i+1}_to_{j+1}')
    
    results.columns = columns_names
    results.insert(0,'subject_id',subjects_list)
    results.insert(1,'condition',cond_list)

    if save_results: 
        try:
            results.to_csv(f'{path}/transitions_probabilities.csv',sep='\t',index=False)
        except:
            print("The results could't be saved in .csv file. Please check the provided path.")

    return results