7 #include <knowrob/Logger.h>
8 #include <knowrob/queries/TokenStream.h>
9 #include <knowrob/queries/QueryError.h>
10 #include "knowrob/integration/python/utils.h"
11 #include "knowrob/queries/TokenQueue.h"
12 #include "knowrob/queries/TokenBroadcaster.h"
13 #include "knowrob/queries/TokenBuffer.h"
25 std::list<std::shared_ptr<Channel>> channels;
33 for (
auto &channel: channels) {
49 if (tok->indicatesEndOfEvaluation()) {
50 bool doPushMsg =
false;
62 KB_WARN(
"ignoring attempt to write to a channel with a singular iterator.");
71 KB_WARN(
"ignoring attempt to write to a closed stream.");
81 hasValidIterator_(true) {
89 const std::shared_ptr<TokenStream> &stream) {
92 std::lock_guard<std::mutex> lock1(stream->channel_mutex_);
93 if (stream->isOpened()) {
94 auto channel = std::make_shared<TokenStream::Channel>(stream);
95 stream->channels_.push_back(channel);
96 channel->iterator_ = stream->channels_.end();
100 throw QueryError(
"cannot create a channel of a closed stream");
107 std::lock_guard<std::shared_mutex> lock(mutex_);
110 if (stream_->isOpened()) {
118 return reinterpret_cast<std::uintptr_t
>(
this);
124 std::shared_lock<std::shared_mutex> lock(mutex_);
126 stream_->push(*
this, tok);
127 if (tok->indicatesEndOfEvaluation()) {
131 }
else if (!tok->indicatesEndOfEvaluation()) {
132 KB_WARN(
"message pushed to closed stream {}",
reinterpret_cast<std::uintptr_t
>(
this));
145 class_<TokenStream, std::shared_ptr<TokenStream>, boost::noncopyable>
146 (
"TokenStream", no_init)
148 class_<TokenStream::Channel, std::shared_ptr<TokenStream::Channel>, boost::noncopyable>
149 (
"TokenChannel", no_init)
static std::shared_ptr< Channel > create(const std::shared_ptr< TokenStream > &stream)
std::list< std::shared_ptr< Channel > >::iterator iterator_
void invalidateIterator()
bool hasValidIterator() const
Channel(const std::shared_ptr< TokenStream > &stream)
void push(const TokenPtr &tok)
std::atomic< bool > isOpened_
std::mutex channel_mutex_
virtual void push(Channel &channel, const TokenPtr &tok)
std::list< std::shared_ptr< Channel > > channels_
void createType< TokenBroadcaster >()
void createType< Token >()
void createType< TokenQueue >()
void createType< TokenStream >()
void createType< TokenBuffer >()
std::shared_ptr< const Token > TokenPtr