knowrob  2.1.0
A Knowledge Base System for Cognition-enabled Robots
ObserverJob.cpp
Go to the documentation of this file.
1 /*
2  * This file is part of KnowRob, please consult
3  * https://github.com/knowrob/knowrob for license details.
4  */
5 
6 #include "knowrob/storage/ObserverJob.h"
7 
8 using namespace knowrob;
9 
10 ObserverJob::ObserverJob(const std::shared_ptr<ObserverManager> &manager,
11  const GraphQueryPtr &query,
12  const BindingsHandler &callback)
13  : manager_(manager), query_(query), callback_(callback) {
14  // construct the observation graph given the query
15  terminalNodes_ = createGraph(query->term(), {});
16  // initialize the tables for the query by running a DB query
17  // for each node in the observation graph.
18  for (const auto &node: nodes_) {
19  initializeNode(node);
20  }
21 }
22 
24  stop();
25 }
26 
28  nodes_.clear();
29  terminalNodes_.clear();
30  manager_ = nullptr;
31 }
32 
33 std::shared_ptr<ObserverJob::Node> ObserverJob::createNode(const std::shared_ptr<GraphPattern> &pattern) {
34  auto node = std::make_shared<Node>();
35  node->pattern = pattern;
36  nodes_.push_back(node);
37  return node;
38 }
39 
41 ObserverJob::createGraph(const std::shared_ptr<GraphTerm> &term, const NodeParents &parents) {
42  switch (term->termType()) {
44  auto node = createNode(std::static_pointer_cast<GraphPattern>(term));
45  for (const auto &parent: parents) {
46  node->parents.push_back(parent.get());
47  parent->children.push_back(node);
48  }
49  return {node};
50  }
52  if (!parents.empty()) {
53  for (const auto &parent: parents) {
54  auto builtin = std::static_pointer_cast<GraphBuiltin>(term);
55  parent->builtins.push_back(builtin);
56  }
57  } else {
58  KB_WARN("Builtins are not allowed at the top level of an observed query.");
59  }
60  return parents;
61  }
63  auto sequence = std::static_pointer_cast<GraphSequence>(term);
64  ObserverJob::NodeParents nextParents = parents;
65  for (const auto &subTerm: sequence->terms()) {
66  nextParents = createGraph(subTerm, nextParents);
67  }
68  return nextParents;
69  }
70  case GraphTermType::Union: {
71  auto unionTerm = std::static_pointer_cast<GraphUnion>(term);
72  ObserverJob::NodeParents nextParents;
73  for (const auto &subTerm: unionTerm->terms()) {
74  auto subParents = createGraph(subTerm, parents);
75  nextParents.insert(nextParents.end(), subParents.begin(), subParents.end());
76  }
77  return nextParents;
78  }
79  }
80  return {};
81 }
82 
83 GraphQueryPtr ObserverJob::makeQuery(const std::vector<Node *> &reverseSequence) {
84  std::vector<std::shared_ptr<GraphTerm>> terms;
85  for (auto it = reverseSequence.rbegin(); it != reverseSequence.rend(); ++it) {
86  terms.push_back((*it)->pattern);
87  for (const auto &builtin: (*it)->builtins) {
88  terms.push_back(builtin);
89  }
90  }
91  if (terms.size() == 1) {
92  return std::make_shared<GraphQuery>(terms[0]);
93  } else {
94  return std::make_shared<GraphQuery>(std::make_shared<GraphSequence>(terms));
95  }
96 }
97 
98 GraphQueryPtr ObserverJob::makeAtomicQuery(const std::shared_ptr<Node> &node, const BindingsPtr &bindings) {
99  std::vector<std::shared_ptr<GraphTerm>> terms;
100  terms.push_back(applyBindings(node->pattern, *bindings));
101  for (const auto &builtin: node->builtins) {
102  std::shared_ptr<GraphTerm> builtinTerm = builtin;
103  terms.push_back(applyBindings(builtinTerm, *bindings));
104  }
105  if (terms.size() == 1) {
106  return std::make_shared<GraphQuery>(terms[0]);
107  } else {
108  return std::make_shared<GraphQuery>(std::make_shared<GraphSequence>(terms));
109  }
110 }
111 
112 void ObserverJob::initializeNode(const std::shared_ptr<Node> &nodeToInit) {
113  std::vector<GraphQueryPtr> queries;
114  std::queue<std::pair<Node *, std::vector<Node *>>> queryQueue;
115  queryQueue.push({nodeToInit.get(), { nodeToInit.get() }});
116 
117  // build queries from the node and its parents
118  while (!queryQueue.empty()) {
119  auto [node, seq] = queryQueue.front();
120  queryQueue.pop();
121 
122  if (node->parents.empty()) {
123  queries.push_back(makeQuery(seq));
124  } else {
125  for (const auto &parent: node->parents) {
126  std::vector<Node *> newSeq = seq;
127  newSeq.push_back(parent);
128  queryQueue.push({parent, newSeq});
129  }
130  }
131  }
132 
133  // run queries against the database
134  for (const auto &nodeQuery: queries) {
135  manager_->query(nodeQuery, [this, nodeToInit](const BindingsPtr &bindings) {
136  initializeNode(nodeToInit, bindings);
137  });
138  }
139 }
140 
141 void ObserverJob::initializeNode(const std::shared_ptr<Node> &node, const BindingsPtr &answer) {
142  auto answerHash = answer->hash();
143  auto it = node->solutions.find(answerHash);
144  if (it == node->solutions.end()) {
145  // store the answer in the node
146  node->solutions[answerHash] = answer;
147  // check if the node is a terminal node
148  if (node->children.empty()) {
149  // if so, call the callback
150  callback_(answer);
151  }
152  }
153 }
154 
155 bool ObserverJob::matches(const Node &node, const Triple &triple) {
156  auto &pat = node.pattern->value();
157 
158  auto &s_pat = pat->subjectTerm();
159  if (s_pat->isAtomic()) {
160  auto s_pat_atom = std::static_pointer_cast<Atomic>(s_pat);
161  if (s_pat_atom->stringForm() != triple.subject()) return false;
162  } else if (!s_pat->isVariable()) {
163  return false;
164  }
165 
166  auto &p_pat = pat->propertyTerm();
167  if (p_pat->isAtomic()) {
168  auto p_pat_atom = std::static_pointer_cast<Atomic>(p_pat);
169  if (p_pat_atom->stringForm() != triple.predicate()) return false;
170  } else if (!p_pat->isVariable()) {
171  return false;
172  }
173 
174  auto &o_pat = pat->objectTerm();
175  if (o_pat->isAtomic()) {
176  auto o_pat_atom = std::static_pointer_cast<Atomic>(o_pat);
177 
178  if (triple.isObjectIRI() || triple.isObjectBlank()) {
179  auto o_triple = triple.valueAsString();
180  if (o_pat_atom->stringForm() != o_triple) return false;
181  } else {
182  auto o_triple = Atomic::makeTripleValue(triple);
183  if (*o_pat_atom != *o_triple) return false;
184  }
185  } else if (!o_pat->isVariable()) {
186  return false;
187  }
188 
189  return true;
190 }
191 
193  for (auto &triplePtr: *triples) {
194  auto &triple = *triplePtr;
195  for (auto &node: nodes_) {
196  if (matches(*node, triple)) {
197  insert(node, triple);
198  }
199  }
200  }
201 }
202 
204  for (auto &triplePtr: *triples) {
205  auto &triple = *triplePtr;
206  for (auto &node: nodes_) {
207  if (matches(*node, triple)) {
208  remove(node, triple);
209  }
210  }
211  }
212 }
213 
214 BindingsPtr ObserverJob::applyBuiltins(const std::shared_ptr<Node> &node, const BindingsPtr &bindings) {
215  if (node->builtins.empty()) {
216  return bindings;
217  } else {
218  auto bindings_rw = std::make_shared<Bindings>(*bindings);
219  for (const auto &builtin: node->builtins) {
220  builtin->apply(bindings_rw);
221  }
222  return bindings_rw;
223  }
224 }
225 
226 void ObserverJob::remove(const std::shared_ptr<Node> &node, const Triple &triple) {
227  auto nodePattern = node->pattern->value();
228 
229  // compute bindings for the triple
230  auto tripleBindings = std::make_shared<Bindings>();
231  if (nodePattern->subjectTerm()->isVariable()) {
232  auto var = std::static_pointer_cast<Variable>(nodePattern->subjectTerm());
233  tripleBindings->set(var, std::make_shared<IRIAtom>(triple.subject()));
234  }
235  if (nodePattern->propertyTerm()->isVariable()) {
236  auto var = std::static_pointer_cast<Variable>(nodePattern->propertyTerm());
237  tripleBindings->set(var, std::make_shared<IRIAtom>(triple.predicate()));
238  }
239  if (nodePattern->objectTerm()->isVariable()) {
240  auto var = std::static_pointer_cast<Variable>(nodePattern->objectTerm());
241  auto o_triple = Atomic::makeTripleValue(triple);
242  tripleBindings->set(var, o_triple);
243  }
244 
245  // remove all solutions of this node or children nodes
246  // that are consistent with the removed triple
247  std::queue<Node*> nodeQueue;
248  nodeQueue.push(node.get());
249  while (!nodeQueue.empty()) {
250  auto nextNode = nodeQueue.front();
251  nodeQueue.pop();
252  for (auto &solPair: nextNode->solutions) {
253  auto &solution = solPair.second;
254  if (solution->isConsistentWith(*tripleBindings)) {
255  nextNode->solutions.erase(solPair.first);
256  }
257  }
258  for (auto &childNode: nextNode->children) {
259  nodeQueue.push(childNode.get());
260  }
261  }
262 }
263 
264 void ObserverJob::insert(const std::shared_ptr<Node> &node, const Triple &triple) {
265  auto nodePattern = node->pattern->value();
266 
267  // compute bindings for the triple
268  auto tripleBindings = std::make_shared<Bindings>();
269  if (nodePattern->subjectTerm()->isVariable()) {
270  auto var = std::static_pointer_cast<Variable>(nodePattern->subjectTerm());
271  tripleBindings->set(var, std::make_shared<IRIAtom>(triple.subject()));
272  }
273  if (nodePattern->propertyTerm()->isVariable()) {
274  auto var = std::static_pointer_cast<Variable>(nodePattern->propertyTerm());
275  tripleBindings->set(var, std::make_shared<IRIAtom>(triple.predicate()));
276  }
277  if (nodePattern->objectTerm()->isVariable()) {
278  auto var = std::static_pointer_cast<Variable>(nodePattern->objectTerm());
279  auto o_triple = Atomic::makeTripleValue(triple);
280  tripleBindings->set(var, o_triple);
281  }
282 
283  //insert(node, tripleBindings);
284  if (node->parents.empty()) {
285  // the node is a root node
286  doInsert(node, tripleBindings);
287  }
288  else {
289  for (auto &parentNode: node->parents) {
290  for (auto &parentSolPair: parentNode->solutions) {
291  auto &parentSolution = parentSolPair.second;
292  if (parentSolution->isConsistentWith(*tripleBindings)) {
293  doInsert(node, parentSolution, tripleBindings);
294  }
295  }
296  }
297  }
298 }
299 
300 void ObserverJob::doInsert(const std::shared_ptr<Node> &node,
301  const BindingsPtr &parentBindings,
302  const BindingsPtr &nodeBindings) {
303  auto newBindings_rw = std::make_shared<Bindings>(*parentBindings);
304  newBindings_rw->operator+=(*nodeBindings);
305  auto newBindings = applyBuiltins(node, newBindings_rw);
306  doInsert(node, newBindings);
307 }
308 
309 void ObserverJob::doInsert(const std::shared_ptr<Node> &node, const BindingsPtr &newBindings) {
310  auto solHash = newBindings->hash();
311 
312  if(node->solutions.find(solHash) != node->solutions.end()) {
313  // the solution is already in the node
314  return;
315  }
316  node->solutions[solHash] = newBindings;
317 
318  if (node->children.empty()) {
319  // if the node is terminal, then call the callback with the new bindings.
320  callback_(newBindings);
321  } else {
322  // if the node is not terminal, then a graph query must be constructed for each child node
323  // with the new bindings applied. If there are new solutions, then call insert on the child node
324  // with the new solutions to propagate the new data through the graph.
325 
326  for (auto &childNode: node->children) {
327  auto newQuery = makeAtomicQuery(childNode, newBindings);
328  manager_->query(newQuery, [this, childNode, newBindings](const BindingsPtr &bindings) {
329  doInsert(childNode, newBindings, bindings);
330  });
331  }
332  }
333 }
334 
#define KB_WARN
Definition: Logger.h:27
static std::shared_ptr< Atomic > makeTripleValue(const Triple &triple)
Definition: Atomic.cpp:31
ObserverJob(const std::shared_ptr< ObserverManager > &manager, const GraphQueryPtr &query, const BindingsHandler &callback)
Definition: ObserverJob.cpp:10
BindingsPtr applyBuiltins(const std::shared_ptr< Node > &node, const BindingsPtr &bindings)
void initializeNode(const std::shared_ptr< Node > &node)
std::vector< std::shared_ptr< Node > > nodes_
Definition: ObserverJob.h:67
void remove(const std::shared_ptr< Node > &node, const Triple &triple)
void processRemoval(const TripleContainerPtr &triples)
std::shared_ptr< ObserverJob::Node > createNode(const std::shared_ptr< GraphPattern > &pattern)
Definition: ObserverJob.cpp:33
NodeParents createGraph(const std::shared_ptr< GraphTerm > &term, const NodeParents &parents)
Definition: ObserverJob.cpp:41
bool matches(const Node &node, const Triple &triple)
BindingsHandler callback_
Definition: ObserverJob.h:58
void doInsert(const std::shared_ptr< Node > &node, const BindingsPtr &newBindings)
std::vector< std::shared_ptr< Node > > NodeParents
Definition: ObserverJob.h:69
void insert(const std::shared_ptr< Node > &node, const Triple &triple)
std::shared_ptr< ObserverManager > manager_
Definition: ObserverJob.h:56
void processInsertion(const TripleContainerPtr &triples)
GraphQueryPtr makeQuery(const std::vector< Node * > &reverseSequence)
Definition: ObserverJob.cpp:83
NodeParents terminalNodes_
Definition: ObserverJob.h:70
GraphQueryPtr makeAtomicQuery(const std::shared_ptr< Node > &node, const BindingsPtr &bindings)
Definition: ObserverJob.cpp:98
virtual std::string_view valueAsString() const =0
virtual std::string_view subject() const =0
virtual std::string_view predicate() const =0
bool isObjectIRI() const
Definition: Triple.h:54
bool isObjectBlank() const
Definition: Triple.h:49
GraphTermRule & pattern()
Definition: graph.cpp:23
VariableRule & var()
Definition: terms.cpp:91
TermRule & term()
Definition: terms.cpp:136
std::shared_ptr< TripleContainer > TripleContainerPtr
std::shared_ptr< const Bindings > BindingsPtr
Definition: Bindings.h:151
std::function< void(const BindingsPtr &)> BindingsHandler
Definition: Bindings.h:152
std::shared_ptr< GraphQuery > GraphQueryPtr
Definition: GraphQuery.h:65
FirstOrderLiteralPtr applyBindings(const FirstOrderLiteralPtr &lit, const Bindings &bindings)
std::shared_ptr< GraphPattern > pattern
Definition: ObserverJob.h:61