Краткое содержание

В прошлый раз я описывал, как поставить задание Laravel в очередь из хранимой процедуры или триггера PostgreSQL.

В этой статье я объясню, как конвертировать события, сгенерированные в PostgreSQL, в события Laravel.

Рабочий пример опубликовано на GitHub.

Вместо того, чтобы представить

Часто вам необходимо знать, что происходит с данными в базе данных, и быстро реагировать.

Самый простой способ получить информацию об изменениях — периодически запрашивать базу данных, чтобы узнать, требуют ли какие-либо изменения ответа. И это часто то, что делают разработчики. Это не самый эффективный метод, поскольку он может вызвать ненужную нагрузку на сервер и потребует изменения существующих таблиц или создания новых.

Но PostgreSQL может уведомлять клиентов с помощью механизма LISTEN/NOTIFY. Документацию можно найти Здесь И Здесь.

Этот механизм имеет недостатки:

  • Требуется долгосрочное открытое соединение с базой данных, что может оказаться непрактичным.

  • Система уведомлений не гарантирует доставку или порядок сообщений и не должна использоваться в качестве полной очереди сообщений. «СЛУШАТЬ/УВЕДОМЛЕНИЕ» следует использовать только для легкого взаимодействия между процессами.

  • Размер сообщения ограничен размером строки (8192 байта в PostgreSQL 13).

Для более глубокого понимания этого механизма вы можете обратиться к этому документу.

Я просто собираюсь показать вам, как создать команду Artisan, несколько похожую на команду Queue:Work, которая будет выполнять свою работу лучше всего в сочетании с Supervisor.

«Протестировать» технологию

Чтобы помочь вам быстро начать работу, я собираюсь создать Docker-контейнер:

docker run -d -e POSTGRES_USER=test -e POSTGRES_PASSWORD=test \
-p 5433:5432 --name pgsql postgres

Обратите внимание, что я использую порт 5433, поскольку на исходном порту у меня работает «стационарная» СУБД.

Теперь запускаем три терминала, выполнив в каждом из них команду:

docker exec -it pgsql psql -U test

В первом и втором терминале выдаем команду:

LISTEN my_event;

Эта команда «подписывает» клиента на уведомления от сервера PostgreSQL через канал my_event. Если другой клиент выполнит команду NOTIFY my_event, все клиенты, выполнившие команду LISTEN my_event, получат уведомление.

В третьем терминале выдаем команду:

NOTIFY my_event, 'Hello, PostgreSQL!';

Затем в первом и втором терминале снова выдаем команду:

LISTEN my_event;

Здесь у меня возникло недоразумение. Я думал, что первый и второй терминалы должны получить уведомление автоматически, но по какой-то причине они требуют повторного вызова «LISTEN my_event».

Вы можете поиграть и убедиться, что получить уведомление без подписки невозможно, а также прочитать его дважды.

Работает!

Первые два терминала можно закрыть. Третье можно оставить для экспериментов.

Создание приложения Laravel

composer create-project laravel/laravel listen_notify
cd listen_notify
sudo chown -R $USER:www-data storage
sudo chown -R $USER:www-data bootstrap/cache
chmod -R 775 storage
chmod -R 775 bootstrap/cache

Настройки подключения к базе данных в .env

DB_CONNECTION=pgsql
DB_HOST=127.0.0.1
DB_PORT=5433
DB_DATABASE=test
DB_USERNAME=test
DB_PASSWORD=test

Удалите все существующие миграции. Нам это не понадобится.

ЧИТАТЬ   Закон об отходах дополнят главой о выявлении и ликвидации незаконных свалок

Теперь давайте создадим модели команды Craft, события и слушателя, который слушает это событие.

php atrisan make:command ListenNotifyCommand
php artisan make:event PostgresNotificationReceived
php artisan make:listener LogPostgresNotification --event=PostgresNotificationReceived

Давайте настроим EventServiceProvider и добавим следующее значение в массив $listen:

    protected $listen = [
        Registered::class => [
            SendEmailVerificationNotification::class,
        ],
        // Добавленное значение
        PostgresNotificationReceived::class => [
            LogPostgresNotification::class,
        ]
    ];

Каркас приложения готов.

Создание прослушивателя событий Laravel

Я не буду предлагать сложную логику. Я просто запишу полученные данные в лог-файл

Содержимое файла app/Listeners/LogPostgresNotification.php
<?php

namespace App\Listeners;

use App\Events\PostgresNotificationReceived;
use Illuminate\Support\Facades\Log;

class LogPostgresNotification
{

    public function handle(PostgresNotificationReceived $event): void
    {
        Log::info('Received Postgres notification: ', $event->notification);
    }
}

