knowrob  2.1.0
A Knowledge Base System for Cognition-enabled Robots
knowrob::QueryPipeline Class Reference

#include <QueryPipeline.h>

Public Member Functions

 QueryPipeline (const std::shared_ptr< KnowledgeBase > &kb, const FormulaPtr &phi, const QueryContextPtr &ctx)
 
 QueryPipeline (const std::shared_ptr< KnowledgeBase > &kb, const ConjunctiveQueryPtr &conjunctiveQuery)
 
 ~QueryPipeline ()
 
void operator>> (const std::shared_ptr< TokenStream > &stage)
 
void stopBuffering ()
 
 QueryPipeline (const std::shared_ptr< KnowledgeBase > &kb, const FormulaPtr &phi, const QueryContextPtr &ctx)
 
 QueryPipeline (const std::shared_ptr< KnowledgeBase > &kb, const ConjunctiveQueryPtr &conjunctiveQuery)
 
 ~QueryPipeline ()
 
void operator>> (const std::shared_ptr< TokenStream > &stage)
 
void stopBuffering ()
 

Protected Member Functions

void addInitialStage (const std::shared_ptr< TokenStream > &stage)
 
void addInitialStage (const std::shared_ptr< TokenStream > &stage)
 

Static Protected Member Functions

static std::vector< ComputablePtrcreateComputationSequence (const std::shared_ptr< KnowledgeBase > &kb, const std::list< DependencyNodePtr > &dependencyGroup)
 
static void createComputationPipeline (const std::shared_ptr< KnowledgeBase > &kb, std::vector< ComputablePtr > &computableLiterals, const std::shared_ptr< TokenBroadcaster > &pipelineInput, const std::shared_ptr< TokenBroadcaster > &pipelineOutput, const QueryContextPtr &ctx)
 
static std::vector< ComputablePtrcreateComputationSequence (const std::shared_ptr< KnowledgeBase > &kb, const std::list< DependencyNodePtr > &dependencyGroup)
 
static void createComputationPipeline (const std::shared_ptr< KnowledgeBase > &kb, std::vector< ComputablePtr > &computableLiterals, const std::shared_ptr< TokenBroadcaster > &pipelineInput, const std::shared_ptr< TokenBroadcaster > &pipelineOutput, const QueryContextPtr &ctx)
 

Protected Attributes

std::vector< std::shared_ptr< TokenStream > > initialStages_
 
std::shared_ptr< TokenBroadcasterfinalStage_
 
std::shared_ptr< TokenBufferbufferStage_
 

Detailed Description

Holds a reference to pipeline stages during execution, and stops each stage on destruction ensuring that none of them continues broadcasting messages.

Definition at line 25 of file QueryPipeline.h.

Constructor & Destructor Documentation

◆ QueryPipeline() [1/4]

QueryPipeline::QueryPipeline ( const std::shared_ptr< KnowledgeBase > &  kb,
const FormulaPtr phi,
const QueryContextPtr ctx 
)

Create a query pipeline for the given formula.

Parameters
kbthe knowledge base to query.
phithe formula to query.
ctxthe query context.

Definition at line 50 of file QueryPipeline.cpp.

