knowrob  2.1.0
A Knowledge Base System for Cognition-enabled Robots
TokenStream.cpp
Go to the documentation of this file.
1 /*
2  * This file is part of KnowRob, please consult
3  * https://github.com/knowrob/knowrob for license details.
4  */
5 
6 #include <mutex>
7 #include <knowrob/Logger.h>
8 #include <knowrob/queries/TokenStream.h>
9 #include <knowrob/queries/QueryError.h>
10 #include "knowrob/integration/python/utils.h"
11 #include "knowrob/queries/TokenQueue.h"
12 #include "knowrob/queries/TokenBroadcaster.h"
13 #include "knowrob/queries/TokenBuffer.h"
14 
15 using namespace knowrob;
16 
18  : isOpened_(true) {}
19 
21  close();
22 }
23 
25  std::list<std::shared_ptr<Channel>> channels;
26  {
27  std::lock_guard<std::mutex> lock(channel_mutex_);
28  // copy the channels list so that it cannot be modified
29  // while the channels are being closed
30  channels = channels_;
31  }
32  // allow the channels to generate EOS messages for clean shutdown
33  for (auto &channel: channels) {
34  channel->close();
35  }
36  {
37  // Finally mark the stream as closed
38  std::lock_guard<std::mutex> lock(channel_mutex_);
39  channels_.clear();
40  isOpened_ = false;
41  }
42 }
43 
44 bool TokenStream::isOpened() const {
45  return isOpened_;
46 }
47 
48 void TokenStream::push(Channel &channel, const TokenPtr &tok) {
49  if (tok->indicatesEndOfEvaluation()) {
50  bool doPushMsg = false;
51  {
52  // prevent channels from being created or closed while processing EOS message
53  std::lock_guard<std::mutex> lock(channel_mutex_);
54  if (isOpened()) {
55  if (channel.hasValidIterator()) {
56  // close this stream if no channels are left
57  channels_.erase(channel.iterator_);
58  channel.invalidateIterator();
59  doPushMsg = channels_.empty();
60  isOpened_ = !doPushMsg;
61  } else {
62  KB_WARN("ignoring attempt to write to a channel with a singular iterator.");
63  }
64  }
65  }
66  // send EOS on this stream if no channels are left
67  if (doPushMsg) {
68  push(tok);
69  }
70  } else if (!isOpened()) {
71  KB_WARN("ignoring attempt to write to a closed stream.");
72  } else {
73  push(tok);
74  }
75 }
76 
77 
78 TokenStream::Channel::Channel(const std::shared_ptr<TokenStream> &stream)
79  : stream_(stream),
80  isOpened_(true),
81  hasValidIterator_(true) {
82 }
83 
85  close();
86 }
87 
88 std::shared_ptr<TokenStream::Channel> TokenStream::Channel::create(
89  const std::shared_ptr<TokenStream> &stream) {
90  // prevent the stream from being closed and to modify the channels list,
91  // as we are about to add a new channel to the list.
92  std::lock_guard<std::mutex> lock1(stream->channel_mutex_);
93  if (stream->isOpened()) {
94  auto channel = std::make_shared<TokenStream::Channel>(stream);
95  stream->channels_.push_back(channel);
96  channel->iterator_ = stream->channels_.end();
97  --channel->iterator_;
98  return channel;
99  } else {
100  throw QueryError("cannot create a channel of a closed stream");
101  }
102 }
103 
105  // prevent channels from being closed while other channel operations are in progress.
106  // also avoid close being called multiple times at the same time.
107  std::lock_guard<std::shared_mutex> lock(mutex_);
108  if (isOpened()) {
109  isOpened_ = false;
110  if (stream_->isOpened()) {
111  stream_->push(*this, EndOfEvaluation::get());
112  stream_ = {};
113  }
114  }
115 }
116 
117 uint32_t TokenStream::Channel::id() const {
118  return reinterpret_cast<std::uintptr_t>(this);
119 }
120 
122  // prevent channels from being closed while push operations are in progress
123  // note: this is a shared lock, i.e., multiple push operations can be performed in parallel.
124  std::shared_lock<std::shared_mutex> lock(mutex_);
125  if (isOpened()) {
126  stream_->push(*this, tok);
127  if (tok->indicatesEndOfEvaluation()) {
128  isOpened_ = false;
129  stream_ = {};
130  }
131  } else if (!tok->indicatesEndOfEvaluation()) {
132  KB_WARN("message pushed to closed stream {}", reinterpret_cast<std::uintptr_t>(this));
133  }
134 }
135 
137  return isOpened_;
138 }
139 
140 namespace knowrob::py {
141  template<>
143  using namespace boost::python;
145  class_<TokenStream, std::shared_ptr<TokenStream>, boost::noncopyable>
146  ("TokenStream", no_init)
147  .def("isOpened", &TokenStream::isOpened);
148  class_<TokenStream::Channel, std::shared_ptr<TokenStream::Channel>, boost::noncopyable>
149  ("TokenChannel", no_init)
150  .def("create", &TokenStream::Channel::create).staticmethod("create")
151  .def("push", with<no_gil>(&TokenStream::Channel::push))
152  .def("close", with<no_gil>(&TokenStream::Channel::close))
153  .def("isOpened", &TokenStream::Channel::isOpened)
154  .def("id", &TokenStream::Channel::id);
158  }
159 }
#define KB_WARN
Definition: Logger.h:27
static std::shared_ptr< Channel > create(const std::shared_ptr< TokenStream > &stream)
Definition: TokenStream.cpp:88
std::list< std::shared_ptr< Channel > >::iterator iterator_
Definition: TokenStream.h:108
Channel(const std::shared_ptr< TokenStream > &stream)
Definition: TokenStream.cpp:78
void push(const TokenPtr &tok)
std::atomic< bool > isOpened_
Definition: TokenStream.h:119
std::mutex channel_mutex_
Definition: TokenStream.h:120
virtual void close()
Definition: TokenStream.cpp:24
virtual void push(Channel &channel, const TokenPtr &tok)
Definition: TokenStream.cpp:48
bool isOpened() const
Definition: TokenStream.cpp:44
std::list< std::shared_ptr< Channel > > channels_
Definition: TokenStream.h:118
void createType< TokenBroadcaster >()
void createType< Token >()
Definition: Token.cpp:42
void createType< TokenQueue >()
Definition: TokenQueue.cpp:51
void createType< TokenStream >()
void createType< TokenBuffer >()
Definition: TokenBuffer.cpp:59
std::shared_ptr< const Token > TokenPtr
Definition: Token.h:74