Skip to content

Commit

Permalink
bucket notification - introduce batch param (#8685)
Browse files Browse the repository at this point in the history
Signed-off-by: Amit Prinz Setter <[email protected]>
  • Loading branch information
alphaprinz committed Jan 15, 2025
1 parent d1124f1 commit 721dedb
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 2 deletions.
1 change: 1 addition & 0 deletions config.js
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,7 @@ config.BUCKET_LOG_CONCURRENCY = 10;
////////////////////////////////
config.NOTIFICATION_LOG_NS = 'notification_logging';
config.NOTIFICATION_LOG_DIR = process.env.NOTIFICATION_LOG_DIR;
config.NOTIFICATION_BATCH = process.env.BATCH || 10;

///////////////////////////
// KEY ROTATOR //
Expand Down
9 changes: 7 additions & 2 deletions src/util/notifications_util.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,14 @@ class Notificator {
* @param {Object} options
*/

constructor({name, fs_context, connect_files_dir, nc_config_fs}) {
constructor({name, fs_context, connect_files_dir, nc_config_fs, batch_size}) {
this.name = name;
this.connect_str_to_connection = new Map();
this.notif_to_connect = new Map();
this.fs_context = fs_context ?? get_process_fs_context();
this.connect_files_dir = connect_files_dir ?? DEFAULT_CONNECT_FILES_DIR;
this.nc_config_fs = nc_config_fs;
this.batch_size = batch_size || config.NOTIFICATION_BATCH || 10;
}

async run_batch() {
Expand Down Expand Up @@ -112,7 +113,7 @@ class Notificator {
*/
async _notify(fs_context, log_file, failure_append) {
const file = new LogFile(fs_context, log_file);
const send_promises = [];
let send_promises = [];
await file.collect_and_process(async str => {
try {
const notif = JSON.parse(str);
Expand All @@ -137,6 +138,10 @@ class Notificator {
}
const send_promise = connection.promise_notify(notif, failure_append);
if (send_promise) send_promises.push(send_promise);
if (send_promises.length > this.batch_size) {
await Promise.all(send_promises);
send_promises = [];
}
} catch (err) {
dbg.error("Failed to notify. err = ", err, ", str =", str);
await failure_append(str);
Expand Down

0 comments on commit 721dedb

Please sign in to comment.