Advanced regression models for association analysis with individual level data#


This notebook performs various advanced statistical analysis on multiple xQTL in a given region. Current procedures implemented include:

  1. Univariate analysis

    • SuSiE

    • Univeriate TWAS weights: LASSO, Elastic net, mr.ash, SuSiE, Bayes alphabet soup

    • Cross validation of TWAS methods (optional but highly recommended if TWAS weights are computed)

  2. Functional data (epigenomic xQTL) analysis

    • fSuSiE

  3. Multivariate analysis

    • mvSuSiE

    • Multivariate TWAS weights: mvSuSiE and mr.mash


  1. A list of regions to be analyzed (optional); the last column of this file should be region name.

  2. Either a list of per chromosome genotype files, or one file for genotype data of the entire genome. Genotype data has to be in PLINK bed format.

  3. Vector of lists of phenotype files per region to be analyzed, in UCSC bed.gz with index in bed.gz.tbi formats.

  4. Vector of covariate files corresponding to the lists above.

  5. Customized association windows file for variants (cis or trans). If it is not provided, a fixed sized window will be used around the region (a cis-window)

  6. Optionally a vector of names of the phenotypic conditions in the form of cond1 cond2 cond3 separated with whitespace.

Input 2 and 3 should be outputs from genotype_per_region and annotate_coord modules in previous preprocessing steps. 4 should be output of covariate_preprocessing pipeline that contains genotype PC, phenotypic hidden confounders and fixed covariates.

Example genotype data#

#chr        path
chr21 /mnt/mfs/statgen/xqtl_workflow_testing/protocol_example.genotype.chr21.bed
chr22 /mnt/mfs/statgen/xqtl_workflow_testing/protocol_example.genotype.chr22.bed

Alternatively, simply use protocol_example.genotype.chr21_22.bed if all chromosomes are in the same file.

Example phenotype list#

#chr    start   end ID  path
chr12   752578  752579  ENSG00000060237  /home/gw/GIT/github/fungen-xqtl-analysis/analysis/Wang_Columbia/ROSMAP/MWE/output/phenotype/protocol_example.protein.bed.gz
chr12   990508  990509  ENSG00000082805  /home/gw/GIT/github/fungen-xqtl-analysis/analysis/Wang_Columbia/ROSMAP/MWE/output/phenotype/protocol_example.protein.bed.gz
chr12   2794969 2794970 ENSG00000004478  /home/gw/GIT/github/fungen-xqtl-analysis/analysis/Wang_Columbia/ROSMAP/MWE/output/phenotype/protocol_example.protein.bed.gz
chr12   4649113 4649114 ENSG00000139180  /home/gw/GIT/github/fungen-xqtl-analysis/analysis/Wang_Columbia/ROSMAP/MWE/output/phenotype/protocol_example.protein.bed.gz
chr12   6124769 6124770 ENSG00000110799  /home/gw/GIT/github/fungen-xqtl-analysis/analysis/Wang_Columbia/ROSMAP/MWE/output/phenotype/protocol_example.protein.bed.gz
chr12   6534516 6534517 ENSG00000111640  /home/gw/GIT/github/fungen-xqtl-analysis/analysis/Wang_Columbia/ROSMAP/MWE/output/phenotype/protocol_example.protein.bed.gz

Example association-window file#

It should have strictly 4 columns, with the header a commented out line:

#chr    start    end    gene_id
chr10   0    6480000    ENSG00000008128
chr1    0    6480000    ENSG00000008130
chr1    0    6480000    ENSG00000067606
chr1    0    7101193    ENSG00000069424
chr1    0    7960000    ENSG00000069812
chr1    0    6480000    ENSG00000078369
chr1    0    6480000    ENSG00000078808

The key is that the 4th column ID should match with the 4th column ID in the phenotype list. Otherwise the association-window to analyze will not be found.

About indels#

Option --no-indel will remove indel from analysis. FIXME: Gao need to provide more guidelines how to deal with indels in practice.


For each analysis region, the output is SuSiE model fitted and saved in RDS format.

Minimal Working Example Steps#

i. SuSiE with TWAS weights from multiple methods#

Timing [FIXME]

Below we duplicate the examples for phenotype and covariates to demonstrate that when there are multiple phenotypes for the same genotype it is possible to use this pipeline to analyze all of them (more than two is accepted as well).

Here using --region-name we focus the analysis on 3 genes. In practice if this parameter is dropped, the union of all regions in all phenotype region lists will be analyzed. It is possible for some of the regions there are no genotype data, in which case the pipeline will output RDS files with a warning message to indicate the lack of genotype data to analyze.

Note: Suggested output naming convention is cohort_modality, eg ROSMAP_snRNA_pseudobulk.

sos run pipeline/mnm_regression.ipynb susie_twas  \
    --name protocol_example_protein  \
    --genoFile input/xqtl_association/protocol_example.genotype.chr21_22.bed   \
    --phenoFile output/phenotype/protocol_example.protein.region_list.txt \
                output/phenotype/protocol_example.protein.region_list.txt \
    --covFile output/covariate/protocol_example.protein.protocol_example.samples.protocol_example.genotype.chr21_22.pQTL.plink_qc.prune.pca.Marchenko_PC.gz \
              output/covariate/protocol_example.protein.protocol_example.samples.protocol_example.genotype.chr21_22.pQTL.plink_qc.prune.pca.Marchenko_PC.gz  \
    --customized-association-windows input/xqtl_association/protocol_example.protein.enhanced_cis_chr21_chr22.bed \
    --region-name ENSG00000241973_P42356 ENSG00000160209_O00764 ENSG00000100412_Q99798 \
    --phenotype-names trait_A trait_B \
    --container oras://

It is also possible to analyze a selected list of regions using option --region-list. The last column of this file will be used for the list to analyze. Here for example use the same list of regions as we used for customized association-window:

sos run xqtl-protocol/pipeline/mnm_regression.ipynb susie_twas  \
    --name protocol_example_protein  \
    --genoFile xqtl_association/protocol_example.genotype.chr21_22.bed   \
    --phenoFile output/phenotype/protocol_example.protein.region_list.txt \
                output/phenotype/protocol_example.protein.region_list.txt \
    --covFile output/covariate/protocol_example.protein.protocol_example.samples.protocol_example.genotype.chr21_22.pQTL.plink_qc.prune.pca.Marchenko_PC.gz \
              output/covariate/protocol_example.protein.protocol_example.samples.protocol_example.genotype.chr21_22.pQTL.plink_qc.prune.pca.Marchenko_PC.gz  \
    --customized-association-windows xqtl_association/protocol_example.protein.enhanced_cis_chr21_chr22.bed \
    --region-list xqtl_association/protocol_example.protein.enhanced_cis_chr21_chr22.bed \
    --phenotype-names trait_A trait_B \
    --container oras://

Note: When both --region-name and --region-list are used, the union of regions from these parameters will be analyzed.

FIXME: We should probably just explain these parameters, will work better for conversion script

To perform fine-mapping only without TWAS weights,

sos run pipeline/mnm_regression.ipynb susie_twas --no-twas-weights ... # rest of parameters the same. 

To perform fine-mapping and TWAS weights without cross validation,

sos run pipeline/mnm_regression.ipynb susie_twas --twas-cv-folds 0 ... # rest of parameters the same. 

It is also possible to specify a subset of samples to analyze, using --keep-samples parameter. For example we create a file to keep the ID of 50 samples,

zcat output/covariate/protocol_example.protein.protocol_example.samples.protocol_example.genotype.chr21_22.pQTL.plink_qc.prune.pca.Marchenko_PC.gz | head -1 | awk '{for (i=2; i<=51; i++) printf $i " "; print ""}'> output/keep_samples.txt

then use them in our analysis,

sos run xqtl-protocol/pipeline/mnm_regression.ipynb susie_twas --keep-samples output/keep_samples.txt ... # rest of parameters the same

ii. fSuSiE#

Timing [FIXME]

Note: Suggested output naming convention is cohort_modality, eg ROSMAP_snRNA_pseudobulk.

sos run pipeline/mnm_regression.ipynb fsusie \
    --name protocol_example_methylation \
    --genoFile xqtl_association/protocol_example.genotype.chr21_22.plink_per_chrom.txt \
    --phenoFile output/phenotype_by_region/protocol_example.methylation.bed.phenotype_by_region_files.txt \
                output/phenotype_by_region/protocol_example.methylation.bed.phenotype_by_region_files.txt  \
    --covFile output/covariate/protocol_example.methylation.protocol_example.samples.protocol_example.genotype.chr21_22.pQTL.plink_qc.prune.pca.Marchenko_PC.gz \
              output/covariate/protocol_example.methylation.protocol_example.samples.protocol_example.genotype.chr21_22.pQTL.plink_qc.prune.pca.Marchenko_PC.gz \
    --container oras://

iii. mvSuSiE with TWAS weights from mr.mash#

