knowrob  2.1.0
A Knowledge Base System for Cognition-enabled Robots
TokenBroadcaster.cpp
Go to the documentation of this file.
1 /*
2  * This file is part of KnowRob, please consult
3  * https://github.com/knowrob/knowrob for license details.
4  */
5 
6 #include <knowrob/queries/TokenBroadcaster.h>
7 #include "knowrob/Logger.h"
8 #include "knowrob/integration/python/utils.h"
9 
10 using namespace knowrob;
11 
13  : TokenStream() {}
14 
16  if (isOpened()) {
17  for (auto &x: subscribers_) {
18  x->push(EndOfEvaluation::get());
19  }
20  }
21 }
22 
23 void TokenBroadcaster::addSubscriber(const std::shared_ptr<Channel> &subscriber) {
24  std::lock_guard<std::mutex> lock(mtx_);
25  subscribers_.push_back(subscriber);
26 }
27 
28 void TokenBroadcaster::removeSubscriber(const std::shared_ptr<Channel> &subscriber) {
29  std::lock_guard<std::mutex> lock(mtx_);
30  subscribers_.remove(subscriber);
31 }
32 
33 void TokenBroadcaster::push(const TokenPtr &tok) {
34  pushToBroadcast(tok);
35 }
36 
38  // broadcast the query result to all subscribers.
39  // for now only allow one broadcast at a time: if multiple
40  // broadcasts are performed, there are all sorts of concurrency problems
41  // for stages in query pipelines, so we better avoid it for now.
42  // Also protect the list of subscribers with a mutex.
43  //std::lock_guard<std::shared_mutex> lock(mtx_);
44  std::lock_guard<std::mutex> lock(mtx_);
45  for (auto &x: subscribers_) {
46  x->push(tok);
47  }
48 }
49 
50 namespace knowrob {
51  void operator>>(const std::shared_ptr<TokenBroadcaster> &a,
52  const std::shared_ptr<TokenStream> &b) {
53  a->addSubscriber(TokenStream::Channel::create(b));
54  }
55 }
56 
57 namespace knowrob::py {
58  template<>
60  using namespace boost::python;
61  class_<TokenBroadcaster, std::shared_ptr<TokenBroadcaster>, bases<TokenStream>, boost::noncopyable>
62  ("TokenBroadcaster", init<>())
63  .def("addSubscriber", &TokenBroadcaster::addSubscriber)
64  .def("removeSubscriber", &TokenBroadcaster::removeSubscriber);
65  }
66 }
void removeSubscriber(const std::shared_ptr< Channel > &subscriber)
virtual void pushToBroadcast(const TokenPtr &tok)
std::list< std::shared_ptr< Channel > > subscribers_
void addSubscriber(const std::shared_ptr< Channel > &subscriber)
void push(const TokenPtr &tok) override
static std::shared_ptr< Channel > create(const std::shared_ptr< TokenStream > &stream)
Definition: TokenStream.cpp:88
bool isOpened() const
Definition: TokenStream.cpp:44
void createType< TokenBroadcaster >()
std::shared_ptr< const Token > TokenPtr
Definition: Token.h:74
void operator>>(const std::shared_ptr< TokenBroadcaster > &a, const std::shared_ptr< TokenStream > &b)