Genotype Data Formatting#
This module implements a collection of workflows used to format genotype data.
Description#
We include steps for the formatting of genotype files. This includes the conversion between VCF and PLINK formats, the splitting of data (by specified input, chromosomes or genes) and the merging of data (by specified input, or by chromosomes).
Input#
Depending on the analysis task, input files are specified in one of the following formats:
A single Whole genome data in VCF format, or in PLINK bim/bed/fam bundle; Or,
A list of VCF or PLINK bed file
A singular column file containing a list of VCF or PLINK bed file
A two column file containing a list of per chromosome VCF or PLINK bed file where the first column is chrom and 2nd column is file name
Output#
Genotype data in PLINK format partitioned by chromosome
Minimal Working Example#
The data and singularity container bioinfo.sif
can be downloaded from Synapse.
ii. VCF to PLINK#
Timing: X min
ROSMAP_sample_list.txt
is a list that includes all ROSMAP samples we need for analysis, in formatting of FID, IID. This file has been uploaded to ftp: /ftp_fgc_xqtl/projects/WGS/ROSMAP
sos run pipeline/genotype_formatting.ipynb vcf_to_plink
--genoFile `ls vcf_qc/*.leftnorm.bcftools_qc.vcf.gz` \
--cwd Genotype/ \
--keep_samples ./ROSMAP_sample_list.txt
--container /mnt/vast/hpc/csg/containers/bioinfo.sif \
-J 22 -q csg -c csg.yml --mem 120G
iii. Merge PLINK Files#
Timing: X min
This step merges all the files and may require anout 300G mem to run, because there are some variants’ ID with 80+ characters. And only plink can do the merge job, plink2 doesn’t support merge.
sos run xqtl-protocol/pipeline/genotype_formatting.ipynb merge_plink \
--genoFile `ls *.leftnorm.bcftools_qc.bed` \
--name ROSMAP_NIA_WGS.leftnorm.bcftools_qc \
--cwd Genotype/ \
--container /mnt/vast/hpc/csg/containers/bioinfo.sif \
-J 5 -q csg -c csg.yml --mem 300G
iv. Partition Genotype Data by Chromosome#
Timing: <1 min
sos run pipeline/genotype_formatting.ipynb genotype_by_chrom \
--genoFile input/protocol_example.genotype.chr21_22.bed \
--cwd output \
--chrom `cut -f 1 input/protocol_example.genotype.chr21_22.bim | uniq | sed "s/chr//g"` \
--container containers/bioinfo.sif
INFO: Running genotype_by_chrom_1:
INFO: genotype_by_chrom_1 (index=1) is completed.
INFO: genotype_by_chrom_1 (index=0) is completed.
INFO: genotype_by_chrom_1 output: /Users/alexmccreight/xqtl-protocol/output/protocol_example.genotype.chr21_22.22.bed /Users/alexmccreight/xqtl-protocol/output/protocol_example.genotype.chr21_22.21.bed in 2 groups
INFO: Running genotype_by_chrom_2:
INFO: genotype_by_chrom_2 is completed (pending nested workflow).
INFO: Running write_data_list:
INFO: write_data_list is completed.
INFO: write_data_list output: /Users/alexmccreight/xqtl-protocol/output/protocol_example.genotype.chr21_22.genotype_by_chrom_files.txt
INFO: genotype_by_chrom_2 output: /Users/alexmccreight/xqtl-protocol/output/protocol_example.genotype.chr21_22.genotype_by_chrom_files.txt
INFO: Workflow genotype_by_chrom (ID=w5be6d78805f1df2e) is executed successfully with 3 completed steps and 4 completed substeps.
Command Interface#
sos run genotype_formatting.ipynb -h
usage: sos run genotype_formatting.ipynb
[workflow_name | -t targets] [options] [workflow_options]
workflow_name: Single or combined workflows defined in this script
targets: One or more targets to generate
options: Single-hyphen sos parameters (see "sos run -h" for details)
workflow_options: Double-hyphen workflow-specific parameters
Workflows:
plink_to_vcf
vcf_to_plink
plink_by_gene
plink_by_chrom
merge_plink
merge_vcf
Global Workflow Options:
--cwd output (as path)
Work directory & output directory
--container ''
The filename name for containers
--job-size 1 (as int)
For cluster jobs, number commands to run per job
--walltime 5h
Wall clock time expected
--mem 3G
Memory expected
--numThreads 20 (as int)
Number of threads
--genoFile paths
the path to a bed file or VCF file, a vector of bed
files or VCF files, or a text file listing the bed files
or VCF files to process
Sections
plink_to_vcf_1:
vcf_to_plink:
plink_by_gene_1:
Workflow Options:
--window 500000 (as int)
cis window size
--region-list VAL (as path, required)
Region definition
plink_by_chrom_1:
Workflow Options:
--chrom VAL VAL ... (as type, required)
plink_by_chrom_2, plink_by_gene_2:
plink_to_vcf_2:
merge_plink:
Workflow Options:
--name VAL (as str, required)
File prefix for the analysis output
merge_vcf:
Workflow Options:
--name VAL (as str, required)
File prefix for the analysis output
[global]
# Work directory & output directory
parameter: cwd = path("output")
# The filename name for containers
parameter: container = ''
import re
parameter: entrypoint= ('micromamba run -a "" -n' + ' ' + re.sub(r'(_apptainer:latest|_docker:latest|\.sif)$', '', container.split('/')[-1])) if container else ""
# For cluster jobs, number commands to run per job
parameter: job_size = 1
# Wall clock time expected
parameter: walltime = "5h"
# Memory expected
parameter: mem = "16G"
# Number of threads
parameter: numThreads = 20
# the path to a bed file or VCF file, a vector of bed files or VCF files, or a text file listing the bed files or VCF files to process
parameter: genoFile = paths
# use this function to edit memory string for PLINK input
from sos.utils import expand_size
cwd = f"{cwd:a}"
import os
def get_genotype_file(geno_file_paths):
#
def valid_geno_file(x):
suffixes = path(x).suffixes
if suffixes[-1] == '.bed':
return True
if len(suffixes)>1 and ''.join(suffixes[-2:]) == ".vcf.gz":
return True
return False
#
def complete_geno_path(x, geno_file):
if not valid_geno_file(x):
raise ValueError(f"Genotype file {x} should be VCF (end with .vcf.gz) or PLINK bed file (end with .bed)")
if not os.path.isfile(x):
# relative path
if not os.path.isfile(f'{geno_file:ad}/' + x):
raise ValueError(f"Cannot find genotype file {x}")
else:
x = f'{geno_file:ad}/' + x
return x
#
def format_chrom(chrom):
if chrom.startswith('chr'):
chrom = chrom[3:]
return chrom
# Inputs are either VCF or bed, or a vector of them
if len(geno_file_paths) > 1:
if all([valid_geno_file(x) for x in geno_file_paths]):
return paths(geno_file_paths)
else:
raise ValueError(f"Invalid input {geno_file_paths}")
# Input is one genotype file or text list of genotype files
geno_file = geno_file_paths[0]
if valid_geno_file(geno_file):
return paths(geno_file)
else:
units = [x.strip().split() for x in open(geno_file).readlines() if x.strip() and not x.strip().startswith('#')]
if all([len(x) == 1 for x in units]):
return paths([complete_geno_path(x[0], geno_file) for x in units])
elif all([len(x) == 2 for x in units]):
genos = dict([(format_chrom(x[0]), path(complete_geno_path(x[1], geno_file))) for x in units])
else:
raise ValueError(f"{geno_file} should contain one column of file names, or two columns of chrom number and corresponding file name")
return genos
genoFile = get_genotype_file(genoFile)
PLINK to VCF#
[plink_to_vcf_1]
if isinstance(genoFile, dict):
genoFile = genoFile.values()
input: genoFile, group_by = 1
output: f'{cwd}/{_input:bn}.vcf.gz'
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output[0]:bn}'
bash: expand= "${ }", stderr = f'{_output:nn}.stderr', stdout = f'{_output:nn}.stdout', container = container, entrypoint=entrypoint
plink2 --bfile ${_input:n} \
--recode vcf-iid \
--out ${_output:nn} \
--threads ${numThreads} \
--memory ${int(expand_size(mem) * 0.9)/1e06} --output-chr chrMT
bgzip -l 9 ${_output:n}
tabix -f -p vcf ${_output}
bash: expand= "${ }", stderr = f'{_output[0]:n}.stderr', stdout = f'{_output[0]:n}.stdout', container = container, entrypoint=entrypoint
for i in ${_output} ; do
# Capture file metadata
output_info="$i"
output_size=$(ls -lh "$i" | awk '{print $5}')
output_rows=$(zcat "$i" | wc -l)
output_column=$(zcat "$i" | grep -v "##" | head -1 | wc -w)
output_header_row=$(zcat "$i" | grep "##" | wc -l)
output_preview=$(zcat "$i" | grep -v "##" | head | cut -f 1-11)
# Write captured information to the stdout file
printf "output_info: %s\noutput_size: %s\noutput_rows: %d\noutput_column: %d\noutput_header_row: %d\noutput_preview:\n%s\n" \
"$output_info" "$output_size" "$output_rows" "$output_column" "$output_header_row" "$output_preview"
done
VCF to PLINK#
Export VCF files to PLINK 1.0 format, without keeping allele orders by default. The resulting PLINK will lose ref/alt allele information but will go by major/minor allele, as conventionally used in standard PLINK format. Notice that PLINK 1.0 format does not allow for dosages. PLINK 2.0 format support it, but it is generally not supported by downstreams data analysis.
In the following code block the option --vcf-half-call m
treat half-call as missing.
[vcf_to_plink]
parameter: remove_duplicates = False
parameter: add_chr = True
# The path to the file that contains the list of samples to remove (format FID, IID)
parameter: remove_samples = path('.')
# The path to the file that contains the list of samples to keep (format FID, IID)
parameter: keep_samples = path('.')
fail_if(not (keep_samples.is_file() or keep_samples == path('.')), msg = f'Cannot find ``{keep_samples}``')
fail_if(not (remove_samples.is_file() or remove_samples == path('.')), msg = f'Cannot find ``{remove_samples}``')
if isinstance(genoFile, dict):
genoFile = genoFile.values()
input: genoFile, group_by = 1
output: f'{cwd}/{_input:bnn}.bed'
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output:bn}'
bash: container = container, expand= "${ }", stderr = f'{_output:n}.stderr', stdout = f'{_output:n}.stdout', entrypoint=entrypoint
plink2 --vcf ${_input} \
--vcf-half-call m \
--vcf-require-gt \
--allow-extra-chr \
${('--keep %s' % keep_samples) if keep_samples.is_file() else ""} \
${('--remove %s' % remove_samples) if remove_samples.is_file() else ""} \
--make-bed --out ${_output:n} ${"--rm-dup exclude-all" if remove_duplicates else "" } \
--threads ${numThreads} \
--memory ${int(expand_size(mem) * 0.9)/1e06} ${"--output-chr chrMT" if add_chr else ""}
bash: expand= "${ }", stderr = f'{_output:n}.stderr', stdout = f'{_output:n}.stdout', container = container, entrypoint=entrypoint
stdout=${_output:n}.stdout
for i in ${_output} ; do
echo "output_info: $i " >> $stdout;
echo "output_size: $(ls -lh "$i" | awk '{print $5}')" >> $stdout;
done
Split PLINK genotypes into specified regions#
[genotype_by_region_1]
# cis window size
parameter: window = 0
# Region definition
parameter: region_list = path
regions = [x.strip().split() for x in open(region_list).readlines() if x.strip() and not x.strip().startswith('#')]
input: genoFile, for_each = 'regions'
output: f'{cwd}/{region_list:bn}_genotype_by_region/{_input:bn}.{_regions[3]}.bed'
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output:bn}'
bash: expand= "${ }", stderr = f'{_output:n}.stderr', stdout = f'{_output:n}.stdout', container = container, entrypoint=entrypoint
plink2 --bfile ${_input:an} \
--make-bed \
--out ${_output[0]:n} \
--chr ${_regions[0]} \
--from-bp ${f'0' if (int(_regions[1]) - window) < 0 else f'{(int(_regions[1]) - window)}'} \
--to-bp ${int(_regions[2]) + window} \
--allow-no-sex --output-chr chrMT || touch ${_output}
bash: expand= "${ }", stderr = f'{_output:n}.stderr', stdout = f'{_output:n}.stdout', container = container, entrypoint=entrypoint
for i in ${_output} ; do
echo "output_info: $i "
echo "output_size: $(ls -lh "$i" | awk '{print $5}')"
done
Compute LD matrices for given input region#
PLINK based implementation#
FIXME: Hao, I suggest including all contents for LD matrix storage type benchmarking into this repo, so we justify why we would like to save the data as square 0, float 16 and using npz format. Perhaps we should start a folder called “code/auxillary” to keep notebooks such as these? You can then remove what you have in the brain-xqtl-analysis
repository after you migrate all the relevant contents here.
[ld_by_region_plink_1]
# Region definition
parameter: region_list = path
parameter: float_type = 16
regions = [x.strip().split() for x in open(region_list).readlines() if x.strip() and not x.strip().startswith('#')]
input: genoFile, for_each = 'regions'
output: f'{cwd}/{region_list:bn}_LD/{_input:bn}.{_regions[0]}_{_regions[1]}_{_regions[2]}.float{float_type}.npz'
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output:bn}'
bash: expand = "${ }", stderr = f'{_output:n}.stderr', stdout = f'{_output:n}.stdout', container = container, volumes = [f'{region_list:ad}:{region_list:ad}'], entrypoint=entrypoint
plink --bfile ${_input:an} \
--out ${_output:nn} \
--chr ${_regions[0]} \
--from-bp ${_regions[1]} \
--to-bp ${_regions[2]} \
--r square0 \
--make-just-bim \
--threads ${numThreads}
python: expand= "${ }", stderr = f'{_output:n}.stderr', stdout = f'{_output:n}.stdout', container = container, entrypoint=entrypoint
import pandas as pd
import numpy as np
np_ld = np.loadtxt("${_output:nn}.ld", delimiter = "\t", dtype = "float${float_type}")
bim = pd.read_csv("${_output:nn}.bim", "\t", header = None)[1].to_numpy()
np.savez_compressed("${_output}", np_ld, bim, allow_pickel = True)
bash: expand= "$[ ]", stderr = f'{_output:n}.stderr', stdout = f'{_output:n}.stdout', container = container, entrypoint=entrypoint
echo "The npz file is a numpy compressed version of the .ld file described below"
for i in $[_output:nn] ; do
echo "output_info: $i.ld "
echo "output_size:" `ls -lh $i.ld | cut -f 5 -d " "`
echo "output_column:" `head -1 $i | wc -w `
echo "output_row:" `wc -l $i `
done
ldstore2 based implementation#
This is good for larger sample sizes such as data from UK Biobank although we are not facing this challenge in the FunGen-xQTL project.
FIXME: we need to build ldstore2 into a container image. According to Diana it should be
pip3 install https://files.pythonhosted.org/packages/a8/fd/f98ab7dea176f42cb61b80450b795ef19b329e8eb715b87b0d13c2a0854d/ldstore-0.1.9.tar.gz
FIXME: Diana, what’s the input for this workflow?
Create master
file for ldstore2
#
The master file is a semicolon-separated text file and contains no space. It contains the following mandatory column names and one dataset per line:
FIXME: Diana, this documentation is not clearly written. I cannot understand it. What are the mandatory column names? What does it mean by one data-set per line?
For the Z file, the format should be
rsid:chrom:pos:a1:a2
. Formatting for chromosome should be01,02,03
etcList of samples
The LDstore draft is currently availale here with the code to prepare for the genotypic input here. A minimal working example can be found [here]
Split PLINK by Chromosome#
[genotype_by_chrom_1]
stop_if(len(paths(genoFile))>1, msg = "This workflow expects one input genotype file.")
parameter: chrom = list
chrom = list(set(chrom))
input: genoFile, for_each = "chrom"
output: f'{cwd}/{_input:bn}.{_chrom}.bed'
# look up for genotype file
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output:bn}'
bash: expand= "$[ ]", stderr = f'{_output:n}.stderr', stdout = f'{_output:n}.stdout', container = container, volumes = [f'{genoFile:ad}:{genoFile:ad}'], entrypoint=entrypoint
##### Get the locus genotypes for $[_chrom]
plink2 --bfile $[_input:an] \
--make-bed \
--out $[_output[0]:n] \
--chr $[_chrom] \
--threads $[numThreads] \
--memory $[int(expand_size(mem) * 0.9)/1e06] \
--allow-no-sex
[genotype_by_chrom_2]
input: group_by = "all"
output: f'{_input[0]:nn}.{step_name[:-2]}_files.txt'
sos_run("write_data_list", data_files = _input, out = _output, ext = "bed")
[plink_to_vcf_2]
input: group_by = "all"
output: f'{_input[0]:nnn}.{step_name[:-2]}_files.txt'
sos_run("write_data_list", data_files = _input, out = _output, ext = "vcf.gz")
[genotype_by_region_2]
input: group_by = "all"
output: f'{_input[0]:nn}.{step_name[:-2]}_files.txt'
sos_run("write_data_list", data_files = _input, out = _output, ext = "bed")
[ld_by_region_*_2]
parameter: region_list = path
input: group_by = "all"
output: f'{cwd}/{region_list:bn}_LD/{genoFile:bn}.ld.list'
sos_run("write_data_list", data_files = _input, out = _output, ext = "npy.gz")
[write_data_list]
parameter: out = path
parameter: ext = str
parameter: data_files = paths
input: data_files
output: out
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output:bn}'
python: expand= "${ }", stderr = f'{_output:n}.stderr', stdout = f'{_output:n}.stdout', container = container, entrypoint=entrypoint
import pandas as pd
import os, sys
# Extracting the id list outside for better readability
n = len("${ext}".split("."))+1
id_list = [x.split(".")[-n] for x in [${_input:r,}]]
files = [${_input:r,}]
# check if some files are empty remove them from the id_list
non_empty_files = []
non_empty_ids = []
for file, id in zip(files, id_list):
try:
# Check if file is empty
if os.path.getsize(file) > 0:
non_empty_files.append(file)
non_empty_ids.append(id)
else:
print(f"Empty file found: {file}", file=sys.stderr)
except OSError as e:
print(f"Error accessing file {file}: {e}", file=sys.stderr)
if not non_empty_files:
raise ValueError("No non-empty files found. Exiting.")
data_tempt = pd.DataFrame({
"#id": non_empty_ids,
"#path": non_empty_files
})
data_tempt.to_csv("${_output}", index=False, sep="\t")
bash: expand= "${ }", stderr = f'{_output:n}.stderr', stdout = f'{_output:n}.stdout', container = container, entrypoint=entrypoint
i="${_output}"
output_size=$(ls -lh $i | cut -f 5 -d ' ')
output_rows=$(cat $i | wc -l | cut -f 1 -d ' ')
output_column=$(cat $i | head -1 | wc -w)
output_preview=$(cat $i | grep -v "##" | head | cut -f 1,2,3,4,5,6)
printf "output_info: %s\noutput_size: %s\noutput_rows: %s\noutput_column: %s\noutput_preview:\n%s\n" \
"$i" "$output_size" "$output_rows" "$output_column" "$output_preview"
Split VCF by Chromosome#
FIXME: add this as needed
Merge PLINK files#
[merge_plink]
skip_if(len(genoFile) == 1)
# File prefix for the analysis output
parameter: name = str
# The path to the file that contains the list of samples to keep (format FID, IID)
parameter: keep_samples = path('.')
parameter: extra_plink_opts = []
input: genoFile, group_by = 'all'
output: f"{cwd}/{name}{_input[0]:x}"
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output:bn}'
bash: container=container, expand= "${ }", stderr = f'{_output:n}.stderr', stdout = f'{_output:n}.stdout', entrypoint=entrypoint
echo ${_input:n} | tr ' ' '\n' | tail -n +2 > ${_output:n}.merge_list
plink${"2" if str(_input).endswith("pgen") else " --keep-allele-order"} \
--${"p" if str(_input).endswith("pgen") else "b"}file ${_input[0]:n} \
--${"p" if str(_input).endswith("pgen") else ""}merge-list ${_output:n}.merge_list \
--make-${"pgen" if str(_input).endswith("pgen") else "bed"} \
--out ${_output:n} \
--threads ${numThreads} \
--memory ${int(expand_size(mem) * 0.9)/1e06} ${('--keep %s' % keep_samples) if keep_samples.is_file() else ""} ${" ".join([("--%s" % x) for x in extra_plink_opts])}
rm -f ${_output:n}.merge_list
i="${_output}"
output_size=$(ls -lh $i | cut -f 5 -d ' ')
output_rows=$(zcat $i | wc -l | cut -f 1 -d ' ')
output_column=$(zcat $i | head -1 | wc -w)
output_preview=$(cat $i | grep -v "##" | head | cut -f 1,2,3,4,5,6)
printf "output_info: %s\noutput_size: %s\noutput_rows: %s\noutput_column: %s\noutput_preview:\n%s\n" \
"$i" "$output_size" "$output_rows" "$output_column" "$output_preview"
Merge VCF files#
[merge_vcf]
skip_if(len(genoFile) == 1)
# File prefix for the analysis output
parameter: name = str
input: genoFile, group_by = 'all'
output: f"{cwd}/{name}.vcf.gz"
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output:bn}'
bash: container=container, expand= "${ }", stderr = f'{_output:n}.stderr', stdout = f'{_output:n}.stdout', entrypoint=entrypoint
bcftools concat -Oz ${_input} > ${_output}
tabix -p vcf ${_output}
bash: expand= "${ }", stderr = f'{_output[0]:n}.stderr', stdout = f'{_output[0]:n}.stdout', container = container, entrypoint=entrypoint
for i in ${_output} ; do
# Capture file metadata
output_info="$i"
output_size=$(ls -lh "$i" | awk '{print $5}')
output_rows=$(zcat "$i" | wc -l)
output_column=$(zcat "$i" | grep -v "##" | head -1 | wc -w)
output_header_row=$(zcat "$i" | grep "##" | wc -l)
output_preview=$(zcat "$i" | grep -v "##" | head | cut -f 1-11)
# Write captured information to the stdout file
printf "output_info: %s\noutput_size: %s\noutput_rows: %d\noutput_column: %d\noutput_header_row: %d\noutput_preview:\n%s\n" \
"$output_info" "$output_size" "$output_rows" "$output_column" "$output_header_row" "$output_preview" >> ${_output:n}.stdout
done