remove region locks and make them simpler (#20268)

- single flight approach is now optional, instead of default.
- parallelize the loaders upto 32 items per assets (more room for improvement possible)
This commit is contained in:
Harshavardhana
2024-08-15 08:41:03 -07:00
committed by GitHub
parent f1302c40fe
commit cc0c41d216
4 changed files with 333 additions and 111 deletions

View File

@@ -34,6 +34,7 @@ import (
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/kms"
"github.com/minio/minio/internal/logger"
"github.com/minio/pkg/v3/sync/errgroup"
"github.com/puzpuzpuz/xsync/v3"
)
@@ -182,19 +183,20 @@ func (iamOS *IAMObjectStore) loadPolicyDocWithRetry(ctx context.Context, policy
}
}
func (iamOS *IAMObjectStore) loadPolicyDoc(ctx context.Context, policy string, m map[string]PolicyDoc) error {
func (iamOS *IAMObjectStore) loadPolicy(ctx context.Context, policy string) (PolicyDoc, error) {
var p PolicyDoc
data, objInfo, err := iamOS.loadIAMConfigBytesWithMetadata(ctx, getPolicyDocPath(policy))
if err != nil {
if err == errConfigNotFound {
return errNoSuchPolicy
return p, errNoSuchPolicy
}
return err
return p, err
}
var p PolicyDoc
err = p.parseJSON(data)
if err != nil {
return err
return p, err
}
if p.Version == 0 {
@@ -205,6 +207,14 @@ func (iamOS *IAMObjectStore) loadPolicyDoc(ctx context.Context, policy string, m
p.UpdateDate = objInfo.ModTime
}
return p, nil
}
func (iamOS *IAMObjectStore) loadPolicyDoc(ctx context.Context, policy string, m map[string]PolicyDoc) error {
p, err := iamOS.loadPolicy(ctx, policy)
if err != nil {
return err
}
m[policy] = p
return nil
}
@@ -237,21 +247,21 @@ func (iamOS *IAMObjectStore) loadSecretKey(ctx context.Context, user string, use
return u.Credentials.SecretKey, nil
}
func (iamOS *IAMObjectStore) loadUser(ctx context.Context, user string, userType IAMUserType, m map[string]UserIdentity) error {
func (iamOS *IAMObjectStore) loadUserIdentity(ctx context.Context, user string, userType IAMUserType) (UserIdentity, error) {
var u UserIdentity
err := iamOS.loadIAMConfig(ctx, &u, getUserIdentityPath(user, userType))
if err != nil {
if err == errConfigNotFound {
return errNoSuchUser
return u, errNoSuchUser
}
return err
return u, err
}
if u.Credentials.IsExpired() {
// Delete expired identity - ignoring errors here.
iamOS.deleteIAMConfig(ctx, getUserIdentityPath(user, userType))
iamOS.deleteIAMConfig(ctx, getMappedPolicyPath(user, userType, false))
return nil
return u, errNoSuchUser
}
if u.Credentials.AccessKey == "" {
@@ -267,7 +277,7 @@ func (iamOS *IAMObjectStore) loadUser(ctx context.Context, user string, userType
iamOS.deleteIAMConfig(ctx, getUserIdentityPath(user, userType))
iamOS.deleteIAMConfig(ctx, getMappedPolicyPath(user, userType, false))
}
return nil
return u, errNoSuchUser
}
u.Credentials.Claims = jwtClaims.Map()
@@ -277,6 +287,35 @@ func (iamOS *IAMObjectStore) loadUser(ctx context.Context, user string, userType
u.Credentials.Description = u.Credentials.Comment
}
return u, nil
}
func (iamOS *IAMObjectStore) loadUserConcurrent(ctx context.Context, userType IAMUserType, users ...string) ([]UserIdentity, error) {
userIdentities := make([]UserIdentity, len(users))
g := errgroup.WithNErrs(len(users))
for index := range users {
index := index
g.Go(func() error {
userName := path.Dir(users[index])
user, err := iamOS.loadUserIdentity(ctx, userName, userType)
if err != nil && !errors.Is(err, errNoSuchUser) {
return fmt.Errorf("unable to load the user `%s`: %w", userName, err)
}
userIdentities[index] = user
return nil
}, index)
}
err := errors.Join(g.Wait()...)
return userIdentities, err
}
func (iamOS *IAMObjectStore) loadUser(ctx context.Context, user string, userType IAMUserType, m map[string]UserIdentity) error {
u, err := iamOS.loadUserIdentity(ctx, user, userType)
if err != nil {
return err
}
m[user] = u
return nil
}
@@ -358,16 +397,44 @@ func (iamOS *IAMObjectStore) loadMappedPolicyWithRetry(ctx context.Context, name
}
}
func (iamOS *IAMObjectStore) loadMappedPolicy(ctx context.Context, name string, userType IAMUserType, isGroup bool, m *xsync.MapOf[string, MappedPolicy]) error {
func (iamOS *IAMObjectStore) loadMappedPolicyInternal(ctx context.Context, name string, userType IAMUserType, isGroup bool) (MappedPolicy, error) {
var p MappedPolicy
err := iamOS.loadIAMConfig(ctx, &p, getMappedPolicyPath(name, userType, isGroup))
if err != nil {
if err == errConfigNotFound {
return errNoSuchPolicy
return p, errNoSuchPolicy
}
return err
return p, err
}
return p, nil
}
func (iamOS *IAMObjectStore) loadMappedPolicyConcurrent(ctx context.Context, userType IAMUserType, isGroup bool, users ...string) ([]MappedPolicy, error) {
mappedPolicies := make([]MappedPolicy, len(users))
g := errgroup.WithNErrs(len(users))
for index := range users {
index := index
g.Go(func() error {
userName := strings.TrimSuffix(users[index], ".json")
userMP, err := iamOS.loadMappedPolicyInternal(ctx, userName, userType, isGroup)
if err != nil && !errors.Is(err, errNoSuchPolicy) {
return fmt.Errorf("unable to load the user policy map `%s`: %w", userName, err)
}
mappedPolicies[index] = userMP
return nil
}, index)
}
err := errors.Join(g.Wait()...)
return mappedPolicies, err
}
func (iamOS *IAMObjectStore) loadMappedPolicy(ctx context.Context, name string, userType IAMUserType, isGroup bool, m *xsync.MapOf[string, MappedPolicy]) error {
p, err := iamOS.loadMappedPolicyInternal(ctx, name, userType, isGroup)
if err != nil {
return err
}
m.Store(name, p)
return nil
}
@@ -455,6 +522,27 @@ const (
maxIAMLoadOpTime = 5 * time.Second
)
func (iamOS *IAMObjectStore) loadPolicyDocConcurrent(ctx context.Context, policies ...string) ([]PolicyDoc, error) {
policyDocs := make([]PolicyDoc, len(policies))
g := errgroup.WithNErrs(len(policies))
for index := range policies {
index := index
g.Go(func() error {
policyName := path.Dir(policies[index])
policyDoc, err := iamOS.loadPolicy(ctx, policyName)
if err != nil && !errors.Is(err, errNoSuchPolicy) {
return fmt.Errorf("unable to load the policy doc `%s`: %w", policyName, err)
}
policyDocs[index] = policyDoc
return nil
}, index)
}
err := errors.Join(g.Wait()...)
return policyDocs, err
}
// Assumes cache is locked by caller.
func (iamOS *IAMObjectStore) loadAllFromObjStore(ctx context.Context, cache *iamCache, firstTime bool) error {
bootstrapTraceMsgFirstTime := func(s string) {
@@ -490,12 +578,37 @@ func (iamOS *IAMObjectStore) loadAllFromObjStore(ctx context.Context, cache *iam
policyLoadStartTime := UTCNow()
policiesList := listedConfigItems[policiesListKey]
for _, item := range policiesList {
policyName := path.Dir(item)
if err := iamOS.loadPolicyDoc(ctx, policyName, cache.iamPolicyDocsMap); err != nil && !errors.Is(err, errNoSuchPolicy) {
return fmt.Errorf("unable to load the policy doc `%s`: %w", policyName, err)
count := 32 // number of parallel IAM loaders
for {
if len(policiesList) < count {
policyDocs, err := iamOS.loadPolicyDocConcurrent(ctx, policiesList...)
if err != nil {
return err
}
for index := range policiesList {
if policyDocs[index].Policy.Version != "" {
policyName := path.Dir(policiesList[index])
cache.iamPolicyDocsMap[policyName] = policyDocs[index]
}
}
break
}
policyDocs, err := iamOS.loadPolicyDocConcurrent(ctx, policiesList[:count]...)
if err != nil {
return err
}
for index := range policiesList[:count] {
if policyDocs[index].Policy.Version != "" {
policyName := path.Dir(policiesList[index])
cache.iamPolicyDocsMap[policyName] = policyDocs[index]
}
}
policiesList = policiesList[count:]
}
if took := time.Since(policyLoadStartTime); took > maxIAMLoadOpTime {
logger.Info("Policy docs load took %.2fs (for %d items)", took.Seconds(), len(policiesList))
}
@@ -504,12 +617,37 @@ func (iamOS *IAMObjectStore) loadAllFromObjStore(ctx context.Context, cache *iam
bootstrapTraceMsgFirstTime("loading regular IAM users")
regUsersLoadStartTime := UTCNow()
regUsersList := listedConfigItems[usersListKey]
for _, item := range regUsersList {
userName := path.Dir(item)
if err := iamOS.loadUser(ctx, userName, regUser, cache.iamUsersMap); err != nil && err != errNoSuchUser {
return fmt.Errorf("unable to load the user: %w", err)
for {
if len(regUsersList) < count {
users, err := iamOS.loadUserConcurrent(ctx, regUser, regUsersList...)
if err != nil {
return err
}
for index := range regUsersList {
if users[index].Credentials.AccessKey != "" {
userName := path.Dir(regUsersList[index])
cache.iamUsersMap[userName] = users[index]
}
}
break
}
users, err := iamOS.loadUserConcurrent(ctx, regUser, regUsersList[:count]...)
if err != nil {
return err
}
for index := range regUsersList[:count] {
if users[index].Credentials.AccessKey != "" {
userName := path.Dir(regUsersList[index])
cache.iamUsersMap[userName] = users[index]
}
}
regUsersList = regUsersList[count:]
}
if took := time.Since(regUsersLoadStartTime); took > maxIAMLoadOpTime {
actualLoaded := len(cache.iamUsersMap)
logger.Info("Reg. users load took %.2fs (for %d items with %d expired items)", took.Seconds(),
@@ -533,12 +671,38 @@ func (iamOS *IAMObjectStore) loadAllFromObjStore(ctx context.Context, cache *iam
bootstrapTraceMsgFirstTime("loading user policy mapping")
userPolicyMappingLoadStartTime := UTCNow()
userPolicyMappingsList := listedConfigItems[policyDBUsersListKey]
for _, item := range userPolicyMappingsList {
userName := strings.TrimSuffix(item, ".json")
if err := iamOS.loadMappedPolicy(ctx, userName, regUser, false, cache.iamUserPolicyMap); err != nil && !errors.Is(err, errNoSuchPolicy) {
return fmt.Errorf("unable to load the policy mapping for the user: %w", err)
for {
if len(userPolicyMappingsList) < count {
mappedPolicies, err := iamOS.loadMappedPolicyConcurrent(ctx, regUser, false, userPolicyMappingsList...)
if err != nil {
return err
}
for index := range userPolicyMappingsList {
if mappedPolicies[index].Policies != "" {
userName := strings.TrimSuffix(userPolicyMappingsList[index], ".json")
cache.iamUserPolicyMap.Store(userName, mappedPolicies[index])
}
}
break
}
mappedPolicies, err := iamOS.loadMappedPolicyConcurrent(ctx, regUser, false, userPolicyMappingsList[:count]...)
if err != nil {
return err
}
for index := range userPolicyMappingsList[:count] {
if mappedPolicies[index].Policies != "" {
userName := strings.TrimSuffix(userPolicyMappingsList[index], ".json")
cache.iamUserPolicyMap.Store(userName, mappedPolicies[index])
}
}
userPolicyMappingsList = userPolicyMappingsList[count:]
}
if took := time.Since(userPolicyMappingLoadStartTime); took > maxIAMLoadOpTime {
logger.Info("User policy mappings load took %.2fs (for %d items)", took.Seconds(), len(userPolicyMappingsList))
}
@@ -617,21 +781,16 @@ func (iamOS *IAMObjectStore) loadAllFromObjStore(ctx context.Context, cache *iam
for _, item := range listedConfigItems[stsListKey] {
userName := path.Dir(item)
// loadUser() will delete expired user during the load.
err := iamOS.loadUser(ctx, userName, stsUser, stsAccountsFromStore)
if err != nil && !errors.Is(err, errNoSuchUser) {
return fmt.Errorf("unable to load user during STS purge: %w (%s)", err, item)
}
iamLogIf(ctx, iamOS.loadUser(ctx, userName, stsUser, stsAccountsFromStore))
// No need to return errors for failed expiration of STS users
}
// Loading the STS policy mappings from disk ensures that stale entries
// (removed during loadUser() in the loop above) are removed from memory.
for _, item := range listedConfigItems[policyDBSTSUsersListKey] {
stsName := strings.TrimSuffix(item, ".json")
err := iamOS.loadMappedPolicy(ctx, stsName, stsUser, false, stsAccPoliciesFromStore)
if err != nil && !errors.Is(err, errNoSuchPolicy) {
return fmt.Errorf("unable to load policies during STS purge: %w (%s)", err, item)
}
iamLogIf(ctx, iamOS.loadMappedPolicy(ctx, stsName, stsUser, false, stsAccPoliciesFromStore))
// No need to return errors for failed expiration of STS users
}
took := time.Since(purgeStart).Seconds()