Checkpointing work, while serialization / deserialization is figured out.
This commit is contained in:
@@ -26,6 +26,9 @@ namespace daggy {
|
||||
std::vector<Task> buildTasks(const rj::Document & spec, const ParameterValues & parameters = {});
|
||||
std::vector<Command> expandCommands(const std::vector<std::string> & command, const ParameterValues & parameters);
|
||||
|
||||
// Serialization
|
||||
rj::Document taskToJSON(const Task & task);
|
||||
|
||||
// DAG execution
|
||||
// DAG vertex IDs should correspond to the position of tasks in vector. e.g. Vertex ID 0 corresponds to tasks[0]
|
||||
// I'm not crazy about this loose coupling, but
|
||||
|
||||
@@ -47,9 +47,13 @@ namespace daggy {
|
||||
|
||||
private:
|
||||
fs::path root_;
|
||||
std::atomic<DAGRunID> nextRunID_;
|
||||
std::mutex lock_;
|
||||
|
||||
const fs::path getCurrentPath() const;
|
||||
const fs::path getRunsRoot() const;
|
||||
std::unordered_map<fs::path, std::mutex> runLocks;
|
||||
|
||||
inline const fs::path getCurrentPath() const;
|
||||
inline const fs::path getRunsRoot() const;
|
||||
inline const fs::path getRunRoot(DAGRunID runID) const;
|
||||
};
|
||||
}
|
||||
|
||||
@@ -3,20 +3,39 @@
|
||||
namespace fs = std::filesystem;
|
||||
|
||||
namespace daggy {
|
||||
const fs::path FileSystemLogger::getCurrentPath() const { return root_ / "current"; }
|
||||
const fs::path FileSystemLogger::getRunsRoot() const { return root_ / "runs"; }
|
||||
inline const fs::path FileSystemLogger::getCurrentPath() const { return root_ / "current"; }
|
||||
inline const fs::path FileSystemLogger::getRunsRoot() const { return root_ / "runs"; }
|
||||
inline const fs::path getRunRoot(DAGRunID runID) const { return getRunsRoot() / std::to_string(runID); }
|
||||
|
||||
FileSystemLogger(fs::path root)
|
||||
: root_(root)
|
||||
, nextRunID_(0)
|
||||
{
|
||||
std::vector<fs::paths> reqPaths{ root_, getCurrentPath(), getRunsRoot()};
|
||||
for (const path : reqPaths) {
|
||||
const std::vector<fs::paths> reqPaths{ root_, getCurrentPath(), getRunsRoot()};
|
||||
for (const auto & path : reqPaths) {
|
||||
if (! fs::exists(path)) { fs::create_directory(path); }
|
||||
}
|
||||
|
||||
// Get the next run ID
|
||||
size_t runID = 0;
|
||||
for (auto & dir : fs::std::filesystem::directory_iterator(getRunsRoot())) {
|
||||
try {
|
||||
runID = std::stoull(dir.stem());
|
||||
if (runID > nextRunID_) nextRunID_ = runID + 1;
|
||||
} catch {}
|
||||
}
|
||||
}
|
||||
|
||||
// Execution
|
||||
DAGRunID FileSystemLogger::startDAGRun(std::string name, const std::vector<Task> & tasks){ }
|
||||
DAGRunID FileSystemLogger::startDAGRun(std::string name, const std::vector<Task> & tasks){
|
||||
DAGRunID runID = nextRunID_++;
|
||||
|
||||
// TODO make this threadsafe
|
||||
fs::path runDir = getRunRoot(runID);
|
||||
std::lock_guard<std::mutex> guard(runLocks[runDir]);
|
||||
|
||||
// Init the directory
|
||||
}
|
||||
void FileSystemLogger::updateDAGRunState(DAGRunID dagRunId, RunState state){ }
|
||||
void FileSystemLogger::logTaskAttempt(DAGRunID, size_t taskID, const AttemptRecord & attempt){ }
|
||||
void FileSystemLogger::markTaskComplete(DAGRunID dagRun, size_t taskID){ }
|
||||
|
||||
Reference in New Issue
Block a user