В своей прошлой статье «Как подружить Celery и SQLAlchemy 2.0 с асинхронным Python» я анализировал возможность запуска асинхронных задач «изнутри Celery» и в комментариях мне сообщили о существовании еще одной библиотеки под названием aio_pika. И, честно говоря, я никогда не слышал о ней раньше. Неудивительно, что у библиотеки всего около 1 тыс. звезд на GitHub (по сравнению с 20 тыс.+ у Celery). Я рассмотрел абсолютно все популярные решения (более 500 звезд) и остановился на этом из-за активной разработки (в настоящее время) и относительной популярности.

Стек вы увидите в статье: FastAPI, RabbitMQ, aio_pika и docker. Статья будет полезна тем, кто использует Celery в своих проектах, а также тем, кто только слышал об очередях и RabbitMQ.

Навигация:

  1. Настройка RabbitMQ

  2. Задачи роутера для Consumer’a

  3. Написать потребителю

  4. Интеграция в основное приложение

Предисловие

Библиотека позиционирует себя как «обертка asyncio для людей aiormq». Моя цель состояла в том, чтобы заменить сельдерей, используемый в проекте, на этот. Я решил сделать это, потому что его интерфейс не предполагает разделения приложения и воркеров на отдельные отделы, что мне бы очень понравилось. Второстепенными причинами были: отсутствие асинхронности, запах наследования (я говорю об атрибуте self, который надо писать первым аргументом функций) и отсутствие подсказок типа (это на последнем месте по важности!) . Celery в проекте использовался для задач IO-Bound и Delay, поэтому интеграция асинхронности оказалась очень полезной.

Настройка RabbitMQ

Я обновил свой RabbitMQ, добавив плагин «Плагин отложенных сообщений RabbitMQ». Он был нужен для выполнения «отложенных» задач. Те. стояла задача удалить временные файлы через какое-то время. Сельдерей с этим справился, т.к. у него была нативная поддержка этой фичи, но, насколько я понимаю, aio-pika не имеет этого. Этот плагин позволяет вам добавить эту функциональность в сам RabbitMQ. Моя конфигурация docker-compose теперь выглядит так:

ЧИТАТЬ   Бастрыкин защитил инвалида из Ханты-Мансийского автономного округа
docker-compose.yaml
 rabbit:
    image: rabbitmq:3-management
    hostname: rabbit
    env_file:
      - .env
    volumes:
      - ./services/rabbit/delayed_message.ez:/opt/rabbitmq/plugins/delayed_message.ez
      - ./services/rabbit/enabled:/etc/rabbitmq/enabled_plugins
    ports:
      - "15672:15672"

Через тома подключил скачанный плагин и тоже добавил его в список включенных по умолчанию. Мой файл enabled_plugins выглядел так:

[rabbitmq_delayed_message_exchange,rabbitmq_management,rabbitmq_prometheus].

*Точка в конце обязательна

Задачи роутера для Consumer’a

Следующим шагом я написал роутер для своего рабочего, что бы мне было удобно. В этот момент я немного озадачен:

роутер.py
class Router:
    _routes: dict[str, list[str]] = {}

    def __init__(self):
        modules = list(filter(
            lambda x: x != '__init__',
            map(lambda y: y.split('.')[0], os.listdir('tasks'))
        ))
        for module in modules:
            imported = import_module(f'tasks.{module}')
            if not hasattr(imported, '__all__'):
                continue
            self._routes[module] = imported.__all__
            del imported
    def get_method(self, action: str) -> Optional[Callable]:
        module = action.split(':')[0] # Название файла
        method = action.split(':')[1] # Название функции
        if self._exists(module, method):
            return getattr(import_module(f'tasks.{module}'), method)

Переменная _router заполняется задачами, находящимися в папке tasks, в которой находятся сами функции (задачи). Они также перечислены в переменной все для экспорта. Для наглядности задача выглядела так:

async def test(is_test: bool):
    print(f'Hello world! Value is: {is_test}')

__all__ = ['test']

Следующей задачей было решить проблему, что эти функции имеют произвольное количество аргументов. Я написал еще один метод для маршрутизатора, который может учитывать это:

роутер.py
def check_args(func: Callable, data: dict) -> bool:
    hints = get_type_hints(func)
    for arg, arg_type in hints.items():
        if arg not in data:
            return False
        if not isinstance(data[arg], arg_type):
            return False
    return True

Мы передаем этому методу функцию, которую мы импортировали из файла, а также данные, которые пытаемся перетащить в нее. Мы также проверяем типы, указанные в аргументах функции. Если все ок, то вернуть True

Таким образом, я регулировал количество доступных задач, создавая/удаляя файлы из папки задач. Это оказалось очень практичным и гибким решением.

ЧИТАТЬ   Новая нейронная сеть SAM, которая меняет правила игры в компьютерном зрении

Написать потребителю

потребитель.py

async def process_message(message: AbstractIncomingMessage):
    async with message.process():
        message = MessageSchema.parse_obj(json.loads(message.body.decode()))
        method = router.get_method(message.action) # Импортируем функцию и записываем в переменную
        if method:
            if not router.check_args(method, message.body): # Проверяем атрибуты, которые собираемся передавать
                print('Invalid args')
                return
            if inspect.iscoroutinefunction(method): # Проверяем является ли функция async или нет
                await method(**message.body)
            else:
                method(**message.body)


async def main() -> None:
    queue_key = rabbit_config.RABBITMQ_QUEUE

    connection = await aio_pika.connect_robust(rabbit_config.url)
    # Для корректной работы с RabbitMQ указываем publisher_confirms=False
    channel = await connection.channel(publisher_confirms=False)
    # Кол-во задач, которые consumer может выполнять в момент времени. В моём случае 100
    await channel.set_qos(prefetch_count=100)
    queue = await channel.declare_queue(queue_key)
    
    exchange = await channel.declare_exchange(
        # Объявляем exchange с именем main и типом, который поддерживает отложенные задачи
        # Важно чтобы это имя (main) совпадало с именем на стороне publisher
        'main', ExchangeType.X_DELAYED_MESSAGE, 
        arguments={
            'x-delayed-type': 'direct'
        }
    )
    await queue.bind(exchange, queue_key)
    await queue.consume(process_message)
    try:
        await asyncio.Future()
    finally:
        await connection.close()


if __name__ == "__main__":
    asyncio.run(main())

В общем, с потребительской частью покончено и можно приступать к интеграции всех этих качеств в основное приложение (редактор).

Интеграция в основное приложение

На помощь снова приходит ООП, и я написал класс для работы с айо-пикой, который полностью покрыл мои потребности. Его инициализация произошла в новом времени жизни (что вскоре полностью устранит старые пути):

@asynccontextmanager
async def lifespan(_: FastAPI):
    await rabbit_connection.connect()
    yield
    await rabbit_connection.disconnect()

app = FastAPI(lifespan=lifespan)

Вот реализация этого класса:

rabbit_connection.py
class RabbitConnection:
    _connection: AbstractRobustConnection | None = None
    _channel: AbstractRobustChannel | None = None
    _exchange: AbstractRobustExchange | None = None

    async def disconnect(self) -> None:
        if self._channel and not self._channel.is_closed:
            await self._channel.close()
        if self._connection and not self._connection.is_closed:
            await self._connection.close()
        self._connection = None
        self._channel = None

    async def connect(self) -> None:
        try:
            self._connection = await connect_robust(rabbit_config.url)
            self._channel = await self._connection.channel(publisher_confirms=False)
            self._exchange = await self._channel.declare_exchange(
                # Повторяем из consumer'a. Важно указать одинакое
                # имя exchange'ов. В моём случае `main`
                'main', ExchangeType.X_DELAYED_MESSAGE,
                arguments={
                    'x-delayed-type': 'direct'
                }
            )
        except Exception as e:
            await self.disconnect()

    async def send_messages(
            self,
            messages: list[MessageSchema],
            *,
            routing_key: str = rabbit_config.RABBITMQ_QUEUE,
            delay: int = None # Задержка, через которое нужно выполнить задачу (в секундах)
    ) -> None:
        async with self._channel.transaction():
            headers = None
            if delay:
                headers = {
                    'x-delay': f'{delay * 1000}' # Это тоже из документации плагина для RabbitMQ
                }
            for message in messages:
                message = Message(
                    body=json.dumps(message.dict()).encode(),
                    headers=headers
                )
                await self._exchange.publish(
                    message,
                    routing_key=routing_key,
                    mandatory=False if delay else True # Чтобы в логах был порядок ;)
                )


rabbit_connection = RabbitConnection()

Соответственно, чтобы отправить задание работнику, достаточно было сделать следующее:

ЧИТАТЬ   Деревянный дом с гаражом в двух минутах от Минска. Цена этого места хорошая
main.py
@router.get('/test')
async def test():
    message = MessageSchema(
        action='images:delete',
        body={'path': 'assets/temp/temp.png'}
    )
    await rabbit_connection.send_messages(
      [message for _ in range(150)], 
      delay=20
    )
    return {'status': 'published'}

Подводя итог, хочется сказать, что работник теперь чувствует себя гораздо увереннее и может делать гораздо больше и быстрее. Надеюсь, статья была полезной. Всем спасибо, до свидания.

Source

От admin