74 fAsyncReading(kFALSE),
144 Info(
"TTreeCacheUnzip",
"Enabling Parallel Unzipping");
154 Warning(
"TTreeCacheUnzip",
"Parallel Option unknown");
235 if (entry == -1) entry=0;
269 if (!lbaskets || !entries)
continue;
273 for (
Int_t j=0;j<nb;j++) {
275 if (j<blistsize && b->GetListOfBaskets()->UncheckedAt(j))
continue;
278 Int_t len = lbaskets[j];
279 if (pos <= 0 || len <= 0)
continue;
282 if (entries[j] < entry && (j<nb-1 && entries[j+1] <= entry))
continue;
285 if (j<nb-1) emax = entries[j+1]-1;
286 if (!elist->
ContainsRange(entries[j]+chainOffset,emax+chainOffset))
continue;
424 if (
gDebug > 0)
Info(
"SendSignal",
" fUnzipCondition->Signal()");
454 class TTreeCacheUnzipData {
469 if (nt > 10) nt = 10;
472 Info(
"StartThreadUnzip",
"Going to start %d threads.", nt);
474 for (
Int_t i = 0; i < nt; i++) {
480 Info(
"StartThreadUnzip",
"Going to start thread '%s'", nm.
Data());
482 TTreeCacheUnzipData *d =
new TTreeCacheUnzipData;
488 Error(
"TTreeCacheUnzip::StartThreadUnzip",
" Unable to create new thread.");
513 for (
Int_t i = 0; i < 1; i++) {
540 TTreeCacheUnzipData *d = (TTreeCacheUnzipData *)arg;
546 Int_t thrnum = d->fCount;
547 Int_t startindex = thrnum;
548 Int_t locbuffsz = 16384;
549 char *locbuff =
new char[16384];
558 if (myCycle != unzipMng->
fCycle) startindex = thrnum;
559 myCycle = unzipMng->
fCycle;
560 if (unzipMng->
fNseek) startindex = startindex % unzipMng->
fNseek;
561 else startindex = -1;
565 res = unzipMng->
UnzipCache(startindex, locbuffsz, locbuff);
615 Int_t nread = maxbytes;
618 if (nb < 0)
return nread;
620 const Int_t headerSize = 16;
621 if (nread < headerSize)
return nread;
626 if (!olen) olen = nbytes-klen;
665 Info(
"ResetCache",
"Changing fNseekMax from:%d to:%d", fNseekMax,
fNseek);
673 char **aUnzipChunks =
new char *[
fNseek];
674 memset(aUnzipChunks, 0,
fNseek*
sizeof(
char *));
738 char **aUnzipChunks =
new char *[
fNseek];
739 memset(aUnzipChunks, 0,
fNseek*
sizeof(
char *));
805 if ( myCycle !=
fCycle ) {
807 Info(
"GetUnzipBuffer",
"Sudden paging Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
951 const Int_t hlen=128;
952 Int_t nbytes=0, objlen=0, keylen=0;
960 Error(
"UnzipBuffer",
"Inconsistency found in header (nin=%d, nbuf=%d)", nin, nbuf);
974 Bool_t oldCase = objlen==nbytes-keylen
978 if (objlen > nbytes-keylen || oldCase) {
981 memcpy(*dest, src, keylen);
984 char *objbuf = *dest + keylen;
994 Info(
"UnzipBuffer",
" nin:%d, nbuf:%d, bufcur[3] :%d, bufcur[4] :%d, bufcur[5] :%d ",
995 nin, nbuf, bufcur[3], bufcur[4], bufcur[5]);
996 if (oldCase && (nin > objlen || nbuf > objlen)) {
998 Info(
"UnzipBuffer",
"oldcase objlen :%d ", objlen);
1001 memcpy( *dest + keylen, src + keylen, objlen);
1006 R__unzip(&nin, bufcur, &nbuf, objbuf, &nout);
1009 Info(
"UnzipBuffer",
"R__unzip nin:%d, bufcur:%p, nbuf:%d, objbuf:%p, nout:%d",
1010 nin, bufcur, nbuf, objbuf, nout);
1014 if (noutot >= objlen)
break;
1019 if (noutot != objlen) {
1020 Error(
"UnzipBuffer",
"nbytes = %d, keylen = %d, objlen = %d, noutot = %d, nout=%d, nin=%d, nbuf=%d",
1021 nbytes,keylen,objlen, noutot,nout,nin,nbuf);
1023 if(alloc)
delete [] *
dest;
1029 memcpy(*dest, src, keylen);
1031 memcpy(*dest + keylen, src + keylen, objlen);
1061 const Int_t hlen=128;
1062 Int_t objlen=0, keylen=0;
1066 Int_t idxtounzip = -1;
1074 Info(
"UnzipCache",
"Sudden Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
1090 Int_t reqi = (startindex+ii) % fNseek;
1096 rdoffs =
fSeek[idxtounzip];
1107 if (idxtounzip < 0) {
1109 Info(
"UnzipCache",
"Nothing to do... startindex:%d fTotalUnzipBytes:%lld fUnzipBufferSize:%lld fNseek:%d",
1119 Info(
"UnzipCache",
"Sudden Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
1127 if(locbuffsz < rdlen) {
1128 if (locbuff)
delete [] locbuff;
1130 locbuff =
new char[locbuffsz];
1132 }
else if(locbuffsz > rdlen*3) {
1133 if (locbuff)
delete [] locbuff;
1134 locbuffsz = rdlen*2;
1135 locbuff =
new char[locbuffsz];
1140 Info(
"UnzipCache",
"Going to unzip block %d", idxtounzip);
1149 Info(
"UnzipCache",
"Sudden paging Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
1166 Info(
"UnzipCache",
"Block %d not done. rdoffs=%lld rdlen=%d readbuf=%d", idxtounzip, rdoffs, rdlen, readbuf);
1172 Int_t len = (objlen > nbytes-keylen)? keylen+objlen : nbytes;
1180 Info(
"UnzipCache",
"Block %d is too big, skipping.", idxtounzip);
1198 if ((loclen > 0) && (loclen == objlen+keylen)) {
1203 Info(
"UnzipCache",
"Sudden paging Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
1224 Info(
"UnzipCache",
"reqi:%d, rdoffs:%lld, rdlen: %d, loclen:%d",
1225 idxtounzip, rdoffs, rdlen, loclen);
1231 Info(
"argh",
"loclen:%d objlen:%d loc:%d readbuf:%d", loclen, objlen, loc, readbuf);
1245 printf(
"******TreeCacheUnzip statistics for file: %s ******\n",
fFile->
GetName());
1247 printf(
"Number of blocks unzipped by threads: %d\n",
fNUnzip);
1248 printf(
"Number of hits: %d\n",
fNFound);
1249 printf(
"Number of stalls: %d\n",
fNStalls);
1250 printf(
"Number of misses: %d\n",
fNMissed);
TCondition * fUnzipStartCondition
virtual const char * GetName() const
Returns name of object.
static Int_t SetCancelDeferred()
Static method to set the cancellation response type of the calling thread to deferred, i.e.
Int_t fNtot
Total size of prefetched blocks.
void frombuf(char *&buf, Bool_t *x)
Long64_t fEntryMax
first entry in the cache
Long64_t * GetBasketEntry() const
void WaitUnzipStartSignal()
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
TFile * fFile
Pointer to file.
Int_t StopThreadUnzip()
To stop the thread we only need to change the value of the variable fActiveThread to false and the lo...
void UpdateBranches(TTree *tree)
update pointer to current Tree and recompute pointers to the branches in the cache ...
TObjArray * GetListOfBaskets()
virtual void ResetCache()
This will delete the list of buffers that are in the unzipping cache and will reset certain values in...
Int_t fNStalls
number of blocks that were found in the cache
Int_t GetRecordHeader(char *buf, Int_t maxbytes, Int_t &nbytes, Int_t &objlen, Int_t &keylen)
Read the logical record header from the buffer buf.
virtual void StopLearningPhase()
It's the same as TTreeCache::StopLearningPhase but we guarantee that we start the unzipping just afte...
A specialized TFileCacheRead object for a TTree.
virtual void Seek(Long64_t offset, ERelativeTo pos=kBeg)
Seek to a specific position in the file. Pos it either kBeg, kCur or kEnd.
static Int_t SetParallelUnzip(TTreeCacheUnzip::EParUnzipMode option=TTreeCacheUnzip::kEnable)
Static function that (de)activates multithreading unzipping.
Byte_t * fUnzipStatus
[fNseek] Individual unzipped chunks. Their summed size is kept under control.
virtual Bool_t ReadBuffer(char *buf, Int_t len)
Read a buffer from the file.
virtual Int_t SetBufferSize(Int_t buffersize)
Change the underlying buffer size of the cache.
TDirectory * GetDirectory() const
Bool_t FillBuffer()
Fill the cache buffer with the branches in the cache.
static Int_t SetCancelOn()
Static method to turn on thread cancellation.
virtual void Prefetch(Long64_t pos, Int_t len)
Add block of length len at position pos in the list of blocks to be prefetched.
virtual void StopLearningPhase()
This is the counterpart of StartLearningPhase() and can be used to stop the learning phase...
TThread * fUnzipThread[10]
virtual Int_t AddBranch(TBranch *b, Bool_t subgbranches=kFALSE)
Add a branch to the list of branches to be stored in the cache this function is called by TBranch::Ge...
Long64_t * GetTreeOffset() const
void SetUnzipBufferSize(Long64_t bufferSize)
Sets the size for the unzipping cache...
Int_t fNFound
number of blocks that were unzipped
static Bool_t IsParallelUnzip()
Static function that tells wether the multithreading unzipping is activated.
Int_t UnzipBuffer(char **dest, char *src)
Unzips a ROOT specific buffer...
virtual Int_t GetUnzipBuffer(char **buf, Long64_t pos, Int_t len, Bool_t *free)
We try to read a buffer that has already been unzipped Returns -1 in case of read failure...
std::queue< Int_t > fActiveBlks
number of blocks that were not found in the cache and were unzipped
virtual Int_t ReadBufferExt(char *buf, Long64_t pos, Int_t len, Int_t &loc)
Helper class to iterate over cluster of baskets.
Int_t * GetBasketBytes() const
static void * UnzipLoop(void *arg)
This is a static function.
virtual int GetSysInfo(SysInfo_t *info) const
Returns static system info, like OS type, CPU type, number of CPUs RAM size, etc into the SysInfo_t s...
Vc_ALWAYS_INLINE void free(T *p)
Frees memory that was allocated with Vc::malloc.
Int_t * fSeekLen
[fNseek] Length of buffers to be prefetched
virtual Long64_t GetReadEntry() const
Bool_t fIsTransferred
True when fBuffer contains something valid.
Int_t fNMissed
number of hits which caused a stall
virtual Bool_t ContainsRange(Long64_t entrymin, Long64_t entrymax)
Return TRUE if list contains entries from entrymin to entrymax included.
void Init(TClassEdit::TInterpreterLookupHelper *helper)
virtual TClusterIterator GetClusterIterator(Long64_t firstentry)
Return an iterator over the cluster of baskets starting at firstentry.
int R__unzip_header(Int_t *nin, UChar_t *bufin, Int_t *lout)
void Print(Option_t *option="") const
Print cache statistics.
static Long_t SelfId()
Static method returning the id for the current thread.
Specialization of TTreeCache for parallel Unzipping.
Int_t Run(void *arg=0)
Start the thread.
virtual TFile * GetFile() const
virtual Int_t ReadBufferExt(char *buf, Long64_t pos, Int_t len, Int_t &loc)
TTree * fTree
list of branch names in the cache
virtual Int_t AddBranch(TBranch *b, Bool_t subbranches=kFALSE)
Add a branch to the list of branches to be stored in the cache this function is called by TBranch::Ge...
static Double_t fgRelBuffSize
Max Size for the ready unzipped blocks (default is 2*fBufferSize)
void SendUnzipStartSignal(Bool_t broadcast)
This will send the signal corresponfing to the queue...
virtual ~TTreeCacheUnzip()
Destructor. (in general called by the TFile destructor)
virtual void SetEntryRange(Long64_t emin, Long64_t emax)
Set the minimum and maximum entry number to be processed this information helps to optimize the numbe...
R__EXTERN TSystem * gSystem
Int_t * fSeekIndex
[fNseek] sorted index table of fSeek
virtual Int_t GetValue(const char *name, Int_t dflt)
Returns the integer value for a resource.
Int_t fNseekMax
The total sum of the currently unzipped blks.
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Int_t GetMaxBaskets() const
static EParUnzipMode GetParallelUnzip()
Static function that returns the parallel option (to indicate an additional thread) ...
A TEventList object is a list of selected events (entries) in a TTree.
Long_t Join(void **ret=0)
Join this thread.
virtual Long64_t GetBasketSeek(Int_t basket) const
Return address of basket in the file.
Long64_t fEntryCurrent
last entry in the cache
Bool_t IsQueueEmpty()
It says if the queue is empty... useful to see if we have to process it.
TObject * UncheckedAt(Int_t i) const
virtual void Print(Option_t *option="") const
Print cache statistics.
Bool_t fIsLearning
pointer to the current Tree
Long64_t fUnzipBufferSize
fNseek can change so we need to know its max size
virtual Int_t GetTreeNumber() const
virtual Int_t GetBufferSize() const
TCondition * fUnzipDoneCondition
#define R__LOCKGUARD(mutex)
void Init()
Initialization procedure common to all the constructors.
static void SetUnzipRelBufferSize(Float_t relbufferSize)
static function: Sets the unzip relatibe buffer size
static TTreeCacheUnzip::EParUnzipMode fgParallel
virtual Int_t SetBufferSize(Int_t buffersize)
Change the underlying buffer size of the cache.
void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout)
virtual Long64_t GetEntries() const
char ** fUnzipChunks
[fNseek] Length of the unzipped buffers
Int_t fNbranches
next entry number where cache must be filled
Long64_t * fSeekSort
[fNseek] Position on file of buffers to be prefetched (sorted)
Bool_t IsActiveThread()
This indicates if the thread is active in this moment...
Int_t StartThreadUnzip(Int_t nthreads)
The Thread is only a part of the TTreeCache but it is the part that waits for info in the queue and p...
#define dest(otri, vertexptr)
Long64_t * fSeek
[fNseek] Position on file of buffers to be prefetched
A chain is a collection of files containg TTree objects.
Int_t TimedWaitRelative(ULong_t ms)
Wait to be signaled or till the timer times out.
A TTree object has a header with a name and a title.
TEventList * GetEventList() const
virtual const char * GetName() const
Returns name of object.
virtual Int_t GetSize() const
A TTree is a list of TBranches.
Long64_t fEntryNext
current lowest entry number in the cache
Long64_t fTotalUnzipBytes
[fNSeek] For each blk, tells us if it's unzipped or pending
Int_t UnzipCache(Int_t &startindex, Int_t &locbuffsz, char *&locbuff)
This inflates all the buffers in the cache.
void SetEntryRange(Long64_t emin, Long64_t emax)
Set the minimum and maximum entry number to be processed this information helps to optimize the numbe...
virtual void UpdateBranches(TTree *tree)
Update pointer to current Tree and recompute pointers to the branches in the cache.
Long64_t BinarySearch(Long64_t n, const T *array, T value)
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
virtual Bool_t ReadBufferAsync(Long64_t offs, Int_t len)
Int_t fNseek
Number of blocks to be prefetched.
const char * Data() const