Skip to content

Commit

Permalink
Fixes for storage tests on Windows
Browse files Browse the repository at this point in the history
  • Loading branch information
anacrolix committed May 27, 2023
1 parent 5ba8506 commit dee985f
Show file tree
Hide file tree
Showing 10 changed files with 152 additions and 48 deletions.
35 changes: 23 additions & 12 deletions client-nowasm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import (
"os"
"testing"

"github.com/stretchr/testify/assert"
qt "github.com/frankban/quicktest"

"github.com/stretchr/testify/require"

"github.com/anacrolix/torrent/internal/testutil"
Expand Down Expand Up @@ -38,24 +39,34 @@ func TestIssue335(t *testing.T) {
t.Fatalf("removing torrent dummy data dir: %v", err)
}
}()
logErr := func(f func() error, msg string) {
err := f()
t.Logf("%s: %v", msg, err)
if err != nil {
t.Fail()
}
}
cfg := TestingConfig(t)
cfg.Seed = false
cfg.Debug = true
cfg.DataDir = dir
comp, err := storage.NewBoltPieceCompletion(dir)
require.NoError(t, err)
defer comp.Close()
cfg.DefaultStorage = storage.NewMMapWithCompletion(dir, comp)
c := qt.New(t)
c.Assert(err, qt.IsNil)
defer logErr(comp.Close, "closing bolt piece completion")
mmapStorage := storage.NewMMapWithCompletion(dir, comp)
defer logErr(mmapStorage.Close, "closing mmap storage")
cfg.DefaultStorage = mmapStorage
cl, err := NewClient(cfg)
require.NoError(t, err)
defer cl.Close()
c.Assert(err, qt.IsNil)
defer logErr(cl.Close, "closing client")
tor, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
require.NoError(t, err)
assert.True(t, new)
require.True(t, cl.WaitAll())
c.Assert(err, qt.IsNil)
c.Assert(new, qt.IsTrue)
c.Assert(cl.WaitAll(), qt.IsTrue)
tor.Drop()
_, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
require.NoError(t, err)
assert.True(t, new)
require.True(t, cl.WaitAll())
c.Assert(err, qt.IsNil)
c.Assert(new, qt.IsTrue)
c.Assert(cl.WaitAll(), qt.IsTrue)
}
5 changes: 3 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,9 +425,10 @@ func (cl *Client) eachDhtServer(f func(DhtServer)) {
}

