diff --git a/cmd/healthcheck-handler.go b/cmd/healthcheck-handler.go index 8c3e5a9ef..5c1335bae 100644 --- a/cmd/healthcheck-handler.go +++ b/cmd/healthcheck-handler.go @@ -23,7 +23,7 @@ import ( // ClusterCheckHandler returns if the server is ready for requests. func ClusterCheckHandler(w http.ResponseWriter, r *http.Request) { - ctx := newContext(r, w, "ClusterCheckCheckHandler") + ctx := newContext(r, w, "ClusterCheckHandler") objLayer := newObjectLayerFn() // Service not initialized yet diff --git a/cmd/iam.go b/cmd/iam.go index 1007be641..5ce1b975a 100644 --- a/cmd/iam.go +++ b/cmd/iam.go @@ -451,7 +451,7 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) { for range retry.NewTimerWithJitter(retryCtx, time.Second, 5*time.Second, retry.MaxJitter) { // let one of the server acquire the lock, if not let them timeout. // which shall be retried again by this loop. - if err := txnLk.GetLock(newDynamicTimeout(1*time.Second, 5*time.Second)); err != nil { + if err := txnLk.GetLock(newDynamicTimeout(3*time.Second, 5*time.Second)); err != nil { logger.Info("Waiting for all MinIO IAM sub-system to be initialized.. trying to acquire lock") continue } diff --git a/cmd/local-locker.go b/cmd/local-locker.go index 707613e26..0f8e5068f 100644 --- a/cmd/local-locker.go +++ b/cmd/local-locker.go @@ -17,6 +17,7 @@ package cmd import ( + "context" "fmt" "sync" "time" @@ -71,30 +72,35 @@ func (l *localLocker) canTakeLock(resources ...string) bool { return noLkCnt == len(resources) } -func (l *localLocker) Lock(args dsync.LockArgs) (reply bool, err error) { - l.mutex.Lock() - defer l.mutex.Unlock() +func (l *localLocker) Lock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) { + select { + case <-ctx.Done(): + return false, ctx.Err() + default: + l.mutex.Lock() + defer l.mutex.Unlock() - if !l.canTakeLock(args.Resources...) { - // Not all locks can be taken on resources, - // reject it completely. - return false, nil - } - - // No locks held on the all resources, so claim write - // lock on all resources at once. - for _, resource := range args.Resources { - l.lockMap[resource] = []lockRequesterInfo{ - { - Writer: true, - Source: args.Source, - UID: args.UID, - Timestamp: UTCNow(), - TimeLastCheck: UTCNow(), - }, + if !l.canTakeLock(args.Resources...) { + // Not all locks can be taken on resources, + // reject it completely. + return false, nil } + + // No locks held on the all resources, so claim write + // lock on all resources at once. 
+ for _, resource := range args.Resources { + l.lockMap[resource] = []lockRequesterInfo{ + { + Writer: true, + Source: args.Source, + UID: args.UID, + Timestamp: UTCNow(), + TimeLastCheck: UTCNow(), + }, + } + } + return true, nil } - return true, nil } func (l *localLocker) Unlock(args dsync.LockArgs) (reply bool, err error) { @@ -138,28 +144,33 @@ func (l *localLocker) removeEntry(name, uid string, lri *[]lockRequesterInfo) bo return false } -func (l *localLocker) RLock(args dsync.LockArgs) (reply bool, err error) { - l.mutex.Lock() - defer l.mutex.Unlock() - lrInfo := lockRequesterInfo{ - Writer: false, - Source: args.Source, - UID: args.UID, - Timestamp: UTCNow(), - TimeLastCheck: UTCNow(), - } - resource := args.Resources[0] - if lri, ok := l.lockMap[resource]; ok { - if reply = !isWriteLock(lri); reply { - // Unless there is a write lock - l.lockMap[resource] = append(l.lockMap[resource], lrInfo) +func (l *localLocker) RLock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) { + select { + case <-ctx.Done(): + return false, ctx.Err() + default: + l.mutex.Lock() + defer l.mutex.Unlock() + lrInfo := lockRequesterInfo{ + Writer: false, + Source: args.Source, + UID: args.UID, + Timestamp: UTCNow(), + TimeLastCheck: UTCNow(), } - } else { - // No locks held on the given name, so claim (first) read lock - l.lockMap[resource] = []lockRequesterInfo{lrInfo} - reply = true + resource := args.Resources[0] + if lri, ok := l.lockMap[resource]; ok { + if reply = !isWriteLock(lri); reply { + // Unless there is a write lock + l.lockMap[resource] = append(l.lockMap[resource], lrInfo) + } + } else { + // No locks held on the given name, so claim (first) read lock + l.lockMap[resource] = []lockRequesterInfo{lrInfo} + reply = true + } + return reply, nil } - return reply, nil } func (l *localLocker) RUnlock(args dsync.LockArgs) (reply bool, err error) { @@ -202,22 +213,27 @@ func (l *localLocker) IsOnline() bool { return true } -func (l *localLocker) Expired(args dsync.LockArgs) (expired bool, err error) { - l.mutex.Lock() - defer l.mutex.Unlock() +func (l *localLocker) Expired(ctx context.Context, args dsync.LockArgs) (expired bool, err error) { + select { + case <-ctx.Done(): + return false, ctx.Err() + default: + l.mutex.Lock() + defer l.mutex.Unlock() - // Lock found, proceed to verify if belongs to given uid. - for _, resource := range args.Resources { - if lri, ok := l.lockMap[resource]; ok { - // Check whether uid is still active - for _, entry := range lri { - if entry.UID == args.UID { - return false, nil + // Lock found, proceed to verify if belongs to given uid. + for _, resource := range args.Resources { + if lri, ok := l.lockMap[resource]; ok { + // Check whether uid is still active + for _, entry := range lri { + if entry.UID == args.UID { + return false, nil + } } } } + return true, nil } - return true, nil } // Similar to removeEntry but only removes an entry only if the lock entry exists in map. diff --git a/cmd/lock-rest-client.go b/cmd/lock-rest-client.go index 8e61caf71..02421bafd 100644 --- a/cmd/lock-rest-client.go +++ b/cmd/lock-rest-client.go @@ -58,7 +58,7 @@ func (client *lockRESTClient) String() string { // Wrapper to restClient.Call to handle network errors, in case of network error the connection is marked disconnected // permanently. 
The only way to restore the connection is at the xl-sets layer by xlsets.monitorAndConnectEndpoints() // after verifying format.json -func (client *lockRESTClient) call(method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) { +func (client *lockRESTClient) callWithContext(ctx context.Context, method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) { if values == nil { values = make(url.Values) } @@ -83,7 +83,7 @@ func (client *lockRESTClient) Close() error { } // restCall makes a call to the lock REST server. -func (client *lockRESTClient) restCall(call string, args dsync.LockArgs) (reply bool, err error) { +func (client *lockRESTClient) restCall(ctx context.Context, call string, args dsync.LockArgs) (reply bool, err error) { values := url.Values{} values.Set(lockRESTUID, args.UID) values.Set(lockRESTSource, args.Source) @@ -92,7 +92,7 @@ func (client *lockRESTClient) restCall(call string, args dsync.LockArgs) (reply buffer.WriteString(resource) buffer.WriteString("\n") } - respBody, err := client.call(call, values, &buffer, -1) + respBody, err := client.callWithContext(ctx, call, values, &buffer, -1) defer http.DrainBody(respBody) switch err { case nil: @@ -105,28 +105,28 @@ func (client *lockRESTClient) restCall(call string, args dsync.LockArgs) (reply } // RLock calls read lock REST API. -func (client *lockRESTClient) RLock(args dsync.LockArgs) (reply bool, err error) { - return client.restCall(lockRESTMethodRLock, args) +func (client *lockRESTClient) RLock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) { + return client.restCall(ctx, lockRESTMethodRLock, args) } // Lock calls lock REST API. -func (client *lockRESTClient) Lock(args dsync.LockArgs) (reply bool, err error) { - return client.restCall(lockRESTMethodLock, args) +func (client *lockRESTClient) Lock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) { + return client.restCall(ctx, lockRESTMethodLock, args) } // RUnlock calls read unlock REST API. func (client *lockRESTClient) RUnlock(args dsync.LockArgs) (reply bool, err error) { - return client.restCall(lockRESTMethodRUnlock, args) + return client.restCall(context.Background(), lockRESTMethodRUnlock, args) } // Unlock calls write unlock RPC. func (client *lockRESTClient) Unlock(args dsync.LockArgs) (reply bool, err error) { - return client.restCall(lockRESTMethodUnlock, args) + return client.restCall(context.Background(), lockRESTMethodUnlock, args) } // Expired calls expired handler to check if lock args have expired. -func (client *lockRESTClient) Expired(args dsync.LockArgs) (expired bool, err error) { - return client.restCall(lockRESTMethodExpired, args) +func (client *lockRESTClient) Expired(ctx context.Context, args dsync.LockArgs) (expired bool, err error) { + return client.restCall(ctx, lockRESTMethodExpired, args) } func newLockAPI(endpoint Endpoint) dsync.NetLocker { diff --git a/cmd/lock-rest-client_test.go b/cmd/lock-rest-client_test.go index d05659ccd..cd2b7c6d5 100644 --- a/cmd/lock-rest-client_test.go +++ b/cmd/lock-rest-client_test.go @@ -17,6 +17,7 @@ package cmd import ( + "context" "testing" "github.com/minio/minio/pkg/dsync" @@ -34,12 +35,12 @@ func TestLockRESTlient(t *testing.T) { } // Attempt all calls. 
- _, err = lkClient.RLock(dsync.LockArgs{}) + _, err = lkClient.RLock(context.Background(), dsync.LockArgs{}) if err == nil { t.Fatal("Expected for Rlock to fail") } - _, err = lkClient.Lock(dsync.LockArgs{}) + _, err = lkClient.Lock(context.Background(), dsync.LockArgs{}) if err == nil { t.Fatal("Expected for Lock to fail") } diff --git a/cmd/lock-rest-server.go b/cmd/lock-rest-server.go index 6c9e91910..620115c9a 100644 --- a/cmd/lock-rest-server.go +++ b/cmd/lock-rest-server.go @@ -96,7 +96,7 @@ func (l *lockRESTServer) LockHandler(w http.ResponseWriter, r *http.Request) { return } - success, err := l.ll.Lock(args) + success, err := l.ll.Lock(r.Context(), args) if err == nil && !success { err = errLockConflict } @@ -141,7 +141,7 @@ func (l *lockRESTServer) RLockHandler(w http.ResponseWriter, r *http.Request) { return } - success, err := l.ll.RLock(args) + success, err := l.ll.RLock(r.Context(), args) if err == nil && !success { err = errLockConflict } @@ -185,20 +185,14 @@ func (l *lockRESTServer) ExpiredHandler(w http.ResponseWriter, r *http.Request) return } - l.ll.mutex.Lock() - defer l.ll.mutex.Unlock() - - // Lock found, proceed to verify if belongs to given uid. - for _, resource := range args.Resources { - if lri, ok := l.ll.lockMap[resource]; ok { - // Check whether uid is still active - for _, entry := range lri { - if entry.UID == args.UID { - l.writeErrorResponse(w, errLockNotExpired) - return - } - } - } + expired, err := l.ll.Expired(r.Context(), args) + if err != nil { + l.writeErrorResponse(w, err) + return + } + if !expired { + l.writeErrorResponse(w, errLockNotExpired) + return } } @@ -219,7 +213,10 @@ func getLongLivedLocks(interval time.Duration) map[Endpoint][]nameLockRequesterI for idx := range lriArray { // Check whether enough time has gone by since last check if time.Since(lriArray[idx].TimeLastCheck) >= interval { - rslt = append(rslt, nameLockRequesterInfoPair{name: name, lri: lriArray[idx]}) + rslt = append(rslt, nameLockRequesterInfoPair{ + name: name, + lri: lriArray[idx], + }) lriArray[idx].TimeLastCheck = UTCNow() } } @@ -254,12 +251,15 @@ func lockMaintenance(ctx context.Context, interval time.Duration) error { continue } + ctx, cancel := context.WithTimeout(GlobalContext, 5*time.Second) + // Call back to original server verify whether the lock is // still active (based on name & uid) - expired, err := c.Expired(dsync.LockArgs{ + expired, err := c.Expired(ctx, dsync.LockArgs{ UID: nlrip.lri.UID, Resources: []string{nlrip.name}, }) + cancel() if err != nil { nlripsMap[nlrip.name]++ c.Close() diff --git a/cmd/rest/client.go b/cmd/rest/client.go index fffc3c24d..dbbf03d6a 100644 --- a/cmd/rest/client.go +++ b/cmd/rest/client.go @@ -85,10 +85,20 @@ const ( querySep = "?" ) +type restError string + +func (e restError) Error() string { + return string(e) +} + +func (e restError) Timeout() bool { + return true +} + // CallWithContext - make a REST call with context. 
func (c *Client) CallWithContext(ctx context.Context, method string, values url.Values, body io.Reader, length int64) (reply io.ReadCloser, err error) { if !c.IsOnline() { - return nil, &NetworkError{Err: errors.New("remote server offline")} + return nil, &NetworkError{Err: &url.Error{Op: method, URL: c.url.String(), Err: restError("remote server offline")}} } req, err := http.NewRequest(http.MethodPost, c.url.String()+method+querySep+values.Encode(), body) if err != nil { diff --git a/cmd/server-main.go b/cmd/server-main.go index f1cf56596..6f70fe03b 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -229,7 +229,7 @@ func initSafeMode(ctx context.Context, newObject ObjectLayer) (err error) { for range retry.NewTimer(retryCtx) { // let one of the server acquire the lock, if not let them timeout. // which shall be retried again by this loop. - if err = txnLk.GetLock(newDynamicTimeout(1*time.Second, 3*time.Second)); err != nil { + if err = txnLk.GetLock(newDynamicTimeout(3*time.Second, 3*time.Second)); err != nil { logger.Info("Waiting for all MinIO sub-systems to be initialized.. trying to acquire lock") continue } diff --git a/pkg/dsync/drwmutex.go b/pkg/dsync/drwmutex.go index 44683d6d1..15bbec525 100644 --- a/pkg/dsync/drwmutex.go +++ b/pkg/dsync/drwmutex.go @@ -140,8 +140,8 @@ func (dm *DRWMutex) lockBlocking(ctx context.Context, timeout time.Duration, id, locks := make([]string, len(restClnts)) // Try to acquire the lock. - success := lock(dm.clnt, &locks, id, source, isReadLock, dm.Names...) - if !success { + locked = lock(retryCtx, dm.clnt, &locks, id, source, isReadLock, dm.Names...) + if !locked { continue } @@ -158,16 +158,16 @@ func (dm *DRWMutex) lockBlocking(ctx context.Context, timeout time.Duration, id, } dm.m.Unlock() - return true + return locked } // Failed to acquire the lock on this attempt, incrementally wait // for a longer back-off time and try again afterwards. - return false + return locked } // lock tries to acquire the distributed lock, returning true or false. 
-func lock(ds *Dsync, locks *[]string, id, source string, isReadLock bool, lockNames ...string) bool { +func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, isReadLock bool, lockNames ...string) bool { restClnts := ds.GetLockersFn() @@ -199,11 +199,11 @@ func lock(ds *Dsync, locks *[]string, id, source string, isReadLock bool, lockNa var locked bool var err error if isReadLock { - if locked, err = c.RLock(args); err != nil { + if locked, err = c.RLock(ctx, args); err != nil { log("dsync: Unable to call RLock failed with %s for %#v at %s\n", err, args, c) } } else { - if locked, err = c.Lock(args); err != nil { + if locked, err = c.Lock(ctx, args); err != nil { log("dsync: Unable to call Lock failed with %s for %#v at %s\n", err, args, c) } } diff --git a/pkg/dsync/rpc-client-impl_test.go b/pkg/dsync/rpc-client-impl_test.go index 042ec563d..92c1ff529 100644 --- a/pkg/dsync/rpc-client-impl_test.go +++ b/pkg/dsync/rpc-client-impl_test.go @@ -17,6 +17,7 @@ package dsync_test import ( + "context" "net/rpc" "sync" @@ -89,12 +90,12 @@ func (rpcClient *ReconnectRPCClient) Call(serviceMethod string, args interface{} return err } -func (rpcClient *ReconnectRPCClient) RLock(args LockArgs) (status bool, err error) { +func (rpcClient *ReconnectRPCClient) RLock(ctx context.Context, args LockArgs) (status bool, err error) { err = rpcClient.Call("Dsync.RLock", &args, &status) return status, err } -func (rpcClient *ReconnectRPCClient) Lock(args LockArgs) (status bool, err error) { +func (rpcClient *ReconnectRPCClient) Lock(ctx context.Context, args LockArgs) (status bool, err error) { err = rpcClient.Call("Dsync.Lock", &args, &status) return status, err } @@ -109,7 +110,7 @@ func (rpcClient *ReconnectRPCClient) Unlock(args LockArgs) (status bool, err err return status, err } -func (rpcClient *ReconnectRPCClient) Expired(args LockArgs) (expired bool, err error) { +func (rpcClient *ReconnectRPCClient) Expired(ctx context.Context, args LockArgs) (expired bool, err error) { err = rpcClient.Call("Dsync.Expired", &args, &expired) return expired, err } diff --git a/pkg/dsync/rpc-client-interface.go b/pkg/dsync/rpc-client-interface.go index aec5187e9..d8c2542d2 100644 --- a/pkg/dsync/rpc-client-interface.go +++ b/pkg/dsync/rpc-client-interface.go @@ -16,6 +16,8 @@ package dsync +import "context" + // LockArgs is minimal required values for any dsync compatible lock operation. type LockArgs struct { // Unique ID of lock/unlock request. @@ -34,12 +36,12 @@ type NetLocker interface { // Do read lock for given LockArgs. It should return // * a boolean to indicate success/failure of the operation // * an error on failure of lock request operation. - RLock(args LockArgs) (bool, error) + RLock(ctx context.Context, args LockArgs) (bool, error) // Do write lock for given LockArgs. It should return // * a boolean to indicate success/failure of the operation // * an error on failure of lock request operation. - Lock(args LockArgs) (bool, error) + Lock(ctx context.Context, args LockArgs) (bool, error) // Do read unlock for given LockArgs. It should return // * a boolean to indicate success/failure of the operation @@ -52,7 +54,7 @@ type NetLocker interface { Unlock(args LockArgs) (bool, error) // Expired returns if current lock args has expired. - Expired(args LockArgs) (bool, error) + Expired(ctx context.Context, args LockArgs) (bool, error) // Returns underlying endpoint of this lock client instance. String() string
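The thread running through this patch: NetLocker's Lock/RLock/Expired gain a context.Context parameter, localLocker checks ctx.Done() before touching its lock map, and lockMaintenance bounds each Expired() call with a 5-second timeout context. The standalone Go sketch below mirrors that cancellation pattern outside of MinIO, under stated assumptions: simpleLocker and tryLock are hypothetical names invented for illustration, not part of this patch or of the dsync package.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type simpleLocker struct {
	mu    sync.Mutex
	owner map[string]string // resource -> UID of the current holder
}

// tryLock mirrors the shape of the new localLocker.Lock: bail out immediately
// if the caller's context is already cancelled, otherwise take the mutex and
// claim all resources atomically or none at all.
func (l *simpleLocker) tryLock(ctx context.Context, uid string, resources ...string) (bool, error) {
	select {
	case <-ctx.Done():
		// Caller already gave up (timeout or cancellation): do not touch the map.
		return false, ctx.Err()
	default:
		l.mu.Lock()
		defer l.mu.Unlock()
		// Reject the whole request if any resource is already held.
		for _, r := range resources {
			if _, held := l.owner[r]; held {
				return false, nil
			}
		}
		// No locks held on any of the resources, so claim them all at once.
		for _, r := range resources {
			l.owner[r] = uid
		}
		return true, nil
	}
}

func main() {
	l := &simpleLocker{owner: make(map[string]string)}

	// Caller side: bound the attempt with a deadline, the way lockMaintenance
	// in the patch wraps each Expired() call in a 5-second timeout context.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	ok, err := l.tryLock(ctx, "uid-1", "bucket/object")
	fmt.Println(ok, err) // prints: true <nil>
}

The design choice the sketch illustrates is the same one made in the diff: the context is only consulted before the critical section, so a cancellation never leaves the lock map half-updated, while callers such as lockMaintenance can still cap how long a remote lock operation may take.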