fix: pop entries from each drive in parallel (#9918)

Harshavardhana
2020-06-25 23:20:12 -07:00
parent d53006c422
commit a5b1d8731c
4 changed files with 43 additions and 9 deletions


@@ -145,9 +145,9 @@ func (p *parallelReader) Read(dst [][]byte) ([][]byte, error) {
 		wg.Add(1)
 		go func(i int) {
 			defer wg.Done()
-			disk := p.readers[i]
-			if disk == nil {
-				// Since disk is nil, trigger another read.
+			rr := p.readers[i]
+			if rr == nil {
+				// Since reader is nil, trigger another read.
 				readTriggerCh <- true
 				return
 			}
@@ -160,7 +160,7 @@ func (p *parallelReader) Read(dst [][]byte) ([][]byte, error) {
 			// For the last shard, the shardsize might be less than previous shard sizes.
 			// Hence the following statement ensures that the buffer size is reset to the right size.
 			p.buf[bufIdx] = p.buf[bufIdx][:p.shardSize]
-			_, err := disk.ReadAt(p.buf[bufIdx], p.offset)
+			_, err := rr.ReadAt(p.buf[bufIdx], p.offset)
 			if err != nil {
 				if _, ok := err.(*errHashMismatch); ok {
 					atomic.StoreInt32(&healRequired, 1)
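
The rename from disk to rr is not cosmetic: p.readers holds wrapped readers (evidently bitrot-aware ones, given the errHashMismatch check), not raw disks. As the hunk shows, the enclosing Read() launches one goroutine per reader, and a goroutine that finds a nil reader pushes a token back onto readTriggerCh so a replacement read is attempted. Below is a minimal, self-contained sketch of that trigger-channel pattern, not the MinIO implementation; shardReader, readQuorum, and the success-signalling send of false are illustrative assumptions based on the visible context.

	package main

	import (
		"fmt"
		"sync"
	)

	// shardReader is a hypothetical stand-in for one entry of p.readers;
	// a nil entry models an offline drive.
	type shardReader func() (string, error)

	// readQuorum keeps launching reads until `needed` shards arrive or the
	// readers are exhausted. Every goroutine sends exactly one value on
	// readTriggerCh, so the dispatch loop can always make progress:
	// true asks for a replacement read, false reports a success.
	func readQuorum(readers []shardReader, needed int) []string {
		readTriggerCh := make(chan bool, len(readers))
		for i := 0; i < needed; i++ {
			readTriggerCh <- true // seed one trigger per shard we need
		}

		var (
			wg      sync.WaitGroup
			mu      sync.Mutex
			results []string
			next    int
		)
		for trigger := range readTriggerCh {
			mu.Lock()
			enough := len(results) >= needed
			mu.Unlock()
			if enough || next == len(readers) {
				break
			}
			if !trigger {
				continue // a success report, not a request for another read
			}
			i := next
			next++
			wg.Add(1)
			go func(i int) {
				defer wg.Done()
				rr := readers[i]
				if rr == nil {
					// Since reader is nil, trigger another read.
					readTriggerCh <- true
					return
				}
				data, err := rr()
				if err != nil {
					readTriggerCh <- true
					return
				}
				mu.Lock()
				results = append(results, data)
				mu.Unlock()
				readTriggerCh <- false // wake the loop to re-check quorum
			}(i)
		}
		wg.Wait()
		return results
	}

	func main() {
		readers := []shardReader{
			nil, // offline drive
			func() (string, error) { return "shard-1", nil },
			func() (string, error) { return "shard-2", nil },
			func() (string, error) { return "shard-3", nil },
		}
		fmt.Println(readQuorum(readers, 2))
	}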


@@ -573,7 +573,7 @@ func testListObjects(obj ObjectLayer, instanceType string, t1 TestErrHandler) {
 	for i, testCase := range testCases {
 		testCase := testCase
-		t.Run(fmt.Sprintf("Test%d-%s", i+1, instanceType), func(t *testing.T) {
+		t.Run(fmt.Sprintf("%s-Test%d", instanceType, i+1), func(t *testing.T) {
 			result, err := obj.ListObjects(context.Background(), testCase.bucketName,
 				testCase.prefix, testCase.marker, testCase.delimeter, int(testCase.maxKeys))
 			if err != nil && testCase.shouldPass {
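
This test-only hunk reorders the subtest name so the backend type leads. Subtest names form slash-separated paths that go test's -run flag matches segment by segment, so leading with instanceType lets every case for one backend be selected by a common prefix. A minimal illustration of the naming, assuming a hypothetical wrapper; TestListObjects and the backend names here are stand-ins, not the actual test wiring:

	package cmd // hypothetical package name for this sketch

	import (
		"fmt"
		"testing"
	)

	// TestListObjects stands in for the real table-driven test; only the
	// subtest naming matters here.
	func TestListObjects(t *testing.T) {
		for _, instanceType := range []string{"FS", "XL"} { // illustrative backends
			for i := 0; i < 3; i++ {
				t.Run(fmt.Sprintf("%s-Test%d", instanceType, i+1), func(t *testing.T) {
					// Test body elided. The resulting names read:
					//   TestListObjects/FS-Test1 ... TestListObjects/XL-Test3
				})
			}
		}
	}

With that layout, go test -run 'TestListObjects/XL-Test' runs every XL case, and -run 'TestListObjects/FS-Test3' a single FS case.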


@@ -831,9 +831,17 @@ func (f *FileInfoCh) Push(fi FileInfo) {
 // if the caller wishes to list N entries to call lexicallySortedEntry
 // N times until this boolean is 'false'.
 func lexicallySortedEntry(entryChs []FileInfoCh, entries []FileInfo, entriesValid []bool) (FileInfo, int, bool) {
+	var wg sync.WaitGroup
 	for i := range entryChs {
-		entries[i], entriesValid[i] = entryChs[i].Pop()
+		i := i
+		wg.Add(1)
+		// Pop() entries in parallel for large drive setups.
+		go func() {
+			defer wg.Done()
+			entries[i], entriesValid[i] = entryChs[i].Pop()
+		}()
 	}
+	wg.Wait()
 	var isTruncated = false
 	for _, valid := range entriesValid {
@@ -915,9 +923,17 @@ func mergeEntriesCh(entryChs []FileInfoCh, maxKeys int, ndisks int) (entries Fil
 }
 
 func isTruncated(entryChs []FileInfoCh, entries []FileInfo, entriesValid []bool) bool {
+	var wg sync.WaitGroup
 	for i := range entryChs {
-		entries[i], entriesValid[i] = entryChs[i].Pop()
+		i := i
+		wg.Add(1)
+		// Pop() entries in parallel for large drive setups.
+		go func() {
+			defer wg.Done()
+			entries[i], entriesValid[i] = entryChs[i].Pop()
+		}()
 	}
+	wg.Wait()
 	var isTruncated = false
 	for _, valid := range entriesValid {
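
Both hunks in this file replace a sequential loop over FileInfoCh.Pop() with one goroutine per channel. Each Pop() blocks on a channel receive fed by a per-drive listing walker, so the sequential loop pays each drive's latency in turn, while the parallel version pays roughly only the slowest drive's. A runnable sketch of the pattern under that assumption; fileInfoCh and popAll are illustrative stand-ins, not the MinIO types:

	package main

	import (
		"fmt"
		"sync"
		"time"
	)

	type fileInfo struct{ name string }

	// fileInfoCh mimics the blocking-receive behavior of FileInfoCh.Pop().
	type fileInfoCh struct{ ch chan fileInfo }

	func (c *fileInfoCh) Pop() (fileInfo, bool) {
		fi, ok := <-c.ch
		return fi, ok
	}

	// popAll pops one entry per channel concurrently, mirroring the hunk.
	func popAll(entryChs []*fileInfoCh) ([]fileInfo, []bool) {
		entries := make([]fileInfo, len(entryChs))
		valid := make([]bool, len(entryChs))
		var wg sync.WaitGroup
		for i := range entryChs {
			i := i // capture the loop variable (required before Go 1.22)
			wg.Add(1)
			// Pop() entries in parallel for large drive setups.
			go func() {
				defer wg.Done()
				entries[i], valid[i] = entryChs[i].Pop()
			}()
		}
		wg.Wait() // total wait ~= slowest drive, not the sum of all drives
		return entries, valid
	}

	func main() {
		chs := make([]*fileInfoCh, 16)
		for i := range chs {
			chs[i] = &fileInfoCh{ch: make(chan fileInfo, 1)}
			i := i
			go func() {
				time.Sleep(50 * time.Millisecond) // simulate a slow drive walker
				chs[i].ch <- fileInfo{name: fmt.Sprintf("obj-%02d", i)}
			}()
		}
		start := time.Now()
		entries, _ := popAll(chs)
		fmt.Printf("popped %d entries in %v\n", len(entries), time.Since(start))
	}

With 16 simulated drives at ~50ms each, popAll finishes in about 50ms instead of the ~800ms a sequential loop would take. Writing to distinct indices of pre-sized slices is what makes the goroutines safe without a mutex.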


@@ -859,9 +859,18 @@ func (z *xlZones) listObjects(ctx context.Context, bucket, prefix, marker, delim
 // N times until this boolean is 'false'.
 func lexicallySortedEntryZone(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileInfo, zoneEntriesValid [][]bool) (FileInfo, int, int, bool) {
 	for i, entryChs := range zoneEntryChs {
+		i := i
+		var wg sync.WaitGroup
 		for j := range entryChs {
-			zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop()
+			j := j
+			wg.Add(1)
+			// Pop() entries in parallel for large drive setups.
+			go func() {
+				defer wg.Done()
+				zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop()
+			}()
 		}
+		wg.Wait()
 	}
 	var isTruncated = false
@@ -961,9 +970,18 @@ func mergeZonesEntriesCh(zonesEntryChs [][]FileInfoCh, maxKeys int, ndisks int)
 func isTruncatedZones(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileInfo, zoneEntriesValid [][]bool) bool {
 	for i, entryChs := range zoneEntryChs {
+		i := i
+		var wg sync.WaitGroup
 		for j := range entryChs {
-			zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop()
+			j := j
+			wg.Add(1)
+			// Pop() entries in parallel for large drive setups.
+			go func() {
+				defer wg.Done()
+				zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop()
+			}()
		}
+		wg.Wait()
 	}
 	var isTruncated = false
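
The zoned variants apply the same pattern per zone: each zone gets its own WaitGroup, so Pop() calls run in parallel across the drives of a zone while zones themselves are still visited in order. The i := i and j := j rebinds are essential: before Go 1.22, a for-loop variable was a single variable shared across all iterations, so a goroutine capturing it directly would typically observe its final value. A minimal demonstration of the idiom:

	package main

	import (
		"fmt"
		"sync"
	)

	func main() {
		var wg sync.WaitGroup
		out := make([]int, 4)
		for i := 0; i < 4; i++ {
			i := i // give each goroutine its own copy of i
			wg.Add(1)
			go func() {
				defer wg.Done()
				out[i] = i
			}()
		}
		wg.Wait()
		fmt.Println(out) // [0 1 2 3], deterministic with the rebind
	}

Passing the index as a goroutine argument, as the erasure-decode hunk above does with go func(i int), achieves the same effect.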