// Stops the client. All connections to peers are closed and all activity will come to a halt.
func (cl *Client) Close() (errs []error) {
func (cl *Client) Close() error {
var closeGroup sync.WaitGroup // For concurrent cleanup to complete before returning
cl.lock()
var errs []error
for _, t := range cl.torrents {
err := t.close(&closeGroup)
if err != nil {
Expand All @@ -441,7 +442,7 @@ func (cl *Client) Close() (errs []error) {
cl.unlock()
cl.event.Broadcast()
closeGroup.Wait() // defer is LIFO. We want to Wait() after cl.unlock()
return
return errors.Join(errs...)
}

func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
Expand Down
18 changes: 14 additions & 4 deletions cmd/torrent-verify/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,37 @@ import (
"os"
"path/filepath"

"github.com/anacrolix/torrent/storage"

"github.com/anacrolix/tagflag"
"github.com/edsrzf/mmap-go"

"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/mmap_span"
)

func mmapFile(name string) (mm mmap.MMap, err error) {
func mmapFile(name string) (mm storage.FileMapping, err error) {
f, err := os.Open(name)
if err != nil {
return
}
defer f.Close()
defer func() {
if err != nil {
f.Close()
}
}()
fi, err := f.Stat()
if err != nil {
return
}
if fi.Size() == 0 {
return
}
return mmap.MapRegion(f, -1, mmap.RDONLY, mmap.COPY, 0)
reg, err := mmap.MapRegion(f, -1, mmap.RDONLY, mmap.COPY, 0)
if err != nil {
return
}
return storage.WrapFileMapping(reg, f), nil
}

func verifyTorrent(info *metainfo.Info, root string) error {
Expand All @@ -40,7 +50,7 @@ func verifyTorrent(info *metainfo.Info, root string) error {
if err != nil {
return err
}
if int64(len(mm)) != file.Length {
if int64(len(mm.Bytes())) != file.Length {
return fmt.Errorf("file %q has wrong length", filename)
}
span.Append(mm)
Expand Down
16 changes: 10 additions & 6 deletions mmap_span/mmap_span.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,22 @@ import (
"io"
"sync"

"github.com/edsrzf/mmap-go"

"github.com/anacrolix/torrent/segments"
)

type Mmap interface {
Flush() error
Unmap() error
Bytes() []byte
}

type MMapSpan struct {
mu sync.RWMutex
mMaps []mmap.MMap
mMaps []Mmap
segmentLocater segments.Index
}

func (ms *MMapSpan) Append(mMap mmap.MMap) {
func (ms *MMapSpan) Append(mMap Mmap) {
ms.mMaps = append(ms.mMaps, mMap)
}

Expand Down Expand Up @@ -53,7 +57,7 @@ func (me *MMapSpan) InitIndex() {
if i == len(me.mMaps) {
return -1, false
}
l := int64(len(me.mMaps[i]))
l := int64(len(me.mMaps[i].Bytes()))
i++
return l, true
})
Expand All @@ -77,7 +81,7 @@ func copyBytes(dst, src []byte) int {

func (ms *MMapSpan) locateCopy(copyArgs func(remainingArgument, mmapped []byte) (dst, src []byte), p []byte, off int64) (n int) {
ms.segmentLocater.Locate(segments.Extent{off, int64(len(p))}, func(i int, e segments.Extent) bool {
mMapBytes := ms.mMaps[i][e.Start:]
mMapBytes := ms.mMaps[i].Bytes()[e.Start:]
// log.Printf("got segment %v: %v, copying %v, %v", i, e, len(p), len(mMapBytes))
_n := copyBytes(copyArgs(p, mMapBytes))
p = p[_n:]
Expand Down
1 change: 1 addition & 0 deletions storage/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
func TestShortFile(t *testing.T) {
td := t.TempDir()
s := NewFile(td)
defer s.Close()
info := &metainfo.Info{
Name: "a",
Length: 2,
Expand Down
10 changes: 8 additions & 2 deletions storage/issue95_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,31 @@ func testIssue95(t *testing.T, c ClientImpl) {
}
t1, err := c.OpenTorrent(i1, metainfo.HashBytes([]byte("a")))
require.NoError(t, err)
defer t1.Close()
i2 := &metainfo.Info{
Files: []metainfo.FileInfo{{Path: []string{"a"}}},
Pieces: make([]byte, 20),
}
t2, err := c.OpenTorrent(i2, metainfo.HashBytes([]byte("b")))
require.NoError(t, err)
defer t2.Close()
t2p := t2.Piece(i2.Piece(0))
assert.NoError(t, t1.Close())
assert.NotPanics(t, func() { t2p.Completion() })
}

func TestIssue95File(t *testing.T) {
td := t.TempDir()
testIssue95(t, NewFile(td))
cs := NewFile(td)
defer cs.Close()
testIssue95(t, cs)
}

func TestIssue95MMap(t *testing.T) {
td := t.TempDir()
testIssue95(t, NewMMap(td))
cs := NewMMap(td)
defer cs.Close()
testIssue95(t, cs)
}

func TestIssue95ResourcePieces(t *testing.T) {
Expand Down
4 changes: 3 additions & 1 deletion storage/issue96_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (

func testMarkedCompleteMissingOnRead(t *testing.T, csf func(string) ClientImplCloser) {
td := t.TempDir()
cs := NewClient(csf(td))
cic := csf(td)
defer cic.Close()
cs := NewClient(cic)
info := &metainfo.Info{
PieceLength: 1,
Files: []metainfo.FileInfo{{Path: []string{"a"}, Length: 1}},
Expand Down
81 changes: 61 additions & 20 deletions storage/mmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,21 +124,19 @@ func mMapTorrent(md *metainfo.Info, location string) (mms *mmap_span.MMapSpan, e
return
}
fileName := filepath.Join(location, safeName)
var mm mmap.MMap
var mm FileMapping
mm, err = mmapFile(fileName, miFile.Length)
if err != nil {
err = fmt.Errorf("file %q: %s", miFile.DisplayPath(md), err)
return
}
if mm != nil {
mms.Append(mm)
}
mms.Append(mm)
}
mms.InitIndex()
return
}

func mmapFile(name string, size int64) (ret mmap.MMap, err error) {
func mmapFile(name string, size int64) (_ FileMapping, err error) {
dir := filepath.Dir(name)
err = os.MkdirAll(dir, 0o750)
if err != nil {
Expand All @@ -150,7 +148,11 @@ func mmapFile(name string, size int64) (ret mmap.MMap, err error) {
if err != nil {
return
}
defer file.Close()
defer func() {
if err != nil {
file.Close()
}
}()
var fi os.FileInfo
fi, err = file.Stat()
if err != nil {
Expand All @@ -164,22 +166,61 @@ func mmapFile(name string, size int64) (ret mmap.MMap, err error) {
return
}
}
if size == 0 {
// Can't mmap() regions with length 0.
return
}
intLen := int(size)
if int64(intLen) != size {
err = errors.New("size too large for system")
return func() (ret mmapWithFile, err error) {
ret.f = file
if size == 0 {
// Can't mmap() regions with length 0.
return
}
intLen := int(size)
if int64(intLen) != size {
err = errors.New("size too large for system")
return
}
ret.mmap, err = mmap.MapRegion(file, intLen, mmap.RDWR, 0, 0)
if err != nil {
err = fmt.Errorf("error mapping region: %s", err)
return
}
if int64(len(ret.mmap)) != size {
panic(len(ret.mmap))
}
return
}()
}

// Combines a mmapped region and file into a storage Mmap abstraction, which handles closing the
// mmap file handle.
func WrapFileMapping(region mmap.MMap, file *os.File) FileMapping {
return mmapWithFile{
f: file,
mmap: region,
}
ret, err = mmap.MapRegion(file, intLen, mmap.RDWR, 0, 0)
if err != nil {
err = fmt.Errorf("error mapping region: %s", err)
return
}

type FileMapping = mmap_span.Mmap

// Handles closing the mmap's file handle (needed for Windows). Could be implemented differently by
// OS.
type mmapWithFile struct {
f *os.File
mmap mmap.MMap
}

func (m mmapWithFile) Flush() error {
return m.mmap.Flush()
}

func (m mmapWithFile) Unmap() (err error) {
if m.mmap != nil {
err = m.mmap.Unmap()
}
if int64(len(ret)) != size {
panic(len(ret))
return errors.Join(err, m.f.Close())
}

func (m mmapWithFile) Bytes() []byte {
if m.mmap == nil {
return nil
}
return
return m.mmap
}
25 changes: 25 additions & 0 deletions storage/mmap_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package storage

import (
"testing"

qt "github.com/frankban/quicktest"

"github.com/anacrolix/torrent/internal/testutil"
)

func TestMmapWindows(t *testing.T) {
c := qt.New(t)
dir, mi := testutil.GreetingTestTorrent()
cs := NewMMap(dir)
defer func() {
c.Check(cs.Close(), qt.IsNil)
}()
info, err := mi.UnmarshalInfo()
c.Assert(err, qt.IsNil)
ts, err := cs.OpenTorrent(&info, mi.HashInfoBytes())
c.Assert(err, qt.IsNil)
defer func() {
c.Check(ts.Close(), qt.IsNil)
}()
}
5 changes: 4 additions & 1 deletion t.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,10 @@ func (t *Torrent) Drop() {
defer wg.Wait()
t.cl.lock()
defer t.cl.unlock()
t.cl.dropTorrent(t.infoHash, &wg)
err := t.cl.dropTorrent(t.infoHash, &wg)
if err != nil {
panic(err)
}
}

// Number of bytes of the entire torrent we have completed. This is the sum of
Expand Down

0 comments on commit dee985f

Please sign in to comment.