knowrob  2.1.0
A Knowledge Base System for Cognition-enabled Robots
Pipeline.cpp
Go to the documentation of this file.
1 /*
2  * This file is part of KnowRob, please consult
3  * https://github.com/knowrob/knowrob for license details.
4  */
5 
6 #include <boost/property_tree/ptree.hpp>
7 #include <boost/property_tree/json_parser.hpp>
8 #include "string"
9 #include "knowrob/storage/mongo/Pipeline.h"
10 #include "knowrob/Logger.h"
11 #include "knowrob/URI.h"
12 #include "knowrob/semweb/GraphPattern.h"
13 #include "knowrob/semweb/GraphSequence.h"
14 #include "knowrob/semweb/GraphBuiltin.h"
15 #include "knowrob/storage/mongo/MongoTerm.h"
16 #include "knowrob/storage/mongo/MongoTriplePattern.h"
17 
18 using namespace knowrob::mongo;
19 
20 Pipeline::Pipeline(bson_t *arrayDocument)
21  : arrayDocument_(arrayDocument),
22  numStages_(0),
23  lastStage_(nullptr),
24  lastOperator_(nullptr) {
25 }
26 
28  auto arrayKey = std::to_string(numStages_++);
29  bson_wrapper &stage = stages_.emplace_back();
30  BSON_APPEND_DOCUMENT_BEGIN(arrayDocument_, arrayKey.c_str(), &stage.bson);
31  lastStage_ = &stage.bson;
32  return &stage.bson;
33 }
34 
35 bson_t *Pipeline::appendStageBegin(std::string_view stageOperatorString) {
36  auto arrayKey = std::to_string(numStages_++);
37  bson_wrapper &stage = stages_.emplace_back();
38  bson_wrapper &stageOperator = stageOperators_.emplace_back();
39  BSON_APPEND_DOCUMENT_BEGIN(arrayDocument_, arrayKey.c_str(), &stage.bson);
40  BSON_APPEND_DOCUMENT_BEGIN(&stage.bson, stageOperatorString.data(), &stageOperator.bson);
41  lastOperator_ = &stageOperator.bson;
42  lastStage_ = &stage.bson;
43  return &stageOperator.bson;
44 }
45 
46 void Pipeline::appendStageEnd(bson_t *stage) {
47  if (lastOperator_ == stage) {
48  bson_append_document_end(lastStage_, lastOperator_);
49  bson_append_document_end(arrayDocument_, lastStage_);
50  } else {
51  bson_append_document_end(arrayDocument_, stage);
52  }
53 }
54 
55 void Pipeline::append(const knowrob::TriplePattern &query, const TripleStore &tripleStore) {
56  // append lookup stages to pipeline
57  TripleLookupData lookupData(&query);
58  // indicate that no variables in tripleExpression may have been instantiated
59  // by a previous step to allow for some optimizations.
60  lookupData.mayHasMoreGroundings = false;
61  lookupTriple(*this, tripleStore, lookupData);
62 }
63 
64 void Pipeline::append(const knowrob::GraphTerm &query, const TripleStore &tripleStore) {
65  std::set<std::string_view> groundedVariables;
66  appendTerm_recursive(query, tripleStore, groundedVariables);
67 }
68 
70  const TripleStore &tripleStore,
71  std::set<std::string_view> &groundedVariables) {
72  switch (query.termType()) {
74  auto &expr = ((const GraphPattern &) query).value();
75  TripleLookupData lookupData(expr.get());
76  // indicate that all previous groundings of variables are known
77  lookupData.mayHasMoreGroundings = false;
78  lookupData.knownGroundedVariables = groundedVariables;
79  // remember variables in tripleExpression, they have a grounding in next step
80  if (!expr->isOptional()) {
81  for (auto &var: expr->getVariables()) {
82  groundedVariables.insert(var->name());
83  }
84  }
85  lookupTriple(*this, tripleStore, lookupData);
86  break;
87  }
89  appendBuiltin((const knowrob::GraphBuiltin &) query);
90  break;
92  for (auto &elem: ((const knowrob::GraphSequence &) query).terms()) {
93  appendTerm_recursive(*elem, tripleStore, groundedVariables);
94  }
95  break;
97  appendUnion((const knowrob::GraphUnion &) query, tripleStore, groundedVariables);
98  break;
99  }
100 }
101 
103  const TripleStore &tripleStore,
104  std::set<std::string_view> &groundedVariables) {
105  // First run a $lookup operation for each branch of the union.
106  for (uint32_t i = 0; i < unionTerm.terms().size(); i++) {
107  auto branchVars = groundedVariables;
108 
109  // construct a lookup
110  bson_t letDoc;
111  auto lookupStage = appendStageBegin("$lookup");
112  BSON_APPEND_UTF8(lookupStage, "from", tripleStore.oneCollection->name().data());
113  BSON_APPEND_UTF8(lookupStage, "as", ("next" + std::to_string(i)).data());
114  BSON_APPEND_DOCUMENT_BEGIN(lookupStage, "let", &letDoc);
115  BSON_APPEND_UTF8(&letDoc, "v_VARS", isNested_ ? "$$v_VARS" : "$v_VARS");
116  bson_append_document_end(lookupStage, &letDoc);
117 
118  // construct inner pipelines, one for each branch of the union
119  bson_t pipelineArray;
120  BSON_APPEND_ARRAY_BEGIN(lookupStage, "pipeline", &pipelineArray);
121  Pipeline nestedPipeline(&pipelineArray);
122  nestedPipeline.setIsNested(true);
123  nestedPipeline.appendTerm_recursive(*unionTerm.terms()[i], tripleStore, branchVars);
124  bson_append_array_end(lookupStage, &pipelineArray);
125 
126  appendStageEnd(lookupStage);
127  }
128 
129  // concatenate individual results
130  bson_t concatDoc, concatArray;
131  auto setConcatStage = appendStageBegin("$set");
132  BSON_APPEND_DOCUMENT_BEGIN(setConcatStage, "next", &concatDoc);
133  BSON_APPEND_ARRAY_BEGIN(&concatDoc, "$concatArrays", &concatArray);
134  for (uint32_t i = 0; i < unionTerm.terms().size(); i++) {
135  BSON_APPEND_UTF8(&concatArray, std::to_string(i).c_str(), ("$next" + std::to_string(i)).data());
136  }
137  bson_append_array_end(&concatDoc, &concatArray);
138  bson_append_document_end(setConcatStage, &concatDoc);
139  appendStageEnd(setConcatStage);
140  // delete individual results
141  for (uint32_t i = 0; i < unionTerm.terms().size(); i++) {
142  unset("next" + std::to_string(i));
143  }
144  // unwind the concatenated array
145  unwind("$next");
146  // project the bindings of one of the branches into v_VARS field
147  bson_t setDoc, mergeArray;
148  auto setMergedStage = appendStageBegin("$set");
149  BSON_APPEND_DOCUMENT_BEGIN(setMergedStage, "v_VARS", &setDoc);
150  BSON_APPEND_ARRAY_BEGIN(&setDoc, "$mergeObjects", &mergeArray);
151  BSON_APPEND_UTF8(&mergeArray, "0", "$next.v_VARS");
152  BSON_APPEND_UTF8(&mergeArray, "1", "$v_VARS");
153  bson_append_array_end(&setDoc, &mergeArray);
154  bson_append_document_end(setMergedStage, &setDoc);
155  appendStageEnd(setMergedStage);
156  // and finally unset the next field
157  unset("next");
158 }
159 
161  switch (builtin.builtinType()) {
163  bindValue(builtin);
164  break;
166  setAccumulated(builtin, "$max");
167  break;
169  setAccumulated(builtin, "$min");
170  break;
172  matchBinary(builtin, "$lte");
173  break;
175  matchBinary(builtin, "$lt");
176  break;
178  matchBinary(builtin, "$gt");
179  break;
181  matchBinary(builtin, "$gte");
182  break;
184  matchBinary(builtin, "$eq");
185  break;
187  matchBinary(builtin, "$ne");
188  break;
189  }
190 }
191 
193  // e.g. `{ $set: { "begin": "$next.begin" } }`
194  if (!builtin.bindVar()) {
195  KB_ERROR("No variable to bind in $min/$max operation");
196  return;
197  }
198  if (builtin.arguments().size() != 1) {
199  KB_ERROR("Bind operation requires one argument");
200  return;
201  }
202  static const std::string varPrefix = "v_VARS.";
203  static const std::string varSuffix = ".val";
204  auto setStage = appendStageBegin("$set");
205  auto varKey = varPrefix + std::string(builtin.bindVar()->name()) + varSuffix;
206  MongoTerm::appendWithVars(setStage, varKey.c_str(), builtin.arguments()[0]);
207  appendStageEnd(setStage);
208 }
209 
210 void Pipeline::setAccumulated(const knowrob::GraphBuiltin &builtin, std::string_view predicate) {
211  // e.g. `{ $set: "begin", { $max: ["$begin", "$next.begin"] } }`
212  // NOTE: `$min [null,2]` -> 2 and `$max [null,2]` -> 2
213  if (!builtin.bindVar()) {
214  KB_ERROR("No variable to bind in $min/$max operation");
215  return;
216  }
217  static const std::string varPrefix = "v_VARS.";
218  static const std::string varSuffix = ".val";
219  bson_t accumulatedDoc, inputArray;
220  auto setStage = appendStageBegin("$set");
221  auto varKey = varPrefix + std::string(builtin.bindVar()->name()) + varSuffix;
222  BSON_APPEND_DOCUMENT_BEGIN(setStage, varKey.c_str(), &accumulatedDoc);
223  BSON_APPEND_ARRAY_BEGIN(&accumulatedDoc, predicate.data(), &inputArray);
224  for (uint32_t i = 0; i < builtin.arguments().size(); i++) {
225  MongoTerm::appendWithVars(&inputArray, std::to_string(i).c_str(), builtin.arguments()[i]);
226  }
227  bson_append_array_end(&accumulatedDoc, &inputArray);
228  bson_append_document_end(setStage, &accumulatedDoc);
229  appendStageEnd(setStage);
230 }
231 
232 void Pipeline::matchBinary(const knowrob::GraphBuiltin &builtin, std::string_view predicate) {
233  // e.g.: `{ $match: { $expr: { $lte: ["$v_scope.begin", "$v_scope.end"] } } }`
234  if (builtin.arguments().size() != 2) {
235  KB_ERROR("Binary operation requires two arguments");
236  return;
237  }
238  bson_t exprDoc, ltDoc;
239  auto matchStage = appendStageBegin("$match");
240  BSON_APPEND_DOCUMENT_BEGIN(matchStage, "$expr", &exprDoc);
241  BSON_APPEND_ARRAY_BEGIN(&exprDoc, predicate.data(), &ltDoc);
242  MongoTerm::appendWithVars(&ltDoc, "0", builtin.arguments()[0]);
243  MongoTerm::appendWithVars(&ltDoc, "1", builtin.arguments()[1]);
244  bson_append_array_end(&exprDoc, &ltDoc);
245  bson_append_document_end(matchStage, &exprDoc);
246  appendStageEnd(matchStage);
247 }
248 
249 void Pipeline::limit(uint32_t maxDocuments) {
250  auto unwindStage = appendStageBegin();
251  BSON_APPEND_INT32(unwindStage, "$limit", maxDocuments);
252  appendStageEnd(unwindStage);
253 }
254 
255 void Pipeline::unwind(std::string_view field, bool preserveNullAndEmptyArrays) {
256  if (preserveNullAndEmptyArrays) {
257  auto unwindStage = appendStageBegin("$unwind");
258  BSON_APPEND_BOOL(unwindStage, "preserveNullAndEmptyArrays", 1);
259  appendStageEnd(unwindStage);
260  } else {
261  auto unwindStage = appendStageBegin();
262  BSON_APPEND_UTF8(unwindStage, "$unwind", field.data());
263  appendStageEnd(unwindStage);
264  }
265 }
266 
267 void Pipeline::unset(std::string_view field) {
268  auto unwindStage = appendStageBegin();
269  BSON_APPEND_UTF8(unwindStage, "$unset", field.data());
270  appendStageEnd(unwindStage);
271 }
272 
273 void Pipeline::replaceRoot(std::string_view newRootField) {
274  auto unwindStage = appendStageBegin("$replaceRoot");
275  BSON_APPEND_UTF8(unwindStage, "newRoot", newRootField.data());
276  appendStageEnd(unwindStage);
277 }
278 
279 void Pipeline::sortAscending(std::string_view field) {
280  auto sortStage = appendStageBegin("$sort");
281  BSON_APPEND_INT32(sortStage, field.data(), 1);
282  appendStageEnd(sortStage);
283 }
284 
285 void Pipeline::sortDescending(std::string_view field) {
286  auto sortStage = appendStageBegin("$sort");
287  BSON_APPEND_INT32(sortStage, field.data(), -1);
288  appendStageEnd(sortStage);
289 }
290 
291 void Pipeline::merge(std::string_view collection) {
292  auto unwindStage = appendStageBegin("$merge");
293  BSON_APPEND_UTF8(unwindStage, "into", collection.data());
294  BSON_APPEND_UTF8(unwindStage, "on", "_id");
295  BSON_APPEND_UTF8(unwindStage, "whenMatched", "merge");
296  appendStageEnd(unwindStage);
297 }
298 
299 void Pipeline::project(std::string_view field) {
300  auto projectStage = appendStageBegin("$project");
301  BSON_APPEND_INT32(projectStage, field.data(), 1);
302  appendStageEnd(projectStage);
303 }
304 
305 void Pipeline::project(const std::vector<std::string_view> &fields) {
306  auto projectStage = appendStageBegin("$project");
307  for (auto field: fields) {
308  BSON_APPEND_INT32(projectStage, field.data(), 1);
309  }
310  appendStageEnd(projectStage);
311 }
312 
313 void Pipeline::setUnion(std::string_view field, const std::vector<std::string_view> &sets) {
314  bson_t unionOperator, unionArray;
315  uint32_t numElements = 0;
316  auto setStage = appendStageBegin("$set");
317  BSON_APPEND_DOCUMENT_BEGIN(setStage, field.data(), &unionOperator);
318  {
319  BSON_APPEND_ARRAY_BEGIN(&unionOperator, "$setUnion", &unionArray);
320  {
321  for (auto setString: sets) {
322  auto arrayKey = std::to_string(numElements++);
323  BSON_APPEND_UTF8(&unionArray, arrayKey.c_str(), setString.data());
324  }
325  }
326  bson_append_array_end(&unionOperator, &unionArray);
327  }
328  bson_append_document_end(setStage, &unionOperator);
329  appendStageEnd(setStage);
330 }
331 
332 void Pipeline::addToArray(std::string_view key, std::string_view arrayKey, std::string_view elementKey) {
333  bson_t concatOperator, concatArray, concatArray1;
334  auto setStage1 = appendStageBegin("$set");
335  BSON_APPEND_DOCUMENT_BEGIN(setStage1, key.data(), &concatOperator);
336  {
337  BSON_APPEND_ARRAY_BEGIN(&concatOperator, "$concatArrays", &concatArray);
338  {
339  BSON_APPEND_UTF8(&concatArray, "0", arrayKey.data());
340  BSON_APPEND_ARRAY_BEGIN(&concatArray, "1", &concatArray1);
341  {
342  BSON_APPEND_UTF8(&concatArray1, "0", elementKey.data());
343  }
344  bson_append_array_end(&concatArray, &concatArray1);
345  }
346  bson_append_array_end(&concatOperator, &concatArray);
347  }
348  bson_append_document_end(setStage1, &concatOperator);
349  appendStageEnd(setStage1);
350 }
351 
352 void Pipeline::matchEmptyArray(std::string_view arrayKey) {
353  bson_t emptyArray;
354  auto matchStage = appendStageBegin("$match");
355  BSON_APPEND_ARRAY_BEGIN(matchStage, arrayKey.data(), &emptyArray);
356  bson_append_array_end(matchStage, &emptyArray);
357  appendStageEnd(matchStage);
358 }
359 
/**
 * Replace every non-overlapping occurrence of a substring in place.
 * Occurrences are searched from left to right; text inserted by a replacement
 * is never searched again.
 * @param str the string to modify.
 * @param from the substring to search for; nothing is replaced if it is empty.
 * @param to the replacement text.
 */
void replaceAll(std::string &str, const std::string &from, const std::string &to) {
    // An empty pattern matches at every position, which would never terminate.
    if (from.empty()) {
        return;
    }
    size_t startPos = 0;
    while ((startPos = str.find(from, startPos)) != std::string::npos) {
        str.replace(startPos, from.length(), to);
        // Continue behind the inserted text, so that a `to` which itself
        // contains `from` is not replaced again.
        startPos += to.length();
    }
}
367 
368 bson_t *Pipeline::loadFromJSON(std::string_view filename, const std::map<std::string, std::string> &parameters) {
369  auto resolved = URI::resolve(filename);
370  // Load JSON file
371  boost::property_tree::ptree pt;
372  boost::property_tree::read_json(resolved, pt);
373 
374  // Convert JSON to string
375  std::stringstream ss;
376  boost::property_tree::write_json(ss, pt);
377 
378  // Replace placeholders with actual values
379  std::string pipeline = ss.str();
380  for (const auto &param: parameters) {
381  replaceAll(pipeline, "${" + param.first + "}", param.second);
382  }
383 
384  // Convert JSON to BSON
385  bson_error_t error;
386  bson_t *bson = bson_new_from_json((const uint8_t *) pipeline.c_str(), pipeline.size(), &error);
387 
388  if (!bson) {
389  KB_ERROR("Error loading pipeline: {}", error.message);
390  return nullptr;
391  }
392 
393  return bson;
394 }
void replaceAll(std::string &str, const std::string &from, const std::string &to)
Definition: Pipeline.cpp:360
#define KB_ERROR
Definition: Logger.h:28
auto & arguments() const
Definition: Function.h:47
auto bindVar() const
Definition: GraphBuiltin.h:113
auto builtinType() const
Definition: GraphBuiltin.h:108
const auto & terms() const
auto termType() const
Definition: GraphTerm.h:43
static std::string resolve(const std::string_view &uriString)
Definition: URI.cpp:79
static void appendWithVars(bson_t *doc, const char *key, const TermPtr &term, const char *queryOperator=nullptr, bool matchNullValue=false)
Definition: MongoTerm.cpp:101
static bson_t * loadFromJSON(std::string_view filename, const std::map< std::string, std::string > &parameters)
Definition: Pipeline.cpp:368
void setUnion(std::string_view field, const std::vector< std::string_view > &sets)
Definition: Pipeline.cpp:313
void setIsNested(bool isNested)
Definition: Pipeline.h:38
void replaceRoot(std::string_view newRootField)
Definition: Pipeline.cpp:273
void setAccumulated(const knowrob::GraphBuiltin &builtin, std::string_view predicate)
Definition: Pipeline.cpp:210
void appendTerm_recursive(const knowrob::GraphTerm &query, const TripleStore &tripleStore, std::set< std::string_view > &groundedVariables)
Definition: Pipeline.cpp:69
void appendStageEnd(bson_t *stage)
Definition: Pipeline.cpp:46
bson_t * appendStageBegin()
Definition: Pipeline.cpp:27
void project(std::string_view field)
Definition: Pipeline.cpp:299
void appendBuiltin(const knowrob::GraphBuiltin &builtin)
Definition: Pipeline.cpp:160
void matchBinary(const knowrob::GraphBuiltin &builtin, std::string_view predicate)
Definition: Pipeline.cpp:232
void addToArray(std::string_view key, std::string_view arrayKey, std::string_view elementKey)
Definition: Pipeline.cpp:332
void matchEmptyArray(std::string_view arrayKey)
Definition: Pipeline.cpp:352
void merge(std::string_view collection)
Definition: Pipeline.cpp:291
void appendUnion(const knowrob::GraphUnion &unionTerm, const TripleStore &tripleStore, std::set< std::string_view > &groundedVariables)
Definition: Pipeline.cpp:102
std::list< bson_wrapper > stageOperators_
Definition: Pipeline.h:140
void sortDescending(std::string_view field)
Definition: Pipeline.cpp:285
std::list< bson_wrapper > stages_
Definition: Pipeline.h:139
void append(const knowrob::GraphTerm &query, const TripleStore &tripleStore)
Definition: Pipeline.cpp:64
void unwind(std::string_view field, bool preserveNullAndEmptyArrays=false)
Definition: Pipeline.cpp:255
Pipeline(bson_t *arrayDocument=nullptr)
Definition: Pipeline.cpp:20
void unset(std::string_view field)
Definition: Pipeline.cpp:267
void limit(uint32_t maxDocuments)
Definition: Pipeline.cpp:249
void sortAscending(std::string_view field)
Definition: Pipeline.cpp:279
void bindValue(const knowrob::GraphBuiltin &builtin)
Definition: Pipeline.cpp:192
void lookupTriple(Pipeline &pipeline, const TripleStore &tripleStore, const TripleLookupData &lookupData)
PredicateRule & predicate()
Definition: formula.cpp:221
VariableRule & var()
Definition: terms.cpp:91
TermRule & string()
Definition: terms.cpp:63
std::set< std::string_view > knownGroundedVariables
std::shared_ptr< knowrob::mongo::Collection > oneCollection
Definition: TripleStore.h:28