6 #include "knowrob/queries/QueryPipeline.h"
7 #include "knowrob/semweb/GraphPathQuery.h"
8 #include "knowrob/queries/DisjunctiveBroadcaster.h"
9 #include "knowrob/queries/RedundantAnswerFilter.h"
10 #include "knowrob/queries/QueryTree.h"
11 #include "knowrob/formulas/ModalFormula.h"
12 #include "knowrob/queries/ModalStage.h"
13 #include "knowrob/queries/NegationStage.h"
14 #include "knowrob/queries/ConjunctiveBroadcaster.h"
15 #include "knowrob/queries/QueryError.h"
16 #include "knowrob/KnowledgeBase.h"
17 #include "knowrob/reasoner/Computable.h"
18 #include "knowrob/semweb/RDFIndicator.h"
// Ordering functor for EDB triple patterns. In this file an instance built from
// kb->vocabulary() is passed to std::sort over the EDB-only literals of a query.
26 struct EDBComparator {
// The vocabulary pointer is taken by value and moved into the vocabulary_ member.
27 explicit EDBComparator(
VocabularyPtr vocabulary) : vocabulary_(std::move(vocabulary)) {}
// Ordering functor for computable (IDB) literals. In this file an instance built from
// kb->vocabulary() is used to pick the first computable of a dependency group.
37 struct IDBComparator {
// The vocabulary pointer is taken by value and moved into the vocabulary_ member.
38 explicit IDBComparator(
VocabularyPtr vocabulary) : vocabulary_(std::move(vocabulary)) {}
46 static bool isMaterializedInEDB(
const std::shared_ptr<KnowledgeBase> &kb, std::string_view property) {
47 return kb->vocabulary()->frequency(property) > 0;
52 auto outStream = std::make_shared<TokenBuffer>();
57 for (
auto &path: qt) {
61 std::vector<FirstOrderLiteralPtr> posLiterals, negLiterals;
62 std::vector<std::shared_ptr<ModalFormula>> posModals, negModals;
65 for (
auto &node: path.nodes()) {
66 switch (node->type()) {
68 auto lit = std::make_shared<FirstOrderLiteral>(
69 std::static_pointer_cast<Predicate>(node),
false);
70 posLiterals.push_back(lit);
75 posModals.push_back(std::static_pointer_cast<ModalFormula>(node));
79 auto negation = (
Negation *) node.get();
80 auto negated = negation->negatedFormula();
81 switch (negated->type()) {
83 auto lit = std::make_shared<FirstOrderLiteral>(
84 std::static_pointer_cast<Predicate>(negated),
true);
85 negLiterals.push_back(lit);
89 negModals.push_back(std::static_pointer_cast<ModalFormula>(negated));
92 throw QueryError(
"Unexpected negated formula type {} in QueryTree.", (
int) negated->type());
97 throw QueryError(
"Unexpected formula type {} in QueryTree.", (
int) node->type());
101 std::shared_ptr<TokenBroadcaster> lastStage;
102 std::shared_ptr<TokenBuffer> firstBuffer;
107 if (posLiterals.empty()) {
111 firstBuffer = std::make_shared<TokenBuffer>();
112 lastStage = firstBuffer;
118 auto pathQuery = std::make_shared<ConjunctiveQuery>(posLiterals, ctx);
119 auto subPipeline = std::make_shared<QueryPipeline>(kb, pathQuery);
120 firstBuffer = std::make_shared<AnswerBuffer_WithReference>(subPipeline);
121 *subPipeline >> firstBuffer;
122 subPipeline->stopBuffering();
123 lastStage = firstBuffer;
130 for (
auto &posModal: posModals) {
131 auto modalStage = std::make_shared<ModalStage>(kb, posModal, ctx);
132 modalStage->selfWeakRef_ = modalStage;
133 lastStage >> modalStage;
134 lastStage = modalStage;
140 if (!negLiterals.empty()) {
142 auto negLiteralStage = std::make_shared<PredicateNegationStage>(
143 kb, ctx, negLiterals);
144 lastStage >> negLiteralStage;
145 lastStage = negLiteralStage;
151 if (!negModals.empty()) {
153 auto negModalStage = std::make_shared<ModalNegationStage>(
155 lastStage >> negModalStage;
156 lastStage = negModalStage;
159 lastStage >> outStream;
160 firstBuffer->stopBuffering();
168 auto consolidator = std::make_shared<DisjunctiveBroadcaster>();
169 outStream >> consolidator;
178 auto &allLiterals = conjunctiveQuery->literals();
184 std::vector<FirstOrderLiteralPtr> positiveLiterals, negativeLiterals;
185 for (
auto &l: allLiterals) {
186 if (l->isNegated()) negativeLiterals.push_back(l);
187 else positiveLiterals.push_back(l);
194 std::vector<TriplePatternPtr> edbOnlyLiterals;
195 std::vector<ComputablePtr> computableLiterals;
196 bool hasUnknownPredicate =
false;
197 for (
auto &l: positiveLiterals) {
201 std::vector<DefiningReasoner> l_reasoner;
202 if (indicator.functor) {
203 l_reasoner = kb->reasonerManager()->findDefiningReasoner(
207 if (l_reasoner.empty()) {
208 if (indicator.arity > 2) {
209 KB_WARN(
"Predicate {} is not defined by any reasoner.", *l->predicate());
210 hasUnknownPredicate =
true;
211 }
else if (indicator.functor && !isMaterializedInEDB(kb, *indicator.functor)) {
212 KB_WARN(
"Predicate {} is neither materialized in the EDB nor defined by a reasoner.", *l->predicate());
213 hasUnknownPredicate =
true;
215 auto rdfLiteral = std::make_shared<TriplePattern>(
216 l->predicate(), l->isNegated());
217 rdfLiteral->setTripleFrame(conjunctiveQuery->ctx()->selector);
218 edbOnlyLiterals.push_back(rdfLiteral);
221 computableLiterals.push_back(std::make_shared<Computable>(*l, l_reasoner));
225 if (hasUnknownPredicate) {
227 auto out = std::make_shared<TokenBuffer>();
229 auto dontKnow = std::make_shared<AnswerDontKnow>();
230 channel->push(dontKnow);
240 std::sort(edbOnlyLiterals.begin(), edbOnlyLiterals.end(), EDBComparator(kb->vocabulary()));
245 std::shared_ptr<TokenBuffer> edbOut;
246 if (edbOnlyLiterals.empty()) {
247 edbOut = std::make_shared<TokenBuffer>();
252 auto edb = kb->getBackendForQuery();
253 edbOut = kb->edb()->getAnswerCursor(edb,
254 std::make_shared<GraphPathQuery>(edbOnlyLiterals, conjunctiveQuery->ctx()));
261 std::shared_ptr<TokenBroadcaster> idbOut;
262 if (computableLiterals.empty()) {
265 idbOut = std::make_shared<TokenBroadcaster>();
270 dg.
insert(computableLiterals.begin(), computableLiterals.end());
276 auto &literalGroup = *dg.
begin();
283 conjunctiveQuery->ctx());
288 auto answerCombiner = std::make_shared<ConjunctiveBroadcaster>();
290 for (
auto &literalGroup: dg) {
300 conjunctiveQuery->ctx());
302 answerCombiner >> idbOut;
309 if (!negativeLiterals.empty()) {
311 auto negStage = std::make_shared<PredicateNegationStage>(
312 kb, conjunctiveQuery->ctx(), negativeLiterals);
// Comparator used as the Compare argument of the std::priority_queue held by
// DependencyNodeQueue. Because std::priority_queue keeps the element that compares
// greatest on top, returning "a > b" lets the node with the smaller count be popped first.
344 struct DependencyNodeComparator {
// First criterion: the number of variables of the two nodes.
347 if (a->numVariables() != b->numVariables()) {
348 return a->numVariables() > b->numVariables();
// Second criterion, only reached when the variable counts are equal: the number of neighbors.
351 if (a->numNeighbors() != b->numNeighbors()) {
352 return a->numNeighbors() > b->numNeighbors();
// Holds the neighbors of a dependency node in a priority queue ordered by
// DependencyNodeComparator.
358 struct DependencyNodeQueue {
// Pending neighbor nodes; callers in this file read them via top() and pop().
360 std::priority_queue<DependencyNodePtr, std::vector<DependencyNodePtr>, DependencyNodeComparator> neighbors_;
// Every neighbor of `node` is pushed into the queue.
// NOTE(review): presumably this loop is the constructor body - confirm against the full file.
364 for (
auto &neighbor: node->neighbors()) {
365 neighbors_.push(neighbor);
372 const std::shared_ptr<KnowledgeBase> &kb,
373 const std::list<DependencyNodePtr> &dependencyGroup) {
375 auto comparator = IDBComparator(kb->vocabulary());
378 for (
auto &n: dependencyGroup) {
380 std::static_pointer_cast<Computable>(n->literal());
381 if (!first || comparator(firstComputable, computable_n)) {
383 firstComputable = computable_n;
389 std::set<DependencyNode *> visited;
390 visited.insert(first.get());
392 std::vector<ComputablePtr> sequence;
393 sequence.push_back(firstComputable);
396 std::deque<std::shared_ptr<DependencyNodeQueue>> queue;
397 auto qn0 = std::make_shared<DependencyNodeQueue>(first);
398 queue.push_front(qn0);
404 while (!queue.empty()) {
405 auto front = queue.front();
409 while (!front->neighbors_.empty()) {
410 auto topNeighbor = front->neighbors_.top();
411 front->neighbors_.pop();
413 if (visited.count(topNeighbor.get()) == 0) {
414 topNext = topNeighbor;
419 if (front->neighbors_.empty()) {
425 auto qn_next = std::make_shared<DependencyNodeQueue>(topNext);
426 queue.push_front(qn_next);
427 sequence.push_back(std::static_pointer_cast<Computable>(topNext->literal()));
428 visited.insert(topNext.get());
435 static inline std::vector<FirstOrderLiteralPtr> replaceFunctors(
const std::vector<ComputablePtr> &computables) {
437 std::vector<FirstOrderLiteralPtr> result;
438 result.reserve(computables.size());
439 for (
auto &computable: computables) {
440 auto computableFunctor = computable->reasonerList().front().second;
442 if (computable->functor()->stringForm() == computableFunctor->stringForm()) {
443 result.push_back(computable);
445 auto computablePredicate = std::make_shared<Predicate>(computableFunctor, computable->predicate()->arguments());
446 result.push_back(std::make_shared<FirstOrderLiteral>(
447 computablePredicate, computable->isNegated()));
454 const std::shared_ptr<KnowledgeBase> &kb,
455 std::vector<ComputablePtr> &computableLiterals,
456 const std::shared_ptr<TokenBroadcaster> &pipelineInput,
457 const std::shared_ptr<TokenBroadcaster> &pipelineOutput,
466 struct MergedComputables {
467 MergedComputables() : requiresEDB(
true) {};
470 std::vector<ComputablePtr> literals;
473 auto lastOut = pipelineInput;
486 std::vector<MergedComputables> mergedComputables;
487 while (!computableLiterals.empty()) {
488 auto next = computableLiterals.front();
490 computableLiterals.erase(computableLiterals.begin());
492 auto &merged = mergedComputables.emplace_back();
496 merged.requiresEDB =
false;
497 for (
auto &r: next->reasonerList()) {
501 merged.requiresEDB =
true;
507 merged.requiresEDB = merged.requiresEDB && indicator.arity <= 2;
508 if (merged.requiresEDB && indicator.functor) {
510 merged.requiresEDB = isMaterializedInEDB(kb, *indicator.functor);
512 merged.literals.push_back(next);
514 bool supportsSimpleConjunction =
true;
515 for (
auto &r: next->reasonerList()) {
517 supportsSimpleConjunction =
false;
522 if (supportsSimpleConjunction && !merged.requiresEDB && next->reasonerList().size() == 1) {
524 for (
auto it = computableLiterals.begin(); it != computableLiterals.end();) {
526 if (next->reasonerList() == lit->reasonerList()) {
527 merged.literals.push_back(lit);
528 it = computableLiterals.erase(it);
537 for (
auto &mergedComputable: mergedComputables) {
538 auto stepInput = lastOut;
539 auto stepOutput = std::make_shared<TokenBroadcaster>();
540 uint32_t numStages = 0;
545 if (mergedComputable.requiresEDB) {
546 auto edb = kb->getBackendForQuery();
547 auto edbStage = std::make_shared<TypedQueryStage<FirstOrderLiteral>>(
549 mergedComputable.item,
551 auto rdfLiteral = std::make_shared<TriplePattern>(
552 q->predicate(), q->isNegated());
553 rdfLiteral->setTripleFrame(ctx->selector);
554 return kb->edb()->getAnswerCursor(edb, std::make_shared<GraphPathQuery>(rdfLiteral, ctx));
556 edbStage->selfWeakRef_ = edbStage;
557 stepInput >> edbStage;
558 edbStage >> stepOutput;
566 for (
auto &r: mergedComputable.item->reasonerList()) {
567 auto idbStage = std::make_shared<TypedQueryStageVec<Computable>>(
568 ctx, mergedComputable.literals,
569 [r, ctx](
const std::vector<ComputablePtr> &q) {
572 idbStage->selfWeakRef_ = idbStage;
573 stepInput >> idbStage;
574 idbStage >> stepOutput;
577 lastOut = stepOutput;
585 auto consolidator = std::make_shared<DisjunctiveBroadcaster>();
586 lastOut >> consolidator;
587 lastOut = consolidator;
596 auto filterStage = std::make_shared<RedundantAnswerFilter>();
597 lastOut >> filterStage;
598 lastOut = filterStage;
602 lastOut >> pipelineOutput;
607 auto numVars_a = a->numVariables();
608 auto numVars_b = b->numVariables();
609 if (numVars_a != numVars_b)
return (numVars_a > numVars_b);
612 bool hasProperty_a = (a->propertyTerm() && a->propertyTerm()->termType() ==
TermType::ATOMIC);
613 bool hasProperty_b = (b->propertyTerm() && b->propertyTerm()->termType() ==
TermType::ATOMIC);
614 if (hasProperty_a != hasProperty_b)
return (hasProperty_a < hasProperty_b);
618 auto numAsserts_a = vocabulary_->frequency(
619 std::static_pointer_cast<Atomic>(a->propertyTerm())->stringForm());
620 auto numAsserts_b = vocabulary_->frequency(
621 std::static_pointer_cast<Atomic>(b->propertyTerm())->stringForm());
622 if (numAsserts_a != numAsserts_b)
return (numAsserts_a > numAsserts_b);
630 auto numVars_a = a->numVariables();
631 auto numVars_b = b->numVariables();
632 if (numVars_a != numVars_b)
return (numVars_a > numVars_b);
638 bool hasProperty_a = indicator_a.functor.has_value();
639 bool hasProperty_b = indicator_b.functor.has_value();
640 if (hasProperty_a != hasProperty_b)
return (hasProperty_a < hasProperty_b);
644 auto hasEDBAssertion_a = vocabulary_->isDefinedProperty(*indicator_a.functor);
645 auto hasEDBAssertion_b = vocabulary_->isDefinedProperty(*indicator_b.functor);
646 if (hasEDBAssertion_a != hasEDBAssertion_b)
return (hasEDBAssertion_a < hasEDBAssertion_b);
651 auto numAsserts_a = vocabulary_->frequency(*indicator_a.functor);
652 auto numAsserts_b = vocabulary_->frequency(*indicator_b.functor);
653 if (numAsserts_a != numAsserts_b)
return (numAsserts_a > numAsserts_b);
657 auto numReasoner_a = a->reasonerList().size();
658 auto numReasoner_b = b->reasonerList().size();
659 if (numReasoner_a != numReasoner_b)
return (numReasoner_a > numReasoner_b);
void insert(const DependencyNodePtr &node)
std::vector< std::shared_ptr< TokenStream > > initialStages_
void operator>>(const std::shared_ptr< TokenStream > &stage)
static void createComputationPipeline(const std::shared_ptr< KnowledgeBase > &kb, std::vector< ComputablePtr > &computableLiterals, const std::shared_ptr< TokenBroadcaster > &pipelineInput, const std::shared_ptr< TokenBroadcaster > &pipelineOutput, const QueryContextPtr &ctx)
std::shared_ptr< TokenBroadcaster > finalStage_
std::shared_ptr< TokenBuffer > bufferStage_
QueryPipeline(const std::shared_ptr< KnowledgeBase > &kb, const FormulaPtr &phi, const QueryContextPtr &ctx)
void addInitialStage(const std::shared_ptr< TokenStream > &stage)
static std::vector< ComputablePtr > createComputationSequence(const std::shared_ptr< KnowledgeBase > &kb, const std::list< DependencyNodePtr > &dependencyGroup)
static TokenBufferPtr evaluateQuery(const GoalDrivenReasonerPtr &reasoner, const std::vector< FirstOrderLiteralPtr > &literals, const QueryContextPtr &ctx)
static std::shared_ptr< Channel > create(const std::shared_ptr< TokenStream > &stream)
std::shared_ptr< TriplePattern > TriplePatternPtr
std::shared_ptr< DependencyNode > DependencyNodePtr
@ QUERY_FLAG_UNIQUE_SOLUTIONS
std::shared_ptr< Formula > FormulaPtr
std::shared_ptr< Computable > ComputablePtr
std::shared_ptr< ConjunctiveQuery > ConjunctiveQueryPtr
const std::shared_ptr< const AnswerYes > & GenericYes()
std::shared_ptr< const QueryContext > QueryContextPtr
@ SupportsSimpleConjunctions
@ SupportsExtensionalGrounding
std::shared_ptr< FirstOrderLiteral > FirstOrderLiteralPtr
std::shared_ptr< Vocabulary > VocabularyPtr