knowrob  2.1.0
A Knowledge Base System for Cognition-enabled Robots
ObserverManager.cpp
Go to the documentation of this file.
1 /*
2  * This file is part of KnowRob, please consult
3  * https://github.com/knowrob/knowrob for license details.
4  */
5 
6 #include <thread>
7 #include <queue>
8 #include <mutex>
9 #include <atomic>
10 #include "knowrob/storage/ObserverManager.h"
11 #include "knowrob/storage/ObserverJob.h"
12 
13 using namespace knowrob;
14 
15 struct ObserverManager::Impl {
16  Impl() = default;
17 
18  ~Impl() = default;
19 
20  enum class Mode {
21  INSERT,
22  REMOVE
23  };
24 
25  std::thread thread_;
26 
27  std::condition_variable syncCondition_;
28  std::mutex syncMutex_;
29 
30  std::queue<std::pair<Mode,TripleContainerPtr>> queue_;
31  std::mutex queueMutex_;
32  std::condition_variable queueCondition_;
33 
34  std::vector<std::shared_ptr<ObserverJob>> jobs_;
35  std::mutex jobMutex_;
36 
37  std::atomic<bool> running_{true};
38 };
39 
41  : impl_(std::make_unique<Impl>()), backend_(backend) {
42  impl_->thread_ = std::thread(&ObserverManager::run, this);
43 }
44 
46  stop();
47  if (impl_) {
48  if (impl_->thread_.joinable()) {
49  impl_->thread_.join();
50  }
51  impl_ = nullptr;
52  }
53 }
54 
56  {
57  std::lock_guard<std::mutex> lock(impl_->queueMutex_);
58  while (!impl_->queue_.empty()) {
59  impl_->queue_.pop();
60  }
61  }
62  {
63  std::lock_guard<std::mutex> lock(impl_->jobMutex_);
64  impl_->jobs_.clear();
65  }
66  impl_->running_ = false;
67  backend_ = nullptr;
68  impl_->queueCondition_.notify_one();
69 }
70 
71 void ObserverManager::query(const GraphQueryPtr &query, const BindingsHandler &callback) {
72  backend_->query(query, callback);
73 }
74 
76  auto job = std::make_shared<ObserverJob>(shared_from_this(), query, callback);
77  {
78  std::lock_guard<std::mutex> lock(impl_->jobMutex_);
79  impl_->jobs_.push_back(job);
80  }
81  return std::make_shared<Observer>(job);
82 }
83 
85  {
86  std::lock_guard<std::mutex> lock(impl_->queueMutex_);
87  if (impl_->queue_.empty()) return;
88  }
89  {
90  std::unique_lock<std::mutex> lock(impl_->syncMutex_);
91  impl_->syncCondition_.wait(lock, [this] { return impl_->queue_.empty(); });
92  }
93 }
94 
96  std::lock_guard<std::mutex> lock(impl_->jobMutex_);
97  for (auto it = impl_->jobs_.begin(); it != impl_->jobs_.end(); ++it) {
98  auto &job = *it;
99  if (job.get() == observer.job().get()) {
100  impl_->jobs_.erase(it);
101  break;
102  }
103  }
104 }
105 
107  {
108  std::lock_guard<std::mutex> lock(impl_->queueMutex_);
109  impl_->queue_.push({Impl::Mode::INSERT, triples});
110  }
111  impl_->queueCondition_.notify_one();
112 }
113 
115  {
116  std::lock_guard<std::mutex> lock(impl_->queueMutex_);
117  impl_->queue_.push({Impl::Mode::REMOVE, triples});
118  }
119  impl_->queueCondition_.notify_one();
120 }
121 
123  while (impl_->running_) {
124  std::pair<Impl::Mode,TripleContainerPtr> next;
125  {
126  std::unique_lock<std::mutex> lock(impl_->queueMutex_);
127  if(impl_->running_) {
128  impl_->queueCondition_.wait(lock, [this] { return !impl_->running_ || !impl_->queue_.empty(); });
129  }
130  if (!impl_->running_) {
131  break;
132  }
133  next = impl_->queue_.front();
134  }
135  {
136  std::lock_guard<std::mutex> lock(impl_->jobMutex_);
137  if (next.first == Impl::Mode::INSERT) {
138  for (auto &job: impl_->jobs_) {
139  job->processInsertion(next.second);
140  }
141  } else {
142  for (auto &job: impl_->jobs_) {
143  job->processRemoval(next.second);
144  }
145  }
146  }
147  {
148  std::unique_lock<std::mutex> lock(impl_->queueMutex_);
149  impl_->queue_.pop();
150  if (impl_->queue_.empty()) {
151  impl_->syncCondition_.notify_all();
152  }
153  }
154  }
155 }
auto & job() const
Definition: Observer.h:36
ObserverPtr observe(const GraphQueryPtr &query, const BindingsHandler &callback)
void insert(const TripleContainerPtr &triples)
void stopObservation(const Observer &observer)
std::unique_ptr< Impl > impl_
QueryableBackendPtr backend_
void query(const GraphQueryPtr &query, const BindingsHandler &callback)
ObserverManager(const QueryableBackendPtr &backend)
void remove(const TripleContainerPtr &triples)
std::shared_ptr< TripleContainer > TripleContainerPtr
std::shared_ptr< Observer > ObserverPtr
Definition: Observer.h:42
std::function< void(const BindingsPtr &)> BindingsHandler
Definition: Bindings.h:152
std::shared_ptr< GraphQuery > GraphQueryPtr
Definition: GraphQuery.h:65
std::shared_ptr< QueryableStorage > QueryableBackendPtr