Skip to content

Commit

Permalink
feat: Ray executor for EVA (georgia-tech-db#382)
Browse files Browse the repository at this point in the history
Add support for Exchange operator using ray

Co-authored-by: xzdandy <[email protected]>
Co-authored-by: Gaurav Tarlok Kakkar <[email protected]>
Co-authored-by: Kaushik Ravichandran <[email protected]>
  • Loading branch information
4 people authored Oct 30, 2022
1 parent b680659 commit b8cfc51
Show file tree
Hide file tree
Showing 40 changed files with 826 additions and 263 deletions.
6 changes: 6 additions & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,15 @@ omit =
eva/parser/evaql/*
eva/udfs/abstract/*
eva/udfs/emotion_detector.py
eva/experimental/*

[report]
exclude_lines =
# pragma: no cover
if TYPE_CHECKING:

# Ray Execution
if ray_enabled:
class LogicalExchangeToPhysical(Rule):
class LogicalExchange(Operator):

2 changes: 1 addition & 1 deletion eva/binder/statement_binder.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ def _bind_func_expr(self, node: FunctionExpression):
raise BinderError(err_msg)

try:
node.function = path_to_class(udf_obj.impl_file_path, udf_obj.name)()
node.function = path_to_class(udf_obj.impl_file_path, udf_obj.name)
except Exception as e:
err_msg = (
f"{str(e)}. Please verify that the UDF class name in the"
Expand Down
6 changes: 6 additions & 0 deletions eva/catalog/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from eva.catalog.models.df_column import DataFrameColumn
from eva.catalog.models.df_metadata import DataFrameMetadata
from eva.catalog.models.udf import UdfMetadata
from eva.catalog.models.udf_io import UdfIO

__all__ = ("DataFrameMetadata", "DataFrameColumn", "UdfMetadata", "UdfIO")
7 changes: 5 additions & 2 deletions eva/eva.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ core:

executor:
# batch_mem_size configures the number of rows processed by the execution engine in one iteration
# #rows = max(1, row_mem_size / batch_mem_size)
batch_mem_size: 30000000 # 30mb
# rows = max(1, batch_mem_size / row_mem_size)
batch_mem_size: 30000000

# batch size used for gpu_operations
gpu_batch_size: 1
Expand Down Expand Up @@ -38,3 +38,6 @@ server:
host: "0.0.0.0"
port: 5432
socket_timeout: 60

experimental:
ray: False
4 changes: 4 additions & 0 deletions eva/executor/abstract_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ def children(self) -> List[AbstractExecutor]:
"""
return self._children

@children.setter
def children(self, children: List["AbstractExecutor"]):
    """Replace this executor's list of child executors.

    Args:
        children: the new child executors, stored as given (no copy).
    """
    self._children = children

@property
def node(self) -> AbstractPlan:
return self._node
Expand Down
7 changes: 5 additions & 2 deletions eva/executor/lateral_join_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Iterator
from typing import Generator, Iterator

from eva.executor.abstract_executor import AbstractExecutor
from eva.executor.executor_utils import apply_predicate, apply_project
Expand All @@ -32,11 +32,14 @@ def validate(self):
def exec(self, *args, **kwargs) -> Iterator[Batch]:
outer = self.children[0]
inner = self.children[1]
for outer_batch in outer.exec():
for outer_batch in outer.exec(**kwargs):
for result_batch in inner.exec(lateral_input=outer_batch):
result_batch = Batch.join(outer_batch, result_batch)
result_batch.reset_index()
result_batch = apply_predicate(result_batch, self.predicate)
result_batch = apply_project(result_batch, self.join_project)
if not result_batch.empty():
yield result_batch

def __call__(self, *args, **kwargs) -> Generator[Batch, None, None]:
    """Make the executor callable by delegating to ``exec``.

    All positional and keyword arguments are forwarded unchanged, and every
    batch produced by ``exec`` is yielded in turn.
    """
    yield from self.exec(*args, **kwargs)
4 changes: 4 additions & 0 deletions eva/executor/limit_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,7 @@ def exec(self) -> Iterator[Batch]:

remaining_tuples -= len(batch)
yield batch

if remaining_tuples <= 0:
assert remaining_tuples == 0
return
3 changes: 3 additions & 0 deletions eva/executor/plan_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from eva.executor.storage_executor import StorageExecutor
from eva.executor.union_executor import UnionExecutor
from eva.executor.upload_executor import UploadExecutor
from eva.experimental.ray.executor.exchange_executor import ExchangeExecutor
from eva.models.storage.batch import Batch
from eva.planner.abstract_plan import AbstractPlan
from eva.planner.types import PlanOprType
Expand Down Expand Up @@ -112,6 +113,8 @@ def _build_execution_tree(self, plan: AbstractPlan) -> AbstractExecutor:
executor_node = FunctionScanExecutor(node=plan)
elif plan_opr_type == PlanOprType.CREATE_MATERIALIZED_VIEW:
executor_node = CreateMaterializedViewExecutor(node=plan)
elif plan_opr_type == PlanOprType.EXCHANGE:
executor_node = ExchangeExecutor(node=plan)
elif plan_opr_type == PlanOprType.PROJECT:
executor_node = ProjectExecutor(node=plan)
elif plan_opr_type == PlanOprType.PREDICATE_FILTER:
Expand Down
5 changes: 4 additions & 1 deletion eva/executor/predicate_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Iterator
from typing import Generator, Iterator

from eva.executor.abstract_executor import AbstractExecutor
from eva.executor.executor_utils import apply_predicate
Expand All @@ -36,3 +36,6 @@ def exec(self, *args, **kwargs) -> Iterator[Batch]:
batch = apply_predicate(batch, self.predicate)
if not batch.empty():
yield batch

def __call__(self, **kwargs) -> Generator[Batch, None, None]:
    """Make the executor callable by delegating to ``exec``.

    Keyword arguments are forwarded unchanged, and every batch produced by
    ``exec`` is yielded in turn.
    """
    yield from self.exec(**kwargs)
9 changes: 6 additions & 3 deletions eva/executor/project_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Iterator
from typing import Generator, Iterator

from eva.executor.abstract_executor import AbstractExecutor
from eva.executor.executor_utils import apply_project
Expand All @@ -30,10 +30,13 @@ def __init__(self, node: ProjectPlan):
def validate(self):
pass

def exec(self) -> Iterator[Batch]:
def exec(self, **kwargs) -> Iterator[Batch]:
child_executor = self.children[0]
for batch in child_executor.exec():
for batch in child_executor.exec(**kwargs):
batch = apply_project(batch, self.target_list)

if not batch.empty():
yield batch

def __call__(self, **kwargs) -> Generator[Batch, None, None]:
    """Make the executor callable by delegating to ``exec``.

    Keyword arguments are forwarded unchanged, and every batch produced by
    ``exec`` is yielded in turn.
    """
    yield from self.exec(**kwargs)
10 changes: 7 additions & 3 deletions eva/executor/seq_scan_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Iterator
from typing import Generator, Iterator

from eva.executor.abstract_executor import AbstractExecutor
from eva.executor.executor_utils import apply_predicate, apply_project
Expand All @@ -37,10 +37,11 @@ def __init__(self, node: SeqScanPlan):
def validate(self):
pass

def exec(self) -> Iterator[Batch]:
def exec(self, **kwargs) -> Iterator[Batch]:

child_executor = self.children[0]
for batch in child_executor.exec():

for batch in child_executor.exec(**kwargs):
# apply alias to the batch
# id, data -> myvideo.id, myvideo.data
if self.alias:
Expand All @@ -53,3 +54,6 @@ def exec(self) -> Iterator[Batch]:

if not batch.empty():
yield batch

def __call__(self, **kwargs) -> Generator[Batch, None, None]:
    """Make the executor callable by delegating to ``exec``.

    Keyword arguments are forwarded unchanged, and every batch produced by
    ``exec`` is yielded in turn.
    """
    yield from self.exec(**kwargs)
5 changes: 4 additions & 1 deletion eva/executor/storage_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Iterator
from typing import Generator, Iterator

from eva.executor.abstract_executor import AbstractExecutor
from eva.models.storage.batch import Batch
Expand All @@ -37,3 +37,6 @@ def exec(self) -> Iterator[Batch]:
)
else:
return StorageEngine.read(self.node.video, self.node.batch_mem_size)

def __call__(self, **kwargs) -> Generator[Batch, None, None]:
    """Make the executor callable by delegating to ``exec``.

    Keyword arguments are accepted so the call signature matches the other
    executors, but they are discarded: ``exec`` is invoked with no arguments.
    """
    yield from self.exec()
15 changes: 15 additions & 0 deletions eva/experimental/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# coding=utf-8
# Copyright 2018-2022 EVA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""experimental subdirectory"""
15 changes: 15 additions & 0 deletions eva/experimental/ray/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# coding=utf-8
# Copyright 2018-2022 EVA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""ray subdirectory"""
15 changes: 15 additions & 0 deletions eva/experimental/ray/executor/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# coding=utf-8
# Copyright 2018-2022 EVA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""ray executor subdirectory"""
107 changes: 107 additions & 0 deletions eva/experimental/ray/executor/exchange_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
# coding=utf-8
# Copyright 2018-2022 EVA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Iterator

from ray.util.queue import Queue

from eva.executor.abstract_executor import AbstractExecutor
from eva.experimental.ray.executor.ray_stage import (
StageCompleteSignal,
ray_stage,
ray_stage_wait_and_alert,
)
from eva.experimental.ray.planner.exchange_plan import ExchangePlan
from eva.models.storage.batch import Batch


class QueueReaderExecutor(AbstractExecutor):
    """Leaf executor that replays the items arriving on a ray queue.

    ExchangeExecutor.exec installs an instance of this class as the child of
    the executor sitting directly above a lower exchange, so that this stage
    takes its input from the lower exchange's output queue.
    """

    def __init__(self):
        # There is no plan node behind this executor.
        super().__init__(None)

    def validate(self):
        pass

    def exec(self, **kwargs) -> Iterator[Batch]:
        """Yield items from the single input queue until the stage completes.

        Args:
            **kwargs: must contain ``input_queues``, a one-element sequence
                holding the queue to read from.

        Yields:
            Every item taken from the queue ahead of StageCompleteSignal.

        Raises:
            AssertionError: if ``input_queues`` is missing or does not hold
                exactly one queue.
        """
        assert (
            "input_queues" in kwargs
        ), "Invalid ray exectuion stage. No input_queue found"
        queues = kwargs["input_queues"]
        assert len(queues) == 1, "Not support mulitple input queues yet"
        source = queues[0]

        item = source.get(block=True)
        while item is not StageCompleteSignal:
            yield item
            item = source.get(block=True)

        # Put the marker back on the queue; presumably so that any other
        # reader sharing this queue also sees that the stage is complete.
        source.put(StageCompleteSignal)


class ExchangeExecutor(AbstractExecutor):
    """
    Runs the executor subtree below it as a stage of parallel ray tasks.

    ``exec`` launches ``parallelism`` ``ray_stage`` tasks over the child
    executor, each configured with ``ray_conf``, and passes their results
    through a ray queue.

    Arguments:
        node (ExchangePlan): plan node supplying ``parallelism`` and
            ``ray_conf``
    """

    def __init__(self, node: ExchangePlan):
        self.parallelism = node.parallelism
        self.ray_conf = node.ray_conf
        super().__init__(node)

    def validate(self):
        pass

    def exec(self, is_top=True) -> Iterator[Batch]:
        """Launch this stage and either stream its results or hand back its queue.

        Args:
            is_top: when true, yield the items arriving on this stage's
                output queue until StageCompleteSignal is seen. When false,
                yield nothing and return the output queue as the generator's
                return value, which the enclosing ``yield from`` receives.

        Raises:
            AssertionError: if this executor does not have exactly one child.
        """
        assert (
            len(self.children) == 1
        ), "Exchange executor does not support children != 1"

        # Follow first-child links down to the last executor of this stage:
        # the one that is a leaf or whose first child is another exchange.
        stage_tail = self
        while True:
            below = stage_tail.children
            if len(below) == 0 or isinstance(below[0], ExchangeExecutor):
                break
            stage_tail = below[0]

        upstream_queues = []
        if len(stage_tail.children) > 0:
            # Start the lower exchange first; with is_top=False it yields
            # nothing and returns its output queue.
            upstream = yield from stage_tail.children[0].exec(is_top=False)
            upstream_queues.append(upstream)
            # NOTE: this rewires the executor tree in place, so the stage now
            # reads from the queue instead of the lower exchange.
            stage_tail.children = [QueueReaderExecutor()]

        sink = Queue(maxsize=100)
        stage_tasks = [
            ray_stage.options(**self.ray_conf).remote(
                self.children[0], upstream_queues, [sink]
            )
            for _ in range(self.parallelism)
        ]
        # Posts StageCompleteSignal on the sink once all stage tasks finish.
        ray_stage_wait_and_alert.remote(stage_tasks, [sink])

        if not is_top:
            return sink

        while True:
            result = sink.get(block=True)
            if result is StageCompleteSignal:
                return
            yield result

    def __call__(self, batch: Batch) -> Batch:
        """Placeholder: performs no work and returns None."""
        return None
42 changes: 42 additions & 0 deletions eva/experimental/ray/executor/ray_stage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# coding=utf-8
# Copyright 2018-2022 EVA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Callable, List

import ray
from ray.util.queue import Queue


class StageCompleteSignal:
    """Marker placed on a stage's output queue once its tasks have finished.

    The class object itself (not an instance) is what gets enqueued, and
    readers recognise it with an ``is`` comparison.
    """


@ray.remote(num_cpus=0)
def ray_stage_wait_and_alert(
    tasks: List[ray.ObjectRef], output_queue: List[Queue]
):
    """Wait for every stage task to finish, then mark each output queue done.

    Args:
        tasks: object refs of the stage tasks to wait on.
        output_queue: the queues (a list, despite the singular name) that
            each receive one StageCompleteSignal after the tasks finish.
    """
    # NOTE(review): if ray.get raises because a stage task failed, no
    # completion marker is posted and a consumer blocking on the queue would
    # apparently wait forever -- confirm how failures should be surfaced.
    ray.get(tasks)
    for q in output_queue:
        q.put(StageCompleteSignal)


@ray.remote
def ray_stage(
    exectuor: Callable, input_queues: List[Queue], output_queues: List[Queue]
):
    """Run one stage: call the executor and copy its output to the queues.

    Args:
        exectuor: callable invoked as ``exectuor(input_queues=input_queues)``;
            its result is iterated to obtain the stage's items.
        input_queues: queues handed to the executor (at most one supported).
        output_queues: queues that receive every produced item (at most one
            supported).

    Raises:
        NotImplementedError: if more than one input or output queue is given.
    """
    if any(len(queues) > 1 for queues in (input_queues, output_queues)):
        raise NotImplementedError

    for item in exectuor(input_queues=input_queues):
        for sink in output_queues:
            sink.put(item)
Loading

0 comments on commit b8cfc51

Please sign in to comment.