#! /usr/bin/env python3 # -*- coding: utf-8 -*- """ Extract Single Copy OG sequences from a comprehensive FASTA file based on OG definitions from results of orthofinder. """ import sys from Bio import SeqIO from Bio.SeqRecord import SeqRecord from Bio.Seq import Seq from pathlib import Path import argparse def get_og_fastas(og_dir: Path, ext: str) -> dict[str, list[str]]: """ Get a dictionary of OGs and their corresponding sequence IDs from FASTA files in a directory. Args: og_dir (Path): Path to the directory containing OG FASTA files. ext (str): Extension of the FASTA files. Default is ".fa". Returns: dict: Dictionary with OG names as keys and lists of sequence IDs as values. """ og_dict: dict[str, list[str]] = {} try: for fasta_file in og_dir.glob(f"*{ext}"): og_name = fasta_file.stem seq_ids: list[str] = [] for record in SeqIO.parse(fasta_file, "fasta"): seq_ids.append(record.id) og_dict[og_name] = seq_ids except Exception as e: print(f"Error processing OG FASTA files in {og_dir}: {e}") sys.exit(1) return og_dict def parse_all_fasta(fasta_file: Path) -> dict[str, Seq]: """ Parse a FASTA file and return a dictionary of sequence IDs and their corresponding Seq objects. Args: fasta_file (Path): Path to the FASTA file. Returns: dict: Dictionary with sequence IDs as keys and Seq objects as values. """ seq_dict: dict[str, Seq] = {} try: for record in SeqIO.parse(fasta_file, "fasta"): seq_dict[record.id] = record.seq except Exception as e: print(f"Error parsing FASTA file {fasta_file}: {e}") sys.exit(1) return seq_dict def output_og_seqs( all_seq_dict: dict[str, Seq], og_dict: dict[str, list[str]], output_dir: Path ): """ Output sequences for each OG into separate FASTA files, remove gene id and only keep taxon name. Args: all_seq_dict (dict): Dictionary of all sequences with sequence IDs as keys. og_dict (dict): Dictionary of OGs with OG names as keys and lists of sequence IDs as values. output_dir (Path): Path to the output directory. """ for og_name, seq_ids in og_dict.items(): og_seqs: list[SeqRecord] = [] for seq_id in seq_ids: if seq_id in all_seq_dict: seq_record = SeqRecord( all_seq_dict[seq_id], id=seq_id.split("@")[0], description="" ) og_seqs.append(seq_record) else: print(f"Warning: Sequence ID {seq_id} not found in all sequences.") output_fasta = output_dir / f"{og_name}.fa" try: SeqIO.write(og_seqs, output_fasta, "fasta") except Exception as e: print(f"Error writing to FASTA file {output_fasta}: {e}") sys.exit(1) def write_og_list(og_dict: dict[str, list[str]]): """ Write the OG list to a text file. Args: og_dict (dict): Dictionary of OGs with OG names as keys and lists of sequence IDs as values. """ try: with open("og_list.tsv", "w") as f: for og_name, seq_ids in og_dict.items(): line = f"{og_name}\t" + "\t".join(seq_ids) + "\n" f.write(line) print("OG list written to og_list.tsv") except Exception as e: print(f"Error writing OG list to file: {e}") sys.exit(1) def main(og_dir_path: str, all_fasta_path: str, output_dir_path: str, ext: str = ".fa"): og_dir = Path(og_dir_path) all_fasta = Path(all_fasta_path) output_dir = Path(output_dir_path) output_dir.mkdir(parents=True, exist_ok=True) og_dict = get_og_fastas(og_dir, ext) all_seqs = parse_all_fasta(all_fasta) output_og_seqs(all_seqs, og_dict, output_dir) write_og_list(og_dict) if __name__ == "__main__": parser = argparse.ArgumentParser( description="Extract OG sequences from a comprehensive FASTA file based on OG definitions." ) parser.add_argument( "-d", "--og_dir", required=True, help="Directory containing OG FASTA files.", ) parser.add_argument( "-a", "--all_fasta", required=True, help="FASTA file containing all sequences.", ) parser.add_argument( "-o", "--output_dir", required=True, help="Output directory for OG FASTA files.", ) parser.add_argument( "-e", "--ext", default=".fa", help="Extension of OG FASTA files (default: .fa).", ) args = parser.parse_args() main(args.og_dir, args.all_fasta, args.output_dir, args.ext)