diff --git a/daggy/include/daggy/dagloggers/FileSystemLogger.hpp b/daggy/include/daggy/dagloggers/FileSystemLogger.hpp index 75d9d23..a166d7e 100644 --- a/daggy/include/daggy/dagloggers/FileSystemLogger.hpp +++ b/daggy/include/daggy/dagloggers/FileSystemLogger.hpp @@ -47,9 +47,13 @@ namespace daggy { private: fs::path root_; + std::atomic nextRunID_; std::mutex lock_; - const fs::path getCurrentPath() const; - const fs::path getRunsRoot() const; + std::unordered_map runLocks; + + inline const fs::path getCurrentPath() const; + inline const fs::path getRunsRoot() const; + inline const fs::path getRunRoot(DAGRunID runID) const; }; } diff --git a/daggy/src/dagloggers/FileSystemLogger.cpp b/daggy/src/dagloggers/FileSystemLogger.cpp index 390f615..d37315d 100644 --- a/daggy/src/dagloggers/FileSystemLogger.cpp +++ b/daggy/src/dagloggers/FileSystemLogger.cpp @@ -3,20 +3,39 @@ namespace fs = std::filesystem; namespace daggy { - const fs::path FileSystemLogger::getCurrentPath() const { return root_ / "current"; } - const fs::path FileSystemLogger::getRunsRoot() const { return root_ / "runs"; } + inline const fs::path FileSystemLogger::getCurrentPath() const { return root_ / "current"; } + inline const fs::path FileSystemLogger::getRunsRoot() const { return root_ / "runs"; } + inline const fs::path getRunRoot(DAGRunID runID) const { return getRunsRoot() / std::to_string(runID); } FileSystemLogger(fs::path root) : root_(root) + , nextRunID_(0) { - std::vector reqPaths{ root_, getCurrentPath(), getRunsRoot()}; - for (const path : reqPaths) { + const std::vector reqPaths{ root_, getCurrentPath(), getRunsRoot()}; + for (const auto & path : reqPaths) { if (! fs::exists(path)) { fs::create_directory(path); } } + + // Get the next run ID + size_t runID = 0; + for (auto & dir : fs::std::filesystem::directory_iterator(getRunsRoot())) { + try { + runID = std::stoull(dir.stem()); + if (runID > nextRunID_) nextRunID_ = runID + 1; + } catch {} + } } // Execution - DAGRunID FileSystemLogger::startDAGRun(std::string name, const std::vector & tasks){ } + DAGRunID FileSystemLogger::startDAGRun(std::string name, const std::vector & tasks){ + DAGRunID runID = nextRunID_++; + + // TODO make this threadsafe + fs::path runDir = getRunRoot(runID); + std::lock_guard guard(runLocks[runDir]); + + // Init the directory + } void FileSystemLogger::updateDAGRunState(DAGRunID dagRunId, RunState state){ } void FileSystemLogger::logTaskAttempt(DAGRunID, size_t taskID, const AttemptRecord & attempt){ } void FileSystemLogger::markTaskComplete(DAGRunID dagRun, size_t taskID){ }