knowrob  2.1.0
A Knowledge Base System for Cognition-enabled Robots
knowrob::TokenBuffer Class Reference

#include <TokenBuffer.h>

Inheritance: knowrob::TokenBuffer derives from knowrob::TokenBroadcaster, which derives from knowrob::TokenStream (inheritance and collaboration diagrams not reproduced here).

Public Member Functions

 TokenBuffer ()
 
void stopBuffering ()
 
std::shared_ptr< TokenQueue > createQueue ()
 
- Public Member Functions inherited from knowrob::TokenBroadcaster
 TokenBroadcaster ()
 
 ~TokenBroadcaster () override
 
void addSubscriber (const std::shared_ptr< Channel > &subscriber)
 
void removeSubscriber (const std::shared_ptr< Channel > &subscriber)
 
- Public Member Functions inherited from knowrob::TokenStream
 TokenStream ()
 
virtual ~TokenStream ()
 
 TokenStream (const TokenStream &)=delete
 
virtual void close ()
 
bool isOpened () const
 

Protected Member Functions

void push (const TokenPtr &tok) override
 
- Protected Member Functions inherited from knowrob::TokenBroadcaster
virtual void pushToBroadcast (const TokenPtr &tok)
 
- Protected Member Functions inherited from knowrob::TokenStream
virtual void push (Channel &channel, const TokenPtr &tok)
 

Protected Attributes

std::mutex bufferMutex_
 
std::atomic< bool > isBuffering_
 
std::list< TokenPtr > buffer_
 
- Protected Attributes inherited from knowrob::TokenBroadcaster
std::list< std::shared_ptr< Channel > > subscribers_
 
std::mutex mtx_
 
- Protected Attributes inherited from knowrob::TokenStream
std::list< std::shared_ptr< Channel > > channels_
 
std::atomic< bool > isOpened_
 
std::mutex channel_mutex_
 

Detailed Description

A token stream that buffers all input until stopBuffering() is called. Its main purpose is to avoid losing messages while a pipeline is being built, so that the pipeline can be partially active during construction. TokenBuffer also provides an easy exit point for a pipeline: createQueue() returns a queue that receives the output tokens of the stream.

Definition at line 19 of file TokenBuffer.h.
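
The buffering behaviour described above can be illustrated with a small self-contained sketch. It does not use knowrob: SketchBuffer, the string tokens, the callback subscribers and demoLateSubscriber() are all hypothetical stand-ins chosen for illustration, and the sketch assumes that subscribers are registered before any concurrent use.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <list>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for knowrob::TokenBuffer: tokens are plain strings and
// subscribers are callbacks. None of these names belong to the knowrob API.
class SketchBuffer {
public:
    using Subscriber = std::function<void(const std::string &)>;

    // Assumption of this sketch: called before tokens are pushed concurrently.
    void addSubscriber(Subscriber subscriber) {
        subscribers_.push_back(std::move(subscriber));
    }

    // While buffering, store the token; afterwards forward it immediately.
    void push(const std::string &tok) {
        {
            std::lock_guard<std::mutex> lock(bufferMutex_);
            if (isBuffering_) {
                buffer_.push_back(tok);
                return;
            }
        }
        broadcast(tok);
    }

    // Forward buffered tokens in arrival order, then switch to pass-through.
    void stopBuffering() {
        if (!isBuffering_) return;
        while (true) {
            std::string next;
            {
                std::lock_guard<std::mutex> lock(bufferMutex_);
                if (buffer_.empty()) {
                    isBuffering_ = false;
                    break;
                }
                next = buffer_.front();
                buffer_.pop_front();
            }
            broadcast(next);
        }
    }

private:
    void broadcast(const std::string &tok) {
        for (auto &subscriber : subscribers_) subscriber(tok);
    }

    std::mutex bufferMutex_;
    std::atomic<bool> isBuffering_{true};
    std::list<std::string> buffer_;
    std::vector<Subscriber> subscribers_;
};

// Tokens pushed before a subscriber exists are kept and delivered later.
std::vector<std::string> demoLateSubscriber() {
    SketchBuffer buffer;
    std::vector<std::string> received;
    buffer.push("a");  // buffered: nobody is listening yet
    buffer.push("b");  // buffered
    buffer.addSubscriber([&received](const std::string &tok) { received.push_back(tok); });
    buffer.stopBuffering();  // flushes "a" and "b" to the subscriber
    buffer.push("c");        // forwarded directly
    return received;
}
```

Calling demoLateSubscriber() returns the tokens a, b, c in that order: nothing is lost although the subscriber was attached after the first two tokens had been pushed.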

Constructor & Destructor Documentation

◆ TokenBuffer()

knowrob::TokenBuffer::TokenBuffer ( )

Constructs a token buffer that starts in buffering mode (isBuffering_ is initialized to true).

Definition at line 15 of file TokenBuffer.cpp.

TokenBuffer::TokenBuffer()
        : TokenBroadcaster(), isBuffering_(true) {}

References:
std::atomic< bool > isBuffering_ (Definition: TokenBuffer.h:36)

Member Function Documentation

◆ createQueue()

std::shared_ptr< TokenQueue > knowrob::TokenBuffer::createQueue ( )

Create a queue that will receive all output tokens of this stream.

Returns
a queue of tokens.

Definition at line 39 of file TokenBuffer.cpp.

std::shared_ptr<TokenQueue> TokenBuffer::createQueue() {
    auto queue = std::make_shared<TokenQueue>();
    // Line 41 of TokenBuffer.cpp is missing from this listing. The references
    // below suggest that it subscribes the new queue through addSubscriber().
    stopBuffering();
    return queue;
}

References:
void addSubscriber(const std::shared_ptr< Channel > &subscriber)
static std::shared_ptr< Channel > create(const std::shared_ptr< TokenStream > &stream) (Definition: TokenStream.cpp:88)
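
To show how a queue can act as the exit point of a pipeline, here is a single-threaded sketch of the createQueue() idea. SketchQueueBuffer, SketchQueue, QueueDemoResult and demoCreateQueue() are hypothetical names, locking is left out for brevity, and the assumption that the queue is subscribed before the buffer is flushed is inferred from the listing rather than confirmed by it.

```cpp
#include <cassert>
#include <deque>
#include <list>
#include <memory>
#include <string>
#include <vector>

// Hypothetical queue type; knowrob's TokenQueue is not modelled here.
using SketchQueue = std::deque<std::string>;

// Hypothetical, single-threaded model of a buffer whose outputs are queues.
class SketchQueueBuffer {
public:
    void push(const std::string &tok) {
        if (isBuffering_) {
            buffer_.push_back(tok);
            return;
        }
        deliver(tok);
    }

    void stopBuffering() {
        if (!isBuffering_) return;
        while (!buffer_.empty()) {
            deliver(buffer_.front());
            buffer_.pop_front();
        }
        isBuffering_ = false;
    }

    // Assumed order: subscribe the queue first, then flush, so that the
    // tokens buffered so far end up in the new queue.
    std::shared_ptr<SketchQueue> createQueue() {
        auto queue = std::make_shared<SketchQueue>();
        queues_.push_back(queue);
        stopBuffering();
        return queue;
    }

private:
    void deliver(const std::string &tok) {
        for (auto &queue : queues_) queue->push_back(tok);
    }

    bool isBuffering_ = true;
    std::list<std::string> buffer_;
    std::vector<std::shared_ptr<SketchQueue>> queues_;
};

struct QueueDemoResult {
    std::vector<std::string> first;
    std::vector<std::string> second;
};

QueueDemoResult demoCreateQueue() {
    SketchQueueBuffer buffer;
    buffer.push("x");
    buffer.push("y");
    auto first = buffer.createQueue();   // receives the buffered "x" and "y"
    auto second = buffer.createQueue();  // buffering has stopped: starts empty
    buffer.push("z");                    // delivered to both queues
    QueueDemoResult result;
    result.first.assign(first->begin(), first->end());
    result.second.assign(second->begin(), second->end());
    return result;
}
```

In this model the first queue ends up with x, y, z and the second only with z, because the second queue was created after the buffered tokens had already been flushed. Whether knowrob behaves the same way for a second queue is not stated in this reference.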

◆ push()

void knowrob::TokenBuffer::push ( const TokenPtr & tok )
override protected virtual

Reimplemented from knowrob::TokenBroadcaster.

Definition at line 46 of file TokenBuffer.cpp.

void TokenBuffer::push(const TokenPtr &tok) {
    {
        std::lock_guard<std::mutex> lock(bufferMutex_);
        if (isBuffering_) {
            buffer_.push_back(tok);
            return;
        }
    }
    // Line 54 of TokenBuffer.cpp is missing from this listing; presumably it
    // forwards tok to the subscribers once buffering has stopped.
}

References:
void push(const TokenPtr &tok) override
std::mutex bufferMutex_ (Definition: TokenBuffer.h:35)
std::list< TokenPtr > buffer_ (Definition: TokenBuffer.h:37)

◆ stopBuffering()

void knowrob::TokenBuffer::stopBuffering ( )

Stop buffering and forward all buffered tokens to subscribers.

Definition at line 18 of file TokenBuffer.cpp.

void TokenBuffer::stopBuffering() {
    if (!isBuffering_) return;
    TokenPtr next;

    while (true) {
        {
            // acquire lock, pop first element and lift the lock before pushing to the broadcaster.
            // this is done such that other threads may still push to the buffer while we are broadcasting.
            std::lock_guard<std::mutex> lock(bufferMutex_);
            if (buffer_.empty()) {
                isBuffering_ = false;
                break;
            } else {
                next = buffer_.front();
                buffer_.pop_front();
            }
        }
        // Line 35 of TokenBuffer.cpp is missing from this listing; per the
        // comment above, this is where next is pushed to the broadcaster.
    }
}

References:
std::shared_ptr< const Token > TokenPtr (Definition: Token.h:74)
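
The comments in stopBuffering() explain why the lock is released before each token is forwarded. The following sketch isolates that pattern with integer tokens. drainUnlocked() and demoReentrantPush() are hypothetical helpers, not knowrob functions. To keep the example deterministic, the "other thread" is replaced by a consumer callback that pushes a new token while the drain is still running.

```cpp
#include <cassert>
#include <functional>
#include <list>
#include <mutex>
#include <vector>

// Hypothetical helper: drains `buffer`, holding `mtx` only while popping.
void drainUnlocked(std::mutex &mtx, std::list<int> &buffer,
                   const std::function<void(int)> &forward) {
    while (true) {
        int next = 0;
        {
            std::lock_guard<std::mutex> lock(mtx);
            if (buffer.empty()) break;
            next = buffer.front();
            buffer.pop_front();
        }  // the lock is released here, before the token is forwarded
        forward(next);
    }
}

std::vector<int> demoReentrantPush() {
    std::mutex mtx;
    std::list<int> buffer{1, 2};
    std::vector<int> seen;
    drainUnlocked(mtx, buffer, [&](int tok) {
        seen.push_back(tok);
        if (tok == 1) {
            // Locking here is safe only because the drain loop does not hold
            // the mutex while forwarding; otherwise this would deadlock.
            std::lock_guard<std::mutex> lock(mtx);
            buffer.push_back(3);
        }
    });
    return seen;
}
```

demoReentrantPush() returns 1, 2, 3: the token added during the drain is appended to the buffer and forwarded in the same pass, after the tokens that were already waiting.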

Member Data Documentation

◆ buffer_

std::list< TokenPtr > knowrob::TokenBuffer::buffer_
protected

Definition at line 37 of file TokenBuffer.h.

◆ bufferMutex_

std::mutex knowrob::TokenBuffer::bufferMutex_
protected

Definition at line 35 of file TokenBuffer.h.

◆ isBuffering_

std::atomic< bool > knowrob::TokenBuffer::isBuffering_
protected

Definition at line 36 of file TokenBuffer.h.


The documentation for this class was generated from the following files: