From 8f7c73932872228e408eb21fe798ac0bfef2ad00 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Fri, 5 Aug 2022 09:40:03 -0700 Subject: [PATCH] feat: add SpeedTest ResponseTimes and TTFB (#15479) Capture average, p50, p99, p999 response times and ttfb values. These are needed for latency measurements and overall understanding of our speedtest results. --- cmd/perf-tests.go | 68 +++++++++++++++++++++++++++++++++++++++++------ cmd/speedtest.go | 14 ++++++++++ 2 files changed, 74 insertions(+), 8 deletions(-) diff --git a/cmd/perf-tests.go b/cmd/perf-tests.go index 7b6f45cf5..2a4f17f3d 100644 --- a/cmd/perf-tests.go +++ b/cmd/perf-tests.go @@ -37,16 +37,37 @@ import ( // SpeedTestResult return value of the speedtest function type SpeedTestResult struct { - Endpoint string - Uploads uint64 - Downloads uint64 - Error string + Endpoint string + Uploads uint64 + Downloads uint64 + UploadTimes madmin.TimeDurations + DownloadTimes madmin.TimeDurations + DownloadTTFB madmin.TimeDurations + Error string } func newRandomReader(size int) io.Reader { return io.LimitReader(randreader.New(), int64(size)) } +type firstByteRecorder struct { + t *time.Time + r io.Reader +} + +func (f *firstByteRecorder) Read(p []byte) (n int, err error) { + if f.t != nil || len(p) == 0 { + return f.r.Read(p) + } + // Read a single byte. + n, err = f.r.Read(p[:1]) + if n > 0 { + t := time.Now() + f.t = &t + } + return n, err +} + // Runs the speedtest on local MinIO process. func selfSpeedTest(ctx context.Context, opts speedTestOpts) (SpeedTestResult, error) { objAPI := newObjectLayerFn() @@ -54,9 +75,9 @@ func selfSpeedTest(ctx context.Context, opts speedTestOpts) (SpeedTestResult, er return SpeedTestResult{}, errServerNotInitialized } + var wg sync.WaitGroup var errOnce sync.Once var retError string - var wg sync.WaitGroup var totalBytesWritten uint64 var totalBytesRead uint64 @@ -80,11 +101,14 @@ func selfSpeedTest(ctx context.Context, opts speedTestOpts) (SpeedTestResult, er DisableMultipart: true, } + var mu sync.Mutex + var uploadTimes madmin.TimeDurations wg.Add(opts.concurrency) for i := 0; i < opts.concurrency; i++ { go func(i int) { defer wg.Done() for { + t := time.Now() reader := newRandomReader(opts.objectSize) tmpObjName := pathJoin(objNamePrefix, fmt.Sprintf("%d/%d", i, objCountPerThread[i])) info, err := globalMinioClient.PutObject(uploadsCtx, opts.bucketName, tmpObjName, reader, int64(opts.objectSize), popts) @@ -97,8 +121,12 @@ func selfSpeedTest(ctx context.Context, opts speedTestOpts) (SpeedTestResult, er uploadsCancel() return } + response := time.Since(t) atomic.AddUint64(&totalBytesWritten, uint64(info.Size)) objCountPerThread[i]++ + mu.Lock() + uploadTimes = append(uploadTimes, response) + mu.Unlock() } }(i) } @@ -106,7 +134,12 @@ func selfSpeedTest(ctx context.Context, opts speedTestOpts) (SpeedTestResult, er // We already saw write failures, no need to proceed into read's if retError != "" { - return SpeedTestResult{Uploads: totalBytesWritten, Downloads: totalBytesRead, Error: retError}, nil + return SpeedTestResult{ + Uploads: totalBytesWritten, + Downloads: totalBytesRead, + UploadTimes: uploadTimes, + Error: retError, + }, nil } downloadsCtx, downloadsCancel := context.WithCancel(context.Background()) @@ -119,6 +152,8 @@ func selfSpeedTest(ctx context.Context, opts speedTestOpts) (SpeedTestResult, er gopts := minio.GetObjectOptions{} gopts.Set(globalObjectPerfUserMetadata, "true") // Bypass S3 API freeze + var downloadTimes madmin.TimeDurations + var downloadTTFB madmin.TimeDurations wg.Add(opts.concurrency) for i := 0; i < opts.concurrency; i++ { go func(i int) { @@ -132,6 +167,7 @@ func selfSpeedTest(ctx context.Context, opts speedTestOpts) (SpeedTestResult, er j = 0 } tmpObjName := pathJoin(objNamePrefix, fmt.Sprintf("%d/%d", i, j)) + t := time.Now() r, err := globalMinioClient.GetObject(downloadsCtx, opts.bucketName, tmpObjName, gopts) if err != nil { errResp, ok := err.(minio.ErrorResponse) @@ -146,13 +182,22 @@ func selfSpeedTest(ctx context.Context, opts speedTestOpts) (SpeedTestResult, er downloadsCancel() return } - n, err := io.Copy(ioutil.Discard, r) + fbr := firstByteRecorder{ + r: r, + } + n, err := io.Copy(ioutil.Discard, &fbr) r.Close() if err == nil { + response := time.Since(t) + ttfb := time.Since(*fbr.t) // Only capture success criteria - do not // have to capture failed reads, truncated // reads etc. atomic.AddUint64(&totalBytesRead, uint64(n)) + mu.Lock() + downloadTimes = append(downloadTimes, response) + downloadTTFB = append(downloadTTFB, ttfb) + mu.Unlock() } if err != nil { if !contextCanceled(downloadsCtx) && !errors.Is(err, context.Canceled) { @@ -169,7 +214,14 @@ func selfSpeedTest(ctx context.Context, opts speedTestOpts) (SpeedTestResult, er } wg.Wait() - return SpeedTestResult{Uploads: totalBytesWritten, Downloads: totalBytesRead, Error: retError}, nil + return SpeedTestResult{ + Uploads: totalBytesWritten, + Downloads: totalBytesRead, + UploadTimes: uploadTimes, + DownloadTimes: downloadTimes, + DownloadTTFB: downloadTTFB, + Error: retError, + }, nil } // To collect RX stats during "mc support perf net" diff --git a/cmd/speedtest.go b/cmd/speedtest.go index 5d172539d..1bf2f1194 100644 --- a/cmd/speedtest.go +++ b/cmd/speedtest.go @@ -94,6 +94,9 @@ func objectSpeedTest(ctx context.Context, opts speedTestOpts) chan madmin.SpeedT result.GETStats.ObjectsPerSec = throughputHighestGet / uint64(opts.objectSize) / uint64(durationSecs) result.PUTStats.ThroughputPerSec = throughputHighestPut / uint64(durationSecs) result.PUTStats.ObjectsPerSec = throughputHighestPut / uint64(opts.objectSize) / uint64(durationSecs) + var totalUploadTimes madmin.TimeDurations + var totalDownloadTimes madmin.TimeDurations + var totalDownloadTTFB madmin.TimeDurations for i := 0; i < len(throughputHighestResults); i++ { errStr := "" if throughputHighestResults[i].Error != "" { @@ -116,14 +119,23 @@ func objectSpeedTest(ctx context.Context, opts speedTestOpts) chan madmin.SpeedT ObjectsPerSec: throughputHighestResults[i].Uploads / uint64(opts.objectSize) / uint64(durationSecs), Err: errStr, }) + result.GETStats.Servers = append(result.GETStats.Servers, madmin.SpeedTestStatServer{ Endpoint: throughputHighestResults[i].Endpoint, ThroughputPerSec: throughputHighestResults[i].Downloads / uint64(durationSecs), ObjectsPerSec: throughputHighestResults[i].Downloads / uint64(opts.objectSize) / uint64(durationSecs), Err: errStr, }) + + totalUploadTimes = append(totalUploadTimes, throughputHighestResults[i].UploadTimes...) + totalDownloadTimes = append(totalDownloadTimes, throughputHighestResults[i].DownloadTimes...) + totalDownloadTTFB = append(totalDownloadTTFB, throughputHighestResults[i].DownloadTTFB...) } + result.PUTStats.Response = totalUploadTimes.Measure() + result.GETStats.Response = totalDownloadTimes.Measure() + result.GETStats.TTFB = totalDownloadTTFB.Measure() + result.Size = opts.objectSize result.Disks = globalEndpoints.NEndpoints() result.Servers = len(globalNotificationSys.peerClients) + 1 @@ -185,6 +197,8 @@ func objectSpeedTest(ctx context.Context, opts speedTestOpts) chan madmin.SpeedT break } + // We break if we did not see 2.5% growth rate in total GET + // requests, we have reached our peak at this point. doBreak := float64(totalGet-throughputHighestGet)/float64(totalGet) < 0.025 throughputHighestGet = totalGet