forked from ethersphere/bee
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbuilder.go
124 lines (113 loc) · 4.54 KB
/
builder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package builder
import (
"context"
"errors"
"fmt"
"io"
"github.com/ethersphere/bee/v2/pkg/encryption"
"github.com/ethersphere/bee/v2/pkg/file/pipeline"
"github.com/ethersphere/bee/v2/pkg/file/pipeline/bmt"
enc "github.com/ethersphere/bee/v2/pkg/file/pipeline/encryption"
"github.com/ethersphere/bee/v2/pkg/file/pipeline/feeder"
"github.com/ethersphere/bee/v2/pkg/file/pipeline/hashtrie"
"github.com/ethersphere/bee/v2/pkg/file/pipeline/store"
"github.com/ethersphere/bee/v2/pkg/file/redundancy"
"github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/swarm"
)
// NewPipelineBuilder returns the appropriate pipeline according to the specified parameters.
// When encrypt is set, the returned pipeline encrypts chunk content before hashing;
// otherwise a plain BMT hashing pipeline is built.
func NewPipelineBuilder(ctx context.Context, s storage.Putter, encrypt bool, rLevel redundancy.Level) pipeline.Interface {
	if !encrypt {
		return newPipeline(ctx, s, rLevel)
	}
	return newEncryptionPipeline(ctx, s, rLevel)
}
// newPipeline creates a standard pipeline that only hashes content with BMT to create
// a merkle-tree of hashes that represent the given arbitrary size byte stream. Partial
// writes are supported. The pipeline flow is: Data -> Feeder -> BMT -> Storage -> HashTrie.
func newPipeline(ctx context.Context, s storage.Putter, rLevel redundancy.Level) pipeline.Interface {
	// Ephemeral sub-pipeline constructor used both by the redundancy layer
	// and by the hash-trie writer for intermediate chunks.
	shortPipeline := newShortPipelineFunc(ctx, s)
	trieWriter := hashtrie.NewHashTrieWriter(ctx, swarm.HashSize, redundancy.New(rLevel, false, shortPipeline), shortPipeline, s, rLevel)
	storeWriter := store.NewStoreWriter(ctx, s, trieWriter)
	bmtWriter := bmt.NewBmtWriter(storeWriter)
	return feeder.NewChunkFeederWriter(swarm.ChunkSize, bmtWriter)
}
// newShortPipelineFunc returns a constructor function for an ephemeral hashing pipeline
// needed by the hashTrieWriter. Each invocation of the returned function yields a fresh
// BMT -> Storage writer chain (no hash-trie stage).
func newShortPipelineFunc(ctx context.Context, s storage.Putter) func() pipeline.ChainWriter {
	return func() pipeline.ChainWriter {
		return bmt.NewBmtWriter(store.NewStoreWriter(ctx, s, nil))
	}
}
// newEncryptionPipeline creates an encryption pipeline that encrypts using CTR, hashes content with BMT to create
// a merkle-tree of hashes that represent the given arbitrary size byte stream. Partial
// writes are supported. The pipeline flow is: Data -> Feeder -> Encryption -> BMT -> Storage -> HashTrie.
// Note that the encryption writer will mutate the data to contain the encrypted span, but the span field
// with the unencrypted span is preserved.
func newEncryptionPipeline(ctx context.Context, s storage.Putter, rLevel redundancy.Level) pipeline.Interface {
	// References are hash + decryption key, hence the larger reference size.
	tw := hashtrie.NewHashTrieWriter(ctx, swarm.HashSize+encryption.KeyLength, redundancy.New(rLevel, true, newShortPipelineFunc(ctx, s)), newShortEncryptionPipelineFunc(ctx, s), s, rLevel)
	lsw := store.NewStoreWriter(ctx, s, tw)
	b := bmt.NewBmtWriter(lsw)
	// Named encryptWriter (not "enc") to avoid shadowing the imported package alias.
	encryptWriter := enc.NewEncryptionWriter(encryption.NewChunkEncrypter(), b)
	return feeder.NewChunkFeederWriter(swarm.ChunkSize, encryptWriter)
}
// newShortEncryptionPipelineFunc returns a constructor function for an ephemeral hashing pipeline
// needed by the hashTrieWriter. Each invocation yields a fresh Encryption -> BMT -> Storage chain.
func newShortEncryptionPipelineFunc(ctx context.Context, s storage.Putter) func() pipeline.ChainWriter {
	return func() pipeline.ChainWriter {
		hasher := bmt.NewBmtWriter(store.NewStoreWriter(ctx, s, nil))
		return enc.NewEncryptionWriter(encryption.NewChunkEncrypter(), hasher)
	}
}
// FeedPipeline feeds the pipeline with the given reader until EOF is reached.
// It returns the cryptographic root hash of the content.
//
// Per the io.Reader contract, any bytes returned alongside an error (including
// a non-EOF error) are written to the pipeline before the error is considered;
// the previous implementation silently dropped bytes read together with a
// non-EOF error.
func FeedPipeline(ctx context.Context, pipeline pipeline.Interface, r io.Reader) (swarm.Address, error) {
	data := make([]byte, swarm.ChunkSize)
	for {
		n, readErr := r.Read(data)
		// Process the n > 0 bytes before considering the error, as the
		// io.Reader documentation requires.
		if n > 0 {
			if err := writeAll(pipeline, data[:n]); err != nil {
				return swarm.ZeroAddress, err
			}
		}
		if readErr != nil {
			if errors.Is(readErr, io.EOF) {
				break
			}
			return swarm.ZeroAddress, readErr
		}
		// Bail out early if the caller canceled.
		select {
		case <-ctx.Done():
			return swarm.ZeroAddress, ctx.Err()
		default:
		}
	}
	// Final cancellation check before sealing the pipeline.
	select {
	case <-ctx.Done():
		return swarm.ZeroAddress, ctx.Err()
	default:
	}
	sum, err := pipeline.Sum()
	if err != nil {
		return swarm.ZeroAddress, err
	}
	return swarm.NewAddress(sum), nil
}

// writeAll writes b to the pipeline and fails if the write was short.
func writeAll(p pipeline.Interface, b []byte) error {
	n, err := p.Write(b)
	if err != nil {
		return err
	}
	if n < len(b) {
		return fmt.Errorf("pipeline short write: %d mismatches %d", n, len(b))
	}
	return nil
}