biyelunwen/99.scripts/miscs/get_og_seqs.py

149 lines
4.7 KiB
Python

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Extract Single Copy OG sequences from a comprehensive FASTA file based on OG definitions from results of orthofinder.
"""
import sys
from Bio import SeqIO
from Bio.SeqRecord import SeqRecord
from Bio.Seq import Seq
from pathlib import Path
import argparse
def get_og_fastas(og_dir: Path, ext: str) -> dict[str, list[str]]:
"""
Get a dictionary of OGs and their corresponding sequence IDs from FASTA files in a directory.
Args:
og_dir (Path): Path to the directory containing OG FASTA files.
ext (str): Extension of the FASTA files. Default is ".fa".
Returns:
dict: Dictionary with OG names as keys and lists of sequence IDs as values.
"""
og_dict: dict[str, list[str]] = {}
try:
for fasta_file in og_dir.glob(f"*{ext}"):
og_name = fasta_file.stem
seq_ids: list[str] = []
for record in SeqIO.parse(fasta_file, "fasta"):
seq_ids.append(record.id)
og_dict[og_name] = seq_ids
except Exception as e:
print(f"Error processing OG FASTA files in {og_dir}: {e}")
sys.exit(1)
return og_dict
def parse_all_fasta(fasta_file: Path) -> dict[str, Seq]:
"""
Parse a FASTA file and return a dictionary of sequence IDs and their corresponding Seq objects.
Args:
fasta_file (Path): Path to the FASTA file.
Returns:
dict: Dictionary with sequence IDs as keys and Seq objects as values.
"""
seq_dict: dict[str, Seq] = {}
try:
for record in SeqIO.parse(fasta_file, "fasta"):
seq_dict[record.id] = record.seq
except Exception as e:
print(f"Error parsing FASTA file {fasta_file}: {e}")
sys.exit(1)
return seq_dict
def output_og_seqs(
all_seq_dict: dict[str, Seq], og_dict: dict[str, list[str]], output_dir: Path
):
"""
Output sequences for each OG into separate FASTA files, remove gene id and only keep taxon name.
Args:
all_seq_dict (dict): Dictionary of all sequences with sequence IDs as keys.
og_dict (dict): Dictionary of OGs with OG names as keys and lists of sequence IDs as values.
output_dir (Path): Path to the output directory.
"""
for og_name, seq_ids in og_dict.items():
og_seqs: list[SeqRecord] = []
for seq_id in seq_ids:
if seq_id in all_seq_dict:
seq_record = SeqRecord(
all_seq_dict[seq_id], id=seq_id.split("@")[0], description=""
)
og_seqs.append(seq_record)
else:
print(f"Warning: Sequence ID {seq_id} not found in all sequences.")
output_fasta = output_dir / f"{og_name}.fa"
try:
SeqIO.write(og_seqs, output_fasta, "fasta")
except Exception as e:
print(f"Error writing to FASTA file {output_fasta}: {e}")
sys.exit(1)
def write_og_list(og_dict: dict[str, list[str]]):
"""
Write the OG list to a text file.
Args:
og_dict (dict): Dictionary of OGs with OG names as keys and lists of sequence IDs as values.
"""
try:
with open("og_list.tsv", "w") as f:
for og_name, seq_ids in og_dict.items():
line = f"{og_name}\t" + "\t".join(seq_ids) + "\n"
f.write(line)
print("OG list written to og_list.tsv")
except Exception as e:
print(f"Error writing OG list to file: {e}")
sys.exit(1)
def main(og_dir_path: str, all_fasta_path: str, output_dir_path: str, ext: str = ".fa"):
og_dir = Path(og_dir_path)
all_fasta = Path(all_fasta_path)
output_dir = Path(output_dir_path)
output_dir.mkdir(parents=True, exist_ok=True)
og_dict = get_og_fastas(og_dir, ext)
all_seqs = parse_all_fasta(all_fasta)
output_og_seqs(all_seqs, og_dict, output_dir)
write_og_list(og_dict)
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Extract OG sequences from a comprehensive FASTA file based on OG definitions."
)
parser.add_argument(
"-d",
"--og_dir",
required=True,
help="Directory containing OG FASTA files.",
)
parser.add_argument(
"-a",
"--all_fasta",
required=True,
help="FASTA file containing all sequences.",
)
parser.add_argument(
"-o",
"--output_dir",
required=True,
help="Output directory for OG FASTA files.",
)
parser.add_argument(
"-e",
"--ext",
default=".fa",
help="Extension of OG FASTA files (default: .fa).",
)
args = parser.parse_args()
main(args.og_dir, args.all_fasta, args.output_dir, args.ext)