Skip to content

Commit

Permalink
Merge pull request #157 from LiilyZhang/zhangl/mongo6UploadFile
Browse files Browse the repository at this point in the history
Issue #156 - MongoDB6: Concurrent uploading object data f…
  • Loading branch information
LiilyZhang authored Oct 22, 2024
2 parents 72a79b0 + 874865d commit 6604d71
Show file tree
Hide file tree
Showing 10 changed files with 32 additions and 41 deletions.
6 changes: 3 additions & 3 deletions core/base/apiModule.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func UpdateObject(orgID string, objectType string, objectID string, metaData com
if metaData.DestinationsList != nil && metaData.DestType != "" {
return &common.InvalidRequest{Message: "Both destinations list and destination type are specified"}
}
if validatedDestList, _ := common.ValidateDestinationListInput(metaData.DestinationsList); validatedDestList == false {
if validatedDestList, _ := common.ValidateDestinationListInput(metaData.DestinationsList); !validatedDestList {
return &common.InvalidRequest{Message: "Unsupported char <, > in destinationsList."}
}

Expand Down Expand Up @@ -561,7 +561,7 @@ func GetObjectDataByChunk(orgID string, objectType string, objectID string, star

// Add semaphore here to prevent OOM when scaled agents try to download install packages during auto-upgrade at same time
// Semaphore is not applied to GetObjectData because agent-install.sh can't handle it. GetObjectData will just stream the data back without using CSS memory
if common.ObjectDownloadSemaphore.TryAcquire(1) == false {
if !common.ObjectDownloadSemaphore.TryAcquire(1) {
// If too many downloads are in flight, agent will get error and retry. Originally, there was a lock around the download that
// caused the downloads to be serial. It was changed to use a semaphore to allow limited concurrency.
if trace.IsLogging(logger.TRACE) {
Expand Down Expand Up @@ -1659,7 +1659,7 @@ func RegisterWebhook(orgID string, objectType string, webhook string) common.Syn
if err != nil {
return &common.InvalidRequest{Message: "Invalid webhook"}
}
if err != nil || (!strings.EqualFold(uri.Scheme, "http") && !strings.EqualFold(uri.Scheme, "https")) {
if !strings.EqualFold(uri.Scheme, "http") && !strings.EqualFold(uri.Scheme, "https") {
return &common.InvalidRequest{Message: "Invalid destination data URI"}
}

Expand Down
2 changes: 1 addition & 1 deletion core/base/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ func checkIPAddress(host string) (string, common.SyncServiceError) {
}

if ipAddress == "" {
return "", &common.SetupError{Message: fmt.Sprintf("Failed to get IP for the host.")}
return "", &common.SetupError{Message: "Failed to get IP for the host."}
}
return ipAddress, nil
}
2 changes: 1 addition & 1 deletion core/base/certificates.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func setupCertificates() error {
return &common.InvalidRequest{Message: fmt.Sprintf("%s is a directory", certFile)}
}
return nil
} else if err != nil && !os.IsNotExist(err) {
} else if !os.IsNotExist(err) {
return err
}

Expand Down
2 changes: 0 additions & 2 deletions core/base/standalone.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ var ConfigFile string

var swaggerFile string

var ipAddress string

func init() {
if flag.CommandLine.Lookup("h") == nil {
flag.BoolVar(&help, "h", false, "Display usage information.")
Expand Down
8 changes: 4 additions & 4 deletions core/communications/httpCommunication.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func (communication *HTTP) SendNotificationMessage(notificationTopic string, des
} else if err != nil {
return &Error{"Failed to send HTTP request. Error: " + err.Error()}
} else if response == nil {
return &Error{"Received nil response from HTTP request. Error: " + err.Error()}
return &Error{"Received nil response from HTTP request."}
} else {
if response.StatusCode == http.StatusNoContent {
switch notificationTopic {
Expand Down Expand Up @@ -1478,7 +1478,7 @@ func (communication *HTTP) handlePutData(orgID string, objectType string, object
if trace.IsLogging(logger.DEBUG) {
trace.Debug("Get Content-Range header, will handle put chunked data")
}
if isLastChunk, handlErr = communication.handlePutChunkedData(*metaData, request, startOffset, endOffset, totalSize); err != nil {
if isLastChunk, handlErr = communication.handlePutChunkedData(*metaData, request, startOffset, endOffset, totalSize); handlErr != nil {
common.ObjectLocks.Unlock(lockIndex)
return handlErr
}
Expand Down Expand Up @@ -1681,7 +1681,7 @@ func (communication *HTTP) handleGetData(orgID string, objectType string, object
trace.Trace("Handling object get data of %s %s %s %s \n", objectType, objectID, destType, destID)
}

if common.ObjectDownloadSemaphore.TryAcquire(1) == false {
if !common.ObjectDownloadSemaphore.TryAcquire(1) {
// If too many downloads are in flight, agent will get error and retry. Originally, there was a lock around the download that
// caused the downloads to be serial. It was changed to use a semaphore to allow limited concurrency.
if trace.IsLogging(logger.TRACE) {
Expand Down Expand Up @@ -2214,7 +2214,7 @@ func (communication *HTTP) SendFeedbackMessage(code int, retryInterval int32, re
} else if err != nil {
return &Error{"Failed to send HTTP request. Error: " + err.Error()}
} else if response == nil {
return &Error{"Received nil response from feedback HTTP request. Error: " + err.Error()}
return &Error{"Received nil response from feedback HTTP request."}
} else if response.StatusCode == http.StatusNoContent {
if trace.IsLogging(logger.DEBUG) {
trace.Debug("In SendFeedbackMessage, i: %d, received %d from feedback spi \n", i, response.StatusCode)
Expand Down
2 changes: 1 addition & 1 deletion core/communications/notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ func callWebhooks(metaData *common.MetaData) {
return
}
for _, url := range webhooks {
request, err := http.NewRequest("POST", url, bytes.NewReader(body))
request, _ := http.NewRequest("POST", url, bytes.NewReader(body))
request.ContentLength = int64(len(body))
request.Header.Add("Content-Type", "Application/JSON")
response, err := http.DefaultClient.Do(request)
Expand Down
4 changes: 2 additions & 2 deletions core/storage/boltStorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,11 +263,11 @@ func (store *BoltStorage) StoreObject(metaData common.MetaData, data []byte, sta
function := func(object boltObject) (boltObject, common.SyncServiceError) {
if object.Status == common.ConsumedByDest {
// On ESS we remove the data of consumed objects, therefore we can't accept "meta only" updates
return object, &common.InvalidRequest{"Can't update only the meta data of consumed object"}
return object, &common.InvalidRequest{Message: "Can't update only the meta data of consumed object"}
}
if (object.Meta.DestinationPolicy == nil && metaData.DestinationPolicy != nil) ||
(object.Meta.DestinationPolicy != nil && metaData.DestinationPolicy == nil) {
return object, &common.InvalidRequest{"Can't update the existence of Destination Policy"}
return object, &common.InvalidRequest{Message: "Can't update the existence of Destination Policy"}
}

metaData.DataID = object.Meta.DataID // Keep the previous data id
Expand Down
2 changes: 1 addition & 1 deletion core/storage/inMemoryStorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,7 @@ func (store *InMemoryStorage) CleanObjects() common.SyncServiceError {

// GetObjectDestinations gets destinations that the object has to be sent to
func (store *InMemoryStorage) GetObjectDestinations(metaData common.MetaData) ([]common.Destination, common.SyncServiceError) {
return []common.Destination{common.Destination{DestOrgID: metaData.DestOrgID, DestType: common.Configuration.DestinationType,
return []common.Destination{{DestOrgID: metaData.DestOrgID, DestType: common.Configuration.DestinationType,
DestID: common.Configuration.DestinationID, Communication: common.Configuration.CommunicationProtocol}}, nil
}

Expand Down
27 changes: 9 additions & 18 deletions core/storage/mongoStorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,7 @@ func (store *MongoStorage) UpdateObjectDelivering(orgID string, objectType strin
}
return nil
}
return &Error{fmt.Sprintf("Failed to update object's destinations.")}
return &Error{"Failed to update object's destinations."}
}

// RetrieveObjectStatus finds the object and return its status
Expand Down Expand Up @@ -997,7 +997,7 @@ OUTER:
}
return metaDatas, nil
}
return nil, &Error{fmt.Sprintf("Failed to update object's destinations.")}
return nil, &Error{"Failed to update object's destinations."}
}

// RetrieveConsumedObjects returns all the consumed objects originated from this node
Expand Down Expand Up @@ -1040,7 +1040,7 @@ func (store *MongoStorage) RetrieveObjectAndStatus(orgID string, objectType stri
func (store *MongoStorage) RetrieveObjectData(orgID string, objectType string, objectID string, isTempData bool) (io.Reader, common.SyncServiceError) {
var id string
if isTempData {
return nil, &Error{fmt.Sprintf("RetrieveObjectData with isTempData set true should not be called for Mongo DB.")}
return nil, &Error{"RetrieveObjectData with isTempData set true should not be called for Mongo DB."}
} else {
id = createObjectCollectionID(orgID, objectType, objectID)
}
Expand Down Expand Up @@ -1180,7 +1180,7 @@ func (store *MongoStorage) StoreObjectData(orgID string, objectType string, obje
}

func (store *MongoStorage) StoreObjectTempData(orgID string, objectType string, objectID string, dataReader io.Reader) (bool, common.SyncServiceError) {
return false, &Error{fmt.Sprintf("StoreObjectTempData should not be called for Mongo DB.")}
return false, &Error{"StoreObjectTempData should not be called for Mongo DB."}
}

// Removes all the temporary chunk files from Mongo. Depends on RetrieveObjectTempData putting the fileHandle of the temp file into the map
Expand All @@ -1193,7 +1193,7 @@ func (store *MongoStorage) RemoveObjectTempData(orgID string, objectType string,

metaData, err := store.RetrieveObject(orgID, objectType, objectID)
if err != nil || metaData == nil {
return &Error{fmt.Sprintf("Error in retrieving object metadata.\n")}
return &Error{fmt.Sprintf("Error in retrieving object metadata: %v", err)}
}

var offset int64 = 0
Expand All @@ -1212,8 +1212,6 @@ func (store *MongoStorage) RemoveObjectTempData(orgID string, objectType string,

if fileHandle != nil {
fileHandle.Close()
//store.deleteFileHandle(id)

//Don't return on errors
store.removeFile(id)
}
Expand All @@ -1235,7 +1233,7 @@ func (store *MongoStorage) RetrieveObjectTempData(orgID string, objectType strin
readers := make([]io.Reader, 0)
metaData, err := store.RetrieveObject(orgID, objectType, objectID)
if err != nil || metaData == nil {
return nil, &Error{fmt.Sprintf("Error in retrieving object metadata.\n")}
return nil, &Error{fmt.Sprintf("Error in retrieving object metadata: %v", err)}
}

var offset int64 = 0
Expand Down Expand Up @@ -1361,13 +1359,6 @@ func (store *MongoStorage) AppendObjectData(orgID string, objectType string, obj
trace.Trace("Model file completely written; set updatedLastChunk to %t\n", updatedLastChunk)
}
}

//store.deleteFileHandle(id)
//err = fileHandle.Close()
if err != nil {
return updatedLastChunk, &Error{fmt.Sprintf("Failed to close the file. Error: %s.", err)}
}

return updatedLastChunk, nil
}

Expand All @@ -1385,7 +1376,7 @@ func (store *MongoStorage) getUploadDataChunkSize(orgID string, objectType strin

metaData, err := store.RetrieveObject(orgID, objectType, objectID)
if err != nil || metaData == nil || metaData.UploadChunkSize == 0 {
return 0, &Error{fmt.Sprintf("Error in getUploadDataChunkSize. Failed to find upload chunk size.\n")}
return 0, &Error{fmt.Sprintf("Error in getUploadDataChunkSize. Failed to find upload chunk size: %v\n", err)}
}

return metaData.UploadChunkSize, nil
Expand Down Expand Up @@ -1585,7 +1576,7 @@ func (store *MongoStorage) AddWebhook(orgID string, objectType string, url strin
}
return nil
}
return &Error{fmt.Sprintf("Failed to add a webhook.")}
return &Error{"Failed to add a webhook."}
}

// DeleteWebhook deletes a webhook for an object type
Expand Down Expand Up @@ -1624,7 +1615,7 @@ func (store *MongoStorage) DeleteWebhook(orgID string, objectType string, url st
}
return nil
}
return &Error{fmt.Sprintf("Failed to delete a webhook.")}
return &Error{"Failed to delete a webhook."}
}

// RetrieveWebhooks gets the webhooks for the object type
Expand Down
18 changes: 10 additions & 8 deletions core/storage/mongoStorageHelpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,22 +357,27 @@ func (store *MongoStorage) openFile(id string) (*gridfs.DownloadStream, common.S
// Save file into mongo gridFS
func (store *MongoStorage) createFile(id string, data io.Reader) common.SyncServiceError {

function := func(db *mongo.Database) (*gridfs.DownloadStream, error) {
function := func(db *mongo.Database) error {
var err error
bucket := store.gridfsBucket
if bucket == nil {
if bucket, err = gridfs.NewBucket(db); err != nil {
return nil, err
return err
}
}

uploadOpts := options.GridFSUpload().SetChunkSizeBytes(int32(common.Configuration.MaxDataChunkSize))
// filename of the object in fs.File is the value of id
_, err = bucket.UploadFromStream(id, data, uploadOpts)
return nil, err
if uploadStream, err := bucket.OpenUploadStream(id, uploadOpts); err != nil {
return err
} else {
_, err = io.Copy(uploadStream, data)
uploadStream.Close()
return err
}
}

_, retry, err := store.withDBAndReturnHelper(function, false)
retry, err := store.withDBHelper(function, false)
if err != nil {
return err
}
Expand Down Expand Up @@ -440,9 +445,6 @@ func (store *MongoStorage) withDBHelper(function func(*mongo.Database) error, is
}
store.gridfsBucket = gridfsBucket
err = function(db)
if err == nil {
return false, nil
}
if err == nil || err == mongo.ErrNoDocuments || err == mongo.ErrNilCursor || mongo.IsDuplicateKeyError(err) || IsNotFound(err) {
if isRead {
common.HealthStatus.DBReadFailed()
Expand Down

0 comments on commit 6604d71

Please sign in to comment.