fix: make single locks for both IAM and object-store (#9279)

Additionally add context support for IAM sub-system
This commit is contained in:
Harshavardhana
2020-04-07 14:26:39 -07:00
committed by GitHub
parent e375341c33
commit e7276b7b9b
9 changed files with 266 additions and 402 deletions

View File

@@ -74,37 +74,30 @@ func etcdKvsToSetPolicyDB(prefix string, kvs []*mvccpb.KeyValue) set.StringSet {
// IAMEtcdStore implements IAMStorageAPI
type IAMEtcdStore struct {
sync.RWMutex
ctx context.Context
client *etcd.Client
}
func newIAMEtcdStore() *IAMEtcdStore {
return &IAMEtcdStore{client: globalEtcdClient}
func newIAMEtcdStore(ctx context.Context) *IAMEtcdStore {
return &IAMEtcdStore{client: globalEtcdClient, ctx: ctx}
}
func (ies *IAMEtcdStore) getContext() context.Context {
func (ies *IAMEtcdStore) lock() {
ies.Lock()
}
func (ies *IAMEtcdStore) unlock() {
ies.Unlock()
}
func (ies *IAMEtcdStore) rlock() {
ies.RLock()
defer ies.RUnlock()
if ies.ctx == nil {
return context.Background()
}
return ies.ctx
}
func (ies *IAMEtcdStore) setContext(ctx context.Context) {
ies.Lock()
defer ies.Unlock()
ies.ctx = ctx
}
func (ies *IAMEtcdStore) clearContext() {
ies.Lock()
defer ies.Unlock()
ies.ctx = nil
func (ies *IAMEtcdStore) runlock() {
ies.RUnlock()
}
func (ies *IAMEtcdStore) saveIAMConfig(item interface{}, path string) error {
@@ -118,11 +111,11 @@ func (ies *IAMEtcdStore) saveIAMConfig(item interface{}, path string) error {
return err
}
}
return saveKeyEtcd(ies.getContext(), ies.client, path, data)
return saveKeyEtcd(ies.ctx, ies.client, path, data)
}
func (ies *IAMEtcdStore) loadIAMConfig(item interface{}, path string) error {
pdata, err := readKeyEtcd(ies.getContext(), ies.client, path)
pdata, err := readKeyEtcd(ies.ctx, ies.client, path)
if err != nil {
return err
}
@@ -138,19 +131,17 @@ func (ies *IAMEtcdStore) loadIAMConfig(item interface{}, path string) error {
}
func (ies *IAMEtcdStore) deleteIAMConfig(path string) error {
return deleteKeyEtcd(ies.getContext(), ies.client, path)
return deleteKeyEtcd(ies.ctx, ies.client, path)
}
func (ies *IAMEtcdStore) migrateUsersConfigToV1(isSTS bool) error {
func (ies *IAMEtcdStore) migrateUsersConfigToV1(ctx context.Context, isSTS bool) error {
basePrefix := iamConfigUsersPrefix
if isSTS {
basePrefix = iamConfigSTSPrefix
}
ctx, cancel := context.WithTimeout(context.Background(), defaultContextTimeout)
ctx, cancel := context.WithTimeout(ctx, defaultContextTimeout)
defer cancel()
ies.setContext(ctx)
defer ies.clearContext()
r, err := ies.client.Get(ctx, basePrefix, etcd.WithPrefix(), etcd.WithKeysOnly())
if err != nil {
return err
@@ -216,7 +207,7 @@ func (ies *IAMEtcdStore) migrateUsersConfigToV1(isSTS bool) error {
cred.AccessKey = user
u := newUserIdentity(cred)
if err := ies.saveIAMConfig(u, identityPath); err != nil {
logger.LogIf(context.Background(), err)
logger.LogIf(ctx, err)
return err
}
@@ -226,7 +217,7 @@ func (ies *IAMEtcdStore) migrateUsersConfigToV1(isSTS bool) error {
return nil
}
func (ies *IAMEtcdStore) migrateToV1() error {
func (ies *IAMEtcdStore) migrateToV1(ctx context.Context) error {
var iamFmt iamFormat
path := getIAMFormatFilePath()
if err := ies.loadIAMConfig(&iamFmt, path); err != nil {
@@ -248,29 +239,26 @@ func (ies *IAMEtcdStore) migrateToV1() error {
}
// Migrate long-term users
if err := ies.migrateUsersConfigToV1(false); err != nil {
logger.LogIf(context.Background(), err)
if err := ies.migrateUsersConfigToV1(ctx, false); err != nil {
logger.LogIf(ctx, err)
return err
}
// Migrate STS users
if err := ies.migrateUsersConfigToV1(true); err != nil {
logger.LogIf(context.Background(), err)
if err := ies.migrateUsersConfigToV1(ctx, true); err != nil {
logger.LogIf(ctx, err)
return err
}
// Save iam version file.
if err := ies.saveIAMConfig(newIAMFormatVersion1(), path); err != nil {
logger.LogIf(context.Background(), err)
logger.LogIf(ctx, err)
return err
}
return nil
}
// Should be called under config migration lock
func (ies *IAMEtcdStore) migrateBackendFormat(objAPI ObjectLayer) error {
if err := ies.migrateToV1(); err != nil {
return err
}
return nil
func (ies *IAMEtcdStore) migrateBackendFormat(ctx context.Context) error {
return ies.migrateToV1(ctx)
}
func (ies *IAMEtcdStore) loadPolicyDoc(policy string, m map[string]iampolicy.Policy) error {
@@ -283,11 +271,9 @@ func (ies *IAMEtcdStore) loadPolicyDoc(policy string, m map[string]iampolicy.Pol
return nil
}
func (ies *IAMEtcdStore) loadPolicyDocs(m map[string]iampolicy.Policy) error {
ctx, cancel := context.WithTimeout(context.Background(), defaultContextTimeout)
func (ies *IAMEtcdStore) loadPolicyDocs(ctx context.Context, m map[string]iampolicy.Policy) error {
ctx, cancel := context.WithTimeout(ctx, defaultContextTimeout)
defer cancel()
ies.setContext(ctx)
defer ies.clearContext()
r, err := ies.client.Get(ctx, iamConfigPoliciesPrefix, etcd.WithPrefix(), etcd.WithKeysOnly())
if err != nil {
return err
@@ -317,9 +303,8 @@ func (ies *IAMEtcdStore) loadUser(user string, userType IAMUserType, m map[strin
if u.Credentials.IsExpired() {
// Delete expired identity.
ctx := ies.getContext()
deleteKeyEtcd(ctx, ies.client, getUserIdentityPath(user, userType))
deleteKeyEtcd(ctx, ies.client, getMappedPolicyPath(user, userType, false))
deleteKeyEtcd(ies.ctx, ies.client, getUserIdentityPath(user, userType))
deleteKeyEtcd(ies.ctx, ies.client, getMappedPolicyPath(user, userType, false))
return nil
}
@@ -351,7 +336,7 @@ func (ies *IAMEtcdStore) loadUser(user string, userType IAMUserType, m map[strin
}
func (ies *IAMEtcdStore) loadUsers(userType IAMUserType, m map[string]auth.Credentials) error {
func (ies *IAMEtcdStore) loadUsers(ctx context.Context, userType IAMUserType, m map[string]auth.Credentials) error {
var basePrefix string
switch userType {
case srvAccUser:
@@ -362,10 +347,8 @@ func (ies *IAMEtcdStore) loadUsers(userType IAMUserType, m map[string]auth.Crede
basePrefix = iamConfigUsersPrefix
}
ctx, cancel := context.WithTimeout(context.Background(), defaultContextTimeout)
ctx, cancel := context.WithTimeout(ctx, defaultContextTimeout)
defer cancel()
ies.setContext(ctx)
defer ies.clearContext()
r, err := ies.client.Get(ctx, basePrefix, etcd.WithPrefix(), etcd.WithKeysOnly())
if err != nil {
return err
@@ -396,11 +379,9 @@ func (ies *IAMEtcdStore) loadGroup(group string, m map[string]GroupInfo) error {
}
func (ies *IAMEtcdStore) loadGroups(m map[string]GroupInfo) error {
ctx, cancel := context.WithTimeout(context.Background(), defaultContextTimeout)
func (ies *IAMEtcdStore) loadGroups(ctx context.Context, m map[string]GroupInfo) error {
ctx, cancel := context.WithTimeout(ctx, defaultContextTimeout)
defer cancel()
ies.setContext(ctx)
defer ies.clearContext()
r, err := ies.client.Get(ctx, iamConfigGroupsPrefix, etcd.WithPrefix(), etcd.WithKeysOnly())
if err != nil {
return err
@@ -432,11 +413,9 @@ func (ies *IAMEtcdStore) loadMappedPolicy(name string, userType IAMUserType, isG
}
func (ies *IAMEtcdStore) loadMappedPolicies(userType IAMUserType, isGroup bool, m map[string]MappedPolicy) error {
ctx, cancel := context.WithTimeout(context.Background(), defaultContextTimeout)
func (ies *IAMEtcdStore) loadMappedPolicies(ctx context.Context, userType IAMUserType, isGroup bool, m map[string]MappedPolicy) error {
ctx, cancel := context.WithTimeout(ctx, defaultContextTimeout)
defer cancel()
ies.setContext(ctx)
defer ies.clearContext()
var basePrefix string
if isGroup {
basePrefix = iamConfigPolicyDBGroupsPrefix
@@ -467,7 +446,7 @@ func (ies *IAMEtcdStore) loadMappedPolicies(userType IAMUserType, isGroup bool,
}
func (ies *IAMEtcdStore) loadAll(sys *IAMSys, objectAPI ObjectLayer) error {
func (ies *IAMEtcdStore) loadAll(ctx context.Context, sys *IAMSys) error {
iamUsersMap := make(map[string]auth.Credentials)
iamGroupsMap := make(map[string]GroupInfo)
iamPolicyDocsMap := make(map[string]iampolicy.Policy)
@@ -475,51 +454,51 @@ func (ies *IAMEtcdStore) loadAll(sys *IAMSys, objectAPI ObjectLayer) error {
iamGroupPolicyMap := make(map[string]MappedPolicy)
isMinIOUsersSys := false
sys.RLock()
ies.rlock()
if sys.usersSysType == MinIOUsersSysType {
isMinIOUsersSys = true
}
sys.RUnlock()
ies.runlock()
if err := ies.loadPolicyDocs(iamPolicyDocsMap); err != nil {
if err := ies.loadPolicyDocs(ctx, iamPolicyDocsMap); err != nil {
return err
}
// load STS temp users
if err := ies.loadUsers(stsUser, iamUsersMap); err != nil {
if err := ies.loadUsers(ctx, stsUser, iamUsersMap); err != nil {
return err
}
if isMinIOUsersSys {
// load long term users
if err := ies.loadUsers(regularUser, iamUsersMap); err != nil {
if err := ies.loadUsers(ctx, regularUser, iamUsersMap); err != nil {
return err
}
if err := ies.loadUsers(srvAccUser, iamUsersMap); err != nil {
if err := ies.loadUsers(ctx, srvAccUser, iamUsersMap); err != nil {
return err
}
if err := ies.loadGroups(iamGroupsMap); err != nil {
if err := ies.loadGroups(ctx, iamGroupsMap); err != nil {
return err
}
if err := ies.loadMappedPolicies(regularUser, false, iamUserPolicyMap); err != nil {
if err := ies.loadMappedPolicies(ctx, regularUser, false, iamUserPolicyMap); err != nil {
return err
}
}
// load STS policy mappings into the same map
if err := ies.loadMappedPolicies(stsUser, false, iamUserPolicyMap); err != nil {
if err := ies.loadMappedPolicies(ctx, stsUser, false, iamUserPolicyMap); err != nil {
return err
}
// load policies mapped to groups
if err := ies.loadMappedPolicies(regularUser, true, iamGroupPolicyMap); err != nil {
if err := ies.loadMappedPolicies(ctx, regularUser, true, iamGroupPolicyMap); err != nil {
return err
}
// Sets default canned policies, if none are set.
setDefaultCannedPolicies(iamPolicyDocsMap)
sys.Lock()
defer sys.Unlock()
ies.lock()
defer ies.Unlock()
sys.iamUsersMap = iamUsersMap
sys.iamGroupsMap = iamGroupsMap
@@ -579,17 +558,17 @@ func (ies *IAMEtcdStore) deleteGroupInfo(name string) error {
return err
}
func (ies *IAMEtcdStore) watch(sys *IAMSys) {
func (ies *IAMEtcdStore) watch(ctx context.Context, sys *IAMSys) {
watchEtcd := func() {
for {
outerLoop:
// Refresh IAMSys with etcd watch.
watchCh := ies.client.Watch(context.Background(),
watchCh := ies.client.Watch(ctx,
iamConfigPrefix, etcd.WithPrefix(), etcd.WithKeysOnly())
for {
select {
case <-GlobalServiceDoneCh:
case <-ctx.Done():
return
case watchResp, ok := <-watchCh:
if !ok {
@@ -599,7 +578,7 @@ func (ies *IAMEtcdStore) watch(sys *IAMSys) {
goto outerLoop
}
if err := watchResp.Err(); err != nil {
logger.LogIf(context.Background(), err)
logger.LogIf(ctx, err)
// log and retry.
time.Sleep(1 * time.Second)
// Upon an error on watch channel
@@ -607,9 +586,9 @@ func (ies *IAMEtcdStore) watch(sys *IAMSys) {
goto outerLoop
}
for _, event := range watchResp.Events {
sys.Lock()
ies.lock()
ies.reloadFromEvent(sys, event)
sys.Unlock()
ies.unlock()
}
}
}