From 1a4d99ade746e68d6b1c76afab0b4ecfa85a732b Mon Sep 17 00:00:00 2001 From: Philippe Canal <pcanal@fnal.gov> Date: Mon, 8 Feb 2021 15:46:14 -0600 Subject: [PATCH] Allow TBufferMerger to skip write/read cycle. If TBufferMerger can take the merge lock, then rather than queuing the input file, merge it immediately and skip the Writing the TTree, doing a memcpy, Pushing to the queue then Reading and then deleting parts. --- io/io/inc/ROOT/TBufferMerger.hxx | 3 ++ io/io/src/TBufferMerger.cxx | 51 ++++++++++++++++++++++---------- io/io/src/TBufferMergerFile.cxx | 8 +++++ 3 files changed, 47 insertions(+), 15 deletions(-) diff --git a/io/io/inc/ROOT/TBufferMerger.hxx b/io/io/inc/ROOT/TBufferMerger.hxx index c2e6822578d..cfca3648147 100644 --- a/io/io/inc/ROOT/TBufferMerger.hxx +++ b/io/io/inc/ROOT/TBufferMerger.hxx @@ -110,8 +110,11 @@ private: void Init(std::unique_ptr<TFile>); + void MergeImpl(); + void Merge(); void Push(TBufferFile *buffer); + bool TryMerge(TBufferMergerFile *memfile); size_t fAutoSave{0}; //< AutoSave only every fAutoSave bytes size_t fBuffered{0}; //< Number of bytes currently buffered diff --git a/io/io/src/TBufferMerger.cxx b/io/io/src/TBufferMerger.cxx index 6eae4df2dd5..72c1a07d3f6 100644 --- a/io/io/src/TBufferMerger.cxx +++ b/io/io/src/TBufferMerger.cxx @@ -112,24 +112,45 @@ void TBufferMerger::SetMergeOptions(const TString& options) void TBufferMerger::Merge() { if (fMergeMutex.try_lock()) { - std::queue<TBufferFile *> queue; - { - std::lock_guard<std::mutex> q(fQueueMutex); - std::swap(queue, fQueue); - fBuffered = 0; - } - - while (!queue.empty()) { - std::unique_ptr<TBufferFile> buffer{queue.front()}; - fMerger.AddAdoptFile(new TMemFile(fMerger.GetOutputFileName(), std::move(buffer))); - queue.pop(); - } - - fMerger.PartialMerge(TFileMerger::kAll | TFileMerger::kIncremental | TFileMerger::kDelayWrite); - fMerger.Reset(); + MergeImpl(); fMergeMutex.unlock(); } } +void TBufferMerger::MergeImpl() +{ + std::queue<TBufferFile *> queue; + { + std::lock_guard<std::mutex> q(fQueueMutex); + std::swap(queue, fQueue); + fBuffered = 0; + } + + while (!queue.empty()) { + std::unique_ptr<TBufferFile> buffer{queue.front()}; + fMerger.AddAdoptFile(new TMemFile(fMerger.GetOutputFileName(), std::move(buffer))); + queue.pop(); + } + + fMerger.PartialMerge(TFileMerger::kAll | TFileMerger::kIncremental | TFileMerger::kDelayWrite); + fMerger.Reset(); +} + +bool TBufferMerger::TryMerge(ROOT::Experimental::TBufferMergerFile *memfile) +{ + if (fMergeMutex.try_lock()) + { + memfile->WriteStreamerInfo(); + fMerger.AddFile(memfile); + MergeImpl(); + fMergeMutex.unlock(); + return true; + } else + { + return false; + } + +} + } // namespace Experimental } // namespace ROOT diff --git a/io/io/src/TBufferMergerFile.cxx b/io/io/src/TBufferMergerFile.cxx index f4d7205b29d..249882bfc67 100644 --- a/io/io/src/TBufferMergerFile.cxx +++ b/io/io/src/TBufferMergerFile.cxx @@ -29,6 +29,14 @@ TBufferMergerFile::~TBufferMergerFile() Int_t TBufferMergerFile::Write(const char *name, Int_t opt, Int_t bufsize) { + // Instead of Writing the TTree, doing a memcpy, Pushing to the queue + // then Reading and then deleting, let's see if we can just merge using + // the live TTree. + if (fMerger.TryMerge(this)) { + ResetAfterMerge(0); + return 0; + } + Int_t nbytes = TMemFile::Write(name, opt, bufsize); if (nbytes) { -- GitLab