TWAS, cTWAS and MR#

Introduction#

This module provides software implementations for transcriptome-wide association analysis (TWAS), Quantile TWAS and performs variant selection for providing sparse signals for cTWAS (causal TWAS) analysis as described in Qian et al (2024+) the multi-group cTWAS method. It will additionally perform Mendelian Randomization using fine-mapping instrumental variables (IV) as described in Zhang et al 2020 for “causal” effects estimation and model validation, with the unit of analysis being a single gene-trait pair.

This procedure is a continuation of the SuSiE-TWAS workflow — it assumes that xQTL fine-mapping has been performed and moleuclar traits prediction weights pre-computed (to be used for TWAS). Cross validation for TWAS weights is optional but highly recommended.

Quantile TWAS extends traditional TWAS by testing genetic effects at different quantiles of the trait distribution, which provides insights into genetic associations that vary across the distribution rather than just at the mean.

GWAS data required are GWAS summary statistics and LD matrix for the region of interest.

Step 1: TWAS#

  1. Extract GWAS z-score for region of interest and corresponding LD matrix.

  2. (Optional) perform allele matching QC for the LD matrix with summary stats.

  3. Process weights: for a number of methods such as LASSO, Elastic Net and mr.ash we have to take the weights as is for QTL variants overlapping with GWAS variants. For SuSiE weights it can be adjusted to exactly match GWAS variants.

  4. Perofrm TWAS test for multiple sets of weights.

  5. For each gene, filter TWAS results by keeping the best model selected by CV. Drop the genes that don’t show good evidence of TWAS prediction weights.

Step 2: Variant Selection for Imputable Genes via the Best Prediction Methods#

  1. Determine if the gene is imputable at each context based on the twas_cv performance by adjusted \(r^2\) (>=0.01) and p-values (<0.05).

  2. The imputable gene-context pair will go through variant selection step. Maximum 10 variants with top pip selected from either top_loci table or SuSiE CS set.

  3. Harmonize weights against LD reference and udpate SuSiE weight.

  4. Extract weights by best model for the context then by the variant names were selected from the previous step

Step 3: cTWAS analysis#

FIXME: add more documentation here

Step 4: MR for candidate genes#

  1. Limit MR only to those showing some evidence of cTWAS significance AND have strong instrumental variable (fine-mapping PIP or CS).

  2. Use fine-mapped xQTL with GWAS data to perform MR.

  3. For multiple IV, aggregate individual IV estimates using a fixed-effect meta-analysis procedure.

  4. Identify and exclude results with severe violations of the exclusion restriction (ER) assumption.

Step 5: Quantile TWAS analysis#

  • Use pre-computed TWAS weights (beta for now) for quantile-specific testing.

  • For each quantile level, cluster and integrate by fixed and dynamic region groups, and extract relevant GWAS z-scores and LD matrix for the region of interest.

  • Perform quantile region-specific association tests, identifying genetic variants with effects that vary across different quantile regions of the phenotype distribution.

Input#

GWAS Data Input Interface (Similar to susie_rss)#

I. GWAS Summary Statistics Files

  • Input: Vector of files for one or more GWAS studies.

  • Format:

    • Tab-delimited files that is tabix-indexed by the first chrom(or #chrom) and second pos column.

    • First 4 columns: chrom or #chrom, pos, A1, A2

    • Additional columns can be loaded using column mapping file see below

  • Column Mapping files (optional):

    • Optional YAML file for custom column mapping.

    • Required columns: chrom, pos, A1, A2, z or (betahat and sebetahat).

    • Optional columns: n, var_y (relevant to fine-mapping).

II. GWAS Summary Statistics Meta-File: this is optional and helpful when there are lots of GWAS data to process via the same command

  • Columns: study_id, chromosome number, path to summary statistics file, optional path to column mapping file.

  • Note: Chromosome number 0 indicates a genome-wide file.

eg: gwas_meta.tsv

study_id    chrom    file_path                 column_mapping_file
study1      1        gwas1.tsv.gz         column_mapping.yml
study1      2        gwas2.tsv.gz         column_mapping.yml
study2      0        gwas3.tsv.gz         column_mapping.yml

If both summary stats file (I) and meta data file (II) are specified we will take the union of the two.

III. LD Reference Metadata File

  • Format: Single TSV file.

  • Contents:

    • Columns: #chrom, start, end, path to the LD matrix, genomic build.

    • LD matrix path format: comma-separated, first entry is the LD matrix, second is the bim file.

  • Documentation: Refer to our LD reference preparation document for detailed information.

Output of Fine-Mapping & TWAS Pipeline#

xQTL Weight Database Metadata File:

  • Essential columns: #chr, start, end, TSS, region_id, original_data

  • Structure of the weight database:

    • RDS format.

    • Organized hierarchically: region → context → weight matrix.

    • Each column represents a different method.

eg: xqtl_meta.tsv

#chr start end region_id TSS original_data combined_data combined_data_sumstats contexts contexts_top_loci
chr1 0 6480000 ENSG00000008128 1724356 "KNIGHT_pQTL.ENSG00000008128.univariate_susie_twas_weights.rds, MiGA_eQTL.ENSG00000008128.univariate_susie_twas_weights.rds, MSBB_eQTL.ENSG00000008128.univariate_susie_twas_weights.rds, ROSMAP_Bennett_Klein_pQTL.ENSG00000008128.univariate_susie_twas_weights.rds, ROSMAP_DeJager_eQTL.ENSG00000008128.univariate_susie_twas_weights.rds, ROSMAP_Kellis_eQTL.ENSG00000008128.univariate_susie_twas_weights.rds, ROSMAP_mega_eQTL.ENSG00000008128.univariate_susie_twas_weights.rds, STARNET_eQTL.ENSG00000008128.univariate_susie_twas_weights.rds" Fungen_xQTL.ENSG00000008128.cis_results_db.export.rds Fungen_xQTL.ENSG00000008128.cis_results_db.export_sumstats.rds Knight_eQTL_brain,MiGA_GFM_eQTL,MiGA_GTS_eQTL,MiGA_SVZ_eQTL,MiGA_THA_eQTL,BM_10_MSBB_eQTL,BM_22_MSBB_eQTL,BM_36_MSBB_eQTL,BM_44_MSBB_eQTL,monocyte_ROSMAP_eQTL,Mic_DeJager_eQTL,Ast_DeJager_eQTL,Oli_DeJager_eQTL,Exc_DeJager_eQTL,Inh_DeJager_eQTL,DLPFC_DeJager_eQTL,PCC_DeJager_eQTL,AC_DeJager_eQTL,Mic_Kellis_eQTL,Ast_Kellis_eQTL,Oli_Kellis_eQTL,OPC_Kellis_eQTL,Exc_Kellis_eQTL,Inh_Kellis_eQTL,Ast_mega_eQTL,Exc_mega_eQTL,Inh_mega_eQTL,Oli_mega_eQTL,STARNET_eQTL_Mac Knight_eQTL_brain,MiGA_GFM_eQTL,MiGA_GTS_eQTL,MiGA_SVZ_eQTL,MiGA_THA_eQTL,BM_10_MSBB_eQTL,BM_22_MSBB_eQTL,BM_36_MSBB_eQTL,BM_44_MSBB_eQTL,monocyte_ROSMAP_eQTL,Mic_DeJager_eQTL,Ast_DeJager_eQTL,Oli_DeJager_eQTL,Exc_DeJager_eQTL,Inh_DeJager_eQTL,DLPFC_DeJager_eQTL,PCC_DeJager_eQTL,AC_DeJager_eQTL,Mic_Kellis_eQTL,Ast_Kellis_eQTL,Oli_Kellis_eQTL,OPC_Kellis_eQTL,Exc_Kellis_eQTL,Inh_Kellis_eQTL,Ast_mega_eQTL,Exc_mega_eQTL,Inh_mega_eQTL,Oli_mega_eQTL,STARNET_eQTL_Mac

This file is automatically generated as part of the FunGen-xQTL protocol, although only the essential columns are relevant to our application here.

TWAS region information#

This is required for cTWAS analysis, where multiple TWAS and SNP data within each region are combined for joint inference to select the variables, either genes or SNPs, to figure out which variables are likely to be directly associated with the phenotype of interest, rather than being associated through correlations with true causal variables.

chrom    start    end    block_id  
1        1000     5000   block1    
2        2000     6000   block2
3        3000     7000   block3

Output#

I. A table with the following contents

gwas_study, chrom, start, end, block, gene, TSS, context, is_imputable, method, is_selected_method, rsq_adj_cv, pval_cv, twas_z, twas_pval

where

  • if twas_z is NA it means the context is not imputable for the method of choice

II. a list of refined_twas_weights_data per block, in RDS format, of this structure:

$ region_id
   $ gene 
      $ context
        $ selected_model
        $ is_imputable 
        $ selected_top_variants
        $ selected_model_weights

This will only contain imputatable genes. It should come with a meta-data file like this:

chrom    start    end    block_id  refined_twas_db
1        1000     5000   block1    block1.rds
2        2000     6000   block2    block2.rds
3        3000     7000   block3    block3.rds

Example#

sos run xqtl-protocol/code/pecotmr_integration/twas.ipynb twas \
   --cwd /output/ --name test \
   --gwas_meta_data /mnt/vast/hpc/csg/cl4215/mrmash/workflow/GWAS/gwas_meta.tsv \
   --ld_meta_data /mnt/vast/hpc/csg/data_public/20240409_ADSP_LD_matrix/ld_meta_file.tsv \
   --regions /mnt/vast/hpc/csg/cl4215/mrmash/workflow/pipeline_data/picalm_EUR_LD_blocks.bed \
   --xqtl_meta_data /mnt/vast/hpc/csg/cl4215/mrmash/workflow/pipeline_data/twas_157_genes_table_2024.tsv \
   --xqtl_type_table /mnt/vast/hpc/csg/cl4215/mrmash/workflow/pipeline_data/data_type_table.txt \
   --rsq_pval_cutoff 0.05 --rsq_cutoff 0.01
sos run xqtl-protocol/code/pecotmr_integration/twas.ipynb ctwas \
   --cwd /output/ --name test \
   --gwas_meta_data /mnt/vast/hpc/csg/cl4215/mrmash/workflow/GWAS/gwas_meta.tsv \
   --ld_meta_data /mnt/vast/hpc/csg/data_public/20240409_ADSP_LD_matrix/ld_meta_file.tsv \
   --regions /mnt/vast/hpc/csg/cl4215/mrmash/workflow/pipeline_data/picalm_EUR_LD_blocks.bed \
   --xqtl_meta_data /mnt/vast/hpc/csg/cl4215/mrmash/workflow/pipeline_data/twas_157_genes_table_2024.tsv \
   --twas_weight_cutoff 0 --focal_region_ids "10_80126158_82231647" "11_84267999_86714492"    

Here using --region-name we focus the analysis on 2 blocks: format as chr_start_stop.

sos run xqtl-protocol/code/pecotmr_integration/twas.ipynb quantile_twas \
   --cwd /output/ --name test \
   --gwas_meta_data /mnt/vast/hpc/csg/cl4215/mrmash/workflow/GWAS/gwas_meta.tsv \
   --ld_meta_data /mnt/vast/hpc/csg/data_public/20240409_ADSP_LD_matrix/ld_meta_file.tsv \
   --region_name chr11_84267999_86714492 chr7_54681006_57314931 \
   --xqtl_meta_data /home/al4225/project/quantile_twas/quantile_twas_analysis/test_data/small_region_gene_meta_data_test.tsv

It is also possible to analyze a selected list of regions using option --regions. The 3 columns(chr, start, stop) of this file will be used for the block list to analyze. Here for example use the same list of regions as we used for LD block:

sos run xqtl-protocol/code/pecotmr_integration/twas.ipynb quantile_twas \
   --cwd /output/ --name test \
   --gwas_meta_data /mnt/vast/hpc/csg/cl4215/mrmash/workflow/GWAS/gwas_meta.tsv \
   --ld_meta_data /mnt/vast/hpc/csg/data_public/20240409_ADSP_LD_matrix/ld_meta_file.tsv \
   --regions /mnt/vast/hpc/csg/cl4215/mrmash/workflow/pipeline_data/EUR_LD_blocks.bed \
   --xqtl_meta_data /home/al4225/project/quantile_twas/quantile_twas_analysis/test_data/small_region_gene_meta_data_test.tsv
[global]
parameter: cwd = path("output/")
parameter: gwas_meta_data = path()
parameter: xqtl_meta_data = path()
parameter: ld_meta_data = path()
parameter: xqtl_type_table = ''
parameter: gwas_name = []
parameter: gwas_data = []
parameter: column_mapping = []
parameter: regions = path()
# Optional: if a region name is provided 
# the analysis would be focused on the union of provides region list and region names
parameter: region_name = []
parameter: name = f"{xqtl_meta_data:bn}.{gwas_meta_data:bn}"
parameter: container = ''
import re
parameter: entrypoint= ('micromamba run -a "" -n' + ' ' + re.sub(r'(_apptainer:latest|_docker:latest|\.sif)$', '', container.split('/')[-1])) if container else ""
parameter: job_size = 100
parameter: walltime = "5m"
parameter: mem = "8G"
parameter: numThreads = 1

import os
import pandas as pd

def adapt_file_path(file_path, reference_file):
    """
    Adapt a single file path based on its existence and a reference file's path.

    Args:
    - file_path (str): The file path to adapt.
    - reference_file (str): File path to use as a reference for adaptation.

    Returns:
    - str: Adapted file path.

    Raises:
    - FileNotFoundError: If no valid file path is found.
    """
    reference_path = os.path.dirname(reference_file)

    # Check if the file exists
    if os.path.isfile(file_path):
        return file_path

    # Check file name without path
    file_name = os.path.basename(file_path)
    if os.path.isfile(file_name):
        return file_name

    # Check file name in reference file's directory
    file_in_ref_dir = os.path.join(reference_path, file_name)
    if os.path.isfile(file_in_ref_dir):
        return file_in_ref_dir

    # Check original file path prefixed with reference file's directory
    file_prefixed = os.path.join(reference_path, file_path)
    if os.path.isfile(file_prefixed):
        return file_prefixed

    # If all checks fail, raise an error
    raise FileNotFoundError(f"No valid path found for file: {file_path}")

def group_by_region(lst, partition):
    # from itertools import accumulate
    # partition = [len(x) for x in partition]
    # Compute the cumulative sums once
    # cumsum_vector = list(accumulate(partition))
    # Use slicing based on the cumulative sums
    # return [lst[(cumsum_vector[i-1] if i > 0 else 0):cumsum_vector[i]] for i in range(len(partition))]
    return partition
[get_analysis_regions: shared = ["filtered_region_info", "filtered_regional_xqtl_files", "regional_data"]]
from collections import OrderedDict

def check_required_columns(df, required_columns):
    """Check if the required columns are present in the dataframe."""
    missing_columns = [col for col in required_columns if col not in list(df.columns)]
    if missing_columns:
        raise ValueError(f"Missing required columns: {', '.join(missing_columns)}")

def extract_regional_data(gwas_meta_data, xqtl_meta_data, regions, region_name, gwas_name, gwas_data, column_mapping):
    """
    Extracts data from GWAS and xQTL metadata files and additional GWAS data provided. 

    Args:
    - gwas_meta_data (str): File path to the GWAS metadata file.
    - xqtl_meta_data (str): File path to the xQTL weight metadata file.
    - gwas_name (list): vector of GWAS study names.
    - gwas_data (list): vector of GWAS data.
    - column_mapping (list, optional): vector of column mapping files.

    Returns:
    - Tuple of two dictionaries:
        - GWAS Dictionary: Maps study IDs to a list containing chromosome number, 
          GWAS file path, and optional column mapping file path.
        - xQTL Dictionary: Nested dictionary with region IDs as keys.

    Raises:
    - FileNotFoundError: If any specified file path does not exist.
    - ValueError: If required columns are missing in the input files or vector lengths mismatch.
    """
    # Check vector lengths
    if len(gwas_name) != len(gwas_data):
        raise ValueError("gwas_name and gwas_data must be of equal length")
    
    if len(column_mapping)>0 and len(column_mapping) != len(gwas_name):
        raise ValueError("If column_mapping is provided, it must be of the same length as gwas_name and gwas_data")

    # Required columns for each file type
    required_gwas_columns = ['study_id', 'chrom', 'file_path']
    required_xqtl_columns = ['region_id', '#chr', 'start', 'end', "TSS", 'original_data'] #region_id here is gene name
    required_ld_columns = ['chr', 'start', 'stop']
    
    # Reading the GWAS metadata file
    gwas_df = pd.read_csv(gwas_meta_data, sep="\t")
    check_required_columns(gwas_df, required_gwas_columns)
    gwas_dict = OrderedDict()
    
    # Reading LD regions info
    # Initialize empty DataFrame for regions
    regions_df = pd.DataFrame(columns=['chr', 'start', 'stop'])

    # Check if regions file exists and read it
    if os.path.isfile(regions):
        file_regions_df = pd.read_csv(regions, sep="\t", skipinitialspace=True)
        file_regions_df.columns = [col.strip() for col in file_regions_df.columns]  # Strip spaces from column names
        file_regions_df['chr'] = file_regions_df['chr'].str.strip()
        check_required_columns(file_regions_df, required_ld_columns)
        regions_df = pd.concat([regions_df, file_regions_df])
    # Process region_name if provided: 
    # fomat: region_name = ["chr1_16103_2888443", "chr1_4320284_5853833"]
    if len(region_name) > 0:
        # Split region_name entries into chr, start, and stop columns
        extra_regions = [name.split("_") for name in region_name]
        extra_regions_df = pd.DataFrame(extra_regions, columns=['chr', 'start', 'stop'])
        extra_regions_df['start'] = extra_regions_df['start'].astype(int)
        extra_regions_df['stop'] = extra_regions_df['stop'].astype(int)
        # Add extra regions to regions_df
        regions_df = pd.concat([regions_df, extra_regions_df])
    # Remove duplicates and reset index
    regions_df = regions_df.drop_duplicates().reset_index(drop=True)
    regions_dict = OrderedDict()

    # Reading the xQTL weight metadata file
    xqtl_df = pd.read_csv(xqtl_meta_data, sep=" ")
    check_required_columns(xqtl_df, required_xqtl_columns)
    xqtl_dict = OrderedDict()

    # Process additional GWAS data from R vectors
    for name, data, mapping in zip(gwas_name, gwas_data, column_mapping or [None]*len(gwas_name)):
        gwas_dict[name] = {0: [data, mapping]}

    for _, row in gwas_df.iterrows():
        file_path = row['file_path']
        mapping_file = row.get('column_mapping_file')
        
        # Adjust paths if necessary
        file_path = adapt_file_path(file_path, gwas_meta_data)
        if mapping_file:
            mapping_file = adapt_file_path(mapping_file,  gwas_meta_data)

       # Create or update the entry for the study_id
        if row['study_id'] not in gwas_dict:
            gwas_dict[row['study_id']] = {}

        # Expand chrom 0 to chrom 1-22 or use the specified chrom
        chrom_range = range(1, 23) if row['chrom'] == 0 else [row['chrom']]
        for chrom in chrom_range:
            if chrom in gwas_dict[row['study_id']]:
                existing_entry = gwas_dict[row['study_id']][f'chr{chrom}']
                raise ValueError(f"Duplicate chromosome specification for study_id {row['study_id']}, chrom {chrom}. "
                                 f"Conflicting entries: {existing_entry} and {[file_path, mapping_file]}")
            gwas_dict[row['study_id']][f'chr{chrom}'] = [file_path, mapping_file]
            
    for _, row in regions_df.iterrows():
        LD_region_id = f"{row['chr']}_{row['start']}_{row['stop']}"
        overlapping_xqtls = xqtl_df[(xqtl_df['#chr'] == row['chr']) & 
                                     (xqtl_df['TSS'] <= row['stop']) & 
                                     (xqtl_df['TSS'] >= (row['start']))]
        file_paths = []
        mapped_genes = []
        # Collect file paths for xQTLs overlapping this region
        for _, xqtl_row in overlapping_xqtls.iterrows():
            original_data = xqtl_row['original_data']
            file_list = original_data.split(',') if ',' in original_data else [original_data]
            file_paths.extend([adapt_file_path(fp.strip(), xqtl_meta_data) for fp in file_list])
            mapped_genes.extend([xqtl_row['region_id']] * len(file_list))

        # Store metadata and files in the dictionary
        regions_dict[LD_region_id] = {
            "meta_info": [row['chr'], row['start'], row['stop'], LD_region_id, mapped_genes],
            "files": file_paths
        }
        
    for _, row in xqtl_df.iterrows():
        file_paths = [adapt_file_path(fp.strip(), xqtl_meta_data) for fp in row['original_data'].split(',')]  # Splitting and stripping file paths
        xqtl_dict[row['region_id']] = {"meta_info": [row['#chr'], row['start'], row['end'], row['region_id'], row['contexts']],
                                       "files": file_paths}
    return gwas_dict, xqtl_dict, regions_dict


gwas_dict, xqtl_dict, regions_dict = extract_regional_data(gwas_meta_data, xqtl_meta_data,regions,region_name,gwas_name, gwas_data, column_mapping)
regional_data = dict([("GWAS", gwas_dict), ("xQTL", xqtl_dict), ("Regions", regions_dict)])


# get regions data 
region_info = [x["meta_info"] for x in regional_data['Regions'].values()]
regional_xqtl_files = [x["files"] for x in regional_data['Regions'].values()]

# Filter out empty xQTL file paths
filtered_region_info = []
filtered_regional_xqtl_files = []
skipped_regions =[]

for region, files in zip(region_info, regional_xqtl_files):
    if files:
        filtered_region_info.append(region)
        filtered_regional_xqtl_files.append(files)
    else:
        skipped_regions.append(region)
print(f"Skipping {len(skipped_regions)} out of {len(regional_xqtl_files)} regions, no overlapping xQTL weights found. ")
[twas]
depends: sos_variable("filtered_regional_xqtl_files")
parameter: coverage = "cs_coverage_0.95"
# Threashold for rsq and pvalue for imputability determination for a model 
parameter: rsq_cutoff = 0.01
parameter: rsq_pval_cutoff = 0.05
parameter: mr_pval_cutoff = 0.05
parameter: save_ctwas_data = True
parameter: save_mr_result = True
parameter: rsq_option="adj_rsq"
parameter: rsq_pval_option=["adj_rsq_pval", "pval"]

input: filtered_regional_xqtl_files, group_by = lambda x: group_by_region(x, filtered_regional_xqtl_files), group_with = "filtered_region_info"
output_files = [f'{cwd:a}/{step_name}/{name}.{_filtered_region_info[3]}.twas.tsv.gz']
if save_ctwas_data:
    output_files.append(f'{cwd:a}/{step_name}/{name}.{_filtered_region_info[3]}.twas_data.rds')
if save_mr_result:
    output_files.append(f'{cwd:a}/{step_name}/{name}.{_filtered_region_info[3]}.mr_result.tsv.gz')
output: output_files
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output[0]:bn}'
R: expand = '${ }', stdout = f"{_output[0]:n}.stdout", stderr = f"{_output[0]:n}.stderr", container = container, entrypoint = entrypoint

    library(dplyr)
    library(data.table)
    library(pecotmr)
    library(readr)

    # get xQTL weight information 
    xqtl_meta_df <- fread("${xqtl_meta_data}", data.table=FALSE) # Get related gene information from the xqtl_meta data table
    xqtl_type_table <- if (isTRUE(file.exists("${xqtl_type_table}"))) fread("${xqtl_type_table}") else NULL
    gwas_studies = c(${paths(regional_data["GWAS"].keys()):r,})
    gwas_files = c(${paths([v[_filtered_region_info[0]] for k, v in regional_data["GWAS"].items()]):r,})
    gene_list <- c(${', '.join([f"'{gene}'" for gene in _filtered_region_info[4]])})

    # Process weights 
    twas_weights_results <- list()
    weight_db_list <- c(${_input:r,})
    names(weight_db_list) <- gene_list
    weight_db_list <- split(weight_db_list, names(weight_db_list), c)
    # remove weight db files that only contain messages: "No SNPs found in the specified region ..."
    weight_db_list_update <- Filter(Negate(is.null), lapply(weight_db_list, function(file_list){do.call(c,lapply(file_list, function(file) if (file.size(file) > 200) file))}))
    if(length(weight_db_list_update) <= 0) stop(paste0("No weight information available from ", paste0(weight_db_list, collapse=","), ". "))
    
    # Step 1: Load TWAS weights and generate twas weight db, output export_twas_weights_db 
    for (gene_db in names(weight_db_list_update)) {
        weight_dbs <- weight_db_list_update[[gene_db]]
        twas_weights_results[[gene_db]] = load_twas_weights(weight_dbs, variable_name_obj="variant_names", 
                                                           susie_obj="susie_weights_intermediate",
                                                           twas_weights_table = "twas_weights")
        if (length(twas_weights_results[[gene_db]]) > 1) {
          twas_weights_results[[gene_db]]$data_type <- setNames(lapply(names(twas_weights_results[[gene_db]]$weights), function(context) xqtl_type_table$type[sapply(xqtl_type_table$context,
                                                                function(x) grepl(x, context))]), names(twas_weights_results[[gene_db]]$weights))      
        } else {
          twas_weights_results[[gene_db]] <- NULL
        }                   
    }

    # Step 2: twas analysis for imputable genes across contexts
    twas_results_db <- twas_pipeline(twas_weights_results, 
                                     "${ld_meta_data}", 
                                     "${gwas_meta_data}",  
                                     region_block="${_filtered_region_info[3]}",
                                     rsq_cutoff = ${rsq_cutoff}, 
                                     rsq_option = "${rsq_option}", 
                                     rsq_pval_cutoff = ${rsq_pval_cutoff}, 
                                     rsq_pval_option=c(${", ".join([f'"{x}"' for x in rsq_pval_option])}), 
                                     mr_pval_cutoff = ${mr_pval_cutoff},
                                     mr_coverage_column = "${coverage}", 
                                     output_twas_data = ${"TRUE" if save_ctwas_data else "FALSE"})
    if(!is.null(twas_results_db)){
        twas_results_db$twas_result <- merge( twas_results_db$twas_result, xqtl_meta_df[, c("region_id","TSS","start","end")], by.x="molecular_id", by.y="region_id")
        fwrite(twas_results_db$twas_result[, c(2,1,14:16,3:13)], file = ${_output[0]:r}, sep = "\t", compress = "gzip")
    } else {
        fwrite(data.frame(), file = ${_output[0]:r}, sep = "\t", compress = "gzip")
    }
    # Step 3: reformat for follow up cTWAS analysis
    if (${"TRUE" if save_ctwas_data else "FALSE"}) {
        saveRDS(twas_results_db$twas_data, "${_output[0]:nnn}.twas_data.rds", compress='xz')
    }
    if (${"TRUE" if save_mr_result else "FALSE"}) {
        if(!is.null(twas_results_db)) {
            fwrite(twas_results_db$mr_result, file = "${_output[0]:nnn}.mr_result.tsv.gz", sep = "\t", compress = "gzip")
        } else {
            fwrite(data.frame(), file = "${_output[0]:nnn}.mr_result.tsv.gz", sep = "\t", compress = "gzip")
        }
    }
