-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathbuffer.go
261 lines (213 loc) · 7.59 KB
/
buffer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
package tiledb
/*
#include <tiledb/tiledb.h>
#include <stdlib.h>
*/
import "C"
import (
"bytes"
"errors"
"fmt"
"io"
"math"
"runtime"
"unsafe"
)
// bufferHandleState contains a native TileDB buffer handle, and the resources that must be released
// alongside it.
// Cleanup-based finalizers do not run in a predetermined order, so this type exists to tie the lifetime
// of a buffer and its pinner together.
type bufferHandleState struct {
// ptr is the native buffer handle; freeCapiBufferState releases it with tiledb_buffer_free.
ptr *C.tiledb_buffer_t
// pinner holds the Go memory pinned through bufferHandle.Pin; freeCapiBufferState unpins it
// after ptr has been freed.
pinner runtime.Pinner
}
// freeCapiBufferState is the release callback that newBufferHandle registers with newCapiHandle.
// p must point to a bufferHandleState. The native buffer is freed first (when it is non-nil),
// and the memory pinned on its behalf is unpinned afterwards.
func freeCapiBufferState(p unsafe.Pointer) {
	state := (*bufferHandleState)(p)
	// Deferred so that unpinning always runs after the native buffer has been released.
	defer state.pinner.Unpin()
	if state.ptr == nil {
		return
	}
	C.tiledb_buffer_free(&state.ptr)
}
// bufferHandle wraps a capiHandle whose payload is a bufferHandleState, so that the native
// buffer and the Go memory pinned for it are reached through a single handle.
type bufferHandle struct {
// Important: this capiHandle stores a pointer to bufferHandleState, not to tiledb_buffer_t!
*capiHandle
}
// newBufferHandle wraps a native buffer pointer in a bufferHandle. The pointer is stored in a
// fresh bufferHandleState, and freeCapiBufferState is registered as the release callback.
func newBufferHandle(ptr *C.tiledb_buffer_t) bufferHandle {
	var handle bufferHandle
	handle.capiHandle = newCapiHandle(unsafe.Pointer(&bufferHandleState{ptr: ptr}), freeCapiBufferState)
	return handle
}
// getState returns the bufferHandleState stored in the embedded capiHandle.
func (x bufferHandle) getState() *bufferHandleState {
	// The embedded handle is addressed explicitly because bufferHandle defines its own Get.
	raw := x.capiHandle.Get()
	return (*bufferHandleState)(raw)
}
// Get returns the native TileDB buffer pointer held by this handle.
func (x bufferHandle) Get() *C.tiledb_buffer_t {
	state := x.getState()
	return state.ptr
}
// Pin pins p with the pinner stored next to the native buffer, so the pin is released
// together with the buffer in freeCapiBufferState.
func (x bufferHandle) Pin(p any) {
	state := x.getState()
	state.pinner.Pin(p)
}
// Buffer is a generic buffer object used by some TileDB APIs.
type Buffer struct {
// tiledbBuffer is the handle to the native buffer and to the Go memory pinned for it.
tiledbBuffer bufferHandle
// context is the TileDB context the buffer was created with; it is also used to fetch
// error details after a failed C call.
context *Context
// NOTE(review): pinner is not referenced in the visible part of this file; SetBuffer pins
// through tiledbBuffer instead. Confirm whether this field is still needed.
pinner runtime.Pinner
}
// newBufferFromHandle builds a Buffer around an existing handle and the context it belongs to.
func newBufferFromHandle(context *Context, handle bufferHandle) *Buffer {
	buf := new(Buffer)
	buf.tiledbBuffer = handle
	buf.context = context
	return buf
}
// NewBuffer allocates a new TileDB buffer bound to the given context.
// It returns an error if context is nil or if the native allocation fails.
func NewBuffer(context *Context) (*Buffer, error) {
	if context == nil {
		return nil, errors.New("error creating tiledb buffer, context is nil")
	}

	var rawBuffer *C.tiledb_buffer_t
	status := C.tiledb_buffer_alloc(context.tiledbContext.Get(), &rawBuffer)
	// Keep the context reachable until the C call has returned.
	runtime.KeepAlive(context)
	if status != C.TILEDB_OK {
		return nil, fmt.Errorf("error creating tiledb buffer: %w", context.LastError())
	}

	handle := newBufferHandle(rawBuffer)
	return newBufferFromHandle(context, handle), nil
}
// Free releases the internal TileDB core data that was allocated on the C heap.
// It is automatically called when this object is garbage collected, but can be
// called earlier to manually release memory if needed. Free is idempotent and
// can safely be called many times on the same object; if it has already
// been freed, it will not be freed again.
func (b *Buffer) Free() {
// Delegates to the underlying handle. The release callback registered for that handle in
// newBufferHandle is freeCapiBufferState, which frees the native buffer and then unpins
// the memory pinned for it.
// NOTE(review): the idempotence and garbage-collection behaviour described above comes from
// the handle's Free implementation, which is not visible here - confirm against capiHandle.
b.tiledbBuffer.Free()
}
// Context exposes the internal TileDB context used to initialize the buffer.
// The returned pointer is the one stored in the Buffer, not a copy of the Context.
func (b *Buffer) Context() *Context {
return b.context
}
// SetType sets the datatype of the buffer.
// It returns the context's last error, wrapped, if the native call fails.
func (b *Buffer) SetType(datatype Datatype) error {
	cType := C.tiledb_datatype_t(datatype)
	status := C.tiledb_buffer_set_type(b.context.tiledbContext.Get(), b.tiledbBuffer.Get(), cType)
	// Keep b (and therefore its native handle) reachable until the C call has returned.
	runtime.KeepAlive(b)
	if status == C.TILEDB_OK {
		return nil
	}
	return fmt.Errorf("error setting datatype for tiledb buffer: %w", b.context.LastError())
}
// Type returns the datatype of the buffer.
// On failure it returns 0 and the context's last error, wrapped.
func (b *Buffer) Type() (Datatype, error) {
	var cType C.tiledb_datatype_t
	status := C.tiledb_buffer_get_type(b.context.tiledbContext.Get(), b.tiledbBuffer.Get(), &cType)
	// Keep b (and therefore its native handle) reachable until the C call has returned.
	runtime.KeepAlive(b)
	if status == C.TILEDB_OK {
		return Datatype(cType), nil
	}
	return 0, fmt.Errorf("error getting tiledb buffer type: %w", b.context.LastError())
}
// Serialize returns a copy of the bytes in the buffer.
//
// For TILEDB_CAPNP the copy is returned untouched. For TILEDB_JSON a single trailing
// NUL byte, if present, is removed. Any other serialization type is rejected with an
// error; the data is still copied before the type is checked.
//
// Deprecated: Use WriteTo or ReadAt instead for increased performance.
func (b *Buffer) Serialize(serializationType SerializationType) ([]byte, error) {
	data, err := b.dataCopy()
	if err != nil {
		return nil, err
	}

	if serializationType == TILEDB_CAPNP {
		// The whole byte array is Cap'n Proto data; hand it back as is.
		return data, nil
	}
	if serializationType == TILEDB_JSON {
		// The data might be a null-terminated string. Strip off the terminator.
		return bytes.TrimSuffix(data, []byte{0}), nil
	}
	return nil, fmt.Errorf("unsupported serialization type: %v", serializationType)
}
// ReadAt copies up to len(p) bytes of the buffer's data, starting at byte offset off, into p.
// It implements io.ReaderAt.
//
// It returns the number of bytes copied. The error is io.EOF when off is at or past the end
// of the data, when the buffer has no data, or when the copy reached the end of the data.
// A negative off, or a failure to obtain the data from TileDB, yields a non-EOF error.
func (b *Buffer) ReadAt(p []byte, off int64) (int, error) {
	if off < 0 {
		return 0, errors.New("offset cannot be negative")
	}

	var cbuffer unsafe.Pointer // b must be kept alive while cbuffer is being accessed.
	var csize C.uint64_t

	ret := C.tiledb_buffer_get_data(b.context.tiledbContext.Get(), b.tiledbBuffer.Get(), &cbuffer, &csize)
	if ret != C.TILEDB_OK {
		return 0, fmt.Errorf("error getting tiledb buffer data: %w", b.context.LastError())
	}

	// Compare in 64 bits: converting to uintptr would truncate large offsets on 32-bit
	// platforms and let an out-of-range offset slip through.
	size := uint64(csize)
	offset := uint64(off) // off is known to be non-negative here.
	if cbuffer == nil || offset >= size {
		// Match ReaderAt behavior of os.File and fail with io.EOF if the offset is greater or equal to the size.
		return 0, io.EOF
	}

	availableBytes := size - offset
	// Clamp before converting to int so that the slice length can never overflow.
	sizeToRead := int(min(math.MaxInt, availableBytes))

	readSize := copy(p, unsafe.Slice((*byte)(unsafe.Add(cbuffer, off)), sizeToRead))
	runtime.KeepAlive(b)

	if uint64(readSize) == availableBytes {
		// Everything up to the end of the data was copied.
		return readSize, io.EOF
	}
	return readSize, nil
}
// WriteTo writes the contents of a Buffer to an io.Writer. It implements io.WriterTo.
//
// The data is passed to w without being copied first. It returns the number of bytes
// written and the first error encountered, wrapped. A writer that reports no progress
// without an error yields io.ErrShortWrite, so the call cannot loop forever.
func (b *Buffer) WriteTo(w io.Writer) (int64, error) {
	var cbuffer unsafe.Pointer // b must be kept alive while cbuffer is being accessed.
	var csize C.uint64_t

	ret := C.tiledb_buffer_get_data(b.context.tiledbContext.Get(), b.tiledbBuffer.Get(), &cbuffer, &csize)
	if ret != C.TILEDB_OK {
		return 0, fmt.Errorf("error getting tiledb buffer data: %w", b.context.LastError())
	}

	if cbuffer == nil || csize == 0 {
		return 0, nil
	}

	total := uint64(csize)
	var written uint64

	// A slice can hold at most math.MaxInt bytes (about 2 GiB on 32-bit platforms), and a
	// writer may accept fewer bytes than offered, so the data is written in a loop.
	for written < total {
		// Clamp before converting to int so that the slice length can never overflow.
		writeSize := int(min(math.MaxInt, total-written))

		// Construct a slice from the buffer's data without copying it.
		n, err := w.Write(unsafe.Slice((*byte)(unsafe.Add(cbuffer, written)), writeSize))
		runtime.KeepAlive(b)

		if n < 0 || n > writeSize {
			// A count outside [0, writeSize] would corrupt the offset of the next chunk.
			n = 0
			if err == nil {
				err = errors.New("invalid write result")
			}
		}
		written += uint64(n)

		if err == nil && n == 0 {
			// No progress and no error: stop instead of spinning forever.
			err = io.ErrShortWrite
		}
		if err != nil {
			return int64(written), fmt.Errorf("error writing buffer to writer: %w", err)
		}
	}

	return int64(written), nil
}
// Static asserts that Buffer implements io.WriterTo and io.ReaderAt.
var (
	_ io.WriterTo = (*Buffer)(nil)
	_ io.ReaderAt = (*Buffer)(nil)
)
// SetBuffer sets the buffer to point at the given Go slice. The memory is now
// Go-managed.
//
// The slice's backing array is pinned through the buffer's handle before its address is
// handed to TileDB. The pin is only released when the handle is freed, so memory passed
// in earlier SetBuffer calls stays pinned until then. A nil slice is passed to TileDB as
// a nil pointer with size 0 and is not pinned.
func (b *Buffer) SetBuffer(buffer []byte) error {
	cbuffer := unsafe.Pointer(unsafe.SliceData(buffer))
	if cbuffer != nil {
		// There is nothing to pin for a nil pointer, and handing one to runtime.Pinner
		// may panic, so only real backing arrays are pinned.
		b.tiledbBuffer.Pin(cbuffer)
	}

	ret := C.tiledb_buffer_set_data(b.context.tiledbContext.Get(), b.tiledbBuffer.Get(), cbuffer, C.uint64_t(len(buffer)))
	runtime.KeepAlive(b)
	if ret != C.TILEDB_OK {
		return fmt.Errorf("error setting tiledb buffer: %w", b.context.LastError())
	}

	return nil
}
// dataCopy returns a copy of the bytes stored in the buffer.
//
// It returns (nil, nil) when the buffer has no data pointer, and an error when the data
// cannot be obtained from TileDB or is larger than math.MaxInt32 bytes (the copy is made
// with C.GoBytes, whose length argument is a C int).
func (b *Buffer) dataCopy() ([]byte, error) {
	var (
		data unsafe.Pointer // b must be kept alive while data is being accessed.
		size C.uint64_t
	)

	status := C.tiledb_buffer_get_data(b.context.tiledbContext.Get(), b.tiledbBuffer.Get(), &data, &size)
	switch {
	case status != C.TILEDB_OK:
		return nil, fmt.Errorf("error getting tiledb buffer data: %w", b.context.LastError())
	case data == nil:
		return nil, nil
	case size > math.MaxInt32:
		return nil, fmt.Errorf("TileDB's buffer (%d) larger than maximum allowed CGo buffer (%d)", size, math.MaxInt32)
	}

	out := C.GoBytes(data, C.int(size))
	runtime.KeepAlive(b)
	return out, nil
}
// Len returns the size in bytes that TileDB reports for the buffer's data.
// On failure it returns 0 and the context's last error, wrapped.
func (b *Buffer) Len() (uint64, error) {
	var (
		data unsafe.Pointer // Only needed to satisfy the C call; the pointer itself is not used.
		size C.uint64_t
	)

	status := C.tiledb_buffer_get_data(b.context.tiledbContext.Get(), b.tiledbBuffer.Get(), &data, &size)
	// Keep b (and therefore its native handle) reachable until the C call has returned.
	runtime.KeepAlive(b)
	if status == C.TILEDB_OK {
		return uint64(size), nil
	}
	return 0, fmt.Errorf("error getting tiledb buffer data: %w", b.context.LastError())
}