Checkpointing work, while serialization / deserialization is figured out.

This commit is contained in:
Ian Roddis
2021-08-08 13:05:13 -03:00
parent 1a467428a2
commit df63d944c1
2 changed files with 30 additions and 7 deletions

View File

@@ -47,9 +47,13 @@ namespace daggy {
private:
fs::path root_;
std::atomic<DAGRunID> nextRunID_;
std::mutex lock_;
const fs::path getCurrentPath() const;
const fs::path getRunsRoot() const;
std::unordered_map<fs::path, std::mutex> runLocks;
inline const fs::path getCurrentPath() const;
inline const fs::path getRunsRoot() const;
inline const fs::path getRunRoot(DAGRunID runID) const;
};
}

View File

@@ -3,20 +3,39 @@
namespace fs = std::filesystem;
namespace daggy {
const fs::path FileSystemLogger::getCurrentPath() const { return root_ / "current"; }
const fs::path FileSystemLogger::getRunsRoot() const { return root_ / "runs"; }
inline const fs::path FileSystemLogger::getCurrentPath() const { return root_ / "current"; }
inline const fs::path FileSystemLogger::getRunsRoot() const { return root_ / "runs"; }
inline const fs::path getRunRoot(DAGRunID runID) const { return getRunsRoot() / std::to_string(runID); }
FileSystemLogger(fs::path root)
: root_(root)
, nextRunID_(0)
{
std::vector<fs::paths> reqPaths{ root_, getCurrentPath(), getRunsRoot()};
for (const path : reqPaths) {
const std::vector<fs::paths> reqPaths{ root_, getCurrentPath(), getRunsRoot()};
for (const auto & path : reqPaths) {
if (! fs::exists(path)) { fs::create_directory(path); }
}
// Get the next run ID
size_t runID = 0;
for (auto & dir : fs::std::filesystem::directory_iterator(getRunsRoot())) {
try {
runID = std::stoull(dir.stem());
if (runID > nextRunID_) nextRunID_ = runID + 1;
} catch {}
}
}
// Execution
DAGRunID FileSystemLogger::startDAGRun(std::string name, const std::vector<Task> & tasks){ }
DAGRunID FileSystemLogger::startDAGRun(std::string name, const std::vector<Task> & tasks){
DAGRunID runID = nextRunID_++;
// TODO make this threadsafe
fs::path runDir = getRunRoot(runID);
std::lock_guard<std::mutex> guard(runLocks[runDir]);
// Init the directory
}
void FileSystemLogger::updateDAGRunState(DAGRunID dagRunId, RunState state){ }
void FileSystemLogger::logTaskAttempt(DAGRunID, size_t taskID, const AttemptRecord & attempt){ }
void FileSystemLogger::markTaskComplete(DAGRunID dagRun, size_t taskID){ }