knowrob  2.1.0
A Knowledge Base System for Cognition-enabled Robots
TokenBuffer.cpp
Go to the documentation of this file.
1 /*
2  * This file is part of KnowRob, please consult
3  * https://github.com/knowrob/knowrob for license details.
4  */
5 
6 #include "knowrob/queries/TokenBuffer.h"
7 #include "knowrob/queries/TokenQueue.h"
8 #include "knowrob/Logger.h"
9 #include "knowrob/integration/python/utils.h"
10 #include "knowrob/queries/AnswerDontKnow.h"
11 #include <gtest/gtest.h>
12 
13 using namespace knowrob;
14 
16  : TokenBroadcaster(), isBuffering_(true) {}
17 
// Body of the buffer-draining routine; its signature line is not part of this listing.
// NOTE(review): presumably TokenBuffer::stopBuffering(), which createQueue() calls -- confirm.
// Pops buffered tokens one at a time until buffer_ is empty, then clears isBuffering_.
// Buffering already switched off: return without touching the buffer.
19  if (!isBuffering_) return;
20  TokenPtr next;
21 
22  while (true) {
23  {
24  // acquire lock, pop first element and lift the lock before pushing to the broadcaster.
25  // this is done such that other threads may still push to the buffer while we are broadcasting.
26  std::lock_guard<std::mutex> lock(bufferMutex_);
27  if (buffer_.empty()) {
// The flag is cleared while bufferMutex_ is still held; push() tests it under the same mutex.
28  isBuffering_ = false;
29  break;
30  } else {
31  next = buffer_.front();
32  buffer_.pop_front();
33  }
34  }
// NOTE(review): listing line 35 is missing here; going by the comment above, it presumably
// forwards `next` to the broadcaster after the lock has been released -- confirm against the repository.
36  }
37 }
38 
// Creates a fresh TokenQueue, stops buffering on this TokenBuffer, and returns the queue.
// @return shared pointer to the newly created TokenQueue.
39 std::shared_ptr<TokenQueue> TokenBuffer::createQueue() {
40  auto queue = std::make_shared<TokenQueue>();
// NOTE(review): listing line 41 is missing from this excerpt. Presumably it registers `queue`
// as a subscriber of this buffer (addSubscriber and Channel::create appear in this file's
// cross-references) -- confirm against the repository.
// Stop buffering only after the queue has been set up.
42  stopBuffering();
43  return queue;
44 }
45 
// Accepts a token: while isBuffering_ is set, the token is appended to buffer_ and the call returns.
// @param tok the token to buffer or pass on.
46 void TokenBuffer::push(const TokenPtr &tok) {
47  {
// The flag check and the append happen under bufferMutex_, the same mutex that guards
// the pop in the draining loop.
48  std::lock_guard<std::mutex> lock(bufferMutex_);
49  if (isBuffering_) {
50  buffer_.push_back(tok);
51  return;
52  }
53  }
// NOTE(review): listing line 54 is missing from this excerpt. Presumably it forwards `tok`
// to the base-class push once buffering is off, after the lock scope has ended -- confirm.
55 }
56 
// Boost.Python binding for TokenBuffer.
57 namespace knowrob::py {
58  template<>
// NOTE(review): listing line 59 is missing from this excerpt; the cross-references name it as
// `void createType< TokenBuffer >()`, i.e. the header of the specialization whose body follows.
60  using namespace boost::python;
// Registers the class under the Python name "TokenBuffer": held by std::shared_ptr,
// derived from TokenBroadcaster, non-copyable, with a no-argument constructor.
61  class_<TokenBuffer, std::shared_ptr<TokenBuffer>, bases<TokenBroadcaster>, boost::noncopyable>
62  ("TokenBuffer", init<>())
// stopBuffering is wrapped in with<no_gil>; presumably the wrapper releases the Python GIL
// for the duration of the call -- confirm in knowrob/integration/python/utils.h.
63  .def("stopBuffering", with<no_gil>(&TokenBuffer::stopBuffering))
64  .def("createQueue", &TokenBuffer::createQueue);
65  }
66 }
67 
68 TEST(TokenBuffer, twoTokens) {
69  auto out = std::make_shared<TokenBuffer>();
70  auto channel = TokenStream::Channel::create(out);
71  auto dontKnow = std::make_shared<AnswerDontKnow>();
72  channel->push(dontKnow);
73  channel->push(EndOfEvaluation::get());
74  auto queue = out->createQueue();
75  std::vector<TokenPtr> tokens;
76  while (!queue->empty()) {
77  auto tok = queue->pop_front();
78  tokens.push_back(tok);
79  }
80  ASSERT_EQ(tokens.size(), 2);
81  ASSERT_EQ(tokens[0], dontKnow);
82  ASSERT_EQ(tokens[1], EndOfEvaluation::get());
83 }
TEST(TokenBuffer, twoTokens)
Definition: TokenBuffer.cpp:68
void addSubscriber(const std::shared_ptr< Channel > &subscriber)
void push(const TokenPtr &tok) override
void push(const TokenPtr &tok) override
Definition: TokenBuffer.cpp:46
std::mutex bufferMutex_
Definition: TokenBuffer.h:35
std::list< TokenPtr > buffer_
Definition: TokenBuffer.h:37
std::atomic< bool > isBuffering_
Definition: TokenBuffer.h:36
std::shared_ptr< TokenQueue > createQueue()
Definition: TokenBuffer.cpp:39
static std::shared_ptr< Channel > create(const std::shared_ptr< TokenStream > &stream)
Definition: TokenStream.cpp:88
void createType< TokenBuffer >()
Definition: TokenBuffer.cpp:59
std::shared_ptr< const Token > TokenPtr
Definition: Token.h:74