129 lines
3.9 KiB
Python
129 lines
3.9 KiB
Python
#! /usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
"""
|
|
Check alignment files against a frameshift list, then copy, rewrite, or discard each one.
|
|
"""
|
|
|
|
import shutil
|
|
import pandas as pd
|
|
from pathlib import Path
|
|
from Bio import SeqIO
|
|
from Bio.SeqRecord import SeqRecord
|
|
from Bio.Seq import Seq
|
|
import sys
|
|
import argparse
|
|
|
|
|
|
def parse_frameshift_list(fs_list: Path) -> pd.DataFrame:
    """
    Load the frameshift list into a pandas DataFrame.

    The file is read as comma-separated text whose first row supplies the
    column names. If reading fails for any reason, the error is printed
    and the program terminates with exit status 1.

    Args:
        fs_list (Path): Location of the frameshift list file.

    Returns:
        pd.DataFrame: The parsed frameshift table.
    """
    try:
        # Returning from inside the try keeps the happy path on one line;
        # only a failed read reaches the handler below.
        return pd.read_csv(fs_list, sep=",", header=0)
    except Exception as err:
        print(f"Error reading frameshift list file {fs_list}: {err}")
        sys.exit(1)
|
|
|
|
|
|
def get_alignment_files(aln_dir: Path, ext: str) -> list[Path]:
    """
    Collect the files directly inside a directory whose names end with ``ext``.

    The directory is globbed (non-recursively) with the pattern ``*<ext>``.
    If the lookup raises, the error is printed and the program terminates
    with exit status 1.

    Args:
        aln_dir (Path): Directory that holds the alignment files.
        ext (str): Filename ending to match, e.g. ``".nal"``.

    Returns:
        list[Path]: Paths of the matching files, in the order the glob
        yields them.
    """
    try:
        pattern = f"*{ext}"
        # Materialise the lazy glob here so that any filesystem error is
        # raised inside this try block rather than later in the caller.
        matches = [*aln_dir.glob(pattern)]
    except Exception as err:
        print(f"Error accessing alignment files in {aln_dir}: {err}")
        sys.exit(1)
    else:
        return matches
|
|
|
|
|
|
def main(alignment_dir: str, ext: str, frameshift_list: str, outdir: str) -> None:
    """
    Sort alignment files into copied, rewritten, or discarded.

    Every file in ``alignment_dir`` whose name ends with ``ext`` is handled
    according to the frameshift list:

    * not listed: copied unchanged into ``outdir``;
    * listed with the outgroup-only attribution: rewritten into ``outdir``
      as FASTA with every ``!`` in the sequences replaced by ``-`` and the
      record descriptions dropped;
    * listed with any other attribution: not written (counted as discarded).

    A file counts as "listed" when ``str(path)`` -- the path exactly as
    produced by globbing ``alignment_dir`` -- equals a value in the
    ``alignment_file`` column. If a file is listed more than once, the first
    row decides. A summary of the three counts is printed at the end.

    Args:
        alignment_dir (str): Directory containing the alignment files.
        ext (str): Filename ending of the alignment files.
        frameshift_list (str): CSV file with at least the columns
            ``alignment_file`` and ``possible_attributions``.
        outdir (str): Output directory; created if it does not exist.

    Raises:
        SystemExit: With status 1 if the frameshift list lacks a required
            column, or if copying or rewriting a file fails.
    """
    # NOTE: "Framshift" (sic) is compared against text stored in the
    # frameshift list, so it must stay byte-identical to what that file
    # contains. Do not correct the spelling here alone.
    keep_attribution = "Framshift only in Ziziphus jujuba or Elaeagnus pungens"
    required_columns = ("alignment_file", "possible_attributions")

    aln_dir = Path(alignment_dir)
    fs_list_path = Path(frameshift_list)
    out_dir = Path(outdir)
    out_dir.mkdir(parents=True, exist_ok=True)

    fs_df = parse_frameshift_list(fs_list_path)
    aln_files = get_alignment_files(aln_dir, ext)

    # Fail with a readable message instead of a KeyError traceback when the
    # list does not have the columns this function relies on.
    missing_columns = [col for col in required_columns if col not in fs_df.columns]
    if missing_columns:
        print(
            f"Error: frameshift list {fs_list_path} is missing column(s): "
            f"{', '.join(missing_columns)}"
        )
        sys.exit(1)

    # Build the file -> attribution lookup once instead of rescanning the
    # DataFrame for every alignment file. setdefault keeps the FIRST row for
    # a file that is listed more than once.
    attribution_by_file = {}
    for listed_file, attribution in zip(
        fs_df["alignment_file"], fs_df["possible_attributions"]
    ):
        attribution_by_file.setdefault(listed_file, attribution)

    good_aln_count = 0
    keep_fs_count = 0
    discard_count = 0

    for aln_path in aln_files:
        file_key = str(aln_path)

        # Not in the frameshift list: pass the file through untouched.
        if file_key not in attribution_by_file:
            try:
                shutil.copy(aln_path, out_dir / aln_path.name)
            except Exception as e:
                print(f"Error copying file {aln_path} to {out_dir}: {e}")
                sys.exit(1)
            good_aln_count += 1
            continue

        if attribution_by_file[file_key] == keep_attribution:
            try:
                # "!" is presumably the aligner's frameshift marker (TODO
                # confirm); it is turned into an ordinary gap character.
                records = [
                    SeqRecord(
                        seq=Seq(str(record.seq).replace("!", "-")),
                        id=record.id,
                        description="",
                    )
                    for record in SeqIO.parse(aln_path, "fasta")
                ]
                print(
                    f"Keep the alignment file with frameshift: {aln_path}. Due to frameshift only in outgroup."
                )
                keep_fs_count += 1
                SeqIO.write(records, out_dir / aln_path.name, "fasta")
            except Exception as e:
                print(f"Error processing file {aln_path}: {e}")
                sys.exit(1)
        else:
            # Any other attribution: the alignment is left out of outdir.
            discard_count += 1

    print("Frameshift processing completed successfully.")
    print(f"Number of good alignments copied: {good_aln_count}")
    print(f"Number of alignments with frameshift kept: {keep_fs_count}")
    print(f"Number of alignments with frameshift discarded: {discard_count}")
|
|
|
|
|
|
if __name__ == "__main__":
    # Command-line entry point: declare the options, parse them, and pass
    # the values on to main().
    parser = argparse.ArgumentParser(
        description="Check and process frameshift in alignment files."
    )

    # (flags, keyword settings) for each option, in the order they should
    # appear in the --help output.
    cli_options = (
        (
            ("-a", "--alignment_dir"),
            {"required": True, "help": "Directory containing alignment files"},
        ),
        (
            ("-e", "--ext"),
            {"default": ".nal", "help": "Extension of alignment files"},
        ),
        (
            ("-f", "--frameshift_list"),
            {"required": True, "help": "CSV file listing frameshift information"},
        ),
        (
            ("-o", "--outdir"),
            {"required": True, "help": "Output directory for processed files"},
        ),
    )
    for flags, settings in cli_options:
        parser.add_argument(*flags, **settings)

    args = parser.parse_args()

    main(
        alignment_dir=args.alignment_dir,
        ext=args.ext,
        frameshift_list=args.frameshift_list,
        outdir=args.outdir,
    )
|