knowrob  2.1.0
A Knowledge Base System for Cognition-enabled Robots
knowrob::ObserverJob Class Reference

#include <ObserverJob.h>

Classes

struct  Node
 

Public Member Functions

 ObserverJob (const std::shared_ptr< ObserverManager > &manager, const GraphQueryPtr &query, const BindingsHandler &callback)
 
 ~ObserverJob ()
 
void stop ()
 
void processInsertion (const TripleContainerPtr &triples)
 
void processRemoval (const TripleContainerPtr &triples)
 
auto & manager () const
 
 ObserverJob (const std::shared_ptr< ObserverManager > &manager, const GraphQueryPtr &query, const BindingsHandler &callback)
 
 ~ObserverJob ()
 
void stop ()
 
void processInsertion (const TripleContainerPtr &triples)
 
void processRemoval (const TripleContainerPtr &triples)
 
auto & manager () const
 

Protected Types

using NodeParents = std::vector< std::shared_ptr< Node > >
 
using NodeParents = std::vector< std::shared_ptr< Node > >
 

Protected Member Functions

NodeParents createGraph (const std::shared_ptr< GraphTerm > &term, const NodeParents &parents)
 
std::shared_ptr< ObserverJob::NodecreateNode (const std::shared_ptr< GraphPattern > &pattern)
 
void initializeNode (const std::shared_ptr< Node > &node)
 
void initializeNode (const std::shared_ptr< Node > &node, const BindingsPtr &bindings)
 
GraphQueryPtr makeQuery (const std::vector< Node * > &reverseSequence)
 
GraphQueryPtr makeAtomicQuery (const std::shared_ptr< Node > &node, const BindingsPtr &bindings)
 
bool matches (const Node &node, const Triple &triple)
 
void remove (const std::shared_ptr< Node > &node, const Triple &triple)
 
void insert (const std::shared_ptr< Node > &node, const Triple &triple)
 
void doInsert (const std::shared_ptr< Node > &node, const BindingsPtr &newBindings)
 
void doInsert (const std::shared_ptr< Node > &node, const BindingsPtr &parentBindings, const BindingsPtr &nodeBindings)
 
BindingsPtr applyBuiltins (const std::shared_ptr< Node > &node, const BindingsPtr &bindings)
 
NodeParents createGraph (const std::shared_ptr< GraphTerm > &term, const NodeParents &parents)
 
std::shared_ptr< ObserverJob::NodecreateNode (const std::shared_ptr< GraphPattern > &pattern)
 
void initializeNode (const std::shared_ptr< Node > &node)
 
void initializeNode (const std::shared_ptr< Node > &node, const BindingsPtr &bindings)
 
GraphQueryPtr makeQuery (const std::vector< Node * > &reverseSequence)
 
GraphQueryPtr makeAtomicQuery (const std::shared_ptr< Node > &node, const BindingsPtr &bindings)
 
bool matches (const Node &node, const Triple &triple)
 
void remove (const std::shared_ptr< Node > &node, const Triple &triple)
 
void insert (const std::shared_ptr< Node > &node, const Triple &triple)
 
void doInsert (const std::shared_ptr< Node > &node, const BindingsPtr &newBindings)
 
void doInsert (const std::shared_ptr< Node > &node, const BindingsPtr &parentBindings, const BindingsPtr &nodeBindings)
 
BindingsPtr applyBuiltins (const std::shared_ptr< Node > &node, const BindingsPtr &bindings)
 

Protected Attributes

std::shared_ptr< ObserverManagermanager_
 
GraphQueryPtr query_
 
BindingsHandler callback_
 
std::vector< std::shared_ptr< Node > > nodes_
 
NodeParents terminalNodes_
 

Detailed Description

An observer job is a query that is observed by an observer manager.

Definition at line 19 of file ObserverJob.h.

Member Typedef Documentation

◆ NodeParents [1/2]

using knowrob::ObserverJob::NodeParents = std::vector<std::shared_ptr<Node> >
protected

Definition at line 69 of file ObserverJob.h.

◆ NodeParents [2/2]

using knowrob::ObserverJob::NodeParents = std::vector<std::shared_ptr<Node> >
protected

Definition at line 69 of file ObserverJob.h.

Constructor & Destructor Documentation

◆ ObserverJob() [1/2]

