diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go index 75d77b0b7..22c1039b5 100644 --- a/cmd/peer-rest-client.go +++ b/cmd/peer-rest-client.go @@ -903,43 +903,54 @@ func (client *peerRESTClient) Trace(traceCh chan interface{}, doneCh <-chan stru }() } +func (client *peerRESTClient) doConsoleLog(logCh chan interface{}, doneCh <-chan struct{}) { + // To cancel the REST request in case doneCh gets closed. + ctx, cancel := context.WithCancel(GlobalContext) + + cancelCh := make(chan struct{}) + defer close(cancelCh) + go func() { + select { + case <-doneCh: + case <-cancelCh: + // There was an error in the REST request. + } + cancel() + }() + + respBody, err := client.callWithContext(ctx, peerRESTMethodLog, nil, nil, -1) + defer http.DrainBody(respBody) + if err != nil { + return + } + + dec := gob.NewDecoder(respBody) + for { + var lg madmin.LogInfo + if err = dec.Decode(&lg); err != nil { + break + } + if lg.DeploymentID != "" { + select { + case logCh <- lg: + default: + // Do not block on slow receivers. + } + } + } +} + // ConsoleLog - sends request to peer nodes to get console logs func (client *peerRESTClient) ConsoleLog(logCh chan interface{}, doneCh <-chan struct{}) { go func() { for { - // get cancellation context to properly unsubscribe peers - ctx, cancel := context.WithCancel(GlobalContext) - respBody, err := client.callWithContext(ctx, peerRESTMethodLog, nil, nil, -1) - if err != nil { - // Retry the failed request. - time.Sleep(5 * time.Second) - } else { - dec := gob.NewDecoder(respBody) - - go func() { - <-doneCh - cancel() - }() - - for { - var log madmin.LogInfo - if err = dec.Decode(&log); err != nil { - break - } - select { - case logCh <- log: - default: - } - } - } - + client.doConsoleLog(logCh, doneCh) select { case <-doneCh: - cancel() - http.DrainBody(respBody) return default: - // There was error in the REST request, retry. + // There was error in the REST request, retry after sometime as probably the peer is down. + time.Sleep(5 * time.Second) } } }() diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index 471c0de04..8b6a39c59 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -932,6 +932,8 @@ func (s *peerRESTServer) ListenHandler(w http.ResponseWriter, r *http.Request) { return } w.(http.Flusher).Flush() + case <-r.Context().Done(): + return case <-keepAliveTicker.C: if err := enc.Encode(&event.Event{}); err != nil { return @@ -993,6 +995,8 @@ func (s *peerRESTServer) TraceHandler(w http.ResponseWriter, r *http.Request) { return } w.(http.Flusher).Flush() + case <-r.Context().Done(): + return case <-keepAliveTicker.C: if err := enc.Encode(&madmin.TraceInfo{}); err != nil { return @@ -1059,15 +1063,15 @@ func (s *peerRESTServer) ConsoleLogHandler(w http.ResponseWriter, r *http.Reques return } - w.Header().Set("Connection", "close") - w.WriteHeader(http.StatusOK) - doneCh := make(chan struct{}) defer close(doneCh) ch := make(chan interface{}, 2000) globalConsoleSys.Subscribe(ch, doneCh, "", 0, string(logger.All), nil) + keepAliveTicker := time.NewTicker(500 * time.Millisecond) + defer keepAliveTicker.Stop() + enc := gob.NewEncoder(w) for { select { @@ -1076,6 +1080,11 @@ func (s *peerRESTServer) ConsoleLogHandler(w http.ResponseWriter, r *http.Reques return } w.(http.Flusher).Flush() + case <-keepAliveTicker.C: + if err := enc.Encode(&madmin.LogInfo{}); err != nil { + return + } + w.(http.Flusher).Flush() case <-r.Context().Done(): return }