knowrob  2.1.0
A Knowledge Base System for Cognition-enabled Robots
Transaction.cpp
Go to the documentation of this file.
1 /*
2  * This file is part of KnowRob, please consult
3  * https://github.com/knowrob/knowrob for license details.
4  */
5 
6 #include "knowrob/storage/Transaction.h"
7 #include "knowrob/storage/ReifiedTriple.h"
8 #include "knowrob/storage/ReificationContainer.h"
9 #include "knowrob/semweb/rdfs.h"
10 #include "knowrob/semweb/owl.h"
11 #include "knowrob/semweb/rdf.h"
12 #include "knowrob/storage/ReifiedQuery.h"
13 #include "knowrob/knowrob.h"
14 #include "knowrob/integration/python/gil.h"
15 
16 using namespace knowrob;
17 using namespace knowrob::transaction;
18 
19 static void setReificationVariable( // NOLINT(misc-no-recursion)
20  const std::shared_ptr<GraphTerm> &t,
21  const VariablePtr &variable) {
22  switch (t->termType()) {
24  auto &pattern = std::static_pointer_cast<GraphPattern>(t)->value();
25  pattern->setSubjectTerm(variable);
26  break;
27  }
30  auto connective = std::static_pointer_cast<GraphConnective>(t);
31  for (auto &term: connective->terms()) {
32  setReificationVariable(term, variable);
33  }
34  break;
35  }
37  break;
38  };
39 }
40 
42  static auto v_reification = std::make_shared<Variable>("reification");
43  auto pat = std::make_shared<TriplePattern>(triple);
44  auto query = std::make_shared<GraphPathQuery>(pat);
45  auto reified = std::make_shared<ReifiedQuery>(query, vocabulary_);
46  setReificationVariable(reified->term(), v_reification);
47 
48  IRIAtomPtr reifiedName;
49  queryable_->query(reified, [&](const BindingsPtr &bindings) {
50  auto t_reifiedName = bindings->get(v_reification->name());
51  if (t_reifiedName && t_reifiedName->isIRI()) {
52  reifiedName = IRIAtom::Tabled(std::static_pointer_cast<IRIAtom>(t_reifiedName)->stringForm());
53  }
54  });
55  return reifiedName;
56 }
57 
58 bool Transaction::commit(const Triple &triple) {
59  static auto v_reification = std::make_shared<Variable>("reification");
60  if (isRemoval_ && ReifiedTriple::isReifiable(triple)) {
61  return commit(triple, queryReifiedName(triple));
62  } else {
63  return commit(triple, nullptr);
64  }
65 }
66 
67 bool Transaction::commit(const Triple &triple, const IRIAtomPtr &reifiedName) {
68  ReifiedTriplePtr reification;
69  bool success = true;
70 
71  // make sure the vocabulary is updated before committing the triple as the vocabulary
72  // could be used within some backends.
73  updateVocabulary(triple);
74 
75  for (auto &definedBackend: backends_) {
76  auto &backend = definedBackend->value();
77  if (!backend->supports(StorageFeature::TripleContext) && ReifiedTriple::isReifiable(triple)) {
78  if (!reification) reification = std::make_shared<ReifiedTriple>(triple, vocabulary_, reifiedName);
79  for (auto &reified: *reification) {
80  success = success && doCommit(*reified.ptr, backend);
81  }
82  } else {
83  success = doCommit(triple, backend);
84  }
85  if (!success) break;
86  }
87  return success;
88 }
89 
91  static auto v_reification = std::make_shared<Variable>("reification");
93  // FIXME: The name lookup only works in case the queryable backend does store reified triples.
94  // - current: commit generates a ReificationContainer over triples, and the container forms
95  // ground triples with explicit names of reified individuals for removal.
96  // (1) we could search for a queryable backend without context support and use this instead
97  // (2) we could store the reified names also in backends that support context, such they
98  // can be queried from any queryable backend.
99  // (3) remove for reified backends could use "removeAllMatching" instead, where the reified
100  // name is used as a variable in the query. but this interface was removed from DataBackend class!
101  // or allow none value as subject in removals, could be ok in the backends.
102  // Note: the container type does not provide a size method because it internally uses a generator
103  // without knowing when it will end. Also, container with additional filtering could be implemented.
104  // So we need to resize the reifiedNames vector while looping over the triples, but we can use
105  // the default batch size as initial size.
106  auto estimatedSize = GlobalSettings::batchSize();
107  ReifiedNames reifiedNames = std::make_shared<std::vector<IRIAtomPtr>>();
108  reifiedNames->reserve(estimatedSize);
109  for (auto &triple: *triples) {
110  if (ReifiedTriple::isReifiable(*triple)) {
111  reifiedNames->push_back(queryReifiedName(*triple));
112  } else {
113  reifiedNames->push_back(nullptr);
114  }
115  }
116  // If fewer elements were added, resize the vector
117  if (reifiedNames->size() < estimatedSize) {
118  reifiedNames->resize(reifiedNames->size());
119  }
120  return commit(triples, reifiedNames);
121  } else {
122  return commit(triples, {});
123  }
124 }
125 
126 bool Transaction::commit(const TripleContainerPtr &triples, const ReifiedNames &reifiedNames) {
127  TripleContainerPtr reified;
128  std::vector<std::shared_ptr<ThreadPool::Runner>> transactions;
129  bool success = true;
130 
131  // make sure the vocabulary is updated before committing the triples as the vocabulary
132  // could be used within some backends.
133  for (auto &triple: *triples) {
134  updateVocabulary(*triple);
135  }
136 
137  for (auto &definedBackend: backends_) {
138  auto &backend = definedBackend->value();
139  const TripleContainerPtr *backendTriples;
140  if (!backend->supports(StorageFeature::TripleContext)) {
141  if (!reified) reified = std::make_shared<ReificationContainer>(triples, vocabulary_, reifiedNames);
142  backendTriples = &reified;
143  } else {
144  backendTriples = &triples;
145  }
146  auto worker = std::make_shared<ThreadPool::LambdaRunner>(
147  [&](const std::function<bool()> &) { success = success && commitProtected(*backendTriples, backend); });
148  transactions.push_back(worker);
149 
150  DefaultThreadPool()->pushWork(worker,
151  [&definedBackend](const std::exception &exc) {
152  KB_ERROR("transaction failed for backend '{}': {}", definedBackend->name(),
153  exc.what());
154  });
155  }
156 
157  for (auto &transaction: transactions) transaction->join();
158 
159  return success;
160 }
161 
162 bool Transaction::commitProtected(const TripleContainerPtr &triples, const StoragePtr &backend) {
163  if (backend->storageLanguage() == PluginLanguage::PYTHON) {
164  py::gil_lock lock;
165  return doCommit(triples, backend);
166  } else {
167  return doCommit(triples, backend);
168  }
169 }
170 
171 std::shared_ptr<ThreadPool::Runner> Transaction::createTripleWorker(
172  const TripleContainerPtr &triples,
173  const std::function<void(const TriplePtr &)> &fn) {
174  auto perTripleWorker =
175  std::make_shared<ThreadPool::LambdaRunner>([fn, triples](const ThreadPool::LambdaRunner::StopChecker &) {
176  std::for_each(triples->begin(), triples->end(), fn);
177  });
178  DefaultThreadPool()->pushWork(perTripleWorker,
179  [](const std::exception &exc) {
180  KB_ERROR("failed to perform per triple work: {}", exc.what());
181  });
182  return perTripleWorker;
183 }
184 
185 bool Insert::doCommit(const Triple &triple, const StoragePtr &backend) {
186  return backend->insertOne(triple);
187 }
188 
189 bool Remove::doCommit(const Triple &triple, const StoragePtr &backend) {
190  return backend->removeOne(triple);
191 }
192 
193 bool Insert::doCommit(const TripleContainerPtr &triples, const knowrob::StoragePtr &backend) {
194  return backend->insertAll(triples);
195 }
196 
197 bool Remove::doCommit(const TripleContainerPtr &triples, const knowrob::StoragePtr &backend) {
198  return backend->removeAll(triples);
199 }
200 
201 void Insert::updateVocabulary(const Triple &triple) {
202  // keep track of imports, subclasses, and subproperties
203  if (isSubClassOfIRI(triple.predicate())) {
204  auto sub = vocabulary_->defineClass(triple.subject());
205  auto sup = vocabulary_->defineClass(triple.valueAsString());
206  sub->addDirectParent(sup, triple.graph());
207  vocabulary_->increaseFrequency(rdfs::subClassOf->stringForm());
208  } else if (isSubPropertyOfIRI(triple.predicate())) {
209  auto sub = vocabulary_->defineProperty(triple.subject());
210  auto sup = vocabulary_->defineProperty(triple.valueAsString());
211  sub->addDirectParent(sup, triple.graph());
212  vocabulary_->increaseFrequency(rdfs::subPropertyOf->stringForm());
213  } else if (isTypeIRI(triple.predicate())) {
214  vocabulary_->addResourceType(triple.subject(), triple.valueAsString());
215  // increase frequency in vocabulary
216  static std::set<std::string_view> skippedTypes = {
217  owl::Class->stringForm(),
218  owl::Restriction->stringForm(),
219  owl::NamedIndividual->stringForm(),
220  owl::AnnotationProperty->stringForm(),
221  owl::ObjectProperty->stringForm(),
222  owl::DatatypeProperty->stringForm(),
223  rdfs::Class->stringForm(),
224  rdf::Property->stringForm()
225  };
226  if (vocabulary_->isDefinedClass(triple.valueAsString()) &&
227  !skippedTypes.count(triple.valueAsString())) {
228  vocabulary_->increaseFrequency(triple.valueAsString());
229  }
230  vocabulary_->increaseFrequency(rdf::type->stringForm());
231  } else if (isInverseOfIRI(triple.predicate())) {
232  auto p = vocabulary_->defineProperty(triple.subject());
233  auto q = vocabulary_->defineProperty(triple.valueAsString());
234  p->setInverse(q);
235  q->setInverse(p);
236  vocabulary_->increaseFrequency(owl::inverseOf->stringForm());
237  } else if (owl::imports->stringForm() == triple.predicate()) {
238  auto resolvedImport = URI::resolve(triple.valueAsString());
239  auto importedGraph = DataSource::getNameFromURI(resolvedImport);
240  if (triple.graph()) {
241  vocabulary_->importHierarchy()->addDirectImport(triple.graph().value(), importedGraph);
242  } else {
243  KB_WARN("import statement without graph");
244  }
245  } else if (vocabulary_->isObjectProperty(triple.predicate()) ||
246  vocabulary_->isDatatypeProperty(triple.predicate())) {
247  // increase frequency of property in vocabulary
248  vocabulary_->increaseFrequency(triple.predicate());
249  }
250 }
251 
252 void Remove::updateVocabulary(const Triple &triple) {
253  // remove subclass and subproperty relations from the vocabulary.
254  if (isSubClassOfIRI(triple.predicate())) {
255  auto sub = vocabulary_->defineClass(triple.subject());
256  auto sup = vocabulary_->defineClass(triple.valueAsString());
257  sub->removeDirectParent(sup, triple.graph());
258  } else if (isSubPropertyOfIRI(triple.predicate())) {
259  auto sub = vocabulary_->defineProperty(triple.subject());
260  auto sup = vocabulary_->defineProperty(triple.valueAsString());
261  sub->removeDirectParent(sup, triple.graph());
262  }
263 }
#define KB_ERROR
Definition: Logger.h:28
#define KB_WARN
Definition: Logger.h:27
static std::string getNameFromURI(const std::string &uriString)
Definition: DataSource.cpp:36
static uint32_t batchSize()
Definition: knowrob.h:68
static bool isReifiable(const Triple &triple)
std::function< bool()> StopChecker
Definition: ThreadPool.h:152
virtual std::string_view valueAsString() const =0
virtual std::optional< std::string_view > graph() const =0
virtual std::string_view subject() const =0
virtual std::string_view predicate() const =0
static std::string resolve(const std::string_view &uriString)
Definition: URI.cpp:79
bool doCommit(const Triple &triple, const StoragePtr &backend) override
void updateVocabulary(const Triple &triple) override
void updateVocabulary(const Triple &triple) override
bool doCommit(const Triple &triple, const StoragePtr &backend) override
std::shared_ptr< Vocabulary > vocabulary_
Definition: Transaction.h:71
bool commit(const Triple &triple)
Definition: Transaction.cpp:58
IRIAtomPtr queryReifiedName(const Triple &triple)
Definition: Transaction.cpp:41
bool commitProtected(const TripleContainerPtr &triple, const StoragePtr &backend)
std::shared_ptr< QueryableStorage > queryable_
Definition: Transaction.h:72
std::vector< std::shared_ptr< NamedBackend > > backends_
Definition: Transaction.h:73
static std::shared_ptr< ThreadPool::Runner > createTripleWorker(const TripleContainerPtr &triples, const std::function< void(const TriplePtr &)> &fn)
virtual void updateVocabulary(const Triple &triple)=0
virtual bool doCommit(const Triple &triple, const StoragePtr &backend)=0
const IRIAtomPtr inverseOf
Definition: owl.h:17
const IRIAtomPtr ObjectProperty
Definition: owl.h:21
const IRIAtomPtr NamedIndividual
Definition: owl.h:27
const IRIAtomPtr Restriction
Definition: owl.h:20
const IRIAtomPtr imports
Definition: owl.h:15
const IRIAtomPtr AnnotationProperty
Definition: owl.h:23
const IRIAtomPtr Class
Definition: owl.h:18
const IRIAtomPtr DatatypeProperty
Definition: owl.h:22
GraphTermRule & pattern()
Definition: graph.cpp:23
FunctionRule & function()
Definition: terms.cpp:140
TermRule & term()
Definition: terms.cpp:136
const IRIAtomPtr Property
Definition: rdf.h:16
const IRIAtomPtr type
Definition: rdf.h:15
const IRIAtomPtr Class
Definition: rdfs.h:22
const IRIAtomPtr subPropertyOf
Definition: rdfs.h:16
const IRIAtomPtr subClassOf
Definition: rdfs.h:15
std::shared_ptr< TripleContainer > TripleContainerPtr
bool isSubClassOfIRI(std::string_view iri)
Definition: rdfs.cpp:9
std::shared_ptr< ReifiedTriple > ReifiedTriplePtr
Definition: ReifiedTriple.h:75
std::shared_ptr< const Bindings > BindingsPtr
Definition: Bindings.h:151
bool isTypeIRI(std::string_view iri)
Definition: rdf.cpp:9
std::shared_ptr< ThreadPool > DefaultThreadPool()
Definition: ThreadPool.cpp:19
bool isSubPropertyOfIRI(std::string_view iri)
Definition: rdfs.cpp:12
std::shared_ptr< IRIAtom > IRIAtomPtr
Definition: IRIAtom.h:57
bool isInverseOfIRI(std::string_view iri)
Definition: owl.cpp:12
std::shared_ptr< std::vector< IRIAtomPtr > > ReifiedNames
std::shared_ptr< Variable > VariablePtr
Definition: Variable.h:60
std::shared_ptr< Storage > StoragePtr
Definition: Storage.h:154