Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions melony/core/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import asyncio

import click

from melony.core.consts import DEFAULT_QUEUE
from melony.core.finders import find_broker

__all__ = ()


# Root click command group for the package; subcommands register on it
# with ``@cli.command()``. The body is intentionally empty: the group only
# dispatches to its subcommands. The docstring is what click prints as the
# group's --help text, so keep it short and user-facing.
@click.group()
def cli() -> None:
    """Melony CLI."""


@cli.command()
@click.option(
    "--broker",
    "-b",
    required=True,
    help="Path to broker instance, e.g. src.tasks.broker",
)
@click.option(
    "--queue",
    "-q",
    default=DEFAULT_QUEUE,
    show_default=True,
    help="Queue name to consume from",
)
@click.option(
    "--processes",
    "-p",
    default=1,
    show_default=True,
    help="Number of worker processes",
)
def start_consume(broker: str, queue: str, processes: int) -> None:
    """Start consuming tasks from the queue."""
    # NOTE: the docstring above is also the command's --help text.
    # Resolve the dotted --broker path before any event loop exists, so an
    # unresolvable path fails immediately.
    task_consumer = find_broker(broker).consumer
    consume_coro = task_consumer.start_consume(queue=queue, processes=processes)
    # asyncio.run owns the event loop for the lifetime of the command.
    asyncio.run(consume_coro)


@cli.command()
@click.option(
    "--broker",
    "-b",
    required=True,
    help="Path to broker instance, e.g. src.tasks.broker",
)
@click.option(
    "--queue",
    "-q",
    default=DEFAULT_QUEUE,
    show_default=True,
    help="Queue name to consume from",
)
@click.option(
    "--processes",
    "-p",
    default=1,
    show_default=True,
    help="Number of worker processes",
)
def start_cron_consume(broker: str, queue: str, processes: int) -> None:
    """Start consuming cron tasks."""
    # NOTE: the docstring above is also the command's --help text.
    # Same flow as the regular consumer command, but driven by the broker's
    # ``cron_consumer`` attribute instead of ``consumer``.
    scheduled_consumer = find_broker(broker).cron_consumer
    consume_coro = scheduled_consumer.start_consume(queue=queue, processes=processes)
    # asyncio.run owns the event loop for the lifetime of the command.
    asyncio.run(consume_coro)
2 changes: 1 addition & 1 deletion melony/core/cron_consumers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from melony.core.consts import CRON_QUEUE_PREFIX, DEFAULT_QUEUE
from melony.core.cron_tasks import CronEntry
from melony.core.task_finders import find_task_func
from melony.core.finders import find_task_func
from melony.logger import log_error, log_info

if TYPE_CHECKING:
Expand Down
13 changes: 11 additions & 2 deletions melony/core/task_finders.py → melony/core/finders.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from typing import Callable

from melony.core.brokers import BaseBroker

__all__ = ()


Expand All @@ -12,12 +14,19 @@ def find_task_func(func_path: str) -> Callable:
raise ImportError(f"Cannot import function '{func_path}': {exc}")


def find_broker(broker_path: str) -> BaseBroker:
    """Import and return the broker instance named by a dotted path.

    Args:
        broker_path: Path of the form ``"package.module.attribute"``,
            e.g. ``"src.tasks.broker"``.

    Returns:
        The object stored under that module attribute, verified to be a
        ``BaseBroker`` instance.

    Raises:
        ImportError: If the path is not dotted, the module cannot be
            imported, the attribute does not exist, or the attribute is not
            a ``BaseBroker`` instance. The underlying error is chained as
            ``__cause__``.
    """
    try:
        module_name, _, attr_name = broker_path.rpartition(".")
        if not module_name or not attr_name:
            raise ValueError("expected a dotted path like 'package.module.broker'")

        # Resolved here rather than through the function finder: a broker is
        # an object, and requiring it to be callable would reject instances
        # that do not define ``__call__``.
        broker = getattr(importlib.import_module(module_name), attr_name)

        if not isinstance(broker, BaseBroker):
            raise ValueError(f"'{attr_name}' is not a BaseBroker instance")
    except (ImportError, AttributeError, ValueError) as exc:
        # Keep the single ImportError contract callers rely on, but chain the
        # root cause so the original failure stays visible in tracebacks.
        raise ImportError(f"Cannot import broker '{broker_path}': {exc}") from exc

    return broker


def _find_func(func_path: str) -> Callable:
module_name, func_name = func_path.rsplit(".", 1)
module = importlib.import_module(module_name)
func = getattr(module, func_name)

if not callable(func):
raise ValueError(f"'{func_name}' is not callable")

return func
2 changes: 1 addition & 1 deletion melony/core/task_converters.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from abc import ABC, abstractmethod

from melony.core.brokers import BaseBroker
from melony.core.task_finders import find_task_func
from melony.core.finders import find_task_func
from melony.core.tasks import _TaskMeta, AsyncTask, SyncTask, Task


Expand Down
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ requires-python = ">=3.12"
dependencies = [
"asyncio>=4.0.0",
"classes>=0.4.1",
"click>=8.0.0",
"croniter>=3.0.0",
"pytest>=8.4.1",
"pytest-asyncio>=1.1.0",
Expand All @@ -27,3 +28,6 @@ omit = ["*/__init__.py"]
show_missing = true
skip_covered = false
precision = 2

[project.scripts]
melony = "melony.core.cli:cli"
12 changes: 6 additions & 6 deletions tests/redis_broker/test_consumers.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ async def test_async_redis_consumer_pop_tasks_returns_one_task():
None,
])
publisher = _async_publisher(conn)
with patch("melony.core.task_finders.find_task_func", return_value=_sync_func):
with patch("melony.core.finders.find_task_func", return_value=_sync_func):
consumer = AsyncRedisConsumer(publisher=publisher, broker=MagicMock())
tasks = await consumer._pop_tasks("melony_tasks:default")

Expand All @@ -91,7 +91,7 @@ async def test_async_redis_consumer_pop_tasks_returns_multiple_tasks():
None,
])
publisher = _async_publisher(conn)
with patch("melony.core.task_finders.find_task_func", return_value=_sync_func):
with patch("melony.core.finders.find_task_func", return_value=_sync_func):
consumer = AsyncRedisConsumer(publisher=publisher, broker=MagicMock())
tasks = await consumer._pop_tasks("melony_tasks:default")

Expand All @@ -105,7 +105,7 @@ async def test_async_redis_consumer_deserialize_task():
task = _make_sync_task()
serialized = task.as_json().encode("utf-8")

with patch("melony.core.task_finders.find_task_func", return_value=_sync_func):
with patch("melony.core.finders.find_task_func", return_value=_sync_func):
result = consumer._deserialize_to_task_from_redis((b"queue", serialized))

assert result.task_id == "t-1"
Expand All @@ -130,7 +130,7 @@ def test_sync_redis_consumer_pop_tasks_returns_one_task():
None,
]
publisher = _sync_publisher(conn)
with patch("melony.core.task_finders.find_task_func", return_value=_sync_func):
with patch("melony.core.finders.find_task_func", return_value=_sync_func):
consumer = SyncRedisConsumer(publisher=publisher, broker=MagicMock())
tasks = consumer._pop_tasks("melony_tasks:default")

Expand All @@ -149,7 +149,7 @@ def test_sync_redis_consumer_pop_tasks_returns_multiple_tasks():
None,
]
publisher = _sync_publisher(conn)
with patch("melony.core.task_finders.find_task_func", return_value=_sync_func):
with patch("melony.core.finders.find_task_func", return_value=_sync_func):
consumer = SyncRedisConsumer(publisher=publisher, broker=MagicMock())
tasks = consumer._pop_tasks("melony_tasks:default")

Expand All @@ -163,7 +163,7 @@ def test_sync_redis_consumer_deserialize_task():
task = _make_sync_task()
serialized = task.as_json().encode("utf-8")

with patch("melony.core.task_finders.find_task_func", return_value=_sync_func):
with patch("melony.core.finders.find_task_func", return_value=_sync_func):
result = consumer._deserialize_to_task_from_redis((b"queue", serialized))

assert result.task_id == "t-1"
Expand Down
2 changes: 1 addition & 1 deletion tests/test_task_finders.py → tests/test_finders.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pytest

from melony.core.task_finders import find_task_func
from melony.core.finders import find_task_func


def test_find_task_func_returns_callable_from_stdlib():
Expand Down
Loading