diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index b44e90442..83b731bd0 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -2255,7 +2255,7 @@ func proxyHeadToRepTarget(ctx context.Context, bucket, object string, rs *HTTPRa if rs != nil { h, err := rs.ToHeader() if err != nil { - logger.LogIf(ctx, fmt.Errorf("Invalid range header for %s/%s(%s) - %w", bucket, object, opts.VersionID, err)) + logger.LogIf(ctx, fmt.Errorf("invalid range header for %s/%s(%s) - %w", bucket, object, opts.VersionID, err)) continue } gopts.Set(xhttp.Range, h) @@ -2348,6 +2348,127 @@ func scheduleReplication(ctx context.Context, oi ObjectInfo, o ObjectLayer, dsc } } +// proxyTaggingToRepTarget proxies tagging requests to remote targets for +// active-active replicated setups +func proxyTaggingToRepTarget(ctx context.Context, bucket, object string, tags *tags.Tags, opts ObjectOptions, proxyTargets *madmin.BucketTargets) (proxy proxyResult) { + // this option is set when active-active replication is in place between site A -> B, + // and request hits site B that does not have the object yet. + if opts.ProxyRequest || (opts.ProxyHeaderSet && !opts.ProxyRequest) { // true only when site B sets MinIOSourceProxyRequest header + return proxy + } + var wg sync.WaitGroup + errs := make([]error, len(proxyTargets.Targets)) + for idx, t := range proxyTargets.Targets { + tgt := globalBucketTargetSys.GetRemoteTargetClient(bucket, t.Arn) + if tgt == nil || globalBucketTargetSys.isOffline(tgt.EndpointURL()) { + continue + } + // if proxying explicitly disabled on remote target + if tgt.disableProxy { + continue + } + idx := idx + wg.Add(1) + go func(idx int, tgt *TargetClient) { + defer wg.Done() + var err error + if tags != nil { + popts := minio.PutObjectTaggingOptions{ + VersionID: opts.VersionID, + Internal: minio.AdvancedObjectTaggingOptions{ + ReplicationProxyRequest: "true", + }, + } + err = tgt.PutObjectTagging(ctx, tgt.Bucket, object, tags, popts) + } else { + dopts := minio.RemoveObjectTaggingOptions{ + VersionID: opts.VersionID, + Internal: minio.AdvancedObjectTaggingOptions{ + ReplicationProxyRequest: "true", + }, + } + err = tgt.RemoveObjectTagging(ctx, tgt.Bucket, object, dopts) + } + if err != nil { + errs[idx] = err + } + }(idx, tgt) + } + wg.Wait() + + var ( + terr error + taggedCount int + ) + for _, err := range errs { + if err == nil { + taggedCount++ + continue + } + errCode := minio.ToErrorResponse(err).Code + if errCode != "NoSuchKey" && errCode != "NoSuchVersion" { + terr = err + } + } + // don't return error if at least one target was tagged successfully + if taggedCount == 0 && terr != nil { + proxy.Err = terr + } + return proxy +} + +// proxyGetTaggingToRepTarget proxies get tagging requests to remote targets for +// active-active replicated setups +func proxyGetTaggingToRepTarget(ctx context.Context, bucket, object string, opts ObjectOptions, proxyTargets *madmin.BucketTargets) (tgs *tags.Tags, proxy proxyResult) { + // this option is set when active-active replication is in place between site A -> B, + // and request hits site B that does not have the object yet. + if opts.ProxyRequest || (opts.ProxyHeaderSet && !opts.ProxyRequest) { // true only when site B sets MinIOSourceProxyRequest header + return nil, proxy + } + var wg sync.WaitGroup + errs := make([]error, len(proxyTargets.Targets)) + tagSlc := make([]map[string]string, len(proxyTargets.Targets)) + for idx, t := range proxyTargets.Targets { + tgt := globalBucketTargetSys.GetRemoteTargetClient(bucket, t.Arn) + if tgt == nil || globalBucketTargetSys.isOffline(tgt.EndpointURL()) { + continue + } + // if proxying explicitly disabled on remote target + if tgt.disableProxy { + continue + } + idx := idx + wg.Add(1) + go func(idx int, tgt *TargetClient) { + defer wg.Done() + var err error + gopts := minio.GetObjectTaggingOptions{ + VersionID: opts.VersionID, + Internal: minio.AdvancedObjectTaggingOptions{ + ReplicationProxyRequest: "true", + }, + } + tgs, err = tgt.GetObjectTagging(ctx, tgt.Bucket, object, gopts) + if err != nil { + errs[idx] = err + } else { + tagSlc[idx] = tgs.ToMap() + } + }(idx, tgt) + } + wg.Wait() + for idx, err := range errs { + errCode := minio.ToErrorResponse(err).Code + if err != nil && errCode != "NoSuchKey" && errCode != "NoSuchVersion" { + return nil, proxyResult{Err: err} + } + if err == nil { + tgs, _ = tags.MapToObjectTags(tagSlc[idx]) + } + } + return tgs, proxy +} + func scheduleReplicationDelete(ctx context.Context, dv DeletedObjectReplicationInfo, o ObjectLayer) { globalReplicationPool.queueReplicaDeleteTask(dv) for arn := range dv.ReplicationState.Targets { diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index a51f5b74a..799e6b722 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -3304,8 +3304,25 @@ func (api objectAPIHandlers) GetObjectTaggingHandler(w http.ResponseWriter, r *h ot, err := objAPI.GetObjectTags(ctx, bucket, object, opts) if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return + // if object/version is not found locally, but exists on peer site - proxy + // the tagging request to peer site. The response to client will + // return tags from peer site. + if isErrObjectNotFound(err) || isErrVersionNotFound(err) { + proxytgts := getProxyTargets(ctx, bucket, object, opts) + if !proxytgts.Empty() { + // proxy to replication target if site replication is in place. + tags, gerr := proxyGetTaggingToRepTarget(ctx, bucket, object, opts, proxytgts) + if gerr.Err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, gerr.Err), r.URL) + return + } // overlay tags from peer site. + ot = tags + w.Header()[xhttp.MinIOTaggingProxied] = []string{"true"} // indicate that the request was proxied. + } + } else { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } } // Set this such that authorization policies can be applied on the object tags. @@ -3385,6 +3402,33 @@ func (api objectAPIHandlers) PutObjectTaggingHandler(w http.ResponseWriter, r *h objInfo, err := objAPI.GetObjectInfo(ctx, bucket, object, opts) if err != nil { + // if object is not found locally, but exists on peer site - proxy + // the tagging request to peer site. The response to client will + // be 200 with extra header indicating that the request was proxied. + if isErrObjectNotFound(err) || isErrVersionNotFound(err) { + proxytgts := getProxyTargets(ctx, bucket, object, opts) + if !proxytgts.Empty() { + // proxy to replication target if site replication is in place. + perr := proxyTaggingToRepTarget(ctx, bucket, object, tags, opts, proxytgts) + if perr.Err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, perr.Err), r.URL) + return + } + w.Header()[xhttp.MinIOTaggingProxied] = []string{"true"} + writeSuccessResponseHeadersOnly(w) + // when tagging is proxied, the object version is not available to return + // as header in the response, or ObjectInfo in the notification event. + sendEvent(eventArgs{ + EventName: event.ObjectCreatedPutTagging, + BucketName: bucket, + ReqParams: extractReqParams(r), + RespElements: extractRespElements(w), + UserAgent: r.UserAgent(), + Host: handlers.GetSourceIP(r), + }) + return + } + } writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) return } @@ -3453,6 +3497,34 @@ func (api objectAPIHandlers) DeleteObjectTaggingHandler(w http.ResponseWriter, r oi, err := objAPI.GetObjectInfo(ctx, bucket, object, opts) if err != nil { + // if object is not found locally, but exists on peer site - proxy + // the tagging request to peer site. The response to client will + // be 200 OK with extra header indicating that the request was proxied. + if isErrObjectNotFound(err) || isErrVersionNotFound(err) { + proxytgts := getProxyTargets(ctx, bucket, object, opts) + if !proxytgts.Empty() { + // proxy to replication target if active-active replication is in place. + perr := proxyTaggingToRepTarget(ctx, bucket, object, nil, opts, proxytgts) + if perr.Err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, perr.Err), r.URL) + return + } + // when delete tagging is proxied, the object version/tags are not available to return + // as header in the response, nor ObjectInfo in the notification event. + w.Header()[xhttp.MinIOTaggingProxied] = []string{"true"} + writeSuccessNoContent(w) + sendEvent(eventArgs{ + EventName: event.ObjectCreatedDeleteTagging, + BucketName: bucket, + Object: oi, + ReqParams: extractReqParams(r), + RespElements: extractRespElements(w), + UserAgent: r.UserAgent(), + Host: handlers.GetSourceIP(r), + }) + return + } + } writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) return } diff --git a/internal/http/headers.go b/internal/http/headers.go index 2900a2cb5..7206dc3c6 100644 --- a/internal/http/headers.go +++ b/internal/http/headers.go @@ -227,6 +227,10 @@ const ( MinIOSourceObjectRetentionTimestamp = "X-Minio-Source-Replication-Retention-Timestamp" // Header indiicates last rtention update time on source MinIOSourceObjectLegalHoldTimestamp = "X-Minio-Source-Replication-LegalHold-Timestamp" + // Header indicates a Tag operation was performed on one/more peers successfully, though the + // current cluster does not have the object yet. This is in a site/bucket replication scenario. + MinIOTaggingProxied = "X-Minio-Tagging-Proxied" + // predicted date/time of transition MinIOTransition = "X-Minio-Transition" MinIOLifecycleCfgUpdatedAt = "X-Minio-LifecycleConfig-UpdatedAt"