From 8d5456c15a789ed3d492170aecd8c460c9d7afeb Mon Sep 17 00:00:00 2001 From: Anis Elleuch Date: Fri, 26 Mar 2021 19:17:23 +0100 Subject: [PATCH] Fix error returned by HealObject in some cases (#11906) The background healing can return NoSuchUpload error, the reason is that healing code can return errFileNotFound with three parameters. Simplify the code by returning exact errUploadNotFound error in multipart code. Also ensure that a typed error is always returned whatever the number of parameters because it is better than showing internal error. --- cmd/erasure-multipart.go | 10 +- cmd/object-api-errors.go | 194 +++++++++++++++++++++------------------ cmd/typed-errors.go | 3 + 3 files changed, 115 insertions(+), 92 deletions(-) diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index cfe96f611..dbf76ec43 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -44,7 +44,13 @@ func (er erasureObjects) getMultipartSHADir(bucket, object string) string { } // checkUploadIDExists - verify if a given uploadID exists and is valid. -func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object, uploadID string) error { +func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object, uploadID string) (err error) { + defer func() { + if err == errFileNotFound { + err = errUploadIDNotFound + } + }() + disks := er.getDisks() // Read metadata associated with the object from all disks. @@ -56,7 +62,7 @@ func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object } if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil { - return toObjectErr(reducedErr, bucket, object) + return reducedErr } // List all online disks. diff --git a/cmd/object-api-errors.go b/cmd/object-api-errors.go index 698c8f6d0..846fb26cb 100644 --- a/cmd/object-api-errors.go +++ b/cmd/object-api-errors.go @@ -22,139 +22,153 @@ import ( "fmt" "io" "path" - "strings" ) // Converts underlying storage error. Convenience function written to // handle all cases where we have known types of errors returned by // underlying storage layer. func toObjectErr(err error, params ...string) error { - if len(params) > 1 { - if HasSuffix(params[1], globalDirSuffix) { - params[1] = strings.TrimSuffix(params[1], globalDirSuffix) + slashSeparator - } - } switch err { case errVolumeNotFound: + apiErr := BucketNotFound{} if len(params) >= 1 { - err = BucketNotFound{Bucket: params[0]} + apiErr.Bucket = params[0] } + return apiErr case errVolumeNotEmpty: + apiErr := BucketNotEmpty{} if len(params) >= 1 { - err = BucketNotEmpty{Bucket: params[0]} + apiErr.Bucket = params[0] } + return apiErr case errVolumeExists: + apiErr := BucketExists{} if len(params) >= 1 { - err = BucketExists{Bucket: params[0]} + apiErr.Bucket = params[0] } + return apiErr case errDiskFull: - err = StorageFull{} + return StorageFull{} case errTooManyOpenFiles: - err = SlowDown{} + return SlowDown{} case errFileAccessDenied: - if len(params) >= 2 { - err = PrefixAccessDenied{ - Bucket: params[0], - Object: params[1], - } + apiErr := PrefixAccessDenied{} + if len(params) >= 1 { + apiErr.Bucket = params[0] } + if len(params) >= 2 { + apiErr.Object = decodeDirObject(params[1]) + } + return apiErr case errFileParentIsFile: - if len(params) >= 2 { - err = ParentIsObject{ - Bucket: params[0], - Object: params[1], - } + apiErr := ParentIsObject{} + if len(params) >= 1 { + apiErr.Bucket = params[0] } + if len(params) >= 2 { + apiErr.Object = decodeDirObject(params[1]) + } + return apiErr case errIsNotRegular: - if len(params) >= 2 { - err = ObjectExistsAsDirectory{ - Bucket: params[0], - Object: params[1], - } + apiErr := ObjectExistsAsDirectory{} + if len(params) >= 1 { + apiErr.Bucket = params[0] } + if len(params) >= 2 { + apiErr.Object = decodeDirObject(params[1]) + } + return apiErr case errFileVersionNotFound: - switch len(params) { - case 2: - err = VersionNotFound{ - Bucket: params[0], - Object: params[1], - } - case 3: - err = VersionNotFound{ - Bucket: params[0], - Object: params[1], - VersionID: params[2], - } + apiErr := VersionNotFound{} + if len(params) >= 1 { + apiErr.Bucket = params[0] } + if len(params) >= 2 { + apiErr.Object = decodeDirObject(params[1]) + } + if len(params) >= 3 { + apiErr.VersionID = params[2] + } + return apiErr case errMethodNotAllowed: - switch len(params) { - case 2: - err = MethodNotAllowed{ - Bucket: params[0], - Object: params[1], - } + apiErr := MethodNotAllowed{} + if len(params) >= 1 { + apiErr.Bucket = params[0] } + if len(params) >= 2 { + apiErr.Object = decodeDirObject(params[1]) + } + return apiErr case errFileNotFound: - switch len(params) { - case 2: - err = ObjectNotFound{ - Bucket: params[0], - Object: params[1], - } - case 3: - err = InvalidUploadID{ - Bucket: params[0], - Object: params[1], - UploadID: params[2], - } + apiErr := ObjectNotFound{} + if len(params) >= 1 { + apiErr.Bucket = params[0] } + if len(params) >= 2 { + apiErr.Object = decodeDirObject(params[1]) + } + return apiErr + case errUploadIDNotFound: + apiErr := InvalidUploadID{} + if len(params) >= 1 { + apiErr.Bucket = params[0] + } + if len(params) >= 2 { + apiErr.Object = decodeDirObject(params[1]) + } + if len(params) >= 3 { + apiErr.UploadID = params[2] + } + return apiErr case errFileNameTooLong: - if len(params) >= 2 { - err = ObjectNameInvalid{ - Bucket: params[0], - Object: params[1], - } + apiErr := ObjectNameInvalid{} + if len(params) >= 1 { + apiErr.Bucket = params[0] } + if len(params) >= 2 { + apiErr.Object = decodeDirObject(params[1]) + } + return apiErr case errDataTooLarge: - if len(params) >= 2 { - err = ObjectTooLarge{ - Bucket: params[0], - Object: params[1], - } + apiErr := ObjectTooLarge{} + if len(params) >= 1 { + apiErr.Bucket = params[0] } + if len(params) >= 2 { + apiErr.Object = decodeDirObject(params[1]) + } + return apiErr case errDataTooSmall: + apiErr := ObjectTooSmall{} + if len(params) >= 1 { + apiErr.Bucket = params[0] + } if len(params) >= 2 { - err = ObjectTooSmall{ - Bucket: params[0], - Object: params[1], - } + apiErr.Object = decodeDirObject(params[1]) } + return apiErr case errErasureReadQuorum: - if len(params) == 1 { - err = InsufficientReadQuorum{ - Bucket: params[0], - } - } else if len(params) >= 2 { - err = InsufficientReadQuorum{ - Bucket: params[0], - Object: params[1], - } + apiErr := InsufficientReadQuorum{} + if len(params) >= 1 { + apiErr.Bucket = params[0] } + if len(params) >= 2 { + apiErr.Object = decodeDirObject(params[1]) + } + return apiErr case errErasureWriteQuorum: - if len(params) == 1 { - err = InsufficientWriteQuorum{ - Bucket: params[0], - } - } else if len(params) >= 2 { - err = InsufficientWriteQuorum{ - Bucket: params[0], - Object: params[1], - } + apiErr := InsufficientWriteQuorum{} + if len(params) >= 1 { + apiErr.Bucket = params[0] } + if len(params) >= 2 { + apiErr.Object = decodeDirObject(params[1]) + } + return apiErr case io.ErrUnexpectedEOF, io.ErrShortWrite: - err = IncompleteBody{} + return IncompleteBody{} case context.Canceled, context.DeadlineExceeded: - err = IncompleteBody{} + return IncompleteBody{} } return err } diff --git a/cmd/typed-errors.go b/cmd/typed-errors.go index 273154725..443de284f 100644 --- a/cmd/typed-errors.go +++ b/cmd/typed-errors.go @@ -98,3 +98,6 @@ var errAccessDenied = errors.New("Do not have enough permissions to access this // error returned when object is locked. var errLockedObject = errors.New("Object is WORM protected and cannot be overwritten or deleted") + +// error returned when upload id not found +var errUploadIDNotFound = errors.New("Specified Upload ID is not found")