#! /usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Check and process frameshift in alignment files.

Alignments that are not listed in the frameshift CSV are copied unchanged to
the output directory. Listed alignments are kept (with every "!" character
replaced by "-") only when their attribution says the frameshift is limited
to the outgroup; all other listed alignments are discarded.
"""

import argparse
import shutil
import sys
from pathlib import Path
from typing import NoReturn

import pandas as pd
from Bio import SeqIO
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord

# Columns of the frameshift CSV that main() reads.
_REQUIRED_COLUMNS = ("alignment_file", "possible_attributions")

# Attribution value that marks a frameshift as acceptable. The text (including
# the "Framshift" spelling) is compared verbatim against the CSV content, so
# it must not be "corrected" here unless the CSV producer changes as well.
_OUTGROUP_ONLY_ATTRIBUTION = (
    "Framshift only in Ziziphus jujuba or Elaeagnus pungens"
)


def _fail(message: str) -> NoReturn:
    """Print *message* to stderr and terminate the script with exit status 1."""
    print(message, file=sys.stderr)
    sys.exit(1)


def parse_frameshift_list(fs_list: Path) -> pd.DataFrame:
    """
    Parse a frameshift list file into a pandas DataFrame.

    The file is read as comma-separated text whose first row is the header.
    If it cannot be read, an error is printed to stderr and the script exits
    with status 1.

    Args:
        fs_list (Path): Path to the frameshift list file.

    Returns:
        pd.DataFrame: DataFrame containing the frameshift information.
    """
    try:
        df = pd.read_csv(fs_list, sep=",", header=0)
    except Exception as e:
        _fail(f"Error reading frameshift list file {fs_list}: {e}")
    return df


def get_alignment_files(aln_dir: Path, ext: str) -> list[Path]:
    """
    Get a list of alignment files in a directory with a specific extension.

    Files are matched with the glob pattern ``*{ext}`` (non-recursive) and
    returned in sorted order so that runs are reproducible. If the directory
    does not exist or cannot be listed, an error is printed to stderr and the
    script exits with status 1.

    Args:
        aln_dir (Path): Path to the directory containing alignment files.
        ext (str): Extension of the alignment files.

    Returns:
        list[Path]: Sorted list of Paths to the alignment files.
    """
    # Path.glob() silently yields nothing for a missing directory, which
    # would let the script report success without doing any work.
    if not aln_dir.is_dir():
        _fail(f"Error accessing alignment files in {aln_dir}: not a directory")
    try:
        aln_files = sorted(aln_dir.glob(f"*{ext}"))
    except Exception as e:
        _fail(f"Error accessing alignment files in {aln_dir}: {e}")
    return aln_files


def _build_attribution_lookup(fs_df: pd.DataFrame) -> dict[str, object]:
    """
    Map each listed alignment file path to its attribution.

    When a path occurs on several rows, the first row wins. Exits with status
    1 if a required column is missing from the DataFrame.
    """
    missing = [col for col in _REQUIRED_COLUMNS if col not in fs_df.columns]
    if missing:
        _fail(
            "Frameshift list is missing required column(s): "
            + ", ".join(missing)
        )

    lookup: dict[str, object] = {}
    for aln_path, attribution in zip(
        fs_df["alignment_file"], fs_df["possible_attributions"]
    ):
        # Only string entries can ever equal str(path); skip NaN and the like.
        if isinstance(aln_path, str):
            lookup.setdefault(aln_path, attribution)
    return lookup


def _write_with_marks_as_gaps(src: Path, dst: Path) -> None:
    """
    Rewrite the FASTA file *src* to *dst* with every "!" replaced by "-".

    Record ids are preserved and descriptions are emptied.
    NOTE(review): "!" is presumably the aligner's frameshift marker - confirm.
    """
    records = [
        SeqRecord(
            seq=Seq(str(record.seq).replace("!", "-")),
            id=record.id,
            description="",
        )
        for record in SeqIO.parse(src, "fasta")
    ]
    SeqIO.write(records, dst, "fasta")


def main(alignment_dir: str, ext: str, frameshift_list: str, outdir: str) -> None:
    """
    Filter alignment files according to a frameshift list.

    An alignment is looked up in the list by the string form of its path
    (``str(path)``), so entries in the "alignment_file" column must be
    written exactly as the paths produced from *alignment_dir*.

    Args:
        alignment_dir (str): Directory containing alignment files.
        ext (str): Extension of the alignment files.
        frameshift_list (str): CSV file with "alignment_file" and
            "possible_attributions" columns.
        outdir (str): Output directory; created if it does not exist.
    """
    aln_dir = Path(alignment_dir)
    out_dir = Path(outdir)
    out_dir.mkdir(parents=True, exist_ok=True)

    fs_df = parse_frameshift_list(Path(frameshift_list))
    # Built once so that each file costs a single dictionary lookup instead
    # of a scan over the whole frameshift table.
    attribution_by_file = _build_attribution_lookup(fs_df)
    aln_files = get_alignment_files(aln_dir, ext)

    good_aln_count = 0
    keep_fs_count = 0
    discard_count = 0

    for aln_file in aln_files:
        destination = out_dir / aln_file.name
        key = str(aln_file)

        # Not listed: no frameshift reported, copy the file unchanged.
        if key not in attribution_by_file:
            try:
                shutil.copy(aln_file, destination)
            except Exception as e:
                _fail(f"Error copying file {aln_file} to {out_dir}: {e}")
            good_aln_count += 1
            continue

        # Listed, but the frameshift is not outgroup-only: drop the file.
        if attribution_by_file[key] != _OUTGROUP_ONLY_ATTRIBUTION:
            discard_count += 1
            continue

        try:
            _write_with_marks_as_gaps(aln_file, destination)
        except Exception as e:
            _fail(f"Error processing file {aln_file}: {e}")
        # Reported only after the output has actually been written.
        print(
            f"Keep the alignment file with frameshift: {aln_file}. Due to frameshift only in outgroup."
        )
        keep_fs_count += 1

    print("Frameshift processing completed successfully.")
    print(f"Number of good alignments copied: {good_aln_count}")
    print(f"Number of alignments with frameshift kept: {keep_fs_count}")
    print(f"Number of alignments with frameshift discarded: {discard_count}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Check and process frameshift in alignment files."
    )
    parser.add_argument(
        "-a",
        "--alignment_dir",
        required=True,
        help="Directory containing alignment files",
    )
    parser.add_argument(
        "-e", "--ext", default=".nal", help="Extension of alignment files"
    )
    parser.add_argument(
        "-f",
        "--frameshift_list",
        required=True,
        help="CSV file listing frameshift information",
    )
    parser.add_argument(
        "-o", "--outdir", required=True, help="Output directory for processed files"
    )
    args = parser.parse_args()
    main(args.alignment_dir, args.ext, args.frameshift_list, args.outdir)