12 #ifndef ROOT_TPoolProcessor 13 #define ROOT_TPoolProcessor 30 #include <type_traits> 42 auto th1p =
dynamic_cast<TH1*
>(res);
48 auto ttreep =
dynamic_cast<TTree*
>(res);
49 if(ttreep !=
nullptr) {
54 auto tentrylist =
dynamic_cast<TEntryList*
>(res);
55 if(tentrylist !=
nullptr) {
60 auto teventlist =
dynamic_cast<TEventList*
>(res);
61 if(teventlist !=
nullptr) {
73 TPoolProcessor(
F procFunc,
const std::vector<std::string>& fileNames,
const std::string& treeName,
unsigned nWorkers,
ULong64_t maxEntries);
78 void Init(
int fd,
unsigned workerN);
118 unsigned code = msg.first;
130 std::string reply =
"S" + std::to_string(
GetNWorker());
131 reply +=
": unknown code received: " + std::to_string(code);
151 unsigned nProcessed = 0;
155 std::cerr <<
"[S]: Process:kProcTree fTree undefined!\n";
159 nProcessed = ReadBuffer<unsigned>(msg.second.get());
164 fileN = ReadBuffer<unsigned>(msg.second.get());
167 std::unique_ptr<TFile> fp;
168 TTree *tree =
nullptr;
186 if(tree ==
nullptr) {
203 unsigned rangeN = nProcessed %
fNWorkers;
204 start = rangeN*nBunch;
206 finish = (rangeN+1)*nBunch;
225 std::string reply =
"S" + std::to_string(
GetNWorker());
226 reply +=
": could not set TTreeReader to range " + std::to_string(start) +
" " + std::to_string(finish);
238 fProcessedEntries += finish - start;
261 if (fp ==
nullptr || fp->
IsZombie()) {
262 std::string reply =
"S" + std::to_string(
GetNWorker());
263 reply.append(
": could not open file ");
264 reply.append(fileName);
278 TTree *tree =
nullptr;
292 if (tree ==
nullptr) {
293 std::string reply =
"S" + std::to_string(
GetNWorker());
294 std::stringstream ss;
295 ss <<
": cannot find tree with name " <<
fTreeName <<
" in file " << fp->
GetName();
296 reply.append(ss.str());
virtual const char * GetName() const
Returns name of object.
std::result_of< F(std::reference_wrapper< TTreeReader >)>::type fReducedResult
the results of the executions of fProcFunc merged together
bool fCanReduce
true if fReducedResult can be reduced with a new result, false until we have produced one result ...
Tell a TPoolProcessor to process the tree that was passed to it at construction time.
TTreeReader is a simple, robust and fast interface to read values from a TTree, TChain or TNtuple...
Tell a TPoolProcessor which tree and entries range to process. The object sent is a TreeRangeInfo...
virtual void SetDirectory(TDirectory *dir)
By default, when a histogram is created, it is added to the list of histogram objects in the current ...
ULong64_t fMaxNEntries
the maximum number of entries to be processed by this worker
virtual const char * GetClassName() const
std::string fTreeName
the name of the tree to be processed
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format...
This class works in conjunction with TMPClient, reacting to messages received from it as specified by ...
virtual TObject * Get(const char *namecycle)
Return pointer to object identified by namecycle.
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
TFile * OpenFile(const std::string &fileName)
std::vector< std::string > fFileNames
the files to be processed by all workers
TTree * RetrieveTree(TFile *fp)
Tell a TPoolProcessor which tree to process. The object sent is a TreeInfo.
We are ready for the next task.
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=1, Int_t netopt=0)
Create / open a file.
virtual void SetDirectory(TDirectory *dir)
Add reference to directory dir. dir can be 0.
unsigned GetNWorker() const
void Process(unsigned code, MPCodeBufPair &msg)
Book space in a file, create I/O buffers, fill them, and (un)compress them.
ULong64_t EvalMaxEntries(ULong64_t maxEntries)
void HandleInput(MPCodeBufPair &msg)
Execute instructions received from a TPool client.
F fProcFunc
the function to be executed
TObject * ReduceObjects(const std::vector< TObject *> &objs)
Merge collection of TObjects.
virtual void SetDirectory(TDirectory *dir)
Remove reference to this EventList from current directory and add reference to new directory dir...
TFile * GetCurrentFile() const
Return pointer to the current file.
TPoolProcessor(F procFunc, const std::vector< std::string > &fileNames, const std::string &treeName, unsigned nWorkers, ULong64_t maxEntries)
A TEventList object is a list of selected events (entries) in a TTree.
unsigned fNWorkers
the number of workers spawned
The message contains the result of the processing of a TTree.
std::pair< unsigned, std::unique_ptr< TBufferFile >> MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Tell the client there was an error while processing.
virtual void SetDirectory(TDirectory *dir)
Change the tree's directory.
Ask for a kFuncResult/kProcResult.
unsigned long long ULong64_t
virtual Long64_t GetEntries() const
virtual TList * GetListOfKeys() const
virtual void Init(int fd, unsigned workerN)
This method is called by children processes right after forking.
void Init(int fd, unsigned workerN)
This method is called by children processes right after forking.
A TTree object has a header with a name and a title.
A List of entry numbers in a TTree or TChain.
EEntryStatus SetEntriesRange(Long64_t first, Long64_t last)
Set the range of entries to be processed.
ULong64_t fProcessedEntries
the number of entries processed by this worker so far
TTree * fTree
pointer to the tree to be processed. It is only used if the tree is directly passed to TProcPool::Pro...