biyelunwen/99.scripts/miscs/hmmsearch_result_to_new_ogs...

317 lines
9.6 KiB
Python
Executable File

#! /usr/bin/env python3
import os
import re
import argparse
import sys
from Bio import SeqIO
from Bio.SeqRecord import SeqRecord
from Bio.Seq import Seq
from pathlib import Path
import logging
logging.basicConfig(level=logging.DEBUG, format="[%(levelname)s]: %(message)s")
class BestHit:
def __init__(self, e_value: float | None = None, target_name: str | None = None):
self.e_value = e_value
self.target_name = target_name
def parse_fasta(fasta_file_path) -> dict[str, str]:
"""
Parse a FASTA file and return a list of sequence ids.
Args:
fasta_file_path: Path to the FASTA file
Returns:
list: Dictionary of sequences
"""
sequences: dict[str, str] = {}
try:
for record in SeqIO.parse(fasta_file_path, "fasta"):
sequences[record.id] = str(record.seq)
except Exception as e:
logging.error(f"Error parsing FASTA file {fasta_file_path}: {e}")
return sequences
def get_ids_from_fasta(fasta_file_path) -> list[str]:
"""
Parse a FASTA file and return a list of sequence ids.
Args:
fasta_file_path: Path to the FASTA file
Returns:
list: List of sequence ids
"""
seqs = parse_fasta(fasta_file_path)
return list(seqs.keys())
def parse_hmmer_tbl(tbl_file_path) -> BestHit:
"""
Parse HMMER tbl format result file and extract best hit information
Args:
tbl_file_path: Path to the tbl file
Returns:
dict: Best hit information
"""
best_hit = BestHit()
try:
with open(tbl_file_path, "r") as f:
for line in f:
# Skip comment lines
if line.startswith("#"):
continue
# Split line (tbl format is typically space or tab separated)
parts = re.split(r"\s+", line.strip())
if len(parts) < 5:
continue
# Extract key information: target name, E-value, score, etc.
# tbl format columns: target name, target accession, query name, query accession, E-value, score, etc.
target_name = parts[0]
e_value = float(parts[4]) # Full sequence E-value
# If it's a new sequence or we found a better hit (lower E-value)
if best_hit.e_value is None or e_value < best_hit.e_value:
best_hit = BestHit(e_value=e_value, target_name=target_name)
except Exception as e:
logging.error(f"Error parsing tbl file {tbl_file_path}: {e}")
return BestHit()
return best_hit
def find_corresponding_files(fasta_dir, tbl_dir) -> list[tuple[Path, Path]]:
"""
Find corresponding FASTA and tbl file pairs
Args:
fasta_dir: Directory containing FASTA files
tbl_dir: Directory containing tbl result files
Returns:
list: List of (fasta_file_path, tbl_file_path) tuples
"""
file_pairs: list[tuple[Path, Path]] = []
# Get all FASTA files
fasta_files = {}
for fasta_path in Path(fasta_dir).glob("*.fa"):
stem = fasta_path.stem
fasta_files[stem] = fasta_path
# Find corresponding tbl files
for stem, fasta_path in fasta_files.items():
tbl_name = f"{stem}.fa.hmmsearch.tblout"
tbl_path = Path(tbl_dir) / tbl_name
if tbl_path.exists():
file_pairs.append((fasta_path, tbl_path))
else:
logging.warning(f"No corresponding tbl file found for {stem}")
return file_pairs
def add_best_hit_to_ogs_dir(
file_pair: tuple[Path, Path],
all_sequences: dict[str, str],
ogs_dir: Path,
species_name: str | None = None,
) -> tuple[str, list[str]]:
"""
Add best hit information to FASTA file
Args:
file_pair: Tuple of (fasta_path, tbl_path)
all_sequences: Dictionary of homolog sequences
ogs_dir: Directory to save FASTA files
species_name: Species name of candidate species
Returns:
tuple: (stem, seq_ids)
"""
fasta_path, tbl_path = file_pair
# Parse tbl file to get best hits
best_hit = parse_hmmer_tbl(tbl_path)
if not best_hit.target_name:
logging.warning(f"No valid hits found in {tbl_path}, skipping...")
return fasta_path.stem, []
best_seq = best_hit.target_name
seq_ids = get_ids_from_fasta(fasta_path)
seqs: dict[str, str] = {}
for seq_id in seq_ids:
seq = all_sequences.get(seq_id)
if not seq:
logging.warning(f"Sequence ID {seq_id} not found! Skipping...")
return fasta_path.stem, []
seqs[seq_id] = seq
modified_ogs_seq_record = [
SeqRecord(seq=Seq(v), id=k.split("@")[0], description="")
for k, v in seqs.items()
]
homolog_dir = Path(ogs_dir) / fasta_path.stem
homolog_dir.mkdir(parents=True, exist_ok=True)
modified_ogs_seq_path = homolog_dir / f"{fasta_path.stem}.fa"
SeqIO.write(modified_ogs_seq_record, modified_ogs_seq_path, "fasta")
homolog_seq = all_sequences.get(best_seq)
if not homolog_seq:
logging.warning(f"Best hit sequence {best_seq} not found! Skipping...")
return fasta_path.stem, []
if not species_name:
species_name = best_seq.split("@")[0]
homolog_seq_path = homolog_dir / f"{fasta_path.stem}_{species_name}.fa"
homolog_seq_record = SeqRecord(
seq=Seq(homolog_seq), id=best_seq.split("@")[0], description=""
)
SeqIO.write([homolog_seq_record], homolog_seq_path, "fasta")
seq_ids.append(best_seq)
return fasta_path.stem, seq_ids
def process_all_files(fasta_dir, tbl_dir, output_dir, all_fasta, species_name):
"""
Process all FASTA and tbl file pairs
Args:
fasta_dir: Directory containing FASTA files
tbl_dir: Directory containing tbl result files
output_file: Output file path
all_fasta: FASTA format cds file of candidate species
"""
logging.info("Starting file processing...")
logging.info(f"FASTA directory: {fasta_dir}")
logging.info(f"tbl directory: {tbl_dir}")
logging.info(f"Output Directory: {output_dir}")
logging.info(f"Homolog FASTA file: {all_fasta}")
og_list = {}
all_sequences = parse_fasta(all_fasta)
if not all_sequences:
logging.error(f"No sequences found in FASTA file {all_fasta}")
sys.exit(1)
# Find corresponding file pairs
file_pairs = find_corresponding_files(fasta_dir, tbl_dir)
if not file_pairs:
logging.error("No corresponding FASTA-tbl file pairs found")
sys.exit(1)
logging.info(f"Found {len(file_pairs)} file pairs to process")
output_file = Path(output_dir) / "updated_ogs_list.txt"
ogs_dir = Path(output_dir) / "ogs"
ogs_dir.mkdir(parents=True, exist_ok=True)
# Process each file pair
for i, file_pair in enumerate(file_pairs, 1):
logging.info(f"Processing pair {i}/{len(file_pairs)}:")
stem, seq_ids = add_best_hit_to_ogs_dir(
file_pair, all_sequences, ogs_dir, species_name
)
if not seq_ids:
logging.info(f"Skipping {stem}!")
continue
og_list[stem] = seq_ids
with open(output_file, "w") as out_f:
for og, ids in og_list.items():
out_f.write(f"{og}\t" + "\t".join(ids) + "\n")
logging.info(f"\nProcessing completed! All results saved to: {output_dir}")
logging.info(f"Updated OGS list saved to: {output_file}")
logging.info(f"Total OGS processed successfully: {len(og_list)}")
def main(fasta_directory, tbl_directory, output_directory, all_fasta, species_name):
"""
Main function - take paths and run processing
Args:
fasta_directory: Folder containing FASTA files
tbl_directory: Folder containing tbl result files
output_file: Output file path
all_fasta: FASTA format cds file of candidate species
species_name: Species name of candidate species
"""
# Check if input directories exist
if not os.path.exists(fasta_directory):
logging.error(f"FASTA directory does not exist {fasta_directory}")
sys.exit(1)
if not os.path.exists(tbl_directory):
logging.error(f"tbl directory does not exist {tbl_directory}")
sys.exit(1)
try:
Path(output_directory).mkdir(parents=True, exist_ok=True)
except Exception as e:
logging.error(f"Error creating output directory {output_directory}: {e}")
sys.exit(1)
if not os.path.exists(all_fasta):
logging.error(f"Homolog FASTA file does not exist {all_fasta}")
sys.exit(1)
# Process all files
process_all_files(
fasta_directory, tbl_directory, output_directory, all_fasta, species_name
)
if __name__ == "__main__":
# Parse command-line arguments and call main
parser = argparse.ArgumentParser(
description="Add HMMER best hit into orthologs FASTA."
)
parser.add_argument(
"-d",
"--fasta_directory",
required=True,
help="Directory containing FASTA files",
)
parser.add_argument(
"-t",
"--tbl_directory",
required=True,
help="Directory containing tbl result files",
)
parser.add_argument(
"-f",
"--all_fasta",
required=True,
help="FASTA format cds file of cadidate species",
)
parser.add_argument(
"-o",
"--output_directory",
required=True,
help="Directory to write output seqname",
)
parser.add_argument(
"-s",
"--species_name",
required=False,
help="Species name of candidate species",
)
args = parser.parse_args()
main(
args.fasta_directory,
args.tbl_directory,
args.output_directory,
args.all_fasta,
args.species_name,
)