157 lines
5.2 KiB
Python
Executable File
157 lines
5.2 KiB
Python
Executable File
#! /usr/bin/env python3
|
|
import os
|
|
from Bio import SeqIO
|
|
from Bio.Seq import Seq
|
|
from Bio.SeqRecord import SeqRecord
|
|
from collections import defaultdict
|
|
import argparse
|
|
|
|
|
|
def get_sequence_lengths(fasta_files):
    """
    Get the sequence length of each FASTA file.

    Only the first record of every file is inspected, so all sequences in a
    file are assumed to have the same length (i.e. each file is an alignment).

    Args:
        fasta_files: Iterable of paths to FASTA files.

    Returns:
        A dict mapping every given path to the length of its first sequence.
        Files that cannot be read, or that contain no records, map to 0 so
        that callers can always look a path up safely.
    """
    file_lengths = {}
    for fasta_file in fasta_files:
        try:
            with open(fasta_file, "r") as f:
                # Only the first record is needed; None marks an empty file.
                first_record = next(SeqIO.parse(f, "fasta"), None)
        except Exception as e:
            # Best-effort: report the problem and carry on with other files.
            print(f"Error reading file {fasta_file}: {e}")
            file_lengths[fasta_file] = 0
            continue

        if first_record is None:
            # Without this entry a later lookup for this file would fail.
            print(f"Warning: no sequences found in {fasta_file}")
            file_lengths[fasta_file] = 0
        else:
            file_lengths[fasta_file] = len(first_record.seq)

    return file_lengths
|
|
|
|
|
|
def concatenate_fasta_files(fasta_files, output_file):
    """
    Concatenate sequences from multiple FASTA files by name, using "-" for missing sequences.

    Records are matched across files by their ID. For every ID seen in at
    least one file, the per-file sequences are joined in the order of
    ``fasta_files``; a file that lacks the ID contributes a run of "-" whose
    length is that file's sequence length (taken from its first record).
    The result is written to ``output_file`` sorted by sequence name, and a
    short summary is printed.

    Args:
        fasta_files: Ordered list of input FASTA file paths.
        output_file: Path of the FASTA file to write.
    """
    # Per-file sequence length, used to size the gap filler for absent names.
    file_lengths = get_sequence_lengths(fasta_files)

    # sequences_dict[name][file_index] -> sequence string from that file.
    # If an ID occurs twice in one file, the later record wins.
    sequences_dict = defaultdict(dict)

    # Read sequences from each file
    for i, fasta_file in enumerate(fasta_files):
        try:
            with open(fasta_file, "r") as f:
                for record in SeqIO.parse(f, "fasta"):
                    sequences_dict[record.id][i] = str(record.seq)
        except Exception as e:
            # Best-effort: an unreadable file is reported and then treated
            # as missing every sequence.
            print(f"Error reading file {fasta_file}: {e}")

    # Create concatenated sequences
    concatenated_sequences = []
    for seq_name in sorted(sequences_dict):
        per_file = sequences_dict[seq_name]
        parts = []
        for i, fasta_file in enumerate(fasta_files):
            if i in per_file:
                # This file has the sequence, add it directly
                parts.append(per_file[i])
            else:
                # This file is missing the sequence, use "-" to fill the gap.
                # .get() keeps this safe for files without a known length.
                parts.append("-" * file_lengths.get(fasta_file, 0))

        concatenated_sequences.append(
            SeqRecord(
                Seq("".join(parts)),
                id=seq_name,
                description=f"concatenated_from_{len(fasta_files)}_files",
            )
        )

    with open(output_file, "w") as output_handle:
        SeqIO.write(concatenated_sequences, output_handle, "fasta")

    print(
        f"Successfully concatenate {len(concatenated_sequences)} sequences to {output_file}"
    )
    print(f"Input file count: {len(fasta_files)}")

    # Output statistics
    for i, fasta_file in enumerate(fasta_files):
        seq_count = sum(1 for seqs in sequences_dict.values() if i in seqs)
        print(
            f"File {i + 1}: {os.path.basename(fasta_file)} - Sequence count: {seq_count}, Sequence length: {file_lengths.get(fasta_file, 0)}."
        )

    # Guard against an empty result: there is no first record to measure.
    total_length = (
        len(concatenated_sequences[0].seq) if concatenated_sequences else 0
    )
    print(f"Total output sequence length: {total_length}.")
