10 #define PL_SAFE_ARG_MACROS
15 #include "knowrob/storage/mongo/MongoInterface.h"
16 #include "knowrob/reasoner/mongolog/bson_pl.h"
17 #include "knowrob/storage/mongo/Document.h"
// Shorthand for use inside predicate bodies: expands to a call of
// MongoInterface::get().connect(...) with the predicate argument PL_A1 and
// the predicate argument PL_A2 cast to char*.
// NOTE(review): presumably PL_A1 is the database term and PL_A2 the collection
// name -- confirm against the two-argument MongoInterface::connect overload.
22 #define PREDICATE_COLLECTION MongoInterface::get().connect(PL_A1, (char*)PL_A2)
// Shorthand for use inside predicate bodies: expands to a call of
// MongoInterface::get().cursor(...) with the predicate argument PL_A1 cast
// to char*.
// NOTE(review): presumably PL_A1 carries the cursor id -- confirm against
// MongoInterface::cursor.
23 #define PREDICATE_CURSOR MongoInterface::get().cursor((char*)PL_A1)
26 class MongoPLException :
public PlException, std::exception {
29 : PlException(PlCompound(
// Wraps a standard exception as a Prolog exception: the PlException base is
// initialised with the compound term mng_error(Message), where Message is a
// PlTerm built from exc.what().
33 explicit MongoPLException(
const std::exception &exc)
34 : PlException(PlCompound(
"mng_error", PlTerm(exc.what()))) {}
38 static inline bson_t* termToDocument(
const PlTerm &
term) {
40 auto document = bson_new();
52 if ((strv = mongoc_database_get_collection_names_with_opts(
53 db_handle->db(),
nullptr , &err))) {
55 for (
int i=0; strv[i]; i++) {
63 throw MongoPLException(
MongoException(
"collection_lookup_failed", err));
69 char* coll_name = (
char*)PL_A2;
70 char* key = (
char*)PL_A3;
74 bson_t *command = BCON_NEW(
"distinct", BCON_UTF8(coll_name),
"key", BCON_UTF8(key));
75 bool success = mongoc_database_command_simple(
76 db_handle->db(), command,
nullptr, &reply, &err);
78 char* str = bson_as_canonical_extended_json(&reply,
nullptr);
82 bson_destroy(command);
87 static const PlAtom ATOM_minus(
"-");
89 std::vector<mongo::IndexKey> indexes;
90 PlTail pl_list(PL_A3);
92 while (pl_list.next(pl_member)) {
93 const PlAtom mode_atom(pl_member.name());
94 const PlTerm &pl_value = pl_member[1];
95 if (mode_atom == ATOM_minus) {
112 catch(
const std::exception &exc) {
throw MongoPLException(exc); }
121 catch(
const std::exception &exc) {
throw MongoPLException(exc); }
130 catch(
const std::exception &exc) {
throw MongoPLException(exc); }
141 catch(
const std::exception &exc) {
throw MongoPLException(exc); }
145 static const PlAtom ATOM_insert(
"insert");
146 static const PlAtom ATOM_remove(
"remove");
147 static const PlAtom ATOM_update(
"update");
151 PlTail pl_list(PL_A3);
154 while(pl_list.next(pl_member)) {
155 const PlAtom operation_name(pl_member.name());
156 const auto &pl_value1 = pl_member[1];
165 if(operation_name == ATOM_insert) {
166 bulk->pushInsert(doc1.bson());
168 else if(operation_name == ATOM_remove) {
169 bulk->pushRemoveAll(doc1.bson());
171 else if(operation_name == ATOM_update) {
172 const auto &pl_value2 = pl_member[2];
177 bulk->pushUpdate(doc1.bson(), doc2.bson());
181 MONGOC_ERROR_COMMAND,
182 MONGOC_ERROR_COMMAND_INVALID_ARG,
183 "unknown bulk operation '%s'", pl_member.name());
192 catch(
const std::exception &exc) {
throw MongoPLException(exc); }
201 catch(
const std::exception &exc) {
throw MongoPLException(exc); }
206 Document doc_a(termToDocument(PL_A4));
208 cursor->filter(doc_a.
bson());
209 PL_A3 = cursor->id().c_str();
213 catch(
const std::exception &exc) {
throw MongoPLException(exc); }
218 char* cursor_id = (
char*)PL_A1;
223 catch(
const std::exception &exc) {
throw MongoPLException(exc); }
231 catch(
const std::exception &exc) {
throw MongoPLException(exc); }
236 Document doc_a(termToDocument(PL_A2));
241 catch(
const std::exception &exc) {
throw MongoPLException(exc); }
246 Document doc_a(termToDocument(PL_A2));
251 catch(
const std::exception &exc) {
throw MongoPLException(exc); }
260 catch(
const std::exception &exc) {
throw MongoPLException(exc); }
269 catch(
const std::exception &exc) {
throw MongoPLException(exc); }
278 catch(
const std::exception &exc) {
throw MongoPLException(exc); }
293 catch(
const std::exception &exc) {
throw MongoPLException(exc); }
300 char* str = bson_as_canonical_extended_json(doc,
nullptr);
310 catch(
const std::exception &exc) {
throw MongoPLException(exc); }
PlTerm bson_to_term(const bson_t *bson)
bool bsonpl_concat(bson_t *doc, const PlTerm &term, bson_error_t *err)
const std::string bsonMessage_
const std::string contextMessage_
std::shared_ptr< Database > connect(const PlTerm &dbTerm)
std::shared_ptr< Cursor > cursor_create(const PlTerm &db_term, const char *coll_name)
static MongoInterface & get()
void cursor_destroy(const char *curser_id)
PREDICATE(mng_collections, 2)
#define PREDICATE_COLLECTION