knowrob  2.1.0
A Knowledge Base System for Cognition-enabled Robots
Collection.cpp
Go to the documentation of this file.
1 /*
2  * This file is part of KnowRob, please consult
3  * https://github.com/knowrob/knowrob for license details.
4  */
5 
6 #include <knowrob/storage/mongo/Collection.h>
7 #include <knowrob/storage/mongo/MongoException.h>
8 #include <knowrob/storage/mongo/Document.h>
9 #include <iostream>
10 
11 using namespace knowrob::mongo;
12 
13 static const mongoc_insert_flags_t INSERT_NO_VALIDATE_FLAG =
14  (mongoc_insert_flags_t) MONGOC_INSERT_NO_VALIDATE;
15 static const mongoc_update_flags_t UPDATE_NO_VALIDATE_FLAG =
16  (mongoc_update_flags_t) MONGOC_UPDATE_NO_VALIDATE;
17 
18 static mongoc_client_t* pop_client(mongoc_client_pool_t *pool) {
19  mongoc_client_t *client = mongoc_client_pool_try_pop(pool);
20  if (client == nullptr) {
21  bson_error_t err;
22  bson_set_error(&err,
23  MONGOC_ERROR_CLIENT,
24  MONGOC_ERROR_CLIENT,
25  "Failed to acquire a mongo client! maxPoolSize reached?");
26  throw MongoException("client", err);
27  }
28  return client;
29 }
30 
32  const std::shared_ptr<Connection> &connection,
33  std::string_view databaseName,
34  std::string_view collectionName)
35  : connection_(connection),
36  session_(nullptr),
37  name_(collectionName),
38  dbName_(databaseName) {
// Take a client from the connection's pool; pop_client throws a MongoException if none is available.
39  client_ = pop_client(connection_->pool_);
// Create the collection and database handles on that client, using the stored names.
40  coll_ = mongoc_client_get_collection(client_, dbName_.c_str(), name_.c_str());
41  db_ = mongoc_client_get_database(client_, dbName_.c_str());
42 }
43 
45  : connection_(other.connection_),
46  session_(nullptr),
47  name_(other.name_),
48  dbName_(other.dbName_) {
// NOTE(review): the signature line is missing from this listing; this presumably is the copy constructor.
// The copy shares the connection and names of `other`, but starts without a session and
// takes its own client from the pool, on which it creates its own handles.
49  client_ = pop_client(connection_->pool_);
50  coll_ = mongoc_client_get_collection(client_, dbName_.c_str(), name_.c_str());
51  db_ = mongoc_client_get_database(client_, dbName_.c_str());
52 }
53 
// NOTE(review): the signature line is missing from this listing; this presumably is the destructor.
// Destroy the session first, if one was started.
55  if (session_) {
56  mongoc_client_session_destroy(session_);
57  session_ = nullptr;
58  }
// Destroy the database and collection handles, then give the client back to the pool.
59  mongoc_database_destroy(db_);
60  mongoc_collection_destroy(coll_);
61  mongoc_client_pool_push(connection_->pool_, client_);
62 }
63 
64 mongoc_client_session_t *Collection::session() {
65  if (!session_) {
66  bson_error_t error;
67  session_ = mongoc_client_start_session(client_, nullptr, &error);
68  }
69  return session_;
70 }
71 
72 void Collection::appendSession(bson_t *opts) {
73  auto s = session();
74  if (s != nullptr) {
75  bson_error_t error;
76  if (!mongoc_client_session_append(s, opts, &error)) {
77  throw MongoException("append_session", error);
78  }
79  }
80 }
81 
// NOTE(review): the signature line is missing from this listing.
// Drops the collection and throws a MongoException ("drop_failed") if libmongoc reports failure.
83  bson_error_t err;
84  if (!mongoc_collection_drop(coll_, &err)) {
85  throw MongoException("drop_failed", err);
86  }
87 }
88 
89 std::vector<std::string> Collection::distinctValues(std::string_view key) {
90  bson_error_t err;
91  bson_t reply;
92  bson_t *command = BCON_NEW("distinct", BCON_UTF8(name_.c_str()), "key", BCON_UTF8(key.data()));
93  bool success = mongoc_database_command_simple(
94  db_, command, nullptr, &reply, &err);
95  if (!success) {
96  bson_destroy(command);
97  bson_destroy(&reply);
98  throw MongoException("distinct_failed", err);
99  }
100 
101  std::vector<std::string> values;
102  bson_iter_t iter;
103  if (bson_iter_init(&iter, &reply)) {
104  const bson_value_t *value;
105  if (bson_iter_find(&iter, "values") && BSON_ITER_HOLDS_ARRAY(&iter)) {
106  bson_iter_t array;
107  bson_iter_recurse(&iter, &array);
108  while (bson_iter_next(&array)) {
109  value = bson_iter_value(&array);
110  values.emplace_back(value->value.v_utf8.str);
111  }
112  }
113  }
114  bson_destroy(command);
115  bson_destroy(&reply);
116  return values;
117 }
118 
119 void Collection::storeOne(const Document &document) {
120  bson_error_t err;
121  if (!mongoc_collection_insert(
122  coll_,
123  INSERT_NO_VALIDATE_FLAG,
124  document.bson(),
125  nullptr,
126  &err)) {
127  throw MongoException("insert_failed", err);
128  }
129 }
130 
131 void Collection::removeAll(const Document &document) {
132  remove(document, MONGOC_REMOVE_NONE);
133 }
134 
135 void Collection::removeOne(const bson_oid_t &oid) {
136  removeOne(Document(BCON_NEW("_id", BCON_OID(&oid))));
137 }
138 
139 void Collection::removeOne(const Document &document) {
140  remove(document, MONGOC_REMOVE_SINGLE_REMOVE);
141 }
142 
143 void Collection::remove(const Document &document, mongoc_remove_flags_t flag) {
144  bson_error_t err;
145  if (!mongoc_collection_remove(
146  coll_,
147  flag,
148  document.bson(),
149  nullptr,
150  &err)) {
151  throw MongoException("collection_remove", err);
152  }
153 }
154 
155 
156 void Collection::update(const Document &query, const Document &update, bool upsert) {
157  bson_error_t err;
158 
159  int flags = UPDATE_NO_VALIDATE_FLAG;
160  if (upsert) flags |= MONGOC_UPDATE_UPSERT;
161 
162  if (!mongoc_collection_update(
163  coll_,
164  (mongoc_update_flags_t) flags,
165  query.bson(),
166  update.bson(),
167  nullptr,
168  &err)) {
169  throw MongoException("update_failed", err);
170  }
171 }
172 
173 void Collection::evalAggregation(const bson_t *pipeline) {
174  auto cursor = mongoc_collection_aggregate(
175  coll_,
176  MONGOC_QUERY_NONE,
177  pipeline,
178  nullptr,
179  nullptr);
180  // make sure cursor has no error after creation
181  bson_error_t err;
182  if (mongoc_cursor_error(cursor, &err)) {
183  mongoc_cursor_destroy(cursor);
184  throw MongoException("cursor_error", err);
185  }
186  // process the query (ignore results)
187  const bson_t *cursor_doc;
188  while (mongoc_cursor_next(cursor, &cursor_doc)) {}
189  mongoc_cursor_destroy(cursor);
190 }
191 
192 std::shared_ptr<BulkOperation> Collection::createBulkOperation() {
193  bson_t opts = BSON_INITIALIZER;
194  BSON_APPEND_BOOL(&opts, "ordered", false);
195  // create the bulk operation
196  mongoc_bulk_operation_t *bulk =
197  mongoc_collection_create_bulk_operation_with_opts(coll_, &opts);
198  bson_destroy(&opts);
199  return std::make_shared<BulkOperation>(bulk);
200 }
201 
202 void Collection::createIndex_internal(const bson_t &keys) {
203  bson_error_t err;
204  bson_t reply;
205 
206  char *index_name = mongoc_collection_keys_to_index_string(&keys);
207  bson_t *cmd = BCON_NEW ("createIndexes", BCON_UTF8(mongoc_collection_get_name(coll_)),
208  "indexes", "[", "{",
209  "key", BCON_DOCUMENT(&keys),
210  "name", BCON_UTF8(index_name),
211  "}", "]");
212  bool success = mongoc_database_write_command_with_opts(
213  db_,
214  cmd,
215  nullptr /* opts */,
216  &reply,
217  &err);
218 
219  bson_free(index_name);
220  bson_destroy(&reply);
221  bson_destroy(cmd);
222  if (!success) {
223  throw MongoException("create_index_failed", err);
224  }
225 }
226 
227 void Collection::createAscendingIndex(const std::vector<const char *> &keys) {
228  bson_t b_keys;
229  bson_init(&b_keys);
230  for (auto key: keys) BSON_APPEND_INT32(&b_keys, key, 1);
231  createIndex_internal(b_keys);
232 }
233 
234 void Collection::createIndex(const std::vector<IndexKey> &keys) {
235  bson_t b_keys;
236  bson_init(&b_keys);
237  for (auto &key: keys) {
238  if (key.type == IndexType::ASCENDING) {
239  BSON_APPEND_INT32(&b_keys, key.value.c_str(), 1);
240  } else {
241  BSON_APPEND_INT32(&b_keys, key.value.c_str(), -1);
242  }
243  }
244  createIndex_internal(b_keys);
245 }
246 
// NOTE(review): the signature line is missing from this listing.
248  // Some notes on indexing:
249  // - Compound indexes contain their prefixes as sub-indexes.
250  // e.g. {s: 1, p: 1} contains {s: 1} and {s: 1, p: 1}.
251  // - Keep equality matches first in the index.
252  // @see https://www.mongodb.com/docs/manual/tutorial/equality-sort-range-rule/#std-label-esr-indexing-rule
253  // - Optional fields receive null values for documents that do not contain them.
254  // - The context fields are not indexed here as there are too many of them.
255  // Assuming each of them may appear as a variable in a query, too many indexes would be needed.
256  // If they are not allowed to appear as variables, one could append them to the existing indexes
257  // (still causing a lot of indexes, as many prefixes exist in large compound indexes).
258 
// Each call below creates one ascending (compound) index over the listed fields.
259  createAscendingIndex({"s", "p"});
260  createAscendingIndex({"s", "p*"});
261  createAscendingIndex({"s", "o", "p"});
262  createAscendingIndex({"s", "o", "p*"});
263  createAscendingIndex({"s", "p", "o*"});
264  createAscendingIndex({"o", "p"});
265  createAscendingIndex({"o", "p*"});
266  createAscendingIndex({"o*"});
267  createAscendingIndex({"p", "o*"});
268  createAscendingIndex({"p*"});
269 }
270 
// NOTE(review): the signature line is missing from this listing.
// Counts the documents of the collection with an empty filter and a limit of 1,
// and returns whether that count is zero.
272  bson_t *opts = BCON_NEW("limit", BCON_INT64(1));
273  bson_t filter = BSON_INITIALIZER;
274  bson_error_t error;
275  int64_t count;
276 
277  count = mongoc_collection_count_documents(
278  coll_,
279  &filter,
280  opts,
281  nullptr,
282  nullptr,
283  &error);
284  bson_destroy(opts);
285  bson_destroy(&filter);
286 
// NOTE(review): `error` is never inspected; if the count call reports failure with a
// non-zero value, this returns false as if the collection had documents — confirm intent.
287  return count == 0;
288 }
289 
void createAscendingIndex(const std::vector< const char * > &keys)
Definition: Collection.cpp:227
void evalAggregation(const bson_t *pipeline)
Definition: Collection.cpp:173
std::shared_ptr< BulkOperation > createBulkOperation()
Definition: Collection.cpp:192
void update(const Document &query, const Document &update, bool upsert=false)
Definition: Collection.cpp:156
Collection(const std::shared_ptr< Connection > &connection, std::string_view databaseName, std::string_view collectionName)
Definition: Collection.cpp:31
void createIndex(const std::vector< IndexKey > &keys)
Definition: Collection.cpp:234
void removeOne(const Document &document)
Definition: Collection.cpp:139
void appendSession(bson_t *opts)
Definition: Collection.cpp:72
mongoc_client_session_t * session()
Definition: Collection.cpp:64
std::vector< std::string > distinctValues(std::string_view key)
Definition: Collection.cpp:89
void storeOne(const Document &document)
Definition: Collection.cpp:119
void removeAll(const Document &document)
Definition: Collection.cpp:131
bson_t * bson() const
Definition: Document.h:27