Add support for replication of object tags, retention metadata (#10880)

This commit is contained in:
Poorna Krishnamoorthy
2020-11-19 11:50:22 -08:00
committed by Harshavardhana
parent 0fa430c1da
commit 251c1ef6da
9 changed files with 187 additions and 38 deletions

View File

@@ -24,6 +24,7 @@ import (
"strings"
"time"
minio "github.com/minio/minio-go/v7"
miniogo "github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/encrypt"
"github.com/minio/minio-go/v7/pkg/tags"
@@ -249,6 +250,45 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectVersionInfo, objectA
}
}
func getCopyObjMetadata(oi ObjectInfo, dest replication.Destination) map[string]string {
meta := make(map[string]string, len(oi.UserDefined))
for k, v := range oi.UserDefined {
if k == xhttp.AmzBucketReplicationStatus {
continue
}
if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) {
continue
}
meta[k] = v
}
if oi.ContentEncoding != "" {
meta[xhttp.ContentEncoding] = oi.ContentEncoding
}
if oi.ContentType != "" {
meta[xhttp.ContentType] = oi.ContentType
}
tag, err := tags.ParseObjectTags(oi.UserTags)
if err != nil {
return nil
}
if tag != nil {
meta[xhttp.AmzObjectTagging] = tag.String()
meta[xhttp.AmzTagDirective] = "REPLACE"
}
sc := dest.StorageClass
if sc == "" {
sc = oi.StorageClass
}
meta[xhttp.AmzStorageClass] = sc
if oi.UserTags != "" {
meta[xhttp.AmzObjectTagging] = oi.UserTags
}
meta[xhttp.MinIOSourceMTime] = oi.ModTime.Format(time.RFC3339)
meta[xhttp.MinIOSourceETag] = oi.ETag
meta[xhttp.AmzBucketReplicationStatus] = replication.Replica.String()
return meta
}
func putReplicationOpts(ctx context.Context, dest replication.Destination, objInfo ObjectInfo) (putOpts miniogo.PutObjectOptions) {
meta := make(map[string]string)
for k, v := range objInfo.UserDefined {
@@ -302,6 +342,53 @@ func putReplicationOpts(ctx context.Context, dest replication.Destination, objIn
return
}
type replicationAction string
const (
replicateMetadata replicationAction = "metadata"
replicateNone replicationAction = "none"
replicateAll replicationAction = "all"
)
// returns replicationAction by comparing metadata between source and target
func getReplicationAction(oi1 ObjectInfo, oi2 minio.ObjectInfo) replicationAction {
// needs full replication
if oi1.ETag != oi2.ETag ||
oi1.VersionID != oi2.VersionID ||
oi1.Size != oi2.Size ||
oi1.DeleteMarker != oi2.IsDeleteMarker {
return replicateAll
}
if !oi1.ModTime.Equal(oi2.LastModified) ||
oi1.ContentType != oi2.ContentType ||
oi1.StorageClass != oi2.StorageClass {
return replicateMetadata
}
if oi1.ContentEncoding != "" {
enc, ok := oi2.UserMetadata[xhttp.ContentEncoding]
if !ok || enc != oi1.ContentEncoding {
return replicateMetadata
}
}
for k, v := range oi2.UserMetadata {
oi2.Metadata[k] = []string{v}
}
if len(oi2.Metadata) != len(oi1.UserDefined) {
return replicateMetadata
}
for k1, v1 := range oi1.UserDefined {
if v2, ok := oi2.Metadata[k1]; !ok || v1 != strings.Join(v2, "") {
return replicateMetadata
}
}
t, _ := tags.MapToObjectTags(oi2.UserTags)
if t.String() != oi1.UserTags {
return replicateMetadata
}
return replicateNone
}
// replicateObject replicates the specified version of the object to destination bucket
// The source object is then updated to reflect the replication status.
func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLayer) {
@@ -338,16 +425,11 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa
return
}
// if heal encounters a pending replication status, either replication
// has failed due to server shutdown or crawler and PutObject replication are in contention.
healPending := objInfo.ReplicationStatus == replication.Pending
// In the rare event that replication is in pending state either due to
// server shut down/crash before replication completed or healing and PutObject
// race - do an additional stat to see if the version ID exists
if healPending {
_, err := tgt.StatObject(ctx, dest.Bucket, object, miniogo.StatObjectOptions{VersionID: objInfo.VersionID})
if err == nil {
rtype := replicateAll
oi, err := tgt.StatObject(ctx, dest.Bucket, object, miniogo.StatObjectOptions{VersionID: objInfo.VersionID})
if err == nil {
rtype = getReplicationAction(objInfo, oi)
if rtype == replicateNone {
gr.Close()
// object with same VersionID already exists, replication kicked off by
// PutObject might have completed.
@@ -375,8 +457,13 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa
headerSize += len(k) + len(v)
}
r := bandwidth.NewMonitoredReader(ctx, globalBucketMonitor, objInfo.Bucket, objInfo.Name, gr, headerSize, b, target.BandwidthLimit)
_, err = tgt.PutObject(ctx, dest.Bucket, object, r, size, "", "", putOpts)
if rtype == replicateAll {
_, err = tgt.PutObject(ctx, dest.Bucket, object, r, size, "", "", putOpts)
} else {
// replicate metadata for object tagging/copy with metadata replacement
dstOpts := miniogo.PutObjectOptions{Internal: miniogo.AdvancedPutOptions{SourceVersionID: objInfo.VersionID}}
_, err = tgt.CopyObject(ctx, dest.Bucket, object, dest.Bucket, object, getCopyObjMetadata(objInfo, dest), dstOpts)
}
r.Close()
if err != nil {
replicationStatus = replication.Failed