Logo ROOT   6.10/02
Reference Guide
TDFNodes.cxx
Go to the documentation of this file.
1 // Author: Enrico Guiraud, Danilo Piparo CERN 03/2017
2 
3 /*************************************************************************
4  * Copyright (C) 1995-2016, Rene Brun and Fons Rademakers. *
5  * All rights reserved. *
6  * *
7  * For the licensing terms see $ROOTSYS/LICENSE. *
8  * For the list of contributors see $ROOTSYS/README/CREDITS. *
9  *************************************************************************/
10 
11 #include "RConfigure.h" // R__USE_IMT
12 #include "ROOT/TDFNodes.hxx"
13 #include "ROOT/TSpinMutex.hxx"
15 #ifdef R__USE_IMT
16 #include "ROOT/TThreadExecutor.hxx"
17 #endif
18 #include "RtypesCore.h" // Long64_t
19 #include "TROOT.h" // IsImplicitMTEnabled
20 #include "TTreeReader.h"
21 
22 #include <cassert>
23 #include <mutex>
24 #include <numeric> // std::accumulate
25 #include <string>
26 class TDirectory;
27 class TTree;
28 using namespace ROOT::Detail::TDF;
29 using namespace ROOT::Internal::TDF;
30 
31 namespace ROOT {
32 namespace Internal {
33 namespace TDF {
34 
35 TActionBase::TActionBase(TLoopManager *implPtr, const ColumnNames_t &tmpBranches, unsigned int nSlots)
36  : fImplPtr(implPtr), fTmpBranches(tmpBranches), fNSlots(nSlots)
37 {
38 }
39 
40 } // end NS TDF
41 } // end NS Internal
42 } // end NS ROOT
43 
44 TCustomColumnBase::TCustomColumnBase(TLoopManager *implPtr, const ColumnNames_t &tmpBranches, std::string_view name,
45  unsigned int nSlots)
46  : fImplPtr(implPtr), fTmpBranches(tmpBranches), fName(name), fNSlots(nSlots){};
47 
48 ColumnNames_t TCustomColumnBase::GetTmpBranches() const
49 {
50  return fTmpBranches;
51 }
52 
53 std::string TCustomColumnBase::GetName() const
54 {
55  return fName;
56 }
57 
59 {
60  return fImplPtr;
61 }
62 
63 TFilterBase::TFilterBase(TLoopManager *implPtr, const ColumnNames_t &tmpBranches, std::string_view name,
64  unsigned int nSlots)
65  : fImplPtr(implPtr), fTmpBranches(tmpBranches), fLastCheckedEntry(nSlots, -1), fLastResult(nSlots),
66  fAccepted(nSlots), fRejected(nSlots), fName(name), fNSlots(nSlots)
67 {
68 }
69 
71 {
72  return fImplPtr;
73 }
74 
75 ColumnNames_t TFilterBase::GetTmpBranches() const
76 {
77  return fTmpBranches;
78 }
79 
81 {
82  return !fName.empty();
83 };
84 
86 {
87  if (fName.empty()) // PrintReport is no-op for unnamed filters
88  return;
89  const auto accepted = std::accumulate(fAccepted.begin(), fAccepted.end(), 0ULL);
90  const auto all = accepted + std::accumulate(fRejected.begin(), fRejected.end(), 0ULL);
91  double perc = accepted;
92  if (all > 0) perc /= all;
93  perc *= 100.;
94  Printf("%-10s: pass=%-10lld all=%-10lld -- %8.3f %%", fName.c_str(), accepted, all, perc);
95 }
96 
97 // This is an helper class to allow to pick a slot without resorting to a map
98 // indexed by thread ids.
99 // WARNING: this class does not work as a regular stack. The size is
100 // fixed at construction time and no blocking is foreseen.
101 class TSlotStack {
102 private:
103  unsigned int fCursor;
104  std::vector<unsigned int> fBuf;
105  ROOT::TSpinMutex fMutex;
106 
107 public:
108  TSlotStack() = delete;
109  TSlotStack(unsigned int size) : fCursor(size), fBuf(size) { std::iota(fBuf.begin(), fBuf.end(), 0U); }
110  void Push(unsigned int slotNumber);
111  unsigned int Pop();
112 };
113 
114 void TSlotStack::Push(unsigned int slotNumber)
115 {
116  std::lock_guard<ROOT::TSpinMutex> guard(fMutex);
117  fBuf[fCursor++] = slotNumber;
118  assert(fCursor <= fBuf.size() && "TSlotStack assumes that at most a fixed number of values can be present in the "
119  "stack. fCursor is greater than the size of the internal buffer. This violates "
120  "such assumption.");
121 }
122 
123 unsigned int TSlotStack::Pop()
124 {
125  assert(fCursor > 0 &&
126  "TSlotStack assumes that a value can be always popped. fCursor is <=0 and this violates such assumption.");
127  std::lock_guard<ROOT::TSpinMutex> guard(fMutex);
128  return fBuf[--fCursor];
129 }
130 
131 TLoopManager::TLoopManager(TTree *tree, const ColumnNames_t &defaultBranches)
132  : fTree(std::shared_ptr<TTree>(tree, [](TTree *) {})), fDefaultBranches(defaultBranches),
134 {
135 }
136 
138 {
139 }
140 
141 void TLoopManager::RunAndCheckFilters(unsigned int slot, Long64_t entry)
142 {
143  for (auto &actionPtr : fBookedActions) actionPtr->Run(slot, entry);
144  for (auto &namedFilterPtr : fBookedNamedFilters) namedFilterPtr->CheckFilters(slot, entry);
145 }
146 
148 {
149 #ifdef R__USE_IMT
151  TSlotStack slotStack(fNSlots);
152  InitNodes();
153 
154  if (fNEmptyEntries > 0) {
155  // Working with an empty tree.
156  // Evenly partition the entries according to fNSlots
157  const auto nEntriesPerSlot = fNEmptyEntries / fNSlots;
158  auto remainder = fNEmptyEntries % fNSlots;
159  std::vector<std::pair<Long64_t, Long64_t>> entryRanges;
160  Long64_t start = 0;
161  while (start < fNEmptyEntries) {
162  Long64_t end = start + nEntriesPerSlot;
163  if (remainder > 0) {
164  ++end;
165  --remainder;
166  }
167  entryRanges.emplace_back(start, end);
168  start = end;
169  }
170 
171  // Each task will generate a subrange of entries
172  auto genFunction = [this, &slotStack](const std::pair<Long64_t, Long64_t> &range) {
173  auto slot = slotStack.Pop();
174  InitAllNodes(nullptr, slot);
175  for (auto currEntry = range.first; currEntry < range.second; ++currEntry) {
176  RunAndCheckFilters(slot, currEntry);
177  }
178  slotStack.Push(slot);
179  };
180 
182  pool.Foreach(genFunction, entryRanges);
183  } else {
184  using ttpmt_t = ROOT::TTreeProcessorMT;
185  std::unique_ptr<ttpmt_t> tp;
186  tp.reset(new ttpmt_t(*fTree));
187 
188  tp->Process([this, &slotStack](TTreeReader &r) -> void {
189  auto slot = slotStack.Pop();
190  InitAllNodes(&r, slot);
191  // recursive call to check filters and conditionally execute actions
192  while (r.Next()) {
194  }
195  slotStack.Push(slot);
196  });
197  }
198  } else {
199 #endif // R__USE_IMT
200  InitNodes();
201  if (fNEmptyEntries > 0) {
202  InitAllNodes(nullptr, 0);
203  for (Long64_t currEntry = 0; currEntry < fNEmptyEntries && fNStopsReceived < fNChildren; ++currEntry) {
204  RunAndCheckFilters(0, currEntry);
205  }
206  } else {
207  TTreeReader r(fTree.get());
208  InitAllNodes(&r, 0);
209 
210  // recursive call to check filters and conditionally execute actions
211  // in the non-MT case processing can be stopped early by ranges, hence the check on fNStopsReceived
212  while (r.Next() && fNStopsReceived < fNChildren) {
213  RunAndCheckFilters(0, r.GetCurrentEntry());
214  }
215  }
216 #ifdef R__USE_IMT
217  }
218 #endif // R__USE_IMT
219 
220  fHasRunAtLeastOnce = true;
221  // forget actions
222  fBookedActions.clear();
223  // make all TResultProxies ready
224  for (auto readiness : fResProxyReadiness) {
225  *readiness.get() = true;
226  }
227  // forget TResultProxies
228  fResProxyReadiness.clear();
229 }
230 
231 /// Build TTreeReaderValues for all nodes
232 ///
233 /// This method loops over all filters, actions and other booked objects and
234 /// calls their `BuildReaderValues` methods. It is called once per node per slot, before
235 /// running the event loop. It also informs each node of the TTreeReader that
236 /// a particular slot will be using.
237 void TLoopManager::InitAllNodes(TTreeReader *r, unsigned int slot)
238 {
239  // booked branches must be initialized first
240  // because actions and filters might need to point to the values encapsulate
241  for (auto &bookedBranch : fBookedBranches) bookedBranch.second->Init(r, slot);
242  for (auto &ptr : fBookedActions) ptr->Init(r, slot);
243  for (auto &ptr : fBookedFilters) ptr->Init(r, slot);
244 }
245 
246 /// Initialize all nodes of the functional graph before running the event loop.
247 /// This method is called once per event-loop and performs generic initialization
248 /// operations that do not depend on the specific processing slot (i.e. operations
249 /// that are common for all threads).
251 {
252  for (auto &namedFilterPtr : fBookedNamedFilters) namedFilterPtr->ResetReportCount();
253 }
254 
256 {
257  return this;
258 }
259 
260 const ColumnNames_t &TLoopManager::GetDefaultBranches() const
261 {
262  return fDefaultBranches;
263 }
264 
265 TTree *TLoopManager::GetTree() const
266 {
267  return fTree.get();
268 }
269 
271 {
272  auto it = fBookedBranches.find(name);
273  return it == fBookedBranches.end() ? nullptr : it->second.get();
274 }
275 
277 {
278  return fDirPtr;
279 }
280 
281 void TLoopManager::Book(const ActionBasePtr_t &actionPtr)
282 {
283  fBookedActions.emplace_back(actionPtr);
284 }
285 
286 void TLoopManager::Book(const FilterBasePtr_t &filterPtr)
287 {
288  fBookedFilters.emplace_back(filterPtr);
289  if (filterPtr->HasName()) {
290  fBookedNamedFilters.emplace_back(filterPtr);
291  }
292 }
293 
295 {
296  fBookedBranches[branchPtr->GetName()] = branchPtr;
297 }
298 
299 void TLoopManager::Book(const std::shared_ptr<bool> &readinessPtr)
300 {
301  fResProxyReadiness.emplace_back(readinessPtr);
302 }
303 
304 void TLoopManager::Book(const RangeBasePtr_t &rangePtr)
305 {
306  fBookedRanges.emplace_back(rangePtr);
307 }
308 
309 // dummy call, end of recursive chain of calls
310 bool TLoopManager::CheckFilters(int, unsigned int)
311 {
312  return true;
313 }
314 
315 /// Call `PrintReport` on all booked filters
317 {
318  for (const auto &fPtr : fBookedNamedFilters) fPtr->PrintReport();
319 }
320 
321 TRangeBase::TRangeBase(TLoopManager *implPtr, const ColumnNames_t &tmpBranches, unsigned int start, unsigned int stop,
322  unsigned int stride, unsigned int nSlots)
323  : fImplPtr(implPtr), fTmpBranches(tmpBranches), fStart(start), fStop(stop), fStride(stride), fNSlots(nSlots)
324 {
325 }
326 
328 {
329  return fImplPtr;
330 }
331 
332 ColumnNames_t TRangeBase::GetTmpBranches() const
333 {
334  return fTmpBranches;
335 }
void Foreach(F func, unsigned nTimes)
Execute func (with no arguments) nTimes in parallel.
FilterBaseVec_t fBookedFilters
Definition: TDFNodes.hxx:53
long long Long64_t
Definition: RtypesCore.h:69
TTreeReader is a simple, robust and fast interface to read values from a TTree, TChain or TNtuple...
Definition: TTreeReader.h:42
Namespace for new ROOT classes and functions.
Definition: StringConv.hxx:21
TRangeBase(TLoopManager *implPtr, const ColumnNames_t &tmpBranches, unsigned int start, unsigned int stop, unsigned int stride, unsigned int nSlots)
Definition: TDFNodes.cxx:321
TTree()
Default constructor and I/O constructor.
Definition: TTree.cxx:652
ColumnNames_t GetTmpBranches() const
Definition: TDFNodes.cxx:332
void Report() const
Call PrintReport on all booked filters.
Definition: TDFNodes.cxx:316
::TDirectory * GetDirectory() const
Definition: TDFNodes.cxx:276
TLoopManager(TTree *tree, const ColumnNames_t &defaultBranches)
Definition: TDFNodes.cxx:131
std::map< std::string, TmpBranchBasePtr_t > fBookedBranches
Definition: TDFNodes.hxx:55
void InitNodes()
Initialize all nodes of the functional graph before running the event loop.
Definition: TDFNodes.cxx:250
STL namespace.
A spin mutex class which respects the STL interface for mutexes.
Definition: TSpinMutex.hxx:40
unsigned int fNChildren
Number of nodes of the functional graph hanging from this object.
Definition: TDFNodes.hxx:64
void InitAllNodes(TTreeReader *r, unsigned int slot)
Build TTreeReaderValues for all nodes.
Definition: TDFNodes.cxx:237
ColumnNames_t GetTmpBranches() const
Definition: TDFNodes.cxx:75
void Book(const ActionBasePtr_t &actionPtr)
Definition: TDFNodes.cxx:281
const ColumnNames_t fTmpBranches
Definition: TDFNodes.hxx:205
std::vector< ULong64_t > fAccepted
Definition: TDFNodes.hxx:369
std::vector< ULong64_t > fRejected
Definition: TDFNodes.hxx:370
std::shared_ptr< TFilterBase > FilterBasePtr_t
Definition: TDFNodes.hxx:44
TCustomColumnBase(TLoopManager *df, const ColumnNames_t &tmpBranches, std::string_view name, unsigned int nSlots)
Definition: TDFNodes.cxx:44
std::string GetName() const
Definition: TDFNodes.cxx:53
Long64_t GetCurrentEntry() const
Returns the index of the current entry being read.
Definition: TTreeReader.h:208
bool CheckFilters(int, unsigned int)
Definition: TDFNodes.cxx:310
const ColumnNames_t fTmpBranches
Definition: TDFNodes.hxx:366
TLoopManager * GetImplPtr() const
Definition: TDFNodes.cxx:58
ActionBaseVec_t fBookedActions
Definition: TDFNodes.hxx:52
This class provides a simple interface to execute the same task multiple times in parallel...
const unsigned int fNSlots
Number of thread slots used by this node, inherited from parent node.
Definition: TDFNodes.hxx:270
void RunAndCheckFilters(unsigned int slot, Long64_t entry)
Definition: TDFNodes.cxx:141
TRandom2 r(17)
unsigned int fNStopsReceived
Number of times that a child node signaled to stop processing entries.
Definition: TDFNodes.hxx:65
std::shared_ptr< TCustomColumnBase > TmpBranchBasePtr_t
Definition: TDFNodes.hxx:42
TLoopManager * fImplPtr
A raw pointer to the TLoopManager at the root of this functional graph.
Definition: TDFNodes.hxx:364
const unsigned int fNSlots
Definition: TDFNodes.hxx:62
unsigned int GetNSlots() const
Definition: TDFNodes.hxx:92
const ColumnNames_t & GetDefaultBranches() const
Definition: TDFNodes.cxx:260
FilterBaseVec_t fBookedNamedFilters
Definition: TDFNodes.hxx:54
#define Printf
Definition: TGeoToOCC.h:18
TFilterBase(TLoopManager *df, const ColumnNames_t &tmpBranches, std::string_view name, unsigned int nSlots)
Definition: TDFNodes.cxx:63
Describe directory structure in memory.
Definition: TDirectory.h:34
const unsigned int fNSlots
Number of thread slots used by this node.
Definition: TDFNodes.hxx:206
TLoopManager * fImplPtr
A raw pointer to the TLoopManager at the root of this functional graph.
Definition: TDFNodes.hxx:467
const ColumnNames_t fDefaultBranches
Definition: TDFNodes.hxx:60
TLoopManager * GetImplPtr() const
Definition: TDFNodes.cxx:327
std::shared_ptr< TDFInternal::TActionBase > ActionBasePtr_t
Definition: TDFNodes.hxx:39
TLoopManager * fImplPtr
A raw pointer to the TLoopManager at the root of this functional graph.
Definition: TDFNodes.hxx:264
TCustomColumnBase * GetBookedBranch(const std::string &name) const
Definition: TDFNodes.cxx:270
TLoopManager * GetImplPtr() const
Definition: TDFNodes.cxx:70
Bool_t Next()
Move to the next entry (or index of the TEntryList if that is set).
Definition: TTreeReader.h:160
std::shared_ptr< TTree > fTree
Definition: TDFNodes.hxx:59
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
Definition: TROOT.cxx:552
unsigned int GetNSlots()
Definition: TDFUtils.cxx:125
ColumnNames_t GetTmpBranches() const
Definition: TDFNodes.cxx:48
Definition: tree.py:1
std::vector< std::shared_ptr< bool > > fResProxyReadiness
Definition: TDFNodes.hxx:57
TLoopManager * fImplPtr
A raw pointer to the TLoopManager at the root of this functional graph.
Definition: TDFNodes.hxx:202
A class to process the entries of a TTree in parallel.
char name[80]
Definition: TGX11.cxx:109
std::shared_ptr< TRangeBase > RangeBasePtr_t
Definition: TDFNodes.hxx:47