51  {
52  auto outStream = std::make_shared<TokenBuffer>();
53 
54  // decompose input formula into parts that are considered in disjunction,
55  // and thus can be evaluated in parallel.
56  QueryTree qt(phi);
57  for (auto &path: qt) {
58  // each node in a path is either a predicate, a negated predicate,
59  // a modal formula, or the negation of a modal formula.
60  // each of these formula types is handled separately below.
61  std::vector<FirstOrderLiteralPtr> posLiterals, negLiterals;
62  std::vector<std::shared_ptr<ModalFormula>> posModals, negModals;
63 
64  // split path into positive and negative literals and modals
65  for (auto &node: path.nodes()) {
66  switch (node->type()) {
68  auto lit = std::make_shared<FirstOrderLiteral>(
69  std::static_pointer_cast<Predicate>(node), false);
70  posLiterals.push_back(lit);
71  break;
72  }
73 
74  case FormulaType::MODAL:
75  posModals.push_back(std::static_pointer_cast<ModalFormula>(node));
76  break;
77 
78  case FormulaType::NEGATION: {
79  auto negation = (Negation *) node.get();
80  auto negated = negation->negatedFormula();
81  switch (negated->type()) {
83  auto lit = std::make_shared<FirstOrderLiteral>(
84  std::static_pointer_cast<Predicate>(negated), true);
85  negLiterals.push_back(lit);
86  break;
87  }
88  case FormulaType::MODAL:
89  negModals.push_back(std::static_pointer_cast<ModalFormula>(negated));
90  break;
91  default:
92  throw QueryError("Unexpected negated formula type {} in QueryTree.", (int) negated->type());
93  }
94  break;
95  }
96  default:
97  throw QueryError("Unexpected formula type {} in QueryTree.", (int) node->type());
98  }
99  }
100 
101  std::shared_ptr<TokenBroadcaster> lastStage;
102  std::shared_ptr<TokenBuffer> firstBuffer;
103 
104  // first evaluate positive literals if any.
105  // note that the first stage is buffered, so that the next stage can be added to the pipeline
106  // and only after stopping the buffering messages will be forwarded to the next stage.
107  if (posLiterals.empty()) {
108  // if there are none, we still need to indicate begin and end of stream for the rest of the pipeline.
109  // so we just push `GenericYes` (an empty substitution) followed by `EndOfEvaluation` and
110  // feed these messages to the next stage.
111  firstBuffer = std::make_shared<TokenBuffer>();
112  lastStage = firstBuffer;
113  auto channel = TokenStream::Channel::create(lastStage);
114  channel->push(GenericYes());
115  channel->push(EndOfEvaluation::get());
116  addInitialStage(lastStage);
117  } else {
118  auto pathQuery = std::make_shared<ConjunctiveQuery>(posLiterals, ctx);
119  auto subPipeline = std::make_shared<QueryPipeline>(kb, pathQuery);
120  firstBuffer = std::make_shared<AnswerBuffer_WithReference>(subPipeline);
121  *subPipeline >> firstBuffer;
122  subPipeline->stopBuffering();
123  lastStage = firstBuffer;
124  addInitialStage(lastStage);
125  }
126 
127  // --------------------------------------
128  // Evaluate all positive modals in sequence.
129  // --------------------------------------
130  for (auto &posModal: posModals) {
131  auto modalStage = std::make_shared<ModalStage>(kb, posModal, ctx);
132  modalStage->selfWeakRef_ = modalStage;
133  lastStage >> modalStage;
134  lastStage = modalStage;
135  }
136 
137  // --------------------------------------
138  // Evaluate all negative literals in parallel.
139  // --------------------------------------
140  if (!negLiterals.empty()) {
141  // run a dedicated stage where negated literals can be evaluated in parallel
142  auto negLiteralStage = std::make_shared<PredicateNegationStage>(
143  kb, ctx, negLiterals);
144  lastStage >> negLiteralStage;
145  lastStage = negLiteralStage;
146  }
147 
148  // --------------------------------------
149  // Evaluate all negative modals in parallel.
150  // --------------------------------------
151  if (!negModals.empty()) {
152  // run a dedicated stage where negated modals can be evaluated in parallel
153  auto negModalStage = std::make_shared<ModalNegationStage>(
154  kb, ctx, negModals);
155  lastStage >> negModalStage;
156  lastStage = negModalStage;
157  }
158 
159  lastStage >> outStream;
160  firstBuffer->stopBuffering();
161  }
162  // Note: At this point outStream could already contain solutions, but these are buffered
163  // such that they won't be lost during pipeline creation.
164 
165  // if there were multiple paths, consolidate answers from them.
166  // e.g. if one yields no and the other true, the no should be ignored.
167  if (qt.numPaths() > 1) {
168  auto consolidator = std::make_shared<DisjunctiveBroadcaster>();
169  outStream >> consolidator;
170  finalStage_ = consolidator;
171  } else {
172  finalStage_ = outStream;
173  }
174  bufferStage_ = outStream;
175 }
std::shared_ptr< TokenBroadcaster > finalStage_
Definition: QueryPipeline.h:57
std::shared_ptr< TokenBuffer > bufferStage_
Definition: QueryPipeline.h:58
void addInitialStage(const std::shared_ptr< TokenStream > &stage)
static std::shared_ptr< Channel > create(const std::shared_ptr< TokenStream > &stream)
Definition: TokenStream.cpp:88
const std::shared_ptr< const AnswerYes > & GenericYes()
Definition: AnswerYes.cpp:161

