在 Hyperf 开发项目过程中经常使用 Job 进行一些耗时任务处理,而 Job 的执行是在异步消费队列中,在异步消费进程启动时会对数据进行初始化存储到容器的服务类理,以供处理后续的 Job 任务。
当用户通过 HTTP 接口更新数据到数据库中时,此时异步消费进程中的内存数据并没有得到及时更新,这就会造成一些困扰。
如何解决这个问题呢?
- 在 Job 执行时,每次读取一次最新的配置信息。虽然不会有延迟,但是当 Job 过多时,读取数据库的频次会非常高,影响 Job 处理速度,以及网络资源消耗。
- 在 Job 执行时,判断上次更新的时间,如果超过预设时间则进行更新。缺点是依然有延迟。
希望能够通过一个事件的方式进行触发更新,当 HTTP 接口产生了数据变更,此时将内存数据进行更新。
但是 HTTP 接口所在的进程是多个 worker 进程,并不能直接对异步进程进行读写操作。通过系统自带的 $container 进行获取服务类,是获取当前 worker 进程内的,而不是异步消费进程的。
还有个思路,就是当异步消费进程配置为 1 个时,可以在配置更新时产生一个新 Job,在新 Job 里面对内存进行更新,因为 Job 执行所处的进程是异步消费进程。
但如果配置了多个消费进程,如 3 个,就无法通过这样的方式,因为 Job 无法确定会落在哪个进程里执行,只能让一个进程失效,无法实现更新所有进程。
配置文件在 async_queue.php 中配置消费进程 processes 进程个数。
解决方案
这里通过 swoole 自带的进程通信进行触发自定义消息事件,并创建触发器监听自定义消息事件,自定义触发器所在的进程也是异步消费进程,只需要触发器对自身进程进行更新就行了。
在调用触发端一次性获取所有进程,进行投递消息即可实现对所有异步消费进程进行更新。
创建一个自定义 Event
app/Event/BlackListUpdated.php
<?php
namespace App\Event;
class BlackListUpdated
{
public $message = '';
public function __construct(string $message)
{
$this->message = $message;
}
}
创建自定义事件监听器
app/Listener/BlackListUpdatedListener.php
<?php
namespace App\Listener;
use App\Event\BlackListUpdated;
use App\Services\BlackListService;
use Hyperf\Contract\ContainerInterface;
use Hyperf\Event\Contract\ListenerInterface;
use Hyperf\Process\ProcessCollector;
use Hyperf\Event\Annotation\Listener;
use Hyperf\Framework\Event\OnPipeMessage;
use Hyperf\Process\Event\PipeMessage as UserProcessPipeMessage;
/**
* @Listener
*
* Class BlackListUpdatedListener
* @package App\Listener
*/
class BlackListUpdatedListener implements ListenerInterface
{
protected $container;
public function __construct(ContainerInterface $container)
{
$this->container = $container;
}
public function listen(): array
{
// 返回一個該監聽器要監聽的事件陣列,可以同時監聽多個事件
return [
OnPipeMessage::class,
UserProcessPipeMessage::class,
];
}
/**
* @param object $event
*/
public function process(object $event)
{
// PipeMessage
if ($event instanceof OnPipeMessage || $event instanceof UserProcessPipeMessage) {
if($event->data instanceof BlackListUpdated){
$this->container->get(BlackListService::class)->flushKeyword();
}
}
}
}
这里不要监听 BlackListUpdated 事件,因为并不能给所有进程投递 BlackListUpdated 事件。
在 process 方法执行中判断消息数据类型对容器数据进行更新。
消息事件投递端
在 HTTP 接口中对数据更新后进行投递事件。
<?php
//...
/**
* 加入关键词
*
* @param Request $request
* @return array
*/
public function addKeyword(Request $request): array
{
$keyword = $request->input('keyword');
$this->blackListService->addKeyword($keyword);
if (class_exists(ProcessCollector::class) && !ProcessCollector::isEmpty()) {
$processes = ProcessCollector::all();
if ($processes) {
$string = serialize(new BlackListUpdated('update keyword'. date('Y-m-d H:i:s')));
foreach ($processes as $process) {
$process->exportSocket()->send($string, 10);
}
}
}
return eeJson('add keyword into blacklist service');
}
//...
学些了学习了