Commit 840d119: fixing bugs in drivers

elaeon committed Mar 23, 2019
1 parent 6236c86 commit 840d119

Showing 12 changed files with 83 additions and 64 deletions.
src/dama/data/ds.py (3 changes: 1 addition & 2 deletions)
@@ -291,7 +291,7 @@ def write_metadata(self):
         metadata["group_name"] = "s/n" if self.group_name is None else self.group_name
         metadata["is_valid"] = True
         metadata.set_schema(dtypes, unique_key=["hash", ["path", "name", "driver_name", "group_name"]])
-        metadata.insert_update_data(keys=["path", "name", "driver_name", "group_name"])
+        metadata.insert_update_data(keys=["hash"]) # ["path", "name", "driver_name", "group_name"])
 
     def calc_hash(self, with_hash: str) -> str:
         hash_obj = Hash(hash_fn=with_hash)
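Note on the change above: the upsert key moves from the composite (path, name, driver_name, group_name) to the content hash, so re-registering identical data under a different name updates the existing metadata row instead of inserting a duplicate. A minimal sketch of a keys-based upsert against a plain sqlite3 table (illustrative schema and helper, not dama's actual API):

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE metadata (hash TEXT UNIQUE, path TEXT, name TEXT)")

def insert_update(row, keys=("hash",)):
    # Try a plain insert; on a UNIQUE violation of the key columns,
    # update the existing row instead.
    try:
        conn.execute("INSERT INTO metadata (hash, path, name) VALUES (?, ?, ?)",
                     (row["hash"], row["path"], row["name"]))
    except sqlite3.IntegrityError:
        where = " AND ".join("{} = ?".format(k) for k in keys)
        values = [row["path"], row["name"]] + [row[k] for k in keys]
        conn.execute("UPDATE metadata SET path = ?, name = ? WHERE " + where, values)

insert_update({"hash": "abc123", "path": "/tmp/a", "name": "test"})
insert_update({"hash": "abc123", "path": "/tmp/b", "name": "test"})  # updates in place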
@@ -469,7 +469,6 @@ def load(hash_hex: str, metadata_driver: AbsDriver, metadata_path: str = None, a
                                                      metadata_driver.login.table,
                                                      metadata_driver.url))
         else:
-            print(query, hash_hex)
             row = data[0]
             data_driver = locate(row[1])
             path = row[2]
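The `locate(row[1])` call above resolves the driver class from the dotted name stored in the metadata row; this is presumably `pydoc.locate`, which turns a dotted path into the object it names. A quick standard-library illustration:

from pydoc import locate

cls = locate("collections.OrderedDict")  # resolves the dotted name to the class
d = cls(a=1)
print(type(d).__name__)  # OrderedDict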
src/dama/data/it.py (53 changes: 26 additions & 27 deletions)
@@ -34,20 +34,28 @@ def assign_struct_array(it, type_elem, start_i, end_i, dtype, dims):
     if type_elem == np.ndarray and len(stc_arr.shape) == 1:
         for i, row in enumerate(it):
             stc_arr[i] = tuple(row)
-    elif len(dtype) == 1:
+    elif len(dtype) == 1 and not isnamedtupleinstance(type_elem):
         group = dtype.names[0]
         for i, row in enumerate(it):
-            if isnamedtupleinstance(row):
-                stc_arr[i] = row
-            else:
-                stc_arr[group][i] = row
-    elif type_elem == str or type_elem == np.str_ or type_elem == int or type_elem == np.string_\
-            or type_elem == np.object or type_elem == object or type_elem == float:
-        for (group, (_, _)), row in zip(dtype.fields.items(), it):
-            stc_arr[group] = row
+            stc_arr[group][i] = row
+    elif type_elem == list or type_elem == tuple:
+        elems = defaultdict(list)
+        for row in it:
+            for i, group in enumerate(dtype.names):
+                elems[group].append(row[i])
+
+        for group, data in elems.items():
+            stc_arr[group] = data
+    elif isnamedtupleinstance(type_elem):
+        elems = defaultdict(list)
+        for row in it:
+            for i, group in enumerate(dtype.names):
+                elems[group].append(getattr(row, group))
+
+        for group, data in elems.items():
+            stc_arr[group] = data
     else:
-        for i, row in enumerate(it):
-            stc_arr[i] = row
+        raise NotImplementedError
     return stc_arr
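`isnamedtupleinstance` itself is not shown in this diff. The usual recipe (an assumption; the project's helper may differ) is a duck-typing check on the `_fields` attribute that namedtuple classes carry, written here to accept either a class or an instance, since the new code passes `type_elem`, a class:

from collections import namedtuple

def isnamedtupleinstance(x):
    # namedtuple classes are tuple subclasses whose _fields is a tuple of str
    t = x if isinstance(x, type) else type(x)
    fields = getattr(t, "_fields", None)
    return (issubclass(t, tuple) and isinstance(fields, tuple)
            and all(isinstance(f, str) for f in fields))

Point = namedtuple("Point", ["x", "y"])
print(isnamedtupleinstance(Point(1, 2)))  # True
print(isnamedtupleinstance((1, 2)))       # False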


@@ -194,7 +202,7 @@ def __next__(self):
         else:
             return next(self.data)
 
-    def to_iter(self, raw: bool = False):
+    def to_iter(self):
         groups = self.groups
         for batch in self:
             row = []
@@ -269,7 +277,7 @@ def chunk_taste(self, length, dtypes) -> None:
             self.dtypes = np.dtype([(DEFAUL_GROUP_NAME, None)])
             self.dtype = None
         else:
-            self.type_elem = type(elem)
+            self.type_elem = type(elem) if not isinstance(elem, numbers.Number) else numbers.Number
             self.dtypes = self._define_dtypes(elem, dtypes)
             self.shape = self._define_shape(elem, length)
             self.dtype = max_dtype(self.dtypes)
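Collapsing every numeric scalar to `numbers.Number` means later comparisons no longer care whether an element arrived as a Python int, a float, or a NumPy scalar; NumPy registers its scalar types with the `numbers` ABCs, so all of the following normalize to the same sentinel:

import numbers
import numpy as np

for elem in (3, 3.5, np.int64(3), np.float32(3.5)):
    type_elem = type(elem) if not isinstance(elem, numbers.Number) else numbers.Number
    print(type(elem).__name__, "->", type_elem)
# int, float, int64 and float32 all normalize to numbers.Number,
# so a single `it.type_elem == numbers.Number` check covers them.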
@@ -402,15 +410,7 @@ class BatchIterator(BaseIterator):
 
     def __init__(self, it: Iterator, chunks: Chunks = None, static: bool = False, start_i: int = 0):
         super(BatchIterator, self).__init__(it, dtypes=it.dtypes, length=it.length, type_elem=self.type_elem)
-        #if batch_group is StcArrayGroup:
-        #    self.shape = StcArrayGroup.fit_shape(it.shape)
-        #else:
         self.shape = it.shape
-        #if len(self.groups) == 1:
-        #    print(chunks, "+++++++")
-        #    print(self.shape.to_chunks(chunks.length))
-        #    self.chunksize = chunks
-        #else:
         self.chunksize = self.shape.to_chunks(chunks.length)
         self.batch_size = self.chunksize.length
         self.start_i = start_i
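With the commented-out branches deleted, the batch geometry always comes from `Shape.to_chunks`. A plausible reading of that call (an assumption; dama's Shape class is not shown in this diff) is that it caps the leading axis at the batch length and keeps the remaining axes whole:

# Hypothetical sketch of shape-to-chunks logic, per group:
def to_chunks(shape, length):
    # (100, 4) batched by 10 -> chunks of (10, 4)
    return (min(shape[0], length),) + tuple(shape[1:])

print(to_chunks((100, 4), 10))  # (10, 4)
print(to_chunks((5,), 10))      # (5,) -- never larger than the data itself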
@@ -512,12 +512,11 @@ def _cycle_it(self):
         for elem in self:
             yield elem
 
-    def to_iter(self, raw: bool = False):
-        if raw is True:
-            for slice_obj in self:
+    def to_iter(self):
+        for slice_obj in self:
+            if isinstance(slice_obj.batch, Manager):
                 yield slice_obj.batch.to_ndarray()
-        else:
-            for slice_obj in self:
+            else:
                 yield slice_obj.batch
 
     @classmethod
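Dropping the `raw` flag gives `to_iter` one code path: Manager-backed batches are converted with `to_ndarray()` on the fly and plain batches pass through, so callers such as `only_data` below no longer need `raw=True`. The same normalize-while-yielding pattern in isolation:

import numpy as np

def to_plain_arrays(batches):
    # anything exposing a to_ndarray() hook gets converted; ndarrays pass through
    for batch in batches:
        yield batch.to_ndarray() if hasattr(batch, "to_ndarray") else batch

for arr in to_plain_arrays([np.arange(3), np.ones(2)]):
    print(arr)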
@@ -551,7 +550,7 @@ def cycle(self):
                                          length=np.inf)
 
     def only_data(self):
-        return BatchIterator.from_batchs(self.to_iter(raw=True), dtypes=self.dtypes, from_batch_size=self.batch_size,
+        return BatchIterator.from_batchs(self.to_iter(), dtypes=self.dtypes, from_batch_size=self.batch_size,
                                          length=self.length)


src/dama/drivers/core.py (6 changes: 0 additions & 6 deletions)
@@ -38,9 +38,6 @@ def manager(self, chunks: Chunks):
         groups = [(group, self[group]) for group in self.groups]
         return DaGroupDict.convert(groups, chunks=chunks)
 
-    def absgroup(self):
-        pass
-
     def open(self):
         if self.conn is None:
             self.conn = h5py.File(self.url, mode=self.mode)
@@ -130,9 +127,6 @@ def manager(self, chunks: Chunks):
         groups = [(group, self[group]) for group in self.groups]
         return DaGroupDict.convert(groups, chunks=chunks)
 
-    def absgroup(self):
-        pass
-
     def open(self):
         if self.conn is None:
             self.conn = zarr.open(self.url, mode=self.mode)
src/dama/groups/postgres.py (3 changes: 2 additions & 1 deletion)
@@ -6,6 +6,7 @@
 from dama.data.it import Iterator, BatchIterator
 from dama.utils.miscellaneous import filter_dtypes
 from psycopg2.extras import execute_values
+import numbers
 import uuid


@@ -27,7 +28,7 @@ def __getitem__(self, item):
             return Table(self.conn, dtypes, name=self.name, query_parts=query_parts)
         elif isinstance(item, list) or isinstance(item, tuple):
             it = Iterator(item)
-            if it.type_elem == int:
+            if it.type_elem == numbers.Number:
                 dtypes = self.dtypes
                 query_parts["slice"] = [slice(index, index + 1) for index in item]
             elif it.type_elem == slice:
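Unrelated to the type fix but visible in the imports above, psycopg2's `execute_values` batches many rows into a single INSERT statement. A minimal standalone sketch (connection string and table are made up for illustration):

import psycopg2
from psycopg2.extras import execute_values

conn = psycopg2.connect("dbname=example")  # hypothetical DSN
with conn.cursor() as cur:
    rows = [(1, "a"), (2, "b"), (3, "c")]
    # %s is a single VALUES placeholder that execute_values expands
    # into (1, 'a'), (2, 'b'), (3, 'c') in one round trip.
    execute_values(cur, "INSERT INTO t (id, label) VALUES %s", rows)
conn.commit()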
src/dama/groups/sqlite.py (6 changes: 3 additions & 3 deletions)
@@ -90,9 +90,9 @@ def insert(self, data, chunks=None):
         for row in data:
             shape = row.batch.shape.to_tuple()
             if len(shape) == 1 and num_groups > 1:
-                value = row.batch.to_df.values  # .to_ndarray().reshape(1, -1)
+                value = row.batch.to_df().values
             elif len(shape) == 1 and num_groups == 1:
-                value = row.batch.to_df().values  # .to_ndarray().reshape(-1, 1)
+                value = row.batch.to_df().values
             else:
                 value = row.batch.to_ndarray(object)
             cur.executemany(insert_str, value)
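The bug in the first branch was `to_df.values`: without call parentheses, `to_df` is only a bound method object and has no `.values`, so that branch raised AttributeError at runtime. A short reproduction with pandas:

import pandas as pd

class Batch:
    def to_df(self):
        return pd.DataFrame({"a": [1, 2]})

b = Batch()
print(b.to_df().values)   # [[1] [2]]: the DataFrame's underlying ndarray
# print(b.to_df.values)   # AttributeError: 'method' object has no attribute 'values'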
@@ -105,7 +105,7 @@ def update(self, values, item):
         query = "UPDATE {name} SET {columns_val} WHERE ID = ?".format(name=self.name,
                                                                       columns_val=",".join(columns))
         cur = self.conn.cursor()
-        values_list = list(values) + [item+1]
+        values_list = list(values) + [item + 1]
         cur.execute(query, values_list)
         self.conn.commit()
         cur.close()
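Only whitespace changed here, but the `item + 1` is worth a note: SQLite's implicit integer primary key starts at 1 while the group indexing is 0-based, hence the offset. A self-contained illustration:

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, v TEXT)")
conn.executemany("INSERT INTO t (v) VALUES (?)", [("a",), ("b",), ("c",)])

item = 0  # 0-based index into the group
conn.execute("UPDATE t SET v = ? WHERE id = ?", ("z", item + 1))  # id 1 is the first row
print(conn.execute("SELECT v FROM t ORDER BY id").fetchall())  # [('z',), ('b',), ('c',)]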
src/dama/models.py (4 changes: 0 additions & 4 deletions)
@@ -189,10 +189,6 @@ def get_dataset(self) -> Data:
         data_hash = metadata.query(query,
                                    (self.model_name, self.model_version, self.module_cls_name(),
                                     group_name, self.base_path))
-        print(data_hash)
-        print(self.metadata_path)
-        print(query)
-        print(self.model_name, self.model_version, self.module_cls_name(), group_name, self.base_path)
         if len(data_hash) > 0:
             driver = Sqlite(login=Login(table=settings["data_tag"]), path=self.metadata_path)
             with Data.load(data_hash[0][0], metadata_driver=driver) as dataset:
src/dama/utils/core.py (16 changes: 8 additions & 8 deletions)
@@ -197,27 +197,27 @@ def set_schema(self, dtypes: np.dtype, unique_key: list = None):
     def insert_data(self):
         try:
             data = [self[group] for group in self.driver.groups]
-            print(data, "IIIIMETADATA")
             self.driver.insert(data)
         except sqlite3.IntegrityError as e:
             log.error(str(e) + " in " + self.driver.url)
 
     def insert_update_data(self, keys=None):
         try:
-            data = [self[group] for group in self.driver.groups]
-            print(data, "IIIIMETADATA")
+            data = [[self[group] for group in self.driver.groups]]
             self.driver[-1] = data
         except sqlite3.IntegrityError as e:
             log.warning(e)
             columns = ["{col}=?".format(col=group) for group in keys]
             query = "SELECT id FROM {name} WHERE {columns_val}".format(name=self.name,
                                                                        columns_val=" AND ".join(columns))
             values = tuple([self[key] for key in keys])
-            data = self.query(query, values)
-            if len(data) > 0:
-                index = data[0][0]-1
-                values = [self[group] for group in self.driver.groups]
-                self.driver[index] = values  # .update(values, index)
+            query_result = self.query(query, values)
+            if len(query_result) > 0:
+                index = query_result[0][0]-1
+                data = [self[group] for group in self.driver.groups]
+                self.driver[index] = data  # .update(data, index)
             else:
                 raise Exception("This dataset {} already exists with a distinct name.".format(values))
 
     def query(self, query: str, values: tuple) -> tuple:
         try:
src/dama/utils/numeric_functions.py (2 changes: 1 addition & 1 deletion)
@@ -279,7 +279,7 @@ def nested_shape_aux(x) -> list:
         if len(shapes) > 0:
             dim.extend(shapes[0])
         return dim
-    elif isinstance(x, numbers.Number) or isinstance(x, str):
+    else:
         return []
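The bare `else` closes a real hole: any leaf type missing from the old whitelist, as shown in the hunk, fell off the end of the function and implicitly returned None instead of [], breaking callers that extend a list with the result. A simplified reproduction of the old failure mode:

import numbers

def nested_shape_aux_old(x):
    # simplified stand-in for the recursive case
    if isinstance(x, (list, tuple)):
        return [len(x)]
    elif isinstance(x, numbers.Number) or isinstance(x, str):
        return []
    # no final else: bytes, None, datetimes, ... fall through and return None

print(nested_shape_aux_old(3.5))     # []
print(nested_shape_aux_old(b"raw"))  # None -- callers doing dim.extend(...) then crash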


tests/test_clf.py (10 changes: 5 additions & 5 deletions)
@@ -72,7 +72,7 @@ def test_load_meta(self):
 
     def test_empty_load(self):
         with Data(name="test") as dataset, \
-                Data(name="test_cv_emp", driver=HDF5(mode="w", path=TMP_PATH), metadata_path=TMP_PATH) as ds:
+                Data(name="test_cv_lm", driver=HDF5(mode="w", path=TMP_PATH), metadata_path=TMP_PATH) as ds:
             dataset.from_data({"x": self.X, "y": self.Y})
             classif = RandomForest(metadata_path=TMP_PATH)
             cv = CV(group_data="x", group_target="y", train_size=.7, valid_size=.1)
@@ -83,7 +83,6 @@ def test_empty_load(self):
                            data_validation_group="validation_x", target_validation_group="validation_y")
             classif.save(name="test", path=TMP_PATH, model_version="1")
             ds_hash = classif.ds.hash
-            print(ds_hash)
             dataset.destroy()
 
         with RandomForest.load(model_name="test", path=TMP_PATH, model_version="1", metadata_path=TMP_PATH) as classif:
@@ -265,10 +264,11 @@ def test_predict(self):
 
 class TestModelVersion(unittest.TestCase):
     def test_load_add_version(self):
+        np.random.seed(0)
         x = np.random.rand(100)
         y = x > .5
         with Data(name="test", driver=HDF5(mode="w", path=TMP_PATH), metadata_path=TMP_PATH) as dataset, \
-                Data(name="test_cv", driver=HDF5(mode="w", path=TMP_PATH), metadata_path=TMP_PATH) as ds:
+                Data(name="test_cv_keras", driver=HDF5(mode="w", path=TMP_PATH), metadata_path=TMP_PATH) as ds:
             dataset.from_data({"x": x.reshape(-1, 1), "y": y})
 
             classif = RandomForest(metadata_path=TMP_PATH)
@@ -304,7 +304,7 @@ def train(clf, model_params=None):
         x = np.random.rand(100)
         y = x > .5
         with Data(name="test", driver=HDF5(mode="w", path=TMP_PATH), metadata_path=TMP_PATH) as dataset, \
-                Data(name="test_cv", driver=HDF5(mode="w", path=TMP_PATH), metadata_path=TMP_PATH) as ds:
+                Data(name="test_cv_keras", driver=HDF5(mode="w", path=TMP_PATH), metadata_path=TMP_PATH) as ds:
             dataset.from_data({"x": x.reshape(-1, 1), "y": y})
 
             cv = CV(group_data="x", group_target="y", train_size=.7, valid_size=.1)
@@ -333,7 +333,7 @@ def train(self, clf, model_params=None):
         x = np.random.rand(100)
         y = x > .5
         with Data(name="test", driver=HDF5(mode="w", path=TMP_PATH), metadata_path=TMP_PATH) as dataset, \
-                Data(name="test_cv", driver=HDF5(mode="a", path=TMP_PATH), metadata_path=TMP_PATH) as ds:
+                Data(name="test_cv_keras", driver=HDF5(mode="a", path=TMP_PATH), metadata_path=TMP_PATH) as ds:
             dataset.from_data({"x": x.reshape(-1, 1), "y": y})
             cv = CV(group_data="x", group_target="y", train_size=.7, valid_size=.1)
             stc = cv.apply(dataset)
tests/test_ds.py (4 changes: 2 additions & 2 deletions)
@@ -290,9 +290,9 @@ def _it(x_a):
             for e in x_a:
                 yield (e, 1)
 
-        chunks = Chunks({"x": (10, 1), "y": (10,)})
+        chunks = Chunks({"x": (10,), "y": (10,)})
         with Data(name="test", driver=Zarr(mode="w", path=TMP_PATH), metadata_path=TMP_PATH, chunks=chunks) as dataset:
-            x = np.random.rand(100).reshape(-1, 1)
+            x = np.random.rand(100)
             dtypes = np.dtype([("x", np.dtype(float)), ("y", np.dtype(float))])
             x_p = Iterator(_it(x), dtypes=dtypes)
             dataset.from_data(x_p[:100])
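The two edits move together: once `x` stays 1-D, its chunk spec must be 1-D as well, since a chunk tuple carries one entry per axis of the group it chunks. The same constraint is visible with zarr directly (a minimal sketch, independent of dama):

import numpy as np
import zarr

x = np.random.rand(100)          # 1-D data
z = zarr.array(x, chunks=(10,))  # chunk spec matches: one entry per axis
# zarr.array(x, chunks=(10, 1)) would not match a 1-D array's dimensionality
print(z.chunks)  # (10,)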
tests/test_it.py (38 changes: 34 additions & 4 deletions)
@@ -6,12 +6,11 @@
 
 from dama.data.it import Iterator, BatchIterator, Slice
 from dama.data.ds import Data
-from dama.groups.core import DaGroup
+from dama.abc.group import DaGroupDict
 from dama.fmtypes import DEFAUL_GROUP_NAME
 from dama.utils.core import Chunks
 from dama.utils.seq import grouper_chunk
+import numbers
 
 
 def stream():
     i = 0
@@ -84,7 +83,7 @@ def test_it_attrs(self):
         self.assertEqual(it.length, np.inf)
         self.assertEqual(it.shape, (np.inf,))
         self.assertEqual(it.num_splits(), np.inf)
-        self.assertEqual(it.type_elem, int)
+        self.assertEqual(it.type_elem, numbers.Number)
         self.assertEqual(it.groups, (DEFAUL_GROUP_NAME,))
 
     def test_it_attrs_length(self):
@@ -94,7 +93,7 @@ def test_it_attrs_length(self):
         self.assertEqual(it.length, 10)
         self.assertEqual(it.shape, (10,))
         self.assertEqual(it.num_splits(), 10)
-        self.assertEqual(it.type_elem, int)
+        self.assertEqual(it.type_elem, numbers.Number)
         self.assertEqual(it.groups, (DEFAUL_GROUP_NAME,))
 
     def test_sample(self):
@@ -159,6 +158,37 @@ def _it():
         it = Iterator(_it(), dtypes=np.dtype([("x", np.dtype("float"))]))
         self.assertEqual(it.shape["x"], (np.inf, 3, 1))
 
+    def test_list_dtype(self):
+        l = [["a0", 0, "c0", 0], ["a1", 1, "c1", 1], ["a2", 2, "c2", 0]]
+        dtypes = np.dtype([("a", np.dtype(object)), ("b", np.dtype(int)), ("c", np.dtype(object)),
+                           ("d", np.dtype(bool))])
+        it = Iterator(l, dtypes=dtypes).batchs(3)
+        for e in it:
+            df_v = e.batch.to_df().values
+            array = np.asarray(l)
+            self.assertEqual((df_v[:, 0] == array[:, 0]).all(), True)
+            self.assertEqual((df_v[:, 1] == array[:, 1].astype(int)).all(), True)
+            self.assertEqual((df_v[:, 2] == array[:, 2]).all(), True)
+            self.assertEqual((df_v[:, 3] == array[:, 3].astype(bool)).all(), True)
+
+    def test_batch_iterator_from(self):
+        x = np.random.rand(20)
+        batch_size = 5
+
+        def iterator(x):
+            init = 0
+            end = batch_size
+            while end <= x.shape[0]:
+                yield (x[init:end], x[init:end] + 1)
+                init = end
+                end += batch_size
+
+        b = BatchIterator.from_batchs(iterator(x), length=len(x), from_batch_size=batch_size,
+                                      dtypes=np.dtype([("x", np.dtype(float)), ("y", np.dtype(float))]),
+                                      to_slice=True)
+        for e in b:
+            print(e.batch)
 
 
 class TestIteratorBatch(unittest.TestCase):

tests/test_reg.py (2 changes: 1 addition & 1 deletion)
@@ -225,7 +225,7 @@ def train(reg, model_params=None):
         x = np.random.rand(100)
         y = x > .5
         with Data(name="test", driver=HDF5(path=TMP_PATH, mode="w"), metadata_path=TMP_PATH) as dataset, \
-                Data(name="test_cv", driver=HDF5(path=TMP_PATH, mode="w"), metadata_path=TMP_PATH) as ds:
+                Data(name="test_cv_keras", driver=HDF5(path=TMP_PATH, mode="w"), metadata_path=TMP_PATH) as ds:
             dataset.from_data({"x": x.reshape(-1, 1), "y": y})
             cv = CV(group_data="x", group_target="y", train_size=.7, valid_size=.1)
             stc = cv.apply(dataset)