◆ QueryPipeline() [2/4]

QueryPipeline::QueryPipeline ( const std::shared_ptr< KnowledgeBase > &  kb,
const ConjunctiveQueryPtr conjunctiveQuery 
)

Create a query pipeline for the given conjunctive query.

Parameters
kbthe knowledge base to query.
conjunctiveQuerythe query to execute.

Definition at line 177 of file QueryPipeline.cpp.

177  {
178  auto &allLiterals = conjunctiveQuery->literals();
179 
180  // --------------------------------------
181  // split input literals into positive and negative literals.
182  // negative literals are evaluated in parallel after all positive literals.
183  // --------------------------------------
184  std::vector<FirstOrderLiteralPtr> positiveLiterals, negativeLiterals;
185  for (auto &l: allLiterals) {
186  if (l->isNegated()) negativeLiterals.push_back(l);
187  else positiveLiterals.push_back(l);
188  }
189 
190  // --------------------------------------
191  // split positive literals into edb-only and computable.
192  // also associate list of reasoner to computable literals.
193  // --------------------------------------
194  std::vector<TriplePatternPtr> edbOnlyLiterals;
195  std::vector<ComputablePtr> computableLiterals;
196  bool hasUnknownPredicate = false;
197  for (auto &l: positiveLiterals) {
198  auto indicator = RDFIndicator(l->predicate());
199 
200  // Find reasoner for the predicate.
201  std::vector<DefiningReasoner> l_reasoner;
202  if (indicator.functor) {
203  l_reasoner = kb->reasonerManager()->findDefiningReasoner(
204  PredicateIndicator(*indicator.functor, indicator.arity));
205  }
206 
207  if (l_reasoner.empty()) {
208  if (indicator.arity > 2) {
209  KB_WARN("Predicate {} is not defined by any reasoner.", *l->predicate());
210  hasUnknownPredicate = true;
211  } else if (indicator.functor && !isMaterializedInEDB(kb, *indicator.functor)) {
212  KB_WARN("Predicate {} is neither materialized in the EDB nor defined by a reasoner.", *l->predicate());
213  hasUnknownPredicate = true;
214  } else {
215  auto rdfLiteral = std::make_shared<TriplePattern>(
216  l->predicate(), l->isNegated());
217  rdfLiteral->setTripleFrame(conjunctiveQuery->ctx()->selector);
218  edbOnlyLiterals.push_back(rdfLiteral);
219  }
220  } else {
221  computableLiterals.push_back(std::make_shared<Computable>(*l, l_reasoner));
222  }
223  }
224 
225  if (hasUnknownPredicate) {
226  // generate a "don't know" message and return.
227  auto out = std::make_shared<TokenBuffer>();
228  auto channel = TokenStream::Channel::create(out);
229  auto dontKnow = std::make_shared<AnswerDontKnow>();
230  channel->push(dontKnow);
231  channel->push(EndOfEvaluation::get());
232  finalStage_ = out;
233  bufferStage_ = out;
234  return;
235  }
236 
237  // --------------------------------------
238  // sort positive literals.
239  // --------------------------------------
240  std::sort(edbOnlyLiterals.begin(), edbOnlyLiterals.end(), EDBComparator(kb->vocabulary()));
241 
242  // --------------------------------------
243  // run EDB query with all edb-only literals.
244  // --------------------------------------
245  std::shared_ptr<TokenBuffer> edbOut;
246  if (edbOnlyLiterals.empty()) {
247  edbOut = std::make_shared<TokenBuffer>();
248  auto channel = TokenStream::Channel::create(edbOut);
249  channel->push(GenericYes());
250  channel->push(EndOfEvaluation::get());
251  } else {
252  auto edb = kb->getBackendForQuery();
253  edbOut = kb->edb()->getAnswerCursor(edb,
254  std::make_shared<GraphPathQuery>(edbOnlyLiterals, conjunctiveQuery->ctx()));
255  }
256  addInitialStage(edbOut);
257 
258  // --------------------------------------
259  // handle positive IDB literals.
260  // --------------------------------------
261  std::shared_ptr<TokenBroadcaster> idbOut;
262  if (computableLiterals.empty()) {
263  idbOut = edbOut;
264  } else {
265  idbOut = std::make_shared<TokenBroadcaster>();
266  // --------------------------------------
267  // Compute dependency groups of computable literals.
268  // --------------------------------------
269  DependencyGraph dg;
270  dg.insert(computableLiterals.begin(), computableLiterals.end());
271 
272  // --------------------------------------
273  // Construct a pipeline for each dependency group.
274  // --------------------------------------
275  if (dg.numGroups() == 1) {
276  auto &literalGroup = *dg.begin();
277  auto sequence = createComputationSequence(kb, literalGroup.member_);
279  kb,
280  sequence,
281  edbOut,
282  idbOut,
283  conjunctiveQuery->ctx());
284  } else {
285  // there are multiple dependency groups. They can be evaluated in parallel.
286 
287  // combines sub-answers computed in different parallel steps
288  auto answerCombiner = std::make_shared<ConjunctiveBroadcaster>();
289  // create a parallel step for each dependency group
290  for (auto &literalGroup: dg) {
291  // --------------------------------------
292  // Construct a pipeline for each dependency group.
293  // --------------------------------------
294  auto sequence = createComputationSequence(kb, literalGroup.member_);
296  kb,
297  sequence,
298  edbOut,
299  answerCombiner,
300  conjunctiveQuery->ctx());
301  }
302  answerCombiner >> idbOut;
303  }
304  }
305 
306  // --------------------------------------
307  // Evaluate all negative literals in parallel.
308  // --------------------------------------
309  if (!negativeLiterals.empty()) {
310  // run a dedicated stage where negated literals can be evaluated in parallel
311  auto negStage = std::make_shared<PredicateNegationStage>(
312  kb, conjunctiveQuery->ctx(), negativeLiterals);
313  idbOut >> negStage;
314  finalStage_ = negStage;
315  } else {
316  finalStage_ = idbOut;
317  }
318  bufferStage_ = edbOut;
319 }
#define KB_WARN
Definition: Logger.h:27
void insert(const DependencyNodePtr &node)
static void createComputationPipeline(const std::shared_ptr< KnowledgeBase > &kb, std::vector< ComputablePtr > &computableLiterals, const std::shared_ptr< TokenBroadcaster > &pipelineInput, const std::shared_ptr< TokenBroadcaster > &pipelineOutput, const QueryContextPtr &ctx)
static std::vector< ComputablePtr > createComputationSequence(const std::shared_ptr< KnowledgeBase > &kb, const std::list< DependencyNodePtr > &dependencyGroup)

◆ ~QueryPipeline() [1/2]

QueryPipeline::~QueryPipeline ( )

Definition at line 321 of file QueryPipeline.cpp.

321  {
322  for (auto &stage: initialStages_) {
323  stage->close();
324  }
325  initialStages_.clear();
326 }
std::vector< std::shared_ptr< TokenStream > > initialStages_
Definition: QueryPipeline.h:56

◆ QueryPipeline() [3/4]

knowrob::QueryPipeline::QueryPipeline ( const std::shared_ptr< KnowledgeBase > &  kb,
const FormulaPtr phi,
const QueryContextPtr ctx 
)

Create a query pipeline for the given formula.

Parameters
kbthe knowledge base to query.
phithe formula to query.
ctxthe query context.

◆ QueryPipeline() [4/4]

knowrob::QueryPipeline::QueryPipeline ( const std::shared_ptr< KnowledgeBase > &  kb,
const ConjunctiveQueryPtr conjunctiveQuery 
)

Create a query pipeline for the given conjunctive query.

Parameters
kbthe knowledge base to query.
conjunctiveQuerythe query to execute.

◆ ~QueryPipeline() [2/2]

knowrob::QueryPipeline::~QueryPipeline ( )

Member Function Documentation

◆ addInitialStage() [1/2]