[ctwas]
depends: sos_variable("filtered_region_info")
# twas weight cutoff for ctwas variant selection
parameter: twas_weight_cutoff = 0
# cs_min_cor cutoff in susie finemapping result for variant selection
parameter: cs_min_cor = 0
# minimum pip cutoff in susie finemapping result for variant selection
parameter: min_pip_cutoff =0 
# A scalar in (0,1]. The proportion of SNPs, reduce runtime at the cost of accuracy 
parameter: thin = 0.1
# A list of regions to be subset for screening and fine-mapping, for example: "10_80126158_82231647"
parameter: focal_region_ids = []

region_blocks = ', '.join([f"'{region_info[3]}'" for region_info in filtered_region_info])
twas_output_files = [f'{cwd:a}/twas/{name}.{region_info[3]}.twas_data.rds' for region_info in filtered_region_info]
if focal_region_ids:
    focal_region_ids = 'c(' + ', '.join(f'"{focal_region_id}"' for focal_region_id in focal_region_ids) + ')'    
else:
    focal_region_ids = "NULL"  # Default to NULL if no focal_region_ids provided
    
input: twas_output_files, group_by = "all"
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output:bn}'
R: expand = '${ }', stdout = f"{_output:n}.stdout", stderr = f"{_output:n}.stderr", container = container, entrypoint = entrypoint

    library(data.table)
    library(ctwas) # multigroup_ctwas
    library(pecotmr)
    
    outputdir = "${cwd:a}/ctwas/"
    if (!dir.exists(file.path(outputdir))) dir.create(file.path(outputdir))

    focal_region_ids <- if (is.null(${focal_region_ids})) names(region_data) else ${focal_region_ids}
    maxSNP = 20000

    # load genome-wide data across all regions
    gwas_studies <- unique(fread("${gwas_meta_data}",data.table=FALSE, select = "study_id"))[,1]
    data <- lapply(c(${_input:r,}), readRDS)
    names(data) <- c(${region_blocks})

    weights <- do.call(c, lapply(names(data), function(region_block){
        trim_ctwas_variants(data[[region_block]], twas_weight_cutoff=${twas_weight_cutoff}, cs_min_cor=${cs_min_cor},
                            min_pip_cutoff=${min_pip_cutoff}, max_num_variants=Inf)
    }))
    
    # compile z_snp, z_gene, and snp_info across multiple regions
    snp_info <- do.call(c, lapply(unname(data), function(x) x$snp_info))
    snp_map <- snp_info[!duplicated(names(snp_info))]
    z_gene_dat <- do.call(c, lapply(unname(data), function(x)x$z_gene))
    z_snp_dat <-  do.call(c, lapply(unname(data), function(x)x$z_snp))
    z_gene <- setNames(lapply(gwas_studies, function(study)  do.call(rbind, z_gene_dat[names(z_gene_dat) == study])), gwas_studies)
    z_gene <- lapply(z_gene, function(x) x[!is.na(x$z) & x$id %in% names(weights),])
    z_snp <- setNames(lapply(gwas_studies, function(study) {
        df <- do.call(rbind, z_snp_dat[names(z_snp_dat) == study])
        df[!duplicated(df$id), ] 
    }), gwas_studies)
    
    regions_data <- get_ctwas_meta_data("${ld_meta_data}", names(snp_map))
    region_info <- regions_data$region_info
    LD_map <- regions_data$LD_info
    rm(data)
  
    for (study in gwas_studies){
      outname = paste0("${name}", "_", study)
  
      # assemble regions
      region_data_file <- file.path(outputdir, paste0(outname, ".ctwas_region_data.thin", ${thin}, ".rds"))
      boundary_gene_file <- file.path(outputdir, paste0(outname, ".ctwas_boundary_genes.rds"))
      if (file.exists(region_data_file)) {
        message("Region assemble data ",region_data_file, " already exists, using pre-computed region data for analysis. " )
        region_data <- readRDS(region_data_file)
        boundary_genes <- readRDS(boundary_gene_file)
      } else{
        res <- assemble_region_data(region_info,
                                    z_snp[[study]],
                                    z_gene[[study]],
                                    weights,
                                    snp_map,
                                    thin = ${thin},
                                    maxSNP = maxSNP,
                                    min_group_size = 1,
                                    ncore = 5)
        region_data <- res$region_data
        boundary_genes <- res$boundary_genes
        saveRDS(region_data, region_data_file, compress='xz')
        saveRDS(boundary_genes, boundary_gene_file, compress='xz')
      }

      # assess global parameters
      group_prior_file <- file.path(outputdir, paste0(outname, ".ctwas_param.rds"))
      if (file.exists(group_prior_file)) {
        message("Global parameter ",group_prior_file, " already exists, using pre-computed global parameter for analysis. " )
        group_prior_data <- readRDS(group_prior_file)
        group_prior <- group_prior_data$group_prior
        group_prior_var <- group_prior_data$group_prior_var
      } else {
        param <- est_param(region_data,
                         init_group_prior = NULL,
                         init_group_prior_var = NULL,
                         group_prior_var_structure = "shared_nonSNP", 
                         niter_prefit = 3,
                         niter = 30,
                         min_var = 2,
                         min_gene = 1,
                         min_group_size = 1,
                         min_p_single_effect = 0.8,
                         ncore = ${numThreads},
                         verbose = TRUE)
       group_prior <- param$group_prior
       group_prior_var <- param$group_prior_var 
       saveRDS(param, group_prior_file, compress='xz')
      }

      # screen regions
      screen_regions_file <- file.path(outputdir, paste0(outname, ".ctwas_screen_res.rds"))
      if (file.exists(screen_regions_file)) {
        message("Region screening result ",screen_regions_file, " already exists, using pre-computed screen region result for analysis. " )
        screen_res <- readRDS(screen_regions_file)
      } else {
        screen_res <- screen_regions(region_data[focal_region_ids],
                                     LD_map = LD_map,
                                     weights = weights,
                                     filter_L = TRUE,
                                     filter_nonSNP_PIP = FALSE,
                                     min_L = 0,
                                     ncore = ncore_LD,
                                     LD_format = "custom",
                                     LD_loader_fun = ctwas_ld_loader,
                                     snpinfo_loader_fun = ctwas_bimfile_loader,
                                     logfile = file.path(outputdir, paste0(outname, ".screen_regions.log")))
        screened_region_data <- screen_res$screened_region_data
        screen_summary <- screen_res$screen_summary
        screened_region_L <- screen_res$screened_region_L
        # Expand screened region_data with all SNPs in the regions
        screened_region_data <- expand_region_data(screened_region_data,
                                                   snp_map,
                                                   z_snp[[study]],
                                                   maxSNP = maxSNP,
                                                   ncore = ${numThreads})
        screen_res$screened_region_data <- screened_region_data
        saveRDS(screen_res, screen_regions_file, compress='xz')
      }


      # finemapping
      finemap_res_file <- file.path(outputdir, paste0(outname, ".ctwas_finemap_res.rds"))
      susie_alpha_file <- file.path(outputdir, paste0(outname, ".ctwas_susie_alpha_res.rds"))
      if (file.exists(finemap_res_file)) {
        finemap_res <- readRDS(finemap_res_file)
        susie_alpha_res <- readRDS(susie_alpha_file)
      } else{
        screened_region_data <- screen_res$screened_region_data
        screened_region_L <- screen_res$screened_region_L
        res <- finemap_regions(screened_region_data,
                              LD_map = LD_map,
                              weights = weights,
                              group_prior = param$group_prior,
                              group_prior_var = param$group_prior_var,
                              L = screened_region_L,
                              ncore = ncore_LD,
                              save_cor = TRUE,
                              cor_dir = file.path(outputdir, "cor_matrix"),
                              LD_format = "custom",
                              LD_loader_fun = ctwas_ld_loader,
                              snpinfo_loader_fun = ctwas_bimfile_loader,
                              verbose = TRUE,
                              logfile = file.path(outputdir, paste0(outname, ".finemap_regions.log")))
        finemap_res <- res$finemap_res
        finemap_res$pval <- z2p(finemap_res$z)
        susie_alpha_res <- res$susie_alpha_res
        saveRDS(finemap_res, finemap_res_file, compress='xz')
        saveRDS(susie_alpha_res, susie_alpha_file, compress='xz')
        
        # LD mismatch diagnosis 
        diag_LD_res_file <- file.path(outputdir, paste0(outname, ".ctwas_diag_LD_res.rds"))
        diag_res <- diagnose_LD_mismatch_susie(region_ids = focal_region_ids, 
                                        z_snp = z_snp[[study]], 
                                        LD_map = LD_map, 
                                        gwas_n = nrow(z_snp[[study]]),
                                        p_diff_thresh = 5e-8,
                                        ncore = ncore_LD,
                                        LD_format = "custom",
                                        LD_loader_fun = ctwas_ld_loader,
                                        snpinfo_loader_fun = ctwas_bimfile_loader)
        problematic_snps <- diag_res$problematic_snps
        flipped_snps <- diag_res$flipped_snps
        condz_stats <- diag_res$condz_stats
        saveRDS(diag_res, diag_LD_res_file, compress='xz')
      }
    }
