biyelunwen/99.scripts/workflow/phylogeny_reconstruction/04.concatenate_ogs.py

147 lines
4.8 KiB
Python
Executable File

#! /usr/bin/env python3
import os
from Bio import SeqIO
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
from collections import defaultdict
import argparse
def get_sequence_lengths(fasta_files):
    """Return the length of the first sequence found in each FASTA file.

    Only the first record of every file is inspected, so the result is the
    alignment length only if all sequences in a file share the same length.

    Args:
        fasta_files: Iterable of paths to FASTA files.

    Returns:
        dict mapping every given path to the length of its first sequence.
        Files that cannot be read, or that contain no records at all, map
        to 0, so every input path is guaranteed to have an entry.
    """
    file_lengths = {}
    for fasta_file in fasta_files:
        # Register a default up front: a file without any record would
        # otherwise be missing from the result and break later lookups.
        file_lengths[fasta_file] = 0
        try:
            with open(fasta_file, "r") as f:
                # Only the first record is needed; None means "no records".
                first_record = next(SeqIO.parse(f, "fasta"), None)
        except Exception as e:
            # Best effort: report the problem and keep the 0 default.
            print(f"Error reading file {fasta_file}: {e}")
            continue
        if first_record is not None:
            file_lengths[fasta_file] = len(first_record.seq)
    return file_lengths
def concatenate_fasta_files(fasta_files, output_file):
    """Concatenate sequences from several FASTA files by record id.

    For every record id seen in any input file, the sequences from all files
    are joined in the order of ``fasta_files``. Where a file has no record
    with that id, a run of "-" is inserted instead; its length is the length
    of the first sequence of that file (0 if the file is unreadable or
    empty). The result is written to ``output_file`` in FASTA format, sorted
    by record id, and a short summary is printed.

    Args:
        fasta_files: List of paths to the input FASTA files.
        output_file: Path of the FASTA file to write.

    Notes:
        Files that fail to read are reported and otherwise skipped. If a
        file contains the same id more than once, the last occurrence is
        used and a warning is printed.
    """
    # Gap length to use for each file when a sequence is missing from it.
    file_lengths = get_sequence_lengths(fasta_files)

    # record id -> {index of the input file -> sequence string}
    sequences_dict = defaultdict(dict)
    for i, fasta_file in enumerate(fasta_files):
        try:
            with open(fasta_file, "r") as f:
                for record in SeqIO.parse(f, "fasta"):
                    per_file = sequences_dict[record.id]
                    if i in per_file:
                        # Make the silent overwrite visible to the user.
                        print(
                            f"Warning: duplicate sequence {record.id} in "
                            f"{fasta_file}; keeping the last occurrence"
                        )
                    per_file[i] = str(record.seq)
        except Exception as e:
            # Best effort: records read before the failure are kept.
            print(f"Error reading file {fasta_file}: {e}")

    # Build one concatenated record per id, in sorted id order.
    concatenated_sequences = []
    for seq_name in sorted(sequences_dict):
        per_file = sequences_dict[seq_name]
        parts = []
        for i, fasta_file in enumerate(fasta_files):
            if i in per_file:
                parts.append(per_file[i])
            else:
                # Missing in this file: pad with gaps. ``get`` guards
                # against a path that has no recorded length.
                parts.append("-" * file_lengths.get(fasta_file, 0))
        concatenated_sequences.append(
            SeqRecord(
                Seq("".join(parts)),
                id=seq_name,
                description=f"concatenated_from_{len(fasta_files)}_files",
            )
        )

    with open(output_file, "w") as output_handle:
        SeqIO.write(concatenated_sequences, output_handle, "fasta")

    print(
        f"Successfully concatenate {len(concatenated_sequences)} sequences to {output_file}"
    )
    print(f"Input file count: {len(fasta_files)}")

    # Per-file statistics.
    for i, fasta_file in enumerate(fasta_files):
        seq_count = sum(1 for seqs in sequences_dict.values() if i in seqs)
        seq_length = file_lengths.get(fasta_file, 0)
        print(
            f"File {i + 1}: {os.path.basename(fasta_file)} - Sequence count: {seq_count}, Sequence length: {seq_length}."
        )

    # With no sequences at all there is no first record to measure.
    if concatenated_sequences:
        total_length = len(concatenated_sequences[0].seq)
    else:
        total_length = 0
    print(f"Total output sequence length: {total_length}.")
def get_fasta_files_from_directory(directory, extensions):
    """Return the sorted paths of FASTA files directly inside a directory.

    Args:
        directory: Directory to scan (not recursive).
        extensions: Iterable of file name suffixes to accept, e.g.
            ``[".fasta", ".fa"]``. Matching is case-sensitive.

    Returns:
        Sorted list of ``directory``-joined paths of the regular files whose
        name ends with one of ``extensions``. Sub-directories are skipped
        even if their name matches, since they cannot be read as FASTA.
    """
    # str.endswith accepts a tuple of suffixes; an empty tuple matches nothing.
    suffixes = tuple(extensions)
    fasta_files = []
    for filename in os.listdir(directory):
        path = os.path.join(directory, filename)
        if filename.endswith(suffixes) and os.path.isfile(path):
            fasta_files.append(path)
    return sorted(fasta_files)
def main(argv=None):
    """Command-line entry point: collect input files and concatenate them.

    Inputs come either from ``--directory`` (all files with one of the
    ``--extensions`` suffixes) or from ``--input``; when both are given the
    directory takes precedence. The result is written to ``--output``.

    Args:
        argv: Optional list of command-line arguments. ``None`` (the
            default) makes argparse read ``sys.argv[1:]``.

    Raises:
        SystemExit: With an error message (non-zero exit status) when no
            input was specified or the directory holds no matching files,
            so calling scripts can detect the failure.
    """
    parser = argparse.ArgumentParser(
        description="Concatenate multiple FASTA files by sequence names."
    )
    parser.add_argument("-i", "--input", nargs="+", help="Input FASTA file list")
    parser.add_argument("-d", "--directory", help="Directory containing FASTA files")
    parser.add_argument("-o", "--output", required=True, help="Output file")
    parser.add_argument(
        "-e",
        "--extensions",
        nargs="+",
        default=[".fasta", ".fa", ".fna"],
        help="FASTA file extensions to look for in directory",
    )
    args = parser.parse_args(argv)

    # Determine the input files.
    if args.directory:
        fasta_files = get_fasta_files_from_directory(args.directory, args.extensions)
        if not fasta_files:
            raise SystemExit(
                f"Cannot find FASTA files in {args.directory} with extensions {args.extensions}"
            )
    elif args.input:
        fasta_files = args.input
    else:
        raise SystemExit("Please specify input files or directory")

    print(f"Found {len(fasta_files)} FASTA files:")

    # Perform concatenation
    concatenate_fasta_files(fasta_files, args.output)


if __name__ == "__main__":
    main()