diff --git a/xl-v1-bucket.go b/xl-v1-bucket.go index 6a51d6e3d..1308799ea 100644 --- a/xl-v1-bucket.go +++ b/xl-v1-bucket.go @@ -108,6 +108,7 @@ var bucketMetadataOpIgnoredErrs = []error{ errDiskNotFound, errDiskAccessDenied, errFaultyDisk, + errVolumeNotFound, } // getBucketInfo - returns the BucketInfo from one of the load balanced disks. diff --git a/xl-v1-metadata.go b/xl-v1-metadata.go index 57cb72a55..9ab490f7f 100644 --- a/xl-v1-metadata.go +++ b/xl-v1-metadata.go @@ -333,11 +333,23 @@ func writeUniqueXLMetadata(disks []StorageAPI, bucket, prefix string, xlMetas [] return errXLWriteQuorum } - // For all other errors return. - for _, err := range mErrs { - if err != nil && err != errDiskNotFound { - return err + // Reduce errors and verify quourm and return. + if errCount, reducedErr := reduceErrs(mErrs); reducedErr != nil { + if errCount < writeQuorum { + // Delete all `xl.json` successfully renamed. + deleteAllXLMetadata(disks, bucket, prefix, mErrs) + return errXLWriteQuorum } + if isErrIgnored(reducedErr, []error{ + errDiskNotFound, + errDiskAccessDenied, + errFaultyDisk, + errVolumeNotFound, + }) { + // Success. + return nil + } + return reducedErr } // Success. @@ -386,11 +398,24 @@ func writeSameXLMetadata(disks []StorageAPI, bucket, prefix string, xlMeta xlMet return errXLWriteQuorum } - // For any other errors delete `xl.json` as well. - for _, err := range mErrs { - if err != nil && err != errDiskNotFound { - return err + // Reduce errors and verify quourm and return. + if errCount, reducedErr := reduceErrs(mErrs); reducedErr != nil { + if errCount < writeQuorum { + // Delete all `xl.json` successfully renamed. + deleteAllXLMetadata(disks, bucket, prefix, mErrs) + return errXLWriteQuorum } + // Ignore specific errors if we are under write quorum. + if isErrIgnored(reducedErr, []error{ + errDiskNotFound, + errDiskAccessDenied, + errFaultyDisk, + errVolumeNotFound, + }) { + // Success. + return nil + } + return reducedErr } // Success. diff --git a/xl-v1-multipart-common.go b/xl-v1-multipart-common.go index f518ece25..f7d951d74 100644 --- a/xl-v1-multipart-common.go +++ b/xl-v1-multipart-common.go @@ -63,6 +63,10 @@ func (xl xlObjects) updateUploadsJSON(bucket, object string, uploadsJSON uploads // Count all the errors and validate if we have write quorum. if !isDiskQuorum(errs, xl.writeQuorum) { + // Do we have readQuorum?. + if isDiskQuorum(errs, xl.readQuorum) { + return nil + } // Rename `uploads.json` left over back to tmp location. for index, disk := range xl.storageDisks { if disk == nil { @@ -150,6 +154,10 @@ func (xl xlObjects) writeUploadJSON(bucket, object, uploadID string, initiated t // Count all the errors and validate if we have write quorum. if !isDiskQuorum(errs, xl.writeQuorum) { + // Do we have readQuorum?. + if isDiskQuorum(errs, xl.readQuorum) { + return nil + } // Rename `uploads.json` left over back to tmp location. for index, disk := range xl.storageDisks { if disk == nil { @@ -262,7 +270,7 @@ func (xl xlObjects) statPart(bucket, object, uploadID, partName string) (fileInf } // commitXLMetadata - commit `xl.json` from source prefix to destination prefix in the given slice of disks. -func commitXLMetadata(disks []StorageAPI, srcPrefix, dstPrefix string, writeQuorum int) error { +func commitXLMetadata(disks []StorageAPI, srcPrefix, dstPrefix string, writeQuorum, readQuorum int) error { var wg = &sync.WaitGroup{} var mErrs = make([]error, len(disks)) @@ -294,16 +302,36 @@ func commitXLMetadata(disks []StorageAPI, srcPrefix, dstPrefix string, writeQuor // Wait for all the routines. wg.Wait() - // Do we have write quorum?. + // Do we have write Quorum?. if !isDiskQuorum(mErrs, writeQuorum) { + // Do we have readQuorum?. + if isDiskQuorum(mErrs, readQuorum) { + // Return success. + return nil + } + // Delete all `xl.json` successfully renamed. + deleteAllXLMetadata(disks, minioMetaBucket, dstPrefix, mErrs) return errXLWriteQuorum } - // For all other errors return. - for _, err := range mErrs { - if err != nil && err != errDiskNotFound { - return err + // Reduce errors and verify quourm and return. + if errCount, reducedErr := reduceErrs(mErrs); reducedErr != nil { + if errCount < writeQuorum { + // Delete all `xl.json` successfully renamed. + deleteAllXLMetadata(disks, minioMetaBucket, dstPrefix, mErrs) + return errXLWriteQuorum } + if isErrIgnored(reducedErr, []error{ + errDiskNotFound, + errDiskAccessDenied, + errFaultyDisk, + errVolumeNotFound, + }) { + return nil + } + return reducedErr } + + // Success. return nil } diff --git a/xl-v1-multipart.go b/xl-v1-multipart.go index e3df1b12f..b5497e850 100644 --- a/xl-v1-multipart.go +++ b/xl-v1-multipart.go @@ -483,7 +483,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s if err = writeUniqueXLMetadata(onlineDisks, minioMetaBucket, tempXLMetaPath, partsMetadata, xl.writeQuorum, xl.readQuorum); err != nil { return "", toObjectErr(err, minioMetaBucket, tempXLMetaPath) } - rErr := commitXLMetadata(onlineDisks, tempXLMetaPath, uploadIDPath, xl.writeQuorum) + rErr := commitXLMetadata(onlineDisks, tempXLMetaPath, uploadIDPath, xl.writeQuorum, xl.readQuorum) if rErr != nil { return "", toObjectErr(rErr, minioMetaBucket, uploadIDPath) } @@ -709,7 +709,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload if err = writeUniqueXLMetadata(xl.storageDisks, minioMetaBucket, tempUploadIDPath, partsMetadata, xl.writeQuorum, xl.readQuorum); err != nil { return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath) } - rErr := commitXLMetadata(xl.storageDisks, tempUploadIDPath, uploadIDPath, xl.writeQuorum) + rErr := commitXLMetadata(xl.storageDisks, tempUploadIDPath, uploadIDPath, xl.writeQuorum, xl.readQuorum) if rErr != nil { return "", toObjectErr(rErr, minioMetaBucket, uploadIDPath) } diff --git a/xl-v1-object.go b/xl-v1-object.go index 28c36fc60..fb0698a83 100644 --- a/xl-v1-object.go +++ b/xl-v1-object.go @@ -315,12 +315,22 @@ func rename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, return errXLWriteQuorum } // Return on first error, also undo any partially successful rename operations. - for _, err := range errs { - if err != nil && err != errDiskNotFound { + if errCount, reducedErr := reduceErrs(errs); reducedErr != nil { + if errCount < writeQuorum { // Undo all the partial rename operations. undoRename(disks, srcBucket, srcEntry, dstBucket, dstEntry, isPart, errs, writeQuorum, readQuorum) - return err + return errXLWriteQuorum } + if isErrIgnored(reducedErr, []error{ + errDiskNotFound, + errDiskAccessDenied, + errFaultyDisk, + errVolumeNotFound, + }) { + // Return success. + return nil + } + return reducedErr } return nil }