Implement batch snowball (#18485)

Anis Eleuch
2023-11-22 10:51:46 -08:00
committed by GitHub
parent 0b074d0fae
commit 70fbcfee4a
14 changed files with 901 additions and 83 deletions


@@ -527,6 +527,61 @@ func toObjectInfo(bucket, object string, objInfo miniogo.ObjectInfo) ObjectInfo
    return oi
}

func (r BatchJobReplicateV1) writeAsArchive(ctx context.Context, objAPI ObjectLayer, remoteClnt *minio.Client, entries []ObjectInfo) error {
    input := make(chan minio.SnowballObject, 1)
    opts := minio.SnowballOptions{
        Opts:     minio.PutObjectOptions{},
        InMemory: *r.Source.Snowball.InMemory,
        Compress: *r.Source.Snowball.Compress,
        SkipErrs: *r.Source.Snowball.SkipErrs,
    }

    go func() {
        defer close(input)

        for _, entry := range entries {
            gr, err := objAPI.GetObjectNInfo(ctx, r.Source.Bucket,
                entry.Name, nil, nil, ObjectOptions{
                    VersionID: entry.VersionID,
                })
            if err != nil {
                logger.LogIf(ctx, err)
                continue
            }

            snowballObj := minio.SnowballObject{
                // Create path to store objects within the bucket.
                Key:       entry.Name,
                Size:      entry.Size,
                ModTime:   entry.ModTime,
                VersionID: entry.VersionID,
                Content:   gr,
                Headers:   make(http.Header),
                Close: func() {
                    gr.Close()
                },
            }

            opts, err := batchReplicationOpts(ctx, "", gr.ObjInfo)
            if err != nil {
                logger.LogIf(ctx, err)
                continue
            }

            for k, vals := range opts.Header() {
                for _, v := range vals {
                    snowballObj.Headers.Add(k, v)
                }
            }

            input <- snowballObj
        }
    }()

    // Collect and upload all entries.
    return remoteClnt.PutObjectsSnowball(ctx, r.Target.Bucket, opts, input)
}

// ReplicateToTarget read from source and replicate to configured target
func (r *BatchJobReplicateV1) ReplicateToTarget(ctx context.Context, api ObjectLayer, c *miniogo.Core, srcObjInfo ObjectInfo, retry bool) error {
    srcBucket := r.Source.Bucket
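
Note: writeAsArchive leans on minio-go's snowball API, where the caller streams SnowballObjects over a channel and PutObjectsSnowball packs them into a single tarball upload that the server extracts. A minimal standalone sketch of that pattern, with placeholder endpoint, credentials, and bucket name that are not taken from this commit:

package main

import (
    "context"
    "log"
    "strings"
    "time"

    "github.com/minio/minio-go/v7"
    "github.com/minio/minio-go/v7/pkg/credentials"
)

func main() {
    // Placeholder endpoint and credentials.
    clnt, err := minio.New("play.min.io", &minio.Options{
        Creds:  credentials.NewStaticV4("ACCESS-KEY", "SECRET-KEY", ""),
        Secure: true,
    })
    if err != nil {
        log.Fatal(err)
    }

    // Feed objects to the uploader over a channel, as writeAsArchive does.
    input := make(chan minio.SnowballObject, 1)
    go func() {
        defer close(input)
        for _, name := range []string{"a.txt", "b.txt"} {
            body := "hello from " + name
            input <- minio.SnowballObject{
                Key:     name,
                Size:    int64(len(body)),
                ModTime: time.Now(),
                Content: strings.NewReader(body),
            }
        }
    }()

    // All queued objects are packed into one tarball and extracted server side.
    opts := minio.SnowballOptions{InMemory: true, Compress: false}
    if err := clnt.PutObjectsSnowball(context.Background(), "target-bucket", opts, input); err != nil {
        log.Fatal(err)
    }
}
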
@@ -941,8 +996,73 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
    if err != nil {
        return err
    }
    c.SetAppInfo("minio-"+batchJobPrefix, r.APIVersion+" "+job.ID)

    var (
        walkCh = make(chan ObjectInfo, 100)
        slowCh = make(chan ObjectInfo, 100)
    )

    if !*r.Source.Snowball.Disable && r.Source.Type.isMinio() && r.Target.Type.isMinio() {
        go func() {
            defer close(slowCh)

            // Snowball currently needs the high level minio-go Client, not the Core one
            cl, err := miniogo.New(u.Host, &miniogo.Options{
                Creds:        credentials.NewStaticV4(cred.AccessKey, cred.SecretKey, cred.SessionToken),
                Secure:       u.Scheme == "https",
                Transport:    getRemoteInstanceTransport,
                BucketLookup: lookupStyle(r.Target.Path),
            })
            if err != nil {
                logger.LogIf(ctx, err)
                return
            }

            // Already validated before arriving here
            smallerThan, _ := humanize.ParseBytes(*r.Source.Snowball.SmallerThan)

            var (
                obj   = ObjectInfo{}
                batch = make([]ObjectInfo, 0, *r.Source.Snowball.Batch)
                valid = true
            )

            for valid {
                obj, valid = <-walkCh
                if !valid {
                    goto write
                }

                if obj.DeleteMarker || !obj.VersionPurgeStatus.Empty() || obj.Size >= int64(smallerThan) {
                    slowCh <- obj
                    continue
                }

                batch = append(batch, obj)

                if len(batch) < *r.Source.Snowball.Batch {
                    continue
                }

            write:
                if len(batch) > 0 {
                    if err := r.writeAsArchive(ctx, api, cl, batch); err != nil {
                        logger.LogIf(ctx, err)
                        for _, b := range batch {
                            slowCh <- b
                        }
                    }
                    batch = batch[:0]
                }
            }
        }()
    } else {
        slowCh = walkCh
    }

    workerSize, err := strconv.Atoi(env.Get("_MINIO_BATCH_REPLICATION_WORKERS", strconv.Itoa(runtime.GOMAXPROCS(0)/2)))
    if err != nil {
        return err
    }
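
Note: the subtle part of the goroutine above is the flush path reached via goto write: a batch is shipped either once it holds *r.Source.Snowball.Batch entries or when walkCh closes, while delete markers, objects with a pending version purge, and objects of at least smallerThan bytes bypass batching through slowCh. The same accumulate-or-flush control flow, stripped of the MinIO types and written with a range loop plus a final flush, purely as an illustration (not code from this commit):

package main

import "fmt"

func main() {
    in := make(chan int)
    go func() {
        for i := 0; i < 7; i++ {
            in <- i
        }
        close(in)
    }()

    const batchSize = 3
    batch := make([]int, 0, batchSize)
    flush := func() {
        // Ship the accumulated batch, then reuse the backing array.
        if len(batch) > 0 {
            fmt.Println("flush", batch)
            batch = batch[:0]
        }
    }

    for v := range in {
        batch = append(batch, v)
        if len(batch) == batchSize {
            flush()
        }
    }
    flush() // flush the final partial batch once the channel closes
}
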
@@ -963,8 +1083,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
    // one of source/target is s3, skip delete marker and all versions under the same object name.
    s3Type := r.Target.Type == BatchJobReplicateResourceS3 || r.Source.Type == BatchJobReplicateResourceS3

-    results := make(chan ObjectInfo, 100)
-    if err := api.Walk(ctx, r.Source.Bucket, r.Source.Prefix, results, ObjectOptions{
    if err := api.Walk(ctx, r.Source.Bucket, r.Source.Prefix, walkCh, ObjectOptions{
        WalkMarker: lastObject,
        WalkFilter: selectObj,
    }); err != nil {
@@ -976,7 +1095,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
    prevObj := ""
    skipReplicate := false

-    for result := range results {
    for result := range slowCh {
        result := result
        if result.Name != prevObj {
            prevObj = result.Name
@@ -1090,6 +1209,9 @@ func (r *BatchJobReplicateV1) Validate(ctx context.Context, job BatchJobRequest,
    if err := r.Source.Type.Validate(); err != nil {
        return err
    }
    if err := r.Source.Snowball.Validate(); err != nil {
        return err
    }
    if r.Source.Creds.Empty() && r.Target.Creds.Empty() {
        return errInvalidArgument
    }
@@ -1399,10 +1521,38 @@ func (a adminAPIHandlers) StartBatchJob(w http.ResponseWriter, r *http.Request)
        return
    }

    // Validate the incoming job request
    if err := job.Validate(ctx, objectAPI); err != nil {
        writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
        return
    }

    job.ID = fmt.Sprintf("%s:%d", shortuuid.New(), GetProxyEndpointLocalIndex(globalProxyEndpoints))
    job.User = user
    job.Started = time.Now()

    // Fill with default values
    if job.Replicate != nil {
        if job.Replicate.Source.Snowball.Disable == nil {
            job.Replicate.Source.Snowball.Disable = ptr(false)
        }
        if job.Replicate.Source.Snowball.Batch == nil {
            job.Replicate.Source.Snowball.Batch = ptr(100)
        }
        if job.Replicate.Source.Snowball.InMemory == nil {
            job.Replicate.Source.Snowball.InMemory = ptr(true)
        }
        if job.Replicate.Source.Snowball.Compress == nil {
            job.Replicate.Source.Snowball.Compress = ptr(false)
        }
        if job.Replicate.Source.Snowball.SmallerThan == nil {
            job.Replicate.Source.Snowball.SmallerThan = ptr("5MiB")
        }
        if job.Replicate.Source.Snowball.SkipErrs == nil {
            job.Replicate.Source.Snowball.SkipErrs = ptr(true)
        }
    }

    if err := job.save(ctx, objectAPI); err != nil {
        writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
        return
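
Note: the defaults above rely on pointer-typed snowball fields so that an unset option can be told apart from an explicit false or zero. ptr is presumably a small generic helper along these lines (a sketch under that assumption, not necessarily the exact definition in the tree):

// ptr returns a pointer to v; handy for filling optional
// (pointer-typed) configuration fields such as the snowball defaults.
func ptr[T any](v T) *T {
    return &v
}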