Создайте событие Laravel

Я тоже не буду мудрствовать, я просто передам полезную нагрузку через конструктор событию

Содержимое файла app/Events/PostgresNotificationReceived.php
<?php

namespace App\Events;

use Illuminate\Broadcasting\InteractsWithSockets;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;

class PostgresNotificationReceived
{
    use Dispatchable, InteractsWithSockets, SerializesModels;

    public function __construct(public array $notification)
    {

    }

}

Напишите логику для команды Listen:Notify.

Давайте двигаться от простого к сложному. Для начала мы просто получим сообщение от PostgreSQL.

Для этого модифицируем метод handle, не забывая изменить поля $signature и $description.

Содержимое файла app/Console/Commands/ListenNotifyCommand.php.
<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use Illuminate\Support\Facades\DB;
use PDO;

class ListenNotifyCommand extends Command
{
    protected $signature="listen:notify";

    protected $description = 'Listen to PostgreSQL notify events';

    public function handle(): void
    {
        $pdo = DB::connection()->getPdo();

        // Listen to the 'my_channel' notifications
        $pdo->exec("LISTEN my_event");
        $this->info('Starting');
        // Forever loop
        while (true) {
            $notification = $pdo->pgsqlGetNotify(PDO::FETCH_ASSOC, 10000);

            if ($notification) {
                $this->info('Received notification: ' . json_encode($notification, JSON_THROW_ON_ERROR));
            }
        }
    }
}

Давайте проверим, работает ли наша команда. Запустите команду в терминале: PHP Artisan слушать: уведомить и в третьем терминале (мы его не закрывали) снова обслуживаем NOTIFY my_event, 'Привет, PostgreSQL!';

Большой! Работает!

Большой! Работает!

Начало. Приложение Laravel получило событие от PostgreSQL.

При изменении кода заказа не забудьте перезапустить процесс.

Добавлена ​​обработка сигнала

Немного о сигналах, гуру могут пропустить

Сигналы являются частью стандартов POSIX и используются для асинхронного уведомления процесса о событии в Unix и подобных операционных системах, например Linux. Приложения в этих системах могут обрабатывать входящие сигналы, например, останавливать процесс (SIGTERM, SIGINT), перезапускать (SIGHUP) и т. д.

Windows не поддерживает сигналы POSIX. Он использует собственные механизмы для управления процессами и потоками, включая функции Windows API для отправки и обработки управляющих сигналов, таких как Ctrl+C.

Однако некоторые среды Windows, такие как подсистема Windows для Linux (WSL), обеспечивают совместимость со стандартами POSIX и поддерживают сигналы POSIX.

В контексте PHP и командной строки нас интересуют следующие сигналы:

  • SIGINT (сигнал прерывания). Этот сигнал обычно отправляется, когда вы нажимаете Ctrl+C в терминале. Он сообщает процессу остановиться.

  • SIGTERM (окончание выполнения). Это стандартный сигнал остановки процесса в Unix. Программы могут перехватить этот сигнал и выполнить любую необходимую работу перед завершением работы. Если программа не перехватит SIGTERM, она немедленно завершится.

  • SIGKILL (немедленно завершить процесс). Этот сигнал невозможно перехватить или проигнорировать. Когда процесс получает сигнал SIGKILL, он немедленно завершает работу.

Мы добавляем в класс ListenNotifyCommand два поля ($hasPcntl, $running) логического типа и инициализируем их. Пишем метод — обработчик сигнала

    protected bool $hasPcntl = false;
    protected bool $running = true;

    private function handleSignal(int $signal): void
    {
        switch ($signal) {
            case SIGINT:
            case SIGTERM:
                $this->info( PHP_EOL . 'Received stop signal, shutting down...');
                $this->running = false;
                break;

            default:
        }
    }

Расширение, необходимое для обработки сигналов pcntl. Данное расширение недоступно для Windows, однако написать кроссплатформенное решение вполне возможно.

ЧИТАТЬ   Торговый Дом Энерго выбирает SIEM «СёрчИнформ» для анализа событий информационной безопасности

Дорабатываем метод handle

    public function handle(): int
    {
        // Проверка, что модуль pcntl подключён
        $this->hasPcntl = extension_loaded('pcntl');

        if ($this->hasPcntl) {
            // Если модуль pcntl подключён, назначаем обработчики сигналов
            pcntl_signal(SIGINT, [$this, 'handleSignal']);
            pcntl_signal(SIGTERM, [$this, 'handleSignal']);
        }

        $pdo = DB::connection()->getPdo();
        $pdo->exec("LISTEN my_event");
        $this->info('Start listening');

        while ($this->running) {
            $notification = $pdo->pgsqlGetNotify(PDO::FETCH_ASSOC, 10000);
            $this->info('iter');
            if ($notification) {
                $this->info('Received notification: ' . json_encode($notification, JSON_THROW_ON_ERROR));
            }

            if ($this->hasPcntl) {
                // Если модуль pcntl подключён, вызываем обработчики сигналов
                pcntl_signal_dispatch();
            }
        }
        // Возвращаем 0, как код завершения
        return 0;
    }
