6 #include <boost/property_tree/ptree.hpp>
7 #include <boost/property_tree/json_parser.hpp>
9 #include "knowrob/storage/mongo/Pipeline.h"
10 #include "knowrob/Logger.h"
11 #include "knowrob/URI.h"
12 #include "knowrob/semweb/GraphPattern.h"
13 #include "knowrob/semweb/GraphSequence.h"
14 #include "knowrob/semweb/GraphBuiltin.h"
15 #include "knowrob/storage/mongo/MongoTerm.h"
16 #include "knowrob/storage/mongo/MongoTriplePattern.h"
21 : arrayDocument_(arrayDocument),
24 lastOperator_(nullptr) {
40 BSON_APPEND_DOCUMENT_BEGIN(&stage.
bson, stageOperatorString.data(), &stageOperator.
bson);
43 return &stageOperator.
bson;
65 std::set<std::string_view> groundedVariables;
71 std::set<std::string_view> &groundedVariables) {
80 if (!expr->isOptional()) {
81 for (
auto &
var: expr->getVariables()) {
82 groundedVariables.insert(
var->name());
104 std::set<std::string_view> &groundedVariables) {
106 for (uint32_t i = 0; i < unionTerm.
terms().size(); i++) {
107 auto branchVars = groundedVariables;
112 BSON_APPEND_UTF8(lookupStage,
"from", tripleStore.
oneCollection->name().data());
113 BSON_APPEND_UTF8(lookupStage,
"as", (
"next" + std::to_string(i)).data());
114 BSON_APPEND_DOCUMENT_BEGIN(lookupStage,
"let", &letDoc);
115 BSON_APPEND_UTF8(&letDoc,
"v_VARS",
isNested_ ?
"$$v_VARS" :
"$v_VARS");
116 bson_append_document_end(lookupStage, &letDoc);
119 bson_t pipelineArray;
120 BSON_APPEND_ARRAY_BEGIN(lookupStage,
"pipeline", &pipelineArray);
121 Pipeline nestedPipeline(&pipelineArray);
124 bson_append_array_end(lookupStage, &pipelineArray);
130 bson_t concatDoc, concatArray;
132 BSON_APPEND_DOCUMENT_BEGIN(setConcatStage,
"next", &concatDoc);
133 BSON_APPEND_ARRAY_BEGIN(&concatDoc,
"$concatArrays", &concatArray);
134 for (uint32_t i = 0; i < unionTerm.
terms().size(); i++) {
135 BSON_APPEND_UTF8(&concatArray, std::to_string(i).c_str(), (
"$next" + std::to_string(i)).data());
137 bson_append_array_end(&concatDoc, &concatArray);
138 bson_append_document_end(setConcatStage, &concatDoc);
141 for (uint32_t i = 0; i < unionTerm.
terms().size(); i++) {
142 unset(
"next" + std::to_string(i));
147 bson_t setDoc, mergeArray;
149 BSON_APPEND_DOCUMENT_BEGIN(setMergedStage,
"v_VARS", &setDoc);
150 BSON_APPEND_ARRAY_BEGIN(&setDoc,
"$mergeObjects", &mergeArray);
151 BSON_APPEND_UTF8(&mergeArray,
"0",
"$next.v_VARS");
152 BSON_APPEND_UTF8(&mergeArray,
"1",
"$v_VARS");
153 bson_append_array_end(&setDoc, &mergeArray);
154 bson_append_document_end(setMergedStage, &setDoc);
195 KB_ERROR(
"No variable to bind in $min/$max operation");
199 KB_ERROR(
"Bind operation requires one argument");
214 KB_ERROR(
"No variable to bind in $min/$max operation");
219 bson_t accumulatedDoc, inputArray;
222 BSON_APPEND_DOCUMENT_BEGIN(setStage, varKey.c_str(), &accumulatedDoc);
223 BSON_APPEND_ARRAY_BEGIN(&accumulatedDoc,
predicate.data(), &inputArray);
224 for (uint32_t i = 0; i < builtin.
arguments().size(); i++) {
227 bson_append_array_end(&accumulatedDoc, &inputArray);
228 bson_append_document_end(setStage, &accumulatedDoc);
235 KB_ERROR(
"Binary operation requires two arguments");
238 bson_t exprDoc, ltDoc;
240 BSON_APPEND_DOCUMENT_BEGIN(matchStage,
"$expr", &exprDoc);
241 BSON_APPEND_ARRAY_BEGIN(&exprDoc,
predicate.data(), &ltDoc);
244 bson_append_array_end(&exprDoc, &ltDoc);
245 bson_append_document_end(matchStage, &exprDoc);
251 BSON_APPEND_INT32(unwindStage,
"$limit", maxDocuments);
256 if (preserveNullAndEmptyArrays) {
258 BSON_APPEND_BOOL(unwindStage,
"preserveNullAndEmptyArrays", 1);
262 BSON_APPEND_UTF8(unwindStage,
"$unwind", field.data());
269 BSON_APPEND_UTF8(unwindStage,
"$unset", field.data());
275 BSON_APPEND_UTF8(unwindStage,
"newRoot", newRootField.data());
281 BSON_APPEND_INT32(sortStage, field.data(), 1);
287 BSON_APPEND_INT32(sortStage, field.data(), -1);
293 BSON_APPEND_UTF8(unwindStage,
"into", collection.data());
294 BSON_APPEND_UTF8(unwindStage,
"on",
"_id");
295 BSON_APPEND_UTF8(unwindStage,
"whenMatched",
"merge");
301 BSON_APPEND_INT32(projectStage, field.data(), 1);
307 for (
auto field: fields) {
308 BSON_APPEND_INT32(projectStage, field.data(), 1);
314 bson_t unionOperator, unionArray;
315 uint32_t numElements = 0;
317 BSON_APPEND_DOCUMENT_BEGIN(setStage, field.data(), &unionOperator);
319 BSON_APPEND_ARRAY_BEGIN(&unionOperator,
"$setUnion", &unionArray);
321 for (
auto setString: sets) {
322 auto arrayKey = std::to_string(numElements++);
323 BSON_APPEND_UTF8(&unionArray, arrayKey.c_str(), setString.data());
326 bson_append_array_end(&unionOperator, &unionArray);
328 bson_append_document_end(setStage, &unionOperator);
333 bson_t concatOperator, concatArray, concatArray1;
335 BSON_APPEND_DOCUMENT_BEGIN(setStage1, key.data(), &concatOperator);
337 BSON_APPEND_ARRAY_BEGIN(&concatOperator,
"$concatArrays", &concatArray);
339 BSON_APPEND_UTF8(&concatArray,
"0", arrayKey.data());
340 BSON_APPEND_ARRAY_BEGIN(&concatArray,
"1", &concatArray1);
342 BSON_APPEND_UTF8(&concatArray1,
"0", elementKey.data());
344 bson_append_array_end(&concatArray, &concatArray1);
346 bson_append_array_end(&concatOperator, &concatArray);
348 bson_append_document_end(setStage1, &concatOperator);
355 BSON_APPEND_ARRAY_BEGIN(matchStage, arrayKey.data(), &emptyArray);
356 bson_append_array_end(matchStage, &emptyArray);
362 while ((startPos = str.find(from, startPos)) != std::string::npos) {
363 str.replace(startPos, from.length(), to);
364 startPos += to.length();
371 boost::property_tree::ptree pt;
372 boost::property_tree::read_json(resolved, pt);
375 std::stringstream ss;
376 boost::property_tree::write_json(ss, pt);
380 for (
const auto &param: parameters) {
381 replaceAll(pipeline,
"${" + param.first +
"}", param.second);
386 bson_t *bson = bson_new_from_json((
const uint8_t *) pipeline.c_str(), pipeline.size(), &error);
389 KB_ERROR(
"Error loading pipeline: {}", error.message);
void replaceAll(std::string &str, const std::string &from, const std::string &to)
const auto & terms() const
static std::string resolve(const std::string_view &uriString)
static void appendWithVars(bson_t *doc, const char *key, const TermPtr &term, const char *queryOperator=nullptr, bool matchNullValue=false)
static bson_t * loadFromJSON(std::string_view filename, const std::map< std::string, std::string > &parameters)
void setUnion(std::string_view field, const std::vector< std::string_view > &sets)
void setIsNested(bool isNested)
void replaceRoot(std::string_view newRootField)
void setAccumulated(const knowrob::GraphBuiltin &builtin, std::string_view predicate)
void appendTerm_recursive(const knowrob::GraphTerm &query, const TripleStore &tripleStore, std::set< std::string_view > &groundedVariables)
void appendStageEnd(bson_t *stage)
bson_t * appendStageBegin()
void project(std::string_view field)
void appendBuiltin(const knowrob::GraphBuiltin &builtin)
void matchBinary(const knowrob::GraphBuiltin &builtin, std::string_view predicate)
void addToArray(std::string_view key, std::string_view arrayKey, std::string_view elementKey)
void matchEmptyArray(std::string_view arrayKey)
void merge(std::string_view collection)
void appendUnion(const knowrob::GraphUnion &unionTerm, const TripleStore &tripleStore, std::set< std::string_view > &groundedVariables)
std::list< bson_wrapper > stageOperators_
void sortDescending(std::string_view field)
std::list< bson_wrapper > stages_
void append(const knowrob::GraphTerm &query, const TripleStore &tripleStore)
void unwind(std::string_view field, bool preserveNullAndEmptyArrays=false)
Pipeline(bson_t *arrayDocument=nullptr)
void unset(std::string_view field)
void limit(uint32_t maxDocuments)
void sortAscending(std::string_view field)
void bindValue(const knowrob::GraphBuiltin &builtin)
void lookupTriple(Pipeline &pipeline, const TripleStore &tripleStore, const TripleLookupData &lookupData)
std::set< std::string_view > knownGroundedVariables
bool mayHasMoreGroundings
std::shared_ptr< knowrob::mongo::Collection > oneCollection