diff --git a/cmd/erasure-server-pool-decom.go b/cmd/erasure-server-pool-decom.go index 5e758d848..1a82e1005 100644 --- a/cmd/erasure-server-pool-decom.go +++ b/cmd/erasure-server-pool-decom.go @@ -608,7 +608,7 @@ func (z *erasureServerPools) IsDecommissionRunning() bool { return false } -func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket string, gr *GetObjectReader) (err error) { +func (z *erasureServerPools) decommissionObject(ctx context.Context, idx int, bucket string, gr *GetObjectReader) (err error) { objInfo := gr.ObjInfo defer func() { @@ -623,9 +623,11 @@ func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket stri if objInfo.isMultipart() { res, err := z.NewMultipartUpload(ctx, bucket, objInfo.Name, ObjectOptions{ - VersionID: objInfo.VersionID, - UserDefined: objInfo.UserDefined, - NoAuditLog: true, + VersionID: objInfo.VersionID, + UserDefined: objInfo.UserDefined, + NoAuditLog: true, + SrcPoolIdx: idx, + DataMovement: true, }) if err != nil { return fmt.Errorf("decommissionObject: NewMultipartUpload() %w", err) @@ -660,6 +662,7 @@ func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket stri } } _, err = z.CompleteMultipartUpload(ctx, bucket, objInfo.Name, res.UploadID, parts, ObjectOptions{ + SrcPoolIdx: idx, DataMovement: true, MTime: objInfo.ModTime, NoAuditLog: true, @@ -681,6 +684,7 @@ func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket stri NewPutObjReader(hr), ObjectOptions{ DataMovement: true, + SrcPoolIdx: idx, VersionID: objInfo.VersionID, MTime: objInfo.ModTime, UserDefined: objInfo.UserDefined, @@ -855,6 +859,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool versionID = nullVersionID } + var failure, ignore bool if version.Deleted { _, err := z.DeleteObject(ctx, bi.Name, @@ -867,14 +872,19 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool VersionID: versionID, MTime: version.ModTime, 
DeleteReplication: version.ReplicationState, + SrcPoolIdx: idx, + DataMovement: true, DeleteMarker: true, // make sure we create a delete marker SkipDecommissioned: true, // make sure we skip the decommissioned pool NoAuditLog: true, }) - var failure bool if err != nil { - if isErrObjectNotFound(err) || isErrVersionNotFound(err) { - err = nil + // This can happen when decommission stop races with ongoing decommission workers. + // These decommission failures can be ignored. + if isErrObjectNotFound(err) || isErrVersionNotFound(err) || isDataMovementOverWriteErr(err) { + ignore = true + stopFn(0, nil) + continue } } stopFn(version.Size, err) @@ -893,22 +903,26 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool continue } - var failure, ignore bool // gr.Close() is ensured by decommissionObject(). for try := 0; try < 3; try++ { if version.IsRemote() { if err := z.DecomTieredObject(ctx, bi.Name, version.Name, version, ObjectOptions{ - VersionID: versionID, - MTime: version.ModTime, - UserDefined: version.Metadata, + VersionID: versionID, + MTime: version.ModTime, + UserDefined: version.Metadata, + SrcPoolIdx: idx, + DataMovement: true, }); err != nil { - stopFn(version.Size, err) - failure = true - decomLogIf(ctx, err) - continue + if isErrObjectNotFound(err) || isErrVersionNotFound(err) || isDataMovementOverWriteErr(err) { + ignore = true + stopFn(0, nil) + } + } + if !ignore { + stopFn(version.Size, err) + failure = err != nil + decomLogIf(ctx, err) } - stopFn(version.Size, nil) - failure = false break } gr, err := set.GetObjectNInfo(ctx, @@ -925,7 +939,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool if isErrObjectNotFound(err) || isErrVersionNotFound(err) { // object deleted by the application, nothing to do here we move on. 
ignore = true - stopFn(version.Size, nil) + stopFn(0, nil) break } if err != nil && !ignore { @@ -943,7 +957,12 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool stopFn(version.Size, err) continue } - if err = z.decommissionObject(ctx, bi.Name, gr); err != nil { + if err = z.decommissionObject(ctx, idx, bi.Name, gr); err != nil { + if isErrObjectNotFound(err) || isErrVersionNotFound(err) || isDataMovementOverWriteErr(err) { + ignore = true + stopFn(0, nil) + break + } stopFn(version.Size, err) failure = true decomLogIf(ctx, err) diff --git a/cmd/erasure-server-pool-rebalance.go b/cmd/erasure-server-pool-rebalance.go index d13456b48..749cf4605 100644 --- a/cmd/erasure-server-pool-rebalance.go +++ b/cmd/erasure-server-pool-rebalance.go @@ -626,14 +626,18 @@ func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string, var rebalanced, expired int for _, version := range fivs.Versions { + stopFn := globalRebalanceMetrics.log(rebalanceMetricRebalanceObject, poolIdx, bucket, version.Name, version.VersionID) + // Skip transitioned objects for now. TBD if version.IsRemote() { + stopFn(version.Size, errors.New("ILM Tiered version will be skipped for now")) continue } // Apply lifecycle rules on the objects that are expired. 
if filterLifecycle(bucket, version.Name, version) { expired++ + stopFn(version.Size, errors.New("ILM expired object/version will be skipped")) continue } @@ -643,6 +647,7 @@ func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string, remainingVersions := len(fivs.Versions) - expired if version.Deleted && remainingVersions == 1 { rebalanced++ + stopFn(version.Size, errors.New("DELETE marked object with no other non-current versions will be skipped")) continue } @@ -651,6 +656,7 @@ func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string, versionID = nullVersionID } + var failure, ignore bool if version.Deleted { _, err := z.DeleteObject(ctx, bucket, @@ -660,16 +666,24 @@ func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string, VersionID: versionID, MTime: version.ModTime, DeleteReplication: version.ReplicationState, + SrcPoolIdx: poolIdx, + DataMovement: true, DeleteMarker: true, // make sure we create a delete marker SkipRebalancing: true, // make sure we skip the decommissioned pool NoAuditLog: true, }) - var failure bool - if err != nil && !isErrObjectNotFound(err) && !isErrVersionNotFound(err) { - rebalanceLogIf(ctx, err) - failure = true + if err != nil { + // This can happen when rebalance stop races with ongoing rebalance workers. + // These rebalance failures can be ignored. 
+ if isErrObjectNotFound(err) || isErrVersionNotFound(err) || isDataMovementOverWriteErr(err) { + ignore = true + stopFn(0, nil) + continue + } } - + stopFn(version.Size, err) + rebalanceLogIf(ctx, err) + failure = err != nil if !failure { z.updatePoolStats(poolIdx, bucket, version) rebalanced++ @@ -678,10 +694,8 @@ func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string, continue } - var failure, ignore bool for try := 0; try < 3; try++ { // GetObjectReader.Close is called by rebalanceObject - stopFn := globalRebalanceMetrics.log(rebalanceMetricRebalanceObject, poolIdx, bucket, version.Name, version.VersionID) gr, err := set.GetObjectNInfo(ctx, bucket, encodeDirObject(version.Name), @@ -709,9 +723,10 @@ func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string, if err = z.rebalanceObject(ctx, poolIdx, bucket, gr); err != nil { // This can happen when rebalance stop races with ongoing rebalance workers. // These rebalance failures can be ignored. - if isDataMovementOverWriteErr(err) { + if isErrObjectNotFound(err) || isErrVersionNotFound(err) || isDataMovementOverWriteErr(err) { ignore = true - continue + stopFn(0, nil) + break } failure = true rebalanceLogIf(ctx, err) diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index cf41b128d..55dc5f890 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -1147,6 +1147,16 @@ func (z *erasureServerPools) DeleteObject(ctx context.Context, bucket string, ob return pinfo.ObjInfo, nil } + // Datamovement must never be allowed on the same pool. + if opts.DataMovement && opts.SrcPoolIdx == pinfo.Index { + return pinfo.ObjInfo, DataMovementOverwriteErr{ + Bucket: bucket, + Object: object, + VersionID: opts.VersionID, + Err: errDataMovementSrcDstPoolSame, + } + } + // Delete concurrently in all server pools with read quorum error for unversioned objects. 
if len(noReadQuorumPools) > 0 && !opts.Versioned && !opts.VersionSuspended { return z.deleteObjectFromAllPools(ctx, bucket, object, opts, noReadQuorumPools) @@ -2797,5 +2807,14 @@ func (z *erasureServerPools) DecomTieredObject(ctx context.Context, bucket, obje return err } + if opts.DataMovement && idx == opts.SrcPoolIdx { + return DataMovementOverwriteErr{ + Bucket: bucket, + Object: object, + VersionID: opts.VersionID, + Err: errDataMovementSrcDstPoolSame, + } + } + return z.serverPools[idx].DecomTieredObject(ctx, bucket, object, fi, opts) }