|
|
|
|
|
|
def get_fasta_files_from_directory(directory, extensions, list_file=None):
    """
    Get FASTA file paths from a directory.

    Args:
        directory: Directory that contains the FASTA files.
        extensions: File name suffixes (e.g. ".fasta") accepted when the
            directory is scanned. Ignored when ``list_file`` is given.
        list_file: Optional path to a text file with one file name per line,
            resolved relative to ``directory``. When given, only the listed
            files are returned and the directory is not scanned.

    Returns:
        Sorted list of paths to existing regular files.
    """
    if list_file:
        fasta_files = []
        with open(list_file, "r") as lf:
            for line in lf:
                name = line.strip()
                if not name:
                    # Blank lines carry no file name.
                    continue
                filepath = os.path.join(directory, name)
                if os.path.isfile(filepath):
                    fasta_files.append(filepath)
                else:
                    # Report instead of silently dropping the entry.
                    print(
                        f"Warning: {filepath} listed in {list_file} is not a file, skipping"
                    )
        return sorted(fasta_files)

    # str.endswith accepts a tuple, so one call covers every extension.
    suffixes = tuple(extensions)
    candidates = (
        os.path.join(directory, filename)
        for filename in os.listdir(directory)
        if filename.endswith(suffixes)
    )
    # Skip sub-directories that merely happen to be named like a FASTA file.
    return sorted(path for path in candidates if os.path.isfile(path))
|
|
|
|
|
|
def main():
    """
    Command-line entry point: collect input FASTA files and concatenate them.

    Inputs come either from ``--directory`` (optionally narrowed by
    ``--extensions`` or an explicit ``--list`` file) or from the paths given
    to ``--input``; ``--directory`` takes precedence when both are supplied.
    The concatenated alignment is written to ``--output``.
    """
    parser = argparse.ArgumentParser(
        description="Concatenate multiple FASTA files by sequence names."
    )
    parser.add_argument("-i", "--input", nargs="+", help="Input FASTA file list")
    parser.add_argument("-d", "--directory", help="Directory containing FASTA files")
    parser.add_argument("-o", "--output", required=True, help="Output file")
    parser.add_argument(
        "-e",
        "--extensions",
        nargs="+",
        default=[".fasta", ".fa", ".fna"],
        help="FASTA file extensions to look for in directory",
    )
    parser.add_argument(
        "-l",
        "--list",
        default=None,
        help="File listing input FASTA file names, one per line, relative to "
        "--directory (only used together with -d)",
    )

    args = parser.parse_args()

    # Get input files
    if args.directory:
        fasta_files = get_fasta_files_from_directory(
            args.directory, args.extensions, args.list
        )
        if not fasta_files:
            if args.list:
                # Extensions are not consulted when a list file is used, so
                # the message must not blame them.
                print(
                    f"Cannot find any file listed in {args.list} in {args.directory}"
                )
            else:
                print(
                    f"Cannot find FASTA files in {args.directory} with extensions {args.extensions}"
                )
            return
    elif args.input:
        fasta_files = args.input
    else:
        print("Please specify input files or directory")
        return

    print(f"Found {len(fasta_files)} FASTA files:")
    # Show the files in concatenation order so the output layout is traceable.
    for fasta_file in fasta_files:
        print(f"  {fasta_file}")

    # Perform concatenation
    concatenate_fasta_files(fasta_files, args.output)
|
|
|
|
|
|
# Run the command-line interface only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|