PHP:通过 Swoole 扩展实现 HTTP 协议服务器,上传超大文件

不知道有没有人考虑过通过 PHP 上传大文件,比如 1GB、10GB、100GB... ,又如果你运行机器的内存只有几百 M,总之低于目标大文件的大小。

如果这是在公司业务里,大多数会定一个上传上限,比如几 MB,或者否定这个需求。

在某些业务里确实也有用到过上传大型文件的需求,网上的解决方案基本上都是趋向于通过分片上传,或者将文件直接存储到第三方云存储,系统只保存一个链接。

举一个例子,比如 WEBDAV,他对于上传他是没有分片上传这一设计的,就算你实现了分片接收文件,总不能要求所有以 webdav 协议为标准的客户端都去实现分片上传。

但在此做一个结论,在复杂的互联网网络环境里,分片上传断点续传是比较可靠稳定的。那么以下的实现仅限于网络可靠的私有网络。

首先我们需要了解,PHP  默认配置中只支持 8M 的上传,要支持更大的文件,你需要将 php.ini 中 upload_max_filesize 调整到大于等于目标文件大小,才能上传这个文件,这个配置也不能灵活动态调整。

之后最重要,PHP 的请求处理机制,他会把所有 HTTP 请求头、内容完整读取到内存才会分发请求,执行脚本文件。

试想一下,如果访客上传一个 10G 文件或者更大,内存岂不是会被一次性被吞掉。这是非常不合理的,而且这段时间速度会非常慢,处于假死状态。

这点在 Golang 基础库里做的不错,它几乎可以实现接近无消耗内存上传大文件,全 CPU 操作。

我们今天采用 Swoole 实现大文件上传,swoole 也有提供创建 HTTP 服务的方法,但依然有上传大小的限制。

修改最大数据包大小参数,package_max_length,不过这个是支持动态调整的,但依然是会将数据吞到内存里。

Swoole 底层是全内存的,因此如果设置过大可能会导致大量并发请求将服务器资源耗尽。

接下来,我们通过 Swoole 创建 TCP 服务,手动完成 HTTP 协议交互,实现大文件上传。

第一步了解下,简单实现一个 TCP 服务。

<?php
$server = new Swoole\Server('0.0.0.0', 5555);

$server->on('start', function ($server) {
    echo "TCP Server is started at tcp://127.0.0.1:9503\n";
});

$server->on('connect', function ($server, $fd){
    echo "connection open: {$fd}\n";
});

$server->on('receive', function ($server, $fd, $reactor_id, $data) {
    $server->send($fd, "Swoole: {$data}");
});

$server->on('close', function ($server, $fd) {
    echo "connection close: {$fd}\n";
});

$server->start();

将以上代码保存为 myhttp.php ,并尝试运行

php myhttp.php

此时只能还不能作为 HTTP 服务使用,只能通过 telnet 尝试连接,测试连接和消息收发。

接下来,我们通过简单改造,在 onReceive 事件时,给连接返回一段 HTTP  基础相应信息。

<?php
$server = new Swoole\Server('0.0.0.0', 5555);

$server->on('start', function ($server) {
    echo "TCP Server is started at tcp://127.0.0.1:9503\n";
});

$server->on('connect', function ($server, $fd){
    echo "connection open: {$fd}\n";
});

$server->on('receive', function ($server, $fd, $reactor_id, $data) {
    echo sprintf("<< [收到数据请求:]\n%s\n", $data);
    $raw = <<<EOF
HTTP/1.1 200 OK
Content-Type: text/html
Content-Length: 11
Connection: close
Server: eller-server

hello world
EOF;
    echo sprintf(">> [回应数请求:]\n%s\n", $raw);
    $server->send($fd, $raw);
});

$server->on('close', function ($server, $fd) {
    echo "connection close: {$fd}\n";
});

$server->start();

此时,你再通过浏览器访问,浏览器也能展示出来数据了。

相应的,你通过终端也能接收到浏览器的请求信息和你回应的信息了。

