Skip to content

Commit

Permalink
Changed CLI logic. (taskiq-python#46)
Browse files Browse the repository at this point in the history
Signed-off-by: Pavel Kirilin <[email protected]>
  • Loading branch information
s3rius committed Sep 9, 2022
1 parent 77b3451 commit 11906d0
Show file tree
Hide file tree
Showing 24 changed files with 349 additions and 134 deletions.
19 changes: 19 additions & 0 deletions docs/examples/extending/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from argparse import ArgumentParser
from typing import Sequence

from taskiq.abc.cmd import TaskiqCMD


class MyCommand(TaskiqCMD):
short_help = "Demo command"

def exec(self, args: Sequence[str]) -> None:
parser = ArgumentParser()
parser.add_argument(
"--test",
dest="test",
default="default",
help="My test parameter.",
)
parsed = parser.parse_args(args)
print(parsed)
69 changes: 69 additions & 0 deletions docs/extending-taskiq/cli.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
---
order: 4
---

# CLI

You can easily add new subcommands to taskiq. All default subcommands also use this mechanism,
since it's easy to use.

At first you need to add a class that implements `taskiq.abc.cmd.TaskiqCMD` abstract class.

@[code python](../examples/extending/cli.py)

In the `exec` method, you should parse incoming arguments. But since all CLI arguments to taskiq are shifted you can ignore the `args` parameter.

Also, you can use your favorite tool to build CLI, like [click](https://click.palletsprojects.com/) or [typer](https://typer.tiangolo.com/).

After you have such class, you need to add entrypoint that points to that class.

::: tabs

@tab setuptools setup.py

```python
from setuptools import setup

setup(
# ...,
entry_points={
'taskiq-cli': [
'demo = my_project.cmd:MyCommand',
]
}
)
```

@tab setuptools pyproject.toml

```toml
[project.entry-points.taskiq-cli]
demo = "my_project.cmd:MyCommand"
```

@tab poetry

```toml
[tool.poetry.plugins.taskiq-cli]
demo = "my_project.cmd:MyCommand"
```

:::

You can read more about entry points in [python documentation](https://packaging.python.org/en/latest/specifications/entry-points/).
The subcommand name is the same as the name of the entry point you've created.


```bash
$ taskiq demo --help
usage: demo [-h] [--test TEST]

optional arguments:
-h, --help show this help message and exit
--test TEST My test parameter.
```

```bash
$ taskiq demo --test aaa
Namespace(test='aaa')
```
2 changes: 1 addition & 1 deletion docs/guide/architecture-overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ You have to provide a path to your broker. As an example, if you want to start l
with a broker that is stored in a variable `my broker` in the module `my_project.broker` run this in your terminal:

```
taskiq my_project.broker:mybroker
taskiq worker my_project.broker:mybroker
```

taskiq can discover task modules to import automatically,
Expand Down
18 changes: 12 additions & 6 deletions docs/guide/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,22 @@ order: 4

# CLI

Core library comes with CLI programm called `taskiq`, which is used to run workers.
Core library comes with CLI programm called `taskiq`, which is used to run different subcommands.

To run it you have to specify the broker you want to use and modules with defined tasks.

By default taskiq is shipped with only one command: `worker`. You can search for more taskiq plugins
using pypi. Some plugins may add new commands to taskiq.

## Worker

To run worker process, you have to specify the broker you want to use and modules with defined tasks.
Like this:

```bash
taskiq mybroker:broker_var my_project.module1 my_project.module2
taskiq worker mybroker:broker_var my_project.module1 my_project.module2
```

## Autoimporting
### Autoimporting

Enumerating all modules with tasks is not an option sometimes.
That's why taskiq can autodiscover tasks in current directory recursively.
Expand All @@ -24,7 +30,7 @@ We have two options for this:
* `--fs-discover` or `-fsd`. This option enables search of task files in current directory recursively, using the given pattern.


## Type casts
### Type casts

One of features taskiq have is automatic type casts. For examle you have a type-hinted task like this:
```python
Expand All @@ -38,7 +44,7 @@ dataclasses as the input parameters.

To disable this pass the `--no-parse` option to the taskiq.

## Hot reload
### Hot reload

This is annoying to restart workers every time you modify tasks. That's why taskiq supports hot-reload.
To enable this option simply pass the `--reload` or `-r` option to taskiq CLI.
Expand Down
4 changes: 2 additions & 2 deletions docs/guide/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ docker run --rm -d ^
Now we need to start worker process by running taskiq cli command. You can get more info about the CLI in the [CLI](./cli.md) section.

```bash:no-line-numbers
taskiq broker:broker
taskiq worker broker:broker
```

After the worker is up, we can run our script as an ordinary python file and see how the worker executes tasks.
Expand Down Expand Up @@ -206,7 +206,7 @@ docker run --rm -d ^
Let's run taskiq once again. The command is the same.

```bash:no-line-numbers
taskiq broker:broker
taskiq worker broker:broker
```

Now, if we run this file with python, we can get the correct results with a valid execution time.
Expand Down
10 changes: 5 additions & 5 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 8 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ python = "^3.7"
typing-extensions = ">=3.10.0.0"
pydantic = "^1.6.2"
pyzmq = { version = "^23.2.0", optional = true }
uvloop = { version = "^0.16.0", optional = true }
uvloop = { version = ">=0.16.0,<1", optional = true }
watchdog = "^2.1.9"
gitignore-parser = "^0.0.8"
gitignore-parser = "^0.1.0"
importlib-metadata = "<4.3"


[tool.poetry.dev-dependencies]
pytest = "^7.1.2"
Expand All @@ -47,7 +49,7 @@ coverage = "^6.4.2"
pytest-cov = "^3.0.0"
mock = "^4.0.3"
anyio = "^3.6.1"
pytest-xdist = {version = "^2.5.0", extras = ["psutil"]}
pytest-xdist = { version = "^2.5.0", extras = ["psutil"] }
types-mock = "^4.0.15"

[tool.poetry.extras]
Expand All @@ -57,6 +59,9 @@ uv = ["uvloop"]
[tool.poetry.scripts]
taskiq = "taskiq.__main__:main"

[tool.poetry.plugins.taskiq-cli]
worker = "taskiq.cli.worker.cmd:WorkerCMD"

[tool.mypy]
strict = true
ignore_missing_imports = true
Expand Down
76 changes: 70 additions & 6 deletions taskiq/__main__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,75 @@
from taskiq.cli.args import TaskiqArgs
from taskiq.cli.worker import run_worker
import argparse
import sys
from typing import Dict

from importlib_metadata import entry_points, version

def main() -> None:
"""Main entrypoint for CLI."""
args = TaskiqArgs.from_cli()
run_worker(args)
from taskiq.abc.cmd import TaskiqCMD


def main() -> None: # noqa: C901, WPS210
"""
Main entrypoint of the taskiq.
This function collects all python entrypoints
and assembles a final argument parser.
All found entrypoints are used as subcommands.
All arguments are passed to them as it was a normal
call.
"""
plugins = entry_points().select(group="taskiq-cli")
found_plugins = len(plugins)
parser = argparse.ArgumentParser(
description=f"""
CLI for taskiq. Distributed task queue.
This is a meta CLI. It searches for installed plugins
using python entrypoints
and passes all arguments to them.
We found {found_plugins} installed plugins.
""",
)
parser.add_argument(
"-V",
"--version",
dest="version",
action="store_true",
help="print current taskiq version and exit",
)
subcommands: Dict[str, TaskiqCMD] = {}
subparsers = parser.add_subparsers(
title="Available subcommands",
metavar="",
dest="subcommand",
)
for entrypoint in entry_points().select(group="taskiq-cli"):
try:
cmd_class = entrypoint.load()
except ImportError:
continue
if issubclass(cmd_class, TaskiqCMD):
subparsers.add_parser(
entrypoint.name,
help=cmd_class.short_help,
add_help=False,
)
subcommands[entrypoint.name] = cmd_class()

args, _ = parser.parse_known_args()

if args.version:
print(version("taskiq")) # noqa: WPS421
return

if args.subcommand is None:
parser.print_help()
return

command = subcommands[args.subcommand]
sys.argv.pop(0)
command.exec(sys.argv[1:])


if __name__ == "__main__":
Expand Down
16 changes: 16 additions & 0 deletions taskiq/abc/cmd.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from abc import ABC, abstractmethod
from typing import Sequence


class TaskiqCMD(ABC):
"""Base class for new commands."""

short_help = ""

@abstractmethod
def exec(self, args: Sequence[str]) -> None:
"""
Execute the command.
:param args: CLI args.
"""
8 changes: 4 additions & 4 deletions taskiq/brokers/inmemory_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

from taskiq.abc.broker import AsyncBroker
from taskiq.abc.result_backend import AsyncResultBackend, TaskiqResult
from taskiq.cli.args import TaskiqArgs
from taskiq.cli.receiver import Receiver
from taskiq.cli.worker.args import WorkerArgs
from taskiq.cli.worker.receiver import Receiver
from taskiq.exceptions import TaskiqError
from taskiq.message import BrokerMessage

Expand Down Expand Up @@ -102,12 +102,12 @@ def __init__( # noqa: WPS211
)
self.receiver = Receiver(
self,
TaskiqArgs(
WorkerArgs(
broker="",
modules=[],
max_threadpool_threads=sync_tasks_pool_size,
no_parse=not cast_types,
log_collector_format=logs_format or TaskiqArgs.log_collector_format,
log_collector_format=logs_format or WorkerArgs.log_collector_format,
),
)

Expand Down
4 changes: 2 additions & 2 deletions taskiq/brokers/zmq_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ async def listen(
:param callback: function to call when message received.
"""
loop = asyncio.get_event_loop()
while True:
with self.socket.connect(self.sub_host) as sock:
with self.socket.connect(self.sub_host) as sock:
while True:
received_str = await sock.recv_string()
try:
broker_msg = BrokerMessage.parse_raw(received_str)
Expand Down
Loading

0 comments on commit 11906d0

Please sign in to comment.