[quantile_twas]
depends: sos_variable("filtered_regional_xqtl_files")
parameter: save_ctwas_data = True
parameter: save_mr_result = False
input: filtered_regional_xqtl_files, group_by = lambda x: group_by_region(x, filtered_regional_xqtl_files), group_with = "filtered_region_info"
output_files = [f'{cwd:a}/{step_name}/{name}.{_filtered_region_info[3]}.quantile_twas.tsv.gz']
if save_ctwas_data:
    output_files.append(f'{cwd:a}/{step_name}/{name}.{_filtered_region_info[3]}.quantile_twas_data.rds')
if save_mr_result:
    output_files.append(f'{cwd:a}/{step_name}/{name}.{_filtered_region_info[3]}.quantile_mr_result.tsv.gz')
output: output_files
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output[0]:bn}'
R: expand = '${ }', stdout = f"{_output[0]:n}.stdout", stderr = f"{_output[0]:n}.stderr", container = container, entrypoint = entrypoint

    library(dplyr)
    library(data.table)
    library(pecotmr)
    library(readr)

    # get xQTL weight information
    xqtl_meta_df <- fread("${xqtl_meta_data}") # Get related gene information from the xqtl_meta data table
    xqtl_type_table <- if (isTRUE(file.exists("${xqtl_type_table}"))) fread("${xqtl_type_table}") else NULL
    gwas_studies = c(${paths(regional_data["GWAS"].keys()):r,})
    gwas_files = c(${paths([v[_filtered_region_info[0]] for k, v in regional_data["GWAS"].items()]):r,})
    gene_list <- c(${', '.join([f"'{gene}'" for gene in _filtered_region_info[4]])})

    # Initialize export_twas_weights_db
    export_twas_weights_db <- list()
    export_twas_weights_db[["${_filtered_region_info[3]}"]] <- list()

    # Initialize twas_weights_results
    twas_weights_results <- list()

    # Create initial weight_db_list
    weight_db_list <- c(${_input:r,})
    names(weight_db_list) <- gene_list

    # Split weight_db_list
    weight_db_list <- split(weight_db_list, names(weight_db_list))

    # Update weight_db_list
    weight_db_list_update <- lapply(weight_db_list, function(file_list){
        do.call(c, lapply(file_list, function(file) if (file.size(file) > 200) file))
    })

    # Check if weight_db_list_update is empty
    if(length(weight_db_list_update) <= 0) {
        stop(paste0("No weight information available from ", paste0(weight_db_list, collapse=","), ". "))
    }

    # Define tau_values
    tau_values <- seq(0.01, 0.99, 0.01)

    # Main processing loop
    for (gene_db in names(weight_db_list_update)) {
        weight_dbs <- weight_db_list_update[[gene_db]]
        
        twas_weights_results[[gene_db]] <- load_quantile_twas_weights(
            weight_db_files = weight_dbs,
            tau_values = tau_values,
            between_cluster = 0.8,
            num_intervals = 3
        )
        
        if (!is.null(twas_weights_results[[gene_db]]) && !is.null(twas_weights_results[[gene_db]]$weights)) {
            twas_weights_results[[gene_db]]$data_type <- setNames(
                lapply(names(twas_weights_results[[gene_db]]$weights), function(context) {
                    xqtl_type_table$type[sapply(xqtl_type_table$context, function(x) grepl(x, context))]
                }),
                names(twas_weights_results[[gene_db]]$weights)
            )
        } else {
            print(paste("Warning: No valid weights found for gene:", gene_db))
        }
    }

    #Step 2: twas analysis for imputable genes across contexts
    twas_results_db <- twas_pipeline(twas_weights_data = twas_weights_results, 
                                    ld_meta_file_path = "${ld_meta_data}", 
                                    gwas_meta_file = "${gwas_meta_data}", 
                                    region_block = "${_filtered_region_info[3]}", 
                                    quantile_twas = TRUE,  
                                    output_twas_data = ${"TRUE" if save_ctwas_data else "FALSE"} )

    # Merging with xQTL meta-data      
    message("Merging twas_result with xqtl_meta_df...")    
    twas_results_db$twas_result <- merge(twas_results_db$twas_result, xqtl_meta_df[, c("region_id","TSS","start","end")], by.x="molecular_id", by.y="region_id")
    twas_results_db$twas_result <- unique(twas_results_db$twas_result)
    fwrite(twas_results_db$twas_result[, c(2, 1, (ncol(twas_results_db$twas_result)-2):ncol(twas_results_db$twas_result), 3:(ncol(twas_results_db$twas_result)-3))], file = ${_output[0]:r}, sep = "\t", compress = "gzip")

    # Step 3: reformat for follow up cTWAS analysis
    if (${"TRUE" if save_ctwas_data else "FALSE"}) {
        saveRDS(twas_results_db$twas_data, "${_output[0]:nnn}.quantile_twas_data.rds", compress='xz')
    }
    if (${"TRUE" if save_mr_result else "FALSE"}) {
        fwrite(twas_results_db$mr_result, file = "${_output[0]:nnn}.quantile_mr_result.tsv.gz", sep = "\t", compress = "gzip")
    }
    message("quantile twas analysis is completed in this block.")