Newer noncurrent versions (#13815)

- Rename MaxNoncurrentVersions tag to NewerNoncurrentVersions

Note: We apply overlapping NewerNoncurrentVersions rules such that 
we honor the highest among applicable limits. e.g if 2 overlapping rules 
are configured with 2 and 3 noncurrent versions to be retained, we 
will retain 3.

- Expire newer noncurrent versions after noncurrent days
- MinIO extension: allow noncurrent days to be zero, allowing expiry 
  of noncurrent version as soon as more than configured 
  NewerNoncurrentVersions are present.
- Allow NewerNoncurrentVersions rules on object-locked buckets
- No x-amz-expiration when NewerNoncurrentVersions configured
- ComputeAction should skip rules with NewerNoncurrentVersions > 0
- Add unit tests for lifecycle.ComputeAction
- Support lifecycle rules with MaxNoncurrentVersions
- Extend ExpectedExpiryTime to work with zero days
- Fix all-time comparisons to be relative to UTC
This commit is contained in:
Krishnan Parthasarathi
2021-12-14 09:41:44 -08:00
committed by GitHub
parent 113c7ff49a
commit 44a9339c0a
7 changed files with 248 additions and 83 deletions

View File

@@ -24,7 +24,6 @@ import (
"github.com/gorilla/mux"
"github.com/minio/minio/internal/bucket/lifecycle"
"github.com/minio/minio/internal/bucket/object/lock"
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/logger"
"github.com/minio/pkg/bucket/policy"
@@ -80,17 +79,6 @@ func (api objectAPIHandlers) PutBucketLifecycleHandler(w http.ResponseWriter, r
return
}
// Disallow MaxNoncurrentVersions if bucket has object locking enabled
var rCfg lock.Retention
if rCfg, err = globalBucketObjectLockSys.Get(bucket); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
if rCfg.LockEnabled && bucketLifecycle.HasMaxNoncurrentVersions() {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidLifecycleWithObjectLock), r.URL)
return
}
// Validate the transition storage ARNs
if err = validateTransitionTier(bucketLifecycle); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)

View File