[root@localhost phpalbum]# php myhttp.php 
TCP Server is started at tcp://127.0.0.1:9503
connection open: 1
<< [收到数据请求:]
GET / HTTP/1.1
Host: 192.168.32.137:5555
Connection: keep-alive
Cache-Control: max-age=0
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9
Accept-Encoding: gzip, deflate
Accept-Language: zh-CN,zh;q=0.9,ja;q=0.8


>> [回应数请求:]
HTTP/1.1 200 OK
Content-Type: text/html
Content-Length: 11
Connection: close
Server: eller-server

hello world

接下来,只需要完善 HTTP 请求头的解析操作和相应操作就能实现一个基础的 HTTP 服务。

解析请求头可以使用 Swoole  提供的方法:

$req = Request::create(['parse_cookie' => false]);
$req->parse($data);

之后,就可以按照 Swoole 的文档使用这对象,https://wiki.swoole.com/#/http_server?id=httprequest

我们也可以自己写一个简单解析方法,目的是从数据流中分离请求头,并解析出 Header 数据。

我们可以通过抓包或者将接收到的数据保存到文件,进行对比查看。

这里随便拿一个我测试的 HTTP 请求举例:

POST /api?ss=33 HTTP/1.1
Content-Type: application/json
User-Agent: PostmanRuntime/7.26.8
Accept: */*
Cache-Control: no-cache
Postman-Token: 277cd4f6-8841-4bd7-b484-6569eb447a27
Host: 192.168.32.137:5555
Accept-Encoding: gzip, deflate, br
Connection: keep-alive
Content-Length: 14

{"name":"jax"}

具体如下:

  • 首行请求行,是标注请求方法、请求 URI、请求协议,以及它是通过 \r\n 分割的,十六进制写作 "0d0a"
  • 接下来是请求头,存储着客户端的基本信息和附加的请求头数据,也就是我们熟知的 Header 数据,它由多行构成,也是由 \r\n 换行的。
  • 再后面会空一行,继续换行,这里实际上是两个换行 \r\n\r\n,十六进制写作 "0d0a0d0a"
  • 最后就是 payload(body) 数据了,从此刻起,获取数据的多少有 header 头的 Content-Length 长度决定,一直获取直至结束。注意:这里大请求可能会进行分包,需要多次获取。

以下是我的解析方法:

    protected function preread_http_header(&$data)
    {//ff d8 ff e0
//    $pos = strpos($data,"\r\n");
//    var_dump(substr($data,0, $pos));

        $pos = strpos($data, hexToStr("0d0a0d0a"));
        $header = substr($data, 0, $pos);
//    file_put_contents('./test.txt', $header.PHP_EOL, FILE_APPEND);
        if ($headers = explode("\r\n", $header)) {
            $key_map = ['Content-Length' => 0];
            foreach ($headers as $item) {
                $buffer = explode(":", $item);
                if (count($buffer) >= 2) {
                    $key_map[trim($buffer[0])] = trim($buffer[1]);
                }
                unset($buffer);
            }
            unset($headers);
            unset($header);
            unset($pos);
            return $key_map;
        } else {
            return false;
        }
    }

    protected  function preread_http_request(&$data){
        $pos = strpos($data, hex2bin("0d0a"));
        $first_line = substr($data, 0, $pos);
        list($method, $path, $protocol_version) = explode(' ', $first_line);
        $request = [
            'method' => trim(strtoupper($method)),
            'request_uri' => trim($path),
            'protocol_version' => trim($protocol_version),
        ];
        $request['headers'] = $this->preread_http_header($data);
        return $request;
    }

解析后的结果:

array(4) {
  ["method"]=>
  string(3) "GET"
  ["request_uri"]=>
  string(12) "/favicon.ico"
  ["protocol_version"]=>
  string(8) "HTTP/1.1"
  ["headers"]=>
  array(9) {
    ["Host"]=>
    string(14) "192.168.32.137"
    ["Connection"]=>
    string(10) "keep-alive"
    ["Pragma"]=>
    string(8) "no-cache"
    ["Cache-Control"]=>
    string(8) "no-cache"
    ["User-Agent"]=>
    string(114) "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36"
    ["Accept"]=>
    string(64) "image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8"
    ["Referer"]=>
    string(4) "http"
    ["Accept-Encoding"]=>
    string(13) "gzip, deflate"
    ["Accept-Language"]=>
    string(23) "zh-CN,zh;q=0.9,ja;q=0.8"
  }
}

 此时我们还需要再写一个解析 payload 数据的方法,也是一样,通过分割符定位 header 之后的内容,就认为是 body,将具体数据进行解析:

    protected function parse_http_payload(array $request, string &$data)
    {
        $payload = [];
        $form = [];
        $raw = '';
        $pos = strpos($data, hex2bin("0d0a0d0a"));
        if ($pos != -1) {
            $body = substr($data, $pos + strlen(hex2bin("0d0a0d0a")));
        } else {
            $body = $data;
        }

        if (isset($request['headers']['Content-Type'])) {
            if ($request['headers']['Content-Type'] == 'application/x-www-form-urlencoded') {
                $arr_data = explode('&', $body);
                foreach ($arr_data as $item_data) {
                    $value_ret = explode('=', $item_data);
                    $form[$value_ret[0]] = $value_ret[1] ?? '';
                }

            } else if (preg_match('#multipart/form-data; boundary=([^\b]+)\b#is', $request['headers']['Content-Type'], $matches)) {
                $boundary_str = $matches[1];
                $arr_data = explode($boundary_str, $body);
                foreach ($arr_data as $item_data) {
                    if (preg_match('#Content-Disposition: form-data; name="([^"]+)"\r\n\r\n(.*)\r\n#is', $item_data, $value_ret)) {
                        $form[$value_ret[1]] = $value_ret[2];
                    }
                    //Content-Disposition: form-data; name="file"; filename="222.png"
                    //Content-Type: image/png
                    elseif (preg_match('#Content-Disposition: form-data; name="file"; filename="([^"]+)"\r\nContent-Type: ([A-Za-z0-9/]+)\r\n\r\n(.*)\r\n#is', $item_data, $value_ret)) {
                        $form[$value_ret[1]] = ['type' => $value_ret[2], 'stream' => $value_ret[3]];
                        file_put_contents('aa3.png', $value_ret[3]);
                    }
                }
            } else if (in_array($request['headers']['Content-Type'], [
                'text/plain',
                'application/json',
                'application/javascript',
                'text/html',
                'application/xml',
            ])) {
                $raw = $body;
            } else {
                // 其他都认为是资源文件
                $raw = $body;
            }
        }

        $payload = [
            'from' => $form,
            'raw'  => $raw
        ];

        unset($body, $arr_data, $item_data, $boundary_str, $value_ret, $form, $raw);
        return $payload;
    }

以上就是对请求体的简单解析,这时候还不能够实现大文件上传,依然会将文件吞到内存里。

我们通过对  onReceive 方法进行改造,实现对数据包的拆分,读取完整的 HTTP 请求后再进行触发执行。

区别在于,我们设立一个 Content-Length 上限值,如果大于则将后续的数据存储到文件。

由于多文件会进行分包,多次进行接收数据,onReceive 会被执行多次,我们需要要接收的数据和已经接收的数据做记录。

比如第一次接收到数据包将 Content-Length 存起来,直到累计接收到大于等于这个数据包时则认为数据完整。

这里需要注意,如果你使用的是 SWOOLE_BASE 模式,实际 onReceive 会在多个进程里,你需要使用 Swoole 提供的内存 Table 来实现数据共享存储。

在这里我们使用 SWOOLE_PROCESS 模式创建服务,这种模式也是默认的,也就是 onReceive 对于一个连接来说是固定的进程,我们不需要跨进程通信。

我们只需要将数据存储到当前类成员就行了:

protected $fd_data = [];

public function onReceive($server, $fd, $reactor_id, $data)
{
	if(!isset($this->fd_data[$fd])){
		$this->fd_data[$fd] = [
			'is_completed'   => 0, // 是否完成
			'is_parse_head'  => 0, // 是否解析 head
			'require_length' => 0, // 请求需要长度
			'recv_length'    => 0, // 已经接收长度
		];
	}
 }

首次执行 onReceive 时,要进行解析 header 操作。

这里有两种情况,一种是,首次解析 header 时就已经读取完 Content-Length 的所有数据了,那就直接触发请求 Request

还有一种,经过不断接收数据包,累计接收达到 Content-Length 的数据,此时再触发请求 Request

这是完成的代码:

public function onReceive($server, $fd, $reactor_id, $data)
{
	if(!isset($this->fd_data[$fd])){
		$this->fd_data[$fd] = [
			'is_completed'   => 0, // 是否完成
			'is_parse_head'  => 0, // 是否解析 head
			'require_length' => 0, // 请求需要长度
			'recv_length'    => 0, // 已经接收长度
		];
	}

	if (!$this->fd_data[$fd]['is_parse_head']) {

		$this->debugText("<<<<<<<<<<尝试解析 header:$fd\n");
		$request = $this->preread_http_request($data);

		$body_pos = strpos($data, hex2bin("0d0a0d0a"));
		$content_length = intval($request['headers']['Content-Length']);
		$recv_length = strlen($data) - $body_pos + strlen(hex2bin("0d0a0d0a"));
		$this->fd_data[$fd]= [
			'is_parse_head'  => 1,
			'require_length' => $request['headers']['Content-Length'] ?? 0,
			'recv_length'    => $recv_length
		];

		if ($recv_length >= $content_length) {
			// 接收完毕
			$this->debugText(sprintf("<< [一次接收完毕,触发请求,$recv_length, $content_length]\n"));
			$this->onRequest($server, $fd, $request, $data);
		} else {
			$this->debugText(sprintf("<< [未接收完毕]\n"));
			// 未接收完毕,需要多次接收
			return false;
		}
	} else {
		$this->debugText(sprintf("<< [分段接收]\n"));
		// 这里接收数据
		$this->fd_data[$fd]['recv_length'] += strlen($data);

		if ( $this->fd_data[$fd]['recv_length'] >=  $this->fd_data[$fd]['require_length']) {
			$this->debugText(sprintf("<< [接收完毕,触发请求] %s == %s\n", $this->fd_data[$fd]['recv_length'], $this->fd_data[$fd]['require_length']));
			$this->onRequest($server, $fd, [], '');
		}else{
			return false;
		}

	}

	// 请求完释放内存数据
	$this->debugText(sprintf("释放数据拉 => %s\n", $fd));
	unset($this->fd_data[$fd]);
}

仔细观察这里会发现,如果当最后一次回调到 onReceive 时,由于是多次处理的回调,他已经拿不到首次解析的 Header 请求数据了。

我们需要将数据存储起来等到最后一次回调后拿到数据去触发 request 请求,这个数据可以存储在共享内存、或者类成员也可以其他文件等等。

我不打算将数据存储在其他地方,打算通过协程挂起暂停恢复的形式,将首次的回调暂停下来,等到数据接收完毕后再恢复协程,这时候首次的协程继续执行,

Request Header 解析的数据就在当前作用域空间,通过这样实现请求的分发。

// 获取当前协程 ID
$cid = Swoole\Coroutine::getuid();

// 暂停协程执行
Swoole\Coroutine::yield();

// 恢复协程执行
Swoole\Coroutine::resume($cid);

除此之外,我们还需要将完整数据包存储到文件,供 onRequest 时调用:

// 首次写入
file_put_contents('./tmp/'.$fd, substr($data, $body_pos + strlen(hex2bin("0d0a0d0a"))));
// 之后每次追加
file_put_contents('./tmp/'.$fd, $data,FILE_APPEND);

经过对比,我们发现上传后的文件 MD5 一致。

为了加快性能,我们开启一键协程化,他会自动将所有耗时函数改成协程调用。

将以下代码放入 on Start 里(官方说 enableCoroutine 应放到创建进程后,Co::set() 放在创建进程前):

Swoole\Runtime::enableCoroutine($flags = SWOOLE_HOOK_ALL);

实际运行来看,enableCoroutine 要比 Co::set() 设置的协程化性能要高,这点原因不清楚。

为了再加快速度,我们将调试代码统一到一个方法里,统一关闭输出,因为这在终端打印非常耗时,所以仅调试时开启输出。

还有一点就是最终在 onRequest 请求结束后,记得将缓存文件( tmp/* )在合理的机制下删除,否则会导致 onRequest 里的程序没有完整读完。

这是最终的 onReceive 代码:

public function onReceive($server, $fd, $reactor_id, $data)
{
	if(!isset($this->fd_data[$fd])){
		$this->fd_data[$fd] = [
			'is_completed'   => 0, // 是否完成
			'is_parse_head'  => 0, // 是否解析 head
			'require_length' => 0, // 请求需要长度
			'recv_length'    => 0, // 已经接收长度
		];
	}

	if (!$this->fd_data[$fd]['is_parse_head']) {

		$this->debugText("<<<<<<<<<<尝试解析 header:$fd\n");
		$request = $this->preread_http_request($data);

		$body_pos = strpos($data, hex2bin("0d0a0d0a"));
		$content_length = intval($request['headers']['Content-Length']);
		$recv_length = strlen($data) - $body_pos + strlen(hex2bin("0d0a0d0a"));
		$this->fd_data[$fd]= [
			'is_parse_head'  => 1,
			'require_length' => $request['headers']['Content-Length'] ?? 0,
			'recv_length'    => $recv_length
		];
		file_put_contents('./tmp/'.$fd, substr($data, $body_pos + strlen(hex2bin("0d0a0d0a"))));

		if ($recv_length >= $content_length) {
			// 接收完毕
			$this->debugText(sprintf("<< [一次接收完毕,触发请求,$recv_length, $content_length]\n"));
			$this->onRequest($server, $fd, $request, $data);
		} else {
			$this->debugText(sprintf("<< [未接收完毕]\n"));
			// 未接收完毕,需要多次接收
			$cid = Swoole\Coroutine::getuid();
			$this->debugText(sprintf("<< [让出协程] code => %s\n",$cid));
			$this->fd_data[$fd]['cid'] = $cid;
			Swoole\Coroutine::yield();
			$this->debugText(sprintf("<< [恢复协程,准备触发 Request]\n"));
			$this->onRequest($server, $fd, $request, $data);
			return false;
		}
	} else {
		$this->debugText(sprintf("<< [分段接收]\n"));
		// 这里接收数据
		file_put_contents('./tmp/'.$fd, $data,FILE_APPEND);
		$this->fd_data[$fd]['recv_length'] += strlen($data);

		if ( $this->fd_data[$fd]['recv_length'] >=  $this->fd_data[$fd]['require_length']) {
			$this->debugText(sprintf("<< [接收完毕,触发请求] %s == %s\n", $this->fd_data[$fd]['recv_length'], $this->fd_data[$fd]['require_length']));
			$cid = $this->fd_data[$fd]['cid'];
			Swoole\Coroutine::resume($cid);
			$this->debugText(sprintf("<< [准备恢复协程,%s]\n",$cid));
		}else{
			return false;
		}

	}

	// 请求完释放内存数据
	$this->debugText(sprintf("释放数据拉 => %s\n", $fd));
	unset($this->fd_data[$fd]);
}

目前测试是通过 RAW 形式上传文件的,没有去解析 formdata 的数据,如果你需要,可以试着从 formdata 解析数据。

POSTMAN 截图测试如下:

HTTP 还有很多细节可以参照文档继续完善,比如其中的 keep alive 机制,目前我们都是短链接,一个请求打开一个链接关闭一个链接。

如果你尝试维护 keep alive 也可以实现更高性能的链接复用,减少握手次数,降低资源消耗。

最后,以上完整代码会放置在 Github 欢迎阅览:https://github.com/ellermister/myhttp-swoole-php

Comments

  • avatar
    david

    大佬,你的群怎么解散了

    2021-11-05 07:07