6 #include "knowrob/storage/StorageInterface.h"
7 #include "knowrob/storage/ReifiedTriple.h"
8 #include "knowrob/storage/UnReificationContainer.h"
9 #include "knowrob/storage/ReifiedQuery.h"
10 #include "knowrob/storage/Transaction.h"
11 #include "knowrob/semweb/GraphBuiltin.h"
12 #include "knowrob/knowrob.h"
13 #include "knowrob/storage/reification.h"
21 const std::vector<std::shared_ptr<NamedBackend>> &backends) {
22 std::shared_ptr<transaction::Transaction> transaction;
23 if (transactionType ==
Insert) {
24 transaction = std::make_shared<transaction::Insert>(queryable,
vocabulary());
26 transaction = std::make_shared<transaction::Remove>(queryable,
vocabulary());
29 for (
auto &backend: backends) {
30 transaction->addBackend(backend);
34 auto &backend = definedBackend.second->value();
38 for (
auto &excluded: backends) {
39 if (excluded && backend == excluded->value()) {
46 transaction->addBackend(definedBackend.second);
54 std::vector<std::shared_ptr<ThreadPool::Runner>> transactions;
56 auto definedBackend = it.second;
58 auto transaction = std::make_shared<ThreadPool::LambdaRunner>(
60 if (definedBackend->value()->removeAllWithOrigin(origin)) {
62 definedBackend->value()->setVersionOfOrigin(origin, std::nullopt);
64 KB_WARN(
"removal of triples with origin '{}' from backend '{}' failed!", origin,
65 definedBackend->name());
71 [definedBackend](
const std::exception &exc) {
72 KB_ERROR(
"transaction failed for backend '{}': {}", definedBackend->name(), exc.what());
74 transactions.push_back(transaction);
78 for (
auto &transaction: transactions) transaction->join();
81 if (!
vocabulary()->importHierarchy()->isReservedOrigin(origin)) {
82 vocabulary()->importHierarchy()->removeCurrentGraph(origin);
89 auto pat = std::make_shared<TriplePattern>(triple);
95 std::vector<TriplePtr> overlappingTriples;
100 if (mergedTriple.
mergeFrame(*matchedTriple)) {
101 auto &x = overlappingTriples.emplace_back();
104 if (matchedTriple.owned) {
105 x.ptr = matchedTriple.ptr;
106 matchedTriple.owned = false;
108 x.ptr = new TripleCopy(*matchedTriple);
112 if (!overlappingTriples.empty()) {
114 auto container = std::make_shared<ProxyTripleContainer>(&overlappingTriples);
115 createTransaction(backend, Remove)->commit(container);
118 createTransaction(backend, Insert)->commit(mergedTriple);
124 return backend->contains(triple);
128 bool containsAll =
false;
129 for (
auto &reified: reification) {
130 containsAll = backend->contains(*reified.ptr);
140 backend->foreach(visitor);
149 std::vector<TriplePtr> originalTriples;
151 backend->foreach([&](
const TriplePtr &triple) {
153 auto © = originalTriples.emplace_back();
157 copy.ptr = triple.ptr;
158 triple.owned = false;
160 copy.ptr = new TripleCopy(*triple.ptr);
162 unReifiedTriples.
add(*copy.ptr);
168 for (
auto &triple: unReifiedTriples) {
175 backend->batch(callback);
188 std::vector<TriplePtr> reificationTriples;
189 auto batch = std::make_shared<TripleViewBatch>(batchSize);
191 for (
auto &triple: *triples) {
195 triple.owned = false;
196 auto &newOwner = reificationTriples.emplace_back(triple.ptr);
197 unReifiedTriples.add(*newOwner.ptr);
199 auto © = reificationTriples.emplace_back(new TripleCopy(*triple.ptr));
200 unReifiedTriples.add(*copy.ptr);
204 if (
batch->size() >= batchSize) {
211 for (
auto &triple: unReifiedTriples) {
213 if (batch->size() >= batchSize) {
218 if (!batch->empty()) {
223 static void setReifiedVariables(
224 const std::shared_ptr<GraphTerm> &t,
225 const std::map<std::string_view, VariablePtr> &variables) {
226 switch (t->termType()) {
228 auto &
pattern = std::static_pointer_cast<GraphPattern>(t)->value();
229 if (!
pattern->propertyTerm() || !
pattern->propertyTerm()->isAtomic())
break;
230 auto atomic = std::static_pointer_cast<Atomic>(
pattern->propertyTerm());
231 auto needle = variables.find(
atomic->stringForm());
232 if (needle != variables.end()) {
233 pattern->setObjectVariable(needle->second);
239 auto connective = std::static_pointer_cast<GraphConnective>(t);
240 for (
auto &
term: connective->terms()) {
241 setReifiedVariables(
term, variables);
252 static auto ctx = std::make_shared<QueryContext>();
254 backend->match(q, visitor);
258 backend->match(q, visitor);
261 static auto v_o = std::make_shared<Variable>(
"o");
262 static auto v_begin = std::make_shared<Variable>(
"begin");
263 static auto v_end = std::make_shared<Variable>(
"end");
264 static auto v_confidence = std::make_shared<Variable>(
"confidence");
265 static auto v_uncertain = std::make_shared<Variable>(
"uncertain");
266 static auto v_occasional = std::make_shared<Variable>(
"occasional");
268 auto reified = std::make_shared<ReifiedQuery>(q,
vocabulary(),
true);
270 setReifiedVariables(reified->term(), {
271 {reification::hasBeginTime->stringForm(), v_begin},
272 {reification::hasEndTime->stringForm(), v_end},
273 {reification::hasConfidence->stringForm(), v_confidence},
274 {reification::isUncertain->stringForm(), v_uncertain},
275 {reification::isOccasional->stringForm(), v_occasional}
278 backend->query(reified, [&](
const BindingsPtr &bindings) {
284 if (bindings->contains(v_begin->name())) {
285 auto &t_begin = bindings->get(v_begin->name());
286 if (t_begin->isNumeric()) {
287 triple->
setBegin(std::static_pointer_cast<Numeric>(t_begin)->asDouble());
290 if (bindings->contains(v_end->name())) {
291 auto &t_end = bindings->get(v_end->name());
292 if (t_end->isNumeric()) {
293 triple->
setEnd(std::static_pointer_cast<Numeric>(t_end)->asDouble());
296 if (bindings->contains(v_confidence->name())) {
297 auto &t_confidence = bindings->get(v_confidence->name());
298 if (t_confidence->isNumeric()) {
299 triple->
setConfidence(std::static_pointer_cast<Numeric>(t_confidence)->asDouble());
302 if (bindings->contains(v_uncertain->name())) {
303 auto &t_uncertain = bindings->get(v_uncertain->name());
304 if (t_uncertain->isNumeric()) {
305 triple->
setIsUncertain(std::static_pointer_cast<Boolean>(t_uncertain)->asBoolean());
310 if (bindings->contains(v_occasional->name())) {
311 auto &t_occasional = bindings->get(v_occasional->name());
312 if (t_occasional->isNumeric()) {
313 triple->
setIsOccasional(std::static_pointer_cast<Boolean>(t_occasional)->asBoolean());
330 auto reified = std::make_shared<ReifiedQuery>(q,
vocabulary());
331 backend->query(reified, [&](
const BindingsPtr &bindings) {
335 backend->query(q, callback);
341 auto expanded = backend->expand(q);
344 bool hasPositiveAnswer =
false;
346 channel->push(backend->yes(q, expanded, bindings));
347 hasPositiveAnswer = true;
349 if (!hasPositiveAnswer) {
350 channel->push(backend->no(q));
354 catch (
const std::exception &e) {
362 std::shared_ptr<TokenBuffer> result = std::make_shared<TokenBuffer>();
364 std::make_shared<ThreadPool::LambdaRunner>(
369 KB_WARN(
"an exception occurred for graph query ({}): {}.", *q, e.what());
static uint32_t batchSize()
static std::shared_ptr< Numeric > trueAtom()
static int getReificationFlags(const TriplePattern &q)
static bool hasReifiablePattern(const std::shared_ptr< GraphQuery > &nonReified)
static bool isPartOfReification(const Triple &triple)
std::shared_ptr< transaction::Transaction > createTransaction(const QueryableBackendPtr &queryable, TransactionType type, BackendSelection mode=Excluding, const std::vector< std::shared_ptr< NamedBackend >> &backends={})
bool removeAllWithOrigin(std::string_view origin)
bool mergeInsert(const QueryableBackendPtr &backend, const Triple &triple)
auto & vocabulary() const
static void foreach(const QueryableBackendPtr &backend, const TripleVisitor &visitor)
void pushIntoCursor(const QueryableBackendPtr &backend, const GraphPathQueryPtr &query, const TokenBufferPtr &resultStream) const
std::shared_ptr< StorageManager > backendManager_
void query(const QueryableBackendPtr &backend, const GraphQueryPtr &q, const BindingsHandler &callback) const
static void batch(const QueryableBackendPtr &backend, const TripleHandler &callback)
TokenBufferPtr getAnswerCursor(const QueryableBackendPtr &backend, const GraphPathQueryPtr &query)
void match(const QueryableBackendPtr &backend, const TriplePattern &query, const TripleVisitor &visitor) const
std::function< bool()> StopChecker
static std::shared_ptr< Channel > create(const std::shared_ptr< TokenStream > &stream)
void setConfidence(double confidence)
void setIsUncertain(bool isUncertain)
void setIsOccasional(bool isOccasional)
bool mergeFrame(const Triple &other)
void setBegin(double begin)
bool instantiateInto(Triple &triple, const std::shared_ptr< const Bindings > &bindings=Bindings::emptyBindings()) const
void add(const Triple &triple)
GraphTermRule & pattern()
std::function< void(const TriplePtr &)> TripleVisitor
std::shared_ptr< TripleContainer > TripleContainerPtr
std::shared_ptr< const Bindings > BindingsPtr
std::shared_ptr< TokenBuffer > TokenBufferPtr
std::function< void(const BindingsPtr &)> BindingsHandler
std::shared_ptr< ThreadPool > DefaultThreadPool()
std::function< void(const TripleContainerPtr &)> TripleHandler
std::shared_ptr< GraphQuery > GraphQueryPtr
std::shared_ptr< QueryableStorage > QueryableBackendPtr
TripleTemplate< std::string_view > TripleView
std::shared_ptr< GraphPathQuery > GraphPathQueryPtr