Skip to content

Commit 29fe629

Browse files
committed
Intermediate commit, working my way through the code, step by step. Didn't even try to compile it yet
1 parent ffafb2e commit 29fe629

File tree

2 files changed

+144
-52
lines changed

2 files changed

+144
-52
lines changed

_delta_apply.c

+141-42
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,24 @@ typedef uchar bool;
1414
const ull gDIV_grow_by = 100;
1515

1616

17+
// DELTA STREAM ACCESS
18+
///////////////////////
19+
inline
20+
ull msb_size(const uchar** datap, const uchar* top)
21+
{
22+
const uchar *data = *datap;
23+
ull cmd, size = 0;
24+
uint i = 0;
25+
do {
26+
cmd = *data++;
27+
size |= (cmd & 0x7f) << i;
28+
i += 7;
29+
} while (cmd & 0x80 && data < top);
30+
*datap = data;
31+
return size;
32+
}
33+
34+
1735
// DELTA INFO
1836
/////////////
1937
typedef struct {
@@ -22,6 +40,65 @@ typedef struct {
2240
} DeltaInfo;
2341

2442

43+
// TOP LEVEL STREAM INFO
44+
/////////////////////////////
45+
typedef struct {
46+
const uchar* tds;
47+
Py_ssize_t* tdslen;
48+
Py_ssize_t target_size; // size of the target buffer which can hold all data
49+
PyObject* parent_object;
50+
} ToplevelStreamInfo;
51+
52+
53+
void TSI_init(ToplevelStreamInfo* info)
54+
{
55+
info->tds = 0;
56+
info->tdslen = 0;
57+
info->target_size = 0;
58+
info->parent_object = 0;
59+
60+
}
61+
62+
void TSI_destroy(ToplevelStreamInfo* info)
63+
{
64+
if (info->parent_object){
65+
Py_DECREF(info->parent_object);
66+
info->parent_object = 0;
67+
} else if (info->tds){
68+
PyMem_Free(info->tds);
69+
}
70+
}
71+
72+
// initialize our set stream to point to the first chunk
73+
// Fill in the header information, which is the base and target size
74+
void TSI_init_stream(ToplevelStreamInfo* info)
75+
{
76+
assert(info->tds && info->tdslen)
77+
78+
// init stream
79+
const uchar* tdsend = info->tds + info->tdslen;
80+
msb_size(&info->tds, tdsend);
81+
info->target_size = msb_size(&info->tds, tdsend);
82+
}
83+
84+
// duplicate the data currently owned by the parent object drop its refcount
85+
// return 1 on success
86+
bool TSI_copy_stream_from_object(ToplevelStreamInfo* info)
87+
{
88+
assert(info.parent_object);
89+
90+
uchar* ptmp = PyMem_Malloc(info.tdslen);
91+
if (!ptmp){
92+
return 0;
93+
}
94+
memcpy((void*)ptmp, info.tds, info.tdslen);
95+
tds = ptmp;
96+
Py_DECREF(info.parent_object);
97+
info.parent_object = 0;
98+
99+
return 1;
100+
}
101+
25102
// DELTA CHUNK
26103
////////////////
27104
// Internal Delta Chunk Objects
@@ -452,7 +529,7 @@ bool DIV_connect_with_base(DeltaInfoVector* tdcv, const DeltaInfoVector* bdcv)
452529
typedef struct {
453530
PyObject_HEAD
454531
// -----------
455-
DeltaInfoVector vec;
532+
ToplevelStreamInfo istream;
456533

457534
} DeltaChunkList;
458535

@@ -465,34 +542,20 @@ int DCL_init(DeltaChunkList*self, PyObject *args, PyObject *kwds)
465542
return -1;
466543
}
467544

468-
DIV_init(&self->vec, 0);
545+
TSI_init(&self->istream, 0);
469546
return 0;
470547
}
471548

