Mirror of https://github.com/minio/minio.git, synced 2026-02-04 18:00:15 -05:00
feat: bring new HDD related performance enhancements (#18239)
Optionally allows customers to enable:
- an external cache for GET/HEAD responses
- skipping disks that are slow to respond in GET/HEAD when we have already achieved quorum
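The handler changes below follow a read-through, write-back pattern: on GET/HEAD, consult the cache first and serve from it when the conditional check passes; on a miss (cache.ErrKeyMissing), fall through to the disks and capture the response for the cache on the way out. A minimal self-contained sketch of that flow, using only the standard library — the store type and its methods here are illustrative stand-ins, not the internal/config/cache API:

package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"os"
	"strings"
)

var errKeyMissing = errors.New("key missing")

// store is an illustrative stand-in for the external GET/HEAD response cache.
type store struct{ m map[string][]byte }

func (s *store) get(key string) ([]byte, error) {
	if b, ok := s.m[key]; ok {
		return b, nil
	}
	return nil, errKeyMissing
}

func (s *store) set(key string, b []byte) { s.m[key] = b }

// serve consults the cache first and only falls through to the backend on a
// miss, capturing the streamed response for the next request on the way out.
func serve(s *store, key string, backend func() io.Reader, client io.Writer) {
	if data, err := s.get(key); err == nil {
		client.Write(data) // hit: the disks are never touched
		return
	}
	var buf bytes.Buffer
	io.Copy(io.MultiWriter(client, &buf), backend()) // miss: stream and capture
	s.set(key, buf.Bytes())                          // write-back after responding
}

func main() {
	s := &store{m: map[string][]byte{}}
	backend := func() io.Reader { return strings.NewReader("object payload\n") }
	serve(s, "bucket/object", backend, os.Stdout) // miss, fills the cache
	serve(s, "bucket/object", backend, os.Stdout) // hit, served from memory
	fmt.Println("cached entries:", len(s.m))
}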
@@ -1,4 +1,4 @@
-// Copyright (c) 2015-2021 MinIO, Inc.
+// Copyright (c) 2015-2023 MinIO, Inc.
 //
 // This file is part of MinIO Object Storage stack
 //
@@ -19,6 +19,7 @@ package cmd

 import (
 	"archive/tar"
+	"bytes"
 	"context"
 	"encoding/hex"
 	"encoding/xml"
@@ -35,6 +36,7 @@ import (
 	"strings"
 	"sync"
 	"time"
+	"unicode"

 	"github.com/google/uuid"
 	"github.com/klauspost/compress/gzhttp"
@@ -48,6 +50,7 @@ import (
 	"github.com/minio/minio/internal/bucket/lifecycle"
 	objectlock "github.com/minio/minio/internal/bucket/object/lock"
 	"github.com/minio/minio/internal/bucket/replication"
+	"github.com/minio/minio/internal/config/cache"
 	"github.com/minio/minio/internal/config/dns"
 	"github.com/minio/minio/internal/config/storageclass"
 	"github.com/minio/minio/internal/crypto"
@@ -62,6 +65,7 @@ import (
 	"github.com/minio/minio/internal/s3select"
 	"github.com/minio/mux"
 	"github.com/minio/pkg/v2/policy"
+	"github.com/valyala/bytebufferpool"
 )

 // supportedHeadGetReqParams - supported request parameters for GET and HEAD presigned request.
@@ -379,6 +383,92 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj
 		}
 	}

+	cachedResult := globalCacheConfig.Enabled() && opts.VersionID == ""
+
+	var update bool
+	if cachedResult {
+		rc := &cache.CondCheck{}
+		h := r.Header.Clone()
+		if opts.PartNumber > 0 {
+			h.Set(xhttp.PartNumber, strconv.Itoa(opts.PartNumber))
+		}
+		rc.Init(bucket, object, h)
+
+		ci, err := globalCacheConfig.Get(rc)
+		if ci != nil {
+			tgs, ok := ci.Metadata[xhttp.AmzObjectTagging]
+			if ok {
+				// Set this such that authorization policies can be applied on the object tags.
+				r.Header.Set(xhttp.AmzObjectTagging, tgs)
+			}
+
+			if s3Error := authorizeRequest(ctx, r, policy.GetObjectAction); s3Error != ErrNone {
+				writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(s3Error))
+				return
+			}
+
+			okSt := (ci.StatusCode == http.StatusOK || ci.StatusCode == http.StatusPartialContent ||
+				ci.StatusCode == http.StatusPreconditionFailed || ci.StatusCode == http.StatusNotModified)
+			if okSt {
+				ci.WriteHeaders(w, func() {
+					// set common headers
+					setCommonHeaders(w)
+				}, func() {
+					okSt := (ci.StatusCode == http.StatusOK || ci.StatusCode == http.StatusPartialContent)
+					if okSt && len(ci.Data) > 0 {
+						for k, v := range ci.Metadata {
+							w.Header().Set(k, v)
+						}
+
+						if opts.PartNumber > 0 && strings.Contains(ci.ETag, "-") {
+							w.Header()[xhttp.AmzMpPartsCount] = []string{
+								strings.TrimLeftFunc(ci.ETag, func(r rune) bool {
+									return !unicode.IsNumber(r)
+								}),
+							}
+						}
+
+						// For providing ranged content
+						start, rangeLen, err := rs.GetOffsetLength(ci.Size)
+						if err != nil {
+							start, rangeLen = 0, ci.Size
+						}
+
+						// Set content length.
+						w.Header().Set(xhttp.ContentLength, strconv.FormatInt(rangeLen, 10))
+						if rs != nil {
+							contentRange := fmt.Sprintf("bytes %d-%d/%d", start, start+rangeLen-1, ci.Size)
+							w.Header().Set(xhttp.ContentRange, contentRange)
+						}
+
+						io.Copy(w, bytes.NewReader(ci.Data))
+						return
+					}
+					if ci.StatusCode == http.StatusPreconditionFailed {
+						writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrPreconditionFailed), r.URL)
+						return
+					} else if ci.StatusCode == http.StatusNotModified {
+						w.WriteHeader(ci.StatusCode)
+						return
+					}
+
+					// We did not satisfy any requirement from the cache, update the cache.
+					// this basically means that we do not have the Data for the object
+					// cached yet
+					update = true
+				})
+				if !update {
+					// No update is needed means we have written already to the client just return here.
+					return
+				}
+			}
+		}

+		if errors.Is(err, cache.ErrKeyMissing) {
+			update = true
+		}
+	}
+
 	// Validate pre-conditions if any.
 	opts.CheckPrecondFn = func(oi ObjectInfo) bool {
 		if _, err := DecryptObjectInfo(&oi, r); err != nil {
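Note how the Content-Range value above is derived: for a range of rangeLen bytes starting at start within an object of ci.Size bytes, the header is "bytes start-(start+rangeLen-1)/size", with an inclusive last byte. A quick arithmetic check with illustrative values:

package main

import "fmt"

func main() {
	// Illustrative values: a 1000-byte object, request for bytes 200-499.
	var start, rangeLen, size int64 = 200, 300, 1000
	// The last byte is inclusive, hence start+rangeLen-1.
	fmt.Printf("Content-Range: bytes %d-%d/%d\n", start, start+rangeLen-1, size)
	// Output: Content-Range: bytes 200-499/1000
}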
@@ -389,6 +479,8 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj
 		return checkPreconditions(ctx, w, r, oi, opts)
 	}

+	opts.FastGetObjInfo = true
+
 	var proxy proxyResult
 	gr, err := getObjectNInfo(ctx, bucket, object, rs, r.Header, opts)
 	if err != nil {
@@ -487,7 +579,6 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj
 	objInfo.UserDefined = objectlock.FilterObjectLockMetadata(objInfo.UserDefined, getRetPerms != ErrNone, legalHoldPerms != ErrNone)

 	// Set encryption response headers
-
 	if kind, isEncrypted := crypto.IsEncrypted(objInfo.UserDefined); isEncrypted {
 		switch kind {
 		case crypto.S3:
@@ -510,6 +601,39 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj
 		hash.AddChecksumHeader(w, objInfo.decryptChecksums(opts.PartNumber))
 	}

+	var buf *bytebufferpool.ByteBuffer
+	if update {
+		if globalCacheConfig.MatchesSize(objInfo.Size) {
+			buf = bytebufferpool.Get()
+			defer bytebufferpool.Put(buf)
+		}
+		defer func() {
+			var data []byte
+			if buf != nil {
+				data = buf.Bytes()
+			}
+
+			asize, err := objInfo.GetActualSize()
+			if err != nil {
+				asize = objInfo.Size
+			}
+
+			globalCacheConfig.Set(&cache.ObjectInfo{
+				Key:          objInfo.Name,
+				Bucket:       objInfo.Bucket,
+				ETag:         objInfo.ETag,
+				ModTime:      objInfo.ModTime,
+				Expires:      objInfo.ExpiresStr(),
+				CacheControl: objInfo.CacheControl,
+				Metadata:     cleanReservedKeys(objInfo.UserDefined),
+				Range:        rangeHeader,
+				PartNumber:   opts.PartNumber,
+				Size:         asize,
+				Data:         data,
+			})
+		}()
+	}
+
 	if err = setObjectHeaders(w, objInfo, rs, opts); err != nil {
 		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
 		return
@@ -522,8 +646,14 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj

 	setHeadGetRespHeaders(w, r.Form)

+	var iw io.Writer
+	iw = w
+	if buf != nil {
+		iw = io.MultiWriter(w, buf)
+	}
+
 	statusCodeWritten := false
-	httpWriter := xioutil.WriteOnClose(w)
+	httpWriter := xioutil.WriteOnClose(iw)
 	if rs != nil || opts.PartNumber > 0 {
 		statusCodeWritten = true
 		w.WriteHeader(http.StatusPartialContent)
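Wrapping the response writer in io.MultiWriter is what lets a single streamed read both answer the client and fill the cache buffer. The same standard-library pattern in isolation, with illustrative names:

package main

import (
	"bytes"
	"fmt"
	"io"
	"os"
	"strings"
)

func main() {
	src := strings.NewReader("object payload")

	var buf bytes.Buffer
	// Every byte written to iw reaches both destinations:
	// the client (stdout here) and the cache buffer.
	iw := io.MultiWriter(os.Stdout, &buf)

	if _, err := io.Copy(iw, src); err != nil {
		panic(err)
	}
	fmt.Printf("\ncaptured %d bytes for the cache\n", buf.Len())
}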
@@ -644,6 +774,104 @@ func (api objectAPIHandlers) headObjectHandler(ctx context.Context, objectAPI Ob
 	// Get request range.
 	var rs *HTTPRangeSpec
 	rangeHeader := r.Header.Get(xhttp.Range)
+	if rangeHeader != "" {
+		rs, _ = parseRequestRangeSpec(rangeHeader)
+	}
+
+	if rangeHeader != "" {
+		// Both 'Range' and 'partNumber' cannot be specified at the same time
+		if opts.PartNumber > 0 {
+			writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(ErrInvalidRangePartNumber))
+			return
+		}
+
+		if rs, err = parseRequestRangeSpec(rangeHeader); err != nil {
+			// Handle only errInvalidRange. Ignore other
+			// parse error and treat it as regular Get
+			// request like Amazon S3.
+			if errors.Is(err, errInvalidRange) {
+				writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(ErrInvalidRange))
+				return
+			}
+		}
+	}
+
+	cachedResult := globalCacheConfig.Enabled() && opts.VersionID == ""
+
+	var update bool
+	if cachedResult {
+		rc := &cache.CondCheck{}
+		h := r.Header.Clone()
+		if opts.PartNumber > 0 {
+			h.Set(xhttp.PartNumber, strconv.Itoa(opts.PartNumber))
+		}
+		rc.Init(bucket, object, h)
+
+		ci, err := globalCacheConfig.Get(rc)
+		if ci != nil {
+			tgs, ok := ci.Metadata[xhttp.AmzObjectTagging]
+			if ok {
+				// Set this such that authorization policies can be applied on the object tags.
+				r.Header.Set(xhttp.AmzObjectTagging, tgs)
+			}
+
+			if s3Error := authorizeRequest(ctx, r, policy.GetObjectAction); s3Error != ErrNone {
+				writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(s3Error))
+				return
+			}
+
+			okSt := (ci.StatusCode == http.StatusOK || ci.StatusCode == http.StatusPartialContent ||
+				ci.StatusCode == http.StatusPreconditionFailed || ci.StatusCode == http.StatusNotModified)
+			if okSt {
+				ci.WriteHeaders(w, func() {
+					// set common headers
+					setCommonHeaders(w)
+				}, func() {
+					okSt := (ci.StatusCode == http.StatusOK || ci.StatusCode == http.StatusPartialContent)
+					if okSt {
+						for k, v := range ci.Metadata {
+							w.Header().Set(k, v)
+						}
+
+						// For providing ranged content
+						start, rangeLen, err := rs.GetOffsetLength(ci.Size)
+						if err != nil {
+							start, rangeLen = 0, ci.Size
+						}
+
+						if opts.PartNumber > 0 && strings.Contains(ci.ETag, "-") {
+							w.Header()[xhttp.AmzMpPartsCount] = []string{
+								strings.TrimLeftFunc(ci.ETag, func(r rune) bool {
+									return !unicode.IsNumber(r)
+								}),
+							}
+						}
+
+						// Set content length for the range.
+						w.Header().Set(xhttp.ContentLength, strconv.FormatInt(rangeLen, 10))
+						if rs != nil {
+							contentRange := fmt.Sprintf("bytes %d-%d/%d", start, start+rangeLen-1, ci.Size)
+							w.Header().Set(xhttp.ContentRange, contentRange)
+						}
+
+						return
+					}
+					if ci.StatusCode == http.StatusPreconditionFailed {
+						writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrPreconditionFailed), r.URL)
+						return
+					}
+
+					w.WriteHeader(ci.StatusCode)
+				})
+				return
+			}
+		}
+		if errors.Is(err, cache.ErrKeyMissing) {
+			update = true
+		}
+	}
+
+	opts.FastGetObjInfo = true
+
 	objInfo, err := getObjectInfo(ctx, bucket, object, opts)
 	var proxy proxyResult
@@ -651,9 +879,6 @@ func (api objectAPIHandlers) headObjectHandler(ctx context.Context, objectAPI Ob
 	// proxy HEAD to replication target if active-active replication configured on bucket
 	proxytgts := getProxyTargets(ctx, bucket, object, opts)
 	if !proxytgts.Empty() {
-		if rangeHeader != "" {
-			rs, _ = parseRequestRangeSpec(rangeHeader)
-		}
 		var oi ObjectInfo
 		oi, proxy = proxyHeadToReplicationTarget(ctx, bucket, object, rs, opts, proxytgts)
 		if proxy.Proxy {
@@ -737,29 +962,29 @@ func (api objectAPIHandlers) headObjectHandler(ctx context.Context, objectAPI Ob
 		return
 	}

+	if update {
+		asize, err := objInfo.GetActualSize()
+		if err != nil {
+			asize = objInfo.Size
+		}
+
+		defer globalCacheConfig.Set(&cache.ObjectInfo{
+			Key:          objInfo.Name,
+			Bucket:       objInfo.Bucket,
+			ETag:         objInfo.ETag,
+			ModTime:      objInfo.ModTime,
+			Expires:      objInfo.ExpiresStr(),
+			CacheControl: objInfo.CacheControl,
+			Size:         asize,
+			Metadata:     cleanReservedKeys(objInfo.UserDefined),
+		})
+	}
+
 	// Validate pre-conditions if any.
 	if checkPreconditions(ctx, w, r, objInfo, opts) {
 		return
 	}

-	if rangeHeader != "" {
-		// Both 'Range' and 'partNumber' cannot be specified at the same time
-		if opts.PartNumber > 0 {
-			writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(ErrInvalidRangePartNumber))
-			return
-		}
-
-		if rs, err = parseRequestRangeSpec(rangeHeader); err != nil {
-			// Handle only errInvalidRange. Ignore other
-			// parse error and treat it as regular Get
-			// request like Amazon S3.
-			if errors.Is(err, errInvalidRange) {
-				writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(ErrInvalidRange))
-				return
-			}
-		}
-	}
-
 	// Set encryption response headers
 	switch kind, _ := crypto.IsEncrypted(objInfo.UserDefined); kind {
 	case crypto.S3:
@@ -1531,6 +1756,22 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
 		Host: handlers.GetSourceIP(r),
 	})

+	asize, err := objInfo.GetActualSize()
+	if err != nil {
+		asize = objInfo.Size
+	}
+
+	defer globalCacheConfig.Set(&cache.ObjectInfo{
+		Key:          objInfo.Name,
+		Bucket:       objInfo.Bucket,
+		ETag:         objInfo.ETag,
+		ModTime:      objInfo.ModTime,
+		Expires:      objInfo.ExpiresStr(),
+		CacheControl: objInfo.CacheControl,
+		Size:         asize,
+		Metadata:     cleanReservedKeys(objInfo.UserDefined),
+	})
+
 	if !remoteCallRequired && !globalTierConfigMgr.Empty() {
 		// Schedule object for immediate transition if eligible.
 		objInfo.ETag = origETag
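One subtlety in the deferred Set above: Go evaluates a deferred call's arguments at the defer statement, so the &cache.ObjectInfo{...} snapshot is built from objInfo as it is at that point, even though the call itself only runs when the handler returns. The same semantics in miniature:

package main

import "fmt"

func main() {
	msg := "value captured at the defer statement"
	defer fmt.Println(msg) // argument evaluated here, not at return
	msg = "reassignment is not seen by the deferred call"
	fmt.Println("handler body runs first")
}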
@@ -1633,7 +1874,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
 	var (
 		md5hex    = clientETag.String()
 		sha256hex = ""
-		reader    io.Reader = r.Body
+		rd        io.Reader = r.Body
 		s3Err     APIErrorCode
 		putObject = objectAPI.PutObject
 	)
@@ -1647,14 +1888,14 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
 	switch rAuthType {
 	case authTypeStreamingSigned, authTypeStreamingSignedTrailer:
 		// Initialize stream signature verifier.
-		reader, s3Err = newSignV4ChunkedReader(r, rAuthType == authTypeStreamingSignedTrailer)
+		rd, s3Err = newSignV4ChunkedReader(r, rAuthType == authTypeStreamingSignedTrailer)
 		if s3Err != ErrNone {
 			writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL)
 			return
 		}
 	case authTypeStreamingUnsignedTrailer:
 		// Initialize stream chunked reader with optional trailers.
-		reader, s3Err = newUnsignedV4ChunkedReader(r, true)
+		rd, s3Err = newUnsignedV4ChunkedReader(r, true)
 		if s3Err != ErrNone {
 			writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL)
 			return
@@ -1702,6 +1943,18 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
 		AutoEncrypt: globalAutoEncryption,
 	})

+	var buf *bytebufferpool.ByteBuffer
+	if globalCacheConfig.MatchesSize(size) {
+		buf = bytebufferpool.Get()
+		defer bytebufferpool.Put(buf)
+	}
+
+	var reader io.Reader
+	reader = rd
+	if buf != nil {
+		reader = io.TeeReader(rd, buf)
+	}
+
 	actualSize := size
 	var idxCb func() []byte
 	if isCompressible(r.Header, object) && size > minCompressibleSize {
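On the write path the body is tee'd rather than multi-written: io.TeeReader copies into the buffer as PutObject consumes the stream, so caching adds no extra read pass. The same standard-library pattern in isolation, with an illustrative stand-in for the backend:

package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

func main() {
	body := strings.NewReader("uploaded payload")

	var buf bytes.Buffer
	// Reads from rd also land in buf as a side effect.
	rd := io.TeeReader(body, &buf)

	// io.Discard stands in for PutObject consuming the stream.
	n, err := io.Copy(io.Discard, rd)
	if err != nil {
		panic(err)
	}
	fmt.Printf("stored %d bytes, captured %d for the cache\n", n, buf.Len())
}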
@@ -1904,6 +2157,30 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req

 	setPutObjHeaders(w, objInfo, false)

+	defer func() {
+		var data []byte
+		if buf != nil {
+			data = buf.Bytes()
+		}
+
+		asize, err := objInfo.GetActualSize()
+		if err != nil {
+			asize = objInfo.Size
+		}
+
+		globalCacheConfig.Set(&cache.ObjectInfo{
+			Key:          objInfo.Name,
+			Bucket:       objInfo.Bucket,
+			ETag:         objInfo.ETag,
+			ModTime:      objInfo.ModTime,
+			Expires:      objInfo.ExpiresStr(),
+			CacheControl: objInfo.CacheControl,
+			Size:         asize,
+			Metadata:     cleanReservedKeys(objInfo.UserDefined),
+			Data:         data,
+		})
+	}()
+
 	// Notify object created event.
 	evt := eventArgs{
 		EventName: event.ObjectCreatedPut,
@@ -2234,10 +2511,28 @@ func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *h
 			return err
 		}

+		objInfo.ETag = getDecryptedETag(r.Header, objInfo, false)
+
 		if dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(metadata, "", "", replication.ObjectReplicationType, opts)); dsc.ReplicateAny() {
 			scheduleReplication(ctx, objInfo, objectAPI, dsc, replication.ObjectReplicationType)
 		}

+		asize, err := objInfo.GetActualSize()
+		if err != nil {
+			asize = objInfo.Size
+		}
+
+		defer globalCacheConfig.Set(&cache.ObjectInfo{
+			Key:          objInfo.Name,
+			Bucket:       objInfo.Bucket,
+			ETag:         objInfo.ETag,
+			ModTime:      objInfo.ModTime,
+			Expires:      objInfo.ExpiresStr(),
+			CacheControl: objInfo.CacheControl,
+			Size:         asize,
+			Metadata:     cleanReservedKeys(objInfo.UserDefined),
+		})
+
 		// Notify object created event.
 		evt := eventArgs{
 			EventName: event.ObjectCreatedPut,
@@ -2413,6 +2708,8 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http.
 		return
 	}

+	defer globalCacheConfig.Delete(bucket, object)
+
 	setPutObjHeaders(w, objInfo, true)
 	writeSuccessNoContent(w)
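Invalidation on DELETE is deferred, so the cache entry is dropped as the handler returns, after the 204 has been written; an error path that returns earlier never registers the defer and leaves the entry alone. The ordering in miniature, with illustrative names:

package main

import "fmt"

func handler(ok bool) {
	if !ok {
		fmt.Println("error response, cache entry untouched")
		return // the deferred invalidation below is never registered
	}
	defer fmt.Println("cache entry invalidated (runs last)")
	fmt.Println("204 No Content written")
}

func main() {
	handler(true)
	handler(false)
}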