From 049e1bf7820436bf04757d2eab5c044a44a4e2c4 Mon Sep 17 00:00:00 2001 From: Vaibhav <120372639+Vaibhav090420@users.noreply.github.com> Date: Thu, 9 Jan 2025 14:48:31 +0530 Subject: [PATCH] Add realtime freshness to bulk update api (#217) --- .../mongo/update/MongoUpdateExecutor.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/update/MongoUpdateExecutor.java b/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/update/MongoUpdateExecutor.java index 9474b939..f7af1344 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/update/MongoUpdateExecutor.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/update/MongoUpdateExecutor.java @@ -1,5 +1,6 @@ package org.hypertrace.core.documentstore.mongo.update; +import static org.hypertrace.core.documentstore.model.options.DataFreshness.REALTIME_FRESHNESS; import static org.hypertrace.core.documentstore.model.options.ReturnDocumentType.NONE; import static org.hypertrace.core.documentstore.mongo.MongoUtils.getReturnDocument; import static org.hypertrace.core.documentstore.mongo.query.parser.MongoFilterTypeExpressionParser.getFilter; @@ -19,6 +20,7 @@ import org.hypertrace.core.documentstore.commons.CommonUpdateValidator; import org.hypertrace.core.documentstore.commons.UpdateValidator; import org.hypertrace.core.documentstore.model.config.ConnectionConfig; +import org.hypertrace.core.documentstore.model.options.QueryOptions; import org.hypertrace.core.documentstore.model.options.ReturnDocumentType; import org.hypertrace.core.documentstore.model.options.UpdateOptions; import org.hypertrace.core.documentstore.model.options.UpdateOptions.MissingDocumentStrategy; @@ -106,13 +108,17 @@ public Optional> bulkUpdate( switch (returnDocumentType) { case BEFORE_UPDATE: - cursor = queryExecutor.aggregate(query); + cursor = + queryExecutor.aggregate( + query, QueryOptions.builder().dataFreshness(REALTIME_FRESHNESS).build()); logAndUpdate(filter, updateObject, mongoUpdateOptions); return Optional.of(cursor); case AFTER_UPDATE: logAndUpdate(filter, updateObject, mongoUpdateOptions); - cursor = queryExecutor.aggregate(query); + cursor = + queryExecutor.aggregate( + query, QueryOptions.builder().dataFreshness(REALTIME_FRESHNESS).build()); return Optional.of(cursor); case NONE: