-
Eike Cochu authored
added patched version of dtm project updated dtm analyzer, unfinished added sequence to models, topicfull, stores a single sequence
Eike Cochu authoredadded patched version of dtm project updated dtm analyzer, unfinished added sequence to models, topicfull, stores a single sequence
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
c2_lib.c 9.57 KiB
#include "gflags.h"
#include "strutil.h"
#include "c2_lib.h"
#include <sys/stat.h>
#include <string.h>
#include <hash_set.h>
#include <hash_map.h>
#include <vector.h>
#include "sys/wait.h"
#include <stdlib.h>
DEFINE_bool(run_locally, false,
"If true, run locally.");
DEFINE_string(bin_directory, ".",
"The directory in which binaries live.");
DEFINE_bool(resume, false,
"If true, resume a halted job.");
namespace c2_lib {
bool FileRemove(string file_name) {
return remove(file_name.c_str()) == 0;
}
string CurrentWorkingDirectory() {
char* cwd = getcwd(NULL, 0);
string result(cwd);
free(cwd);
return result;
}
bool FileExists(string file_name) {
struct stat stFileInfo;
bool blnReturn;
int intStat;
// Attempt to get the file attributes
intStat = stat(file_name.c_str(), &stFileInfo);
if(intStat == 0) {
return(true);
} else {
return(false);
}
}
void WaitOnTask(const Task& task) {
bool done = false;
double backoff(1.0);
while (!done) {
done = true;
if (task.Status() != Task::SUCCESS) {
done = false;
if (task.Status() == Task::FAIL) {
printf("Error. Task failed. Debug: %s\n",
task.DebugString().c_str());
exit(1);
}
}
if (backoff > 2.0) {
sleep(backoff);
}
backoff *= 1.1;
if (backoff > 30) {
backoff = 30;
}
}
}
void WaitOnTasks(const vector<Task*>& tasks) {
double backoff(1.0);
vector<Task*> pending_tasks = tasks;
hash_map<long, int> task_tries;
while (!pending_tasks.empty()) {
for (vector<Task*>::iterator it=pending_tasks.begin();
it != pending_tasks.end();
) {
Task* task = *it;
if (task->Status() != Task::SUCCESS) {
if (task->Status() == Task::FAIL) {
if (task_tries.find((long) task) == task_tries.end()) {
task_tries[(long) task] = 1;
}
// Sometimes tasks fail for no good reason.
// Retry up to 3 times.
if (task_tries[(long) task] > 3) {
printf("Error. Task %d failed.\n",
task->id);
printf("Debug string: %s\n",
task->DebugString().c_str());
exit(1);
} else {
printf("Task %d failed. Retrying. Try: %lld\n",
task->id,
task_tries[(long) task]);
printf("Debug string: %s\n",
task->DebugString().c_str());
++task_tries[(long) task];
task->Start();
}
}
// Don't break, since it's worth
// checking whether any tasks have failed
// (in which case, we should die).
++it;
} else {
printf("Removing task.\n");
// Delete this element.
it = pending_tasks.erase(it);
}
}
printf("Waiting on %d tasks.\n",
pending_tasks.size());
if (backoff > 2.0) {
sleep(backoff);
}
backoff *= 1.1;
if (backoff > 30) {
backoff = 30;
}
}
}
Resource::Resource(string filename)
: filename(filename) {
}
Resource::~Resource() {
// We should not let this resource go out of scope if tasks are
// still using it.
if (references_.size() != 0) {
printf("Error. Task exists for deleted reference.");
exit(1);
}
}
bool Resource::Available() {
return FileExists(filename);
}
void Resource::TaskDone(int id) {
hash_set<int>::iterator it = references_.find(id);
if (it != references_.end()) {
references_.erase(it);
}
}
// Task definitions.
string Task::working_directory_ = "";
void Task::AddResource(Resource* resource) {
resources_.push_back(resource);
resource->AddReference(id);
}
Task::Task(string name, int id)
: command_(""),
id(id),
name(name),
done_(false),
resources_()
{
}
Task::~Task() {
for (vector<Resource*>::iterator it = resources_.begin();
it != resources_.end();
++it) {
(*it)->TaskDone(id);
}
}
void Task::ResourcesAvailable_(bool* available,
string* resource_filename) {
*available = true;
for (vector<Resource*>::iterator it = resources_.begin();
it != resources_.end() && *available;
++it) {
if (!(*it)->Available()) {
*available = false;
*resource_filename = (*it)->filename;
}
}
*available = true;
return;
}
}
namespace dtm {
using c2_lib::FileRemove;
using c2_lib::FileExists;
using c2_lib::CurrentWorkingDirectory;
string QSubTask::binary_ = "";
double ReadLikelihoods(const vector<string>& done_sentinels) {
double l_hood = 0.0;
for (vector<string>::const_iterator it = done_sentinels.begin();
it != done_sentinels.end();
++it) {
double l_hood_tmp;
printf("Likelihood: %.2lf\n", l_hood);
FILE* f = fopen((*it).c_str(), "r");
if (f) {
fscanf(f, "%lf", &l_hood_tmp);
fclose(f);
} else {
printf("Error reading likelihood from file %s.\n",
(*it).c_str());
}
l_hood += l_hood_tmp;
printf("Likelihood: %.2lf\n", l_hood);
}
return l_hood;
}
string QSubTask::DebugString() const {
return StringPrintf("stderr: %s\n"
"stdout: %s\n"
"sentinel: %s\n"
"full commandline: %s\n",
stderr_filename_.c_str(),
stdout_filename_.c_str(),
done_sentinel_.c_str(),
full_commandline_.c_str());
}
static int vsystem(string command) {
int pid;
int return_code = 0;
pid = vfork();
if (pid == 0) {
// We're the child process. Execute.
execl("/bin/sh",
"/bin/sh",
command.c_str(),
(char*) 0);
_exit (EXIT_FAILURE);
} else if (pid < 0) {
// The fork failed. Report failure.
return_code = -1;
} else {
// This is the parent process. Wait for the
// child to complete.
if (waitpid(pid, &return_code, 0) != pid) {
return_code = -1;
}
}
return return_code;
}
void QSubTask::Start() {
string resource_filename;
bool available;
ResourcesAvailable_(&available,
&resource_filename);
if (!available) {
printf("Error. Task resource unavailable:%s\n",
resource_filename.c_str());
}
if (done_sentinel_.empty()) {
stdout_filename_ = StringPrintf(
"%s/%s_%d.out",
working_directory_.c_str(),
name.c_str(),
id);
stderr_filename_ = StringPrintf(
"%s/%s_%d.err",
working_directory_.c_str(),
name.c_str(),
id);
done_sentinel_ = StringPrintf(
"%s/%s_%d.done",
working_directory_.c_str(),
name.c_str(),
id);
}
if (FileExists(done_sentinel_)) {
if (!FileRemove(done_sentinel_)) {
printf("Error removing file. Failing.\n");
exit(1);
}
}
if (FileExists(stdout_filename_)) {
if (!FileRemove(stdout_filename_)) {
printf("Error removing file. Failing.\n");
exit(1);
}
}
if (FileExists(stderr_filename_)) {
if (!FileRemove(stderr_filename_)) {
printf("Error removing file. Failing.\n");
exit(1);
}
}
string job_command = StringPrintf("%s/%s",
FLAGS_bin_directory.c_str(),
command_.c_str());
const string kFilename = "/tmp/command_asdf.sh";
if (FLAGS_run_locally) {
full_commandline_ = StringPrintf("nohup ./%s",
job_command.c_str(),
binary_.c_str(),
stdout_filename_.c_str(),
stderr_filename_.c_str());
} else {
// TODO(sgerrish): Add proper temp file naming.
full_commandline_ = StringPrintf(
"echo \'cd %s ; %s ; \n \' "
"| %s -o %s -e %s "
"-l walltime=%d:00:00,mem=%dmb\n",
CurrentWorkingDirectory().c_str(),
job_command.c_str(),
binary_.c_str(),
stdout_filename_.c_str(),
stderr_filename_.c_str(),
walltime_hours_,
memory_mb_);
}
printf("Current working directory: %s\n.", CurrentWorkingDirectory().c_str());
FILE* f = fopen(kFilename.c_str(), "w");
if (f) {
fprintf(f, full_commandline_.c_str());
fclose(f);
chmod(kFilename.c_str(), 0744);
} else {
printf("Error opening file /tmp/command_asdf.txt\n Skipping.");
}
printf("Running command: %s.\n", full_commandline_.c_str());
int return_code = vsystem(kFilename.c_str());
if (return_code) {
printf("Return code %d when running command:\n"
"%s\n",
return_code,
full_commandline_.c_str());
exit(1);
}
}
Task::RunStatus QSubTask::Status() const {
if (FileExists(done_sentinel_)) {
return Task::SUCCESS;
}
if (!FileExists(stderr_filename_)) {
return(Task::RUN);
} else {
if (!FileExists(done_sentinel_)) {
return(Task::FAIL);
}
}
}
QSubTask* TaskFactory::NewTask(int id,
string job_name,
const char* root,
const hash_map<std::string, std::string>& flags,
const vector<Resource*>& resources,
const string& done_sentinel,
string binary) {
QSubTask* task = new QSubTask(job_name, id);
string flags_string;
/*
for (int i=0; i < resources.size(); ++i) {
task->AddResource(resources[i]);
if (i) {
resource_string += " ";
}
resource_string += ("--"
+ resources[i]->name
+ "="
+ resources[i]->filename);
}
*/
for (hash_map<string, string>::const_iterator it=flags.begin();
it != flags.end();
++it) {
if (it != flags.begin()) {
flags_string += " ";
}
flags_string += ("--"
+ it->first
+ "="
+ it->second);
}
task->set_done_sentinel(done_sentinel);
task->set_working_directory(root);
task->set_command(binary + " " + flags_string);
if (flags.find("done_sentinel") != flags.end()) {
// task->set_done_sentinel(flags["done_sentinel"]);
task->set_done_sentinel(flags.find("done_sentinel")->second);
}
// Set the parallel-processing binary appropriately.
task->set_binary("/opt/torque/bin/qsub");
return(task);
}
void CreateSentinel(const string& sentinel_filename,
double l_hood) {
FILE* f = fopen(sentinel_filename.c_str(), "w");
fprintf(f, "%lf\n", l_hood);
fclose(f);
}
} // namespace dtm