diff --git a/cmd/api-router.go b/cmd/api-router.go index b1d5eb50b..875794396 100644 --- a/cmd/api-router.go +++ b/cmd/api-router.go @@ -166,6 +166,8 @@ func registerAPIRouter(router *mux.Router, encryptionEnabled, allowSSEKMS bool) bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("getbucketnotification", httpTraceAll(api.GetBucketNotificationHandler))).Queries("notification", "") // ListenBucketNotification bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("listenbucketnotification", httpTraceAll(api.ListenBucketNotificationHandler))).Queries("events", "{events:.*}") + // ListenBucketNotificationV2 + bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("listenbucketnotificationv2", httpTraceAll(api.ListenBucketNotificationHandlerV2))).Queries("type", "2", "events", "{events:.*}") // ListMultipartUploads bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("listmultipartuploads", httpTraceAll(api.ListMultipartUploadsHandler))).Queries("uploads", "") // ListObjectsV2M diff --git a/cmd/bucket-notification-handlers.go b/cmd/bucket-notification-handlers.go index 73b14efc4..10ddcdc53 100644 --- a/cmd/bucket-notification-handlers.go +++ b/cmd/bucket-notification-handlers.go @@ -18,11 +18,14 @@ package cmd import ( "bytes" + "encoding/json" "encoding/xml" "errors" "io" "net/http" + "net/url" "path" + "time" "github.com/gorilla/mux" xhttp "github.com/minio/minio/cmd/http" @@ -171,6 +174,144 @@ func (api objectAPIHandlers) PutBucketNotificationHandler(w http.ResponseWriter, writeSuccessResponseHeadersOnly(w) } +func (api objectAPIHandlers) ListenBucketNotificationHandlerV2(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "ListenBucketNotificationV2") + + defer logger.AuditLog(w, r, "ListenBucketNotificationV2", mustGetClaimsFromToken(r)) + + // Validate if bucket exists. + objAPI := api.ObjectAPI() + if objAPI == nil { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL, guessIsBrowserReq(r)) + return + } + + if !objAPI.IsNotificationSupported() { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL, guessIsBrowserReq(r)) + return + } + + if !objAPI.IsListenBucketSupported() { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL, guessIsBrowserReq(r)) + return + } + + vars := mux.Vars(r) + bucketName := vars["bucket"] + + values := r.URL.Query() + + var prefix string + if len(values["prefix"]) > 1 { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrFilterNamePrefix), r.URL, guessIsBrowserReq(r)) + return + } + + if len(values["prefix"]) == 1 { + if err := event.ValidateFilterRuleValue(values["prefix"][0]); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + + prefix = values["prefix"][0] + } + + var suffix string + if len(values["suffix"]) > 1 { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrFilterNameSuffix), r.URL, guessIsBrowserReq(r)) + return + } + + if len(values["suffix"]) == 1 { + if err := event.ValidateFilterRuleValue(values["suffix"][0]); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + + suffix = values["suffix"][0] + } + + pattern := event.NewPattern(prefix, suffix) + + eventNames := []event.Name{} + for _, s := range values["events"] { + eventName, err := event.ParseName(s) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + + eventNames = append(eventNames, eventName) + } + + if _, err := objAPI.GetBucketInfo(ctx, bucketName); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + + host, err := xnet.ParseHost(r.RemoteAddr) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + + rulesMap := event.NewRulesMap(eventNames, pattern, + event.TargetID{ID: "listen" + "+" + mustGetUUID() + "+" + host.Name, Name: host.Port.String()}) + + w.Header().Set(xhttp.ContentType, "text/event-stream") + + doneCh := make(chan struct{}) + defer close(doneCh) + + // Listen Publisher and peer-listen-client uses nonblocking send and hence does not wait for slow receivers. + // Use buffered channel to take care of burst sends or slow w.Write() + listenCh := make(chan interface{}, 4000) + + peers := getRestClients(globalEndpoints) + + globalHTTPListen.Subscribe(listenCh, doneCh, func(evI interface{}) bool { + ev, ok := evI.(event.Event) + if !ok { + return false + } + objectName, uerr := url.QueryUnescape(ev.S3.Object.Key) + if uerr != nil { + return false + } + return len(rulesMap.Match(ev.EventName, objectName).ToSlice()) != 0 + }) + + for _, peer := range peers { + if peer == nil { + continue + } + peer.Listen(listenCh, doneCh) + } + + keepAliveTicker := time.NewTicker(500 * time.Millisecond) + defer keepAliveTicker.Stop() + + enc := json.NewEncoder(w) + for { + select { + case evI := <-listenCh: + ev := evI.(event.Event) + if err := enc.Encode(struct{ Records []event.Event }{[]event.Event{ev}}); err != nil { + return + } + w.(http.Flusher).Flush() + case <-keepAliveTicker.C: + if _, err := w.Write([]byte(" ")); err != nil { + return + } + w.(http.Flusher).Flush() + case <-GlobalServiceDoneCh: + return + } + } + +} + // ListenBucketNotificationHandler - This HTTP handler sends events to the connected HTTP client. // Client should send prefix/suffix object name to match and events to watch as query parameters. func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWriter, r *http.Request) { diff --git a/cmd/globals.go b/cmd/globals.go index 4ca725992..6ed957f4b 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -163,6 +163,9 @@ var ( // registered listeners globalHTTPTrace = pubsub.New() + // global Listen system to send S3 API events to registered listeners + globalHTTPListen = pubsub.New() + // global console system to send console logs to // registered listeners globalConsoleSys *HTTPConsoleLoggerSys diff --git a/cmd/notification.go b/cmd/notification.go index 1a71beb63..96b7f9987 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -1297,6 +1297,11 @@ func sendEvent(args eventArgs) { crypto.RemoveSensitiveEntries(args.Object.UserDefined) crypto.RemoveInternalEntries(args.Object.UserDefined) + if globalHTTPListen.HasSubscribers() { + globalHTTPListen.Publish(args.ToEvent()) + return + } + // globalNotificationSys is not initialized in gateway mode. if globalNotificationSys == nil { return diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go index 0bf271d92..aee0d1e6d 100644 --- a/cmd/peer-rest-client.go +++ b/cmd/peer-rest-client.go @@ -677,6 +677,60 @@ func (client *peerRESTClient) doTrace(traceCh chan interface{}, doneCh chan stru } } +func (client *peerRESTClient) doListen(listenCh chan interface{}, doneCh chan struct{}) { + // To cancel the REST request in case doneCh gets closed. + ctx, cancel := context.WithCancel(context.Background()) + + cancelCh := make(chan struct{}) + defer close(cancelCh) + go func() { + select { + case <-doneCh: + case <-cancelCh: + // There was an error in the REST request. + } + cancel() + }() + + respBody, err := client.callWithContext(ctx, peerRESTMethodListen, nil, nil, -1) + defer http.DrainBody(respBody) + + if err != nil { + return + } + + dec := gob.NewDecoder(respBody) + for { + var ev event.Event + if err = dec.Decode(&ev); err != nil { + return + } + if len(ev.EventVersion) > 0 { + select { + case listenCh <- ev: + default: + // Do not block on slow receivers. + } + } + } +} + +// Listen - listen on peers. +func (client *peerRESTClient) Listen(listenCh chan interface{}, doneCh chan struct{}) { + go func() { + for { + client.doListen(listenCh, doneCh) + select { + case <-doneCh: + return + default: + // There was error in the REST request, retry after sometime as probably the peer is down. + time.Sleep(5 * time.Second) + } + } + }() +} + // Trace - send http trace request to peer nodes func (client *peerRESTClient) Trace(traceCh chan interface{}, doneCh chan struct{}, trcAll, trcErr bool) { go func() { diff --git a/cmd/peer-rest-common.go b/cmd/peer-rest-common.go index 071a752f6..a39de6423 100644 --- a/cmd/peer-rest-common.go +++ b/cmd/peer-rest-common.go @@ -53,6 +53,7 @@ const ( peerRESTMethodTargetExists = "/targetexists" peerRESTMethodSendEvent = "/sendevent" peerRESTMethodTrace = "/trace" + peerRESTMethodListen = "/listen" peerRESTMethodBucketLifecycleSet = "/setbucketlifecycle" peerRESTMethodBucketLifecycleRemove = "/removebucketlifecycle" peerRESTMethodLog = "/log" diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index cc73db963..c84a33469 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -961,6 +961,47 @@ func (s *peerRESTServer) SignalServiceHandler(w http.ResponseWriter, r *http.Req } } +// ListenHandler sends http trace messages back to peer rest client +func (s *peerRESTServer) ListenHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + s.writeErrorResponse(w, errors.New("Invalid request")) + return + } + + w.WriteHeader(http.StatusOK) + w.(http.Flusher).Flush() + + doneCh := make(chan struct{}) + defer close(doneCh) + + // Listen Publisher uses nonblocking publish and hence does not wait for slow subscribers. + // Use buffered channel to take care of burst sends or slow w.Write() + ch := make(chan interface{}, 2000) + + globalHTTPListen.Subscribe(ch, doneCh, func(entry interface{}) bool { + return true + }) + + keepAliveTicker := time.NewTicker(500 * time.Millisecond) + defer keepAliveTicker.Stop() + + enc := gob.NewEncoder(w) + for { + select { + case ev := <-ch: + if err := enc.Encode(ev); err != nil { + return + } + w.(http.Flusher).Flush() + case <-keepAliveTicker.C: + if err := enc.Encode(&event.Event{}); err != nil { + return + } + w.(http.Flusher).Flush() + } + } +} + // TraceHandler sends http trace messages back to peer rest client func (s *peerRESTServer) TraceHandler(w http.ResponseWriter, r *http.Request) { if !s.IsValid(w, r) { @@ -1121,6 +1162,7 @@ func registerPeerRESTHandlers(router *mux.Router) { subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodBackgroundOpsStatus).HandlerFunc(server.BackgroundOpsStatusHandler) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodTrace).HandlerFunc(server.TraceHandler) + subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodListen).HandlerFunc(server.ListenHandler) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodBackgroundHealStatus).HandlerFunc(server.BackgroundHealStatusHandler) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLog).HandlerFunc(server.ConsoleLogHandler) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodPutBucketObjectLockConfig).HandlerFunc(httpTraceHdrs(server.PutBucketObjectLockConfigHandler)).Queries(restQueries(peerRESTBucket)...)