Skip to content

Commit

Permalink
feat: Fix legacy issue in index (georgia-tech-db#553)
Browse files Browse the repository at this point in the history
* remove unnecessary secondary index

* fix lint error

* reformat

* add support of using udf expression and register udf signature

* reformat

* fix

* reformat
  • Loading branch information
jiashenC authored Jan 13, 2023
1 parent 346bf54 commit a05d84d
Show file tree
Hide file tree
Showing 18 changed files with 321 additions and 224 deletions.
34 changes: 28 additions & 6 deletions eva/binder/statement_binder.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
)
from eva.binder.statement_binder_context import StatementBinderContext
from eva.catalog.catalog_manager import CatalogManager
from eva.catalog.catalog_type import ColumnType, NdArrayType, TableType
from eva.catalog.catalog_type import IndexType, NdArrayType, TableType
from eva.expression.abstract_expression import AbstractExpression
from eva.expression.function_expression import FunctionExpression
from eva.expression.tuple_value_expression import TupleValueExpression
Expand Down Expand Up @@ -80,18 +80,40 @@ def _bind_explain_statement(self, node: ExplainStatement):
@bind.register(CreateIndexStatement)
def _bind_create_index_statement(self, node: CreateIndexStatement):
    """Bind a CREATE INDEX statement and validate the data to be indexed.

    Binds the table reference (and the UDF expression, when one is given),
    then checks that the data the index is built on -- the named column, or
    every output of the UDF -- is a 2-dimensional float32 array.

    Raises:
        AssertionError: if more than one column is listed, or the table
            reference is not a table atom.
        BinderError: if the index type is not supported, the column is not
            found in the table, the UDF has no catalog entry, or the indexed
            data is not 2-dimensional float32.
    """

    def _require_feature_shape(entry):
        # Shared check applied to a column entry or a UDF output entry.
        if entry.array_type != NdArrayType.FLOAT32:
            raise BinderError("Index input needs to be float32.")
        # `or ()` turns an unset dimension spec into a BinderError instead of
        # a TypeError from len(None).
        if len(entry.array_dimensions or ()) != 2:
            raise BinderError("Index input needs to be 2 dimensional.")

    self.bind(node.table_ref)
    if node.udf_func:
        self.bind(node.udf_func)

    # TODO: create index currently only supports single numpy column.
    assert len(node.col_list) == 1, "Index cannot be created on more than 1 column"

    # TODO: create index currently only works on TableInfo, but will extend later.
    assert node.table_ref.is_table_atom(), "Index can only be created on Tableinfo"

    if not IndexType.is_faiss_index_type(node.index_type):
        raise BinderError("Index type {} is not supported.".format(node.index_type))

    if node.udf_func:
        # The index is built on the UDF result, so validate every UDF output.
        udf_obj = CatalogManager().get_udf_catalog_entry_by_name(node.udf_func.name)
        # NOTE(review): defensive guard; confirm whether the lookup can
        # actually return None once the UDF expression has been bound.
        if udf_obj is None:
            raise BinderError(
                "UDF {} is not found in the catalog.".format(node.udf_func.name)
            )
        for output in udf_obj.outputs:
            _require_feature_shape(output)
        return

    # The index is built directly on the column, so validate the column itself.
    col_def = node.col_list[0]
    table_ref_obj = node.table_ref.table.table_obj
    col = next(
        (c for c in table_ref_obj.columns if c.name == col_def.name),
        None,
    )
    if col is None:
        # Previously surfaced as a bare IndexError from `[...][0]`.
        raise BinderError(
            "Index is created on non-existent column {}.".format(col_def.name)
        )
    _require_feature_shape(col)

@bind.register(SelectStatement)
def _bind_select_statement(self, node: SelectStatement):
Expand Down
4 changes: 2 additions & 2 deletions eva/catalog/catalog_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,11 +255,11 @@ def insert_index_catalog_entry(
name: str,
save_file_path: str,
index_type: IndexType,
secondary_index_table: TableCatalogEntry,
feat_column: ColumnCatalogEntry,
udf_signature: str,
) -> IndexCatalogEntry:
index_catalog_entry = self._index_service.insert_entry(
name, save_file_path, index_type, secondary_index_table, feat_column
name, save_file_path, index_type, feat_column, udf_signature
)
return index_catalog_entry

Expand Down
4 changes: 4 additions & 0 deletions eva/catalog/catalog_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,7 @@ def to_numpy_type(cls, t):

class IndexType(EVAEnum):
    # Index types known to the catalog.
    # NOTE(review): the bare name below declares the member; presumably
    # EVAEnum assigns its value (hence the F821 suppression) -- confirm.
    HNSW  # noqa: F821

    @classmethod
    def is_faiss_index_type(cls, t):
        """Return True when `t` is one of the index types treated as FAISS types."""
        faiss_index_types = (cls.HNSW,)
        return t in faiss_index_types
22 changes: 7 additions & 15 deletions eva/catalog/models/index_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from eva.catalog.catalog_type import IndexType
from eva.catalog.models.base_model import BaseModel
from eva.catalog.models.column_catalog import ColumnCatalogEntry
from eva.catalog.models.table_catalog import TableCatalogEntry


class IndexCatalog(BaseModel):
Expand All @@ -30,50 +29,44 @@ class IndexCatalog(BaseModel):
`_name:` the name of the index.
`_save_file_path:` the path to the index file on disk
`_type:` the type of the index (refer to `IndexType`)
`_secondary_index_id:` the `_row_id` of the `TableCatalog` entry for the table on which the index is built
`_feat_column_id:` the `_row_id` of the `ColumnCatalog` entry for the column on which the index is built.
`_udf_signature:` if the index is built by running a UDF expression on the input column, this stores
the signature of that UDF. Otherwise, this field is None.
"""

__tablename__ = "index_catalog"

_name = Column("name", String(100), unique=True)
_save_file_path = Column("save_file_path", String(128))
_type = Column("type", Enum(IndexType), default=Enum)
_secondary_index_id = Column(
"secondary_index_id", Integer, ForeignKey("table_catalog._row_id")
)
_feat_column_id = Column("column_id", Integer, ForeignKey("column_catalog._row_id"))
_udf_signature = Column("udf", String, default=None)

_secondary_index = relationship("TableCatalog")
_feat_column = relationship("ColumnCatalog")

def __init__(
    self,
    name: str,
    save_file_path: str,
    type: IndexType,
    feat_column_id: int = None,
    udf_signature: str = None,
):
    """Populate the fields of a new index catalog row.

    Args:
        name: name of the index.
        save_file_path: path to the index file on disk.
        type: the type of the index (an `IndexType`).
        feat_column_id: `_row_id` of the column catalog entry the index is
            built on.
        udf_signature: signature of the UDF applied to the column, or None
            when no UDF is used.
    """
    # `type` shadows the builtin, but it is part of the constructor's
    # interface and is kept as-is for callers.
    self._name, self._save_file_path, self._type = name, save_file_path, type
    self._feat_column_id = feat_column_id
    self._udf_signature = udf_signature

def as_dataclass(self) -> "IndexCatalogEntry":
    """Convert this row into an `IndexCatalogEntry`.

    The related feature column, when present, is converted through its own
    `as_dataclass()`; otherwise `feat_column` is left as None.
    """
    feat_column = None
    if self._feat_column:
        feat_column = self._feat_column.as_dataclass()

    return IndexCatalogEntry(
        row_id=self._row_id,
        name=self._name,
        save_file_path=self._save_file_path,
        type=self._type,
        feat_column_id=self._feat_column_id,
        udf_signature=self._udf_signature,
        feat_column=feat_column,
    )

Expand All @@ -88,7 +81,6 @@ class IndexCatalogEntry:
save_file_path: str
type: IndexType
row_id: int = None
secondary_index_id: int = None
feat_column_id: int = None
secondary_index: TableCatalogEntry = None
udf_signature: str = None
feat_column: ColumnCatalogEntry = None
14 changes: 7 additions & 7 deletions eva/catalog/services/index_catalog_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

from eva.catalog.models.column_catalog import ColumnCatalogEntry
from eva.catalog.models.index_catalog import IndexCatalog, IndexCatalogEntry
from eva.catalog.models.table_catalog import TableCatalogEntry
from eva.catalog.services.base_service import BaseService
from eva.utils.logging_manager import logger

Expand All @@ -33,11 +32,11 @@ def insert_entry(
name: str,
save_file_path: str,
type: str,
secondary_index_table: TableCatalogEntry,
feat_column: ColumnCatalogEntry,
udf_signature: str,
) -> IndexCatalogEntry:
index_entry = IndexCatalog(
name, save_file_path, type, secondary_index_table.row_id, feat_column.row_id
name, save_file_path, type, feat_column.row_id, udf_signature
)
index_entry = index_entry.save()
return index_entry.as_dataclass()
Expand All @@ -58,11 +57,12 @@ def get_entry_by_id(self, id: int) -> IndexCatalogEntry:

def delete_entry_by_name(self, name: str):
try:
index_record = self.model.query.filter(self.model._name == name).one()
index_obj = self.model.query.filter(self.model._name == name).one()
index_metadata = index_obj.as_dataclass()
# clean up the on disk data
if os.path.exists(index_record.save_file_path):
os.remove(index_record.save_file_path)
index_record.delete()
if os.path.exists(index_metadata.save_file_path):
os.remove(index_metadata.save_file_path)
index_obj.delete()
except Exception:
logger.exception("Delete index failed for name {}".format(name))
return False
Expand Down
Loading

0 comments on commit a05d84d

Please sign in to comment.