Change internal block/buffer size from 128KiB to 1MiB and extensively use buffer pools.

Following are the benefits of this change.
```
benchmark                           old allocs     new allocs     delta
BenchmarkPutObjectPart5MbFS-4       46172          16383          -64.52%
BenchmarkPutObjectPart10MbFS-4      36849          7223           -80.40%
BenchmarkPutObjectPart25MbFS-4      31699          3865           -87.81%
BenchmarkPutObjectPart50MbFS-4      27577          2481           -91.00%
BenchmarkPutObjectVerySmallFS-4     57             56             -1.75%
BenchmarkPutObject10KbFS-4          57             56             -1.75%
BenchmarkPutObject100KbFS-4         57             56             -1.75%
BenchmarkPutObject1MbFS-4           176            56             -68.18%
BenchmarkPutObject5MbFS-4           720            90             -87.50%
BenchmarkPutObject10MbFS-4          1400           124            -91.14%
BenchmarkPutObject25MbFS-4          3440           261            -92.41%
BenchmarkPutObject50MbFS-4          6841           466            -93.19%
```
This commit is contained in:
Harshavardhana
2016-10-12 17:42:11 -07:00
parent 614a8cf7ad
commit 774e9c8e3a
9 changed files with 137 additions and 32 deletions

View File

