diff --git a/cmd/batch-expire.go b/cmd/batch-expire.go index ac0fac773..3a39f473c 100644 --- a/cmd/batch-expire.go +++ b/cmd/batch-expire.go @@ -27,6 +27,7 @@ import ( "net/http" "runtime" "strconv" + "strings" "time" "github.com/minio/minio-go/v7/pkg/tags" @@ -281,7 +282,7 @@ type BatchJobExpire struct { line, col int APIVersion string `yaml:"apiVersion" json:"apiVersion"` Bucket string `yaml:"bucket" json:"bucket"` - Prefix string `yaml:"prefix" json:"prefix"` + Prefix BatchJobPrefix `yaml:"prefix" json:"prefix"` NotificationCfg BatchJobNotification `yaml:"notify" json:"notify"` Retry BatchJobRetry `yaml:"retry" json:"retry"` Rules []BatchJobExpireFilter `yaml:"rules" json:"rules"` @@ -535,18 +536,29 @@ func (r *BatchJobExpire) Start(ctx context.Context, api ObjectLayer, job BatchJo return err } - ctx, cancel := context.WithCancel(ctx) - defer cancel() + ctx, cancelCause := context.WithCancelCause(ctx) + defer cancelCause(nil) results := make(chan itemOrErr[ObjectInfo], workerSize) - if err := api.Walk(ctx, r.Bucket, r.Prefix, results, WalkOptions{ - Marker: lastObject, - LatestOnly: false, // we need to visit all versions of the object to implement purge: retainVersions - VersionsSort: WalkVersionsSortDesc, - }); err != nil { - // Do not need to retry if we can't list objects on source. - return err - } + go func() { + for _, prefix := range r.Prefix.F() { + prefixResultCh := make(chan itemOrErr[ObjectInfo], workerSize) + err := api.Walk(ctx, r.Bucket, strings.TrimSpace(prefix), prefixResultCh, WalkOptions{ + Marker: lastObject, + LatestOnly: false, // we need to visit all versions of the object to implement purge: retainVersions + VersionsSort: WalkVersionsSortDesc, + }) + if err != nil { + cancelCause(err) + xioutil.SafeClose(results) + return + } + for result := range prefixResultCh { + results <- result + } + } + xioutil.SafeClose(results) + }() // Goroutine to periodically save batch-expire job's in-memory state saverQuitCh := make(chan struct{}) @@ -657,6 +669,10 @@ func (r *BatchJobExpire) Start(ctx context.Context, api ObjectLayer, job BatchJo ObjectInfo: result.Item, }) } + if context.Cause(ctx) != nil { + xioutil.SafeClose(expireCh) + return context.Cause(ctx) + } // Send any remaining objects downstream if len(toDel) > 0 { select { diff --git a/cmd/batch-expire_gen.go b/cmd/batch-expire_gen.go index ccfb6b29e..3bdac35ba 100644 --- a/cmd/batch-expire_gen.go +++ b/cmd/batch-expire_gen.go @@ -39,7 +39,7 @@ func (z *BatchJobExpire) DecodeMsg(dc *msgp.Reader) (err error) { return } case "Prefix": - z.Prefix, err = dc.ReadString() + err = z.Prefix.DecodeMsg(dc) if err != nil { err = msgp.WrapError(err, "Prefix") return @@ -114,7 +114,7 @@ func (z *BatchJobExpire) EncodeMsg(en *msgp.Writer) (err error) { if err != nil { return } - err = en.WriteString(z.Prefix) + err = z.Prefix.EncodeMsg(en) if err != nil { err = msgp.WrapError(err, "Prefix") return @@ -171,7 +171,11 @@ func (z *BatchJobExpire) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.AppendString(o, z.Bucket) // string "Prefix" o = append(o, 0xa6, 0x50, 0x72, 0x65, 0x66, 0x69, 0x78) - o = msgp.AppendString(o, z.Prefix) + o, err = z.Prefix.MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "Prefix") + return + } // string "NotificationCfg" o = append(o, 0xaf, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x66, 0x67) o, err = z.NotificationCfg.MarshalMsg(o) @@ -230,7 +234,7 @@ func (z *BatchJobExpire) UnmarshalMsg(bts []byte) (o []byte, err error) { return } case "Prefix": - z.Prefix, bts, err = msgp.ReadStringBytes(bts) + bts, err = z.Prefix.UnmarshalMsg(bts) if err != nil { err = msgp.WrapError(err, "Prefix") return @@ -280,7 +284,7 @@ func (z *BatchJobExpire) UnmarshalMsg(bts []byte) (o []byte, err error) { // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *BatchJobExpire) Msgsize() (s int) { - s = 1 + 11 + msgp.StringPrefixSize + len(z.APIVersion) + 7 + msgp.StringPrefixSize + len(z.Bucket) + 7 + msgp.StringPrefixSize + len(z.Prefix) + 16 + z.NotificationCfg.Msgsize() + 6 + z.Retry.Msgsize() + 6 + msgp.ArrayHeaderSize + s = 1 + 11 + msgp.StringPrefixSize + len(z.APIVersion) + 7 + msgp.StringPrefixSize + len(z.Bucket) + 7 + z.Prefix.Msgsize() + 16 + z.NotificationCfg.Msgsize() + 6 + z.Retry.Msgsize() + 6 + msgp.ArrayHeaderSize for za0001 := range z.Rules { s += z.Rules[za0001].Msgsize() } diff --git a/cmd/batch-expire_test.go b/cmd/batch-expire_test.go index a5335e1e5..18f7150dd 100644 --- a/cmd/batch-expire_test.go +++ b/cmd/batch-expire_test.go @@ -18,6 +18,7 @@ package cmd import ( + "slices" "testing" "gopkg.in/yaml.v3" @@ -68,4 +69,57 @@ expire: # Expire objects that match a condition if err != nil { t.Fatal("Failed to parse batch-job-expire yaml", err) } + if !slices.Equal(job.Expire.Prefix.F(), []string{"myprefix"}) { + t.Fatal("Failed to parse batch-job-expire yaml") + } + + multiPrefixExpireYaml := ` +expire: # Expire objects that match a condition + apiVersion: v1 + bucket: mybucket # Bucket where this batch job will expire matching objects from + prefix: # (Optional) Prefix under which this job will expire objects matching the rules below. + - myprefix + - myprefix1 + rules: + - type: object # regular objects with zero or more older versions + name: NAME # match object names that satisfy the wildcard expression. + olderThan: 7d10h # match objects older than this value + createdBefore: "2006-01-02T15:04:05.00Z" # match objects created before "date" + tags: + - key: name + value: pick* # match objects with tag 'name', all values starting with 'pick' + metadata: + - key: content-type + value: image/* # match objects with 'content-type', all values starting with 'image/' + size: + lessThan: "10MiB" # match objects with size less than this value (e.g. 10MiB) + greaterThan: 1MiB # match objects with size greater than this value (e.g. 1MiB) + purge: + # retainVersions: 0 # (default) delete all versions of the object. This option is the fastest. + # retainVersions: 5 # keep the latest 5 versions of the object. + + - type: deleted # objects with delete marker as their latest version + name: NAME # match object names that satisfy the wildcard expression. + olderThan: 10h # match objects older than this value (e.g. 7d10h31s) + createdBefore: "2006-01-02T15:04:05.00Z" # match objects created before "date" + purge: + # retainVersions: 0 # (default) delete all versions of the object. This option is the fastest. + # retainVersions: 5 # keep the latest 5 versions of the object including delete markers. + + notify: + endpoint: https://notify.endpoint # notification endpoint to receive job completion status + token: Bearer xxxxx # optional authentication token for the notification endpoint + + retry: + attempts: 10 # number of retries for the job before giving up + delay: 500ms # least amount of delay between each retry +` + var multiPrefixJob BatchJobRequest + err = yaml.Unmarshal([]byte(multiPrefixExpireYaml), &multiPrefixJob) + if err != nil { + t.Fatal("Failed to parse batch-job-expire yaml", err) + } + if !slices.Equal(multiPrefixJob.Expire.Prefix.F(), []string{"myprefix", "myprefix1"}) { + t.Fatal("Failed to parse batch-job-expire yaml") + } } diff --git a/cmd/batch-handlers.go b/cmd/batch-handlers.go index a313784a2..d8dcb27b3 100644 --- a/cmd/batch-handlers.go +++ b/cmd/batch-handlers.go @@ -385,12 +385,23 @@ func (r *BatchJobReplicateV1) StartFromSource(ctx context.Context, api ObjectLay s3Type := r.Target.Type == BatchJobReplicateResourceS3 || r.Source.Type == BatchJobReplicateResourceS3 minioSrc := r.Source.Type == BatchJobReplicateResourceMinIO ctx, cancel := context.WithCancel(ctx) - objInfoCh := c.ListObjects(ctx, r.Source.Bucket, miniogo.ListObjectsOptions{ - Prefix: r.Source.Prefix, - WithVersions: minioSrc, - Recursive: true, - WithMetadata: true, - }) + + objInfoCh := make(chan miniogo.ObjectInfo, 1) + go func() { + for _, prefix := range r.Source.Prefix.F() { + prefixObjInfoCh := c.ListObjects(ctx, r.Source.Bucket, miniogo.ListObjectsOptions{ + Prefix: strings.TrimSpace(prefix), + WithVersions: minioSrc, + Recursive: true, + WithMetadata: true, + }) + for obj := range prefixObjInfoCh { + objInfoCh <- obj + } + } + xioutil.SafeClose(objInfoCh) + }() + prevObj := "" skipReplicate := false @@ -1188,19 +1199,28 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba if walkQuorum == "" { walkQuorum = "strict" } - ctx, cancel := context.WithCancel(ctx) + ctx, cancelCause := context.WithCancelCause(ctx) // one of source/target is s3, skip delete marker and all versions under the same object name. s3Type := r.Target.Type == BatchJobReplicateResourceS3 || r.Source.Type == BatchJobReplicateResourceS3 - if err := api.Walk(ctx, r.Source.Bucket, r.Source.Prefix, walkCh, WalkOptions{ - Marker: lastObject, - Filter: selectObj, - AskDisks: walkQuorum, - }); err != nil { - cancel() - // Do not need to retry if we can't list objects on source. - return err - } + go func() { + for _, prefix := range r.Source.Prefix.F() { + prefixWalkCh := make(chan itemOrErr[ObjectInfo], 100) + if err := api.Walk(ctx, r.Source.Bucket, strings.TrimSpace(prefix), prefixWalkCh, WalkOptions{ + Marker: lastObject, + Filter: selectObj, + AskDisks: walkQuorum, + }); err != nil { + cancelCause(err) + xioutil.SafeClose(walkCh) + return + } + for obj := range prefixWalkCh { + walkCh <- obj + } + } + xioutil.SafeClose(walkCh) + }() prevObj := "" @@ -1251,7 +1271,10 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba }() } wk.Wait() - + // Do not need to retry if we can't list objects on source. + if context.Cause(ctx) != nil { + return context.Cause(ctx) + } ri.RetryAttempts = attempts ri.Complete = ri.ObjectsFailed == 0 ri.Failed = ri.ObjectsFailed > 0 @@ -1264,7 +1287,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba batchLogOnceIf(ctx, fmt.Errorf("unable to notify %v", err), job.ID+"notify") } - cancel() + cancelCause(nil) if ri.Failed { ri.ObjectsFailed = 0 ri.Bucket = "" @@ -2232,3 +2255,30 @@ func lookupStyle(s string) miniogo.BucketLookupType { } return lookup } + +// BatchJobPrefix - to support prefix field yaml unmarshalling with string or slice of strings +type BatchJobPrefix []string + +var _ yaml.Unmarshaler = &BatchJobPrefix{} + +// UnmarshalYAML - to support prefix field yaml unmarshalling with string or slice of strings +func (b *BatchJobPrefix) UnmarshalYAML(value *yaml.Node) error { + // try slice first + tmpSlice := []string{} + if err := value.Decode(&tmpSlice); err == nil { + *b = tmpSlice + return nil + } + // try string + tmpStr := "" + if err := value.Decode(&tmpStr); err == nil { + *b = []string{tmpStr} + return nil + } + return fmt.Errorf("unable to decode %s", value.Value) +} + +// F - return prefix(es) as slice +func (b *BatchJobPrefix) F() []string { + return *b +} diff --git a/cmd/batch-handlers_gen.go b/cmd/batch-handlers_gen.go index 4cae65173..5dd0bf047 100644 --- a/cmd/batch-handlers_gen.go +++ b/cmd/batch-handlers_gen.go @@ -6,6 +6,89 @@ import ( "github.com/tinylib/msgp/msgp" ) +// DecodeMsg implements msgp.Decodable +func (z *BatchJobPrefix) DecodeMsg(dc *msgp.Reader) (err error) { + var zb0002 uint32 + zb0002, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + if cap((*z)) >= int(zb0002) { + (*z) = (*z)[:zb0002] + } else { + (*z) = make(BatchJobPrefix, zb0002) + } + for zb0001 := range *z { + (*z)[zb0001], err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, zb0001) + return + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z BatchJobPrefix) EncodeMsg(en *msgp.Writer) (err error) { + err = en.WriteArrayHeader(uint32(len(z))) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0003 := range z { + err = en.WriteString(z[zb0003]) + if err != nil { + err = msgp.WrapError(err, zb0003) + return + } + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z BatchJobPrefix) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + o = msgp.AppendArrayHeader(o, uint32(len(z))) + for zb0003 := range z { + o = msgp.AppendString(o, z[zb0003]) + } + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *BatchJobPrefix) UnmarshalMsg(bts []byte) (o []byte, err error) { + var zb0002 uint32 + zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + if cap((*z)) >= int(zb0002) { + (*z) = (*z)[:zb0002] + } else { + (*z) = make(BatchJobPrefix, zb0002) + } + for zb0001 := range *z { + (*z)[zb0001], bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, zb0001) + return + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z BatchJobPrefix) Msgsize() (s int) { + s = msgp.ArrayHeaderSize + for zb0003 := range z { + s += msgp.StringPrefixSize + len(z[zb0003]) + } + return +} + // DecodeMsg implements msgp.Decodable func (z *BatchJobRequest) DecodeMsg(dc *msgp.Reader) (err error) { var field []byte diff --git a/cmd/batch-handlers_gen_test.go b/cmd/batch-handlers_gen_test.go index 64a04ca6f..d67aacde2 100644 --- a/cmd/batch-handlers_gen_test.go +++ b/cmd/batch-handlers_gen_test.go @@ -9,6 +9,119 @@ import ( "github.com/tinylib/msgp/msgp" ) +func TestMarshalUnmarshalBatchJobPrefix(t *testing.T) { + v := BatchJobPrefix{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgBatchJobPrefix(b *testing.B) { + v := BatchJobPrefix{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgBatchJobPrefix(b *testing.B) { + v := BatchJobPrefix{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalBatchJobPrefix(b *testing.B) { + v := BatchJobPrefix{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodeBatchJobPrefix(t *testing.T) { + v := BatchJobPrefix{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodeBatchJobPrefix Msgsize() is inaccurate") + } + + vn := BatchJobPrefix{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodeBatchJobPrefix(b *testing.B) { + v := BatchJobPrefix{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodeBatchJobPrefix(b *testing.B) { + v := BatchJobPrefix{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} + func TestMarshalUnmarshalBatchJobRequest(t *testing.T) { v := BatchJobRequest{} bts, err := v.MarshalMsg(nil) diff --git a/cmd/batch-handlers_test.go b/cmd/batch-handlers_test.go new file mode 100644 index 000000000..213e76703 --- /dev/null +++ b/cmd/batch-handlers_test.go @@ -0,0 +1,75 @@ +// Copyright (c) 2015-2024 MinIO, Inc. +// +// This file is part of MinIO Object Storage stack +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cmd + +import ( + "slices" + "testing" + + "gopkg.in/yaml.v3" +) + +func TestBatchJobPrefix_UnmarshalYAML(t *testing.T) { + type args struct { + yamlStr string + } + type PrefixTemp struct { + Prefix BatchJobPrefix `yaml:"prefix"` + } + tests := []struct { + name string + b PrefixTemp + args args + want []string + wantErr bool + }{ + { + name: "test1", + b: PrefixTemp{}, + args: args{ + yamlStr: ` +prefix: "foo" +`, + }, + want: []string{"foo"}, + wantErr: false, + }, + { + name: "test2", + b: PrefixTemp{}, + args: args{ + yamlStr: ` +prefix: + - "foo" + - "bar" +`, + }, + want: []string{"foo", "bar"}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if err := yaml.Unmarshal([]byte(tt.args.yamlStr), &tt.b); (err != nil) != tt.wantErr { + t.Errorf("UnmarshalYAML() error = %v, wantErr %v", err, tt.wantErr) + } + if !slices.Equal(tt.b.Prefix.F(), tt.want) { + t.Errorf("UnmarshalYAML() = %v, want %v", tt.b.Prefix.F(), tt.want) + } + }) + } +} diff --git a/cmd/batch-replicate.go b/cmd/batch-replicate.go index b3d6f3da8..37a1834d4 100644 --- a/cmd/batch-replicate.go +++ b/cmd/batch-replicate.go @@ -151,7 +151,7 @@ func (t BatchJobReplicateTarget) ValidPath() bool { type BatchJobReplicateSource struct { Type BatchJobReplicateResourceType `yaml:"type" json:"type"` Bucket string `yaml:"bucket" json:"bucket"` - Prefix string `yaml:"prefix" json:"prefix"` + Prefix BatchJobPrefix `yaml:"prefix" json:"prefix"` Endpoint string `yaml:"endpoint" json:"endpoint"` Path string `yaml:"path" json:"path"` Creds BatchJobReplicateCredentials `yaml:"credentials" json:"credentials"` diff --git a/cmd/batch-replicate_gen.go b/cmd/batch-replicate_gen.go index 6392829e2..ea5fe5b27 100644 --- a/cmd/batch-replicate_gen.go +++ b/cmd/batch-replicate_gen.go @@ -411,7 +411,7 @@ func (z *BatchJobReplicateSource) DecodeMsg(dc *msgp.Reader) (err error) { return } case "Prefix": - z.Prefix, err = dc.ReadString() + err = z.Prefix.DecodeMsg(dc) if err != nil { err = msgp.WrapError(err, "Prefix") return @@ -514,7 +514,7 @@ func (z *BatchJobReplicateSource) EncodeMsg(en *msgp.Writer) (err error) { if err != nil { return } - err = en.WriteString(z.Prefix) + err = z.Prefix.EncodeMsg(en) if err != nil { err = msgp.WrapError(err, "Prefix") return @@ -600,7 +600,11 @@ func (z *BatchJobReplicateSource) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.AppendString(o, z.Bucket) // string "Prefix" o = append(o, 0xa6, 0x50, 0x72, 0x65, 0x66, 0x69, 0x78) - o = msgp.AppendString(o, z.Prefix) + o, err = z.Prefix.MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "Prefix") + return + } // string "Endpoint" o = append(o, 0xa8, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74) o = msgp.AppendString(o, z.Endpoint) @@ -664,7 +668,7 @@ func (z *BatchJobReplicateSource) UnmarshalMsg(bts []byte) (o []byte, err error) return } case "Prefix": - z.Prefix, bts, err = msgp.ReadStringBytes(bts) + bts, err = z.Prefix.UnmarshalMsg(bts) if err != nil { err = msgp.WrapError(err, "Prefix") return @@ -742,7 +746,7 @@ func (z *BatchJobReplicateSource) UnmarshalMsg(bts []byte) (o []byte, err error) // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *BatchJobReplicateSource) Msgsize() (s int) { - s = 1 + 5 + msgp.StringPrefixSize + len(string(z.Type)) + 7 + msgp.StringPrefixSize + len(z.Bucket) + 7 + msgp.StringPrefixSize + len(z.Prefix) + 9 + msgp.StringPrefixSize + len(z.Endpoint) + 5 + msgp.StringPrefixSize + len(z.Path) + 6 + 1 + 10 + msgp.StringPrefixSize + len(z.Creds.AccessKey) + 10 + msgp.StringPrefixSize + len(z.Creds.SecretKey) + 13 + msgp.StringPrefixSize + len(z.Creds.SessionToken) + 9 + z.Snowball.Msgsize() + s = 1 + 5 + msgp.StringPrefixSize + len(string(z.Type)) + 7 + msgp.StringPrefixSize + len(z.Bucket) + 7 + z.Prefix.Msgsize() + 9 + msgp.StringPrefixSize + len(z.Endpoint) + 5 + msgp.StringPrefixSize + len(z.Path) + 6 + 1 + 10 + msgp.StringPrefixSize + len(z.Creds.AccessKey) + 10 + msgp.StringPrefixSize + len(z.Creds.SecretKey) + 13 + msgp.StringPrefixSize + len(z.Creds.SessionToken) + 9 + z.Snowball.Msgsize() return } diff --git a/cmd/batch-replicate_test.go b/cmd/batch-replicate_test.go index fb6b686f3..e84c59cf9 100644 --- a/cmd/batch-replicate_test.go +++ b/cmd/batch-replicate_test.go @@ -18,6 +18,7 @@ package cmd import ( + "slices" "testing" "gopkg.in/yaml.v3" @@ -97,4 +98,85 @@ replicate: if err != nil { t.Fatal("Failed to parse batch-job-replicate yaml", err) } + if !slices.Equal(job.Replicate.Source.Prefix.F(), []string{"object-prefix1"}) { + t.Fatal("Failed to parse batch-job-replicate yaml", err) + } + multiPrefixReplicateYaml := ` +replicate: + apiVersion: v1 + # source of the objects to be replicated + source: + type: minio # valid values are "s3" or "minio" + bucket: mytest + prefix: # 'PREFIX' is optional + - object-prefix1 + - object-prefix2 + # If your source is the 'local' alias specified to 'mc batch start', then the 'endpoint' and 'credentials' fields are optional and can be omitted + # Either the 'source' or 'remote' *must* be the "local" deployment +# endpoint: "http://127.0.0.1:9000" +# # path: "on|off|auto" # "on" enables path-style bucket lookup. "off" enables virtual host (DNS)-style bucket lookup. Defaults to "auto" +# credentials: +# accessKey: minioadmin # Required +# secretKey: minioadmin # Required +# # sessionToken: SESSION-TOKEN # Optional only available when rotating credentials are used + snowball: # automatically activated if the source is local + disable: true # optionally turn-off snowball archive transfer +# batch: 100 # upto this many objects per archive +# inmemory: true # indicates if the archive must be staged locally or in-memory +# compress: false # S2/Snappy compressed archive +# smallerThan: 5MiB # create archive for all objects smaller than 5MiB +# skipErrs: false # skips any source side read() errors + + # target where the objects must be replicated + target: + type: minio # valid values are "s3" or "minio" + bucket: mytest + prefix: stage # 'PREFIX' is optional + # If your source is the 'local' alias specified to 'mc batch start', then the 'endpoint' and 'credentials' fields are optional and can be omitted + + # Either the 'source' or 'remote' *must* be the "local" deployment + endpoint: "http://127.0.0.1:9001" + # path: "on|off|auto" # "on" enables path-style bucket lookup. "off" enables virtual host (DNS)-style bucket lookup. Defaults to "auto" + credentials: + accessKey: minioadmin + secretKey: minioadmin + # sessionToken: SESSION-TOKEN # Optional only available when rotating credentials are used + + # NOTE: All flags are optional + # - filtering criteria only applies for all source objects match the criteria + # - configurable notification endpoints + # - configurable retries for the job (each retry skips successfully previously replaced objects) + flags: + filter: + newerThan: "7d10h31s" # match objects newer than this value (e.g. 7d10h31s) + olderThan: "7d" # match objects older than this value (e.g. 7d10h31s) +# createdAfter: "date" # match objects created after "date" +# createdBefore: "date" # match objects created before "date" + + ## NOTE: tags are not supported when "source" is remote. + tags: + - key: "name" + value: "pick*" # match objects with tag 'name', with all values starting with 'pick' + + metadata: + - key: "content-type" + value: "image/*" # match objects with 'content-type', with all values starting with 'image/' + +# notify: +# endpoint: "https://notify.endpoint" # notification endpoint to receive job status events +# token: "Bearer xxxxx" # optional authentication token for the notification endpoint +# +# retry: +# attempts: 10 # number of retries for the job before giving up +# delay: "500ms" # least amount of delay between each retry + +` + var multiPrefixJob BatchJobRequest + err = yaml.Unmarshal([]byte(multiPrefixReplicateYaml), &multiPrefixJob) + if err != nil { + t.Fatal("Failed to parse batch-job-replicate yaml", err) + } + if !slices.Equal(multiPrefixJob.Replicate.Source.Prefix.F(), []string{"object-prefix1", "object-prefix2"}) { + t.Fatal("Failed to parse batch-job-replicate yaml") + } }