thinkphp 异步执行 并发执行server方法,pcntl_signal() 多进程

发布于 9 天前  8 次阅读



1.实现server 类名

<?php
declare (strict_types = 1);

namespace app\service;

use think\Exception;
use think\facade\Cache;
use think\facade\Db;
use think\facade\Log;
use think\App;

class AsyncService
{
protected $maxProcesses = 10; // 最大进程数
protected $processes = []; // 存储子进程PID
protected $maxTotalProcesses = 20; // 最大总进程数
protected $processTimeout = 300; // 进程超时时间(秒)
protected $queueKey = 'async_task_queue'; // 缓存队列键名
protected $isProcessing = false; // 是否正在处理队列
protected $app; // 应用实例
protected $concurrentTasks = []; // 并发任务组

public function __construct()
{
    $this->app = app();
}

/**
 * 多进程异步执行
 * @param array $tasks 任务数组,每个任务格式为 [对象类名, 方法名, 参数数组]
 * @return void
 */
public function multiProcess(array $tasks)
{
    try {
        if (!extension_loaded('pcntl')) {
            throw new \RuntimeException('需要安装并启用 pcntl 扩展');
        }
        Log::info('开始处理任务组,任务数量: ' . count($tasks));
        // 清理已结束的进程
        $this->cleanup();

        // 将任务组加入并发任务列表
        $this->concurrentTasks = array_merge($this->concurrentTasks, $tasks);

        // 如果没有正在处理队列,则开始处理
        if (!$this->isProcessing) {
            $this->processConcurrentTasks();
        }
    } catch (\Throwable $e) {
        Log::write('multiProcess 执行错误: ' . $e->getMessage() . "\n" . $e->getTraceAsString());
        throw $e;
    }
}

/**
 * 处理并发任务
 */
protected function processConcurrentTasks()
{
    try {
        $this->isProcessing = true;
        Log::info('开始处理并发任务');

        while (true) {
            // 获取当前空闲进程数
            $currentProcesses = $this->getProcessCount();
            $availableProcesses = $this->maxTotalProcesses - $currentProcesses;

            if ($availableProcesses <= 0) {
                Log::info('没有空闲进程,等待进程释放');
                sleep(1);
                $this->cleanup();
                continue;
            }

            // 检查是否还有待处理的任务
            if (empty($this->concurrentTasks)) {
                Log::info('所有并发任务已处理完成');
                $this->isProcessing = false;
                break;
            }
            Log::info("当前空闲进程数: {$availableProcesses}, 待处理任务数: " . count($this->concurrentTasks));
            // 计算本次要处理的任务数
            $processCount = min($availableProcesses, count($this->concurrentTasks), $this->maxProcesses);
            Log::info("本次将处理 {$processCount} 个任务");
            // 取出要处理的任务
            $tasksToProcess = array_splice($this->concurrentTasks, 0, $processCount);
            // 并发执行任务
            foreach ($tasksToProcess as $task) {
                Log::info('开始执行任务: ' . json_encode($task));
                $this->forkProcess([
                    'class' => $task[0],
                    'method' => $task[1],
                    'params' => $task[2] ?? []
                ]);
            }
            // 等待一段时间再检查
            sleep(1);
            $this->cleanup();
        }
    } catch (\Throwable $e) {
        Log::write('processConcurrentTasks 执行错误: ' . $e->getMessage() . "\n" . $e->getTraceAsString());
        $this->isProcessing = false;
        throw $e;
    }
}

/**
 * 创建子进程执行任务
 * @param array $task
 */
protected function forkProcess(array $task)
{
    try {
        $pid = pcntl_fork();

        if ($pid === -1) {
            throw new \RuntimeException('无法 fork 进程');
        }

        if ($pid === 0) {
            // 子进程
            try {
                // 设置进程标题
                if (function_exists('cli_set_process_title')) {
                    cli_set_process_title('php async worker');
                }

                // 设置超时处理
                pcntl_signal(SIGALRM, function() {
                    Log::write('进程执行超时');
                    exit(1);
                });
                pcntl_alarm($this->processTimeout);

                // 执行任务
                $class = $task['class'];
                $method = $task['method'];
                $params = $task['params'];

                Log::info("子进程开始执行任务: {$class}::{$method}");

                // 在子进程中重新初始化应用
                $app = new App();
                $app->initialize();

                // 使用应用实例创建控制器
                $instance = $app->make($class);
                $result = call_user_func_array([$instance, $method], $params);

                Log::info('任务执行结果: ' . substr($result, 0, 100));

                // 正常退出
                exit(0);
            } catch (\Throwable $e) {
                Log::write('子进程执行错误: ' . $e->getMessage() . "\n" . $e->getTraceAsString());
                exit(1);
            }
        } else {
            // 父进程
            $this->processes[$pid] = [
                'pid' => $pid,
                'start_time' => time(),
                'status' => 'running'
            ];
            Log::info("启动子进程,PID: {$pid}");
        }
    } catch (\Throwable $e) {
        Log::write('forkProcess 执行错误: ' . $e->getMessage() . "\n" . $e->getTraceAsString());
        throw $e;
    }
}

/**
 * 清理已结束的进程
 */
public function cleanup()
{
    try {
        foreach ($this->processes as $pid => $info) {
            $status = 0;
            $result = pcntl_waitpid($pid, $status, WNOHANG);

            if ($result === -1 || $result > 0) {
                // 进程已结束
                unset($this->processes[$pid]);
                if (pcntl_wifexited($status)) {
                    $exitCode = pcntl_wexitstatus($status);
                    Log::info("子进程 {$pid} 已结束,退出码: {$exitCode}");

                    // 进程结束后,检查是否还有待处理的任务
                    if (!$this->isProcessing && !empty($this->concurrentTasks)) {
                        $this->processConcurrentTasks();
                    }
                }
            } else {
                // 检查进程是否超时
                if (time() - $info['start_time'] > $this->processTimeout) {
                    // 发送终止信号
                    posix_kill($pid, SIGTERM);
                    Log::warning("进程 {$pid} 执行超时,已终止");
                    unset($this->processes[$pid]);
                }
            }
        }
    } catch (\Throwable $e) {
        Log::write('cleanup 执行错误: ' . $e->getMessage() . "\n" . $e->getTraceAsString());
        throw $e;
    }
}

/**
 * 获取当前运行的进程数
 * @return int
 */
public function getProcessCount(): int
{
    return count($this->processes);
}

/**
 * 获取待处理的任务数
 * @return int
 */
public function getPendingTaskCount(): int
{
    return count($this->concurrentTasks);
}

/**
 * 设置最大进程数
 * @param int $maxProcesses
 */
public function setMaxProcesses(int $maxProcesses)
{
    $this->maxProcesses = $maxProcesses;
}

/**
 * 设置最大总进程数
 * @param int $maxTotalProcesses
 */
public function setMaxTotalProcesses(int $maxTotalProcesses)
{
    $this->maxTotalProcesses = $maxTotalProcesses;
}

/**
 * 设置进程超时时间
 * @param int $timeout
 */
public function setProcessTimeout(int $timeout)
{
    $this->processTimeout = $timeout;
}

}

2.调用方法

public function testProcess()
{

    try {
        $asyncService = new AsyncService();

        // 设置最大进程数
        $asyncService->setMaxProcesses(20);
        $asyncService->setMaxTotalProcesses(30);

        // 准备多个任务
        $tasks = [];
        for ($i = 0; $i < 20; $i++) {
            $url = "http://127.0.0.1/index/testTime?type=(".$i.")";
            $tasks[] = [
                'app\controller\Index',
                'curl',
                [$url]
            ];
        }

        // 并发执行所有任务
        $asyncService->multiProcess($tasks);

        return '任务已提交';
    } catch (\Throwable $e) {
        Log::error('testProcess 执行错误: ' . $e->getMessage() . "\n" . $e->getTraceAsString());
        throw $e;
    }

    return "先输出文字2";
}

3.注意需要打开 pcntl_signal pcntl_alarm pcntl_waitpid pcntl_wifexited 扩展

ℳ๓古依博學之誌°ꦿ⁵²º᭄
最后更新于 2025-04-10