Add compressed file index (#15247)

This commit is contained in:
Klaus Post
2022-07-11 17:30:56 -07:00
committed by GitHub
parent 3d969bd2b4
commit 911a17b149
14 changed files with 534 additions and 109 deletions


@@ -20,6 +20,8 @@ package cmd
import (
"bytes"
"context"
"crypto/hmac"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
@@ -43,12 +45,15 @@ import (
"github.com/minio/minio/internal/config/dns"
"github.com/minio/minio/internal/config/storageclass"
"github.com/minio/minio/internal/crypto"
"github.com/minio/minio/internal/fips"
"github.com/minio/minio/internal/hash"
"github.com/minio/minio/internal/hash/sha256"
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger"
"github.com/minio/pkg/trie"
"github.com/minio/pkg/wildcard"
"github.com/minio/sio"
)
const (
@@ -513,12 +518,12 @@ func partNumberToRangeSpec(oi ObjectInfo, partNumber int) *HTTPRangeSpec {
// Returns the compressed offset which should be skipped.
// If encrypted, offsets are adjusted for encrypted block headers/trailers.
// Since decompression happens after decryption, encryption overhead is only added to compressedOffset.
-func getCompressedOffsets(objectInfo ObjectInfo, offset int64) (compressedOffset int64, partSkip int64, firstPart int) {
+func getCompressedOffsets(oi ObjectInfo, offset int64, decrypt func([]byte) ([]byte, error)) (compressedOffset int64, partSkip int64, firstPart int, decryptSkip int64, seqNum uint32) {
var skipLength int64
var cumulativeActualSize int64
var firstPartIdx int
-if len(objectInfo.Parts) > 0 {
-for i, part := range objectInfo.Parts {
+if len(oi.Parts) > 0 {
+for i, part := range oi.Parts {
cumulativeActualSize += part.ActualSize
if cumulativeActualSize <= offset {
compressedOffset += part.Size
@@ -529,8 +534,52 @@ func getCompressedOffsets(objectInfo ObjectInfo, offset int64) (compressedOffset
}
}
}
-return compressedOffset, offset - skipLength, firstPartIdx
+partSkip = offset - skipLength
// Load index and skip more if feasible.
if partSkip > 0 && len(oi.Parts) > firstPartIdx && len(oi.Parts[firstPartIdx].Index) > 0 {
_, isEncrypted := crypto.IsEncrypted(oi.UserDefined)
if isEncrypted {
dec, err := decrypt(oi.Parts[firstPartIdx].Index)
if err == nil {
// Load Index
var idx s2.Index
_, err := idx.Load(restoreIndexHeaders(dec))
// Find compressed/uncompressed offsets of our partskip
compOff, uCompOff, err2 := idx.Find(partSkip)
if err == nil && err2 == nil && compOff > 0 {
// Encrypted.
const sseDAREEncPackageBlockSize = SSEDAREPackageBlockSize + SSEDAREPackageMetaSize
// Number of full blocks in skipped area
seqNum = uint32(compOff / SSEDAREPackageBlockSize)
// Skip this many inside a decrypted block to get to compression block start
decryptSkip = compOff % SSEDAREPackageBlockSize
// Skip this number of full blocks.
skipEnc := compOff / SSEDAREPackageBlockSize
skipEnc *= sseDAREEncPackageBlockSize
compressedOffset += skipEnc
// Skip this number of uncompressed bytes.
partSkip -= uCompOff
}
}
} else {
// Not encrypted
var idx s2.Index
_, err := idx.Load(restoreIndexHeaders(oi.Parts[firstPartIdx].Index))
// Find compressed/uncompressed offsets of our partskip
compOff, uCompOff, err2 := idx.Find(partSkip)
if err == nil && err2 == nil && compOff > 0 {
compressedOffset += compOff
partSkip -= uCompOff
}
}
}
return compressedOffset, partSkip, firstPartIdx, decryptSkip, seqNum
}
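In the encrypted branch above, the index returns compOff, an offset into the compressed stream, which is also the DARE plaintext since compression runs before encryption. A worked sketch of that arithmetic, assuming MinIO's DARE constants (SSEDAREPackageBlockSize = 64 KiB of plaintext per package, SSEDAREPackageMetaSize = 32 bytes of per-package header and tag):

```go
package main

import "fmt"

const (
	pkgBlockSize = 64 << 10 // plaintext bytes per DARE package
	pkgMetaSize  = 32       // per-package header + authentication tag
)

func main() {
	// Suppose the s2 index says the compression block we need starts
	// at byte 200000 of the compressed (pre-encryption) stream.
	compOff := int64(200000)

	// Three full packages precede that offset, so decryption resumes
	// at sequence number 3 ...
	seqNum := uint32(compOff / pkgBlockSize)

	// ... and we still discard this many plaintext bytes inside the
	// first decrypted package to land on the compression block start.
	decryptSkip := compOff % pkgBlockSize

	// On disk, each skipped package also carries its metadata, so the
	// ciphertext position advances by blockSize+metaSize per package.
	skipEnc := (compOff / pkgBlockSize) * (pkgBlockSize + pkgMetaSize)

	fmt.Println(seqNum, decryptSkip, skipEnc) // 3 3392 196704
}
```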
// GetObjectReader is a type that wraps a reader with a lock to
// provide a ReadCloser interface that unlocks on Close()
@@ -618,6 +667,8 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions) (
if err != nil {
return nil, 0, 0, err
}
var decryptSkip int64
var seqNum uint32
off, length = int64(0), oi.Size
decOff, decLength := int64(0), actualSize
@@ -626,10 +677,14 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions) (
if err != nil {
return nil, 0, 0, err
}
decrypt := func(b []byte) ([]byte, error) {
return b, nil
}
if isEncrypted {
decrypt = oi.compressionIndexDecrypt
}
// In case of range based queries on multiparts, the offset and length are reduced.
-off, decOff, firstPart = getCompressedOffsets(oi, off)
+off, decOff, firstPart, decryptSkip, seqNum = getCompressedOffsets(oi, off, decrypt)
decLength = length
length = oi.Size - off
// For negative length we read everything.
@@ -646,7 +701,7 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions) (
if isEncrypted {
copySource := h.Get(xhttp.AmzServerSideEncryptionCopyCustomerAlgorithm) != ""
// Attach decrypter on inputReader
-inputReader, err = DecryptBlocksRequestR(inputReader, h, 0, firstPart, oi, copySource)
+inputReader, err = DecryptBlocksRequestR(inputReader, h, seqNum, firstPart, oi, copySource)
if err != nil {
// Call the cleanup funcs
for i := len(cFns) - 1; i >= 0; i-- {
@@ -654,10 +709,18 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions) (
}
return nil, err
}
if decryptSkip > 0 {
inputReader = ioutil.NewSkipReader(inputReader, decryptSkip)
}
oi.Size = decLength
}
// Decompression reader.
-s2Reader := s2.NewReader(inputReader)
+var dopts []s2.ReaderOption
+if off > 0 {
+// We are not starting at the beginning, so ignore stream identifiers.
+dopts = append(dopts, s2.ReaderIgnoreStreamIdentifier())
+}
+s2Reader := s2.NewReader(inputReader, dopts...)
// Apply the skipLen and limit on the decompressed stream.
if decOff > 0 {
if err = s2Reader.Skip(decOff); err != nil {
@@ -778,6 +841,41 @@ func (g *GetObjectReader) Close() error {
return nil
}
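The NewSkipReader call in NewGetObjectReader above discards decryptSkip bytes of decrypted plaintext before the stream reaches the s2 reader. ioutil.NewSkipReader is a MinIO-internal helper; a minimal sketch of the semantics assumed here:

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// skipReader sketches the assumed behavior of ioutil.NewSkipReader:
// discard the first n bytes, then read through transparently.
type skipReader struct {
	r io.Reader
	n int64
}

func (s *skipReader) Read(p []byte) (int, error) {
	if s.n > 0 {
		if _, err := io.CopyN(io.Discard, s.r, s.n); err != nil {
			return 0, err
		}
		s.n = 0
	}
	return s.r.Read(p)
}

func main() {
	r := &skipReader{r: strings.NewReader("discarded|kept"), n: 10}
	b, _ := io.ReadAll(r)
	fmt.Println(string(b)) // "kept"
}
```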
func compressionIndexEncrypter(key crypto.ObjectKey, input func() []byte) func() []byte {
var data []byte
var fetched bool
return func() []byte {
if !fetched {
data = input()
fetched = true
}
if len(data) == 0 {
return data
}
var buffer bytes.Buffer
mac := hmac.New(sha256.New, key[:])
mac.Write([]byte("compression-index"))
if _, err := sio.Encrypt(&buffer, bytes.NewReader(data), sio.Config{Key: mac.Sum(nil), CipherSuites: fips.DARECiphers()}); err != nil {
logger.CriticalIf(context.Background(), errors.New("unable to encrypt compression index using object key"))
}
return buffer.Bytes()
}
}
func (o *ObjectInfo) compressionIndexDecrypt(input []byte) ([]byte, error) {
if len(input) == 0 {
return input, nil
}
key, err := decryptObjectInfo(nil, o.Bucket, o.Name, o.UserDefined)
if err != nil {
return nil, err
}
mac := hmac.New(sha256.New, key)
mac.Write([]byte("compression-index"))
return sio.DecryptBuffer(nil, input, sio.Config{Key: mac.Sum(nil), CipherSuites: fips.DARECiphers()})
}
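A self-contained round trip of the sealing scheme used by the two helpers above: the index key is derived as HMAC-SHA256 over the string "compression-index" keyed with the object key, and the payload is sealed with sio. This sketch uses sio's default cipher suites and a zeroed stand-in key in place of MinIO's fips.DARECiphers() and the real per-object key:

```go
package main

import (
	"bytes"
	"crypto/hmac"
	"crypto/sha256"
	"fmt"

	"github.com/minio/sio"
)

func main() {
	var objectKey [32]byte // stand-in for the per-object encryption key
	index := []byte("compressed-file-index-bytes")

	// Derive a dedicated key so the index never uses the object key directly.
	mac := hmac.New(sha256.New, objectKey[:])
	mac.Write([]byte("compression-index"))
	key := mac.Sum(nil)

	// Seal.
	var sealed bytes.Buffer
	if _, err := sio.Encrypt(&sealed, bytes.NewReader(index), sio.Config{Key: key}); err != nil {
		panic(err)
	}

	// Unseal.
	plain, err := sio.DecryptBuffer(nil, sealed.Bytes(), sio.Config{Key: key})
	if err != nil {
		panic(err)
	}
	fmt.Println(bytes.Equal(plain, index)) // true
}
```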
// SealMD5CurrFn seals md5sum with object encryption key and returns sealed
// md5sum
type SealMD5CurrFn func([]byte) []byte
@@ -888,11 +986,13 @@ func init() {
// input 'on' is always recommended so that this function works
// properly, because we do not wish to create an object even if the
// client closed the stream prematurely.
-func newS2CompressReader(r io.Reader, on int64) io.ReadCloser {
+func newS2CompressReader(r io.Reader, on int64) (rc io.ReadCloser, idx func() []byte) {
pr, pw := io.Pipe()
// Copy input to compressor
-comp := s2.NewWriter(pw, compressOpts...)
+indexCh := make(chan []byte, 1)
go func() {
+comp := s2.NewWriter(pw, compressOpts...)
+defer close(indexCh)
cn, err := io.Copy(comp, r)
if err != nil {
comp.Close()
@@ -907,9 +1007,25 @@ func newS2CompressReader(r io.Reader, on int64) io.ReadCloser {
return
}
// Close the stream.
// If more than 8MB was written, generate index.
if cn > 8<<20 {
idx, err := comp.CloseIndex()
idx = removeIndexHeaders(idx)
indexCh <- idx
pw.CloseWithError(err)
return
}
pw.CloseWithError(comp.Close())
}()
-return pr
+var gotIdx []byte
+return pr, func() []byte {
+if gotIdx != nil {
+return gotIdx
+}
+// Will get index or nil if closed.
+gotIdx = <-indexCh
+return gotIdx
+}
}
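To see why the index is worth storing, here is a consumer-side sketch using the public klauspost/compress/s2 API, mirroring what getCompressedOffsets does: capture the index via CloseIndex, map an uncompressed target to a compression block boundary with Index.Find, then resume decoding mid-stream:

```go
package main

import (
	"bytes"
	"fmt"
	"io"

	"github.com/klauspost/compress/s2"
)

func main() {
	// Compress ~16 MiB and capture the index instead of appending it
	// to the stream (as comp.CloseIndex() does above).
	var buf bytes.Buffer
	w := s2.NewWriter(&buf)
	if _, err := w.Write(bytes.Repeat([]byte("0123456789abcdef"), 1<<20)); err != nil {
		panic(err)
	}
	idxBytes, err := w.CloseIndex()
	if err != nil {
		panic(err)
	}

	// Map an uncompressed target offset to the nearest preceding
	// compression block boundary.
	var idx s2.Index
	if _, err := idx.Load(idxBytes); err != nil {
		panic(err)
	}
	target := int64(10 << 20)
	compOff, uCompOff, err := idx.Find(target)
	if err != nil {
		panic(err)
	}

	// Resume decompression mid-stream: the stream identifier is behind
	// us, so tell the reader to ignore it, then skip the remainder.
	r := s2.NewReader(bytes.NewReader(buf.Bytes()[compOff:]), s2.ReaderIgnoreStreamIdentifier())
	if err := r.Skip(target - uCompOff); err != nil {
		panic(err)
	}
	first := make([]byte, 16)
	if _, err := io.ReadFull(r, first); err != nil {
		panic(err)
	}
	fmt.Printf("read %q at offset %d after decompressing only %d bytes\n",
		first, target, target-uCompOff)
}
```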
// compressSelfTest performs a self-test to ensure that compression
@@ -933,7 +1049,7 @@ func compressSelfTest() {
}
}
const skip = 2<<20 + 511
-r := newS2CompressReader(bytes.NewBuffer(data), int64(len(data)))
+r, _ := newS2CompressReader(bytes.NewBuffer(data), int64(len(data)))
b, err := io.ReadAll(r)
failOnErr(err)
failOnErr(r.Close())
@@ -1012,3 +1128,65 @@ func hasSpaceFor(di []*DiskInfo, size int64) bool {
wantLeft := uint64(float64(total) * (1.0 - diskFillFraction))
return available > wantLeft
}
// removeIndexHeaders will trim all headers and trailers from a given index.
// This is expected to save 20 bytes.
// These can be restored using restoreIndexHeaders.
// This removes a layer of security, but is the most compact representation.
// Returns nil if the headers contain errors.
// The returned slice references the provided slice.
func removeIndexHeaders(b []byte) []byte {
const save = 4 + len(s2.S2IndexHeader) + len(s2.S2IndexTrailer) + 4
if len(b) <= save {
return nil
}
if b[0] != s2.ChunkTypeIndex {
return nil
}
chunkLen := int(b[1]) | int(b[2])<<8 | int(b[3])<<16
b = b[4:]
// Validate we have enough...
if len(b) < chunkLen {
return nil
}
b = b[:chunkLen]
if !bytes.Equal(b[:len(s2.S2IndexHeader)], []byte(s2.S2IndexHeader)) {
return nil
}
b = b[len(s2.S2IndexHeader):]
if !bytes.HasSuffix(b, []byte(s2.S2IndexTrailer)) {
return nil
}
b = bytes.TrimSuffix(b, []byte(s2.S2IndexTrailer))
if len(b) < 4 {
return nil
}
return b[:len(b)-4]
}
// restoreIndexHeaders will restore index headers removed by removeIndexHeaders.
// No error checking is performed on the input.
func restoreIndexHeaders(in []byte) []byte {
if len(in) == 0 {
return nil
}
b := make([]byte, 0, 4+len(s2.S2IndexHeader)+len(in)+len(s2.S2IndexTrailer)+4)
b = append(b, s2.ChunkTypeIndex, 0, 0, 0)
b = append(b, []byte(s2.S2IndexHeader)...)
b = append(b, in...)
var tmp [4]byte
binary.LittleEndian.PutUint32(tmp[:], uint32(len(b)+4+len(s2.S2IndexTrailer)))
b = append(b, tmp[:4]...)
// Trailer
b = append(b, []byte(s2.S2IndexTrailer)...)
chunkLen := len(b) - 4 /*skippableFrameHeader*/
b[1] = uint8(chunkLen >> 0)
b[2] = uint8(chunkLen >> 8)
b[3] = uint8(chunkLen >> 16)
return b
}
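A small round-trip check for the two helpers above, assuming they and the file's imports (errors, fmt, s2) are in scope. The framing stripped and rebuilt here is the 4-byte chunk header, the 6-byte "s2idx\x00" magic, the 4-byte size field, and the 6-byte "\x00xdi2s" trailer, which is the 20 bytes the removeIndexHeaders comment refers to:

```go
// indexFramingRoundTrip strips the framing, restores it, and verifies
// the result still loads as a valid s2 index.
func indexFramingRoundTrip(fullIdx []byte) error {
	stripped := removeIndexHeaders(fullIdx)
	if stripped == nil {
		return errors.New("input was not a well-formed index chunk")
	}
	restored := restoreIndexHeaders(stripped)

	var idx s2.Index
	if _, err := idx.Load(restored); err != nil {
		return fmt.Errorf("restored index failed to load: %w", err)
	}
	return nil
}
```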