From 132e7413ba5464547f5c1f0bdf400a7777e88b72 Mon Sep 17 00:00:00 2001 From: jiuker <2818723467@qq.com> Date: Sat, 27 Jul 2024 01:27:42 +0800 Subject: [PATCH] fix: check once ready for site-replication (#20149) --- cmd/bucket-replication-handlers.go | 4 +-- cmd/bucket-replication.go | 54 ++++++++++++++++++++---------- cmd/site-replication.go | 9 ++--- 3 files changed, 43 insertions(+), 24 deletions(-) diff --git a/cmd/bucket-replication-handlers.go b/cmd/bucket-replication-handlers.go index 5316d473f..f593f7666 100644 --- a/cmd/bucket-replication-handlers.go +++ b/cmd/bucket-replication-handlers.go @@ -75,7 +75,7 @@ func (api objectAPIHandlers) PutBucketReplicationConfigHandler(w http.ResponseWr writeErrorResponse(ctx, w, apiErr, r.URL) return } - sameTarget, apiErr := validateReplicationDestination(ctx, bucket, replicationConfig, true) + sameTarget, apiErr := validateReplicationDestination(ctx, bucket, replicationConfig, &validateReplicationDestinationOptions{CheckRemoteBucket: true}) if apiErr != noError { writeErrorResponse(ctx, w, apiErr, r.URL) return @@ -559,7 +559,7 @@ func (api objectAPIHandlers) ValidateBucketReplicationCredsHandler(w http.Respon lockEnabled = lcfg.Enabled() } - sameTarget, apiErr := validateReplicationDestination(ctx, bucket, replicationConfig, true) + sameTarget, apiErr := validateReplicationDestination(ctx, bucket, replicationConfig, &validateReplicationDestinationOptions{CheckRemoteBucket: true}) if apiErr != noError { writeErrorResponse(ctx, w, apiErr, r.URL) return diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 93bef14cd..91566c6a2 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -91,7 +91,10 @@ func getReplicationConfig(ctx context.Context, bucketName string) (rc *replicati // validateReplicationDestination returns error if replication destination bucket missing or not configured // It also returns true if replication destination is same as this server. -func validateReplicationDestination(ctx context.Context, bucket string, rCfg *replication.Config, checkRemote bool) (bool, APIError) { +func validateReplicationDestination(ctx context.Context, bucket string, rCfg *replication.Config, opts *validateReplicationDestinationOptions) (bool, APIError) { + if opts == nil { + opts = &validateReplicationDestinationOptions{} + } var arns []string if rCfg.RoleArn != "" { arns = append(arns, rCfg.RoleArn) @@ -113,7 +116,7 @@ func validateReplicationDestination(ctx context.Context, bucket string, rCfg *re if clnt == nil { return sameTarget, toAPIError(ctx, BucketRemoteTargetNotFound{Bucket: bucket}) } - if checkRemote { // validate remote bucket + if opts.CheckRemoteBucket { // validate remote bucket found, err := clnt.BucketExists(ctx, arn.Bucket) if err != nil { return sameTarget, errorCodes.ToAPIErrWithErr(ErrRemoteDestinationNotFoundError, err) @@ -133,23 +136,29 @@ func validateReplicationDestination(ctx context.Context, bucket string, rCfg *re } } } + // if checked bucket, then check the ready is unnecessary + if !opts.CheckRemoteBucket && opts.CheckReady { + endpoint := clnt.EndpointURL().String() + if errInt, ok := opts.checkReadyErr.Load(endpoint); !ok { + err = checkRemoteEndpoint(ctx, clnt.EndpointURL()) + opts.checkReadyErr.Store(endpoint, err) + } else { + if errInt == nil { + err = nil + } else { + err = errInt.(error) + } + } + switch err.(type) { + case BucketRemoteIdenticalToSource: + return true, errorCodes.ToAPIErrWithErr(ErrBucketRemoteIdenticalToSource, fmt.Errorf("remote target endpoint %s is self referential", clnt.EndpointURL().String())) + default: + } + } // validate replication ARN against target endpoint - c := globalBucketTargetSys.GetRemoteTargetClient(bucket, arnStr) - if c != nil { - if err := checkRemoteEndpoint(ctx, c.EndpointURL()); err != nil { - switch err.(type) { - case BucketRemoteIdenticalToSource: - return true, errorCodes.ToAPIErrWithErr(ErrBucketRemoteIdenticalToSource, fmt.Errorf("remote target endpoint %s is self referential", c.EndpointURL().String())) - default: - } - } - if c.EndpointURL().String() == clnt.EndpointURL().String() { - selfTarget, _ := isLocalHost(clnt.EndpointURL().Hostname(), clnt.EndpointURL().Port(), globalMinioPort) - if !sameTarget { - sameTarget = selfTarget - } - continue - } + selfTarget, _ := isLocalHost(clnt.EndpointURL().Hostname(), clnt.EndpointURL().Port(), globalMinioPort) + if !sameTarget { + sameTarget = selfTarget } } @@ -3741,3 +3750,12 @@ func (p *ReplicationPool) getMRF(ctx context.Context, bucket string) (ch <-chan return mrfCh, nil } + +// validateReplicationDestinationOptions is used to configure the validation of the replication destination. +// validateReplicationDestination uses this to configure the validation. +type validateReplicationDestinationOptions struct { + CheckRemoteBucket bool + CheckReady bool + + checkReadyErr sync.Map +} diff --git a/cmd/site-replication.go b/cmd/site-replication.go index d61fd21b5..48fa78a3d 100644 --- a/cmd/site-replication.go +++ b/cmd/site-replication.go @@ -1129,7 +1129,7 @@ func (c *SiteReplicationSys) PeerBucketConfigureReplHandler(ctx context.Context, if err != nil { return err } - sameTarget, apiErr := validateReplicationDestination(ctx, bucket, newReplicationConfig, true) + sameTarget, apiErr := validateReplicationDestination(ctx, bucket, newReplicationConfig, &validateReplicationDestinationOptions{CheckRemoteBucket: true}) if apiErr != noError { return fmt.Errorf("bucket replication config validation error: %#v", apiErr) } @@ -4453,6 +4453,7 @@ func (c *SiteReplicationSys) healBuckets(ctx context.Context, objAPI ObjectLayer return err } ilmExpiryCfgHealed := false + opts := validateReplicationDestinationOptions{CheckReady: true} for _, bi := range buckets { bucket := bi.Name info, err := c.siteReplicationStatus(ctx, objAPI, madmin.SRStatusOptions{ @@ -4472,7 +4473,7 @@ func (c *SiteReplicationSys) healBuckets(ctx context.Context, objAPI ObjectLayer c.healVersioningMetadata(ctx, objAPI, bucket, info) c.healOLockConfigMetadata(ctx, objAPI, bucket, info) c.healSSEMetadata(ctx, objAPI, bucket, info) - c.healBucketReplicationConfig(ctx, objAPI, bucket, info) + c.healBucketReplicationConfig(ctx, objAPI, bucket, info, &opts) c.healBucketPolicies(ctx, objAPI, bucket, info) c.healTagMetadata(ctx, objAPI, bucket, info) c.healBucketQuotaConfig(ctx, objAPI, bucket, info) @@ -5172,7 +5173,7 @@ func (c *SiteReplicationSys) healBucket(ctx context.Context, objAPI ObjectLayer, return nil } -func (c *SiteReplicationSys) healBucketReplicationConfig(ctx context.Context, objAPI ObjectLayer, bucket string, info srStatusInfo) error { +func (c *SiteReplicationSys) healBucketReplicationConfig(ctx context.Context, objAPI ObjectLayer, bucket string, info srStatusInfo, opts *validateReplicationDestinationOptions) error { bs := info.BucketStats[bucket] c.RLock() @@ -5226,7 +5227,7 @@ func (c *SiteReplicationSys) healBucketReplicationConfig(ctx context.Context, ob if rcfg != nil && !replMismatch { // validate remote targets on current cluster for this bucket - _, apiErr := validateReplicationDestination(ctx, bucket, rcfg, false) + _, apiErr := validateReplicationDestination(ctx, bucket, rcfg, opts) if apiErr != noError { replMismatch = true }