mirror of
https://github.com/minio/minio.git
synced 2026-02-05 02:10:14 -05:00
fix: simplify background heal and trigger heal items early (#9928)
Bonus fix during versioning merge one of the PR was missing the offline/online disk count fix from #9801 port it correctly over to the master branch from release. Additionally, add versionID support for MRF Fixes #9910 Fixes #9931
This commit is contained in:
@@ -33,6 +33,7 @@ function start_minio_3_node() {
|
||||
declare -a ARGS
|
||||
export MINIO_ACCESS_KEY=minio
|
||||
export MINIO_SECRET_KEY=minio123
|
||||
export MINIO_ERASURE_SET_DRIVE_COUNT=6
|
||||
|
||||
start_port=$(shuf -i 10000-65000 -n 1)
|
||||
for i in $(seq 1 3); do
|
||||
|
||||
@@ -173,7 +173,7 @@ func (a adminAPIHandlers) GetConfigKVHandler(w http.ResponseWriter, r *http.Requ
|
||||
}
|
||||
|
||||
cfg := globalServerConfig
|
||||
if globalSafeMode {
|
||||
if newObjectLayerFn() == nil {
|
||||
var err error
|
||||
cfg, err = getValidConfig(objectAPI)
|
||||
if err != nil {
|
||||
|
||||
@@ -708,10 +708,6 @@ func (a adminAPIHandlers) HealHandler(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
}
|
||||
|
||||
// find number of disks in the setup, ignore any errors here.
|
||||
info, _ := objectAPI.StorageInfo(ctx, false)
|
||||
numDisks := info.Backend.OfflineDisks.Sum() + info.Backend.OnlineDisks.Sum()
|
||||
|
||||
healPath := pathJoin(hip.bucket, hip.objPrefix)
|
||||
if hip.clientToken == "" && !hip.forceStart && !hip.forceStop {
|
||||
nh, exists := globalAllHealState.getHealSequence(healPath)
|
||||
@@ -754,7 +750,7 @@ func (a adminAPIHandlers) HealHandler(w http.ResponseWriter, r *http.Request) {
|
||||
respCh <- hr
|
||||
}()
|
||||
case hip.clientToken == "":
|
||||
nh := newHealSequence(GlobalContext, hip.bucket, hip.objPrefix, handlers.GetSourceIP(r), numDisks, hip.hs, hip.forceStart)
|
||||
nh := newHealSequence(GlobalContext, hip.bucket, hip.objPrefix, handlers.GetSourceIP(r), hip.hs, hip.forceStart)
|
||||
go func() {
|
||||
respBytes, apiErr, errMsg := globalAllHealState.LaunchNewHealSequence(nh)
|
||||
hr := healResp{respBytes, apiErr, errMsg}
|
||||
@@ -1399,10 +1395,8 @@ func (a adminAPIHandlers) ServerInfoHandler(w http.ResponseWriter, r *http.Reque
|
||||
}
|
||||
}
|
||||
|
||||
mode := ""
|
||||
if globalSafeMode {
|
||||
mode = "safe"
|
||||
} else {
|
||||
mode := "safe"
|
||||
if newObjectLayerFn() != nil {
|
||||
mode = "online"
|
||||
}
|
||||
|
||||
|
||||
@@ -77,9 +77,6 @@ type healSequenceStatus struct {
|
||||
FailureDetail string `json:"Detail,omitempty"`
|
||||
StartTime time.Time `json:"StartTime"`
|
||||
|
||||
// disk information
|
||||
NumDisks int `json:"NumDisks"`
|
||||
|
||||
// settings for the heal sequence
|
||||
HealSettings madmin.HealOpts `json:"Settings"`
|
||||
|
||||
@@ -95,8 +92,8 @@ type allHealState struct {
|
||||
healSeqMap map[string]*healSequence
|
||||
}
|
||||
|
||||
// initHealState - initialize healing apparatus
|
||||
func initHealState() *allHealState {
|
||||
// newHealState - initialize global heal state management
|
||||
func newHealState() *allHealState {
|
||||
healState := &allHealState{
|
||||
healSeqMap: make(map[string]*healSequence),
|
||||
}
|
||||
@@ -368,7 +365,7 @@ type healSequence struct {
|
||||
// NewHealSequence - creates healSettings, assumes bucket and
|
||||
// objPrefix are already validated.
|
||||
func newHealSequence(ctx context.Context, bucket, objPrefix, clientAddr string,
|
||||
numDisks int, hs madmin.HealOpts, forceStart bool) *healSequence {
|
||||
hs madmin.HealOpts, forceStart bool) *healSequence {
|
||||
|
||||
reqInfo := &logger.ReqInfo{RemoteHost: clientAddr, API: "Heal", BucketName: bucket}
|
||||
reqInfo.AppendTags("prefix", objPrefix)
|
||||
@@ -388,7 +385,6 @@ func newHealSequence(ctx context.Context, bucket, objPrefix, clientAddr string,
|
||||
currentStatus: healSequenceStatus{
|
||||
Summary: healNotStartedStatus,
|
||||
HealSettings: hs,
|
||||
NumDisks: numDisks,
|
||||
},
|
||||
traverseAndHealDoneCh: make(chan error),
|
||||
cancelCtx: cancel,
|
||||
@@ -677,11 +673,6 @@ func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItem
|
||||
}
|
||||
|
||||
func (h *healSequence) healItemsFromSourceCh() error {
|
||||
bucketsOnly := true // heal buckets only, not objects.
|
||||
if err := h.healItems(bucketsOnly); err != nil {
|
||||
logger.LogIf(h.ctx, err)
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case source, ok := <-h.sourceCh:
|
||||
@@ -716,7 +707,7 @@ func (h *healSequence) healFromSourceCh() {
|
||||
h.healItemsFromSourceCh()
|
||||
}
|
||||
|
||||
func (h *healSequence) healItems(bucketsOnly bool) error {
|
||||
func (h *healSequence) healDiskMeta() error {
|
||||
// Start with format healing
|
||||
if err := h.healDiskFormat(); err != nil {
|
||||
return err
|
||||
@@ -728,7 +719,11 @@ func (h *healSequence) healItems(bucketsOnly bool) error {
|
||||
}
|
||||
|
||||
// Start healing the bucket config prefix.
|
||||
if err := h.healMinioSysMeta(bucketConfigPrefix)(); err != nil {
|
||||
return h.healMinioSysMeta(bucketConfigPrefix)()
|
||||
}
|
||||
|
||||
func (h *healSequence) healItems(bucketsOnly bool) error {
|
||||
if err := h.healDiskMeta(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
@@ -100,7 +100,7 @@ func (h *healRoutine) run(ctx context.Context, objAPI ObjectLayer) {
|
||||
}
|
||||
}
|
||||
|
||||
func initHealRoutine() *healRoutine {
|
||||
func newHealRoutine() *healRoutine {
|
||||
return &healRoutine{
|
||||
tasks: make(chan healTask),
|
||||
doneCh: make(chan struct{}),
|
||||
@@ -108,22 +108,22 @@ func initHealRoutine() *healRoutine {
|
||||
|
||||
}
|
||||
|
||||
func startBackgroundHealing(ctx context.Context, objAPI ObjectLayer) {
|
||||
func initBackgroundHealing(ctx context.Context, objAPI ObjectLayer) {
|
||||
// Run the background healer
|
||||
globalBackgroundHealRoutine = initHealRoutine()
|
||||
globalBackgroundHealRoutine = newHealRoutine()
|
||||
go globalBackgroundHealRoutine.run(ctx, objAPI)
|
||||
|
||||
// Launch the background healer sequence to track
|
||||
// background healing operations, ignore errors
|
||||
// errors are handled into offline disks already.
|
||||
info, _ := objAPI.StorageInfo(ctx, false)
|
||||
numDisks := info.Backend.OnlineDisks.Sum() + info.Backend.OfflineDisks.Sum()
|
||||
nh := newBgHealSequence(numDisks)
|
||||
globalBackgroundHealState.LaunchNewHealSequence(nh)
|
||||
}
|
||||
nh := newBgHealSequence()
|
||||
// Heal any disk format and metadata early, if possible.
|
||||
if err := nh.healDiskMeta(); err != nil {
|
||||
if newObjectLayerFn() != nil {
|
||||
// log only in situations, when object layer
|
||||
// has fully initialized.
|
||||
logger.LogIf(nh.ctx, err)
|
||||
}
|
||||
}
|
||||
|
||||
func initBackgroundHealing(ctx context.Context, objAPI ObjectLayer) {
|
||||
go startBackgroundHealing(ctx, objAPI)
|
||||
globalBackgroundHealState.LaunchNewHealSequence(nh)
|
||||
}
|
||||
|
||||
// healDiskFormat - heals format.json, return value indicates if a
|
||||
@@ -140,12 +140,20 @@ func healDiskFormat(ctx context.Context, objAPI ObjectLayer, opts madmin.HealOpt
|
||||
// Healing succeeded notify the peers to reload format and re-initialize disks.
|
||||
// We will not notify peers if healing is not required.
|
||||
if err == nil {
|
||||
for _, nerr := range globalNotificationSys.ReloadFormat(opts.DryRun) {
|
||||
if nerr.Err != nil {
|
||||
logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
|
||||
logger.LogIf(ctx, nerr.Err)
|
||||
// Notify servers in background and retry if needed.
|
||||
go func() {
|
||||
retry:
|
||||
for _, nerr := range globalNotificationSys.ReloadFormat(opts.DryRun) {
|
||||
if nerr.Err != nil {
|
||||
if nerr.Err.Error() == errServerNotInitialized.Error() {
|
||||
time.Sleep(time.Second)
|
||||
goto retry
|
||||
}
|
||||
logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
|
||||
logger.LogIf(ctx, nerr.Err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
return res, nil
|
||||
|
||||
@@ -149,7 +149,7 @@ func readServerConfig(ctx context.Context, objAPI ObjectLayer) (config.Config, e
|
||||
if err != nil {
|
||||
// Config not found for some reason, allow things to continue
|
||||
// by initializing a new fresh config in safe mode.
|
||||
if err == errConfigNotFound && globalSafeMode {
|
||||
if err == errConfigNotFound && newObjectLayerFn() == nil {
|
||||
return newServerConfig(), nil
|
||||
}
|
||||
return nil, err
|
||||
|
||||
@@ -1479,9 +1479,5 @@ func (fs *FSObjects) IsReady(_ context.Context) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
globalObjLayerMutex.RLock()
|
||||
res := globalObjectAPI != nil && !globalSafeMode
|
||||
globalObjLayerMutex.RUnlock()
|
||||
|
||||
return res
|
||||
return newObjectLayerFn() != nil
|
||||
}
|
||||
|
||||
@@ -34,7 +34,7 @@ var leaderLockTimeout = newDynamicTimeout(time.Minute, time.Minute)
|
||||
|
||||
// NewBgHealSequence creates a background healing sequence
|
||||
// operation which crawls all objects and heal them.
|
||||
func newBgHealSequence(numDisks int) *healSequence {
|
||||
func newBgHealSequence() *healSequence {
|
||||
|
||||
reqInfo := &logger.ReqInfo{API: "BackgroundHeal"}
|
||||
ctx, cancelCtx := context.WithCancel(logger.SetReqInfo(GlobalContext, reqInfo))
|
||||
@@ -54,7 +54,6 @@ func newBgHealSequence(numDisks int) *healSequence {
|
||||
currentStatus: healSequenceStatus{
|
||||
Summary: healNotStartedStatus,
|
||||
HealSettings: hs,
|
||||
NumDisks: numDisks,
|
||||
},
|
||||
cancelCtx: cancelCtx,
|
||||
ctx: ctx,
|
||||
|
||||
@@ -204,6 +204,12 @@ func initSafeMode(ctx context.Context, newObject ObjectLayer) (err error) {
|
||||
}
|
||||
}(txnLk)
|
||||
|
||||
// Enable healing to heal drives if possible
|
||||
if globalIsXL {
|
||||
initBackgroundHealing(ctx, newObject)
|
||||
initLocalDisksAutoHeal(ctx, newObject)
|
||||
}
|
||||
|
||||
// **** WARNING ****
|
||||
// Migrating to encrypted backend should happen before initialization of any
|
||||
// sub-systems, make sure that we do not move the above codeblock elsewhere.
|
||||
@@ -419,9 +425,9 @@ func serverMain(ctx *cli.Context) {
|
||||
setMaxResources()
|
||||
|
||||
if globalIsXL {
|
||||
// Init global heal state
|
||||
globalAllHealState = initHealState()
|
||||
globalBackgroundHealState = initHealState()
|
||||
// New global heal state
|
||||
globalAllHealState = newHealState()
|
||||
globalBackgroundHealState = newHealState()
|
||||
}
|
||||
|
||||
// Configure server.
|
||||
@@ -501,12 +507,6 @@ func serverMain(ctx *cli.Context) {
|
||||
|
||||
newAllSubsystems()
|
||||
|
||||
// Enable healing to heal drives if possible
|
||||
if globalIsXL {
|
||||
initBackgroundHealing(GlobalContext, newObject)
|
||||
initLocalDisksAutoHeal(GlobalContext, newObject)
|
||||
}
|
||||
|
||||
go startBackgroundOps(GlobalContext, newObject)
|
||||
|
||||
logger.FatalIf(initSafeMode(GlobalContext, newObject), "Unable to initialize server switching into safe-mode")
|
||||
|
||||
@@ -404,7 +404,7 @@ func resetGlobalIsXL() {
|
||||
func resetGlobalHealState() {
|
||||
// Init global heal state
|
||||
if globalAllHealState == nil {
|
||||
globalAllHealState = initHealState()
|
||||
globalAllHealState = newHealState()
|
||||
} else {
|
||||
globalAllHealState.Lock()
|
||||
for _, v := range globalAllHealState.healSeqMap {
|
||||
@@ -417,7 +417,7 @@ func resetGlobalHealState() {
|
||||
|
||||
// Init background heal state
|
||||
if globalBackgroundHealState == nil {
|
||||
globalBackgroundHealState = initHealState()
|
||||
globalBackgroundHealState = newHealState()
|
||||
} else {
|
||||
globalBackgroundHealState.Lock()
|
||||
for _, v := range globalBackgroundHealState.healSeqMap {
|
||||
|
||||
@@ -90,8 +90,8 @@ type xlSets struct {
|
||||
pool *MergeWalkPool
|
||||
poolSplunk *MergeWalkPool
|
||||
|
||||
mrfMU sync.Mutex
|
||||
mrfUploads map[string]int
|
||||
mrfMU sync.Mutex
|
||||
mrfOperations map[string]int
|
||||
}
|
||||
|
||||
func isEndpointConnected(diskMap map[string]StorageAPI, endpoint string) bool {
|
||||
@@ -303,7 +303,7 @@ func newXLSets(ctx context.Context, endpoints Endpoints, storageDisks []StorageA
|
||||
distributionAlgo: format.XL.DistributionAlgo,
|
||||
pool: NewMergeWalkPool(globalMergeLookupTimeout),
|
||||
poolSplunk: NewMergeWalkPool(globalMergeLookupTimeout),
|
||||
mrfUploads: make(map[string]int),
|
||||
mrfOperations: make(map[string]int),
|
||||
}
|
||||
|
||||
mutex := newNSLock(globalIsDistXL)
|
||||
@@ -348,7 +348,7 @@ func newXLSets(ctx context.Context, endpoints Endpoints, storageDisks []StorageA
|
||||
getEndpoints: s.GetEndpoints(i),
|
||||
nsMutex: mutex,
|
||||
bp: bp,
|
||||
mrfUploadCh: make(chan partialUpload, 10000),
|
||||
mrfOpCh: make(chan partialOperation, 10000),
|
||||
}
|
||||
|
||||
go s.sets[i].cleanupStaleMultipartUploads(ctx,
|
||||
@@ -1765,9 +1765,9 @@ func (s *xlSets) GetMetrics(ctx context.Context) (*Metrics, error) {
|
||||
// from all underlying xl sets and puts them in a global map which
|
||||
// should not have more than 10000 entries.
|
||||
func (s *xlSets) maintainMRFList() {
|
||||
var agg = make(chan partialUpload, 10000)
|
||||
var agg = make(chan partialOperation, 10000)
|
||||
for i, xl := range s.sets {
|
||||
go func(c <-chan partialUpload, setIndex int) {
|
||||
go func(c <-chan partialOperation, setIndex int) {
|
||||
for msg := range c {
|
||||
msg.failedSet = setIndex
|
||||
select {
|
||||
@@ -1775,16 +1775,16 @@ func (s *xlSets) maintainMRFList() {
|
||||
default:
|
||||
}
|
||||
}
|
||||
}(xl.mrfUploadCh, i)
|
||||
}(xl.mrfOpCh, i)
|
||||
}
|
||||
|
||||
for fUpload := range agg {
|
||||
for fOp := range agg {
|
||||
s.mrfMU.Lock()
|
||||
if len(s.mrfUploads) > 10000 {
|
||||
if len(s.mrfOperations) > 10000 {
|
||||
s.mrfMU.Unlock()
|
||||
continue
|
||||
}
|
||||
s.mrfUploads[pathJoin(fUpload.bucket, fUpload.object)] = fUpload.failedSet
|
||||
s.mrfOperations[pathJoin(fOp.bucket, fOp.object)] = fOp.failedSet
|
||||
s.mrfMU.Unlock()
|
||||
}
|
||||
}
|
||||
@@ -1810,17 +1810,17 @@ func (s *xlSets) healMRFRoutine() {
|
||||
for e := range s.disksConnectEvent {
|
||||
// Get the list of objects related the xl set
|
||||
// to which the connected disk belongs.
|
||||
var mrfUploads []string
|
||||
var mrfOperations []string
|
||||
s.mrfMU.Lock()
|
||||
for k, v := range s.mrfUploads {
|
||||
for k, v := range s.mrfOperations {
|
||||
if v == e.setIndex {
|
||||
mrfUploads = append(mrfUploads, k)
|
||||
mrfOperations = append(mrfOperations, k)
|
||||
}
|
||||
}
|
||||
s.mrfMU.Unlock()
|
||||
|
||||
// Heal objects
|
||||
for _, u := range mrfUploads {
|
||||
for _, u := range mrfOperations {
|
||||
// Send an object to be healed with a timeout
|
||||
select {
|
||||
case bgSeq.sourceCh <- healSource{path: u}:
|
||||
@@ -1828,7 +1828,7 @@ func (s *xlSets) healMRFRoutine() {
|
||||
}
|
||||
|
||||
s.mrfMU.Lock()
|
||||
delete(s.mrfUploads, u)
|
||||
delete(s.mrfOperations, u)
|
||||
s.mrfMU.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -731,9 +731,9 @@ func (xl xlObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
|
||||
}
|
||||
|
||||
// Check if there is any offline disk and add it to the MRF list
|
||||
for i := 0; i < len(onlineDisks); i++ {
|
||||
if onlineDisks[i] == nil || storageDisks[i] == nil {
|
||||
xl.addPartialUpload(bucket, object)
|
||||
for i, disk := range onlineDisks {
|
||||
if disk == nil || storageDisks[i] == nil {
|
||||
xl.addPartial(bucket, object)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
@@ -667,7 +667,7 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string,
|
||||
// during this upload, send it to the MRF list.
|
||||
for i := 0; i < len(onlineDisks); i++ {
|
||||
if onlineDisks[i] == nil || storageDisks[i] == nil {
|
||||
xl.addPartialUpload(bucket, object)
|
||||
xl.addPartial(bucket, object)
|
||||
break
|
||||
}
|
||||
}
|
||||
@@ -927,6 +927,21 @@ func (xl xlObjects) DeleteObjects(ctx context.Context, bucket string, objects []
|
||||
}
|
||||
}
|
||||
|
||||
// Check failed deletes across multiple objects
|
||||
for i, object := range objects {
|
||||
if deleteErrs[i] == nil {
|
||||
// Check if there is any offline disk and add it to the MRF list
|
||||
for _, disk := range xl.getDisks() {
|
||||
if disk == nil {
|
||||
// all other direct versionId references we should
|
||||
// ensure no dangling file is left over.
|
||||
xl.addPartial(bucket, object)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return deleteErrs, nil
|
||||
}
|
||||
|
||||
@@ -973,6 +988,13 @@ func (xl xlObjects) DeleteObject(ctx context.Context, bucket, object string) (er
|
||||
return toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
for _, disk := range storageDisks {
|
||||
if disk == nil {
|
||||
xl.addPartial(bucket, object)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Success.
|
||||
return nil
|
||||
}
|
||||
@@ -999,11 +1021,11 @@ func (xl xlObjects) ListObjectsV2(ctx context.Context, bucket, prefix, continuat
|
||||
return listObjectsV2Info, err
|
||||
}
|
||||
|
||||
// Send the successful but partial upload, however ignore
|
||||
// Send the successful but partial upload/delete, however ignore
|
||||
// if the channel is blocked by other items.
|
||||
func (xl xlObjects) addPartialUpload(bucket, key string) {
|
||||
func (xl xlObjects) addPartial(bucket, object string) {
|
||||
select {
|
||||
case xl.mrfUploadCh <- partialUpload{bucket: bucket, object: key}:
|
||||
case xl.mrfOpCh <- partialOperation{bucket: bucket, object: object}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
@@ -41,9 +41,9 @@ const (
|
||||
// OfflineDisk represents an unavailable disk.
|
||||
var OfflineDisk StorageAPI // zero value is nil
|
||||
|
||||
// partialUpload is a successful upload of an object
|
||||
// but not written in all disks (having quorum)
|
||||
type partialUpload struct {
|
||||
// partialOperation is a successful upload/delete
|
||||
// of an object but not written in all disks (having quorum)
|
||||
type partialOperation struct {
|
||||
bucket string
|
||||
object string
|
||||
failedSet int
|
||||
@@ -69,7 +69,7 @@ type xlObjects struct {
|
||||
// Byte pools used for temporary i/o buffers.
|
||||
bp *bpool.BytePoolCap
|
||||
|
||||
mrfUploadCh chan partialUpload
|
||||
mrfOpCh chan partialOperation
|
||||
}
|
||||
|
||||
// NewNSLock - initialize a new namespace RWLocker instance.
|
||||
|
||||
@@ -1676,18 +1676,20 @@ func (z *xlZones) PutObjectTags(ctx context.Context, bucket, object string, tags
|
||||
if z.SingleZone() {
|
||||
return z.zones[0].PutObjectTags(ctx, bucket, object, tags)
|
||||
}
|
||||
|
||||
for _, zone := range z.zones {
|
||||
err := zone.PutObjectTags(ctx, bucket, object, tags)
|
||||
if err != nil {
|
||||
if isErrBucketNotFound(err) {
|
||||
if isErrObjectNotFound(err) {
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return BucketNotFound{
|
||||
return ObjectNotFound{
|
||||
Bucket: bucket,
|
||||
Object: object,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1699,15 +1701,16 @@ func (z *xlZones) DeleteObjectTags(ctx context.Context, bucket, object string) e
|
||||
for _, zone := range z.zones {
|
||||
err := zone.DeleteObjectTags(ctx, bucket, object)
|
||||
if err != nil {
|
||||
if isErrBucketNotFound(err) {
|
||||
if isErrObjectNotFound(err) {
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return BucketNotFound{
|
||||
return ObjectNotFound{
|
||||
Bucket: bucket,
|
||||
Object: object,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1719,14 +1722,15 @@ func (z *xlZones) GetObjectTags(ctx context.Context, bucket, object string) (*ta
|
||||
for _, zone := range z.zones {
|
||||
tags, err := zone.GetObjectTags(ctx, bucket, object)
|
||||
if err != nil {
|
||||
if isErrBucketNotFound(err) {
|
||||
if isErrObjectNotFound(err) {
|
||||
continue
|
||||
}
|
||||
return tags, err
|
||||
}
|
||||
return tags, nil
|
||||
}
|
||||
return nil, BucketNotFound{
|
||||
return nil, ObjectNotFound{
|
||||
Bucket: bucket,
|
||||
Object: object,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -77,7 +77,6 @@ type HealTaskStatus struct {
|
||||
FailureDetail string `json:"detail"`
|
||||
StartTime time.Time `json:"startTime"`
|
||||
HealSettings HealOpts `json:"settings"`
|
||||
NumDisks int `json:"numDisks"`
|
||||
|
||||
Items []HealResultItem `json:"items,omitempty"`
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user