From 35dea24ffd0e31896711315a6f7ee57caeef7f84 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Fri, 6 May 2022 12:39:58 -0700 Subject: [PATCH] fix: console log peer API from its broken implementation (#14873) console logging peer API was broken as it would timeout after 15minutes, this never really worked beyond this value and basically failed to provide the streaming "log" functionality that was expected from this implementation. also fix convoluted channel handling by keeping things simple, this is rewritten. --- cmd/peer-rest-client.go | 69 ++++++++++++++++++++++++----------------- cmd/peer-rest-server.go | 15 +++++++-- 2 files changed, 52 insertions(+), 32 deletions(-) diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go index 75d77b0b7..22c1039b5 100644 --- a/cmd/peer-rest-client.go +++ b/cmd/peer-rest-client.go @@ -903,43 +903,54 @@ func (client *peerRESTClient) Trace(traceCh chan interface{}, doneCh <-chan stru }() } +func (client *peerRESTClient) doConsoleLog(logCh chan interface{}, doneCh <-chan struct{}) { + // To cancel the REST request in case doneCh gets closed. + ctx, cancel := context.WithCancel(GlobalContext) + + cancelCh := make(chan struct{}) + defer close(cancelCh) + go func() { + select { + case <-doneCh: + case <-cancelCh: + // There was an error in the REST request. + } + cancel() + }() + + respBody, err := client.callWithContext(ctx, peerRESTMethodLog, nil, nil, -1) + defer http.DrainBody(respBody) + if err != nil { + return + } + + dec := gob.NewDecoder(respBody) + for { + var lg madmin.LogInfo + if err = dec.Decode(&lg); err != nil { + break + } + if lg.DeploymentID != "" { + select { + case logCh <- lg: + default: + // Do not block on slow receivers. + } + } + } +} + // ConsoleLog - sends request to peer nodes to get console logs func (client *peerRESTClient) ConsoleLog(logCh chan interface{}, doneCh <-chan struct{}) { go func() { for { - // get cancellation context to properly unsubscribe peers - ctx, cancel := context.WithCancel(GlobalContext) - respBody, err := client.callWithContext(ctx, peerRESTMethodLog, nil, nil, -1) - if err != nil { - // Retry the failed request. - time.Sleep(5 * time.Second) - } else { - dec := gob.NewDecoder(respBody) - - go func() { - <-doneCh - cancel() - }() - - for { - var log madmin.LogInfo - if err = dec.Decode(&log); err != nil { - break - } - select { - case logCh <- log: - default: - } - } - } - + client.doConsoleLog(logCh, doneCh) select { case <-doneCh: - cancel() - http.DrainBody(respBody) return default: - // There was error in the REST request, retry. + // There was error in the REST request, retry after sometime as probably the peer is down. + time.Sleep(5 * time.Second) } } }() diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index 471c0de04..8b6a39c59 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -932,6 +932,8 @@ func (s *peerRESTServer) ListenHandler(w http.ResponseWriter, r *http.Request) { return } w.(http.Flusher).Flush() + case <-r.Context().Done(): + return case <-keepAliveTicker.C: if err := enc.Encode(&event.Event{}); err != nil { return @@ -993,6 +995,8 @@ func (s *peerRESTServer) TraceHandler(w http.ResponseWriter, r *http.Request) { return } w.(http.Flusher).Flush() + case <-r.Context().Done(): + return case <-keepAliveTicker.C: if err := enc.Encode(&madmin.TraceInfo{}); err != nil { return @@ -1059,15 +1063,15 @@ func (s *peerRESTServer) ConsoleLogHandler(w http.ResponseWriter, r *http.Reques return } - w.Header().Set("Connection", "close") - w.WriteHeader(http.StatusOK) - doneCh := make(chan struct{}) defer close(doneCh) ch := make(chan interface{}, 2000) globalConsoleSys.Subscribe(ch, doneCh, "", 0, string(logger.All), nil) + keepAliveTicker := time.NewTicker(500 * time.Millisecond) + defer keepAliveTicker.Stop() + enc := gob.NewEncoder(w) for { select { @@ -1076,6 +1080,11 @@ func (s *peerRESTServer) ConsoleLogHandler(w http.ResponseWriter, r *http.Reques return } w.(http.Flusher).Flush() + case <-keepAliveTicker.C: + if err := enc.Encode(&madmin.LogInfo{}); err != nil { + return + } + w.(http.Flusher).Flush() case <-r.Context().Done(): return }