Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support erasing inflights in CompleteMPU and AbortMPU #5719

Merged
merged 5 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/api/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ const api = {

const requestContexts = prepareRequestContexts(apiMethod, request,
sourceBucket, sourceObject, sourceVersionId);

// Extract all the _apiMethods and store them in an array
const apiMethods = requestContexts ? requestContexts.map(context => context._apiMethod) : [];
// Attach the names to the current request
Expand Down
9 changes: 9 additions & 0 deletions lib/api/apiUtils/authorization/prepareRequestContexts.js
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,15 @@ function prepareRequestContexts(apiMethod, request, sourceBucket,
requestContexts.push(requestContext);
}

if (apiMethod === 'completeMultipartUpload' || apiMethod === 'multipartDelete') {
// Request account quotas explicitly for MPU requests, to consider parts cleanup
// NOTE: we need quota for these, but it will be evaluated at the end of the API,
// once the parts have actually been deleted (not via standardMetadataValidateBucketAndObj)
requestContexts.forEach(context => {
context._needQuota = true; // eslint-disable-line no-param-reassign
});
}

return requestContexts;
}

Expand Down
18 changes: 17 additions & 1 deletion lib/api/apiUtils/object/abortMultipartUpload.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
const locationConstraintCheck = require('../object/locationConstraintCheck');
const { standardMetadataValidateBucketAndObj } =
require('../../../metadata/metadataUtils');
const { validateQuotas } = require('../quotas/quotaUtils');
const services = require('../../../services');
const metadata = require('../../../metadata/wrapper');

Expand Down Expand Up @@ -138,7 +139,22 @@
}
cb();
});
}, () => next(null, mpuBucket, storedParts, destBucket));
}, () => {
const length = storedParts.reduce((length, loc) => length + loc.value.Size, 0);
return validateQuotas(request, destBucket, request.accountQuotas,
['objectDelete'], 'objectDelete', -length, false, log, err => {
if (err) {
// Ignore error, as the data has been deleted already: only inflight count
// has not been updated, and will be eventually consistent anyway
log.warn('failed to update inflights', {

Check warning on line 149 in lib/api/apiUtils/object/abortMultipartUpload.js

View check run for this annotation

Codecov / codecov/patch

lib/api/apiUtils/object/abortMultipartUpload.js#L149

Added line #L149 was not covered by tests
method: 'abortMultipartUpload',
locations,
error: err,
});
}
next(null, mpuBucket, storedParts, destBucket);
});
});
},
function deleteShadowObjectMetadata(mpuBucket, storedParts, destBucket, next) {
let splitter = constants.splitter;
Expand Down
72 changes: 45 additions & 27 deletions lib/api/completeMultipartUpload.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
= require('./apiUtils/object/locationKeysHaveChanged');
const { setExpirationHeaders } = require('./apiUtils/object/expirationHeaders');
const { validatePutVersionId } = require('./apiUtils/object/coldStorage');
const { validateQuotas } = require('./apiUtils/quotas/quotaUtils');

const versionIdUtils = versioning.VersionID;

Expand Down Expand Up @@ -213,12 +214,13 @@
return next(err, destBucket);
}
const storedParts = result.Contents;
const totalMPUSize = storedParts.reduce((acc, part) => acc + part.value.Size, 0);
return next(null, destBucket, objMD, mpuBucket, storedParts,
jsonList, storedMetadata, location, mpuOverviewKey);
jsonList, storedMetadata, location, mpuOverviewKey, totalMPUSize);
});
},
function completeExternalMpu(destBucket, objMD, mpuBucket, storedParts,
jsonList, storedMetadata, location, mpuOverviewKey, next) {
jsonList, storedMetadata, location, mpuOverviewKey, totalMPUSize, next) {
const mdInfo = { storedParts, mpuOverviewKey, splitter };
const mpuInfo =
{ objectKey, uploadId, jsonList, bucketName, destBucket };
Expand All @@ -236,16 +238,17 @@
}
// if mpu not handled externally, completeObjData will be null
return next(null, destBucket, objMD, mpuBucket, storedParts,
jsonList, storedMetadata, completeObjData, mpuOverviewKey);
jsonList, storedMetadata, completeObjData, mpuOverviewKey,
totalMPUSize);
});
},
function validateAndFilterParts(destBucket, objMD, mpuBucket,
storedParts, jsonList, storedMetadata, completeObjData, mpuOverviewKey,
next) {
totalMPUSize, next) {
if (completeObjData) {
return next(null, destBucket, objMD, mpuBucket, storedParts,
jsonList, storedMetadata, completeObjData, mpuOverviewKey,
completeObjData.filteredPartsObj);
completeObjData.filteredPartsObj, totalMPUSize);
}
const filteredPartsObj = validateAndFilterMpuParts(storedParts,
jsonList, mpuOverviewKey, splitter, log);
Expand All @@ -254,11 +257,11 @@
}
return next(null, destBucket, objMD, mpuBucket, storedParts,
jsonList, storedMetadata, completeObjData, mpuOverviewKey,
filteredPartsObj);
filteredPartsObj, totalMPUSize);
},
function processParts(destBucket, objMD, mpuBucket, storedParts,
jsonList, storedMetadata, completeObjData, mpuOverviewKey,
filteredPartsObj, next) {
filteredPartsObj, totalMPUSize, next) {
// if mpu was completed on backend that stored mpu MD externally,
// skip MD processing steps
if (completeObjData && skipMpuPartProcessing(completeObjData)) {
Expand All @@ -276,7 +279,7 @@
const calculatedSize = completeObjData.contentLength;
return next(null, destBucket, objMD, mpuBucket, storedMetadata,
completeObjData.eTag, calculatedSize, dataLocations,
[mpuOverviewKey], null, completeObjData);
[mpuOverviewKey], null, completeObjData, totalMPUSize);
}

const partsInfo =
Expand All @@ -300,15 +303,15 @@
];
return next(null, destBucket, objMD, mpuBucket, storedMetadata,
aggregateETag, calculatedSize, dataLocations, keysToDelete,
extraPartLocations, completeObjData);
extraPartLocations, completeObjData, totalMPUSize);
}
return next(null, destBucket, objMD, mpuBucket, storedMetadata,
aggregateETag, calculatedSize, dataLocations, keysToDelete,
extraPartLocations, null);
extraPartLocations, null, totalMPUSize);
},
function prepForStoring(destBucket, objMD, mpuBucket, storedMetadata,
aggregateETag, calculatedSize, dataLocations, keysToDelete,
extraPartLocations, completeObjData, next) {
extraPartLocations, completeObjData, totalMPUSize, next) {
const metaHeaders = {};
const keysNotNeeded =
['initiator', 'partLocations', 'key',
Expand All @@ -321,6 +324,8 @@
metaHeaders[item] = storedMetadata[item];
});

const droppedMPUSize = totalMPUSize - calculatedSize;

const metaStoreParams = {
authInfo,
objectKey,
Expand Down Expand Up @@ -380,7 +385,7 @@
return process.nextTick(() => next(null, destBucket, dataLocations,
metaStoreParams, mpuBucket, keysToDelete, aggregateETag,
objMD, extraPartLocations, pseudoCipherBundle,
completeObjData, options));
completeObjData, options, droppedMPUSize));
}

if (!destBucket.isVersioningEnabled() && objMD?.archive?.archiveInfo) {
Expand All @@ -403,13 +408,13 @@
return next(null, destBucket, dataLocations,
metaStoreParams, mpuBucket, keysToDelete, aggregateETag,
objMD, extraPartLocations, pseudoCipherBundle,
completeObjData, options);
completeObjData, options, droppedMPUSize);
});
},
function storeAsNewObj(destinationBucket, dataLocations,
metaStoreParams, mpuBucket, keysToDelete, aggregateETag, objMD,
extraPartLocations, pseudoCipherBundle,
completeObjData, options, next) {
completeObjData, options, droppedMPUSize, next) {
const dataToDelete = options.dataToDelete;
/* eslint-disable no-param-reassign */
metaStoreParams.versionId = options.versionId;
Expand Down Expand Up @@ -461,7 +466,7 @@
return next(null, mpuBucket, keysToDelete, aggregateETag,
extraPartLocations, destinationBucket,
// pass the original version ID as generatedVersionId
objMD.versionId);
objMD.versionId, droppedMPUSize);
}
}
return services.metadataStoreObject(destinationBucket.getName(),
Expand Down Expand Up @@ -499,31 +504,44 @@
}
return next(null, mpuBucket, keysToDelete,
aggregateETag, extraPartLocations,
destinationBucket, generatedVersionId);
destinationBucket, generatedVersionId,
droppedMPUSize);
});
}
return next(null, mpuBucket, keysToDelete, aggregateETag,
extraPartLocations, destinationBucket,
generatedVersionId);
generatedVersionId, droppedMPUSize);
});
},
function deletePartsMetadata(mpuBucket, keysToDelete, aggregateETag,
extraPartLocations, destinationBucket, generatedVersionId, next) {
extraPartLocations, destinationBucket, generatedVersionId, droppedMPUSize, next) {
services.batchDeleteObjectMetadata(mpuBucket.getName(),
keysToDelete, log, err => next(err, extraPartLocations,
destinationBucket, aggregateETag, generatedVersionId));
destinationBucket, aggregateETag, generatedVersionId, droppedMPUSize));
},
function batchDeleteExtraParts(extraPartLocations, destinationBucket,
aggregateETag, generatedVersionId, next) {
aggregateETag, generatedVersionId, droppedMPUSize, next) {
if (extraPartLocations && extraPartLocations.length > 0) {
return data.batchDelete(extraPartLocations, request.method,
null, log, err => {
if (err) {
return next(err);
}
return next(null, destinationBucket, aggregateETag,
generatedVersionId);
return data.batchDelete(extraPartLocations, request.method, null, log, err => {
if (err) {
return next(err);

Check warning on line 527 in lib/api/completeMultipartUpload.js

View check run for this annotation

Codecov / codecov/patch

lib/api/completeMultipartUpload.js#L527

Added line #L527 was not covered by tests
}

francoisferrand marked this conversation as resolved.
Show resolved Hide resolved
return validateQuotas(request, destinationBucket, request.accountQuotas,
['objectDelete'], 'objectDelete', -droppedMPUSize, false, log, err => {
if (err) {
// Ignore error, as the data has been deleted already: only inflight count
// has not been updated, and will be eventually consistent anyway
log.warn('failed to update inflights', {

Check warning on line 535 in lib/api/completeMultipartUpload.js

View check run for this annotation

Codecov / codecov/patch

lib/api/completeMultipartUpload.js#L535

Added line #L535 was not covered by tests
method: 'completeMultipartUpload',
extraPartLocations,
error: err,
});
}
return next(null, destinationBucket, aggregateETag,
generatedVersionId);
});
});
}
return next(null, destinationBucket, aggregateETag,
generatedVersionId);
Expand Down
132 changes: 132 additions & 0 deletions tests/quota/awsNodeSdk.js
Original file line number Diff line number Diff line change
Expand Up @@ -883,4 +883,136 @@ function multiObjectDelete(bucket, keys, size, callback) {
next => deleteBucket(bucket, next),
], done);
});

