Source code for mkado.analysis.imputed

"""Imputed McDonald-Kreitman test implementation.

Based on Murga-Moreno et al. (2022) G3: Genes|Genomes|Genetics.
https://doi.org/10.1093/g3journal/jkac206

The impMKT corrects for slightly deleterious mutations by imputing the number
of weakly deleterious nonsynonymous polymorphisms segregating at low frequency,
rather than discarding all low-frequency variants. This retains more data and
increases power to detect positive selection at the gene level.
"""

from __future__ import annotations

import logging
from dataclasses import dataclass

import numpy as np

from mkado.analysis.asymptotic import PolymorphismData, sum_site_totals
from mkado.analysis.statistics import fishers_exact, omega_decomposition

logger = logging.getLogger(__name__)


[docs] @dataclass class ImputedMKResult: """Results from an imputed McDonald-Kreitman test. The imputed test partitions ``Pn`` into a weakly-deleterious fraction (``Pwd``) — imputed from the synonymous SFS scaled by the observed Pn/Ps ratio at low derived allele frequencies — and a neutral fraction (``Pn_neutral = Pn_total - Pwd``). ``alpha`` is computed against ``Pn_neutral`` rather than the raw ``Pn`` count, removing the downward bias caused by slightly deleterious nonsynonymous variants. ``Dn`` and ``Ds`` follow the standard MK definitions. ``Pn_total`` and ``Ps_total`` are the unfiltered ingroup polymorphism counts (the imputed test uses the entire SFS, including singletons). """ alpha: float | None """Proportion of adaptive substitutions, computed against ``Pn_neutral``.""" p_value: float """Fisher's exact test p-value on the imputed (Dn, Ds, Pn_neutral, Ps_total) table.""" pn_neutral: float """Estimated neutral nonsynonymous polymorphism count (``Pn_total - Pwd``).""" pwd: float """Imputed count of weakly-deleterious nonsynonymous polymorphisms.""" dn: int """Non-synonymous fixed differences.""" ds: int """Synonymous fixed differences.""" pn_total: int """Total non-synonymous ingroup polymorphisms (no frequency filter applied).""" ps_total: int """Total synonymous ingroup polymorphisms (no frequency filter applied).""" d: float | None = None """Estimated DFE deleterious fraction (Murga-Moreno et al. 2022, eq. 3).""" b: float | None = None """Estimated DFE weakly-deleterious fraction.""" f: float | None = None """Estimated DFE neutral fraction.""" cutoff: float = 0.15 """Derived allele frequency below which polymorphisms are treated as the imputation pool.""" ln: float | None = None """Total non-synonymous sites (Nei-Gojobori) over analyzed codons.""" ls: float | None = None """Total synonymous sites (Nei-Gojobori) over analyzed codons.""" omega: float | None = None """``(Dn/Ds) * (Ls/Ln)`` — dN/dS ratio.""" omega_a: float | None = None """Adaptive rate ``alpha * omega`` with imputed alpha (Gossmann, Keightley & Eyre-Walker 2012).""" omega_na: float | None = None """Non-adaptive component ``(1 - alpha) * omega``.""" alpha_ci_low: float | None = None """Lower 95% bootstrap CI for ``alpha``. ``None`` when ``n_bootstrap=0``.""" alpha_ci_high: float | None = None """Upper 95% bootstrap CI for ``alpha``.""" ci_method: str | None = None """``"bootstrap"`` when CI was computed, ``None`` when ``n_bootstrap=0``.""" # omega_a/omega_na CIs are pure arithmetic transforms of the alpha CI scaled # by the (constant) point-estimate omega; mirrors AsymptoticMKResult. @property def omega_a_ci_low(self) -> float | None: """Lower 95% CI for ``omega_a`` = ``alpha_ci_low * omega``.""" if self.omega is None or self.alpha_ci_low is None: return None return self.alpha_ci_low * self.omega @property def omega_a_ci_high(self) -> float | None: """Upper 95% CI for ``omega_a`` = ``alpha_ci_high * omega``.""" if self.omega is None or self.alpha_ci_high is None: return None return self.alpha_ci_high * self.omega @property def omega_na_ci_low(self) -> float | None: """Lower 95% CI for ``omega_na`` = ``(1 - alpha_ci_high) * omega`` (flipped).""" if self.omega is None or self.alpha_ci_high is None: return None return (1.0 - self.alpha_ci_high) * self.omega @property def omega_na_ci_high(self) -> float | None: """Upper 95% CI for ``omega_na`` = ``(1 - alpha_ci_low) * omega``.""" if self.omega is None or self.alpha_ci_low is None: return None return (1.0 - self.alpha_ci_low) * self.omega def __str__(self) -> str: alpha_str = f"{self.alpha:.4f}" if self.alpha is not None else "NA" lines = [ "Imputed MK Test Results:", f" Divergence: Dn={self.dn}, Ds={self.ds}", f" Polymorphism: Pn={self.pn_total}, Ps={self.ps_total}", f" DAF cutoff: {self.cutoff}", f" Imputed Pwd: {self.pwd:.2f}", f" Pn (neutral): {self.pn_neutral:.2f}", f" Alpha (α): {alpha_str}", f" p-value: {self.p_value:.4g}", ] if self.alpha_ci_low is not None and self.alpha_ci_high is not None: lines.append( f" Alpha 95% CI [{self.ci_method}]: " f"({self.alpha_ci_low:.4f}, {self.alpha_ci_high:.4f})" ) if self.ln is not None and self.ls is not None: lines.append(f" Sites: Ln={self.ln:.2f}, Ls={self.ls:.2f}") if self.omega is not None: omega_a_str = f"{self.omega_a:.4f}" if self.omega_a is not None else "NA" omega_na_str = f"{self.omega_na:.4f}" if self.omega_na is not None else "NA" lines.append( f" omega: {self.omega:.4f} " f"(omega_a={omega_a_str}, omega_na={omega_na_str})" ) if self.omega_a_ci_low is not None and self.omega_a_ci_high is not None: lines.append( f" omega_a 95% CI: ({self.omega_a_ci_low:.4f}, {self.omega_a_ci_high:.4f})" ) if self.omega_na_ci_low is not None and self.omega_na_ci_high is not None: lines.append( f" omega_na 95% CI: ({self.omega_na_ci_low:.4f}, " f"{self.omega_na_ci_high:.4f})" ) if self.d is not None: lines.append(f" DFE fractions: d={self.d:.4f}, b={self.b:.4f}, f={self.f:.4f}") return "\n".join(lines)
[docs] def to_dict(self) -> dict: """Convert results to a dictionary.""" result = { "alpha": self.alpha, "p_value": self.p_value, "pn_neutral": self.pn_neutral, "pwd": self.pwd, "dn": self.dn, "ds": self.ds, "pn_total": self.pn_total, "ps_total": self.ps_total, "cutoff": self.cutoff, "ln": self.ln, "ls": self.ls, "omega": self.omega, "omega_a": self.omega_a, "omega_na": self.omega_na, "alpha_ci_low": self.alpha_ci_low, "alpha_ci_high": self.alpha_ci_high, "ci_method": self.ci_method, "omega_a_ci_low": self.omega_a_ci_low, "omega_a_ci_high": self.omega_a_ci_high, "omega_na_ci_low": self.omega_na_ci_low, "omega_na_ci_high": self.omega_na_ci_high, } if self.d is not None: result["d"] = self.d result["b"] = self.b result["f"] = self.f return result
def _compute_imputed( polymorphisms: list[tuple[float, str]], dn: int, ds: int, cutoff: float, num_synonymous_sites: float | None, num_nonsynonymous_sites: float | None, ) -> ImputedMKResult: """Core imputed MK algorithm on pooled data.""" # Split polymorphisms at cutoff pn_low = sum(1 for freq, t in polymorphisms if t == "N" and freq <= cutoff) pn_high = sum(1 for freq, t in polymorphisms if t == "N" and freq > cutoff) ps_low = sum(1 for freq, t in polymorphisms if t == "S" and freq <= cutoff) ps_high = sum(1 for freq, t in polymorphisms if t == "S" and freq > cutoff) pn_total = pn_low + pn_high ps_total = ps_low + ps_high # Compute ratio of low to high frequency synonymous polymorphisms if ps_high == 0: ratio_ps = 0.0 else: ratio_ps = ps_low / ps_high # Impute weakly deleterious nonsynonymous polymorphisms pwd = pn_low - pn_high * ratio_ps pwd = max(pwd, 0.0) # Neutral nonsynonymous polymorphisms pn_neutral = pn_total - pwd # Corrected alpha if dn == 0 or ps_total == 0: alpha_val = None else: alpha_val = 1.0 - (pn_neutral / ps_total) * (ds / dn) # Fisher's exact test on corrected table pn_neutral_int = round(pn_neutral) p_value = fishers_exact(dn, ds, pn_neutral_int, ps_total) # DFE fractions d_frac = None b_frac = None f_frac = None if num_synonymous_sites is not None and num_nonsynonymous_sites is not None: m0 = num_synonymous_sites mi = num_nonsynonymous_sites if mi > 0 and ps_total > 0: b_frac = (pwd / ps_total) * (m0 / mi) f_frac = (m0 * pn_neutral) / (mi * ps_total) d_frac = 1.0 - f_frac - b_frac # ``num_*_sites`` are aliases for Ls (m0) and Ln (mi) — the DFE inputs # double as Nei-Gojobori site totals for the omega decomposition. omega, omega_a, omega_na = omega_decomposition( dn, ds, num_nonsynonymous_sites, num_synonymous_sites, alpha_val ) return ImputedMKResult( alpha=alpha_val, p_value=p_value, pn_neutral=pn_neutral, pwd=pwd, dn=dn, ds=ds, pn_total=pn_total, ps_total=ps_total, d=d_frac, b=b_frac, f=f_frac, cutoff=cutoff, ln=num_nonsynonymous_sites, ls=num_synonymous_sites, omega=omega, omega_a=omega_a, omega_na=omega_na, ) def _bootstrap_imputed_alpha( polymorphisms: list[tuple[float, str]], dn: int, ds: int, cutoff: float, num_synonymous_sites: float | None, num_nonsynonymous_sites: float | None, n_replicates: int, seed: int, ) -> tuple[float, float] | None: """Bootstrap CI on imputed alpha by case-resampling the polymorphism list. Vectorized: pre-extract ``freqs`` / ``is_n`` / ``is_low`` numpy arrays once, then per replicate index with ``rng.integers`` and count via ``np.count_nonzero``. The CI only needs alpha, so we inline the imputed formula and skip Fisher's-exact, DFE fractions, and omega — all of which ``_compute_imputed`` recomputes on every call but none of which feed the percentile. Returns ``None`` if fewer than half the replicates yield a defined alpha. """ n = len(polymorphisms) if n == 0 or n_replicates <= 0: return None if dn == 0: # alpha is undefined for every replicate; skip the loop entirely. return None freqs = np.fromiter((f for f, _ in polymorphisms), dtype=float, count=n) is_n = np.fromiter((t == "N" for _, t in polymorphisms), dtype=bool, count=n) is_low = freqs <= cutoff rng = np.random.default_rng(seed) bootstrap_alphas: list[float] = [] for _ in range(n_replicates): sample = rng.integers(0, n, size=n) boot_n = is_n[sample] boot_low = is_low[sample] pn_low = int(np.count_nonzero(boot_n & boot_low)) pn_high = int(np.count_nonzero(boot_n & ~boot_low)) ps_low = int(np.count_nonzero(~boot_n & boot_low)) ps_high = int(np.count_nonzero(~boot_n & ~boot_low)) ps_total = ps_low + ps_high if ps_total == 0: continue ratio_ps = ps_low / ps_high if ps_high else 0.0 pwd = max(pn_low - pn_high * ratio_ps, 0.0) pn_neutral = (pn_low + pn_high) - pwd bootstrap_alphas.append(1.0 - (pn_neutral / ps_total) * (ds / dn)) if len(bootstrap_alphas) < n_replicates // 2: logger.debug( "Imputed bootstrap CI: only %d/%d replicates yielded defined alpha; skipping CI", len(bootstrap_alphas), n_replicates, ) return None return ( float(np.percentile(bootstrap_alphas, 2.5)), float(np.percentile(bootstrap_alphas, 97.5)), )
[docs] def imputed_mk_test( gene_data: PolymorphismData, cutoff: float = 0.15, num_synonymous_sites: float | None = None, num_nonsynonymous_sites: float | None = None, n_bootstrap: int = 0, seed: int = 42, ) -> ImputedMKResult: """Perform the imputed McDonald-Kreitman test on a single gene. The impMKT corrects for slightly deleterious mutations by imputing the count of weakly deleterious nonsynonymous polymorphisms from the ratio of low- to high-frequency synonymous polymorphisms. Based on Murga-Moreno et al. (2022) G3. https://doi.org/10.1093/g3journal/jkac206 Args: gene_data: Polymorphism and divergence data from extract_polymorphism_data() cutoff: DAF cutoff separating low and high frequency (default 0.15) num_synonymous_sites: Number of synonymous sites (m0) for DFE fractions num_nonsynonymous_sites: Number of nonsynonymous sites (mi) for DFE fractions n_bootstrap: Number of bootstrap replicates for the alpha CI. ``0`` (default) disables CI computation and preserves byte-identical legacy output. seed: Random seed for the bootstrap (default 42). Returns: ImputedMKResult with corrected alpha and optionally DFE fractions """ ls = num_synonymous_sites if num_synonymous_sites is not None else gene_data.ls ln = num_nonsynonymous_sites if num_nonsynonymous_sites is not None else gene_data.ln result = _compute_imputed( gene_data.polymorphisms, gene_data.dn, gene_data.ds, cutoff, ls, ln, ) if n_bootstrap > 0: ci = _bootstrap_imputed_alpha( gene_data.polymorphisms, gene_data.dn, gene_data.ds, cutoff, ls, ln, n_bootstrap, seed, ) if ci is not None: result.alpha_ci_low, result.alpha_ci_high = ci result.ci_method = "bootstrap" return result
[docs] def imputed_mk_test_multi( gene_data: list[PolymorphismData], cutoff: float = 0.15, num_synonymous_sites: float | None = None, num_nonsynonymous_sites: float | None = None, n_bootstrap: int = 0, seed: int = 42, ) -> ImputedMKResult: """Perform the imputed MK test on pooled data from multiple genes. Pools all polymorphisms and divergence counts across genes, then runs the imputed MK algorithm on the combined data. Args: gene_data: List of PolymorphismData from individual genes cutoff: DAF cutoff separating low and high frequency (default 0.15) num_synonymous_sites: Number of synonymous sites (m0) for DFE fractions num_nonsynonymous_sites: Number of nonsynonymous sites (mi) for DFE fractions n_bootstrap: Number of bootstrap replicates for the alpha CI. ``0`` (default) disables CI computation and preserves byte-identical legacy output. seed: Random seed for the bootstrap (default 42). Returns: ImputedMKResult with corrected alpha from pooled data """ all_polymorphisms: list[tuple[float, str]] = [] dn_total = 0 ds_total = 0 for g in gene_data: all_polymorphisms.extend(g.polymorphisms) dn_total += g.dn ds_total += g.ds ln_agg, ls_agg = sum_site_totals(gene_data) ls = num_synonymous_sites if num_synonymous_sites is not None else ls_agg ln = num_nonsynonymous_sites if num_nonsynonymous_sites is not None else ln_agg result = _compute_imputed( all_polymorphisms, dn_total, ds_total, cutoff, ls, ln, ) if n_bootstrap > 0: ci = _bootstrap_imputed_alpha( all_polymorphisms, dn_total, ds_total, cutoff, ls, ln, n_bootstrap, seed, ) if ci is not None: result.alpha_ci_low, result.alpha_ci_high = ci result.ci_method = "bootstrap" return result