12 #ifndef ROOT_TThreadPool 13 #define ROOT_TThreadPool 29 #ifndef ROOT_TCondition 71 template <
class aTask,
class aParam>
74 bool run(aParam &param) {
75 aTask *pThis =
reinterpret_cast<aTask *
>(
this);
76 return pThis->runTask(param);
88 template <
class aTask,
class aParam>
99 return fTask.run(fTaskParam);
117 template <
class aTask,
class aParam>
129 fIdleThreads(threadsCount),
133 fAllTasksDone =
new TCondition(&fMutexAllTasksDone);
135 for (
size_t i = 0; i < threadsCount; ++i) {
137 fThreads.push_back(pThread);
145 fThreadMonitor->Run();
152 threads_array_t::const_iterator iter = fThreads.begin();
153 threads_array_t::const_iterator iter_end = fThreads.end();
154 for (; iter != iter_end; ++iter)
157 delete fThreadJoinHelper;
159 delete fThreadNeeded;
160 delete fThreadAvailable;
161 delete fAllTasksDone;
167 fThreads.push_back(pThread);
174 DbgLog(
"Main thread. Try to push a task");
177 task_t *t =
new task_t(task, param);
181 DbgLog(
"Main thread. the task is pushed");
184 fThreadNeeded->Broadcast();
187 void Stop(
bool processRemainingJobs =
false) {
192 if (processRemainingJobs) {
195 while (!fTasks.empty() && !fStopped) {
196 DbgLog(
"Main thread is waiting");
197 fThreadAvailable->Wait();
198 DbgLog(
"Main thread is DONE waiting");
205 fThreadNeeded->Broadcast();
206 DbgLog(
"Main threads requests to STOP");
210 fThreadJoinHelper->Run();
211 fThreadJoinHelper->Join();
218 fAllTasksDone->Wait();
226 return fSuccessfulTasks;
240 std::stringstream ss;
242 <<
">>>> Check for tasks." 243 <<
" Number of Tasks: " << pThis->
fTasks.size()
262 pThis->
DbgLog(
"waiting for a task");
272 pThis->
DbgLog(
"done waiting for tasks");
278 if (!pThis->
fTasks.empty()) {
280 task = pThis->
fTasks.front();
283 pThis->
DbgLog(
"get the task");
288 pThis->
DbgLog(
"done Check <<<<");
293 pThis->
DbgLog(
"Run the task");
305 pThis->
DbgLog(
"Done Running the task");
312 pThis->
DbgLog(
"**** DONE ***");
318 threads_array_t::const_iterator iter = pThis->
fThreads.begin();
319 threads_array_t::const_iterator iter_end = pThis->
fThreads.end();
320 for (; iter != iter_end; ++iter)
TCondition * fThreadNeeded
TThreadPool(size_t threadsCount, bool needDbg=false)
TCondition * fThreadAvailable
TCondition * fAllTasksDone
static void * Executor(void *arg)
TThreadPoolTask< aTask, aParam > task_t
TThreadPoolTaskImp< aTask, aParam > task_t
static Long_t SelfId()
Static method returning the id for the current thread.
Int_t Run(void *arg=0)
Start the thread.
Int_t Wait()
Wait to be signaled.
static void * JoinHelper(void *arg)
size_t TasksCount() const
void PushTask(typename TThreadPoolTask< aTask, aParam >::task_t &task, aParam param)
const TNonCopyable & operator=(const TNonCopyable &)
static bool IsThreadActive(TThread *pThread)
std::vector< TThread * > threads_array_t
std::queue< task_t * > taskqueue_t
size_t IdleThreads() const
static void * Monitor(void *arg)
TThread * fThreadJoinHelper
TMutex fMutexAllTasksDone
void DbgLog(const std::string &msg)
R__EXTERN C unsigned int sleep(unsigned int seconds)
size_t SuccessfulTasks() const
void Stop(bool processRemainingJobs=false)
TThreadPoolTask(task_t &task, aParam &param)