it('should reduce inflights when completing MPU with fewer parts than uploaded', done => {
const bucket = 'quota-test-bucket-mpu1';
const key = 'quota-test-object';
const parts = 3;
const partSize = 5 * 1024 * 1024;
const totalSize = parts * partSize;
const usedParts = 2;
let uploadId = null;
const ETags = [];

if (!s3Config.isQuotaInflightEnabled()) {
return done();
}

return async.series([
next => createBucket(bucket, false, next),
next => sendRequest(putQuotaVerb, '127.0.0.1:8000', `/${bucket}/?quota=true`,
JSON.stringify({ quota: totalSize * 2 }), config)
.then(() => next()).catch(err => next(err)),
next => s3Client.createMultipartUpload({
Bucket: bucket,
Key: key,
}, (err, data) => {
if (err) {
return next(err);
}
uploadId = data.UploadId;
return next();
}),
next => async.timesSeries(parts, (n, cb) => {
const uploadPartParams = {
Bucket: bucket,
Key: key,
PartNumber: n + 1,
UploadId: uploadId,
Body: Buffer.alloc(partSize),
};
return s3Client.uploadPart(uploadPartParams, (err, data) => {
if (err) {
return cb(err);
}
ETags[n] = data.ETag;
return cb();
});
}, next),
next => wait(inflightFlushFrequencyMS * 2, next),
next => {
// Verify all parts are counted in inflights
assert.strictEqual(scuba.getInflightsForBucket(bucket), totalSize);
return next();
},
next => {
// Complete with only first two parts
const params = {
Bucket: bucket,
Key: key,
MultipartUpload: {
Parts: Array.from({ length: usedParts }, (_, i) => ({
ETag: ETags[i],
PartNumber: i + 1,
})),
},
UploadId: uploadId,
};
return s3Client.completeMultipartUpload(params, next);
},
next => wait(inflightFlushFrequencyMS * 2, () => next()),
next => {
// Verify inflights reduced by dropped part
const expectedInflights = usedParts * partSize;
assert.strictEqual(scuba.getInflightsForBucket(bucket), expectedInflights);
return next();
},
next => deleteObject(bucket, key, usedParts * partSize, next),
next => deleteBucket(bucket, next),
], done);
});

it('should reduce inflights when aborting MPU', done => {
const bucket = 'quota-test-bucket-mpu2';
const key = 'quota-test-object';
const parts = 3;
const partSize = 5 * 1024 * 1024;
const totalSize = parts * partSize;
let uploadId = null;

if (!s3Config.isQuotaInflightEnabled()) {
return done();
}

return async.series([
next => createBucket(bucket, false, next),
next => sendRequest(putQuotaVerb, '127.0.0.1:8000', `/${bucket}/?quota=true`,
JSON.stringify({ quota: totalSize * 2 }), config)
.then(() => next()).catch(err => next(err)),
next => s3Client.createMultipartUpload({
Bucket: bucket,
Key: key,
}, (err, data) => {
if (err) {
return next(err);
}
uploadId = data.UploadId;
return next();
}),
next => async.timesSeries(parts, (n, cb) => {
const uploadPartParams = {
Bucket: bucket,
Key: key,
PartNumber: n + 1,
UploadId: uploadId,
Body: Buffer.alloc(partSize),
};
return s3Client.uploadPart(uploadPartParams, cb);
}, next),
next => wait(inflightFlushFrequencyMS * 2, next),
next => {
// Verify all parts are counted in inflights
assert.strictEqual(scuba.getInflightsForBucket(bucket), totalSize);
return next();
},
next => abortMPU(bucket, key, uploadId, totalSize, next),
next => wait(inflightFlushFrequencyMS * 2, next),
next => {
// Verify inflights reduced to zero after abort
assert.strictEqual(scuba.getInflightsForBucket(bucket), 0);
return next();
},
next => deleteBucket(bucket, next),
], done);
});
});
Loading
Loading