From d96798ae7b007c2f5dd41f9bd1177b8371f37a6c Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Thu, 15 Aug 2024 03:36:00 -0700 Subject: [PATCH] Add support profile deadlines and concurrent operations (#20244) * Allow a maximum of 10 seconds to start profiling operations. * Download up to 16 profiles concurrently, but only allow 10 seconds for each (does not include write time). * Add cluster info as the first operation. * Ignore remote download errors. * Stop remote profiles if the request is terminated. --- cmd/admin-handlers.go | 12 ++++++-- cmd/notification.go | 65 ++++++++++++++++++++++++++--------------- cmd/peer-rest-client.go | 15 +++------- 3 files changed, 55 insertions(+), 37 deletions(-) diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index 9ab620e71..9938cec5e 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -1036,7 +1036,7 @@ func (a adminAPIHandlers) StartProfilingHandler(w http.ResponseWriter, r *http.R // Start profiling on remote servers. var hostErrs []NotificationPeerErr for _, profiler := range profiles { - hostErrs = append(hostErrs, globalNotificationSys.StartProfiling(profiler)...) + hostErrs = append(hostErrs, globalNotificationSys.StartProfiling(ctx, profiler)...) // Start profiling locally as well. prof, err := startProfiler(profiler) @@ -1117,7 +1117,11 @@ func (a adminAPIHandlers) ProfileHandler(w http.ResponseWriter, r *http.Request) // Start profiling on remote servers. for _, profiler := range profiles { - globalNotificationSys.StartProfiling(profiler) + // Limit start time to max 10s. + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + globalNotificationSys.StartProfiling(ctx, profiler) + // StartProfiling blocks, so we can cancel now. + cancel() // Start profiling locally as well. prof, err := startProfiler(profiler) @@ -1132,6 +1136,10 @@ func (a adminAPIHandlers) ProfileHandler(w http.ResponseWriter, r *http.Request) for { select { case <-ctx.Done(): + // Stop remote profiles + go globalNotificationSys.DownloadProfilingData(GlobalContext, io.Discard) + + // Stop local globalProfilerMu.Lock() defer globalProfilerMu.Unlock() for k, v := range globalProfiler { diff --git a/cmd/notification.go b/cmd/notification.go index 802f2c419..e37577566 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -84,6 +84,9 @@ func WithNPeersThrottled(nerrs, wks int) *NotificationGroup { if nerrs <= 0 { nerrs = 1 } + if wks > nerrs { + wks = nerrs + } wk, _ := workers.New(wks) return &NotificationGroup{errs: make([]NotificationPeerErr, nerrs), workers: wk, retryCount: 3} } @@ -292,15 +295,15 @@ func (sys *NotificationSys) BackgroundHealStatus(ctx context.Context) ([]madmin. } // StartProfiling - start profiling on remote peers, by initiating a remote RPC. -func (sys *NotificationSys) StartProfiling(profiler string) []NotificationPeerErr { +func (sys *NotificationSys) StartProfiling(ctx context.Context, profiler string) []NotificationPeerErr { ng := WithNPeers(len(sys.peerClients)) for idx, client := range sys.peerClients { if client == nil { continue } client := client - ng.Go(GlobalContext, func() error { - return client.StartProfiling(profiler) + ng.Go(ctx, func() error { + return client.StartProfiling(ctx, profiler) }, idx, *client.host) } return ng.Wait() @@ -313,28 +316,49 @@ func (sys *NotificationSys) DownloadProfilingData(ctx context.Context, writer io zipWriter := zip.NewWriter(writer) defer zipWriter.Close() - for _, client := range sys.peerClients { + // Start by embedding cluster info. + if b := getClusterMetaInfo(ctx); len(b) > 0 { + internalLogIf(ctx, embedFileInZip(zipWriter, "cluster.info", b, 0o600)) + } + + // Profiles can be quite big, so we limit to max 16 concurrent downloads. + ng := WithNPeersThrottled(len(sys.peerClients), 16) + var writeMu sync.Mutex + for i, client := range sys.peerClients { if client == nil { continue } - data, err := client.DownloadProfileData() - if err != nil { - reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", client.host.String()) - ctx := logger.SetReqInfo(ctx, reqInfo) - peersLogOnceIf(ctx, err, client.host.String()) - continue - } - - profilingDataFound = true - - for typ, data := range data { - err := embedFileInZip(zipWriter, fmt.Sprintf("profile-%s-%s", client.host.String(), typ), data, 0o600) + ng.Go(ctx, func() error { + // Give 15 seconds to each remote call. + // Errors are logged but not returned. + ctx, cancel := context.WithTimeout(ctx, 15*time.Second) + defer cancel() + data, err := client.DownloadProfileData(ctx) if err != nil { reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", client.host.String()) ctx := logger.SetReqInfo(ctx, reqInfo) peersLogOnceIf(ctx, err, client.host.String()) + return nil } - } + + for typ, data := range data { + // zip writer only handles one concurrent write + writeMu.Lock() + profilingDataFound = true + err := embedFileInZip(zipWriter, fmt.Sprintf("profile-%s-%s", client.host.String(), typ), data, 0o600) + writeMu.Unlock() + if err != nil { + reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", client.host.String()) + ctx := logger.SetReqInfo(ctx, reqInfo) + peersLogOnceIf(ctx, err, client.host.String()) + } + } + return nil + }, i, *client.host) + } + ng.Wait() + if ctx.Err() != nil { + return false } // Local host @@ -359,9 +383,6 @@ func (sys *NotificationSys) DownloadProfilingData(ctx context.Context, writer io err := embedFileInZip(zipWriter, fmt.Sprintf("profile-%s-%s", thisAddr, typ), data, 0o600) internalLogIf(ctx, err) } - if b := getClusterMetaInfo(ctx); len(b) > 0 { - internalLogIf(ctx, embedFileInZip(zipWriter, "cluster.info", b, 0o600)) - } return } @@ -383,10 +404,6 @@ func (sys *NotificationSys) VerifyBinary(ctx context.Context, u *url.URL, sha256 // further discussion advised. Remove this comment and remove the worker model // for this function in future. maxWorkers := runtime.GOMAXPROCS(0) / 2 - if maxWorkers > len(sys.peerClients) { - maxWorkers = len(sys.peerClients) - } - ng := WithNPeersThrottled(len(sys.peerClients), maxWorkers) for idx, client := range sys.peerClients { if client == nil { diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go index 76aa456be..07fc7b6a0 100644 --- a/cmd/peer-rest-client.go +++ b/cmd/peer-rest-client.go @@ -105,13 +105,6 @@ func newPeerRESTClient(peer *xnet.Host, gridHost string) *peerRESTClient { } } -// Wrapper to restClient.Call to handle network errors, in case of network error the connection is marked disconnected -// permanently. The only way to restore the connection is at the xl-sets layer by xlsets.monitorAndConnectEndpoints() -// after verifying format.json -func (client *peerRESTClient) call(method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) { - return client.callWithContext(GlobalContext, method, values, body, length) -} - // Wrapper to restClient.Call to handle network errors, in case of network error the connection is marked disconnected // permanently. The only way to restore the connection is at the xl-sets layer by xlsets.monitorAndConnectEndpoints() // after verifying format.json @@ -257,10 +250,10 @@ func (client *peerRESTClient) GetProcInfo(ctx context.Context) (info madmin.Proc } // StartProfiling - Issues profiling command on the peer node. -func (client *peerRESTClient) StartProfiling(profiler string) error { +func (client *peerRESTClient) StartProfiling(ctx context.Context, profiler string) error { values := make(url.Values) values.Set(peerRESTProfiler, profiler) - respBody, err := client.call(peerRESTMethodStartProfiling, values, nil, -1) + respBody, err := client.callWithContext(ctx, peerRESTMethodStartProfiling, values, nil, -1) if err != nil { return err } @@ -269,8 +262,8 @@ func (client *peerRESTClient) StartProfiling(profiler string) error { } // DownloadProfileData - download profiled data from a remote node. -func (client *peerRESTClient) DownloadProfileData() (data map[string][]byte, err error) { - respBody, err := client.call(peerRESTMethodDownloadProfilingData, nil, nil, -1) +func (client *peerRESTClient) DownloadProfileData(ctx context.Context) (data map[string][]byte, err error) { + respBody, err := client.callWithContext(ctx, peerRESTMethodDownloadProfilingData, nil, nil, -1) if err != nil { return }