knowrob  2.1.0
A Knowledge Base System for Cognition-enabled Robots
ThreadPool.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2022, Daniel Beßler
3  * All rights reserved.
4  *
5  * This file is part of KnowRob, please consult
6  * https://github.com/knowrob/knowrob for license details.
7  */
8 
9 #ifndef KNOWROB_THREAD_POOL_H_
10 #define KNOWROB_THREAD_POOL_H_
11 
12 #include <queue>
13 #include <mutex>
14 #include <string>
15 #include <list>
16 #include <functional>
17 #include <condition_variable>
18 #include <atomic>
19 #include <iostream>
20 #include <thread>
21 
22 namespace knowrob {
// A pool of worker threads that executes Runner objects handed to it
// through pushWork(). Only declarations are visible in this header; the
// member functions without an inline body are defined elsewhere.
// NOTE(review): the number at the start of each line is the line number in
// the original header. Gaps in that numbering are lines omitted from this
// listing. The listing's index places `ExceptionHandler exceptionHandler_`
// at line 136 and `std::function<void()> finalizeWorker_` at line 168.
27  class ThreadPool {
28  public:
29  // forward declarations
30  class Runner;
31 
// Callback type receiving a std::exception by const reference. It is the
// type of pushWork()'s second argument and of Runner::setExceptionHandler().
32  using ExceptionHandler = std::function<void(const std::exception &)>;
33 
// Constructs a pool.
// maxNumThreads: presumably the value stored in maxNumThreads_ (the limit
// on the number of worker threads) -- TODO confirm in the implementation.
37  explicit ThreadPool(uint32_t maxNumThreads);
38 
// Virtual destructor: the class has a virtual member (initializeWorker)
// and can therefore be used as a polymorphic base.
39  virtual ~ThreadPool();
40 
// Copy construction is disabled.
44  ThreadPool(const ThreadPool &) = delete;
45 
// Shuts the pool down. The exact semantics (e.g. whether it blocks until
// workers have finished) are not visible in this header -- TODO confirm.
52  void shutdown();
53 
// Hands a Runner to the pool together with an exception handler.
// goal: shared pointer to the Runner to execute.
// exceptionHandler: callback of type ExceptionHandler; presumably installed
// on the runner via Runner::setExceptionHandler() -- TODO confirm.
60  void pushWork(const std::shared_ptr<ThreadPool::Runner> &goal, ThreadPool::ExceptionHandler exceptionHandler);
61 
// A worker of the pool: holds one std::thread plus two atomic status flags.
// ThreadPool is a friend and can access the protected members.
65  class Worker {
66  public:
// thread_pool: pointer to a ThreadPool; how it is used or stored is not
// visible in this listing.
67  explicit Worker(ThreadPool *thread_pool);
68 
70 
// Copy construction is disabled.
74  Worker(const Worker &) = delete;
75 
76  protected:
78 
// Atomic flags describing the worker's state; they are read and written
// outside this header.
79  std::atomic<bool> isTerminated_;
80  std::atomic<bool> hasTerminateRequest_;
81 
// The thread object owned by this worker.
82  std::thread thread_;
83 
// Presumably the function executed on thread_ -- TODO confirm.
84  void run();
85 
86  friend class ThreadPool;
87  };
88 
// Abstract unit of work. Subclasses implement run(); ThreadPool and
// ThreadPool::Worker are friends and can access the protected members.
93  class Runner {
94  public:
95  Runner();
96 
97  virtual ~Runner();
98 
// Copy construction is disabled.
102  Runner(const Runner &) = delete;
103 
// Presumably blocks until the runner has finished (mutex_ and finishedCV_
// are declared below) -- TODO confirm in the implementation.
107  void join();
108 
// The work to perform; pure virtual, must be provided by subclasses.
112  virtual void run() = 0;
113 
// Requests the runner to stop.
// wait: presumably whether to block until the runner has terminated --
// TODO confirm in the implementation.
118  void stop(bool wait);
119 
// Returns the current value of the hasStopRequest_ flag.
123  [[nodiscard]] bool hasStopRequest() const { return hasStopRequest_; }
124 
// Returns the current value of the isTerminated_ flag.
128  [[nodiscard]] bool isTerminated() const { return isTerminated_; }
129 
130  protected:
// Atomic status flags; hasStopRequest_ and isTerminated_ are exposed through
// the accessors above.
131  std::atomic<bool> isTerminated_;
132  std::atomic<bool> isRunning_;
133  std::atomic<bool> hasStopRequest_;
// Mutex and condition variable; their use is not visible in this header.
134  std::mutex mutex_;
135  std::condition_variable finishedCV_;
137 
// Internal entry point, defined elsewhere. Presumably wraps run() with state
// and exception handling -- TODO confirm.
138  void runInternal();
139 
// Stores a copy of the given handler in exceptionHandler_.
// NOTE(review): the declaration of exceptionHandler_ is not part of this
// listing; the listing's index places it at header line 136.
140  void setExceptionHandler(ExceptionHandler exceptionHandler) { exceptionHandler_ = exceptionHandler; }
141 
142  friend class ThreadPool::Worker;
143 
144  friend class ThreadPool;
145  };
146 
// A Runner whose work is a caller-supplied function object.
150  class LambdaRunner : public Runner {
151  public:
// Predicate type handed to the wrapped function.
152  using StopChecker = std::function<bool()>;
153 
// fn: function to execute in run(); it is copied into fn_.
157  explicit LambdaRunner(const std::function<void(const StopChecker&)> &fn) : fn_(fn) {}
158 
// Calls fn_ with a checker that returns hasStopRequest() of this runner.
// The lambda uses [&] and so refers to this object; the checker must not be
// invoked after the runner has been destroyed.
159  void run() override { fn_([&]{ return hasStopRequest(); }); }
160 
161  protected:
// The wrapped function invoked by run().
162  std::function<void(const StopChecker&)> fn_;
163  };
164 
165  protected:
166  // is called to finalize each worker thread
167  // Note: a virtual method is avoided as these cannot be called in destructors.
// NOTE(review): the member these two comments describe is not part of this
// listing; the listing's index places `std::function<void()> finalizeWorker_`
// at header line 168.
169 
170  private:
171  // list of threads doing work
172  std::list<Worker *> workerThreads_;
173  // currently queued work that has not been associated to a worker yet
174  std::queue<std::shared_ptr<ThreadPool::Runner>> workQueue_;
175  // condition variable used to wake up worker after new work was queued
176  std::condition_variable workCV_;
// Declared mutable, so it can be locked from const member functions.
// Presumably guards workQueue_ and is paired with workCV_ -- TODO confirm.
177  mutable std::mutex workMutex_;
178  // limit to this number of worker threads
179  uint32_t maxNumThreads_;
180  // number of terminated threads that are still in workerThreads_ list
181  std::atomic_uint32_t numFinishedThreads_;
182  // number of currently active workers
183  std::atomic_uint32_t numActiveWorker_;
184 
185  // get work from queue
186  std::shared_ptr<ThreadPool::Runner> popWork();
187 
188  // is called initially in each worker thread
// Virtual hook whose default implementation returns true. What a false
// return causes is not visible in this header.
189  virtual bool initializeWorker() { return true; }
190  };
191 
// Returns a shared pointer to a ThreadPool. Declaration only; the listing's
// index places the definition in ThreadPool.cpp.
// NOTE(review): presumably a single default pool shared by all callers --
// confirm against the definition.
195  std::shared_ptr<ThreadPool> DefaultThreadPool();
196 }
197 
198 #endif //KNOWROB_THREAD_POOL_H_
std::function< bool()> StopChecker
Definition: ThreadPool.h:152
LambdaRunner(const std::function< void(const StopChecker &)> &fn)
Definition: ThreadPool.h:157
std::function< void(const StopChecker &)> fn_
Definition: ThreadPool.h:162
std::atomic< bool > hasStopRequest_
Definition: ThreadPool.h:133
void setExceptionHandler(ExceptionHandler exceptionHandler)
Definition: ThreadPool.h:140
std::atomic< bool > isRunning_
Definition: ThreadPool.h:132
std::condition_variable finishedCV_
Definition: ThreadPool.h:135
std::atomic< bool > isTerminated_
Definition: ThreadPool.h:131
ExceptionHandler exceptionHandler_
Definition: ThreadPool.h:136
Runner(const Runner &)=delete
Worker(ThreadPool *thread_pool)
std::atomic< bool > hasTerminateRequest_
Definition: ThreadPool.h:80
std::atomic< bool > isTerminated_
Definition: ThreadPool.h:79
Worker(const Worker &)=delete
void pushWork(const std::shared_ptr< ThreadPool::Runner > &goal, ThreadPool::ExceptionHandler exceptionHandler)
std::function< void(const std::exception &)> ExceptionHandler
Definition: ThreadPool.h:32
std::function< void()> finalizeWorker_
Definition: ThreadPool.h:168
ThreadPool(const ThreadPool &)=delete
ThreadPool(uint32_t maxNumThreads)
FunctionRule & function()
Definition: terms.cpp:140
std::shared_ptr< ThreadPool > DefaultThreadPool()
Definition: ThreadPool.cpp:19