knowrob  2.1.0
A Knowledge Base System for Cognition-enabled Robots
ChangeStream.cpp
Go to the documentation of this file.
1 /*
2  * This file is part of KnowRob, please consult
3  * https://github.com/knowrob/knowrob for license details.
4  */
5 
6 #include "knowrob/storage/mongo/ChangeStream.h"
7 #include "knowrob/storage/mongo/MongoException.h"
8 
9 using namespace knowrob::mongo;
10 
12  const std::shared_ptr<Collection> &collection,
13  const bson_t *query,
14  ChangeStreamCallback callback)
15  : collection_(collection),
16  callback_(std::move(callback)),
17  stream_(nullptr),
18  next_ptr_() {
19  // connect and append session ID to options
20  bson_t *opts = BCON_NEW(
21  "maxAwaitTimeMS", BCON_INT32(1), // the watcher should be non-blocking
22  "fullDocument", BCON_UTF8("updateLookup") // always fetch full document
23  );
24  collection_->appendSession(opts);
25  bson_destroy(opts);
26  // create the stream object
27  stream_ = mongoc_collection_watch(collection_->coll(), query, opts);
28 }
29 
// Body of a definition whose signature line is not part of this listing
// (presumably ChangeStream's destructor; confirm against ChangeStream.h).
// Destroys the libmongoc change stream if one is still held and then resets
// stream_ to nullptr.
31  if (stream_ != nullptr) {
32  mongoc_change_stream_destroy(stream_);
33  stream_ = nullptr;
34  }
35 }
36 
// Body of a polling routine whose signature line is not part of this listing
// (the return statements show that it yields a boolean).
//
// Returns true when mongoc_change_stream_next delivered a document, which is
// then stored in next_ptr_.bson.
// Returns false when stream_ is null or when no document was delivered and
// the stream reports no error.
// Throws MongoException("watch_error", ...) when the stream reports an error;
// the stream is destroyed and stream_ is reset before the throw.
38  if (stream_ == nullptr) {
39  // stream had an error before
40  return false;
41  }
42 
43  // try retrieving next document
44  const bson_t *doc;
45  if (mongoc_change_stream_next(stream_, &doc)) {
// store the raw document pointer in the member wrapper
46  next_ptr_.bson = doc;
// NOTE(review): the embedded listing numbers jump from 46 to 48, so one source
// line is absent here (possibly the callback_ invocation). Verify against the
// repository before relying on this listing.
48  return true;
49  }
50 
51  // check if stream has an error
52  const bson_t *err_doc;
53  bson_error_t error;
54  if (mongoc_change_stream_error_document(stream_, &error, &err_doc)) {
// drop the failed stream first so that later calls take the early return above
55  mongoc_change_stream_destroy(stream_);
56  stream_ = nullptr;
57  throw MongoException("watch_error", error);
58  }
59 
// neither a new document nor an error
60  return false;
61 }
mongoc_change_stream_t * stream_
Definition: ChangeStream.h:43
ChangeStream(const std::shared_ptr< Collection > &collection, const bson_t *query, ChangeStreamCallback callback)
std::shared_ptr< Collection > collection_
Definition: ChangeStream.h:41
ChangeStreamCallback callback_
Definition: ChangeStream.h:42
bson_wrapper_ptr next_ptr_
Definition: ChangeStream.h:44
std::function< void(const bson_wrapper_ptr &)> ChangeStreamCallback
Definition: ChangeStream.h:21