Перейдите в приложение/Консоль/Команды/ListenNotifyCommand.php.
<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use Illuminate\Support\Facades\DB;
use PDO;

class ListenNotifyCommand extends Command
{
    protected $signature="listen:notify";

    protected $description = 'Listen to PostgreSQL notify events';

    protected bool $hasPcntl = false;
    protected bool $running = true;

    public function handle(): int
    {
        $this->hasPcntl = extension_loaded('pcntl');

        if ($this->hasPcntl) {
            pcntl_signal(SIGINT, [$this, 'handleSignal']);
            pcntl_signal(SIGTERM, [$this, 'handleSignal']);
        }

        $pdo = DB::connection()->getPdo();
        $pdo->exec("LISTEN my_event");
        $this->info('Start listening');

        while ($this->running) {
            $notification = $pdo->pgsqlGetNotify(PDO::FETCH_ASSOC, 10000);
            $this->info('iter');
            if ($notification) {
                $this->info('Received notification: ' . json_encode($notification, JSON_THROW_ON_ERROR));
            }

            if ($this->hasPcntl) {
                pcntl_signal_dispatch();
            }
        }
        return 0;
    }

    private function handleSignal(int $signal): void
    {
        switch ($signal) {
            case SIGINT:
            case SIGTERM:
                $this->info( PHP_EOL . 'Received stop signal, shutting down...');
                $this->running = false;
                break;

            default:
        }
    }

}

Вы можете запустить команду, а также вызвать NOTIFY в соседней консоли, все должно работать. Если команда выполнена в Linux и подключен модуль pcntl, то при нажатии Ctrl+C высветится сообщение: Получен сигнал завершения работы, завершение работы… Это означает, что скрипт корректно обрабатывает сигналы и останавливается, а не вынужден остановиться.

Настройка Supervisor для мониторинга сценария

Руководитель — удобный инструмент для управления фоновыми процессами в Unix-подобных операционных системах. Он контролирует сценарий, автоматически перезапускает его в случае сбоя и дает вам возможность управлять его состоянием, например запуском, остановкой и перезапуском.

Supervisor также совместим с сигналами Unix, что позволяет настраивать поведение процесса на основе различных сигналов. Мы настроили сценарий для обработки сигналов SIGINT и SIGTERM для его правильного завершения, что соответствует Supervisor.

Когда супервизор отправляет сигнал SIGTERM процессу, он ожидает, что процесс завершится, и передает управление системе.

Если процесс успешно обрабатывает SIGTERM и успешно завершает работу, обычно возвращается код выхода 0.

Если процесс не вернул чек в течение разумного времени (по умолчанию 10 секунд), супервизор отправляет SIGKILL. Это время можно изменить в настройках, опция stopwaitsecs.

Пример файла конфигурации Supervisor

[program:postrgres_laravel]
process_name=%(program_name)s_%(process_num)02d
command=php /path/to/your/laravel/artisan listen:notify
autostart=true
autorestart=true
user=www-data
numprocs=1
redirect_stderr=true
stdout_logfile=/var/log/postrgres_laravel.log

Подробнее о супервизоре в документации Laravel.

Сериализация полезной нагрузки

Аргумент NOTIFY всегда является строкой. Те. если мы хотим передать что-то сложное, нам нужно сериализовать это. PostgreSQL умеет работать с JSON, воспользуемся этим умением.

Давайте создадим хранимую функцию, которая принимает json в качестве входных данных и отправляет его в канал my_event.

Файл миграции 2024_03_05_125805_create_send_notify_function.php
<?php

use Illuminate\Database\Migrations\Migration;