void QueryPipeline::addInitialStage ( const std::shared_ptr< TokenStream > &  stage)
protected

Definition at line 328 of file QueryPipeline.cpp.

328  {
329  initialStages_.push_back(stage);
330 }

◆ addInitialStage() [2/2]

void knowrob::QueryPipeline::addInitialStage ( const std::shared_ptr< TokenStream > &  stage)
protected

◆ createComputationPipeline() [1/2]

void QueryPipeline::createComputationPipeline ( const std::shared_ptr< KnowledgeBase > &  kb,
std::vector< ComputablePtr > &  computableLiterals,
const std::shared_ptr< TokenBroadcaster > &  pipelineInput,
const std::shared_ptr< TokenBroadcaster > &  pipelineOutput,
const QueryContextPtr ctx 
)
staticprotected

Definition at line 453 of file QueryPipeline.cpp.

458  {
459  // This function generates a query pipeline for literals that can be computed
460  // (EDB-only literals are processed separately). The literals are part of one dependency group.
461  // They are sorted, and also evaluated in this order. For each computable literal there is at
462  // least one reasoner that can compute the literal. However, instances of the literal may also
463  // occur in the EDB. Hence, computation results must be combined with results of an EDB query
464  // for each literal.
465 
466  struct MergedComputables {
467  MergedComputables() : requiresEDB(true) {};
468  ComputablePtr item;
469  bool requiresEDB;
470  std::vector<ComputablePtr> literals;
471  };
472 
473  auto lastOut = pipelineInput;
474 
475  // --------------------------------------
476  // Build conjunctive queries if possible.
477  // To this end, search through the list of computable literals and find literals that can
478  // be merged. This is only possible if:
479  // - the literals share the same reasoner
480  // - the literals have only one reasoner associated
481  // (else merge would rather generate a complex sub-pipeline)
482  // - the reasoner supports simple conjunctions
483  // - the literals are not stored in the EDB or the reasoner mirrors the EDB
484  // (else merge would rather generate a complex sub-pipeline)
485  // --------------------------------------
486  std::vector<MergedComputables> mergedComputables;
487  while (!computableLiterals.empty()) {
488  auto next = computableLiterals.front();
489  auto indicator = RDFIndicator(next->predicate());
490  computableLiterals.erase(computableLiterals.begin());
491 
492  auto &merged = mergedComputables.emplace_back();
493  merged.item = next;
494  // EDB queries are only required if one of the reasoner does not mirror the EDB,
495  // i.e. one reasoner that produces all EDB results in addition to the IDB results.
496  merged.requiresEDB = false;
497  for (auto &r: next->reasonerList()) {
498  // Note that we assume here that the data storage of the reasoner mirrors the EDB,
499  // so we just check if the reasoner can ground literals in its storage.
501  merged.requiresEDB = true;
502  break;
503  }
504  }
505  // The predicate can only be materialized in the EDB if it has at most two arguments,
506  // i.e. if it is a RDF predicate.
507  merged.requiresEDB = merged.requiresEDB && indicator.arity <= 2;
508  if (merged.requiresEDB && indicator.functor) {
509  // switch flag to false in case the literal is not materialized in the EDB
510  merged.requiresEDB = isMaterializedInEDB(kb, *indicator.functor);
511  }
512  merged.literals.push_back(next);
513 
514  bool supportsSimpleConjunction = true;
515  for (auto &r: next->reasonerList()) {
516  if (!r.first->hasFeature(GoalDrivenReasonerFeature::SupportsSimpleConjunctions)) {
517  supportsSimpleConjunction = false;
518  break;
519  }
520  }
521 
522  if (supportsSimpleConjunction && !merged.requiresEDB && next->reasonerList().size() == 1) {
523  // merge literals that can be computed by the same reasoner
524  for (auto it = computableLiterals.begin(); it != computableLiterals.end();) {
525  auto &lit = *it;
526  if (next->reasonerList() == lit->reasonerList()) {
527  merged.literals.push_back(lit);
528  it = computableLiterals.erase(it);
529  } else {
530  ++it;
531  }
532  }
533  }
534  }
535 
536  // finally build the pipeline
537  for (auto &mergedComputable: mergedComputables) {
538  auto stepInput = lastOut;
539  auto stepOutput = std::make_shared<TokenBroadcaster>();
540  uint32_t numStages = 0;
541 
542  // --------------------------------------
543  // Construct a pipeline that grounds the literal in the EDB.
544  // --------------------------------------
545  if (mergedComputable.requiresEDB) {
546  auto edb = kb->getBackendForQuery();
547  auto edbStage = std::make_shared<TypedQueryStage<FirstOrderLiteral>>(
548  ctx,
549  mergedComputable.item,
550  [kb, edb, ctx](const FirstOrderLiteralPtr &q) {
551  auto rdfLiteral = std::make_shared<TriplePattern>(
552  q->predicate(), q->isNegated());
553  rdfLiteral->setTripleFrame(ctx->selector);
554  return kb->edb()->getAnswerCursor(edb, std::make_shared<GraphPathQuery>(rdfLiteral, ctx));
555  });
556  edbStage->selfWeakRef_ = edbStage;
557  stepInput >> edbStage;
558  edbStage >> stepOutput;
559  ++numStages;
560  }
561 
562  // --------------------------------------
563  // Construct a pipeline that grounds the literal in the IDB.
564  // To this end add an IDB stage for each reasoner that defines the literal.
565  // --------------------------------------
566  for (auto &r: mergedComputable.item->reasonerList()) {
567  auto idbStage = std::make_shared<TypedQueryStageVec<Computable>>(
568  ctx, mergedComputable.literals,
569  [r, ctx](const std::vector<ComputablePtr> &q) {
570  return ReasonerManager::evaluateQuery(r.first, replaceFunctors(q), ctx);
571  });
572  idbStage->selfWeakRef_ = idbStage;
573  stepInput >> idbStage;
574  idbStage >> stepOutput;
575  ++numStages;
576  }
577  lastOut = stepOutput;
578 
579  // --------------------------------------
580  // add a stage that consolidates the results of the EDB and IDB stages.
581  // in particular the case needs to be handled where none of the stages return
582  // 'true'. Also print a warning if two stages disagree but state they are confident.
583  // --------------------------------------
584  if (numStages > 1) {
585  auto consolidator = std::make_shared<DisjunctiveBroadcaster>();
586  lastOut >> consolidator;
587  lastOut = consolidator;
588  }
589 
590  // --------------------------------------
591  // Optionally add a stage to the pipeline that drops all redundant result.
592  // The filter is applied here to remove redundancies early on directly after IDB and EDB
593  // results are combined.
594  // --------------------------------------
595  if (ctx->queryFlags & QUERY_FLAG_UNIQUE_SOLUTIONS) {
596  auto filterStage = std::make_shared<RedundantAnswerFilter>();
597  lastOut >> filterStage;
598  lastOut = filterStage;
599  }
600  }
601 
602  lastOut >> pipelineOutput;
603 }
static TokenBufferPtr evaluateQuery(const GoalDrivenReasonerPtr &reasoner, const std::vector< FirstOrderLiteralPtr > &literals, const QueryContextPtr &ctx)
@ QUERY_FLAG_UNIQUE_SOLUTIONS
Definition: QueryFlag.h:21
std::shared_ptr< Computable > ComputablePtr
Definition: Computable.h:45
std::shared_ptr< FirstOrderLiteral > FirstOrderLiteralPtr

