knowrob  2.1.0
A Knowledge Base System for Cognition-enabled Robots
MongoKnowledgeGraph.cpp
Go to the documentation of this file.
1 /*
2  * This file is part of KnowRob, please consult
3  * https://github.com/knowrob/knowrob for license details.
4  */
5 
6 #include "knowrob/storage/mongo/MongoKnowledgeGraph.h"
7 #include "knowrob/storage/mongo/MongoTriplePattern.h"
8 #include "knowrob/storage/mongo/TripleCursor.h"
9 #include "knowrob/storage/mongo/MongoTriple.h"
10 #include "knowrob/storage/mongo/MongoInterface.h"
11 #include "knowrob/storage/StorageManager.h"
12 #include "knowrob/semweb/GraphSequence.h"
13 #include "knowrob/semweb/rdfs.h"
14 #include "knowrob/knowrob.h"
15 #include <boost/foreach.hpp>
16 
17 #define MONGO_KG_ONE_COLLECTION "one"
18 
19 #define MONGO_KG_SETTING_HOST "host"
20 #define MONGO_KG_SETTING_PORT "port"
21 #define MONGO_KG_SETTING_USER "user"
22 #define MONGO_KG_SETTING_PASSWORD "password"
23 #define MONGO_KG_SETTING_DB "db"
24 #define MONGO_KG_SETTING_COLLECTION "collection"
25 #define MONGO_KG_SETTING_READ_ONLY "read-only"
26 #define MONGO_KG_SETTING_DROP "drop"
27 
28 #define MONGO_KG_DEFAULT_HOST "localhost"
29 #define MONGO_KG_DEFAULT_PORT "27017"
30 #define MONGO_KG_DEFAULT_DB "knowrob"
31 #define MONGO_KG_DEFAULT_COLLECTION "triples"
32 
33 #define MONGO_KG_NUM_KEEP_ALIVE 5
34 
35 #define PIPELINE_RELATION_COUNTER "storage/mongo/aggregation/relation-counter.json"
36 #define PIPELINE_CLASS_COUNTER "storage/mongo/aggregation/class-counter.json"
37 
38 using namespace knowrob;
39 using namespace knowrob::mongo;
40 using namespace knowrob::semweb;
41 
46 
47 const std::string MongoKnowledgeGraph::DB_URI_DEFAULT = "mongodb://localhost:27017";
49 const std::string MongoKnowledgeGraph::DB_NAME_TESTS = "knowrob_test";
52 
53 static inline StorageFeatures mongoBackendFeatures() {
56 }
57 
59  : QueryableStorage(mongoBackendFeatures()),
60  isReadOnly_(false) {
61 }
62 
64  : mongo(kg->acquireStore()), kg(kg) {}
65 
67  kg->releaseStore(mongo);
68 }
69 
71  // Note: We cannot use a Collection object in different threads, so instead we manage
72  // a list of active connections and hand them out to threads that need them.
73  // If no connection is available, a new one is created on the fly.
74  std::lock_guard<std::mutex> lock(storeMutex_);
75  if (connections_.empty()) {
76  auto tripleCollection = std::make_shared<Collection>(*tripleCollection_);
77  auto oneCollection = std::make_shared<Collection>(
78  tripleCollection->connection(),
79  tripleCollection->dbName().c_str(),
81  return {tripleCollection, oneCollection, vocabulary_};
82  } else {
83  auto store = connections_.front();
84  connections_.pop_front();
85  return store;
86  }
87 }
88 
90  std::lock_guard<std::mutex> lock(storeMutex_);
92  connections_.push_back(store);
93  }
94 }
95 
96 bool MongoKnowledgeGraph::initializeBackend(std::string_view db_uri, std::string_view db_name,
97  std::string_view collectionName) {
98  auto tripleCollection = connect(db_uri, db_name, collectionName);
99  if (tripleCollection) {
102  return true;
103  } else {
104  return false;
105  }
106 }
107 
109  auto ptree = config.ptree();
110  if (!ptree) {
112  if (tripleCollection) {
115  return true;
116  } else {
117  return false;
118  }
119  }
120  initializeMongo(connect(*ptree));
121 
122  // set isReadOnly_ flag
123  auto o_readOnly = ptree->get_optional<bool>(MONGO_KG_SETTING_READ_ONLY);
124  if (o_readOnly.has_value()) {
125  isReadOnly_ = o_readOnly.value();
126  }
127  if (!isReadOnly_) {
129  }
130 
131  // Auto-drop some named graphs
132  auto o_drop = ptree->get_child_optional(MONGO_KG_SETTING_DROP);
133  if (o_drop.has_value()) {
134  if (std::string_view("*") == std::string_view(o_drop.value().data())) {
135  drop();
136  tripleCollection_->createTripleIndex();
137  } else {
138  BOOST_FOREACH(const auto &v, o_drop.value()) {
139  removeAllWithOrigin(v.second.data());
140  }
141  }
142  }
143 
144  return true;
145 }
146 
147 void MongoKnowledgeGraph::initializeMongo(const std::shared_ptr<mongo::Collection> &tripleCollection) {
149  // make sure s/p/o index is defined
150  tripleCollection_->createTripleIndex();
151  // a collection with just a single document used for querying
152  oneCollection_ = std::make_shared<Collection>(
153  tripleCollection_->connection(),
154  tripleCollection_->dbName().c_str(),
156  // Make sure there is one document in the "one" collection.
157  // The collection is used to initiate a pipeline for a single input document.
158  if (oneCollection_->empty()) {
159  Document oneDoc(bson_new());
160  bson_t scopeDoc, timeDoc;
161  bson_decimal128_t infinity, zero;
162  bson_decimal128_from_string(BSON_DECIMAL128_INF, &infinity);
163  bson_decimal128_from_string("0", &zero);
164  BSON_APPEND_DOCUMENT_BEGIN(oneDoc.bson(), "v_scope", &scopeDoc);
165  BSON_APPEND_DOCUMENT_BEGIN(&scopeDoc, "time", &timeDoc);
166  BSON_APPEND_DECIMAL128(&timeDoc, "since", &zero);
167  BSON_APPEND_DECIMAL128(&timeDoc, "until", &infinity);
168  bson_append_document_end(&scopeDoc, &timeDoc);
169  bson_append_document_end(oneDoc.bson(), &scopeDoc);
170  oneCollection_->storeOne(oneDoc);
171  }
172  // Create an object used for taxonomy operations
173  taxonomy_ = std::make_shared<MongoTaxonomy>(tripleCollection_, oneCollection_, vocabulary_);
174  // Add the connection to connection list
176 }
177 
178 std::shared_ptr<Collection> MongoKnowledgeGraph::connect(
179  const std::string_view db_uri,
180  const std::string_view db_name,
181  const std::string_view collectionName) {
182  auto coll = MongoInterface::get().connect(db_uri.data(), db_name.data(), collectionName.data());
183  if (coll) {
184  KB_INFO("[mongodb] connected to {} ({}.{}).", db_uri, db_name, collectionName);
185  } else {
186  KB_ERROR("[mongodb] failed to connect to {} ({}.{}).", db_uri, db_name, collectionName);
187  }
188  return coll;
189 }
190 
191 std::shared_ptr<Collection> MongoKnowledgeGraph::connect(const boost::property_tree::ptree &config) {
192  return connect(getURI(config), getDBName(config), getCollectionName(config));
193 }
194 
195 std::string MongoKnowledgeGraph::getDBName(const boost::property_tree::ptree &config) {
196  static std::string defaultDBName = MONGO_KG_DEFAULT_DB;
197  auto o_dbname = config.get_optional<std::string>(MONGO_KG_SETTING_DB);
198  return (o_dbname ? o_dbname.value() : defaultDBName);
199 }
200 
201 std::string MongoKnowledgeGraph::getCollectionName(const boost::property_tree::ptree &config) {
202  static std::string defaultCollectionName = MONGO_KG_DEFAULT_COLLECTION;
203  auto o_collection = config.get_optional<std::string>(MONGO_KG_SETTING_COLLECTION);
204  return (o_collection ? o_collection.value() : defaultCollectionName);
205 }
206 
207 std::string MongoKnowledgeGraph::getURI(const boost::property_tree::ptree &config) {
208  auto o_host = config.get_optional<std::string>(MONGO_KG_SETTING_HOST);
209  auto o_port = config.get_optional<std::string>(MONGO_KG_SETTING_PORT);
210  auto o_user = config.get_optional<std::string>(MONGO_KG_SETTING_USER);
211  auto o_password = config.get_optional<std::string>(MONGO_KG_SETTING_PASSWORD);
212  // format URI of the form "mongodb://USER:PW@HOST:PORT"
213  std::stringstream uriStream;
214  uriStream << "mongodb://";
215  if (o_user) {
216  uriStream << o_user.value();
217  if (o_password) uriStream << ':' << o_password.value();
218  uriStream << '@';
219  }
220  uriStream
221  << (o_host ? o_host.value() : MONGO_KG_DEFAULT_HOST)
222  << ':'
223  << (o_port ? o_port.value() : MONGO_KG_DEFAULT_PORT);
224  return uriStream.str();
225 }
226 
228  ConnectionRAII scoped(this);
229  scoped.mongo.tripleCollection->drop();
230  vocabulary_ = std::make_shared<Vocabulary>();
231 }
232 
233 bool MongoKnowledgeGraph::insertOne(const Triple &tripleData) {
234  ConnectionRAII scoped(this);
235  auto &fallbackOrigin = vocabulary_->importHierarchy()->defaultGraph();
236  bool isTaxonomic = vocabulary_->isTaxonomicProperty(tripleData.predicate());
237  MongoTriple mngTriple(vocabulary_, tripleData, fallbackOrigin, isTaxonomic);
238  scoped.mongo.tripleCollection->storeOne(mngTriple.document());
239 
240  if (isSubClassOfIRI(tripleData.predicate())) {
241  taxonomy_->updateInsert({{tripleData.subject(), tripleData.valueAsString()}}, {});
242  } else if (isSubPropertyOfIRI(tripleData.predicate())) {
243  taxonomy_->updateInsert({}, {{tripleData.subject(), tripleData.valueAsString()}});
244  }
245 
246  return true;
247 }
248 
250  ConnectionRAII scoped(this);
251  // only used in case triples do not specify origin field
252  auto &fallbackOrigin = vocabulary_->importHierarchy()->defaultGraph();
253  auto bulk = scoped.mongo.tripleCollection->createBulkOperation();
254  std::vector<MongoTaxonomy::StringPair> subClassAssertions;
255  std::vector<MongoTaxonomy::StringPair> subPropertyAssertions;
256 
257  std::for_each(triples->begin(), triples->end(),
258  [&](auto &data) {
259  MongoTriple mngTriple(vocabulary_, *data, fallbackOrigin,
260  vocabulary_->isTaxonomicProperty(data->predicate().data()));
261  bulk->pushInsert(mngTriple.document().bson());
262 
263  if (isSubClassOfIRI(data->predicate())) {
264  subClassAssertions.emplace_back(data->subject(), data->valueAsString());
265  } else if (isSubPropertyOfIRI(data->predicate())) {
266  subPropertyAssertions.emplace_back(data->subject(), data->valueAsString());
267  }
268  });
269  bulk->execute();
270 
271  taxonomy_->updateInsert(subClassAssertions, subPropertyAssertions);
272 
273  return true;
274 }
275 
277  ConnectionRAII scoped(this);
278  MongoTriplePattern mngQuery(
279  TriplePattern(triple),
280  vocabulary_->isTaxonomicProperty(triple.predicate()),
281  vocabulary_->importHierarchy());
282  scoped.mongo.tripleCollection->removeOne(mngQuery.document());
283 
284  if (isSubClassOfIRI(triple.predicate())) {
285  taxonomy_->updateRemove({{triple.subject(), triple.valueAsString()}}, {});
286  } else if (isSubPropertyOfIRI(triple.predicate())) {
287  taxonomy_->updateRemove({}, {{triple.subject(), triple.valueAsString()}});
288  }
289 
290  return true;
291 }
292 
294  ConnectionRAII scoped(this);
295  auto bulk = scoped.mongo.tripleCollection->createBulkOperation();
296  std::vector<MongoTaxonomy::StringPair> subClassAssertions;
297  std::vector<MongoTaxonomy::StringPair> subPropertyAssertions;
298 
299  std::for_each(triples->begin(), triples->end(),
300  [&](auto &data) {
301  MongoTriplePattern mngQuery(
302  TriplePattern(*data),
303  vocabulary_->isTaxonomicProperty(data->predicate()),
304  vocabulary_->importHierarchy());
305  bulk->pushRemoveOne(mngQuery.bson());
306 
307  if (isSubClassOfIRI(data->predicate())) {
308  subClassAssertions.emplace_back(data->subject(), data->valueAsString());
309  } else if (isSubPropertyOfIRI(data->predicate())) {
310  subPropertyAssertions.emplace_back(data->subject(), data->valueAsString());
311  }
312  });
313  bulk->execute();
314 
315  taxonomy_->updateRemove(subClassAssertions, subPropertyAssertions);
316 
317  return true;
318 }
319 
320 bool MongoKnowledgeGraph::dropOrigin(std::string_view graphName) {
321  KB_DEBUG("[mongodb] dropping triples with origin \"{}\".", graphName);
322  ConnectionRAII scoped(this);
323  scoped.mongo.tripleCollection->removeAll(Document(
324  BCON_NEW("graph", BCON_UTF8(graphName.data()))));
325  return true;
326 }
327 
328 bool MongoKnowledgeGraph::removeAllWithOrigin(std::string_view graphName) {
329  return dropOrigin(graphName);
330 }
331 
332 void MongoKnowledgeGraph::count(const ResourceCounter &callback) const {
333  ConnectionRAII scoped(this);
334  for (auto &filename: {PIPELINE_RELATION_COUNTER, PIPELINE_CLASS_COUNTER}) {
335  const bson_t *result;
336  Cursor cursor(scoped.mongo.tripleCollection);
337  Document document(Pipeline::loadFromJSON(
338  filename, {
339  {"COLLECTION", scoped.mongo.tripleCollection->name()}
340  }));
341  cursor.aggregate(document.bson());
342  while (cursor.next(&result)) {
343  bson_iter_t iter;
344  if (!bson_iter_init(&iter, result)) break;
345  if (!bson_iter_find(&iter, "resource")) break;
346  auto property = bson_iter_utf8(&iter, nullptr);
347  if (!bson_iter_find(&iter, "count")) break;
348  auto count = bson_iter_as_int64(&iter);
349  callback(property, count);
350  }
351  }
352 }
353 
355  TripleView tripleData;
356  TriplePtr triplePtr;
357  triplePtr.ptr = &tripleData;
358  // Mongo cursor own the allocation, and the memory of a document will be deallocated by the cursor during iteration.
359  // @see https://mongoc.org/libmongoc/current/mongoc_cursor_next.html
360  // So it cannot be allowed that the visitor takes over ownership, hence owned is set to false.
361  triplePtr.owned = false;
362 
363  // iterate over matching documents
364  while (cursor.nextTriple(*triplePtr.ptr)) {
365  visitor(triplePtr);
366  }
367 }
368 
369 void MongoKnowledgeGraph::foreach(const TripleVisitor &visitor) const {
370  ConnectionRAII scoped(this);
371  TripleCursor cursor(scoped.mongo.tripleCollection);
372  iterate(cursor, visitor);
373 }
374 
375 static void
376 batch_(const std::shared_ptr<mongo::Collection> &collection, const TripleHandler &callback, bson_t *filter) {
377  TripleCursor cursor(collection);
378  if (filter) {
379  cursor.filter(filter);
380  }
381  std::vector<TriplePtr> batchData(GlobalSettings::batchSize());
382  uint32_t currentSize = 0;
383 
384  while (true) {
385  auto &current = batchData[currentSize];
386  if (current.ptr && current.owned) {
387  delete current.ptr;
388  }
389  current.ptr = new TripleCopy();
390  current.owned = true;
391 
392  if (!cursor.nextTriple(*current.ptr)) {
393  break;
394  }
395  currentSize++;
396  if (currentSize == GlobalSettings::batchSize()) {
397  auto batch = std::make_shared<ProxyTripleContainer>(&batchData);
398  callback(batch);
399  currentSize = 0;
400  }
401  }
402  if (currentSize != 0) {
403  batchData.resize(currentSize);
404  auto batch = std::make_shared<ProxyTripleContainer>(&batchData);
405  callback(batch);
406  }
407 }
408 
409 void MongoKnowledgeGraph::batch(const TripleHandler &callback) const {
410  ConnectionRAII scoped(this);
411  batch_(scoped.mongo.tripleCollection, callback, nullptr);
412 }
413 
414 void MongoKnowledgeGraph::batchOrigin(std::string_view origin, const TripleHandler &callback) {
415  ConnectionRAII scoped(this);
416  bson_t filterDoc;
417  BSON_APPEND_UTF8(&filterDoc, "graph", origin.data());
418  batch_(scoped.mongo.tripleCollection, callback, &filterDoc);
419 }
420 
421 void MongoKnowledgeGraph::match(const TriplePattern &query, const TripleVisitor &visitor) {
422  ConnectionRAII scoped(this);
423  bool b_isTaxonomicProperty;
424  if (query.propertyTerm()->termType() == TermType::ATOMIC) {
425  b_isTaxonomicProperty = vocabulary_->isTaxonomicProperty(((Atomic *) query.propertyTerm().get())->stringForm());
426  } else {
427  b_isTaxonomicProperty = false;
428  }
429  TripleCursor cursor(scoped.mongo.tripleCollection);
430  // filter documents by triple pattern
431  MongoTriplePattern mngQuery(query, b_isTaxonomicProperty, vocabulary_->importHierarchy());
432  cursor.filter(mngQuery.bson());
433  iterate(cursor, visitor);
434 }
435 
436 template<typename T>
437 static inline BindingsCursorPtr doLookup(const T &query, const TripleStore &store) {
438  auto pipelineDoc = bson_new();
439  bson_t pipelineArray;
440  BSON_APPEND_ARRAY_BEGIN(pipelineDoc, "pipeline", &pipelineArray);
441  Pipeline pipeline(&pipelineArray);
442  pipeline.append(query, store);
443  bson_append_array_end(pipelineDoc, &pipelineArray);
444 
445  auto cursor = std::make_shared<BindingsCursor>(store.oneCollection);
446  cursor->aggregate(pipelineDoc);
447  bson_destroy(pipelineDoc);
448  return cursor;
449 }
450 
452  ConnectionRAII scoped(this);
453  return doLookup(query, scoped.mongo);
454 }
455 
457  ConnectionRAII scoped(this);
458  return doLookup(query, scoped.mongo);
459 }
460 
462  const bool onlyOneSol = (q->ctx()->queryFlags & QUERY_FLAG_ONE_SOLUTION);
463  ConnectionRAII scoped(this);
464  BindingsCursorPtr cursor = doLookup(*q->term(), scoped.mongo);
465  // NOTE: for some reason below causes a cursor error. looks like a bug in libmongoc to me!
466  //if(query->flags() & QUERY_FLAG_ONE_SOLUTION) { cursor->limit(1); }
467 
468  while (true) {
469  auto next = std::make_shared<Bindings>();
470  if (cursor->nextBindings(next)) callback(next);
471  else break;
472  if (onlyOneSol) break;
473  }
474 }
#define MONGO_KG_SETTING_COLLECTION
#define PIPELINE_CLASS_COUNTER
#define MONGO_KG_DEFAULT_PORT
#define MONGO_KG_SETTING_DROP
#define MONGO_KG_DEFAULT_HOST
#define PIPELINE_RELATION_COUNTER
#define MONGO_KG_SETTING_USER
#define MONGO_KG_DEFAULT_DB
#define MONGO_KG_ONE_COLLECTION
#define MONGO_KG_DEFAULT_COLLECTION
#define MONGO_KG_SETTING_READ_ONLY
#define MONGO_KG_SETTING_DB
#define MONGO_KG_SETTING_PASSWORD
#define MONGO_KG_SETTING_HOST
#define MONGO_KG_NUM_KEEP_ALIVE
#define MONGO_KG_SETTING_PORT
#define KB_DEBUG
Definition: Logger.h:25
#define KB_INFO
Definition: Logger.h:26
#define KB_ERROR
Definition: Logger.h:28
#define KNOWROB_BUILTIN_STORAGE(Name, Type)
static uint32_t batchSize()
Definition: knowrob.h:68
void batchOrigin(std::string_view origin, const TripleHandler &callback) override
void initializeMongo(const std::shared_ptr< mongo::Collection > &tripleCollection)
static const std::string COLL_NAME_TRIPLES
bool insertAll(const TripleContainerPtr &triples) override
mongo::TripleStore acquireStore() const
std::shared_ptr< mongo::Collection > tripleCollection_
bool removeOne(const Triple &triple) override
static std::shared_ptr< mongo::Collection > connect(const boost::property_tree::ptree &config)
static std::string getURI(const boost::property_tree::ptree &config)
bool removeAll(const TripleContainerPtr &triples) override
static const std::string DB_NAME_KNOWROB
void match(const TriplePattern &query, const TripleVisitor &visitor) override
bool dropOrigin(std::string_view origin)
mongo::BindingsCursorPtr lookup(const TriplePattern &query)
static const std::string COLL_NAME_TESTS
void batch(const TripleHandler &callback) const override
void releaseStore(mongo::TripleStore &store) const
static const std::string DB_NAME_TESTS
static std::string getCollectionName(const boost::property_tree::ptree &config)
void count(const ResourceCounter &callback) const override
static std::string getDBName(const boost::property_tree::ptree &config)
bool initializeBackend(std::string_view db_uri, std::string_view db_name="knowrob", std::string_view collectionName="triples")
void query(const GraphQueryPtr &query, const BindingsHandler &callback) override
bool insertOne(const Triple &triple) override
static const std::string DB_URI_DEFAULT
bool removeAllWithOrigin(std::string_view origin) override
static void iterate(mongo::TripleCursor &cursor, const TripleVisitor &visitor)
std::shared_ptr< mongo::Collection > oneCollection_
std::shared_ptr< mongo::MongoTaxonomy > taxonomy_
void foreach(const TripleVisitor &callback) const override
std::list< mongo::TripleStore > connections_
auto ptree() const
Definition: PropertyTree.h:88
std::shared_ptr< Vocabulary > vocabulary_
Definition: Storage.h:143
virtual std::string_view valueAsString() const =0
virtual std::string_view subject() const =0
virtual std::string_view predicate() const =0
void filter(const bson_t *query_doc)
Definition: Cursor.cpp:58
bool next(const bson_t **doc, bool ignore_empty=false)
Definition: Cursor.cpp:67
void aggregate(const bson_t *query_doc)
Definition: Cursor.cpp:62
bson_t * bson() const
Definition: Document.h:27
bool nextTriple(Triple &tripleData, const bson_oid_t **tripleOID)
std::shared_ptr< BindingsCursor > BindingsCursorPtr
TermRule & string()
Definition: terms.cpp:63
std::function< void(const TriplePtr &)> TripleVisitor
StorageFeature
Definition: Storage.h:24
TripleTemplate< std::string > TripleCopy
Definition: Triple.h:577
std::shared_ptr< TripleContainer > TripleContainerPtr
bool isSubClassOfIRI(std::string_view iri)
Definition: rdfs.cpp:9
std::function< void(std::string_view, uint64_t)> ResourceCounter
@ QUERY_FLAG_ONE_SOLUTION
Definition: QueryFlag.h:17
std::function< void(const BindingsPtr &)> BindingsHandler
Definition: Bindings.h:152
std::function< void(const TripleContainerPtr &)> TripleHandler
bool isSubPropertyOfIRI(std::string_view iri)
Definition: rdfs.cpp:12
std::shared_ptr< GraphQuery > GraphQueryPtr
Definition: GraphQuery.h:65
Triple * ptr
Definition: Triple.h:590
std::shared_ptr< knowrob::mongo::Collection > oneCollection
Definition: TripleStore.h:28
std::shared_ptr< knowrob::mongo::Collection > tripleCollection
Definition: TripleStore.h:27