ObserverJob::ObserverJob ( const std::shared_ptr< ObserverManager > &  manager,
const GraphQueryPtr query,
const BindingsHandler callback 
)

Create an observer job.

Parameters
managerthe observer manager.
querythe query to observe.
callbackthe callback to invoke when the query matches.

Definition at line 10 of file ObserverJob.cpp.

13  : manager_(manager), query_(query), callback_(callback) {
14  // construct the observation graph given the query
15  terminalNodes_ = createGraph(query->term(), {});
16  // initialize the tables for the query by running a DB query
17  // for each node in the observation graph.
18  for (const auto &node: nodes_) {
19  initializeNode(node);
20  }
21 }
void initializeNode(const std::shared_ptr< Node > &node)
auto & manager() const
Definition: ObserverJob.h:53
std::vector< std::shared_ptr< Node > > nodes_
Definition: ObserverJob.h:67
NodeParents createGraph(const std::shared_ptr< GraphTerm > &term, const NodeParents &parents)
Definition: ObserverJob.cpp:41
GraphQueryPtr query_
Definition: ObserverJob.h:57
BindingsHandler callback_
Definition: ObserverJob.h:58
std::shared_ptr< ObserverManager > manager_
Definition: ObserverJob.h:56
NodeParents terminalNodes_
Definition: ObserverJob.h:70

◆ ~ObserverJob() [1/2]

ObserverJob::~ObserverJob ( )

Definition at line 23 of file ObserverJob.cpp.

23  {
24  stop();
25 }

◆ ObserverJob() [2/2]

knowrob::ObserverJob::ObserverJob ( const std::shared_ptr< ObserverManager > &  manager,
const GraphQueryPtr query,
const BindingsHandler callback 
)

Create an observer job.

Parameters
managerthe observer manager.
querythe query to observe.
callbackthe callback to invoke when the query matches.

◆ ~ObserverJob() [2/2]

knowrob::ObserverJob::~ObserverJob ( )

Member Function Documentation

◆ applyBuiltins() [1/2]

BindingsPtr ObserverJob::applyBuiltins ( const std::shared_ptr< Node > &  node,
const BindingsPtr bindings 
)
protected

Definition at line 214 of file ObserverJob.cpp.

214  {
215  if (node->builtins.empty()) {
216  return bindings;
217  } else {
218  auto bindings_rw = std::make_shared<Bindings>(*bindings);
219  for (const auto &builtin: node->builtins) {
220  builtin->apply(bindings_rw);
221  }
222  return bindings_rw;
223  }
224 }

◆ applyBuiltins() [2/2]

BindingsPtr knowrob::ObserverJob::applyBuiltins ( const std::shared_ptr< Node > &  node,
const BindingsPtr bindings 
)
protected

◆ createGraph() [1/2]

ObserverJob::NodeParents ObserverJob::createGraph ( const std::shared_ptr< GraphTerm > &  term,
const NodeParents parents 
)
protected

Definition at line 41 of file ObserverJob.cpp.

41  {
42  switch (term->termType()) {
44  auto node = createNode(std::static_pointer_cast<GraphPattern>(term));
45  for (const auto &parent: parents) {
46  node->parents.push_back(parent.get());
47  parent->children.push_back(node);
48  }
49  return {node};
50  }
52  if (!parents.empty()) {
53  for (const auto &parent: parents) {
54  auto builtin = std::static_pointer_cast<GraphBuiltin>(term);
55  parent->builtins.push_back(builtin);
56  }
57  } else {
58  KB_WARN("Builtins are not allowed at the top level of an observed query.");
59  }
60  return parents;
61  }
63  auto sequence = std::static_pointer_cast<GraphSequence>(term);
64  ObserverJob::NodeParents nextParents = parents;
65  for (const auto &subTerm: sequence->terms()) {
66  nextParents = createGraph(subTerm, nextParents);
67  }
68  return nextParents;
69  }
70  case GraphTermType::Union: {
71  auto unionTerm = std::static_pointer_cast<GraphUnion>(term);
72  ObserverJob::NodeParents nextParents;
73  for (const auto &subTerm: unionTerm->terms()) {
74  auto subParents = createGraph(subTerm, parents);
75  nextParents.insert(nextParents.end(), subParents.begin(), subParents.end());
76  }
77  return nextParents;
78  }
79  }
80  return {};
81 }
#define KB_WARN
Definition: Logger.h:27
std::shared_ptr< ObserverJob::Node > createNode(const std::shared_ptr< GraphPattern > &pattern)
Definition: ObserverJob.cpp:33
std::vector< std::shared_ptr< Node > > NodeParents
Definition: ObserverJob.h:69
TermRule & term()
Definition: terms.cpp:136

◆ createGraph() [2/2]

NodeParents knowrob::ObserverJob::createGraph ( const std::shared_ptr< GraphTerm > &  term,
const NodeParents parents 
)
protected

◆ createNode() [1/2]

std::shared_ptr< ObserverJob::Node > ObserverJob::createNode ( const std::shared_ptr< GraphPattern > &  pattern)
protected

Definition at line 33 of file ObserverJob.cpp.

33  {
34  auto node = std::make_shared<Node>();
35  node->pattern = pattern;
36  nodes_.push_back(node);
37  return node;
38 }
GraphTermRule & pattern()
Definition: graph.cpp:23

◆ createNode() [2/2]

std::shared_ptr<ObserverJob::Node> knowrob::ObserverJob::createNode ( const std::shared_ptr< GraphPattern > &  pattern)
protected

◆ doInsert() [1/4]

void ObserverJob::doInsert ( const std::shared_ptr< Node > &  node,
const BindingsPtr newBindings 
)
protected

Definition at line 309 of file ObserverJob.cpp.

309  {
310  auto solHash = newBindings->hash();
311 
312  if(node->solutions.find(solHash) != node->solutions.end()) {
313  // the solution is already in the node
314  return;
315  }
316  node->solutions[solHash] = newBindings;
317 
318  if (node->children.empty()) {
319  // if the node is terminal, then call the callback with the new bindings.
320  callback_(newBindings);
321  } else {
322  // if the node is not terminal, then a graph query must be constructed for each child node
323  // with the new bindings applied. If there are new solutions, then call insert on the child node
324  // with the new solutions to propagate the new data through the graph.
325 
326  for (auto &childNode: node->children) {
327  auto newQuery = makeAtomicQuery(childNode, newBindings);
328  manager_->query(newQuery, [this, childNode, newBindings](const BindingsPtr &bindings) {
329  doInsert(childNode, newBindings, bindings);
330  });
331  }
332  }
333 }
void doInsert(const std::shared_ptr< Node > &node, const BindingsPtr &newBindings)
GraphQueryPtr makeAtomicQuery(const std::shared_ptr< Node > &node, const BindingsPtr &bindings)
Definition: ObserverJob.cpp:98
std::shared_ptr< const Bindings > BindingsPtr
Definition: Bindings.h:151

◆ doInsert() [2/4]

void knowrob::ObserverJob::doInsert ( const std::shared_ptr< Node > &  node,
const BindingsPtr newBindings 
)
protected

◆ doInsert() [3/4]

void ObserverJob::doInsert ( const std::shared_ptr< Node > &  node,
const BindingsPtr parentBindings,
const BindingsPtr nodeBindings 
)
protected

Definition at line 300 of file ObserverJob.cpp.

302  {
303  auto newBindings_rw = std::make_shared<Bindings>(*parentBindings);
304  newBindings_rw->operator+=(*nodeBindings);
305  auto newBindings = applyBuiltins(node, newBindings_rw);
306  doInsert(node, newBindings);
307 }
BindingsPtr applyBuiltins(const std::shared_ptr< Node > &node, const BindingsPtr &bindings)

◆ doInsert() [4/4]

void knowrob::ObserverJob::doInsert ( const std::shared_ptr< Node > &  node,
const BindingsPtr parentBindings,
const BindingsPtr nodeBindings 
)
protected

◆ initializeNode() [1/4]

void ObserverJob::initializeNode ( const std::shared_ptr< Node > &  node)
protected

Definition at line 112 of file ObserverJob.cpp.

112  {
113  std::vector<GraphQueryPtr> queries;
114  std::queue<std::pair<Node *, std::vector<Node *>>> queryQueue;
115  queryQueue.push({nodeToInit.get(), { nodeToInit.get() }});
116 
117  // build queries from the node and its parents
118  while (!queryQueue.empty()) {
119  auto [node, seq] = queryQueue.front();
120  queryQueue.pop();
121 
122  if (node->parents.empty()) {
123  queries.push_back(makeQuery(seq));
124  } else {
125  for (const auto &parent: node->parents) {
126  std::vector<Node *> newSeq = seq;
127  newSeq.push_back(parent);
128  queryQueue.push({parent, newSeq});
129  }
130  }
131  }
132 
133  // run queries against the database
134  for (const auto &nodeQuery: queries) {
135  manager_->query(nodeQuery, [this, nodeToInit](const BindingsPtr &bindings) {
136  initializeNode(nodeToInit, bindings);
137  });
138  }
139 }
GraphQueryPtr makeQuery(const std::vector< Node * > &reverseSequence)
Definition: ObserverJob.cpp:83

◆ initializeNode() [2/4]

void knowrob::ObserverJob::initializeNode ( const std::shared_ptr< Node > &  node)
protected

◆ initializeNode() [3/4]

void ObserverJob::initializeNode ( const std::shared_ptr< Node > &  node,
const BindingsPtr bindings 
)
protected

Definition at line 141 of file ObserverJob.cpp.

141  {
142  auto answerHash = answer->hash();
143  auto it = node->solutions.find(answerHash);
144  if (it == node->solutions.end()) {
145  // store the answer in the node
146  node->solutions[answerHash] = answer;
147  // check if the node is a terminal node
148  if (node->children.empty()) {
149  // if so, call the callback
150  callback_(answer);
151  }
152  }
153 }

◆ initializeNode() [4/4]

void knowrob::ObserverJob::initializeNode ( const std::shared_ptr< Node > &  node,
const BindingsPtr bindings 
)
protected

◆ insert() [1/2]

void ObserverJob::insert ( const std::shared_ptr< Node > &  node,
const Triple triple 
)
protected

Definition at line 264 of file ObserverJob.cpp.

264  {
265  auto nodePattern = node->pattern->value();
266 
267  // compute bindings for the triple
268  auto tripleBindings = std::make_shared<Bindings>();
269  if (nodePattern->subjectTerm()->isVariable()) {
270  auto var = std::static_pointer_cast<Variable>(nodePattern->subjectTerm());
271  tripleBindings->set(var, std::make_shared<IRIAtom>(triple.subject()));
272  }
273  if (nodePattern->propertyTerm()->isVariable()) {
274  auto var = std::static_pointer_cast<Variable>(nodePattern->propertyTerm());
275  tripleBindings->set(var, std::make_shared<IRIAtom>(triple.predicate()));
276  }
277  if (nodePattern->objectTerm()->isVariable()) {
278  auto var = std::static_pointer_cast<Variable>(nodePattern->objectTerm());
279  auto o_triple = Atomic::makeTripleValue(triple);
280  tripleBindings->set(var, o_triple);
281  }
282 
283  //insert(node, tripleBindings);
284  if (node->parents.empty()) {
285  // the node is a root node
286  doInsert(node, tripleBindings);
287  }
288  else {
289  for (auto &parentNode: node->parents) {
290  for (auto &parentSolPair: parentNode->solutions) {
291  auto &parentSolution = parentSolPair.second;
292  if (parentSolution->isConsistentWith(*tripleBindings)) {
293  doInsert(node, parentSolution, tripleBindings);
294  }
295  }
296  }
297  }
298 }
static std::shared_ptr< Atomic > makeTripleValue(const Triple &triple)
Definition: Atomic.cpp:31
virtual std::string_view subject() const =0
virtual std::string_view predicate() const =0
VariableRule & var()
Definition: terms.cpp:91

◆ insert() [2/2]

void knowrob::ObserverJob::insert ( const std::shared_ptr< Node > &  node,
const Triple triple 
)
protected

◆ makeAtomicQuery() [1/2]

GraphQueryPtr ObserverJob::makeAtomicQuery ( const std::shared_ptr< Node > &  node,
const BindingsPtr bindings 
)
protected

Definition at line 98 of file ObserverJob.cpp.

98  {
99  std::vector<std::shared_ptr<GraphTerm>> terms;
100  terms.push_back(applyBindings(node->pattern, *bindings));
101  for (const auto &builtin: node->builtins) {
102  std::shared_ptr<GraphTerm> builtinTerm = builtin;
103  terms.push_back(applyBindings(builtinTerm, *bindings));
104  }
105  if (terms.size() == 1) {
106  return std::make_shared<GraphQuery>(terms[0]);
107  } else {
108  return std::make_shared<GraphQuery>(std::make_shared<GraphSequence>(terms));
109  }
110 }
FirstOrderLiteralPtr applyBindings(const FirstOrderLiteralPtr &lit, const Bindings &bindings)

◆ makeAtomicQuery() [2/2]

GraphQueryPtr knowrob::ObserverJob::makeAtomicQuery ( const std::shared_ptr< Node > &  node,
const BindingsPtr bindings 
)
protected

◆ makeQuery() [1/2]

GraphQueryPtr ObserverJob::makeQuery ( const std::vector< Node * > &  reverseSequence)
protected

Definition at line 83 of file ObserverJob.cpp.

83  {
84  std::vector<std::shared_ptr<GraphTerm>> terms;
85  for (auto it = reverseSequence.rbegin(); it != reverseSequence.rend(); ++it) {
86  terms.push_back((*it)->pattern);
87  for (const auto &builtin: (*it)->builtins) {
88  terms.push_back(builtin);
89  }
90  }
91  if (terms.size() == 1) {
92  return std::make_shared<GraphQuery>(terms[0]);
93  } else {
94  return std::make_shared<GraphQuery>(std::make_shared<GraphSequence>(terms));
95  }
96 }

◆ makeQuery() [2/2]

GraphQueryPtr knowrob::ObserverJob::makeQuery ( const std::vector< Node * > &  reverseSequence)
protected

◆ manager() [1/2]

auto& knowrob::ObserverJob::manager ( ) const
inline
Returns
the observer manager.

Definition at line 53 of file ObserverJob.h.

53 { return manager_; }

◆ manager() [2/2]

auto& knowrob::ObserverJob::manager ( ) const
inline
Returns
the observer manager.

Definition at line 53 of file ObserverJob.h.

53 { return manager_; }

◆ matches() [1/2]

bool ObserverJob::matches ( const Node node,
const Triple triple 
)
protected

Definition at line 155 of file ObserverJob.cpp.

155  {
156  auto &pat = node.pattern->value();
157 
158  auto &s_pat = pat->subjectTerm();
159  if (s_pat->isAtomic()) {
160  auto s_pat_atom = std::static_pointer_cast<Atomic>(s_pat);
161  if (s_pat_atom->stringForm() != triple.subject()) return false;
162  } else if (!s_pat->isVariable()) {
163  return false;
164  }
165 
166  auto &p_pat = pat->propertyTerm();
167  if (p_pat->isAtomic()) {
168  auto p_pat_atom = std::static_pointer_cast<Atomic>(p_pat);
169  if (p_pat_atom->stringForm() != triple.predicate()) return false;
170  } else if (!p_pat->isVariable()) {
171  return false;
172  }
173 
174  auto &o_pat = pat->objectTerm();
175  if (o_pat->isAtomic()) {
176  auto o_pat_atom = std::static_pointer_cast<Atomic>(o_pat);
177 
178  if (triple.isObjectIRI() || triple.isObjectBlank()) {
179  auto o_triple = triple.valueAsString();
180  if (o_pat_atom->stringForm() != o_triple) return false;
181  } else {
182  auto o_triple = Atomic::makeTripleValue(triple);
183  if (*o_pat_atom != *o_triple) return false;
184  }
185  } else if (!o_pat->isVariable()) {
186  return false;
187  }
188 
189  return true;
190 }
virtual std::string_view valueAsString() const =0
bool isObjectIRI() const
Definition: Triple.h:54
bool isObjectBlank() const
Definition: Triple.h:49

◆ matches() [2/2]

bool knowrob::ObserverJob::matches ( const Node node,
const Triple triple 
)
protected

◆ processInsertion() [1/2]

void ObserverJob::processInsertion ( const TripleContainerPtr triples)

Process the insertion of triples.

Parameters
triplesthe triples to insert.

Definition at line 192 of file ObserverJob.cpp.

192  {
193  for (auto &triplePtr: *triples) {
194  auto &triple = *triplePtr;
195  for (auto &node: nodes_) {
196  if (matches(*node, triple)) {
197  insert(node, triple);
198  }
199  }
200  }
201 }
bool matches(const Node &node, const Triple &triple)
void insert(const std::shared_ptr< Node > &node, const Triple &triple)

◆ processInsertion() [2/2]

void knowrob::ObserverJob::processInsertion ( const TripleContainerPtr triples)

Process the insertion of triples.

Parameters
triplesthe triples to insert.

◆ processRemoval() [1/2]

void ObserverJob::processRemoval ( const TripleContainerPtr triples)

Process the removal of triples.

Parameters
triplesthe triples to remove.

Definition at line 203 of file ObserverJob.cpp.

203  {
204  for (auto &triplePtr: *triples) {
205  auto &triple = *triplePtr;
206  for (auto &node: nodes_) {
207  if (matches(*node, triple)) {
208  remove(node, triple);
209  }
210  }
211  }
212 }
void remove(const std::shared_ptr< Node > &node, const Triple &triple)

◆ processRemoval() [2/2]

void knowrob::ObserverJob::processRemoval ( const TripleContainerPtr triples)

Process the removal of triples.

Parameters
triplesthe triples to remove.

◆ remove() [1/2]

void ObserverJob::remove ( const std::shared_ptr< Node > &  node,
const Triple triple 
)
protected

Definition at line 226 of file ObserverJob.cpp.

226  {
227  auto nodePattern = node->pattern->value();
228 
229  // compute bindings for the triple
230  auto tripleBindings = std::make_shared<Bindings>();
231  if (nodePattern->subjectTerm()->isVariable()) {
232  auto var = std::static_pointer_cast<Variable>(nodePattern->subjectTerm());
233  tripleBindings->set(var, std::make_shared<IRIAtom>(triple.subject()));
234  }
235  if (nodePattern->propertyTerm()->isVariable()) {
236  auto var = std::static_pointer_cast<Variable>(nodePattern->propertyTerm());
237  tripleBindings->set(var, std::make_shared<IRIAtom>(triple.predicate()));
238  }
239  if (nodePattern->objectTerm()->isVariable()) {
240  auto var = std::static_pointer_cast<Variable>(nodePattern->objectTerm());
241  auto o_triple = Atomic::makeTripleValue(triple);
242  tripleBindings->set(var, o_triple);
243  }
244 
245  // remove all solutions of this node or children nodes
246  // that are consistent with the removed triple
247  std::queue<Node*> nodeQueue;
248  nodeQueue.push(node.get());
249  while (!nodeQueue.empty()) {
250  auto nextNode = nodeQueue.front();
251  nodeQueue.pop();
252  for (auto &solPair: nextNode->solutions) {
253  auto &solution = solPair.second;
254  if (solution->isConsistentWith(*tripleBindings)) {
255  nextNode->solutions.erase(solPair.first);
256  }
257  }
258  for (auto &childNode: nextNode->children) {
259  nodeQueue.push(childNode.get());
260  }
261  }
262 }

◆ remove() [2/2]

void knowrob::ObserverJob::remove ( const std::shared_ptr< Node > &  node,
const Triple triple 
)
protected

◆ stop() [1/2]

void ObserverJob::stop ( )

Stop the observer job.

Definition at line 27 of file ObserverJob.cpp.

27  {
28  nodes_.clear();
29  terminalNodes_.clear();
30  manager_ = nullptr;
31 }

◆ stop() [2/2]

void knowrob::ObserverJob::stop ( )

Stop the observer job.

Member Data Documentation

◆ callback_

BindingsHandler knowrob::ObserverJob::callback_
protected

Definition at line 58 of file ObserverJob.h.

◆ manager_

std::shared_ptr< ObserverManager > knowrob::ObserverJob::manager_
protected

Definition at line 56 of file ObserverJob.h.

◆ nodes_

std::vector< std::shared_ptr< Node > > knowrob::ObserverJob::nodes_
protected

Definition at line 67 of file ObserverJob.h.

◆ query_

GraphQueryPtr knowrob::ObserverJob::query_
protected

Definition at line 57 of file ObserverJob.h.

◆ terminalNodes_

NodeParents knowrob::ObserverJob::terminalNodes_
protected

Definition at line 70 of file ObserverJob.h.


The documentation for this class was generated from the following files: