10 #include "knowrob/storage/ObserverManager.h"
11 #include "knowrob/storage/ObserverJob.h"
// Private state of ObserverManager; the manager creates it with
// std::make_unique<Impl>() and reaches it through impl_.
// NOTE(review): this listing omits some lines of the struct. thread_, jobMutex_
// and Mode are used by the member functions, but their declarations are not
// visible here.
15 struct ObserverManager::Impl {
// Condition variable / mutex pair that is waited on until queue_ is empty;
// the processing loop calls notify_all() on it when it finds the queue empty.
27 std::condition_variable syncCondition_;
28 std::mutex syncMutex_;
// Pending updates. Each entry pairs a Mode (INSERT or REMOVE are the values
// pushed by the code that fills the queue) with the affected triple container.
30 std::queue<std::pair<Mode,TripleContainerPtr>> queue_;
// Locked around pushes to queue_ and around the emptiness checks on it.
31 std::mutex queueMutex_;
// Notified after an update is pushed and after running_ is cleared; the
// processing loop waits on it.
32 std::condition_variable queueCondition_;
// Registered observer jobs; every visible access to it follows a lock of jobMutex_.
34 std::vector<std::shared_ptr<ObserverJob>> jobs_;
// Loop flag: starts out true and is set to false to end the processing loop.
37 std::atomic<bool> running_{
true};
// Member initializers: allocate the private Impl state and keep the backend
// handle that was passed in. (The constructor body is not visible in this listing.)
41 : impl_(std::make_unique<Impl>()), backend_(backend) {
// Join the thread stored in impl_->thread_, but only when it is joinable
// (calling join() on a non-joinable std::thread throws std::system_error).
// NOTE(review): the enclosing function header is not visible in this listing.
48 if (
impl_->thread_.joinable()) {
49 impl_->thread_.join();
// Shutdown sequence (function header not visible in this listing).
// Step 1: with the queue lock held, loop for as long as the update queue is
// non-empty. The loop body is not visible here; presumably it pops entries.
57 std::lock_guard<std::mutex> lock(
impl_->queueMutex_);
58 while (!
impl_->queue_.empty()) {
// Step 2: take the job lock. What is done with the jobs under this lock is
// not visible in this listing.
63 std::lock_guard<std::mutex> lock(
impl_->jobMutex_);
// Step 3: clear the flag the processing loop tests ...
66 impl_->running_ =
false;
// ... and wake one waiter on the queue condition so it can observe the
// cleared flag (the loop's wait predicate includes !running_).
68 impl_->queueCondition_.notify_one();
// Build a job from the query and callback; the job also receives a shared
// pointer to this manager obtained from shared_from_this().
// NOTE(review): if this is std::enable_shared_from_this, the manager must
// already be owned by a std::shared_ptr when this runs - confirm at call sites.
76 auto job = std::make_shared<ObserverJob>(shared_from_this(),
query, callback);
// Append the job to the job list while holding the job lock.
78 std::lock_guard<std::mutex> lock(
impl_->jobMutex_);
79 impl_->jobs_.push_back(job);
// Return a new Observer constructed from the job.
81 return std::make_shared<Observer>(job);
// Fast path: check the queue under the queue lock and return immediately if
// there are no pending updates. (Function header not visible in this listing.)
86 std::lock_guard<std::mutex> lock(
impl_->queueMutex_);
87 if (
impl_->queue_.empty())
return;
// Otherwise block on syncCondition_ until the predicate sees an empty queue.
// NOTE(review): the predicate reads queue_ while the wait holds syncMutex_,
// whereas the code that pushes to the queue locks queueMutex_. Confirm this
// cannot race with a concurrent push or miss the notify_all() from the
// processing loop.
90 std::unique_lock<std::mutex> lock(
impl_->syncMutex_);
91 impl_->syncCondition_.wait(lock, [
this] {
return impl_->queue_.empty(); });
// Walk the job list with an explicit iterator while holding the job lock.
96 std::lock_guard<std::mutex> lock(
impl_->jobMutex_);
97 for (
auto it =
impl_->jobs_.begin(); it !=
impl_->jobs_.end(); ++it) {
// `job` is introduced on a line missing from this listing (presumably the
// element at `it`). The match is by pointer identity with observer.job().
99 if (job.get() == observer.
job().get()) {
// Erase the matching entry from the job list.
// NOTE(review): erase() invalidates `it`, so the loop must exit right after
// this statement. That exit is not visible in this listing - confirm.
100 impl_->jobs_.erase(it);
// Queue the triples as an INSERT update while holding the queue lock.
108 std::lock_guard<std::mutex> lock(
impl_->queueMutex_);
109 impl_->queue_.push({Impl::Mode::INSERT, triples});
// Wake one thread waiting on the queue condition so the update gets processed.
111 impl_->queueCondition_.notify_one();
// Queue the triples as a REMOVE update while holding the queue lock.
116 std::lock_guard<std::mutex> lock(
impl_->queueMutex_);
117 impl_->queue_.push({Impl::Mode::REMOVE, triples});
// Wake one thread waiting on the queue condition so the update gets processed.
119 impl_->queueCondition_.notify_one();
// Processing loop: repeats for as long as the running flag is set.
// (Function header, closing braces and some statements are missing from
// this listing.)
123 while (
impl_->running_) {
// Holds the update taken from the queue for this iteration.
124 std::pair<Impl::Mode,TripleContainerPtr> next;
// With the queue lock held, sleep until either the running flag has been
// cleared or at least one update is queued.
126 std::unique_lock<std::mutex> lock(
impl_->queueMutex_);
127 if(
impl_->running_) {
128 impl_->queueCondition_.wait(lock, [
this] {
return !
impl_->running_ || !
impl_->queue_.empty(); });
// Re-check the flag after waking. The statement executed on this branch is
// not visible here; presumably it leaves the loop.
130 if (!
impl_->running_) {
// Copy the oldest queued update. The matching pop is not visible in this listing.
133 next =
impl_->queue_.front();
// Deliver the update to every registered job while holding the job lock:
// INSERT updates go to processInsertion().
136 std::lock_guard<std::mutex> lock(
impl_->jobMutex_);
137 if (next.first == Impl::Mode::INSERT) {
138 for (
auto &job:
impl_->jobs_) {
139 job->processInsertion(next.second);
// Second dispatch loop calling processRemoval(); presumably the else-branch
// of the INSERT test (the `else` itself is on a line missing from this listing).
142 for (
auto &job:
impl_->jobs_) {
143 job->processRemoval(next.second);
// Re-take the queue lock and, if no updates remain, wake every thread
// blocked on syncCondition_.
// NOTE(review): the notification is issued under queueMutex_ rather than
// the syncMutex_ that waiters hold - confirm a waiter cannot miss it.
148 std::unique_lock<std::mutex> lock(
impl_->queueMutex_);
150 if (
impl_->queue_.empty()) {
151 impl_->syncCondition_.notify_all();
ObserverPtr observe(const GraphQueryPtr &query, const BindingsHandler &callback)
void insert(const TripleContainerPtr &triples)
void stopObservation(const Observer &observer)
std::unique_ptr< Impl > impl_
QueryableBackendPtr backend_
void query(const GraphQueryPtr &query, const BindingsHandler &callback)
ObserverManager(const QueryableBackendPtr &backend)
void remove(const TripleContainerPtr &triples)
std::shared_ptr< TripleContainer > TripleContainerPtr
std::shared_ptr< Observer > ObserverPtr
std::function< void(const BindingsPtr &)> BindingsHandler
std::shared_ptr< GraphQuery > GraphQueryPtr
std::shared_ptr< QueryableStorage > QueryableBackendPtr