knowrob  2.1.0
A Knowledge Base System for Cognition-enabled Robots
knowrob::ThreadPool Class Reference

#include <ThreadPool.h>

Inheritance diagram for knowrob::ThreadPool:

Classes

class  LambdaRunner
 
class  Runner
 
class  Worker
 

Public Types

using ExceptionHandler = std::function< void(const std::exception &)>
 
using ExceptionHandler = std::function< void(const std::exception &)>
 

Public Member Functions

 ThreadPool (uint32_t maxNumThreads)
 
virtual ~ThreadPool ()
 
 ThreadPool (const ThreadPool &)=delete
 
void shutdown ()
 
void pushWork (const std::shared_ptr< ThreadPool::Runner > &goal, ThreadPool::ExceptionHandler exceptionHandler)
 
 ThreadPool (uint32_t maxNumThreads)
 
virtual ~ThreadPool ()
 
 ThreadPool (const ThreadPool &)=delete
 
void shutdown ()
 
void pushWork (const std::shared_ptr< ThreadPool::Runner > &goal, ThreadPool::ExceptionHandler exceptionHandler)
 

Protected Attributes

std::function< void()> finalizeWorker_
 

Detailed Description

A pool of worker threads waiting on tasks to be pushed into a work queue.

Definition at line 27 of file ThreadPool.h.

Member Typedef Documentation

◆ ExceptionHandler [1/2]

using knowrob::ThreadPool::ExceptionHandler = std::function<void(const std::exception &)>

Definition at line 32 of file ThreadPool.h.

◆ ExceptionHandler [2/2]

using knowrob::ThreadPool::ExceptionHandler = std::function<void(const std::exception &)>

Definition at line 32 of file ThreadPool.h.

Constructor & Destructor Documentation

◆ ThreadPool() [1/4]

ThreadPool::ThreadPool ( uint32_t  maxNumThreads)
explicit
Parameters
maxNumThreadsthe maximum number of worker threads

Definition at line 26 of file ThreadPool.cpp.

27  : maxNumThreads_(maxNumThreads),
28  numFinishedThreads_(0),
29  numActiveWorker_(0) {
30  // NOTE: do not add worker threads in the constructor.
31  // The problem is the virtual initializeWorker function that could be called
32  // in this case before a subclass of ThreadPool overrides it.
33  // i.e. if the thread starts before construction is complete.
34  KB_DEBUG("Maximum number of threads: {}.", maxNumThreads);
35 }
#define KB_DEBUG
Definition: Logger.h:25

◆ ~ThreadPool() [1/2]

ThreadPool::~ThreadPool ( )
virtual

Definition at line 37 of file ThreadPool.cpp.

37  {
38  shutdown();
39 }

◆ ThreadPool() [2/4]

knowrob::ThreadPool::ThreadPool ( const ThreadPool )
delete

Cannot be copy-assigned.

◆ ThreadPool() [3/4]

knowrob::ThreadPool::ThreadPool ( uint32_t  maxNumThreads)
explicit
Parameters
maxNumThreadsthe maximum number of worker threads

◆ ~ThreadPool() [2/2]

virtual knowrob::ThreadPool::~ThreadPool ( )
virtual

◆ ThreadPool() [4/4]

knowrob::ThreadPool::ThreadPool ( const ThreadPool )
delete

Cannot be copy-assigned.

Member Function Documentation

◆ pushWork() [1/2]

void ThreadPool::pushWork ( const std::shared_ptr< ThreadPool::Runner > &  goal,
ThreadPool::ExceptionHandler  exceptionHandler 
)

Pushes a goal for a worker. The goal is assigned to a worker thread when one is available.

Parameters
goalthe work goal
exceptionHandleran exception handler

Definition at line 70 of file ThreadPool.cpp.

70  {
71  {
72  std::lock_guard<std::mutex> scoped_lock(workMutex_);
73  goal->setExceptionHandler(std::move(exceptionHandler));
74  workQueue_.push(goal);
75 
76  uint32_t numAliveThreads = workerThreads_.size() - numFinishedThreads_;
77  uint32_t numAvailableThreads = numAliveThreads - numActiveWorker_;
78  if (numAvailableThreads == 0) {
79  // add another thread if max num not reached yet
80  if (workerThreads_.size() < (maxNumThreads_ + numFinishedThreads_)) {
81  workerThreads_.push_back(new Worker(this));
82  }
83  }
84  }
85  // wake up a worker if any is sleeping
86  workCV_.notify_one();
87 }

◆ pushWork() [2/2]

void knowrob::ThreadPool::pushWork ( const std::shared_ptr< ThreadPool::Runner > &  goal,
ThreadPool::ExceptionHandler  exceptionHandler 
)

Pushes a goal for a worker. The goal is assigned to a worker thread when one is available.

Parameters
goalthe work goal
exceptionHandleran exception handler

◆ shutdown() [1/2]

void ThreadPool::shutdown ( )

Shutdown the thread pool. Joining all the worker threads. If one of them is busy, this call will block until the worker has finished its work, and can gracefully exit.

Definition at line 41 of file ThreadPool.cpp.

41  {
42  for (Worker *t: workerThreads_) {
43  t->hasTerminateRequest_ = true;
44  }
45  workCV_.notify_all();
46 #ifdef KNOWROB_THREADING_DETACH_ON_EXIT
47  auto start = std::chrono::steady_clock::now();
48 #endif
49  for (Worker *t: workerThreads_) {
50 #ifdef KNOWROB_THREADING_DETACH_ON_EXIT
51  while (!t->isTerminated_) {
53  auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(now - start).count();
54  if (elapsed >= MAX_WORKER_TERMINATE_TIME_MS) {
55  KB_WARN("Worker thread does not seem to exit, it will be detached!");
56  // Note: the most reliable way I found to forcefully terminate a thread without causing
57  // a SIGABRT exit of KnowRob is detaching the worker threads that do not want to exit.
58  t->thread_.detach();
59  break;
60  }
61  std::this_thread::sleep_for(std::chrono::milliseconds(100));
62  }
63 #endif
64  // note: worker destructor joins the thread
65  delete t;
66  }
67  workerThreads_.clear();
68 }
#define MAX_WORKER_TERMINATE_TIME_MS
Definition: ThreadPool.cpp:13
#define KB_WARN
Definition: Logger.h:27
TimePoint now()
Definition: TimePoint.cpp:12

◆ shutdown() [2/2]

void knowrob::ThreadPool::shutdown ( )

Shutdown the thread pool. Joining all the worker threads. If one of them is busy, this call will block until the worker has finished its work, and can gracefully exit.

Member Data Documentation

◆ finalizeWorker_

std::function< void()> knowrob::ThreadPool::finalizeWorker_
protected

Definition at line 168 of file ThreadPool.h.


The documentation for this class was generated from the following files: