diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go
index 82ddfa5ab..60a077b70 100644
--- a/cmd/admin-handlers.go
+++ b/cmd/admin-handlers.go
@@ -38,7 +38,6 @@ import (
"sort"
"strconv"
"strings"
- "sync"
"time"
"github.com/dustin/go-humanize"
@@ -1264,43 +1263,29 @@ func (a adminAPIHandlers) DriveSpeedtestHandler(w http.ResponseWriter, r *http.R
keepAliveTicker := time.NewTicker(500 * time.Millisecond)
defer keepAliveTicker.Stop()
- enc := json.NewEncoder(w)
ch := globalNotificationSys.DriveSpeedTest(ctx, opts)
- var wg sync.WaitGroup
- wg.Add(1)
-
- // local driveSpeedTest
- go func() {
- defer wg.Done()
- enc.Encode(driveSpeedTest(ctx, opts))
- if wf, ok := w.(http.Flusher); ok {
- wf.Flush()
- }
- }()
-
+ enc := json.NewEncoder(w)
for {
select {
case <-ctx.Done():
- goto endloop
+ return
case <-keepAliveTicker.C:
// Write a blank entry to prevent client from disconnecting
if err := enc.Encode(madmin.DriveSpeedTestResult{}); err != nil {
- goto endloop
+ return
}
w.(http.Flusher).Flush()
case result, ok := <-ch:
if !ok {
- goto endloop
+ return
}
if err := enc.Encode(result); err != nil {
- goto endloop
+ return
}
w.(http.Flusher).Flush()
}
}
-endloop:
- wg.Wait()
}
// Admin API errors
diff --git a/cmd/auth-handler_test.go b/cmd/auth-handler_test.go
index 7b871f2c8..f1aba829c 100644
--- a/cmd/auth-handler_test.go
+++ b/cmd/auth-handler_test.go
@@ -32,6 +32,12 @@ import (
iampolicy "github.com/minio/pkg/iam/policy"
)
+type nullReader struct{}
+
+func (r *nullReader) Read(b []byte) (int, error) {
+ return len(b), nil
+}
+
// Test get request auth type.
func TestGetRequestAuthType(t *testing.T) {
type testCase struct {
diff --git a/cmd/healthinfo.go b/cmd/healthinfo.go
deleted file mode 100644
index aaae1d76e..000000000
--- a/cmd/healthinfo.go
+++ /dev/null
@@ -1,113 +0,0 @@
-// Copyright (c) 2015-2021 MinIO, Inc.
-//
-// This file is part of MinIO Object Storage stack
-//
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// This program is distributed in the hope that it will be useful
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-//
-// You should have received a copy of the GNU Affero General Public License
-// along with this program. If not, see .
-
-package cmd
-
-import (
- "context"
- "math"
- "os"
- "sync"
-
- "github.com/minio/madmin-go"
- "github.com/minio/minio/internal/disk"
-)
-
-// round returns value rounding to specified decimal places.
-func round(f float64, n int) float64 {
- if n <= 0 {
- return math.Round(f)
- }
-
- p := math.Pow10(n)
- return math.Round(f*p) / p
-}
-
-func getDrivePerfInfo(ctx context.Context, parallel bool) []madmin.DrivePerfInfo {
- pools := globalEndpoints
- info := []madmin.DrivePerfInfo{}
- var wg sync.WaitGroup
- for _, pool := range pools {
- for _, endpoint := range pool.Endpoints {
- if !endpoint.IsLocal {
- continue
- }
-
- if _, err := os.Stat(endpoint.Path); err != nil {
- info = append(info, madmin.DrivePerfInfo{
- Path: endpoint.Path,
- Error: err.Error(),
- })
- continue
- }
-
- getHealthInfo := func(path string) {
- defer wg.Done()
-
- latency, throughput, err := disk.GetHealthInfo(
- ctx, path, pathJoin(path, minioMetaTmpBucket, mustGetUUID()),
- )
- if err != nil {
- info = append(info, madmin.DrivePerfInfo{
- Path: path,
- Error: err.Error(),
- })
- } else {
- info = append(info, madmin.DrivePerfInfo{
- Path: path,
- Latency: madmin.Latency{
- Avg: round(latency.Avg, 3),
- Max: round(latency.Max, 3),
- Min: round(latency.Min, 3),
- Percentile50: round(latency.Percentile50, 3),
- Percentile90: round(latency.Percentile90, 3),
- Percentile99: round(latency.Percentile99, 3),
- },
- Throughput: madmin.Throughput{
- Avg: uint64(round(throughput.Avg, 0)),
- Max: uint64(round(throughput.Max, 0)),
- Min: uint64(round(throughput.Min, 0)),
- Percentile50: uint64(round(throughput.Percentile50, 0)),
- Percentile90: uint64(round(throughput.Percentile90, 0)),
- Percentile99: uint64(round(throughput.Percentile99, 0)),
- },
- })
- }
- }
-
- wg.Add(1)
- if parallel {
- go getHealthInfo(endpoint.Path)
- } else {
- getHealthInfo(endpoint.Path)
- }
- }
- }
-
- wg.Wait()
- return info
-}
-
-func getDrivePerfInfos(ctx context.Context, addr string) madmin.DrivePerfInfos {
- serialPerf := getDrivePerfInfo(ctx, false)
- parallelPerf := getDrivePerfInfo(ctx, true)
- return madmin.DrivePerfInfos{
- NodeCommon: madmin.NodeCommon{Addr: addr},
- SerialPerf: serialPerf,
- ParallelPerf: parallelPerf,
- }
-}
diff --git a/cmd/notification.go b/cmd/notification.go
index 9dfa8ea4d..52f54c76b 100644
--- a/cmd/notification.go
+++ b/cmd/notification.go
@@ -26,7 +26,6 @@ import (
"io"
"net/http"
"net/url"
- "sort"
"strings"
"sync"
"time"
@@ -35,7 +34,6 @@ import (
"github.com/cespare/xxhash/v2"
"github.com/klauspost/compress/zip"
"github.com/minio/madmin-go"
- "github.com/minio/minio-go/v7/pkg/set"
bucketBandwidth "github.com/minio/minio/internal/bucket/bandwidth"
"github.com/minio/minio/internal/crypto"
"github.com/minio/minio/internal/event"
@@ -856,199 +854,6 @@ func (sys *NotificationSys) Send(args eventArgs) {
sys.targetList.Send(args.ToEvent(true), targetIDSet, sys.targetResCh)
}
-// GetNetPerfInfo - Net information
-func (sys *NotificationSys) GetNetPerfInfo(ctx context.Context) madmin.NetPerfInfo {
- var sortedGlobalEndpoints []string
-
- /*
- Ensure that only untraversed links are visited by this server
- i.e. if net perf tests have been performed between a -> b, then do
- not run it between b -> a
-
- The graph of tests looks like this
-
- a b c d
- a | o | x | x | x |
- b | o | o | x | x |
- c | o | o | o | x |
- d | o | o | o | o |
-
- 'x's should be tested, and 'o's should be skipped
- */
-
- hostSet := set.NewStringSet()
- for _, ez := range globalEndpoints {
- for _, e := range ez.Endpoints {
- if !hostSet.Contains(e.Host) {
- sortedGlobalEndpoints = append(sortedGlobalEndpoints, e.Host)
- hostSet.Add(e.Host)
- }
- }
- }
-
- sort.Strings(sortedGlobalEndpoints)
- var remoteTargets []*peerRESTClient
- search := func(host string) *peerRESTClient {
- for index, client := range sys.peerClients {
- if client == nil {
- continue
- }
- if sys.peerClients[index].host.String() == host {
- return client
- }
- }
- return nil
- }
-
- for i := 0; i < len(sortedGlobalEndpoints); i++ {
- if sortedGlobalEndpoints[i] != globalLocalNodeName {
- continue
- }
- for j := 0; j < len(sortedGlobalEndpoints); j++ {
- if j > i {
- remoteTarget := search(sortedGlobalEndpoints[j])
- if remoteTarget != nil {
- remoteTargets = append(remoteTargets, remoteTarget)
- }
- }
- }
- }
-
- netInfos := make([]madmin.PeerNetPerfInfo, len(remoteTargets))
-
- for index, client := range remoteTargets {
- if client == nil {
- continue
- }
- var err error
- netInfos[index], err = client.GetNetPerfInfo(ctx)
-
- addr := client.host.String()
- reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", addr)
- ctx := logger.SetReqInfo(GlobalContext, reqInfo)
- logger.LogIf(ctx, err)
- netInfos[index].Addr = addr
- if err != nil {
- netInfos[index].Error = err.Error()
- }
- }
- return madmin.NetPerfInfo{
- NodeCommon: madmin.NodeCommon{Addr: globalLocalNodeName},
- RemotePeers: netInfos,
- }
-}
-
-// DispatchNetPerfInfo - Net perf information from other nodes
-func (sys *NotificationSys) DispatchNetPerfInfo(ctx context.Context) []madmin.NetPerfInfo {
- serverNetInfos := []madmin.NetPerfInfo{}
-
- for index, client := range sys.peerClients {
- if client == nil {
- continue
- }
- serverNetInfo, err := sys.peerClients[index].DispatchNetInfo(ctx)
- if err != nil {
- serverNetInfo.Addr = client.host.String()
- serverNetInfo.Error = err.Error()
- }
- serverNetInfos = append(serverNetInfos, serverNetInfo)
- }
- return serverNetInfos
-}
-
-// DispatchNetPerfChan - Net perf information from other nodes
-func (sys *NotificationSys) DispatchNetPerfChan(ctx context.Context) chan madmin.NetPerfInfo {
- serverNetInfos := make(chan madmin.NetPerfInfo)
- wg := sync.WaitGroup{}
-
- wg.Add(1)
- go func() {
- for _, client := range sys.peerClients {
- if client == nil {
- continue
- }
- serverNetInfo, err := client.DispatchNetInfo(ctx)
- if err != nil {
- serverNetInfo.Addr = client.host.String()
- serverNetInfo.Error = err.Error()
- }
- serverNetInfos <- serverNetInfo
- }
- wg.Done()
- }()
-
- go func() {
- wg.Wait()
- close(serverNetInfos)
- }()
-
- return serverNetInfos
-}
-
-// GetParallelNetPerfInfo - Performs Net parallel tests
-func (sys *NotificationSys) GetParallelNetPerfInfo(ctx context.Context) madmin.NetPerfInfo {
- netInfos := []madmin.PeerNetPerfInfo{}
- wg := sync.WaitGroup{}
-
- for index, client := range sys.peerClients {
- if client == nil {
- continue
- }
-
- wg.Add(1)
- go func(index int) {
- netInfo, err := sys.peerClients[index].GetNetPerfInfo(ctx)
- netInfo.Addr = sys.peerClients[index].host.String()
- if err != nil {
- netInfo.Error = err.Error()
- }
- netInfos = append(netInfos, netInfo)
- wg.Done()
- }(index)
- }
- wg.Wait()
- return madmin.NetPerfInfo{
- NodeCommon: madmin.NodeCommon{Addr: globalLocalNodeName},
- RemotePeers: netInfos,
- }
-}
-
-// GetDrivePerfInfos - Drive performance information
-func (sys *NotificationSys) GetDrivePerfInfos(ctx context.Context) chan madmin.DrivePerfInfos {
- updateChan := make(chan madmin.DrivePerfInfos)
- wg := sync.WaitGroup{}
-
- for _, client := range sys.peerClients {
- if client == nil {
- continue
- }
- wg.Add(1)
- go func(client *peerRESTClient) {
- reply, err := client.GetDrivePerfInfos(ctx)
-
- addr := client.host.String()
- reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", addr)
- ctx := logger.SetReqInfo(GlobalContext, reqInfo)
- logger.LogIf(ctx, err)
-
- reply.Addr = addr
- if err != nil {
- reply.Error = err.Error()
- }
-
- updateChan <- reply
- wg.Done()
- }(client)
- }
-
- go func() {
- wg.Wait()
- close(updateChan)
- }()
-
- return updateChan
-}
-
// GetCPUs - Get all CPU information.
func (sys *NotificationSys) GetCPUs(ctx context.Context) []madmin.CPUs {
reply := make([]madmin.CPUs, len(sys.peerClients))
@@ -1740,7 +1545,6 @@ func (sys *NotificationSys) Speedtest(ctx context.Context, size int,
func (sys *NotificationSys) DriveSpeedTest(ctx context.Context, opts madmin.DriveSpeedTestOpts) chan madmin.DriveSpeedTestResult {
ch := make(chan madmin.DriveSpeedTestResult)
var wg sync.WaitGroup
-
for _, client := range sys.peerClients {
if client == nil {
continue
@@ -1753,7 +1557,10 @@ func (sys *NotificationSys) DriveSpeedTest(ctx context.Context, opts madmin.Driv
resp.Error = err.Error()
}
- ch <- resp
+ select {
+ case <-ctx.Done():
+ case ch <- resp:
+ }
reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", client.host.String())
ctx := logger.SetReqInfo(GlobalContext, reqInfo)
@@ -1761,10 +1568,19 @@ func (sys *NotificationSys) DriveSpeedTest(ctx context.Context, opts madmin.Driv
}(client)
}
+ wg.Add(1)
go func() {
+ defer wg.Done()
+ select {
+ case <-ctx.Done():
+ case ch <- driveSpeedTest(ctx, opts):
+ }
+ }()
+
+ go func(wg *sync.WaitGroup, ch chan madmin.DriveSpeedTestResult) {
wg.Wait()
close(ch)
- }()
+ }(&wg, ch)
return ch
}
diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go
index 5cd885c56..8ed4111d3 100644
--- a/cmd/peer-rest-client.go
+++ b/cmd/peer-rest-client.go
@@ -24,15 +24,11 @@ import (
"errors"
"fmt"
"io"
- "math"
"net/url"
"strconv"
"strings"
- "sync"
- "sync/atomic"
"time"
- "github.com/dustin/go-humanize"
"github.com/minio/madmin-go"
"github.com/minio/minio/internal/event"
"github.com/minio/minio/internal/http"
@@ -111,271 +107,6 @@ func (client *peerRESTClient) ServerInfo() (info madmin.ServerProperties, err er
return info, err
}
-type networkOverloadedErr struct{}
-
-var networkOverloaded networkOverloadedErr
-
-func (n networkOverloadedErr) Error() string {
- return "network overloaded"
-}
-
-type nullReader struct{}
-
-func (r *nullReader) Read(b []byte) (int, error) {
- return len(b), nil
-}
-
-func (client *peerRESTClient) doNetTest(ctx context.Context, dataSize int64, threadCount uint) (info madmin.PeerNetPerfInfo, err error) {
- var mu sync.Mutex // mutex used to protect these slices in go-routines
- latencies := []float64{}
- throughputs := []float64{}
-
- buflimiter := make(chan struct{}, threadCount)
- errChan := make(chan error, threadCount)
-
- var totalTransferred int64
-
- // ensure enough samples to obtain normal distribution
- maxSamples := int(10 * threadCount)
- if maxSamples > 50 {
- maxSamples = 50
- }
-
- innerCtx, cancel := context.WithCancel(ctx)
-
- slowSamples := int32(0)
- maxSlowSamples := int32(maxSamples/20) + 1 // 5% of total
- slowSample := func() {
- if slowSamples > maxSlowSamples {
- return
- }
- if atomic.AddInt32(&slowSamples, 1) > maxSlowSamples {
- errChan <- networkOverloaded
- cancel()
- }
- }
-
- var wg sync.WaitGroup
- finish := func() {
- <-buflimiter
- wg.Done()
- }
-
- for i := 0; i < maxSamples; i++ {
- if slowSamples > maxSlowSamples {
- break
- }
-
- select {
- case <-ctx.Done():
- cancel()
- return info, ctx.Err()
- case err = <-errChan:
- case buflimiter <- struct{}{}:
- if slowSamples > maxSlowSamples {
- break
- }
- wg.Add(1)
-
- if innerCtx.Err() != nil {
- finish()
- continue
- }
-
- go func(i int) {
- start := time.Now()
- before := atomic.LoadInt64(&totalTransferred)
-
- ctx, cancel := context.WithTimeout(innerCtx, 3*time.Second)
- defer cancel()
-
- progress := io.LimitReader(&nullReader{}, dataSize)
-
- // Turn off healthCheckFn for health tests to cater for higher load on the peers.
- clnt := newPeerRESTClient(client.host)
- clnt.restClient.HealthCheckFn = nil
-
- respBody, err := clnt.callWithContext(ctx, peerRESTMethodNetInfo, nil, progress, dataSize)
- if err != nil {
- if errors.Is(err, context.DeadlineExceeded) {
- slowSample()
- finish()
- return
- }
-
- errChan <- err
- finish()
- return
- }
- http.DrainBody(respBody)
-
- finish()
- atomic.AddInt64(&totalTransferred, dataSize)
- after := atomic.LoadInt64(&totalTransferred)
- end := time.Now()
-
- latency := end.Sub(start).Seconds()
-
- if latency > maxLatencyForSizeThreads(dataSize, threadCount) {
- slowSample()
- }
-
- /* Throughput = (total data transferred across all threads / time taken) */
- throughput := float64((after - before)) / latency
-
- // Protect updating latencies and throughputs slices from
- // multiple go-routines.
- mu.Lock()
- latencies = append(latencies, latency)
- throughputs = append(throughputs, throughput)
- mu.Unlock()
- }(i)
- }
- }
- wg.Wait()
-
- if slowSamples > maxSlowSamples {
- return info, networkOverloaded
- }
- if err != nil {
- return info, err
- }
-
- latency, throughput, err := xnet.ComputePerfStats(latencies, throughputs)
- return madmin.PeerNetPerfInfo{
- Latency: madmin.Latency{
- Avg: round(latency.Avg, 3),
- Max: round(latency.Max, 3),
- Min: round(latency.Min, 3),
- Percentile50: round(latency.Percentile50, 3),
- Percentile90: round(latency.Percentile90, 3),
- Percentile99: round(latency.Percentile99, 3),
- },
- Throughput: madmin.Throughput{
- Avg: uint64(round(throughput.Avg, 0)),
- Max: uint64(round(throughput.Max, 0)),
- Min: uint64(round(throughput.Min, 0)),
- Percentile50: uint64(round(throughput.Percentile50, 0)),
- Percentile90: uint64(round(throughput.Percentile90, 0)),
- Percentile99: uint64(round(throughput.Percentile99, 0)),
- },
- }, nil
-}
-
-func maxLatencyForSizeThreads(size int64, threadCount uint) float64 {
- Gbit100 := 12.5 * float64(humanize.GiByte)
- Gbit40 := 5.00 * float64(humanize.GiByte)
- Gbit25 := 3.25 * float64(humanize.GiByte)
- Gbit10 := 1.25 * float64(humanize.GiByte)
- // Gbit1 := 0.25 * float64(humanize.GiByte)
-
- // Given the current defaults, each combination of size/thread
- // is supposed to fully saturate the intended pipe when all threads are active
- // i.e. if the test is performed in a perfectly controlled environment, i.e. without
- // CPU scheduling latencies and/or network jitters, then all threads working
- // simultaneously should result in each of them completing in 1s
- //
- // In reality, I've assumed a normal distribution of latency with expected mean of 1s and min of 0s
- // Then, 95% of threads should complete within 2 seconds (2 std. deviations from the mean). The 2s comes
- // from fitting the normal curve such that the mean is 1.
- //
- // i.e. we expect that no more than 5% of threads to take longer than 2s to push the data.
- //
- // throughput | max latency
- // 100 Gbit | 2s
- // 40 Gbit | 2s
- // 25 Gbit | 2s
- // 10 Gbit | 2s
- // 1 Gbit | inf
-
- throughput := float64(size * int64(threadCount))
- if throughput >= Gbit100 {
- return 2.0
- } else if throughput >= Gbit40 {
- return 2.0
- } else if throughput >= Gbit25 {
- return 2.0
- } else if throughput >= Gbit10 {
- return 2.0
- }
- return math.MaxFloat64
-}
-
-// GetNetPerfInfo - fetch network information for a remote node.
-func (client *peerRESTClient) GetNetPerfInfo(ctx context.Context) (info madmin.PeerNetPerfInfo, err error) {
- // 100 Gbit -> 256 MiB * 50 threads
- // 40 Gbit -> 256 MiB * 20 threads
- // 25 Gbit -> 128 MiB * 25 threads
- // 10 Gbit -> 128 MiB * 10 threads
- // 1 Gbit -> 64 MiB * 2 threads
-
- type step struct {
- size int64
- threads uint
- }
- steps := []step{
- { // 100 Gbit
- size: 256 * humanize.MiByte,
- threads: 50,
- },
- { // 40 Gbit
- size: 256 * humanize.MiByte,
- threads: 20,
- },
- { // 25 Gbit
- size: 128 * humanize.MiByte,
- threads: 25,
- },
- { // 10 Gbit
- size: 128 * humanize.MiByte,
- threads: 10,
- },
- { // 1 Gbit
- size: 64 * humanize.MiByte,
- threads: 2,
- },
- }
-
- for i := range steps {
- size := steps[i].size
- threads := steps[i].threads
-
- if info, err = client.doNetTest(ctx, size, threads); err != nil {
- if err == networkOverloaded {
- continue
- }
- }
- return info, err
- }
- return info, err
-}
-
-// DispatchNetInfo - dispatch other nodes to run Net info.
-func (client *peerRESTClient) DispatchNetInfo(ctx context.Context) (info madmin.NetPerfInfo, err error) {
- respBody, err := client.callWithContext(ctx, peerRESTMethodDispatchNetInfo, nil, nil, -1)
- if err != nil {
- return
- }
- defer http.DrainBody(respBody)
- waitReader, err := waitForHTTPResponse(respBody)
- if err != nil {
- return
- }
- err = gob.NewDecoder(waitReader).Decode(&info)
- return
-}
-
-// GetDrivePerfInfos - fetch all disk's serial/parallal performance information for a remote node.
-func (client *peerRESTClient) GetDrivePerfInfos(ctx context.Context) (info madmin.DrivePerfInfos, err error) {
- respBody, err := client.callWithContext(ctx, peerRESTMethodDriveInfo, nil, nil, -1)
- if err != nil {
- return
- }
- defer http.DrainBody(respBody)
- err = gob.NewDecoder(respBody).Decode(&info)
- return info, err
-}
-
// GetCPUs - fetch CPU information for a remote node.
func (client *peerRESTClient) GetCPUs(ctx context.Context) (info madmin.CPUs, err error) {
respBody, err := client.callWithContext(ctx, peerRESTMethodCPUInfo, nil, nil, -1)
diff --git a/cmd/peer-rest-common.go b/cmd/peer-rest-common.go
index ab828d341..ef78aa179 100644
--- a/cmd/peer-rest-common.go
+++ b/cmd/peer-rest-common.go
@@ -27,8 +27,6 @@ const (
const (
peerRESTMethodHealth = "/health"
peerRESTMethodServerInfo = "/serverinfo"
- peerRESTMethodDriveInfo = "/driveinfo"
- peerRESTMethodNetInfo = "/netinfo"
peerRESTMethodCPUInfo = "/cpuinfo"
peerRESTMethodDiskHwInfo = "/diskhwinfo"
peerRESTMethodOsInfo = "/osinfo"
@@ -37,7 +35,6 @@ const (
peerRESTMethodSysErrors = "/syserrors"
peerRESTMethodSysServices = "/sysservices"
peerRESTMethodSysConfig = "/sysconfig"
- peerRESTMethodDispatchNetInfo = "/dispatchnetinfo"
peerRESTMethodDeleteBucketMetadata = "/deletebucketmetadata"
peerRESTMethodLoadBucketMetadata = "/loadbucketmetadata"
peerRESTMethodGetBucketStats = "/getbucketstats"
diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go
index 72541de2d..5e7ce65bd 100644
--- a/cmd/peer-rest-server.go
+++ b/cmd/peer-rest-server.go
@@ -23,7 +23,6 @@ import (
"errors"
"fmt"
"io"
- "io/ioutil"
"net/http"
"strconv"
"strings"
@@ -337,68 +336,6 @@ func (s *peerRESTServer) ServerInfoHandler(w http.ResponseWriter, r *http.Reques
logger.LogIf(ctx, gob.NewEncoder(w).Encode(info))
}
-func (s *peerRESTServer) NetInfoHandler(w http.ResponseWriter, r *http.Request) {
- ctx := newContext(r, w, "NetInfo")
- if !s.IsValid(w, r) {
- s.writeErrorResponse(w, errors.New("Invalid request"))
- return
- }
-
- // Use this trailer to send additional headers after sending body
- w.Header().Set("Trailer", "FinalStatus")
-
- w.Header().Set("Content-Type", "application/octet-stream")
- w.WriteHeader(http.StatusOK)
-
- n, err := io.Copy(ioutil.Discard, r.Body)
- if err == io.ErrUnexpectedEOF {
- w.Header().Set("FinalStatus", err.Error())
- return
- }
- if err != nil && err != io.EOF {
- logger.LogIf(ctx, err)
- w.Header().Set("FinalStatus", err.Error())
- return
- }
- if n != r.ContentLength {
- err := fmt.Errorf("Subnet health: short read: expected %d found %d", r.ContentLength, n)
- logger.LogIf(ctx, err)
- w.Header().Set("FinalStatus", err.Error())
- return
- }
- w.Header().Set("FinalStatus", "Success")
-}
-
-func (s *peerRESTServer) DispatchNetInfoHandler(w http.ResponseWriter, r *http.Request) {
- if !s.IsValid(w, r) {
- s.writeErrorResponse(w, errors.New("Invalid request"))
- return
- }
-
- done := keepHTTPResponseAlive(w)
-
- ctx := newContext(r, w, "DispatchNetInfo")
- info := globalNotificationSys.GetNetPerfInfo(ctx)
-
- done(nil)
- logger.LogIf(ctx, gob.NewEncoder(w).Encode(info))
-}
-
-// GetDrivePerfInfosHandler - returns all disk's serial/parallal performance information.
-func (s *peerRESTServer) GetDrivePerfInfosHandler(w http.ResponseWriter, r *http.Request) {
- if !s.IsValid(w, r) {
- s.writeErrorResponse(w, errors.New("Invalid request"))
- return
- }
-
- ctx, cancel := context.WithCancel(newContext(r, w, "DriveInfo"))
- defer cancel()
-
- info := getDrivePerfInfos(ctx, r.Host)
-
- logger.LogIf(ctx, gob.NewEncoder(w).Encode(info))
-}
-
// GetCPUsHandler - returns CPU info.
func (s *peerRESTServer) GetCPUsHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
@@ -1313,8 +1250,8 @@ func (s *peerRESTServer) DevNull(w http.ResponseWriter, r *http.Request) {
}
}
-// Netperf - perform netperf
-func (s *peerRESTServer) Netperf(w http.ResponseWriter, r *http.Request) {
+// NetSpeedTestHandlers - perform network speedtest
+func (s *peerRESTServer) NetSpeedTestHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("invalid request"))
return
@@ -1344,9 +1281,6 @@ func registerPeerRESTHandlers(router *mux.Router) {
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodOsInfo).HandlerFunc(httpTraceHdrs(server.GetOSInfoHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDiskHwInfo).HandlerFunc(httpTraceHdrs(server.GetPartitionsHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodCPUInfo).HandlerFunc(httpTraceHdrs(server.GetCPUsHandler))
- subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDriveInfo).HandlerFunc(httpTraceHdrs(server.GetDrivePerfInfosHandler))
- subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodNetInfo).HandlerFunc(httpTraceHdrs(server.NetInfoHandler))
- subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDispatchNetInfo).HandlerFunc(httpTraceHdrs(server.DispatchNetInfoHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodCycleBloom).HandlerFunc(httpTraceHdrs(server.CycleServerBloomFilterHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetAllBucketStats).HandlerFunc(httpTraceHdrs(server.GetAllBucketStatsHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDeleteBucketMetadata).HandlerFunc(httpTraceHdrs(server.DeleteBucketMetadataHandler)).Queries(restQueries(peerRESTBucket)...)
@@ -1377,7 +1311,7 @@ func registerPeerRESTHandlers(router *mux.Router) {
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadTransitionTierConfig).HandlerFunc(httpTraceHdrs(server.LoadTransitionTierConfigHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodSpeedtest).HandlerFunc(httpTraceHdrs(server.SpeedtestHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDriveSpeedTest).HandlerFunc(httpTraceHdrs(server.DriveSpeedTestHandler))
- subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodNetperf).HandlerFunc(httpTraceHdrs(server.Netperf))
+ subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodNetperf).HandlerFunc(httpTraceHdrs(server.NetSpeedTestHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDevNull).HandlerFunc(httpTraceHdrs(server.DevNull))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodReloadSiteReplicationConfig).HandlerFunc(httpTraceHdrs(server.ReloadSiteReplicationConfigHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodReloadPoolMeta).HandlerFunc(httpTraceHdrs(server.ReloadPoolMetaHandler))
diff --git a/internal/disk/health.go b/internal/disk/health.go
deleted file mode 100644
index 94e321975..000000000
--- a/internal/disk/health.go
+++ /dev/null
@@ -1,145 +0,0 @@
-// Copyright (c) 2015-2021 MinIO, Inc.
-//
-// This file is part of MinIO Object Storage stack
-//
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// This program is distributed in the hope that it will be useful
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-//
-// You should have received a copy of the GNU Affero General Public License
-// along with this program. If not, see .
-
-package disk
-
-import (
- "context"
- "fmt"
- "os"
- "time"
-
- "github.com/dustin/go-humanize"
- "github.com/minio/madmin-go"
- "github.com/montanaflynn/stats"
-)
-
-// GetHealthInfo about the drive
-func GetHealthInfo(ctx context.Context, drive, fsPath string) (madmin.DiskLatency, madmin.DiskThroughput, error) {
- // Create a file with O_DIRECT flag, choose default umask and also make sure
- // we are exclusively writing to a new file using O_EXCL.
- w, err := OpenFileDirectIO(fsPath, os.O_CREATE|os.O_WRONLY|os.O_EXCL, 0o666)
- if err != nil {
- return madmin.DiskLatency{}, madmin.DiskThroughput{}, err
- }
-
- defer func() {
- w.Close()
- os.Remove(fsPath)
- }()
-
- blockSize := 4 * humanize.MiByte
- fileSize := 256 * humanize.MiByte
-
- latencies := make([]float64, fileSize/blockSize)
- throughputs := make([]float64, fileSize/blockSize)
-
- data := AlignedBlock(blockSize)
-
- for i := 0; i < (fileSize / blockSize); i++ {
- if ctx.Err() != nil {
- return madmin.DiskLatency{}, madmin.DiskThroughput{}, ctx.Err()
- }
- startTime := time.Now()
- if n, err := w.Write(data); err != nil {
- return madmin.DiskLatency{}, madmin.DiskThroughput{}, err
- } else if n != blockSize {
- return madmin.DiskLatency{}, madmin.DiskThroughput{}, fmt.Errorf("Expected to write %d, but only wrote %d", blockSize, n)
- }
- latencyInSecs := time.Since(startTime).Seconds()
- latencies[i] = latencyInSecs
- }
-
- // Sync every full writes fdatasync
- Fdatasync(w)
-
- for i := range latencies {
- throughput := float64(blockSize) / latencies[i]
- throughputs[i] = throughput
- }
-
- var avgLatency float64
- var percentile50Latency float64
- var percentile90Latency float64
- var percentile99Latency float64
- var minLatency float64
- var maxLatency float64
-
- var avgThroughput float64
- var percentile50Throughput float64
- var percentile90Throughput float64
- var percentile99Throughput float64
- var minThroughput float64
- var maxThroughput float64
-
- if avgLatency, err = stats.Mean(latencies); err != nil {
- return madmin.DiskLatency{}, madmin.DiskThroughput{}, err
- }
- if percentile50Latency, err = stats.Percentile(latencies, 50); err != nil {
- return madmin.DiskLatency{}, madmin.DiskThroughput{}, err
- }
- if percentile90Latency, err = stats.Percentile(latencies, 90); err != nil {
- return madmin.DiskLatency{}, madmin.DiskThroughput{}, err
- }
- if percentile99Latency, err = stats.Percentile(latencies, 99); err != nil {
- return madmin.DiskLatency{}, madmin.DiskThroughput{}, err
- }
- if maxLatency, err = stats.Max(latencies); err != nil {
- return madmin.DiskLatency{}, madmin.DiskThroughput{}, err
- }
- if minLatency, err = stats.Min(latencies); err != nil {
- return madmin.DiskLatency{}, madmin.DiskThroughput{}, err
- }
- l := madmin.DiskLatency{
- Avg: avgLatency,
- Percentile50: percentile50Latency,
- Percentile90: percentile90Latency,
- Percentile99: percentile99Latency,
- Min: minLatency,
- Max: maxLatency,
- }
-
- if avgThroughput, err = stats.Mean(throughputs); err != nil {
- return madmin.DiskLatency{}, madmin.DiskThroughput{}, err
- }
- if percentile50Throughput, err = stats.Percentile(throughputs, 50); err != nil {
- return madmin.DiskLatency{}, madmin.DiskThroughput{}, err
- }
- if percentile90Throughput, err = stats.Percentile(throughputs, 90); err != nil {
- return madmin.DiskLatency{}, madmin.DiskThroughput{}, err
- }
- if percentile99Throughput, err = stats.Percentile(throughputs, 99); err != nil {
- return madmin.DiskLatency{}, madmin.DiskThroughput{}, err
- }
- if maxThroughput, err = stats.Max(throughputs); err != nil {
- return madmin.DiskLatency{}, madmin.DiskThroughput{}, err
- }
- if minThroughput, err = stats.Min(throughputs); err != nil {
- return madmin.DiskLatency{}, madmin.DiskThroughput{}, err
- }
-
- t := madmin.DiskThroughput{
- Avg: avgThroughput,
- Percentile50: percentile50Throughput,
- Percentile90: percentile90Throughput,
- Percentile99: percentile99Throughput,
- Min: minThroughput,
- Max: maxThroughput,
- }
-
- return l, t, nil
-}