From 3c2efd9cf3b3017f2292c56858e941e8416cec8f Mon Sep 17 00:00:00 2001
From: Klaus Post
Date: Wed, 8 Sep 2021 11:06:45 -0700
Subject: [PATCH] Stop async listing earlier (#13160)

Stop async listing if we have not heard back from the client for 3 minutes.
This will stop spending resources on async listings when they are unlikely
to get used. If the client returns, a new listing will be started on the
second request.

Stop saving cache metadata to disk. It is cleared on restarts anyway.
Removes all load/save functionality.
---
 cmd/metacache-bucket.go          | 112 +----------------
 cmd/metacache-bucket_gen.go      | 209 -------------------------------
 cmd/metacache-bucket_gen_test.go | 123 ------------------
 cmd/metacache-manager.go         |  18 +--
 cmd/metacache-server-pool.go     |  20 ++-
 cmd/metacache-set.go             |  17 ++-
 cmd/metacache.go                 |  14 ++-
 cmd/metacache_test.go            |   2 +-
 8 files changed, 48 insertions(+), 467 deletions(-)
 delete mode 100644 cmd/metacache-bucket_gen.go
 delete mode 100644 cmd/metacache-bucket_gen_test.go

diff --git a/cmd/metacache-bucket.go b/cmd/metacache-bucket.go
index e8485e378..fdad507cb 100644
--- a/cmd/metacache-bucket.go
+++ b/cmd/metacache-bucket.go
@@ -18,25 +18,17 @@
 package cmd
 
 import (
-	"bytes"
 	"context"
 	"errors"
-	"fmt"
-	"net/http"
 	"runtime/debug"
 	"sort"
 	"sync"
 	"time"
 
-	"github.com/klauspost/compress/s2"
-	"github.com/minio/minio/internal/hash"
 	"github.com/minio/minio/internal/logger"
 	"github.com/minio/pkg/console"
-	"github.com/tinylib/msgp/msgp"
 )
 
-//go:generate msgp -file $GOFILE -unexported
-
 // a bucketMetacache keeps track of all caches generated
 // for a bucket.
 type bucketMetacache struct {
@@ -78,108 +70,6 @@ func (b *bucketMetacache) debugf(format string, data ...interface{}) {
 	}
 }
 
-// loadBucketMetaCache will load the cache from the object layer.
-// If the cache cannot be found a new one is created.
-func loadBucketMetaCache(ctx context.Context, bucket string) (*bucketMetacache, error) {
-	objAPI := newObjectLayerFn()
-	for objAPI == nil {
-		select {
-		case <-ctx.Done():
-			return nil, ctx.Err()
-		default:
-			time.Sleep(250 * time.Millisecond)
-		}
-		objAPI = newObjectLayerFn()
-		if objAPI == nil {
-			logger.LogIf(ctx, fmt.Errorf("loadBucketMetaCache: object layer not ready. bucket: %q", bucket))
-		}
-	}
-
-	var meta bucketMetacache
-	var decErr error
-	// Use global context for this.
-	r, err := objAPI.GetObjectNInfo(GlobalContext, minioMetaBucket, pathJoin("buckets", bucket, ".metacache", "index.s2"), nil, http.Header{}, readLock, ObjectOptions{})
-	if err == nil {
-		dec := s2DecPool.Get().(*s2.Reader)
-		dec.Reset(r)
-		decErr = meta.DecodeMsg(msgp.NewReader(dec))
-		dec.Reset(nil)
-		r.Close()
-		s2DecPool.Put(dec)
-	}
-	if err != nil {
-		switch err.(type) {
-		case ObjectNotFound:
-			err = nil
-		case InsufficientReadQuorum:
-			// Cache is likely lost. Clean up and return new.
-			return newBucketMetacache(bucket, true), nil
-		default:
-			logger.LogIf(ctx, err)
-		}
-		return newBucketMetacache(bucket, false), err
-	}
-	if decErr != nil {
-		if errors.Is(err, context.Canceled) {
-			return newBucketMetacache(bucket, false), err
-		}
-		// Log the error, but assume the data is lost and return a fresh bucket.
-		// Otherwise a broken cache will never recover.
-		logger.LogIf(ctx, decErr)
-		return newBucketMetacache(bucket, true), nil
-	}
-	// Sanity check...
-	if meta.bucket != bucket {
-		logger.Info("loadBucketMetaCache: loaded cache name mismatch, want %s, got %s. Discarding.", bucket, meta.bucket)
-		return newBucketMetacache(bucket, true), nil
-	}
-	meta.cachesRoot = make(map[string][]string, len(meta.caches)/10)
-	// Index roots
-	for id, cache := range meta.caches {
-		meta.cachesRoot[cache.root] = append(meta.cachesRoot[cache.root], id)
-	}
-	return &meta, nil
-}
-
-// save the bucket cache to the object storage.
-func (b *bucketMetacache) save(ctx context.Context) error {
-	objAPI := newObjectLayerFn()
-	if objAPI == nil {
-		return errServerNotInitialized
-	}
-
-	// Keep lock while we marshal.
-	// We need a write lock since we update 'updated'
-	b.mu.Lock()
-	if !b.updated {
-		b.mu.Unlock()
-		return nil
-	}
-	// Save as s2 compressed msgpack
-	tmp := bytes.NewBuffer(make([]byte, 0, b.Msgsize()))
-	enc := s2.NewWriter(tmp)
-	err := msgp.Encode(enc, b)
-	if err != nil {
-		b.mu.Unlock()
-		return err
-	}
-	err = enc.Close()
-	if err != nil {
-		b.mu.Unlock()
-		return err
-	}
-	b.updated = false
-	b.mu.Unlock()
-
-	hr, err := hash.NewReader(tmp, int64(tmp.Len()), "", "", int64(tmp.Len()))
-	if err != nil {
-		return err
-	}
-	_, err = objAPI.PutObject(ctx, minioMetaBucket, pathJoin("buckets", b.bucket, ".metacache", "index.s2"), NewPutObjReader(hr), ObjectOptions{})
-	logger.LogIf(ctx, err)
-	return err
-}
-
 // findCache will attempt to find a matching cache for the provided options.
 // If a cache with the same ID exists already it will be returned.
 // If none can be found a new is created with the provided ID.
@@ -267,7 +157,7 @@ func (b *bucketMetacache) cleanup() {
 	})
 	// Keep first metacacheMaxEntries...
 	for _, cache := range remainCaches[metacacheMaxEntries:] {
-		if time.Since(cache.lastHandout) > 30*time.Minute {
+		if time.Since(cache.lastHandout) > metacacheMaxClientWait {
 			remove[cache.id] = struct{}{}
 		}
 	}
diff --git a/cmd/metacache-bucket_gen.go b/cmd/metacache-bucket_gen.go
deleted file mode 100644
index 580ce51ba..000000000
--- a/cmd/metacache-bucket_gen.go
+++ /dev/null
@@ -1,209 +0,0 @@
-package cmd
-
-// Code generated by github.com/tinylib/msgp DO NOT EDIT.
-
-import (
-	"github.com/tinylib/msgp/msgp"
-)
-
-// DecodeMsg implements msgp.Decodable
-func (z *bucketMetacache) DecodeMsg(dc *msgp.Reader) (err error) {
-	var field []byte
-	_ = field
-	var zb0001 uint32
-	zb0001, err = dc.ReadMapHeader()
-	if err != nil {
-		err = msgp.WrapError(err)
-		return
-	}
-	for zb0001 > 0 {
-		zb0001--
-		field, err = dc.ReadMapKeyPtr()
-		if err != nil {
-			err = msgp.WrapError(err)
-			return
-		}
-		switch msgp.UnsafeString(field) {
-		case "bucket":
-			z.bucket, err = dc.ReadString()
-			if err != nil {
-				err = msgp.WrapError(err, "bucket")
-				return
-			}
-		case "caches":
-			var zb0002 uint32
-			zb0002, err = dc.ReadMapHeader()
-			if err != nil {
-				err = msgp.WrapError(err, "caches")
-				return
-			}
-			if z.caches == nil {
-				z.caches = make(map[string]metacache, zb0002)
-			} else if len(z.caches) > 0 {
-				for key := range z.caches {
-					delete(z.caches, key)
-				}
-			}
-			for zb0002 > 0 {
-				zb0002--
-				var za0001 string
-				var za0002 metacache
-				za0001, err = dc.ReadString()
-				if err != nil {
-					err = msgp.WrapError(err, "caches")
-					return
-				}
-				err = za0002.DecodeMsg(dc)
-				if err != nil {
-					err = msgp.WrapError(err, "caches", za0001)
-					return
-				}
-				z.caches[za0001] = za0002
-			}
-		default:
-			err = dc.Skip()
-			if err != nil {
-				err = msgp.WrapError(err)
-				return
-			}
-		}
-	}
-	return
-}
-
-// EncodeMsg implements msgp.Encodable
-func (z *bucketMetacache) EncodeMsg(en *msgp.Writer) (err error) {
-	// map header, size 2
-	// write "bucket"
-	err = en.Append(0x82, 0xa6, 0x62, 0x75, 0x63, 0x6b, 0x65, 0x74)
-	if err != nil {
-		return
-	}
-	err = en.WriteString(z.bucket)
-	if err != nil {
-		err = msgp.WrapError(err, "bucket")
-		return
-	}
-	// write "caches"
-	err = en.Append(0xa6, 0x63, 0x61, 0x63, 0x68, 0x65, 0x73)
-	if err != nil {
-		return
-	}
-	err = en.WriteMapHeader(uint32(len(z.caches)))
-	if err != nil {
-		err = msgp.WrapError(err, "caches")
-		return
-	}
-	for za0001, za0002 := range z.caches {
-		err = en.WriteString(za0001)
-		if err != nil {
-			err = msgp.WrapError(err, "caches")
-			return
-		}
-		err = za0002.EncodeMsg(en)
-		if err != nil {
-			err = msgp.WrapError(err, "caches", za0001)
-			return
-		}
-	}
-	return
-}
-
-// MarshalMsg implements msgp.Marshaler
-func (z *bucketMetacache) MarshalMsg(b []byte) (o []byte, err error) {
-	o = msgp.Require(b, z.Msgsize())
-	// map header, size 2
-	// string "bucket"
-	o = append(o, 0x82, 0xa6, 0x62, 0x75, 0x63, 0x6b, 0x65, 0x74)
-	o = msgp.AppendString(o, z.bucket)
-	// string "caches"
-	o = append(o, 0xa6, 0x63, 0x61, 0x63, 0x68, 0x65, 0x73)
-	o = msgp.AppendMapHeader(o, uint32(len(z.caches)))
-	for za0001, za0002 := range z.caches {
-		o = msgp.AppendString(o, za0001)
-		o, err = za0002.MarshalMsg(o)
-		if err != nil {
-			err = msgp.WrapError(err, "caches", za0001)
-			return
-		}
-	}
-	return
-}
-
-// UnmarshalMsg implements msgp.Unmarshaler
-func (z *bucketMetacache) UnmarshalMsg(bts []byte) (o []byte, err error) {
-	var field []byte
-	_ = field
-	var zb0001 uint32
-	zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
-	if err != nil {
-		err = msgp.WrapError(err)
-		return
-	}
-	for zb0001 > 0 {
-		zb0001--
-		field, bts, err = msgp.ReadMapKeyZC(bts)
-		if err != nil {
-			err = msgp.WrapError(err)
-			return
-		}
-		switch msgp.UnsafeString(field) {
-		case "bucket":
-			z.bucket, bts, err = msgp.ReadStringBytes(bts)
-			if err != nil {
-				err = msgp.WrapError(err, "bucket")
-				return
-			}
-		case "caches":
-			var zb0002 uint32
-			zb0002, bts, err = msgp.ReadMapHeaderBytes(bts)
-			if err != nil {
-				err = msgp.WrapError(err, "caches")
-				return
-			}
-			if z.caches == nil {
-				z.caches = make(map[string]metacache, zb0002)
-			} else if len(z.caches) > 0 {
-				for key := range z.caches {
-					delete(z.caches, key)
-				}
-			}
-			for zb0002 > 0 {
-				var za0001 string
-				var za0002 metacache
-				zb0002--
-				za0001, bts, err = msgp.ReadStringBytes(bts)
-				if err != nil {
-					err = msgp.WrapError(err, "caches")
-					return
-				}
-				bts, err = za0002.UnmarshalMsg(bts)
-				if err != nil {
-					err = msgp.WrapError(err, "caches", za0001)
-					return
-				}
-				z.caches[za0001] = za0002
-			}
-		default:
-			bts, err = msgp.Skip(bts)
-			if err != nil {
-				err = msgp.WrapError(err)
-				return
-			}
-		}
-	}
-	o = bts
-	return
-}
-
-// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
-func (z *bucketMetacache) Msgsize() (s int) {
-	s = 1 + 7 + msgp.StringPrefixSize + len(z.bucket) + 7 + msgp.MapHeaderSize
-	if z.caches != nil {
-		for za0001, za0002 := range z.caches {
-			_ = za0002
-			s += msgp.StringPrefixSize + len(za0001) + za0002.Msgsize()
-		}
-	}
-	return
-}
diff --git a/cmd/metacache-bucket_gen_test.go b/cmd/metacache-bucket_gen_test.go
deleted file mode 100644
index 38a5ca2d8..000000000
--- a/cmd/metacache-bucket_gen_test.go
+++ /dev/null
@@ -1,123 +0,0 @@
-package cmd
-
-// Code generated by github.com/tinylib/msgp DO NOT EDIT.
-
-import (
-	"bytes"
-	"testing"
-
-	"github.com/tinylib/msgp/msgp"
-)
-
-func TestMarshalUnmarshalbucketMetacache(t *testing.T) {
-	v := bucketMetacache{}
-	bts, err := v.MarshalMsg(nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-	left, err := v.UnmarshalMsg(bts)
-	if err != nil {
-		t.Fatal(err)
-	}
-	if len(left) > 0 {
-		t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
-	}
-
-	left, err = msgp.Skip(bts)
-	if err != nil {
-		t.Fatal(err)
-	}
-	if len(left) > 0 {
-		t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
-	}
-}
-
-func BenchmarkMarshalMsgbucketMetacache(b *testing.B) {
-	v := bucketMetacache{}
-	b.ReportAllocs()
-	b.ResetTimer()
-	for i := 0; i < b.N; i++ {
-		v.MarshalMsg(nil)
-	}
-}
-
-func BenchmarkAppendMsgbucketMetacache(b *testing.B) {
-	v := bucketMetacache{}
-	bts := make([]byte, 0, v.Msgsize())
-	bts, _ = v.MarshalMsg(bts[0:0])
-	b.SetBytes(int64(len(bts)))
-	b.ReportAllocs()
-	b.ResetTimer()
-	for i := 0; i < b.N; i++ {
-		bts, _ = v.MarshalMsg(bts[0:0])
-	}
-}
-
-func BenchmarkUnmarshalbucketMetacache(b *testing.B) {
-	v := bucketMetacache{}
-	bts, _ := v.MarshalMsg(nil)
-	b.ReportAllocs()
-	b.SetBytes(int64(len(bts)))
-	b.ResetTimer()
-	for i := 0; i < b.N; i++ {
-		_, err := v.UnmarshalMsg(bts)
-		if err != nil {
-			b.Fatal(err)
-		}
-	}
-}
-
-func TestEncodeDecodebucketMetacache(t *testing.T) {
-	v := bucketMetacache{}
-	var buf bytes.Buffer
-	msgp.Encode(&buf, &v)
-
-	m := v.Msgsize()
-	if buf.Len() > m {
-		t.Log("WARNING: TestEncodeDecodebucketMetacache Msgsize() is inaccurate")
-	}
-
-	vn := bucketMetacache{}
-	err := msgp.Decode(&buf, &vn)
-	if err != nil {
-		t.Error(err)
-	}
-
-	buf.Reset()
-	msgp.Encode(&buf, &v)
-	err = msgp.NewReader(&buf).Skip()
-	if err != nil {
-		t.Error(err)
-	}
-}
-
-func BenchmarkEncodebucketMetacache(b *testing.B) {
-	v := bucketMetacache{}
-	var buf bytes.Buffer
-	msgp.Encode(&buf, &v)
-	b.SetBytes(int64(buf.Len()))
-	en := msgp.NewWriter(msgp.Nowhere)
-	b.ReportAllocs()
-	b.ResetTimer()
-	for i := 0; i < b.N; i++ {
-		v.EncodeMsg(en)
-	}
-	en.Flush()
-}
-
-func BenchmarkDecodebucketMetacache(b *testing.B) {
-	v := bucketMetacache{}
-	var buf bytes.Buffer
-	msgp.Encode(&buf, &v)
-	b.SetBytes(int64(buf.Len()))
-	rd := msgp.NewEndlessReader(buf.Bytes(), b)
-	dc := msgp.NewReader(rd)
-	b.ReportAllocs()
-	b.ResetTimer()
-	for i := 0; i < b.N; i++ {
-		err := v.DecodeMsg(dc)
-		if err != nil {
-			b.Fatal(err)
-		}
-	}
-}
diff --git a/cmd/metacache-manager.go b/cmd/metacache-manager.go
index 63a41618d..34282fad5 100644
--- a/cmd/metacache-manager.go
+++ b/cmd/metacache-manager.go
@@ -64,7 +64,6 @@ func (m *metacacheManager) initManager() {
 		defer t.Stop()
 
 		var exit bool
-		bg := context.Background()
 		for !exit {
 			select {
 			case <-t.C:
@@ -76,7 +75,6 @@ func (m *metacacheManager) initManager() {
 				if !exit {
 					v.cleanup()
 				}
-				logger.LogIf(bg, v.save(bg))
 			}
 			m.mu.RUnlock()
 			m.mu.Lock()
@@ -116,8 +114,8 @@ func (m *metacacheManager) getBucket(ctx context.Context, bucket string) *bucket
 	// Return a transient bucket for invalid or system buckets.
 	m.mu.RLock()
 	b, ok := m.buckets[bucket]
-	m.mu.RUnlock()
 	if ok {
+		m.mu.RUnlock()
 		if b.bucket != bucket {
 			logger.Info("getBucket: cached bucket %s does not match this bucket %s", b.bucket, bucket)
 			debug.PrintStack()
@@ -125,11 +123,12 @@ func (m *metacacheManager) getBucket(ctx context.Context, bucket string) *bucket
 		return b
 	}
 
+	m.mu.RUnlock()
 	m.mu.Lock()
+	defer m.mu.Unlock()
 	// See if someone else fetched it while we waited for the lock.
 	b, ok = m.buckets[bucket]
 	if ok {
-		m.mu.Unlock()
 		if b.bucket != bucket {
 			logger.Info("getBucket: newly cached bucket %s does not match this bucket %s", b.bucket, bucket)
 			debug.PrintStack()
@@ -137,16 +136,9 @@ func (m *metacacheManager) getBucket(ctx context.Context, bucket string) *bucket
 		return b
 	}
 
-	// Load bucket. If we fail return the transient bucket.
-	b, err := loadBucketMetaCache(ctx, bucket)
-	if err != nil {
-		logger.LogIf(ctx, err)
-	}
-	if b.bucket != bucket {
-		logger.LogIf(ctx, fmt.Errorf("getBucket: loaded bucket %s does not match this bucket %s", b.bucket, bucket))
-	}
+	// New bucket. If we fail return the transient bucket.
+	b = newBucketMetacache(bucket, true)
 	m.buckets[bucket] = b
-	m.mu.Unlock()
 	return b
 }
 
diff --git a/cmd/metacache-server-pool.go b/cmd/metacache-server-pool.go
index f034fa268..7ef1cca16 100644
--- a/cmd/metacache-server-pool.go
+++ b/cmd/metacache-server-pool.go
@@ -127,9 +127,7 @@ func (z *erasureServerPools) listPath(ctx context.Context, o *listPathOptions) (
 				return entries, io.EOF
 			}
 			if !errors.Is(err, context.DeadlineExceeded) {
-				// TODO: Remove, not really informational.
-				logger.LogIf(ctx, err)
-				o.debugln("listPath: deadline exceeded")
+				o.debugln("listPath: got error", err)
 			}
 			o.Transient = true
 			o.Create = false
@@ -146,6 +144,22 @@ func (z *erasureServerPools) listPath(ctx context.Context, o *listPathOptions) (
 			} else {
 				// Continue listing
 				o.ID = c.id
+				go func(meta metacache) {
+					// Continuously update while we wait.
+					t := time.NewTicker(metacacheMaxClientWait / 10)
+					defer t.Stop()
+					select {
+					case <-ctx.Done():
+						// Request is done, stop updating.
+						return
+					case <-t.C:
+						meta.lastHandout = time.Now()
+						if rpc == nil {
+							meta, _ = localMetacacheMgr.updateCacheEntry(meta)
+						}
+						meta, _ = rpc.UpdateMetacacheListing(ctx, meta)
+					}
+				}(*c)
 			}
 		}
 	}
diff --git a/cmd/metacache-set.go b/cmd/metacache-set.go
index e2dac4614..051c1b653 100644
--- a/cmd/metacache-set.go
+++ b/cmd/metacache-set.go
@@ -643,11 +643,20 @@ func (er *erasureObjects) saveMetaCacheStream(ctx context.Context, mc *metaCache
 			metaMu.Lock()
 			meta := *mc.meta
 			meta, err = o.updateMetacacheListing(meta, rpc)
-			*mc.meta = meta
-			if meta.status == scanStateError {
-				logger.LogIf(ctx, err)
+			if err == nil && time.Since(meta.lastHandout) > metacacheMaxClientWait {
 				cancel()
 				exit = true
+				meta.status = scanStateError
+				meta.error = fmt.Sprintf("listing canceled since time since last handout was %v ago", time.Since(meta.lastHandout).Round(time.Second))
+				o.debugln(color.Green("saveMetaCacheStream: ") + meta.error)
+				meta, err = o.updateMetacacheListing(meta, rpc)
+			}
+			if err == nil {
+				*mc.meta = meta
+				if meta.status == scanStateError {
+					cancel()
+					exit = true
+				}
 			}
 			metaMu.Unlock()
 		}
@@ -664,7 +673,7 @@ func (er *erasureObjects) saveMetaCacheStream(ctx context.Context, mc *metaCache
 		if len(b.data) == 0 && b.n == 0 && o.Transient {
 			return nil
 		}
-		o.debugln(color.Green("listPath:")+" saving block", b.n, "to", o.objectPath(b.n))
+		o.debugln(color.Green("saveMetaCacheStream:")+" saving block", b.n, "to", o.objectPath(b.n))
 		r, err := hash.NewReader(bytes.NewReader(b.data), int64(len(b.data)), "", "", int64(len(b.data)))
 		logger.LogIf(ctx, err)
 		custom := b.headerKV()
diff --git a/cmd/metacache.go b/cmd/metacache.go
index 384f40073..73336662e 100644
--- a/cmd/metacache.go
+++ b/cmd/metacache.go
@@ -39,6 +39,9 @@ const (
 	// Time in which the initiator of a scan must have reported back.
 	metacacheMaxRunningAge = time.Minute
 
+	// Max time between client calls before dropping an async cache listing.
+	metacacheMaxClientWait = 3 * time.Minute
+
 	// metacacheBlockSize is the number of file/directory entries to have in each block.
 	metacacheBlockSize = 5000
 
@@ -82,8 +85,9 @@ func (m *metacache) worthKeeping() bool {
 	case !cache.finished() && time.Since(cache.lastUpdate) > metacacheMaxRunningAge:
 		// Not finished and update for metacacheMaxRunningAge, discard it.
 		return false
-	case cache.finished() && time.Since(cache.lastHandout) > 30*time.Minute:
-		// Keep only for 30 minutes.
+	case cache.finished() && time.Since(cache.lastHandout) > 5*metacacheMaxClientWait:
+		// Keep for 15 minutes after we last saw the client.
+		// Since the cache is finished keeping it a bit longer doesn't hurt us.
 		return false
 	case cache.status == scanStateError || cache.status == scanStateNone:
 		// Remove failed listings after 5 minutes.
@@ -113,6 +117,9 @@ func baseDirFromPrefix(prefix string) string {
 func (m *metacache) update(update metacache) {
 	m.lastUpdate = UTCNow()
 
+	if m.lastHandout.After(m.lastHandout) {
+		m.lastHandout = UTCNow()
+	}
 	if m.status == scanStateStarted && update.status == scanStateSuccess {
 		m.ended = UTCNow()
 	}
@@ -121,7 +128,8 @@ func (m *metacache) update(update metacache) {
 		m.status = update.status
 	}
 
-	if m.status == scanStateStarted && time.Since(m.lastHandout) > 15*time.Minute {
+	if m.status == scanStateStarted && time.Since(m.lastHandout) > metacacheMaxClientWait {
+		// Drop if client hasn't been seen for 3 minutes.
 		m.status = scanStateError
 		m.error = "client not seen"
 	}
diff --git a/cmd/metacache_test.go b/cmd/metacache_test.go
index 882b76f99..84860d850 100644
--- a/cmd/metacache_test.go
+++ b/cmd/metacache_test.go
@@ -226,7 +226,7 @@ func Test_metacache_finished(t *testing.T) {
 
 func Test_metacache_worthKeeping(t *testing.T) {
 	// TODO: Update...
-	wantResults := []bool{0: true, 1: true, 2: true, 3: true, 4: false, 5: true, 6: true, 7: false, 8: false}
+	wantResults := []bool{0: true, 1: true, 2: true, 3: false, 4: false, 5: true, 6: true, 7: false, 8: false}
 
 	for i, tt := range metaCacheTestset {
 		t.Run(tt.id, func(t *testing.T) {