From 59749a2b85e6e658eaa62244bfb4244a4ef5e2d7 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Thu, 30 Nov 2017 15:58:46 -0800 Subject: [PATCH] erasure: Remove prefix based listing support on ListMultipartUploads (#5248) Previously we have removed this support under FS on #4996, deprecate this on erasure coded backend as well to simplify our multipart support. --- cmd/object-api-multipart-common.go | 4 + cmd/object-api-multipart_test.go | 35 ----- cmd/xl-v1-multipart.go | 223 ++++++----------------------- 3 files changed, 48 insertions(+), 214 deletions(-) diff --git a/cmd/object-api-multipart-common.go b/cmd/object-api-multipart-common.go index a8d329483..4a0e73f51 100644 --- a/cmd/object-api-multipart-common.go +++ b/cmd/object-api-multipart-common.go @@ -167,6 +167,10 @@ func listMultipartUploadIDs(bucketName, objectName, uploadIDMarker string, count // Read `uploads.json`. uploadsJSON, err := readUploadsJSON(bucketName, objectName, disk) if err != nil { + switch errors.Cause(err) { + case errFileNotFound, errFileAccessDenied: + return nil, true, nil + } return nil, false, err } index := 0 diff --git a/cmd/object-api-multipart_test.go b/cmd/object-api-multipart_test.go index 0023a9402..981a0b738 100644 --- a/cmd/object-api-multipart_test.go +++ b/cmd/object-api-multipart_test.go @@ -1246,41 +1246,6 @@ func testListMultipartUploads(obj ObjectLayer, instanceType string, t TestErrHan if actualResult.KeyMarker != expectedResult.KeyMarker { t.Errorf("Test %d: %s: Expected keyMarker to be \"%s\", but instead found it to be \"%s\"", i+1, instanceType, expectedResult.KeyMarker, actualResult.KeyMarker) } - - // ListMultipartUploads returns empty respsonse always in FS mode - if instanceType != FSTestStr { - // Asserting IsTruncated. - if actualResult.IsTruncated != testCase.expectedResult.IsTruncated { - t.Errorf("Test %d: %s: Expected Istruncated to be \"%v\", but found it to \"%v\"", i+1, instanceType, expectedResult.IsTruncated, actualResult.IsTruncated) - continue - } - // Asserting NextUploadIDMarker. - if actualResult.NextUploadIDMarker != expectedResult.NextUploadIDMarker { - t.Errorf("Test %d: %s: Expected NextUploadIDMarker to be \"%s\", but instead found it to be \"%s\"", i+1, instanceType, expectedResult.NextUploadIDMarker, actualResult.NextUploadIDMarker) - continue - } - // Asserting NextKeyMarker. - if actualResult.NextKeyMarker != expectedResult.NextKeyMarker { - t.Errorf("Test %d: %s: Expected NextKeyMarker to be \"%s\", but instead found it to be \"%s\"", i+1, instanceType, expectedResult.NextKeyMarker, actualResult.NextKeyMarker) - continue - } - // Asserting the number of upload Metadata info. - if len(expectedResult.Uploads) != len(actualResult.Uploads) { - t.Errorf("Test %d: %s: Expected the result to contain info of %d Multipart Uploads, but found %d instead", i+1, instanceType, len(expectedResult.Uploads), len(actualResult.Uploads)) - continue - } - // Iterating over the uploads Metadata and asserting the fields. - for j, actualMetaData := range actualResult.Uploads { - // Asserting the object name in the upload Metadata. - if actualMetaData.Object != expectedResult.Uploads[j].Object { - t.Errorf("Test %d: %s: Metadata %d: Expected Metadata Object to be \"%s\", but instead found \"%s\"", i+1, instanceType, j+1, expectedResult.Uploads[j].Object, actualMetaData.Object) - } - // Asserting the uploadID in the upload Metadata. - if actualMetaData.UploadID != expectedResult.Uploads[j].UploadID { - t.Errorf("Test %d: %s: Metadata %d: Expected Metadata UploadID to be \"%s\", but instead found \"%s\"", i+1, instanceType, j+1, expectedResult.Uploads[j].UploadID, actualMetaData.UploadID) - } - } - } } } } diff --git a/cmd/xl-v1-multipart.go b/cmd/xl-v1-multipart.go index 9621395c0..4aaf03765 100644 --- a/cmd/xl-v1-multipart.go +++ b/cmd/xl-v1-multipart.go @@ -272,189 +272,54 @@ func commitXLMetadata(disks []StorageAPI, srcBucket, srcPrefix, dstBucket, dstPr return evalDisks(disks, mErrs), err } -// listMultipartUploads - lists all multipart uploads. -func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (lmi ListMultipartsInfo, e error) { - result := ListMultipartsInfo{ - IsTruncated: true, - MaxUploads: maxUploads, - KeyMarker: keyMarker, - Prefix: prefix, - Delimiter: delimiter, - } - - recursive := true - if delimiter == slashSeparator { - recursive = false - } - - // Not using path.Join() as it strips off the trailing '/'. - multipartPrefixPath := pathJoin(bucket, prefix) - if prefix == "" { - // Should have a trailing "/" if prefix is "" - // For ex. multipartPrefixPath should be "multipart/bucket/" if prefix is "" - multipartPrefixPath += slashSeparator - } - multipartMarkerPath := "" - if keyMarker != "" { - multipartMarkerPath = pathJoin(bucket, keyMarker) - } - var uploads []MultipartInfo - var err error - var eof bool - // List all upload ids for the keyMarker starting from - // uploadIDMarker first. - if uploadIDMarker != "" { - // hold lock on keyMarker path - keyMarkerLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, - pathJoin(bucket, keyMarker)) - if err = keyMarkerLock.GetRLock(globalListingTimeout); err != nil { - return lmi, err - } - for _, disk := range xl.getLoadBalancedDisks() { - if disk == nil { - continue - } - uploads, _, err = listMultipartUploadIDs(bucket, keyMarker, uploadIDMarker, maxUploads, disk) - if err == nil { - break - } - if errors.IsErrIgnored(err, objMetadataOpIgnoredErrs...) { - continue - } - break - } - keyMarkerLock.RUnlock() - if err != nil { - return lmi, err - } - maxUploads = maxUploads - len(uploads) - } - var walkerCh chan treeWalkResult - var walkerDoneCh chan struct{} - heal := false // true only for xl.ListObjectsHeal - // Validate if we need to list further depending on maxUploads. - if maxUploads > 0 { - walkerCh, walkerDoneCh = xl.listPool.Release(listParams{minioMetaMultipartBucket, recursive, multipartMarkerPath, multipartPrefixPath, heal}) - if walkerCh == nil { - walkerDoneCh = make(chan struct{}) - isLeaf := xl.isMultipartUpload - listDir := listDirFactory(isLeaf, xlTreeWalkIgnoredErrs, xl.getLoadBalancedDisks()...) - walkerCh = startTreeWalk(minioMetaMultipartBucket, multipartPrefixPath, multipartMarkerPath, recursive, listDir, isLeaf, walkerDoneCh) - } - // Collect uploads until we have reached maxUploads count to 0. - for maxUploads > 0 { - walkResult, ok := <-walkerCh - if !ok { - // Closed channel. - eof = true - break - } - // For any walk error return right away. - if walkResult.err != nil { - return lmi, walkResult.err - } - entry := strings.TrimPrefix(walkResult.entry, retainSlash(bucket)) - // For an entry looking like a directory, store and - // continue the loop not need to fetch uploads. - if hasSuffix(walkResult.entry, slashSeparator) { - uploads = append(uploads, MultipartInfo{ - Object: entry, - }) - maxUploads-- - if maxUploads == 0 { - eof = true - break - } - continue - } - var newUploads []MultipartInfo - var end bool - uploadIDMarker = "" - - // For the new object entry we get all its - // pending uploadIDs. - entryLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, - pathJoin(bucket, entry)) - if err = entryLock.GetRLock(globalListingTimeout); err != nil { - return lmi, err - } - var disk StorageAPI - for _, disk = range xl.getLoadBalancedDisks() { - if disk == nil { - continue - } - newUploads, end, err = listMultipartUploadIDs(bucket, entry, uploadIDMarker, maxUploads, disk) - if err == nil { - break - } - if errors.IsErrIgnored(err, objMetadataOpIgnoredErrs...) { - continue - } - break - } - entryLock.RUnlock() - if err != nil { - if errors.IsErrIgnored(err, xlTreeWalkIgnoredErrs...) { - continue - } - return lmi, err - } - uploads = append(uploads, newUploads...) - maxUploads -= len(newUploads) - if end && walkResult.end { - eof = true - break - } - } - } - // For all received uploads fill in the multiparts result. - for _, upload := range uploads { - var objectName string - var uploadID string - if hasSuffix(upload.Object, slashSeparator) { - // All directory entries are common prefixes. - uploadID = "" // For common prefixes, upload ids are empty. - objectName = upload.Object - result.CommonPrefixes = append(result.CommonPrefixes, objectName) - } else { - uploadID = upload.UploadID - objectName = upload.Object - result.Uploads = append(result.Uploads, upload) - } - result.NextKeyMarker = objectName - result.NextUploadIDMarker = uploadID - } - - if !eof { - // Save the go-routine state in the pool so that it can continue from where it left off on - // the next request. - xl.listPool.Set(listParams{bucket, recursive, result.NextKeyMarker, prefix, heal}, walkerCh, walkerDoneCh) - } - - result.IsTruncated = !eof - // Result is not truncated, reset the markers. - if !result.IsTruncated { - result.NextKeyMarker = "" - result.NextUploadIDMarker = "" - } - return result, nil -} - -// ListMultipartUploads - lists all the pending multipart uploads on a -// bucket. Additionally takes 'prefix, keyMarker, uploadIDmarker and a -// delimiter' which allows us to list uploads match a particular -// prefix or lexically starting from 'keyMarker' or delimiting the -// output to get a directory like listing. +// ListMultipartUploads - lists all the pending multipart +// uploads for a particular object in a bucket. // -// Implements S3 compatible ListMultipartUploads API. The resulting -// ListMultipartsInfo structure is unmarshalled directly into XML and -// replied back to the client. -func (xl xlObjects) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (lmi ListMultipartsInfo, e error) { - if err := checkListMultipartArgs(bucket, prefix, keyMarker, uploadIDMarker, delimiter, xl); err != nil { +// Implements minimal S3 compatible ListMultipartUploads API. We do +// not support prefix based listing, this is a deliberate attempt +// towards simplification of multipart APIs. +// The resulting ListMultipartsInfo structure is unmarshalled directly as XML. +func (xl xlObjects) ListMultipartUploads(bucket, object, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (lmi ListMultipartsInfo, e error) { + if err := checkListMultipartArgs(bucket, object, keyMarker, uploadIDMarker, delimiter, xl); err != nil { return lmi, err } - return xl.listMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads) + result := ListMultipartsInfo{} + + result.IsTruncated = true + result.MaxUploads = maxUploads + result.KeyMarker = keyMarker + result.Prefix = object + result.Delimiter = delimiter + + for _, disk := range xl.getLoadBalancedDisks() { + if disk == nil { + continue + } + + uploads, _, err := listMultipartUploadIDs(bucket, object, uploadIDMarker, maxUploads, disk) + if err != nil { + return lmi, err + } + + result.NextKeyMarker = object + // Loop through all the received uploads fill in the multiparts result. + for _, upload := range uploads { + uploadID := upload.UploadID + result.Uploads = append(result.Uploads, upload) + result.NextUploadIDMarker = uploadID + } + + result.IsTruncated = len(uploads) == maxUploads + + if !result.IsTruncated { + result.NextKeyMarker = "" + result.NextUploadIDMarker = "" + } + break + } + + return result, nil } // newMultipartUpload - wrapper for initializing a new multipart