diff --git a/cmd/gateway-azure.go b/cmd/gateway-azure.go index 3954703da..6f4812062 100644 --- a/cmd/gateway-azure.go +++ b/cmd/gateway-azure.go @@ -25,17 +25,18 @@ import ( "io" "net/http" "net/url" - "strconv" "strings" "sync" "time" "github.com/Azure/azure-sdk-for-go/storage" + humanize "github.com/dustin/go-humanize" "github.com/minio/minio-go/pkg/policy" "github.com/minio/sha256-simd" ) const globalAzureAPIVersion = "2016-05-31" +const azureBlockSize = 100 * humanize.MiByte // Canonicalize the metadata headers, without this azure-sdk calculates // incorrect signature. This attempt to canonicalize is to convert @@ -496,27 +497,23 @@ func (a *azureObjects) CopyObjectPart(srcBucket, srcObject, destBucket, destObje return info, traceError(NotImplemented{}) } -// Encode partID+md5Hex to a blockID. -func azureGetBlockID(partID int, md5Hex string) string { - return base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%.5d.%s", partID, md5Hex))) +// Encode partID, subPartNumber and md5Hex to blockID. +func azureGetBlockID(partID, subPartNumber int, md5Hex string) string { + return base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%05d.%02d.%s", partID, subPartNumber, md5Hex))) } -// Decode blockID to partID+md5Hex. -func azureParseBlockID(blockID string) (int, string, error) { - idByte, err := base64.StdEncoding.DecodeString(blockID) - if err != nil { - return 0, "", traceError(err) +// Parse blockID into partID, subPartNumber and md5Hex. +func azureParseBlockID(blockID string) (partID, subPartNumber int, md5Hex string, err error) { + var blockIDBytes []byte + if blockIDBytes, err = base64.StdEncoding.DecodeString(blockID); err != nil { + return } - idStr := string(idByte) - splitRes := strings.Split(idStr, ".") - if len(splitRes) != 2 { - return 0, "", traceError(errUnexpected) + + if _, err = fmt.Sscanf(string(blockIDBytes), "%05d.%02d.%s", &partID, &subPartNumber, &md5Hex); err != nil { + err = fmt.Errorf("invalid block id '%s'", string(blockIDBytes)) } - partID, err := strconv.Atoi(splitRes[0]) - if err != nil { - return 0, "", traceError(err) - } - return partID, splitRes[1], nil + + return } // PutObjectPart - Use Azure equivalent PutBlockWithLength. @@ -550,10 +547,25 @@ func (a *azureObjects) PutObjectPart(bucket, object, uploadID string, partID int teeReader = io.TeeReader(data, io.MultiWriter(writers...)) } - id := azureGetBlockID(partID, etag) - err = a.client.PutBlockWithLength(bucket, object, id, uint64(size), teeReader, nil) - if err != nil { - return info, azureToObjectError(traceError(err), bucket, object) + subPartSize := int64(azureBlockSize) + subPartNumber := 1 + for remainingSize := size; remainingSize >= 0; remainingSize -= subPartSize { + // Allow to create zero sized part. + if remainingSize == 0 && subPartNumber > 1 { + break + } + + if remainingSize < subPartSize { + subPartSize = remainingSize + } + + id := azureGetBlockID(partID, subPartNumber, etag) + err = a.client.PutBlockWithLength(bucket, object, id, uint64(subPartSize), io.LimitReader(teeReader, subPartSize), nil) + if err != nil { + return info, azureToObjectError(traceError(err), bucket, object) + } + + subPartNumber++ } if md5Hex != "" { @@ -601,7 +613,7 @@ func (a *azureObjects) ListObjectParts(bucket, object, uploadID string, partNumb break } partCount++ - partID, md5Hex, err := azureParseBlockID(part.Name) + partID, _, md5Hex, err := azureParseBlockID(part.Name) if err != nil { return result, err } @@ -639,14 +651,63 @@ func (a *azureObjects) CompleteMultipartUpload(bucket, object, uploadID string, if meta == nil { return objInfo, traceError(InvalidUploadID{uploadID}) } - var blocks []storage.Block - for _, part := range uploadedParts { - blocks = append(blocks, storage.Block{ - ID: azureGetBlockID(part.PartNumber, part.ETag), - Status: storage.BlockStatusUncommitted, - }) + + resp, err := a.client.GetBlockList(bucket, object, storage.BlockListTypeUncommitted) + if err != nil { + return objInfo, azureToObjectError(traceError(err), bucket, object) } - err = a.client.PutBlockList(bucket, object, blocks) + + getBlocks := func(partNumber int, etag string) (blocks []storage.Block, size int64, err error) { + for _, part := range resp.UncommittedBlocks { + var partID int + var md5Hex string + if partID, _, md5Hex, err = azureParseBlockID(part.Name); err != nil { + return nil, 0, err + } + + if partNumber == partID && etag == md5Hex { + blocks = append(blocks, storage.Block{ + ID: part.Name, + Status: storage.BlockStatusUncommitted, + }) + + size += part.Size + } + } + + if len(blocks) == 0 { + return nil, 0, InvalidPart{} + } + + return blocks, size, nil + } + + var allBlocks []storage.Block + partSizes := make([]int64, len(uploadedParts)) + for i, part := range uploadedParts { + var blocks []storage.Block + var size int64 + blocks, size, err = getBlocks(part.PartNumber, part.ETag) + if err != nil { + return objInfo, traceError(err) + } + + allBlocks = append(allBlocks, blocks...) + partSizes[i] = size + } + + // Error out if parts except last part sizing < 5MiB. + for i, size := range partSizes[:len(partSizes)-1] { + if size < globalMinPartSize { + return objInfo, traceError(PartTooSmall{ + PartNumber: uploadedParts[i].PartNumber, + PartSize: size, + PartETag: uploadedParts[i].ETag, + }) + } + } + + err = a.client.PutBlockList(bucket, object, allBlocks) if err != nil { return objInfo, azureToObjectError(traceError(err), bucket, object) } diff --git a/cmd/gateway-azure_test.go b/cmd/gateway-azure_test.go index 29b70889d..65008e88e 100644 --- a/cmd/gateway-azure_test.go +++ b/cmd/gateway-azure_test.go @@ -133,15 +133,16 @@ func TestAzureToObjectError(t *testing.T) { // Test azureGetBlockID(). func TestAzureGetBlockID(t *testing.T) { testCases := []struct { - partID int - md5 string - blockID string + partID int + subPartNumber int + md5 string + blockID string }{ - {1, "d41d8cd98f00b204e9800998ecf8427e", "MDAwMDEuZDQxZDhjZDk4ZjAwYjIwNGU5ODAwOTk4ZWNmODQyN2U="}, - {2, "a7fb6b7b36ee4ed66b5546fac4690273", "MDAwMDIuYTdmYjZiN2IzNmVlNGVkNjZiNTU0NmZhYzQ2OTAyNzM="}, + {1, 7, "d41d8cd98f00b204e9800998ecf8427e", "MDAwMDEuMDcuZDQxZDhjZDk4ZjAwYjIwNGU5ODAwOTk4ZWNmODQyN2U="}, + {2, 19, "a7fb6b7b36ee4ed66b5546fac4690273", "MDAwMDIuMTkuYTdmYjZiN2IzNmVlNGVkNjZiNTU0NmZhYzQ2OTAyNzM="}, } for _, test := range testCases { - blockID := azureGetBlockID(test.partID, test.md5) + blockID := azureGetBlockID(test.partID, test.subPartNumber, test.md5) if blockID != test.blockID { t.Fatalf("%s is not equal to %s", blockID, test.blockID) } @@ -151,26 +152,31 @@ func TestAzureGetBlockID(t *testing.T) { // Test azureParseBlockID(). func TestAzureParseBlockID(t *testing.T) { testCases := []struct { - partID int - md5 string - blockID string + blockID string + partID int + subPartNumber int + md5 string }{ - {1, "d41d8cd98f00b204e9800998ecf8427e", "MDAwMDEuZDQxZDhjZDk4ZjAwYjIwNGU5ODAwOTk4ZWNmODQyN2U="}, - {2, "a7fb6b7b36ee4ed66b5546fac4690273", "MDAwMDIuYTdmYjZiN2IzNmVlNGVkNjZiNTU0NmZhYzQ2OTAyNzM="}, + {"MDAwMDEuMDcuZDQxZDhjZDk4ZjAwYjIwNGU5ODAwOTk4ZWNmODQyN2U=", 1, 7, "d41d8cd98f00b204e9800998ecf8427e"}, + {"MDAwMDIuMTkuYTdmYjZiN2IzNmVlNGVkNjZiNTU0NmZhYzQ2OTAyNzM=", 2, 19, "a7fb6b7b36ee4ed66b5546fac4690273"}, } for _, test := range testCases { - partID, md5, err := azureParseBlockID(test.blockID) + partID, subPartNumber, md5, err := azureParseBlockID(test.blockID) if err != nil { t.Fatal(err) } if partID != test.partID { t.Fatalf("%d not equal to %d", partID, test.partID) } + if subPartNumber != test.subPartNumber { + t.Fatalf("%d not equal to %d", subPartNumber, test.subPartNumber) + } if md5 != test.md5 { t.Fatalf("%s not equal to %s", md5, test.md5) } } - _, _, err := azureParseBlockID("junk") + + _, _, _, err := azureParseBlockID("junk") if err == nil { t.Fatal("Expected azureParseBlockID() to return error") }