From 730ac5381cd3b85df5ab22be9e9756f5e9e7f062 Mon Sep 17 00:00:00 2001 From: Krishna Srinivas <634494+krishnasrinivas@users.noreply.github.com> Date: Fri, 18 Jan 2019 07:48:25 -0800 Subject: [PATCH] Simplify parallelReader.Read() (#7109) Simplify parallelReader.Read(). This also fixes a bug in the previous implementation, which returned before all the parallel reading goroutines had terminated, causing race conditions. --- cmd/erasure-decode.go | 113 +++++++++++++++++++++--------------------- 1 file changed, 57 insertions(+), 56 deletions(-) diff --git a/cmd/erasure-decode.go b/cmd/erasure-decode.go index fd737d394..ded3e82a7 100644 --- a/cmd/erasure-decode.go +++ b/cmd/erasure-decode.go @@ -19,6 +19,7 @@ package cmd import ( "context" "io" + "sync" "github.com/minio/minio/cmd/logger" ) @@ -58,74 +59,74 @@ func (p *parallelReader) canDecode(buf [][]byte) bool { // Read reads from readers in parallel. Returns p.dataBlocks number of bufs. func (p *parallelReader) Read() ([][]byte, error) { - type errIdx struct { - idx int - buf []byte - err error - } - - errCh := make(chan errIdx) - currReaderIndex := 0 newBuf := make([][]byte, len(p.readers)) + var newBufLK sync.RWMutex if p.offset+p.shardSize > p.shardFileSize { p.shardSize = p.shardFileSize - p.offset } - read := func(currReaderIndex int) { - if p.buf[currReaderIndex] == nil { - p.buf[currReaderIndex] = make([]byte, p.shardSize) - } - p.buf[currReaderIndex] = p.buf[currReaderIndex][:p.shardSize] - _, err := p.readers[currReaderIndex].ReadAt(p.buf[currReaderIndex], p.offset) - errCh <- errIdx{currReaderIndex, p.buf[currReaderIndex], err} + readTriggerCh := make(chan bool, len(p.readers)) + for i := 0; i < p.dataBlocks; i++ { + // Setup read triggers for p.dataBlocks number of reads so that it reads in parallel.
+ readTriggerCh <- true } - readerCount := 0 - for _, r := range p.readers { - if r != nil { - readerCount++ - } - } - if readerCount < p.dataBlocks { - return nil, errXLReadQuorum - } - - readerCount = 0 - for i, r := range p.readers { - if r == nil { - continue - } - go read(i) - readerCount++ - if readerCount == p.dataBlocks { - currReaderIndex = i + 1 + readerIndex := 0 + var wg sync.WaitGroup + // if readTrigger is true, it implies next disk.ReadAt() should be tried + // if readTrigger is false, it implies previous disk.ReadAt() was successful and there is no need + // to try reading the next disk. + for readTrigger := range readTriggerCh { + newBufLK.RLock() + canDecode := p.canDecode(newBuf) + newBufLK.RUnlock() + if canDecode { break } - } - - for errVal := range errCh { - if errVal.err == nil { - newBuf[errVal.idx] = errVal.buf - if p.canDecode(newBuf) { - p.offset += int64(p.shardSize) - return newBuf, nil - } - continue - } - p.readers[errVal.idx] = nil - for currReaderIndex < len(p.readers) { - if p.readers[currReaderIndex] != nil { - break - } - currReaderIndex++ - } - - if currReaderIndex == len(p.readers) { + if readerIndex == len(p.readers) { break } - go read(currReaderIndex) - currReaderIndex++ + if !readTrigger { + continue + } + wg.Add(1) + go func(i int) { + defer wg.Done() + disk := p.readers[i] + if disk == nil { + // Since disk is nil, trigger another read. + readTriggerCh <- true + return + } + if p.buf[i] == nil { + // Reading first time on this disk, hence the buffer needs to be allocated. + // Subsequent reads will re-use this buffer. + p.buf[i] = make([]byte, p.shardSize) + } + // For the last shard, the shardsize might be less than previous shard sizes. + // Hence the following statement ensures that the buffer size is reset to the right size. + p.buf[i] = p.buf[i][:p.shardSize] + _, err := disk.ReadAt(p.buf[i], p.offset) + if err != nil { + p.readers[i] = nil + // Since ReadAt returned error, trigger another read. 
+ readTriggerCh <- true + return + } + newBufLK.Lock() + newBuf[i] = p.buf[i] + newBufLK.Unlock() + // Since ReadAt returned success, there is no need to trigger another read. + readTriggerCh <- false + }(readerIndex) + readerIndex++ + } + wg.Wait() + + if p.canDecode(newBuf) { + p.offset += p.shardSize + return newBuf, nil } return nil, errXLReadQuorum