From b4b76717c10e147b08de439d164e4511616469b9 Mon Sep 17 00:00:00 2001
From: Harshavardhana
Date: Mon, 29 Jun 2020 13:07:26 -0700
Subject: [PATCH] fix: simplify background heal and trigger heal items early
 (#9928)

Bonus fix: during the versioning merge, one of the PRs was missing the
offline/online disk count fix from #9801; port it correctly over to the
master branch from the release branch.

Additionally, add versionID support for MRF.

Fixes #9910
Fixes #9931
---
 buildscripts/verify-healing.sh  |  1 +
 cmd/admin-handlers-config-kv.go |  2 +-
 cmd/admin-handlers.go           | 12 +++------
 cmd/admin-heal-ops.go           | 23 +++++++----------
 cmd/background-heal-ops.go      | 44 +++++++++++++++++++--------------
 cmd/config.go                   |  2 +-
 cmd/fs-v1.go                    |  6 +----
 cmd/global-heal.go              |  3 +--
 cmd/server-main.go              | 18 +++++++-------
 cmd/test-utils_test.go          |  4 +--
 cmd/xl-sets.go                  | 30 +++++++++++-----------
 cmd/xl-v1-multipart.go          |  6 ++---
 cmd/xl-v1-object.go             | 30 +++++++++++++++++++---
 cmd/xl-v1.go                    |  8 +++---
 cmd/xl-zones.go                 | 16 +++++++-----
 pkg/madmin/heal-commands.go     |  1 -
 16 files changed, 112 insertions(+), 94 deletions(-)

diff --git a/buildscripts/verify-healing.sh b/buildscripts/verify-healing.sh
index 5fce83635..ee4425509 100755
--- a/buildscripts/verify-healing.sh
+++ b/buildscripts/verify-healing.sh
@@ -33,6 +33,7 @@ function start_minio_3_node() {
 	declare -a ARGS
 	export MINIO_ACCESS_KEY=minio
 	export MINIO_SECRET_KEY=minio123
+	export MINIO_ERASURE_SET_DRIVE_COUNT=6
 
 	start_port=$(shuf -i 10000-65000 -n 1)
 	for i in $(seq 1 3); do
diff --git a/cmd/admin-handlers-config-kv.go b/cmd/admin-handlers-config-kv.go
index b8616b573..89ffa942d 100644
--- a/cmd/admin-handlers-config-kv.go
+++ b/cmd/admin-handlers-config-kv.go
@@ -173,7 +173,7 @@ func (a adminAPIHandlers) GetConfigKVHandler(w http.ResponseWriter, r *http.Requ
 	}
 
 	cfg := globalServerConfig
-	if globalSafeMode {
+	if newObjectLayerFn() == nil {
 		var err error
 		cfg, err = getValidConfig(objectAPI)
 		if err != nil {
diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go
index 1aeaa5391..3b321b542 100644
--- a/cmd/admin-handlers.go
+++ b/cmd/admin-handlers.go
@@ -708,10 +708,6 @@ func (a adminAPIHandlers) HealHandler(w http.ResponseWriter, r *http.Request) {
 		}
 	}
 
-	// find number of disks in the setup, ignore any errors here.
- info, _ := objectAPI.StorageInfo(ctx, false) - numDisks := info.Backend.OfflineDisks.Sum() + info.Backend.OnlineDisks.Sum() - healPath := pathJoin(hip.bucket, hip.objPrefix) if hip.clientToken == "" && !hip.forceStart && !hip.forceStop { nh, exists := globalAllHealState.getHealSequence(healPath) @@ -754,7 +750,7 @@ func (a adminAPIHandlers) HealHandler(w http.ResponseWriter, r *http.Request) { respCh <- hr }() case hip.clientToken == "": - nh := newHealSequence(GlobalContext, hip.bucket, hip.objPrefix, handlers.GetSourceIP(r), numDisks, hip.hs, hip.forceStart) + nh := newHealSequence(GlobalContext, hip.bucket, hip.objPrefix, handlers.GetSourceIP(r), hip.hs, hip.forceStart) go func() { respBytes, apiErr, errMsg := globalAllHealState.LaunchNewHealSequence(nh) hr := healResp{respBytes, apiErr, errMsg} @@ -1399,10 +1395,8 @@ func (a adminAPIHandlers) ServerInfoHandler(w http.ResponseWriter, r *http.Reque } } - mode := "" - if globalSafeMode { - mode = "safe" - } else { + mode := "safe" + if newObjectLayerFn() != nil { mode = "online" } diff --git a/cmd/admin-heal-ops.go b/cmd/admin-heal-ops.go index c3fd34bef..19a8b65d9 100644 --- a/cmd/admin-heal-ops.go +++ b/cmd/admin-heal-ops.go @@ -77,9 +77,6 @@ type healSequenceStatus struct { FailureDetail string `json:"Detail,omitempty"` StartTime time.Time `json:"StartTime"` - // disk information - NumDisks int `json:"NumDisks"` - // settings for the heal sequence HealSettings madmin.HealOpts `json:"Settings"` @@ -95,8 +92,8 @@ type allHealState struct { healSeqMap map[string]*healSequence } -// initHealState - initialize healing apparatus -func initHealState() *allHealState { +// newHealState - initialize global heal state management +func newHealState() *allHealState { healState := &allHealState{ healSeqMap: make(map[string]*healSequence), } @@ -368,7 +365,7 @@ type healSequence struct { // NewHealSequence - creates healSettings, assumes bucket and // objPrefix are already validated. func newHealSequence(ctx context.Context, bucket, objPrefix, clientAddr string, - numDisks int, hs madmin.HealOpts, forceStart bool) *healSequence { + hs madmin.HealOpts, forceStart bool) *healSequence { reqInfo := &logger.ReqInfo{RemoteHost: clientAddr, API: "Heal", BucketName: bucket} reqInfo.AppendTags("prefix", objPrefix) @@ -388,7 +385,6 @@ func newHealSequence(ctx context.Context, bucket, objPrefix, clientAddr string, currentStatus: healSequenceStatus{ Summary: healNotStartedStatus, HealSettings: hs, - NumDisks: numDisks, }, traverseAndHealDoneCh: make(chan error), cancelCtx: cancel, @@ -677,11 +673,6 @@ func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItem } func (h *healSequence) healItemsFromSourceCh() error { - bucketsOnly := true // heal buckets only, not objects. - if err := h.healItems(bucketsOnly); err != nil { - logger.LogIf(h.ctx, err) - } - for { select { case source, ok := <-h.sourceCh: @@ -716,7 +707,7 @@ func (h *healSequence) healFromSourceCh() { h.healItemsFromSourceCh() } -func (h *healSequence) healItems(bucketsOnly bool) error { +func (h *healSequence) healDiskMeta() error { // Start with format healing if err := h.healDiskFormat(); err != nil { return err @@ -728,7 +719,11 @@ func (h *healSequence) healItems(bucketsOnly bool) error { } // Start healing the bucket config prefix. 
- if err := h.healMinioSysMeta(bucketConfigPrefix)(); err != nil { + return h.healMinioSysMeta(bucketConfigPrefix)() +} + +func (h *healSequence) healItems(bucketsOnly bool) error { + if err := h.healDiskMeta(); err != nil { return err } diff --git a/cmd/background-heal-ops.go b/cmd/background-heal-ops.go index 20271977b..b7d6b142e 100644 --- a/cmd/background-heal-ops.go +++ b/cmd/background-heal-ops.go @@ -100,7 +100,7 @@ func (h *healRoutine) run(ctx context.Context, objAPI ObjectLayer) { } } -func initHealRoutine() *healRoutine { +func newHealRoutine() *healRoutine { return &healRoutine{ tasks: make(chan healTask), doneCh: make(chan struct{}), @@ -108,22 +108,22 @@ func initHealRoutine() *healRoutine { } -func startBackgroundHealing(ctx context.Context, objAPI ObjectLayer) { +func initBackgroundHealing(ctx context.Context, objAPI ObjectLayer) { // Run the background healer - globalBackgroundHealRoutine = initHealRoutine() + globalBackgroundHealRoutine = newHealRoutine() go globalBackgroundHealRoutine.run(ctx, objAPI) - // Launch the background healer sequence to track - // background healing operations, ignore errors - // errors are handled into offline disks already. - info, _ := objAPI.StorageInfo(ctx, false) - numDisks := info.Backend.OnlineDisks.Sum() + info.Backend.OfflineDisks.Sum() - nh := newBgHealSequence(numDisks) - globalBackgroundHealState.LaunchNewHealSequence(nh) -} + nh := newBgHealSequence() + // Heal any disk format and metadata early, if possible. + if err := nh.healDiskMeta(); err != nil { + if newObjectLayerFn() != nil { + // log only in situations, when object layer + // has fully initialized. + logger.LogIf(nh.ctx, err) + } + } -func initBackgroundHealing(ctx context.Context, objAPI ObjectLayer) { - go startBackgroundHealing(ctx, objAPI) + globalBackgroundHealState.LaunchNewHealSequence(nh) } // healDiskFormat - heals format.json, return value indicates if a @@ -140,12 +140,20 @@ func healDiskFormat(ctx context.Context, objAPI ObjectLayer, opts madmin.HealOpt // Healing succeeded notify the peers to reload format and re-initialize disks. // We will not notify peers if healing is not required. if err == nil { - for _, nerr := range globalNotificationSys.ReloadFormat(opts.DryRun) { - if nerr.Err != nil { - logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String()) - logger.LogIf(ctx, nerr.Err) + // Notify servers in background and retry if needed. + go func() { + retry: + for _, nerr := range globalNotificationSys.ReloadFormat(opts.DryRun) { + if nerr.Err != nil { + if nerr.Err.Error() == errServerNotInitialized.Error() { + time.Sleep(time.Second) + goto retry + } + logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String()) + logger.LogIf(ctx, nerr.Err) + } } - } + }() } return res, nil diff --git a/cmd/config.go b/cmd/config.go index 199ece926..ef51107f6 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -149,7 +149,7 @@ func readServerConfig(ctx context.Context, objAPI ObjectLayer) (config.Config, e if err != nil { // Config not found for some reason, allow things to continue // by initializing a new fresh config in safe mode. 
- if err == errConfigNotFound && globalSafeMode { + if err == errConfigNotFound && newObjectLayerFn() == nil { return newServerConfig(), nil } return nil, err diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index 07c385ec2..325184e2c 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -1479,9 +1479,5 @@ func (fs *FSObjects) IsReady(_ context.Context) bool { return false } - globalObjLayerMutex.RLock() - res := globalObjectAPI != nil && !globalSafeMode - globalObjLayerMutex.RUnlock() - - return res + return newObjectLayerFn() != nil } diff --git a/cmd/global-heal.go b/cmd/global-heal.go index 7309eb7db..8904ecacc 100644 --- a/cmd/global-heal.go +++ b/cmd/global-heal.go @@ -34,7 +34,7 @@ var leaderLockTimeout = newDynamicTimeout(time.Minute, time.Minute) // NewBgHealSequence creates a background healing sequence // operation which crawls all objects and heal them. -func newBgHealSequence(numDisks int) *healSequence { +func newBgHealSequence() *healSequence { reqInfo := &logger.ReqInfo{API: "BackgroundHeal"} ctx, cancelCtx := context.WithCancel(logger.SetReqInfo(GlobalContext, reqInfo)) @@ -54,7 +54,6 @@ func newBgHealSequence(numDisks int) *healSequence { currentStatus: healSequenceStatus{ Summary: healNotStartedStatus, HealSettings: hs, - NumDisks: numDisks, }, cancelCtx: cancelCtx, ctx: ctx, diff --git a/cmd/server-main.go b/cmd/server-main.go index f02de2528..b982f1bbd 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -204,6 +204,12 @@ func initSafeMode(ctx context.Context, newObject ObjectLayer) (err error) { } }(txnLk) + // Enable healing to heal drives if possible + if globalIsXL { + initBackgroundHealing(ctx, newObject) + initLocalDisksAutoHeal(ctx, newObject) + } + // **** WARNING **** // Migrating to encrypted backend should happen before initialization of any // sub-systems, make sure that we do not move the above codeblock elsewhere. @@ -419,9 +425,9 @@ func serverMain(ctx *cli.Context) { setMaxResources() if globalIsXL { - // Init global heal state - globalAllHealState = initHealState() - globalBackgroundHealState = initHealState() + // New global heal state + globalAllHealState = newHealState() + globalBackgroundHealState = newHealState() } // Configure server. 
@@ -501,12 +507,6 @@ func serverMain(ctx *cli.Context) { newAllSubsystems() - // Enable healing to heal drives if possible - if globalIsXL { - initBackgroundHealing(GlobalContext, newObject) - initLocalDisksAutoHeal(GlobalContext, newObject) - } - go startBackgroundOps(GlobalContext, newObject) logger.FatalIf(initSafeMode(GlobalContext, newObject), "Unable to initialize server switching into safe-mode") diff --git a/cmd/test-utils_test.go b/cmd/test-utils_test.go index 55213ab2e..348ad3ef8 100644 --- a/cmd/test-utils_test.go +++ b/cmd/test-utils_test.go @@ -404,7 +404,7 @@ func resetGlobalIsXL() { func resetGlobalHealState() { // Init global heal state if globalAllHealState == nil { - globalAllHealState = initHealState() + globalAllHealState = newHealState() } else { globalAllHealState.Lock() for _, v := range globalAllHealState.healSeqMap { @@ -417,7 +417,7 @@ func resetGlobalHealState() { // Init background heal state if globalBackgroundHealState == nil { - globalBackgroundHealState = initHealState() + globalBackgroundHealState = newHealState() } else { globalBackgroundHealState.Lock() for _, v := range globalBackgroundHealState.healSeqMap { diff --git a/cmd/xl-sets.go b/cmd/xl-sets.go index f25563aea..0bef6fb84 100644 --- a/cmd/xl-sets.go +++ b/cmd/xl-sets.go @@ -90,8 +90,8 @@ type xlSets struct { pool *MergeWalkPool poolSplunk *MergeWalkPool - mrfMU sync.Mutex - mrfUploads map[string]int + mrfMU sync.Mutex + mrfOperations map[string]int } func isEndpointConnected(diskMap map[string]StorageAPI, endpoint string) bool { @@ -303,7 +303,7 @@ func newXLSets(ctx context.Context, endpoints Endpoints, storageDisks []StorageA distributionAlgo: format.XL.DistributionAlgo, pool: NewMergeWalkPool(globalMergeLookupTimeout), poolSplunk: NewMergeWalkPool(globalMergeLookupTimeout), - mrfUploads: make(map[string]int), + mrfOperations: make(map[string]int), } mutex := newNSLock(globalIsDistXL) @@ -348,7 +348,7 @@ func newXLSets(ctx context.Context, endpoints Endpoints, storageDisks []StorageA getEndpoints: s.GetEndpoints(i), nsMutex: mutex, bp: bp, - mrfUploadCh: make(chan partialUpload, 10000), + mrfOpCh: make(chan partialOperation, 10000), } go s.sets[i].cleanupStaleMultipartUploads(ctx, @@ -1765,9 +1765,9 @@ func (s *xlSets) GetMetrics(ctx context.Context) (*Metrics, error) { // from all underlying xl sets and puts them in a global map which // should not have more than 10000 entries. func (s *xlSets) maintainMRFList() { - var agg = make(chan partialUpload, 10000) + var agg = make(chan partialOperation, 10000) for i, xl := range s.sets { - go func(c <-chan partialUpload, setIndex int) { + go func(c <-chan partialOperation, setIndex int) { for msg := range c { msg.failedSet = setIndex select { @@ -1775,16 +1775,16 @@ func (s *xlSets) maintainMRFList() { default: } } - }(xl.mrfUploadCh, i) + }(xl.mrfOpCh, i) } - for fUpload := range agg { + for fOp := range agg { s.mrfMU.Lock() - if len(s.mrfUploads) > 10000 { + if len(s.mrfOperations) > 10000 { s.mrfMU.Unlock() continue } - s.mrfUploads[pathJoin(fUpload.bucket, fUpload.object)] = fUpload.failedSet + s.mrfOperations[pathJoin(fOp.bucket, fOp.object)] = fOp.failedSet s.mrfMU.Unlock() } } @@ -1810,17 +1810,17 @@ func (s *xlSets) healMRFRoutine() { for e := range s.disksConnectEvent { // Get the list of objects related the xl set // to which the connected disk belongs. 
- var mrfUploads []string + var mrfOperations []string s.mrfMU.Lock() - for k, v := range s.mrfUploads { + for k, v := range s.mrfOperations { if v == e.setIndex { - mrfUploads = append(mrfUploads, k) + mrfOperations = append(mrfOperations, k) } } s.mrfMU.Unlock() // Heal objects - for _, u := range mrfUploads { + for _, u := range mrfOperations { // Send an object to be healed with a timeout select { case bgSeq.sourceCh <- healSource{path: u}: @@ -1828,7 +1828,7 @@ func (s *xlSets) healMRFRoutine() { } s.mrfMU.Lock() - delete(s.mrfUploads, u) + delete(s.mrfOperations, u) s.mrfMU.Unlock() } } diff --git a/cmd/xl-v1-multipart.go b/cmd/xl-v1-multipart.go index 81e953852..821220be2 100644 --- a/cmd/xl-v1-multipart.go +++ b/cmd/xl-v1-multipart.go @@ -731,9 +731,9 @@ func (xl xlObjects) CompleteMultipartUpload(ctx context.Context, bucket string, } // Check if there is any offline disk and add it to the MRF list - for i := 0; i < len(onlineDisks); i++ { - if onlineDisks[i] == nil || storageDisks[i] == nil { - xl.addPartialUpload(bucket, object) + for i, disk := range onlineDisks { + if disk == nil || storageDisks[i] == nil { + xl.addPartial(bucket, object) break } } diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go index a658a95cc..6cc0b7101 100644 --- a/cmd/xl-v1-object.go +++ b/cmd/xl-v1-object.go @@ -667,7 +667,7 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string, // during this upload, send it to the MRF list. for i := 0; i < len(onlineDisks); i++ { if onlineDisks[i] == nil || storageDisks[i] == nil { - xl.addPartialUpload(bucket, object) + xl.addPartial(bucket, object) break } } @@ -927,6 +927,21 @@ func (xl xlObjects) DeleteObjects(ctx context.Context, bucket string, objects [] } } + // Check failed deletes across multiple objects + for i, object := range objects { + if deleteErrs[i] == nil { + // Check if there is any offline disk and add it to the MRF list + for _, disk := range xl.getDisks() { + if disk == nil { + // all other direct versionId references we should + // ensure no dangling file is left over. + xl.addPartial(bucket, object) + break + } + } + } + } + return deleteErrs, nil } @@ -973,6 +988,13 @@ func (xl xlObjects) DeleteObject(ctx context.Context, bucket, object string) (er return toObjectErr(err, bucket, object) } + for _, disk := range storageDisks { + if disk == nil { + xl.addPartial(bucket, object) + break + } + } + // Success. return nil } @@ -999,11 +1021,11 @@ func (xl xlObjects) ListObjectsV2(ctx context.Context, bucket, prefix, continuat return listObjectsV2Info, err } -// Send the successful but partial upload, however ignore +// Send the successful but partial upload/delete, however ignore // if the channel is blocked by other items. -func (xl xlObjects) addPartialUpload(bucket, key string) { +func (xl xlObjects) addPartial(bucket, object string) { select { - case xl.mrfUploadCh <- partialUpload{bucket: bucket, object: key}: + case xl.mrfOpCh <- partialOperation{bucket: bucket, object: object}: default: } } diff --git a/cmd/xl-v1.go b/cmd/xl-v1.go index 4c4614fe5..29c70088f 100644 --- a/cmd/xl-v1.go +++ b/cmd/xl-v1.go @@ -41,9 +41,9 @@ const ( // OfflineDisk represents an unavailable disk. 
var OfflineDisk StorageAPI // zero value is nil -// partialUpload is a successful upload of an object -// but not written in all disks (having quorum) -type partialUpload struct { +// partialOperation is a successful upload/delete +// of an object but not written in all disks (having quorum) +type partialOperation struct { bucket string object string failedSet int @@ -69,7 +69,7 @@ type xlObjects struct { // Byte pools used for temporary i/o buffers. bp *bpool.BytePoolCap - mrfUploadCh chan partialUpload + mrfOpCh chan partialOperation } // NewNSLock - initialize a new namespace RWLocker instance. diff --git a/cmd/xl-zones.go b/cmd/xl-zones.go index 119a082a9..4c45d46ab 100644 --- a/cmd/xl-zones.go +++ b/cmd/xl-zones.go @@ -1676,18 +1676,20 @@ func (z *xlZones) PutObjectTags(ctx context.Context, bucket, object string, tags if z.SingleZone() { return z.zones[0].PutObjectTags(ctx, bucket, object, tags) } + for _, zone := range z.zones { err := zone.PutObjectTags(ctx, bucket, object, tags) if err != nil { - if isErrBucketNotFound(err) { + if isErrObjectNotFound(err) { continue } return err } return nil } - return BucketNotFound{ + return ObjectNotFound{ Bucket: bucket, + Object: object, } } @@ -1699,15 +1701,16 @@ func (z *xlZones) DeleteObjectTags(ctx context.Context, bucket, object string) e for _, zone := range z.zones { err := zone.DeleteObjectTags(ctx, bucket, object) if err != nil { - if isErrBucketNotFound(err) { + if isErrObjectNotFound(err) { continue } return err } return nil } - return BucketNotFound{ + return ObjectNotFound{ Bucket: bucket, + Object: object, } } @@ -1719,14 +1722,15 @@ func (z *xlZones) GetObjectTags(ctx context.Context, bucket, object string) (*ta for _, zone := range z.zones { tags, err := zone.GetObjectTags(ctx, bucket, object) if err != nil { - if isErrBucketNotFound(err) { + if isErrObjectNotFound(err) { continue } return tags, err } return tags, nil } - return nil, BucketNotFound{ + return nil, ObjectNotFound{ Bucket: bucket, + Object: object, } } diff --git a/pkg/madmin/heal-commands.go b/pkg/madmin/heal-commands.go index c417ccd71..579f1fe90 100644 --- a/pkg/madmin/heal-commands.go +++ b/pkg/madmin/heal-commands.go @@ -77,7 +77,6 @@ type HealTaskStatus struct { FailureDetail string `json:"detail"` StartTime time.Time `json:"startTime"` HealSettings HealOpts `json:"settings"` - NumDisks int `json:"numDisks"` Items []HealResultItem `json:"items,omitempty"` }
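
Note for reviewers (illustrative, not part of the patch): the rename from
partialUpload/mrfUploads to partialOperation/mrfOperations above generalizes
the MRF bookkeeping from uploads to any operation (upload or delete) that
succeeded with quorum but did not reach every disk. A minimal, self-contained
Go sketch of that pattern follows; the names here (partialOp, mrfTracker,
healSet, ...) are hypothetical stand-ins, the real implementation lives in
cmd/xl-v1.go, cmd/xl-v1-object.go and cmd/xl-sets.go.

// Illustrative sketch only -- not part of this patch. Hypothetical names
// mirroring partialOperation / mrfOperations / addPartial in the diff above.
package main

import (
	"fmt"
	"path"
	"sync"
	"time"
)

// partialOp is an operation that succeeded with quorum but did not
// reach every disk in its erasure set.
type partialOp struct {
	bucket, object string
	failedSet      int
}

// mrfTracker aggregates partial operations into a bounded map keyed by
// object path, the same shape as xlSets.mrfOperations.
type mrfTracker struct {
	mu  sync.Mutex
	ops map[string]int // object path -> index of the set that needs healing
	ch  chan partialOp // bounded queue fed by the data path
}

func newMRFTracker() *mrfTracker {
	m := &mrfTracker{
		ops: make(map[string]int),
		ch:  make(chan partialOp, 10000),
	}
	go m.maintain()
	return m
}

// addPartial never blocks the data path: if the queue is full the
// entry is dropped and left to the background heal crawl.
func (m *mrfTracker) addPartial(bucket, object string, setIndex int) {
	select {
	case m.ch <- partialOp{bucket: bucket, object: object, failedSet: setIndex}:
	default:
	}
}

// maintain drains the queue into the map, capping it at 10000 entries.
func (m *mrfTracker) maintain() {
	for op := range m.ch {
		m.mu.Lock()
		if len(m.ops) < 10000 {
			m.ops[path.Join(op.bucket, op.object)] = op.failedSet
		}
		m.mu.Unlock()
	}
}

// healSet is called when a disk of the given set reconnects: every
// object recorded against that set is handed to heal and then forgotten.
func (m *mrfTracker) healSet(setIndex int, heal func(objPath string)) {
	m.mu.Lock()
	var pending []string
	for p, idx := range m.ops {
		if idx == setIndex {
			pending = append(pending, p)
		}
	}
	m.mu.Unlock()

	for _, p := range pending {
		heal(p)
		m.mu.Lock()
		delete(m.ops, p)
		m.mu.Unlock()
	}
}

func main() {
	m := newMRFTracker()
	m.addPartial("bucket", "object.txt", 2)
	time.Sleep(100 * time.Millisecond) // toy demo only: let maintain() drain first
	m.healSet(2, func(p string) { fmt.Println("healing", p) })
}

The non-blocking send in addPartial mirrors xlObjects.addPartial in this
patch: writes and deletes never stall on MRF bookkeeping, and an entry
dropped because the queue is full is eventually picked up by the regular
background heal crawl instead.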