knowrob  2.1.0
A Knowledge Base System for Cognition-enabled Robots
knowrob::mongo::QueryWatch Class Reference

#include <QueryWatch.h>

Public Member Functions

 QueryWatch ()
 
 QueryWatch (const QueryWatch &)=delete
 
 ~QueryWatch ()
 
void setWatchRate (uint32_t rate)
 
long watch (const std::shared_ptr< Collection > &collection, const bson_t *query, const ChangeStreamCallback &callback)
 
void unwatch (long watcher_id)
 
 QueryWatch ()
 
 QueryWatch (const QueryWatch &)=delete
 
 ~QueryWatch ()
 
void setWatchRate (uint32_t rate)
 
long watch (const std::shared_ptr< Collection > &collection, const bson_t *query, const ChangeStreamCallback &callback)
 
void unwatch (long watcher_id)
 

Protected Member Functions

void startWatchThread ()
 
void stopWatchThread ()
 
void loop ()
 
void startWatchThread ()
 
void stopWatchThread ()
 
void loop ()
 

Protected Attributes

std::map< long, std::unique_ptr< ChangeStream > > watcher_map_
 
std::thread * thread_
 
bool isRunning_
 
std::mutex lock_
 
uint32_t watchRate_
 

Static Protected Attributes

static std::atomic< long > id_counter_ = 0
 

Detailed Description

Keeps track over time of query results and notifies a callback for each new result.

Definition at line 26 of file QueryWatch.h.

Constructor & Destructor Documentation

◆ QueryWatch() [1/4]

QueryWatch::QueryWatch ( )

Definition at line 16 of file QueryWatch.cpp.

17  : thread_(nullptr),
18  isRunning_(false),
19  watchRate_(200) {
20 }

◆ QueryWatch() [2/4]

knowrob::mongo::QueryWatch::QueryWatch ( const QueryWatch )
delete

◆ ~QueryWatch() [1/2]

QueryWatch::~QueryWatch ( )

Definition at line 22 of file QueryWatch.cpp.

22  {
23  // first let the thread terminate
25  // then stop all watcher
26  watcher_map_.clear();
27 }
std::map< long, std::unique_ptr< ChangeStream > > watcher_map_
Definition: QueryWatch.h:58

◆ QueryWatch() [3/4]

knowrob::mongo::QueryWatch::QueryWatch ( )

◆ QueryWatch() [4/4]

knowrob::mongo::QueryWatch::QueryWatch ( const QueryWatch )
delete

◆ ~QueryWatch() [2/2]

knowrob::mongo::QueryWatch::~QueryWatch ( )

Member Function Documentation

◆ loop() [1/2]

void QueryWatch::loop ( )
protected

Definition at line 73 of file QueryWatch.cpp.

73  {
74  // loop as long isRunning_=true
75  auto next = std::chrono::system_clock::now();
76  while (isRunning_) {
77  {
78  std::lock_guard<std::mutex> guard(lock_);
79  for (auto &it: watcher_map_) {
80  try {
81  it.second->next();
82  }
83  catch (MongoException &exc) {
84  KB_WARN("exception in mongo watch: {}", exc.what());
85  }
86  }
87  }
88  // try to run at constant rate
89  next += std::chrono::milliseconds(watchRate_);
90  std::this_thread::sleep_until(next);
91  }
92 }
#define KB_WARN
Definition: Logger.h:27
TimePoint now()
Definition: TimePoint.cpp:12

◆ loop() [2/2]

void knowrob::mongo::QueryWatch::loop ( )
protected

◆ setWatchRate() [1/2]

void knowrob::mongo::QueryWatch::setWatchRate ( uint32_t  rate)
inline

The query watch actively polls change streams in this interval.

Parameters
ratethe rate in milliseconds.

Definition at line 38 of file QueryWatch.h.

38 { watchRate_ = rate; }

◆ setWatchRate() [2/2]

void knowrob::mongo::QueryWatch::setWatchRate ( uint32_t  rate)
inline

The query watch actively polls change streams in this interval.

Parameters
ratethe rate in milliseconds.

Definition at line 38 of file QueryWatch.h.

38 { watchRate_ = rate; }

◆ startWatchThread() [1/2]

void QueryWatch::startWatchThread ( )
protected

Definition at line 29 of file QueryWatch.cpp.

29  {
30  std::lock_guard<std::mutex> guard(lock_);
31  if (!isRunning_ && !thread_) {
32  isRunning_ = true;
33  thread_ = new std::thread(&QueryWatch::loop, this);
34  }
35 }

◆ startWatchThread() [2/2]

void knowrob::mongo::QueryWatch::startWatchThread ( )
protected

◆ stopWatchThread() [1/2]

void QueryWatch::stopWatchThread ( )
protected

Definition at line 37 of file QueryWatch.cpp.

37  {
38  std::lock_guard<std::mutex> guard(lock_);
39  if (isRunning_ && thread_) {
40  isRunning_ = false;
41  thread_->join();
42  delete thread_;
43  thread_ = nullptr;
44  }
45 }

◆ stopWatchThread() [2/2]

void knowrob::mongo::QueryWatch::stopWatchThread ( )
protected

◆ unwatch() [1/2]

void QueryWatch::unwatch ( long  watcher_id)

Stop watching a collection.

Parameters
watcher_idthe watcher id returned by watch.

Definition at line 62 of file QueryWatch.cpp.

62  {
63  auto needle = watcher_map_.find(watcher_id);
64  if (needle != watcher_map_.end()) {
65  std::lock_guard<std::mutex> guard(lock_);
66  watcher_map_.erase(needle);
67  }
68  if (watcher_map_.empty()) {
70  }
71 }

◆ unwatch() [2/2]

void knowrob::mongo::QueryWatch::unwatch ( long  watcher_id)

Stop watching a collection.

Parameters
watcher_idthe watcher id returned by watch.

◆ watch() [1/2]

long QueryWatch::watch ( const std::shared_ptr< Collection > &  collection,
const bson_t *  query,
const ChangeStreamCallback callback 
)

Watch for changes in a collection.

Parameters
collectionthe collection to watch.
querythe query to watch.
callbackthe callback to invoke for each change.
Returns
a watcher id that can be used to unwatch.

Definition at line 47 of file QueryWatch.cpp.

50  {
51  auto next_id = (id_counter_++);
52  // add to map
53  {
54  std::lock_guard<std::mutex> guard(lock_);
55  watcher_map_.emplace(next_id, std::make_unique<ChangeStream>(collection, query, callback));
56  }
57  // start the thread when the first watch is added
59  return next_id;
60 }
static std::atomic< long > id_counter_
Definition: QueryWatch.h:64

◆ watch() [2/2]

long knowrob::mongo::QueryWatch::watch ( const std::shared_ptr< Collection > &  collection,
const bson_t *  query,
const ChangeStreamCallback callback 
)

Watch for changes in a collection.

Parameters
collectionthe collection to watch.
querythe query to watch.
callbackthe callback to invoke for each change.
Returns
a watcher id that can be used to unwatch.

Member Data Documentation

◆ id_counter_

std::atomic< long > QueryWatch::id_counter_ = 0
staticprotected

Definition at line 64 of file QueryWatch.h.

◆ isRunning_

bool knowrob::mongo::QueryWatch::isRunning_
protected

Definition at line 61 of file QueryWatch.h.

◆ lock_

std::mutex knowrob::mongo::QueryWatch::lock_
protected

Definition at line 62 of file QueryWatch.h.

◆ thread_

std::thread * knowrob::mongo::QueryWatch::thread_
protected

Definition at line 60 of file QueryWatch.h.

◆ watcher_map_

std::map< long, std::unique_ptr< ChangeStream > > knowrob::mongo::QueryWatch::watcher_map_
protected

Definition at line 58 of file QueryWatch.h.

◆ watchRate_

uint32_t knowrob::mongo::QueryWatch::watchRate_
protected

Definition at line 63 of file QueryWatch.h.


The documentation for this class was generated from the following files: