From 3ba927edaee33171544433316b0055126d770fe2 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 15 Aug 2023 12:22:30 -0700 Subject: [PATCH] fix: batch status reporting after complete (#17852) batch status can perpetually wait after completion due to a race between the MetricsHandler() returning the active metrics in intervals of 1sec and delete of metrics after job completion. this PR ensures that we keep the 'status' around for a while, i.e upto 24hrs for all the batch jobs. --- cmd/batch-handlers.go | 61 +++++++++++++++++++++++++++++++++---------- cmd/common-main.go | 3 +++ cmd/globals.go | 3 +++ 3 files changed, 53 insertions(+), 14 deletions(-) diff --git a/cmd/batch-handlers.go b/cmd/batch-handlers.go index c9e012ff1..a5c7aff02 100644 --- a/cmd/batch-handlers.go +++ b/cmd/batch-handlers.go @@ -1413,7 +1413,6 @@ func (j BatchJobRequest) delete(ctx context.Context, api ObjectLayer) { case j.KeyRotate != nil: deleteConfig(ctx, api, pathJoin(j.Location, batchKeyRotationName)) } - globalBatchJobsMetrics.delete(j.ID) deleteConfig(ctx, api, j.Location) } @@ -1815,10 +1814,6 @@ type batchJobMetrics struct { metrics map[string]*batchJobInfo } -var globalBatchJobsMetrics = batchJobMetrics{ - metrics: make(map[string]*batchJobInfo), -} - //msgp:ignore batchJobMetric //go:generate stringer -type=batchJobMetric -trimprefix=batchJobMetric $GOFILE type batchJobMetric uint8 @@ -1858,9 +1853,17 @@ func (m *batchJobMetrics) report(jobID string) (metrics *madmin.BatchJobMetrics) metrics = &madmin.BatchJobMetrics{CollectedAt: time.Now(), Jobs: make(map[string]madmin.JobMetric)} m.RLock() defer m.RUnlock() + + match := true for id, job := range m.metrics { - match := jobID != "" && id == jobID - metrics.Jobs[id] = madmin.JobMetric{ + if jobID != "" { + match = id == jobID + } + if !match { + continue + } + + m := madmin.JobMetric{ JobID: job.JobID, JobType: job.JobType, StartTime: job.StartTime, @@ -1868,28 +1871,58 @@ func (m *batchJobMetrics) report(jobID string) (metrics *madmin.BatchJobMetrics) RetryAttempts: job.RetryAttempts, Complete: job.Complete, Failed: job.Failed, - Replicate: &madmin.ReplicateInfo{ + } + + switch job.JobType { + case string(madmin.BatchJobReplicate): + m.Replicate = &madmin.ReplicateInfo{ Bucket: job.Bucket, Object: job.Object, Objects: job.Objects, ObjectsFailed: job.ObjectsFailed, BytesTransferred: job.BytesTransferred, BytesFailed: job.BytesFailed, - }, - KeyRotate: &madmin.KeyRotationInfo{ + } + case string(madmin.BatchJobKeyRotate): + m.KeyRotate = &madmin.KeyRotationInfo{ Bucket: job.Bucket, Object: job.Object, Objects: job.Objects, ObjectsFailed: job.ObjectsFailed, - }, - } - if match { - break + } } + + metrics.Jobs[id] = m } return metrics } +// keep job metrics for some time after the job is completed +// in-case some one wants to look at the older results. +func (m *batchJobMetrics) purgeJobMetrics() { + t := time.NewTicker(6 * time.Hour) + defer t.Stop() + + for { + select { + case <-GlobalContext.Done(): + return + case <-t.C: + var toDeleteJobMetrics []string + m.RLock() + for id, metrics := range m.metrics { + if time.Since(metrics.LastUpdate) > 24*time.Hour && (metrics.Complete || metrics.Failed) { + toDeleteJobMetrics = append(toDeleteJobMetrics, id) + } + } + m.RUnlock() + for _, jobID := range toDeleteJobMetrics { + m.delete(jobID) + } + } + } +} + func (m *batchJobMetrics) delete(jobID string) { m.Lock() defer m.Unlock() diff --git a/cmd/common-main.go b/cmd/common-main.go index 98f8d53ec..e6afe3ff1 100644 --- a/cmd/common-main.go +++ b/cmd/common-main.go @@ -95,6 +95,9 @@ func init() { initGlobalContext() + globalBatchJobsMetrics = batchJobMetrics{metrics: make(map[string]*batchJobInfo)} + go globalBatchJobsMetrics.purgeJobMetrics() + t, _ := minioVersionToReleaseTime(Version) if !t.IsZero() { globalVersionUnix = uint64(t.Unix()) diff --git a/cmd/globals.go b/cmd/globals.go index 3ad0f07d3..500bada1a 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -399,6 +399,9 @@ var ( // Set last client perf extra time (get lock, and validate) globalLastClientPerfExtraTime int64 + + // Captures all batch jobs metrics globally + globalBatchJobsMetrics batchJobMetrics // Add new variable global values here. )