@@ -81,21 +81,21 @@ type expiryTask struct {
}
type expiryState struct {
once sync.Once
byDaysCh chan expiryTask
byMaxNoncurrentCh chan maxNoncurrentTask
once sync.Once
byDaysCh chan expiryTask
byNewerNoncurrentCh chan newerNoncurrentTask
}
// PendingTasks returns the number of pending ILM expiry tasks.
func (es *expiryState) PendingTasks() int {
return len(es.byDaysCh) + len(es.byMaxNoncurrentCh)
return len(es.byDaysCh) + len(es.byNewerNoncurrentCh)
}
// close closes work channels exactly once.
func (es *expiryState) close() {
es.once.Do(func() {
close(es.byDaysCh)
close(es.byMaxNoncurrentCh)
close(es.byNewerNoncurrentCh)
})
}
@@ -109,13 +109,13 @@ func (es *expiryState) enqueueByDays(oi ObjectInfo, restoredObject bool, rmVersi
}
}
// enqueueByMaxNoncurrent enqueues object versions expired by
// MaxNoncurrentVersions limit for expiry.
func (es *expiryState) enqueueByMaxNoncurrent(bucket string, versions []ObjectToDelete) {
// enqueueByNewerNoncurrent enqueues object versions expired by
// NewerNoncurrentVersions limit for expiry.
func (es *expiryState) enqueueByNewerNoncurrent(bucket string, versions []ObjectToDelete) {
select {
case <-GlobalContext.Done():
es.close()
case es.byMaxNoncurrentCh <- maxNoncurrentTask{bucket: bucket, versions: versions}:
case es.byNewerNoncurrentCh <- newerNoncurrentTask{bucket: bucket, versions: versions}:
default:
}
}
@@ -124,8 +124,8 @@ var globalExpiryState *expiryState
func newExpiryState() *expiryState {
return &expiryState{
byDaysCh: make(chan expiryTask, 10000),
byMaxNoncurrentCh: make(chan maxNoncurrentTask, 10000),
byDaysCh: make(chan expiryTask, 10000),
byNewerNoncurrentCh: make(chan newerNoncurrentTask, 10000),
}
}
@@ -141,15 +141,15 @@ func initBackgroundExpiry(ctx context.Context, objectAPI ObjectLayer) {
}
}()
go func() {
for t := range globalExpiryState.byMaxNoncurrentCh {
for t := range globalExpiryState.byNewerNoncurrentCh {
deleteObjectVersions(ctx, objectAPI, t.bucket, t.versions)
}
}()
}
// maxNoncurrentTask encapsulates arguments required by worker to expire objects
// by MaxNoncurrentVersions
type maxNoncurrentTask struct {
// newerNoncurrentTask encapsulates arguments required by worker to expire objects
// by NewerNoncurrentVersions
type newerNoncurrentTask struct {
bucket string
versions []ObjectToDelete
}

View File

@@ -972,14 +972,14 @@ func (i *scannerItem) applyTierObjSweep(ctx context.Context, o ObjectLayer, oi O
}
// applyMaxNoncurrentVersionLimit removes noncurrent versions older than the most recent MaxNoncurrentVersions configured.
// applyNewerNoncurrentVersionLimit removes noncurrent versions older than the most recent NewerNoncurrentVersions configured.
// Note: This function doesn't update sizeSummary since it always removes versions that it doesn't return.
func (i *scannerItem) applyMaxNoncurrentVersionLimit(ctx context.Context, o ObjectLayer, fivs []FileInfo) ([]FileInfo, error) {
func (i *scannerItem) applyNewerNoncurrentVersionLimit(ctx context.Context, _ ObjectLayer, fivs []FileInfo) ([]FileInfo, error) {
if i.lifeCycle == nil {
return fivs, nil
}
lim := i.lifeCycle.NoncurrentVersionsExpirationLimit(lifecycle.ObjectOpts{Name: i.objectPath()})
_, days, lim := i.lifeCycle.NoncurrentVersionsExpirationLimit(lifecycle.ObjectOpts{Name: i.objectPath()})
if lim == 0 || len(fivs) <= lim+1 { // fewer than lim _noncurrent_ versions
return fivs, nil
}
@@ -992,6 +992,7 @@ func (i *scannerItem) applyMaxNoncurrentVersionLimit(ctx context.Context, o Obje
toDel := make([]ObjectToDelete, 0, len(overflowVersions))
for _, fi := range overflowVersions {
obj := fi.ToObjectInfo(i.bucket, i.objectPath())
// skip versions with object locking enabled
if rcfg.LockEnabled && enforceRetentionForDeletion(ctx, obj) {
if i.debug {
if obj.VersionID != "" {
@@ -1000,22 +1001,34 @@ func (i *scannerItem) applyMaxNoncurrentVersionLimit(ctx context.Context, o Obje
console.Debugf(applyVersionActionsLogPrefix+" lifecycle: %s is locked, not deleting\n", obj.Name)
}
}
// add this version back to remaining versions for
// subsequent lifecycle policy applications
fivs = append(fivs, fi)
continue
}
// NoncurrentDays not passed yet.
if time.Now().UTC().Before(lifecycle.ExpectedExpiryTime(obj.SuccessorModTime, days)) {
// add this version back to remaining versions for
// subsequent lifecycle policy applications
fivs = append(fivs, fi)
continue
}
toDel = append(toDel, ObjectToDelete{
ObjectName: fi.Name,
VersionID: fi.VersionID,
})
}
globalExpiryState.enqueueByMaxNoncurrent(i.bucket, toDel)
globalExpiryState.enqueueByNewerNoncurrent(i.bucket, toDel)
return fivs, nil
}
// applyVersionActions will apply lifecycle checks on all versions of a scanned item. Returns versions that remain
// after applying lifecycle checks configured.
func (i *scannerItem) applyVersionActions(ctx context.Context, o ObjectLayer, fivs []FileInfo) ([]FileInfo, error) {
return i.applyMaxNoncurrentVersionLimit(ctx, o, fivs)
return i.applyNewerNoncurrentVersionLimit(ctx, o, fivs)
}
// applyActions will apply lifecycle checks on to a scanned item.