8 #include <knowrob/Logger.h>
9 #include <knowrob/ThreadPool.h>
10 #include <boost/python/errors.hpp>
11 #include "knowrob/integration/python/PythonError.h"
13 #define MAX_WORKER_TERMINATE_TIME_MS 2000
14 #define KNOWROB_THREADING_DETACH_ON_EXIT
// Fragment: builds a shared ThreadPool whose thread limit is
// std::thread::hardware_concurrency().
// NOTE(review): the enclosing definition is not visible in this listing —
// presumably DefaultThreadPool(); confirm against the full file.
21 std::make_shared<ThreadPool>(std::thread::hardware_concurrency());
// ThreadPool constructor (fragment; the signature line is not visible here).
// Stores the requested limit in maxNumThreads_, starts numFinishedThreads_ at
// zero and logs the configured maximum at debug level.
// NOTE(review): no worker is created in the visible lines; workers are added
// where work is pushed (see the `new Worker(this)` call further down).
27 : maxNumThreads_(maxNumThreads),
28 numFinishedThreads_(0),
34 KB_DEBUG(
"Maximum number of threads: {}.", maxNumThreads);
// Worker shutdown sequence (fragment; the enclosing function header is not
// visible in this listing — TODO confirm its name).
// Phase 1: raise the terminate flag on every worker.
42 for (
Worker *t: workerThreads_) {
43 t->hasTerminateRequest_ =
true;
// The timing logic below is only compiled when
// KNOWROB_THREADING_DETACH_ON_EXIT is defined.
46 #ifdef KNOWROB_THREADING_DETACH_ON_EXIT
// Phase 2: wait for each worker to report termination.
49 for (
Worker *t: workerThreads_) {
50 #ifdef KNOWROB_THREADING_DETACH_ON_EXIT
// Poll the worker's isTerminated_ flag. `elapsed` is the number of
// milliseconds between `start` and `now`; both are defined on lines that are
// not shown here.
51 while (!t->isTerminated_) {
53 auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
now - start).count();
// NOTE(review): the timeout comparison and the detach call announced by this
// warning are not visible — presumably `elapsed` is compared against
// MAX_WORKER_TERMINATE_TIME_MS; confirm.
55 KB_WARN(
"Worker thread does not seem to exit, it will be detached!");
// Back off for 100 ms between polls.
61 std::this_thread::sleep_for(std::chrono::milliseconds(100));
// Forget all worker pointers.
// NOTE(review): workers are allocated with `new` elsewhere in this file;
// whether they are deleted before this clear() is not visible here — confirm.
67 workerThreads_.clear();
// pushWork body (fragment; the signature line is not visible here).
// Queues `goal` and, if no worker is available to take it, creates a new
// worker as long as the thread limit allows.
// Everything below runs while holding workMutex_.
72 std::lock_guard<std::mutex> scoped_lock(workMutex_);
// Install the caller-supplied exception handler on the goal before queuing it.
73 goal->setExceptionHandler(std::move(exceptionHandler));
74 workQueue_.push(goal);
// alive     = workers created so far minus those counted as finished
// available = alive workers minus those currently counted as active
76 uint32_t numAliveThreads = workerThreads_.size() - numFinishedThreads_;
77 uint32_t numAvailableThreads = numAliveThreads - numActiveWorker_;
78 if (numAvailableThreads == 0) {
// Equivalent to "alive < maxNumThreads_": finished workers still occupy a
// slot in workerThreads_, so they are added to the limit.
80 if (workerThreads_.size() < (maxNumThreads_ + numFinishedThreads_)) {
81 workerThreads_.push_back(
new Worker(
this));
// Takes the runner at the head of the work queue while holding workMutex_.
// NOTE(review): the body of the empty-queue branch and the pop/return
// statements fall on lines that are not shown in this listing — presumably an
// empty queue yields a null pointer; confirm.
89 std::shared_ptr<ThreadPool::Runner> ThreadPool::popWork() {
90 std::lock_guard<std::mutex> scoped_lock(workMutex_);
91 if (workQueue_.empty()) {
// Copy the shared_ptr stored at the front of the queue.
94 std::shared_ptr<ThreadPool::Runner> x = workQueue_.front();
// Worker constructor initializer list (fragment; signature and remaining
// initializers are not visible here).
// Keeps the pool pointer and starts with both termination flags cleared.
102 : threadPool_(threadPool),
103 isTerminated_(false),
104 hasTerminateRequest_(false),
// Worker teardown (fragment; header not visible — presumably the Worker
// destructor, confirm).
// Raises the terminate flag, then joins the thread if it is still joinable.
109 hasTerminateRequest_ =
true;
110 if (thread_.joinable()) thread_.join();
// Worker thread body (fragment; the function header and several closing
// braces / early exits are on lines not shown in this listing).
//
// Initialization: if the pool's initializeWorker() hook fails, log an error,
// mark this worker as terminated and count it as finished.
116 if (!threadPool_->initializeWorker()) {
117 KB_ERROR(
"Worker initialization failed.");
118 isTerminated_ =
true;
119 threadPool_->numFinishedThreads_ += 1;
// From here on the worker is counted as active.
122 threadPool_->numActiveWorker_ += 1;
// Main loop: runs until a terminate request is observed.
125 while (!hasTerminateRequest_) {
// Under workMutex_: count this worker as inactive while it waits on workCV_
// for either a terminate request or a non-empty queue, then count it as
// active again.
128 std::unique_lock<std::mutex> lk(threadPool_->workMutex_);
129 threadPool_->numActiveWorker_ -= 1;
130 if (!hasTerminateRequest_) {
131 threadPool_->workCV_.wait(lk, [
this] {
132 return hasTerminateRequest_ || !threadPool_->workQueue_.empty();
135 threadPool_->numActiveWorker_ += 1;
137 if (hasTerminateRequest_) {
// NOTE(review): popWork() locks workMutex_ itself, so `lk` must already be
// released at this point — the end of its scope is not visible here; confirm.
142 auto goal = threadPool_->popWork();
// After the loop: flag termination, run the optional finalizeWorker_ hook and
// move this worker from the active count to the finished count.
149 isTerminated_ =
true;
151 if (threadPool_->finalizeWorker_) {
152 threadPool_->finalizeWorker_();
155 threadPool_->numFinishedThreads_ += 1;
156 threadPool_->numActiveWorker_ -= 1;
// Constructor initializer list (fragment; header not visible — presumably
// ThreadPool::Runner, confirm). Listing line 163 is not shown.
// Starts with both state flags cleared and no exception handler installed.
162 : isTerminated_(false),
164 hasStopRequest_(false),
165 exceptionHandler_(nullptr) {}
// Teardown (fragment; header not visible — presumably the runner's
// destructor, confirm). If the runner is still running, request a stop.
// NOTE(review): the meaning of the `true` argument is defined by stop(),
// whose signature is not visible here — presumably "wait for termination".
168 if (isRunning_) stop(
true);
// Wait-for-completion (fragment; header not visible — TODO confirm name).
// If the runner has not terminated yet, block on finishedCV_ under mutex_
// until isTerminated() returns true.
172 if (!isTerminated()) {
173 std::unique_lock<std::mutex> lk(mutex_);
174 finishedCV_.wait(lk, [
this] {
return isTerminated(); });
// Execution wrapper around the runner's task (fragment; the function header,
// the `try` block with the actual call, and several braces are on lines not
// shown in this listing — TODO confirm the function name).
//
// Reset the state flags before the task starts.
180 isTerminated_ =
false;
181 hasStopRequest_ =
false;
// Failure raised from Python code through boost::python.
// `py_error` is defined on a line not shown here — presumably a PythonError
// built from the pending Python exception; confirm.
186 catch (
const boost::python::error_already_set &) {
// Prefer the installed handler; the warning below is presumably the
// fallback branch (the `else` line is not visible).
188 if (exceptionHandler_) {
189 exceptionHandler_(py_error);
191 KB_WARN(
"Worker error in Python code: {}.", py_error.what());
// Any other standard exception: same handler-or-log treatment.
194 catch (
const std::exception &e) {
195 if (exceptionHandler_) {
196 exceptionHandler_(e);
198 KB_WARN(
"Worker error: {}.", e.what());
// abi::__forced_unwind is the libstdc++ type used for forced unwinding
// (e.g. thread cancellation).
// NOTE(review): such an exception is expected to be rethrown after logging;
// the rethrow is not visible in this fragment — confirm.
201 catch (abi::__forced_unwind
const &) {
203 KB_WARN(
"Worker forced unwind.");
// Presumably inside a catch-all handler (its `catch` line is not shown).
207 KB_WARN(
"Unknown worker error.");
// Mark the runner as finished and wake every thread blocked on finishedCV_.
210 isTerminated_ =
true;
212 finishedCV_.notify_all();
// Stop request (fragment; header and any subsequent waiting logic are not
// visible here). Raises the flag that marks this runner as asked to stop.
217 hasStopRequest_ =
true;
#define MAX_WORKER_TERMINATE_TIME_MS
Worker(ThreadPool *thread_pool)
std::function< void(const std::exception &)> ExceptionHandler
ThreadPool(uint32_t maxNumThreads)
void pushWork(const std::shared_ptr< ThreadPool::Runner > &goal, ThreadPool::ExceptionHandler exceptionHandler)
std::shared_ptr< ThreadPool > DefaultThreadPool()
int run(int argc, char **argv)