From 064f36ca5a4b9e93baad36f2166acd3c81e95976 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Fri, 26 Jul 2024 05:55:01 -0700 Subject: [PATCH] move to GET for internal stream READs instead of POST (#20160) the main reason is to let Go net/http perform necessary book keeping properly, and in essential from consistency point of view its GETs all the way. Deprecate sendFile() as its buggy inside Go runtime. --- cmd/generic-handlers.go | 2 +- cmd/generic-handlers_test.go | 5 +++-- cmd/storage-rest-client.go | 6 ++++-- cmd/storage-rest-common.go | 2 +- cmd/storage-rest-server.go | 28 +--------------------------- internal/ioutil/ioutil.go | 23 ++++++++++++++++++----- internal/rest/client.go | 17 +++++++++++------ 7 files changed, 39 insertions(+), 44 deletions(-) diff --git a/cmd/generic-handlers.go b/cmd/generic-handlers.go index 996c84ad2..defd66e6c 100644 --- a/cmd/generic-handlers.go +++ b/cmd/generic-handlers.go @@ -251,7 +251,7 @@ func guessIsRPCReq(req *http.Request) bool { return true } - return req.Method == http.MethodPost && + return (req.Method == http.MethodPost || req.Method == http.MethodGet) && strings.HasPrefix(req.URL.Path, minioReservedBucketPath+SlashSeparator) } diff --git a/cmd/generic-handlers_test.go b/cmd/generic-handlers_test.go index 303e42d2f..12ba5631e 100644 --- a/cmd/generic-handlers_test.go +++ b/cmd/generic-handlers_test.go @@ -51,9 +51,10 @@ func TestGuessIsRPC(t *testing.T) { r = &http.Request{ Proto: "HTTP/1.1", Method: http.MethodGet, + URL: u, } - if guessIsRPCReq(r) { - t.Fatal("Test shouldn't report as net/rpc for a non net/rpc request.") + if !guessIsRPCReq(r) { + t.Fatal("Test shouldn't fail for a possible net/rpc request.") } r = &http.Request{ Proto: "HTTP/1.1", diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index 7a71c174e..593fa9f82 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -617,9 +617,11 @@ func (client *storageRESTClient) ReadFileStream(ctx context.Context, volume, pat values.Set(storageRESTFilePath, path) values.Set(storageRESTOffset, strconv.Itoa(int(offset))) values.Set(storageRESTLength, strconv.Itoa(int(length))) - respBody, err := client.call(ctx, storageRESTMethodReadFileStream, values, nil, -1) + values.Set(storageRESTDiskID, *client.diskID.Load()) + + respBody, err := client.restClient.CallWithHTTPMethod(ctx, http.MethodGet, storageRESTMethodReadFileStream, values, nil, -1) if err != nil { - return nil, err + return nil, toStorageErr(err) } return respBody, nil } diff --git a/cmd/storage-rest-common.go b/cmd/storage-rest-common.go index d8434e97b..8b9493058 100644 --- a/cmd/storage-rest-common.go +++ b/cmd/storage-rest-common.go @@ -20,7 +20,7 @@ package cmd //go:generate msgp -file $GOFILE -unexported const ( - storageRESTVersion = "v59" // Change ReadOptions inclFreeVersions + storageRESTVersion = "v60" // ReadFileStream now uses http.MethodGet storageRESTVersionPrefix = SlashSeparator + storageRESTVersion storageRESTPrefix = minioReservedBucketPath + "/storage" ) diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index ab34552c9..43bfae5da 100644 --- a/cmd/storage-rest-server.go +++ b/cmd/storage-rest-server.go @@ -29,14 +29,12 @@ import ( "net/http" "os/user" "path" - "runtime" "runtime/debug" "strconv" "strings" "sync" "time" - "github.com/dustin/go-humanize" "github.com/minio/minio/internal/grid" "github.com/tinylib/msgp/msgp" @@ -606,8 +604,6 @@ func (s *storageRESTServer) ReadFileStreamHandler(w http.ResponseWriter, r *http return } - w.Header().Set(xhttp.ContentLength, strconv.FormatInt(length, 10)) - rc, err := s.getStorage().ReadFileStream(r.Context(), volume, filePath, offset, length) if err != nil { s.writeErrorResponse(w, err) @@ -615,28 +611,6 @@ func (s *storageRESTServer) ReadFileStreamHandler(w http.ResponseWriter, r *http } defer rc.Close() - noReadFrom := runtime.GOOS == "windows" || length < 4*humanize.MiByte - if !noReadFrom { - rf, ok := w.(io.ReaderFrom) - if ok { - // Attempt to use splice/sendfile() optimization, A very specific behavior mentioned below is necessary. - // See https://github.com/golang/go/blob/f7c5cbb82087c55aa82081e931e0142783700ce8/src/net/sendfile_linux.go#L20 - // Windows can lock up with this optimization, so we fall back to regular copy. - sr, ok := rc.(*sendFileReader) - if ok { - // Sendfile sends in 4MiB chunks per sendfile syscall which is more than enough - // for most setups. - _, err = rf.ReadFrom(sr.Reader) - if !xnet.IsNetworkOrHostDown(err, true) { // do not need to log disconnected clients - storageLogIf(r.Context(), err) - } - if err == nil || !errors.Is(err, xhttp.ErrNotImplemented) { - return - } - } - } - } // noReadFrom means use io.Copy() - _, err = xioutil.Copy(w, rc) if !xnet.IsNetworkOrHostDown(err, true) { // do not need to log disconnected clients storageLogIf(r.Context(), err) @@ -1367,12 +1341,12 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerPools Endpoin subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodCreateFile).HandlerFunc(h(server.CreateFileHandler)) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadFile).HandlerFunc(h(server.ReadFileHandler)) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadFileStream).HandlerFunc(h(server.ReadFileStreamHandler)) - subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteVersions).HandlerFunc(h(server.DeleteVersionsHandler)) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodVerifyFile).HandlerFunc(h(server.VerifyFileHandler)) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodStatInfoFile).HandlerFunc(h(server.StatInfoFile)) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadMultiple).HandlerFunc(h(server.ReadMultiple)) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodCleanAbandoned).HandlerFunc(h(server.CleanAbandonedDataHandler)) + subrouter.Methods(http.MethodGet).Path(storageRESTVersionPrefix + storageRESTMethodReadFileStream).HandlerFunc(h(server.ReadFileStreamHandler)) logger.FatalIf(storageListDirRPC.RegisterNoInput(gm, server.ListDirHandler, endpoint.Path), "unable to register handler") logger.FatalIf(storageReadAllRPC.Register(gm, server.ReadAllHandler, endpoint.Path), "unable to register handler") logger.FatalIf(storageWriteAllRPC.Register(gm, server.WriteAllHandler, endpoint.Path), "unable to register handler") diff --git a/internal/ioutil/ioutil.go b/internal/ioutil/ioutil.go index 1b93fbb60..49a4300c5 100644 --- a/internal/ioutil/ioutil.go +++ b/internal/ioutil/ioutil.go @@ -34,8 +34,9 @@ import ( // Block sizes constant. const ( - SmallBlock = 32 * humanize.KiByte // Default r/w block size for smaller objects. - LargeBlock = 1 * humanize.MiByte // Default r/w block size for normal objects. + SmallBlock = 32 * humanize.KiByte // Default r/w block size for smaller objects. + MediumBlock = 128 * humanize.KiByte // Default r/w block size for medium sized objects. + LargeBlock = 1 * humanize.MiByte // Default r/w block size for normal objects. ) // aligned sync.Pool's @@ -46,6 +47,12 @@ var ( return &b }, } + ODirectPoolMedium = sync.Pool{ + New: func() interface{} { + b := disk.AlignedBlock(MediumBlock) + return &b + }, + } ODirectPoolSmall = sync.Pool{ New: func() interface{} { b := disk.AlignedBlock(SmallBlock) @@ -294,13 +301,19 @@ func NewSkipReader(r io.Reader, n int64) io.Reader { return &SkipReader{r, n} } +// writerOnly hides an io.Writer value's optional ReadFrom method +// from io.Copy. +type writerOnly struct { + io.Writer +} + // Copy is exactly like io.Copy but with reusable buffers. func Copy(dst io.Writer, src io.Reader) (written int64, err error) { - bufp := ODirectPoolLarge.Get().(*[]byte) + bufp := ODirectPoolMedium.Get().(*[]byte) + defer ODirectPoolMedium.Put(bufp) buf := *bufp - defer ODirectPoolLarge.Put(bufp) - return io.CopyBuffer(dst, src, buf) + return io.CopyBuffer(writerOnly{dst}, src, buf) } // SameFile returns if the files are same. diff --git a/internal/rest/client.go b/internal/rest/client.go index 4e01b9cc8..c07de174f 100644 --- a/internal/rest/client.go +++ b/internal/rest/client.go @@ -129,13 +129,13 @@ func removeEmptyPort(host string) string { } // Copied from http.NewRequest but implemented to ensure we reuse `url.URL` instance. -func (c *Client) newRequest(ctx context.Context, u url.URL, body io.Reader) (*http.Request, error) { +func (c *Client) newRequest(ctx context.Context, method string, u url.URL, body io.Reader) (*http.Request, error) { rc, ok := body.(io.ReadCloser) if !ok && body != nil { rc = io.NopCloser(body) } req := &http.Request{ - Method: http.MethodPost, + Method: method, URL: &u, Proto: "HTTP/1.1", ProtoMajor: 1, @@ -290,8 +290,8 @@ func (c *Client) dumpHTTP(req *http.Request, resp *http.Response) { // ErrClientClosed returned when *Client is closed. var ErrClientClosed = errors.New("rest client is closed") -// Call - make a REST call with context. -func (c *Client) Call(ctx context.Context, method string, values url.Values, body io.Reader, length int64) (reply io.ReadCloser, err error) { +// CallWithHTTPMethod - make a REST call with context, using a custom HTTP method. +func (c *Client) CallWithHTTPMethod(ctx context.Context, httpMethod, rpcMethod string, values url.Values, body io.Reader, length int64) (reply io.ReadCloser, err error) { switch atomic.LoadInt32(&c.connected) { case closed: // client closed, this is usually a manual process @@ -307,10 +307,10 @@ func (c *Client) Call(ctx context.Context, method string, values url.Values, bod // Shallow copy. We don't modify the *UserInfo, if set. // All other fields are copied. u := *c.url - u.Path = path.Join(u.Path, method) + u.Path = path.Join(u.Path, rpcMethod) u.RawQuery = values.Encode() - req, err := c.newRequest(ctx, u, body) + req, err := c.newRequest(ctx, httpMethod, u, body) if err != nil { return nil, &NetworkError{Err: err} } @@ -382,6 +382,11 @@ func (c *Client) Call(ctx context.Context, method string, values url.Values, bod return resp.Body, nil } +// Call - make a REST call with context. +func (c *Client) Call(ctx context.Context, rpcMethod string, values url.Values, body io.Reader, length int64) (reply io.ReadCloser, err error) { + return c.CallWithHTTPMethod(ctx, http.MethodPost, rpcMethod, values, body, length) +} + // Close closes all idle connections of the underlying http client func (c *Client) Close() { atomic.StoreInt32(&c.connected, closed)