93 lines
2.7 KiB
Python
Executable File
93 lines
2.7 KiB
Python
Executable File
#! /usr/bin/env python3
|
|
import os
|
|
import argparse
|
|
from Bio import SeqIO
|
|
from Bio.SeqRecord import SeqRecord
|
|
from pathlib import Path
|
|
|
|
|
|
def parse_fasta(fasta_file_path):
    """
    Load all records of a FASTA file into a list.

    This is a best-effort reader: any exception raised while opening or
    parsing the file is reported with a printed message and an empty list
    is returned instead of the error propagating to the caller.

    Args:
        fasta_file_path: Path to the FASTA file

    Returns:
        list: List of SeqRecord, or an empty list if parsing failed
    """
    try:
        # Materialise the parser's iterator inside the try so that errors
        # raised part-way through the file are handled here as well.
        parsed = [*SeqIO.parse(fasta_file_path, "fasta")]
    except Exception as e:
        print(f"Error parsing FASTA file {fasta_file_path}: {e}")
        return []
    return parsed
|
|
|
|
|
|
def _index_records_by_id(source_records):
    """
    Return an ID -> record lookup for ``source_records``.

    A dict is returned unchanged, so a caller that has already built the
    index does not pay to rebuild it. Any other iterable is indexed by each
    record's ``id`` attribute; if an ID occurs more than once, the last
    record with that ID wins.
    """
    if isinstance(source_records, dict):
        return source_records
    return {record.id: record for record in source_records}


def ogs_to_fasta(ogs_name, seq_list, source_records, output_dir):
    """
    Write the sequences of one OGS to ``<output_dir>/<ogs_name>.fa``.

    Every ID in ``seq_list`` that exists in the source records is written
    as a FASTA record whose ID is the part of the original ID before the
    first "@" and whose description is empty. IDs that are not found are
    reported with a printed warning and skipped. The output file is created
    (or truncated) even when no ID matches.

    Args:
        ogs_name: Name of the OGS; used as the output file stem
        seq_list: List of sequence IDs
        source_records: List of SeqRecord from source FASTA, or a dict
            mapping record ID to SeqRecord (lets a caller build the lookup
            once and reuse it across many calls)
        output_dir: Directory to save the output FASTA file
    """
    output_path = Path(output_dir) / f"{ogs_name}.fa"
    seq_dict = _index_records_by_id(source_records)

    with open(output_path, "w") as out_f:
        for seq_id in seq_list:
            if seq_id not in seq_dict:
                print(f"Warning: Sequence ID {seq_id} not found in source records.")
                continue

            # Keep only the portion of the ID before the first "@".
            updated_id = seq_id.split("@")[0]
            updated_record = SeqRecord(
                seq_dict[seq_id].seq,
                id=updated_id,
                description="",
            )
            SeqIO.write(updated_record, out_f, "fasta")


def process_ogs_list(ogs_file, source_fasta, output_dir):
    """
    Process OGS list file and convert each OGS to FASTA format.

    Each line of ``ogs_file`` is stripped and split on tabs: the first
    field is the OGS name and the remaining fields are sequence IDs. Lines
    with fewer than two fields are skipped. One FASTA file is written per
    remaining line and a progress message is printed for each.

    Args:
        ogs_file: Path to the OGS list file
        source_fasta: Path to the source FASTA file
        output_dir: Directory to save the output FASTA files
    """
    # Build the ID lookup once. Handing the raw record list to
    # ogs_to_fasta() would rebuild the whole index for every OGS line.
    seq_index = _index_records_by_id(parse_fasta(source_fasta))

    with open(ogs_file, "r") as f:
        for line in f:
            parts = line.strip().split("\t")
            if len(parts) < 2:
                continue
            ogs_name, *seq_ids = parts
            ogs_to_fasta(ogs_name, seq_ids, seq_index, output_dir)
            print(f"Processed OGS: {ogs_name}")
|
|
|
|
|
|
if __name__ == "__main__":
    # Command-line entry point: read the three required paths, make sure the
    # output directory exists, then convert every OGS in the list.
    cli = argparse.ArgumentParser(description="Convert OGS list to FASTA format.")

    # (short flag, long flag, help text) -- all three options are required.
    option_table = (
        ("-i", "--input_ogs", "Path to the OGS list file"),
        ("-s", "--source_fasta", "Path to the source FASTA file"),
        ("-o", "--output_dir", "Directory to save the output FASTA files"),
    )
    for short_flag, long_flag, help_text in option_table:
        cli.add_argument(short_flag, long_flag, required=True, help=help_text)

    options = cli.parse_args()

    # Create the output directory (and any missing parents) up front.
    os.makedirs(options.output_dir, exist_ok=True)

    process_ogs_list(options.input_ogs, options.source_fasta, options.output_dir)
|