knowrob  2.1.0
A Knowledge Base System for Cognition-enabled Robots
knowrob::TokenStream Class Reference (abstract)

#include <TokenStream.h>

Classes

class  Channel
 

Public Member Functions

 TokenStream ()
 
virtual ~TokenStream ()
 
 TokenStream (const TokenStream &)=delete
 
virtual void close ()
 
bool isOpened () const
 

Protected Member Functions

virtual void push (Channel &channel, const TokenPtr &tok)
 
virtual void push (const TokenPtr &tok)=0
 

Protected Attributes

std::list< std::shared_ptr< Channel > > channels_
 
std::atomic< bool > isOpened_
 
std::mutex channel_mutex_
 

Detailed Description

A stream of query results. The only way to write to a stream is by creating a channel.

Definition at line 22 of file TokenStream.h.
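
The write path described above can be illustrated with a small self-contained sketch. It is not KnowRob code: `MiniStream`, `MiniChannel`, `Collector`, `createChannel`, and the `int` token type are hypothetical stand-ins, chosen only to show a stream whose `push` is protected and reachable solely through a channel.

```cpp
#include <cassert>
#include <memory>
#include <vector>

// Hypothetical stand-in for a token stream; not the KnowRob class.
class MiniStream {
public:
    // The only public way to write: a channel bound to this stream.
    class MiniChannel {
    public:
        explicit MiniChannel(MiniStream &stream) : stream_(stream) {}
        // A nested class may call the protected push of its enclosing class.
        void push(int tok) { stream_.push(tok); }
    private:
        MiniStream &stream_;
    };

    virtual ~MiniStream() = default;

    std::shared_ptr<MiniChannel> createChannel() {
        return std::make_shared<MiniChannel>(*this);
    }

protected:
    // Subclasses decide what happens with a token.
    virtual void push(int tok) = 0;
};

// A concrete sink that stores every token it receives.
class Collector : public MiniStream {
public:
    const std::vector<int> &tokens() const { return tokens_; }
protected:
    void push(int tok) override { tokens_.push_back(tok); }
private:
    std::vector<int> tokens_;
};

std::vector<int> collectViaChannel() {
    Collector sink;
    auto channel = sink.createChannel();
    channel->push(1);
    channel->push(2);
    // sink.push(3); // would not compile: push is protected
    assert(sink.tokens().size() == 2);
    return sink.tokens();
}
```

Calling `collectViaChannel()` returns the two tokens in push order. Keeping `push` protected means producers cannot bypass the channel bookkeeping.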

Constructor & Destructor Documentation

◆ TokenStream() [1/4]

TokenStream::TokenStream ( )

Definition at line 17 of file TokenStream.cpp.

TokenStream::TokenStream()
        : isOpened_(true) {}

◆ ~TokenStream() [1/2]

TokenStream::~TokenStream ( )
virtual

Definition at line 20 of file TokenStream.cpp.

TokenStream::~TokenStream() {
    close();
}

◆ TokenStream() [2/4]

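knowrob::TokenStream::TokenStream ( const TokenStream & )
delete

The copy constructor is deleted, so a TokenStream cannot be copy-constructed. Copy assignment is unavailable as well, because the std::mutex and std::atomic members are not copy-assignable.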
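
A deleted copy constructor can be verified at compile time. The sketch below uses a hypothetical `NonCopyableStream` stand-in (not the KnowRob class) together with the standard `<type_traits>` header, and shows the usual alternative to copying: sharing one instance through `std::shared_ptr`.

```cpp
#include <cassert>
#include <memory>
#include <type_traits>

// Hypothetical stand-in with a deleted copy constructor.
class NonCopyableStream {
public:
    NonCopyableStream() = default;
    NonCopyableStream(const NonCopyableStream &) = delete;
    bool isOpened() const { return opened_; }
private:
    bool opened_ = true;
};

// Fails to compile if the type ever becomes copy-constructible.
static_assert(!std::is_copy_constructible<NonCopyableStream>::value,
              "copy construction is disabled");

// Share one instance instead of copying it.
bool sharedHandlesSeeSameStream() {
    auto a = std::make_shared<NonCopyableStream>();
    std::shared_ptr<NonCopyableStream> b = a; // copies the pointer, not the stream
    assert(a.use_count() == 2);
    return a.get() == b.get() && b->isOpened();
}
```

Both handles refer to the same object, so state such as the open flag stays consistent for every holder.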

Member Function Documentation

◆ close() [1/2]

void TokenStream::close ( )
virtual

Close the stream. Closing pushes an EOS (end-of-stream) message; any later attempt to push a non-EOS message is ignored with a warning. A closed stream cannot be reopened. A stream also closes automatically once it has received EOS messages from all of its input channels.

Reimplemented in knowrob::QueryStage::Pusher and knowrob::QueryStage.

Definition at line 24 of file TokenStream.cpp.

void TokenStream::close() {
    std::list<std::shared_ptr<Channel>> channels;
    {
        std::lock_guard<std::mutex> lock(channel_mutex_);
        // copy the channel list so that it cannot be modified
        // while the channels are being closed
        channels = channels_;
    }
    // allow the channels to generate EOS messages for clean shutdown
    for (auto &channel: channels) {
        channel->close();
    }
    {
        // finally mark the stream as closed
        std::lock_guard<std::mutex> lock(channel_mutex_);
        channels_.clear();
        isOpened_ = false;
    }
}
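
The listing copies the channel list under the mutex and closes the channels only after releasing it. A plausible reason, assuming that closing a channel re-enters the stream and takes the same non-recursive mutex, is to avoid a self-deadlock. The sketch below reproduces that snapshot-then-call pattern with a hypothetical `CallbackRegistry` (not KnowRob code) whose callbacks call back into the registry.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <list>
#include <mutex>
#include <utility>

// Hypothetical registry; callbacks may re-enter it while closeAll() runs.
class CallbackRegistry {
public:
    using Callback = std::function<void()>;

    void add(Callback cb) {
        std::lock_guard<std::mutex> lock(mutex_);
        callbacks_.push_back(std::move(cb));
    }

    std::size_t size() {
        std::lock_guard<std::mutex> lock(mutex_);
        return callbacks_.size();
    }

    // Snapshot under the lock, invoke without it, then clear under the lock.
    void closeAll() {
        std::list<Callback> snapshot;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            snapshot = callbacks_;
        }
        for (auto &cb : snapshot) {
            cb(); // the mutex is free here, so re-entrant calls do not block
        }
        std::lock_guard<std::mutex> lock(mutex_);
        callbacks_.clear();
    }

private:
    std::mutex mutex_;
    std::list<Callback> callbacks_;
};

// Returns how many callbacks successfully re-entered the registry.
int runCloseAll() {
    CallbackRegistry registry;
    int reentrantCalls = 0;
    for (int i = 0; i < 3; ++i) {
        registry.add([&registry, &reentrantCalls] {
            // size() locks the same mutex that closeAll() uses.
            if (registry.size() == 3) ++reentrantCalls;
        });
    }
    registry.closeAll();
    assert(registry.size() == 0);
    return reentrantCalls;
}
```

Had `closeAll()` held the lock while invoking the callbacks, the first call to `size()` would try to lock an already-held `std::mutex`, which is undefined behavior and in practice usually a deadlock.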

◆ isOpened() [1/2]

bool TokenStream::isOpened ( ) const
Returns
true if the stream is open, false once it has been closed.

Definition at line 44 of file TokenStream.cpp.

bool TokenStream::isOpened() const {
    return isOpened_;
}

◆ push() [1/4]

void TokenStream::push ( Channel & channel,
const TokenPtr & tok
)
protected virtual

Reimplemented in knowrob::ConjunctiveBroadcaster.

Definition at line 48 of file TokenStream.cpp.

void TokenStream::push(Channel &channel, const TokenPtr &tok) {
    if (tok->indicatesEndOfEvaluation()) {
        bool doPushMsg = false;
        {
            // prevent channels from being created or closed while processing EOS message
            std::lock_guard<std::mutex> lock(channel_mutex_);
            if (isOpened()) {
                if (channel.hasValidIterator()) {
                    // close this stream if no channels are left
                    channels_.erase(channel.iterator_);
                    channel.invalidateIterator();
                    doPushMsg = channels_.empty();
                    isOpened_ = !doPushMsg;
                } else {
                    KB_WARN("ignoring attempt to write to a channel with a singular iterator.");
                }
            }
        }
        // send EOS on this stream if no channels are left
        if (doPushMsg) {
            push(tok);
        }
    } else if (!isOpened()) {
        KB_WARN("ignoring attempt to write to a closed stream.");
    } else {
        push(tok);
    }
}
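
The listing forwards an EOS token only when the last registered channel has sent one, and drops ordinary tokens once the stream is closed. The following self-contained sketch mirrors that bookkeeping under simplifying assumptions: `FanIn`, `openChannel`, the integer channel ids, and the `kEndOfStream` sentinel are hypothetical and not part of KnowRob, and the output vector is only safe for single-threaded use.

```cpp
#include <cassert>
#include <mutex>
#include <set>
#include <vector>

// Hypothetical sentinel that plays the role of an EOS token.
constexpr int kEndOfStream = -1;

// Hypothetical fan-in stream; tokens are plain ints.
class FanIn {
public:
    int openChannel() {
        std::lock_guard<std::mutex> lock(mutex_);
        int id = nextId_++;
        channels_.insert(id);
        return id;
    }

    void push(int channelId, int tok) {
        if (tok == kEndOfStream) {
            bool forward = false;
            {
                // Unregister the channel; close if it was the last one.
                std::lock_guard<std::mutex> lock(mutex_);
                if (opened_ && channels_.erase(channelId) > 0) {
                    forward = channels_.empty();
                    opened_ = !forward;
                }
            }
            if (forward) output_.push_back(kEndOfStream);
        } else if (isOpened()) {
            output_.push_back(tok);
        }
        // Ordinary tokens pushed after closing are dropped.
    }

    bool isOpened() {
        std::lock_guard<std::mutex> lock(mutex_);
        return opened_;
    }

    const std::vector<int> &output() const { return output_; }

private:
    std::mutex mutex_;
    std::set<int> channels_;
    std::vector<int> output_;
    int nextId_ = 0;
    bool opened_ = true;
};

std::vector<int> runFanIn() {
    FanIn stream;
    int a = stream.openChannel();
    int b = stream.openChannel();
    stream.push(a, 10);
    stream.push(a, kEndOfStream); // channel b is still open: nothing forwarded
    stream.push(b, 20);
    stream.push(b, kEndOfStream); // last channel: EOS forwarded, stream closes
    stream.push(b, 30);           // dropped, the stream is closed
    assert(!stream.isOpened());
    return stream.output();
}
```

`runFanIn()` yields the two ordinary tokens followed by a single end-of-stream marker, so a consumer sees exactly one EOS regardless of how many channels fed the stream.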

◆ push() [2/4]

virtual void knowrob::TokenStream::push ( Channel channel,
const TokenPtr tok 
)
protectedvirtual

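◆ push() [3/4]

virtual void knowrob::TokenStream::push ( const TokenPtr & tok )
protected pure virtual

Implemented by subclasses. The channel overload calls this function to forward a token after its open and EOS checks have passed.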

Member Data Documentation

◆ channel_mutex_

std::mutex knowrob::TokenStream::channel_mutex_
protected

Definition at line 120 of file TokenStream.h.

◆ channels_

std::list< std::shared_ptr< Channel > > knowrob::TokenStream::channels_
protected

Definition at line 118 of file TokenStream.h.

◆ isOpened_

std::atomic< bool > knowrob::TokenStream::isOpened_
protected

Definition at line 119 of file TokenStream.h.


The documentation for this class was generated from TokenStream.h and TokenStream.cpp.