472549
static
473550
void DCL_dealloc(DeltaChunkList* self)
474551
{
475-
DIV_destroy(&(self->vec));
476-
}
477-
478-
static
479-
PyObject* DCL_len(DeltaChunkList* self)
480-
{
481-
return PyLong_FromUnsignedLongLong(DIV_len(&self->vec));
482-
}
483-
484-
static inline
485-
ull DCL_rbound(DeltaChunkList* self)
486-
{
487-
if (DIV_empty(&self->vec))
488-
return 0;
489-
return DIV_rbound(&self->vec);
552+
TSI_destroy(&(self->istream));
490553
}
491554

492555
static
493556
PyObject* DCL_py_rbound(DeltaChunkList* self)
494557
{
495-
return PyLong_FromUnsignedLongLong(DCL_rbound(self));
558+
return PyLong_FromUnsignedLongLong(self->istream->target_size);
496559
}
497560

498561
// Write using a write function, taking remaining bytes from a base buffer
@@ -535,7 +598,6 @@ PyObject* DCL_apply(DeltaChunkList* self, PyObject* args)
535598

536599
static PyMethodDef DCL_methods[] = {
537600
{"apply", (PyCFunction)DCL_apply, METH_VARARGS, "Apply the given iterable of delta streams" },
538-
{"__len__", (PyCFunction)DCL_len, METH_NOARGS, NULL},
539601
{"rbound", (PyCFunction)DCL_py_rbound, METH_NOARGS, NULL},
540602
{NULL} /* Sentinel */
541603
};
@@ -596,21 +658,6 @@ DeltaChunkList* DCL_new_instance(void)
596658
return dcl;
597659
}
598660

599-
inline
600-
ull msb_size(const uchar** datap, const uchar* top)
601-
{
602-
const uchar *data = *datap;
603-
ull cmd, size = 0;
604-
uint i = 0;
605-
do {
606-
cmd = *data++;
607-
size |= (cmd & 0x7f) << i;
608-
i += 7;
609-
} while (cmd & 0x80 && data < top);
610-
*datap = data;
611-
return size;
612-
}
613-
614661
static PyObject* connect_deltas(PyObject *self, PyObject *dstreams)
615662
{
616663
// obtain iterator
@@ -626,22 +673,71 @@ static PyObject* connect_deltas(PyObject *self, PyObject *dstreams)
626673
}
627674

628675
DeltaInfoVector dcv;
629-
DeltaInfoVector tdcv;
676+
ToplevelStreamInfo tdsinfo;
677+
TSI_init(&tdsinfo);
630678
DIV_init(&dcv, 100); // should be enough to keep the average text file
631-
DIV_init(&tdcv, 0);
632679

