knowrob  2.1.0
A Knowledge Base System for Cognition-enabled Robots
QueryWatch.cpp
Go to the documentation of this file.
1 /*
2  * This file is part of KnowRob, please consult
3  * https://github.com/knowrob/knowrob for license details.
4  */
5 
6 #include "knowrob/Logger.h"
7 #include "knowrob/storage/mongo/QueryWatch.h"
8 #include "knowrob/storage/mongo/MongoException.h"
9 #include "knowrob/reasoner/mongolog/bson_pl.h"
10 #include <iostream>
11 
12 using namespace knowrob::mongo;
13 
14 std::atomic<long> QueryWatch::id_counter_ = 0;
15 
17  : thread_(nullptr),
18  isRunning_(false),
19  watchRate_(200) {
20 }
21 
23  // first let the thread terminate
25  // then stop all watcher
26  watcher_map_.clear();
27 }
28 
30  std::lock_guard<std::mutex> guard(lock_);
31  if (!isRunning_ && !thread_) {
32  isRunning_ = true;
33  thread_ = new std::thread(&QueryWatch::loop, this);
34  }
35 }
36 
38  std::lock_guard<std::mutex> guard(lock_);
39  if (isRunning_ && thread_) {
40  isRunning_ = false;
41  thread_->join();
42  delete thread_;
43  thread_ = nullptr;
44  }
45 }
46 
48  const std::shared_ptr<Collection> &collection,
49  const bson_t *query,
50  const ChangeStreamCallback &callback) {
51  auto next_id = (id_counter_++);
52  // add to map
53  {
54  std::lock_guard<std::mutex> guard(lock_);
55  watcher_map_.emplace(next_id, std::make_unique<ChangeStream>(collection, query, callback));
56  }
57  // start the thread when the first watch is added
59  return next_id;
60 }
61 
62 void QueryWatch::unwatch(long watcher_id) {
63  auto needle = watcher_map_.find(watcher_id);
64  if (needle != watcher_map_.end()) {
65  std::lock_guard<std::mutex> guard(lock_);
66  watcher_map_.erase(needle);
67  }
68  if (watcher_map_.empty()) {
70  }
71 }
72 
74  // loop as long isRunning_=true
75  auto next = std::chrono::system_clock::now();
76  while (isRunning_) {
77  {
78  std::lock_guard<std::mutex> guard(lock_);
79  for (auto &it: watcher_map_) {
80  try {
81  it.second->next();
82  }
83  catch (MongoException &exc) {
84  KB_WARN("exception in mongo watch: {}", exc.what());
85  }
86  }
87  }
88  // try to run at constant rate
89  next += std::chrono::milliseconds(watchRate_);
90  std::this_thread::sleep_until(next);
91  }
92 }
#define KB_WARN
Definition: Logger.h:27
void unwatch(long watcher_id)
Definition: QueryWatch.cpp:62
static std::atomic< long > id_counter_
Definition: QueryWatch.h:64
long watch(const std::shared_ptr< Collection > &collection, const bson_t *query, const ChangeStreamCallback &callback)
Definition: QueryWatch.cpp:47
std::map< long, std::unique_ptr< ChangeStream > > watcher_map_
Definition: QueryWatch.h:58
std::function< void(const bson_wrapper_ptr &)> ChangeStreamCallback
Definition: ChangeStream.h:21
TimePoint now()
Definition: TimePoint.cpp:12