diff --git a/config.js b/config.js index 17a03f06d2..6c3a2435a4 100644 --- a/config.js +++ b/config.js @@ -712,6 +712,7 @@ config.BUCKET_LOG_CONCURRENCY = 10; //////////////////////////////// config.NOTIFICATION_LOG_NS = 'notification_logging'; config.NOTIFICATION_LOG_DIR = process.env.NOTIFICATION_LOG_DIR; +config.NOTIFICATION_BATCH = process.env.BATCH || 10; /////////////////////////// // KEY ROTATOR // diff --git a/src/util/notifications_util.js b/src/util/notifications_util.js index 6b21cf7ccb..776d5563cf 100644 --- a/src/util/notifications_util.js +++ b/src/util/notifications_util.js @@ -35,13 +35,14 @@ class Notificator { * @param {Object} options */ - constructor({name, fs_context, connect_files_dir, nc_config_fs}) { + constructor({name, fs_context, connect_files_dir, nc_config_fs, batch_size}) { this.name = name; this.connect_str_to_connection = new Map(); this.notif_to_connect = new Map(); this.fs_context = fs_context ?? get_process_fs_context(); this.connect_files_dir = connect_files_dir ?? DEFAULT_CONNECT_FILES_DIR; this.nc_config_fs = nc_config_fs; + this.batch_size = batch_size || config.NOTIFICATION_BATCH || 10; } async run_batch() { @@ -112,7 +113,7 @@ class Notificator { */ async _notify(fs_context, log_file, failure_append) { const file = new LogFile(fs_context, log_file); - const send_promises = []; + let send_promises = []; await file.collect_and_process(async str => { try { const notif = JSON.parse(str); @@ -137,6 +138,10 @@ class Notificator { } const send_promise = connection.promise_notify(notif, failure_append); if (send_promise) send_promises.push(send_promise); + if (send_promises.length > this.batch_size) { + await Promise.all(send_promises); + send_promises = []; + } } catch (err) { dbg.error("Failed to notify. err = ", err, ", str =", str); await failure_append(str);