knowrob  2.1.0
A Knowledge Base System for Cognition-enabled Robots
StorageInterface.cpp
Go to the documentation of this file.
1 /*
2  * This file is part of KnowRob, please consult
3  * https://github.com/knowrob/knowrob for license details.
4  */
5 
6 #include "knowrob/storage/StorageInterface.h"
7 #include "knowrob/storage/ReifiedTriple.h"
8 #include "knowrob/storage/UnReificationContainer.h"
9 #include "knowrob/storage/ReifiedQuery.h"
10 #include "knowrob/storage/Transaction.h"
11 #include "knowrob/semweb/GraphBuiltin.h"
12 #include "knowrob/knowrob.h"
13 #include "knowrob/storage/reification.h"
14 
15 using namespace knowrob;
16 
17 std::shared_ptr<transaction::Transaction> StorageInterface::createTransaction(
18  const QueryableBackendPtr &queryable,
19  TransactionType transactionType,
20  BackendSelection transactionTargets,
21  const std::vector<std::shared_ptr<NamedBackend>> &backends) {
22  std::shared_ptr<transaction::Transaction> transaction;
23  if (transactionType == Insert) {
24  transaction = std::make_shared<transaction::Insert>(queryable, vocabulary());
25  } else {
26  transaction = std::make_shared<transaction::Remove>(queryable, vocabulary());
27  }
28  if (transactionTargets == Including) {
29  for (auto &backend: backends) {
30  transaction->addBackend(backend);
31  }
32  } else {
33  for (auto &definedBackend: backendManager_->plugins()) {
34  auto &backend = definedBackend.second->value();
35 
36  bool skip = false;
37  if (transactionTargets == Excluding) {
38  for (auto &excluded: backends) {
39  if (excluded && backend == excluded->value()) {
40  skip = true;
41  break;
42  }
43  }
44  }
45  if (skip) continue;
46  transaction->addBackend(definedBackend.second);
47  }
48  }
49  return transaction;
50 }
51 
52 bool StorageInterface::removeAllWithOrigin(std::string_view origin) {
53  // remove all triples with a given origin from all backends.
54  std::vector<std::shared_ptr<ThreadPool::Runner>> transactions;
55  for (auto &it: backendManager_->plugins()) {
56  auto definedBackend = it.second;
57  // create a worker goal that performs the transaction
58  auto transaction = std::make_shared<ThreadPool::LambdaRunner>(
59  [definedBackend, origin](const ThreadPool::LambdaRunner::StopChecker &) {
60  if (definedBackend->value()->removeAllWithOrigin(origin)) {
61  // unset version of origin in backend
62  definedBackend->value()->setVersionOfOrigin(origin, std::nullopt);
63  } else {
64  KB_WARN("removal of triples with origin '{}' from backend '{}' failed!", origin,
65  definedBackend->name());
66  }
67  });
68  // push goal to thread pool
69  DefaultThreadPool()->pushWork(
70  transaction,
71  [definedBackend](const std::exception &exc) {
72  KB_ERROR("transaction failed for backend '{}': {}", definedBackend->name(), exc.what());
73  });
74  transactions.push_back(transaction);
75  }
76 
77  // wait for all transactions to finish
78  for (auto &transaction: transactions) transaction->join();
79 
80  // remove origin from import hierarchy
81  if (!vocabulary()->importHierarchy()->isReservedOrigin(origin)) {
82  vocabulary()->importHierarchy()->removeCurrentGraph(origin);
83  }
84 
85  return true;
86 }
87 
88 bool StorageInterface::mergeInsert(const QueryableBackendPtr &backend, const Triple &triple) {
89  auto pat = std::make_shared<TriplePattern>(triple);
90  // Match triples where interval intersection is not empty
91  pat->setIsOccasionalTerm(groundable(Numeric::trueAtom()));
92  // Construct a merged triple
93  TripleView mergedTriple(triple);
94  // Store overlapping triples to remove them after matching
95  std::vector<TriplePtr> overlappingTriples;
96  // Match a triple pattern in backend.
97  // Note that the match will return the un-reified variant of the triple no matter
98  // if the backend stores the triple reified or not.
99  match(backend, *pat, [&](const TriplePtr &matchedTriple) {
100  if (mergedTriple.mergeFrame(*matchedTriple)) {
101  auto &x = overlappingTriples.emplace_back();
102  x.owned = true;
103  // try to take over ownership of matched triple
104  if (matchedTriple.owned) {
105  x.ptr = matchedTriple.ptr;
106  matchedTriple.owned = false;
107  } else {
108  x.ptr = new TripleCopy(*matchedTriple);
109  }
110  }
111  });
112  if (!overlappingTriples.empty()) {
113  // remove overlapping triples if any
114  auto container = std::make_shared<ProxyTripleContainer>(&overlappingTriples);
115  createTransaction(backend, Remove)->commit(container);
116  }
117  // Insert the triple after merging with overlapping existing ones
118  createTransaction(backend, Insert)->commit(mergedTriple);
119  return true;
120 }
121 
122 bool StorageInterface::contains(const QueryableBackendPtr &backend, const Triple &triple) const {
123  if (backend->supports(StorageFeature::TripleContext)) {
124  return backend->contains(triple);
125  }
126 
127  ReifiedTriple reification(triple, vocabulary());
128  bool containsAll = false;
129  for (auto &reified: reification) {
130  containsAll = backend->contains(*reified.ptr);
131  if (!containsAll) {
132  break;
133  }
134  }
135  return containsAll;
136 }
137 
138 void StorageInterface::foreach(const QueryableBackendPtr &backend, const TripleVisitor &visitor) {
139  if (backend->supports(StorageFeature::TripleContext)) {
140  backend->foreach(visitor);
141  return;
142  }
143 
144  // fill a container that reverses a reification.
145  UnReificationContainer unReifiedTriples;
146  // the UnReificationContainer uses triple views, but memory of original triples can be
147  // lost in the loop. So we need to store the original triples in a vector, and create
148  // a view on them in the UnReificationContainer.
149  std::vector<TriplePtr> originalTriples;
150  // finally loop over all original triples
151  backend->foreach([&](const TriplePtr &triple) {
152  if (ReifiedTriple::isPartOfReification(*triple)) {
153  auto &copy = originalTriples.emplace_back();
154  copy.owned = true;
155  // try to take over ownership of triple
156  if (triple.owned) {
157  copy.ptr = triple.ptr;
158  triple.owned = false;
159  } else {
160  copy.ptr = new TripleCopy(*triple.ptr);
161  }
162  unReifiedTriples.add(*copy.ptr);
163  } else {
164  visitor(triple);
165  }
166  });
167  // after looping over all original triples, also visit the un-reified ones
168  for (auto &triple: unReifiedTriples) {
169  visitor(triple);
170  }
171 }
172 
173 void StorageInterface::batch(const QueryableBackendPtr &backend, const TripleHandler &callback) {
174  if (backend->supports(StorageFeature::TripleContext)) {
175  backend->batch(callback);
176  return;
177  }
178 
179  auto batchSize = GlobalSettings::batchSize();
180  // fill a container that reverses a reification.
181  UnReificationContainer unReifiedTriples;
182  // take over ownership of triples in batches that need to be reified.
183  // note: reified triples could be split into multiple batches which makes
184  // the collapsing of them more difficult.
185  // to this end we defer the collapsing until the batchDirect call has completed
186  // while taking over ownership of the reified triples to avoid copies and allow
187  // the use of views in the UnReificationContainer.
188  std::vector<TriplePtr> reificationTriples;
189  auto batch = std::make_shared<TripleViewBatch>(batchSize);
190  backend->batch([&](const TripleContainerPtr &triples) {
191  for (auto &triple: *triples) {
192  if (ReifiedTriple::isPartOfReification(*triple.ptr)) {
193  // take over ownership of triple
194  if (triple.owned) {
195  triple.owned = false;
196  auto &newOwner = reificationTriples.emplace_back(triple.ptr);
197  unReifiedTriples.add(*newOwner.ptr);
198  } else {
199  auto &copy = reificationTriples.emplace_back(new TripleCopy(*triple.ptr));
200  unReifiedTriples.add(*copy.ptr);
201  }
202  } else {
203  batch->add(triple);
204  if (batch->size() >= batchSize) {
205  callback(batch);
206  batch->reset();
207  }
208  }
209  }
210  });
211  for (auto &triple: unReifiedTriples) {
212  batch->add(triple);
213  if (batch->size() >= batchSize) {
214  callback(batch);
215  batch->reset();
216  }
217  }
218  if (!batch->empty()) {
219  callback(batch);
220  }
221 }
222 
223 static void setReifiedVariables( // NOLINT(misc-no-recursion)
224  const std::shared_ptr<GraphTerm> &t,
225  const std::map<std::string_view, VariablePtr> &variables) {
226  switch (t->termType()) {
228  auto &pattern = std::static_pointer_cast<GraphPattern>(t)->value();
229  if (!pattern->propertyTerm() || !pattern->propertyTerm()->isAtomic()) break;
230  auto atomic = std::static_pointer_cast<Atomic>(pattern->propertyTerm());
231  auto needle = variables.find(atomic->stringForm());
232  if (needle != variables.end()) {
233  pattern->setObjectVariable(needle->second);
234  }
235  break;
236  }
239  auto connective = std::static_pointer_cast<GraphConnective>(t);
240  for (auto &term: connective->terms()) {
241  setReifiedVariables(term, variables);
242  }
243  break;
244  }
246  break;
247  };
248 }
249 
251  const TripleVisitor &visitor) const {
252  static auto ctx = std::make_shared<QueryContext>();
253  if (backend->supports(StorageFeature::TripleContext)) {
254  backend->match(q, visitor);
255  } else {
256  auto flags = ReifiedQuery::getReificationFlags(q);
257  if (flags & IncludeOriginal) {
258  backend->match(q, visitor);
259  }
260  if (flags & IncludeReified) {
261  static auto v_o = std::make_shared<Variable>("o");
262  static auto v_begin = std::make_shared<Variable>("begin");
263  static auto v_end = std::make_shared<Variable>("end");
264  static auto v_confidence = std::make_shared<Variable>("confidence");
265  static auto v_uncertain = std::make_shared<Variable>("uncertain");
266  static auto v_occasional = std::make_shared<Variable>("occasional");
267 
268  auto reified = std::make_shared<ReifiedQuery>(q, vocabulary(), true);
269  // insert variables for contextual parameters
270  setReifiedVariables(reified->term(), {
271  {reification::hasBeginTime->stringForm(), v_begin},
272  {reification::hasEndTime->stringForm(), v_end},
273  {reification::hasConfidence->stringForm(), v_confidence},
274  {reification::isUncertain->stringForm(), v_uncertain},
275  {reification::isOccasional->stringForm(), v_occasional}
276  });
277 
278  backend->query(reified, [&](const BindingsPtr &bindings) {
279  TriplePtr triple;
280  triple.ptr = new TripleView();
281  triple.owned = true;
282 
283  if (q.instantiateInto(*triple, bindings)) {
284  if (bindings->contains(v_begin->name())) {
285  auto &t_begin = bindings->get(v_begin->name());
286  if (t_begin->isNumeric()) {
287  triple->setBegin(std::static_pointer_cast<Numeric>(t_begin)->asDouble());
288  }
289  }
290  if (bindings->contains(v_end->name())) {
291  auto &t_end = bindings->get(v_end->name());
292  if (t_end->isNumeric()) {
293  triple->setEnd(std::static_pointer_cast<Numeric>(t_end)->asDouble());
294  }
295  }
296  if (bindings->contains(v_confidence->name())) {
297  auto &t_confidence = bindings->get(v_confidence->name());
298  if (t_confidence->isNumeric()) {
299  triple->setConfidence(std::static_pointer_cast<Numeric>(t_confidence)->asDouble());
300  }
301  }
302  if (bindings->contains(v_uncertain->name())) {
303  auto &t_uncertain = bindings->get(v_uncertain->name());
304  if (t_uncertain->isNumeric()) {
305  triple->setIsUncertain(std::static_pointer_cast<Boolean>(t_uncertain)->asBoolean());
306  }
307  } else {
308  triple->setIsUncertain(false);
309  }
310  if (bindings->contains(v_occasional->name())) {
311  auto &t_occasional = bindings->get(v_occasional->name());
312  if (t_occasional->isNumeric()) {
313  triple->setIsOccasional(std::static_pointer_cast<Boolean>(t_occasional)->asBoolean());
314  }
315  } else {
316  triple->setIsOccasional(false);
317  }
318  visitor(triple);
319  }
320  });
321  }
322  }
323 }
324 
326  const BindingsHandler &callback) const {
328  // if there is at least one reifiable pattern, we need to reify the query entirely,
329  // and run the reified query on the original backend.
330  auto reified = std::make_shared<ReifiedQuery>(q, vocabulary());
331  backend->query(reified, [&](const BindingsPtr &bindings) {
332  callback(bindings);
333  });
334  } else {
335  backend->query(q, callback);
336  }
337 }
338 
340  const TokenBufferPtr &resultStream) const {
341  auto expanded = backend->expand(q);
342  auto channel = TokenStream::Channel::create(resultStream);
343  try {
344  bool hasPositiveAnswer = false;
345  query(backend, expanded->expanded, [&](const BindingsPtr &bindings) {
346  channel->push(backend->yes(q, expanded, bindings));
347  hasPositiveAnswer = true;
348  });
349  if (!hasPositiveAnswer) {
350  channel->push(backend->no(q));
351  }
352  channel->push(EndOfEvaluation::get());
353  }
354  catch (const std::exception &e) {
355  // make sure EOS is pushed to the stream
356  channel->push(EndOfEvaluation::get());
357  throw;
358  }
359 }
360 
362  std::shared_ptr<TokenBuffer> result = std::make_shared<TokenBuffer>();
363  auto runner =
364  std::make_shared<ThreadPool::LambdaRunner>(
365  [this, q, result, backend](const ThreadPool::LambdaRunner::StopChecker &) {
366  pushIntoCursor(backend, q, result);
367  });
368  DefaultThreadPool()->pushWork(runner, [result, q](const std::exception &e) {
369  KB_WARN("an exception occurred for graph query ({}): {}.", *q, e.what());
370  result->close();
371  });
372  return result;
373 }
#define KB_ERROR
Definition: Logger.h:28
#define KB_WARN
Definition: Logger.h:27
static uint32_t batchSize()
Definition: knowrob.h:68
static std::shared_ptr< Numeric > trueAtom()
Definition: Numeric.cpp:15
static int getReificationFlags(const TriplePattern &q)
static bool hasReifiablePattern(const std::shared_ptr< GraphQuery > &nonReified)
static bool isPartOfReification(const Triple &triple)
std::shared_ptr< transaction::Transaction > createTransaction(const QueryableBackendPtr &queryable, TransactionType type, BackendSelection mode=Excluding, const std::vector< std::shared_ptr< NamedBackend >> &backends={})
bool removeAllWithOrigin(std::string_view origin)
bool mergeInsert(const QueryableBackendPtr &backend, const Triple &triple)
static void foreach(const QueryableBackendPtr &backend, const TripleVisitor &visitor)
void pushIntoCursor(const QueryableBackendPtr &backend, const GraphPathQueryPtr &query, const TokenBufferPtr &resultStream) const
std::shared_ptr< StorageManager > backendManager_
void query(const QueryableBackendPtr &backend, const GraphQueryPtr &q, const BindingsHandler &callback) const
static void batch(const QueryableBackendPtr &backend, const TripleHandler &callback)
TokenBufferPtr getAnswerCursor(const QueryableBackendPtr &backend, const GraphPathQueryPtr &query)
void match(const QueryableBackendPtr &backend, const TriplePattern &query, const TripleVisitor &visitor) const
std::function< bool()> StopChecker
Definition: ThreadPool.h:152
static std::shared_ptr< Channel > create(const std::shared_ptr< TokenStream > &stream)
Definition: TokenStream.cpp:88
void setConfidence(double confidence)
Definition: Triple.h:307
void setIsUncertain(bool isUncertain)
Definition: Triple.h:292
void setIsOccasional(bool isOccasional)
Definition: Triple.h:287
bool mergeFrame(const Triple &other)
Definition: Triple.cpp:274
void setBegin(double begin)
Definition: Triple.h:297
void setEnd(double end)
Definition: Triple.h:302
bool instantiateInto(Triple &triple, const std::shared_ptr< const Bindings > &bindings=Bindings::emptyBindings()) const
GraphTermRule & pattern()
Definition: graph.cpp:23
TermRule & atomic()
Definition: terms.cpp:79
TermRule & term()
Definition: terms.cpp:136
std::function< void(const TriplePtr &)> TripleVisitor
std::shared_ptr< TripleContainer > TripleContainerPtr
std::shared_ptr< const Bindings > BindingsPtr
Definition: Bindings.h:151
@ IncludeReified
Definition: ReifiedQuery.h:18
@ IncludeOriginal
Definition: ReifiedQuery.h:17
std::shared_ptr< TokenBuffer > TokenBufferPtr
Definition: TokenBuffer.h:43
std::function< void(const BindingsPtr &)> BindingsHandler
Definition: Bindings.h:152
std::shared_ptr< ThreadPool > DefaultThreadPool()
Definition: ThreadPool.cpp:19
std::function< void(const TripleContainerPtr &)> TripleHandler
std::shared_ptr< GraphQuery > GraphQueryPtr
Definition: GraphQuery.h:65
std::shared_ptr< QueryableStorage > QueryableBackendPtr
TripleTemplate< std::string_view > TripleView
Definition: Triple.h:581
std::shared_ptr< GraphPathQuery > GraphPathQueryPtr
Triple * ptr
Definition: Triple.h:590