From 03dc65e12dd4f9693dadb17760b629c61e7ccb11 Mon Sep 17 00:00:00 2001 From: Poorna Date: Fri, 27 Oct 2023 21:08:53 -0700 Subject: [PATCH] Reload replication targets lazily if missing (#18333) There can be rare situations where errors seen in bucket metadata load on startup or subsequent metadata updates can result in missing replication remotes. Attempt a refresh of remote targets backed by a good replication config lazily in 5 minute intervals if there ever occurs a situation where remote targets go AWOL. --- cmd/bucket-replication-handlers.go | 2 +- cmd/bucket-replication.go | 16 ++--- cmd/bucket-targets.go | 95 +++++++++++++++++++++++++++--- 3 files changed, 96 insertions(+), 17 deletions(-) diff --git a/cmd/bucket-replication-handlers.go b/cmd/bucket-replication-handlers.go index 2e8b080ad..29c066a08 100644 --- a/cmd/bucket-replication-handlers.go +++ b/cmd/bucket-replication-handlers.go @@ -575,7 +575,7 @@ func (api objectAPIHandlers) ValidateBucketReplicationCredsHandler(w http.Respon if rule.Status == replication.Disabled { continue } - clnt := globalBucketTargetSys.GetRemoteTargetClient(rule.Destination.Bucket) + clnt := globalBucketTargetSys.GetRemoteTargetClient(bucket, rule.Destination.Bucket) if clnt == nil { writeErrorResponse(ctx, w, errorCodes.ToAPIErrWithErr(ErrRemoteTargetNotFoundError, fmt.Errorf("replication config with rule ID %s has a stale target", rule.ID)), r.URL) return diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 6ae0db752..28c39bd97 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -113,7 +113,7 @@ func validateReplicationDestination(ctx context.Context, bucket string, rCfg *re if arn.Type != madmin.ReplicationService { return sameTarget, toAPIError(ctx, BucketRemoteArnTypeInvalid{Bucket: bucket}) } - clnt := globalBucketTargetSys.GetRemoteTargetClient(arnStr) + clnt := globalBucketTargetSys.GetRemoteTargetClient(bucket, arnStr) if clnt == nil { return sameTarget, toAPIError(ctx, BucketRemoteTargetNotFound{Bucket: bucket}) } @@ -138,7 +138,7 @@ func validateReplicationDestination(ctx context.Context, bucket string, rCfg *re } } // validate replication ARN against target endpoint - c := globalBucketTargetSys.GetRemoteTargetClient(arnStr) + c := globalBucketTargetSys.GetRemoteTargetClient(bucket, arnStr) if c != nil { if err := checkRemoteEndpoint(ctx, c.EndpointURL()); err != nil { switch err.(type) { @@ -280,7 +280,7 @@ func mustReplicate(ctx context.Context, bucket, object string, mopts mustReplica } tgtArns := cfg.FilterTargetArns(opts) for _, tgtArn := range tgtArns { - tgt := globalBucketTargetSys.GetRemoteTargetClient(tgtArn) + tgt := globalBucketTargetSys.GetRemoteTargetClient(bucket, tgtArn) // the target online status should not be used here while deciding // whether to replicate as the target could be temporarily down opts.TargetArn = tgtArn @@ -383,7 +383,7 @@ func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelet continue } } - tgt := globalBucketTargetSys.GetRemoteTargetClient(tgtArn) + tgt := globalBucketTargetSys.GetRemoteTargetClient(bucket, tgtArn) // the target online status should not be used here while deciding // whether to replicate deletes as the target could be temporarily down tgtDsc := newReplicateTargetDecision(tgtArn, false, false) @@ -495,7 +495,7 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj if dobj.TargetArn != "" && dobj.TargetArn != tgtEntry.Arn { continue } - tgtClnt := globalBucketTargetSys.GetRemoteTargetClient(tgtEntry.Arn) + tgtClnt := globalBucketTargetSys.GetRemoteTargetClient(bucket, tgtEntry.Arn) if tgtClnt == nil { // Skip stale targets if any and log them to be missing atleast once. logger.LogOnceIf(ctx, fmt.Errorf("failed to get target for bucket:%s arn:%s", bucket, tgtEntry.Arn), tgtEntry.Arn) @@ -1023,7 +1023,7 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje var wg sync.WaitGroup var mu sync.Mutex for _, tgtArn := range tgtArns { - tgt := globalBucketTargetSys.GetRemoteTargetClient(tgtArn) + tgt := globalBucketTargetSys.GetRemoteTargetClient(bucket, tgtArn) if tgt == nil { logger.LogOnceIf(ctx, fmt.Errorf("failed to get target for bucket:%s arn:%s", bucket, tgtArn), tgtArn) sendEvent(eventArgs{ @@ -2188,7 +2188,7 @@ func proxyHeadToRepTarget(ctx context.Context, bucket, object string, rs *HTTPRa return nil, oi, proxy } for _, t := range proxyTargets.Targets { - tgt = globalBucketTargetSys.GetRemoteTargetClient(t.Arn) + tgt = globalBucketTargetSys.GetRemoteTargetClient(bucket, t.Arn) if tgt == nil || globalBucketTargetSys.isOffline(tgt.EndpointURL()) { continue } @@ -2560,7 +2560,7 @@ func (s *replicationResyncer) resyncBucket(ctx context.Context, objectAPI Object logger.LogIf(ctx, fmt.Errorf("replication resync failed for %s - arn specified %s is missing in the replication config", opts.bucket, opts.arn)) return } - tgt := globalBucketTargetSys.GetRemoteTargetClient(opts.arn) + tgt := globalBucketTargetSys.GetRemoteTargetClient(opts.bucket, opts.arn) if tgt == nil { logger.LogIf(ctx, fmt.Errorf("replication resync failed for %s - target could not be created for arn %s", opts.bucket, opts.arn)) return diff --git a/cmd/bucket-targets.go b/cmd/bucket-targets.go index 53aa5878d..0b3f07a8e 100644 --- a/cmd/bucket-targets.go +++ b/cmd/bucket-targets.go @@ -40,14 +40,29 @@ const ( defaultHealthCheckReloadDuration = 30 * time.Minute ) +type arnTarget struct { + Client *TargetClient + lastRefresh time.Time +} + +// arnErrs represents number of errors seen for a ARN and if update is in progress +// to refresh remote targets from bucket metadata. +type arnErrs struct { + count int64 + updateInProgress bool + bucket string +} + // BucketTargetSys represents bucket targets subsystem type BucketTargetSys struct { sync.RWMutex - arnRemotesMap map[string]*TargetClient + arnRemotesMap map[string]arnTarget targetsMap map[string][]madmin.BucketTarget hMutex sync.RWMutex hc map[string]epHealth hcClient *madmin.AnonymousClient + aMutex sync.RWMutex + arnErrsMap map[string]arnErrs // map of ARN to error count of failures to get target } type latencyStat struct { @@ -364,7 +379,7 @@ func (sys *BucketTargetSys) SetTarget(ctx context.Context, bucket string, tgt *m } sys.targetsMap[bucket] = newtgts - sys.arnRemotesMap[tgt.Arn] = clnt + sys.arnRemotesMap[tgt.Arn] = arnTarget{Client: clnt} sys.updateBandwidthLimit(bucket, tgt.Arn, tgt.BandwidthLimit) return nil } @@ -432,11 +447,71 @@ func (sys *BucketTargetSys) RemoveTarget(ctx context.Context, bucket, arnStr str return nil } +func (sys *BucketTargetSys) markRefreshInProgress(bucket, arn string) { + sys.aMutex.Lock() + defer sys.aMutex.Unlock() + if v, ok := sys.arnErrsMap[arn]; !ok { + sys.arnErrsMap[arn] = arnErrs{ + updateInProgress: true, + count: v.count + 1, + bucket: bucket, + } + } +} + +func (sys *BucketTargetSys) markRefreshDone(bucket, arn string) { + sys.aMutex.Lock() + defer sys.aMutex.Unlock() + if v, ok := sys.arnErrsMap[arn]; ok { + sys.arnErrsMap[arn] = arnErrs{ + updateInProgress: false, + count: v.count, + bucket: bucket, + } + } +} + +func (sys *BucketTargetSys) isReloadingTarget(bucket, arn string) bool { + sys.aMutex.RLock() + defer sys.aMutex.RUnlock() + if v, ok := sys.arnErrsMap[arn]; ok { + return v.updateInProgress + } + return false +} + +func (sys *BucketTargetSys) incTargetErr(bucket, arn string) { + sys.aMutex.Lock() + defer sys.aMutex.Unlock() + if v, ok := sys.arnErrsMap[arn]; ok { + sys.arnErrsMap[arn] = arnErrs{ + updateInProgress: v.updateInProgress, + count: v.count + 1, + } + } +} + // GetRemoteTargetClient returns minio-go client for replication target instance -func (sys *BucketTargetSys) GetRemoteTargetClient(arn string) *TargetClient { +func (sys *BucketTargetSys) GetRemoteTargetClient(bucket, arn string) *TargetClient { sys.RLock() - defer sys.RUnlock() - return sys.arnRemotesMap[arn] + tgt := sys.arnRemotesMap[arn] + sys.RUnlock() + + if tgt.Client != nil { + return tgt.Client + } + defer func() { // lazy refresh remote targets + if tgt.Client == nil && !sys.isReloadingTarget(bucket, arn) && (tgt.lastRefresh.Equal(timeSentinel) || tgt.lastRefresh.Before(UTCNow().Add(-5*time.Minute))) { + tgts, err := globalBucketMetadataSys.GetBucketTargetsConfig(bucket) + if err == nil { + sys.markRefreshInProgress(bucket, arn) + sys.UpdateAllTargets(bucket, tgts) + sys.markRefreshDone(bucket, arn) + } + } + sys.incTargetErr(bucket, arn) + }() + return nil } // GetRemoteBucketTargetByArn returns BucketTarget for a ARN @@ -457,8 +532,9 @@ func (sys *BucketTargetSys) GetRemoteBucketTargetByArn(ctx context.Context, buck // NewBucketTargetSys - creates new replication system. func NewBucketTargetSys(ctx context.Context) *BucketTargetSys { sys := &BucketTargetSys{ - arnRemotesMap: make(map[string]*TargetClient), + arnRemotesMap: make(map[string]arnTarget), targetsMap: make(map[string][]madmin.BucketTarget), + arnErrsMap: make(map[string]arnErrs), hc: make(map[string]epHealth), hcClient: newHCClient(), } @@ -502,7 +578,10 @@ func (sys *BucketTargetSys) UpdateAllTargets(bucket string, tgts *madmin.BucketT if err != nil { continue } - sys.arnRemotesMap[tgt.Arn] = tgtClient + sys.arnRemotesMap[tgt.Arn] = arnTarget{ + Client: tgtClient, + lastRefresh: UTCNow(), + } sys.updateBandwidthLimit(bucket, tgt.Arn, tgt.BandwidthLimit) } @@ -526,7 +605,7 @@ func (sys *BucketTargetSys) set(bucket BucketInfo, meta BucketMetadata) { logger.LogIf(GlobalContext, err) continue } - sys.arnRemotesMap[tgt.Arn] = tgtClient + sys.arnRemotesMap[tgt.Arn] = arnTarget{Client: tgtClient} sys.updateBandwidthLimit(bucket.Name, tgt.Arn, tgt.BandwidthLimit) } sys.targetsMap[bucket.Name] = cfg.Targets