knowrob  2.1.0
A Knowledge Base System for Cognition-enabled Robots
ThreadPool.cpp
Go to the documentation of this file.
1 /*
2  * This file is part of KnowRob, please consult
3  * https://github.com/knowrob/knowrob for license details.
4  */
5 
6 #include <stdexcept>
7 #include <utility>
8 #include <knowrob/Logger.h>
9 #include <knowrob/ThreadPool.h>
10 #include <boost/python/errors.hpp>
11 #include "knowrob/integration/python/PythonError.h"
12 
13 #define MAX_WORKER_TERMINATE_TIME_MS 2000
14 #define KNOWROB_THREADING_DETACH_ON_EXIT
15 
16 using namespace knowrob;
17 
18 namespace knowrob {
19  std::shared_ptr<ThreadPool> DefaultThreadPool() {
20  static auto pool =
21  std::make_shared<ThreadPool>(std::thread::hardware_concurrency());
22  return pool;
23  }
24 }
25 
26 ThreadPool::ThreadPool(uint32_t maxNumThreads)
27  : maxNumThreads_(maxNumThreads),
28  numFinishedThreads_(0),
29  numActiveWorker_(0) {
30  // NOTE: do not add worker threads in the constructor.
31  // The problem is the virtual initializeWorker function that could be called
32  // in this case before a subclass of ThreadPool overrides it.
33  // i.e. if the thread starts before construction is complete.
34  KB_DEBUG("Maximum number of threads: {}.", maxNumThreads);
35 }
36 
38  shutdown();
39 }
40 
42  for (Worker *t: workerThreads_) {
43  t->hasTerminateRequest_ = true;
44  }
45  workCV_.notify_all();
46 #ifdef KNOWROB_THREADING_DETACH_ON_EXIT
47  auto start = std::chrono::steady_clock::now();
48 #endif
49  for (Worker *t: workerThreads_) {
50 #ifdef KNOWROB_THREADING_DETACH_ON_EXIT
51  while (!t->isTerminated_) {
53  auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(now - start).count();
54  if (elapsed >= MAX_WORKER_TERMINATE_TIME_MS) {
55  KB_WARN("Worker thread does not seem to exit, it will be detached!");
56  // Note: the most reliable way I found to forcefully terminate a thread without causing
57  // a SIGABRT exit of KnowRob is detaching the worker threads that do not want to exit.
58  t->thread_.detach();
59  break;
60  }
61  std::this_thread::sleep_for(std::chrono::milliseconds(100));
62  }
63 #endif
64  // note: worker destructor joins the thread
65  delete t;
66  }
67  workerThreads_.clear();
68 }
69 
70 void ThreadPool::pushWork(const std::shared_ptr<ThreadPool::Runner> &goal, ExceptionHandler exceptionHandler) {
71  {
72  std::lock_guard<std::mutex> scoped_lock(workMutex_);
73  goal->setExceptionHandler(std::move(exceptionHandler));
74  workQueue_.push(goal);
75 
76  uint32_t numAliveThreads = workerThreads_.size() - numFinishedThreads_;
77  uint32_t numAvailableThreads = numAliveThreads - numActiveWorker_;
78  if (numAvailableThreads == 0) {
79  // add another thread if max num not reached yet
80  if (workerThreads_.size() < (maxNumThreads_ + numFinishedThreads_)) {
81  workerThreads_.push_back(new Worker(this));
82  }
83  }
84  }
85  // wake up a worker if any is sleeping
86  workCV_.notify_one();
87 }
88 
89 std::shared_ptr<ThreadPool::Runner> ThreadPool::popWork() {
90  std::lock_guard<std::mutex> scoped_lock(workMutex_);
91  if (workQueue_.empty()) {
92  return {};
93  } else {
94  std::shared_ptr<ThreadPool::Runner> x = workQueue_.front();
95  workQueue_.pop();
96  return x;
97  }
98 }
99 
100 
102  : threadPool_(threadPool),
103  isTerminated_(false),
104  hasTerminateRequest_(false),
105  thread_(&Worker::run, this) {
106 }
107 
109  hasTerminateRequest_ = true;
110  if (thread_.joinable()) thread_.join();
111 }
112 
114  KB_DEBUG("Worker started.");
115  // let the pool do some thread specific initialization
116  if (!threadPool_->initializeWorker()) {
117  KB_ERROR("Worker initialization failed.");
118  isTerminated_ = true;
119  threadPool_->numFinishedThreads_ += 1;
120  return;
121  }
122  threadPool_->numActiveWorker_ += 1;
123 
124  // loop until the application exits
125  while (!hasTerminateRequest_) {
126  // wait for a claim
127  {
128  std::unique_lock<std::mutex> lk(threadPool_->workMutex_);
129  threadPool_->numActiveWorker_ -= 1;
130  if (!hasTerminateRequest_) {
131  threadPool_->workCV_.wait(lk, [this] {
132  return hasTerminateRequest_ || !threadPool_->workQueue_.empty();
133  });
134  }
135  threadPool_->numActiveWorker_ += 1;
136  }
137  if (hasTerminateRequest_) {
138  break;
139  }
140 
141  // pop work from queue
142  auto goal = threadPool_->popWork();
143  // do the work
144  if (goal) {
145  goal->runInternal();
146  }
147  }
148 
149  isTerminated_ = true;
150  // tell the thread pool that a worker thread exited
151  if (threadPool_->finalizeWorker_) {
152  threadPool_->finalizeWorker_();
153  }
154  // note: counter indicates that there are finished threads in workerThreads_ list.
155  threadPool_->numFinishedThreads_ += 1;
156  threadPool_->numActiveWorker_ -= 1;
157  KB_DEBUG("Worker terminated.");
158 }
159 
160 
162  : isTerminated_(false),
163  isRunning_(false),
164  hasStopRequest_(false),
165  exceptionHandler_(nullptr) {}
166 
168  if (isRunning_) stop(true);
169 }
170 
172  if (!isTerminated()) {
173  std::unique_lock<std::mutex> lk(mutex_);
174  finishedCV_.wait(lk, [this] { return isTerminated(); });
175  }
176 }
177 
179  isRunning_ = true;
180  isTerminated_ = false;
181  hasStopRequest_ = false;
182  // do the work
183  try {
184  run();
185  }
186  catch (const boost::python::error_already_set &) {
187  PythonError py_error;
188  if (exceptionHandler_) {
189  exceptionHandler_(py_error);
190  } else {
191  KB_WARN("Worker error in Python code: {}.", py_error.what());
192  }
193  }
194  catch (const std::exception &e) {
195  if (exceptionHandler_) {
196  exceptionHandler_(e);
197  } else {
198  KB_WARN("Worker error: {}.", e.what());
199  }
200  }
201  catch (abi::__forced_unwind const &) {
202  // this is a forced unwind, rethrow. this happens when the thread is cancelled.
203  KB_WARN("Worker forced unwind.");
204  throw;
205  }
206  catch (...) {
207  KB_WARN("Unknown worker error.");
208  }
209  // toggle flag
210  isTerminated_ = true;
211  isRunning_ = false;
212  finishedCV_.notify_all();
213 }
214 
215 void ThreadPool::Runner::stop(bool wait) {
216  // toggle stop request flag on
217  hasStopRequest_ = true;
218  // wait for the runner to be finished if requested
219  if (wait) {
220  join();
221  }
222 }
#define MAX_WORKER_TERMINATE_TIME_MS
Definition: ThreadPool.cpp:13
#define KB_DEBUG
Definition: Logger.h:25
#define KB_ERROR
Definition: Logger.h:28
#define KB_WARN
Definition: Logger.h:27
Worker(ThreadPool *thread_pool)
Definition: ThreadPool.cpp:101
std::function< void(const std::exception &)> ExceptionHandler
Definition: ThreadPool.h:32
ThreadPool(uint32_t maxNumThreads)
Definition: ThreadPool.cpp:26
virtual ~ThreadPool()
Definition: ThreadPool.cpp:37
void pushWork(const std::shared_ptr< ThreadPool::Runner > &goal, ThreadPool::ExceptionHandler exceptionHandler)
Definition: ThreadPool.cpp:70
TimePoint now()
Definition: TimePoint.cpp:12
std::shared_ptr< ThreadPool > DefaultThreadPool()
Definition: ThreadPool.cpp:19
int run(int argc, char **argv)
Definition: terminal.cpp:608