6 #include <knowrob/storage/mongo/Collection.h>
7 #include <knowrob/storage/mongo/MongoException.h>
8 #include <knowrob/storage/mongo/Document.h>
13 static const mongoc_insert_flags_t INSERT_NO_VALIDATE_FLAG =
14 (mongoc_insert_flags_t) MONGOC_INSERT_NO_VALIDATE;
15 static const mongoc_update_flags_t UPDATE_NO_VALIDATE_FLAG =
16 (mongoc_update_flags_t) MONGOC_UPDATE_NO_VALIDATE;
// Takes a client handle from the given connection pool.
// mongoc_client_pool_try_pop() hands back nullptr when no client can be
// provided, which the check below treats as a failure.
18 static mongoc_client_t* pop_client(mongoc_client_pool_t *pool) {
19 mongoc_client_t *client = mongoc_client_pool_try_pop(pool);
20 if (client ==
nullptr) {
// NOTE(review): the statement that consumes this message is not part of the
// visible excerpt (embedded numbering jumps from 20 to 25) — presumably an
// exception or error report; confirm against the full file.
25 "Failed to acquire a mongo client! maxPoolSize reached?");
// Constructor fragment (the line carrying the function name is not part of this
// excerpt). Stores the connection and both names, then acquires a pooled
// client and derives the collection and database handles from it.
32 const std::shared_ptr<Connection> &connection,
33 std::string_view databaseName,
34 std::string_view collectionName)
35 : connection_(connection),
37 name_(collectionName),
38 dbName_(databaseName) {
// Each instance takes its own client out of the connection's pool.
39 client_ = pop_client(connection_->pool_);
// Both handles are created from that client using the stored names.
40 coll_ = mongoc_client_get_collection(client_, dbName_.c_str(), name_.c_str());
41 db_ = mongoc_client_get_database(client_, dbName_.c_str());
// Copy-style constructor fragment (header and some initializers are not
// visible). Reuses the connection and database name of `other`, but pops a
// separate client from the pool and builds fresh handles on it instead of
// copying the handles held by `other`.
45 : connection_(other.connection_),
48 dbName_(other.dbName_) {
49 client_ = pop_client(connection_->pool_);
50 coll_ = mongoc_client_get_collection(client_, dbName_.c_str(), name_.c_str());
51 db_ = mongoc_client_get_database(client_, dbName_.c_str());
// Teardown fragment — presumably the destructor body; the header is not
// visible. Releases the session, the database handle and the collection
// handle, then gives the client back to the connection's pool.
// NOTE(review): embedded lines 57-58 are missing; any guard around the session
// destroy (e.g. a null check) cannot be confirmed from this excerpt.
56 mongoc_client_session_destroy(session_);
59 mongoc_database_destroy(db_);
60 mongoc_collection_destroy(coll_);
// The client is pushed back last, after the handles derived from it are gone.
61 mongoc_client_pool_push(connection_->pool_, client_);
// Fragment that starts a session on this collection's client, passing nullptr
// for the session options, and stores the result in session_.
// NOTE(review): how `error` is handled afterwards is not visible here.
67 session_ = mongoc_client_start_session(client_,
nullptr, &error);
// Fragment that appends the session `s` to the options document `opts`; the
// branch taken when the append fails is not part of the visible excerpt.
76 if (!mongoc_client_session_append(s, opts, &error)) {
// Fragment that drops the collection behind coll_; the failure branch that
// would use `err` is not part of the visible excerpt.
84 if (!mongoc_collection_drop(coll_, &err)) {
// Fragment that runs a "distinct" database command for `key` on this
// collection and gathers the entries of the reply's "values" array into a
// vector of strings. The function header is outside the visible excerpt.
// NOTE(review): key.data() is handed to BCON_UTF8 as a C string; if `key` is a
// std::string_view it need not be null-terminated — confirm callers.
92 bson_t *command = BCON_NEW(
"distinct", BCON_UTF8(name_.c_str()),
"key", BCON_UTF8(key.data()));
93 bool success = mongoc_database_command_simple(
94 db_, command,
nullptr, &reply, &err);
// NOTE(review): `command` is destroyed again further down (embedded line 114);
// this earlier destroy presumably sits on the failure path, whose surrounding
// lines are not shown — confirm the two can never both run.
96 bson_destroy(command);
101 std::vector<std::string> values;
103 if (bson_iter_init(&iter, &reply)) {
104 const bson_value_t *value;
// Only proceed when the reply has a "values" field that holds an array.
105 if (bson_iter_find(&iter,
"values") && BSON_ITER_HOLDS_ARRAY(&iter)) {
// Descend into the array; the result of bson_iter_recurse is not checked.
107 bson_iter_recurse(&iter, &array);
108 while (bson_iter_next(&array)) {
109 value = bson_iter_value(&array);
// NOTE(review): the element is read as UTF-8 without a type check, so a
// non-string distinct value would be misread — consider BSON_ITER_HOLDS_UTF8.
110 values.emplace_back(value->value.v_utf8.str);
// Release the command document and the server reply.
114 bson_destroy(command);
115 bson_destroy(&reply);
// Fragment of an insert call: mongoc_collection_insert is invoked with the
// file-local INSERT_NO_VALIDATE_FLAG. The remaining arguments and the failure
// branch are not part of the visible excerpt.
121 if (!mongoc_collection_insert(
123 INSERT_NO_VALIDATE_FLAG,
// Delegates to remove() with MONGOC_REMOVE_NONE, i.e. without the
// single-remove restriction (presumably the body of removeAll — header not
// visible).
132 remove(document, MONGOC_REMOVE_NONE);
// Delegates to remove() with MONGOC_REMOVE_SINGLE_REMOVE so that at most one
// matching document is removed (presumably the body of removeOne — header not
// visible).
140 remove(document, MONGOC_REMOVE_SINGLE_REMOVE);
// Shared removal helper: issues mongoc_collection_remove for `document`.
// @param document the document passed to the remove call.
// @param flag     removal mode forwarded by the callers above
//                 (MONGOC_REMOVE_NONE or MONGOC_REMOVE_SINGLE_REMOVE).
// NOTE(review): the call's arguments and the failure branch are cut off in
// this excerpt.
143 void Collection::remove(
const Document &document, mongoc_remove_flags_t flag) {
145 if (!mongoc_collection_remove(
// Fragment of an update call. The flags start from UPDATE_NO_VALIDATE_FLAG and
// gain MONGOC_UPDATE_UPSERT when `upsert` is set.
// The flags are held in an int so the bitwise OR is well-formed, then cast
// back to the enum type for the call.
159 int flags = UPDATE_NO_VALIDATE_FLAG;
160 if (upsert) flags |= MONGOC_UPDATE_UPSERT;
162 if (!mongoc_collection_update(
164 (mongoc_update_flags_t) flags,
// Fragment that runs an aggregation and discards its results: the cursor is
// iterated to the end purely for the pipeline's effect, then destroyed.
174 auto cursor = mongoc_collection_aggregate(
// Error reported before iteration starts: the cursor is released on this path
// (what follows the destroy is not visible in the excerpt).
182 if (mongoc_cursor_error(cursor, &err)) {
183 mongoc_cursor_destroy(cursor);
187 const bson_t *cursor_doc;
// Drain the cursor; the documents themselves are ignored.
// NOTE(review): no mongoc_cursor_error check follows the loop, so a failure
// that occurs while iterating would go unnoticed.
188 while (mongoc_cursor_next(cursor, &cursor_doc)) {}
189 mongoc_cursor_destroy(cursor);
// Fragment that creates a bulk operation on this collection with the option
// "ordered" set to false, and wraps it in a shared BulkOperation.
193 bson_t opts = BSON_INITIALIZER;
194 BSON_APPEND_BOOL(&opts,
"ordered",
false);
196 mongoc_bulk_operation_t *bulk =
197 mongoc_collection_create_bulk_operation_with_opts(coll_, &opts);
// NOTE(review): embedded line 198 is missing from the excerpt; whether `opts`
// is released there (bson_destroy) cannot be confirmed from here.
199 return std::make_shared<BulkOperation>(bulk);
// Issues a "createIndexes" write command for the given key specification.
// @param keys BSON document describing the index keys; also used to derive
//             the index name.
202 void Collection::createIndex_internal(
const bson_t &keys) {
// The index name is generated from the key document; it is released with
// bson_free below.
206 char *index_name = mongoc_collection_keys_to_index_string(&keys);
207 bson_t *cmd = BCON_NEW (
"createIndexes", BCON_UTF8(mongoc_collection_get_name(coll_)),
209 "key", BCON_DOCUMENT(&keys),
210 "name", BCON_UTF8(index_name),
212 bool success = mongoc_database_write_command_with_opts(
// NOTE(review): embedded lines 213-218 are not visible; the release of `cmd`
// and the handling of `success` cannot be confirmed from this excerpt.
219 bson_free(index_name);
220 bson_destroy(&reply);
// Fragment that adds every entry of `keys` to b_keys with the value 1 and
// passes the resulting document to createIndex_internal.
// NOTE(review): the declaration and any later release of b_keys are outside
// the visible excerpt.
230 for (
auto key: keys) BSON_APPEND_INT32(&b_keys, key, 1);
231 createIndex_internal(b_keys);
// Fragment that builds an index key document from `keys`: each entry's
// key.value is appended with either 1 or -1, then the document is passed to
// createIndex_internal.
// NOTE(review): the condition selecting between 1 and -1 (embedded lines 238
// and 240) is not visible — presumably the key's sort direction; confirm.
237 for (
auto &key: keys) {
239 BSON_APPEND_INT32(&b_keys, key.value.c_str(), 1);
241 BSON_APPEND_INT32(&b_keys, key.value.c_str(), -1);
244 createIndex_internal(b_keys);
// Fragment that counts documents with an empty filter and a "limit" option of
// 1 (function header not visible — presumably an emptiness check).
272 bson_t *opts = BCON_NEW(
"limit", BCON_INT64(1));
273 bson_t filter = BSON_INITIALIZER;
277 count = mongoc_collection_count_documents(
// NOTE(review): `opts` is heap-allocated by BCON_NEW, but only the release of
// `filter` is visible in this excerpt — confirm `opts` is destroyed as well.
285 bson_destroy(&filter);
void createAscendingIndex(const std::vector< const char * > &keys)
void evalAggregation(const bson_t *pipeline)
std::shared_ptr< BulkOperation > createBulkOperation()
void update(const Document &query, const Document &update, bool upsert=false)
Collection(const std::shared_ptr< Connection > &connection, std::string_view databaseName, std::string_view collectionName)
void createIndex(const std::vector< IndexKey > &keys)
void removeOne(const Document &document)
void appendSession(bson_t *opts)
mongoc_client_session_t * session()
std::vector< std::string > distinctValues(std::string_view key)
void storeOne(const Document &document)
void removeAll(const Document &document)