From 2f7fb786928057bf5a0a6341af4ab1ee485dc94b Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Thu, 10 Nov 2016 07:44:41 -0800 Subject: [PATCH] rpc: Our rpcClient should make an attempt to reconnect. (#3221) rpcClient should attempt a reconnect if the call fails with 'rpc.ErrShutdown' this is needed since at times when the servers are taken down and brought back up. The hijacked connection from net.Dial is usually closed. So upon first attempt rpcClient might falsely indicate that disk to be down, to avoid this state make another dial attempt to really fail. Fixes #3206 Fixes #3205 --- cmd/auth-rpc-client.go | 8 +++----- cmd/bucket-metadata.go | 33 +++++---------------------------- cmd/format-config-v1.go | 2 +- cmd/net-rpc-client.go | 25 +++++++++++++++++-------- cmd/notify-listener.go | 11 +---------- cmd/posix.go | 2 +- cmd/server-mux.go | 2 +- cmd/server-mux_test.go | 2 +- docs/minio-env-variables.md | 2 +- 9 files changed, 31 insertions(+), 56 deletions(-) diff --git a/cmd/auth-rpc-client.go b/cmd/auth-rpc-client.go index 778b2a546..ef603184b 100644 --- a/cmd/auth-rpc-client.go +++ b/cmd/auth-rpc-client.go @@ -166,11 +166,9 @@ func (authClient *AuthRPCClient) Call(serviceMethod string, args interface { // Call the underlying rpc. err = authClient.rpc.Call(serviceMethod, args, reply) - // Invalidate token to mark for re-login on subsequent reconnect. - if err != nil { - if err.Error() == rpc.ErrShutdown.Error() { - authClient.isLoggedIn = false - } + // Invalidate token, and mark it for re-login on subsequent reconnect. + if err != nil && err == rpc.ErrShutdown { + authClient.isLoggedIn = false } } return err diff --git a/cmd/bucket-metadata.go b/cmd/bucket-metadata.go index 10d1d7cf2..7f299e00a 100644 --- a/cmd/bucket-metadata.go +++ b/cmd/bucket-metadata.go @@ -16,10 +16,7 @@ package cmd -import ( - "encoding/json" - "net/rpc" -) +import "encoding/json" // BucketMetaState - Interface to update bucket metadata in-memory // state. @@ -107,46 +104,26 @@ type remoteBMS struct { // change to remote peer via RPC call. func (rc *remoteBMS) UpdateBucketNotification(args *SetBNPArgs) error { reply := GenericReply{} - err := rc.Call("S3.SetBucketNotificationPeer", args, &reply) - // Check for network error and retry once. - if err != nil && err.Error() == rpc.ErrShutdown.Error() { - err = rc.Call("S3.SetBucketNotificationPeer", args, &reply) - } - return err + return rc.Call("S3.SetBucketNotificationPeer", args, &reply) } // remoteBMS.UpdateBucketListener - sends bucket listener change to // remote peer via RPC call. func (rc *remoteBMS) UpdateBucketListener(args *SetBLPArgs) error { reply := GenericReply{} - err := rc.Call("S3.SetBucketListenerPeer", args, &reply) - // Check for network error and retry once. - if err != nil && err.Error() == rpc.ErrShutdown.Error() { - err = rc.Call("S3.SetBucketListenerPeer", args, &reply) - } - return err + return rc.Call("S3.SetBucketListenerPeer", args, &reply) } // remoteBMS.UpdateBucketPolicy - sends bucket policy change to remote // peer via RPC call. func (rc *remoteBMS) UpdateBucketPolicy(args *SetBPPArgs) error { reply := GenericReply{} - err := rc.Call("S3.SetBucketPolicyPeer", args, &reply) - // Check for network error and retry once. - if err != nil && err.Error() == rpc.ErrShutdown.Error() { - err = rc.Call("S3.SetBucketPolicyPeer", args, &reply) - } - return err + return rc.Call("S3.SetBucketPolicyPeer", args, &reply) } // remoteBMS.SendEvent - sends event for bucket listener to remote // peer via RPC call. func (rc *remoteBMS) SendEvent(args *EventArgs) error { reply := GenericReply{} - err := rc.Call("S3.Event", args, &reply) - // Check for network error and retry once. - if err != nil && err.Error() == rpc.ErrShutdown.Error() { - err = rc.Call("S3.Event", args, &reply) - } - return err + return rc.Call("S3.Event", args, &reply) } diff --git a/cmd/format-config-v1.go b/cmd/format-config-v1.go index 65b2a49f5..cddf9f020 100644 --- a/cmd/format-config-v1.go +++ b/cmd/format-config-v1.go @@ -246,7 +246,7 @@ func genericFormatCheck(formatConfigs []*formatConfigV1, sErrs []error) (err err } // Calculate read quorum. - readQuorum := len(formatConfigs)/2 + 1 + readQuorum := len(formatConfigs) / 2 // Validate the err count under tolerant limit. if errCount > len(formatConfigs)-readQuorum { diff --git a/cmd/net-rpc-client.go b/cmd/net-rpc-client.go index b007caaaa..5726316e4 100644 --- a/cmd/net-rpc-client.go +++ b/cmd/net-rpc-client.go @@ -142,17 +142,26 @@ func (rpcClient *RPCClient) Call(serviceMethod string, args interface{}, reply i // rpc.Client for a subsequent reconnect. err := rpcLocalStack.Call(serviceMethod, args, reply) if err != nil { - if err.Error() == rpc.ErrShutdown.Error() { - // Reset rpcClient.rpc to nil to trigger a reconnect in future - // and close the underlying connection. - rpcClient.clearRPCClient() + // Any errors other than rpc.ErrShutdown just return quickly. + if err != rpc.ErrShutdown { + return err + } // else rpc.ErrShutdown returned by rpc.Call - // Close the underlying connection. - rpcLocalStack.Close() + // Reset the underlying rpc connection before + // moving to reconnect. + rpcClient.clearRPCClient() - // Set rpc error as rpc.ErrShutdown type. - err = rpc.ErrShutdown + // Close the underlying connection before reconnect. + rpcLocalStack.Close() + + // Try once more to re-connect. + rpcLocalStack, err = rpcClient.dialRPCClient() + if err != nil { + return err } + + // Attempt the rpc.Call once again, upon any error now just give up. + err = rpcLocalStack.Call(serviceMethod, args, reply) } return err } diff --git a/cmd/notify-listener.go b/cmd/notify-listener.go index 2079e7eff..d88cd9d79 100644 --- a/cmd/notify-listener.go +++ b/cmd/notify-listener.go @@ -19,7 +19,6 @@ package cmd import ( "fmt" "io/ioutil" - "net/rpc" "github.com/Sirupsen/logrus" ) @@ -71,15 +70,7 @@ func (lc listenerConn) Fire(entry *logrus.Entry) error { // Send Event RPC call and return error arg := EventArgs{Event: notificationEvent, Arn: lc.ListenerARN} - err := lc.BMSClient.SendEvent(&arg) - - // In case connection is shutdown, retry once. - if err != nil { - if err.Error() == rpc.ErrShutdown.Error() { - err = lc.BMSClient.SendEvent(&arg) - } - } - return err + return lc.BMSClient.SendEvent(&arg) } func (lc listenerConn) Levels() []logrus.Level { diff --git a/cmd/posix.go b/cmd/posix.go index 84b5ac474..f90d87dfb 100644 --- a/cmd/posix.go +++ b/cmd/posix.go @@ -639,7 +639,7 @@ func (s *posix) createFile(volume, path string) (f *os.File, err error) { } // PrepareFile - run prior actions before creating a new file for optimization purposes -// Currenty we use fallocate when available to avoid disk fragmentation as much as possible +// Currently we use fallocate when available to avoid disk fragmentation as much as possible func (s *posix) PrepareFile(volume, path string, fileSize int64) (err error) { // It doesn't make sense to create a negative-sized file diff --git a/cmd/server-mux.go b/cmd/server-mux.go index a3167dae0..1d612fa02 100644 --- a/cmd/server-mux.go +++ b/cmd/server-mux.go @@ -28,7 +28,7 @@ import ( "time" ) -// The value choosen below is longest word choosen +// The value chosen below is longest word chosen // from all the http verbs comprising of // "PRI", "OPTIONS", "GET", "HEAD", "POST", // "PUT", "DELETE", "TRACE", "CONNECT". diff --git a/cmd/server-mux_test.go b/cmd/server-mux_test.go index 62ba9af48..0a4510b78 100644 --- a/cmd/server-mux_test.go +++ b/cmd/server-mux_test.go @@ -107,7 +107,7 @@ func dial(addr string) error { return err } -// Tests initalizing listeners. +// Tests initializing listeners. func TestInitListeners(t *testing.T) { testCases := []struct { serverAddr string diff --git a/docs/minio-env-variables.md b/docs/minio-env-variables.md index 08c7e4418..a40ccf635 100644 --- a/docs/minio-env-variables.md +++ b/docs/minio-env-variables.md @@ -1,4 +1,4 @@ -# Minio Environmental varaibles +# Minio Environmental variables #### MINIO_ENABLE_FSMETA When enabled, minio-FS saves the HTTP headers that start with `X-Amz-Meta-` and `X-Minio-Meta`. These header meta data can be retrieved on HEAD and GET requests on the object.