玩转Fiber: include/require也能异步?

in 前后端开发 with 0 comment

走入Fiber:你不知道的PHP更新

纤程的宣传

Fiber于PHP8.1引入,由于网上相似的文章很多,所以我就不造轮子了,自己看吧:

链接资料:了解Fiber与协程用法
链接资料:分析Fiber的协程用法与MomentPHP的yield异步大法

如何实现

首先,我们得想办法把看起来同步的include底层变成异步
但是很可惜,include只能引入URL和本地文件,而且是全程阻塞的
那么何不将同步变成协程内同步呢?于是,一个变通的方案产生了

代码实现

我们使用的异步框架是MomentPHP,自带异步文件open方法和仿照GO语言的go函数
首先是让include执行我们的异步函数,这时候 注册一个协议 就很重要了

/**
 * 包装异步IO
 */
class AsyncIOWrapper{
    /**
     * @var resource 上下文选项
     */
    public $context;

    /**
     * @var \MomentCore\FilePipe 处理类
     */
    private $stream;

    public function rename(string $path_from, string $path_to): bool{
        return 0 == count(\Fiber::suspend(move($path_from,$path_to)));
    }

    public function stream_close(){
        return $this -> stream -> __destruct();
    }

    public function stream_eof(){
        return !$this -> stream -> _alive();
    }

    public function stream_flush(): bool{
        if(!$this -> stream -> _alive()) return false;
        $prom = new Promise($rs,$rj);
        $this -> stream -> on('empty',$rs);
        \Fiber::suspend($prom);
        return true;
    }

    public function stream_set_option(){
        return true;
    }

    public function stream_stat(){
        return $this -> stream -> stat();
    }

    public function stream_open(
        string $path,
        string $mode
    ): bool{
        try{
            if(!preg_match('/^fs:\/\/([^?*:|\<>\']+)$/',$path,$match))
                return false;
            $this -> stream = \MomentCore\open($match[1],$mode);
            return true;
        }catch(\Throwable){
            return false;
        }
    }

    public function stream_read(int $count): string|false{
        return \Fiber::suspend($this -> stream -> read($count));
    }

    public function stream_seek(int $offset, int $whence = SEEK_SET): bool{
        return $this -> stream -> seek($offset,$whence);
    }
    
    public function stream_tell(): int{
        return $this -> stream -> tell();
    }
    
    public function stream_write(string $data): int{
        $this -> stream -> write($data);
        $this -> stream_flush();
        return strlen($data);
    }

    public function __destruct(){
        if(!$this -> stream) return;
        $this -> stream -> __destruct();
    }
}
if (in_array('fs', stream_get_wrappers()))
    stream_wrapper_unregister('fs');
stream_wrapper_register('fs','AsyncIOWrapper');

这些看起来很儿戏,但是MomentPHP在背后做了很多工作才实现了异步
首先是协程化

function go(callable $main,array $args = [],?callable $onext = null){
    $fiber = new \Fiber($main);

    return Promise::await(function () use ($fiber,$args,$onext) {
        $return = $fiber -> start(...$args);
        try{
            $return = yield $return;
        }catch(\Throwable $e){
            $fiber -> throw($e);
        }
        // Promise实现时循环执行
        while (true){
            // 已退出
            if($fiber -> isTerminated())
                return $fiber -> getReturn();
            // 继续执行虚拟线程
            $return = yield $fiber -> resume($return);
        }
    },$onext);
}

然后是一堆异步的方法,如write() read()...
最后是使用这个异步方式

go(fn() => include('fs://a.php'));

怎么样,简单吗?

Responses