◆ createComputationPipeline() [2/2]

static void knowrob::QueryPipeline::createComputationPipeline ( const std::shared_ptr< KnowledgeBase > &  kb,
std::vector< ComputablePtr > &  computableLiterals,
const std::shared_ptr< TokenBroadcaster > &  pipelineInput,
const std::shared_ptr< TokenBroadcaster > &  pipelineOutput,
const QueryContextPtr ctx 
)
staticprotected

◆ createComputationSequence() [1/2]

std::vector< ComputablePtr > QueryPipeline::createComputationSequence ( const std::shared_ptr< KnowledgeBase > &  kb,
const std::list< DependencyNodePtr > &  dependencyGroup 
)
staticprotected

Definition at line 371 of file QueryPipeline.cpp.

373  {
374  // Pick a node to start with.
375  auto comparator = IDBComparator(kb->vocabulary());
376  DependencyNodePtr first;
377  ComputablePtr firstComputable;
378  for (auto &n: dependencyGroup) {
379  auto computable_n =
380  std::static_pointer_cast<Computable>(n->literal());
381  if (!first || comparator(firstComputable, computable_n)) {
382  first = n;
383  firstComputable = computable_n;
384  }
385  }
386 
387  // remember visited nodes, needed for circular dependencies
388  // all nodes added to the queue should also be added to this set.
389  std::set<DependencyNode *> visited;
390  visited.insert(first.get());
391 
392  std::vector<ComputablePtr> sequence;
393  sequence.push_back(firstComputable);
394 
395  // start with a FIFO queue only containing first node
396  std::deque<std::shared_ptr<DependencyNodeQueue>> queue;
397  auto qn0 = std::make_shared<DependencyNodeQueue>(first);
398  queue.push_front(qn0);
399 
400  // loop until queue is empty and process exactly one successor of
401  // the top element in the FIFO in each step. If an element has no
402  // more successors, it can be removed from queue.
403  // Each successor creates an additional node added to the top of the FIFO.
404  while (!queue.empty()) {
405  auto front = queue.front();
406 
407  // get top successor node that has not been visited yet
408  DependencyNodePtr topNext;
409  while (!front->neighbors_.empty()) {
410  auto topNeighbor = front->neighbors_.top();
411  front->neighbors_.pop();
412 
413  if (visited.count(topNeighbor.get()) == 0) {
414  topNext = topNeighbor;
415  break;
416  }
417  }
418  // pop element from queue if all neighbors were processed
419  if (front->neighbors_.empty()) {
420  queue.pop_front();
421  }
422 
423  if (topNext) {
424  // push a new node onto FIFO
425  auto qn_next = std::make_shared<DependencyNodeQueue>(topNext);
426  queue.push_front(qn_next);
427  sequence.push_back(std::static_pointer_cast<Computable>(topNext->literal()));
428  visited.insert(topNext.get());
429  }
430  }
431 
432  return sequence;
433 }
std::shared_ptr< DependencyNode > DependencyNodePtr

