- Removing duplicate information (taskName stored in 3 places)

This commit is contained in:
Ian Roddis
2021-08-31 12:21:34 -03:00
parent 2c00001e0b
commit 7b07380e16
10 changed files with 41 additions and 54 deletions

View File

@@ -8,12 +8,6 @@ Tasks
- Core Functionality
- Allow `{{,!DATES}}` style interpolation, where values from `{{DATES}}` are preserved as a single, comma
separated list
- Allow for tasks to define next tasks
- Refactor [de]serialization so that a task can be parsed by itself
- Add notation of parameterValues
- Tasks are now refered by two names:
- baseName is the original name in the spec
- name is the individual tasks
- Add execution gates
- Executors
- [ ] Slurm Executor

View File

@@ -23,7 +23,6 @@ namespace daggy {
struct Vertex {
RunState state;
uint32_t depCount;
K key;
V data;
std::unordered_set<K> children;
};
@@ -64,11 +63,11 @@ namespace daggy {
void setVertexState(const K &id, RunState state);
void forEach(std::function<void(const Vertex<K, V> &)> fun) const;
void forEach(std::function<void(const std::pair<K, Vertex<K, V>> &)> fun) const;
bool allVisited() const;
std::optional<const Vertex<K, V>> visitNext();
std::optional<std::pair<K, V>> visitNext();
Vertex<K, V> &getVertex(const K &id);

View File

@@ -18,8 +18,7 @@ namespace daggy {
ss << "A vertex with ID " << id << " already exists in the DAG";
throw std::runtime_error(ss.str());
}
vertices_.emplace(id, Vertex<K, V>{.state = RunState::QUEUED, .depCount = 0, .key = id, .data = data
});
vertices_.emplace(id, Vertex<K, V>{.state = RunState::QUEUED, .depCount = 0, .data = data});
}
template<typename K, typename V>
@@ -90,12 +89,13 @@ namespace daggy {
}
template<typename K, typename V>
std::optional<const Vertex <K, V>> DAG<K, V>::visitNext() {
std::optional<std::pair<K, V>>
DAG<K, V>::visitNext() {
for (auto &[k, v]: vertices_) {
if (v.state != +RunState::QUEUED) continue;
if (v.depCount != 0) continue;
v.state = RunState::RUNNING;
return v;
return std::make_pair(k, v.data);
}
return {};
}
@@ -110,10 +110,17 @@ namespace daggy {
}
template<typename K, typename V>
void DAG<K, V>::forEach(std::function<void(const Vertex <K, V> &)> fun) const {
for (const auto &[_, v]: vertices_) {
fun(v);
}
}
void DAG<K, V>::forEach(std::function<void(const std::pair<K, Vertex < K, V>> &)
> fun) const {
for (
auto it = vertices_.begin();
it != vertices_.
end();
++it) {
fun(*it);
}
}
}

View File

@@ -33,8 +33,6 @@ namespace daggy {
);
struct Task {
std::string name;
// definedName is the name from the original DAGDefinition.
std::string definedName;
std::vector<std::string> command;
uint32_t maxRetries;
@@ -44,7 +42,7 @@ namespace daggy {
bool isGenerator; // True if the output of this task is a JSON set of tasks to complete
bool operator==(const Task &other) const {
return (name == other.name)
return (definedName == other.definedName)
and (maxRetries == other.maxRetries)
and (retryIntervalSeconds == other.retryIntervalSeconds)
and (command == other.command)

View File

@@ -19,9 +19,6 @@ namespace daggy {
std::vector<Command> expandCommands(const std::vector<std::string> &command, const ParameterValues &parameters);
std::unordered_set<std::string>
findDerivedVertices(TaskDAG &dag, const std::string &definedName);
TaskDAG
buildDAGFromTasks(TaskList &tasks,
const std::vector<loggers::dag_run::TaskUpdateRecord> &updates = {});
@@ -31,7 +28,7 @@ namespace daggy {
// Blocking call
std::vector<AttemptRecord>
runTask(DAGRunID runID,
TaskID taskID,
const std::string &taskName,
const Task &task,
executors::task::TaskExecutor &executor,
loggers::dag_run::DAGRunLogger &logger);

View File

@@ -93,7 +93,6 @@ namespace daggy {
for (size_t tid = 0; tid < commands.size(); ++tid) {
std::string taskName = (commands.size() == 1 ? name : name + "_" + std::to_string(tid));
tasks.emplace(taskName, Task{
.name = taskName,
.definedName = name,
.command = commands[tid],
.maxRetries = maxRetries,
@@ -136,7 +135,6 @@ namespace daggy {
bool first = false;
ss << "{"
<< R"("name": )" << std::quoted(task.name) << ','
<< R"("maxRetries": )" << task.maxRetries << ','
<< R"("retryIntervalSeconds": )" << task.retryIntervalSeconds << ',';

View File

@@ -53,17 +53,6 @@ namespace daggy {
return commands;
}
std::unordered_set<std::string>
findDerivedVertices(TaskDAG &dag, const std::string &definedName) {
std::unordered_set<std::string> vertices;
dag.forEach([&](const auto &v) {
if (v.data.definedName == definedName) {
vertices.insert(v.data.name);
}
});
return vertices;
}
void updateDAGFromTasks(TaskDAG &dag, TaskList &tasks) {
// Add all the vertices
std::unordered_map<std::string, std::unordered_set<std::string>> definedSets;
@@ -113,17 +102,18 @@ namespace daggy {
}
std::vector<AttemptRecord> runTask(DAGRunID runID,
const std::string &taskName,
const Task &task,
executors::task::TaskExecutor &executor,
loggers::dag_run::DAGRunLogger &logger) {
std::vector<AttemptRecord> attempts;
logger.updateTaskState(runID, task.name, RunState::RUNNING);
logger.updateTaskState(runID, taskName, RunState::RUNNING);
while (attempts.size() < task.maxRetries + 1) {
attempts.push_back(executor.runCommand(task));
logger.logTaskAttempt(runID, task.name, attempts.back());
logger.logTaskAttempt(runID, taskName, attempts.back());
if (attempts.back().rc == 0) break;
logger.updateTaskState(runID, task.name, RunState::RETRY);
logger.updateTaskState(runID, taskName, RunState::RETRY);
}
return attempts;
}
@@ -167,7 +157,7 @@ namespace daggy {
}
logger.updateTask(runID, taskName, task);
} catch (std::exception &e) {
logger.updateTaskState(runID, task.name, RunState::ERRORED);
logger.updateTaskState(runID, taskName, RunState::ERRORED);
++errored;
}
}
@@ -185,9 +175,10 @@ namespace daggy {
auto t = dag.visitNext();
while (t.has_value()) {
// Schedule the task to run
auto vertex = t.value();
runningTasks.emplace(vertex.data.name, tq->addTask([runID, vertex, &executor, &logger]() {
return runTask(runID, vertex.data, executor, logger);
auto &taskName = t.value().first;
auto &task = t.value().second;
runningTasks.emplace(taskName, tq->addTask([runID, taskName, task, &executor, &logger]() {
return runTask(runID, taskName, task, executor, logger);
}));
++running;

View File

@@ -158,10 +158,10 @@ namespace daggy {
ifh.close();
// Task states
for (const auto &[_, task]: record.tasks) {
auto taskStateFile = runRoot / task.name / "states.csv";
for (const auto &[taskName, task]: record.tasks) {
auto taskStateFile = runRoot / taskName / "states.csv";
if (!fs::exists(taskStateFile)) {
record.taskRunStates.emplace(task.name, RunState::QUEUED);
record.taskRunStates.emplace(taskName, RunState::QUEUED);
continue;
}
@@ -170,7 +170,7 @@ namespace daggy {
std::stringstream ss{line};
while (std::getline(ss, token, ',')) { continue; }
RunState taskState = RunState::_from_string(token.c_str());
record.taskRunStates.emplace(task.name, taskState);
record.taskRunStates.emplace(taskName, taskState);
ifh.close();
}
return record;

View File

@@ -25,8 +25,8 @@ namespace daggy {
os_ << "Starting new DAGRun named " << name << " with ID " << runID << " and " << tasks.size()
<< " tasks" << std::endl;
for (const auto &[_, task]: tasks) {
os_ << "TASK (" << task.name << "): ";
for (const auto &[name, task]: tasks) {
os_ << "TASK (" << name << "): ";
std::copy(task.command.begin(), task.command.end(),
std::ostream_iterator<std::string>(os_, " "));
os_ << std::endl;

View File

@@ -70,8 +70,8 @@ TEST_CASE("DAG Traversal Tests", "[dag]") {
size_t i = 0;
while (!dag.allVisited()) {
const auto &v = dag.visitNext().value();
dag.completeVisit(v.key);
visitOrder[v.key] = i;
dag.completeVisit(v.first);
visitOrder[v.first] = i;
++i;
}
@@ -83,7 +83,10 @@ TEST_CASE("DAG Traversal Tests", "[dag]") {
SECTION("Iteration") {
size_t nVisited = 0;
dag.forEach([&](const daggy::Vertex<size_t, size_t> &) { ++nVisited; });
dag.forEach([&](auto &k) {
(void) k;
++nVisited;
});
REQUIRE(nVisited == dag.size());
}
}