sos run pipeline/mnm_regression.ipynb mnm \
    --name multi_trait \
    --genoFile /mnt/vast/hpc/csg/FunGen_xQTL/ROSMAP/Genotype/geno_by_chrom/ROSMAP_NIA_WGS.leftnorm.bcftools_qc.plink_qc.11.bed \
    --phenoFile /mnt/vast/hpc/csg/wanggroup/fungen-xqtl-analysis/analysis/Wang_Columbia/ROSMAP/pseudo_bulk_eqtl/Mic/output/data_preprocessing/phenotype_data/Mic.log2cpm.region_list.txt \
      /mnt/vast/hpc/csg/wanggroup/fungen-xqtl-analysis/analysis/Wang_Columbia/ROSMAP/pseudo_bulk_eqtl/Ast/output/data_preprocessing/phenotype_data/Ast.log2cpm.region_list.txt \
      /mnt/vast/hpc/csg/wanggroup/fungen-xqtl-analysis/analysis/Wang_Columbia/ROSMAP/pseudo_bulk_eqtl/Oli/output/data_preprocessing/phenotype_data/Oli.log2cpm.region_list.txt \
    --covFile /mnt/vast/hpc/csg/wanggroup/fungen-xqtl-analysis/analysis/Wang_Columbia/ROSMAP/pseudo_bulk_eqtl/Mic/output/data_preprocessing/covariate_data/Mic.log2cpm.Mic.rosmap_cov.ROSMAP_NIA_WGS.leftnorm.bcftools_qc.plink_qc.snuc_pseudo_bulk.related.plink_qc.extracted.pca.projected.Marchenko_PC.gz \
      /mnt/vast/hpc/csg/wanggroup/fungen-xqtl-analysis/analysis/Wang_Columbia/ROSMAP/pseudo_bulk_eqtl/Ast/output/data_preprocessing/covariate_data/Ast.log2cpm.Ast.rosmap_cov.ROSMAP_NIA_WGS.leftnorm.bcftools_qc.plink_qc.snuc_pseudo_bulk.related.plink_qc.extracted.pca.projected.Marchenko_PC.gz \
      /mnt/vast/hpc/csg/wanggroup/fungen-xqtl-analysis/analysis/Wang_Columbia/ROSMAP/pseudo_bulk_eqtl/Oli/output/data_preprocessing/covariate_data/Oli.log2cpm.Oli.rosmap_cov.ROSMAP_NIA_WGS.leftnorm.bcftools_qc.plink_qc.snuc_pseudo_bulk.related.plink_qc.extracted.pca.projected.Marchenko_PC.gz \
    --customized-association-windows /mnt/vast/hpc/csg/cl4215/mrmash/workflow/susie_twas/TADB_enhanced_cis.coding.bed \
    --cwd output/ \
    --region-name ENSG00000073921 \
    --phenotype-names Mic_DeJager_eQTL Ast_DeJager_eQTL Oli_DeJager_eQTL \
    --mixture_prior /home/aw3600/MR_KMT_analysis/PTWAS/twas_mr_test/mash_test/mixture_prior_example.EZ.prior.rds \
    --max_cv_variants 1000 \
    --ld_reference_meta_file /mnt/vast/hpc/csg/data_public/20240120_ADSP_LD_matrix/ld_meta_file.tsv





Possible Reason


Command interface#

sos run mnm_regression.ipynb -h

Workflow implementation#

# It is required to input the name of the analysis
parameter: name = str
parameter: cwd = path("output")
# A list of file paths for genotype data, or the genotype data itself. 
parameter: genoFile = path
# One or multiple lists of file paths for phenotype data.
parameter: phenoFile = paths
# One or multiple lists of file paths for phenotype ID mapping file. The first column should be the original ID, the 2nd column should be the ID to be mapped to.
parameter: phenoIDFile = paths()
# Covariate file path
parameter: covFile = paths
# Optional: if a region list is provide the analysis will be focused on provided region. 
# The LAST column of this list will contain the ID of regions to focus on
# Otherwise, all regions with both genotype and phenotype files will be analyzed
parameter: region_list = path()
# Optional: if a region name is provided 
# the analysis would be focused on the union of provides region list and region names
parameter: region_name = []
# Only focus on a subset of samples
parameter: keep_samples = path()
# An optional list documenting the custom association window for each region to analyze, with four column, chr, start, end, region ID (eg gene ID).
# If this list is not provided, the default `window` parameter (see below) will be used.
parameter: customized_association_windows = path()
# Specify the cis window for the up and downstream radius to analyze around the region of interest in units of bp
# When this is set to negative, we will rely on using customized_association_windows
parameter: cis_window = -1
# save data object or not
parameter: save_data = False
# Name of phenotypes
parameter: phenotype_names = [f'{x:bn}' for x in phenoFile]
# And indicator whether it is trans-analysis, ie, not using phenotypic coordinate information
parameter: trans_analysis = False
parameter: seed = 999

# association analysis paramters
# initial number of single effects for SuSiE
parameter: init_L = 8
# maximum number of single effects to use for SuSiE
parameter: max_L = 30
# remove a variant if it has more than imiss missing individual level data
parameter: imiss = 1.0
# MAF and variance of X cutoff
parameter: maf = 0.0025
parameter: xvar_cutoff = 0.0
# MAC cutoff, on top of MAF cutoff
parameter: mac = 5
# Remove indels if indel = False
parameter: indel = True
parameter: pip_cutoff = 0.025
parameter: coverage = [0.95, 0.7, 0.5]
# If this value is not 0, then an initial single effect analysis will be performed 
# to determine if follow up analysis will be continued or to simply return NULL
# If this is negative we use a default way to determine this cutoff which is conservative but still useful
parameter: skip_analysis_pip_cutoff = []
# Skip fine-mapping
parameter: skip_fine_mapping = False
# Skip TWAS weights computation
parameter: skip_twas_weights = False
# Perform K folds valiation CV for TWAS
# Set it to zero if this is to be skipped
parameter: twas_cv_folds = 5
parameter: twas_cv_threads = twas_cv_folds
# maximum number of variants to consider for CV
# We will randomly pick a subset of it for CV purpose
# We can set it to eg 8000 to save computational burden althought may risk overfitting for methods comparison purpose
# When set to -1 we don't use this feature
parameter: max_cv_variants = -1
parameter: ld_reference_meta_file = path()
# Analysis environment settings
parameter: container = ""
import re
parameter: entrypoint= ('micromamba run -a "" -n' + ' ' + re.sub(r'(_apptainer:latest|_docker:latest|\.sif)$', '', container.split('/')[-1])) if container else ""
# For cluster jobs, number commands to run per job
parameter: job_size = 200
# Wall clock time expected
parameter: walltime = "1h"
# Memory expected
parameter: mem = "20G"
# Number of threads
parameter: numThreads = 1

if len(phenoFile) != len(covFile):
    raise ValueError("Number of input phenotypes files must match that of covariates files")
if len(phenoFile) != len(phenotype_names):
    raise ValueError("Number of input phenotypes files must match the number of phenotype names")
if len(phenoIDFile) > 0 and len(phenoFile) != len(phenoIDFile):
    raise ValueError("Number of input phenotypes files must match the number of phenotype ID mapping files")

if len(skip_analysis_pip_cutoff) == 0:
    skip_analysis_pip_cutoff = [0.0] * len(phenoFile)
if len(skip_analysis_pip_cutoff) == 1:
    skip_analysis_pip_cutoff = skip_analysis_pip_cutoff * len(phenoFile)
if len(skip_analysis_pip_cutoff) != len(phenoFile):
    raise ValueError(f"``skip_analysis_pip_cutoff`` should have either length 1 or length the same as phenotype files ({len(phenoFile)} in this case)")

# make it into an R List string
skip_analysis_pip_cutoff = [f"'{y}'={x}" for x,y in zip(skip_analysis_pip_cutoff, phenotype_names)]
def group_by_region(lst, partition):
    # from itertools import accumulate
    # partition = [len(x) for x in partition]
    # Compute the cumulative sums once
    # cumsum_vector = list(accumulate(partition))
    # Use slicing based on the cumulative sums
    # return [lst[(cumsum_vector[i-1] if i > 0 else 0):cumsum_vector[i]] for i in range(len(partition))]
    return partition

import os
import pandas as pd

def adapt_file_path(file_path, reference_file):
    Adapt a single file path based on its existence and a reference file's path.

    - file_path (str): The file path to adapt.
    - reference_file (str): File path to use as a reference for adaptation.

    - str: Adapted file path.

    - FileNotFoundError: If no valid file path is found.
    reference_path = os.path.dirname(reference_file)

    # Check if the file exists
    if os.path.isfile(file_path):
        return file_path

    # Check file name without path
    file_name = os.path.basename(file_path)
    if os.path.isfile(file_name):
        return file_name

    # Check file name in reference file's directory
    file_in_ref_dir = os.path.join(reference_path, file_name)
    if os.path.isfile(file_in_ref_dir):
        return file_in_ref_dir

    # Check original file path prefixed with reference file's directory
    file_prefixed = os.path.join(reference_path, file_path)
    if os.path.isfile(file_prefixed):
        return file_prefixed

    # If all checks fail, raise an error
    raise FileNotFoundError(f"No valid path found for file: {file_path}")

def adapt_file_path_all(df, column_name, reference_file):
    return df[column_name].apply(lambda x: adapt_file_path(x, reference_file))
[get_analysis_regions: shared = "regional_data"]
# input is genoFile, phenoFile, covFile and optionally region_list. If region_list presents then we only analyze what's contained in the list.
# regional_data should be a dictionary like:
#{'data': [("genotype_1.bed", "phenotype_1.bed.gz", "covariate_1.gz"), ("genotype_2.bed", "phenotype_1.bed.gz", "phenotype_2.bed.gz", "covariate_1.gz", "covariate_2.gz") ... ],
# 'meta_info': [("chr12:752578-752579","chr12:752577-752580", "gene_1", "trait_1"), ("chr13:852580-852581","chr13:852579-852580", "gene_2", "trait_1", "trait_2") ... ]}
import numpy as np