◆ createComputationSequence() [2/2]

static std::vector<ComputablePtr> knowrob::QueryPipeline::createComputationSequence ( const std::shared_ptr< KnowledgeBase > &  kb,
const std::list< DependencyNodePtr > &  dependencyGroup 
)
staticprotected

◆ operator>>() [1/2]

void QueryPipeline::operator>> ( const std::shared_ptr< TokenStream > &  stage)

Stream the last stage of the pipeline into the given stage.

Parameters
stagethe stage to stream the last stage into.

Definition at line 332 of file QueryPipeline.cpp.

332  {
333  finalStage_ >> stage;
334 }

◆ operator>>() [2/2]

void knowrob::QueryPipeline::operator>> ( const std::shared_ptr< TokenStream > &  stage)

Stream the last stage of the pipeline into the given stage.

Parameters
stagethe stage to stream the last stage into.

◆ stopBuffering() [1/2]

void QueryPipeline::stopBuffering ( )

After creation of the pipeline, messages are buffered until this is called.

Definition at line 336 of file QueryPipeline.cpp.

336  {
337  bufferStage_->stopBuffering();
338 }

◆ stopBuffering() [2/2]

void knowrob::QueryPipeline::stopBuffering ( )

After creation of the pipeline, messages are buffered until this is called.

Member Data Documentation

◆ bufferStage_

std::shared_ptr< TokenBuffer > knowrob::QueryPipeline::bufferStage_
protected

Definition at line 58 of file QueryPipeline.h.

◆ finalStage_

std::shared_ptr< TokenBroadcaster > knowrob::QueryPipeline::finalStage_
protected

Definition at line 57 of file QueryPipeline.h.

◆ initialStages_

std::vector< std::shared_ptr< TokenStream > > knowrob::QueryPipeline::initialStages_
protected

Definition at line 56 of file QueryPipeline.h.


The documentation for this class was generated from the following files: