Source code for pysinger.io.vcf_reader

"""
VCF reader — load phased or haploid VCF into Node objects.

Mirrors Sampler::naive_read_vcf / naive_read_vcf_haploid in Sampler.cpp.

Returns a list of Node objects with mutation sites set, one node per
haplotype (2 per individual for diploid, 1 per individual for haploid).
"""
from __future__ import annotations

from typing import List, Tuple

from ..data.node import Node


[docs] def read_vcf_phased( vcf_file: str, start_pos: float = 0.0, end_pos: float = float("inf"), ) -> Tuple[List[Node], float]: """Read a phased VCF and return (nodes, sequence_length). One Node per haplotype (2 × n_individuals). Mutations are stored as positions relative to *start_pos*. Mirrors Sampler::naive_read_vcf. Parameters ---------- vcf_file: Path to the .vcf file. start_pos, end_pos: Genomic region to load (half-open [start_pos, end_pos)). Returns ------- nodes : List[Node] Haplotype nodes (index 0 .. 2n-1). sequence_length : float end_pos - start_pos. """ nodes: List[Node] = [] prev_pos = -1 contig_length: float = -1.0 with open(vcf_file) as fh: lines = fh.readlines() for line in lines: if line.startswith("##contig"): # parse ##contig=<ID=...,length=NNN> import re m = re.search(r"length=(\d+)", line) if m: contig_length = float(m.group(1)) continue if line.startswith("#CHROM"): fields = line.split() n_ind = len(fields) - 9 nodes = [] for i in range(2 * n_ind): n = Node(time=0.0, index=i) nodes.append(n) continue if line.startswith("#"): continue parts = line.split() if len(parts) < 9: continue chrom, pos_s, id_, ref, alt = parts[0], parts[1], parts[2], parts[3], parts[4] pos = float(pos_s) if pos < start_pos: continue if pos > end_pos: break if pos == prev_pos: continue # skip duplicate positions (multi-allelic) if len(ref) > 1 or len(alt) > 1: prev_pos = pos continue # skip indels / structural variants genotypes_raw = parts[9:] if not nodes: n_ind = len(genotypes_raw) nodes = [] for i in range(2 * n_ind): n = Node(time=0.0, index=i) nodes.append(n) gt_vals = [] for g in genotypes_raw: a0 = 1 if g[0] == "1" else 0 a1 = 1 if (len(g) > 2 and g[2] == "1") else 0 gt_vals.append(a0) gt_vals.append(a1) gt_sum = sum(gt_vals) if 1 <= gt_sum < len(gt_vals): rel_pos = pos - start_pos for i, v in enumerate(gt_vals): if v == 1: nodes[i].add_mutation(rel_pos) prev_pos = pos if end_pos < float("inf"): sequence_length = end_pos - start_pos elif contig_length >= 0: sequence_length = contig_length - start_pos else: sequence_length = prev_pos - start_pos return nodes, sequence_length
[docs] def read_vcf_haploid( vcf_file: str, start_pos: float = 0.0, end_pos: float = float("inf"), ) -> Tuple[List[Node], float]: """Read a haploid VCF and return (nodes, sequence_length). One Node per individual (column after FORMAT). Mirrors Sampler::naive_read_vcf_haploid. """ nodes: List[Node] = [] prev_pos = -1 contig_length: float = -1.0 with open(vcf_file) as fh: lines = fh.readlines() for line in lines: if line.startswith("##contig"): import re m = re.search(r"length=(\d+)", line) if m: contig_length = float(m.group(1)) continue if line.startswith("#CHROM"): fields = line.split() n_ind = len(fields) - 9 nodes = [Node(time=0.0, index=i) for i in range(n_ind)] continue if line.startswith("#"): continue parts = line.split() if len(parts) < 9: continue pos = float(parts[1]) ref = parts[3] alt = parts[4] if pos < start_pos: continue if pos > end_pos: break if pos == prev_pos: continue if len(ref) > 1 or len(alt) > 1: prev_pos = pos continue genotypes_raw = parts[9:] if not nodes: nodes = [Node(time=0.0, index=i) for i in range(len(genotypes_raw))] gt_vals = [1 if g[0] == "1" else 0 for g in genotypes_raw] gt_sum = sum(gt_vals) if 1 <= gt_sum < len(gt_vals): rel_pos = pos - start_pos for i, v in enumerate(gt_vals): if v == 1: nodes[i].add_mutation(rel_pos) prev_pos = pos if end_pos < float("inf"): sequence_length = end_pos - start_pos elif contig_length >= 0: sequence_length = contig_length - start_pos else: sequence_length = prev_pos - start_pos return nodes, sequence_length