Skip to content

Commit

Permalink
Merge pull request #109 from dlarson04/Issue_108
Browse files Browse the repository at this point in the history
Issue 108 - CSS avoid leader
  • Loading branch information
linggao authored Apr 11, 2022
2 parents a769f31 + 6768ba9 commit 1edc3d4
Show file tree
Hide file tree
Showing 8 changed files with 262 additions and 150 deletions.
10 changes: 6 additions & 4 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,10 +365,6 @@ func GetStartAndEndRangeFromContentRangeHeader(request *http.Request) (int64, in
return totalSize, startOffset, endOffset, nil
}

func GetMMSUploadOwnerHeader(request *http.Request) string {
uploadOwner := request.Header.Get("MMS-Upload-Owner")
return uploadOwner
}

// MetaData is the metadata that identifies and defines the sync service object.
// Every object includes metadata (mandatory) and data (optional). The metadata and data can be updated independently.
Expand Down Expand Up @@ -498,6 +494,11 @@ type MetaData struct {
// This field should not be set by users.
ChunkSize int `json:"chunkSize" bson:"chunk-size"`

// UploadChunkSize is an internal field indicating the upload chunk size used when uploading
// the model (if chunking was used)
// This field should not be set by users.
UploadChunkSize int64 `json:"uploadChunkSize" bson:"upload-chunk-size"`

// HashAlgorithm used for data signature sign/verification. "SHA1" and "SHA256" are supported hash algorithms.
// Valid values are: "SHA1", "SHA256"
// Optional field, if omitted the data signature/verification will not be applied
Expand Down Expand Up @@ -1106,3 +1107,4 @@ func init() {
Version.Major = 1
Version.Minor = 0
}

27 changes: 5 additions & 22 deletions core/base/apiModule.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/open-horizon/edge-sync-service/core/communications"
"github.com/open-horizon/edge-sync-service/core/dataURI"
"github.com/open-horizon/edge-sync-service/core/dataVerifier"
"github.com/open-horizon/edge-sync-service/core/leader"
"github.com/open-horizon/edge-sync-service/core/storage"
"github.com/open-horizon/edge-utilities/logger"
"github.com/open-horizon/edge-utilities/logger/log"
Expand Down Expand Up @@ -757,29 +756,11 @@ func PutObjectAllData(orgID string, objectType string, objectID string, dataRead
return true, nil
}

func PutObjectChunkData(orgID string, objectType string, objectID string, dataReader io.Reader, startOffset int64, endOffset int64, totalSize int64, uploadOwnerID string) (bool, common.SyncServiceError) {
func PutObjectChunkData(orgID string, objectType string, objectID string, dataReader io.Reader, startOffset int64, endOffset int64, totalSize int64) (bool, common.SyncServiceError) {
if trace.IsLogging(logger.DEBUG) {
trace.Debug("In PutObjectChunkData. Update data %s %s %s, startOffset: %d, endOffset: %d. Check if is leader: %t. UploadOwnerID: %s, current CSS ID: %s\n", orgID, objectType, objectID, startOffset, endOffset, leader.CheckIfLeader(), uploadOwnerID, leader.GetLeaderID())
trace.Debug("In PutObjectChunkData. Update data %s %s %s, startOffset: %d, endOffset: %d.\n", orgID, objectType, objectID, startOffset, endOffset)
}

if !leader.CheckIfLeader() {
if trace.IsLogging(logger.DEBUG) {
trace.Debug("In PutObjectChunkData. This is not leader, ignore...")
}
return false, &common.IgnoredRequest{Message: "Request Ignored by non-leader"}
}

if uploadOwnerID != "" && uploadOwnerID != leader.GetLeaderID() {
if log.IsLogging(logger.ERROR) {
log.Error("Failed to put chunk data for %s %s %s. It is leader, but uploadOwnerID (%s) != CSSID (%s)", orgID, objectType, objectID, uploadOwnerID, leader.GetLeaderID())
}
return false, &common.InternalError{Message: "leader changed during the chunk uploading"}
}

// 2 situations when reach here:
// It is leader && uploadOwnerID == leader.GetLeaderID()
// It is leader && uploadOwnerID == ""

common.HealthStatus.ClientRequestReceived()

lockIndex := common.HashStrings(orgID, objectType, objectID)
Expand Down Expand Up @@ -814,7 +795,9 @@ func PutObjectChunkData(orgID string, objectType string, objectID string, dataRe

// append Data to temp file/data
isTempData := false
if common.NeedDataVerification(*metaData) {

// For MongoDB implementation, all data uploaded in chunks will be written to temp storage and then combined later. The dataVerifier will be a pass-thru if signature verification is unneeded
if common.Configuration.StorageProvider == common.Mongo || common.NeedDataVerification(*metaData) {
isTempData = true
}

Expand Down
6 changes: 2 additions & 4 deletions core/base/apiServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2350,10 +2350,8 @@ func handleObjectPutData(orgID string, objectType string, objectID string, write
return
}

uploadOwner := common.GetMMSUploadOwnerHeader(request)

if trace.IsLogging(logger.DEBUG) {
trace.Debug("In handleObjectPutData. TotalSize: %d, startOffset: %d, endOffset: %d, upload owner: %s\n", totalSize, startOffset, endOffset, uploadOwner)
trace.Debug("In handleObjectPutData. TotalSize: %d, startOffset: %d, endOffset: %d\n", totalSize, startOffset, endOffset)
}

var found bool
Expand All @@ -2362,7 +2360,7 @@ func handleObjectPutData(orgID string, objectType string, objectID string, write
found, err = PutObjectAllData(orgID, objectType, objectID, request.Body)
chunkUpload = false
} else {
found, err = PutObjectChunkData(orgID, objectType, objectID, request.Body, startOffset, endOffset, totalSize, uploadOwner)
found, err = PutObjectChunkData(orgID, objectType, objectID, request.Body, startOffset, endOffset, totalSize)
chunkUpload = true
}

Expand Down
30 changes: 24 additions & 6 deletions core/base/objectVerifyQueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,14 @@ func (q *ObjectVerifyQueue) run() {
trace.Trace("Get an object %s/%s/%s from object verification Queue", meta.DestOrgID, meta.ObjectType, meta.ObjectID)
}

if common.NeedDataVerification(meta) {
// For Mongo, we always need to do some file manipulation for upload chunking - either a pass-thru copy or signature verification. If the file is streamed to Mongo, we shouldn't get here
if common.Configuration.StorageProvider == common.Mongo || common.NeedDataVerification(meta) {
if trace.IsLogging(logger.TRACE) {
trace.Trace("Start data verification for %s/%s/%s", meta.DestOrgID, meta.ObjectType, meta.ObjectID)
if common.NeedDataVerification(meta) {
trace.Trace("Start data verification for %s/%s/%s", meta.DestOrgID, meta.ObjectType, meta.ObjectID)
} else {
trace.Trace("Start pass-thru copy for %s/%s/%s", meta.DestOrgID, meta.ObjectType, meta.ObjectID)
}
}

lockIndex := common.HashStrings(meta.DestOrgID, meta.ObjectType, meta.ObjectID)
Expand All @@ -63,7 +68,11 @@ func (q *ObjectVerifyQueue) run() {
// Set object status from "verifying" to "verificationFailed"
} else if success, err := dataVf.VerifyDataSignature(dr, meta.DestOrgID, meta.ObjectType, meta.ObjectID, ""); !success || err != nil {
if log.IsLogging(logger.ERROR) {
log.Error("Failed to verify data for object %s/%s/%s, remove unverified data", meta.DestOrgID, meta.ObjectType, meta.ObjectID)
if common.NeedDataVerification(meta) {
log.Error("Failed to verify data for object %s/%s/%s, remove unverified data", meta.DestOrgID, meta.ObjectType, meta.ObjectID)
} else {
log.Error("Failed to copy data for object %s/%s/%s, remove unverified data", meta.DestOrgID, meta.ObjectType, meta.ObjectID)
}
if err != nil {
log.Error("Error: %s", err.Error())
}
Expand All @@ -72,11 +81,16 @@ func (q *ObjectVerifyQueue) run() {
// Set object status from "verifying" to "verification_failed"
} else {
status = common.ReadyToSend
dataVf.RemoveTempData(meta.DestOrgID, meta.ObjectType, meta.ObjectID, meta.DestinationDataURI)
}

// Data is verified, object status is set to "ready" during VerifyDataSignature(StoreObjectData)
if trace.IsLogging(logger.DEBUG) {
trace.Debug("Data verification is done for object %s/%s/%s, updating object status to %s", meta.DestOrgID, meta.ObjectType, meta.ObjectID, status)
if common.NeedDataVerification(meta) {
trace.Debug("Data verification is done for object %s/%s/%s, updating object status to %s", meta.DestOrgID, meta.ObjectType, meta.ObjectID, status)
} else {
trace.Debug("Pass-thru copy is done for object %s/%s/%s, updating object status to %s", meta.DestOrgID, meta.ObjectType, meta.ObjectID, status)
}
}

if err := store.UpdateObjectStatus(meta.DestOrgID, meta.ObjectType, meta.ObjectID, status); err != nil {
Expand All @@ -96,9 +110,13 @@ func (q *ObjectVerifyQueue) run() {
continue
}

// Data is verified, object status is set to "ready" during VerifyDataSignature(StoreObjectData)
// Data is verified or copied, object status is set to "ready" during VerifyDataSignature(StoreObjectData)
if trace.IsLogging(logger.DEBUG) {
trace.Debug("Data verified for object %s/%s/%s", meta.DestOrgID, meta.ObjectType, meta.ObjectID)
if common.NeedDataVerification(meta) {
trace.Debug("Data verified for object %s/%s/%s", meta.DestOrgID, meta.ObjectType, meta.ObjectID)
} else {
trace.Debug("Data copied for object %s/%s/%s", meta.DestOrgID, meta.ObjectType, meta.ObjectID)
}
}

// StoreObject increments the instance id if this is a data update, we need to fetch the updated meta data
Expand Down
48 changes: 32 additions & 16 deletions core/dataVerifier/dataVerifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,30 +52,46 @@ func NewDataVerifier(hashAlgorithm string, publicKey string, signature string) *

// VerifyDataSignature is to verify the data. This function will generate the tmp data in storage. Call RemoveTempData() after verification to remove the tmp data
func (dataVerifier *DataVerifier) VerifyDataSignature(data io.Reader, orgID string, objectType string, objectID string, destinationDataURI string) (bool, common.SyncServiceError) {

var dr io.Reader
var publicKeyBytes []byte
var signatureBytes []byte
var err error

if dataVerifier.writeThrough {
return true, nil
dr = data
} else {
if publicKeyBytes, err = base64.StdEncoding.DecodeString(dataVerifier.publicKey); err != nil {
return false, &common.InternalError{Message: "PublicKey is not base64 encoded. Error: " + err.Error()}
} else if signatureBytes, err = base64.StdEncoding.DecodeString(dataVerifier.signature); err != nil {
return false, &common.InternalError{Message: "Signature is not base64 encoded. Error: " + err.Error()}
} else {

dr = io.TeeReader(data, dataVerifier.dataHash)
}
}

if publicKeyBytes, err := base64.StdEncoding.DecodeString(dataVerifier.publicKey); err != nil {
return false, &common.InternalError{Message: "PublicKey is not base64 encoded. Error: " + err.Error()}
} else if signatureBytes, err := base64.StdEncoding.DecodeString(dataVerifier.signature); err != nil {
return false, &common.InternalError{Message: "Signature is not base64 encoded. Error: " + err.Error()}
} else {
dr := io.TeeReader(data, dataVerifier.dataHash)
if trace.IsLogging(logger.DEBUG) {
if trace.IsLogging(logger.DEBUG) {
if dataVerifier.writeThrough {
trace.Debug("DataVerifier - Pass-thru mode for object %s %s\n", objectType, objectID)
} else {
trace.Debug("DataVerifier - In VerifyDataSignature, verifying and storing data for object %s %s\n", objectType, objectID)
}
}

if destinationDataURI != "" {
if _, err := dataURI.StoreData(destinationDataURI, dr, 0); err != nil {
return false, err
}
} else {
if exists, err := Store.StoreObjectData(orgID, objectType, objectID, dr); err != nil || !exists {
return false, err
}
if destinationDataURI != "" {
if _, err := dataURI.StoreData(destinationDataURI, dr, 0); err != nil {
return false, err
}
} else {
if exists, err := Store.StoreObjectData(orgID, objectType, objectID, dr); err != nil || !exists {
return false, err
}
}

if dataVerifier.writeThrough {
return true, nil
} else {
return dataVerifier.verifyHelper(publicKeyBytes, signatureBytes)
}
}
Expand Down
Loading

0 comments on commit 1edc3d4

Please sign in to comment.