317 lines
9.6 KiB
Python
Executable File
317 lines
9.6 KiB
Python
Executable File
#! /usr/bin/env python3
|
|
import os
|
|
import re
|
|
import argparse
|
|
import sys
|
|
from Bio import SeqIO
|
|
from Bio.SeqRecord import SeqRecord
|
|
from Bio.Seq import Seq
|
|
from pathlib import Path
|
|
import logging
|
|
|
|
|
|
logging.basicConfig(level=logging.DEBUG, format="[%(levelname)s]: %(message)s")
|
|
|
|
|
|
class BestHit:
|
|
def __init__(self, e_value: float | None = None, target_name: str | None = None):
|
|
self.e_value = e_value
|
|
self.target_name = target_name
|
|
|
|
|
|
def parse_fasta(fasta_file_path) -> dict[str, str]:
|
|
"""
|
|
Parse a FASTA file and return a list of sequence ids.
|
|
Args:
|
|
fasta_file_path: Path to the FASTA file
|
|
Returns:
|
|
list: Dictionary of sequences
|
|
"""
|
|
sequences: dict[str, str] = {}
|
|
try:
|
|
for record in SeqIO.parse(fasta_file_path, "fasta"):
|
|
sequences[record.id] = str(record.seq)
|
|
except Exception as e:
|
|
logging.error(f"Error parsing FASTA file {fasta_file_path}: {e}")
|
|
return sequences
|
|
|
|
|
|
def get_ids_from_fasta(fasta_file_path) -> list[str]:
|
|
"""
|
|
Parse a FASTA file and return a list of sequence ids.
|
|
Args:
|
|
fasta_file_path: Path to the FASTA file
|
|
Returns:
|
|
list: List of sequence ids
|
|
"""
|
|
seqs = parse_fasta(fasta_file_path)
|
|
return list(seqs.keys())
|
|
|
|
|
|
def parse_hmmer_tbl(tbl_file_path) -> BestHit:
|
|
"""
|
|
Parse HMMER tbl format result file and extract best hit information
|
|
|
|
Args:
|
|
tbl_file_path: Path to the tbl file
|
|
|
|
Returns:
|
|
dict: Best hit information
|
|
"""
|
|
best_hit = BestHit()
|
|
|
|
try:
|
|
with open(tbl_file_path, "r") as f:
|
|
for line in f:
|
|
# Skip comment lines
|
|
if line.startswith("#"):
|
|
continue
|
|
|
|
# Split line (tbl format is typically space or tab separated)
|
|
parts = re.split(r"\s+", line.strip())
|
|
if len(parts) < 5:
|
|
continue
|
|
|
|
# Extract key information: target name, E-value, score, etc.
|
|
# tbl format columns: target name, target accession, query name, query accession, E-value, score, etc.
|
|
target_name = parts[0]
|
|
e_value = float(parts[4]) # Full sequence E-value
|
|
|
|
# If it's a new sequence or we found a better hit (lower E-value)
|
|
if best_hit.e_value is None or e_value < best_hit.e_value:
|
|
best_hit = BestHit(e_value=e_value, target_name=target_name)
|
|
except Exception as e:
|
|
logging.error(f"Error parsing tbl file {tbl_file_path}: {e}")
|
|
return BestHit()
|
|
|
|
return best_hit
|
|
|
|
|
|
def find_corresponding_files(fasta_dir, tbl_dir) -> list[tuple[Path, Path]]:
|
|
"""
|
|
Find corresponding FASTA and tbl file pairs
|
|
|
|
Args:
|
|
fasta_dir: Directory containing FASTA files
|
|
tbl_dir: Directory containing tbl result files
|
|
|
|
Returns:
|
|
list: List of (fasta_file_path, tbl_file_path) tuples
|
|
"""
|
|
file_pairs: list[tuple[Path, Path]] = []
|
|
|
|
# Get all FASTA files
|
|
fasta_files = {}
|
|
|
|
for fasta_path in Path(fasta_dir).glob("*.fa"):
|
|
stem = fasta_path.stem
|
|
fasta_files[stem] = fasta_path
|
|
|
|
# Find corresponding tbl files
|
|
for stem, fasta_path in fasta_files.items():
|
|
tbl_name = f"{stem}.fa.hmmsearch.tblout"
|
|
tbl_path = Path(tbl_dir) / tbl_name
|
|
if tbl_path.exists():
|
|
file_pairs.append((fasta_path, tbl_path))
|
|
else:
|
|
logging.warning(f"No corresponding tbl file found for {stem}")
|
|
|
|
return file_pairs
|
|
|
|
|
|
def add_best_hit_to_ogs_dir(
|
|
file_pair: tuple[Path, Path],
|
|
all_sequences: dict[str, str],
|
|
ogs_dir: Path,
|
|
species_name: str | None = None,
|
|
) -> tuple[str, list[str]]:
|
|
"""
|
|
Add best hit information to FASTA file
|
|
|
|
Args:
|
|
file_pair: Tuple of (fasta_path, tbl_path)
|
|
all_sequences: Dictionary of homolog sequences
|
|
ogs_dir: Directory to save FASTA files
|
|
species_name: Species name of candidate species
|
|
Returns:
|
|
tuple: (stem, seq_ids)
|
|
"""
|
|
fasta_path, tbl_path = file_pair
|
|
|
|
# Parse tbl file to get best hits
|
|
best_hit = parse_hmmer_tbl(tbl_path)
|
|
|
|
if not best_hit.target_name:
|
|
logging.warning(f"No valid hits found in {tbl_path}, skipping...")
|
|
return fasta_path.stem, []
|
|
|
|
best_seq = best_hit.target_name
|
|
seq_ids = get_ids_from_fasta(fasta_path)
|
|
seqs: dict[str, str] = {}
|
|
for seq_id in seq_ids:
|
|
seq = all_sequences.get(seq_id)
|
|
if not seq:
|
|
logging.warning(f"Sequence ID {seq_id} not found! Skipping...")
|
|
return fasta_path.stem, []
|
|
seqs[seq_id] = seq
|
|
modified_ogs_seq_record = [
|
|
SeqRecord(seq=Seq(v), id=k.split("@")[0], description="")
|
|
for k, v in seqs.items()
|
|
]
|
|
homolog_dir = Path(ogs_dir) / fasta_path.stem
|
|
homolog_dir.mkdir(parents=True, exist_ok=True)
|
|
modified_ogs_seq_path = homolog_dir / f"{fasta_path.stem}.fa"
|
|
SeqIO.write(modified_ogs_seq_record, modified_ogs_seq_path, "fasta")
|
|
|
|
homolog_seq = all_sequences.get(best_seq)
|
|
if not homolog_seq:
|
|
logging.warning(f"Best hit sequence {best_seq} not found! Skipping...")
|
|
return fasta_path.stem, []
|
|
|
|
if not species_name:
|
|
species_name = best_seq.split("@")[0]
|
|
homolog_seq_path = homolog_dir / f"{fasta_path.stem}_{species_name}.fa"
|
|
homolog_seq_record = SeqRecord(
|
|
seq=Seq(homolog_seq), id=best_seq.split("@")[0], description=""
|
|
)
|
|
SeqIO.write([homolog_seq_record], homolog_seq_path, "fasta")
|
|
seq_ids.append(best_seq)
|
|
return fasta_path.stem, seq_ids
|
|
|
|
|
|
def process_all_files(fasta_dir, tbl_dir, output_dir, all_fasta, species_name):
|
|
"""
|
|
Process all FASTA and tbl file pairs
|
|
|
|
Args:
|
|
fasta_dir: Directory containing FASTA files
|
|
tbl_dir: Directory containing tbl result files
|
|
output_file: Output file path
|
|
all_fasta: FASTA format cds file of candidate species
|
|
"""
|
|
logging.info("Starting file processing...")
|
|
logging.info(f"FASTA directory: {fasta_dir}")
|
|
logging.info(f"tbl directory: {tbl_dir}")
|
|
logging.info(f"Output Directory: {output_dir}")
|
|
logging.info(f"Homolog FASTA file: {all_fasta}")
|
|
|
|
og_list = {}
|
|
all_sequences = parse_fasta(all_fasta)
|
|
if not all_sequences:
|
|
logging.error(f"No sequences found in FASTA file {all_fasta}")
|
|
sys.exit(1)
|
|
|
|
# Find corresponding file pairs
|
|
file_pairs = find_corresponding_files(fasta_dir, tbl_dir)
|
|
|
|
if not file_pairs:
|
|
logging.error("No corresponding FASTA-tbl file pairs found")
|
|
sys.exit(1)
|
|
|
|
logging.info(f"Found {len(file_pairs)} file pairs to process")
|
|
|
|
output_file = Path(output_dir) / "updated_ogs_list.txt"
|
|
ogs_dir = Path(output_dir) / "ogs"
|
|
ogs_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Process each file pair
|
|
for i, file_pair in enumerate(file_pairs, 1):
|
|
logging.info(f"Processing pair {i}/{len(file_pairs)}:")
|
|
stem, seq_ids = add_best_hit_to_ogs_dir(
|
|
file_pair, all_sequences, ogs_dir, species_name
|
|
)
|
|
if not seq_ids:
|
|
logging.info(f"Skipping {stem}!")
|
|
continue
|
|
og_list[stem] = seq_ids
|
|
|
|
with open(output_file, "w") as out_f:
|
|
for og, ids in og_list.items():
|
|
out_f.write(f"{og}\t" + "\t".join(ids) + "\n")
|
|
|
|
logging.info(f"\nProcessing completed! All results saved to: {output_dir}")
|
|
logging.info(f"Updated OGS list saved to: {output_file}")
|
|
logging.info(f"Total OGS processed successfully: {len(og_list)}")
|
|
|
|
|
|
def main(fasta_directory, tbl_directory, output_directory, all_fasta, species_name):
|
|
"""
|
|
Main function - take paths and run processing
|
|
|
|
Args:
|
|
fasta_directory: Folder containing FASTA files
|
|
tbl_directory: Folder containing tbl result files
|
|
output_file: Output file path
|
|
all_fasta: FASTA format cds file of candidate species
|
|
species_name: Species name of candidate species
|
|
"""
|
|
# Check if input directories exist
|
|
if not os.path.exists(fasta_directory):
|
|
logging.error(f"FASTA directory does not exist {fasta_directory}")
|
|
sys.exit(1)
|
|
|
|
if not os.path.exists(tbl_directory):
|
|
logging.error(f"tbl directory does not exist {tbl_directory}")
|
|
sys.exit(1)
|
|
|
|
try:
|
|
Path(output_directory).mkdir(parents=True, exist_ok=True)
|
|
except Exception as e:
|
|
logging.error(f"Error creating output directory {output_directory}: {e}")
|
|
sys.exit(1)
|
|
|
|
if not os.path.exists(all_fasta):
|
|
logging.error(f"Homolog FASTA file does not exist {all_fasta}")
|
|
sys.exit(1)
|
|
|
|
# Process all files
|
|
process_all_files(
|
|
fasta_directory, tbl_directory, output_directory, all_fasta, species_name
|
|
)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# Parse command-line arguments and call main
|
|
parser = argparse.ArgumentParser(
|
|
description="Add HMMER best hit into orthologs FASTA."
|
|
)
|
|
parser.add_argument(
|
|
"-d",
|
|
"--fasta_directory",
|
|
required=True,
|
|
help="Directory containing FASTA files",
|
|
)
|
|
parser.add_argument(
|
|
"-t",
|
|
"--tbl_directory",
|
|
required=True,
|
|
help="Directory containing tbl result files",
|
|
)
|
|
parser.add_argument(
|
|
"-f",
|
|
"--all_fasta",
|
|
required=True,
|
|
help="FASTA format cds file of cadidate species",
|
|
)
|
|
parser.add_argument(
|
|
"-o",
|
|
"--output_directory",
|
|
required=True,
|
|
help="Directory to write output seqname",
|
|
)
|
|
parser.add_argument(
|
|
"-s",
|
|
"--species_name",
|
|
required=False,
|
|
help="Species name of candidate species",
|
|
)
|
|
args = parser.parse_args()
|
|
|
|
main(
|
|
args.fasta_directory,
|
|
args.tbl_directory,
|
|
args.output_directory,
|
|
args.all_fasta,
|
|
args.species_name,
|
|
)
|