Source code for mkado.core.sequences

"""Sequence and SequenceSet classes for handling aligned sequences."""

from __future__ import annotations

from dataclasses import dataclass, field
from pathlib import Path
from typing import TYPE_CHECKING

import numpy as np

from mkado.core.codons import DEFAULT_CODE, GeneticCode
from mkado.io.fasta import read_fasta

if TYPE_CHECKING:
    pass


[docs] @dataclass class Sequence: """Represents a single biological sequence.""" name: str sequence: str def __len__(self) -> int: return len(self.sequence) def __getitem__(self, index: int | slice) -> str: return self.sequence[index]
[docs] def get_codon(self, codon_index: int, reading_frame: int = 1) -> str: """Get the codon at a given index. Args: codon_index: Zero-based codon index reading_frame: Reading frame (1, 2, or 3) Returns: Three-letter codon string """ start = (reading_frame - 1) + (codon_index * 3) return self.sequence[start : start + 3]
[docs] def num_codons(self, reading_frame: int = 1) -> int: """Get the number of complete codons in the sequence.""" return (len(self.sequence) - (reading_frame - 1)) // 3
[docs] @dataclass class SequenceSet: """Represents a set of aligned sequences (e.g., from one species).""" sequences: list[Sequence] = field(default_factory=list) reading_frame: int = 1 genetic_code: GeneticCode = field(default_factory=lambda: DEFAULT_CODE) _codon_set_clean_cache: dict[int, set[str]] = field( default_factory=dict, init=False, repr=False, compare=False ) """Per-position memoization for ``codon_set_clean()``. Treat the SequenceSet as immutable once any caching method has been called; mutating ``sequences`` or ``reading_frame`` afterwards will leave stale entries here."""
[docs] @classmethod def from_fasta( cls, path: str | Path, reading_frame: int = 1, genetic_code: GeneticCode | None = None, ) -> SequenceSet: """Load sequences from a FASTA file. Args: path: Path to FASTA file reading_frame: Reading frame (1, 2, or 3) genetic_code: Genetic code for translation Returns: SequenceSet containing all sequences from the file """ sequences = [Sequence(name, seq) for name, seq in read_fasta(path)] return cls( sequences=sequences, reading_frame=reading_frame, genetic_code=genetic_code or DEFAULT_CODE, )
def __len__(self) -> int: return len(self.sequences) def __getitem__(self, index: int) -> Sequence: return self.sequences[index] def __getstate__(self) -> dict: # Drop the per-position cache when pickling — it can be re-populated # cheaply in the receiver and would otherwise inflate pickle size by # O(num_codons) set objects. state = self.__dict__.copy() state["_codon_set_clean_cache"] = {} return state @property def num_codons(self) -> int: """Number of complete codons (based on first sequence).""" if not self.sequences: return 0 return self.sequences[0].num_codons(self.reading_frame) @property def alignment_length(self) -> int: """Length of the alignment (based on first sequence).""" if not self.sequences: return 0 return len(self.sequences[0])
[docs] def codon_set(self, codon_index: int) -> set[str]: """Get the set of unique codons at a position across all sequences. Args: codon_index: Zero-based codon index Returns: Set of unique codon strings """ codons = set() for seq in self.sequences: codon = seq.get_codon(codon_index, self.reading_frame) codons.add(codon) return codons
[docs] def codon_set_clean(self, codon_index: int) -> set[str]: """Get unique codons at a position, excluding those with N or gaps. Result is cached per ``codon_index`` for the lifetime of the ``SequenceSet``. Callers must not mutate the returned set. Args: codon_index: Zero-based codon index Returns: Set of unique valid codon strings """ cached = self._codon_set_clean_cache.get(codon_index) if cached is not None: return cached codons = self.codon_set(codon_index) clean = {c for c in codons if "N" not in c and "-" not in c} self._codon_set_clean_cache[codon_index] = clean return clean
[docs] def amino_set(self, codon_index: int) -> set[str]: """Get the set of unique amino acids at a codon position. Args: codon_index: Zero-based codon index Returns: Set of unique amino acid characters """ codons = self.codon_set(codon_index) return {self.genetic_code.translate(c) for c in codons}
[docs] def amino_set_clean(self, codon_index: int) -> set[str]: """Get unique amino acids at a position, excluding ambiguous codons. Args: codon_index: Zero-based codon index Returns: Set of unique amino acid characters """ codons = self.codon_set_clean(codon_index) return {self.genetic_code.translate(c) for c in codons}
[docs] def is_polymorphic(self, codon_index: int) -> bool: """Check if a codon position is polymorphic (has >1 unique codon). Args: codon_index: Zero-based codon index Returns: True if polymorphic """ codons = self.codon_set_clean(codon_index) return len(codons) > 1
[docs] def site_set(self, site_index: int) -> set[str]: """Get the set of unique nucleotides at a site across all sequences. Args: site_index: Zero-based nucleotide position Returns: Set of unique nucleotide characters """ return {seq.sequence[site_index] for seq in self.sequences if site_index < len(seq)}
[docs] def site_set_clean(self, site_index: int) -> set[str]: """Get unique nucleotides at a site, excluding N and gaps. Args: site_index: Zero-based nucleotide position Returns: Set of unique valid nucleotide characters """ sites = self.site_set(site_index) return {s for s in sites if s not in ("N", "-")}
[docs] def codon_array(self) -> np.ndarray: """Get a 2D array of codons (n_sequences x n_codons). Returns: NumPy array of codon strings """ n_seqs = len(self.sequences) n_codons = self.num_codons arr = np.empty((n_seqs, n_codons), dtype="U3") for i, seq in enumerate(self.sequences): for j in range(n_codons): arr[i, j] = seq.get_codon(j, self.reading_frame) return arr
[docs] def polymorphic_codons(self) -> list[int]: """Get indices of polymorphic codon positions. Returns: List of codon indices that are polymorphic """ return [i for i in range(self.num_codons) if self.is_polymorphic(i)]
[docs] def site_frequency_spectrum(self, codon_index: int) -> dict[str, float]: """Calculate allele frequencies at a codon position. Args: codon_index: Zero-based codon index Returns: Dict mapping codon strings to their frequencies (0-1) """ counts: dict[str, int] = {} total = 0 for seq in self.sequences: codon = seq.get_codon(codon_index, self.reading_frame) if "N" not in codon and "-" not in codon: counts[codon] = counts.get(codon, 0) + 1 total += 1 if total == 0: return {} return {codon: count / total for codon, count in counts.items()}
[docs] def derived_allele_frequency(self, codon_index: int, ancestral_codon: str) -> float | None: """Calculate the derived allele frequency at a codon position. Args: codon_index: Zero-based codon index ancestral_codon: The ancestral (outgroup) codon Returns: Frequency of derived alleles (0-1), or None if not computable """ freqs = self.site_frequency_spectrum(codon_index) if not freqs: return None ancestral_codon = ancestral_codon.upper() if ancestral_codon not in freqs: # Ancestral allele not present - all are derived return 1.0 return 1.0 - freqs[ancestral_codon]
[docs] def filter_by_name(self, pattern: str) -> SequenceSet: """Filter sequences by name pattern (substring match). Args: pattern: Substring to match in sequence names Returns: New SequenceSet containing only matching sequences """ filtered = [seq for seq in self.sequences if pattern in seq.name] # The new SequenceSet starts with an empty cache: the parent's cached # codon sets contain codons from sequences that aren't in the filtered # subset, so reusing them would yield wrong results. return SequenceSet( sequences=filtered, reading_frame=self.reading_frame, genetic_code=self.genetic_code, )