knowrob  2.1.0
A Knowledge Base System for Cognition-enabled Robots
DataDrivenReasoner.cpp
Go to the documentation of this file.
1 /*
2  * This file is part of KnowRob, please consult
3  * https://github.com/knowrob/knowrob for license details.
4  */
5 
6 #include "knowrob/KnowledgeBase.h"
7 #include "knowrob/reasoner/DataDrivenReasoner.h"
8 #include "knowrob/reasoner/ReasonerManager.h"
9 #include "knowrob/integration/python/utils.h"
10 #include "knowrob/integration/python/gil.h"
11 #include "knowrob/queries/QueryParser.h"
12 
13 using namespace knowrob;
14 
// Body of what is presumably the DataDrivenReasoner constructor; the line carrying the
// signature (original line 15) is absent from this listing.
// Creates updateRunner_, a ThreadPool::LambdaRunner whose callable invokes doUpdate() and
// ignores the StopChecker argument it is given.
// NOTE(review): the lambda captures `this`; confirm that the runner can never execute after
// the reasoner has been destroyed.
16  using StopChecker = ThreadPool::LambdaRunner::StopChecker;
17  using Runner = ThreadPool::LambdaRunner;
18  updateRunner_ = std::make_shared<Runner>([this](const StopChecker &) { doUpdate(); });
19 }
20 
// Body of what is presumably enableFeature(Feature); the signature line (original line 21)
// is absent from this listing.
// ORs the bits of `feature` into the features_ bitmask; bits that are already set stay set.
22  features_ = features_ | static_cast<uint32_t>(feature);
23 }
24 
// Body of what is presumably hasFeature(Feature) const; the signature line (original line 25)
// is absent from this listing.
// Returns true only when every bit of `feature` is set in features_. A feature whose numeric
// value is 0 therefore always passes this test.
26  return (features_ & static_cast<uint32_t>(feature)) == static_cast<uint32_t>(feature);
27 }
28 
29 void DataDrivenReasoner::setUpdateInterval(double intervalInSeconds) {
30  updateInterval_ = std::chrono::duration<double>(intervalInSeconds);
31 }
32 
// Body of what is presumably queueUpdate(); the signature line (original line 33) and the
// first argument passed to pushWork (original line 37) are absent from this listing.
// Pushes work to the default thread pool unless an update is already marked as queued.
34  if (isUpdateQueued_) return;
// NOTE(review): the test above and the store below are two separate steps. doUpdate() calls
// this function from a detached timer thread, so two callers could both pass the test unless
// isUpdateQueued_ is an atomic that is exchanged in one step (confirm against the header).
35  isUpdateQueued_ = true;
36  DefaultThreadPool()->pushWork(
// Second argument: a handler that logs the message of a std::exception (presumably invoked by
// the pool when the pushed work throws; confirm against ThreadPool).
// NOTE(review): this handler does not reset isUpdateQueued_. The flag is only cleared in
// doUpdate() after update() returns, so if update() throws, the guard at the top of this
// function would reject every later request.
38  [](const std::exception &exc) {
39  KB_ERROR("Error in reasoner update: {}", exc.what());
40  });
41 }
42 
// Body of what is presumably doUpdate(). Original lines 43, 45, 52 and 56 are absent from this
// listing: the signature, the condition of the first `if`, the declaration of `now`, and the
// condition that guards the periodic re-scheduling further down.
// Visible behaviour: runs update() (holding the Python GIL in the first branch), clears the
// queued/invalidated flags, re-schedules the next update, and records `now` in lastUpdate_.
44  // do the update
46  // If the reasoner uses Python code, then we must make sure that the GIL is acquired in the current thread.
47  py::gil_lock acquire;
48  update();
49  } else {
50  update();
51  }
// NOTE(review): both flags are cleared only after update() has returned.
//  - If update() throws, neither flag is reset.
//  - If an invalidation arrives while update() is running, it sets isInvalidated_ and calls
//    queueUpdate(), which returns early because isUpdateQueued_ is still true; the store below
//    then erases the invalidation. Clearing the flags before calling update() would avoid
//    losing it.
53  isUpdateQueued_ = false;
54  isInvalidated_ = false;
55 
57  // if the reasoner does not update itself and does not invalidate itself, then
58  // periodically update the reasoner.
// `duration` is the time between lastUpdate_ (stored at the end of this function) and `now`,
// truncated to whole milliseconds.
59  auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now - lastUpdate_);
60  if (duration > updateInterval_) {
61  // we cannot keep up with the update rate, so directly re-queue the update.
62  queueUpdate();
63  } else {
// NOTE(review): unit mismatch. updateInterval_ is a std::chrono::duration<double>, so
// updateInterval_.count() is a number of seconds, while duration.count() is a number of
// milliseconds. The difference is then interpreted as milliseconds and is usually negative,
// which makes sleep_for return at once. Suggested replacement:
//   std::chrono::duration_cast<std::chrono::milliseconds>(updateInterval_) - duration
64  auto remaining = std::chrono::milliseconds(
65  static_cast<int>(updateInterval_.count()) - duration.count());
66  // Create a timer that triggers the update after the remaining time.
67  // It is not optimal to create a new thread for each update though,
68  // but it would be good to stick to the worker thread pool for performing the
69  // update. However, the worker thread pool does not support delayed execution yet.
70  // Note: this is done so to avoid calling sleep for the worker thread
71  // as it would block it from being used for other tasks.
// NOTE(review): `remaining` is a local of this else-branch but is captured by reference, and
// the thread is detached right below. The thread may therefore read `remaining` after this
// scope has ended (dangling reference); capture it by value instead: [remaining, this].
// NOTE(review): `this` is captured as well and nothing visible here joins or cancels the
// thread, so the reasoner must outlive the sleep (confirm).
72  std::thread timeout([&remaining, this]() {
73  std::this_thread::sleep_for(remaining);
74  queueUpdate();
75  });
76  timeout.detach();
77  }
78  }
79 
80  lastUpdate_ = now;
81 }
82 
// Body of what is presumably DataDrivenReasoner::start(); the signature line (original
// line 83) is absent from this listing.
// Marks the reasoner as running and, unless it has the UpdatesItself feature, queues the first
// update. Calling it again while already running does nothing.
84  // Only start if not already running
85  if (isRunning_) return;
86  isRunning_ = true;
87  // update the reasoner if it does not update itself
88  if (!hasFeature(UpdatesItself)) {
89  queueUpdate();
90  }
91 }
92 
// Body of what is presumably DataDrivenReasoner::stop(); the signature line (original
// line 93) is absent from this listing.
// Clears the running flag and, unless the reasoner has the UpdatesItself feature, stops
// updateRunner_ if it exists and has not terminated. Does nothing when not running.
// NOTE(review): the meaning of the `false` argument to stop() is not visible here (presumably
// "do not wait"; confirm against ThreadPool::Runner).
// NOTE(review): a detached timer thread created in doUpdate() is not cancelled here, and the
// visible part of queueUpdate() does not test isRunning_, so an update may still be queued
// after stop() returns.
94  if (!isRunning_) return;
95  isRunning_ = false;
96  // stop the periodic updating
97  if (!hasFeature(UpdatesItself)) {
98  if (updateRunner_ && !updateRunner_->isTerminated()) {
99  updateRunner_->stop(false);
100  }
101  }
102 }
103 
// Dispatches an event to the matching handler based on event->eventType().
// The `case` labels (original lines 106, 109, 112, 115) and the statement of the last case
// (original line 116) are absent from this listing; only the handler calls are visible.
// `event` is dereferenced unconditionally, so it must not be null.
104 void DataDrivenReasoner::emit(const std::shared_ptr<reasoner::Event> &event) {
105  switch (event->eventType()) {
// The casts below are unchecked static casts: they rely on eventType() matching the dynamic
// type of the event.
107  processAssertion(std::static_pointer_cast<reasoner::AssertionEvent>(event)->triples());
108  break;
110  processRetraction(std::static_pointer_cast<reasoner::RetractionEvent>(event)->triples());
111  break;
113  processReplacement(std::static_pointer_cast<reasoner::ReplacementEvent>(event)->triples());
114  break;
117  break;
118  }
119 }
120 
121 void DataDrivenReasoner::observe(const std::shared_ptr<GraphQuery> &query, const BindingsHandler &handler) {
122  reasonerManager().kb()->observe(query, handler);
123 }
124 
125 void DataDrivenReasoner::observe(const std::string &queryString, const BindingsHandler &handler) {
126  auto term = QueryParser::parseGraphTerm(queryString);
127  auto query = std::make_shared<GraphQuery>(term);
128  reasonerManager().kb()->observe(query, handler);
129 }
130 
// Body of what is presumably DataDrivenReasoner::invalidate(); the signature line and the
// condition of the first `if` (original lines 131-132) are absent from this listing.
// Visible behaviour: in the guarded case a warning is logged and the call is ignored;
// otherwise the first invalidation since the last update sets isInvalidated_ and queues an
// update, and further invalidations are no-ops until doUpdate() clears the flag.
133  KB_WARN("Reasoner has no feature to invalidate itself, but still generated an invalidation event. Ignoring.");
134  return;
135  }
136  if (!isInvalidated_) {
137  isInvalidated_ = true;
138  queueUpdate();
139  }
140 }
141 
142 void DataDrivenReasoner::setReasonerOrigin(const std::vector<TriplePtr> &triples) {
143  for (auto &triple: triples) {
144  triple.ptr->setGraph(reasonerName()->stringForm());
145  }
146 }
147 
148 void DataDrivenReasoner::processAssertion(const std::vector<TriplePtr> &triples) {
149  setReasonerOrigin(triples);
150  reasonerManager().kb()->insertAll(triples);
151 }
152 
153 void DataDrivenReasoner::processRetraction(const std::vector<TriplePtr> &triples) {
154  setReasonerOrigin(triples);
155  reasonerManager().kb()->removeAll(triples);
156 }
157 
158 void DataDrivenReasoner::processReplacement(const std::vector<TriplePtr> &triples) {
159  setReasonerOrigin(triples);
160 
161  if (inferredTriples_.empty()) {
162  inferredTriples_.insert(triples.begin(), triples.end());
163  reasonerManager().kb()->insertAll(triples);
164  } else {
165  auto &oldTriples = inferredTriples_;
166  // ensure that input triples are sorted which is required for set_difference
167  std::set<TriplePtr> newTriples(triples.begin(), triples.end());
168 
169  std::vector<TriplePtr> triplesToRemove, triplesToAdd;
170  // old inferences without new inferences are the ones that do not hold anymore.
171  std::set_difference(oldTriples.begin(), oldTriples.end(),
172  newTriples.begin(), newTriples.end(),
173  std::inserter(triplesToRemove, triplesToRemove.begin()));
174  // new inferences without old inferences are the ones that are really new.
175  std::set_difference(newTriples.begin(), newTriples.end(),
176  oldTriples.begin(), oldTriples.end(),
177  std::inserter(triplesToAdd, triplesToAdd.begin()));
178  // update the set of inferred triples.
179  for (auto &triple: triplesToRemove) {
180  inferredTriples_.erase(triple);
181  }
182  inferredTriples_.insert(triplesToAdd.begin(), triplesToAdd.end());
183  // update the knowledge base
184  if (!triplesToAdd.empty()) {
185  reasonerManager().kb()->insertAll(triplesToAdd);
186  }
187  if (!triplesToRemove.empty()) {
188  reasonerManager().kb()->removeAll(triplesToRemove);
189  }
190  }
191 }
192 
193 namespace knowrob::py {
194  // this struct is needed because Reasoner has pure virtual methods
195  struct DataDrivenReasonerWrap : public DataDrivenReasoner, boost::python::wrapper<DataDrivenReasoner> {
196  explicit DataDrivenReasonerWrap(PyObject *p) : DataDrivenReasoner(), self(p) {}
197 
198  bool initializeReasoner(const PropertyTree &config) override {
199  return call_method<bool>(self, "initializeReasoner", config);
200  }
201 
202  void start() override {
203  // Note: In case there is an overwrite, we also want to make a call to the default implementation.
204  // However, in case there is no overwrite, below will call the default implementation twice.
205  // But the call is guarded by a check in the default implementation, so no problem.
206  start_default();
207  call_method<void>(self, "start");
208  }
209 
210  void start_default() { return this->DataDrivenReasoner::start(); }
211 
212  void stop() override {
213  // Note: In case there is an overwrite, we also want to make a call to the default implementation.
214  // However, in case there is no overwrite, below will call the default implementation twice.
215  // But the call is guarded by a check in the default implementation, so no problem.
216  call_method<void>(self, "stop");
217  stop_default();
218  }
219 
220  void stop_default() { return this->DataDrivenReasoner::stop(); }
221 
222  void update() override { call_method<void>(self, "update"); }
223 
224  private:
225  PyObject *self;
226  };
227 
// Specialisation that presumably is createType<DataDrivenReasoner>(); the signature line
// (original line 229) is absent from this listing.
// Registers the Boost.Python bindings: the Feature enum, the reasoner::Event type, and the
// DataDrivenReasoner class (held through DataDrivenReasonerWrap) with its methods.
228  template<>
230  using namespace boost::python;
// export_values() additionally puts the enum values into the enclosing Python scope.
231  enum_<DataDrivenReasoner::Feature>("DataDrivenReasonerFeature")
232  .value("NothingSpecial", DataDrivenReasoner::NothingSpecial)
233  .value("UpdatesItself", DataDrivenReasoner::UpdatesItself)
234  .value("InvalidatesItself", DataDrivenReasoner::InvalidatesItself)
235  .export_values();
236 
237  py::createType<reasoner::Event>();
238 
239  class_<DataDrivenReasoner, std::shared_ptr<DataDrivenReasonerWrap>, bases<Reasoner>, boost::noncopyable>
240  ("DataDrivenReasoner", init<>())
241  .def("enableFeature", &DataDrivenReasonerWrap::enableFeature)
242  .def("hasFeature", &DataDrivenReasonerWrap::hasFeature)
243  .def("emit", &DataDrivenReasonerWrap::emit)
// Both observe overloads wrap the Python callable in a C++ handler that takes the GIL before
// calling back into Python.
// NOTE(review): `handler` is a boost::python::object and is copied into the lambda capture
// after `no_gil unlock` (presumably releasing the GIL for this scope; confirm against gil.h).
// Copying a Python object changes its reference count and should happen while the GIL is
// held, so make the copy before `no_gil`. The same applies to wherever the stored handler
// is eventually destroyed.
244  .def("observe", +[](DataDrivenReasoner &x, const std::shared_ptr<GraphQuery> &query, object &handler) {
245  no_gil unlock;
246  x.observe(query, [handler](const BindingsPtr &bindings) {
247  py::gil_lock lock;
248  handler(bindings);
249  });
250  })
251  .def("observe", +[](DataDrivenReasoner &x, const std::string &queryString, object &handler) {
252  no_gil unlock;
253  x.observe(queryString, [handler](const BindingsPtr &bindings) {
254  py::gil_lock lock;
255  handler(bindings);
256  });
257  })
258  .def("setUpdateInterval", &DataDrivenReasonerWrap::setUpdateInterval)
259  .def("updateInterval", &DataDrivenReasonerWrap::updateInterval)
260  // methods that must be implemented by reasoner plugins
261  .def("update", &DataDrivenReasonerWrap::update)
// start/stop are registered with a second function pointer, the *_default member that calls
// the C++ base implementation.
262  .def("start", &DataDrivenReasonerWrap::start, &DataDrivenReasonerWrap::start_default)
263  .def("stop", &DataDrivenReasonerWrap::stop, &DataDrivenReasonerWrap::stop_default);
264  }
265 }
#define KB_ERROR
Definition: Logger.h:28
#define KB_WARN
Definition: Logger.h:27
void processReplacement(const std::vector< TriplePtr > &triples)
void setReasonerOrigin(const std::vector< TriplePtr > &triples)
std::shared_ptr< ThreadPool::Runner > updateRunner_
void observe(const std::shared_ptr< GraphQuery > &query, const BindingsHandler &handler)
void processAssertion(const std::vector< TriplePtr > &triples)
void setUpdateInterval(double intervalInSeconds)
void enableFeature(Feature feature)
std::chrono::duration< double > updateInterval_
bool hasFeature(Feature feature) const
void processRetraction(const std::vector< TriplePtr > &triples)
std::set< TriplePtr > inferredTriples_
void emit(const std::shared_ptr< reasoner::Event > &event)
std::chrono::time_point< std::chrono::high_resolution_clock > lastUpdate_
static std::shared_ptr< GraphTerm > parseGraphTerm(const std::string &queryString)
Definition: QueryParser.cpp:43
ReasonerManager & reasonerManager() const
Definition: Reasoner.cpp:24
auto & reasonerName() const
Definition: Reasoner.h:37
auto reasonerLanguage() const
Definition: Reasoner.h:42
std::function< bool()> StopChecker
Definition: ThreadPool.h:152
TermRule & string()
Definition: terms.cpp:63
TermRule & term()
Definition: terms.cpp:136
void createType< DataDrivenReasoner >()
TimePoint now()
Definition: TimePoint.cpp:12
std::shared_ptr< const Bindings > BindingsPtr
Definition: Bindings.h:151
std::function< void(const BindingsPtr &)> BindingsHandler
Definition: Bindings.h:152
std::shared_ptr< ThreadPool > DefaultThreadPool()
Definition: ThreadPool.cpp:19