6 #include "knowrob/storage/mongo/MongoKnowledgeGraph.h"
7 #include "knowrob/storage/mongo/MongoTriplePattern.h"
8 #include "knowrob/storage/mongo/TripleCursor.h"
9 #include "knowrob/storage/mongo/MongoTriple.h"
10 #include "knowrob/storage/mongo/MongoInterface.h"
11 #include "knowrob/storage/StorageManager.h"
12 #include "knowrob/semweb/GraphSequence.h"
13 #include "knowrob/semweb/rdfs.h"
14 #include "knowrob/knowrob.h"
15 #include <boost/foreach.hpp>
17 #define MONGO_KG_ONE_COLLECTION "one"
19 #define MONGO_KG_SETTING_HOST "host"
20 #define MONGO_KG_SETTING_PORT "port"
21 #define MONGO_KG_SETTING_USER "user"
22 #define MONGO_KG_SETTING_PASSWORD "password"
23 #define MONGO_KG_SETTING_DB "db"
24 #define MONGO_KG_SETTING_COLLECTION "collection"
25 #define MONGO_KG_SETTING_READ_ONLY "read-only"
26 #define MONGO_KG_SETTING_DROP "drop"
28 #define MONGO_KG_DEFAULT_HOST "localhost"
29 #define MONGO_KG_DEFAULT_PORT "27017"
30 #define MONGO_KG_DEFAULT_DB "knowrob"
31 #define MONGO_KG_DEFAULT_COLLECTION "triples"
33 #define MONGO_KG_NUM_KEEP_ALIVE 5
35 #define PIPELINE_RELATION_COUNTER "storage/mongo/aggregation/relation-counter.json"
36 #define PIPELINE_CLASS_COUNTER "storage/mongo/aggregation/class-counter.json"
64 : mongo(kg->acquireStore()), kg(kg) {}
67 kg->releaseStore(mongo);
77 auto oneCollection = std::make_shared<Collection>(
97 std::string_view collectionName) {
109 auto ptree = config.
ptree();
124 if (o_readOnly.has_value()) {
133 if (o_drop.has_value()) {
134 if (std::string_view(
"*") == std::string_view(o_drop.value().data())) {
138 BOOST_FOREACH(
const auto &v, o_drop.value()) {
160 bson_t scopeDoc, timeDoc;
161 bson_decimal128_t infinity, zero;
162 bson_decimal128_from_string(BSON_DECIMAL128_INF, &infinity);
163 bson_decimal128_from_string(
"0", &zero);
164 BSON_APPEND_DOCUMENT_BEGIN(oneDoc.
bson(),
"v_scope", &scopeDoc);
165 BSON_APPEND_DOCUMENT_BEGIN(&scopeDoc,
"time", &timeDoc);
166 BSON_APPEND_DECIMAL128(&timeDoc,
"since", &zero);
167 BSON_APPEND_DECIMAL128(&timeDoc,
"until", &infinity);
168 bson_append_document_end(&scopeDoc, &timeDoc);
169 bson_append_document_end(oneDoc.
bson(), &scopeDoc);
179 const std::string_view db_uri,
180 const std::string_view db_name,
181 const std::string_view collectionName) {
182 auto coll = MongoInterface::get().connect(db_uri.data(), db_name.data(), collectionName.data());
184 KB_INFO(
"[mongodb] connected to {} ({}.{}).", db_uri, db_name, collectionName);
186 KB_ERROR(
"[mongodb] failed to connect to {} ({}.{}).", db_uri, db_name, collectionName);
198 return (o_dbname ? o_dbname.value() : defaultDBName);
204 return (o_collection ? o_collection.value() : defaultCollectionName);
213 std::stringstream uriStream;
214 uriStream <<
"mongodb://";
216 uriStream << o_user.value();
217 if (o_password) uriStream <<
':' << o_password.value();
224 return uriStream.str();
235 auto &fallbackOrigin =
vocabulary_->importHierarchy()->defaultGraph();
252 auto &fallbackOrigin =
vocabulary_->importHierarchy()->defaultGraph();
254 std::vector<MongoTaxonomy::StringPair> subClassAssertions;
255 std::vector<MongoTaxonomy::StringPair> subPropertyAssertions;
257 std::for_each(triples->begin(), triples->end(),
259 MongoTriple mngTriple(vocabulary_, *data, fallbackOrigin,
260 vocabulary_->isTaxonomicProperty(data->predicate().data()));
261 bulk->pushInsert(mngTriple.document().bson());
263 if (isSubClassOfIRI(data->predicate())) {
264 subClassAssertions.emplace_back(data->subject(), data->valueAsString());
266 subPropertyAssertions.emplace_back(data->subject(), data->valueAsString());
271 taxonomy_->updateInsert(subClassAssertions, subPropertyAssertions);
296 std::vector<MongoTaxonomy::StringPair> subClassAssertions;
297 std::vector<MongoTaxonomy::StringPair> subPropertyAssertions;
299 std::for_each(triples->begin(), triples->end(),
301 MongoTriplePattern mngQuery(
302 TriplePattern(*data),
303 vocabulary_->isTaxonomicProperty(data->predicate()),
304 vocabulary_->importHierarchy());
305 bulk->pushRemoveOne(mngQuery.bson());
307 if (isSubClassOfIRI(data->predicate())) {
308 subClassAssertions.emplace_back(data->subject(), data->valueAsString());
310 subPropertyAssertions.emplace_back(data->subject(), data->valueAsString());
315 taxonomy_->updateRemove(subClassAssertions, subPropertyAssertions);
320 bool MongoKnowledgeGraph::dropOrigin(std::string_view graphName) {
321 KB_DEBUG(
"[mongodb] dropping triples with origin \"{}\".", graphName);
324 BCON_NEW(
"graph", BCON_UTF8(graphName.data()))));
335 const bson_t *result;
337 Document document(Pipeline::loadFromJSON(
342 while (cursor.
next(&result)) {
344 if (!bson_iter_init(&iter, result))
break;
345 if (!bson_iter_find(&iter,
"resource"))
break;
346 auto property = bson_iter_utf8(&iter,
nullptr);
347 if (!bson_iter_find(&iter,
"count"))
break;
348 auto count = bson_iter_as_int64(&iter);
349 callback(property,
count);
357 triplePtr.
ptr = &tripleData;
361 triplePtr.
owned =
false;
376 batch_(
const std::shared_ptr<mongo::Collection> &collection,
const TripleHandler &callback, bson_t *filter) {
379 cursor.filter(filter);
382 uint32_t currentSize = 0;
385 auto ¤t = batchData[currentSize];
386 if (current.ptr && current.owned) {
390 current.owned =
true;
392 if (!cursor.nextTriple(*current.ptr)) {
397 auto batch = std::make_shared<ProxyTripleContainer>(&batchData);
402 if (currentSize != 0) {
403 batchData.resize(currentSize);
404 auto batch = std::make_shared<ProxyTripleContainer>(&batchData);
417 BSON_APPEND_UTF8(&filterDoc,
"graph", origin.data());
423 bool b_isTaxonomicProperty;
425 b_isTaxonomicProperty =
vocabulary_->isTaxonomicProperty(((
Atomic *)
query.propertyTerm().get())->stringForm());
427 b_isTaxonomicProperty =
false;
438 auto pipelineDoc = bson_new();
439 bson_t pipelineArray;
440 BSON_APPEND_ARRAY_BEGIN(pipelineDoc,
"pipeline", &pipelineArray);
442 pipeline.append(query, store);
443 bson_append_array_end(pipelineDoc, &pipelineArray);
445 auto cursor = std::make_shared<BindingsCursor>(store.
oneCollection);
446 cursor->aggregate(pipelineDoc);
447 bson_destroy(pipelineDoc);
469 auto next = std::make_shared<Bindings>();
470 if (cursor->nextBindings(next)) callback(next);
472 if (onlyOneSol)
break;
#define MONGO_KG_SETTING_COLLECTION
#define PIPELINE_CLASS_COUNTER
#define MONGO_KG_DEFAULT_PORT
#define MONGO_KG_SETTING_DROP
#define MONGO_KG_DEFAULT_HOST
#define PIPELINE_RELATION_COUNTER
#define MONGO_KG_SETTING_USER
#define MONGO_KG_DEFAULT_DB
#define MONGO_KG_ONE_COLLECTION
#define MONGO_KG_DEFAULT_COLLECTION
#define MONGO_KG_SETTING_READ_ONLY
#define MONGO_KG_SETTING_DB
#define MONGO_KG_SETTING_PASSWORD
#define MONGO_KG_SETTING_HOST
#define MONGO_KG_NUM_KEEP_ALIVE
#define MONGO_KG_SETTING_PORT
#define KNOWROB_BUILTIN_STORAGE(Name, Type)
static uint32_t batchSize()
ConnectionRAII(const MongoKnowledgeGraph *kg)
void batchOrigin(std::string_view origin, const TripleHandler &callback) override
void initializeMongo(const std::shared_ptr< mongo::Collection > &tripleCollection)
static const std::string COLL_NAME_TRIPLES
bool insertAll(const TripleContainerPtr &triples) override
mongo::TripleStore acquireStore() const
std::shared_ptr< mongo::Collection > tripleCollection_
bool removeOne(const Triple &triple) override
static std::shared_ptr< mongo::Collection > connect(const boost::property_tree::ptree &config)
static std::string getURI(const boost::property_tree::ptree &config)
bool removeAll(const TripleContainerPtr &triples) override
static const std::string DB_NAME_KNOWROB
void match(const TriplePattern &query, const TripleVisitor &visitor) override
bool dropOrigin(std::string_view origin)
mongo::BindingsCursorPtr lookup(const TriplePattern &query)
static const std::string COLL_NAME_TESTS
void batch(const TripleHandler &callback) const override
void releaseStore(mongo::TripleStore &store) const
static const std::string DB_NAME_TESTS
static std::string getCollectionName(const boost::property_tree::ptree &config)
void count(const ResourceCounter &callback) const override
static std::string getDBName(const boost::property_tree::ptree &config)
bool initializeBackend(std::string_view db_uri, std::string_view db_name="knowrob", std::string_view collectionName="triples")
void query(const GraphQueryPtr &query, const BindingsHandler &callback) override
bool insertOne(const Triple &triple) override
static const std::string DB_URI_DEFAULT
bool removeAllWithOrigin(std::string_view origin) override
static void iterate(mongo::TripleCursor &cursor, const TripleVisitor &visitor)
std::shared_ptr< mongo::Collection > oneCollection_
std::shared_ptr< mongo::MongoTaxonomy > taxonomy_
void foreach(const TripleVisitor &callback) const override
std::list< mongo::TripleStore > connections_
void dropSessionOrigins()
std::shared_ptr< Vocabulary > vocabulary_
virtual std::string_view valueAsString() const =0
virtual std::string_view subject() const =0
virtual std::string_view predicate() const =0
void filter(const bson_t *query_doc)
bool next(const bson_t **doc, bool ignore_empty=false)
void aggregate(const bson_t *query_doc)
bool nextTriple(Triple &tripleData, const bson_oid_t **tripleOID)
std::shared_ptr< BindingsCursor > BindingsCursorPtr
std::function< void(const TriplePtr &)> TripleVisitor
TripleTemplate< std::string > TripleCopy
std::shared_ptr< TripleContainer > TripleContainerPtr
bool isSubClassOfIRI(std::string_view iri)
std::function< void(std::string_view, uint64_t)> ResourceCounter
@ QUERY_FLAG_ONE_SOLUTION
std::function< void(const BindingsPtr &)> BindingsHandler
std::function< void(const TripleContainerPtr &)> TripleHandler
bool isSubPropertyOfIRI(std::string_view iri)
std::shared_ptr< GraphQuery > GraphQueryPtr
std::shared_ptr< knowrob::mongo::Collection > oneCollection
std::shared_ptr< knowrob::mongo::Collection > tripleCollection