Source code for mkado.analysis.polarized

"""Polarized McDonald-Kreitman test implementation."""

from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING

from mkado.analysis.statistics import (
    alpha,
    dos,
    fishers_exact,
    neutrality_index,
    omega_decomposition,
)
from mkado.core.alignment import PolarizedAlignedPair
from mkado.core.codons import DEFAULT_CODE, GeneticCode
from mkado.core.sequences import SequenceSet

if TYPE_CHECKING:
    pass


[docs] @dataclass class PolarizedMKResult: """Results from a polarized McDonald-Kreitman test. The polarized test uses a second outgroup (``outgroup2``) to assign each fixed difference and each ingroup polymorphism to one of two lineages — the ingroup or the first outgroup — by majority rule on the inferred ancestral state. Sites where the ancestral state cannot be determined are reported under ``*_unpolarized``. Pn and Ps count derived alleles segregating in the ingroup (subject to ``min_frequency`` filtering, identical to the standard MK test) and are split by lineage assignment. """ dn_ingroup: int """Non-synonymous fixed differences assigned to the ingroup lineage.""" ds_ingroup: int """Synonymous fixed differences assigned to the ingroup lineage.""" pn_ingroup: int """Non-synonymous polymorphisms whose derived allele arose on the ingroup lineage.""" ps_ingroup: int """Synonymous polymorphisms whose derived allele arose on the ingroup lineage.""" dn_outgroup: int """Non-synonymous fixed differences assigned to the first outgroup lineage.""" ds_outgroup: int """Synonymous fixed differences assigned to the first outgroup lineage.""" dn_unpolarized: int """Non-synonymous fixed differences where lineage could not be determined.""" ds_unpolarized: int """Synonymous fixed differences where lineage could not be determined.""" pn_unpolarized: int """Non-synonymous polymorphisms where ancestral state could not be determined.""" ps_unpolarized: int """Synonymous polymorphisms where ancestral state could not be determined.""" p_value_ingroup: float """Fisher's exact test p-value on the ingroup-lineage 2x2 table.""" ni_ingroup: float | None """Neutrality Index for the ingroup lineage.""" alpha_ingroup: float | None """Proportion of adaptive substitutions on the ingroup lineage.""" dos_ingroup: float | None """Direction of Selection for the ingroup lineage.""" ln: float | None = None """Total non-synonymous sites (Nei-Gojobori) over analyzed codons.""" ls: float | None = None """Total synonymous sites (Nei-Gojobori) over analyzed codons.""" omega: float | None = None """``(Dn_ingroup/Ds_ingroup) * (Ls/Ln)`` — ingroup-lineage dN/dS. Note: this result class deliberately omits ``omega_a`` / ``omega_na``; per-gene polarized alpha is too noisy to give a useful rate decomposition. See ``docs/omega.rst`` for the rationale. """ def __str__(self) -> str: ni_str = f"{self.ni_ingroup:.4f}" if self.ni_ingroup is not None else "NA" alpha_str = f"{self.alpha_ingroup:.4f}" if self.alpha_ingroup is not None else "NA" dos_str = f"{self.dos_ingroup:.4f}" if self.dos_ingroup is not None else "NA" omega_str = f"{self.omega:.4f}" if self.omega is not None else "NA" ls_str = f"{self.ls:.2f}" if self.ls is not None else "NA" ln_str = f"{self.ln:.2f}" if self.ln is not None else "NA" return ( f"Polarized MK Test Results:\n" f" Ingroup Lineage:\n" f" Divergence: Dn={self.dn_ingroup}, Ds={self.ds_ingroup}\n" f" Polymorphism: Pn={self.pn_ingroup}, Ps={self.ps_ingroup}\n" f" Fisher's p-value: {self.p_value_ingroup:.4g}\n" f" NI: {ni_str}, Alpha: {alpha_str}, DoS: {dos_str}\n" f" omega (dN/dS): {omega_str}\n" f" Sites: Ln={ln_str}, Ls={ls_str}\n" f" Outgroup Lineage:\n" f" Divergence: Dn={self.dn_outgroup}, Ds={self.ds_outgroup}\n" f" Unpolarized:\n" f" Divergence: Dn={self.dn_unpolarized}, Ds={self.ds_unpolarized}\n" f" Polymorphism: Pn={self.pn_unpolarized}, Ps={self.ps_unpolarized}" )
[docs] def to_dict(self) -> dict: """Convert results to a dictionary.""" return { "ingroup": { "dn": self.dn_ingroup, "ds": self.ds_ingroup, "pn": self.pn_ingroup, "ps": self.ps_ingroup, "p_value": self.p_value_ingroup, "ni": self.ni_ingroup, "alpha": self.alpha_ingroup, "dos": self.dos_ingroup, "omega": self.omega, }, "outgroup": { "dn": self.dn_outgroup, "ds": self.ds_outgroup, }, "unpolarized": { "dn": self.dn_unpolarized, "ds": self.ds_unpolarized, "pn": self.pn_unpolarized, "ps": self.ps_unpolarized, }, "ln": self.ln, "ls": self.ls, }
[docs] def polarized_mk_test( ingroup: SequenceSet | str | Path, outgroup1: SequenceSet | str | Path, outgroup2: SequenceSet | str | Path, reading_frame: int = 1, genetic_code: GeneticCode | None = None, pool_polymorphisms: bool = False, min_frequency: float = 0.0, ) -> PolarizedMKResult: """Perform a polarized McDonald-Kreitman test. Uses a second outgroup to determine the ancestral state and assign mutations to specific lineages. Args: ingroup: SequenceSet or path to FASTA file for ingroup sequences outgroup1: SequenceSet or path to FASTA file for first outgroup outgroup2: SequenceSet or path to FASTA file for second outgroup (used for polarization) reading_frame: Reading frame (1, 2, or 3) genetic_code: Genetic code for translation (uses standard if None) pool_polymorphisms: If True, count polymorphisms from both ingroup and outgroup1 (libsequence convention). If False (default), count only ingroup polymorphisms (DnaSP/original MK convention). min_frequency: Minimum derived allele frequency for a polymorphism to be counted (default 0.0, i.e., include all polymorphisms). Useful for filtering out rare variants that may be slightly deleterious. Returns: PolarizedMKResult containing test statistics """ code = genetic_code or DEFAULT_CODE # Load sequences if paths provided if isinstance(ingroup, (str, Path)): ingroup = SequenceSet.from_fasta(ingroup, reading_frame, code) if isinstance(outgroup1, (str, Path)): outgroup1 = SequenceSet.from_fasta(outgroup1, reading_frame, code) if isinstance(outgroup2, (str, Path)): outgroup2 = SequenceSet.from_fasta(outgroup2, reading_frame, code) # Create polarized aligned pair pair = PolarizedAlignedPair( ingroup=ingroup, outgroup=outgroup1, outgroup2=outgroup2, genetic_code=code, ) # Count divergence with polarization dn_in = 0 ds_in = 0 dn_out = 0 ds_out = 0 dn_unpol = 0 ds_unpol = 0 for codon_idx in range(pair.num_codons): if pair.is_fixed_between(codon_idx): polarization = pair.polarize_fixed_difference(codon_idx) if polarization is None: # Cannot polarize - count as unpolarized result = pair.classify_fixed_difference(codon_idx) if result is not None: nonsyn, syn = result dn_unpol += nonsyn ds_unpol += syn else: lineage, (nonsyn, syn) = polarization if lineage == "ingroup": dn_in += nonsyn ds_in += syn else: dn_out += nonsyn ds_out += syn # Count polymorphisms with polarization pn_in = 0 ps_in = 0 pn_unpol = 0 ps_unpol = 0 if pool_polymorphisms: # Pooled mode: count polymorphisms from both populations (libsequence convention) # Note: polarization is not applied in pooled mode as it's unclear which # lineage to attribute shared polymorphisms to poly_sites = pair.polymorphic_sites_pooled() for codon_idx in poly_sites: result = pair.classify_polymorphism_pooled(codon_idx) if result is not None: nonsyn, syn = result pn_in += nonsyn ps_in += syn else: # Standard mode: count only ingroup polymorphisms (DnaSP convention) # Apply polarization to determine if polymorphism arose on ingroup lineage poly_sites = pair.polymorphic_sites_ingroup() for codon_idx in poly_sites: # First, try to polarize the polymorphism result = pair.polarize_ingroup_polymorphism(codon_idx) if result is None: # Could not polarize - count as unpolarized unpol_result = pair.classify_polymorphism(codon_idx) if unpol_result is not None: nonsyn, syn = unpol_result pn_unpol += nonsyn ps_unpol += syn continue # Polymorphism is polarized to ingroup lineage # Apply frequency filter if specified if min_frequency > 0: # Get frequency spectrum from ingroup freqs = ingroup.site_frequency_spectrum(codon_idx) if not freqs: continue # Get outgroup2 codons to determine ancestral state out_codons = outgroup2.codon_set_clean(codon_idx) if not out_codons: continue # Find ancestral codon (shared between ingroup and outgroup2) ingroup_codons = set(freqs.keys()) shared_codons = ingroup_codons & out_codons if not shared_codons: continue # Use the most frequent shared codon as ancestral ancestral = max(shared_codons, key=lambda c: freqs.get(c, 0)) # Calculate derived allele frequency derived_freq = 1.0 - freqs[ancestral] # Skip if below minimum frequency threshold if derived_freq < min_frequency: continue nonsyn, syn = result pn_in += nonsyn ps_in += syn # Calculate statistics for ingroup lineage p_val = fishers_exact(dn_in, ds_in, pn_in, ps_in) ni = neutrality_index(dn_in, ds_in, pn_in, ps_in) a = alpha(dn_in, ds_in, pn_in, ps_in) d = dos(dn_in, ds_in, pn_in, ps_in) ln, ls = pair.count_total_sites() omega, _, _ = omega_decomposition(dn_in, ds_in, ln, ls, a) return PolarizedMKResult( dn_ingroup=dn_in, ds_ingroup=ds_in, pn_ingroup=pn_in, ps_ingroup=ps_in, dn_outgroup=dn_out, ds_outgroup=ds_out, dn_unpolarized=dn_unpol, ds_unpolarized=ds_unpol, pn_unpolarized=pn_unpol, ps_unpolarized=ps_unpol, p_value_ingroup=p_val, ni_ingroup=ni, alpha_ingroup=a, dos_ingroup=d, ln=ln, ls=ls, omega=omega, )