knowrob  2.1.0
A Knowledge Base System for Cognition-enabled Robots
knowrob::ObserverManager Class Reference

#include <ObserverManager.h>

Inheritance diagram for knowrob::ObserverManager:
Collaboration diagram for knowrob::ObserverManager:

Public Member Functions

 ObserverManager (const QueryableBackendPtr &backend)
 
 ~ObserverManager ()
 
void stop ()
 
ObserverPtr observe (const GraphQueryPtr &query, const BindingsHandler &callback)
 
void synchronize ()
 
void stopObservation (const Observer &observer)
 
void insert (const TripleContainerPtr &triples)
 
void remove (const TripleContainerPtr &triples)
 
void query (const GraphQueryPtr &query, const BindingsHandler &callback)
 
 ObserverManager (const QueryableBackendPtr &backend)
 
 ~ObserverManager ()
 
void stop ()
 
ObserverPtr observe (const GraphQueryPtr &query, const BindingsHandler &callback)
 
void synchronize ()
 
void stopObservation (const Observer &observer)
 
void insert (const TripleContainerPtr &triples)
 
void remove (const TripleContainerPtr &triples)
 
void query (const GraphQueryPtr &query, const BindingsHandler &callback)
 

Protected Member Functions

void run ()
 
void run ()
 

Protected Attributes

std::unique_ptr< Impl > impl_
 
QueryableBackendPtr backend_
 

Detailed Description

An observer manager is responsible for managing observers.

Definition at line 18 of file ObserverManager.h.

Constructor & Destructor Documentation

◆ ObserverManager() [1/2]

ObserverManager::ObserverManager ( const QueryableBackendPtr backend)
explicit

Create an observer manager.

Parameters
backendthe backend to observe.

Definition at line 40 of file ObserverManager.cpp.

41  : impl_(std::make_unique<Impl>()), backend_(backend) {
42  impl_->thread_ = std::thread(&ObserverManager::run, this);
43 }
std::unique_ptr< Impl > impl_
QueryableBackendPtr backend_

◆ ~ObserverManager() [1/2]

ObserverManager::~ObserverManager ( )

Definition at line 45 of file ObserverManager.cpp.

45  {
46  stop();
47  if (impl_) {
48  if (impl_->thread_.joinable()) {
49  impl_->thread_.join();
50  }
51  impl_ = nullptr;
52  }
53 }

◆ ObserverManager() [2/2]

knowrob::ObserverManager::ObserverManager ( const QueryableBackendPtr backend)
explicit

Create an observer manager.

Parameters
backendthe backend to observe.

◆ ~ObserverManager() [2/2]

knowrob::ObserverManager::~ObserverManager ( )

Member Function Documentation

◆ insert() [1/2]

void ObserverManager::insert ( const TripleContainerPtr triples)

Insert triples.

Parameters
triplesthe triples to insert.

Definition at line 106 of file ObserverManager.cpp.

106  {
107  {
108  std::lock_guard<std::mutex> lock(impl_->queueMutex_);
109  impl_->queue_.push({Impl::Mode::INSERT, triples});
110  }
111  impl_->queueCondition_.notify_one();
112 }

◆ insert() [2/2]

void knowrob::ObserverManager::insert ( const TripleContainerPtr triples)

Insert triples.

Parameters
triplesthe triples to insert.

◆ observe() [1/2]

ObserverPtr ObserverManager::observe ( const GraphQueryPtr query,
const BindingsHandler callback 
)

Observe a query.

Parameters
querythe query to observe.
callbackthe callback to invoke when the query matches.
Returns
the observer.

Definition at line 75 of file ObserverManager.cpp.

75  {
76  auto job = std::make_shared<ObserverJob>(shared_from_this(), query, callback);
77  {
78  std::lock_guard<std::mutex> lock(impl_->jobMutex_);
79  impl_->jobs_.push_back(job);
80  }
81  return std::make_shared<Observer>(job);
82 }
void query(const GraphQueryPtr &query, const BindingsHandler &callback)

◆ observe() [2/2]

ObserverPtr knowrob::ObserverManager::observe ( const GraphQueryPtr query,
const BindingsHandler callback 
)

Observe a query.

Parameters
querythe query to observe.
callbackthe callback to invoke when the query matches.
Returns
the observer.

◆ query() [1/2]

void ObserverManager::query ( const GraphQueryPtr query,
const BindingsHandler callback 
)

Query the backend.

Parameters
querythe query to execute.
callbackthe callback to invoke with the results.

Definition at line 71 of file ObserverManager.cpp.

71  {
72  backend_->query(query, callback);
73 }

◆ query() [2/2]

