Skip to content
Snippets Groups Projects
Commit 1a4d99ad authored by Philippe Canal's avatar Philippe Canal
Browse files

Allow TBufferMerger to skip write/read cycle.

If TBufferMerger can take the merge lock, then rather than queuing the input file, merge it immediately
and skip the Writing the TTree, doing a memcpy, Pushing to the queue then Reading and then deleting parts.
parent 69f4b97f
No related branches found
No related tags found
No related merge requests found
......@@ -110,8 +110,11 @@ private:
void Init(std::unique_ptr<TFile>);
void MergeImpl();
void Merge();
void Push(TBufferFile *buffer);
bool TryMerge(TBufferMergerFile *memfile);
size_t fAutoSave{0}; //< AutoSave only every fAutoSave bytes
size_t fBuffered{0}; //< Number of bytes currently buffered
......
......@@ -112,24 +112,45 @@ void TBufferMerger::SetMergeOptions(const TString& options)
void TBufferMerger::Merge()
{
if (fMergeMutex.try_lock()) {
std::queue<TBufferFile *> queue;
{
std::lock_guard<std::mutex> q(fQueueMutex);
std::swap(queue, fQueue);
fBuffered = 0;
}
while (!queue.empty()) {
std::unique_ptr<TBufferFile> buffer{queue.front()};
fMerger.AddAdoptFile(new TMemFile(fMerger.GetOutputFileName(), std::move(buffer)));
queue.pop();
}
fMerger.PartialMerge(TFileMerger::kAll | TFileMerger::kIncremental | TFileMerger::kDelayWrite);
fMerger.Reset();
MergeImpl();
fMergeMutex.unlock();
}
}
void TBufferMerger::MergeImpl()
{
std::queue<TBufferFile *> queue;
{
std::lock_guard<std::mutex> q(fQueueMutex);
std::swap(queue, fQueue);
fBuffered = 0;
}
while (!queue.empty()) {
std::unique_ptr<TBufferFile> buffer{queue.front()};
fMerger.AddAdoptFile(new TMemFile(fMerger.GetOutputFileName(), std::move(buffer)));
queue.pop();
}
fMerger.PartialMerge(TFileMerger::kAll | TFileMerger::kIncremental | TFileMerger::kDelayWrite);
fMerger.Reset();
}
bool TBufferMerger::TryMerge(ROOT::Experimental::TBufferMergerFile *memfile)
{
if (fMergeMutex.try_lock())
{
memfile->WriteStreamerInfo();
fMerger.AddFile(memfile);
MergeImpl();
fMergeMutex.unlock();
return true;
} else
{
return false;
}
}
} // namespace Experimental
} // namespace ROOT
......@@ -29,6 +29,14 @@ TBufferMergerFile::~TBufferMergerFile()
Int_t TBufferMergerFile::Write(const char *name, Int_t opt, Int_t bufsize)
{
// Instead of Writing the TTree, doing a memcpy, Pushing to the queue
// then Reading and then deleting, let's see if we can just merge using
// the live TTree.
if (fMerger.TryMerge(this)) {
ResetAfterMerge(0);
return 0;
}
Int_t nbytes = TMemFile::Write(name, opt, bufsize);
if (nbytes) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment