diff --git a/cmd/admin-handler-utils.go b/cmd/admin-handler-utils.go
index 3a918a5a7..b0f0d8aa3 100644
--- a/cmd/admin-handler-utils.go
+++ b/cmd/admin-handler-utils.go
@@ -124,6 +124,18 @@ func toAdminAPIErr(ctx context.Context, err error) APIError {
Description: err.Error(),
HTTPStatusCode: http.StatusBadRequest,
}
+ case errors.Is(err, errDecommissionRebalanceAlreadyRunning):
+ apiErr = APIError{
+ Code: "XMinioDecommissionNotAllowed",
+ Description: err.Error(),
+ HTTPStatusCode: http.StatusBadRequest,
+ }
+ case errors.Is(err, errRebalanceDecommissionAlreadyRunning):
+ apiErr = APIError{
+ Code: "XMinioRebalanceNotAllowed",
+ Description: err.Error(),
+ HTTPStatusCode: http.StatusBadRequest,
+ }
case errors.Is(err, errConfigNotFound):
apiErr = APIError{
Code: "XMinioConfigError",
diff --git a/cmd/admin-handlers-pools.go b/cmd/admin-handlers-pools.go
index 50e6cdec4..29b21a50d 100644
--- a/cmd/admin-handlers-pools.go
+++ b/cmd/admin-handlers-pools.go
@@ -19,6 +19,7 @@ package cmd
import (
"encoding/json"
+ "errors"
"fmt"
"net/http"
@@ -27,6 +28,11 @@ import (
iampolicy "github.com/minio/pkg/iam/policy"
)
+var (
+ errRebalanceDecommissionAlreadyRunning = errors.New("Rebalance cannot be started, decommission is already in progress")
+ errDecommissionRebalanceAlreadyRunning = errors.New("Decommission cannot be started, rebalance is already in progress")
+)
+
func (a adminAPIHandlers) StartDecommission(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "StartDecommission")
@@ -49,6 +55,11 @@ func (a adminAPIHandlers) StartDecommission(w http.ResponseWriter, r *http.Reque
return
}
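+ // Refuse to start a decommission while a rebalance is in progress; the two are mutually exclusive.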
+ if pools.IsRebalanceStarted() {
+ writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, errDecommissionRebalanceAlreadyRunning), r.URL)
+ return
+ }
+
vars := mux.Vars(r)
v := vars["pool"]
@@ -200,3 +211,137 @@ func (a adminAPIHandlers) ListPools(w http.ResponseWriter, r *http.Request) {
logger.LogIf(r.Context(), json.NewEncoder(w).Encode(poolsStatus))
}
+
+func (a adminAPIHandlers) RebalanceStart(w http.ResponseWriter, r *http.Request) {
+ ctx := newContext(r, w, "RebalanceStart")
+ defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
+
+ objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.RebalanceAdminAction)
+ if objectAPI == nil {
+ return
+ }
+
+ // NB rebalance-start admin API is always coordinated from the first pool's
+ // first node. The following is required to serialize (the effects of)
+ // concurrent rebalance-start commands.
+ if ep := globalEndpoints[0].Endpoints[0]; !ep.IsLocal {
+ for nodeIdx, proxyEp := range globalProxyEndpoints {
+ if proxyEp.Endpoint.Host == ep.Host {
+ if proxyRequestByNodeIndex(ctx, w, r, nodeIdx) {
+ return
+ }
+ }
+ }
+ }
+
+ pools, ok := objectAPI.(*erasureServerPools)
+ if !ok || len(pools.serverPools) == 1 {
+ writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
+ return
+ }
+
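+ // Refuse to start a rebalance while a decommission is in progress.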
+ if pools.IsDecommissionRunning() {
+ writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, errRebalanceDecommissionAlreadyRunning), r.URL)
+ return
+ }
+
+ if pools.IsRebalanceStarted() {
+ writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrAdminRebalanceAlreadyStarted), r.URL)
+ return
+ }
+
+ bucketInfos, err := objectAPI.ListBuckets(ctx, BucketOptions{})
+ if err != nil {
+ writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
+ return
+ }
+
+ buckets := make([]string, 0, len(bucketInfos))
+ for _, bInfo := range bucketInfos {
+ buckets = append(buckets, bInfo.Name)
+ }
+
+ var id string
+ if id, err = pools.initRebalanceMeta(ctx, buckets); err != nil {
+ writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
+ return
+ }
+
+ // Rebalance routine is run on the first node of any pool participating in rebalance.
+ pools.StartRebalance()
+
+ b, err := json.Marshal(struct {
+ ID string `json:"id"`
+ }{ID: id})
+ if err != nil {
+ writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
+ return
+ }
+
+ writeSuccessResponseJSON(w, b)
+ // Notify peers to load rebalance.bin and start the rebalance routine if they happen to be
+ // a participating pool's leader node
+ globalNotificationSys.LoadRebalanceMeta(ctx, true)
+}
+
+func (a adminAPIHandlers) RebalanceStatus(w http.ResponseWriter, r *http.Request) {
+ ctx := newContext(r, w, "RebalanceStatus")
+ defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
+
+ objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.RebalanceAdminAction)
+ if objectAPI == nil {
+ return
+ }
+
+ // Proxy rebalance-status to the first pool's first node, so that users see a
+ // consistent view of rebalance progress even though different rebalancing
+ // pools may temporarily have out-of-date info on the others.
+ if ep := globalEndpoints[0].Endpoints[0]; !ep.IsLocal {
+ for nodeIdx, proxyEp := range globalProxyEndpoints {
+ if proxyEp.Endpoint.Host == ep.Host {
+ if proxyRequestByNodeIndex(ctx, w, r, nodeIdx) {
+ return
+ }
+ }
+ }
+ }
+
+ pools, ok := objectAPI.(*erasureServerPools)
+ if !ok {
+ writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
+ return
+ }
+
+ rs, err := rebalanceStatus(ctx, pools)
+ if err != nil {
+ if errors.Is(err, errRebalanceNotStarted) {
+ writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrAdminRebalanceNotStarted), r.URL)
+ return
+ }
+ logger.LogIf(ctx, fmt.Errorf("failed to fetch rebalance status: %w", err))
+ writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
+ return
+ }
+ logger.LogIf(r.Context(), json.NewEncoder(w).Encode(rs))
+}
+
+func (a adminAPIHandlers) RebalanceStop(w http.ResponseWriter, r *http.Request) {
+ ctx := newContext(r, w, "RebalanceStop")
+ defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
+
+ objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.RebalanceAdminAction)
+ if objectAPI == nil {
+ return
+ }
+
+ pools, ok := objectAPI.(*erasureServerPools)
+ if !ok {
+ writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
+ return
+ }
+
+ // Cancel any ongoing rebalance operation
+ globalNotificationSys.StopRebalance(r.Context())
+ writeSuccessResponseHeadersOnly(w)
+ logger.LogIf(ctx, pools.saveRebalanceStats(GlobalContext, 0, rebalSaveStoppedAt))
+}
diff --git a/cmd/admin-router.go b/cmd/admin-router.go
index ad28454cc..bd669cede 100644
--- a/cmd/admin-router.go
+++ b/cmd/admin-router.go
@@ -84,6 +84,11 @@ func registerAdminRouter(router *mux.Router, enableConfigOps bool) {
adminRouter.Methods(http.MethodPost).Path(adminVersion+"/pools/decommission").HandlerFunc(gz(httpTraceAll(adminAPI.StartDecommission))).Queries("pool", "{pool:.*}")
adminRouter.Methods(http.MethodPost).Path(adminVersion+"/pools/cancel").HandlerFunc(gz(httpTraceAll(adminAPI.CancelDecommission))).Queries("pool", "{pool:.*}")
+
+ // Rebalance operations
+ adminRouter.Methods(http.MethodPost).Path(adminVersion + "/rebalance/start").HandlerFunc(gz(httpTraceAll(adminAPI.RebalanceStart)))
+ adminRouter.Methods(http.MethodGet).Path(adminVersion + "/rebalance/status").HandlerFunc(gz(httpTraceAll(adminAPI.RebalanceStatus)))
+ adminRouter.Methods(http.MethodPost).Path(adminVersion + "/rebalance/stop").HandlerFunc(gz(httpTraceAll(adminAPI.RebalanceStop)))
}
// Profiling operations - deprecated API
diff --git a/cmd/api-errors.go b/cmd/api-errors.go
index f7557e981..c24e72d6d 100644
--- a/cmd/api-errors.go
+++ b/cmd/api-errors.go
@@ -291,6 +291,10 @@ const (
ErrSiteReplicationIAMError
ErrSiteReplicationConfigMissing
+ // Pool rebalance errors
+ ErrAdminRebalanceAlreadyStarted
+ ErrAdminRebalanceNotStarted
+
// Bucket Quota error codes
ErrAdminBucketQuotaExceeded
ErrAdminNoSuchQuotaConfiguration
@@ -1397,6 +1401,16 @@ var errorCodes = errorCodeMap{
Description: "Site not found in site replication configuration",
HTTPStatusCode: http.StatusBadRequest,
},
+ ErrAdminRebalanceAlreadyStarted: {
+ Code: "XMinioAdminRebalanceAlreadyStarted",
+ Description: "Pool rebalance is already started",
+ HTTPStatusCode: http.StatusConflict,
+ },
+ ErrAdminRebalanceNotStarted: {
+ Code: "XMinioAdminRebalanceNotStarted",
+ Description: "Pool rebalance is not started",
+ HTTPStatusCode: http.StatusNotFound,
+ },
ErrMaximumExpires: {
Code: "AuthorizationQueryParametersError",
Description: "X-Amz-Expires must be less than a week (in seconds); that is, the given X-Amz-Expires must be less than 604800 seconds",
diff --git a/cmd/apierrorcode_string.go b/cmd/apierrorcode_string.go
index 1080f5c6a..6dff7fa8f 100644
--- a/cmd/apierrorcode_string.go
+++ b/cmd/apierrorcode_string.go
@@ -203,114 +203,116 @@ func _() {
_ = x[ErrSiteReplicationBucketMetaError-192]
_ = x[ErrSiteReplicationIAMError-193]
_ = x[ErrSiteReplicationConfigMissing-194]
- _ = x[ErrAdminBucketQuotaExceeded-195]
- _ = x[ErrAdminNoSuchQuotaConfiguration-196]
- _ = x[ErrHealNotImplemented-197]
- _ = x[ErrHealNoSuchProcess-198]
- _ = x[ErrHealInvalidClientToken-199]
- _ = x[ErrHealMissingBucket-200]
- _ = x[ErrHealAlreadyRunning-201]
- _ = x[ErrHealOverlappingPaths-202]
- _ = x[ErrIncorrectContinuationToken-203]
- _ = x[ErrEmptyRequestBody-204]
- _ = x[ErrUnsupportedFunction-205]
- _ = x[ErrInvalidExpressionType-206]
- _ = x[ErrBusy-207]
- _ = x[ErrUnauthorizedAccess-208]
- _ = x[ErrExpressionTooLong-209]
- _ = x[ErrIllegalSQLFunctionArgument-210]
- _ = x[ErrInvalidKeyPath-211]
- _ = x[ErrInvalidCompressionFormat-212]
- _ = x[ErrInvalidFileHeaderInfo-213]
- _ = x[ErrInvalidJSONType-214]
- _ = x[ErrInvalidQuoteFields-215]
- _ = x[ErrInvalidRequestParameter-216]
- _ = x[ErrInvalidDataType-217]
- _ = x[ErrInvalidTextEncoding-218]
- _ = x[ErrInvalidDataSource-219]
- _ = x[ErrInvalidTableAlias-220]
- _ = x[ErrMissingRequiredParameter-221]
- _ = x[ErrObjectSerializationConflict-222]
- _ = x[ErrUnsupportedSQLOperation-223]
- _ = x[ErrUnsupportedSQLStructure-224]
- _ = x[ErrUnsupportedSyntax-225]
- _ = x[ErrUnsupportedRangeHeader-226]
- _ = x[ErrLexerInvalidChar-227]
- _ = x[ErrLexerInvalidOperator-228]
- _ = x[ErrLexerInvalidLiteral-229]
- _ = x[ErrLexerInvalidIONLiteral-230]
- _ = x[ErrParseExpectedDatePart-231]
- _ = x[ErrParseExpectedKeyword-232]
- _ = x[ErrParseExpectedTokenType-233]
- _ = x[ErrParseExpected2TokenTypes-234]
- _ = x[ErrParseExpectedNumber-235]
- _ = x[ErrParseExpectedRightParenBuiltinFunctionCall-236]
- _ = x[ErrParseExpectedTypeName-237]
- _ = x[ErrParseExpectedWhenClause-238]
- _ = x[ErrParseUnsupportedToken-239]
- _ = x[ErrParseUnsupportedLiteralsGroupBy-240]
- _ = x[ErrParseExpectedMember-241]
- _ = x[ErrParseUnsupportedSelect-242]
- _ = x[ErrParseUnsupportedCase-243]
- _ = x[ErrParseUnsupportedCaseClause-244]
- _ = x[ErrParseUnsupportedAlias-245]
- _ = x[ErrParseUnsupportedSyntax-246]
- _ = x[ErrParseUnknownOperator-247]
- _ = x[ErrParseMissingIdentAfterAt-248]
- _ = x[ErrParseUnexpectedOperator-249]
- _ = x[ErrParseUnexpectedTerm-250]
- _ = x[ErrParseUnexpectedToken-251]
- _ = x[ErrParseUnexpectedKeyword-252]
- _ = x[ErrParseExpectedExpression-253]
- _ = x[ErrParseExpectedLeftParenAfterCast-254]
- _ = x[ErrParseExpectedLeftParenValueConstructor-255]
- _ = x[ErrParseExpectedLeftParenBuiltinFunctionCall-256]
- _ = x[ErrParseExpectedArgumentDelimiter-257]
- _ = x[ErrParseCastArity-258]
- _ = x[ErrParseInvalidTypeParam-259]
- _ = x[ErrParseEmptySelect-260]
- _ = x[ErrParseSelectMissingFrom-261]
- _ = x[ErrParseExpectedIdentForGroupName-262]
- _ = x[ErrParseExpectedIdentForAlias-263]
- _ = x[ErrParseUnsupportedCallWithStar-264]
- _ = x[ErrParseNonUnaryAgregateFunctionCall-265]
- _ = x[ErrParseMalformedJoin-266]
- _ = x[ErrParseExpectedIdentForAt-267]
- _ = x[ErrParseAsteriskIsNotAloneInSelectList-268]
- _ = x[ErrParseCannotMixSqbAndWildcardInSelectList-269]
- _ = x[ErrParseInvalidContextForWildcardInSelectList-270]
- _ = x[ErrIncorrectSQLFunctionArgumentType-271]
- _ = x[ErrValueParseFailure-272]
- _ = x[ErrEvaluatorInvalidArguments-273]
- _ = x[ErrIntegerOverflow-274]
- _ = x[ErrLikeInvalidInputs-275]
- _ = x[ErrCastFailed-276]
- _ = x[ErrInvalidCast-277]
- _ = x[ErrEvaluatorInvalidTimestampFormatPattern-278]
- _ = x[ErrEvaluatorInvalidTimestampFormatPatternSymbolForParsing-279]
- _ = x[ErrEvaluatorTimestampFormatPatternDuplicateFields-280]
- _ = x[ErrEvaluatorTimestampFormatPatternHourClockAmPmMismatch-281]
- _ = x[ErrEvaluatorUnterminatedTimestampFormatPatternToken-282]
- _ = x[ErrEvaluatorInvalidTimestampFormatPatternToken-283]
- _ = x[ErrEvaluatorInvalidTimestampFormatPatternSymbol-284]
- _ = x[ErrEvaluatorBindingDoesNotExist-285]
- _ = x[ErrMissingHeaders-286]
- _ = x[ErrInvalidColumnIndex-287]
- _ = x[ErrAdminConfigNotificationTargetsFailed-288]
- _ = x[ErrAdminProfilerNotEnabled-289]
- _ = x[ErrInvalidDecompressedSize-290]
- _ = x[ErrAddUserInvalidArgument-291]
- _ = x[ErrAdminResourceInvalidArgument-292]
- _ = x[ErrAdminAccountNotEligible-293]
- _ = x[ErrAccountNotEligible-294]
- _ = x[ErrAdminServiceAccountNotFound-295]
- _ = x[ErrPostPolicyConditionInvalidFormat-296]
- _ = x[ErrInvalidChecksum-297]
+ _ = x[ErrAdminRebalanceAlreadyStarted-195]
+ _ = x[ErrAdminRebalanceNotStarted-196]
+ _ = x[ErrAdminBucketQuotaExceeded-197]
+ _ = x[ErrAdminNoSuchQuotaConfiguration-198]
+ _ = x[ErrHealNotImplemented-199]
+ _ = x[ErrHealNoSuchProcess-200]
+ _ = x[ErrHealInvalidClientToken-201]
+ _ = x[ErrHealMissingBucket-202]
+ _ = x[ErrHealAlreadyRunning-203]
+ _ = x[ErrHealOverlappingPaths-204]
+ _ = x[ErrIncorrectContinuationToken-205]
+ _ = x[ErrEmptyRequestBody-206]
+ _ = x[ErrUnsupportedFunction-207]
+ _ = x[ErrInvalidExpressionType-208]
+ _ = x[ErrBusy-209]
+ _ = x[ErrUnauthorizedAccess-210]
+ _ = x[ErrExpressionTooLong-211]
+ _ = x[ErrIllegalSQLFunctionArgument-212]
+ _ = x[ErrInvalidKeyPath-213]
+ _ = x[ErrInvalidCompressionFormat-214]
+ _ = x[ErrInvalidFileHeaderInfo-215]
+ _ = x[ErrInvalidJSONType-216]
+ _ = x[ErrInvalidQuoteFields-217]
+ _ = x[ErrInvalidRequestParameter-218]
+ _ = x[ErrInvalidDataType-219]
+ _ = x[ErrInvalidTextEncoding-220]
+ _ = x[ErrInvalidDataSource-221]
+ _ = x[ErrInvalidTableAlias-222]
+ _ = x[ErrMissingRequiredParameter-223]
+ _ = x[ErrObjectSerializationConflict-224]
+ _ = x[ErrUnsupportedSQLOperation-225]
+ _ = x[ErrUnsupportedSQLStructure-226]
+ _ = x[ErrUnsupportedSyntax-227]
+ _ = x[ErrUnsupportedRangeHeader-228]
+ _ = x[ErrLexerInvalidChar-229]
+ _ = x[ErrLexerInvalidOperator-230]
+ _ = x[ErrLexerInvalidLiteral-231]
+ _ = x[ErrLexerInvalidIONLiteral-232]
+ _ = x[ErrParseExpectedDatePart-233]
+ _ = x[ErrParseExpectedKeyword-234]
+ _ = x[ErrParseExpectedTokenType-235]
+ _ = x[ErrParseExpected2TokenTypes-236]
+ _ = x[ErrParseExpectedNumber-237]
+ _ = x[ErrParseExpectedRightParenBuiltinFunctionCall-238]
+ _ = x[ErrParseExpectedTypeName-239]
+ _ = x[ErrParseExpectedWhenClause-240]
+ _ = x[ErrParseUnsupportedToken-241]
+ _ = x[ErrParseUnsupportedLiteralsGroupBy-242]
+ _ = x[ErrParseExpectedMember-243]
+ _ = x[ErrParseUnsupportedSelect-244]
+ _ = x[ErrParseUnsupportedCase-245]
+ _ = x[ErrParseUnsupportedCaseClause-246]
+ _ = x[ErrParseUnsupportedAlias-247]
+ _ = x[ErrParseUnsupportedSyntax-248]
+ _ = x[ErrParseUnknownOperator-249]
+ _ = x[ErrParseMissingIdentAfterAt-250]
+ _ = x[ErrParseUnexpectedOperator-251]
+ _ = x[ErrParseUnexpectedTerm-252]
+ _ = x[ErrParseUnexpectedToken-253]
+ _ = x[ErrParseUnexpectedKeyword-254]
+ _ = x[ErrParseExpectedExpression-255]
+ _ = x[ErrParseExpectedLeftParenAfterCast-256]
+ _ = x[ErrParseExpectedLeftParenValueConstructor-257]
+ _ = x[ErrParseExpectedLeftParenBuiltinFunctionCall-258]
+ _ = x[ErrParseExpectedArgumentDelimiter-259]
+ _ = x[ErrParseCastArity-260]
+ _ = x[ErrParseInvalidTypeParam-261]
+ _ = x[ErrParseEmptySelect-262]
+ _ = x[ErrParseSelectMissingFrom-263]
+ _ = x[ErrParseExpectedIdentForGroupName-264]
+ _ = x[ErrParseExpectedIdentForAlias-265]
+ _ = x[ErrParseUnsupportedCallWithStar-266]
+ _ = x[ErrParseNonUnaryAgregateFunctionCall-267]
+ _ = x[ErrParseMalformedJoin-268]
+ _ = x[ErrParseExpectedIdentForAt-269]
+ _ = x[ErrParseAsteriskIsNotAloneInSelectList-270]
+ _ = x[ErrParseCannotMixSqbAndWildcardInSelectList-271]
+ _ = x[ErrParseInvalidContextForWildcardInSelectList-272]
+ _ = x[ErrIncorrectSQLFunctionArgumentType-273]
+ _ = x[ErrValueParseFailure-274]
+ _ = x[ErrEvaluatorInvalidArguments-275]
+ _ = x[ErrIntegerOverflow-276]
+ _ = x[ErrLikeInvalidInputs-277]
+ _ = x[ErrCastFailed-278]
+ _ = x[ErrInvalidCast-279]
+ _ = x[ErrEvaluatorInvalidTimestampFormatPattern-280]
+ _ = x[ErrEvaluatorInvalidTimestampFormatPatternSymbolForParsing-281]
+ _ = x[ErrEvaluatorTimestampFormatPatternDuplicateFields-282]
+ _ = x[ErrEvaluatorTimestampFormatPatternHourClockAmPmMismatch-283]
+ _ = x[ErrEvaluatorUnterminatedTimestampFormatPatternToken-284]
+ _ = x[ErrEvaluatorInvalidTimestampFormatPatternToken-285]
+ _ = x[ErrEvaluatorInvalidTimestampFormatPatternSymbol-286]
+ _ = x[ErrEvaluatorBindingDoesNotExist-287]
+ _ = x[ErrMissingHeaders-288]
+ _ = x[ErrInvalidColumnIndex-289]
+ _ = x[ErrAdminConfigNotificationTargetsFailed-290]
+ _ = x[ErrAdminProfilerNotEnabled-291]
+ _ = x[ErrInvalidDecompressedSize-292]
+ _ = x[ErrAddUserInvalidArgument-293]
+ _ = x[ErrAdminResourceInvalidArgument-294]
+ _ = x[ErrAdminAccountNotEligible-295]
+ _ = x[ErrAccountNotEligible-296]
+ _ = x[ErrAdminServiceAccountNotFound-297]
+ _ = x[ErrPostPolicyConditionInvalidFormat-298]
+ _ = x[ErrInvalidChecksum-299]
}
-const _APIErrorCode_name = "NoneAccessDeniedBadDigestEntityTooSmallEntityTooLargePolicyTooLargeIncompleteBodyInternalErrorInvalidAccessKeyIDAccessKeyDisabledInvalidBucketNameInvalidDigestInvalidRangeInvalidRangePartNumberInvalidCopyPartRangeInvalidCopyPartRangeSourceInvalidMaxKeysInvalidEncodingMethodInvalidMaxUploadsInvalidMaxPartsInvalidPartNumberMarkerInvalidPartNumberInvalidRequestBodyInvalidCopySourceInvalidMetadataDirectiveInvalidCopyDestInvalidPolicyDocumentInvalidObjectStateMalformedXMLMissingContentLengthMissingContentMD5MissingRequestBodyErrorMissingSecurityHeaderNoSuchBucketNoSuchBucketPolicyNoSuchBucketLifecycleNoSuchLifecycleConfigurationInvalidLifecycleWithObjectLockNoSuchBucketSSEConfigNoSuchCORSConfigurationNoSuchWebsiteConfigurationReplicationConfigurationNotFoundErrorRemoteDestinationNotFoundErrorReplicationDestinationMissingLockRemoteTargetNotFoundErrorReplicationRemoteConnectionErrorReplicationBandwidthLimitErrorBucketRemoteIdenticalToSourceBucketRemoteAlreadyExistsBucketRemoteLabelInUseBucketRemoteArnTypeInvalidBucketRemoteArnInvalidBucketRemoteRemoveDisallowedRemoteTargetNotVersionedErrorReplicationSourceNotVersionedErrorReplicationNeedsVersioningErrorReplicationBucketNeedsVersioningErrorReplicationDenyEditErrorReplicationNoExistingObjectsObjectRestoreAlreadyInProgressNoSuchKeyNoSuchUploadInvalidVersionIDNoSuchVersionNotImplementedPreconditionFailedRequestTimeTooSkewedSignatureDoesNotMatchMethodNotAllowedInvalidPartInvalidPartOrderAuthorizationHeaderMalformedMalformedPOSTRequestPOSTFileRequiredSignatureVersionNotSupportedBucketNotEmptyAllAccessDisabledMalformedPolicyMissingFieldsMissingCredTagCredMalformedInvalidRegionInvalidServiceS3InvalidServiceSTSInvalidRequestVersionMissingSignTagMissingSignHeadersTagMalformedDateMalformedPresignedDateMalformedCredentialDateMalformedCredentialRegionMalformedExpiresNegativeExpiresAuthHeaderEmptyExpiredPresignRequestRequestNotReadyYetUnsignedHeadersMissingDateHeaderInvalidQuerySignatureAlgoInvalidQueryParamsBucketAlreadyOwnedByYouInvalidDurationBucketAlreadyExistsTooManyBucketsMetadataTooLargeUnsupportedMetadataMaximumExpiresSlowDownInvalidPrefixMarkerBadRequestKeyTooLongErrorInvalidBucketObjectLockConfigurationObjectLockConfigurationNotFoundObjectLockConfigurationNotAllowedNoSuchObjectLockConfigurationObjectLockedInvalidRetentionDatePastObjectLockRetainDateUnknownWORMModeDirectiveBucketTaggingNotFoundObjectLockInvalidHeadersInvalidTagDirectiveInvalidEncryptionMethodInvalidEncryptionKeyIDInsecureSSECustomerRequestSSEMultipartEncryptedSSEEncryptedObjectInvalidEncryptionParametersInvalidSSECustomerAlgorithmInvalidSSECustomerKeyMissingSSECustomerKeyMissingSSECustomerKeyMD5SSECustomerKeyMD5MismatchInvalidSSECustomerParametersIncompatibleEncryptionMethodKMSNotConfiguredKMSKeyNotFoundExceptionNoAccessKeyInvalidTokenEventNotificationARNNotificationRegionNotificationOverlappingFilterNotificationFilterNameInvalidFilterNamePrefixFilterNameSuffixFilterValueInvalidOverlappingConfigsUnsupportedNotificationContentSHA256MismatchContentChecksumMismatchReadQuorumWriteQuorumStorageFullRequestBodyParseObjectExistsAsDirectoryInvalidObjectNameInvalidObjectNamePrefixSlashInvalidResourceNameServerNotInitializedOperationTimedOutClientDisconnectedOperationMaxedOutInvalidRequestTransitionStorageClassNotFoundErrorInvalidStorageClassBackendDownMalformedJSONAdminNoSuchUserAdminNoSuchGroupAdminGroupNotEmptyAdminNoSuchJobAdminNoSuchPolicyAdminInvalidArgumentAdminInvalidAccessKeyAdminInvalidSecretKeyAdminConfigNoQuorumAdminConfigTooLargeAdminConfigBadJSONAdminNoS
uchConfigTargetAdminConfigEnvOverriddenAdminConfigDuplicateKeysAdminConfigInvalidIDPTypeAdminConfigLDAPValidationAdminCredentialsMismatchInsecureClientRequestObjectTamperedSiteReplicationInvalidRequestSiteReplicationPeerRespSiteReplicationBackendIssueSiteReplicationServiceAccountErrorSiteReplicationBucketConfigErrorSiteReplicationBucketMetaErrorSiteReplicationIAMErrorSiteReplicationConfigMissingAdminBucketQuotaExceededAdminNoSuchQuotaConfigurationHealNotImplementedHealNoSuchProcessHealInvalidClientTokenHealMissingBucketHealAlreadyRunningHealOverlappingPathsIncorrectContinuationTokenEmptyRequestBodyUnsupportedFunctionInvalidExpressionTypeBusyUnauthorizedAccessExpressionTooLongIllegalSQLFunctionArgumentInvalidKeyPathInvalidCompressionFormatInvalidFileHeaderInfoInvalidJSONTypeInvalidQuoteFieldsInvalidRequestParameterInvalidDataTypeInvalidTextEncodingInvalidDataSourceInvalidTableAliasMissingRequiredParameterObjectSerializationConflictUnsupportedSQLOperationUnsupportedSQLStructureUnsupportedSyntaxUnsupportedRangeHeaderLexerInvalidCharLexerInvalidOperatorLexerInvalidLiteralLexerInvalidIONLiteralParseExpectedDatePartParseExpectedKeywordParseExpectedTokenTypeParseExpected2TokenTypesParseExpectedNumberParseExpectedRightParenBuiltinFunctionCallParseExpectedTypeNameParseExpectedWhenClauseParseUnsupportedTokenParseUnsupportedLiteralsGroupByParseExpectedMemberParseUnsupportedSelectParseUnsupportedCaseParseUnsupportedCaseClauseParseUnsupportedAliasParseUnsupportedSyntaxParseUnknownOperatorParseMissingIdentAfterAtParseUnexpectedOperatorParseUnexpectedTermParseUnexpectedTokenParseUnexpectedKeywordParseExpectedExpressionParseExpectedLeftParenAfterCastParseExpectedLeftParenValueConstructorParseExpectedLeftParenBuiltinFunctionCallParseExpectedArgumentDelimiterParseCastArityParseInvalidTypeParamParseEmptySelectParseSelectMissingFromParseExpectedIdentForGroupNameParseExpectedIdentForAliasParseUnsupportedCallWithStarParseNonUnaryAgregateFunctionCallParseMalformedJoinParseExpectedIdentForAtParseAsteriskIsNotAloneInSelectListParseCannotMixSqbAndWildcardInSelectListParseInvalidContextForWildcardInSelectListIncorrectSQLFunctionArgumentTypeValueParseFailureEvaluatorInvalidArgumentsIntegerOverflowLikeInvalidInputsCastFailedInvalidCastEvaluatorInvalidTimestampFormatPatternEvaluatorInvalidTimestampFormatPatternSymbolForParsingEvaluatorTimestampFormatPatternDuplicateFieldsEvaluatorTimestampFormatPatternHourClockAmPmMismatchEvaluatorUnterminatedTimestampFormatPatternTokenEvaluatorInvalidTimestampFormatPatternTokenEvaluatorInvalidTimestampFormatPatternSymbolEvaluatorBindingDoesNotExistMissingHeadersInvalidColumnIndexAdminConfigNotificationTargetsFailedAdminProfilerNotEnabledInvalidDecompressedSizeAddUserInvalidArgumentAdminResourceInvalidArgumentAdminAccountNotEligibleAccountNotEligibleAdminServiceAccountNotFoundPostPolicyConditionInvalidFormatInvalidChecksum"
+const _APIErrorCode_name = "NoneAccessDeniedBadDigestEntityTooSmallEntityTooLargePolicyTooLargeIncompleteBodyInternalErrorInvalidAccessKeyIDAccessKeyDisabledInvalidBucketNameInvalidDigestInvalidRangeInvalidRangePartNumberInvalidCopyPartRangeInvalidCopyPartRangeSourceInvalidMaxKeysInvalidEncodingMethodInvalidMaxUploadsInvalidMaxPartsInvalidPartNumberMarkerInvalidPartNumberInvalidRequestBodyInvalidCopySourceInvalidMetadataDirectiveInvalidCopyDestInvalidPolicyDocumentInvalidObjectStateMalformedXMLMissingContentLengthMissingContentMD5MissingRequestBodyErrorMissingSecurityHeaderNoSuchBucketNoSuchBucketPolicyNoSuchBucketLifecycleNoSuchLifecycleConfigurationInvalidLifecycleWithObjectLockNoSuchBucketSSEConfigNoSuchCORSConfigurationNoSuchWebsiteConfigurationReplicationConfigurationNotFoundErrorRemoteDestinationNotFoundErrorReplicationDestinationMissingLockRemoteTargetNotFoundErrorReplicationRemoteConnectionErrorReplicationBandwidthLimitErrorBucketRemoteIdenticalToSourceBucketRemoteAlreadyExistsBucketRemoteLabelInUseBucketRemoteArnTypeInvalidBucketRemoteArnInvalidBucketRemoteRemoveDisallowedRemoteTargetNotVersionedErrorReplicationSourceNotVersionedErrorReplicationNeedsVersioningErrorReplicationBucketNeedsVersioningErrorReplicationDenyEditErrorReplicationNoExistingObjectsObjectRestoreAlreadyInProgressNoSuchKeyNoSuchUploadInvalidVersionIDNoSuchVersionNotImplementedPreconditionFailedRequestTimeTooSkewedSignatureDoesNotMatchMethodNotAllowedInvalidPartInvalidPartOrderAuthorizationHeaderMalformedMalformedPOSTRequestPOSTFileRequiredSignatureVersionNotSupportedBucketNotEmptyAllAccessDisabledMalformedPolicyMissingFieldsMissingCredTagCredMalformedInvalidRegionInvalidServiceS3InvalidServiceSTSInvalidRequestVersionMissingSignTagMissingSignHeadersTagMalformedDateMalformedPresignedDateMalformedCredentialDateMalformedCredentialRegionMalformedExpiresNegativeExpiresAuthHeaderEmptyExpiredPresignRequestRequestNotReadyYetUnsignedHeadersMissingDateHeaderInvalidQuerySignatureAlgoInvalidQueryParamsBucketAlreadyOwnedByYouInvalidDurationBucketAlreadyExistsTooManyBucketsMetadataTooLargeUnsupportedMetadataMaximumExpiresSlowDownInvalidPrefixMarkerBadRequestKeyTooLongErrorInvalidBucketObjectLockConfigurationObjectLockConfigurationNotFoundObjectLockConfigurationNotAllowedNoSuchObjectLockConfigurationObjectLockedInvalidRetentionDatePastObjectLockRetainDateUnknownWORMModeDirectiveBucketTaggingNotFoundObjectLockInvalidHeadersInvalidTagDirectiveInvalidEncryptionMethodInvalidEncryptionKeyIDInsecureSSECustomerRequestSSEMultipartEncryptedSSEEncryptedObjectInvalidEncryptionParametersInvalidSSECustomerAlgorithmInvalidSSECustomerKeyMissingSSECustomerKeyMissingSSECustomerKeyMD5SSECustomerKeyMD5MismatchInvalidSSECustomerParametersIncompatibleEncryptionMethodKMSNotConfiguredKMSKeyNotFoundExceptionNoAccessKeyInvalidTokenEventNotificationARNNotificationRegionNotificationOverlappingFilterNotificationFilterNameInvalidFilterNamePrefixFilterNameSuffixFilterValueInvalidOverlappingConfigsUnsupportedNotificationContentSHA256MismatchContentChecksumMismatchReadQuorumWriteQuorumStorageFullRequestBodyParseObjectExistsAsDirectoryInvalidObjectNameInvalidObjectNamePrefixSlashInvalidResourceNameServerNotInitializedOperationTimedOutClientDisconnectedOperationMaxedOutInvalidRequestTransitionStorageClassNotFoundErrorInvalidStorageClassBackendDownMalformedJSONAdminNoSuchUserAdminNoSuchGroupAdminGroupNotEmptyAdminNoSuchJobAdminNoSuchPolicyAdminInvalidArgumentAdminInvalidAccessKeyAdminInvalidSecretKeyAdminConfigNoQuorumAdminConfigTooLargeAdminConfigBadJSONAdminNoS
uchConfigTargetAdminConfigEnvOverriddenAdminConfigDuplicateKeysAdminConfigInvalidIDPTypeAdminConfigLDAPValidationAdminCredentialsMismatchInsecureClientRequestObjectTamperedSiteReplicationInvalidRequestSiteReplicationPeerRespSiteReplicationBackendIssueSiteReplicationServiceAccountErrorSiteReplicationBucketConfigErrorSiteReplicationBucketMetaErrorSiteReplicationIAMErrorSiteReplicationConfigMissingAdminRebalanceAlreadyStartedAdminRebalanceNotStartedAdminBucketQuotaExceededAdminNoSuchQuotaConfigurationHealNotImplementedHealNoSuchProcessHealInvalidClientTokenHealMissingBucketHealAlreadyRunningHealOverlappingPathsIncorrectContinuationTokenEmptyRequestBodyUnsupportedFunctionInvalidExpressionTypeBusyUnauthorizedAccessExpressionTooLongIllegalSQLFunctionArgumentInvalidKeyPathInvalidCompressionFormatInvalidFileHeaderInfoInvalidJSONTypeInvalidQuoteFieldsInvalidRequestParameterInvalidDataTypeInvalidTextEncodingInvalidDataSourceInvalidTableAliasMissingRequiredParameterObjectSerializationConflictUnsupportedSQLOperationUnsupportedSQLStructureUnsupportedSyntaxUnsupportedRangeHeaderLexerInvalidCharLexerInvalidOperatorLexerInvalidLiteralLexerInvalidIONLiteralParseExpectedDatePartParseExpectedKeywordParseExpectedTokenTypeParseExpected2TokenTypesParseExpectedNumberParseExpectedRightParenBuiltinFunctionCallParseExpectedTypeNameParseExpectedWhenClauseParseUnsupportedTokenParseUnsupportedLiteralsGroupByParseExpectedMemberParseUnsupportedSelectParseUnsupportedCaseParseUnsupportedCaseClauseParseUnsupportedAliasParseUnsupportedSyntaxParseUnknownOperatorParseMissingIdentAfterAtParseUnexpectedOperatorParseUnexpectedTermParseUnexpectedTokenParseUnexpectedKeywordParseExpectedExpressionParseExpectedLeftParenAfterCastParseExpectedLeftParenValueConstructorParseExpectedLeftParenBuiltinFunctionCallParseExpectedArgumentDelimiterParseCastArityParseInvalidTypeParamParseEmptySelectParseSelectMissingFromParseExpectedIdentForGroupNameParseExpectedIdentForAliasParseUnsupportedCallWithStarParseNonUnaryAgregateFunctionCallParseMalformedJoinParseExpectedIdentForAtParseAsteriskIsNotAloneInSelectListParseCannotMixSqbAndWildcardInSelectListParseInvalidContextForWildcardInSelectListIncorrectSQLFunctionArgumentTypeValueParseFailureEvaluatorInvalidArgumentsIntegerOverflowLikeInvalidInputsCastFailedInvalidCastEvaluatorInvalidTimestampFormatPatternEvaluatorInvalidTimestampFormatPatternSymbolForParsingEvaluatorTimestampFormatPatternDuplicateFieldsEvaluatorTimestampFormatPatternHourClockAmPmMismatchEvaluatorUnterminatedTimestampFormatPatternTokenEvaluatorInvalidTimestampFormatPatternTokenEvaluatorInvalidTimestampFormatPatternSymbolEvaluatorBindingDoesNotExistMissingHeadersInvalidColumnIndexAdminConfigNotificationTargetsFailedAdminProfilerNotEnabledInvalidDecompressedSizeAddUserInvalidArgumentAdminResourceInvalidArgumentAdminAccountNotEligibleAccountNotEligibleAdminServiceAccountNotFoundPostPolicyConditionInvalidFormatInvalidChecksum"
-var _APIErrorCode_index = [...]uint16{0, 4, 16, 25, 39, 53, 67, 81, 94, 112, 129, 146, 159, 171, 193, 213, 239, 253, 274, 291, 306, 329, 346, 364, 381, 405, 420, 441, 459, 471, 491, 508, 531, 552, 564, 582, 603, 631, 661, 682, 705, 731, 768, 798, 831, 856, 888, 918, 947, 972, 994, 1020, 1042, 1070, 1099, 1133, 1164, 1201, 1225, 1253, 1283, 1292, 1304, 1320, 1333, 1347, 1365, 1385, 1406, 1422, 1433, 1449, 1477, 1497, 1513, 1541, 1555, 1572, 1587, 1600, 1614, 1627, 1640, 1656, 1673, 1694, 1708, 1729, 1742, 1764, 1787, 1812, 1828, 1843, 1858, 1879, 1897, 1912, 1929, 1954, 1972, 1995, 2010, 2029, 2043, 2059, 2078, 2092, 2100, 2119, 2129, 2144, 2180, 2211, 2244, 2273, 2285, 2305, 2329, 2353, 2374, 2398, 2417, 2440, 2462, 2488, 2509, 2527, 2554, 2581, 2602, 2623, 2647, 2672, 2700, 2728, 2744, 2767, 2778, 2790, 2807, 2822, 2840, 2869, 2886, 2902, 2918, 2936, 2954, 2977, 2998, 3021, 3031, 3042, 3053, 3069, 3092, 3109, 3137, 3156, 3176, 3193, 3211, 3228, 3242, 3277, 3296, 3307, 3320, 3335, 3351, 3369, 3383, 3400, 3420, 3441, 3462, 3481, 3500, 3518, 3541, 3565, 3589, 3614, 3639, 3663, 3684, 3698, 3727, 3750, 3777, 3811, 3843, 3873, 3896, 3924, 3948, 3977, 3995, 4012, 4034, 4051, 4069, 4089, 4115, 4131, 4150, 4171, 4175, 4193, 4210, 4236, 4250, 4274, 4295, 4310, 4328, 4351, 4366, 4385, 4402, 4419, 4443, 4470, 4493, 4516, 4533, 4555, 4571, 4591, 4610, 4632, 4653, 4673, 4695, 4719, 4738, 4780, 4801, 4824, 4845, 4876, 4895, 4917, 4937, 4963, 4984, 5006, 5026, 5050, 5073, 5092, 5112, 5134, 5157, 5188, 5226, 5267, 5297, 5311, 5332, 5348, 5370, 5400, 5426, 5454, 5487, 5505, 5528, 5563, 5603, 5645, 5677, 5694, 5719, 5734, 5751, 5761, 5772, 5810, 5864, 5910, 5962, 6010, 6053, 6097, 6125, 6139, 6157, 6193, 6216, 6239, 6261, 6289, 6312, 6330, 6357, 6389, 6404}
+var _APIErrorCode_index = [...]uint16{0, 4, 16, 25, 39, 53, 67, 81, 94, 112, 129, 146, 159, 171, 193, 213, 239, 253, 274, 291, 306, 329, 346, 364, 381, 405, 420, 441, 459, 471, 491, 508, 531, 552, 564, 582, 603, 631, 661, 682, 705, 731, 768, 798, 831, 856, 888, 918, 947, 972, 994, 1020, 1042, 1070, 1099, 1133, 1164, 1201, 1225, 1253, 1283, 1292, 1304, 1320, 1333, 1347, 1365, 1385, 1406, 1422, 1433, 1449, 1477, 1497, 1513, 1541, 1555, 1572, 1587, 1600, 1614, 1627, 1640, 1656, 1673, 1694, 1708, 1729, 1742, 1764, 1787, 1812, 1828, 1843, 1858, 1879, 1897, 1912, 1929, 1954, 1972, 1995, 2010, 2029, 2043, 2059, 2078, 2092, 2100, 2119, 2129, 2144, 2180, 2211, 2244, 2273, 2285, 2305, 2329, 2353, 2374, 2398, 2417, 2440, 2462, 2488, 2509, 2527, 2554, 2581, 2602, 2623, 2647, 2672, 2700, 2728, 2744, 2767, 2778, 2790, 2807, 2822, 2840, 2869, 2886, 2902, 2918, 2936, 2954, 2977, 2998, 3021, 3031, 3042, 3053, 3069, 3092, 3109, 3137, 3156, 3176, 3193, 3211, 3228, 3242, 3277, 3296, 3307, 3320, 3335, 3351, 3369, 3383, 3400, 3420, 3441, 3462, 3481, 3500, 3518, 3541, 3565, 3589, 3614, 3639, 3663, 3684, 3698, 3727, 3750, 3777, 3811, 3843, 3873, 3896, 3924, 3952, 3976, 4000, 4029, 4047, 4064, 4086, 4103, 4121, 4141, 4167, 4183, 4202, 4223, 4227, 4245, 4262, 4288, 4302, 4326, 4347, 4362, 4380, 4403, 4418, 4437, 4454, 4471, 4495, 4522, 4545, 4568, 4585, 4607, 4623, 4643, 4662, 4684, 4705, 4725, 4747, 4771, 4790, 4832, 4853, 4876, 4897, 4928, 4947, 4969, 4989, 5015, 5036, 5058, 5078, 5102, 5125, 5144, 5164, 5186, 5209, 5240, 5278, 5319, 5349, 5363, 5384, 5400, 5422, 5452, 5478, 5506, 5539, 5557, 5580, 5615, 5655, 5697, 5729, 5746, 5771, 5786, 5803, 5813, 5824, 5862, 5916, 5962, 6014, 6062, 6105, 6149, 6177, 6191, 6209, 6245, 6268, 6291, 6313, 6341, 6364, 6382, 6409, 6441, 6456}
func (i APIErrorCode) String() string {
if i < 0 || i >= APIErrorCode(len(_APIErrorCode_index)-1) {
diff --git a/cmd/config-common.go b/cmd/config-common.go
index 8bc629c77..57078b7f4 100644
--- a/cmd/config-common.go
+++ b/cmd/config-common.go
@@ -29,10 +29,14 @@ import (
var errConfigNotFound = errors.New("config file not found")
-func readConfigWithMetadata(ctx context.Context, store objectIO, configFile string) ([]byte, ObjectInfo, error) {
- r, err := store.GetObjectNInfo(ctx, minioMetaBucket, configFile, nil, http.Header{}, readLock, ObjectOptions{})
+func readConfigWithMetadata(ctx context.Context, store objectIO, configFile string, opts ObjectOptions) ([]byte, ObjectInfo, error) {
+ lockType := readLock
+ if opts.NoLock {
+ lockType = noLock // erasureObjects.GetObjectNInfo honors lockType argument but not opts.NoLock.
+ }
+
+ r, err := store.GetObjectNInfo(ctx, minioMetaBucket, configFile, nil, http.Header{}, lockType, opts)
if err != nil {
- // Treat object not found as config not found.
if isErrObjectNotFound(err) {
return nil, ObjectInfo{}, errConfigNotFound
}
@@ -52,7 +56,7 @@ func readConfigWithMetadata(ctx context.Context, store objectIO, configFile stri
}
func readConfig(ctx context.Context, store objectIO, configFile string) ([]byte, error) {
- buf, _, err := readConfigWithMetadata(ctx, store, configFile)
+ buf, _, err := readConfigWithMetadata(ctx, store, configFile, ObjectOptions{})
return buf, err
}
@@ -70,16 +74,20 @@ func deleteConfig(ctx context.Context, objAPI objectDeleter, configFile string)
return err
}
-func saveConfig(ctx context.Context, store objectIO, configFile string, data []byte) error {
+func saveConfigWithOpts(ctx context.Context, store objectIO, configFile string, data []byte, opts ObjectOptions) error {
hashReader, err := hash.NewReader(bytes.NewReader(data), int64(len(data)), "", getSHA256Hash(data), int64(len(data)))
if err != nil {
return err
}
- _, err = store.PutObject(ctx, minioMetaBucket, configFile, NewPutObjReader(hashReader), ObjectOptions{MaxParity: true})
+ _, err = store.PutObject(ctx, minioMetaBucket, configFile, NewPutObjReader(hashReader), opts)
return err
}
+func saveConfig(ctx context.Context, store objectIO, configFile string, data []byte) error {
+ return saveConfigWithOpts(ctx, store, configFile, data, ObjectOptions{MaxParity: true})
+}
+
func checkConfig(ctx context.Context, objAPI ObjectLayer, configFile string) error {
if _, err := objAPI.GetObjectInfo(ctx, minioMetaBucket, configFile, ObjectOptions{}); err != nil {
// Treat object not found as config not found.
diff --git a/cmd/erasure-server-pool-decom.go b/cmd/erasure-server-pool-decom.go
index 31cbe9609..2877c34c9 100644
--- a/cmd/erasure-server-pool-decom.go
+++ b/cmd/erasure-server-pool-decom.go
@@ -499,6 +499,15 @@ const (
// Init() initializes pools and saves additional information about them
// in 'pool.bin', this is eventually used for decommissioning the pool.
func (z *erasureServerPools) Init(ctx context.Context) error {
+ // Load rebalance metadata if present
+ err := z.loadRebalanceMeta(ctx)
+ if err != nil {
+ return fmt.Errorf("failed to load rebalance data: %w", err)
+ }
+
+ // Start rebalance routine
+ z.StartRebalance()
+
meta := poolMeta{}
if err := meta.load(ctx, z.serverPools[0], z.serverPools); err != nil {
@@ -573,6 +582,19 @@ func (z *erasureServerPools) Init(ctx context.Context) error {
return nil
}
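+// IsDecommissionRunning returns true if decommission metadata is present for any pool.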
+func (z *erasureServerPools) IsDecommissionRunning() bool {
+ z.poolMetaMutex.RLock()
+ defer z.poolMetaMutex.RUnlock()
+ meta := z.poolMeta
+ for _, pool := range meta.Pools {
+ if pool.Decommission != nil {
+ return true
+ }
+ }
+
+ return false
+}
+
func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket string, gr *GetObjectReader) (err error) {
objInfo := gr.ObjInfo
diff --git a/cmd/erasure-server-pool-rebalance.go b/cmd/erasure-server-pool-rebalance.go
new file mode 100644
index 000000000..aad8fa2b0
--- /dev/null
+++ b/cmd/erasure-server-pool-rebalance.go
@@ -0,0 +1,876 @@
+// Copyright (c) 2015-2022 MinIO, Inc.
+//
+// This file is part of MinIO Object Storage stack
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package cmd
+
+import (
+ "context"
+ "encoding/binary"
+ "errors"
+ "fmt"
+ "math"
+ "math/rand"
+ "net/http"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/lithammer/shortuuid/v4"
+ "github.com/minio/madmin-go"
+ "github.com/minio/minio/internal/bucket/lifecycle"
+ "github.com/minio/minio/internal/hash"
+ "github.com/minio/minio/internal/logger"
+ "github.com/minio/pkg/env"
+)
+
+//go:generate msgp -file $GOFILE -unexported
+
+// rebalanceStats contains per-pool rebalance statistics like number of objects,
+// versions and bytes rebalanced out of a pool.
+type rebalanceStats struct {
+ InitFreeSpace uint64 `json:"initFreeSpace" msg:"ifs"` // Pool free space at the start of rebalance
+ InitCapacity uint64 `json:"initCapacity" msg:"ic"` // Pool capacity at the start of rebalance
+
+ Buckets []string `json:"buckets" msg:"bus"` // buckets being rebalanced or to be rebalanced
+ RebalancedBuckets []string `json:"rebalancedBuckets" msg:"rbs"` // buckets rebalanced
+ Bucket string `json:"bucket" msg:"bu"` // Last rebalanced bucket
+ Object string `json:"object" msg:"ob"` // Last rebalanced object
+ NumObjects uint64 `json:"numObjects" msg:"no"` // Number of objects rebalanced
+ NumVersions uint64 `json:"numVersions" msg:"nv"` // Number of versions rebalanced
+ Bytes uint64 `json:"bytes" msg:"bs"` // Number of bytes rebalanced
+ Participating bool `json:"participating" msg:"par"`
+ Info rebalanceInfo `json:"info" msg:"inf"`
+}
+
+func (rs *rebalanceStats) update(bucket string, oi ObjectInfo) {
+ if oi.IsLatest {
+ rs.NumObjects++
+ }
+
+ rs.NumVersions++
+ rs.Bytes += uint64(oi.Size)
+ rs.Bucket = bucket
+ rs.Object = oi.Name
+}
+
+type rstats []*rebalanceStats
+
+//go:generate stringer -type=rebalStatus -trimprefix=rebal $GOFILE
+type rebalStatus uint8
+
+const (
+ rebalNone rebalStatus = iota
+ rebalStarted
+ rebalCompleted
+ rebalStopped
+ rebalFailed
+)
+
+type rebalanceInfo struct {
+ StartTime time.Time `msg:"startTs"` // Time at which rebalance-start was issued
+ EndTime time.Time `msg:"stopTs"` // Time at which rebalance operation completed or rebalance-stop was called
+ Status rebalStatus `msg:"status"` // Current state of rebalance operation. One of Started|Stopped|Completed|Failed.
+}
+
+// rebalanceMeta contains information pertaining to an ongoing rebalance operation.
+type rebalanceMeta struct {
+ cancel context.CancelFunc `msg:"-"` // to be invoked on rebalance-stop
+ lastRefreshedAt time.Time `msg:"-"`
+ StoppedAt time.Time `msg:"stopTs"` // Time when rebalance-stop was issued.
+ ID string `msg:"id"` // ID of the ongoing rebalance operation
+ PercentFreeGoal float64 `msg:"pf"` // Computed from total free space and capacity at the start of rebalance
+ PoolStats []*rebalanceStats `msg:"rss"` // Per-pool rebalance stats keyed by pool index
+}
+
+var errRebalanceNotStarted = errors.New("rebalance not started")
+
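+// loadRebalanceMeta loads rebalance.bin from the first pool and caches it in memory;
+// a missing config simply means no rebalance has been started.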
+func (z *erasureServerPools) loadRebalanceMeta(ctx context.Context) error {
+ r := &rebalanceMeta{}
+ err := r.load(ctx, z.serverPools[0])
+ if err != nil {
+ if errors.Is(err, errConfigNotFound) {
+ return nil
+ }
+ return err
+ }
+
+ z.rebalMu.Lock()
+ z.rebalMeta = r
+ z.rebalMu.Unlock()
+
+ return nil
+}
+
+// initRebalanceMeta initializes rebalance metadata for a new rebalance
+// operation and saves it in the object store.
+func (z *erasureServerPools) initRebalanceMeta(ctx context.Context, buckets []string) (arn string, err error) {
+ r := &rebalanceMeta{
+ ID: shortuuid.New(),
+ PoolStats: make([]*rebalanceStats, len(z.serverPools)),
+ }
+
+ // Fetch disk capacity and available space.
+ si, _ := z.StorageInfo(ctx)
+ diskStats := make([]struct {
+ AvailableSpace uint64
+ TotalSpace uint64
+ }, len(z.serverPools))
+ var totalCap, totalFree uint64
+ for _, disk := range si.Disks {
+ totalCap += disk.TotalSpace
+ totalFree += disk.AvailableSpace
+
+ diskStats[disk.PoolIndex].AvailableSpace += disk.AvailableSpace
+ diskStats[disk.PoolIndex].TotalSpace += disk.TotalSpace
+ }
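+ // The cluster-wide free-space fraction is the goal every participating pool converges towards.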
+ r.PercentFreeGoal = float64(totalFree) / float64(totalCap)
+
+ now := time.Now()
+ for idx := range z.serverPools {
+ r.PoolStats[idx] = &rebalanceStats{
+ Buckets: make([]string, len(buckets)),
+ RebalancedBuckets: make([]string, 0, len(buckets)),
+ InitFreeSpace: diskStats[idx].AvailableSpace,
+ InitCapacity: diskStats[idx].TotalSpace,
+ }
+ copy(r.PoolStats[idx].Buckets, buckets)
+
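+ // Only pools with below-average free space participate; they rebalance data out until they reach the goal.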
+ if pfi := float64(diskStats[idx].AvailableSpace) / float64(diskStats[idx].TotalSpace); pfi < r.PercentFreeGoal {
+ r.PoolStats[idx].Participating = true
+ r.PoolStats[idx].Info = rebalanceInfo{
+ StartTime: now,
+ Status: rebalStarted,
+ }
+ }
+ }
+
+ err = r.save(ctx, z.serverPools[0])
+ if err != nil {
+ return arn, err
+ }
+
+ z.rebalMeta = r
+ return r.ID, nil
+}
+
+func (z *erasureServerPools) updatePoolStats(poolIdx int, bucket string, oi ObjectInfo) {
+ z.rebalMu.Lock()
+ defer z.rebalMu.Unlock()
+
+ r := z.rebalMeta
+ if r == nil {
+ return
+ }
+
+ r.PoolStats[poolIdx].update(bucket, oi)
+}
+
+const (
+ rebalMetaName = "rebalance.bin"
+ rebalMetaFmt = 1
+ rebalMetaVer = 1
+)
+
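+// nextRebalBucket returns the next bucket pending rebalance on poolIdx, if any.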
+func (z *erasureServerPools) nextRebalBucket(poolIdx int) (string, bool) {
+ z.rebalMu.RLock()
+ defer z.rebalMu.RUnlock()
+
+ r := z.rebalMeta
+ if r == nil {
+ return "", false
+ }
+
+ ps := r.PoolStats[poolIdx]
+ if ps == nil {
+ return "", false
+ }
+
+ if ps.Info.Status == rebalCompleted || !ps.Participating {
+ return "", false
+ }
+
+ if len(ps.Buckets) == 0 {
+ return "", false
+ }
+
+ return ps.Buckets[0], true
+}
+
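+// bucketRebalanceDone moves bucket from the pending list to the rebalanced list for poolIdx.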
+func (z *erasureServerPools) bucketRebalanceDone(bucket string, poolIdx int) {
+ z.rebalMu.Lock()
+ defer z.rebalMu.Unlock()
+
+ ps := z.rebalMeta.PoolStats[poolIdx]
+ if ps == nil {
+ return
+ }
+
+ for i, b := range ps.Buckets {
+ if b == bucket {
+ ps.Buckets = append(ps.Buckets[:i], ps.Buckets[i+1:]...)
+ ps.RebalancedBuckets = append(ps.RebalancedBuckets, bucket)
+ break
+ }
+ }
+}
+
+func (r *rebalanceMeta) load(ctx context.Context, store objectIO) error {
+ return r.loadWithOpts(ctx, store, ObjectOptions{})
+}
+
+func (r *rebalanceMeta) loadWithOpts(ctx context.Context, store objectIO, opts ObjectOptions) error {
+ data, _, err := readConfigWithMetadata(ctx, store, rebalMetaName, opts)
+ if err != nil {
+ return err
+ }
+
+ if len(data) == 0 {
+ return nil
+ }
+ if len(data) <= 4 {
+ return fmt.Errorf("rebalanceMeta: no data")
+ }
+
+ // Read header
+ switch binary.LittleEndian.Uint16(data[0:2]) {
+ case rebalMetaFmt:
+ default:
+ return fmt.Errorf("rebalanceMeta: unknown format: %d", binary.LittleEndian.Uint16(data[0:2]))
+ }
+ switch binary.LittleEndian.Uint16(data[2:4]) {
+ case rebalMetaVer:
+ default:
+ return fmt.Errorf("rebalanceMeta: unknown version: %d", binary.LittleEndian.Uint16(data[2:4]))
+ }
+
+ // OK, parse data.
+ if _, err = r.UnmarshalMsg(data[4:]); err != nil {
+ return err
+ }
+
+ r.lastRefreshedAt = time.Now()
+
+ return nil
+}
+
+func (r *rebalanceMeta) saveWithOpts(ctx context.Context, store objectIO, opts ObjectOptions) error {
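+ // Reserve 4 bytes for the header (2-byte format + 2-byte version), followed by the msgp payload.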
+ data := make([]byte, 4, r.Msgsize()+4)
+
+ // Initialize the header.
+ binary.LittleEndian.PutUint16(data[0:2], rebalMetaFmt)
+ binary.LittleEndian.PutUint16(data[2:4], rebalMetaVer)
+
+ buf, err := r.MarshalMsg(data)
+ if err != nil {
+ return err
+ }
+
+ return saveConfigWithOpts(ctx, store, rebalMetaName, buf, opts)
+}
+
+func (r *rebalanceMeta) save(ctx context.Context, store objectIO) error {
+ return r.saveWithOpts(ctx, store, ObjectOptions{})
+}
+
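+// IsRebalanceStarted returns true if rebalance metadata is present and the rebalance has not been stopped.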
+func (z *erasureServerPools) IsRebalanceStarted() bool {
+ z.rebalMu.RLock()
+ defer z.rebalMu.RUnlock()
+
+ if r := z.rebalMeta; r != nil {
+ if r.StoppedAt.IsZero() {
+ return true
+ }
+ }
+ return false
+}
+
+func (z *erasureServerPools) IsPoolRebalancing(poolIndex int) bool {
+ z.rebalMu.RLock()
+ defer z.rebalMu.RUnlock()
+
+ if r := z.rebalMeta; r != nil {
+ if !r.StoppedAt.IsZero() {
+ return false
+ }
+ ps := z.rebalMeta.PoolStats[poolIndex]
+ return ps.Participating && ps.Info.Status == rebalStarted
+ }
+ return false
+}
+
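+// rebalanceBuckets rebalances poolIdx's buckets one at a time, periodically persisting progress to rebalance.bin.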
+func (z *erasureServerPools) rebalanceBuckets(ctx context.Context, poolIdx int) (err error) {
+ doneCh := make(chan struct{})
+ defer close(doneCh)
+
+ // Save rebalance.bin periodically.
+ go func() {
+ // Update rebalance.bin periodically once every 5-10s, chosen randomly
+ // to avoid multiple pool leaders herding to update around the same
+ // time.
+ r := rand.New(rand.NewSource(time.Now().UnixNano()))
+ randSleepFor := func() time.Duration {
+ return 5*time.Second + time.Duration(float64(5*time.Second)*r.Float64())
+ }
+
+ timer := time.NewTimer(randSleepFor())
+ defer timer.Stop()
+ var rebalDone bool
+ var traceMsg string
+
+ for {
+ select {
+ case <-doneCh:
+ // rebalance completed for poolIdx
+ now := time.Now()
+ z.rebalMu.Lock()
+ z.rebalMeta.PoolStats[poolIdx].Info.Status = rebalCompleted
+ z.rebalMeta.PoolStats[poolIdx].Info.EndTime = now
+ z.rebalMu.Unlock()
+
+ rebalDone = true
+ traceMsg = fmt.Sprintf("completed at %s", now)
+
+ case <-ctx.Done():
+
+ // rebalance stopped for poolIdx
+ now := time.Now()
+ z.rebalMu.Lock()
+ z.rebalMeta.PoolStats[poolIdx].Info.Status = rebalStopped
+ z.rebalMeta.PoolStats[poolIdx].Info.EndTime = now
+ z.rebalMu.Unlock()
+
+ rebalDone = true
+ traceMsg = fmt.Sprintf("stopped at %s", now)
+
+ case <-timer.C:
+ traceMsg = fmt.Sprintf("saved at %s", time.Now())
+ }
+
+ stopFn := globalRebalanceMetrics.log(rebalanceMetricSaveMetadata, poolIdx, traceMsg)
+ err := z.saveRebalanceStats(ctx, poolIdx, rebalSaveStats)
+ stopFn(err)
+ logger.LogIf(ctx, err)
+ timer.Reset(randSleepFor())
+
+ if rebalDone {
+ return
+ }
+ }
+ }()
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ }
+
+ bucket, ok := z.nextRebalBucket(poolIdx)
+ if !ok {
+ // no more buckets to rebalance or target free_space/capacity reached
+ break
+ }
+
+ stopFn := globalRebalanceMetrics.log(rebalanceMetricRebalanceBucket, poolIdx, bucket)
+ err = z.rebalanceBucket(ctx, bucket, poolIdx)
+ if err != nil {
+ stopFn(err)
+ logger.LogIf(ctx, err)
+ return
+ }
+ stopFn(nil)
+ z.bucketRebalanceDone(bucket, poolIdx)
+ }
+
+ return err
+}
+
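+// checkIfRebalanceDone returns true once poolIdx has rebalanced enough data to be within 5% of the free-space goal.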
+func (z *erasureServerPools) checkIfRebalanceDone(poolIdx int) bool {
+ z.rebalMu.Lock()
+ defer z.rebalMu.Unlock()
+
+ // check if enough objects have been rebalanced
+ r := z.rebalMeta
+ poolStats := r.PoolStats[poolIdx]
+ if poolStats.Info.Status == rebalCompleted {
+ return true
+ }
+
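+ // Estimate this pool's free-space fraction as if the bytes rebalanced so far had already been freed.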
+ pfi := float64(poolStats.InitFreeSpace+poolStats.Bytes) / float64(poolStats.InitCapacity)
+ // Mark pool rebalance as done if within 5% from PercentFreeGoal.
+ if diff := math.Abs(pfi - r.PercentFreeGoal); diff <= 0.05 {
+ r.PoolStats[poolIdx].Info.Status = rebalCompleted
+ r.PoolStats[poolIdx].Info.EndTime = time.Now()
+ return true
+ }
+
+ return false
+}
+
+// rebalanceBucket rebalances objects under bucket in poolIdx pool
+func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string, poolIdx int) error {
+ ctx = logger.SetReqInfo(ctx, &logger.ReqInfo{})
+ vc, _ := globalBucketVersioningSys.Get(bucket)
+ // Check if the current bucket has a configured lifecycle policy
+ lc, _ := globalLifecycleSys.Get(bucket)
+ // Check if bucket is object locked.
+ lr, _ := globalBucketObjectLockSys.Get(bucket)
+
+ pool := z.serverPools[poolIdx]
+ const envRebalanceWorkers = "_MINIO_REBALANCE_WORKERS"
+ wStr := env.Get(envRebalanceWorkers, strconv.Itoa(len(pool.sets)))
+ workerSize, err := strconv.Atoi(wStr)
+ if err != nil {
+ logger.LogIf(ctx, fmt.Errorf("invalid %s value: %s err: %v, defaulting to %d", envRebalanceWorkers, wStr, err, len(pool.sets)))
+ workerSize = len(pool.sets)
+ }
+ workers := make(chan struct{}, workerSize)
+ var wg sync.WaitGroup
+ for _, set := range pool.sets {
+ set := set
+ disks := set.getOnlineDisks()
+ if len(disks) == 0 {
+ logger.LogIf(ctx, fmt.Errorf("no online disks found for set with endpoints %s",
+ set.getEndpoints()))
+ continue
+ }
+
+ filterLifecycle := func(bucket, object string, fi FileInfo) bool {
+ if lc == nil {
+ return false
+ }
+ versioned := vc != nil && vc.Versioned(object)
+ objInfo := fi.ToObjectInfo(bucket, object, versioned)
+ event := evalActionFromLifecycle(ctx, *lc, lr, objInfo)
+ switch action := event.Action; action {
+ case lifecycle.DeleteVersionAction, lifecycle.DeleteAction:
+ globalExpiryState.enqueueByDays(objInfo, false, action == lifecycle.DeleteVersionAction)
+ // Skip this entry.
+ return true
+ case lifecycle.DeleteRestoredAction, lifecycle.DeleteRestoredVersionAction:
+ globalExpiryState.enqueueByDays(objInfo, true, action == lifecycle.DeleteRestoredVersionAction)
+ // Skip this entry.
+ return true
+ }
+ return false
+ }
+
+ rebalanceEntry := func(entry metaCacheEntry) {
+ defer func() {
+ <-workers
+ wg.Done()
+ }()
+
+ if entry.isDir() {
+ return
+ }
+
+ // rebalance on poolIdx has reached its goal
+ if z.checkIfRebalanceDone(poolIdx) {
+ return
+ }
+
+ fivs, err := entry.fileInfoVersions(bucket)
+ if err != nil {
+ return
+ }
+
+ // We need a reversed order for rebalance,
+ // to create the appropriate stack.
+ versionsSorter(fivs.Versions).reverse()
+
+ var rebalanced int
+ for _, version := range fivs.Versions {
+ // Skip transitioned objects for now. TBD
+ if version.IsRemote() {
+ continue
+ }
+
+ // Apply lifecycle rules on the objects that are expired.
+ if filterLifecycle(bucket, version.Name, version) {
+ logger.LogIf(ctx, fmt.Errorf("found %s/%s (%s) expired object based on ILM rules, skipping and scheduled for deletion", bucket, version.Name, version.VersionID))
+ continue
+ }
+
+ // We will skip rebalancing delete markers
+ // with a single version; it's as good as there
+ // being no data associated with the object.
+ if version.Deleted && len(fivs.Versions) == 1 {
+ logger.LogIf(ctx, fmt.Errorf("found %s/%s delete marked object with no other versions, skipping since there is no data to be rebalanced", bucket, version.Name))
+ continue
+ }
+
+ if version.Deleted {
+ _, err := z.DeleteObject(ctx,
+ bucket,
+ version.Name,
+ ObjectOptions{
+ Versioned: vc.PrefixEnabled(version.Name),
+ VersionID: version.VersionID,
+ MTime: version.ModTime,
+ DeleteReplication: version.ReplicationState,
+ DeleteMarker: true, // make sure we create a delete marker
+ SkipRebalancing: true, // make sure we skip the decommissioned pool
+ })
+ var failure bool
+ if err != nil && !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
+ logger.LogIf(ctx, err)
+ failure = true
+ }
+
+ if !failure {
+ z.updatePoolStats(poolIdx, bucket, version.ToObjectInfo(bucket, version.Name, vc.PrefixEnabled(version.Name)))
+ rebalanced++
+ }
+ continue
+ }
+
+ var failure bool
+ var oi ObjectInfo
+ for try := 0; try < 3; try++ {
+ // GetObjectReader.Close is called by rebalanceObject
+ gr, err := set.GetObjectNInfo(ctx,
+ bucket,
+ encodeDirObject(version.Name),
+ nil,
+ http.Header{},
+ noLock, // all mutations are blocked, reads are safe without locks.
+ ObjectOptions{
+ VersionID: version.VersionID,
+ NoDecryption: true,
+ })
+ if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
+ // object deleted by the application, nothing to do here; we move on.
+ return
+ }
+ if err != nil {
+ failure = true
+ logger.LogIf(ctx, err)
+ continue
+ }
+
+ stopFn := globalRebalanceMetrics.log(rebalanceMetricRebalanceObject, poolIdx, bucket, version.Name)
+ if err = z.rebalanceObject(ctx, bucket, gr); err != nil {
+ stopFn(err)
+ failure = true
+ logger.LogIf(ctx, err)
+ continue
+ }
+
+ stopFn(nil)
+ failure = false
+ oi = gr.ObjInfo
+ break
+ }
+
+ if failure {
+ break // break out on first error
+ }
+ z.updatePoolStats(poolIdx, bucket, oi)
+ rebalanced++
+ }
+
+ // if all versions were rebalanced, we can delete the object versions.
+ if rebalanced == len(fivs.Versions) {
+ stopFn := globalRebalanceMetrics.log(rebalanceMetricRebalanceRemoveObject, poolIdx, bucket, entry.name)
+ _, err := set.DeleteObject(ctx,
+ bucket,
+ encodeDirObject(entry.name),
+ ObjectOptions{
+ DeletePrefix: true, // use prefix delete to delete all versions at once.
+ },
+ )
+ stopFn(err)
+ auditLogRebalance(ctx, "Rebalance:DeleteObject", bucket, entry.name, "", err)
+ if err != nil {
+ logger.LogIf(ctx, err)
+ }
+ }
+ }
+
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+
+ // How to resolve partial results.
+ resolver := metadataResolutionParams{
+ dirQuorum: len(disks) / 2, // make sure to capture all quorum ratios
+ objQuorum: len(disks) / 2, // make sure to capture all quorum ratios
+ bucket: bucket,
+ }
+ err := listPathRaw(ctx, listPathRawOptions{
+ disks: disks,
+ bucket: bucket,
+ recursive: true,
+ forwardTo: "",
+ minDisks: len(disks) / 2, // to capture all quorum ratios
+ reportNotFound: false,
+ agreed: func(entry metaCacheEntry) {
+ workers <- struct{}{}
+ wg.Add(1)
+ go rebalanceEntry(entry)
+ },
+ partial: func(entries metaCacheEntries, _ []error) {
+ entry, ok := entries.resolve(&resolver)
+ if ok {
+ workers <- struct{}{}
+ wg.Add(1)
+ go rebalanceEntry(*entry)
+ }
+ },
+ finished: nil,
+ })
+ logger.LogIf(ctx, err)
+ }()
+ }
+ wg.Wait()
+ return nil
+}
+
+type rebalSaveOpts uint8
+
+const (
+ rebalSaveStats rebalSaveOpts = iota
+ rebalSaveStoppedAt
+)
+
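+// saveRebalanceStats persists this pool's rebalance progress (or the stop timestamp) to rebalance.bin
+// under a cluster-wide lock, merging the in-memory stats for poolIdx with the on-disk copy.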
+func (z *erasureServerPools) saveRebalanceStats(ctx context.Context, poolIdx int, opts rebalSaveOpts) error {
+ lock := z.serverPools[0].NewNSLock(minioMetaBucket, rebalMetaName)
+ lkCtx, err := lock.GetLock(ctx, globalOperationTimeout)
+ if err != nil {
+ logger.LogIf(ctx, fmt.Errorf("failed to acquire write lock on %s/%s: %w", minioMetaBucket, rebalMetaName, err))
+ return err
+ }
+ defer lock.Unlock(lkCtx.Cancel)
+
+ ctx = lkCtx.Context()
+ noLockOpts := ObjectOptions{NoLock: true}
+ r := &rebalanceMeta{}
+ if err := r.loadWithOpts(ctx, z.serverPools[0], noLockOpts); err != nil {
+ return err
+ }
+
+ z.rebalMu.Lock()
+ defer z.rebalMu.Unlock()
+
+ switch opts {
+ case rebalSaveStoppedAt:
+ r.StoppedAt = time.Now()
+ case rebalSaveStats:
+ r.PoolStats[poolIdx] = z.rebalMeta.PoolStats[poolIdx]
+ }
+ z.rebalMeta = r
+
+ err = z.rebalMeta.saveWithOpts(ctx, z.serverPools[0], noLockOpts)
+ return err
+}
+
+func auditLogRebalance(ctx context.Context, apiName, bucket, object, versionID string, err error) {
+ errStr := ""
+ if err != nil {
+ errStr = err.Error()
+ }
+ auditLogInternal(ctx, AuditLogOptions{
+ Event: "rebalance",
+ APIName: apiName,
+ Bucket: bucket,
+ Object: object,
+ VersionID: versionID,
+ Error: errStr,
+ })
+}
+
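+// rebalanceObject re-writes a single object version through the regular PutObject/multipart path,
+// preserving its version ID, mod-time, ETag and user metadata.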
+func (z *erasureServerPools) rebalanceObject(ctx context.Context, bucket string, gr *GetObjectReader) (err error) {
+ oi := gr.ObjInfo
+
+ defer func() {
+ gr.Close()
+ auditLogRebalance(ctx, "RebalanceCopyData", oi.Bucket, oi.Name, oi.VersionID, err)
+ }()
+
+ actualSize, err := oi.GetActualSize()
+ if err != nil {
+ return err
+ }
+
+ if oi.isMultipart() {
+ res, err := z.NewMultipartUpload(ctx, bucket, oi.Name, ObjectOptions{
+ VersionID: oi.VersionID,
+ MTime: oi.ModTime,
+ UserDefined: oi.UserDefined,
+ })
+ if err != nil {
+ return fmt.Errorf("rebalanceObject: NewMultipartUpload() %w", err)
+ }
+ defer z.AbortMultipartUpload(ctx, bucket, oi.Name, res.UploadID, ObjectOptions{})
+
+ parts := make([]CompletePart, len(oi.Parts))
+ for i, part := range oi.Parts {
+ hr, err := hash.NewReader(gr, part.Size, "", "", part.ActualSize)
+ if err != nil {
+ return fmt.Errorf("rebalanceObject: hash.NewReader() %w", err)
+ }
+ pi, err := z.PutObjectPart(ctx, bucket, oi.Name, res.UploadID,
+ part.Number,
+ NewPutObjReader(hr),
+ ObjectOptions{
+ PreserveETag: part.ETag, // Preserve original ETag to ensure same metadata.
+ IndexCB: func() []byte {
+ return part.Index // Preserve part Index to ensure decompression works.
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("rebalanceObject: PutObjectPart() %w", err)
+ }
+ parts[i] = CompletePart{
+ ETag: pi.ETag,
+ PartNumber: pi.PartNumber,
+ }
+ }
+ _, err = z.CompleteMultipartUpload(ctx, bucket, oi.Name, res.UploadID, parts, ObjectOptions{
+ MTime: oi.ModTime,
+ })
+ if err != nil {
+ err = fmt.Errorf("rebalanceObject: CompleteMultipartUpload() %w", err)
+ }
+ return err
+ }
+
+ hr, err := hash.NewReader(gr, oi.Size, "", "", actualSize)
+ if err != nil {
+ return fmt.Errorf("rebalanceObject: hash.NewReader() %w", err)
+ }
+ _, err = z.PutObject(ctx,
+ bucket,
+ oi.Name,
+ NewPutObjReader(hr),
+ ObjectOptions{
+ VersionID: oi.VersionID,
+ MTime: oi.ModTime,
+ UserDefined: oi.UserDefined,
+ PreserveETag: oi.ETag, // Preserve original ETag to ensure same metadata.
+ IndexCB: func() []byte {
+ return oi.Parts[0].Index // Preserve part Index to ensure decompression works.
+ },
+ })
+ if err != nil {
+ err = fmt.Errorf("rebalanceObject: PutObject() %w", err)
+ }
+ return err
+}
+
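For multipart sources, rebalanceObject rebuilds the original part layout from the single object stream: each hash.NewReader is bounded by the recorded part.Size, so consecutive readers consume consecutive byte ranges of gr, while PreserveETag and the Index callback keep each part's ETag and compression index identical to the source. The standalone sketch below (not MinIO code; io.LimitReader stands in for the size-bounded hash reader) shows the stream-slicing idea with made-up part sizes.

package main

import (
	"fmt"
	"io"
	"strings"
)

func main() {
	src := strings.NewReader("0123456789abcdefghij") // stand-in for the object stream (gr)
	partSizes := []int64{8, 8, 4}                    // stand-in for the sizes recorded in oi.Parts

	for i, size := range partSizes {
		// Each size-bounded reader consumes the next chunk of the same stream,
		// the way each per-part hash.NewReader feeds PutObjectPart above.
		part, err := io.ReadAll(io.LimitReader(src, size))
		if err != nil {
			panic(err)
		}
		fmt.Printf("part %d: %q\n", i+1, part)
	}
}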
+func (z *erasureServerPools) StartRebalance() {
+ z.rebalMu.Lock()
+ if z.rebalMeta == nil || !z.rebalMeta.StoppedAt.IsZero() { // rebalance not running, nothing to do
+ z.rebalMu.Unlock()
+ return
+ }
+ ctx, cancel := context.WithCancel(GlobalContext)
+ z.rebalMeta.cancel = cancel // to be used when rebalance-stop is called
+ z.rebalMu.Unlock()
+
+ z.rebalMu.RLock()
+ participants := make([]bool, len(z.rebalMeta.PoolStats))
+ for i, ps := range z.rebalMeta.PoolStats {
+ // skip pools which have completed rebalancing
+ if ps.Info.Status != rebalStarted {
+ continue
+ }
+
+ participants[i] = ps.Participating
+ }
+ z.rebalMu.RUnlock()
+
+ for poolIdx, doRebalance := range participants {
+ if !doRebalance {
+ continue
+ }
+ // nothing to do if this node is not the pool's first node (i.e. the pool's rebalance 'leader').
+ if !globalEndpoints[poolIdx].Endpoints[0].IsLocal {
+ continue
+ }
+
+ go func(idx int) {
+ stopfn := globalRebalanceMetrics.log(rebalanceMetricRebalanceBuckets, idx)
+ err := z.rebalanceBuckets(ctx, idx)
+ stopfn(err)
+ }(poolIdx)
+ }
+ return
+}
+
+// StopRebalance signals the rebalance goroutine running on this node (if any)
+// to stop, using the context.CancelFunc(s) saved at the time of StartRebalance.
+func (z *erasureServerPools) StopRebalance() error {
+ z.rebalMu.Lock()
+ defer z.rebalMu.Unlock()
+
+ r := z.rebalMeta
+ if r == nil { // rebalance not running in this node, nothing to do
+ return nil
+ }
+
+ if cancel := r.cancel; cancel != nil {
+ // cancel != nil only on pool leaders
+ r.cancel = nil
+ cancel()
+ }
+ return nil
+}
+
+// for rebalance trace support
+type rebalanceMetrics struct{}
+
+var globalRebalanceMetrics rebalanceMetrics
+
+//go:generate stringer -type=rebalanceMetric -trimprefix=rebalanceMetric $GOFILE
+type rebalanceMetric uint8
+
+const (
+ rebalanceMetricRebalanceBuckets rebalanceMetric = iota
+ rebalanceMetricRebalanceBucket
+ rebalanceMetricRebalanceObject
+ rebalanceMetricRebalanceRemoveObject
+ rebalanceMetricSaveMetadata
+)
+
+func rebalanceTrace(r rebalanceMetric, poolIdx int, startTime time.Time, duration time.Duration, err error, path string) madmin.TraceInfo {
+ var errStr string
+ if err != nil {
+ errStr = err.Error()
+ }
+ return madmin.TraceInfo{
+ TraceType: madmin.TraceRebalance,
+ Time: startTime,
+ NodeName: globalLocalNodeName,
+ FuncName: fmt.Sprintf("rebalance.%s (pool-id=%d)", r.String(), poolIdx),
+ Duration: duration,
+ Path: path,
+ Error: errStr,
+ }
+}
+
+func (p *rebalanceMetrics) log(r rebalanceMetric, poolIdx int, paths ...string) func(err error) {
+ startTime := time.Now()
+ return func(err error) {
+ duration := time.Since(startTime)
+ if globalTrace.NumSubscribers(madmin.TraceRebalance) > 0 {
+ globalTrace.Publish(rebalanceTrace(r, poolIdx, startTime, duration, err, strings.Join(paths, " ")))
+ }
+ }
+}
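globalRebalanceMetrics.log captures a start time and returns a closure; invoking that closure with the terminal error publishes a trace entry only if there is at least one rebalance-trace subscriber. Every call site follows the same stopFn := ...; stopFn(err) shape. A standalone sketch of that timing-closure pattern (illustrative names, not MinIO code):

package main

import (
	"errors"
	"fmt"
	"time"
)

// logOp records the start time and returns a stop function that reports the
// elapsed duration together with the operation's terminal error.
func logOp(name string) func(err error) {
	start := time.Now()
	return func(err error) {
		fmt.Printf("%s took %v, err=%v\n", name, time.Since(start), err)
	}
}

func main() {
	stopFn := logOp("rebalanceObject")
	time.Sleep(10 * time.Millisecond)   // stand-in for the actual work
	stopFn(errors.New("example error")) // pass nil on success
}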
diff --git a/cmd/erasure-server-pool-rebalance_gen.go b/cmd/erasure-server-pool-rebalance_gen.go
new file mode 100644
index 000000000..a0cbd56f4
--- /dev/null
+++ b/cmd/erasure-server-pool-rebalance_gen.go
@@ -0,0 +1,1228 @@
+package cmd
+
+// Code generated by github.com/tinylib/msgp DO NOT EDIT.
+
+import (
+ "github.com/tinylib/msgp/msgp"
+)
+
+// DecodeMsg implements msgp.Decodable
+func (z *rebalSaveOpts) DecodeMsg(dc *msgp.Reader) (err error) {
+ {
+ var zb0001 uint8
+ zb0001, err = dc.ReadUint8()
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ (*z) = rebalSaveOpts(zb0001)
+ }
+ return
+}
+
+// EncodeMsg implements msgp.Encodable
+func (z rebalSaveOpts) EncodeMsg(en *msgp.Writer) (err error) {
+ err = en.WriteUint8(uint8(z))
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ return
+}
+
+// MarshalMsg implements msgp.Marshaler
+func (z rebalSaveOpts) MarshalMsg(b []byte) (o []byte, err error) {
+ o = msgp.Require(b, z.Msgsize())
+ o = msgp.AppendUint8(o, uint8(z))
+ return
+}
+
+// UnmarshalMsg implements msgp.Unmarshaler
+func (z *rebalSaveOpts) UnmarshalMsg(bts []byte) (o []byte, err error) {
+ {
+ var zb0001 uint8
+ zb0001, bts, err = msgp.ReadUint8Bytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ (*z) = rebalSaveOpts(zb0001)
+ }
+ o = bts
+ return
+}
+
+// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
+func (z rebalSaveOpts) Msgsize() (s int) {
+ s = msgp.Uint8Size
+ return
+}
+
+// DecodeMsg implements msgp.Decodable
+func (z *rebalStatus) DecodeMsg(dc *msgp.Reader) (err error) {
+ {
+ var zb0001 uint8
+ zb0001, err = dc.ReadUint8()
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ (*z) = rebalStatus(zb0001)
+ }
+ return
+}
+
+// EncodeMsg implements msgp.Encodable
+func (z rebalStatus) EncodeMsg(en *msgp.Writer) (err error) {
+ err = en.WriteUint8(uint8(z))
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ return
+}
+
+// MarshalMsg implements msgp.Marshaler
+func (z rebalStatus) MarshalMsg(b []byte) (o []byte, err error) {
+ o = msgp.Require(b, z.Msgsize())
+ o = msgp.AppendUint8(o, uint8(z))
+ return
+}
+
+// UnmarshalMsg implements msgp.Unmarshaler
+func (z *rebalStatus) UnmarshalMsg(bts []byte) (o []byte, err error) {
+ {
+ var zb0001 uint8
+ zb0001, bts, err = msgp.ReadUint8Bytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ (*z) = rebalStatus(zb0001)
+ }
+ o = bts
+ return
+}
+
+// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
+func (z rebalStatus) Msgsize() (s int) {
+ s = msgp.Uint8Size
+ return
+}
+
+// DecodeMsg implements msgp.Decodable
+func (z *rebalanceInfo) DecodeMsg(dc *msgp.Reader) (err error) {
+ var field []byte
+ _ = field
+ var zb0001 uint32
+ zb0001, err = dc.ReadMapHeader()
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ for zb0001 > 0 {
+ zb0001--
+ field, err = dc.ReadMapKeyPtr()
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ switch msgp.UnsafeString(field) {
+ case "startTs":
+ z.StartTime, err = dc.ReadTime()
+ if err != nil {
+ err = msgp.WrapError(err, "StartTime")
+ return
+ }
+ case "stopTs":
+ z.EndTime, err = dc.ReadTime()
+ if err != nil {
+ err = msgp.WrapError(err, "EndTime")
+ return
+ }
+ case "status":
+ {
+ var zb0002 uint8
+ zb0002, err = dc.ReadUint8()
+ if err != nil {
+ err = msgp.WrapError(err, "Status")
+ return
+ }
+ z.Status = rebalStatus(zb0002)
+ }
+ default:
+ err = dc.Skip()
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ }
+ }
+ return
+}
+
+// EncodeMsg implements msgp.Encodable
+func (z rebalanceInfo) EncodeMsg(en *msgp.Writer) (err error) {
+ // map header, size 3
+ // write "startTs"
+ err = en.Append(0x83, 0xa7, 0x73, 0x74, 0x61, 0x72, 0x74, 0x54, 0x73)
+ if err != nil {
+ return
+ }
+ err = en.WriteTime(z.StartTime)
+ if err != nil {
+ err = msgp.WrapError(err, "StartTime")
+ return
+ }
+ // write "stopTs"
+ err = en.Append(0xa6, 0x73, 0x74, 0x6f, 0x70, 0x54, 0x73)
+ if err != nil {
+ return
+ }
+ err = en.WriteTime(z.EndTime)
+ if err != nil {
+ err = msgp.WrapError(err, "EndTime")
+ return
+ }
+ // write "status"
+ err = en.Append(0xa6, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73)
+ if err != nil {
+ return
+ }
+ err = en.WriteUint8(uint8(z.Status))
+ if err != nil {
+ err = msgp.WrapError(err, "Status")
+ return
+ }
+ return
+}
+
+// MarshalMsg implements msgp.Marshaler
+func (z rebalanceInfo) MarshalMsg(b []byte) (o []byte, err error) {
+ o = msgp.Require(b, z.Msgsize())
+ // map header, size 3
+ // string "startTs"
+ o = append(o, 0x83, 0xa7, 0x73, 0x74, 0x61, 0x72, 0x74, 0x54, 0x73)
+ o = msgp.AppendTime(o, z.StartTime)
+ // string "stopTs"
+ o = append(o, 0xa6, 0x73, 0x74, 0x6f, 0x70, 0x54, 0x73)
+ o = msgp.AppendTime(o, z.EndTime)
+ // string "status"
+ o = append(o, 0xa6, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73)
+ o = msgp.AppendUint8(o, uint8(z.Status))
+ return
+}
+
+// UnmarshalMsg implements msgp.Unmarshaler
+func (z *rebalanceInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
+ var field []byte
+ _ = field
+ var zb0001 uint32
+ zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ for zb0001 > 0 {
+ zb0001--
+ field, bts, err = msgp.ReadMapKeyZC(bts)
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ switch msgp.UnsafeString(field) {
+ case "startTs":
+ z.StartTime, bts, err = msgp.ReadTimeBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "StartTime")
+ return
+ }
+ case "stopTs":
+ z.EndTime, bts, err = msgp.ReadTimeBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "EndTime")
+ return
+ }
+ case "status":
+ {
+ var zb0002 uint8
+ zb0002, bts, err = msgp.ReadUint8Bytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "Status")
+ return
+ }
+ z.Status = rebalStatus(zb0002)
+ }
+ default:
+ bts, err = msgp.Skip(bts)
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ }
+ }
+ o = bts
+ return
+}
+
+// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
+func (z rebalanceInfo) Msgsize() (s int) {
+ s = 1 + 8 + msgp.TimeSize + 7 + msgp.TimeSize + 7 + msgp.Uint8Size
+ return
+}
+
+// DecodeMsg implements msgp.Decodable
+func (z *rebalanceMeta) DecodeMsg(dc *msgp.Reader) (err error) {
+ var field []byte
+ _ = field
+ var zb0001 uint32
+ zb0001, err = dc.ReadMapHeader()
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ for zb0001 > 0 {
+ zb0001--
+ field, err = dc.ReadMapKeyPtr()
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ switch msgp.UnsafeString(field) {
+ case "stopTs":
+ z.StoppedAt, err = dc.ReadTime()
+ if err != nil {
+ err = msgp.WrapError(err, "StoppedAt")
+ return
+ }
+ case "id":
+ z.ID, err = dc.ReadString()
+ if err != nil {
+ err = msgp.WrapError(err, "ID")
+ return
+ }
+ case "pf":
+ z.PercentFreeGoal, err = dc.ReadFloat64()
+ if err != nil {
+ err = msgp.WrapError(err, "PercentFreeGoal")
+ return
+ }
+ case "rss":
+ var zb0002 uint32
+ zb0002, err = dc.ReadArrayHeader()
+ if err != nil {
+ err = msgp.WrapError(err, "PoolStats")
+ return
+ }
+ if cap(z.PoolStats) >= int(zb0002) {
+ z.PoolStats = (z.PoolStats)[:zb0002]
+ } else {
+ z.PoolStats = make([]*rebalanceStats, zb0002)
+ }
+ for za0001 := range z.PoolStats {
+ if dc.IsNil() {
+ err = dc.ReadNil()
+ if err != nil {
+ err = msgp.WrapError(err, "PoolStats", za0001)
+ return
+ }
+ z.PoolStats[za0001] = nil
+ } else {
+ if z.PoolStats[za0001] == nil {
+ z.PoolStats[za0001] = new(rebalanceStats)
+ }
+ err = z.PoolStats[za0001].DecodeMsg(dc)
+ if err != nil {
+ err = msgp.WrapError(err, "PoolStats", za0001)
+ return
+ }
+ }
+ }
+ default:
+ err = dc.Skip()
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ }
+ }
+ return
+}
+
+// EncodeMsg implements msgp.Encodable
+func (z *rebalanceMeta) EncodeMsg(en *msgp.Writer) (err error) {
+ // map header, size 4
+ // write "stopTs"
+ err = en.Append(0x84, 0xa6, 0x73, 0x74, 0x6f, 0x70, 0x54, 0x73)
+ if err != nil {
+ return
+ }
+ err = en.WriteTime(z.StoppedAt)
+ if err != nil {
+ err = msgp.WrapError(err, "StoppedAt")
+ return
+ }
+ // write "id"
+ err = en.Append(0xa2, 0x69, 0x64)
+ if err != nil {
+ return
+ }
+ err = en.WriteString(z.ID)
+ if err != nil {
+ err = msgp.WrapError(err, "ID")
+ return
+ }
+ // write "pf"
+ err = en.Append(0xa2, 0x70, 0x66)
+ if err != nil {
+ return
+ }
+ err = en.WriteFloat64(z.PercentFreeGoal)
+ if err != nil {
+ err = msgp.WrapError(err, "PercentFreeGoal")
+ return
+ }
+ // write "rss"
+ err = en.Append(0xa3, 0x72, 0x73, 0x73)
+ if err != nil {
+ return
+ }
+ err = en.WriteArrayHeader(uint32(len(z.PoolStats)))
+ if err != nil {
+ err = msgp.WrapError(err, "PoolStats")
+ return
+ }
+ for za0001 := range z.PoolStats {
+ if z.PoolStats[za0001] == nil {
+ err = en.WriteNil()
+ if err != nil {
+ return
+ }
+ } else {
+ err = z.PoolStats[za0001].EncodeMsg(en)
+ if err != nil {
+ err = msgp.WrapError(err, "PoolStats", za0001)
+ return
+ }
+ }
+ }
+ return
+}
+
+// MarshalMsg implements msgp.Marshaler
+func (z *rebalanceMeta) MarshalMsg(b []byte) (o []byte, err error) {
+ o = msgp.Require(b, z.Msgsize())
+ // map header, size 4
+ // string "stopTs"
+ o = append(o, 0x84, 0xa6, 0x73, 0x74, 0x6f, 0x70, 0x54, 0x73)
+ o = msgp.AppendTime(o, z.StoppedAt)
+ // string "id"
+ o = append(o, 0xa2, 0x69, 0x64)
+ o = msgp.AppendString(o, z.ID)
+ // string "pf"
+ o = append(o, 0xa2, 0x70, 0x66)
+ o = msgp.AppendFloat64(o, z.PercentFreeGoal)
+ // string "rss"
+ o = append(o, 0xa3, 0x72, 0x73, 0x73)
+ o = msgp.AppendArrayHeader(o, uint32(len(z.PoolStats)))
+ for za0001 := range z.PoolStats {
+ if z.PoolStats[za0001] == nil {
+ o = msgp.AppendNil(o)
+ } else {
+ o, err = z.PoolStats[za0001].MarshalMsg(o)
+ if err != nil {
+ err = msgp.WrapError(err, "PoolStats", za0001)
+ return
+ }
+ }
+ }
+ return
+}
+
+// UnmarshalMsg implements msgp.Unmarshaler
+func (z *rebalanceMeta) UnmarshalMsg(bts []byte) (o []byte, err error) {
+ var field []byte
+ _ = field
+ var zb0001 uint32
+ zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ for zb0001 > 0 {
+ zb0001--
+ field, bts, err = msgp.ReadMapKeyZC(bts)
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ switch msgp.UnsafeString(field) {
+ case "stopTs":
+ z.StoppedAt, bts, err = msgp.ReadTimeBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "StoppedAt")
+ return
+ }
+ case "id":
+ z.ID, bts, err = msgp.ReadStringBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "ID")
+ return
+ }
+ case "pf":
+ z.PercentFreeGoal, bts, err = msgp.ReadFloat64Bytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "PercentFreeGoal")
+ return
+ }
+ case "rss":
+ var zb0002 uint32
+ zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "PoolStats")
+ return
+ }
+ if cap(z.PoolStats) >= int(zb0002) {
+ z.PoolStats = (z.PoolStats)[:zb0002]
+ } else {
+ z.PoolStats = make([]*rebalanceStats, zb0002)
+ }
+ for za0001 := range z.PoolStats {
+ if msgp.IsNil(bts) {
+ bts, err = msgp.ReadNilBytes(bts)
+ if err != nil {
+ return
+ }
+ z.PoolStats[za0001] = nil
+ } else {
+ if z.PoolStats[za0001] == nil {
+ z.PoolStats[za0001] = new(rebalanceStats)
+ }
+ bts, err = z.PoolStats[za0001].UnmarshalMsg(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "PoolStats", za0001)
+ return
+ }
+ }
+ }
+ default:
+ bts, err = msgp.Skip(bts)
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ }
+ }
+ o = bts
+ return
+}
+
+// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
+func (z *rebalanceMeta) Msgsize() (s int) {
+ s = 1 + 7 + msgp.TimeSize + 3 + msgp.StringPrefixSize + len(z.ID) + 3 + msgp.Float64Size + 4 + msgp.ArrayHeaderSize
+ for za0001 := range z.PoolStats {
+ if z.PoolStats[za0001] == nil {
+ s += msgp.NilSize
+ } else {
+ s += z.PoolStats[za0001].Msgsize()
+ }
+ }
+ return
+}
+
+// DecodeMsg implements msgp.Decodable
+func (z *rebalanceMetric) DecodeMsg(dc *msgp.Reader) (err error) {
+ {
+ var zb0001 uint8
+ zb0001, err = dc.ReadUint8()
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ (*z) = rebalanceMetric(zb0001)
+ }
+ return
+}
+
+// EncodeMsg implements msgp.Encodable
+func (z rebalanceMetric) EncodeMsg(en *msgp.Writer) (err error) {
+ err = en.WriteUint8(uint8(z))
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ return
+}
+
+// MarshalMsg implements msgp.Marshaler
+func (z rebalanceMetric) MarshalMsg(b []byte) (o []byte, err error) {
+ o = msgp.Require(b, z.Msgsize())
+ o = msgp.AppendUint8(o, uint8(z))
+ return
+}
+
+// UnmarshalMsg implements msgp.Unmarshaler
+func (z *rebalanceMetric) UnmarshalMsg(bts []byte) (o []byte, err error) {
+ {
+ var zb0001 uint8
+ zb0001, bts, err = msgp.ReadUint8Bytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ (*z) = rebalanceMetric(zb0001)
+ }
+ o = bts
+ return
+}
+
+// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
+func (z rebalanceMetric) Msgsize() (s int) {
+ s = msgp.Uint8Size
+ return
+}
+
+// DecodeMsg implements msgp.Decodable
+func (z *rebalanceMetrics) DecodeMsg(dc *msgp.Reader) (err error) {
+ var field []byte
+ _ = field
+ var zb0001 uint32
+ zb0001, err = dc.ReadMapHeader()
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ for zb0001 > 0 {
+ zb0001--
+ field, err = dc.ReadMapKeyPtr()
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ switch msgp.UnsafeString(field) {
+ default:
+ err = dc.Skip()
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ }
+ }
+ return
+}
+
+// EncodeMsg implements msgp.Encodable
+func (z rebalanceMetrics) EncodeMsg(en *msgp.Writer) (err error) {
+ // map header, size 0
+ err = en.Append(0x80)
+ if err != nil {
+ return
+ }
+ return
+}
+
+// MarshalMsg implements msgp.Marshaler
+func (z rebalanceMetrics) MarshalMsg(b []byte) (o []byte, err error) {
+ o = msgp.Require(b, z.Msgsize())
+ // map header, size 0
+ o = append(o, 0x80)
+ return
+}
+
+// UnmarshalMsg implements msgp.Unmarshaler
+func (z *rebalanceMetrics) UnmarshalMsg(bts []byte) (o []byte, err error) {
+ var field []byte
+ _ = field
+ var zb0001 uint32
+ zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ for zb0001 > 0 {
+ zb0001--
+ field, bts, err = msgp.ReadMapKeyZC(bts)
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ switch msgp.UnsafeString(field) {
+ default:
+ bts, err = msgp.Skip(bts)
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ }
+ }
+ o = bts
+ return
+}
+
+// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
+func (z rebalanceMetrics) Msgsize() (s int) {
+ s = 1
+ return
+}
+
+// DecodeMsg implements msgp.Decodable
+func (z *rebalanceStats) DecodeMsg(dc *msgp.Reader) (err error) {
+ var field []byte
+ _ = field
+ var zb0001 uint32
+ zb0001, err = dc.ReadMapHeader()
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ for zb0001 > 0 {
+ zb0001--
+ field, err = dc.ReadMapKeyPtr()
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ switch msgp.UnsafeString(field) {
+ case "ifs":
+ z.InitFreeSpace, err = dc.ReadUint64()
+ if err != nil {
+ err = msgp.WrapError(err, "InitFreeSpace")
+ return
+ }
+ case "ic":
+ z.InitCapacity, err = dc.ReadUint64()
+ if err != nil {
+ err = msgp.WrapError(err, "InitCapacity")
+ return
+ }
+ case "bus":
+ var zb0002 uint32
+ zb0002, err = dc.ReadArrayHeader()
+ if err != nil {
+ err = msgp.WrapError(err, "Buckets")
+ return
+ }
+ if cap(z.Buckets) >= int(zb0002) {
+ z.Buckets = (z.Buckets)[:zb0002]
+ } else {
+ z.Buckets = make([]string, zb0002)
+ }
+ for za0001 := range z.Buckets {
+ z.Buckets[za0001], err = dc.ReadString()
+ if err != nil {
+ err = msgp.WrapError(err, "Buckets", za0001)
+ return
+ }
+ }
+ case "rbs":
+ var zb0003 uint32
+ zb0003, err = dc.ReadArrayHeader()
+ if err != nil {
+ err = msgp.WrapError(err, "RebalancedBuckets")
+ return
+ }
+ if cap(z.RebalancedBuckets) >= int(zb0003) {
+ z.RebalancedBuckets = (z.RebalancedBuckets)[:zb0003]
+ } else {
+ z.RebalancedBuckets = make([]string, zb0003)
+ }
+ for za0002 := range z.RebalancedBuckets {
+ z.RebalancedBuckets[za0002], err = dc.ReadString()
+ if err != nil {
+ err = msgp.WrapError(err, "RebalancedBuckets", za0002)
+ return
+ }
+ }
+ case "bu":
+ z.Bucket, err = dc.ReadString()
+ if err != nil {
+ err = msgp.WrapError(err, "Bucket")
+ return
+ }
+ case "ob":
+ z.Object, err = dc.ReadString()
+ if err != nil {
+ err = msgp.WrapError(err, "Object")
+ return
+ }
+ case "no":
+ z.NumObjects, err = dc.ReadUint64()
+ if err != nil {
+ err = msgp.WrapError(err, "NumObjects")
+ return
+ }
+ case "nv":
+ z.NumVersions, err = dc.ReadUint64()
+ if err != nil {
+ err = msgp.WrapError(err, "NumVersions")
+ return
+ }
+ case "bs":
+ z.Bytes, err = dc.ReadUint64()
+ if err != nil {
+ err = msgp.WrapError(err, "Bytes")
+ return
+ }
+ case "par":
+ z.Participating, err = dc.ReadBool()
+ if err != nil {
+ err = msgp.WrapError(err, "Participating")
+ return
+ }
+ case "inf":
+ err = z.Info.DecodeMsg(dc)
+ if err != nil {
+ err = msgp.WrapError(err, "Info")
+ return
+ }
+ default:
+ err = dc.Skip()
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ }
+ }
+ return
+}
+
+// EncodeMsg implements msgp.Encodable
+func (z *rebalanceStats) EncodeMsg(en *msgp.Writer) (err error) {
+ // map header, size 11
+ // write "ifs"
+ err = en.Append(0x8b, 0xa3, 0x69, 0x66, 0x73)
+ if err != nil {
+ return
+ }
+ err = en.WriteUint64(z.InitFreeSpace)
+ if err != nil {
+ err = msgp.WrapError(err, "InitFreeSpace")
+ return
+ }
+ // write "ic"
+ err = en.Append(0xa2, 0x69, 0x63)
+ if err != nil {
+ return
+ }
+ err = en.WriteUint64(z.InitCapacity)
+ if err != nil {
+ err = msgp.WrapError(err, "InitCapacity")
+ return
+ }
+ // write "bus"
+ err = en.Append(0xa3, 0x62, 0x75, 0x73)
+ if err != nil {
+ return
+ }
+ err = en.WriteArrayHeader(uint32(len(z.Buckets)))
+ if err != nil {
+ err = msgp.WrapError(err, "Buckets")
+ return
+ }
+ for za0001 := range z.Buckets {
+ err = en.WriteString(z.Buckets[za0001])
+ if err != nil {
+ err = msgp.WrapError(err, "Buckets", za0001)
+ return
+ }
+ }
+ // write "rbs"
+ err = en.Append(0xa3, 0x72, 0x62, 0x73)
+ if err != nil {
+ return
+ }
+ err = en.WriteArrayHeader(uint32(len(z.RebalancedBuckets)))
+ if err != nil {
+ err = msgp.WrapError(err, "RebalancedBuckets")
+ return
+ }
+ for za0002 := range z.RebalancedBuckets {
+ err = en.WriteString(z.RebalancedBuckets[za0002])
+ if err != nil {
+ err = msgp.WrapError(err, "RebalancedBuckets", za0002)
+ return
+ }
+ }
+ // write "bu"
+ err = en.Append(0xa2, 0x62, 0x75)
+ if err != nil {
+ return
+ }
+ err = en.WriteString(z.Bucket)
+ if err != nil {
+ err = msgp.WrapError(err, "Bucket")
+ return
+ }
+ // write "ob"
+ err = en.Append(0xa2, 0x6f, 0x62)
+ if err != nil {
+ return
+ }
+ err = en.WriteString(z.Object)
+ if err != nil {
+ err = msgp.WrapError(err, "Object")
+ return
+ }
+ // write "no"
+ err = en.Append(0xa2, 0x6e, 0x6f)
+ if err != nil {
+ return
+ }
+ err = en.WriteUint64(z.NumObjects)
+ if err != nil {
+ err = msgp.WrapError(err, "NumObjects")
+ return
+ }
+ // write "nv"
+ err = en.Append(0xa2, 0x6e, 0x76)
+ if err != nil {
+ return
+ }
+ err = en.WriteUint64(z.NumVersions)
+ if err != nil {
+ err = msgp.WrapError(err, "NumVersions")
+ return
+ }
+ // write "bs"
+ err = en.Append(0xa2, 0x62, 0x73)
+ if err != nil {
+ return
+ }
+ err = en.WriteUint64(z.Bytes)
+ if err != nil {
+ err = msgp.WrapError(err, "Bytes")
+ return
+ }
+ // write "par"
+ err = en.Append(0xa3, 0x70, 0x61, 0x72)
+ if err != nil {
+ return
+ }
+ err = en.WriteBool(z.Participating)
+ if err != nil {
+ err = msgp.WrapError(err, "Participating")
+ return
+ }
+ // write "inf"
+ err = en.Append(0xa3, 0x69, 0x6e, 0x66)
+ if err != nil {
+ return
+ }
+ err = z.Info.EncodeMsg(en)
+ if err != nil {
+ err = msgp.WrapError(err, "Info")
+ return
+ }
+ return
+}
+
+// MarshalMsg implements msgp.Marshaler
+func (z *rebalanceStats) MarshalMsg(b []byte) (o []byte, err error) {
+ o = msgp.Require(b, z.Msgsize())
+ // map header, size 11
+ // string "ifs"
+ o = append(o, 0x8b, 0xa3, 0x69, 0x66, 0x73)
+ o = msgp.AppendUint64(o, z.InitFreeSpace)
+ // string "ic"
+ o = append(o, 0xa2, 0x69, 0x63)
+ o = msgp.AppendUint64(o, z.InitCapacity)
+ // string "bus"
+ o = append(o, 0xa3, 0x62, 0x75, 0x73)
+ o = msgp.AppendArrayHeader(o, uint32(len(z.Buckets)))
+ for za0001 := range z.Buckets {
+ o = msgp.AppendString(o, z.Buckets[za0001])
+ }
+ // string "rbs"
+ o = append(o, 0xa3, 0x72, 0x62, 0x73)
+ o = msgp.AppendArrayHeader(o, uint32(len(z.RebalancedBuckets)))
+ for za0002 := range z.RebalancedBuckets {
+ o = msgp.AppendString(o, z.RebalancedBuckets[za0002])
+ }
+ // string "bu"
+ o = append(o, 0xa2, 0x62, 0x75)
+ o = msgp.AppendString(o, z.Bucket)
+ // string "ob"
+ o = append(o, 0xa2, 0x6f, 0x62)
+ o = msgp.AppendString(o, z.Object)
+ // string "no"
+ o = append(o, 0xa2, 0x6e, 0x6f)
+ o = msgp.AppendUint64(o, z.NumObjects)
+ // string "nv"
+ o = append(o, 0xa2, 0x6e, 0x76)
+ o = msgp.AppendUint64(o, z.NumVersions)
+ // string "bs"
+ o = append(o, 0xa2, 0x62, 0x73)
+ o = msgp.AppendUint64(o, z.Bytes)
+ // string "par"
+ o = append(o, 0xa3, 0x70, 0x61, 0x72)
+ o = msgp.AppendBool(o, z.Participating)
+ // string "inf"
+ o = append(o, 0xa3, 0x69, 0x6e, 0x66)
+ o, err = z.Info.MarshalMsg(o)
+ if err != nil {
+ err = msgp.WrapError(err, "Info")
+ return
+ }
+ return
+}
+
+// UnmarshalMsg implements msgp.Unmarshaler
+func (z *rebalanceStats) UnmarshalMsg(bts []byte) (o []byte, err error) {
+ var field []byte
+ _ = field
+ var zb0001 uint32
+ zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ for zb0001 > 0 {
+ zb0001--
+ field, bts, err = msgp.ReadMapKeyZC(bts)
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ switch msgp.UnsafeString(field) {
+ case "ifs":
+ z.InitFreeSpace, bts, err = msgp.ReadUint64Bytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "InitFreeSpace")
+ return
+ }
+ case "ic":
+ z.InitCapacity, bts, err = msgp.ReadUint64Bytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "InitCapacity")
+ return
+ }
+ case "bus":
+ var zb0002 uint32
+ zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "Buckets")
+ return
+ }
+ if cap(z.Buckets) >= int(zb0002) {
+ z.Buckets = (z.Buckets)[:zb0002]
+ } else {
+ z.Buckets = make([]string, zb0002)
+ }
+ for za0001 := range z.Buckets {
+ z.Buckets[za0001], bts, err = msgp.ReadStringBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "Buckets", za0001)
+ return
+ }
+ }
+ case "rbs":
+ var zb0003 uint32
+ zb0003, bts, err = msgp.ReadArrayHeaderBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "RebalancedBuckets")
+ return
+ }
+ if cap(z.RebalancedBuckets) >= int(zb0003) {
+ z.RebalancedBuckets = (z.RebalancedBuckets)[:zb0003]
+ } else {
+ z.RebalancedBuckets = make([]string, zb0003)
+ }
+ for za0002 := range z.RebalancedBuckets {
+ z.RebalancedBuckets[za0002], bts, err = msgp.ReadStringBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "RebalancedBuckets", za0002)
+ return
+ }
+ }
+ case "bu":
+ z.Bucket, bts, err = msgp.ReadStringBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "Bucket")
+ return
+ }
+ case "ob":
+ z.Object, bts, err = msgp.ReadStringBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "Object")
+ return
+ }
+ case "no":
+ z.NumObjects, bts, err = msgp.ReadUint64Bytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "NumObjects")
+ return
+ }
+ case "nv":
+ z.NumVersions, bts, err = msgp.ReadUint64Bytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "NumVersions")
+ return
+ }
+ case "bs":
+ z.Bytes, bts, err = msgp.ReadUint64Bytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "Bytes")
+ return
+ }
+ case "par":
+ z.Participating, bts, err = msgp.ReadBoolBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "Participating")
+ return
+ }
+ case "inf":
+ bts, err = z.Info.UnmarshalMsg(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "Info")
+ return
+ }
+ default:
+ bts, err = msgp.Skip(bts)
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ }
+ }
+ o = bts
+ return
+}
+
+// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
+func (z *rebalanceStats) Msgsize() (s int) {
+ s = 1 + 4 + msgp.Uint64Size + 3 + msgp.Uint64Size + 4 + msgp.ArrayHeaderSize
+ for za0001 := range z.Buckets {
+ s += msgp.StringPrefixSize + len(z.Buckets[za0001])
+ }
+ s += 4 + msgp.ArrayHeaderSize
+ for za0002 := range z.RebalancedBuckets {
+ s += msgp.StringPrefixSize + len(z.RebalancedBuckets[za0002])
+ }
+ s += 3 + msgp.StringPrefixSize + len(z.Bucket) + 3 + msgp.StringPrefixSize + len(z.Object) + 3 + msgp.Uint64Size + 3 + msgp.Uint64Size + 3 + msgp.Uint64Size + 4 + msgp.BoolSize + 4 + z.Info.Msgsize()
+ return
+}
+
+// DecodeMsg implements msgp.Decodable
+func (z *rstats) DecodeMsg(dc *msgp.Reader) (err error) {
+ var zb0002 uint32
+ zb0002, err = dc.ReadArrayHeader()
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ if cap((*z)) >= int(zb0002) {
+ (*z) = (*z)[:zb0002]
+ } else {
+ (*z) = make(rstats, zb0002)
+ }
+ for zb0001 := range *z {
+ if dc.IsNil() {
+ err = dc.ReadNil()
+ if err != nil {
+ err = msgp.WrapError(err, zb0001)
+ return
+ }
+ (*z)[zb0001] = nil
+ } else {
+ if (*z)[zb0001] == nil {
+ (*z)[zb0001] = new(rebalanceStats)
+ }
+ err = (*z)[zb0001].DecodeMsg(dc)
+ if err != nil {
+ err = msgp.WrapError(err, zb0001)
+ return
+ }
+ }
+ }
+ return
+}
+
+// EncodeMsg implements msgp.Encodable
+func (z rstats) EncodeMsg(en *msgp.Writer) (err error) {
+ err = en.WriteArrayHeader(uint32(len(z)))
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ for zb0003 := range z {
+ if z[zb0003] == nil {
+ err = en.WriteNil()
+ if err != nil {
+ return
+ }
+ } else {
+ err = z[zb0003].EncodeMsg(en)
+ if err != nil {
+ err = msgp.WrapError(err, zb0003)
+ return
+ }
+ }
+ }
+ return
+}
+
+// MarshalMsg implements msgp.Marshaler
+func (z rstats) MarshalMsg(b []byte) (o []byte, err error) {
+ o = msgp.Require(b, z.Msgsize())
+ o = msgp.AppendArrayHeader(o, uint32(len(z)))
+ for zb0003 := range z {
+ if z[zb0003] == nil {
+ o = msgp.AppendNil(o)
+ } else {
+ o, err = z[zb0003].MarshalMsg(o)
+ if err != nil {
+ err = msgp.WrapError(err, zb0003)
+ return
+ }
+ }
+ }
+ return
+}
+
+// UnmarshalMsg implements msgp.Unmarshaler
+func (z *rstats) UnmarshalMsg(bts []byte) (o []byte, err error) {
+ var zb0002 uint32
+ zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ if cap((*z)) >= int(zb0002) {
+ (*z) = (*z)[:zb0002]
+ } else {
+ (*z) = make(rstats, zb0002)
+ }
+ for zb0001 := range *z {
+ if msgp.IsNil(bts) {
+ bts, err = msgp.ReadNilBytes(bts)
+ if err != nil {
+ return
+ }
+ (*z)[zb0001] = nil
+ } else {
+ if (*z)[zb0001] == nil {
+ (*z)[zb0001] = new(rebalanceStats)
+ }
+ bts, err = (*z)[zb0001].UnmarshalMsg(bts)
+ if err != nil {
+ err = msgp.WrapError(err, zb0001)
+ return
+ }
+ }
+ }
+ o = bts
+ return
+}
+
+// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
+func (z rstats) Msgsize() (s int) {
+ s = msgp.ArrayHeaderSize
+ for zb0003 := range z {
+ if z[zb0003] == nil {
+ s += msgp.NilSize
+ } else {
+ s += z[zb0003].Msgsize()
+ }
+ }
+ return
+}
diff --git a/cmd/erasure-server-pool-rebalance_gen_test.go b/cmd/erasure-server-pool-rebalance_gen_test.go
new file mode 100644
index 000000000..0f0b8f466
--- /dev/null
+++ b/cmd/erasure-server-pool-rebalance_gen_test.go
@@ -0,0 +1,575 @@
+package cmd
+
+// Code generated by github.com/tinylib/msgp DO NOT EDIT.
+
+import (
+ "bytes"
+ "testing"
+
+ "github.com/tinylib/msgp/msgp"
+)
+
+func TestMarshalUnmarshalrebalanceInfo(t *testing.T) {
+ v := rebalanceInfo{}
+ bts, err := v.MarshalMsg(nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ left, err := v.UnmarshalMsg(bts)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if len(left) > 0 {
+ t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
+ }
+
+ left, err = msgp.Skip(bts)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if len(left) > 0 {
+ t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
+ }
+}
+
+func BenchmarkMarshalMsgrebalanceInfo(b *testing.B) {
+ v := rebalanceInfo{}
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ v.MarshalMsg(nil)
+ }
+}
+
+func BenchmarkAppendMsgrebalanceInfo(b *testing.B) {
+ v := rebalanceInfo{}
+ bts := make([]byte, 0, v.Msgsize())
+ bts, _ = v.MarshalMsg(bts[0:0])
+ b.SetBytes(int64(len(bts)))
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ bts, _ = v.MarshalMsg(bts[0:0])
+ }
+}
+
+func BenchmarkUnmarshalrebalanceInfo(b *testing.B) {
+ v := rebalanceInfo{}
+ bts, _ := v.MarshalMsg(nil)
+ b.ReportAllocs()
+ b.SetBytes(int64(len(bts)))
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ _, err := v.UnmarshalMsg(bts)
+ if err != nil {
+ b.Fatal(err)
+ }
+ }
+}
+
+func TestEncodeDecoderebalanceInfo(t *testing.T) {
+ v := rebalanceInfo{}
+ var buf bytes.Buffer
+ msgp.Encode(&buf, &v)
+
+ m := v.Msgsize()
+ if buf.Len() > m {
+ t.Log("WARNING: TestEncodeDecoderebalanceInfo Msgsize() is inaccurate")
+ }
+
+ vn := rebalanceInfo{}
+ err := msgp.Decode(&buf, &vn)
+ if err != nil {
+ t.Error(err)
+ }
+
+ buf.Reset()
+ msgp.Encode(&buf, &v)
+ err = msgp.NewReader(&buf).Skip()
+ if err != nil {
+ t.Error(err)
+ }
+}
+
+func BenchmarkEncoderebalanceInfo(b *testing.B) {
+ v := rebalanceInfo{}
+ var buf bytes.Buffer
+ msgp.Encode(&buf, &v)
+ b.SetBytes(int64(buf.Len()))
+ en := msgp.NewWriter(msgp.Nowhere)
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ v.EncodeMsg(en)
+ }
+ en.Flush()
+}
+
+func BenchmarkDecoderebalanceInfo(b *testing.B) {
+ v := rebalanceInfo{}
+ var buf bytes.Buffer
+ msgp.Encode(&buf, &v)
+ b.SetBytes(int64(buf.Len()))
+ rd := msgp.NewEndlessReader(buf.Bytes(), b)
+ dc := msgp.NewReader(rd)
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ err := v.DecodeMsg(dc)
+ if err != nil {
+ b.Fatal(err)
+ }
+ }
+}
+
+func TestMarshalUnmarshalrebalanceMeta(t *testing.T) {
+ v := rebalanceMeta{}
+ bts, err := v.MarshalMsg(nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ left, err := v.UnmarshalMsg(bts)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if len(left) > 0 {
+ t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
+ }
+
+ left, err = msgp.Skip(bts)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if len(left) > 0 {
+ t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
+ }
+}
+
+func BenchmarkMarshalMsgrebalanceMeta(b *testing.B) {
+ v := rebalanceMeta{}
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ v.MarshalMsg(nil)
+ }
+}
+
+func BenchmarkAppendMsgrebalanceMeta(b *testing.B) {
+ v := rebalanceMeta{}
+ bts := make([]byte, 0, v.Msgsize())
+ bts, _ = v.MarshalMsg(bts[0:0])
+ b.SetBytes(int64(len(bts)))
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ bts, _ = v.MarshalMsg(bts[0:0])
+ }
+}
+
+func BenchmarkUnmarshalrebalanceMeta(b *testing.B) {
+ v := rebalanceMeta{}
+ bts, _ := v.MarshalMsg(nil)
+ b.ReportAllocs()
+ b.SetBytes(int64(len(bts)))
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ _, err := v.UnmarshalMsg(bts)
+ if err != nil {
+ b.Fatal(err)
+ }
+ }
+}
+
+func TestEncodeDecoderebalanceMeta(t *testing.T) {
+ v := rebalanceMeta{}
+ var buf bytes.Buffer
+ msgp.Encode(&buf, &v)
+
+ m := v.Msgsize()
+ if buf.Len() > m {
+ t.Log("WARNING: TestEncodeDecoderebalanceMeta Msgsize() is inaccurate")
+ }
+
+ vn := rebalanceMeta{}
+ err := msgp.Decode(&buf, &vn)
+ if err != nil {
+ t.Error(err)
+ }
+
+ buf.Reset()
+ msgp.Encode(&buf, &v)
+ err = msgp.NewReader(&buf).Skip()
+ if err != nil {
+ t.Error(err)
+ }
+}
+
+func BenchmarkEncoderebalanceMeta(b *testing.B) {
+ v := rebalanceMeta{}
+ var buf bytes.Buffer
+ msgp.Encode(&buf, &v)
+ b.SetBytes(int64(buf.Len()))
+ en := msgp.NewWriter(msgp.Nowhere)
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ v.EncodeMsg(en)
+ }
+ en.Flush()
+}
+
+func BenchmarkDecoderebalanceMeta(b *testing.B) {
+ v := rebalanceMeta{}
+ var buf bytes.Buffer
+ msgp.Encode(&buf, &v)
+ b.SetBytes(int64(buf.Len()))
+ rd := msgp.NewEndlessReader(buf.Bytes(), b)
+ dc := msgp.NewReader(rd)
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ err := v.DecodeMsg(dc)
+ if err != nil {
+ b.Fatal(err)
+ }
+ }
+}
+
+func TestMarshalUnmarshalrebalanceMetrics(t *testing.T) {
+ v := rebalanceMetrics{}
+ bts, err := v.MarshalMsg(nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ left, err := v.UnmarshalMsg(bts)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if len(left) > 0 {
+ t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
+ }
+
+ left, err = msgp.Skip(bts)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if len(left) > 0 {
+ t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
+ }
+}
+
+func BenchmarkMarshalMsgrebalanceMetrics(b *testing.B) {
+ v := rebalanceMetrics{}
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ v.MarshalMsg(nil)
+ }
+}
+
+func BenchmarkAppendMsgrebalanceMetrics(b *testing.B) {
+ v := rebalanceMetrics{}
+ bts := make([]byte, 0, v.Msgsize())
+ bts, _ = v.MarshalMsg(bts[0:0])
+ b.SetBytes(int64(len(bts)))
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ bts, _ = v.MarshalMsg(bts[0:0])
+ }
+}
+
+func BenchmarkUnmarshalrebalanceMetrics(b *testing.B) {
+ v := rebalanceMetrics{}
+ bts, _ := v.MarshalMsg(nil)
+ b.ReportAllocs()
+ b.SetBytes(int64(len(bts)))
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ _, err := v.UnmarshalMsg(bts)
+ if err != nil {
+ b.Fatal(err)
+ }
+ }
+}
+
+func TestEncodeDecoderebalanceMetrics(t *testing.T) {
+ v := rebalanceMetrics{}
+ var buf bytes.Buffer
+ msgp.Encode(&buf, &v)
+
+ m := v.Msgsize()
+ if buf.Len() > m {
+ t.Log("WARNING: TestEncodeDecoderebalanceMetrics Msgsize() is inaccurate")
+ }
+
+ vn := rebalanceMetrics{}
+ err := msgp.Decode(&buf, &vn)
+ if err != nil {
+ t.Error(err)
+ }
+
+ buf.Reset()
+ msgp.Encode(&buf, &v)
+ err = msgp.NewReader(&buf).Skip()
+ if err != nil {
+ t.Error(err)
+ }
+}
+
+func BenchmarkEncoderebalanceMetrics(b *testing.B) {
+ v := rebalanceMetrics{}
+ var buf bytes.Buffer
+ msgp.Encode(&buf, &v)
+ b.SetBytes(int64(buf.Len()))
+ en := msgp.NewWriter(msgp.Nowhere)
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ v.EncodeMsg(en)
+ }
+ en.Flush()
+}
+
+func BenchmarkDecoderebalanceMetrics(b *testing.B) {
+ v := rebalanceMetrics{}
+ var buf bytes.Buffer
+ msgp.Encode(&buf, &v)
+ b.SetBytes(int64(buf.Len()))
+ rd := msgp.NewEndlessReader(buf.Bytes(), b)
+ dc := msgp.NewReader(rd)
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ err := v.DecodeMsg(dc)
+ if err != nil {
+ b.Fatal(err)
+ }
+ }
+}
+
+func TestMarshalUnmarshalrebalanceStats(t *testing.T) {
+ v := rebalanceStats{}
+ bts, err := v.MarshalMsg(nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ left, err := v.UnmarshalMsg(bts)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if len(left) > 0 {
+ t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
+ }
+
+ left, err = msgp.Skip(bts)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if len(left) > 0 {
+ t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
+ }
+}
+
+func BenchmarkMarshalMsgrebalanceStats(b *testing.B) {
+ v := rebalanceStats{}
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ v.MarshalMsg(nil)
+ }
+}
+
+func BenchmarkAppendMsgrebalanceStats(b *testing.B) {
+ v := rebalanceStats{}
+ bts := make([]byte, 0, v.Msgsize())
+ bts, _ = v.MarshalMsg(bts[0:0])
+ b.SetBytes(int64(len(bts)))
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ bts, _ = v.MarshalMsg(bts[0:0])
+ }
+}
+
+func BenchmarkUnmarshalrebalanceStats(b *testing.B) {
+ v := rebalanceStats{}
+ bts, _ := v.MarshalMsg(nil)
+ b.ReportAllocs()
+ b.SetBytes(int64(len(bts)))
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ _, err := v.UnmarshalMsg(bts)
+ if err != nil {
+ b.Fatal(err)
+ }
+ }
+}
+
+func TestEncodeDecoderebalanceStats(t *testing.T) {
+ v := rebalanceStats{}
+ var buf bytes.Buffer
+ msgp.Encode(&buf, &v)
+
+ m := v.Msgsize()
+ if buf.Len() > m {
+ t.Log("WARNING: TestEncodeDecoderebalanceStats Msgsize() is inaccurate")
+ }
+
+ vn := rebalanceStats{}
+ err := msgp.Decode(&buf, &vn)
+ if err != nil {
+ t.Error(err)
+ }
+
+ buf.Reset()
+ msgp.Encode(&buf, &v)
+ err = msgp.NewReader(&buf).Skip()
+ if err != nil {
+ t.Error(err)
+ }
+}
+
+func BenchmarkEncoderebalanceStats(b *testing.B) {
+ v := rebalanceStats{}
+ var buf bytes.Buffer
+ msgp.Encode(&buf, &v)
+ b.SetBytes(int64(buf.Len()))
+ en := msgp.NewWriter(msgp.Nowhere)
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ v.EncodeMsg(en)
+ }
+ en.Flush()
+}
+
+func BenchmarkDecoderebalanceStats(b *testing.B) {
+ v := rebalanceStats{}
+ var buf bytes.Buffer
+ msgp.Encode(&buf, &v)
+ b.SetBytes(int64(buf.Len()))
+ rd := msgp.NewEndlessReader(buf.Bytes(), b)
+ dc := msgp.NewReader(rd)
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ err := v.DecodeMsg(dc)
+ if err != nil {
+ b.Fatal(err)
+ }
+ }
+}
+
+func TestMarshalUnmarshalrstats(t *testing.T) {
+ v := rstats{}
+ bts, err := v.MarshalMsg(nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ left, err := v.UnmarshalMsg(bts)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if len(left) > 0 {
+ t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
+ }
+
+ left, err = msgp.Skip(bts)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if len(left) > 0 {
+ t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
+ }
+}
+
+func BenchmarkMarshalMsgrstats(b *testing.B) {
+ v := rstats{}
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ v.MarshalMsg(nil)
+ }
+}
+
+func BenchmarkAppendMsgrstats(b *testing.B) {
+ v := rstats{}
+ bts := make([]byte, 0, v.Msgsize())
+ bts, _ = v.MarshalMsg(bts[0:0])
+ b.SetBytes(int64(len(bts)))
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ bts, _ = v.MarshalMsg(bts[0:0])
+ }
+}
+
+func BenchmarkUnmarshalrstats(b *testing.B) {
+ v := rstats{}
+ bts, _ := v.MarshalMsg(nil)
+ b.ReportAllocs()
+ b.SetBytes(int64(len(bts)))
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ _, err := v.UnmarshalMsg(bts)
+ if err != nil {
+ b.Fatal(err)
+ }
+ }
+}
+
+func TestEncodeDecoderstats(t *testing.T) {
+ v := rstats{}
+ var buf bytes.Buffer
+ msgp.Encode(&buf, &v)
+
+ m := v.Msgsize()
+ if buf.Len() > m {
+ t.Log("WARNING: TestEncodeDecoderstats Msgsize() is inaccurate")
+ }
+
+ vn := rstats{}
+ err := msgp.Decode(&buf, &vn)
+ if err != nil {
+ t.Error(err)
+ }
+
+ buf.Reset()
+ msgp.Encode(&buf, &v)
+ err = msgp.NewReader(&buf).Skip()
+ if err != nil {
+ t.Error(err)
+ }
+}
+
+func BenchmarkEncoderstats(b *testing.B) {
+ v := rstats{}
+ var buf bytes.Buffer
+ msgp.Encode(&buf, &v)
+ b.SetBytes(int64(buf.Len()))
+ en := msgp.NewWriter(msgp.Nowhere)
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ v.EncodeMsg(en)
+ }
+ en.Flush()
+}
+
+func BenchmarkDecoderstats(b *testing.B) {
+ v := rstats{}
+ var buf bytes.Buffer
+ msgp.Encode(&buf, &v)
+ b.SetBytes(int64(buf.Len()))
+ rd := msgp.NewEndlessReader(buf.Bytes(), b)
+ dc := msgp.NewReader(rd)
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ err := v.DecodeMsg(dc)
+ if err != nil {
+ b.Fatal(err)
+ }
+ }
+}
diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go
index f57bda199..c4b33dc35 100644
--- a/cmd/erasure-server-pool.go
+++ b/cmd/erasure-server-pool.go
@@ -44,7 +44,11 @@ import (
type erasureServerPools struct {
poolMetaMutex sync.RWMutex
poolMeta poolMeta
- serverPools []*erasureSets
+
+ rebalMu sync.RWMutex
+ rebalMeta *rebalanceMeta
+
+ serverPools []*erasureSets
// Shut down async operations
shutdown context.CancelFunc
@@ -327,8 +331,9 @@ func (z *erasureServerPools) getServerPoolsAvailableSpace(ctx context.Context, b
g := errgroup.WithNErrs(len(z.serverPools))
for index := range z.serverPools {
index := index
- // skip suspended pools for any new I/O.
- if z.IsSuspended(index) {
+ // Skip suspended pools or pools participating in rebalance for any new
+ // I/O.
+ if z.IsSuspended(index) || z.IsPoolRebalancing(index) {
continue
}
pool := z.serverPools[index]
@@ -426,6 +431,10 @@ func (z *erasureServerPools) getPoolInfoExistingWithOpts(ctx context.Context, bu
if z.IsSuspended(pinfo.Index) && opts.SkipDecommissioned {
continue
}
+ // Skip object if it's from pools participating in a rebalance operation.
+ if opts.SkipRebalancing && z.IsPoolRebalancing(pinfo.Index) {
+ continue
+ }
if pinfo.Err != nil && !isErrObjectNotFound(pinfo.Err) {
return pinfo, pinfo.Err
@@ -466,6 +475,7 @@ func (z *erasureServerPools) getPoolIdxExistingNoLock(ctx context.Context, bucke
return z.getPoolIdxExistingWithOpts(ctx, bucket, object, ObjectOptions{
NoLock: true,
SkipDecommissioned: true,
+ SkipRebalancing: true,
})
}
@@ -489,7 +499,10 @@ func (z *erasureServerPools) getPoolIdxNoLock(ctx context.Context, bucket, objec
// if none are found falls back to most available space pool, this function is
// designed to be only used by PutObject, CopyObject (newObject creation) and NewMultipartUpload.
func (z *erasureServerPools) getPoolIdx(ctx context.Context, bucket, object string, size int64) (idx int, err error) {
- idx, err = z.getPoolIdxExistingWithOpts(ctx, bucket, object, ObjectOptions{SkipDecommissioned: true})
+ idx, err = z.getPoolIdxExistingWithOpts(ctx, bucket, object, ObjectOptions{
+ SkipDecommissioned: true,
+ SkipRebalancing: true,
+ })
if err != nil && !isErrObjectNotFound(err) {
return idx, err
}
@@ -1387,9 +1400,10 @@ func (z *erasureServerPools) NewMultipartUpload(ctx context.Context, bucket, obj
}
for idx, pool := range z.serverPools {
- if z.IsSuspended(idx) {
+ if z.IsSuspended(idx) || z.IsPoolRebalancing(idx) {
continue
}
+
result, err := pool.ListMultipartUploads(ctx, bucket, object, "", "", "", maxUploadsList)
if err != nil {
return nil, err
diff --git a/cmd/iam-object-store.go b/cmd/iam-object-store.go
index 617a007a4..c4c6efee7 100644
--- a/cmd/iam-object-store.go
+++ b/cmd/iam-object-store.go
@@ -91,7 +91,7 @@ func (iamOS *IAMObjectStore) saveIAMConfig(ctx context.Context, item interface{}
}
func (iamOS *IAMObjectStore) loadIAMConfigBytesWithMetadata(ctx context.Context, objPath string) ([]byte, ObjectInfo, error) {
- data, meta, err := readConfigWithMetadata(ctx, iamOS.objAPI, objPath)
+ data, meta, err := readConfigWithMetadata(ctx, iamOS.objAPI, objPath, ObjectOptions{})
if err != nil {
return nil, meta, err
}
diff --git a/cmd/notification.go b/cmd/notification.go
index cfbf2dc24..e15cb9c54 100644
--- a/cmd/notification.go
+++ b/cmd/notification.go
@@ -630,6 +630,49 @@ func (sys *NotificationSys) ReloadPoolMeta(ctx context.Context) {
}
}
+// StopRebalance notifies all MinIO nodes to signal any ongoing rebalance
+// goroutine to stop.
+func (sys *NotificationSys) StopRebalance(ctx context.Context) {
+ ng := WithNPeers(len(sys.peerClients))
+ for idx, client := range sys.peerClients {
+ if client == nil {
+ continue
+ }
+ client := client
+ ng.Go(ctx, func() error {
+ return client.StopRebalance(ctx)
+ }, idx, *client.host)
+ }
+ for _, nErr := range ng.Wait() {
+ reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", nErr.Host.String())
+ if nErr.Err != nil {
+ logger.LogIf(logger.SetReqInfo(ctx, reqInfo), nErr.Err)
+ }
+ }
+}
+
+// LoadRebalanceMeta notifies all peers to load rebalance.bin from the object layer.
+// Note: only peers participating in the rebalance operation, namely the first
+// node in each pool, will load rebalance.bin.
+func (sys *NotificationSys) LoadRebalanceMeta(ctx context.Context, startRebalance bool) {
+ ng := WithNPeers(len(sys.peerClients))
+ for idx, client := range sys.peerClients {
+ if client == nil {
+ continue
+ }
+ client := client
+ ng.Go(ctx, func() error {
+ return client.LoadRebalanceMeta(ctx, startRebalance)
+ }, idx, *client.host)
+ }
+ for _, nErr := range ng.Wait() {
+ reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", nErr.Host.String())
+ if nErr.Err != nil {
+ logger.LogIf(logger.SetReqInfo(ctx, reqInfo), nErr.Err)
+ }
+ }
+}
+
// LoadTransitionTierConfig notifies remote peers to load their remote tier
// configs from config store.
func (sys *NotificationSys) LoadTransitionTierConfig(ctx context.Context) {
diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go
index 9b3d6329c..0236e63fd 100644
--- a/cmd/object-api-interface.go
+++ b/cmd/object-api-interface.go
@@ -85,6 +85,9 @@ type ObjectOptions struct {
// SkipDecommissioned set to 'true' if the call requires skipping the pool being decommissioned.
// mainly set for certain WRITE operations.
SkipDecommissioned bool
+ // SkipRebalancing should be set to 'true' if the call should skip pools
+ // participating in a rebalance operation. Typically set for 'write' operations.
+ SkipRebalancing bool
WalkFilter func(info FileInfo) bool // return WalkFilter returns 'true/false'
WalkMarker string // set to skip until this object
diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go
index 943bc7215..b2ed6961e 100644
--- a/cmd/peer-rest-client.go
+++ b/cmd/peer-rest-client.go
@@ -551,6 +551,28 @@ func (client *peerRESTClient) ReloadPoolMeta(ctx context.Context) error {
return nil
}
+func (client *peerRESTClient) StopRebalance(ctx context.Context) error {
+ respBody, err := client.callWithContext(ctx, peerRESTMethodStopRebalance, nil, nil, 0)
+ if err != nil {
+ logger.LogIf(ctx, err)
+ return err
+ }
+ defer http.DrainBody(respBody)
+ return nil
+}
+
+func (client *peerRESTClient) LoadRebalanceMeta(ctx context.Context, startRebalance bool) error {
+ values := url.Values{}
+ values.Set(peerRESTStartRebalance, strconv.FormatBool(startRebalance))
+ respBody, err := client.callWithContext(ctx, peerRESTMethodLoadRebalanceMeta, values, nil, 0)
+ if err != nil {
+ logger.LogIf(ctx, err)
+ return err
+ }
+ defer http.DrainBody(respBody)
+ return nil
+}
+
func (client *peerRESTClient) LoadTransitionTierConfig(ctx context.Context) error {
respBody, err := client.callWithContext(ctx, peerRESTMethodLoadTransitionTierConfig, nil, nil, 0)
if err != nil {
diff --git a/cmd/peer-rest-common.go b/cmd/peer-rest-common.go
index 28901c4bc..11724c826 100644
--- a/cmd/peer-rest-common.go
+++ b/cmd/peer-rest-common.go
@@ -18,7 +18,8 @@
package cmd
const (
- peerRESTVersion = "v27" // change in GetAllBucketStats response.
+ peerRESTVersion = "v28" // Added Rebalance peer APIs
+
peerRESTVersionPrefix = SlashSeparator + peerRESTVersion
peerRESTPrefix = minioReservedBucketPath + "/peer"
peerRESTPath = peerRESTPrefix + peerRESTVersionPrefix
@@ -68,6 +69,8 @@ const (
peerRESTMethodDriveSpeedTest = "/drivespeedtest"
peerRESTMethodReloadSiteReplicationConfig = "/reloadsitereplicationconfig"
peerRESTMethodReloadPoolMeta = "/reloadpoolmeta"
+ peerRESTMethodLoadRebalanceMeta = "/loadrebalancemeta"
+ peerRESTMethodStopRebalance = "/stoprebalance"
peerRESTMethodGetLastDayTierStats = "/getlastdaytierstats"
peerRESTMethodDevNull = "/devnull"
peerRESTMethodNetperf = "/netperf"
@@ -75,25 +78,26 @@ const (
)
const (
- peerRESTBucket = "bucket"
- peerRESTBuckets = "buckets"
- peerRESTUser = "user"
- peerRESTGroup = "group"
- peerRESTUserTemp = "user-temp"
- peerRESTPolicy = "policy"
- peerRESTUserOrGroup = "user-or-group"
- peerRESTUserType = "user-type"
- peerRESTIsGroup = "is-group"
- peerRESTSignal = "signal"
- peerRESTSubSys = "sub-sys"
- peerRESTProfiler = "profiler"
- peerRESTSize = "size"
- peerRESTConcurrent = "concurrent"
- peerRESTDuration = "duration"
- peerRESTStorageClass = "storage-class"
- peerRESTMetricsTypes = "types"
- peerRESTDisk = "disk"
- peerRESTJobID = "job-id"
+ peerRESTBucket = "bucket"
+ peerRESTBuckets = "buckets"
+ peerRESTUser = "user"
+ peerRESTGroup = "group"
+ peerRESTUserTemp = "user-temp"
+ peerRESTPolicy = "policy"
+ peerRESTUserOrGroup = "user-or-group"
+ peerRESTUserType = "user-type"
+ peerRESTIsGroup = "is-group"
+ peerRESTSignal = "signal"
+ peerRESTSubSys = "sub-sys"
+ peerRESTProfiler = "profiler"
+ peerRESTSize = "size"
+ peerRESTConcurrent = "concurrent"
+ peerRESTDuration = "duration"
+ peerRESTStorageClass = "storage-class"
+ peerRESTMetricsTypes = "types"
+ peerRESTDisk = "disk"
+ peerRESTJobID = "job-id"
+ peerRESTStartRebalance = "start-rebalance"
peerRESTListenBucket = "bucket"
peerRESTListenPrefix = "prefix"
diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go
index 0af707fcc..adc907663 100644
--- a/cmd/peer-rest-server.go
+++ b/cmd/peer-rest-server.go
@@ -1048,6 +1048,60 @@ func (s *peerRESTServer) ReloadPoolMetaHandler(w http.ResponseWriter, r *http.Re
}
}
+func (s *peerRESTServer) StopRebalanceHandler(w http.ResponseWriter, r *http.Request) {
+ if !s.IsValid(w, r) {
+ s.writeErrorResponse(w, errors.New("invalid request"))
+ return
+ }
+
+ objAPI := newObjectLayerFn()
+ if objAPI == nil {
+ s.writeErrorResponse(w, errServerNotInitialized)
+ return
+ }
+ pools, ok := objAPI.(*erasureServerPools)
+ if !ok {
+ s.writeErrorResponse(w, errors.New("not a multiple pools setup"))
+ return
+ }
+
+ pools.StopRebalance()
+}
+
+func (s *peerRESTServer) LoadRebalanceMetaHandler(w http.ResponseWriter, r *http.Request) {
+ if !s.IsValid(w, r) {
+ s.writeErrorResponse(w, errors.New("invalid request"))
+ return
+ }
+
+ objAPI := newObjectLayerFn()
+ if objAPI == nil {
+ s.writeErrorResponse(w, errServerNotInitialized)
+ return
+ }
+
+ pools, ok := objAPI.(*erasureServerPools)
+ if !ok {
+ s.writeErrorResponse(w, errors.New("not a multiple pools setup"))
+ return
+ }
+
+ startRebalanceStr := r.Form.Get(peerRESTStartRebalance)
+ startRebalance, err := strconv.ParseBool(startRebalanceStr)
+ if err != nil {
+ s.writeErrorResponse(w, err)
+ return
+ }
+
+ if err := pools.loadRebalanceMeta(r.Context()); err != nil {
+ s.writeErrorResponse(w, err)
+ return
+ }
+ if startRebalance {
+ go pools.StartRebalance()
+ }
+}
+
func (s *peerRESTServer) LoadTransitionTierConfigHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("invalid request"))
@@ -1352,5 +1406,7 @@ func registerPeerRESTHandlers(router *mux.Router) {
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDevNull).HandlerFunc(httpTraceHdrs(server.DevNull))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodReloadSiteReplicationConfig).HandlerFunc(httpTraceHdrs(server.ReloadSiteReplicationConfigHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodReloadPoolMeta).HandlerFunc(httpTraceHdrs(server.ReloadPoolMetaHandler))
+ subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadRebalanceMeta).HandlerFunc(httpTraceHdrs(server.LoadRebalanceMetaHandler))
+ subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodStopRebalance).HandlerFunc(httpTraceHdrs(server.StopRebalanceHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetLastDayTierStats).HandlerFunc(httpTraceHdrs(server.GetLastDayTierStatsHandler))
}
diff --git a/cmd/rebalance-admin.go b/cmd/rebalance-admin.go
new file mode 100644
index 000000000..ff22abcae
--- /dev/null
+++ b/cmd/rebalance-admin.go
@@ -0,0 +1,108 @@
+// Copyright (c) 2022 MinIO, Inc.
+//
+// This file is part of MinIO Object Storage stack
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package cmd
+
+import (
+ "context"
+ "time"
+)
+
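+// rebalPoolProgress reports the progress made so far by a pool participating in rebalance.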
+type rebalPoolProgress struct {
+ NumObjects uint64 `json:"objects"`
+ NumVersions uint64 `json:"versions"`
+ Bytes uint64 `json:"bytes"`
+ Bucket string `json:"bucket"`
+ Object string `json:"object"`
+ Elapsed time.Duration `json:"elapsed"`
+ ETA time.Duration `json:"eta"`
+}
+
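+// rebalancePoolStatus describes the rebalance state of a single pool for admin clients.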
+type rebalancePoolStatus struct {
+ ID int `json:"id"` // Pool index (zero-based)
+ Status string `json:"status"` // rebalance status of this pool; one of None, Started, Completed, Stopped or Failed
+ Used float64 `json:"used"` // Fraction of the pool's total space currently in use (0-1)
+ Progress rebalPoolProgress `json:"progress,omitempty"` // zero-valued for pools not participating in rebalance
+}
+
+// rebalanceAdminStatus holds rebalance status related information exported to mc, console, etc.
+type rebalanceAdminStatus struct {
+ ID string // identifies the ongoing rebalance operation by a uuid
+ Pools []rebalancePoolStatus `json:"pools"` // contains all pools, including inactive
+ StoppedAt time.Time `json:"stoppedAt,omitempty"`
+}
+
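+// rebalanceStatus builds a rebalanceAdminStatus snapshot from the persisted rebalance metadata
+// and the current per-pool disk usage.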
+func rebalanceStatus(ctx context.Context, z *erasureServerPools) (r rebalanceAdminStatus, err error) {
+ // Load latest rebalance status
+ meta := &rebalanceMeta{}
+ err = meta.load(ctx, z.serverPools[0])
+ if err != nil {
+ return r, err
+ }
+
+ // Aggregate per-pool available and total space to compute usage fractions below
+ si, _ := z.StorageInfo(ctx)
+ diskStats := make([]struct {
+ AvailableSpace uint64
+ TotalSpace uint64
+ }, len(z.serverPools))
+ for _, disk := range si.Disks {
+ diskStats[disk.PoolIndex].AvailableSpace += disk.AvailableSpace
+ diskStats[disk.PoolIndex].TotalSpace += disk.TotalSpace
+ }
+
+ stopTime := meta.StoppedAt
+ r = rebalanceAdminStatus{
+ ID: meta.ID,
+ StoppedAt: meta.StoppedAt,
+ Pools: make([]rebalancePoolStatus, len(meta.PoolStats)),
+ }
+ for i, ps := range meta.PoolStats {
+ r.Pools[i] = rebalancePoolStatus{
+ ID: i,
+ Status: ps.Info.Status.String(),
+ Used: float64(diskStats[i].TotalSpace-diskStats[i].AvailableSpace) / float64(diskStats[i].TotalSpace),
+ }
+ if !ps.Participating {
+ continue
+ }
+ // For participating pools, the total bytes to be rebalanced by this pool is given by
+ //   pf_c = (f_i + x)/c_i
+ // where pf_c is the target fraction of free space across pools, f_i is the ith pool's free space
+ // and c_i is the ith pool's capacity, i.e. x = c_i*pf_c - f_i
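+ // Illustrative example (assumed numbers, not from a real deployment): with c_i = 100 TiB,
+ // f_i = 20 TiB and a free-space goal pf_c = 0.4, this pool must shed x = 100*0.4 - 20 = 20 TiB.
+ // If 10 TiB have moved after 1h, the ETA computed below works out to 20/10 * 1h = 2h.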
+ totalBytesToRebal := float64(ps.InitCapacity)*meta.PercentFreeGoal - float64(ps.InitFreeSpace)
+ elapsed := time.Since(ps.Info.StartTime)
+ var eta time.Duration
+ if ps.Bytes > 0 { // guard against division by zero before any data has moved
+ eta = time.Duration(totalBytesToRebal * float64(elapsed) / float64(ps.Bytes))
+ }
+ if !ps.Info.EndTime.IsZero() {
+ stopTime = ps.Info.EndTime
+ }
+
+ if !stopTime.IsZero() { // rebalance is stopped or completed
+ elapsed = stopTime.Sub(ps.Info.StartTime)
+ eta = 0
+ }
+
+ r.Pools[i].Progress = rebalPoolProgress{
+ NumObjects: ps.NumObjects,
+ NumVersions: ps.NumVersions,
+ Bytes: ps.Bytes,
+ Elapsed: elapsed,
+ ETA: eta,
+ }
+ }
+ return r, nil
+}
diff --git a/cmd/rebalancemetric_string.go b/cmd/rebalancemetric_string.go
new file mode 100644
index 000000000..930e04341
--- /dev/null
+++ b/cmd/rebalancemetric_string.go
@@ -0,0 +1,27 @@
+// Code generated by "stringer -type=rebalanceMetric -trimprefix=rebalanceMetric erasure-server-pool-rebalance.go"; DO NOT EDIT.
+
+package cmd
+
+import "strconv"
+
+func _() {
+ // An "invalid array index" compiler error signifies that the constant values have changed.
+ // Re-run the stringer command to generate them again.
+ var x [1]struct{}
+ _ = x[rebalanceMetricRebalanceBuckets-0]
+ _ = x[rebalanceMetricRebalanceBucket-1]
+ _ = x[rebalanceMetricRebalanceObject-2]
+ _ = x[rebalanceMetricRebalanceRemoveObject-3]
+ _ = x[rebalanceMetricSaveMetadata-4]
+}
+
+const _rebalanceMetric_name = "RebalanceBucketsRebalanceBucketRebalanceObjectRebalanceRemoveObjectSaveMetadata"
+
+var _rebalanceMetric_index = [...]uint8{0, 16, 31, 46, 67, 79}
+
+func (i rebalanceMetric) String() string {
+ if i >= rebalanceMetric(len(_rebalanceMetric_index)-1) {
+ return "rebalanceMetric(" + strconv.FormatInt(int64(i), 10) + ")"
+ }
+ return _rebalanceMetric_name[_rebalanceMetric_index[i]:_rebalanceMetric_index[i+1]]
+}
diff --git a/cmd/rebalstatus_string.go b/cmd/rebalstatus_string.go
new file mode 100644
index 000000000..0dc74b217
--- /dev/null
+++ b/cmd/rebalstatus_string.go
@@ -0,0 +1,27 @@
+// Code generated by "stringer -type=rebalStatus -trimprefix=rebal erasure-server-pool-rebalance.go"; DO NOT EDIT.
+
+package cmd
+
+import "strconv"
+
+func _() {
+ // An "invalid array index" compiler error signifies that the constant values have changed.
+ // Re-run the stringer command to generate them again.
+ var x [1]struct{}
+ _ = x[rebalNone-0]
+ _ = x[rebalStarted-1]
+ _ = x[rebalCompleted-2]
+ _ = x[rebalStopped-3]
+ _ = x[rebalFailed-4]
+}
+
+const _rebalStatus_name = "NoneStartedCompletedStoppedFailed"
+
+var _rebalStatus_index = [...]uint8{0, 4, 11, 20, 27, 33}
+
+func (i rebalStatus) String() string {
+ if i >= rebalStatus(len(_rebalStatus_index)-1) {
+ return "rebalStatus(" + strconv.FormatInt(int64(i), 10) + ")"
+ }
+ return _rebalStatus_name[_rebalStatus_index[i]:_rebalStatus_index[i+1]]
+}