return new class extends Migration {
    public function up(): void
    {
        DB::unprepared('
            CREATE OR REPLACE FUNCTION send_notify(data json) RETURNS VOID AS $$
            BEGIN
                PERFORM pg_notify(\'my_event\', data::text);
            END;
            $$ LANGUAGE plpgsql;
        ');
    }

    public function down(): void
    {
        DB::unprepared('DROP FUNCTION IF EXISTS send_notify(json);');
    }
};

Выполните миграцию

php artisan migrate

У меня все прошло хорошо. Теперь нам нужно немного доработать ручку.

            if ($notification) {
                $this->info('Received notification: ' . json_encode($notification, JSON_THROW_ON_ERROR));
                $payload = json_decode($notification['payload'], true, 512, JSON_THROW_ON_ERROR);
                $this->info('Decoded payload: ' . print_r($payload, true));
            }

Здесь я выбираю $payload и отправляю ее на терминал.

ЧИТАТЬ   Основные события на финансовых рынках с 10 по 14 июля

Давайте проверим, все ли у нас работает. Запустите команду php artisan Listen:notify. На этот раз в терминале psql мы представим следующую конструкцию:

select send_notify(json_build_object('key1', 'Hello, PostgreSQL!'));

Смотрим терминал, он работает. Перейдем к чему-то менее тривиальному:

select send_notify(json_build_object('key1', 'Hello, PostgreSQL!', 'key2', json_build_object('key2_inner', 2, 'key3_inner', 3)))
И это снова работает!

И это снова работает!

Главное не увлекаться и помнить ограничение в 8192 байта.

Положил все это вместе

Сейчас их осталось очень мало. В нашей команде отправьте событие, чтобы его могли прослушать все слушатели, подписавшиеся на него. Для этого добавьте одну строку:

            if ($notification) {
                $this->info('Received notification: ' . json_encode($notification, JSON_THROW_ON_ERROR));
                $payload = json_decode($notification['payload'], true, 512, JSON_THROW_ON_ERROR);
                $this->info('Decoded payload: ' . print_r($payload, true));
                // Новая строка
                Event::dispatch(new PostgresNotificationReceived($payload));
            }
Полный список файла app/Console/Commands/ListenNotifyCommand.php
<?php

namespace App\Console\Commands;

use App\Events\PostgresNotificationReceived;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Event;
use PDO;

class ListenNotifyCommand extends Command
{
    protected $signature="listen:notify";

    protected $description = 'Listen to PostgreSQL notify events';

    protected bool $hasPcntl = false;
    protected bool $running = true;


    public function handle(): int
    {
        $this->hasPcntl = extension_loaded('pcntl');

        if ($this->hasPcntl) {
            pcntl_signal(SIGINT, [$this, 'handleSignal']);
            pcntl_signal(SIGTERM, [$this, 'handleSignal']);
        }

        $pdo = DB::connection()->getPdo();
        $pdo->exec("LISTEN my_event");
        $this->info('Start listening');

        while ($this->running) {
            $notification = $pdo->pgsqlGetNotify(PDO::FETCH_ASSOC, 10000);

            if ($notification) {
                $this->info('Received notification: ' . json_encode($notification, JSON_THROW_ON_ERROR));
                $payload = json_decode($notification['payload'], true, 512, JSON_THROW_ON_ERROR);
                $this->info('Decoded payload: ' . print_r($payload, true));
                Event::dispatch(new PostgresNotificationReceived($payload));
            }

            if ($this->hasPcntl) {
                pcntl_signal_dispatch();
            }
        }

        return 0;

    }

    private function handleSignal(int $signal): void
    {
        switch ($signal) {
            case SIGINT:
            case SIGTERM:
                $this->info( PHP_EOL . 'Received stop signal, shutting down...');
                $this->running = false;
                break;

            default:
        }
    }

}

Давайте запустим команду: PHP Artisan слушать: уведомить и запустите команду в следующем терминале

select send_notify(json_build_object('key1', 'Hello, PostgreSQL!', 'key2', json_build_object('key2_inner', 2, 'key3_inner', 3)));

По нашей задумке слушатель событий записывает полезную нагрузку в журнал. Давайте посмотрим на газету.

Полезная нагрузка в журнале

Полезная нагрузка в журнале

Запускаю виртуальную машину и проверяю, как скрипт обрабатывает сигналы в Linux

Мы видим, что сигнал был обработан и приложение завершилось нормально.

Мы видим, что сигнал был обработан и приложение завершилось нормально.

Мы видим, что в ответ на Ctrl+C скрипт выдает сообщение: Сигнал стоп получен, стоп…

Что можно улучшить?

Скорее всего, обработку полученного сообщения следует сразу запускать, как задачу, в асинхронной очереди Laravel, чтобы обработка сообщения не замедляла «бесконечный» цикл, который может привести к потере сообщения или скрипта. крушение. через процесс супервизора.

В этой задаче принимаются и другие решения относительно распределения событий.

Заключение

Вы можете использовать функцию send_notify в других хранимых функциях или триггерах PostgreSQL, передавая значения переменных TG_TABLE_NAME и TG_OP триггеров, чтобы решить, что и как обрабатывать. В этой статье просто показаны основы получения событий из PostgreSQL. Как их применить на практике зависит только от фантазии разработчика.

Функциональное приложение можно найти на GitHub

Source

От admin