@@ -1,5 +1,8 @@
go_import_path: github.com/minio/minio
sudo: required
dist: trusty
language: go
os:
@@ -20,4 +23,4 @@ after_success:
- bash <(curl -s https://codecov.io/bash)
go:
- 1.6.2
- 1.7.1

View File

@@ -21,6 +21,7 @@ import (
"errors"
"hash"
"io"
"sync"
"github.com/klauspost/reedsolomon"
"github.com/minio/blake2b-simd"
@@ -47,13 +48,23 @@ func newHash(algo string) hash.Hash {
}
}
// hashBufPool recycles staging buffers of readSizeV1 bytes used by
// hashSum, avoiding a fresh large allocation per call.
var hashBufPool = sync.Pool{
	New: func() interface{} {
		buf := make([]byte, readSizeV1)
		return &buf
	},
}
// hashSum calculates the hash of the entire path and returns.
func hashSum(disk StorageAPI, volume, path string, writer hash.Hash) ([]byte, error) {
// Allocate staging buffer of 128KiB for copyBuffer.
buf := make([]byte, readSizeV1)
// Allocate staging buffer of 2MiB for copyBuffer.
bufp := hashBufPool.Get().(*[]byte)
// Reuse buffer.
defer hashBufPool.Put(bufp)
// Copy entire buffer to writer.
if err := copyBuffer(writer, disk, volume, path, buf); err != nil {
if err := copyBuffer(writer, disk, volume, path, *bufp); err != nil {
return nil, err
}

View File

@@ -24,6 +24,7 @@ import (
"path"
"strconv"
"strings"
"sync"
"time"
"github.com/skyrings/skyring-common/tools/uuid"
@@ -289,6 +290,14 @@ func getFSAppendDataPath(uploadID string) string {
return path.Join(tmpMetaPrefix, uploadID+".data")
}
// appendPartsBufPool holds reusable readSizeV1-sized scratch buffers
// shared across appendParts invocations.
var appendPartsBufPool = sync.Pool{
	New: func() interface{} {
		buf := make([]byte, readSizeV1)
		return &buf
	},
}
// Append parts to fsAppendDataFile.
func appendParts(disk StorageAPI, bucket, object, uploadID string) {
uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID)
@@ -348,7 +357,10 @@ func appendParts(disk StorageAPI, bucket, object, uploadID string) {
partPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, part.Name)
offset := int64(0)
totalLeft := part.Size
buf := make([]byte, readSizeV1)
// Get buffer from parts pool.
bufp := appendPartsBufPool.Get().(*[]byte)
buf := *bufp
for totalLeft > 0 {
curLeft := int64(readSizeV1)
if totalLeft < readSizeV1 {
@@ -370,12 +382,16 @@ func appendParts(disk StorageAPI, bucket, object, uploadID string) {
offset += n
totalLeft -= n
}
appendPartsBufPool.Put(bufp)
// All good, the part has been appended to the tmp file, rename it back.
if err = disk.RenameFile(minioMetaBucket, tmpDataPath, minioMetaBucket, fsAppendDataPath); err != nil {
errorIf(err, "Unable to rename %s to %s", tmpDataPath, fsAppendDataPath)
return
}
fsAppendMeta.AddObjectPart(part.Number, part.Name, part.ETag, part.Size)
if err = writeFSMetadata(disk, minioMetaBucket, fsAppendMetaPath, fsAppendMeta); err != nil {
errorIf(err, "Unable to write FS metadata %s", fsAppendMetaPath)
return
}
// If there are more parts that need to be appended to fsAppendDataFile
@@ -385,6 +401,14 @@ func appendParts(disk StorageAPI, bucket, object, uploadID string) {
}
}
// partsBufPool provides shared readSizeV1-sized staging buffers for
// PutObjectPart, so each upload does not allocate its own.
var partsBufPool = sync.Pool{
	New: func() interface{} {
		buf := make([]byte, readSizeV1)
		return &buf
	},
}
// PutObjectPart - reads incoming data until EOF for the part file on
// an ongoing multipart transaction. Internally incoming data is
// written to '.minio/tmp' location and safely renamed to
@@ -433,8 +457,11 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
if size > 0 && bufSize > size {
bufSize = size
}
buf := make([]byte, int(bufSize))
bytesWritten, cErr := fsCreateFile(fs.storage, teeReader, buf, minioMetaBucket, tmpPartPath)
bufp := partsBufPool.Get().(*[]byte)
buf := *bufp
defer partsBufPool.Put(bufp)
bytesWritten, cErr := fsCreateFile(fs.storage, teeReader, buf[:bufSize], minioMetaBucket, tmpPartPath)
if cErr != nil {
fs.storage.DeleteFile(minioMetaBucket, tmpPartPath)
return "", toObjectErr(cErr, minioMetaBucket, tmpPartPath)
@@ -578,6 +605,14 @@ func (fs fsObjects) ListObjectParts(bucket, object, uploadID string, partNumberM
return fs.listObjectParts(bucket, object, uploadID, partNumberMarker, maxParts)
}
// completeBufPool recycles readSizeV1-sized staging buffers used while
// concatenating parts in CompleteMultipartUpload.
var completeBufPool = sync.Pool{
	New: func() interface{} {
		buf := make([]byte, readSizeV1)
		return &buf
	},
}
// CompleteMultipartUpload - completes an ongoing multipart
// transaction after receiving all the parts indicated by the client.
// Returns an md5sum calculated by concatenating all the individual
@@ -642,8 +677,9 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
} else {
tempObj := path.Join(tmpMetaPrefix, uploadID+"-"+"part.1")
// Allocate staging buffer.
var buf = make([]byte, readSizeV1)
bufp := completeBufPool.Get().(*[]byte)
buf := *bufp
defer completeBufPool.Put(bufp)
// Loop through all parts, validate them and then commit to disk.
for i, part := range parts {

View File

@@ -25,6 +25,7 @@ import (
"path"
"sort"
"strings"
"sync"
"github.com/minio/minio/pkg/disk"
"github.com/minio/minio/pkg/mimedb"
@@ -227,6 +228,13 @@ func (fs fsObjects) DeleteBucket(bucket string) error {
/// Object Operations
// getBufPool supplies reusable readSizeV1-sized read buffers for
// GetObject, avoiding a per-request allocation.
var getBufPool = sync.Pool{
	New: func() interface{} {
		buf := make([]byte, readSizeV1)
		return &buf
	},
}
// GetObject - get an object.
func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64, writer io.Writer) (err error) {
// Verify if bucket is valid.
@@ -266,8 +274,11 @@ func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64,
if length > 0 && bufSize > length {
bufSize = length
}
// Allocate a staging buffer.
buf := make([]byte, int(bufSize))
bufp := getBufPool.Get().(*[]byte)
buf := *bufp
defer getBufPool.Put(bufp)
for {
// Figure out the right size for the buffer.
curLeft := bufSize
@@ -356,7 +367,15 @@ func (fs fsObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) {
}, nil
}
// PutObject - create an object.
// putBufPool supplies reusable readSizeV1-sized staging buffers for
// PutObject, avoiding a per-request allocation.
var putBufPool = sync.Pool{
	New: func() interface{} {
		buf := make([]byte, readSizeV1)
		return &buf
	},
}
// PutObject - saves an object atomically of size - size bytes.
// With size bytes of '-1' this function reads till EOF.
func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.Reader, metadata map[string]string) (string, error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
@@ -405,9 +424,12 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.
if size > 0 && bufSize > size {
bufSize = size
}
buf := make([]byte, int(bufSize))
bufp := putBufPool.Get().(*[]byte)
buf := *bufp
defer putBufPool.Put(bufp)
teeReader := io.TeeReader(limitDataReader, md5Writer)
bytesWritten, err := fsCreateFile(fs.storage, teeReader, buf, minioMetaBucket, tempObj)
bytesWritten, err := fsCreateFile(fs.storage, teeReader, buf[:bufSize], minioMetaBucket, tempObj)
if err != nil {
fs.storage.DeleteFile(minioMetaBucket, tempObj)
return "", toObjectErr(err, bucket, object)

View File

@@ -27,7 +27,7 @@ const (
blockSizeV1 = 10 * 1024 * 1024 // 10MiB.
// Staging buffer read size for all internal operations version 1.
readSizeV1 = 128 * 1024 // 128KiB.
readSizeV1 = 2 * 1024 * 1024 // 2MiB.
// Buckets meta prefix.
bucketMetaPrefix = "buckets"

View File

@@ -75,11 +75,8 @@ func IsValidBucketName(bucket string) bool {
// Rejects strings with following characters.
//
// - Backslash ("\")
// - Caret ("^")
// - Grave accent / back tick ("`")
// - Vertical bar / pipe ("|")
//
// Minio does not support object names with trailing "/".
// Additionally minio does not support object names with trailing "/".
func IsValidObjectName(object string) bool {
if len(object) == 0 {
return false
@@ -103,7 +100,7 @@ func IsValidObjectPrefix(object string) bool {
return false
}
// Reject unsupported characters in object name.
if strings.ContainsAny(object, "`^|\\\"") {
if strings.ContainsAny(object, `\`) {
return false
}
return true

View File

@@ -23,6 +23,7 @@ import (
"path"
"runtime"
"strings"
"sync"
"syscall"
"unsafe"
)
@@ -30,9 +31,9 @@ import (
const (
// readDirentBufSize for syscall.ReadDirent() to hold multiple
// directory entries in one buffer. golang source uses 4096 as
// buffer size whereas we want 25 times larger to save lots of
// entries to avoid multiple syscall.ReadDirent() call.
readDirentBufSize = 4096 * 25
// buffer size whereas we use 1MiB for large entries in single
// operation to avoid multiple syscall.ReadDirent() call.
readDirentBufSize = 1 * 1024 * 1024 // 1MiB.
)
// actual length of the byte array from the c - world.
@@ -107,9 +108,19 @@ func parseDirents(dirPath string, buf []byte) (entries []string, err error) {
return entries, nil
}
// readDirBufPool recycles readDirentBufSize-sized buffers handed to
// syscall.ReadDirent by readDir.
var readDirBufPool = sync.Pool{
	New: func() interface{} {
		buf := make([]byte, readDirentBufSize)
		return &buf
	},
}
// Return all the entries at the directory dirPath.
func readDir(dirPath string) (entries []string, err error) {
buf := make([]byte, readDirentBufSize)
bufp := readDirBufPool.Get().(*[]byte)
buf := *bufp
defer readDirBufPool.Put(bufp)
d, err := os.Open(dirPath)
if err != nil {
// File is really not found.

View File

@@ -26,6 +26,7 @@ import (
"path/filepath"
"runtime"
"strings"
"sync"
"sync/atomic"
"syscall"
@@ -42,6 +43,7 @@ type posix struct {
ioErrCount int32 // ref: https://golang.org/pkg/sync/atomic/#pkg-note-BUG
diskPath string
minFreeDisk int64
pool sync.Pool
}
var errFaultyDisk = errors.New("Faulty disk")
@@ -70,16 +72,22 @@ func checkPathLength(pathName string) error {
func isDirEmpty(dirname string) bool {
f, err := os.Open(dirname)
if err != nil {
if os.IsNotExist(err) {
return false
}
errorIf(err, "Unable to access directory.")
return false
}
defer f.Close()
// List one entry.
_, err = f.Readdirnames(1)
if err == io.EOF {
// Returns true if we have reached EOF, directory is indeed empty.
return true
}
if err != nil {
if err == io.EOF {
// Returns true if we have reached EOF, directory is indeed empty.
return true
if os.IsNotExist(err) {
return false
}
errorIf(err, "Unable to list directory.")
return false
@@ -102,6 +110,13 @@ func newPosix(diskPath string) (StorageAPI, error) {
fs := &posix{
diskPath: diskPath,
minFreeDisk: fsMinSpacePercent, // Minimum 5% disk should be free.
// 1MiB buffer pool for posix internal operations.
pool: sync.Pool{
New: func() interface{} {
b := make([]byte, 1*1024*1024)
return &b
},
},
}
st, err := os.Stat(preparePath(diskPath))
if err != nil {
@@ -592,8 +607,13 @@ func (s *posix) AppendFile(volume, path string, buf []byte) (err error) {
// Close upon return.
defer w.Close()
// Return io.Copy
_, err = io.Copy(w, bytes.NewReader(buf))
bufp := s.pool.Get().(*[]byte)
// Reuse buffer.
defer s.pool.Put(bufp)
// Return io.CopyBuffer
_, err = io.CopyBuffer(w, bytes.NewReader(buf), *bufp)
return err
}
@@ -680,6 +700,11 @@ func deleteFile(basePath, deletePath string) error {
}
// Attempt to remove path.
if err := os.Remove(preparePath(deletePath)); err != nil {
if os.IsNotExist(err) {
return errFileNotFound
} else if os.IsPermission(err) {
return errFileAccessDenied
}
return err
}
// Recursively go down the next path and delete again.

View File

@@ -679,8 +679,8 @@ func TestPosixListDir(t *testing.T) {
t.Errorf("Unable to initialize posix, %s", err)
}
if err = posixStorage.DeleteFile("bin", "yes"); !os.IsPermission(err) {
t.Errorf("expected: Permission error, got: %s", err)
if err = posixStorage.DeleteFile("bin", "yes"); err != errFileAccessDenied {
t.Errorf("expected: %s error, got: %s", errFileAccessDenied, err)
}
}
@@ -793,8 +793,8 @@ func TestDeleteFile(t *testing.T) {
t.Errorf("Unable to initialize posix, %s", err)
}
if err = posixStorage.DeleteFile("bin", "yes"); !os.IsPermission(err) {
t.Errorf("expected: Permission error, got: %s", err)
if err = posixStorage.DeleteFile("bin", "yes"); err != errFileAccessDenied {
t.Errorf("expected: %s error, got: %s", errFileAccessDenied, err)
}
}