Skip to content

Commit

Permalink
add mmap
Browse files Browse the repository at this point in the history
  • Loading branch information
sunquan.logic committed Sep 10, 2021
1 parent a9253ef commit 75bad20
Show file tree
Hide file tree
Showing 27 changed files with 785 additions and 135 deletions.
2 changes: 2 additions & 0 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type (
}
)

// Open DB
// TODO 这里是不是要上一个目录锁比较好,防止多个进程打开同一个目录?
func Open(opt *Options) *DB {
db := &DB{opt: opt}
// 初始化LSM结构
Expand Down
18 changes: 18 additions & 0 deletions debug.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/bin/bash
###
# Copyright 2021 logicrec Project Authors
#
# Licensed under the Apache License, Version 2.0 (the "License")
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###

dlv test -test.run=$1
48 changes: 46 additions & 2 deletions file/file.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,53 @@
// Copyright 2021 logicrec Project Authors
//
// Licensed under the Apache License, Version 2.0 (the "License")
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Copyright 2021 hardcore-os Project Authors
//
// Licensed under the Apache License, Version 2.0 (the "License")
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package file

import "io"

// Options
type Options struct {
FID uint32
FileName string
Dir string
Path string
Flag int
MaxSz int
}

type CoreFile interface {
Write(b []byte) (n int, err error)
Read(b []byte) (n int, err error)
Close() error
Truncature(n int64) error
ReName(name string) error
NewReader(offset int) io.Reader
Bytes(off, sz int) ([]byte, error)
AllocateSlice(sz, offset int) ([]byte, int, error)
Sync() error
Delete() error
Slice(offset int) []byte
}
37 changes: 26 additions & 11 deletions file/manifest.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,29 @@
// Copyright 2021 hardcore-os Project Authors
//
// Licensed under the Apache License, Version 2.0 (the "License")
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package file

import (
"encoding/json"
"fmt"
"io/ioutil"
"sync"

"github.com/hardcore-os/corekv/utils"
)

// Manifest 维护sst文件元信息的文件
type Manifest struct {
lock *sync.RWMutex
opt *Options
f CoreFile
tables [][]*Cell // l0-l7 的sst file name
Expand All @@ -36,12 +50,14 @@ func (mf *Manifest) Tables() [][]*Cell {
// OpenManifest
func OpenManifest(opt *Options) *Manifest {
mf := &Manifest{
f: OpenMockFile(opt),
tables: make([][]*Cell, utils.MaxLevelNum),
opt: opt,
lock: &sync.RWMutex{},
}
data, err := ioutil.ReadAll(mf.f)
mmapFile, err := OpenMmapFile(opt.FileName, opt.Flag, opt.MaxSz)
utils.Panic(err)
mf.f = mmapFile
data := mf.f.Slice(0) // all in bytes
// 如果是新创建的数据库则直接启动,不需要加载sst
if len(data) == 0 {
return mf
Expand All @@ -61,11 +77,6 @@ func OpenManifest(opt *Options) *Manifest {
// AppendSST 存储level表到manifest的level中
func (mf *Manifest) AppendSST(levelNum int, cell *Cell) (err error) {
mf.tables[levelNum] = append(mf.tables[levelNum], cell)
// TODO 保留旧的MANIFEST文件作为检查点,当前直接截断
err = mf.f.Truncature(0)
if err != nil {
return err
}
res := make([][]string, len(mf.tables))
for i, cells := range mf.tables {
res[i] = make([]string, 0)
Expand All @@ -83,7 +94,11 @@ func (mf *Manifest) AppendSST(levelNum int, cell *Cell) (err error) {
// if err != nil {
// return err
// }
ioutil.WriteFile(fmt.Sprintf("%s/%s", mf.opt.Dir, mf.opt.Name), data, 0666)
//_, err = mf.f.Write(data)
mf.lock.Lock()
defer mf.lock.Unlock()
// TODO 保留旧的MANIFEST文件作为检查点,当前直接截断
fileData, _, err := mf.f.AllocateSlice(len(data), 0)
utils.Panic(err)
copy(fileData, data)
return err
}
227 changes: 227 additions & 0 deletions file/mmap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
// Copyright 2021 hardcore-os Project Authors
//
// Licensed under the Apache License, Version 2.0 (the "License")
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package file

import (
"encoding/binary"
"fmt"
"io"
"os"
"path/filepath"

"github.com/hardcore-os/corekv/utils"
"github.com/hardcore-os/corekv/utils/mmap"
"github.com/pkg/errors"
)

// MmapFile represents an mmapd file and includes both the buffer to the data and the file descriptor.
type MmapFile struct {
Data []byte
Fd *os.File
}

// OpenMmapFileUsing os
func OpenMmapFileUsing(fd *os.File, sz int, writable bool) (*MmapFile, error) {
filename := fd.Name()
fi, err := fd.Stat()
if err != nil {
return nil, errors.Wrapf(err, "cannot stat file: %s", filename)
}

var rerr error
fileSize := fi.Size()
if sz > 0 && fileSize == 0 {
// If file is empty, truncate it to sz.
if err := fd.Truncate(int64(sz)); err != nil {
return nil, errors.Wrapf(err, "error while truncation")
}
fileSize = int64(sz)
}

// fmt.Printf("Mmaping file: %s with writable: %v filesize: %d\n", fd.Name(), writable, fileSize)
buf, err := mmap.Mmap(fd, writable, fileSize) // Mmap up to file size.
if err != nil {
return nil, errors.Wrapf(err, "while mmapping %s with size: %d", fd.Name(), fileSize)
}

if fileSize == 0 {
dir, _ := filepath.Split(filename)
go SyncDir(dir)
}
return &MmapFile{
Data: buf,
Fd: fd,
}, rerr
}

// OpenMmapFile opens an existing file or creates a new file. If the file is
// created, it would truncate the file to maxSz. In both cases, it would mmap
// the file to maxSz and returned it. In case the file is created, z.NewFile is
// returned.
func OpenMmapFile(filename string, flag int, maxSz int) (*MmapFile, error) {
// fmt.Printf("opening file %s with flag: %v\n", filename, flag)
fd, err := os.OpenFile(filename, flag, 0666)
if err != nil {
return nil, errors.Wrapf(err, "unable to open: %s", filename)
}
writable := true
if flag == os.O_RDONLY {
writable = false
}
return OpenMmapFileUsing(fd, maxSz, writable)
}

type mmapReader struct {
Data []byte
offset int
}

func (mr *mmapReader) Read(buf []byte) (int, error) {
if mr.offset > len(mr.Data) {
return 0, io.EOF
}
n := copy(buf, mr.Data[mr.offset:])
mr.offset += n
if n < len(buf) {
return n, io.EOF
}
return n, nil
}

func (m *MmapFile) NewReader(offset int) io.Reader {
return &mmapReader{
Data: m.Data,
offset: offset,
}
}

// Bytes returns data starting from offset off of size sz. If there's not enough data, it would
// return nil slice and io.EOF.
func (m *MmapFile) Bytes(off, sz int) ([]byte, error) {
if len(m.Data[off:]) < sz {
return nil, io.EOF
}
return m.Data[off : off+sz], nil
}

// Slice returns the slice at the given offset.
func (m *MmapFile) Slice(offset int) []byte {
sz := binary.BigEndian.Uint32(m.Data[offset:])
start := offset + 4
next := start + int(sz)
if next > len(m.Data) {
return []byte{}
}
res := m.Data[start:next]
return res
}

// AllocateSlice allocates a slice of the given size at the given offset.
func (m *MmapFile) AllocateSlice(sz, offset int) ([]byte, int, error) {
start := offset + 4

// If the file is too small, double its size or increase it by 1GB, whichever is smaller.
if start+sz > len(m.Data) {
const oneGB = 1 << 30
growBy := len(m.Data)
if growBy > oneGB {
growBy = oneGB
}
if growBy < sz+4 {
growBy = sz + 4
}
if err := m.Truncature(int64(len(m.Data) + growBy)); err != nil {
return nil, 0, err
}
}

binary.BigEndian.PutUint32(m.Data[offset:], uint32(sz))
return m.Data[start : start+sz], start + sz, nil
}

func (m *MmapFile) Sync() error {
if m == nil {
return nil
}
return mmap.Msync(m.Data)
}

func (m *MmapFile) Delete() error {
// Badger can set the m.Data directly, without setting any Fd. In that case, this should be a
// NOOP.
if m.Fd == nil {
return nil
}

if err := mmap.Munmap(m.Data); err != nil {
return fmt.Errorf("while munmap file: %s, error: %v\n", m.Fd.Name(), err)
}
m.Data = nil
if err := m.Fd.Truncate(0); err != nil {
return fmt.Errorf("while truncate file: %s, error: %v\n", m.Fd.Name(), err)
}
if err := m.Fd.Close(); err != nil {
return fmt.Errorf("while close file: %s, error: %v\n", m.Fd.Name(), err)
}
return os.Remove(m.Fd.Name())
}

// Close would close the file. It would also truncate the file if maxSz >= 0.
func (m *MmapFile) Close() error {
if m.Fd == nil {
return nil
}
if err := m.Sync(); err != nil {
return fmt.Errorf("while sync file: %s, error: %v\n", m.Fd.Name(), err)
}
if err := mmap.Munmap(m.Data); err != nil {
return fmt.Errorf("while munmap file: %s, error: %v\n", m.Fd.Name(), err)
}
return m.Fd.Close()
}

func SyncDir(dir string) error {
df, err := os.Open(dir)
if err != nil {
return errors.Wrapf(err, "while opening %s", dir)
}
if err := df.Sync(); err != nil {
return errors.Wrapf(err, "while syncing %s", dir)
}
if err := df.Close(); err != nil {
return errors.Wrapf(err, "while closing %s", dir)
}
return nil
}

// Truncature 兼容接口
func (m *MmapFile) Truncature(maxSz int64) error {
var err error
if maxSz >= 0 {
if err = m.Fd.Truncate(maxSz); err != nil {
return fmt.Errorf("while truncate file: %s, error: %v\n", m.Fd.Name(), err)
}
if maxSz > int64(len(m.Data)) {
m.Data, err = mmap.Mremap(m.Data, int(maxSz))
return utils.Err(err)
}
}
return nil
}

// ReName 兼容接口
func (m *MmapFile) ReName(name string) error {
return nil
}
Loading

0 comments on commit 75bad20

Please sign in to comment.