void knowrob::ObserverManager::query ( const GraphQueryPtr query,
const BindingsHandler callback 
)

Query the backend.

Parameters
querythe query to execute.
callbackthe callback to invoke with the results.

◆ remove() [1/2]

void ObserverManager::remove ( const TripleContainerPtr triples)

Remove triples.

Parameters
triplesthe triples to remove.

Definition at line 114 of file ObserverManager.cpp.

114  {
115  {
116  std::lock_guard<std::mutex> lock(impl_->queueMutex_);
117  impl_->queue_.push({Impl::Mode::REMOVE, triples});
118  }
119  impl_->queueCondition_.notify_one();
120 }

◆ remove() [2/2]

void knowrob::ObserverManager::remove ( const TripleContainerPtr triples)

Remove triples.

Parameters
triplesthe triples to remove.

◆ run() [1/2]

void ObserverManager::run ( )
protected

Definition at line 122 of file ObserverManager.cpp.

122  {
123  while (impl_->running_) {
124  std::pair<Impl::Mode,TripleContainerPtr> next;
125  {
126  std::unique_lock<std::mutex> lock(impl_->queueMutex_);
127  if(impl_->running_) {
128  impl_->queueCondition_.wait(lock, [this] { return !impl_->running_ || !impl_->queue_.empty(); });
129  }
130  if (!impl_->running_) {
131  break;
132  }
133  next = impl_->queue_.front();
134  }
135  {
136  std::lock_guard<std::mutex> lock(impl_->jobMutex_);
137  if (next.first == Impl::Mode::INSERT) {
138  for (auto &job: impl_->jobs_) {
139  job->processInsertion(next.second);
140  }
141  } else {
142  for (auto &job: impl_->jobs_) {
143  job->processRemoval(next.second);
144  }
145  }
146  }
147  {
148  std::unique_lock<std::mutex> lock(impl_->queueMutex_);
149  impl_->queue_.pop();
150  if (impl_->queue_.empty()) {
151  impl_->syncCondition_.notify_all();
152  }
153  }
154  }
155 }

◆ run() [2/2]

void knowrob::ObserverManager::run ( )
protected

◆ stop() [1/2]

void ObserverManager::stop ( )

Stop the observer manager.

Definition at line 55 of file ObserverManager.cpp.

55  {
56  {
57  std::lock_guard<std::mutex> lock(impl_->queueMutex_);
58  while (!impl_->queue_.empty()) {
59  impl_->queue_.pop();
60  }
61  }
62  {
63  std::lock_guard<std::mutex> lock(impl_->jobMutex_);
64  impl_->jobs_.clear();
65  }
66  impl_->running_ = false;
67  backend_ = nullptr;
68  impl_->queueCondition_.notify_one();
69 }

◆ stop() [2/2]

void knowrob::ObserverManager::stop ( )

Stop the observer manager.

◆ stopObservation() [1/2]

void ObserverManager::stopObservation ( const Observer observer)

Stop observing a query.

Parameters
observerthe observer to stop.

Definition at line 95 of file ObserverManager.cpp.

95  {
96  std::lock_guard<std::mutex> lock(impl_->jobMutex_);
97  for (auto it = impl_->jobs_.begin(); it != impl_->jobs_.end(); ++it) {
98  auto &job = *it;
99  if (job.get() == observer.job().get()) {
100  impl_->jobs_.erase(it);
101  break;
102  }
103  }
104 }
auto & job() const
Definition: Observer.h:36

◆ stopObservation() [2/2]

void knowrob::ObserverManager::stopObservation ( const Observer observer)

Stop observing a query.

Parameters
observerthe observer to stop.

◆ synchronize() [1/2]

void ObserverManager::synchronize ( )

Block until all observers have been synchronized.

Definition at line 84 of file ObserverManager.cpp.

84  {
85  {
86  std::lock_guard<std::mutex> lock(impl_->queueMutex_);
87  if (impl_->queue_.empty()) return;
88  }
89  {
90  std::unique_lock<std::mutex> lock(impl_->syncMutex_);
91  impl_->syncCondition_.wait(lock, [this] { return impl_->queue_.empty(); });
92  }
93 }

◆ synchronize() [2/2]

void knowrob::ObserverManager::synchronize ( )

Block until all observers have been synchronized.

Member Data Documentation

◆ backend_

QueryableBackendPtr knowrob::ObserverManager::backend_
protected

Definition at line 74 of file ObserverManager.h.

◆ impl_

std::unique_ptr< Impl > knowrob::ObserverManager::impl_
protected

Definition at line 73 of file ObserverManager.h.


The documentation for this class was generated from the following files: