knowrob  2.1.0
A Knowledge Base System for Cognition-enabled Robots
mongo_kb.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2020, Daniel Beßler
3  * All rights reserved.
4  *
5  * This file is part of KnowRob, please consult
6  * https://github.com/knowrob/knowrob for license details.
7  */
8 
9 // Prolog
10 #define PL_SAFE_ARG_MACROS
11 #include <SWI-cpp.h>
12 // STD
13 #include <iostream>
14 // KnowRob
15 #include "knowrob/storage/mongo/MongoInterface.h"
16 #include "knowrob/reasoner/mongolog/bson_pl.h"
17 #include "knowrob/storage/mongo/Document.h"
18 
19 using namespace knowrob;
20 using namespace knowrob::mongo;
21 
// Shorthand used inside PREDICATE bodies: connects using the first predicate
// argument as database term and the second argument as collection name.
#define PREDICATE_COLLECTION \
	MongoInterface::get().connect(PL_A1, (char*)PL_A2)
// Shorthand used inside PREDICATE bodies: looks up the cursor whose id is
// given by the first predicate argument.
#define PREDICATE_CURSOR \
	MongoInterface::get().cursor((char*)PL_A1)
24 
25 namespace knowrob {
26  class MongoPLException : public PlException, std::exception {
27  public:
28  explicit MongoPLException(const MongoException &exc)
29  : PlException(PlCompound(
30  "mng_error", PlCompound(exc.contextMessage_.c_str(), PlTerm(exc.bsonMessage_.c_str()))
31  )) {}
32 
33  explicit MongoPLException(const std::exception &exc)
34  : PlException(PlCompound("mng_error", PlTerm(exc.what()))) {}
35  };
36 }
37 
38 static inline bson_t* termToDocument(const PlTerm &term) {
39  bson_error_t err;
40  auto document = bson_new();
41  if(!bsonpl_concat(document,term, &err)) {
42  bson_free(document);
43  throw MongoPLException(MongoException("invalid_term", err));
44  }
45  return document;
46 }
47 
48 PREDICATE(mng_collections,2) {
49  auto db_handle = MongoInterface::get().connect(PL_A1);
50  bson_error_t err;
51  char **strv;
52  if ((strv = mongoc_database_get_collection_names_with_opts(
53  db_handle->db(), nullptr /* opts */, &err))) {
54  PlTail l(PL_A2);
55  for (int i=0; strv[i]; i++) {
56  l.append( strv[i] );
57  }
58  l.close();
59  bson_strfreev(strv);
60  return TRUE;
61  }
62  else {
63  throw MongoPLException(MongoException("collection_lookup_failed", err));
64  }
65 }
66 
67 PREDICATE(mng_distinct_values_json,4) {
68  auto db_handle = MongoInterface::get().connect(PL_A1);
69  char* coll_name = (char*)PL_A2;
70  char* key = (char*)PL_A3;
71  bson_error_t err;
72  bson_t reply;
73  //
74  bson_t *command = BCON_NEW("distinct", BCON_UTF8(coll_name), "key", BCON_UTF8(key));
75  bool success = mongoc_database_command_simple(
76  db_handle->db(), command, nullptr, &reply, &err);
77  if(success) {
78  char* str = bson_as_canonical_extended_json(&reply, nullptr);
79  PL_A4 = str;
80  bson_free(str);
81  }
82  bson_destroy(command);
83  return success;
84 }
85 
86 PREDICATE(mng_index_create_core, 3) {
87  static const PlAtom ATOM_minus("-");
88 
89  std::vector<mongo::IndexKey> indexes;
90  PlTail pl_list(PL_A3);
91  PlTerm pl_member;
92  while (pl_list.next(pl_member)) {
93  const PlAtom mode_atom(pl_member.name());
94  const PlTerm &pl_value = pl_member[1];
95  if (mode_atom == ATOM_minus) {
96  indexes.emplace_back((char *) pl_value, mongo::IndexType::DESCENDING);
97  } else {
98  indexes.emplace_back((char *) pl_value, mongo::IndexType::ASCENDING);
99  }
100  }
101 
102  return MongoInterface::get().connect(PL_A1)->create_index((char*)PL_A2,indexes);
103 }
104 
105 
106 PREDICATE(mng_drop_unsafe, 2) {
107  try {
108  PREDICATE_COLLECTION->drop();
109  return TRUE;
110  }
111  catch(const MongoException &exc) { throw MongoPLException(exc); }
112  catch(const std::exception &exc) { throw MongoPLException(exc); }
113 }
114 
115 PREDICATE(mng_store, 3) {
116  try {
117  PREDICATE_COLLECTION->storeOne(Document(termToDocument(PL_A3)));
118  return TRUE;
119  }
120  catch(const MongoException &exc) { throw MongoPLException(exc); }
121  catch(const std::exception &exc) { throw MongoPLException(exc); }
122 }
123 
124 PREDICATE(mng_remove, 3) {
125  try {
126  PREDICATE_COLLECTION->removeAll(Document(termToDocument(PL_A3)));
127  return TRUE;
128  }
129  catch(const MongoException &exc) { throw MongoPLException(exc); }
130  catch(const std::exception &exc) { throw MongoPLException(exc); }
131 }
132 
133 PREDICATE(mng_update, 4) {
134  try {
135  PREDICATE_COLLECTION->update(
136  Document(termToDocument(PL_A3)),
137  Document(termToDocument(PL_A4)));
138  return TRUE;
139  }
140  catch(const MongoException &exc) { throw MongoPLException(exc); }
141  catch(const std::exception &exc) { throw MongoPLException(exc); }
142 }
143 
144 PREDICATE(mng_bulk_write, 3) {
145  static const PlAtom ATOM_insert("insert");
146  static const PlAtom ATOM_remove("remove");
147  static const PlAtom ATOM_update("update");
148 
149  try {
150  auto bulk = PREDICATE_COLLECTION->createBulkOperation();
151  PlTail pl_list(PL_A3);
152  PlTerm pl_member;
153 
154  while(pl_list.next(pl_member)) {
155  const PlAtom operation_name(pl_member.name());
156  const auto &pl_value1 = pl_member[1];
157  bson_error_t err;
158 
159  // parse the document
160  auto doc1 = Document(bson_new());
161  if(!bsonpl_concat(doc1.bson(),pl_value1,&err)) {
162  throw MongoException("invalid_term", err);
163  }
164 
165  if(operation_name == ATOM_insert) {
166  bulk->pushInsert(doc1.bson());
167  }
168  else if(operation_name == ATOM_remove) {
169  bulk->pushRemoveAll(doc1.bson());
170  }
171  else if(operation_name == ATOM_update) {
172  const auto &pl_value2 = pl_member[2];
173  auto doc2 = Document(bson_new());
174  if(!bsonpl_concat(doc2.bson(), pl_value2, &err)) {
175  throw MongoException("invalid_term", err);
176  }
177  bulk->pushUpdate(doc1.bson(), doc2.bson());
178  }
179  else {
180  bson_set_error(&err,
181  MONGOC_ERROR_COMMAND,
182  MONGOC_ERROR_COMMAND_INVALID_ARG,
183  "unknown bulk operation '%s'", pl_member.name());
184  throw MongoException("bulk_error", err);
185  }
186  }
187 
188  bulk->execute();
189  return TRUE;
190  }
191  catch(const MongoException &exc) { throw MongoPLException(exc); }
192  catch(const std::exception &exc) { throw MongoPLException(exc); }
193 }
194 
195 PREDICATE(mng_cursor_create, 3) {
196  try {
197  PL_A3 = MongoInterface::get().cursor_create(PL_A1,(char*)PL_A2)->id().c_str();
198  return TRUE;
199  }
200  catch(const MongoException &exc) { throw MongoPLException(exc); }
201  catch(const std::exception &exc) { throw MongoPLException(exc); }
202 }
203 
204 PREDICATE(mng_cursor_create, 4) {
205  try {
206  Document doc_a(termToDocument(PL_A4));
207  auto cursor = MongoInterface::get().cursor_create(PL_A1,(char*)PL_A2);
208  cursor->filter(doc_a.bson());
209  PL_A3 = cursor->id().c_str();
210  return TRUE;
211  }
212  catch(const MongoException &exc) { throw MongoPLException(exc); }
213  catch(const std::exception &exc) { throw MongoPLException(exc); }
214 }
215 
216 PREDICATE(mng_cursor_destroy, 1) {
217  try {
218  char* cursor_id = (char*)PL_A1;
219  MongoInterface::get().cursor_destroy(cursor_id);
220  return TRUE;
221  }
222  catch(const MongoException &exc) { throw MongoPLException(exc); }
223  catch(const std::exception &exc) { throw MongoPLException(exc); }
224 }
225 
226 PREDICATE(mng_cursor_erase, 1) {
227  try {
228  return PREDICATE_CURSOR->erase();
229  }
230  catch(const MongoException &exc) { throw MongoPLException(exc); }
231  catch(const std::exception &exc) { throw MongoPLException(exc); }
232 }
233 
234 PREDICATE(mng_cursor_filter, 2) {
235  try {
236  Document doc_a(termToDocument(PL_A2));
237  PREDICATE_CURSOR->filter(doc_a.bson());
238  return TRUE;
239  }
240  catch(const MongoException &exc) { throw MongoPLException(exc); }
241  catch(const std::exception &exc) { throw MongoPLException(exc); }
242 }
243 
244 PREDICATE(mng_cursor_aggregate, 2) {
245  try {
246  Document doc_a(termToDocument(PL_A2));
247  PREDICATE_CURSOR->aggregate(doc_a.bson());
248  return TRUE;
249  }
250  catch(const MongoException &exc) { throw MongoPLException(exc); }
251  catch(const std::exception &exc) { throw MongoPLException(exc); }
252 }
253 
254 PREDICATE(mng_cursor_descending, 2) {
255  try {
256  PREDICATE_CURSOR->descending((char*)PL_A2);
257  return TRUE;
258  }
259  catch(const MongoException &exc) { throw MongoPLException(exc); }
260  catch(const std::exception &exc) { throw MongoPLException(exc); }
261 }
262 
263 PREDICATE(mng_cursor_ascending, 2) {
264  try {
265  PREDICATE_CURSOR->ascending((char*)PL_A2);
266  return TRUE;
267  }
268  catch(const MongoException &exc) { throw MongoPLException(exc); }
269  catch(const std::exception &exc) { throw MongoPLException(exc); }
270 }
271 
272 PREDICATE(mng_cursor_limit, 2) {
273  try {
274  PREDICATE_CURSOR->limit((int)PL_A2);
275  return TRUE;
276  }
277  catch(const MongoException &exc) { throw MongoPLException(exc); }
278  catch(const std::exception &exc) { throw MongoPLException(exc); }
279 }
280 
281 PREDICATE(mng_cursor_next_pairs, 2) {
282  try {
283  const bson_t *doc;
284  if(PREDICATE_CURSOR->next(&doc)) {
285  PL_A2 = bson_to_term(doc);
286  return TRUE;
287  }
288  else {
289  return FALSE;
290  }
291  }
292  catch(const MongoException &exc) { throw MongoPLException(exc); }
293  catch(const std::exception &exc) { throw MongoPLException(exc); }
294 }
295 
296 PREDICATE(mng_cursor_next_json, 2) {
297  try {
298  const bson_t *doc;
299  if(PREDICATE_CURSOR->next(&doc)) {
300  char* str = bson_as_canonical_extended_json(doc, nullptr);
301  PL_A2 = str;
302  bson_free(str);
303  return TRUE;
304  }
305  else {
306  return FALSE;
307  }
308  }
309  catch(const MongoException &exc) { throw MongoPLException(exc); }
310  catch(const std::exception &exc) { throw MongoPLException(exc); }
311 }
PlTerm bson_to_term(const bson_t *bson)
Definition: bson_pl.cpp:210
bool bsonpl_concat(bson_t *doc, const PlTerm &term, bson_error_t *err)
Definition: bson_pl.cpp:357
bson_t * bson() const
Definition: Document.h:27
const std::string bsonMessage_
const std::string contextMessage_
std::shared_ptr< Database > connect(const PlTerm &dbTerm)
std::shared_ptr< Cursor > cursor_create(const PlTerm &db_term, const char *coll_name)
static MongoInterface & get()
void cursor_destroy(const char *curser_id)
PREDICATE(mng_collections, 2)
Definition: mongo_kb.cpp:48
#define PREDICATE_COLLECTION
Definition: mongo_kb.cpp:22
#define PREDICATE_CURSOR
Definition: mongo_kb.cpp:23
TermRule & term()
Definition: terms.cpp:136