ROOT  6.06/08
Reference Guide
TPacketizerAdaptive.cxx
Go to the documentation of this file.
1 // @(#)root/proofplayer:$Id$
2 // Author: Jan Iwaszkiewicz 11/12/06
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2002, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11 
12 //////////////////////////////////////////////////////////////////////////
13 // //
14 // TPacketizerAdaptive //
15 // //
16 // This packetizer is based on TPacketizer but uses different //
17 // load-balancing algorithms and data structures. //
18 // Two main improvements in the load-balancing strategy: //
19 // - First one was to change the order in which the files are assigned //
20 // to the computing nodes in such a way that network transfers are //
21 // evenly distributed in the query time. Transfer of the remote files //
22 // was often becoming a bottleneck at the end of a query. //
23 // - The other improvement is the use of time-based packet size. We //
24 // measure the processing rate of all the nodes and calculate the //
25 // packet size, so that each packet takes a certain amount of time. In this way //
26 // the packetizer prevents the situation where the query can't finish //
27 // because of one slow node. //
28 // //
29 // The data structures TFileStat, TFileNode and TSlaveStat are //
30 // extended and modified, and the TFileNode::Compare method is changed. //
31 // //
32 //////////////////////////////////////////////////////////////////////////
33 
34 
35 #include "TPacketizerAdaptive.h"
36 
37 #include "Riostream.h"
38 #include "TDSet.h"
39 #include "TError.h"
40 #include "TEnv.h"
41 #include "TEntryList.h"
42 #include "TEventList.h"
43 #include "TMap.h"
44 #include "TMessage.h"
45 #include "TMonitor.h"
46 #include "TNtupleD.h"
47 #include "TObject.h"
48 #include "TParameter.h"
49 #include "TPerfStats.h"
50 #include "TProofDebug.h"
51 #include "TProof.h"
52 #include "TProofServ.h"
53 #include "TSlave.h"
54 #include "TSocket.h"
55 #include "TSortedList.h"
56 #include "TUrl.h"
57 #include "TClass.h"
58 #include "TRandom.h"
59 #include "TMath.h"
60 #include "TObjString.h"
61 #include "TList.h"
62 
63 //
64 // The following three utility classes manage the state of the
65 // work to be performed and the slaves involved in the process.
66 // A list of TFileNode(s) describes the hosts with files, each
67 // has a list of TFileStat(s) keeping the state for each TDSet
68 // element (file).
69 //
70 // The list of TSlaveStat(s) keeps track of the work (being) done
71 // by each slave
72 //
73 
74 
75 //------------------------------------------------------------------------------
76 
77 class TPacketizerAdaptive::TFileStat : public TObject {
78 
79 private:
80  Bool_t fIsDone; // is this element processed
81  TFileNode *fNode; // my FileNode
82  TDSetElement *fElement; // location of the file and its range
83  Long64_t fNextEntry; // cursor in the range, -1 when done // needs changing
84 
85 public:
86  TFileStat(TFileNode *node, TDSetElement *elem, TList *file);
87 
88  Bool_t IsDone() const {return fIsDone;}
89  Bool_t IsSortable() const { return kTRUE; }
90  void SetDone() {fIsDone = kTRUE;}
91  TFileNode *GetNode() const {return fNode;}
92  TDSetElement *GetElement() const {return fElement;}
93  Long64_t GetNextEntry() const {return fNextEntry;}
94  void MoveNextEntry(Long64_t step) {fNextEntry += step;}
95 
96  // This method is used to keep a sorted list of remaining files to be processed
97  Int_t Compare(const TObject* obj) const
98  {
99  // Return -1 if elem.entries < obj.elem.entries, 0 if elem.entries equal
100  // and 1 if elem.entries < obj.elem.entries.
101  const TFileStat *fst = dynamic_cast<const TFileStat*>(obj);
102  if (fst && GetElement() && fst->GetElement()) {
103  Long64_t ent = GetElement()->GetNum();
104  Long64_t entfst = fst->GetElement()->GetNum();
105  if (ent > 0 && entfst > 0) {
106  if (ent > entfst) {
107  return 1;
108  } else if (ent < entfst) {
109  return -1;
110  } else {
111  return 0;
112  }
113  }
114  }
115  // No info: assume equal (no change in order)
116  return 0;
117  }
118  void Print(Option_t * = 0) const
119  { // Notify file name and entries
120  Printf("TFileStat: %s %lld", fElement ? fElement->GetName() : "---",
121  fElement ? fElement->GetNum() : -1);
122  }
123 };
124 
125 TPacketizerAdaptive::TFileStat::TFileStat(TFileNode *node, TDSetElement *elem, TList *files)
126  : fIsDone(kFALSE), fNode(node), fElement(elem), fNextEntry(elem->GetFirst())
127 {
128  // Constructor: add to the global list
129  if (files) files->Add(this);
130 }
131 
132 //------------------------------------------------------------------------------
133 
134 // a class describing a file node as a part of a session
135 class TPacketizerAdaptive::TFileNode : public TObject {
136 
137 private:
138  TString fNodeName; // FQDN of the node
139  TList *fFiles; // TDSetElements (files) stored on this node
140  TObject *fUnAllocFileNext; // cursor in fFiles
141  TList *fActFiles; // files with work remaining
142  TObject *fActFileNext; // cursor in fActFiles
143  Int_t fMySlaveCnt; // number of slaves running on this node
144  // (which can process remote files)
145  Int_t fExtSlaveCnt; // number of external slaves processing
146  // files on this node
147  Int_t fRunSlaveCnt; // total number of slaves processing files
148  // on this node
149  Long64_t fProcessed; // number of events processed on this node
150  Long64_t fEvents; // number of entries in files on this node
151 
152  Int_t fStrategy; // 0 means the classic and 1 (default) - the adaptive strategy
153 
154  TSortedList *fFilesToProcess; // Global list of files (TFileStat) to be processed (owned by TPacketizer)
155 
156 public:
157  TFileNode(const char *name, Int_t strategy, TSortedList *files);
158  ~TFileNode() { delete fFiles; delete fActFiles; }
159 
160  void IncMySlaveCnt() { fMySlaveCnt++; }
161  Int_t GetMySlaveCnt() const { return fMySlaveCnt; }
162  void IncExtSlaveCnt(const char *slave) { if (fNodeName != slave) fExtSlaveCnt++; }
163  void DecExtSlaveCnt(const char *slave) { if (fNodeName != slave) fExtSlaveCnt--; R__ASSERT(fExtSlaveCnt >= 0); }
164  Int_t GetSlaveCnt() const { return fMySlaveCnt + fExtSlaveCnt; }
165  void IncRunSlaveCnt() { fRunSlaveCnt++; }
166  void DecRunSlaveCnt() { fRunSlaveCnt--; R__ASSERT(fRunSlaveCnt >= 0); }
167  Int_t GetRunSlaveCnt() const { return fRunSlaveCnt; }
168  Int_t GetExtSlaveCnt() const { return fExtSlaveCnt; }
169  Int_t GetNumberOfActiveFiles() const { return fActFiles->GetSize(); }
170  Bool_t IsSortable() const { return kTRUE; }
171  Int_t GetNumberOfFiles() { return fFiles->GetSize(); }
172  void IncProcessed(Long64_t nEvents)
173  { fProcessed += nEvents; }
174  Long64_t GetProcessed() const { return fProcessed; }
175  void DecreaseProcessed(Long64_t nEvents) { fProcessed -= nEvents; }
176  // this method is used by Compare() it adds 1, so it returns a number that
177  // would be true if one more slave is added.
178  Long64_t GetEventsLeftPerSlave() const
179  { return ((fEvents - fProcessed)/(fRunSlaveCnt + 1)); }
180  void IncEvents(Long64_t nEvents) { fEvents += nEvents; }
181  const char *GetName() const { return fNodeName.Data(); }
182  Long64_t GetNEvents() const { return fEvents; }
183 
184  void Print(Option_t * = 0) const
185  {
186  TFileStat *fs = 0;
187  TDSetElement *e = 0;
188  Int_t nn = 0;
189  Printf("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
190  Printf("+++ TFileNode: %s +++", fNodeName.Data());
191  Printf("+++ Evts: %lld (total: %lld) ", fProcessed, fEvents);
192  Printf("+++ Worker count: int:%d, ext: %d, tot:%d ", fMySlaveCnt, fExtSlaveCnt, fRunSlaveCnt);
193  Printf("+++ Files: %d ", fFiles ? fFiles->GetSize() : 0);
194  if (fFiles && fFiles->GetSize() > 0) {
195  TIter nxf(fFiles);
196  while ((fs = (TFileStat *) nxf())) {
197  if ((e = fs->GetElement())) {
198  Printf("+++ #%d: %s %lld - %lld (%lld) - next: %lld ", ++nn, e->GetName(),
199  e->GetFirst(), e->GetFirst() + e->GetNum() - 1, e->GetNum(), fs->GetNextEntry());
200  } else {
201  Printf("+++ #%d: no element! ", ++nn);
202  }
203  }
204  }
205  Printf("+++ Active files: %d ", fActFiles ? fActFiles->GetSize() : 0);
206  if (fActFiles && fActFiles->GetSize() > 0) {
207  TIter nxaf(fActFiles);
208  while ((fs = (TFileStat *) nxaf())) {
209  if ((e = fs->GetElement())) {
210  Printf("+++ #%d: %s %lld - %lld (%lld) - next: %lld", ++nn, e->GetName(),
211  e->GetFirst(), e->GetFirst() + e->GetNum() - 1, e->GetNum(), fs->GetNextEntry());
212  } else {
213  Printf("+++ #%d: no element! ", ++nn);
214  }
215  }
216  }
217  Printf("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
218  }
219 
220  void Add(TDSetElement *elem, Bool_t tolist)
221  {
222  TList *files = tolist ? (TList *)fFilesToProcess : (TList *)0;
223  TFileStat *f = new TFileStat(this, elem, files);
224  fFiles->Add(f);
225  if (fUnAllocFileNext == 0) fUnAllocFileNext = fFiles->First();
226  }
227 
228  TFileStat *GetNextUnAlloc()
229  {
230  TObject *next = fUnAllocFileNext;
231 
232  if (next != 0) {
233  // make file active
234  fActFiles->Add(next);
235  if (fActFileNext == 0) fActFileNext = fActFiles->First();
236 
237  // move cursor
238  fUnAllocFileNext = fFiles->After(fUnAllocFileNext);
239  }
240  return (TFileStat *) next;
241  }
242 
243  TFileStat *GetNextActive()
244  {
245  TObject *next = fActFileNext;
246 
247  if (fActFileNext != 0) {
248  fActFileNext = fActFiles->After(fActFileNext);
249  if (fActFileNext == 0) fActFileNext = fActFiles->First();
250  }
251 
252  return (TFileStat *) next;
253  }
254 
255  void RemoveActive(TFileStat *file)
256  {
257  if (fActFileNext == file) fActFileNext = fActFiles->After(file);
258  fActFiles->Remove(file);
259  if (fFilesToProcess) fFilesToProcess->Remove(file);
260  if (fActFileNext == 0) fActFileNext = fActFiles->First();
261  }
262 
263  Int_t Compare(const TObject *other) const
264  {
265  // Must return -1 if this is smaller than obj, 0 if objects are equal
266  // and 1 if this is larger than obj.
267  // smaller means more needing a new worker.
268  // Two cases are considered depending on
269  // relation between harddrive speed and network bandwidth.
270 
271  const TFileNode *obj = dynamic_cast<const TFileNode*>(other);
272  if (!obj) {
273  Error("Compare", "input is not a TPacketizer::TFileNode object");
274  return 0;
275  }
276 
277  // how many more events it has than obj
278 
279  if (fStrategy == 1) {
280  // The default adaptive strategy.
281  Int_t myVal = GetRunSlaveCnt();
282  Int_t otherVal = obj->GetRunSlaveCnt();
283  if (myVal < otherVal) {
284  return -1;
285  } else if (myVal > otherVal) {
286  return 1;
287  } else {
288  // if this has more events to process than obj
289  if ((fEvents - fProcessed) >
290  (obj->GetNEvents() - obj->GetProcessed())) {
291  return -1;
292  } else {
293  return 1;
294  }
295  }
296  } else {
297  Int_t myVal = GetSlaveCnt();
298  Int_t otherVal = obj->GetSlaveCnt();
299  if (myVal < otherVal) {
300  return -1;
301  } else if (myVal > otherVal) {
302  return 1;
303  } else {
304  return 0;
305  }
306  }
307  }
308 
309  void Reset()
310  {
311  fUnAllocFileNext = fFiles->First();
312  fActFiles->Clear();
313  fActFileNext = 0;
314  fExtSlaveCnt = 0;
315  fMySlaveCnt = 0;
316  fRunSlaveCnt = 0;
317  }
318 };
319 
320 
321 TPacketizerAdaptive::TFileNode::TFileNode(const char *name, Int_t strategy, TSortedList *files)
322  : fNodeName(name), fFiles(new TList), fUnAllocFileNext(0),
323  fActFiles(new TList), fActFileNext(0), fMySlaveCnt(0),
324  fExtSlaveCnt(0), fRunSlaveCnt(0), fProcessed(0), fEvents(0),
325  fStrategy(strategy), fFilesToProcess(files)
326 {
327  // Constructor
328 
329  fFiles->SetOwner();
330  fActFiles->SetOwner(kFALSE);
331 }
332 
333 //------------------------------------------------------------------------------
334 
// Per-worker state kept by TPacketizerAdaptive: the file node the worker is
// attached to, the file and element it is currently processing, the entries
// and processing time accumulated on the current file, and the list of
// packets (TDSetElement's) it has processed.
335 class TPacketizerAdaptive::TSlaveStat : public TVirtualPacketizer::TVirtualSlaveStat {
336 
337 friend class TPacketizerAdaptive;
338 
339 private:
340  TFileNode *fFileNode; // corresponding node or 0
341  TFileStat *fCurFile; // file currently being processed
342  TDSetElement *fCurElem; // TDSetElement currently being processed
343  Long64_t fCurProcessed; // events processed in the current file
344  Float_t fCurProcTime; // proc time spent on the current file
345  TList *fDSubSet; // packets processed by this worker
346 
347 public:
348  TSlaveStat(TSlave *slave);
349  ~TSlaveStat();
350  TFileNode *GetFileNode() const { return fFileNode; }
// The two accessors below return -1 when no status object is set.
351  Long64_t GetEntriesProcessed() const { return fStatus?fStatus->GetEntries():-1; }
352  Double_t GetProcTime() const { return fStatus?fStatus->GetProcTime():-1; }
353  TFileStat *GetCurFile() { return fCurFile; }
354  void SetFileNode(TFileNode *node) { fFileNode = node; }
355  void UpdateRates(TProofProgressStatus *st);
// NOTE(review): unlike the accessors above, GetAvgRate() dereferences fStatus
// without a null check; it relies on fStatus being set (the constructor
// allocates one).
356  Float_t GetAvgRate() { return fStatus->GetRate(); }
// Rate on the current file (entries / proc time); 0 when no time was spent yet.
357  Float_t GetCurRate() {
358  return (fCurProcTime?fCurProcessed/fCurProcTime:0); }
// The attached node's GetEventsLeftPerSlave(), or 0 when no node is attached.
// NOTE(review): GetEventsLeftPerSlave() returns Long64_t; the Int_t return
// type here may truncate very large values.
359  Int_t GetLocalEventsLeft() {
360  return fFileNode?(fFileNode->GetEventsLeftPerSlave()):0; }
361  TList *GetProcessedSubSet() { return fDSubSet; }
// NOTE(review): AddProcessed() is defined as a member of this class further
// down, but its declaration is not visible here; this listing appears to omit
// lines (the numbering jumps from 361 to 364) - confirm against the real source.
364 };
365 
366 ////////////////////////////////////////////////////////////////////////////////
367 /// Constructor
368 
369 TPacketizerAdaptive::TSlaveStat::TSlaveStat(TSlave *slave)
370  : fFileNode(0), fCurFile(0), fCurElem(0),
371  fCurProcessed(0), fCurProcTime(0)
372 {
373  fDSubSet = new TList();
374  fDSubSet->SetOwner();
375  fSlave = slave;
376  fStatus = new TProofProgressStatus();
377  // The slave name is a special one in PROOF-Lite: avoid blocking on the DNS
378  // for non existing names
379  fWrkFQDN = slave->GetName();
380  if (strcmp(slave->ClassName(), "TSlaveLite")) {
381  fWrkFQDN = TUrl(fWrkFQDN).GetHostFQDN();
382  // Get full name for local hosts
383  if (fWrkFQDN.Contains("localhost") || fWrkFQDN == "127.0.0.1")
384  fWrkFQDN = TUrl(gSystem->HostName()).GetHostFQDN();
385  }
386  PDB(kPacketizer, 2)
387  Info("TSlaveStat", "wrk FQDN: %s", fWrkFQDN.Data());
388 }
389 
390 ////////////////////////////////////////////////////////////////////////////////
391 /// Cleanup
392 
393 TPacketizerAdaptive::TSlaveStat::~TSlaveStat()
394 {
395  SafeDelete(fDSubSet);
396  SafeDelete(fStatus);
397 }
398 
399 ////////////////////////////////////////////////////////////////////////////////
400 /// Update packetizer rates
401 
402 void TPacketizerAdaptive::TSlaveStat::UpdateRates(TProofProgressStatus *st)
403 {
404  if (!st) {
405  Error("UpdateRates", "no status object!");
406  return;
407  }
408  if (fCurFile->IsDone()) {
409  fCurProcTime = 0;
410  fCurProcessed = 0;
411  } else {
412  fCurProcTime += st->GetProcTime() - GetProcTime();
413  fCurProcessed += st->GetEntries() - GetEntriesProcessed();
414  }
415  fCurFile->GetNode()->IncProcessed(st->GetEntries() - GetEntriesProcessed());
416  st->SetLastEntries(st->GetEntries() - fStatus->GetEntries());
417  SafeDelete(fStatus);
418  fStatus = st;
419 }
420 
421 ////////////////////////////////////////////////////////////////////////////////
422 /// Add the current element to the fDSubSet (subset processed by this worker)
423 /// and if the status arg is given, then change the size of the packet.
424 /// return the difference (*st - *fStatus)
425 
426 TProofProgressStatus *TPacketizerAdaptive::TSlaveStat::AddProcessed(TProofProgressStatus *st)
427 {
428  if (st && fDSubSet && fCurElem) {
429  if (fCurElem->GetNum() != st->GetEntries() - GetEntriesProcessed())
430  fCurElem->SetNum(st->GetEntries() - GetEntriesProcessed());
431  fDSubSet->Add(fCurElem);
432  TProofProgressStatus *diff = new TProofProgressStatus(*st - *fStatus);
433  return diff;
434  } else {
435  Error("AddProcessed", "processed subset of current elem undefined");
436  return 0;
437  }
438 }
439 
440 //------------------------------------------------------------------------------
441 
443 
444 ////////////////////////////////////////////////////////////////////////////////
445 /// Constructor
446 
448  Long64_t first, Long64_t num,
449  TList *input, TProofProgressStatus *st)
450  : TVirtualPacketizer(input, st)
451 {
452  PDB(kPacketizer,1) Info("TPacketizerAdaptive",
453  "enter (first %lld, num %lld)", first, num);
454 
455  // Init pointer members
456  fSlaveStats = 0;
457  fUnAllocated = 0;
458  fActive = 0;
459  fFileNodes = 0;
460  fMaxPerfIdx = 1;
462  fMaxEntriesRatio = 2.;
463 
464  fMaxSlaveCnt = -1;
465  fPacketAsAFraction = 4;
466  fStrategy = 1;
467  fFilesToProcess = new TSortedList;
468  fFilesToProcess->SetOwner(kFALSE);
469 
470  if (!fProgressStatus) {
471  Error("TPacketizerAdaptive", "No progress status");
472  return;
473  }
474 
475  // Attempt to synchronize the packet size with the tree cache size
476  Int_t cpsync = -1;
477  if (TProof::GetParameter(input, "PROOF_PacketizerCachePacketSync", cpsync) != 0) {
478  // Check if there is a global cache-packet sync setting
479  cpsync = gEnv->GetValue("Packetizer.CachePacketSync", 1);
480  }
481  if (cpsync >= 0) fCachePacketSync = (cpsync > 0) ? kTRUE : kFALSE;
482 
483  // Max file entries to avg allowed ratio for cache-to-packet synchronization
484  // (applies only if fCachePacketSync is true; -1. disables the bound)
485  if (TProof::GetParameter(input, "PROOF_PacketizerMaxEntriesRatio", fMaxEntriesRatio) != 0) {
486  // Check if there is a global ratio setting
487  fMaxEntriesRatio = gEnv->GetValue("Packetizer.MaxEntriesRatio", 2.);
488  }
489 
490  // The possibility to change packetizer strategy to the basic TPacketizer's
491  // one (in which workers always process their local data first).
492  Int_t strategy = -1;
493  if (TProof::GetParameter(input, "PROOF_PacketizerStrategy", strategy) != 0) {
494  // Check if there is a global strategy setting
495  strategy = gEnv->GetValue("Packetizer.Strategy", 1);
496  }
497  if (strategy == 0) {
498  fStrategy = 0;
499  Info("TPacketizerAdaptive", "using the basic strategy of TPacketizer");
500  } else if (strategy != 1) {
501  Warning("TPacketizerAdaptive", "unsupported strategy index (%d): ignore", strategy);
502  }
503 
504  Long_t maxSlaveCnt = 0;
505  if (TProof::GetParameter(input, "PROOF_MaxSlavesPerNode", maxSlaveCnt) == 0) {
506  if (maxSlaveCnt < 0) {
507  Info("TPacketizerAdaptive",
508  "The value of PROOF_MaxSlavesPerNode must be positive");
509  maxSlaveCnt = 0;
510  }
511  } else {
512  // Try also with Int_t (recently supported in TProof::SetParameter)
513  Int_t mxslcnt = -1;
514  if (TProof::GetParameter(input, "PROOF_MaxSlavesPerNode", mxslcnt) == 0) {
515  if (mxslcnt < 0) {
516  Info("TPacketizerAdaptive",
517  "The value of PROOF_MaxSlavesPerNode must be positive");
518  mxslcnt = 0;
519  }
520  maxSlaveCnt = (Long_t) mxslcnt;
521  }
522  }
523 
524  if (!maxSlaveCnt)
525  maxSlaveCnt = gEnv->GetValue("Packetizer.MaxWorkersPerNode", 0);
526  if (maxSlaveCnt > 0) {
527  fMaxSlaveCnt = maxSlaveCnt;
528  Info("TPacketizerAdaptive", "Setting max number of workers per node to %ld",
529  fMaxSlaveCnt);
530  }
531 
532  // if forceLocal parameter is set to 1 then eliminate the cross-worker
533  // processing;
534  // This minimizes the network usage on the PROOF cluser at the expense of
535  // longer jobs processing times.
536  // To process successfully the session must have workers with all the data!
538  Int_t forceLocal = 0;
539  if (TProof::GetParameter(input, "PROOF_ForceLocal", forceLocal) == 0) {
540  if (forceLocal == 1)
541  fForceLocal = kTRUE;
542  else
543  Info("TPacketizerAdaptive",
544  "The only accepted value of PROOF_ForceLocal parameter is 1 !");
545  }
546 
547  // Below we provide a possibility to change the way packet size is
548  // calculated or define the packet time directly.
549  // fPacketAsAFraction can be interpreted as follows:
550  // packet time is (expected job proc. time) / fPacketSizeAsAFraction.
551  // It substitutes 20 in the old formula to calculate the fPacketSize:
552  // fPacketSize = fTotalEntries / (20 * nslaves)
553  Int_t packetAsAFraction = 0;
554  if (TProof::GetParameter(input, "PROOF_PacketAsAFraction", packetAsAFraction) == 0) {
555  if (packetAsAFraction > 0) {
556  fPacketAsAFraction = packetAsAFraction;
557  Info("TPacketizerAdaptive",
558  "using alternate fraction of query time as a packet size: %d",
559  packetAsAFraction);
560  } else
561  Info("TPacketizerAdaptive", "packetAsAFraction parameter must be higher than 0");
562  }
563 
564  // Packet re-assignement
565  fTryReassign = 0;
566  Int_t tryReassign = 0;
567  if (TProof::GetParameter(input, "PROOF_TryReassign", tryReassign) != 0)
568  tryReassign = gEnv->GetValue("Packetizer.TryReassign", 0);
569  fTryReassign = tryReassign;
570  if (fTryReassign != 0)
571  Info("TPacketizerAdaptive", "failed packets will be re-assigned");
572 
573  // Save the config parameters in the dedicated list so that they will be saved
574  // in the outputlist and therefore in the relevant TQueryResult
575  fConfigParams->Add(new TParameter<Int_t>("PROOF_PacketizerCachePacketSync", (Int_t)fCachePacketSync));
576  fConfigParams->Add(new TParameter<Double_t>("PROOF_PacketizerMaxEntriesRatio", fMaxEntriesRatio));
577  fConfigParams->Add(new TParameter<Int_t>("PROOF_PacketizerStrategy", fStrategy));
578  fConfigParams->Add(new TParameter<Int_t>("PROOF_MaxWorkersPerNode", (Int_t)fMaxSlaveCnt));
579  fConfigParams->Add(new TParameter<Int_t>("PROOF_ForceLocal", (Int_t)fForceLocal));
580  fConfigParams->Add(new TParameter<Int_t>("PROOF_PacketAsAFraction", fPacketAsAFraction));
581 
582  Double_t baseLocalPreference = 1.2;
583  fBaseLocalPreference = (Float_t)baseLocalPreference;
584  if (TProof::GetParameter(input, "PROOF_BaseLocalPreference", baseLocalPreference) == 0)
585  fBaseLocalPreference = (Float_t)baseLocalPreference;
586 
587  fFileNodes = new TList;
588  fFileNodes->SetOwner();
589  fUnAllocated = new TList;
591  fActive = new TList;
593 
594  fValid = kTRUE;
595 
596  // Resolve end-point urls to optmize distribution
597  // dset->Lookup(); // moved to TProofPlayerRemote::Process
598 
599  // Read list of mounted disks
600  TObjArray *partitions = 0;
601  TString partitionsStr;
602  if (TProof::GetParameter(input, "PROOF_PacketizerPartitions", partitionsStr) != 0)
603  partitionsStr = gEnv->GetValue("Packetizer.Partitions", "");
604  if (!partitionsStr.IsNull()) {
605  Info("TPacketizerAdaptive", "Partitions: %s", partitionsStr.Data());
606  partitions = partitionsStr.Tokenize(",");
607  }
608 
609  // Split into per host and disk entries
610  dset->Reset();
611  TDSetElement *e;
612  while ((e = (TDSetElement*)dset->Next())) {
613 
614  if (e->GetValid()) continue;
615 
616  // The dataset name, if any
617  if (fDataSet.IsNull() && e->GetDataSet() && strlen(e->GetDataSet()))
618  fDataSet = e->GetDataSet();
619 
620  TUrl url = e->GetFileName();
621  PDB(kPacketizer,2)
622  Info("TPacketizerAdaptive", "element name: %s (url: %s)", e->GetFileName(), url.GetUrl());
623 
624  // Map non URL filenames to dummy host
625  TString host;
626  if ( !url.IsValid() ||
627  (strncmp(url.GetProtocol(),"root", 4) &&
628  strncmp(url.GetProtocol(),"rfio", 4) &&
629  strncmp(url.GetProtocol(),"file", 4)) ) {
630  host = "no-host";
631  } else if ( url.IsValid() && !strncmp(url.GetProtocol(),"file", 4)) {
632  host = "localhost";
633  url.SetProtocol("root");
634  } else {
635  host = url.GetHostFQDN();
636  }
637  // Get full name for local hosts
638  if (host.Contains("localhost") || host == "127.0.0.1") {
639  url.SetHost(gSystem->HostName());
640  host = url.GetHostFQDN();
641  }
642 
643  // Find on which disk is the file, if any
644  TString disk;
645  if (partitions) {
646  TIter iString(partitions);
647  TObjString* os = 0;
648  while ((os = (TObjString *)iString())) {
649  // Compare begining of the url with disk mountpoint
650  if (strncmp(url.GetFile(), os->GetName(), os->GetString().Length()) == 0) {
651  disk = os->GetName();
652  break;
653  }
654  }
655  }
656  // Node's url
657  TString nodeStr;
658  if (disk.IsNull())
659  nodeStr.Form("%s://%s", url.GetProtocol(), host.Data());
660  else
661  nodeStr.Form("%s://%s/%s", url.GetProtocol(), host.Data(), disk.Data());
662  TFileNode *node = (TFileNode *) fFileNodes->FindObject(nodeStr);
663 
664  if (node == 0) {
665  node = new TFileNode(nodeStr, fStrategy, fFilesToProcess);
666  fFileNodes->Add(node);
667  PDB(kPacketizer,2)
668  Info("TPacketizerAdaptive", "creating new node '%s' or the element", nodeStr.Data());
669  } else {
670  PDB(kPacketizer,2)
671  Info("TPacketizerAdaptive", "adding element to existing node '%s'", nodeStr.Data());
672  }
673 
674  node->Add(e, kFALSE);
675  }
676 
677  fSlaveStats = new TMap;
679 
680  TSlave *slave;
681  TIter si(slaves);
682  while ((slave = (TSlave*) si.Next())) {
683  fSlaveStats->Add( slave, new TSlaveStat(slave) );
684  fMaxPerfIdx = slave->GetPerfIdx() > fMaxPerfIdx ?
685  slave->GetPerfIdx() : fMaxPerfIdx;
686  }
687 
688  // Setup file & filenode structure
689  Reset();
690  // Optimize the number of files to be open when running on subsample
691  Int_t validateMode = 0;
692  Int_t gprc = TProof::GetParameter(input, "PROOF_ValidateByFile", validateMode);
693  Bool_t byfile = (gprc == 0 && validateMode > 0 && num > -1) ? kTRUE : kFALSE;
694  if (num > -1)
695  PDB(kPacketizer,2)
696  Info("TPacketizerAdaptive",
697  "processing subset of entries: validating by file? %s", byfile ? "yes": "no");
698  ValidateFiles(dset, slaves, num, byfile);
699 
700 
701  if (!fValid) return;
702 
703  // apply global range (first,num) to dset and rebuild structure
704  // ommitting TDSet elements that are not needed
705 
706  Int_t files = 0;
707  fTotalEntries = 0;
708  fUnAllocated->Clear(); // avoid dangling pointers
709  fActive->Clear();
710  fFileNodes->Clear(); // then delete all objects
711  PDB(kPacketizer,2)
712  Info("TPacketizerAdaptive",
713  "processing range: first %lld, num %lld", first, num);
714 
715  dset->Reset();
716  Long64_t cur = 0;
717  while (( e = (TDSetElement*)dset->Next())) {
718 
719  // Skip invalid or missing file; It will be moved
720  // from the dset to the 'MissingFiles' list in the player.
721  if (!e->GetValid()) continue;
722 
723  TUrl url = e->GetFileName();
724  Long64_t eFirst = e->GetFirst();
725  Long64_t eNum = e->GetNum();
726  PDB(kPacketizer,2)
727  Info("TPacketizerAdaptive", "processing element '%s'", e->GetFileName());
728  PDB(kPacketizer,2)
729  Info("TPacketizerAdaptive",
730  " --> first %lld, elenum %lld (cur %lld) (entrylist: %p)", eFirst, eNum, cur, e->GetEntryList());
731 
732  if (!e->GetEntryList()) {
733  // This element is before the start of the global range, skip it
734  if (cur + eNum < first) {
735  cur += eNum;
736  PDB(kPacketizer,2)
737  Info("TPacketizerAdaptive", " --> skip element cur %lld", cur);
738  continue;
739  }
740 
741  // This element is after the end of the global range, skip it
742  if (num != -1 && (first+num <= cur)) {
743  cur += eNum;
744  PDB(kPacketizer,2)
745  Info("TPacketizerAdaptive", " --> drop element cur %lld", cur);
746  continue; // break ??
747  }
748 
749  Bool_t inRange = kFALSE;
750  if (cur <= first || (num != -1 && (first+num <= cur+eNum))) {
751 
752  if (cur <= first) {
753  // If this element contains the start of the global range
754  // adjust its start and number of entries
755  e->SetFirst( eFirst + (first - cur) );
756  e->SetNum( e->GetNum() - (first - cur) );
757  PDB(kPacketizer,2)
758  Info("TPacketizerAdaptive", " --> adjust start %lld and end %lld",
759  eFirst + (first - cur), first + num - cur);
760  inRange = kTRUE;
761  }
762  if (num != -1 && (first+num <= cur+eNum)) {
763  // If this element contains the end of the global range
764  // adjust its number of entries
765  e->SetNum( first + num - e->GetFirst() - cur );
766  PDB(kPacketizer,2)
767  Info("TPacketizerAdaptive", " --> adjust end %lld", first + num - cur);
768  inRange = kTRUE;
769  }
770 
771  } else {
772  // Increment the counter ...
773  PDB(kPacketizer,2)
774  Info("TPacketizerAdaptive", " --> increment 'cur' by %lld", eNum);
775  cur += eNum;
776  }
777  // Re-adjust eNum and cur, if needed
778  if (inRange) {
779  cur += eNum;
780  eNum = e->GetNum();
781  }
782 
783  } else {
784  TEntryList *enl = dynamic_cast<TEntryList *>(e->GetEntryList());
785  if (enl) {
786  eNum = enl->GetN();
787  PDB(kPacketizer,2)
788  Info("TPacketizerAdaptive", " --> entry-list element: %lld entries", eNum);
789  } else {
790  TEventList *evl = dynamic_cast<TEventList *>(e->GetEntryList());
791  eNum = evl ? evl->GetN() : eNum;
792  PDB(kPacketizer,2)
793  Info("TPacketizerAdaptive", " --> event-list element: %lld entries (evl:%p)", eNum, evl);
794  }
795  if (!eNum) {
796  PDB(kPacketizer,2)
797  Info("TPacketizerAdaptive", " --> empty entry- or event-list element!");
798  continue;
799  }
800  }
801  PDB(kPacketizer,2)
802  Info("TPacketizerAdaptive", " --> next cur %lld", cur);
803 
804  // Map non URL filenames to dummy host
805  TString host;
806  if ( !url.IsValid() ||
807  (strncmp(url.GetProtocol(),"root", 4) &&
808  strncmp(url.GetProtocol(),"rfio", 4) &&
809  strncmp(url.GetProtocol(),"file", 4)) ) {
810  host = "no-host";
811  } else if ( url.IsValid() && !strncmp(url.GetProtocol(),"file", 4)) {
812  host = "localhost";
813  url.SetProtocol("root");
814  } else {
815  host = url.GetHostFQDN();
816  }
817  // Get full name for local hosts
818  if (host.Contains("localhost") || host == "127.0.0.1") {
819  url.SetHost(gSystem->HostName());
820  host = url.GetHostFQDN();
821  }
822 
823  // Find, on which disk is the file
824  TString disk;
825  if (partitions) {
826  TIter iString(partitions);
827  TObjString* os = 0;
828  while ((os = (TObjString *)iString())) {
829  // Compare begining of the url with disk mountpoint
830  if (strncmp(url.GetFile(), os->GetName(), os->GetString().Length()) == 0) {
831  disk = os->GetName();
832  break;
833  }
834  }
835  }
836  // Node's url
837  TString nodeStr;
838  if (disk.IsNull())
839  nodeStr.Form("%s://%s", url.GetProtocol(), host.Data());
840  else
841  nodeStr.Form("%s://%s/%s", url.GetProtocol(), host.Data(), disk.Data());
842  TFileNode *node = (TFileNode*) fFileNodes->FindObject(nodeStr);
843 
844 
845  if (node == 0) {
846  node = new TFileNode(nodeStr, fStrategy, fFilesToProcess);
847  fFileNodes->Add( node );
848  PDB(kPacketizer, 2)
849  Info("TPacketizerAdaptive", " --> creating new node '%s' for element", nodeStr.Data());
850  } else {
851  PDB(kPacketizer, 2)
852  Info("TPacketizerAdaptive", " --> adding element to exiting node '%s'", nodeStr.Data());
853  }
854 
855  ++files;
856  fTotalEntries += eNum;
857  node->Add(e, kTRUE);
858  node->IncEvents(eNum);
859  PDB(kPacketizer,2) e->Print("a");
860  }
861  PDB(kPacketizer,1)
862  Info("TPacketizerAdaptive", "processing %lld entries in %d files on %d hosts",
863  fTotalEntries, files, fFileNodes->GetSize());
864 
865  // Set the total number for monitoring
866  if (gPerfStats)
867  gPerfStats->SetNumEvents(fTotalEntries);
868 
869  Reset();
870 
871  InitStats();
872 
873  if (!fValid)
875 
876  PDB(kPacketizer,1) Info("TPacketizerAdaptive", "return");
877 }
878 
879 ////////////////////////////////////////////////////////////////////////////////
880 /// Destructor.
881 
883 {
884  if (fSlaveStats) {
886  }
887 
892  SafeDelete(fFilesToProcess);
893 }
894 
895 ////////////////////////////////////////////////////////////////////////////////
896 /// (re)initialise the statistics
897 /// called at the begining or after a worker dies.
898 
900 {
901  // calculating how many files from TDSet are not cached on
902  // any slave
903  Int_t noRemoteFiles = 0;
904  fNEventsOnRemLoc = 0;
905  Int_t totalNumberOfFiles = 0;
906  TIter next(fFileNodes);
907  while (TFileNode *fn = (TFileNode*)next()) {
908  totalNumberOfFiles += fn->GetNumberOfFiles();
909  if (fn->GetMySlaveCnt() == 0) {
910  noRemoteFiles += fn->GetNumberOfFiles();
911  fNEventsOnRemLoc += (fn->GetNEvents() - fn->GetProcessed());
912  }
913  }
914 
915  if (totalNumberOfFiles == 0) {
916  Info("InitStats", "no valid or non-empty file found: setting invalid");
917  // No valid files: set invalid and return
918  fValid = kFALSE;
919  return;
920  }
921 
922  fFractionOfRemoteFiles = (1.0 * noRemoteFiles) / totalNumberOfFiles;
923  Info("InitStats",
924  "fraction of remote files %f", fFractionOfRemoteFiles);
925 
926  if (!fValid)
928 
929  PDB(kPacketizer,1) Info("InitStats", "return");
930 }
931 
932 ////////////////////////////////////////////////////////////////////////////////
933 /// Get next unallocated file from 'node' or other nodes:
934 /// First try 'node'. If there is no more files, keep trying to
935 /// find an unallocated file on other nodes.
936 
937 TPacketizerAdaptive::TFileStat *TPacketizerAdaptive::GetNextUnAlloc(TFileNode *node, const char *nodeHostName)
938 {
939  TFileStat *file = 0;
940 
941  if (node != 0) {
942  PDB(kPacketizer, 2)
943  Info("GetNextUnAlloc", "looking for file on node %s", node->GetName());
944  file = node->GetNextUnAlloc();
945  if (file == 0) RemoveUnAllocNode(node);
946  } else {
947  if (nodeHostName && strlen(nodeHostName) > 0) {
948 
949  TFileNode *fn;
950  // Make sure that they are in the corrected order
951  fUnAllocated->Sort();
952  PDB(kPacketizer,2) fUnAllocated->Print();
953 
954  // Loop over unallocated fileNode list
955  for (int i = 0; i < fUnAllocated->GetSize(); i++) {
956 
957  if ((fn = (TFileNode *) fUnAllocated->At(i))) {
958  TUrl uu(fn->GetName());
959  PDB(kPacketizer, 2)
960  Info("GetNextUnAlloc", "comparing %s with %s...", nodeHostName, uu.GetHost());
961 
962  // Check, whether node's hostname is matching with current fileNode (fn)
963  if (!strcmp(nodeHostName, uu.GetHost())) {
964  node = fn;
965 
966  // Fetch next unallocated file from this node
967  if ((file = node->GetNextUnAlloc()) == 0) {
968  RemoveUnAllocNode(node);
969  node = 0;
970  } else {
971  PDB(kPacketizer, 2)
972  Info("GetNextUnAlloc", "found! (host: %s)", uu.GetHost());
973  break;
974  }
975  }
976  } else {
977  Warning("GetNextUnAlloc", "unallocate entry %d is empty!", i);
978  }
979  }
980 
981  if (node != 0 && fMaxSlaveCnt > 0 && node->GetExtSlaveCnt() >= fMaxSlaveCnt) {
982  // Unlike in TPacketizer we look at the number of ext slaves only.
983  PDB(kPacketizer,1)
984  Info("GetNextUnAlloc", "reached Workers-per-Node Limit (%ld)", fMaxSlaveCnt);
985  node = 0;
986  }
987  }
988 
989  if (node == 0) {
990  while (file == 0 && ((node = NextNode()) != 0)) {
991  PDB(kPacketizer, 2)
992  Info("GetNextUnAlloc", "looking for file on node %s", node->GetName());
993  if ((file = node->GetNextUnAlloc()) == 0) RemoveUnAllocNode(node);
994  }
995  }
996  }
997 
998  if (file != 0) {
999  // if needed make node active
1000  if (fActive->FindObject(node) == 0) {
1001  fActive->Add(node);
1002  }
1003  }
1004 
1005  PDB(kPacketizer, 2) {
1006  if (!file) {
1007  Info("GetNextUnAlloc", "no file found!");
1008  } else {
1009  file->Print();
1010  }
1011  }
1012 
1013  return file;
1014 }
1015 
1016 ////////////////////////////////////////////////////////////////////////////////
1017 /// Get next node which has unallocated files.
1018 /// the order is determined by TFileNode::Compare
1019 
1020 TPacketizerAdaptive::TFileNode *TPacketizerAdaptive::NextNode()
1021 {
1022  fUnAllocated->Sort();
1023  PDB(kPacketizer,2) {
1024  fUnAllocated->Print();
1025  }
1026 
1027  TFileNode *fn = (TFileNode*) fUnAllocated->First();
1028  if (fn != 0 && fMaxSlaveCnt > 0 && fn->GetExtSlaveCnt() >= fMaxSlaveCnt) {
1029  // unlike in TPacketizer we look at the number of ext slaves only.
1030  PDB(kPacketizer,1)
1031  Info("NextNode", "reached Workers-per-Node Limit (%ld)", fMaxSlaveCnt);
1032  fn = 0;
1033  }
1034 
1035  return fn;
1036 }
1037 
1038 ////////////////////////////////////////////////////////////////////////////////
1039 /// Remove 'node' from the list of file nodes that still have unallocated
/// files (fUnAllocated). GetNextUnAlloc() calls this once a node returns no
/// more unallocated files.
1040 
1042 {
1043  fUnAllocated->Remove(node);
1044 }
1045 
1046 ////////////////////////////////////////////////////////////////////////////////
1047 /// Get next active file.
1048 
1049 TPacketizerAdaptive::TFileStat *TPacketizerAdaptive::GetNextActive()
1050 {
1051  TFileNode *node;
1052  TFileStat *file = 0;
1053 
1054  while (file == 0 && ((node = NextActiveNode()) != 0)) {
1055  file = node->GetNextActive();
1056  if (file == 0) RemoveActiveNode(node);
1057  }
1058 
1059  return file;
1060 }
1061 
1062 
1063 ////////////////////////////////////////////////////////////////////////////////
1064 /// Get next active node.
1065 
1066 TPacketizerAdaptive::TFileNode *TPacketizerAdaptive::NextActiveNode()
1067 {
1068  fActive->Sort();
1069  PDB(kPacketizer,2) {
1070  Info("NextActiveNode", "enter");
1071  fActive->Print();
1072  }
1073 
1074  TFileNode *fn = (TFileNode*) fActive->First();
1075  // look at only ext slaves
1076  if (fn != 0 && fMaxSlaveCnt > 0 && fn->GetExtSlaveCnt() >= fMaxSlaveCnt) {
1077  PDB(kPacketizer,1)
1078  Info("NextActiveNode","reached Workers-per-Node limit (%ld)", fMaxSlaveCnt);
1079  fn = 0;
1080  }
1081 
1082  return fn;
1083 }
1084 
1085 ////////////////////////////////////////////////////////////////////////////////
1086 /// Remove 'file' from the active files of the node it belongs to.
/// If that node is left without active files, the node itself is also removed
/// from the list of active nodes via RemoveActiveNode().
/// 'file' is dereferenced unconditionally, so it must not be null.
1087 
1089 {
1090  TFileNode *node = file->GetNode();
1091 
1092  node->RemoveActive(file);
1093  if (node->GetNumberOfActiveFiles() == 0) RemoveActiveNode(node);
1094 }
1095 
1096 ////////////////////////////////////////////////////////////////////////////////
1097 /// Remove 'node' from the list of active file nodes (fActive).
/// This function only unlinks the node; it does not delete it.
1098 
1100 {
1101  fActive->Remove(node);
1102 }
1103 
1104 ////////////////////////////////////////////////////////////////////////////////
1105 /// Reset the internal data structure for packet distribution.
1106 
1108 {
1109  fUnAllocated->Clear();
1111 
1112  fActive->Clear();
1113 
1114  TIter files(fFileNodes);
1115  TFileNode *fn;
1116  while ((fn = (TFileNode*) files.Next()) != 0) {
1117  fn->Reset();
1118  }
1119 
1120  TIter slaves(fSlaveStats);
1121  TObject *key;
1122  while ((key = slaves.Next()) != 0) {
1123  TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue(key);
1124  if (!slstat) {
1125  Warning("Reset", "TSlaveStat associated to key '%s' is NULL", key->GetName());
1126  continue;
1127  }
1128  // Find out which file nodes are on the worker machine and assign the
1129  // one with less workers assigned
1130  TFileNode *fnmin = 0;
1131  Int_t fncnt = fSlaveStats->GetSize();
1132  files.Reset();
1133  while ((fn = (TFileNode*) files.Next()) != 0) {
1134  if (!strcmp(slstat->GetName(), TUrl(fn->GetName()).GetHost())) {
1135  if (fn->GetMySlaveCnt() < fncnt) {
1136  fnmin = fn;
1137  fncnt = fn->GetMySlaveCnt();
1138  }
1139  }
1140  }
1141  if (fnmin != 0 ) {
1142  slstat->SetFileNode(fnmin);
1143  fnmin->IncMySlaveCnt();
1144  PDB(kPacketizer, 2)
1145  Info("Reset","assigning node '%s' to '%s' (cnt: %d)",
1146  fnmin->GetName(), slstat->GetName(), fnmin->GetMySlaveCnt());
1147  }
1148  slstat->fCurFile = 0;
1149  }
1150 }
1151 
1152 ////////////////////////////////////////////////////////////////////////////////
1153 /// Check existence of file/dir/tree an get number of entries.
1154 /// Assumes the files have been setup.
1155 
1157  Long64_t maxent, Bool_t byfile)
1158 {
1159  TMap slaves_by_sock;
1160  TMonitor mon;
1161  TList workers;
1162 
1163 
1164  // Setup the communication infrastructure
1165 
1166  workers.AddAll(slaves);
1167  TIter si(slaves);
1168  TSlave *slm;
1169  while ((slm = (TSlave*)si.Next()) != 0) {
1170  PDB(kPacketizer,3)
1171  Info("ValidateFiles","socket added to monitor: %p (%s)",
1172  slm->GetSocket(), slm->GetName());
1173  mon.Add(slm->GetSocket());
1174  slaves_by_sock.Add(slm->GetSocket(), slm);
1175  }
1176 
1177  mon.DeActivateAll();
1178 
1179  ((TProof*)gProof)->DeActivateAsyncInput();
1180 
1181  // Some monitoring systems (TXSocketHandler) need to know this
1182  ((TProof*)gProof)->fCurrentMonitor = &mon;
1183 
1184  // Identify the type
1185  if (!strcmp(dset->GetType(), "TTree")) SetBit(TVirtualPacketizer::kIsTree);
1186 
1187  // Preparing for client notification
1188  TString msg("Validating files");
1189  UInt_t n = 0;
1190  UInt_t tot = dset->GetListOfElements()->GetSize();
1191  Bool_t st = kTRUE;
1192 
1193  Long64_t totent = 0, nopenf = 0;
1194  while (kTRUE) {
1195 
1196  // send work
1197  while (TSlave *s = (TSlave *)workers.First()) {
1198 
1199  workers.Remove(s);
1200 
1201  // find a file
1202 
1203  TSlaveStat *slstat = (TSlaveStat*)fSlaveStats->GetValue(s);
1204  if (!slstat) {
1205  Error("ValidateFiles", "TSlaveStat associated to slave '%s' is NULL", s->GetName());
1206  continue;
1207  }
1208 
1209  TFileNode *node = 0;
1210  TFileStat *file = 0;
1211 
1212  // try its own node first
1213  if ((node = slstat->GetFileNode()) != 0) {
1214  PDB(kPacketizer,3) node->Print();
1215  file = GetNextUnAlloc(node);
1216  if (file == 0)
1217  slstat->SetFileNode(0);
1218  }
1219 
1220  // look for a file on any other node if necessary
1221  if (file == 0)
1222  file = GetNextUnAlloc();
1223 
1224  if (file != 0) {
1225  // files are done right away
1226  RemoveActive(file);
1227 
1228  slstat->fCurFile = file;
1229  TDSetElement *elem = file->GetElement();
1230  Long64_t entries = elem->GetEntries(kTRUE, kFALSE);
1231  if (entries < 0 || strlen(elem->GetTitle()) <= 0) {
1232  // This is decremented when we get the reply
1233  file->GetNode()->IncExtSlaveCnt(slstat->GetName());
1235  m << dset->IsTree()
1236  << TString(elem->GetFileName())
1237  << TString(elem->GetDirectory())
1238  << TString(elem->GetObjName());
1239 
1240  s->GetSocket()->Send( m );
1241  mon.Activate(s->GetSocket());
1242  PDB(kPacketizer,2)
1243  Info("ValidateFiles",
1244  "sent to worker-%s (%s) via %p GETENTRIES on %s %s %s %s",
1245  s->GetOrdinal(), s->GetName(), s->GetSocket(),
1246  dset->IsTree() ? "tree" : "objects", elem->GetFileName(),
1247  elem->GetDirectory(), elem->GetObjName());
1248  } else {
1249  // Fill the info
1250  elem->SetTDSetOffset(entries);
1251  if (entries > 0) {
1252  // Most likely valid
1253  elem->SetValid();
1254  if (!elem->GetEntryList()) {
1255  if (elem->GetFirst() > entries) {
1256  Error("ValidateFiles",
1257  "first (%lld) higher then number of entries (%lld) in %s",
1258  elem->GetFirst(), entries, elem->GetFileName());
1259  // disable element
1260  slstat->fCurFile->SetDone();
1261  elem->Invalidate();
1262  dset->SetBit(TDSet::kSomeInvalid);
1263  }
1264  if (elem->GetNum() == -1) {
1265  elem->SetNum(entries - elem->GetFirst());
1266  } else if (elem->GetFirst() + elem->GetNum() > entries) {
1267  Warning("ValidateFiles", "num (%lld) + first (%lld) larger then number of"
1268  " keys/entries (%lld) in %s", elem->GetNum(), elem->GetFirst(),
1269  entries, elem->GetFileName());
1270  elem->SetNum(entries - elem->GetFirst());
1271  }
1272  PDB(kPacketizer,2)
1273  Info("ValidateFiles",
1274  "found elem '%s' with %lld entries", elem->GetFileName(), entries);
1275  }
1276  }
1277  // Count
1278  totent += entries;
1279  nopenf++;
1280  // Notify the client
1281  n++;
1282  gProof->SendDataSetStatus(msg, n, tot, st);
1283 
1284  // This worker is ready for the next validation
1285  workers.Add(s);
1286  }
1287  }
1288  }
1289 
1290  // Check if there is anything to wait for
1291  if (mon.GetActive() == 0) {
1292  if (byfile && maxent > 0) {
1293  // How many files do we still need ?
1294  Long64_t nrestf = (maxent - totent) * nopenf / totent ;
1295  if (nrestf <= 0 && maxent > totent) nrestf = 1;
1296  if (nrestf > 0) {
1297  PDB(kPacketizer,3)
1298  Info("ValidateFiles", "{%lld, %lld, %lld}: needs to validate %lld more files",
1299  maxent, totent, nopenf, nrestf);
1300  si.Reset();
1301  while ((slm = (TSlave *) si.Next()) && nrestf--) {
1302  workers.Add(slm);
1303  }
1304  continue;
1305  } else {
1306  PDB(kPacketizer,3)
1307  Info("ValidateFiles", "no need to validate more files");
1308  break;
1309  }
1310  } else {
1311  break;
1312  }
1313  }
1314 
1315  PDB(kPacketizer,3) {
1316  Info("ValidateFiles", "waiting for %d slaves:", mon.GetActive());
1317  TList *act = mon.GetListOfActives();
1318  TIter next(act);
1319  while (TSocket *s = (TSocket*) next()) {
1320  TSlave *sl = (TSlave *) slaves_by_sock.GetValue(s);
1321  if (sl)
1322  Info("ValidateFiles", " worker-%s (%s)",
1323  sl->GetOrdinal(), sl->GetName());
1324  }
1325  delete act;
1326  }
1327 
1328  TSocket *sock = mon.Select();
1329  // If we have been interrupted break
1330  if (!sock) {
1331  Error("ValidateFiles", "selection has been interrupted - STOP");
1332  mon.DeActivateAll();
1333  fValid = kFALSE;
1334  break;
1335  }
1336  mon.DeActivate(sock);
1337 
1338  PDB(kPacketizer,3) Info("ValidateFiles", "select returned: %p", sock);
1339 
1340  TSlave *slave = (TSlave *) slaves_by_sock.GetValue( sock );
1341  if (!sock->IsValid()) {
1342  // A socket got invalid during validation
1343  Error("ValidateFiles", "worker-%s (%s) got invalid - STOP",
1344  slave->GetOrdinal(), slave->GetName());
1345  ((TProof*)gProof)->MarkBad(slave, "socket got invalid during validation");
1346  fValid = kFALSE;
1347  break;
1348  }
1349 
1350  TMessage *reply;
1351 
1352  if (sock->Recv(reply) <= 0) {
1353  // Notify
1354  Error("ValidateFiles", "Recv failed! for worker-%s (%s)",
1355  slave->GetOrdinal(), slave->GetName());
1356  // Help! lost a slave? ('slave' is deleted inside here ...)
1357  ((TProof*)gProof)->MarkBad(slave, "receive failed during validation");
1358  fValid = kFALSE;
1359  continue;
1360  }
1361 
1362  if (reply->What() != kPROOF_GETENTRIES) {
1363  // Not what we want: handover processing to the central machinery
1364  Int_t what = reply->What();
1365  ((TProof*)gProof)->HandleInputMessage(slave, reply);
1366  if (what == kPROOF_FATAL) {
1367  Error("ValidateFiles", "kPROOF_FATAL from worker-%s (%s)",
1368  slave->GetOrdinal(), slave->GetName());
1369  fValid = kFALSE;
1370  } else {
1371  // Reactivate the socket
1372  mon.Activate(sock);
1373  }
1374  // Get next message
1375  continue;
1376  }
1377 
1378  TSlaveStat *slavestat = (TSlaveStat*) fSlaveStats->GetValue( slave );
1379  TDSetElement *e = slavestat->fCurFile->GetElement();
1380  slavestat->fCurFile->GetNode()->DecExtSlaveCnt(slavestat->GetName());
1381  Long64_t entries;
1382 
1383  (*reply) >> entries;
1384 
1385  // Extract object name, if there
1386  if ((reply->BufferSize() > reply->Length())) {
1387  TString objname;
1388  (*reply) >> objname;
1389  e->SetTitle(objname);
1390  }
1391 
1392  e->SetTDSetOffset(entries);
1393  if (entries > 0) {
1394 
1395  // This dataset element is most likely valid
1396  e->SetValid();
1397 
1398  if (!e->GetEntryList()) {
1399  if (e->GetFirst() > entries) {
1400  Error("ValidateFiles",
1401  "first (%lld) higher then number of entries (%lld) in %s",
1402  e->GetFirst(), entries, e->GetFileName());
1403 
1404  // Invalidate the element
1405  slavestat->fCurFile->SetDone();
1406  e->Invalidate();
1407  dset->SetBit(TDSet::kSomeInvalid);
1408  }
1409 
1410  if (e->GetNum() == -1) {
1411  e->SetNum(entries - e->GetFirst());
1412  } else if (e->GetFirst() + e->GetNum() > entries) {
1413  Error("ValidateFiles",
1414  "num (%lld) + first (%lld) larger then number of keys/entries (%lld) in %s",
1415  e->GetNum(), e->GetFirst(), entries, e->GetFileName());
1416  e->SetNum(entries - e->GetFirst());
1417  }
1418  }
1419 
1420  // Count
1421  totent += entries;
1422  nopenf++;
1423 
1424  // Notify the client
1425  n++;
1426  gProof->SendDataSetStatus(msg, n, tot, st);
1427 
1428  } else {
1429 
1430  Error("ValidateFiles", "cannot get entries for file: %s - skipping", e->GetFileName() );
1431  //
1432  // Need to fix this with a user option to allow incomplete file sets (rdm)
1433  //
1434  //fValid = kFALSE; // all element must be readable!
1435  if (gProofServ) {
1437  m << TString(Form("Cannot get entries for file: %s - skipping",
1438  e->GetFileName()));
1439  gProofServ->GetSocket()->Send(m);
1440  }
1441 
1442  // invalidate element
1443  e->Invalidate();
1444  dset->SetBit(TDSet::kSomeInvalid);
1445  }
1446  PDB(kPacketizer,3) Info("ValidateFiles", " %lld events validated", totent);
1447 
1448  // Ready for the next job, unless we have enough files
1449  if (maxent < 0 || ((totent < maxent) && !byfile))
1450  workers.Add(slave);
1451  }
1452 
1453  // report std. output from slaves??
1454 
1455  ((TProof*)gProof)->ActivateAsyncInput();
1456 
1457  // This needs to be reset
1458  ((TProof*)gProof)->fCurrentMonitor = 0;
1459 
1460  // No reason to continue if invalid
1461  if (!fValid)
1462  return;
1463 
1464  // compute the offset for each file element
1465  Long64_t offset = 0;
1466  Long64_t newOffset = 0;
1467  TIter next(dset->GetListOfElements());
1468  TDSetElement *el;
1469  while ( (el = dynamic_cast<TDSetElement*> (next())) ) {
1470  if (el->GetValid()) {
1471  newOffset = offset + el->GetTDSetOffset();
1472  el->SetTDSetOffset(offset);
1473  offset = newOffset;
1474  }
1475  }
1476 }
1477 
1478 ////////////////////////////////////////////////////////////////////////////////
1479 /// Calculate the size (number of entries) of the next packet for a worker.
/// The result depends on fStrategy:
///  - fStrategy == 0: constant size fTotalEntries / (fPacketAsAFraction * #workers),
///    or 1 if there are no workers.
///  - otherwise (default): time-based size rate * packetTime.
///      - 'rate' is the worker's current processing rate, or its average rate
///        if the current one is zero.
///      - packetTime is derived from the estimated time left in the query.
///      - packetTime is bounded by fMaxPacketTime / fMinPacketTime when those
///        are positive.
///      - With cache/packet size synchronization active, the packet is enlarged
///        so that its estimated size in bytes is not below 'cachesz'.
///      - If the worker has no rate yet (first packet), 5 * learnent entries are
///        used, or 1000 if learnent <= 0.
/// 'slStatPtr' is the TSlaveStat of the worker (it is cast below).
/// 'cachesz' is compared with byte estimates (bytes read per entry), so it is
/// presumably a size in bytes.
/// The returned value is at least 1.
1480 
1482 {
1483  Long64_t num;
1484  if (fStrategy == 0) {
1485  // TPacketizer's heuristic for starting packet size
1486  // Constant packet size;
1487  Int_t nslaves = fSlaveStats->GetSize();
1488  if (nslaves > 0) {
1489  num = fTotalEntries / (fPacketAsAFraction * nslaves);
1490  } else {
1491  num = 1;
1492  }
1493  } else {
1494  // The dynamic heuristic for setting the packet size (default)
1495  // Calculates the packet size based on performance of this slave
1496  // and estimated time left until the end of the query.
1497  TSlaveStat* slstat = (TSlaveStat*)slStatPtr;
1498  Float_t rate = slstat->GetCurRate();
1499  if (!rate)
1500  rate = slstat->GetAvgRate();
1501  if (rate) {
1502 
1503  // Global average rate
// NOTE(review): assumes GetCumProcTime() > 0 and a non-zero global rate here;
// otherwise avgProcRate / packetTime become inf or NaN. Confirm with the callers.
1504  Float_t avgProcRate = (GetEntriesProcessed()/(GetCumProcTime() / fSlaveStats->GetSize()));
1505  Float_t packetTime = ((fTotalEntries - GetEntriesProcessed())/avgProcRate)/fPacketAsAFraction;
1506 
1507  // Bytes-to-Event conversion
// NOTE(review): if GetBytesRead() and GetEntriesProcessed() are both integer
// types, this is an integer division. Confirm that truncation is intended.
1508  Float_t bevt = (GetEntriesProcessed() > 0) ? GetBytesRead() / GetEntriesProcessed() : -1.;
1509 
1510  // Make sure it is not smaller than the cache, if the info is available and the size
1511  // synchronization is required. But apply the cache-packet size synchronization only if
1512  // enough files are left to process and they are all of similar sizes. Otherwise we risk
1513  // not exploiting all potentially active workers optimally.
1514  Bool_t cpsync = fCachePacketSync;
1515  if (fMaxEntriesRatio > 0. && cpsync) {
1516  if (fFilesToProcess && fFilesToProcess->GetSize() <= fSlaveStats->GetSize()) {
1517  Long64_t remEntries = fTotalEntries - GetEntriesProcessed();
1518  Long64_t maxEntries = -1;
1519  if (fFilesToProcess->Last()) {
1520  TDSetElement *elem = (TDSetElement *) ((TPacketizerAdaptive::TFileStat *) fFilesToProcess->Last())->GetElement();
1521  if (elem) maxEntries = elem->GetNum();
1522  }
1523  if (maxEntries > remEntries / fSlaveStats->GetSize() * fMaxEntriesRatio) {
1524  PDB(kPacketizer,3) {
1525  Info("CalculatePacketSize", "%s: switching off synchronization of packet and cache sizes:", slstat->GetOrdinal());
1526  Info("CalculatePacketSize", "%s: few files (%d) remaining of very different sizes (max/avg = %.2f > %.2f)",
1527  slstat->GetOrdinal(), fFilesToProcess->GetSize(),
1528  (Double_t)maxEntries / remEntries * fSlaveStats->GetSize(), fMaxEntriesRatio);
1529  }
1530  cpsync = kFALSE;
1531  }
1532  }
1533  }
1534  if (bevt > 0. && cachesz > 0 && cpsync) {
1535  if ((Long64_t)(rate * packetTime * bevt) < cachesz)
1536  packetTime = cachesz / bevt / rate;
1537  }
1538 
1539  // Apply min-max again, if required (the minimum is applied last and wins)
1540  if (fMaxPacketTime > 0. && packetTime > fMaxPacketTime) packetTime = fMaxPacketTime;
1541  if (fMinPacketTime > 0. && packetTime < fMinPacketTime) packetTime = fMinPacketTime;
1542 
1543  // Translate the packet length in number of entries
1544  num = (Long64_t)(rate * packetTime);
1545 
1546  // Notify
1547  PDB(kPacketizer,2)
1548  Info("CalculatePacketSize","%s: avgr: %f, rate: %f, left: %lld, pacT: %f, sz: %f (csz: %f), num: %lld",
1549  slstat->GetOrdinal(), avgProcRate, rate, fTotalEntries - GetEntriesProcessed(),
1550  packetTime, ((bevt > 0) ? num*bevt/1048576. : -1.), cachesz/1048576., num);
1551 
1552  } else {
1553  // First packet for this worker in this query
1554  // Five times the learning phase (1000 entries if the learning phase is unknown)
1555  num = (learnent > 0) ? 5 * learnent : 1000;
1556 
1557  // Notify
1558  PDB(kPacketizer,2)
1559  Info("CalculatePacketSize","%s: num: %lld", slstat->GetOrdinal(), num);
1560  }
1561  }
1562  if (num < 1) num = 1;
1563  return num;
1564 }
1565 
1566 ////////////////////////////////////////////////////////////////////////////////
1567 /// To be used by GetNextPacket but also in reaction to kPROOF_STOPPROCESS
1568 /// message (when the worker was asked to stop processing during a packet).
1569 /// Returns the #entries intended in the last packet - #processed entries,
/// or -1 if the worker is unknown or has no packet in progress (fCurElem == 0).
/// The processed part is added to the worker's and to the global progress
/// status. An unprocessed remainder is resubmitted as a new packet via
/// ReassignPacket(), to which 'listOfMissingFiles' is forwarded.
1570 
1572  TProofProgressStatus *status,
1573  Double_t latency,
1574  TList **listOfMissingFiles)
1575 {
1576  // find slave
1577  TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue( sl );
1578  if (!slstat) {
1579  Error("AddProcessed", "%s: TSlaveStat instance for worker %s not found!",
1580  (sl ? sl->GetOrdinal() : "x.x"),
1581  (sl ? sl->GetName() : "**undef**"));
1582  return -1;
1583  }
1584 
1585  // update stats & free old element
1586 
1587  if ( slstat->fCurElem != 0 ) {
1588  Long64_t expectedNumEv = slstat->fCurElem->GetNum();
1589  // Calculate the number of events processed in the last packet
1590  Long64_t numev;
1591  if (status && status->GetEntries() > 0)
1592  numev = status->GetEntries() - slstat->GetEntriesProcessed();
1593  else
1594  numev = 0;
1595 
1596  // Calculate the progress made in the last packet
1597  TProofProgressStatus *progress = 0;
1598  if (numev > 0) {
1599  // This also moves the pointer in the corresponding TFileInfo
1600  progress = slstat->AddProcessed(status);
1601  if (progress) {
1602  (*fProgressStatus) += *progress;
1603  // update processing rate
1604  slstat->UpdateRates(status);
1605  }
1606  } else {
1607  progress = new TProofProgressStatus();
1608  }
1609  if (progress) {
1610  PDB(kPacketizer,2)
1611  Info("AddProcessed", "%s: %s: %lld %7.3lf %7.3lf %7.3lf %lld",
1612  sl->GetOrdinal(), sl->GetName(), progress->GetEntries(), latency,
1613  progress->GetProcTime(), progress->GetCPUTime(), progress->GetBytesRead());
1614 
1615  if (gPerfStats)
1616  gPerfStats->PacketEvent(sl->GetOrdinal(), sl->GetName(),
1617  slstat->fCurElem->GetFileName(),
1618  progress->GetEntries(),
1619  latency,
1620  progress->GetProcTime(),
1621  progress->GetCPUTime(),
1622  progress->GetBytesRead());
1623  delete progress;
1624  }
1625  if (numev != expectedNumEv) {
1626  // The last packet was not fully processed
1627  // and will be split in two:
1628  // - The completed part was marked as done.
1629  // - Create a new packet with the part to be resubmitted.
// NOTE(review): when numev > expectedNumEv, the 'else' branch below only logs
// an error and 'newPacket' is never deleted, which looks like a memory leak.
// Creating the copy only when numev < expectedNumEv would avoid it.
1630  TDSetElement *newPacket = new TDSetElement(*(slstat->fCurElem));
1631  if (newPacket && numev < expectedNumEv) {
1632  Long64_t first = newPacket->GetFirst();
1633  newPacket->SetFirst(first + numev);
1634  if (ReassignPacket(newPacket, listOfMissingFiles) == -1)
1635  SafeDelete(newPacket);
1636  } else
1637  Error("AddProcessed", "%s: processed too much? (%lld, %lld)",
1638  sl->GetOrdinal(), numev, expectedNumEv);
1639 
1640  // TODO: a signal handler which will send info from the worker
1641  // after a packet fails.
1642  /* Add it to the failed packets list.
1643  if (!fFailedPackets) {
1644  fFailedPackets = new TList();
1645  }
1646  fFailedPackets->Add(slstat->fCurElem);
1647  */
1648  }
1649 
1650  slstat->fCurElem = 0;
1651  return (expectedNumEv - numev);
1652  } else {
1653  // the kPROOF_STOPPROCESS message is sent after the worker receives zero
1654  // as the reply to kPROOF_GETNEXTPACKET
1655  return -1;
1656  }
1657 }
1658 
1659 ////////////////////////////////////////////////////////////////////////////////
1660 /// Get next packet;
1661 /// A meaningfull difference to TPacketizer is the fact that this
1662 /// packetizer, for each worker, tries to predict whether the worker
1663 /// will finish processing it's local files before the end of the query.
1664 /// If yes, it allocates, to those workers, files from non-slave filenodes
1665 /// or from slaves that are overloaded. The check is done every time a new
1666 /// file needs to be assigned.
1667 
1669 {
1670  if ( !fValid ) {
1671  return 0;
1672  }
1673 
1674  // find slave
1675 
1676  TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue( sl );
1677  if (!slstat) {
1678  Error("GetNextPacket", "TSlaveStat instance for worker %s not found!",
1679  (sl ? sl->GetName() : "**undef**"));
1680  return 0;
1681  }
1682 
1683  // Attach to current file
1684  TFileStat *file = slstat->fCurFile;
1685 
1686  // Update stats & free old element
1687 
1688  Bool_t firstPacket = kFALSE;
1689  Long64_t cachesz = -1;
1690  Int_t learnent = -1;
1691  if ( slstat->fCurElem != 0 ) {
1692 
1693  Long64_t restEntries = 0;
1694  Double_t latency, proctime, proccpu;
1695  TProofProgressStatus *status = 0;
1696  Bool_t fileNotOpen = kFALSE, fileCorrupted = kFALSE;
1697 
1698  if (sl->GetProtocol() > 18) {
1699 
1700  (*r) >> latency;
1701  (*r) >> status;
1702 
1703  if (sl->GetProtocol() > 25) {
1704  (*r) >> cachesz >> learnent;
1705  if (r->BufferSize() > r->Length()) (*r) >> restEntries;
1706  }
1707  fileNotOpen = status->TestBit(TProofProgressStatus::kFileNotOpen) ? kTRUE : kFALSE;
1708  fileCorrupted = status->TestBit(TProofProgressStatus::kFileCorrupted) ? kTRUE : kFALSE;
1709 
1710  } else {
1711 
1712  Long64_t bytesRead = -1;
1713 
1714  (*r) >> latency >> proctime >> proccpu;
1715  // only read new info if available
1716  if (r->BufferSize() > r->Length()) (*r) >> bytesRead;
1717  if (r->BufferSize() > r->Length()) (*r) >> restEntries;
1718  Long64_t totev = 0;
1719  if (r->BufferSize() > r->Length()) (*r) >> totev;
1720 
1721  status = new TProofProgressStatus(totev, bytesRead, -1, proctime, proccpu);
1722  fileNotOpen = (restEntries < 0) ? kTRUE : kFALSE;
1723  }
1724 
1725  if (!fileNotOpen && !fileCorrupted) {
1726  if (AddProcessed(sl, status, latency) != 0)
1727  Error("GetNextPacket", "%s: the worker processed a different # of entries", sl->GetOrdinal());
1730  Error("GetNextPacket", "%s: processed too many entries! (%lld, %lld)",
1732  // Send last timer message and stop the timer
1733  HandleTimer(0);
1735  }
1736  } else {
1737  if (file) {
1738  if (file->GetElement()) {
1739  if (fileCorrupted) {
1740  Info("GetNextPacket", "%s: file '%s' turned corrupted: invalidating file (%lld)",
1741  sl->GetOrdinal(), file->GetElement()->GetName(), restEntries);
1742  Int_t nunproc = AddProcessed(sl, status, latency);
1743  PDB(kPacketizer,1)
1744  Info("GetNextPacket", "%s: %d entries un-processed", sl->GetOrdinal(), nunproc);
1745  // Remaining to be processed
1746  Long64_t num = 0;
1747  if (file->GetElement()->TestBit(TDSetElement::kCorrupted)) {
1748  // Add the remainign entries in the packet to the ones already registered
1749  num = file->GetElement()->GetEntries() + restEntries;
1750  } else {
1751  // First call: add the remaining entries in the packet and those of the file
1752  // not yet assigned
1753  Long64_t rest = file->GetElement()->GetEntries() - file->GetNextEntry();
1754  num = restEntries + rest;
1755  }
1756  file->GetElement()->SetEntries(num);
1757  PDB(kPacketizer,1)
1758  Info("GetNextPacket", "%s: removed file: %s, entries left: %lld", sl->GetOrdinal(),
1759  file->GetElement()->GetName(), file->GetElement()->GetEntries());
1760  // Flag as corrupted
1761  file->GetElement()->SetBit(TDSetElement::kCorrupted);
1762  } else {
1763  Info("GetNextPacket", "%s: file '%s' could not be open: invalidating related element",
1764  sl->GetOrdinal(), file->GetElement()->GetName());
1765  }
1766  // Invalidate the element
1767  file->GetElement()->Invalidate();
1768  // Add it to the failed packets list
1769  if (!fFailedPackets) fFailedPackets = new TList();
1770  if (!fFailedPackets->FindObject(file->GetElement()))
1771  fFailedPackets->Add(file->GetElement());
1772  }
1773  // Deactivate this TFileStat
1774  file->SetDone();
1775  RemoveActive(file);
1776  } else {
1777  Info("GetNextPacket", "%s: error raised by worker, but TFileStat object invalid:"
1778  " protocol error?", sl->GetOrdinal());
1779  }
1780  }
1781  } else {
1782  firstPacket = kTRUE;
1783  }
1784 
1785  if ( fStop ) {
1786  HandleTimer(0);
1787  return 0;
1788  }
1789 
1790  TString nodeName;
1791  if (file != 0) nodeName = file->GetNode()->GetName();
1792  TString nodeHostName(slstat->GetName());
1793 
1794  PDB(kPacketizer,3)
1795  Info("GetNextPacket", "%s: entries processed: %lld - looking for a packet from node '%s'",
1796  sl->GetOrdinal(), fProgressStatus->GetEntries(), nodeName.Data());
1797 
1798  // If current file is just finished
1799  if ( file != 0 && file->IsDone() ) {
1800  file->GetNode()->DecExtSlaveCnt(slstat->GetName());
1801  file->GetNode()->DecRunSlaveCnt();
1802  if (gPerfStats)
1803  gPerfStats->FileEvent(sl->GetOrdinal(), sl->GetName(), file->GetNode()->GetName(),
1804  file->GetElement()->GetFileName(), kFALSE);
1805  file = 0;
1806  }
1807  // Reset the current file field
1808  slstat->fCurFile = file;
1809 
1810  Long64_t avgEventsLeftPerSlave =
1813  return 0;
1814  // Get a file if needed
1815  if ( file == 0) {
1816  // Needs a new file
1817  Bool_t openLocal;
1818  // Aiming for localPreference == 1 when #local == #remote events left
1819  Float_t localPreference = fBaseLocalPreference - (fNEventsOnRemLoc /
1820  (0.4 *(fTotalEntries - fProgressStatus->GetEntries())));
1821  if ( slstat->GetFileNode() != 0 ) {
1822  // Local file node exists and has more events to process.
1823  fUnAllocated->Sort();
1824  TFileNode* firstNonLocalNode = (TFileNode*)fUnAllocated->First();
1825  Bool_t nonLocalNodePossible;
1826  if (fForceLocal)
1827  nonLocalNodePossible = 0;
1828  else
1829  nonLocalNodePossible = firstNonLocalNode ?
1830  (fMaxSlaveCnt < 0 || (fMaxSlaveCnt > 0 && firstNonLocalNode->GetExtSlaveCnt() < fMaxSlaveCnt))
1831  : 0;
1832  openLocal = !nonLocalNodePossible;
1833  Float_t slaveRate = slstat->GetAvgRate();
1834  if ( nonLocalNodePossible && fStrategy == 1) {
1835  // OpenLocal is set to kFALSE
1836  if ( slstat->GetFileNode()->GetRunSlaveCnt() >
1837  slstat->GetFileNode()->GetMySlaveCnt() - 1 )
1838  // External slaves help slstat -> don't open nonlocal files
1839  // -1 because, at this point slstat is not running
1840  openLocal = kTRUE;
1841  else if ( slaveRate == 0 ) { // first file for this slave
1842  // GetLocalEventsLeft() counts the potential slave
1843  // as running on its fileNode.
1844  if ( slstat->GetLocalEventsLeft() * localPreference
1845  > (avgEventsLeftPerSlave))
1846  openLocal = kTRUE;
1847  else if ( (firstNonLocalNode->GetEventsLeftPerSlave())
1848  < slstat->GetLocalEventsLeft() * localPreference )
1849  openLocal = kTRUE;
1850  else if ( firstNonLocalNode->GetExtSlaveCnt() > 1 )
1851  openLocal = kTRUE;
1852  else if ( firstNonLocalNode->GetRunSlaveCnt() == 0 )
1853  openLocal = kTRUE;
1854  } else {
1855  // At this point slstat has a non zero avg rate > 0
1856  Float_t slaveTime = slstat->GetLocalEventsLeft()/slaveRate;
1857  // And thus fCumProcTime, fProcessed > 0
1858  Float_t avgTime = avgEventsLeftPerSlave
1860  if (slaveTime * localPreference > avgTime)
1861  openLocal = kTRUE;
1862  else if ((firstNonLocalNode->GetEventsLeftPerSlave())
1863  < slstat->GetLocalEventsLeft() * localPreference)
1864  openLocal = kTRUE;
1865  }
1866  }
1867  if (openLocal || fStrategy == 0) {
1868  // Try its own node
1869  file = slstat->GetFileNode()->GetNextUnAlloc();
1870  if (!file)
1871  file = slstat->GetFileNode()->GetNextActive();
1872  if ( file == 0 ) {
1873  // No more files on this worker
1874  slstat->SetFileNode(0);
1875  }
1876  }
1877  }
1878 
1879  // Try to find an unused filenode first
1880  if(file == 0 && !fForceLocal)
1881  file = GetNextUnAlloc(0, nodeHostName);
1882 
1883  // Then look at the active filenodes
1884  if(file == 0 && !fForceLocal)
1885  file = GetNextActive();
1886 
1887  if (file == 0) return 0;
1888 
1889  PDB(kPacketizer,3) if (fFilesToProcess) fFilesToProcess->Print();
1890 
1891  slstat->fCurFile = file;
1892  // if remote and unallocated file
1893  if (file->GetNode()->GetMySlaveCnt() == 0 &&
1894  file->GetElement()->GetFirst() == file->GetNextEntry()) {
1895  fNEventsOnRemLoc -= file->GetElement()->GetNum();
1896  if (fNEventsOnRemLoc < 0) {
1897  Error("GetNextPacket",
1898  "inconsistent value for fNEventsOnRemLoc (%lld): stop delivering packets!",
1900  return 0;
1901  }
1902  }
1903  file->GetNode()->IncExtSlaveCnt(slstat->GetName());
1904  file->GetNode()->IncRunSlaveCnt();
1905  if (gPerfStats)
1906  gPerfStats->FileEvent(sl->GetOrdinal(), sl->GetName(),
1907  file->GetNode()->GetName(),
1908  file->GetElement()->GetFileName(), kTRUE);
1909  }
1910 
1911  Long64_t num = CalculatePacketSize(slstat, cachesz, learnent);
1912 
1913  // Get a packet
1914 
1915  TDSetElement *base = file->GetElement();
1916  Long64_t first = file->GetNextEntry();
1917  Long64_t last = base->GetFirst() + base->GetNum();
1918 
1919  // If the remaining part is smaller than the (packetsize * 1.5)
1920  // then increase the packetsize
1921 
1922  if ( first + num * 1.5 >= last ) {
1923  num = last - first;
1924  file->SetDone(); // done
1925  // Delete file from active list (unalloc list is single pass, no delete needed)
1926  RemoveActive(file);
1927  }
1928 
1929  // Update NextEntry in the file object
1930  file->MoveNextEntry(num);
1931 
1932  slstat->fCurElem = CreateNewPacket(base, first, num);
1933  if (base->GetEntryList())
1934  slstat->fCurElem->SetEntryList(base->GetEntryList(), first, num);
1935 
1936  // Flag the first packet of a new run (dataset)
1937  if (firstPacket)
1938  slstat->fCurElem->SetBit(TDSetElement::kNewRun);
1939  else
1940  slstat->fCurElem->ResetBit(TDSetElement::kNewRun);
1941 
1942  PDB(kPacketizer,2)
1943  Info("GetNextPacket","%s: %s %lld %lld (%lld)", sl->GetOrdinal(), base->GetFileName(), first, first + num - 1, num);
1944 
1945  return slstat->fCurElem;
1946 }
1947 
1948 ////////////////////////////////////////////////////////////////////////////////
1949 /// Return the number of workers still processing
1950 
1952 {
1953  Int_t actw = 0;
1954  TIter nxw(fSlaveStats);
1955  TObject *key;
1956  while ((key = nxw())) {
1957  TSlaveStat *wrkstat = (TSlaveStat *) fSlaveStats->GetValue(key);
1958  if (wrkstat && wrkstat->fCurFile) actw++;
1959  }
1960  // Done
1961  return actw;
1962 }
1963 
1964 ////////////////////////////////////////////////////////////////////////////////
1965 /// Get Estimation of the current rate; just summing the current rates of
1966 /// the active workers
1967 
1969 {
1970  all = kTRUE;
1971  // Loop over the workers
1972  Float_t currate = 0.;
1973  if (fSlaveStats && fSlaveStats->GetSize() > 0) {
1974  TIter nxw(fSlaveStats);
1975  TObject *key;
1976  while ((key = nxw()) != 0) {
1977  TSlaveStat *slstat = (TSlaveStat *) fSlaveStats->GetValue(key);
1978  if (slstat && slstat->GetProgressStatus() && slstat->GetEntriesProcessed() > 0) {
1979  // Sum-up the current rates
1980  currate += slstat->GetProgressStatus()->GetCurrentRate();
1981  } else {
1982  all = kFALSE;
1983  }
1984  }
1985  }
1986  // Done
1987  return currate;
1988 }
1989 
1990 ////////////////////////////////////////////////////////////////////////////////
1991 /// Get estimation for the number of processed entries and bytes read at time t,
1992 /// based on the numbers already processed and the latests worker measured speeds.
1993 /// If t <= 0 the current time is used.
1994 /// Only the estimation for the entries is currently implemented.
1995 /// This is needed to smooth the instantaneous rate plot.
1996 
1998  Long64_t &bytes, Long64_t &calls)
1999 {
2000  // Default value
2001  ent = GetEntriesProcessed();
2002  bytes = GetBytesRead();
2003  calls = GetReadCalls();
2004 
2005  // Parse option
2006  if (fUseEstOpt == kEstOff)
2007  // Do not use estimation
2008  return 0;
2009  Bool_t current = (fUseEstOpt == kEstCurrent) ? kTRUE : kFALSE;
2010 
2011  TTime tnow = gSystem->Now();
2012  Double_t now = (t > 0) ? (Double_t)t : Long64_t(tnow) / (Double_t)1000.;
2013  Double_t dt = -1;
2014 
2015  // Loop over the workers
2016  Bool_t all = kTRUE;
2017  Float_t trate = 0.;
2018  if (fSlaveStats && fSlaveStats->GetSize() > 0) {
2019  ent = 0;
2020  TIter nxw(fSlaveStats);
2021  TObject *key;
2022  while ((key = nxw()) != 0) {
2023  TSlaveStat *slstat = (TSlaveStat *) fSlaveStats->GetValue(key);
2024  if (slstat) {
2025  // Those surely processed
2026  Long64_t e = slstat->GetEntriesProcessed();
2027  if (e <= 0) all = kFALSE;
2028  // Time elapsed since last update
2029  dt = now - slstat->GetProgressStatus()->GetLastUpdate();
2030  // Add estimated entries processed since last update
2031  Float_t rate = (current && slstat->GetCurRate() > 0) ? slstat->GetCurRate()
2032  : slstat->GetAvgRate();
2033  trate += rate;
2034  // Add estimated entries processed since last update
2035  e += (Long64_t) (dt * rate);
2036  // Add to the total
2037  ent += e;
2038  // Notify
2039  PDB(kPacketizer,3)
2040  Info("GetEstEntriesProcessed","%s: e:%lld rate:%f dt:%f e:%lld",
2041  slstat->fSlave->GetOrdinal(),
2042  slstat->GetEntriesProcessed(), rate, dt, e);
2043  }
2044  }
2045  }
2046  // Notify
2047  dt = now - fProgressStatus->GetLastUpdate();
2048  PDB(kPacketizer,2)
2049  Info("GetEstEntriesProcessed",
2050  "dt: %f, estimated entries: %lld (%lld), bytes read: %lld rate: %f (all: %d)",
2051  dt, ent, GetEntriesProcessed(), bytes, trate, all);
2052 
2053  // Check values
2054  ent = (ent > 0) ? ent : fProgressStatus->GetEntries();
2055  ent = (ent <= fTotalEntries) ? ent : fTotalEntries;
2056  bytes = (bytes > 0) ? bytes : fProgressStatus->GetBytesRead();
2057 
2058  // Done
2059  return ((all) ? 0 : 1);
2060 }
2061 
2062 ////////////////////////////////////////////////////////////////////////////////
2063 /// This method can be called at any time during processing
2064 /// as an effect of handling kPROOF_STOPPROCESS
2065 /// If the output list from this worker is going to be sent back to the master,
2066 /// the 'status' includes the number of entries processed by the slave.
2067 /// From this we calculate the remaining part of the packet.
2068 /// 0 indicates that the results from that worker were lost completely.
2069 /// Assume that the filenodes for which we have a TFileNode object
2070 /// are still up and running.
2071 
2073  TList **listOfMissingFiles)
2074 {
2075  TSlaveStat *slaveStat = (TSlaveStat *)(fSlaveStats->GetValue(s));
2076  if (!slaveStat) {
2077  Error("MarkBad", "Worker does not exist");
2078  return;
2079  }
2080  // Update worker counters
2081  if (slaveStat->fCurFile && slaveStat->fCurFile->GetNode()) {
2082  slaveStat->fCurFile->GetNode()->DecExtSlaveCnt(slaveStat->GetName());
2083  slaveStat->fCurFile->GetNode()->DecRunSlaveCnt();
2084  }
2085 
2086  // If status is defined, the remaining part of the last packet is
2087  // reassigned in AddProcessed called from handling kPROOF_STOPPROCESS
2088  if (!status) {
2089  // Get the subset processed by the bad worker.
2090  TList *subSet = slaveStat->GetProcessedSubSet();
2091  if (subSet) {
2092  // Take care of the current packet
2093  if (slaveStat->fCurElem) {
2094  subSet->Add(slaveStat->fCurElem);
2095  }
2096  // Merge overlapping or subsequent elements
2097  Int_t nmg = 0, ntries = 100;
2098  TDSetElement *e = 0, *enxt = 0;
2099  do {
2100  nmg = 0;
2101  e = (TDSetElement *) subSet->First();
2102  while ((enxt = (TDSetElement *) subSet->After(e))) {
2103  if (e->MergeElement(enxt) >= 0) {
2104  nmg++;
2105  subSet->Remove(enxt);
2106  delete enxt;
2107  } else {
2108  e = enxt;
2109  }
2110  }
2111  } while (nmg > 0 && --ntries > 0);
2112  // reassign the packets assigned to the bad slave and save the size;
2113  SplitPerHost(subSet, listOfMissingFiles);
2114  // the elements were reassigned so should not be deleted
2115  subSet->SetOwner(0);
2116  } else {
2117  Warning("MarkBad", "subset processed by bad worker not found!");
2118  }
2119  (*fProgressStatus) -= *(slaveStat->GetProgressStatus());
2120  }
2121  // remove slavestat from the map
2122  fSlaveStats->Remove(s);
2123  delete slaveStat;
2124  // recalculate fNEventsOnRemLoc and others
2125  InitStats();
2126 }
2127 
2128 ////////////////////////////////////////////////////////////////////////////////
2129 /// The file in the listOfMissingFiles can appear several times;
2130 /// in order to fix that, a TDSetElement::Merge method is needed.
2131 
2133  TList **listOfMissingFiles)
2134 {
2135  if (!e) {
2136  Error("ReassignPacket", "empty packet!");
2137  return -1;
2138  }
2139  // Check the old filenode
2140  TUrl url = e->GetFileName();
2141  // Check the host from which 'e' was previously read.
2142  // Map non URL filenames to dummy host
2143  TString host;
2144  if ( !url.IsValid() ||
2145  (strncmp(url.GetProtocol(),"root", 4) &&
2146  strncmp(url.GetProtocol(),"rfio", 4))) {
2147  host = "no-host";
2148  } else {
2149  host = url.GetHost();
2150  }
2151 
2152  // If accessible add it back to the old node
2153  // and do DecProcessed
2154  TFileNode *node = (TFileNode*) fFileNodes->FindObject( host );
2155  if (node && fTryReassign) {
2156  // The packet 'e' was processing data from this node.
2157  node->DecreaseProcessed(e->GetNum());
2158  // The file should be already in fFilesToProcess ...
2159  node->Add(e, kFALSE);
2160  if (!fUnAllocated->FindObject(node))
2161  fUnAllocated->Add(node);
2162  return 0;
2163  } else {
2164  // Add to the list of missing files
2165  TFileInfo *fi = e->GetFileInfo();
2166  if (listOfMissingFiles && *listOfMissingFiles)
2167  (*listOfMissingFiles)->Add((TObject *)fi);
2168  return -1;
2169  }
2170 }
2171 
2172 ////////////////////////////////////////////////////////////////////////////////
2173 /// Split into per host entries
2174 /// The files in the listOfMissingFiles can appear several times;
2175 /// in order to fix that, a TDSetElement::Merge method is needed.
2176 
2178  TList **listOfMissingFiles)
2179 {
2180  if (!elements) {
2181  Error("SplitPerHost", "Empty list of packets!");
2182  return;
2183  }
2184  if (elements->GetSize() <= 0) {
2185  Error("SplitPerHost", "The input list contains no elements");
2186  return;
2187  }
2188  TIter subSetIter(elements);
2189  TDSetElement *e;
2190  while ((e = (TDSetElement*) subSetIter.Next())) {
2191  if (ReassignPacket(e, listOfMissingFiles) == -1) {
2192  // Remove from the list in order to delete it.
2193  if (elements->Remove(e))
2194  Error("SplitPerHost", "Error removing a missing file");
2195  delete e;
2196  }
2197 
2198  }
2199 }
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:51
void MarkBad(TSlave *s, TProofProgressStatus *status, TList **missingFiles)
This method can be called at any time during processing as an effect of handling kPROOF_STOPPROCESS I...
virtual Bool_t IsValid() const
Definition: TSocket.h:162
An array of TObjects.
Definition: TObjArray.h:39
TFileInfo * GetFileInfo(const char *type="TTree")
Return the content of this element in the form of a TFileInfo.
Definition: TDSet.cxx:234
Long64_t GetTDSetOffset() const
Definition: TDSet.h:130
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition: TObject.cxx:892
void Reset()
Reset the internal data structure for packet distribution.
Long64_t GetEntries(Bool_t istree=kTRUE, Bool_t openfile=kTRUE)
Returns number of entries in tree or objects in file.
Definition: TDSet.cxx:432
void Add(THist< DIMENSION, PRECISIONA > &to, THist< DIMENSION, PRECISIONB > &from)
Definition: THist.h:335
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Definition: TMonitor.cxx:438
long long Long64_t
Definition: RtypesCore.h:69
virtual Long64_t GetN() const
Definition: TEntryList.h:77
void SetProtocol(const char *proto, Bool_t setDefaultPort=kFALSE)
Set protocol and, optionally, change the port accordingly.
Definition: TUrl.cxx:518
Collectable string class.
Definition: TObjString.h:32
float Float_t
Definition: RtypesCore.h:53
const char Option_t
Definition: RtypesCore.h:62
void SplitPerHost(TList *elements, TList **listOfMissingFiles)
Split into per host entries The files in the listOfMissingFiles can appear several times; in order to...
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition: TSocket.cxx:520
void SetTDSetOffset(Long64_t offset)
Definition: TDSet.h:131
This class represents a WWW compatible URL.
Definition: TUrl.h:41
TObject * GetParameter(const char *par) const
Get specified parameter.
Definition: TProof.cxx:10485
Bool_t TestBit(UInt_t f) const
Definition: TObject.h:173
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
Definition: TSocket.cxx:818
const char * GetProtocol() const
Definition: TUrl.h:73
Definition: TDSet.h:153
Long64_t GetReadCalls() const
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
Bool_t GetValid() const
Definition: TDSet.h:121
Long64_t GetBytesRead() const
TSocket * GetSocket() const
Definition: TProofServ.h:269
Double_t GetProcTime() const
void ValidateFiles(TDSet *dset, TList *slaves, Long64_t maxent=-1, Bool_t byfile=kFALSE)
Check existence of file/dir/tree and get number of entries.
Float_t GetProcTime() const
virtual TObject * Last() const
Return the last object in the list. Returns 0 when list is empty.
Definition: TList.cxx:580
virtual void AddAll(const TCollection *col)
Add all objects from collection col to this collection.
Definition: TCollection.cxx:57
Int_t GetEstEntriesProcessed(Float_t, Long64_t &ent, Long64_t &bytes, Long64_t &calls)
Get estimation for the number of processed entries and bytes read at time t, based on the numbers alr...
void Add(TObject *obj)
This function may not be used (but we need to provide it since it is a pure virtual in TCollection)...
Definition: TMap.cxx:52
#define R__ASSERT(e)
Definition: TError.h:98
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor's active list.
Definition: TMonitor.cxx:168
Long64_t GetFirst() const
Definition: TDSet.h:114
const char * GetOrdinal() const
Definition: TSlave.h:135
Basic string class.
Definition: TString.h:137
int Int_t
Definition: RtypesCore.h:41
bool Bool_t
Definition: RtypesCore.h:59
const Bool_t kFALSE
Definition: Rtypes.h:92
virtual void Print(Option_t *option="") const
This method must be overridden when a class wants to print itself.
Definition: TObject.cxx:594
const char * GetName() const
Returns name of object.
Definition: TSlave.h:128
Basic time type with millisecond precision.
Definition: TTime.h:29
virtual void DeActivateAll()
De-activate all activated sockets.
Definition: TMonitor.cxx:302
Long64_t GetEntries() const
Int_t GetProtocol() const
Definition: TSlave.h:137
Bool_t IsValid() const
Definition: TUrl.h:88
TList * GetListOfElements() const
Definition: TDSet.h:231
void Reset()
Definition: TCollection.h:161
Double_t GetCumProcTime() const
const char * GetHostFQDN() const
Return fully qualified domain name of url host.
Definition: TUrl.cxx:467
virtual Bool_t HandleTimer(TTimer *timer)
Send progress message to client.
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
Definition: TObject.cxx:732
const char * GetUrl(Bool_t withDeflt=kFALSE) const
Return full URL.
Definition: TUrl.cxx:385
virtual ~TPacketizerAdaptive()
Destructor.
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
Definition: TList.cxx:496
Int_t GetActiveWorkers()
Return the number of workers still processing.
Double_t GetLastUpdate() const
TFileNode * NextNode()
Get next node which has unallocated files.
Int_t Length() const
Definition: TBuffer.h:94
virtual void Sort(Bool_t order=kSortAscending)
Sort linked list.
Definition: TList.cxx:770
const char * GetFile() const
Definition: TUrl.h:78
const char * GetHost() const
Definition: TUrl.h:76
#define SafeDelete(p)
Definition: RConfig.h:449
virtual const char * ClassName() const
Returns name of class to which the object belongs.
Definition: TObject.cxx:187
TFileStat * GetNextActive()
Get next active file.
TDSetElement * CreateNewPacket(TDSetElement *base, Long64_t first, Long64_t num)
Creates a new TDSetElement from the base packet starting from the first entry with num entries...
Long64_t GetNum() const
Definition: TDSet.h:116
#define PDB(mask, level)
Definition: TProofDebug.h:58
Int_t GetPerfIdx() const
Definition: TSlave.h:136
TSocket * GetSocket() const
Definition: TSlave.h:138
virtual void DeActivate(TSocket *sock)
De-activate a socket.
Definition: TMonitor.cxx:284
Int_t AddProcessed(TSlave *sl, TProofProgressStatus *st, Double_t latency, TList **listOfMissingFiles=0)
To be used by GetNextPacket but also in reaction to kPROOF_STOPPROCESS message (when the worker was a...
virtual Int_t GetN() const
Definition: TEventList.h:58
const char * GetDirectory() const
Return directory where to look for object.
Definition: TDSet.cxx:256
A sorted doubly linked list.
Definition: TSortedList.h:30
std::vector< std::vector< double > > Data
TDSetElement * GetNextPacket(TSlave *sl, TMessage *r)
Get next packet. A meaningful difference to TPacketizer is the fact that this packetizer, for each worker, tries to predict whether the worker will finish processing its local files before the end of the query.
Double_t GetRate() const
TSocket * Select()
Return pointer to socket for which an event is waiting.
Definition: TMonitor.cxx:322
TProofProgressStatus * fProgressStatus
A doubly linked list.
Definition: TList.h:47
static const char * what
Definition: stlLoader.cc:6
const char * GetObjName() const
Definition: TDSet.h:122
const char * GetName() const
Returns name of object.
Definition: TObjString.h:42
void SetLastEntries(Long64_t entries)
const int nEvents
Definition: testRooFit.cxx:42
Named parameter, streamable and storable.
Definition: TParameter.h:49
virtual TTime Now()
Get current time in milliseconds since 0:00 Jan 1 1995.
Definition: TSystem.cxx:467
void SendDataSetStatus(const char *msg, UInt_t n, UInt_t tot, Bool_t st)
Send or notify data set status.
Definition: TProof.cxx:9903
Bool_t IsTree() const
Definition: TDSet.h:225
virtual TObject * First() const
Return the first object in the list. Returns 0 when list is empty.
Definition: TList.cxx:556
ROOT::R::TRInterface & r
Definition: Object.C:4
virtual Bool_t IsSortable() const
Definition: TObject.h:139
R__EXTERN TSystem * gSystem
Definition: TSystem.h:549
TObject * Remove(TObject *key)
Remove the (key,value) pair with key from the map.
Definition: TMap.cxx:294
virtual Int_t GetValue(const char *name, Int_t dflt)
Returns the integer value for a resource.
Definition: TEnv.cxx:480
virtual TObject * Remove(TObject *obj)
Remove object from the list.
Definition: TList.cxx:674
TObject * Next()
Definition: TCollection.h:158
Int_t CalculatePacketSize(TObject *slstat, Long64_t cachesz, Int_t learnent)
The result depends on the fStrategy.
TFileNode * NextActiveNode()
Get next active node.
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Definition: TString.cxx:2321
TList * GetListOfActives() const
Returns a list with all active sockets.
Definition: TMonitor.cxx:498
unsigned int UInt_t
Definition: RtypesCore.h:42
TMarker * m
Definition: textangle.C:8
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:918
char * Form(const char *fmt,...)
Ssiz_t Length() const
Definition: TString.h:390
void Invalidate()
Definition: TDSet.h:136
Int_t ReassignPacket(TDSetElement *e, TList **listOfMissingFiles)
The file in the listOfMissingFiles can appear several times; in order to fix that, a TDSetElement::Merge method is needed.
A TEventList object is a list of selected events (entries) in a TTree.
Definition: TEventList.h:33
virtual TObject * After(const TObject *obj) const
Returns the object after object obj.
Definition: TList.cxx:288
virtual Int_t Compare(const TObject *obj) const
Compare abstract method.
Definition: TObject.cxx:218
virtual TObject * At(Int_t idx) const
Returns the object at position idx. Returns 0 if idx is out of range.
Definition: TList.cxx:310
void Reset(Detail::TBranchProxy *x)
Int_t MergeElement(TDSetElement *elem)
Check if 'elem' is overlapping or subsequent and, if the case, return a merged element.
Definition: TDSet.cxx:185
virtual void Activate(TSocket *sock)
Activate a de-activated socket.
Definition: TMonitor.cxx:250
void Print(Option_t *options="") const
Print a TDSetElement. When option="a" print full data.
Definition: TDSet.cxx:264
TString GetString() const
Definition: TObjString.h:50
void DeleteValues()
Remove all (key,value) pairs from the map AND delete the values when they are allocated on the heap...
Definition: TMap.cxx:149
#define Printf
Definition: TGeoToOCC.h:18
#define gPerfStats
void InitStats()
(Re)initialise the statistics; called at the beginning or after a worker dies.
UInt_t What() const
Definition: TMessage.h:80
void SetHost(const char *host)
Definition: TUrl.h:93
long Long_t
Definition: RtypesCore.h:50
R__EXTERN TProof * gProof
Definition: TProof.h:1110
TObjArray * Tokenize(const TString &delim) const
This function is used to isolate sequential tokens in a TString.
Definition: TString.cxx:2240
#define ClassImp(name)
Definition: Rtypes.h:279
TObject * GetEntryList() const
Definition: TDSet.h:133
void SetValid()
Definition: TDSet.h:137
double Double_t
Definition: RtypesCore.h:55
virtual const char * HostName()
Return the system's host name.
Definition: TSystem.cxx:307
Long64_t GetEntriesProcessed() const
Double_t GetCPUTime() const
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval ...
Definition: TMap.h:44
R__EXTERN TEnv * gEnv
Definition: TEnv.h:174
TFileStat * GetNextUnAlloc(TFileNode *node=0, const char *nodeHostName=0)
Get next unallocated file from 'node' or other nodes: first try 'node'.
TSortedList * fFilesToProcess
Definition: TProof.h:339
Float_t GetCurrentRate(Bool_t &all)
Get Estimation of the current rate; just summing the current rates of the active workers.
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
Definition: TString.h:567
void SetNum(Long64_t num)
Definition: TDSet.h:120
const char * GetFileName() const
Definition: TDSet.h:113
Int_t BufferSize() const
Definition: TBuffer.h:92
virtual void Clear(Option_t *option="")
Remove all objects from the list.
Definition: TList.cxx:348
const char * GetType() const
Definition: TDSet.h:228
Bool_t IsNull() const
Definition: TString.h:387
Mother of all ROOT objects.
Definition: TObject.h:58
Long64_t GetBytesRead() const
const char * GetDataSet() const
Definition: TDSet.h:124
R__EXTERN TProofServ * gProofServ
Definition: TProofServ.h:359
virtual void Add(TObject *obj)
Definition: TList.h:81
void RemoveActiveNode(TFileNode *)
Remove node from the list of actives.
TObject * GetValue(const char *keyname) const
Returns a pointer to the value associated with keyname as name of the key.
Definition: TMap.cxx:234
void RemoveUnAllocNode(TFileNode *)
Remove unallocated node.
Class describing a generic file including meta information.
Definition: TFileInfo.h:50
virtual void Print(Option_t *option="") const
Default print for collections, calls Print(option, 1).
TProofProgressStatus * GetProgressStatus()
virtual const char * GetName() const
Returns name of object.
Definition: TObject.cxx:415
virtual Int_t GetSize() const
Definition: TCollection.h:95
Definition: TSlave.h:50
const Bool_t kTRUE
Definition: Rtypes.h:91
void SetFirst(Long64_t first)
Definition: TDSet.h:115
virtual void SetTitle(const char *title="")
Change (i.e. set) the title of the TNamed.
Definition: TNamed.cxx:152
A List of entry numbers in a TTree or TChain.
Definition: TEntryList.h:27
virtual TProofProgressStatus * AddProcessed(TProofProgressStatus *st)=0
const Int_t n
Definition: legend1.C:16
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition: TObject.cxx:904
virtual const char * GetTitle() const
Returns title of object.
Definition: TNamed.h:52
void RemoveActive(TFileStat *file)
Remove file from the list of actives.
const char * Data() const
Definition: TString.h:349