From 5f971fea6efa79e6db642182920b559c5a229cd4 Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Fri, 1 Dec 2023 00:18:04 -0800 Subject: [PATCH] Fix Mux Connect Error (#18567) `OpMuxConnectError` was not handled correctly. Remove local checks for single request handlers so they can run before being registered locally. Bonus: Only log IAM bootstrap on startup. --- cmd/erasure-healing.go | 2 +- cmd/erasure-server-pool.go | 2 +- cmd/erasure.go | 31 ++++++++++++++++++------ cmd/global-heal.go | 2 +- cmd/iam-store.go | 7 +++++- cmd/iam.go | 2 +- cmd/metacache-set.go | 4 ++-- cmd/storage-rest-server.go | 13 +++++++++- internal/grid/connection.go | 24 ++++++++++++------- internal/grid/grid_test.go | 48 +++++++++++++++++++++++++++++++++++++ internal/grid/handlers.go | 2 +- 11 files changed, 113 insertions(+), 24 deletions(-) diff --git a/cmd/erasure-healing.go b/cmd/erasure-healing.go index 5bad2779c..4985a1437 100644 --- a/cmd/erasure-healing.go +++ b/cmd/erasure-healing.go @@ -72,7 +72,7 @@ func (er erasureObjects) listAndHeal(bucket, prefix string, healEntry func(strin ctx, cancel := context.WithCancel(context.Background()) defer cancel() - disks, _ := er.getOnlineDisksWithHealing() + disks, _ := er.getOnlineDisksWithHealing(false) if len(disks) == 0 { return errors.New("listAndHeal: No non-healing drives found") } diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index e98dec6d2..b3762739e 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -1967,7 +1967,7 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re go func() { defer wg.Done() - disks, _ := set.getOnlineDisksWithHealing() + disks, _ := set.getOnlineDisksWithHealing(true) if len(disks) == 0 { cancel() return diff --git a/cmd/erasure.go b/cmd/erasure.go index 7806f19d4..b60bc44eb 100644 --- a/cmd/erasure.go +++ b/cmd/erasure.go @@ -274,7 +274,12 @@ func (er erasureObjects) LocalStorageInfo(ctx context.Context) StorageInfo { return getStorageInfo(localDisks, localEndpoints) } -func (er erasureObjects) getOnlineDisksWithHealing() (newDisks []StorageAPI, healing bool) { +// getOnlineDisksWithHealing - returns online disks and overall healing status. +// Disks are randomly ordered, but in the following groups: +// - Non-scanning disks +// - Non-healing disks +// - Healing disks (if inclHealing is true) +func (er erasureObjects) getOnlineDisksWithHealing(inclHealing bool) (newDisks []StorageAPI, healing bool) { var wg sync.WaitGroup disks := er.getDisks() infos := make([]DiskInfo, len(disks)) @@ -292,7 +297,7 @@ func (er erasureObjects) getOnlineDisksWithHealing() (newDisks []StorageAPI, hea } di, err := disk.DiskInfo(context.Background(), false) - if err != nil || di.Healing { + if err != nil { // - Do not consume disks which are not reachable // unformatted or simply not accessible for some reason. // @@ -303,21 +308,31 @@ func (er erasureObjects) getOnlineDisksWithHealing() (newDisks []StorageAPI, hea } return } + if !inclHealing && di.Healing { + return + } infos[i] = di }() } wg.Wait() - var scanningDisks []StorageAPI + var scanningDisks, healingDisks []StorageAPI for i, info := range infos { // Check if one of the drives in the set is being healed. // this information is used by scanner to skip healing // this erasure set while it calculates the usage. 
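// Illustration (not part of the patch; orderDisks and its locals are invented
// names): the ordering that getOnlineDisksWithHealing produces with the new
// inclHealing parameter can be condensed to the following sketch.
//
//	func orderDisks(disks []StorageAPI, infos []DiskInfo, inclHealing bool) []StorageAPI {
//		var ready, scanning, healing []StorageAPI
//		for i, info := range infos {
//			switch {
//			case info.Error != "" || disks[i] == nil:
//				// skip unreachable or errored drives
//			case info.Healing:
//				if inclHealing {
//					healing = append(healing, disks[i])
//				}
//			case info.Scanning:
//				scanning = append(scanning, disks[i])
//			default:
//				ready = append(ready, disks[i])
//			}
//		}
//		// non-scanning drives first, then scanning, then (optionally) healing
//		return append(append(ready, scanning...), healing...)
//	}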
- if info.Healing || info.Error != "" { - healing = true + if info.Error != "" || disks[i] == nil { continue } + if info.Healing { + healing = true + if inclHealing { + healingDisks = append(healingDisks, disks[i]) + } + continue + } + if !info.Scanning { newDisks = append(newDisks, disks[i]) } else { @@ -325,8 +340,10 @@ func (er erasureObjects) getOnlineDisksWithHealing() (newDisks []StorageAPI, hea } } - // Prefer new disks over disks which are currently being scanned. + // Prefer non-scanning disks over disks which are currently being scanned. newDisks = append(newDisks, scanningDisks...) + /// Then add healing disks. + newDisks = append(newDisks, healingDisks...) return newDisks, healing } @@ -364,7 +381,7 @@ func (er erasureObjects) nsScanner(ctx context.Context, buckets []BucketInfo, wa } // Collect disks we can use. - disks, healing := er.getOnlineDisksWithHealing() + disks, healing := er.getOnlineDisksWithHealing(false) if len(disks) == 0 { logger.LogIf(ctx, errors.New("data-scanner: all drives are offline or being healed, skipping scanner cycle")) return nil diff --git a/cmd/global-heal.go b/cmd/global-heal.go index b7c1848f7..52a8ecc0d 100644 --- a/cmd/global-heal.go +++ b/cmd/global-heal.go @@ -209,7 +209,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string, bucket, humanize.Ordinal(er.setIndex+1)) } - disks, _ := er.getOnlineDisksWithHealing() + disks, _ := er.getOnlineDisksWithHealing(false) if len(disks) == 0 { logger.LogIf(ctx, fmt.Errorf("no online disks found to heal the bucket `%s`", bucket)) continue diff --git a/cmd/iam-store.go b/cmd/iam-store.go index 734b0780f..fe0e16bbd 100644 --- a/cmd/iam-store.go +++ b/cmd/iam-store.go @@ -488,7 +488,12 @@ func (store *IAMStoreSys) PurgeExpiredSTS(ctx context.Context) error { // LoadIAMCache reads all IAM items and populates a new iamCache object and // replaces the in-memory cache object. -func (store *IAMStoreSys) LoadIAMCache(ctx context.Context) error { +func (store *IAMStoreSys) LoadIAMCache(ctx context.Context, firstTime bool) error { + bootstrapTraceMsg := func(s string) { + if firstTime { + bootstrapTraceMsg(s) + } + } bootstrapTraceMsg("loading IAM data") newCache := newIamCache() diff --git a/cmd/iam.go b/cmd/iam.go index c1951733d..5030f686d 100644 --- a/cmd/iam.go +++ b/cmd/iam.go @@ -189,7 +189,7 @@ func (sys *IAMSys) Initialized() bool { // Load - loads all credentials, policies and policy mappings. 
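// The firstTime flag is forwarded to LoadIAMCache so bootstrap trace messages
// are only emitted for the initial load, not for periodic refreshes (the
// "Bonus" in the commit message). Sketch of the gating in cmd/iam-store.go
// above; note that inside the closure the identifier still resolves to the
// package-level bootstrapTraceMsg, so the wrapper is not recursive:
//
//	bootstrapTraceMsg := func(s string) {
//		if firstTime {
//			bootstrapTraceMsg(s) // the package-level helper
//		}
//	}
//	bootstrapTraceMsg("loading IAM data") // silent unless firstTime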
func (sys *IAMSys) Load(ctx context.Context, firstTime bool) error { loadStartTime := time.Now() - err := sys.store.LoadIAMCache(ctx) + err := sys.store.LoadIAMCache(ctx, firstTime) if err != nil { atomic.AddUint64(&sys.TotalRefreshFailures, 1) return err diff --git a/cmd/metacache-set.go b/cmd/metacache-set.go index a4ba0a096..6340cc82c 100644 --- a/cmd/metacache-set.go +++ b/cmd/metacache-set.go @@ -606,8 +606,8 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions, resul defer close(results) o.debugf(color.Green("listPath:")+" with options: %#v", o) - // get non-healing disks for listing - disks, _ := er.getOnlineDisksWithHealing() + // get prioritized non-healing disks for listing + disks, _ := er.getOnlineDisksWithHealing(true) askDisks := getListQuorum(o.AskDisks, er.setDriveCount) var fallbackDisks []StorageAPI diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index e506fc30c..e85a1d481 100644 --- a/cmd/storage-rest-server.go +++ b/cmd/storage-rest-server.go @@ -1350,12 +1350,12 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerPools Endpoin return collectInternodeStats(httpTraceHdrs(f)) } + registered := 0 for _, setDisks := range storageDisks { for _, storage := range setDisks { if storage == nil { continue } - endpoint := storage.Endpoint() server := &storageRESTServer{storage: newXLStorageDiskIDCheck(storage, true)} @@ -1402,6 +1402,17 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerPools Endpoin Handle: server.WalkDirHandler, OutCapacity: 1, }), "unable to register handler") + registered++ } } + if registered == 0 { + // Register a dummy handler so remote calls can go out. + logger.FatalIf(gm.RegisterStreamingHandler(grid.HandlerWalkDir, grid.StreamHandler{ + Subroute: fmt.Sprintf("__dummy__%d", time.Now().UnixNano()), + Handle: func(ctx context.Context, payload []byte, in <-chan []byte, out chan<- []byte) *grid.RemoteErr { + return grid.NewRemoteErr(errDiskNotFound) + }, + OutCapacity: 1, + }), "unable to register handler") + } } diff --git a/internal/grid/connection.go b/internal/grid/connection.go index 6c2f7ddc6..30d35edf2 100644 --- a/internal/grid/connection.go +++ b/internal/grid/connection.go @@ -321,10 +321,7 @@ func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]by if c.State() != StateConnected { return nil, ErrDisconnected } - handler := c.handlers.single[h] - if handler == nil { - return nil, ErrUnknownHandler - } + // Create mux client and call. client, err := c.newMuxClient(ctx) if err != nil { return nil, err @@ -349,10 +346,7 @@ func (c *Subroute) Request(ctx context.Context, h HandlerID, req []byte) ([]byte if c.State() != StateConnected { return nil, ErrDisconnected } - handler := c.handlers.subSingle[makeZeroSubHandlerID(h)] - if handler == nil { - return nil, ErrUnknownHandler - } + // Create mux client and call. client, err := c.newMuxClient(ctx) if err != nil { return nil, err @@ -1159,6 +1153,8 @@ func (c *Connection) handleMsg(ctx context.Context, m message, subID *subHandler c.handleAckMux(ctx, m) case OpConnectMux: c.handleConnectMux(ctx, m, subID) + case OpMuxConnectError: + c.handleConnectMuxError(ctx, m) default: logger.LogIf(ctx, fmt.Errorf("unknown message type: %v", m.Op)) } @@ -1210,6 +1206,18 @@ func (c *Connection) handleConnectMux(ctx context.Context, m message, subID *sub } } +// handleConnectMuxError when mux connect was rejected. 
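// A peer rejects an OpConnectMux (for example when no handler is registered
// for the requested handler ID) by replying with OpMuxConnectError and a
// msgp-encoded muxConnectError payload. The pending outgoing mux, if any, is
// looked up by MuxID and failed with a RemoteErr built from that payload;
// when no caller is waiting, the payload buffer is recycled.
//
// Callers of Connection.Request therefore observe the rejection as a
// *RemoteErr (see TestSingleRoundtripNotReady below).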
+func (c *Connection) handleConnectMuxError(ctx context.Context, m message) { + if v, ok := c.outgoing.Load(m.MuxID); ok { + var cErr muxConnectError + _, err := cErr.UnmarshalMsg(m.Payload) + logger.LogIf(ctx, err) + v.error(RemoteErr(cErr.Error)) + return + } + PutByteBuffer(m.Payload) +} + func (c *Connection) handleAckMux(ctx context.Context, m message) { PutByteBuffer(m.Payload) v, ok := c.outgoing.Load(m.MuxID) diff --git a/internal/grid/grid_test.go b/internal/grid/grid_test.go index b8262fecb..ea269ef64 100644 --- a/internal/grid/grid_test.go +++ b/internal/grid/grid_test.go @@ -129,6 +129,54 @@ func TestSingleRoundtrip(t *testing.T) { }) } +func TestSingleRoundtripNotReady(t *testing.T) { + defer testlogger.T.SetLogTB(t)() + errFatal := func(t testing.TB, err error) { + t.Helper() + if err != nil { + t.Fatal(err) + } + } + grid, err := SetupTestGrid(2) + errFatal(t, err) + remoteHost := grid.Hosts[1] + local := grid.Managers[0] + + // 1: Echo + errFatal(t, local.RegisterSingleHandler(handlerTest, func(payload []byte) ([]byte, *RemoteErr) { + t.Log("1: server payload: ", len(payload), "bytes.") + return append([]byte{}, payload...), nil + })) + // 2: Return as error + errFatal(t, local.RegisterSingleHandler(handlerTest2, func(payload []byte) ([]byte, *RemoteErr) { + t.Log("2: server payload: ", len(payload), "bytes.") + err := RemoteErr(payload) + return nil, &err + })) + + // Do not register remote handlers + + // local to remote + remoteConn := local.Connection(remoteHost) + remoteConn.WaitForConnect(context.Background()) + defer testlogger.T.SetErrorTB(t)() + + t.Run("localToRemote", func(t *testing.T) { + const testPayload = "Hello Grid World!" + // Single requests should have remote errors. + _, err := remoteConn.Request(context.Background(), handlerTest, []byte(testPayload)) + if v, ok := err.(*RemoteErr); !ok || v.Error() != "Invalid Handler for type" { + t.Fatalf("Unexpected error: %v, %T", err, err) + } + // Streams should not be able to set up until registered. + // Thus, the error is a local error. + _, err = remoteConn.NewStream(context.Background(), handlerTest, []byte(testPayload)) + if !errors.Is(err, ErrUnknownHandler) { + t.Fatalf("Unexpected error: %v, %T", err, err) + } + }) +} + func TestSingleRoundtripGenerics(t *testing.T) { defer testlogger.T.SetLogTB(t)() errFatal := func(err error) { diff --git a/internal/grid/handlers.go b/internal/grid/handlers.go index 83b57c544..4d40ccda0 100644 --- a/internal/grid/handlers.go +++ b/internal/grid/handlers.go @@ -89,7 +89,7 @@ var handlerPrefixes = [handlerLast]string{ } const ( - lockPrefix = "lock" + lockPrefix = "lockR" storagePrefix = "storageR" )
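Behavioral note (condensed from TestSingleRoundtripNotReady above; ctx and payload stand in for the test's context and payload): single requests no longer require a locally registered handler, so a rejection now arrives from the peer as a *RemoteErr, whereas stream setup is still validated locally and fails with ErrUnknownHandler before anything is sent.

// Client-side contract, assuming a connected *Connection (remoteConn) and a
// handler ID (handlerTest) that is not registered on the remote:
_, err := remoteConn.Request(ctx, handlerTest, payload)
if _, ok := err.(*RemoteErr); !ok {
	// unexpected: the rejection should be produced by the remote peer
}
_, err = remoteConn.NewStream(ctx, handlerTest, payload)
if !errors.Is(err, ErrUnknownHandler) {
	// unexpected: streams are still checked against the local handler map
}

This asymmetry is presumably also why registerStorageRESTHandlers now installs the "__dummy__" WalkDir stream handler when a node has no local drives: without any local registration, outgoing WalkDir streams to peers could not be set up.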