1.实现server 类名
<?php
declare (strict_types = 1);
namespace app\service;
use think\Exception;
use think\facade\Cache;
use think\facade\Db;
use think\facade\Log;
use think\App;
class AsyncService
{
protected $maxProcesses = 10; // 最大进程数
protected $processes = []; // 存储子进程PID
protected $maxTotalProcesses = 20; // 最大总进程数
protected $processTimeout = 300; // 进程超时时间(秒)
protected $queueKey = 'async_task_queue'; // 缓存队列键名
protected $isProcessing = false; // 是否正在处理队列
protected $app; // 应用实例
protected $concurrentTasks = []; // 并发任务组
public function __construct()
{
$this->app = app();
}
/**
* 多进程异步执行
* @param array $tasks 任务数组,每个任务格式为 [对象类名, 方法名, 参数数组]
* @return void
*/
public function multiProcess(array $tasks)
{
try {
if (!extension_loaded('pcntl')) {
throw new \RuntimeException('需要安装并启用 pcntl 扩展');
}
Log::info('开始处理任务组,任务数量: ' . count($tasks));
// 清理已结束的进程
$this->cleanup();
// 将任务组加入并发任务列表
$this->concurrentTasks = array_merge($this->concurrentTasks, $tasks);
// 如果没有正在处理队列,则开始处理
if (!$this->isProcessing) {
$this->processConcurrentTasks();
}
} catch (\Throwable $e) {
Log::write('multiProcess 执行错误: ' . $e->getMessage() . "\n" . $e->getTraceAsString());
throw $e;
}
}
/**
* 处理并发任务
*/
protected function processConcurrentTasks()
{
try {
$this->isProcessing = true;
Log::info('开始处理并发任务');
while (true) {
// 获取当前空闲进程数
$currentProcesses = $this->getProcessCount();
$availableProcesses = $this->maxTotalProcesses - $currentProcesses;
if ($availableProcesses <= 0) {
Log::info('没有空闲进程,等待进程释放');
sleep(1);
$this->cleanup();
continue;
}
// 检查是否还有待处理的任务
if (empty($this->concurrentTasks)) {
Log::info('所有并发任务已处理完成');
$this->isProcessing = false;
break;
}
Log::info("当前空闲进程数: {$availableProcesses}, 待处理任务数: " . count($this->concurrentTasks));
// 计算本次要处理的任务数
$processCount = min($availableProcesses, count($this->concurrentTasks), $this->maxProcesses);
Log::info("本次将处理 {$processCount} 个任务");
// 取出要处理的任务
$tasksToProcess = array_splice($this->concurrentTasks, 0, $processCount);
// 并发执行任务
foreach ($tasksToProcess as $task) {
Log::info('开始执行任务: ' . json_encode($task));
$this->forkProcess([
'class' => $task[0],
'method' => $task[1],
'params' => $task[2] ?? []
]);
}
// 等待一段时间再检查
sleep(1);
$this->cleanup();
}
} catch (\Throwable $e) {
Log::write('processConcurrentTasks 执行错误: ' . $e->getMessage() . "\n" . $e->getTraceAsString());
$this->isProcessing = false;
throw $e;
}
}
/**
* 创建子进程执行任务
* @param array $task
*/
protected function forkProcess(array $task)
{
try {
$pid = pcntl_fork();
if ($pid === -1) {
throw new \RuntimeException('无法 fork 进程');
}
if ($pid === 0) {
// 子进程
try {
// 设置进程标题
if (function_exists('cli_set_process_title')) {
cli_set_process_title('php async worker');
}
// 设置超时处理
pcntl_signal(SIGALRM, function() {
Log::write('进程执行超时');
exit(1);
});
pcntl_alarm($this->processTimeout);
// 执行任务
$class = $task['class'];
$method = $task['method'];
$params = $task['params'];
Log::info("子进程开始执行任务: {$class}::{$method}");
// 在子进程中重新初始化应用
$app = new App();
$app->initialize();
// 使用应用实例创建控制器
$instance = $app->make($class);
$result = call_user_func_array([$instance, $method], $params);
Log::info('任务执行结果: ' . substr($result, 0, 100));
// 正常退出
exit(0);
} catch (\Throwable $e) {
Log::write('子进程执行错误: ' . $e->getMessage() . "\n" . $e->getTraceAsString());
exit(1);
}
} else {
// 父进程
$this->processes[$pid] = [
'pid' => $pid,
'start_time' => time(),
'status' => 'running'
];
Log::info("启动子进程,PID: {$pid}");
}
} catch (\Throwable $e) {
Log::write('forkProcess 执行错误: ' . $e->getMessage() . "\n" . $e->getTraceAsString());
throw $e;
}
}
/**
* 清理已结束的进程
*/
public function cleanup()
{
try {
foreach ($this->processes as $pid => $info) {
$status = 0;
$result = pcntl_waitpid($pid, $status, WNOHANG);
if ($result === -1 || $result > 0) {
// 进程已结束
unset($this->processes[$pid]);
if (pcntl_wifexited($status)) {
$exitCode = pcntl_wexitstatus($status);
Log::info("子进程 {$pid} 已结束,退出码: {$exitCode}");
// 进程结束后,检查是否还有待处理的任务
if (!$this->isProcessing && !empty($this->concurrentTasks)) {
$this->processConcurrentTasks();
}
}
} else {
// 检查进程是否超时
if (time() - $info['start_time'] > $this->processTimeout) {
// 发送终止信号
posix_kill($pid, SIGTERM);
Log::warning("进程 {$pid} 执行超时,已终止");
unset($this->processes[$pid]);
}
}
}
} catch (\Throwable $e) {
Log::write('cleanup 执行错误: ' . $e->getMessage() . "\n" . $e->getTraceAsString());
throw $e;
}
}
/**
* 获取当前运行的进程数
* @return int
*/
public function getProcessCount(): int
{
return count($this->processes);
}
/**
* 获取待处理的任务数
* @return int
*/
public function getPendingTaskCount(): int
{
return count($this->concurrentTasks);
}
/**
* 设置最大进程数
* @param int $maxProcesses
*/
public function setMaxProcesses(int $maxProcesses)
{
$this->maxProcesses = $maxProcesses;
}
/**
* 设置最大总进程数
* @param int $maxTotalProcesses
*/
public function setMaxTotalProcesses(int $maxTotalProcesses)
{
$this->maxTotalProcesses = $maxTotalProcesses;
}
/**
* 设置进程超时时间
* @param int $timeout
*/
public function setProcessTimeout(int $timeout)
{
$this->processTimeout = $timeout;
}
}
2.调用方法
public function testProcess()
{
try {
$asyncService = new AsyncService();
// 设置最大进程数
$asyncService->setMaxProcesses(20);
$asyncService->setMaxTotalProcesses(30);
// 准备多个任务
$tasks = [];
for ($i = 0; $i < 20; $i++) {
$url = "http://127.0.0.1/index/testTime?type=(".$i.")";
$tasks[] = [
'app\controller\Index',
'curl',
[$url]
];
}
// 并发执行所有任务
$asyncService->multiProcess($tasks);
return '任务已提交';
} catch (\Throwable $e) {
Log::error('testProcess 执行错误: ' . $e->getMessage() . "\n" . $e->getTraceAsString());
throw $e;
}
return "先输出文字2";
}
3.注意需要打开 pcntl_signal pcntl_alarm pcntl_waitpid pcntl_wifexited 扩展
Comments NOTHING