Skip to content

Commit

Permalink
Support configuration of part size file write (#346)
Browse files Browse the repository at this point in the history
  • Loading branch information
xinlongz1 authored Aug 10, 2023
1 parent 514c77f commit f228c15
Show file tree
Hide file tree
Showing 12 changed files with 161 additions and 72 deletions.
21 changes: 12 additions & 9 deletions lib/store/base/file_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
Expand Down Expand Up @@ -79,8 +79,8 @@ type FileEntry interface {
LinkTo(targetPath string) error
Delete() error

GetReader() (FileReader, error)
GetReadWriter() (FileReadWriter, error)
GetReader(readPartSize int) (FileReader, error)
GetReadWriter(readPartSize, writePartSize int) (FileReadWriter, error)

AddMetadata(md metadata.Metadata) error

Expand Down Expand Up @@ -438,29 +438,32 @@ func (entry *localFileEntry) Delete() error {
}

// GetReader returns a FileReader object for read operations.
//
// It opens the file at entry.GetPath() with os.O_RDONLY and returns the open
// error unchanged on failure. readPartSize is stored on the returned reader;
// localFileReadWriter.Read and ReadAt use it as the maximum number of bytes
// requested from the descriptor per call, and 0 makes them pass the caller's
// buffer straight through.
func (entry *localFileEntry) GetReader() (FileReader, error) {
func (entry *localFileEntry) GetReader(readPartSize int) (FileReader, error) {
f, err := os.OpenFile(entry.GetPath(), os.O_RDONLY, 0775)
if err != nil {
return nil, err
}

// The reader reuses the read/write type; only the read part size is set here.
reader := &localFileReadWriter{
entry: entry,
descriptor: f,
entry: entry,
descriptor: f,
readPartSize: readPartSize,
}
return reader, nil
}

// GetReadWriter returns a FileReadWriter object for read/write operations.
//
// It opens the file at entry.GetPath() with os.O_RDWR and returns the open
// error unchanged on failure. readPartSize and writePartSize are stored on the
// returned object; its Read/ReadAt and Write/WriteAt methods use them as the
// maximum number of bytes passed to the descriptor per call, and 0 makes them
// pass the caller's buffer straight through.
func (entry *localFileEntry) GetReadWriter() (FileReadWriter, error) {
func (entry *localFileEntry) GetReadWriter(readPartSize, writePartSize int) (FileReadWriter, error) {
f, err := os.OpenFile(entry.GetPath(), os.O_RDWR, 0775)
if err != nil {
return nil, err
}

readWriter := &localFileReadWriter{
entry: entry,
descriptor: f,
entry: entry,
descriptor: f,
readPartSize: readPartSize,
writePartSize: writePartSize,
}
return readWriter, nil
}
Expand Down
8 changes: 4 additions & 4 deletions lib/store/base/file_map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
Expand Down Expand Up @@ -446,17 +446,17 @@ func TestLRUUpdateLastAccessTimeOnOpen(t *testing.T) {

// No LAT change below resolution.
clk.Add(time.Minute)
_, err := store.NewFileOp().AcceptState(s1).GetFileReader(fn)
_, err := store.NewFileOp().AcceptState(s1).GetFileReader(fn, 0 /* readPartSize */)
require.NoError(err)
checkLAT(store.NewFileOp().AcceptState(s1), t0)

clk.Add(time.Hour)
_, err = store.NewFileOp().AcceptState(s1).GetFileReader(fn)
_, err = store.NewFileOp().AcceptState(s1).GetFileReader(fn, 0 /* readPartSize */)
require.NoError(err)
checkLAT(store.NewFileOp().AcceptState(s1), clk.Now())

clk.Add(time.Hour)
_, err = store.NewFileOp().AcceptState(s1).GetFileReadWriter(fn)
_, err = store.NewFileOp().AcceptState(s1).GetFileReadWriter(fn, 0 /* readPartSize */, 0 /*writePartSize*/)
require.NoError(err)
checkLAT(store.NewFileOp().AcceptState(s1), clk.Now())
}
Expand Down
14 changes: 7 additions & 7 deletions lib/store/base/file_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
Expand Down Expand Up @@ -47,8 +47,8 @@ type FileOp interface {
GetFilePath(name string) (string, error)
GetFileStat(name string) (os.FileInfo, error)

GetFileReader(name string) (FileReader, error)
GetFileReadWriter(name string) (FileReadWriter, error)
GetFileReader(name string, readPartSize int) (FileReader, error)
GetFileReadWriter(name string, readPartSize, writePartSize int) (FileReadWriter, error)

GetFileMetadata(name string, md metadata.Metadata) error
SetFileMetadata(name string, md metadata.Metadata) (bool, error)
Expand Down Expand Up @@ -358,19 +358,19 @@ func (op *localFileOp) GetFileStat(name string) (info os.FileInfo, err error) {
}

// GetFileReader returns a FileReader object for read operations.
//
// The entry for name is resolved through op.lockHelper at _lockLevelRead, and
// entry.GetReader(readPartSize) is invoked inside that callback. If lockHelper
// itself fails, its error is returned with a nil reader; otherwise the reader
// and error produced by GetReader are returned as-is.
func (op *localFileOp) GetFileReader(name string) (r FileReader, err error) {
func (op *localFileOp) GetFileReader(name string, readPartSize int) (r FileReader, err error) {
if loadErr := op.lockHelper(name, _lockLevelRead, func(name string, entry FileEntry) {
// Assigns to the named results so the values survive the callback.
r, err = entry.GetReader()
r, err = entry.GetReader(readPartSize)
}); loadErr != nil {
return nil, loadErr
}
return r, err
}

// GetFileReadWriter returns a FileReadWriter object for read/write operations.
func (op *localFileOp) GetFileReadWriter(name string) (w FileReadWriter, err error) {
func (op *localFileOp) GetFileReadWriter(name string, readPartSize, writePartSize int) (w FileReadWriter, err error) {
if loadErr := op.lockHelper(name, _lockLevelWrite, func(name string, entry FileEntry) {
w, err = entry.GetReadWriter()
w, err = entry.GetReadWriter(readPartSize, writePartSize)
}); loadErr != nil {
return nil, loadErr
}
Expand Down
34 changes: 17 additions & 17 deletions lib/store/base/file_op_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -25,8 +25,8 @@ import (
"sync/atomic"
"testing"

"github.com/uber/kraken/core"
"github.com/stretchr/testify/require"
"github.com/uber/kraken/core"
)

// These tests should pass for all FileStore/FileOp implementations
Expand Down Expand Up @@ -158,7 +158,7 @@ func testReloadFileEntry(require *require.Assertions, storeBundle *fileStoreTest
require.False(ok)

// GetFileReader should load file from disk into map, including metadata.
_, err = store.NewFileOp().AcceptState(s1).GetFileReader(fn)
_, err = store.NewFileOp().AcceptState(s1).GetFileReader(fn, 0 /* readPartSize */)
require.NoError(err)
ok = store.fileMap.Contains(fn)
require.True(ok)
Expand All @@ -177,14 +177,14 @@ func testMoveFile(require *require.Assertions, storeBundle *fileStoreTestBundle)
if !ok {
log.Fatal("file not found in state1")
}

partSize := 100
// Update content
readWriterState2, err := store.NewFileOp().AcceptState(s1).GetFileReadWriter(fn)
readWriterState2, err := store.NewFileOp().AcceptState(s1).GetFileReadWriter(fn, partSize, partSize)
require.NoError(err)
_, err = readWriterState2.Write([]byte{'t', 'e', 's', 't', '\n'})
require.NoError(err)
readWriterState2.Close()
readWriterState2, err = store.NewFileOp().AcceptState(s1).GetFileReadWriter(fn)
readWriterState2, err = store.NewFileOp().AcceptState(s1).GetFileReadWriter(fn, partSize, partSize)
require.NoError(err)

// Move from state1 to state2
Expand All @@ -194,7 +194,7 @@ func testMoveFile(require *require.Assertions, storeBundle *fileStoreTestBundle)
require.NoError(err)
_, err = os.Stat(filepath.Join(s1.GetDirectory(), store.fileEntryFactory.GetRelativePath(fn)))
require.True(os.IsNotExist(err))
_, err = store.NewFileOp().AcceptState(s2).GetFileReader(fn)
_, err = store.NewFileOp().AcceptState(s2).GetFileReader(fn, partSize)
require.NoError(err)

// Move from state1 to state3 would fail with state error
Expand All @@ -203,7 +203,7 @@ func testMoveFile(require *require.Assertions, storeBundle *fileStoreTestBundle)
require.True(IsFileStateError(err))

// Create new readWriter at new state
readWriterState1, err := store.NewFileOp().AcceptState(s2).GetFileReadWriter(fn)
readWriterState1, err := store.NewFileOp().AcceptState(s2).GetFileReadWriter(fn, partSize, partSize)
require.NoError(err)
// Check content
dataState1, err := ioutil.ReadAll(readWriterState1)
Expand Down Expand Up @@ -232,7 +232,7 @@ func testMoveFile(require *require.Assertions, storeBundle *fileStoreTestBundle)
_, err = os.Stat(filepath.Join(s2.GetDirectory(), store.fileEntryFactory.GetRelativePath(fn)))
require.NoError(err)
// Check content again
readWriterStateMoved, err := store.NewFileOp().AcceptState(s2).GetFileReadWriter(fn)
readWriterStateMoved, err := store.NewFileOp().AcceptState(s2).GetFileReadWriter(fn, partSize, partSize)
require.NoError(err)
dataMoved, err := ioutil.ReadAll(readWriterStateMoved)
require.NoError(err)
Expand All @@ -242,7 +242,7 @@ func testMoveFile(require *require.Assertions, storeBundle *fileStoreTestBundle)
// Move back to state1
err = store.NewFileOp().AcceptState(s2).MoveFile(fn, s1)
require.NoError(err)
_, err = store.NewFileOp().AcceptState(s1).GetFileReader(fn)
_, err = store.NewFileOp().AcceptState(s1).GetFileReader(fn, partSize)
require.NoError(err)
}

Expand Down Expand Up @@ -273,7 +273,7 @@ func testDeleteFile(require *require.Assertions, storeBundle *fileStoreTestBundl
content := "this a test for read after delete"

// Write to file
rw, err := store.NewFileOp().AcceptState(s1).GetFileReadWriter(fn)
rw, err := store.NewFileOp().AcceptState(s1).GetFileReadWriter(fn, 100 /*readPartSize*/, 100 /*writePartSize*/)
require.NoError(err)
rw.Write([]byte(content))

Expand All @@ -298,7 +298,7 @@ func testDeleteFile(require *require.Assertions, storeBundle *fileStoreTestBundl
rw.Close()

// Get deleted file should fail
_, err = store.NewFileOp().AcceptState(s1).GetFileReader(fn)
_, err = store.NewFileOp().AcceptState(s1).GetFileReader(fn, 100 /*readPartSize */)
require.True(os.IsNotExist(err))
}

Expand All @@ -312,7 +312,7 @@ func testGetFileReader(require *require.Assertions, storeBundle *fileStoreTestBu
}

// Get ReadWriter and modify the file.
readWriter, err := store.NewFileOp().AcceptState(s1).GetFileReadWriter(fn)
readWriter, err := store.NewFileOp().AcceptState(s1).GetFileReadWriter(fn, 100 /*readPartSize */, 100 /*writePartSize*/)
require.NoError(err)
defer readWriter.Close()
_, err = readWriter.Write([]byte{'t', 'e', 's', 't', '\n'})
Expand All @@ -324,7 +324,7 @@ func testGetFileReader(require *require.Assertions, storeBundle *fileStoreTestBu
wg.Add(1)
go func() {
defer wg.Done()
reader, err := store.NewFileOp().AcceptState(s1).GetFileReader(fn)
reader, err := store.NewFileOp().AcceptState(s1).GetFileReader(fn, 100 /*readPartSize */)
require.NoError(err)

b := make([]byte, 5)
Expand All @@ -341,7 +341,7 @@ func testGetFileReader(require *require.Assertions, storeBundle *fileStoreTestBu
}
wg.Wait()

reader, err := store.NewFileOp().AcceptState(s1).GetFileReader(fn)
reader, err := store.NewFileOp().AcceptState(s1).GetFileReader(fn, 100 /*readPartSize */)
require.NoError(err)
reader.Close()
}
Expand All @@ -361,7 +361,7 @@ func testGetFileReadWriter(require *require.Assertions, storeBundle *fileStoreTe
wg.Add(1)
go func() {
defer wg.Done()
readWriter, err := store.NewFileOp().AcceptState(s1).GetFileReadWriter(fn)
readWriter, err := store.NewFileOp().AcceptState(s1).GetFileReadWriter(fn, 100 /*readPartSize */, 100 /*writePartSize*/)
require.NoError(err)

_, err = readWriter.Write([]byte{'t', 'e', 's', 't', '\n'})
Expand All @@ -384,7 +384,7 @@ func testGetFileReadWriter(require *require.Assertions, storeBundle *fileStoreTe
wg.Wait()

// Verify content.
reader, err := store.NewFileOp().AcceptState(s1).GetFileReader(fn)
reader, err := store.NewFileOp().AcceptState(s1).GetFileReader(fn, 100 /*readPartSize */)
require.NoError(err)

b := make([]byte, 5)
Expand Down
80 changes: 73 additions & 7 deletions lib/store/base/file_readwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
Expand Down Expand Up @@ -40,8 +40,10 @@ type FileReadWriter interface {
// localFileReadWriter implements FileReadWriter interface, provides read/write
// operation on a local file.
//
// writePartSize and readPartSize bound how many bytes Write/WriteAt and
// Read/ReadAt hand to the descriptor in a single call; a value of 0 leaves the
// caller's buffer unsplit.
type localFileReadWriter struct {
entry *localFileEntry
descriptor *os.File
entry *localFileEntry
descriptor *os.File
writePartSize int // max bytes per descriptor write call; 0 = no splitting
readPartSize int // max bytes per descriptor read call; 0 = no splitting
}

func (readWriter *localFileReadWriter) close() error {
Expand All @@ -55,22 +57,86 @@ func (readWriter localFileReadWriter) Close() error {

// Write writes len(p) bytes from p to the file at its current offset.
//
// When writePartSize is positive, p is handed to the descriptor in consecutive
// chunks of at most writePartSize bytes; otherwise the whole buffer is written
// with a single descriptor call. It returns the total number of bytes written
// and the first error reported by the descriptor, if any.
func (readWriter localFileReadWriter) Write(p []byte) (int, error) {
	// A non-positive part size disables chunking. Checking "<= 0" instead of
	// "== 0" keeps a negative value from producing an invalid slice range
	// (and a panic) in the loop below.
	if readWriter.writePartSize <= 0 {
		return readWriter.descriptor.Write(p)
	}

	totalBytesWritten := 0
	for totalBytesWritten < len(p) {
		blockSize := readWriter.writePartSize
		if remaining := len(p) - totalBytesWritten; remaining < blockSize {
			blockSize = remaining
		}
		n, err := readWriter.descriptor.Write(p[totalBytesWritten : totalBytesWritten+blockSize])
		// Count partial progress before inspecting the error so the caller
		// learns how much data actually reached the file.
		totalBytesWritten += n
		if err != nil {
			return totalBytesWritten, err
		}
	}
	return totalBytesWritten, nil
}

// WriteAt writes len(p) bytes from p to the underlying data stream at offset.
//
// When writePartSize is positive, p is handed to the descriptor in consecutive
// chunks of at most writePartSize bytes, each placed directly after the bytes
// already written; otherwise the whole buffer is written with a single
// descriptor call. It returns the total number of bytes written and the first
// error reported by the descriptor, if any.
func (readWriter localFileReadWriter) WriteAt(p []byte, offset int64) (int, error) {
	// A non-positive part size disables chunking. Checking "<= 0" instead of
	// "== 0" keeps a negative value from producing an invalid slice range
	// (and a panic) in the loop below.
	if readWriter.writePartSize <= 0 {
		return readWriter.descriptor.WriteAt(p, offset)
	}

	totalBytesWritten := 0
	for totalBytesWritten < len(p) {
		blockSize := readWriter.writePartSize
		if remaining := len(p) - totalBytesWritten; remaining < blockSize {
			blockSize = remaining
		}
		n, err := readWriter.descriptor.WriteAt(p[totalBytesWritten:totalBytesWritten+blockSize], offset)
		// Advance both cursors by what was actually written so a partial
		// write is reflected in the returned count.
		totalBytesWritten += n
		offset += int64(n)
		if err != nil {
			return totalBytesWritten, err
		}
	}
	return totalBytesWritten, nil
}

// Read reads up to len(p) bytes from the file at its current offset into p.
//
// When readPartSize is positive, the descriptor is asked for at most
// readPartSize bytes per call and the calls are repeated until p is full or
// the descriptor reports an error; otherwise p is passed to the descriptor in
// a single call. It returns the total number of bytes read together with the
// first error from the descriptor, so a non-zero count can accompany an error
// when the data runs out partway through p.
func (readWriter localFileReadWriter) Read(p []byte) (int, error) {
	// A non-positive part size disables chunking. Checking "<= 0" instead of
	// "== 0" keeps a negative value from producing an invalid slice range
	// (and a panic) in the loop below.
	if readWriter.readPartSize <= 0 {
		return readWriter.descriptor.Read(p)
	}

	totalBytesRead := 0
	for totalBytesRead < len(p) {
		blockSize := readWriter.readPartSize
		if remaining := len(p) - totalBytesRead; remaining < blockSize {
			blockSize = remaining
		}
		n, err := readWriter.descriptor.Read(p[totalBytesRead : totalBytesRead+blockSize])
		// Count the bytes before checking err: they are valid even when the
		// same call also reports an error.
		totalBytesRead += n
		if err != nil {
			return totalBytesRead, err
		}
	}
	return totalBytesRead, nil
}

// ReadAt reads len(p) bytes from the file into p, starting at byte offset.
//
// When readPartSize is positive, the descriptor is asked for at most
// readPartSize bytes per call, each request starting directly after the bytes
// already read; otherwise p is passed to the descriptor in a single call. It
// returns the total number of bytes read together with the first error from
// the descriptor, if any.
func (readWriter localFileReadWriter) ReadAt(p []byte, offset int64) (int, error) {
	// A non-positive part size disables chunking. Checking "<= 0" instead of
	// "== 0" keeps a negative value from producing an invalid slice range
	// (and a panic) in the loop below.
	if readWriter.readPartSize <= 0 {
		return readWriter.descriptor.ReadAt(p, offset)
	}

	totalBytesRead := 0
	for totalBytesRead < len(p) {
		blockSize := readWriter.readPartSize
		if remaining := len(p) - totalBytesRead; remaining < blockSize {
			blockSize = remaining
		}
		n, err := readWriter.descriptor.ReadAt(p[totalBytesRead:totalBytesRead+blockSize], offset)
		// Advance both cursors by what was actually read so a short final
		// chunk is still reflected in the returned count.
		totalBytesRead += n
		offset += int64(n)
		if err != nil {
			return totalBytesRead, err
		}
	}
	return totalBytesRead, nil
}

// Seek sets the offset for the next Read or Write on file to offset,
Expand Down
Loading

0 comments on commit f228c15

Please sign in to comment.