#!/usr/bin/perl use strict; use warnings; use POSIX; use IO::Select; use IO::Socket; use Digest::MD5; use Getopt::Long; use Cwd qw(abs_path); use Fcntl qw(:flock SEEK_END); use POSIX qw(ceil :sys_wait_h); use File::Path qw(remove_tree); use Time::HiRes qw(time usleep); # Get OS name my $os_name = $^O; # Turn on autoflush $|++; # Max number of forks to use my $max_forks = get_free_cpus(); # Server port my $port = 10003; # Stores executing machine hostnames my @machines; my %machines; # Path to text file containing computers to run on my $machine_file_path; # MrBayes block which will be used for each run my $mb_block; # Where this script is located my $script_path = abs_path($0); # Directory script was called from my $init_dir = abs_path("."); # Where the script was called from my $initial_directory = $ENV{PWD}; # General script settings my $no_forks; # Allow for reusing info from an old run my $input_is_dir = 0; # Allow user to specify mbsum output as input for the script my $input_is_mbsum = 0; # How the script was called my $invocation = "perl bucky.pl @ARGV"; # Name of output directory my $project_name = "bucky-".int(time()); #my $project_name = "bucky-dir"; # BUCKy settings my $alpha = 1; my $ngen = 1000000; my @unlink; # Read commandline settings GetOptions( "no-forks" => \$no_forks, "machine-file=s" => \$machine_file_path, "alpha|a=s" => \$alpha, "ngen|n=i" => \$ngen, "port=i" => \$port, "no-mbsum|s" => \$input_is_mbsum, "n-threads|T=i" => \$max_forks, "out-dir|o=s" => \$project_name, "server-ip=s" => \&client, # for internal usage only "help|h" => sub { print &help; exit(0); }, "usage" => sub { print &usage; exit(0); }, ); # Get paths to required executables my $bucky = check_path_for_exec("bucky"); my $mbsum = check_path_for_exec("mbsum") if (!$input_is_mbsum); # Check that BUCKy version >= 1.4.4 check_bucky_version($bucky); my $archive = shift(@ARGV); # Some error checking die "You must specify an archive file.\n\n", &usage if (!defined($archive)); die "Could not locate '$archive', perhaps you made a typo.\n" if (!-e $archive); die "Could not locate '$machine_file_path'.\n" if (defined($machine_file_path) && !-e $machine_file_path); die "Invalid alpha for BUCKy specified, input must be a float or 'infinity'.\n" if ($alpha !~ /(^inf(inity)?)|(^\d+(\.\d+)?$)/i); # Input is a previous run directory, reuse information $input_is_dir++ if (-d $archive); # Determine which machines we will run the analyses on if (defined($machine_file_path)) { # Get list of machines print "Fetching machine names listed in '$machine_file_path'...\n"; open(my $machine_file, '<', $machine_file_path); chomp(@machines = <$machine_file>); close($machine_file); # Check that we can connect to specified machines foreach my $index (0 .. $#machines) { my $machine = $machines[$index]; print " Testing connection to: $machine...\n"; # Attempt to ssh onto machine with a five second timeout my $ssh_test = `timeout 5 ssh -v $machine exit 2>&1`; # Look for machine's IP in test connection my $machine_ip; if ($ssh_test =~ /Connecting to \S+ \[(\S+)\] port \d+\./s) { $machine_ip = $1; } # Could connect but passwordless login not enabled if ($ssh_test =~ /Are you sure you want to continue connecting \(yes\/no\)/s) { print " Connection to $machine failed, removing from list of useable machines (passwordless login not enabled).\n"; splice(@machines, $index, 1); } # Successful connection elsif (defined($machine_ip)) { print " Connection to $machine [$machine_ip] successful.\n"; $machines{$machine} = $machine_ip; } # Unsuccessful connection else { print " Connection to $machine failed, removing from list of useable machines.\n"; splice(@machines, $index, 1); } } } print "\nScript was called as follows:\n$invocation\n\n"; my $archive_root; my $archive_root_no_ext; if (!$input_is_dir) { # Clean run with no prior output # Extract name information from input file if ($input_is_mbsum) { ($archive_root = $archive) =~ s/.*\/(.*)/$1/; ($archive_root_no_ext = $archive) =~ s/(.*\/)?(.*)(\.tar(\.gz)?$)|(\.tgz$)/$2/; } else { ($archive_root = $archive) =~ s/.*\/(.*)/$1/; ($archive_root_no_ext = $archive) =~ s/(.*\/)?(.*)(\.mb\.tar(\.gz)?$)|(\.mb\.tgz$)/$2/; } die "Could not determine archive root name, did you specify the proper input?\n" if (!defined($archive_root_no_ext)); # Initialize working directory # Remove conditional eventually mkdir($project_name) || die "Could not create '$project_name'$!.\n" if (!-e $project_name); my $archive_abs_path = abs_path($archive); # Remove conditional eventually run_cmd("ln -s $archive_abs_path $project_name/$archive_root") if (! -e "$project_name/$archive_root"); } else { # Prior output available, set relevant variables $project_name = $archive; my @contents = glob("$project_name/*"); # Determine the archive name by looking for a symlink my $found_name = 0; foreach my $file (@contents) { if (-l $file) { $file =~ s/\Q$project_name\E\///; $archive = $file; $found_name = 1; } } die "Could not locate archive in '$project_name'.\n" if (!$found_name); # Extract name information from input file # ($archive_root = $archive) =~ s/.*\/(.*)/$1/; # ($archive_root_no_ext = $archive) =~ s/(.*\/)?(.*)(\.mb\.tar(\.gz)?$)|(\.mb\.tgz$)/$2/; if ($input_is_mbsum) { ($archive_root = $archive) =~ s/.*\/(.*)/$1/; ($archive_root_no_ext = $archive) =~ s/(.*\/)?(.*)(\.tar(\.gz)?$)|(\.tgz$)/$2/; } else { ($archive_root = $archive) =~ s/.*\/(.*)/$1/; ($archive_root_no_ext = $archive) =~ s/(.*\/)?(.*)(\.mb\.tar(\.gz)?$)|(\.mb\.tgz$)/$2/; } die "Could not determine archive root name, is your input file properly named?\n" if (!defined($archive_root_no_ext)); } # The name of the output archive my $mbsum_archive = "$archive_root_no_ext.mbsum.tar.gz"; my $bucky_archive = "$archive_root_no_ext.BUCKy.tar"; my $quartet_output = "$archive_root_no_ext.CFs.csv"; $mbsum_archive = $archive if ($input_is_mbsum); chdir($project_name); # Change how Ctrl+C is interpreted to allow for clean up $SIG{'INT'} = 'INT_handler'; # Define and initialize directories my $mb_out_dir = "mb-out/"; my $mb_sum_dir = "mb-sum/"; mkdir($mb_out_dir) or die "Could not create '$mb_out_dir': $!.\n" if (!-e $mb_out_dir); mkdir($mb_sum_dir) or die "Could not create '$mb_sum_dir': $!.\n" if (!-e $mb_sum_dir); # Check if completed genes from a previous run exist my %complete_quartets; if (-e $bucky_archive && -e $quartet_output) { print "Archive containing completed quartets found for this dataset found in '$bucky_archive'.\n"; print "Completed quartets within in this archive will be removed from the job queue.\n\n"; # Because it takes longer to append to a tarball than append to a text file, the tarball most # likely has fewer quartet entries in it, we must therefore account for this ensuring that # the tarball and csv have the same quartet entries # See which quartets in the tarball are complete chomp(my @complete_quartets_tarball = `tar tf '$init_dir/$project_name/$bucky_archive'`); # Add quartets to a hash for easier lookup my %complete_quartets_tarball; foreach my $complete_quartet (@complete_quartets_tarball) { $complete_quartets_tarball{$complete_quartet}++; } # Load csv into memory #open(my $quartet_output_file, "<", $quartet_output); open(my $quartet_output_file, "<", "$init_dir/$project_name/$quartet_output"); (my @quartet_info = <$quartet_output_file>); close($quartet_output_file); # Remove header line my $header = shift(@quartet_info); # Rewrite csv to include only quartets also contained in tarball #open($quartet_output_file, ">", $quartet_output); open($quartet_output_file, ">", "$init_dir/$project_name/$quartet_output"); print {$quartet_output_file} $header; foreach my $quartet (@quartet_info) { my ($taxon1, $taxon2, $taxon3, $taxon4) = split(",", $quartet); my $quartet_name = "$taxon1--$taxon2--$taxon3--$taxon4"; my $quartet_name_tarball = $quartet_name.".tar.gz"; if (exists($complete_quartets_tarball{$quartet_name_tarball})) { $complete_quartets{$quartet_name}++; print {$quartet_output_file} $quartet; } } close($quartet_output_file); } my @taxa; my @genes; if ($input_is_mbsum) { # Unarchive input genes chomp(@genes = `tar xvf '$init_dir/$project_name/$archive' -C $mb_sum_dir 2>&1`); @genes = map { s/x //; $_ } @genes if ($os_name eq "darwin"); die "No genes found in '$archive'.\n" if (!@genes); # Move into MrBayes output directory chdir($mb_sum_dir); # Remove subdirectories that may have been a part of the tarball my @dirs; my @gene_roots; foreach my $gene (@genes) { (my $gene_root = $gene) =~ s/.*\///; next if ($gene eq $gene_root); # Move mbsum files so they are no longer in subdirectories if (!-d $gene) { system("mv '$gene' '$gene_root'"); push(@gene_roots, $gene_root); } else { push(@dirs, $gene); #splice(@genes, $index, 1); } } @genes = @gene_roots if (@gene_roots); # Remove any subdirectories that may have been in the input foreach my $dir (@dirs) { remove_tree($dir); } # Parse taxa present in each gene, determine which are shared across all genes my %taxa; foreach my $gene (@genes) { my @taxa = @{parse_mbsum_taxa($gene)}; # Count taxa present foreach my $taxon (@taxa) { $taxa{$taxon}++; } } # Add taxa present in all genes to analysis foreach my $taxon (keys %taxa) { if ($taxa{$taxon} == scalar(@genes)) { push(@taxa, $taxon); } } # Archive genes system("tar", "czf", $mbsum_archive, @genes); } else { # Unarchive input genes chomp(@genes = `tar xvf '$init_dir/$archive' -C '$mb_out_dir' 2>&1`); @genes = map { s/x //; $_ } @genes if ($os_name eq "darwin"); # Move into MrBayes output directory chdir($mb_out_dir); # Check that each gene has a log file my %taxa; foreach my $gene (@genes) { # Unzip a single gene chomp(my @mb_files = `tar xvf '$gene' 2>&1`); @mb_files = map { s/x //; $_ } @mb_files if ($os_name eq "darwin"); # Locate the log file output by MrBayes my $log_file_name; foreach my $file (@mb_files) { if ($file =~ /\.log$/) { $log_file_name = $file; last; } } die "Could not locate log file for '$gene'.\n" if (!defined($log_file_name)); # Parse log file for run information my $mb_log = parse_mb_log($log_file_name); # Check for taxa present my @taxa = @{$mb_log->{TAXA}}; foreach my $taxon (@taxa) { $taxa{$taxon}++; } # Clean up unlink(@mb_files); } # Add taxa present in all genes to analysis foreach my $taxon (keys %taxa) { if ($taxa{$taxon} == scalar(@genes)) { push(@taxa, $taxon); } } } # Create list of possible quartets my @quartets = combine(\@taxa, 4); my $original_size = scalar(@quartets); # Remove completed quartets if (%complete_quartets) { foreach my $index (reverse(0 .. $#quartets)) { my $quartet = $quartets[$index]; $quartet = join("--", @{$quartet}); if (exists($complete_quartets{$quartet})) { splice(@quartets, $index, 1); } } } print "Found ".scalar(@taxa)." taxa shared across all genes in this archive, ".scalar(@quartets). " of $original_size possible quartets will be run using output from ".scalar(@genes)." total genes.\n"; # Go back to working directory chdir(".."); # Determine whether or not we need to run mbsum on the specified input my $should_summarize = 1; if (-e $mbsum_archive && $input_is_dir && !$input_is_mbsum) { chomp(my @sums = `tar tf '$mbsum_archive'`) || die "Something appears to be wrong with '$mbsum_archive'.\n"; # Check that each gene has actually been summarized, if not redo the summaries if (scalar(@sums) != scalar(@genes)) { unlink($mbsum_archive); } else { $should_summarize = 0; } } $should_summarize = 0 if ($input_is_mbsum); # Summarize MrBayes output if needed if ($should_summarize) { # Run mbsum on each gene print "Summarizing MrBayes output for ".scalar(@genes)." genes.\n"; my @pids; foreach my $gene (@genes) { # Wait until a CPU is available until(okay_to_run(\@pids)) {}; my $pid; until (defined($pid)) { $pid = fork(); usleep(30000); } # The child fork if ($pid == 0) { #setpgrp(); run_mbsum($gene); exit(0); } else { push(@pids, $pid); } } # Wait for all summaries to finish foreach my $pid (@pids) { waitpid($pid, 0); } undef(@pids); # Remove directory storing mb output remove_tree($mb_out_dir); # Archive and zip mb summaries chdir($mb_sum_dir); #system("tar", "czf", $mbsum_archive, glob("$archive_root_no_ext*.sum")); system("tar", "czf", $mbsum_archive, glob("*.sum")); system("cp", $mbsum_archive, ".."); chdir(".."); } die "\nAll quartets have already been completed.\n\n" if (!@quartets); # Returns the external IP address of this computer chomp(my $server_ip = `dig +short myip.opendns.com \@resolver1.opendns.com 2>&1`); if ($server_ip !~ /(?:[0-9]{1,3}\.){3}[0-9]{1,3}/) { print "Could not determine external IP address, only local clients will be created.\n"; $server_ip = "127.0.0.1"; } # Initialize a server my $sock = IO::Socket::INET->new( LocalPort => $port, Blocking => 0, Reuse => 1, Listen => SOMAXCONN, Proto => 'tcp') or die "Could not create server socket: $!.\n"; $sock->autoflush(1); print "Job server successfully created.\n"; # Should probably do this earlier # Determine server hostname and add to machines if none were specified by the user chomp(my $server_hostname = `hostname`); if (scalar(@machines) == 0) { push(@machines, $server_hostname); $machines{$server_hostname} = "127.0.0.1"; } elsif (scalar(@machines) == 1) { # Check if the user input only the local machine in the config if ($machines{$machines[0]} eq $server_ip) { $machines{$machines[0]} = "127.0.0.1"; } } my @pids; foreach my $machine (@machines) { # Fork and create a client on the given machine my $pid = fork(); if ($pid == 0) { close(STDIN); close(STDOUT); close(STDERR); (my $script_name = $script_path) =~ s/.*\///; # Send over/copy files depending on where analyses will be run if ($machines{$machine} ne "127.0.0.1" && $machines{$machine} ne $server_ip) { # Send MrBayes summaries to remote machines if ($input_is_mbsum) { system("scp", "-q", "$mb_sum_dir/$mbsum_archive", $machine.":/tmp"); } else { system("scp", "-q", $mbsum_archive, $machine.":/tmp"); } # Send this script to the machine system("scp", "-q", $script_path, $machine.":/tmp"); # Send BUCKy executable to the machine system("scp", "-q", $bucky, $machine.":/tmp"); # Execute this perl script in client mode on the given machine # -tt forces pseudo-terminal allocation and lets us stop remote processes exec("ssh", "-tt", $machine, "perl", "/tmp/$script_name", $mbsum_archive, "--server-ip=$server_ip:$port"); } else { # Send this script to the machine system("cp", $script_path, "/tmp"); # Send BUCKy executable to the machine system("cp", $bucky, "/tmp"); # Execute this perl script in client mode exec("perl", "/tmp/$script_name", "$init_dir/$project_name/$mb_sum_dir/$mbsum_archive", "--server-ip=127.0.0.1:$port"); } exit(0); } else { push(@pids, $pid); } } # Move into mbsum directory chdir($mb_sum_dir); my $select = IO::Select->new($sock); # Stores which job is next in queue my $job_number = 0; # Number of open connections to a client my $total_connections; # Number of complete jobs (necessary?) my $complete_count = 0; # Number of connections server has closed my $closed_connections = 0; # Minimum number of connections server should expect my $starting_connections = scalar(@machines); my $time = time(); my $num_digits = get_num_digits({'NUMBER' => scalar(@quartets)}); # Begin the server's job distribution my %complete_queue; my $complete_queue_max_size = 100; while ((!defined($total_connections) || $closed_connections != $total_connections) || $total_connections < $starting_connections) { # Contains handles to clients which have sent information to the server my @clients = $select->can_read(0); # Free up CPU by sleeping for 10 ms usleep(10000); # Reap any children that we can foreach my $pid (@pids) { waitpid($pid, WNOHANG); } # Handle each ready client individually CLIENT: foreach my $client (@clients) { if (scalar(keys %complete_queue) > $complete_queue_max_size) { my $cwd = abs_path("."); chdir($mb_sum_dir); dump_quartets(\%complete_queue); chdir($cwd); undef(%complete_queue); } # Client requesting new connection if ($client == $sock) { $total_connections++; $select->add($sock->accept()); } else { # Get client's message my $response = <$client>; next if (not defined($response)); # a response should never actually be undefined # Client wants to send us a file if ($response =~ /SEND_FILE: (.*)/) { my $file_name = $1; receive_file({'FILE_PATH' => $file_name, 'FILE_HANDLE' => $client}); } # Client has finished a job if ($response =~ /DONE '(.*)' '(.*)' \|\|/) { $complete_count++; printf(" Analyses complete: %".$num_digits."d/%d.\r", $complete_count, scalar(@quartets)); my $completed_quartet = $1; my $quartet_statistics = $2; #push(@unlink, glob("$completed_quartet*")); push(@unlink, glob(abs_path(".")."/$completed_quartet*")); $complete_queue{$completed_quartet} = $quartet_statistics; } # Client wants a new job if ($response =~ /NEW: (.*)/) { my $client_ip = $1; # Check if jobs remain in the queue if ($job_number < scalar(@quartets)) { printf("\n Analyses complete: %".$num_digits."d/%d.\r", 0, scalar(@quartets)) if ($job_number == 0); my $quartet = join("--", @{$quartets[$job_number]}); # Tell local clients to move into mbsum directory if ($client_ip eq $server_ip) { print {$client} "CHDIR: ".abs_path(".")."\n"; } # Invocation changes if we want to use a prior of infinity if ($alpha =~ /(^inf(inity)?)/i) { print {$client} "NEW: '$quartet' '--use-independence-prior -n $ngen'\n"; } else { print {$client} "NEW: '$quartet' '-a $alpha -n $ngen'\n"; } $job_number++; } else { # Client has asked for a job, but there are none remaining print {$client} "HANGUP\n"; $select->remove($client); $client->close(); $closed_connections++; next CLIENT; } } } } } # Dump remaining quartets dump_quartets(\%complete_queue); # Wait until all children have completed foreach my $pid (@pids) { waitpid($pid, 0); } print "\n All connections closed.\n"; print "Total execution time: ", sec2human(time() - $time), ".\n\n"; rmdir("$initial_directory/$project_name/$mb_sum_dir"); sub client { my ($opt_name, $address) = @_; my ($server_ip, $port) = split(":", $address); chdir("/tmp"); my $bucky = "/tmp/bucky"; my $pgrp = $$; setpgrp(); # Determine this host's IP chomp(my $ip = `dig +short myip.opendns.com \@resolver1.opendns.com`); # Set IP to localhost if we don't have internet if ($ip !~ /(?:[0-9]{1,3}\.){3}[0-9]{1,3}/) { $ip = "127.0.0.1"; } # Determine file name of mbsum archive the client should use my @ARGV = split(/\s+/, $invocation); shift(@ARGV); shift(@ARGV); # remove "perl" and "bucky.pl" my $mbsum_archive = shift(@ARGV); # Extract files from mbsum archive my @sums; if ($mbsum_archive =~ /\//) { chomp(@sums = `tar tf '$mbsum_archive'`); } else { chomp(@sums = `tar xvf '$mbsum_archive' 2>&1`); @sums = map { s/x //; $_ } @sums if ($os_name eq "darwin"); } # Spawn more clients my @pids; # A slightly modification in order to limit forks to 10 # my $total_forks = get_free_cpus(); my $total_forks = 10; if ($total_forks > 1) { foreach my $fork (1 .. $total_forks - 1) { my $pid = fork(); if ($pid == 0) { last; } else { push(@pids, $pid); } } } # The name of the quartet we are working on my $quartet; # Stores names of unneeded files my @unlink; # Change signal handling so killing the server kills these processes and cleans up $SIG{HUP} = sub { unlink($0, $bucky); unlink(@sums); unlink($mbsum_archive); kill -15, $$; }; $SIG{TERM} = sub { unlink(glob("$quartet*")) if (defined($quartet)); exit(0); }; # Connect to the server my $sock = new IO::Socket::INET( PeerAddr => $server_ip.":".$port, Proto => 'tcp') or exit(0); $sock->autoflush(1); print {$sock} "NEW: $ip\n"; while (chomp(my $response = <$sock>)) { if ($response =~ /CHDIR: (.*)/) { chdir($1); } elsif ($response =~ /NEW: '(.*)' '(.*)'/) { $quartet = $1; my $bucky_settings = $2; # If client is local this needs to be defined now #chomp(@sums = `tar tf $mbsum_archive`) if (!@sums); # Create prune tree file contents required for BUCKy my $count = 0; my $prune_tree_output = "translate\n"; foreach my $member (split("--", $quartet)) { $count++; $prune_tree_output .= " $count $member"; if ($count == 4) { $prune_tree_output .= ";\n"; } else { $prune_tree_output .= ",\n"; } } # Write prune tree file my $prune_file_path = "$quartet-prune.txt"; push(@unlink, $prune_file_path); open(my $prune_file, ">", $prune_file_path); print {$prune_file} $prune_tree_output; close($prune_file); push(@unlink, "$quartet.input", "$quartet.out", "$quartet.cluster", "$quartet.concordance", "$quartet.gene"); # Run BUCKy on specified quartet system($bucky, split(" ", $bucky_settings), "-cf", 0, "-o", $quartet, "-p", $prune_file_path, @sums); unlink($prune_file_path); # Zip and tarball the results my @results = glob($quartet."*"); my $quartet_archive_name = "$quartet.tar.gz"; # Open concordance file and parse out the three possible resolutions my $num_genes = get_used_genes("$quartet.out"); my $split_info = parse_concordance_output("$quartet.concordance", $num_genes); # Archive and compress results system("tar", "czf", $quartet_archive_name, @results); # Send the results back to the server if this is a remote client if ($server_ip ne "127.0.0.1" && $server_ip ne $ip) { send_file({'FILE_PATH' => $quartet_archive_name, 'FILE_HANDLE' => $sock}); unlink($quartet_archive_name); } unlink(@unlink); undef(@unlink); print {$sock} "DONE '$quartet_archive_name' '$split_info' || NEW: $ip\n"; } elsif ($response eq "HANGUP") { last; } } # Have initial client wait for all others to finish and clean up if ($$ == $pgrp) { foreach my $pid (@pids) { waitpid($pid, 0); } unlink($0, $bucky); unlink(@sums, $mbsum_archive); #if ($server_ip ne $ip) { # unlink(@sums, $mbsum_archive); #} #else { # unlink(@sums); #} } exit(0); } sub dump_quartets { my $quartets = shift; my $pid; until (defined($pid)) { $pid = fork(); usleep(30000); } if ($pid == 0) { my @completed_quartets = keys %{$quartets}; my @quartet_statistics = values %{$quartets}; $SIG{TERM} = sub { close(STDIN); close(STDOUT); close(STDERR); unlink(@unlink); exit(0); }; # Check if this is the first to complete, if so we must create CF output file if (!-e "../$quartet_output") { open(my $quartet_output_file, ">", "../$quartet_output"); print {$quartet_output_file} "taxon1,taxon2,taxon3,taxon4,CF12_34,CF12_34_lo,CF12_34_hi,CF13_24,CF13_24_lo,CF13_24_hi,CF14_23,CF14_23_lo,CF14_23_hi,ngenes\n"; foreach my $quartet (@quartet_statistics) { print {$quartet_output_file} $quartet,"\n"; } close($quartet_output); } else { # Obtain a file lock on archive so another process doesn't simultaneously try to add to it open(my $quartet_output_file, ">>", "../$quartet_output"); flock($quartet_output_file, LOCK_EX) || die "Could not lock '$quartet_output': $!.\n"; seek($quartet_output_file, 0, SEEK_END) || die "Could not seek '$quartet_output': $!.\n"; # Add completed gene #print {$quartet_output_file} @quartet_statistics,"\n"; foreach my $quartet (@quartet_statistics) { print {$quartet_output_file} $quartet,"\n"; } # Release lock flock($quartet_output_file, LOCK_UN) || die "Could not unlock '$quartet_output': $!.\n"; close($quartet_output_file); } # Check if this is the first to complete, if so we must create BUCKy tarball # if (!-e "../$bucky_archive") { # system("tar", "cf", "../$bucky_archive", @completed_quartets); # unlink(@completed_quartets); # } # else { # my @unlink; # foreach my $completed_quartet (@completed_quartets) { # (my $quartet = $completed_quartet) =~ s/\.tar\.gz//; # push(@unlink, glob("$quartet*")); # } # #$SIG{TERM} = sub { close(STDIN); close(STDOUT); close(STDERR); unlink(glob("$quartet*")); exit(0); }; # #$SIG{TERM} = sub { close(STDIN); close(STDOUT); close(STDERR); unlink(@unlink); exit(0); }; # # Obtain a file lock on archive so another process doesn't simultaneously try to add to it # open(my $bucky_archive_file, "<", "../$bucky_archive"); # flock($bucky_archive_file, LOCK_EX) || die "Could not lock '$bucky_archive': $!.\n"; # # Add completed gene # system("tar", "rf", "../$bucky_archive", @completed_quartets); # unlink(@completed_quartets); # # Release lock # flock($bucky_archive_file, LOCK_UN) || die "Could not unlock '$bucky_archive': $!.\n"; # close($bucky_archive_file); # } unlink(@unlink); undef(@unlink); exit(0); } else { push(@pids, $pid); } # unlink(@unlink); # undef(@unlink); return; } sub get_used_genes { my $file_name = shift; # Number of genes actually used by BUCKy my $num_genes; # Open up the BUCKy output file open(my $bucky_out, "<", $file_name); while (my $line = <$bucky_out>) { if ($line =~ /Read (\d+) genes with a total of/) { $num_genes = $1; last; } } close($bucky_out); die "Error determining number of genes used by BUCKy ($file_name).\n" if (!defined($num_genes)); return $num_genes; } sub parse_concordance_output { my ($file_name, $ngenes) = @_; my @taxa; my %splits; # Open up the specified output file open(my $concordance_file, "<", $file_name); my $split; my $in_translate; my $in_all_splits; while (my $line = <$concordance_file>) { # Parse the translate table if ($in_translate) { if ($line =~ /\d+ (.*)([,;])/) { my $taxon = $1; my $line_end = $2; push(@taxa, $taxon); $in_translate = 0 if ($line_end eq ';'); } } # Parse the split information if ($in_all_splits) { # Set the split we are parsing information from if ($line =~ /^(\{\S+\})/) { my $current_split = $1; if ($current_split eq "{1,4|2,3}") { $split = "14|23"; } elsif ($current_split eq "{1,3|2,4}") { $split = "13|24"; } elsif ($current_split eq "{1,2|3,4}") { $split = "12|34"; } } # Parse mean number of loci for split if ($line =~ /=\s+(\S+) \(number of loci\)/) { $splits{$split}->{"CF"} = $1 / $ngenes; } # Parse 95% confidence interval if ($line =~ /95% CI for CF = \((\d+),(\d+)\)/) { #$splits{$split}->{"95%_CI"} = "(".($1 / $ngenes).",".($2 / $ngenes).")"; $splits{$split}->{"95%_CI_LO"} = ($1 / $ngenes); $splits{$split}->{"95%_CI_HI"} = ($2 / $ngenes); } } $in_translate++ if ($line =~ /^translate/); $in_all_splits++ if ($line =~ /^All Splits:/); } # Concat taxa names together my $return = join(",", @taxa); $return .= ","; # Concat split proportions with their 95% CI to return if (exists($splits{"12|34"})) { #$return .= $splits{"12|34"}->{"CF"}.$splits{"12|34"}->{"95%_CI"}."\t"; $return .= $splits{"12|34"}->{"CF"}.",".$splits{"12|34"}->{"95%_CI_LO"}.",".$splits{"12|34"}->{"95%_CI_HI"}.","; } else { #$return .= "0(0,0)\t"; $return .= "0,0,0,"; } if (exists($splits{"13|24"})) { #$return .= $splits{"13|24"}->{"CF"}.$splits{"13|24"}->{"95%_CI"}."\t"; $return .= $splits{"13|24"}->{"CF"}.",".$splits{"13|24"}->{"95%_CI_LO"}.",".$splits{"13|24"}->{"95%_CI_HI"}.","; } else { #$return .= "0(0,0)\t"; $return .= "0,0,0,"; } if (exists($splits{"14|23"})) { #$return .= $splits{"14|23"}->{"CF"}.$splits{"14|23"}->{"95%_CI"}; $return .= $splits{"14|23"}->{"CF"}.",".$splits{"14|23"}->{"95%_CI_LO"}.",".$splits{"14|23"}->{"95%_CI_HI"}; } else { #$return .= "0(0,0)"; $return .= "0,0,0"; } # Append number of genes used $return .= ",$ngenes"; return $return; } sub parse_mb_log { my $log_file_name = shift; # Open the specified mb log file and parse useful information from it my @taxa; my $ngen; my $nruns; my $burnin; my $burninfrac; my $samplefreq; open(my $log_file, "<", $log_file_name); while (my $line = <$log_file>) { if ($line =~ /Taxon\s+\d+ -> (\S+)/) { push(@taxa, $1); } elsif ($line =~ /Setting number of runs to (\d+)/) { $nruns = $1; } elsif ($line =~ /Setting burnin fraction to (\S+)/) { $burninfrac = $1; } elsif ($line =~ /Setting chain burn-in to (\d+)/) { $burnin = $1; } elsif ($line =~ /Setting sample frequency to (\d+)/) { $samplefreq = $1; } elsif ($line =~ /Setting number of generations to (\d+)/) { $ngen = $1; } } close($log_file); if (defined($burnin)) { #return {'SAMPLEFREQ' => $samplefreq, 'NRUNS' => $nruns, 'BURNIN' => $burnin }; return {'SAMPLEFREQ' => $samplefreq, 'NRUNS' => $nruns, 'BURNIN' => $burnin, 'TAXA' => \@taxa}; } else { return {'NGEN' => $ngen, 'NRUNS' => $nruns, 'BURNINFRAC' => $burninfrac, 'SAMPLEFREQ' => $samplefreq, 'TAXA' => \@taxa}; } } sub parse_mbsum_taxa { my $mbsum_file_name = shift; # Open the specified mbsum file and parse its taxa list my @taxa; my $in_translate_block = 0; open(my $mbsum_file, "<", $mbsum_file_name); while (my $line = <$mbsum_file>) { $in_translate_block++ if ($line =~ /translate/); if ($in_translate_block == 1 && $line =~ /\d+\s+([^,;]+)/) { push(@taxa, $1); } } close($mbsum_file); # Check if there were multiple translate blocks in the file which is indicative of an error with the creation of the file die "Something is amiss with '$mbsum_file_name', multiple translate blocks ($in_translate_block) were detected when there should only be one.\n" if ($in_translate_block > 1); # Check that we actually parsed something, otherwise input is improperly formatted/not mbsum output die "No taxa parsed for file '$mbsum_file_name', does '$archive' actually contain mbsum output?.\n" if (!@taxa); return \@taxa; } sub run_mbsum { my $tarball = shift; # Unzip specified tarball chomp(my @mb_files = `tar xvf '$mb_out_dir$tarball' -C '$mb_out_dir' 2>&1`); @mb_files = map { s/x //; $_ } @mb_files if ($os_name eq "darwin"); # Determine name for this partition's log file my $log_file_name; foreach my $file (@mb_files) { if ($file =~ /\.log$/) { $log_file_name = $file; last; } } die "Could not locate log file for '$tarball'.\n" if (!defined($log_file_name)); # Parse log file my $mb = parse_mb_log("$mb_out_dir$log_file_name"); (my $gene_name = $tarball) =~ s/\.nex\.tar\.gz//; # Determine number of trees mbsum should remove from each file my $trim; if ($mb->{BURNIN}) { $trim = $mb->{BURNIN} + 1; } else { $trim = ((($mb->{NGEN} / $mb->{SAMPLEFREQ}) * $mb->{NRUNS} * $mb->{BURNINFRAC}) / $mb->{NRUNS}) + 1; } # Summarize gene's tree files system("$mbsum '$mb_out_dir$gene_name.'*.t -n $trim -o '$mb_sum_dir$gene_name.sum' >/dev/null 2>&1"); # Clean up extracted files chdir($mb_out_dir); unlink(@mb_files); } sub okay_to_run { my $pids = shift; # Free up a CPU by sleeping for 10 ms usleep(10000); my $current_forks = scalar(@{$pids}); foreach my $index (reverse(0 .. $current_forks - 1)) { next if ($index < 0); my $pid = @{$pids}[$index]; my $wait = waitpid($pid, WNOHANG); # Successfully reaped child if ($wait > 0) { $current_forks--; #splice(@pids, $index, 1); splice(@{$pids}, $index, 1); } } return ($current_forks < $max_forks); } sub hashsum { my $settings = shift; my $file_path = $settings->{'FILE_PATH'}; open(my $file, "<", $file_path) or die "Couldn't open file '$file_path': $!.\n"; my $md5 = Digest::MD5->new; my $md5sum = $md5->addfile(*$file)->hexdigest; close($file); return $md5sum; } sub send_file { my $settings = shift; my $file_path = $settings->{'FILE_PATH'}; my $file_handle = $settings->{'FILE_HANDLE'}; my $hash = hashsum({'FILE_PATH' => $file_path}); print {$file_handle} "SEND_FILE: $file_path\n"; open(my $file, "<", $file_path) or die "Couldn't open file '$file_path': $!.\n"; while (<$file>) { print {$file_handle} $_; } close($file); print {$file_handle} " END_FILE: $hash\n"; # Stall until we know status of file transfer while (defined(my $response = <$file_handle>)) { chomp($response); last if ($response eq "TRANSFER_SUCCESS"); die "Unsuccessful file transfer, checksums did not match.\n" if ($response eq "TRANSFER_FAILURE"); } } sub receive_file { my $settings = shift; my $file_path = $settings->{'FILE_PATH'}; my $file_handle = $settings->{'FILE_HANDLE'}; my $check_hash; open(my $file, ">", $file_path); while (<$file_handle>) { if ($_ =~ /(.*) END_FILE: (\S+)/) { print {$file} $1; $check_hash = $2; last; } else { print {$file} $_; } } close($file); # Use md5 hashsum to make sure transfer worked my $hash = hashsum({'FILE_PATH' => $file_path}); if ($hash ne $check_hash) { die "Unsuccessful file transfer, checksums do not match.\n'$hash' - '$check_hash'\n"; # hopefully this never pops up print {$file_handle} "TRANSFER_FAILURE\n" } else { print {$file_handle} "TRANSFER_SUCCESS\n"; } } sub INT_handler { #dump_quartets(\%complete_queue); unlink(@unlink); # Kill ssh process(es) spawned by this script foreach my $pid (@pids) { #kill(-9, $pid); #kill(15, $pid); kill(1, $pid); } # Move into gene directory chdir("$initial_directory/$project_name"); rmdir($mb_out_dir); # Try to delete directory once per second for five seconds, if it can't be deleted print an error message # I've found this method is necessary for analyses performed on AFS drives my $count = 0; until (!-e $mb_sum_dir || $count == 5) { $count++; remove_tree($mb_sum_dir, {error => \my $err}); sleep(1); } #logger("Could not clean all files in './$gene_dir/'.") if ($count == 5); print "Could not clean all files in './$mb_sum_dir/'.\n" if ($count == 5); exit(0); } sub clean_up { my $settings = shift; my $remove_dirs = $settings->{'DIRS'}; my $current_dir = getcwd(); # chdir($alignment_root); # unlink(glob($gene_dir."$alignment_name*")); # #unlink($server_check_file) if (defined($server_check_file)); # # if ($remove_dirs) { # rmdir($gene_dir); # } chdir($current_dir); } sub get_num_digits { my $settings = shift; my $number = $settings->{'NUMBER'}; my $digits = 1; while (floor($number / 10) != 0) { $number = floor($number / 10); $digits++; } return $digits; } sub sec2human { my $secs = shift; # Constants my $secs_in_min = 60; my $secs_in_hour = 60 * 60; my $secs_in_day = 24 * 60 * 60; $secs = int($secs); return "0 seconds" if (!$secs); # Calculate units of time my $days = int($secs / $secs_in_day); my $hours = ($secs / $secs_in_hour) % 24; my $mins = ($secs / $secs_in_min) % 60; $secs = $secs % 60; # Format return nicely my $time; if ($days) { $time .= ($days != 1) ? "$days days, " : "$days day, "; } if ($hours) { $time .= ($hours != 1) ? "$hours hours, " : "$hours hour, "; } if ($mins) { $time .= ($mins != 1) ? "$mins minutes, " : "$mins minute, "; } if ($secs) { $time .= ($secs != 1) ? "$secs seconds " : "$secs second "; } else { # Remove comma chop($time); } chop($time); return $time; } sub get_free_cpus { my $os_name = $^O; # Returns a two-member array containing CPU usage observed by top, # top is run twice as its first output is usually inaccurate my @percent_free_cpu; if ($os_name eq "darwin") { # Mac OS chomp(@percent_free_cpu = `top -i 1 -l 2 | grep "CPU usage"`); } else { # Linux chomp(@percent_free_cpu = `top -b -n2 -d0.05 | grep "Cpu(s)"`); } my $percent_free_cpu = pop(@percent_free_cpu); if ($os_name eq "darwin") { # Mac OS $percent_free_cpu =~ s/.*?(\d+\.\d+)%\s+id.*/$1/; } else { # linux $percent_free_cpu =~ s/.*?(\d+\.\d)\s*%?ni,\s*(\d+\.\d)\s*%?id.*/$1 + $2/; # also includes %nice as free $percent_free_cpu = eval($percent_free_cpu); } my $total_cpus; if ($os_name eq "darwin") { # Mac OS $total_cpus = `sysctl -n hw.ncpu`; } else { # linux $total_cpus = `grep --count 'cpu' /proc/stat` - 1; } my $free_cpus = ceil($total_cpus * $percent_free_cpu / 100); if ($free_cpus == 0 || $free_cpus !~ /^\d+$/) { $free_cpus = 1; # assume that at least one cpu can be used } return $free_cpus; } sub run_cmd { my $command = shift; my $return = system($command); if ($return) { logger("'$command' died with error: '$return'.\n"); #kill(2, $parent_pid); exit(0); } } sub check_path_for_exec { my $exec = shift; my $path = $ENV{PATH}.":."; # include current directory as well my @path_dirs = split(":", $path); my $exec_path; foreach my $dir (@path_dirs) { $dir .= "/" if ($dir !~ /\/$/); $exec_path = abs_path($dir.$exec) if (-e $dir.$exec && -x $dir.$exec && !-d $dir.$exec); } die "Could not find the following executable: '$exec'. This script requires this program in your path.\n" if (!defined($exec_path)); return $exec_path; } # I grabbed this from StackOverflow so that's why its style is different #DontFixWhatIsntBroken: # https://stackoverflow.com/questions/10299961/in-perl-how-can-i-generate-all-possible-combinations-of-a-list sub combine { my ($list, $n) = @_; die "Insufficient list members" if ($n > @$list); return map [$_], @$list if ($n <= 1); my @comb; for (my $i = 0; $i+$n <= @$list; ++$i) { my $val = $list->[$i]; my @rest = @$list[$i + 1 .. $#$list]; push(@comb, [$val, @$_]) for combine(\@rest, $n - 1); } return @comb; } sub check_bucky_version { my $bucky = shift; print "\nChecking for BUCKy version >= 1.4.4...\n"; # Run BUCKy with --version and extract version info chomp(my @version_info = grep { /BUCKy version/ } `$bucky --version`); my $version_info = shift(@version_info); die " Could not determine BUCKy version.\n" if (!defined($version_info)); # Get the actual version number my $version; if ($version_info =~ /BUCKy\s+version\s+([^\s|,]+)/) { $version = $1; } die " Could not determine BUCKy version.\n" if (!defined($version_info)); # Version testing #my @versions = qw/2.1b 1.2.1000 1 0.9.8 2.3 1.4.5 1.4.3 1.4 1.500.2 1.4.4 1.4.4.1/; #foreach my $version (@versions) { print " BUCKy version: $version.\n"; # Split version number based on period delimiters my @version_parts = split(/\./, $version); # Future proofing if letters are ever used (we won't ever care about them) @version_parts = map { s/[a-zA-Z]+//g; $_ } @version_parts; die " Error determining BUCKy version.\n" if (!@version_parts); # Check that version is >= 1.4.4 if (defined($version_parts[0]) && int($version_parts[0]) > 1) { print " BUCKy version check passed.\n"; return; } elsif ((defined($version_parts[0]) && int($version_parts[0]) == 1) && (defined($version_parts[1]) && int($version_parts[1]) > 4)) { print " BUCKy version check passed.\n"; return; } elsif (((defined($version_parts[0]) && int($version_parts[0]) == 1) && (defined($version_parts[1]) && int($version_parts[1]) == 4)) && defined($version_parts[2]) && int($version_parts[2]) >= 4) { print " BUCKy version check passed.\n"; return; } else { die " BUCKy version check failed, update to version >= 1.4.4.\n"; } #print "\n"; #} } sub usage { return "Usage: bucky.pl [MRBAYES TARBALL]\n"; } sub help { print < EOF exit(0); }