149 lines
4.7 KiB
Python
149 lines
4.7 KiB
Python
#! /usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
"""
|
|
Extract Single Copy OG sequences from a comprehensive FASTA file based on OG definitions from results of orthofinder.
|
|
"""
|
|
|
|
import sys
|
|
from Bio import SeqIO
|
|
from Bio.SeqRecord import SeqRecord
|
|
from Bio.Seq import Seq
|
|
from pathlib import Path
|
|
import argparse
|
|
|
|
|
|
def get_og_fastas(og_dir: Path, ext: str) -> dict[str, list[str]]:
|
|
"""
|
|
Get a dictionary of OGs and their corresponding sequence IDs from FASTA files in a directory.
|
|
|
|
Args:
|
|
og_dir (Path): Path to the directory containing OG FASTA files.
|
|
ext (str): Extension of the FASTA files. Default is ".fa".
|
|
|
|
Returns:
|
|
dict: Dictionary with OG names as keys and lists of sequence IDs as values.
|
|
"""
|
|
og_dict: dict[str, list[str]] = {}
|
|
try:
|
|
for fasta_file in og_dir.glob(f"*{ext}"):
|
|
og_name = fasta_file.stem
|
|
seq_ids: list[str] = []
|
|
for record in SeqIO.parse(fasta_file, "fasta"):
|
|
seq_ids.append(record.id)
|
|
og_dict[og_name] = seq_ids
|
|
except Exception as e:
|
|
print(f"Error processing OG FASTA files in {og_dir}: {e}")
|
|
sys.exit(1)
|
|
return og_dict
|
|
|
|
|
|
def parse_all_fasta(fasta_file: Path) -> dict[str, Seq]:
|
|
"""
|
|
Parse a FASTA file and return a dictionary of sequence IDs and their corresponding Seq objects.
|
|
|
|
Args:
|
|
fasta_file (Path): Path to the FASTA file.
|
|
Returns:
|
|
dict: Dictionary with sequence IDs as keys and Seq objects as values.
|
|
"""
|
|
seq_dict: dict[str, Seq] = {}
|
|
try:
|
|
for record in SeqIO.parse(fasta_file, "fasta"):
|
|
seq_dict[record.id] = record.seq
|
|
except Exception as e:
|
|
print(f"Error parsing FASTA file {fasta_file}: {e}")
|
|
sys.exit(1)
|
|
return seq_dict
|
|
|
|
|
|
def output_og_seqs(
|
|
all_seq_dict: dict[str, Seq], og_dict: dict[str, list[str]], output_dir: Path
|
|
):
|
|
"""
|
|
Output sequences for each OG into separate FASTA files, remove gene id and only keep taxon name.
|
|
|
|
Args:
|
|
all_seq_dict (dict): Dictionary of all sequences with sequence IDs as keys.
|
|
og_dict (dict): Dictionary of OGs with OG names as keys and lists of sequence IDs as values.
|
|
output_dir (Path): Path to the output directory.
|
|
"""
|
|
for og_name, seq_ids in og_dict.items():
|
|
og_seqs: list[SeqRecord] = []
|
|
for seq_id in seq_ids:
|
|
if seq_id in all_seq_dict:
|
|
seq_record = SeqRecord(
|
|
all_seq_dict[seq_id], id=seq_id.split("@")[0], description=""
|
|
)
|
|
og_seqs.append(seq_record)
|
|
else:
|
|
print(f"Warning: Sequence ID {seq_id} not found in all sequences.")
|
|
output_fasta = output_dir / f"{og_name}.fa"
|
|
try:
|
|
SeqIO.write(og_seqs, output_fasta, "fasta")
|
|
except Exception as e:
|
|
print(f"Error writing to FASTA file {output_fasta}: {e}")
|
|
sys.exit(1)
|
|
|
|
|
|
def write_og_list(og_dict: dict[str, list[str]]):
|
|
"""
|
|
Write the OG list to a text file.
|
|
|
|
Args:
|
|
og_dict (dict): Dictionary of OGs with OG names as keys and lists of sequence IDs as values.
|
|
"""
|
|
try:
|
|
with open("og_list.tsv", "w") as f:
|
|
for og_name, seq_ids in og_dict.items():
|
|
line = f"{og_name}\t" + "\t".join(seq_ids) + "\n"
|
|
f.write(line)
|
|
print("OG list written to og_list.tsv")
|
|
except Exception as e:
|
|
print(f"Error writing OG list to file: {e}")
|
|
sys.exit(1)
|
|
|
|
|
|
def main(og_dir_path: str, all_fasta_path: str, output_dir_path: str, ext: str = ".fa"):
|
|
og_dir = Path(og_dir_path)
|
|
all_fasta = Path(all_fasta_path)
|
|
output_dir = Path(output_dir_path)
|
|
output_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
og_dict = get_og_fastas(og_dir, ext)
|
|
all_seqs = parse_all_fasta(all_fasta)
|
|
output_og_seqs(all_seqs, og_dict, output_dir)
|
|
write_og_list(og_dict)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
parser = argparse.ArgumentParser(
|
|
description="Extract OG sequences from a comprehensive FASTA file based on OG definitions."
|
|
)
|
|
parser.add_argument(
|
|
"-d",
|
|
"--og_dir",
|
|
required=True,
|
|
help="Directory containing OG FASTA files.",
|
|
)
|
|
parser.add_argument(
|
|
"-a",
|
|
"--all_fasta",
|
|
required=True,
|
|
help="FASTA file containing all sequences.",
|
|
)
|
|
parser.add_argument(
|
|
"-o",
|
|
"--output_dir",
|
|
required=True,
|
|
help="Output directory for OG FASTA files.",
|
|
)
|
|
parser.add_argument(
|
|
"-e",
|
|
"--ext",
|
|
default=".fa",
|
|
help="Extension of OG FASTA files (default: .fa).",
|
|
)
|
|
args = parser.parse_args()
|
|
main(args.og_dir, args.all_fasta, args.output_dir, args.ext)
|