def preload_id_map(id_map_files):
    id_maps = {}
    for id_map_file in id_map_files:
        if id_map_file is not None and os.path.isfile(id_map_file):
            df = pd.read_csv(id_map_file, sep=r"\s+", header=None, comment='#', names=['old_ID', 'new_ID'])
            id_maps[id_map_file] = df.set_index('old_ID')['new_ID'].to_dict()
    return id_maps

def load_and_apply_id_map(pheno_path, id_map_path, preloaded_id_maps):
    pheno_df = pd.read_csv(pheno_path, sep=r"\s+", header=0)
    pheno_df['Original_ID'] = pheno_df['ID']
    if id_map_path in preloaded_id_maps:
        id_map = preloaded_id_maps[id_map_path]
        pheno_df['ID'] = pheno_df['ID'].map(id_map).fillna(pheno_df['ID'])
    return pheno_df

def filter_by_region_ids(data, region_ids):
    if region_ids is not None and len(region_ids) > 0:
        data = data[data['ID'].isin(region_ids)]
    return data

def custom_join(series):
    # Initialize an empty list to hold the processed items
    result = []
    for item in series:
        if ',' in item:
            # If the item contains commas, split by comma and convert to tuple
            # If the item does not contain commas, add it directly
    # Convert the list of items to a tuple and return
    return tuple(result)

