Make block.Stream() return a copy of data, even in the case where it performs a merge internally. (m3db#773)
richardartoul authored Jun 25, 2018
1 parent f799769 commit c72df36
Showing 2 changed files with 161 additions and 64 deletions.
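The guarantee this commit adds is easiest to see as a usage pattern: a caller can now stream a block, even one with a pending lazy merge, and close the block immediately afterwards, and the returned reader stays valid. The following is a minimal, self-contained Go sketch of that invariant using a toy memBlock type; it is not the m3db API, only an illustration of the copy-on-stream contract that the new TestDatabaseBlockStreamMergePerformsCopy test below verifies against the real dbBlock.

package main

import (
	"bytes"
	"fmt"
	"io"
	"sync"
)

// memBlock is a toy stand-in for dbBlock: it owns a data segment and an
// optional pending merge target, and hands out readers via Stream().
type memBlock struct {
	mu     sync.RWMutex
	closed bool
	data   []byte
	merge  []byte // lazily attached data that Stream() must fold in
}

// Stream folds any pending merge into the block and returns a reader over a
// copy of the resulting segment, so callers may Close() the block right
// after calling Stream() without invalidating the reader.
func (b *memBlock) Stream() (*bytes.Reader, error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.closed {
		return nil, fmt.Errorf("read from closed block")
	}
	if b.merge != nil {
		b.data = append(b.data, b.merge...) // merge in place
		b.merge = nil
	}
	cp := make([]byte, len(b.data))
	copy(cp, b.data) // the copy is what keeps the reader safe after Close()
	return bytes.NewReader(cp), nil
}

// Close releases the block's own segment; outstanding readers are unaffected.
func (b *memBlock) Close() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.closed = true
	b.data, b.merge = nil, nil
}

func main() {
	blk := &memBlock{data: []byte("abc"), merge: []byte("def")}
	r, err := blk.Stream()
	if err != nil {
		panic(err)
	}
	blk.Close() // safe: r reads from a copy, not from blk.data

	out := make([]byte, r.Len())
	if _, err := io.ReadFull(r, out); err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // prints "abcdef"
}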
28 changes: 18 additions & 10 deletions src/dbnode/storage/block/block.go
@@ -178,7 +178,7 @@ func (b *dbBlock) Checksum() (uint32, error) {

// This will merge the existing stream with the merge target's stream,
// as well as recalculate and store the new checksum.
- _, err = b.forceMergeWithLock(tempCtx, stream)
+ err = b.forceMergeWithLock(tempCtx, stream)
if err != nil {
return 0, err
}
@@ -230,7 +230,11 @@ func (b *dbBlock) Stream(blocker context.Context) (xio.BlockReader, error) {
lockUpgraded = true
b.Lock()

- // NB: need to re-check as we just upgraded the lock.
+ // Need to re-check everything since we upgraded the lock.
+ if b.closed {
+ return xio.EmptyBlockReader, errReadFromClosedBlock
+ }
+
stream, err := b.streamWithRLock(blocker)
if err != nil {
return xio.EmptyBlockReader, err
@@ -242,7 +246,14 @@ func (b *dbBlock) Stream(blocker context.Context) (xio.BlockReader, error) {

// This will merge the existing stream with the merge target's stream,
// as well as recalculate and store the new checksum.
- return b.forceMergeWithLock(blocker, stream)
+ err = b.forceMergeWithLock(blocker, stream)
+ if err != nil {
+ return xio.EmptyBlockReader, err
+ }
+
+ // This will return a copy of the data so that it is still safe to
+ // close the block after calling this method.
+ return b.streamWithRLock(blocker)
}

func (b *dbBlock) HasMergeTarget() bool {
@@ -351,13 +362,10 @@ func (b *dbBlock) streamWithRLock(ctx context.Context) (xio.BlockReader, error)
return blockReader, nil
}

- // TODO(rartoul): The existing ctx is still holding a reference to the old segment so that will hang around
- // and waste memory until the block is closed. We could improve this by swapping out the underlying ctx with
- // a new one, allowing us to close the old one and release the old segment, freeing memory.
- func (b *dbBlock) forceMergeWithLock(ctx context.Context, stream xio.SegmentReader) (xio.BlockReader, error) {
+ func (b *dbBlock) forceMergeWithLock(ctx context.Context, stream xio.SegmentReader) error {
targetStream, err := b.mergeTarget.Stream(ctx)
if err != nil {
- return xio.EmptyBlockReader, err
+ return err
}
start := b.startWithRLock()
mergedBlockReader := newDatabaseMergedBlockReader(start, b.blockSize,
@@ -366,12 +374,12 @@ func (b *dbBlock) forceMergeWithLock(ctx context.Context, stream xio.SegmentRead
b.opts)
mergedSegment, err := mergedBlockReader.Segment()
if err != nil {
- return xio.EmptyBlockReader, err
+ return err
}

b.resetMergeTargetWithLock()
b.resetSegmentWithLock(mergedSegment)
- return mergedBlockReader, nil
+ return nil
}

func (b *dbBlock) resetNewBlockStartWithLock(start time.Time, blockSize time.Duration) {
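The trickiest part of the block.go change is the read-to-write lock upgrade inside Stream(): Go's sync.RWMutex has no atomic upgrade, so the read lock is released before the write lock is taken, and anything checked under the read lock (including whether the block was closed) must be re-checked once the write lock is held, which is what the new if b.closed guard does. Below is a small self-contained sketch of that drop-and-recheck pattern with hypothetical names (resource, dirty); it is not the m3db code, just the shape of the locking logic.

package main

import (
	"errors"
	"fmt"
	"sync"
)

var errClosed = errors.New("resource closed")

type resource struct {
	mu     sync.RWMutex
	closed bool
	dirty  bool // stand-in for "has a pending merge target"
	value  int
}

// Read takes the cheap read lock first; only if maintenance work is needed
// does it upgrade to the write lock. Because the upgrade is drop-then-acquire,
// every precondition must be re-checked after Lock(): another goroutine may
// have closed the resource or done the work in between.
func (r *resource) Read() (int, error) {
	r.mu.RLock()
	if r.closed {
		r.mu.RUnlock()
		return 0, errClosed
	}
	if !r.dirty {
		v := r.value
		r.mu.RUnlock()
		return v, nil
	}
	r.mu.RUnlock() // give up the read lock...

	r.mu.Lock() // ...and take the write lock: state may have changed here
	defer r.mu.Unlock()
	if r.closed { // re-check everything since we upgraded the lock
		return 0, errClosed
	}
	if r.dirty { // re-check: another caller may have already done the work
		r.value++ // the "merge in place" stand-in
		r.dirty = false
	}
	return r.value, nil
}

func main() {
	r := &resource{value: 41, dirty: true}
	v, err := r.Read()
	fmt.Println(v, err) // 42 <nil>
}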
197 changes: 143 additions & 54 deletions src/dbnode/storage/block/block_test.go
@@ -190,7 +190,7 @@ func TestDatabaseBlockMerge(t *testing.T) {
require.Equal(t, digest.SegmentChecksum(seg), mergedChecksum)

// Make sure each segment reader was only finalized once
- require.Equal(t, 2, len(segmentReaders))
+ require.Equal(t, 3, len(segmentReaders))
depCtx.BlockingClose()
block1.Close()
block2.Close()
@@ -206,59 +206,90 @@ func TestDatabaseBlockMergeRace(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

- // Test data
- curr := time.Now()
- data := []ts.Datapoint{
- ts.Datapoint{
- Timestamp: curr,
- Value: 0,
- },
- ts.Datapoint{
- Timestamp: curr.Add(time.Second),
- Value: 1,
- },
- }
- durations := []time.Duration{
- time.Minute,
- time.Hour,
- }
-
- // Setup
- blockOpts := NewOptions()
- encodingOpts := encoding.NewOptions()
-
- // Create the two blocks we plan to merge
- encoder := m3tsz.NewEncoder(data[0].Timestamp, nil, true, encodingOpts)
- encoder.Encode(data[0], xtime.Second, nil)
- seg := encoder.Discard()
- block1 := NewDatabaseBlock(data[0].Timestamp, durations[0], seg, blockOpts).(*dbBlock)
-
- encoder.Reset(data[1].Timestamp, 10)
- encoder.Encode(data[1], xtime.Second, nil)
- seg = encoder.Discard()
- block2 := NewDatabaseBlock(data[1].Timestamp, durations[1], seg, blockOpts).(*dbBlock)
-
- // Lazily merge the two blocks
- block1.Merge(block2)
-
- var wg sync.WaitGroup
- wg.Add(2)
-
- go func() {
- depCtx := block1.opts.ContextPool().Get()
- _, err := block1.Stream(depCtx)
- require.NoError(t, err)
- wg.Done()
- }()
-
- go func() {
- depCtx := block1.opts.ContextPool().Get()
- _, err := block1.Stream(depCtx)
- require.NoError(t, err)
- wg.Done()
- }()
+ var (
+ numRuns = 1000
+ numRoutines = 20
+ )

- wg.Wait()
+ for i := 0; i < numRuns; i++ {
+ func() {
+ // Test data
+ curr := time.Now()
+ data := []ts.Datapoint{
+ ts.Datapoint{
+ Timestamp: curr,
+ Value: 0,
+ },
+ ts.Datapoint{
+ Timestamp: curr.Add(time.Second),
+ Value: 1,
+ },
+ }
+ durations := []time.Duration{
+ time.Minute,
+ time.Hour,
+ }
+
+ // Setup
+ blockOpts := NewOptions()
+ encodingOpts := encoding.NewOptions()
+
+ // Create the two blocks we plan to merge
+ encoder := m3tsz.NewEncoder(data[0].Timestamp, nil, true, encodingOpts)
+ encoder.Encode(data[0], xtime.Second, nil)
+ seg := encoder.Discard()
+ block1 := NewDatabaseBlock(data[0].Timestamp, durations[0], seg, blockOpts).(*dbBlock)
+
+ encoder.Reset(data[1].Timestamp, 10)
+ encoder.Encode(data[1], xtime.Second, nil)
+ seg = encoder.Discard()
+ block2 := NewDatabaseBlock(data[1].Timestamp, durations[1], seg, blockOpts).(*dbBlock)
+
+ // Lazily merge the two blocks
+ block1.Merge(block2)
+
+ var wg sync.WaitGroup
+ wg.Add(numRoutines)
+
+ blockFn := func(block *dbBlock) {
+ defer wg.Done()
+
+ depCtx := block.opts.ContextPool().Get()
+ var (
+ // Make sure we shadow the top level variables
+ // with the same name
+ stream xio.BlockReader
+ seg ts.Segment
+ err error
+ )
+ stream, err = block.Stream(depCtx)
+ block.Close()
+ if err == errReadFromClosedBlock {
+ return
+ }
+ require.NoError(t, err)
+
+ seg, err = stream.Segment()
+ require.NoError(t, err)
+ reader := xio.NewSegmentReader(seg)
+ iter := m3tsz.NewReaderIterator(reader, true, encodingOpts)
+
+ i := 0
+ for iter.Next() {
+ dp, _, _ := iter.Current()
+ require.True(t, data[i].Equal(dp))
+ i++
+ }
+ require.NoError(t, iter.Err())
+ }
+
+ for i := 0; i < numRoutines; i++ {
+ go blockFn(block1)
+ }
+
+ wg.Wait()
+ }()
+ }
}

// TestDatabaseBlockMergeChained is similar to TestDatabaseBlockMerge except
@@ -353,7 +384,7 @@ func TestDatabaseBlockMergeChained(t *testing.T) {
require.Equal(t, digest.SegmentChecksum(seg), mergedChecksum)

// Make sure each segment reader was only finalized once
- require.Equal(t, 3, len(segmentReaders))
+ require.Equal(t, 5, len(segmentReaders))
depCtx.BlockingClose()
block1.Close()
block2.Close()
@@ -462,6 +493,64 @@ func TestDatabaseBlockChecksumMergesAndRecalculates(t *testing.T) {
require.Equal(t, digest.SegmentChecksum(seg), mergedChecksum)
}

+ func TestDatabaseBlockStreamMergePerformsCopy(t *testing.T) {
+ ctrl := gomock.NewController(t)
+ defer ctrl.Finish()
+
+ // Test data
+ curr := time.Now()
+ data := []ts.Datapoint{
+ ts.Datapoint{
+ Timestamp: curr,
+ Value: 0,
+ },
+ ts.Datapoint{
+ Timestamp: curr.Add(time.Second),
+ Value: 1,
+ },
+ }
+ durations := []time.Duration{
+ time.Minute,
+ time.Hour,
+ }
+
+ // Setup
+ blockOpts := NewOptions()
+ encodingOpts := encoding.NewOptions()
+
+ // Create the two blocks we plan to merge
+ encoder := m3tsz.NewEncoder(data[0].Timestamp, nil, true, encodingOpts)
+ encoder.Encode(data[0], xtime.Second, nil)
+ seg := encoder.Discard()
+ block1 := NewDatabaseBlock(data[0].Timestamp, durations[0], seg, blockOpts).(*dbBlock)
+
+ encoder.Reset(data[1].Timestamp, 10)
+ encoder.Encode(data[1], xtime.Second, nil)
+ seg = encoder.Discard()
+ block2 := NewDatabaseBlock(data[1].Timestamp, durations[1], seg, blockOpts).(*dbBlock)
+
+ err := block1.Merge(block2)
+ require.NoError(t, err)
+
+ depCtx := block1.opts.ContextPool().Get()
+ stream, err := block1.Stream(depCtx)
+ require.NoError(t, err)
+ block1.Close()
+
+ seg, err = stream.Segment()
+ require.NoError(t, err)
+ reader := xio.NewSegmentReader(seg)
+ iter := m3tsz.NewReaderIterator(reader, true, encodingOpts)
+
+ i := 0
+ for iter.Next() {
+ dp, _, _ := iter.Current()
+ require.True(t, data[i].Equal(dp))
+ i++
+ }
+ require.NoError(t, iter.Err())
+ }
+
func TestDatabaseSeriesBlocksAddBlock(t *testing.T) {
now := time.Now()
blockTimes := []time.Time{now, now.Add(time.Second), now.Add(time.Minute), now.Add(-time.Second), now.Add(-time.Hour)}