633-
unsigned int dsi = 0;
634-
PyObject* ds = 0;
680+
681+
// GET TOPLEVEL DELTA STREAM
635682
int error = 0;
636-
for (ds = PyIter_Next(stream_iter), dsi = 0; ds != NULL; ++dsi, ds = PyIter_Next(stream_iter))
683+
PyObject* ds = 0;
684+
unsigned int dsi = 0;
685+
ds = PyIter_Next(stream_iter);
686+
if (!ds){
687+
error = 1;
688+
goto _error;
689+
}
690+
691+
dsi += 1;
692+
tdsinfo.parent_object = PyObject_CallMethod(ds, "read", 0);
693+
if (!PyObject_CheckReadBuffer(tdsinfo.parent_object)){
694+
Py_DECREF(ds);
695+
error = 1;
696+
goto _error;
697+
}
698+
699+
PyObject_AsReadBuffer(tdsinfo.parent_object, (const void**)&tdsinfo.tds, &tdsinfo.tdslen);
700+
if (tdslen > pow(2, 32)){
701+
// parent object is deallocated by info structure
702+
Py_DECREF(ds);
703+
PyErr_SetString(PyExc_RuntimeError("Cannot handle deltas larger than 4GB"));
704+
tdsinfo.tdb = 0;
705+
706+
error = 1;
707+
goto _error;
708+
}
709+
Py_DECREF(ds);
710+
711+
// INTEGRATE ANCESTOR DELTA STREAMS
712+
PyObject* db = 0;
713+
TSI_init_stream(&tdsinfo, tdb);
714+
715+
716+
for (ds = PyIter_Next(stream_iter); ds != NULL; ++dsi, ds = PyIter_Next(stream_iter))
637717
{
638-
PyObject* db = PyObject_CallMethod(ds, "read", 0);
718+
// Its important to initialize this before the next block which can jump
719+
// to code who needs this to exist !
720+
PyObject* db = 0;
721+
722+
// When processing the first delta, we know we will have to alter the tds
723+
// Hence we copy it and deallocate the parent object
724+
if (ds == 1) {
725+
if (!TSI_copy_stream_from_object(&tdsinfo)){
726+
PyErr_SetString(PyExc_RuntimeError, "Could not allocate memory to copy toplevel buffer");
727+
// info structure takes care of the parent_object
728+
error = 1;
729+
goto loop_end;
730+
}
731+
}
732+
733+
db = PyObject_CallMethod(ds, "read", 0);
639734
if (!PyObject_CheckReadBuffer(db)){
640735
error = 1;
641736
PyErr_SetString(PyExc_RuntimeError, "Returned buffer didn't support the buffer protocol");
642737
goto loop_end;
643738
}
644739

740+
// Fill the stream info structure
645741
const uchar* data;
646742
Py_ssize_t dlen;
647743
PyObject_AsReadBuffer(db, (const void**)&data, &dlen);
@@ -778,10 +874,13 @@ static PyObject* connect_deltas(PyObject *self, PyObject *dstreams)
778874
}
779875
}// END for each stream object
780876

781-
if (dsi == 0 && ! error){
877+
if (dsi == 0){
782878
PyErr_SetString(PyExc_ValueError, "No streams provided");
783879
}
784880

881+
882+
_error:
883+
785884
if (stream_iter != dstreams){
786885
Py_DECREF(stream_iter);
787886
}
@@ -800,7 +899,7 @@ static PyObject* connect_deltas(PyObject *self, PyObject *dstreams)
800899
error = 1;
801900
} else {
802901
// Plain copy, don't deallocate
803-
dcl->vec = tdcv;
902+
dcl->istream = tdsinfo;
804903
}
805904

806905
if (error){

stream.py

+3-10
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,7 @@ def _set_cache_too_slow_without_c(self, attr):
349349

350350
# call len directly, as the (optional) c version doesn't implement the sequence
351351
# protocol
352-
if dcl.__len__() == 0:
352+
if dcl.rbound() == 0:
353353
self._size = 0
354354
self._mm_target = allocate_memory(0)
355355
return
@@ -367,15 +367,6 @@ def _set_cache_too_slow_without_c(self, attr):
367367

368368
self._mm_target.seek(0)
369369

370-
def _set_cache_(self, attr):
371-
"""Determine which version to use depending on the configuration of the deltas
372-
:note: we are only called if we have the performance module"""
373-
# otherwise it depends on the amount of memory to shift around
374-
if len(self._dstreams) > 1 and self._bstream.size < 150000:
375-
return self._set_cache_too_slow_without_c(attr)
376-
else:
377-
return self._set_cache_brute_(attr)
378-
379370
def _set_cache_brute_(self, attr):
380371
"""If we are here, we apply the actual deltas"""
381372

@@ -456,6 +447,8 @@ def _set_cache_brute_(self, attr):
456447
#{ Configuration
457448
if not has_perf_mod:
458449
_set_cache_ = _set_cache_brute_
450+
else:
451+
_set_cache_ = _set_cache_too_slow_without_c
459452

460453
#} END configuration
461454

0 commit comments

Comments
 (0)