def aggregate_phenotype_data(accumulated_pheno_df):
    if not accumulated_pheno_df.empty:
        accumulated_pheno_df = accumulated_pheno_df.groupby(['#chr','ID','cond','path','cov_path'], as_index=False).agg({
            '#chr': lambda x: np.unique(x).astype(str)[0],
            'ID': lambda x: np.unique(x).astype(str)[0],
            'Original_ID': ','.join,
            'start': 'min',
            'end': 'max'
        }).groupby(['#chr','ID'], as_index=False).agg({
            'cond': ','.join,
            'path': ','.join,
            'Original_ID': custom_join,
            'cov_path': ','.join,
            'start': 'min',
            'end': 'max'
    return accumulated_pheno_df

def process_cis_files(pheno_files, cov_files, phenotype_names, pheno_id_files, region_ids, preloaded_id_maps):
    Example output:
    #chr    start      end    ID  Original_ID   path     cov_path             cond
    chr12   752578   752579  ENSG00000060237  Q9H4A3,P62873  protocol_example.protein_1.bed.gz,protocol_example.protein_2.bed.gz  covar_1.gz,covar_2.gz  trait_A,trait_B
    accumulated_pheno_df = pd.DataFrame()
    pheno_id_files = [None] * len(pheno_files) if len(pheno_id_files) == 0 else pheno_id_files

    for pheno_path, cov_path, phenotype_name, id_map_path in zip(pheno_files, cov_files, phenotype_names, pheno_id_files):
        if not os.path.isfile(cov_path):
            raise FileNotFoundError(f"No valid path found for file: {cov_path}")
        pheno_df = load_and_apply_id_map(pheno_path, id_map_path, preloaded_id_maps)
        pheno_df = filter_by_region_ids(pheno_df, region_ids)
        if not pheno_df.empty:
            pheno_df.iloc[:, 4] = adapt_file_path_all(pheno_df, pheno_df.columns[4], f"{pheno_path:a}")
            pheno_df = pheno_df.assign(cov_path=str(cov_path), cond=phenotype_name)           
            accumulated_pheno_df = pd.concat([accumulated_pheno_df, pheno_df], ignore_index=True)

    accumulated_pheno_df = aggregate_phenotype_data(accumulated_pheno_df)
    return accumulated_pheno_df

def process_trans_files(pheno_files, cov_files, phenotype_names, pheno_id_files, region_ids, customized_association_windows):
    Example output:
    #chr    start      end    ID  Original_ID   path     cov_path             cond
    chr21   0   0  chr21_18133254_19330300  carnitine,benzoate,hippurate  metabolon_1.bed.gz,metabolon_2.bed.gz  covar_1.gz,covar_2.gz  trait_A,trait_B
    if not os.path.isfile(customized_association_windows):
        raise ValueError("Customized association analysis window must be specified for trans analysis.")
    accumulated_pheno_df = pd.DataFrame()
    pheno_id_files = [None] * len(pheno_files) if len(pheno_id_files) == 0 else pheno_id_files
    genotype_windows = pd.read_csv(customized_association_windows, comment="#", header=None, names=["#chr","start","end","ID"], sep="\t")
    genotype_windows = filter_by_region_ids(genotype_windows, region_ids)
    if genotype_windows.empty:
        return accumulated_pheno_df
    for pheno_path, cov_path, phenotype_name, id_map_path in zip(pheno_files, cov_files, phenotype_names, pheno_id_files):
        if not os.path.isfile(cov_path):
            raise FileNotFoundError(f"No valid path found for file: {cov_path}")
        pheno_df = pd.read_csv(pheno_path, sep=r"\s+", header=0, names=['Original_ID', 'path'])
        if not pheno_df.empty:
            pheno_df.iloc[:, -1] = adapt_file_path_all(pheno_df, pheno_df.columns[-1], f"{pheno_path:a}")
            pheno_df = pheno_df.assign(cov_path=str(cov_path), cond=phenotype_name)
            # Here we combine genotype_windows which contains "#chr" and "ID" to pheno_df by creating a cartesian product
            pheno_df = pd.merge(genotype_windows.assign(key=1), pheno_df.assign(key=1), on='key').drop('key', axis=1)
            # then set start and end columns to zero
            pheno_df['start'] = 0
            pheno_df['end'] = 0
            if id_map_path is not None:
                # Filter pheno_df by specific association-window and phenotype pairs
                association_analysis_pair = pd.read_csv(id_map_path, sep=r"\s+", header=None, comment='#', names=['ID', 'Original_ID'])
                pheno_df = pd.merge(pheno_df, association_analysis_pair, on=['ID', 'Original_ID'])
            accumulated_pheno_df = pd.concat([accumulated_pheno_df, pheno_df], ignore_index=True)

    accumulated_pheno_df = aggregate_phenotype_data(accumulated_pheno_df)
    return accumulated_pheno_df

# Load genotype meta data
if f"{genoFile:x}" == ".bed":
    geno_meta_data = pd.DataFrame([("chr"+str(x), f"{genoFile:a}") for x in range(1,23)] + [("chrX", f"{genoFile:a}")], columns=['#chr', 'geno_path'])
    geno_meta_data = pd.read_csv(f"{genoFile:a}", sep =r"\s+", header=0)
    geno_meta_data.iloc[:, 1] = adapt_file_path_all(geno_meta_data, geno_meta_data.columns[1], f"{genoFile:a}")
    geno_meta_data.columns = ['#chr', 'geno_path']
    geno_meta_data['#chr'] = geno_meta_data['#chr'].apply(lambda x: str(x) if str(x).startswith('chr') else f'chr{x}')

# Checking the DataFrame
valid_chr_values = [f'chr{x}' for x in range(1, 23)] + ['chrX']
if not all(value in valid_chr_values for value in geno_meta_data['#chr']):
    raise ValueError("Invalid chromosome values found. Allowed values are chr1 to chr22 and chrX.")

region_ids = []
# If region_list is provided, read the file and extract IDs
if region_list.is_file():
    region_list_df = pd.read_csv(region_list, delim_whitespace=True, header=None, comment = "#")
    region_ids = region_list_df.iloc[:, -1].unique()  # Extracting the last column for IDs

# If region_name is provided, include those IDs as well
# --region-name A B C will result in a list of ["A", "B", "C"] here
if len(region_name) > 0:
    region_ids = list(set(region_ids).union(set(region_name)))

if trans_analysis:
    meta_data = process_trans_files(phenoFile, covFile, phenotype_names, phenoIDFile, region_ids, customized_association_windows)
    meta_data = process_cis_files(phenoFile, covFile, phenotype_names, phenoIDFile, region_ids, preload_id_map(phenoIDFile))
if not meta_data.empty:
    meta_data = meta_data.merge(geno_meta_data, on='#chr', how='inner')
    # Adjust association-window
    if os.path.isfile(customized_association_windows):
        print(f"Loading customized association analysis window from {customized_association_windows}")
        association_windows_list = pd.read_csv(customized_association_windows, comment="#", header=None, names=["#chr","start","end","ID"], sep="\t")
        meta_data = pd.merge(meta_data, association_windows_list, on=['#chr', 'ID'], how='left', suffixes=('', '_association'))
        mismatches = meta_data[meta_data['start_association'].isna()]
        if not mismatches.empty:
            print("First 5 mismatches:")
            raise ValueError(f"{len(mismatches)} regions to analyze cannot be found in ``{customized_association_windows}``. Please check your ``{customized_association_windows}`` database to make sure it contains all association-window definitions. ")
        if cis_window < 0 :
            raise ValueError("Please either input valid path to association-window file via ``--customized-association-windows``, or set ``--cis-window`` to a non-negative integer.")
        if cis_window == 0:
            print("Warning: only variants within the range of start and end of molecular phenotype will be considered since cis_window is set to zero and no customized association window file was found. Please make sure this is by design.")
        meta_data['start_association'] = meta_data['start'].apply(lambda x: max(x - cis_window, 0))
        meta_data['end_association'] = meta_data['end'] + cis_window

    # Example meta_data:
    # #chr    start      end    start_association       end_association           ID  Original_ID   path     cov_path             cond             coordinate     geno_path
    # 0  chr12   752578   752579  652578   852579  ENSG00000060237  Q9H4A3,P62873  protocol_example.protein_1.bed.gz,protocol_example.protein_2.bed.gz  covar_1.gz,covar_2.gz  trait_A,trait_B    chr12:752578-752579  protocol_example.genotype.chr21_22.bed       
    # Create the final dictionary
    regional_data = {
        'data': [(row['geno_path'], *row['path'].split(','), *row['cov_path'].split(',')) for _, row in meta_data.iterrows()],
        'meta_info': [(f"{row['#chr']}:{row['start']}-{row['end']}", # this is the phenotypic region to extract data from
                       f"{row['#chr']}:{row['start_association']}-{row['end_association']}", # this is the association window region
                       row['ID'], row['Original_ID'], *row['cond'].split(',')) for _, row in meta_data.iterrows()]
    regional_data = {'data':[], 'meta_info':[]}

Univariate analysis: SuSiE and TWAS#

# Further limit TWAS to only using selected variants
parameter: min_twas_maf = 0.01
parameter: min_twas_xvar = 0.01
parameter: keep_variants = path()
depends: sos_variable("regional_data")
# Check if both 'data' and 'meta_info' are empty lists
stop_if(len(regional_data['data']) == 0, f'Either genotype or phenotype data are not available for region {", ".join(region_name)}.')
meta_info = regional_data['meta_info']
input: regional_data["data"], group_by = lambda x: group_by_region(x, regional_data["data"]), group_with = "meta_info"

if skip_fine_mapping and skip_twas_weights:
    save_data = True
    output_files = [f'{cwd:a}/{step_name}/{name}.{_meta_info[0].split(":")[0]}_{_meta_info[2]}.univariate_data.rds']
elif not skip_fine_mapping and skip_twas_weights:
    output_files = [f'{cwd:a}/{step_name}/{name}.{_meta_info[0].split(":")[0]}_{_meta_info[2]}.univariate_bvsr.rds']
    output_files = [f'{cwd:a}/{step_name}/{name}.{_meta_info[0].split(":")[0]}_{_meta_info[2]}.univariate_bvsr.rds',
output: output_files
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output[0]:bnn}'
R: expand = '${ }', stdout = f"{_output[0]:nn}.susie_twas.stdout", stderr = f"{_output[0]:nn}.susie_twas.stderr", container = container, entrypoint = entrypoint
    phenotype_files = c(${",".join(['"%s"' % x.absolute() for x in _input[1:len(_input)//2+1]])})
    covariate_files = c(${",".join(['"%s"' % x.absolute() for x in _input[len(_input)//2+1:]])})
    conditions = c(${",".join(['"%s"' % x for x in _meta_info[4:]])})
    pip_cutoff_to_skip = list(${",".join(skip_analysis_pip_cutoff)})
    pip_cutoff_to_skip = unlist(c(pip_cutoff_to_skip[conditions]))
    # extract subset of samples
    keep_samples = NULL
    if (${"TRUE" if keep_samples.is_file() else "FALSE"}) {
      keep_samples = unlist(strsplit(readLines(${keep_samples:ar}), "\\s+"))
      message(paste(length(keep_samples), "samples are selected to be loaded for analysis"))
    # Load regional association data
        fdat = load_regional_univariate_data(genotype = ${_input[0]:anr},
                                              phenotype = phenotype_files,
                                              covariate = covariate_files,
                                              region = ${("'%s'" % _meta_info[0]) if int(_meta_info[0].split('-')[-1])>0 else 'NULL'}, # if the end position is zero return NULL
                                              association_window = "${_meta_info[1]}",
                                              conditions = conditions,
                                              maf_cutoff = ${maf},
                                              mac_cutoff = ${mac},
                                              imiss_cutoff = ${imiss},
                                              keep_indel = ${"TRUE" if indel else "FALSE"},
                                              keep_samples = keep_samples,
                                              keep_variants = ${'"%s"' % keep_variants if not keep_variants.is_file() else "NULL"},
                                              extract_region_name = list(${",".join([("c('"+x+"')") if isinstance(x, str) else ("c"+ str(x)) for x in _meta_info[3]])}),
                                              phenotype_header = ${"4" if int(_meta_info[0].split('-')[-1])>0 else "1"},
                                              region_name_col = ${"4" if int(_meta_info[0].split('-')[-1])>0 else "1"},
                                              scale_residuals = FALSE)
    }, NoSNPsError = function(e) {
        message("Error: ", paste(e$message, "${_meta_info[2] + '@' + _meta_info[1]}"))
        saveRDS(list(${_meta_info[2]} = e$message), "${_output[0]:ann}.univariate_data.rds", compress='xz')
    # save data-set
    if (${"TRUE" if save_data else "FALSE"}) {
        saveRDS(list(${_meta_info[2]} = fdat), "${_output[0]:ann}.univariate_data.rds", compress='xz')
    if (${"FALSE" if skip_fine_mapping and skip_twas_weights else "TRUE"}) {
      if ("${_meta_info[2]}" != "${_meta_info[3]}") {
          region_name = c("${_meta_info[2]}", c(${",".join([("c('"+x+"')") if isinstance(x, str) else ("c"+ str(x)) for x in _meta_info[3]])}))
      } else {
          region_name = "${_meta_info[2]}"

      region_info = list(region_coord=parse_region("${_meta_info[0]}"), grange=parse_region("${_meta_info[1]}"), region_name=region_name)

      finemapping_result = list()
      preset_variants_result = list()
      condition_names = vector()
      empty_elements_cnt = 0
      r = 1

      while (r<=length(fdat$residual_Y)) {
          # Update condition names
          dropped_samples = list(X=fdat$dropped_sample$dropped_samples_X[[r]], 
          new_names = names(fdat$residual_Y)[r]
          ##FIXME:  residule Y lost their colname when there was only 1 column
          # new_col_names = colnames(fdat$residual_Y[[r]])
          new_col_names = list(${",".join([("c('"+x+"')") if isinstance(x, str) else ("c"+ str(x)) for x in _meta_info[3]])})[[r]]
          if (is.null(new_col_names)) {
            # column names does not exist, create generic names instead
            new_col_names = 1:ncol(fdat$residual_Y[[r]])
          # if the name is different as region name ie, these original ID the same as gene ID, then give the new name
          if(!(identical(new_names, new_col_names)))
          new_names = paste(new_names, new_col_names, sep="_") # DLPFC_iso1 DLPFC_iso2

          out <- list()
          # Run the first round focused on fine-mapping with many variants included
          # fine-mapping with complete data-set
          if (${"FALSE" if skip_fine_mapping else "TRUE"}) {
              out$finemapping <- lapply(1:ncol(fdat$residual_Y[[r]]), function(i) {
                                  rs <- univariate_analysis_pipeline( X=fdat$residual_X[[r]], 
                                                                      Y_scalar=if (fdat$residual_Y_scalar[[r]] == 1) 1 else fdat$residual_Y_scalar[[r]][,i,drop=FALSE],
                                                                      other_quantities = list(dropped_samples = dropped_samples),
                                                                      # filters
                                                                      imiss_cutoff = ${imiss},
                                                                      maf_cutoff = NULL,
                                                                      xvar_cutoff = 0,
                                                                      ld_reference_meta_file = NULL,
                                                                      pip_cutoff_to_skip = pip_cutoff_to_skip[r],
                                                                      # methods parameter configuration
                                                                      init_L = ${init_L},
                                                                      max_L = ${max_L},
                                                                      l_step = 5,
                                                                      # fine-mapping results summary
                                                                      signal_cutoff = ${pip_cutoff},
                                                                      coverage = c(${",".join([str(x) for x in coverage])}),
                                                                      # TWAS weights and CV for TWAS weights
                                                                      twas_weights = FALSE, 
                                                                      cv_threads = ${twas_cv_threads}
          if (${"FALSE" if skip_twas_weights else "TRUE"}) {
              # Run the second round focused on TWAS using selected variants
              out$twas_models <- lapply(1:ncol(fdat$residual_Y[[r]]), function(i) {
                                  rs <- univariate_analysis_pipeline(X=fdat$residual_X[[r]], 
                                                                  Y_scalar=if (fdat$residual_Y_scalar[[r]] == 1) 1 else fdat$residual_Y_scalar[[r]][,i,drop=FALSE],
                                                                  other_quantities = list(dropped_samples = dropped_samples),
                                                                  # filters
                                                                  imiss_cutoff = ${imiss},
                                                                  maf_cutoff = ${min_twas_maf},
                                                                  xvar_cutoff = ${min_twas_xvar},
                                                                  ld_reference_meta_file=${('"%s"' % ld_reference_meta_file) if not ld_reference_meta_file.is_dir() else "NULL"},
                                                                  pip_cutoff_to_skip = pip_cutoff_to_skip[r],
                                                                  # methods parameter configuration
                                                                  init_L = ${init_L},
                                                                  max_L = ${max_L},
                                                                  l_step = 5,
                                                                  # fine-mapping results summary
                                                                  signal_cutoff = ${pip_cutoff},
                                                                  coverage = c(${",".join([str(x) for x in coverage])}),
                                                                  # TWAS weights and CV for TWAS weights
                                                                  twas_weights = TRUE, 
                                                                  cv_threads = ${twas_cv_threads}

          empty_elements_idx <- unique(, lapply(out, function(results) which(sapply(results, function(x) is.list(x) && length(x) == 0)))))
          if (length(empty_elements_idx)>0) {
            empty_elements_cnt <- empty_elements_cnt + length(empty_elements_idx)
            if (!is.null(out$finemapping)) {
                out$finemapping <- out$finemapping[-empty_elements_idx]
            if (!is.null(out$twas_models)) {
                out$twas_models <- out$twas_models[-empty_elements_idx]
            new_names <- new_names[-empty_elements_idx]
          if (!is.null(out$finemapping)) {
              finemapping_result = c(finemapping_result, out$finemapping)
          if (!is.null(out$twas_models)) {
              preset_variants_result = c(preset_variants_result, out$twas_models)
          condition_names = c(condition_names, new_names)
          if (length(new_names)>0) {
            message("Analysis completed for: ", paste(new_names, collapse=","))
          # original data no longer relevant, set to NA to release memory
          fdat$residual_X[[r]] <- NA
          fdat$residual_Y[[r]] <- NA
          r = r + 1
      # Reorganize outputs
      # The BVSR model in this case contain different versions of SuSiE fits
      # preset_variants_result
      # Clean up TWAS weights output
      twas_output <- list()
      if (length(preset_variants_result)>0) {
          names(preset_variants_result) <- condition_names
          for (r in condition_names) {
            twas_output[[r]] <- preset_variants_result[[r]]$twas_weights_result
            preset_variants_result[[r]]$twas_weights_result <- NULL
            twas_output[[r]]$variant_names <- preset_variants_result[[r]]$variant_names
            twas_output[[r]]$region_info <- region_info
            preset_variants_result[[r]]$region_info <- region_info
      # Clean up fine-mapping output
      finemapping_output <- list()
      if (length(finemapping_result)>0) {
          names(finemapping_result) <- condition_names
      for (r in condition_names) {
          if (r %in% names(finemapping_result)) {
              finemapping_output[[r]] <- finemapping_result[[r]]
              finemapping_output[[r]]$region_info <- region_info
          if (r %in% names(preset_variants_result)) {
              finemapping_output[[r]]$preset_variants_result <- preset_variants_result[[r]]
      saveRDS(list("${_meta_info[2]}" = finemapping_output), "${_output[0]:ann}.univariate_bvsr.rds", compress='xz')
      if (length(twas_output) > 0) saveRDS(list("${_meta_info[2]}" = twas_output), "${_output[0]:ann}.univariate_twas_weights.rds", compress='xz')
      if (empty_elements_cnt>0) {
          message(empty_elements_cnt, " analysis are skipped for failing to pass initial screen for potential association signals")

Multivariate analysis: mvSuSiE and mr.mash#

# Prior model file generated from mashr. 
# Default will be used if it does not exist.
parameter: mixture_prior = path()
parameter: mixture_prior_cv = path()
parameter: prior_weights_min = 5e-4
parameter: prior_canonical_matrices = False
parameter: sample_partition = path() 
parameter: mvsusie_max_iter = 200
parameter: mrmash_max_iter = 5000
depends: sos_variable("regional_data")
# Check if both 'data' and 'meta_info' are empty lists
stop_if(len(regional_data['data']) == 0, f'Either genotype or phenotype data are not available for region {", ".join(region_name)}.')

meta_info = regional_data['meta_info']

input: regional_data["data"], group_by = lambda x: group_by_region(x, regional_data["data"]), group_with = "meta_info"
if skip_fine_mapping and skip_twas_weights:
    save_data = True
    output_files = [f'{cwd:a}/{step_name}/{name}.{_meta_info[0].split(":")[0]}_{_meta_info[2]}.multicontext_data.rds']
elif not skip_fine_mapping and skip_twas_weights:
    output_files = [f'{cwd:a}/{step_name}/{name}.{_meta_info[0].split(":")[0]}_{_meta_info[2]}.multicontext_bvsr.rds']
    output_files = [f'{cwd:a}/{step_name}/{name}.{_meta_info[0].split(":")[0]}_{_meta_info[2]}.multicontext_bvsr.rds',
output: output_files
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output[0]:bn}'
R: expand = '${ }', stdout = f"{_output[0]:nn}.mnm.stdout", stderr = f"{_output[0]:nn}.mnm.stderr", container = container, entrypoint = entrypoint

    phenotype_files = c(${",".join(['"%s"' % x.absolute() for x in _input[1:len(_input)//2+1]])})
    covariate_files = c(${",".join(['"%s"' % x.absolute() for x in _input[len(_input)//2+1:]])})
    conditions = c(${",".join(['"%s"' % x for x in _meta_info[4:]])})
    pip_cutoff_to_skip = list(${",".join(skip_analysis_pip_cutoff)})
    pip_cutoff_to_skip = unlist(pip_cutoff_to_skip[conditions])
    # extract subset of samples
    keep_samples = NULL
    if (${"TRUE" if keep_samples.is_file() else "FALSE"}) {
      keep_samples = unlist(strsplit(readLines(${keep_samples:ar}), "\\s+"))
      message(paste(length(keep_samples), "samples are selected to be loaded for analysis"))
    # Load regional association data
    fdat = load_regional_multivariate_data(genotype = ${_input[0]:anr},
                                          phenotype = phenotype_files,
                                          covariate = covariate_files,
                                          region = "${_meta_info[0]}",
                                          conditions = conditions,
                                          association_window = "${_meta_info[1]}",
                                          maf_cutoff = ${maf},
                                          mac_cutoff = ${mac},
                                          imiss_cutoff = ${imiss},
                                          keep_indel = ${"TRUE" if indel else "FALSE"},
                                          keep_samples = keep_samples,
                                          extract_region_name = list(${",".join([("c('"+x+"')") if isinstance(x, str) else ("c"+ str(x)) for x in _meta_info[3]])}),
                                          phenotype_header = ${"4" if int(_meta_info[0].split('-')[-1])>0 else "1"},
                                          region_name_col = ${"4" if int(_meta_info[0].split('-')[-1])>0 else "1"},
                                          scale_residuals = FALSE)
        }, NoSNPsError = function(e) {
        message("Error: ", paste(e$message, "${_meta_info[2] + '@' + _meta_info[1]}"))
        saveRDS(list(${_meta_info[2]} = e$message), "${_output[0]:ann}.multicontext_data.rds", compress='xz')
    if (${"TRUE" if save_data else "FALSE"}) {
        saveRDS(fdat, "${_output[0]:ann}.multicontext_data.rds", compress='xz')
    if (${"FALSE" if skip_fine_mapping and skip_twas_weights else "TRUE"}) {
      if ("${_meta_info[2]}" != "${_meta_info[3]}") {
          region_name = c("${_meta_info[2]}", c(${",".join([("c('"+x+"')") if isinstance(x, str) else ("c"+ str(x)) for x in _meta_info[3]])}))
      } else {
          region_name = "${_meta_info[2]}"

      region_info = list(region_coord=parse_region("${_meta_info[0]}"), grange=parse_region("${_meta_info[1]}"), region_name=region_name)

      dd_prior <- if (${mixture_prior:r} == '.' || ${mixture_prior:r} == '') NULL else readRDS(${mixture_prior:r})
      dd_prior_cv <- if (${mixture_prior_cv:r} == '.' || ${mixture_prior_cv:r} == '') NULL else readRDS(${mixture_prior_cv:r})
      use_canonical_prior = ${"TRUE" if prior_canonical_matrices else "FALSE"} && ${"FALSE" if not mixture_prior.is_file() else "TRUE"}
      if (is.null(dd_prior)) use_canonical_prior = TRUE
      result <- multivariate_analysis_pipeline(
                      X = fdat$X,
                      Y = fdat$residual_Y,
                      maf = fdat$maf,
                      X_variance = fdat$X_variance,
                      other_quantities = list(dropped_samples = fdat$dropped_samples),
                      # filters
                      imiss_cutoff = ${imiss}, 
                      maf_cutoff = ${maf},
                      xvar_cutoff = ${xvar_cutoff},
                      ld_reference_meta_file = ${('"%s"' % ld_reference_meta_file) if not ld_reference_meta_file.is_dir() else "NULL"},
                      pip_cutoff_to_skip = pip_cutoff_to_skip,
                      # methods parameter configuration
                      max_L = -1,
                      data_driven_prior_matrices = dd_prior, 
                      data_driven_prior_matrices_cv = dd_prior_cv, 
                      data_driven_prior_weights_cutoff = ${prior_weights_min}, 
                      canonical_prior_matrices = use_canonical_prior,
                      mvsusie_max_iter = ${mvsusie_max_iter}, 
                      mrmash_max_iter = ${mrmash_max_iter},
                      # fine-mapping results summary
                      signal_cutoff = ${pip_cutoff},
                      coverage = c(${",".join([str(x) for x in coverage])}),
                      # TWAS weights and CV for TWAS weights
                      twas_weights = ${"TRUE" if not skip_twas_weights else "FALSE"},
                      sample_partition = ${"NULL" if sample_partition=='.' or sample_partition=='' else sample_partition},
                      max_cv_variants = ${max_cv_variants}, 
                      cv_folds = ${twas_cv_folds},
                      cv_threads = ${twas_cv_threads}            
      result$region_info <- region_info
      if (!is.null(result$twas_weights_result)) {
          for (r in names(result$twas_weights_result)) {
              result$twas_weights_result[[r]]$region_info <- region_info
          names(result$twas_weights_result) <- paste0(names(result$twas_weights_result), "_${_meta_info[2]}")
          saveRDS(list("${_meta_info[2]}" = result$twas_weights_result), "${_output[0]:ann}.multicontext_twas_weights.rds", compress='xz')
          result$twas_weights_result <- NULL
      } else if (${"TRUE" if not skip_twas_weights else "FALSE"}){
        saveRDS(list("${_meta_info[2]}" = NULL), "${_output[0]:ann}.multicontext_twas_weights.rds", compress='xz')
      saveRDS(list("${_meta_info[2]}" = result), "${_output[0]:ann}.multicontext_bvsr.rds", compress='xz')

xQTL fine-mapping across multiple genes#

This pipeline jointly fine-map multiple genes in a given region. To determine if a gene is eligable for joint fine-mapping with other genes, it will first load the univeriate fine-mapping result of a gene and assess if its top_loci candidates overlap with any of the other genes. Genes don’t statisfy this requirement will be dropped. It is therefore required that the univariate fine-mapping results and its meta data are available, in the format of susie_twas output.

depends: sos_variable("regional_data")
# Check if both 'data' and 'meta_info' are empty lists
stop_if(len(regional_data['data']) == 0, f'Either genotype or phenotype data are not available for region {", ".join(region_name)}.')
meta_info = regional_data['meta_info']
genes = meta_info[0][3]
if isinstance(genes, tuple):
    genes = genes[0] if isinstance(genes[0], tuple) else list(genes)
    genes = [genes]

if len(skip_analysis_pip_cutoff) == 0:
    skip_analysis_pip_cutoff = [0.0] * len(genes)

if len(skip_analysis_pip_cutoff) == 1:
    skip_analysis_pip_cutoff = skip_analysis_pip_cutoff * len(genes)

skip_analysis_pip_cutoff = ["'{}'={}".format(genes[i], skip_analysis_pip_cutoff[i].split('=')[1]) for i in range(len(genes))]

parameter: pheno_id_map_file = path
parameter: fine_mapping_meta = path
parameter: data_driven_prior = False
# Parameters below are relevant when we use data driven prior
# which is an experimental feature
parameter: n_random = 10
parameter: n_null = 10
parameter: independent_variant_list = path()
parameter: prior_weights_min = 5e-4
parameter: prior_canonical_matrices = False
# This is relevant to cross-validation
parameter: sample_partition = path() 
parameter: mvsusie_max_iter = 200
parameter: mrmash_max_iter = 5000

if not data_driven_prior:
     prior_canonical_matrices = True
input: regional_data["data"],group_by = lambda x: group_by_region(x, regional_data["data"]), group_with = "meta_info"

if skip_fine_mapping and skip_twas_weights:
    save_data = True
    output_files = [f'{cwd:a}/{step_name}/{name}.{_meta_info[2]}.multigene_data.rds']
elif not skip_fine_mapping and skip_twas_weights:
    output_files = [f'{cwd:a}/{step_name}/{name}.{_meta_info[2]}.multigene_bvsr.rds']
    output_files = [f'{cwd:a}/{step_name}/{name}.{_meta_info[2]}.multigene_bvsr.rds',f'{cwd:a}/{step_name}/{name}.{_meta_info[2]}.multigene_twas_weights.rds']

output: output_files
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output:bn}'
R: expand = '${ }', stdout = f"{_output[0]:nn}.mnm_genes.stdout", stderr = f"{_output[0]:nn}.mnm_genes.stderr", container = container, entrypoint = entrypoint

    combine_result_list <- function(univariate_finemapping_metadata, condition_name, conditions = NULL, extracted_region){
        regional_window_combined_susie_result <- list()
        regional_window_combined_sumstats_result <- list()
        extracted_univariate_finemapping_metadata <- univariate_finemapping_metadata%>%filter(region_id%in%extracted_region)
        for(i in 1:dim(extracted_univariate_finemapping_metadata)[1]){
           gene_id <- extracted_univariate_finemapping_metadata$region_id[i]
           susie_result <- readRDS(extracted_univariate_finemapping_metadata$combined_data[i])
           sumstats_result <- readRDS(extracted_univariate_finemapping_metadata$combined_data_sumstats[i]) 
           if (!is.null(conditions)) {
              if (any(names(susie_result[[gene_id]]) %in% conditions)) {
                  extracted_conditions = names(susie_result[[gene_id]])[names(susie_result[[gene_id]]) %in% conditions]
                 for (j in 1:length(extracted_conditions)){
                     regional_window_combined_susie_result[[condition_name]][[extracted_conditions[j]]] <- susie_result[[gene_id]][[extracted_conditions[j]]]
                     regional_window_combined_sumstats_result[[condition_name]][[extracted_conditions[j]]] <- sumstats_result[[gene_id]][[extracted_conditions[j]]]
           } else {
                    regional_window_combined_susie_result[[condition_name]][[gene_id]] <- susie_result[[gene_id]][[condition_name]]
                    regional_window_combined_sumstats_result[[condition_name]][[gene_id]] <- sumstats_result[[gene_id]][[condition_name]]
           return(list(regional_window_combined_susie_result = regional_window_combined_susie_result, regional_window_combined_sumstats_result = regional_window_combined_sumstats_result))

    extract_non_null_cs_list <- function(combined_list, condition_name, extracted_region) {
        # List to store results for genes passing the first step
        extracted_regional_window_combined_susie_result <- list()
        extracted_regional_window_combined_sumstats_result <- list()

        # Iterate over each gene's susie and sumstats results
        filtered_region <- extracted_region[extracted_region%in%names(combined_list[["regional_window_combined_susie_result"]][[condition_name]])]
        for (gene_id in filtered_region) {
            susie_result <- get_nested_element(combined_list,c("regional_window_combined_susie_result",condition_name,gene_id))

           # Check if there exits top_loci.
           if (!is.null(susie_result[["top_loci"]])&&nrow(susie_result[["top_loci"]])[1]!=0) {
              # If not all zeros, add this gene's susie and sumstats results to the lists
              extracted_regional_window_combined_susie_result[[condition_name]][[gene_id]] <- get_nested_element(combined_list,c("regional_window_combined_susie_result",condition_name,gene_id))
              extracted_regional_window_combined_sumstats_result[[condition_name]][[gene_id]] <- get_nested_element(combined_list,c("regional_window_combined_sumstats_result",condition_name,gene_id))
         return(list(extracted_regional_window_combined_susie_result = extracted_regional_window_combined_susie_result, extracted_regional_window_combined_sumstats_result = extracted_regional_window_combined_sumstats_result))
    process_and_compare_variants <- function(combined_list, condition_name) {
       extracted_regional_window_combined_susie_result <- list()
       extracted_regional_window_combined_sumstats_result <- list()
       compare_variants <- function(variants1, variants2) {
            return(any(variants1 %in% variants2) | any(variants2 %in% variants1))
       subset_top_loci_variants <- function(df) {
          find_non_zero_rows <- function(df) {
                non_zero_indices <- which(rowSums(df!= 0) > 0)
           non_zero_indices <- find_non_zero_rows(df)

           if (length(non_zero_indices) > 0) {
              non_zero_index_variants <- df[non_zero_indices, , drop = FALSE]%>%select(variant_id)%>%pull(variant_id)
              zero_index_filtered_variants <- df[-non_zero_indices, ,drop = FALSE]%>%filter(pip>=0.01)%>%select(variant_id)%>%pull(variant_id)
              top_loci_variants <- c(non_zero_index_variants, zero_index_filtered_variants)
           } else {
              zero_index_filtered_variants <- df%>%filter(pip>=0.01)%>%select(variant_id)%>%pull(variant_id)
              top_loci_variants <- zero_index_filtered_variants
       for (i in 1:length(combined_list[["extracted_regional_window_combined_susie_result"]][[condition_name]])) {
           susie_result_1 <- combined_list[["extracted_regional_window_combined_susie_result"]][[condition_name]][[i]]
           gene_id <- names(combined_list[["extracted_regional_window_combined_susie_result"]][[condition_name]])[i]
           # Extract top_loci variants
           variants1 <- subset_top_loci_variants(susie_result_1[["top_loci"]])
           if (length(variants1) > 0) {
              different_in_all_genes <- TRUE
           for (j in 1:length(combined_list[["extracted_regional_window_combined_susie_result"]][[condition_name]])) {
               if (i != j) {
                  susie_result_2 <- combined_list[["extracted_regional_window_combined_susie_result"]][[condition_name]][[j]]
                  variants2 <- subset_top_loci_variants(susie_result_2[["top_loci"]])
                  # Compare variants
                  if (length(variants2) > 0) {
                    if (compare_variants(variants1, variants2)) {
                        # If variants are not totally different across all genes, set the flag to FALSE
                       different_in_all_genes <- FALSE
            # After the loop, check the flag
            if (!different_in_all_genes) {
               extracted_regional_window_combined_susie_result[[condition_name]][[gene_id]] <- get_nested_element(combined_list, c("extracted_regional_window_combined_susie_result", condition_name, gene_id))
               extracted_regional_window_combined_sumstats_result[[condition_name]][[gene_id]] <- get_nested_element(combined_list, c("extracted_regional_window_combined_sumstats_result", condition_name, gene_id))
      return(list(extracted_regional_window_combined_susie_result = extracted_regional_window_combined_susie_result, extracted_regional_window_combined_sumstats_result = extracted_regional_window_combined_sumstats_result))

    fine_mapping_metadata = fread(${fine_mapping_meta:r})
    phenotype_files = c(${",".join(['"%s"' % x.absolute() for x in _input[1:len(_input)//2+1]])})
    covariate_files = c(${",".join(['"%s"' % x.absolute() for x in _input[len(_input)//2+1:]])})
    condition_name = ${("'%s'" % _meta_info[-1])}
    region_names = ${",".join([("c('"+x+"')") if isinstance(x, str) else ("c"+ str(x)) for x in _meta_info[3]])}
    if (any(region_names %in% fine_mapping_metadata$region_id)) {
       filtered_region <- fine_mapping_metadata%>%filter(region_id%in%region_names)%>%select(region_id)%>%pull(region_id)
       filtered_event_names <- filtered_region
    } else {
       pheno_id_map <- fread(${pheno_id_map_file:r})
       gene_ids <- pheno_id_map%>%filter(ID%in%region_names)%>%select(gene)%>%pull(gene)%>%{.[!duplicated(.)]}
       filtered_region <- fine_mapping_metadata%>%filter(region_id%in%gene_ids)%>%select(region_id)%>%pull(region_id)
       filtered_event_names <- pheno_id_map%>%filter(ID%in%region_names&gene%in%filtered_region)%>%select(ID)%>%pull(ID)
    keep_samples = NULL
    if (${"TRUE" if keep_samples.is_file() else "FALSE"}) {
      keep_samples = unlist(strsplit(readLines(${keep_samples:ar}), "\\s+"))
      message(paste(length(keep_samples), "samples are selected to be loaded for analysis"))
    fdat = load_regional_multivariate_data(genotype = ${_input[0]:anr},
                                          phenotype = phenotype_files,
                                          covariate = covariate_files,
                                          region = ${("'%s'" % _meta_info[1]) if int(_meta_info[1].split('-')[-1])>0 else 'NULL'},
                                          maf_cutoff = ${maf},       
                                          mac_cutoff = ${mac},
                                          imiss_cutoff = ${imiss},
                                          conditions = NULL,
                                          association_window = ${("'%s'" % _meta_info[1]) if int(_meta_info[1].split('-')[-1])>0 else 'NULL'},
                                          extract_region_name = list(filtered_event_names),
                                          region_name_col = ${"4" if int(_meta_info[1].split('-')[-1])>0 else "1"},
                                          keep_indel = ${"TRUE" if indel else "FALSE"},
                                          keep_samples = keep_samples,
                                          phenotype_header = ${"4" if int(_meta_info[1].split('-')[-1])>0 else "1"}, # skip first 4 rows of transposed phenotype for chr, start, end and ID
                                          scale_residuals = FALSE)
    if (${"TRUE" if save_data else "FALSE"}) {
        saveRDS(fdat, "${_output[0]:ann}.multigene_data.rds", compress='xz')
    if (${"FALSE" if skip_fine_mapping and skip_twas_weights else "TRUE"}) {
        # Extract and consolidate univariate results to determine which genes to focus on
        if (grepl("sQTL", condition_name)) {conditions <- paste(condition_name, filtered_event_names, sep = "_")} else {conditions <- NULL}
        combined_regional_window_list <- combine_result_list(fine_mapping_metadata, condition_name, conditions, filtered_region)
        if (grepl("sQTL",condition_name)) { filtered_region <- paste(condition_name, filtered_event_names, sep = "_") }
        extracted_non_null_combined_susie_list <- extract_non_null_cs_list(combined_list = combined_regional_window_list, condition_name, filtered_region)
        if (length(extracted_non_null_combined_susie_list[["extracted_regional_window_combined_susie_result"]])!=0) {
           extracted_combined_susie_list <- process_and_compare_variants(combined_list = extracted_non_null_combined_susie_list, condition_name)
        } else {
           extracted_combined_susie_list <- NULL
        if (!is.null(extracted_combined_susie_list)&length(extracted_combined_susie_list[["extracted_regional_window_combined_susie_result"]])!=0) {
           if (grepl("sQTL", condition_name)) {
              filtered_event_names <- sub(paste0("^", condition_name, "_"), "", names(get_nested_element(extracted_combined_susie_list,c("extracted_regional_window_combined_susie_result", condition_name))))
           }  else if (grepl("pQTL", condition_name)) {
              filtered_event_names <- pheno_id_map %>%
                              filter(gene %in% names(get_nested_element(extracted_combined_susie_list, c("extracted_regional_window_combined_susie_result", condition_name)))) %>%
                              select(ID) %>% pull(ID)
          } else {
            filtered_event_names <- names(get_nested_element(extracted_combined_susie_list, c("extracted_regional_window_combined_susie_result", condition_name)))
        } else {
          filtered_event_names <- NULL

        if ("${_meta_info[2]}" != "${_meta_info[3]}") {
          region_name = c("${_meta_info[2]}", c(${",".join([("c('"+x+"')") if isinstance(x, str) else ("c"+ str(x)) for x in _meta_info[3]])}))
        } else {
          region_name = "${_meta_info[2]}"

        region_info = list(region_coord=parse_region("${_meta_info[0]}"), grange=parse_region("${_meta_info[1]}"), region_name=region_name)
        dd_prior <- NULL
        if (${"TRUE" if data_driven_prior else "FALSE"}) {
            # FIXME: please complete this function call
            dd_prior <- multigene_udr(extracted_combined_susie_list, coverage = paste0("cs_coverage_",${coverage[0]},sep=""), independent_variant_list = ${independent_variant_list:r}, n_random = ${n_random}, n_null = ${n_null}, seed = ${seed})
        if (length(filtered_event_names)>=2) {
           pip_cutoff_to_skip = list(${",".join(skip_analysis_pip_cutoff)})
           pip_cutoff_to_skip = unlist(pip_cutoff_to_skip[filtered_event_names])
           dd_prior_cv <- dd_prior
           result <- multivariate_analysis_pipeline(
                            X_variance = fdat$X_variance,
                            other_quantities = list(dropped_samples = fdat$dropped_samples),
                            # filters
                            imiss_cutoff = ${imiss}, 
                            maf_cutoff = ${maf},
                            xvar_cutoff = ${xvar_cutoff},
                            ld_reference_meta_file = ${('"%s"' % ld_reference_meta_file) if not ld_reference_meta_file.is_dir() else "NULL"},
                            pip_cutoff_to_skip = pip_cutoff_to_skip,
                            # methods parameter configuration
                            max_L = -1,
                            data_driven_prior_matrices = dd_prior, 
                            data_driven_prior_matrices_cv = dd_prior_cv, 
                            data_driven_prior_weights_cutoff = ${prior_weights_min}, 
                            canonical_prior_matrices = ${"TRUE" if prior_canonical_matrices else "FALSE"},
                            mvsusie_max_iter = ${mvsusie_max_iter}, 
                            mrmash_max_iter = ${mrmash_max_iter},
                            # fine-mapping results summary
                            signal_cutoff = ${pip_cutoff},
                            coverage = c(${",".join([str(x) for x in coverage])}),
                            # TWAS weights and CV for TWAS weights
                            twas_weights = ${"TRUE" if not skip_twas_weights else "FALSE"},
                            sample_partition = ${"NULL" if sample_partition=='.' or sample_partition=='' else sample_partition},
                            max_cv_variants = ${max_cv_variants}, 
                            cv_folds = ${twas_cv_folds},
                            cv_threads = ${twas_cv_threads}            
          result$region_info <- region_info
          if (!is.null(result$twas_weights_result)) {
              for (r in names(result$twas_weights_result)) {
                  result$twas_weights_result[[r]]$region_info <- region_info
              saveRDS(list("${_meta_info[2]}" = result$twas_weights_result), "${_output[0]:ann}.multigene_twas_weights.rds", compress='xz')
              result$twas_weights_result <- NULL
          saveRDS(list("${_meta_info[2]}" = result), "${_output[0]:ann}.multigene_bvsr.rds", compress='xz')        
        } else {
          for (f in c(${_output:ar,})) {
            saveRDS(NULL, f)

Functional regression fSuSiE for epigenomic QTL fine-mapping#

# prior can be either of ["mixture_normal", "mixture_normal_per_scale"]
parameter: prior = "mixture_normal"
parameter: max_SNP_EM = 100
# Max scale is such that 2^max_scale being the number of phenotypes in the transformed space. Default to 2^10  = 1024. Don't change it unless you know what you are doing. Max_scale should be at least larger than 5.
parameter:  max_scale = 10
# Purity and coverage used to call cs
parameter:  min_purity = 0.5
# Epigenetics mark filter
parameter: epigenetics_mark_treshold = 16
# Run susie for top pc of the fsusie input
parameter: susie_top_pc = 10
depends: sos_variable("regional_data")
# Check if both 'data' and 'meta_info' are empty lists
stop_if(len(regional_data['data']) == 0, f'Either genotype or phenotype data are not available for region {", ".join(region_name)}.')
meta_info = regional_data['meta_info']
input: regional_data["data"], group_by = lambda x: group_by_region(x, regional_data["data"]), group_with = "meta_info"
output: f'{cwd:a}/{step_name[:-2]}/{name}.{_meta_info[0].replace(":", "_").replace("-", "_")}.fsusie_{prior}{"_top_pc_weights" if not skip_twas_weights else ""}.rds'
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output:bn}'
R: expand = '${ }', stdout = f"{_output:n}.stdout", stderr = f"{_output:n}.stderr", container = container, entrypoint = entrypoint
    # extract subset of samples
    keep_samples = NULL
    if (${"TRUE" if keep_samples.is_file() else "FALSE"}) {
      keep_samples = unlist(strsplit(readLines(${keep_samples:ar}), "\\s+"))
      message(paste(length(keep_samples), "samples are selected to be loaded for analysis"))

    # Load regional functional data
    fdat = load_regional_functional_data(genotype = ${_input[0]:anr},
                                          phenotype = c(${",".join(['"%s"' % x.absolute() for x in _input[1:len(_input)//2+1]])}),
                                          covariate = c(${",".join(['"%s"' % x.absolute() for x in _input[len(_input)//2+1:]])}),
                                          region = "${_meta_info[0]}",
                                          association_window = "${_meta_info[1]}",
                                          conditions = c(${",".join(['"%s"' % x for x in _meta_info[4:]])}),
                                          maf_cutoff = ${maf},
                                          mac_cutoff = ${mac},
                                          imiss_cutoff = ${imiss},
                                          keep_indel = ${"TRUE" if indel else "FALSE"},
                                          keep_samples = keep_samples,
                                          tabix_header = TRUE,
                                          phenotype_header = 4,
                                          region_name_col = 4,
                                          scale_residuals = FALSE)
    }, NoSNPsError = function(e) {
        message("Error: ", paste(e$message, "${_meta_info[2] + '@' + _meta_info[1]}"))
        saveRDS(list("${_meta_info[0]}" = e$message), ${_output:ar}, compress='xz')
    # Filter out list fdat that with less than a treshold of epigenomic marker.
    filter_fdat_except_specific_names <- function(fdat, n) {
        # Identify which elements in list1 meet the row count criteria
        indices_to_keep <- sapply(fdat$Y_coordinates, function(x) nrow(x) >= n)
        fdat_filtered <- map(fdat[!names(fdat) %in% c("dropped_sample", "X", "chrom")],~.x[indices_to_keep]) 
        return(c(fdat_filtered,fdat[names(fdat) %in% c("dropped_sample", "X", "chrom")]))

    fdat = filter_fdat_except_specific_names(fdat, n = ${epigenetics_mark_treshold})
    # Check if Y_coordinates is empty after filtering
    if (length(fdat$Y_coordinates) == 0) {
        e_msg = paste0("None of the study have more than or equal to ",${epigenetics_mark_treshold}, " epigenetics marks, region skipped")
        saveRDS(list("${_meta_info[0]}" = e_msg ),  ${_output:ar}, compress='xz')

    if (${"TRUE" if save_data else "FALSE"}) {
      # save data object for debug purpose
      saveRDS(list("${_meta_info[0]}" = fdat), "${_output:ann}.${epigenetics_mark_treshold}_marks.dataset.rds", compress='xz')
    fitted = setNames(replicate(length(fdat$residual_Y), list(), simplify = FALSE), names(fdat$residual_Y))
    for (r in 1:length(fitted)) {
        st = proc.time()
        fitted[[r]] = list()
        message(paste("Dimension of Y matrix is ", nrow(fdat$residual_Y[[r]]), "rows by", ncol(fdat$residual_Y[[r]]), "columns."))
        # Run SuSiE on top K PC
        if(${"TRUE" if (susie_top_pc >0 or not skip_twas_weights) else "FALSE"}) {
            top_pc_data <- prcomp(fdat$residual_Y[[r]], center = TRUE, scale. = TRUE)$x
            if (${susie_top_pc} < ncol(top_pc_data)) {
                top_pc_data <- top_pc_data[,1:${susie_top_pc},drop=F]
            fitted[[r]]$susie_on_top_pc <- list()
            for (i in 1:ncol(top_pc_data)) {
                susie_on_top_pc <- susie_wrapper(fdat$residual_X[[r]], top_pc_data[,i], init_L=${init_L}, max_L=${max_L}, refine=TRUE, coverage = ${coverage[0]})
                fitted[[r]]$susie_on_top_pc[[i]] <- susie_post_processor(susie_on_top_pc, fdat$residual_X[[r]], top_pc_data[,i], fdat$residual_X_scalar[[r]], 1, fdat$maf[[r]],
                                           secondary_coverage = c(${",".join([str(x) for x in coverage[1:]])}), signal_cutoff = ${pip_cutoff},
                                           other_quantities = list(dropped_samples = list(X=fdat$dropped_sample$dropped_samples_X[[r]], 
            # Run TWAS weights on the first PC
            # Exactly the same codes copied from susie_twas
            if ( ${"TRUE" if not skip_twas_weights else "FALSE"} ) {
                twas_weights_output <- twas_weights_pipeline(fdat$residual_X[[r]], top_pc_data[,1], 
                                         cv_folds = ${twas_cv_folds}, max_cv_variants=${max_cv_variants}, 
                # clean up the output database
                fitted[[r]] = c(fitted[[r]], twas_weights_output)
                fitted[[r]]$twas_weights = lapply(fitted[[r]]$twas_weights, function(x) { rownames(x) <- NULL; return(x) })
        # Run fSuSiE -- this can take a while
        fitted[[r]]$fsusie_result <- fsusie_wrapper(X = fdat$residual_X[[r]],
                                      Y = fdat$residual_Y[[r]],
                                      max_scale = ${max_scale},
                                      min.purity = ${min_purity},
                                      cov_lev = ${coverage[0]})
        fitted[[r]]$Y_coordinates = fdat$Y_coordinates[[r]] # Add Y coord 
        names(fitted[[r]]$fsusie_result$pip) = colnames(fdat$residual_X[[r]]) 
        fitted[[r]]$fsusie_summary <- susie_post_processor(fitted[[r]]$fsusie_result, fdat$residual_X[[r]], NULL, fdat$residual_X_scalar[[r]], 1, fdat$maf[[r]], 
                                                          secondary_coverage = c(${",".join([str(x) for x in coverage[1:]])}), signal_cutoff = ${pip_cutoff},
                                                          other_quantities = list(dropped_samples = list(X=fdat$dropped_sample$dropped_samples_X[[r]], y=fdat$dropped_sample$dropped_samples_Y[[r]], 
        fitted[[r]]$total_time_elapsed = proc.time() - st
        fitted[[r]]$region_info = list(region_coord=parse_region("${_meta_info[0]}"), grange=parse_region("${_meta_info[1]}"), region_name="${_meta_info[2]}")
        # original data no longer relevant, set to NA to release memory
        fdat$residual_X[[r]] <- NA
        fdat$residual_Y[[r]] <- NA
    saveRDS(list("${_meta_info[0]}" = fitted), ${_output:ar}, compress='xz')

Functional regression fSuSiE with other modality#

This is still WIP — mvfSuSiE package is still being developed

# prior can be either of ["mixture_normal", "mixture_normal_per_scale"]
parameter: prior  = "mixture_normal_per_scale"
parameter: max_SNP_EM = 1000
depends: sos_variable("regional_data")
# Check if both 'data' and 'meta_info' are empty lists
stop_if(len(regional_data['data']) == 0, f'Either genotype or phenotype data are not available for region {", ".join(region_name)}.')

meta_info = regional_data['meta_info']
input: regional_data["data"], group_by = lambda x: group_by_region(x, regional_data["data"]), group_with = "meta_info"
output: f'{cwd:a}/{step_name[:-2]}/{name}.{_meta_info[0]}.mvfsusie_{prior}.rds'
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output:bn}'
R: expand = '${ }', stdout = f"{_output:n}.stdout", stderr = f"{_output:n}.stderr", container = container, entrypoint = entrypoint
    # Load regional association data
    fdat = load_regional_association_data(genotype = ${_input[0]:anr},
                                          phenotype = c(${",".join(['"%s"' % x.absolute() for x in _input[1::2]])}),
                                          covariate = c(${",".join(['"%s"' % x.absolute() for x in _input[2::2]])}),
                                          region = ${'"%s:%s-%s"' % (_meta_info[1], _meta_info[2], _meta_info[3])},
                                          maf_cutoff = ${maf},
                                          mac_cutoff = ${mac},
                                          imiss_cutoff = ${imiss})
    # Fine-mapping with mvfSuSiE
    Y = map(fdat$residual_Y, ~left_join(fdat$X[,1]%>>%rownames_to_column("rowname"), .x%>%t%>>%rownames_to_column("rowname") , by = "rowname")%>%select(-2)%>%column_to_rownames("rowname")%>%as.matrix )
    fitted <- multfsusie(Y_f = list(Y[[1]],Y[[3]]), 
                         Y_u = Reduce(cbind, Y[[2]]),
                         pos = list(pos1 =fdat$phenotype_coordiates[[1]], pos2 = fdat$phenotype_coordiates[[3]]),
    saveRDS(fitted, ${_output:ar})