PHP5.5引入了迭代生成器的概念,迭代的概念早就在PHP有了,但是迭代生成器是PHP的一個新特性,這跟python3中的迭代生成器類似,看看PHP5.5的迭代生成器如何定義。
<?php
function xrange($start, $end, $step = 1) {
for ($i = $start; $i <= $end; $i += $step) {
yield $i;
}
}
foreach (xrange(1, 1000000) as $num) {
echo $num, "\n";
}
注意關鍵字:yield,正是這個yeild關鍵字構建了一個迭代器,這個函數xrange跟以往的函數的不同之處就在這裡。一般情況都是return一個值,而yield一個值就表示這是個迭代器,沒循環一次這個迭代器就生成這個值,故名為迭代生成器,迭代生成器這個函數可以進行foreach循環,每次都產生一個值。
PHP5.5之前是通過定義類實現Iterator接口的方式來構造迭代器,通過yield構造迭代器將更加提升性能節省系統開銷。
這種方法的優點是顯而易見的.它可以讓你在處理大數據集合的時候不用一次性的加載到內存中,甚至你可以處理無限大的數據流。
如上面例子所示,這個迭代器的功能是生成從1到1000000的數字,循環輸出,那麼使用以往的方式是生成好這1到1000000的數字到數組中,將會十分占用內存,因為是事先就要生成好所有結果,而不是用的時候按需生成,也就是說調用xrange這個迭代器的時候,裡面的函數還沒有真正的運行,直到你每一次的迭代。
再看看PHP官網的例子:
<?php
function xrange($start, $limit, $step = 1) {
for ($i = $start; $i <= $limit; $i += $step) {
yield $i;
}
}
echo 'Single digit odd numbers: ';
/*
* Note that an array is never created or returned,
* which saves memory.
*/
foreach (xrange(1, 9, 2) as $number) {
echo "$number ";
}
echo "\n";
?>
這裡的xrange是一個迭代,功能和range是一樣的,如果使用range函數的話,那麼函數內部實現會儲存每個迭代的中間過程,即每個中間變量都有 個內存空間,那麼首先程序使用的內存空間就大了,而且分配內存,回收內存都會導致程序的運行時間加長。但是如果使用上yield實現的xrange函數的 話,裡面所有的中間變量都只使用一個內存$i,這樣節省的時間和空間都會變小。
那麼為什麼yield會有這樣的效果呢?聯想到lua中的yield,這裡就算是協程的概念了。在lua語言中,當程序運行到yield的時候,使用協程 將上下文環境記錄住,然後將程序操作權歸還到主函數,當主函數調用resume的時候,會重新喚起協程,讀取yield記錄的上下文。這樣形成了程序語言 級別的多協程操作。php 5.5這裡的yield也是同樣的道理,當程序運行到yield的時候,當前程序就喚起協程記錄上下文,然後主函數繼續操作,只是php中沒有使用如 resume一樣的關鍵字,而是“在使用的時候喚起”協程。比如上例中的foreach迭代器就能喚起yield。所以上面的這個例子就能理解了。
迭代生成器
(迭代)生成器也是一個函數,不同的是這個函數的返回值是依次返回,而不是只返回一個單獨的值.或者,換句話說,生成器使你能更方便的實現了迭代器接口.下面通過實現一個xrange函數來簡單說明:
<?php
function xrange($start, $end, $step = 1) {
for ($i = $start; $i <= $end; $i += $step) {
yield $i;
}
}
foreach (xrange(1, 1000000) as $num) {
echo $num, "\n";
}
上面這個xrange()函數提供了和PHP的內建函數range()一樣的功能.但是不同的是range()函數返回的是一個包含值從1到100萬0的數組(注:請查看手冊). 而xrange()函數返回的是依次輸出這些值的一個迭代器, 而不會真正以數組形式返回.
這種方法的優點是顯而易見的.它可以讓你在處理大數據集合的時候不用一次性的加載到內存中.甚至你可以處理無限大的數據流.
當然,也可以不同通過生成器來實現這個功能,而是可以通過繼承Iterator接口實現.但通過使用生成器實現起來會更方便,不用再去實現iterator接口中的5個方法了.
生成器為可中斷的函數
要從生成器認識協程, 理解它內部是如何工作是非常重要的: 生成器是一種可中斷的函數, 在它裡面的yield構成了中斷點.
還是看上面的例子, 調用xrange(1,1000000)的時候, xrange()函數裡代碼其實並沒有真正地運行. 它只是返回了一個迭代器:
<?php
$range = xrange(1, 1000000);
var_dump($range); // object(Generator)#1
var_dump($range instanceof Iterator); // bool(true)
?>
這也解釋了為什麼xrange叫做迭代生成器, 因為它返回一個迭代器, 而這個迭代器實現了Iterator接口.
調用迭代器的方法一次, 其中的代碼運行一次.例如, 如果你調用$range->rewind(), 那麼xrange()裡的代碼就會運行到控制流第一次出現yield的地方. 而函數內傳遞給yield語句的返回值可以通過$range->current()獲取.
為了繼續執行生成器中yield後的代碼, 你就需要調用$range->next()方法. 這將再次啟動生成器, 直到下一次yield語句出現. 因此,連續調用next()和current()方法, 你就能從生成器裡獲得所有的值, 直到再沒有yield語句出現.
對xrange()來說, 這種情形出現在$i超過$end時. 在這中情況下, 控制流將到達函數的終點,因此將不執行任何代碼.一旦這種情況發生,vaild()方法將返回假, 這時迭代結束.
協程
協程的支持是在迭代生成器的基礎上, 增加了可以回送數據給生成器的功能(調用者發送數據給被調用的生成器函數). 這就把生成器到調用者的單向通信轉變為兩者之間的雙向通信.
傳遞數據的功能是通過迭代器的send()方法實現的. 下面的logger()協程是這種通信如何運行的例子:
<?php
function logger($fileName) {
$fileHandle = fopen($fileName, 'a');
while (true) {
fwrite($fileHandle, yield . "\n");
}
}
$logger = logger(__DIR__ . '/log');
$logger->send('Foo');
$logger->send('Bar')
?>
正如你能看到,這兒yield沒有作為一個語句來使用, 而是用作一個表達式, 即它能被演化成一個值. 這個值就是調用者傳遞給send()方法的值. 在這個例子裡, yield表達式將首先被”Foo”替代寫入Log, 然後被”Bar”替代寫入Log.
上面的例子裡演示了yield作為接受者, 接下來我們看如何同時進行接收和發送的例子:
<?php
function gen() {
$ret = (yield 'yield1');
var_dump($ret);
$ret = (yield 'yield2');
var_dump($ret);
}
$gen = gen();
var_dump($gen->current()); // string(6) "yield1"
var_dump($gen->send('ret1')); // string(4) "ret1" (the first var_dump in gen)
// string(6) "yield2" (the var_dump of the ->send() return value)
var_dump($gen->send('ret2')); // string(4) "ret2" (again from within gen)
// NULL (the return value of ->send())
?>
要很快的理解輸出的精確順序可能稍微有點困難, 但你確定要搞清楚為什按照這種方式輸出. 以便後續繼續閱讀.
另外, 我要特別指出的有兩點:
第一點,yield表達式兩邊的括號在PHP7以前不是可選的, 也就是說在PHP5.5和PHP5.6中圓括號是必須的.
第二點,你可能已經注意到調用current()之前沒有調用rewind().這是因為生成迭代對象的時候已經隱含地執行了rewind操作.
多任務協作
如果閱讀了上面的logger()例子, 你也許會疑惑“為了雙向通信我為什麼要使用協程呢?我完全可以使用其他非協程方法實現同樣的功能啊?”, 是的, 你是對的, 但上面的例子只是為了演示了基本用法, 這個例子其實並沒有真正的展示出使用協程的優點.
正如上面介紹裡提到的,協程是非常強大的概念,不過卻應用的很稀少而且常常十分復雜.要給出一些簡單而真實的例子很難.
在這篇文章裡,我決定去做的是使用協程實現多任務協作.我們要解決的問題是你想並發地運行多任務(或者“程序”).不過我們都知道CPU在一個時刻只能運行一個任務(不考慮多核的情況).因此處理器需要在不同的任務之間進行切換,而且總是讓每個任務運行 “一小會兒”.
多任務協作這個術語中的“協作”很好的說明了如何進行這種切換的:它要求當前正在運行的任務自動把控制傳回給調度器,這樣就可以運行其他任務了. 這與“搶占”多任務相反, 搶占多任務是這樣的:調度器可以中斷運行了一段時間的任務, 不管它喜歡還是不喜歡. 協作多任務在Windows的早期版本(windows95)和Mac OS中有使用, 不過它們後來都切換到使用搶先多任務了. 理由相當明確:如果你依靠程序自動交出控制的話, 那麼一些惡意的程序將很容易占用整個CPU, 不與其他任務共享.
現在你應當明白協程和任務調度之間的關系:yield指令提供了任務中斷自身的一種方法, 然後把控制交回給任務調度器. 因此協程可以運行多個其他任務. 更進一步來說, yield還可以用來在任務和調度器之間進行通信.
為了實現我們的多任務調度, 首先實現“任務” — 一個用輕量級的包裝的協程函數:
<?php
class Task {
protected $taskId;
protected $coroutine;
protected $sendValue = null;
protected $beforeFirstYield = true;
public function __construct($taskId, Generator $coroutine) {
$this->taskId = $taskId;
$this->coroutine = $coroutine;
}
public function getTaskId() {
return $this->taskId;
}
public function setSendValue($sendValue) {
$this->sendValue = $sendValue;
}
public function run() {
if ($this->beforeFirstYield) {
$this->beforeFirstYield = false;
return $this->coroutine->current();
} else {
$retval = $this->coroutine->send($this->sendValue);
$this->sendValue = null;
return $retval;
}
}
public function isFinished() {
return !$this->coroutine->valid();
}
}
如代碼, 一個任務就是用任務ID標記的一個協程(函數). 使用setSendValue()方法, 你可以指定哪些值將被發送到下次的恢復(在之後你會了解到我們需要這個), run()函數確實沒有做什麼, 除了調用send()方法的協同程序, 要理解為什麼添加了一個 beforeFirstYieldflag變量, 需要考慮下面的代碼片段:
<?php
function gen() {
yield 'foo';
yield 'bar';
}
$gen = gen();
var_dump($gen->send('something'));
// 如之前提到的在send之前, 當$gen迭代器被創建的時候一個renwind()方法已經被隱式調用
// 所以實際上發生的應該類似:
//$gen->rewind();
//var_dump($gen->send('something'));
//這樣renwind的執行將會導致第一個yield被執行, 並且忽略了他的返回值.
//真正當我們調用yield的時候, 我們得到的是第二個yield的值! 導致第一個yield的值被忽略.
//string(3) "bar"
通過添加 beforeFirstYieldcondition 我們可以確定第一個yield的值能被正確返回.
調度器現在不得不比多任務循環要做稍微多點了, 然後才運行多任務:
<?php
class Scheduler {
protected $maxTaskId = 0;
protected $taskMap = []; // taskId => task
protected $taskQueue;
public function __construct() {
$this->taskQueue = new SplQueue();
}
public function newTask(Generator $coroutine) {
$tid = ++$this->maxTaskId;
$task = new Task($tid, $coroutine);
$this->taskMap[$tid] = $task;
$this->schedule($task);
return $tid;
}
public function schedule(Task $task) {
$this->taskQueue->enqueue($task);
}
public function run() {
while (!$this->taskQueue->isEmpty()) {
$task = $this->taskQueue->dequeue();
$task->run();
if ($task->isFinished()) {
unset($this->taskMap[$task->getTaskId()]);
} else {
$this->schedule($task);
}
}
}
}
?>
newTask()方法(使用下一個空閒的任務id)創建一個新任務,然後把這個任務放入任務map數組裡. 接著它通過把任務放入任務隊列裡來實現對任務的調度. 接著run()方法掃描任務隊列, 運行任務.如果一個任務結束了, 那麼它將從隊列裡刪除, 否則它將在隊列的末尾再次被調度.
讓我們看看下面具有兩個簡單(沒有什麼意義)任務的調度器:
<?php
function task1() {
for ($i = 1; $i <= 10; ++$i) {
echo "This is task 1 iteration $i.\n";
yield;
}
}
function task2() {
for ($i = 1; $i <= 5; ++$i) {
echo "This is task 2 iteration $i.\n";
yield;
}
}
$scheduler = new Scheduler;
$scheduler->newTask(task1());
$scheduler->newTask(task2());
$scheduler->run();
兩個任務都僅僅回顯一條信息,然後使用yield把控制回傳給調度器.輸出結果如下:
This is task 1 iteration 1.
This is task 2 iteration 1.
This is task 1 iteration 2.
This is task 2 iteration 2.
This is task 1 iteration 3.
This is task 2 iteration 3.
This is task 1 iteration 4.
This is task 2 iteration 4.
This is task 1 iteration 5.
This is task 2 iteration 5.
This is task 1 iteration 6.
This is task 1 iteration 7.
This is task 1 iteration 8.
This is task 1 iteration 9.
This is task 1 iteration 10.
輸出確實如我們所期望的:對前五個迭代來說,兩個任務是交替運行的, 而在第二個任務結束後, 只有第一個任務繼續運行.
與調度器之間通信
既然調度器已經運行了, 那麼我們來看下一個問題:任務和調度器之間的通信.
我們將使用進程用來和操作系統會話的同樣的方式來通信:系統調用.
我們需要系統調用的理由是操作系統與進程相比它處在不同的權限級別上. 因此為了執行特權級別的操作(如殺死另一個進程), 就不得不以某種方式把控制傳回給內核, 這樣內核就可以執行所說的操作了. 再說一遍, 這種行為在內部是通過使用中斷指令來實現的. 過去使用的是通用的int指令, 如今使用的是更特殊並且更快速的syscall/sysenter指令.
我們的任務調度系統將反映這種設計:不是簡單地把調度器傳遞給任務(這樣就允許它做它想做的任何事), 我們將通過給yield表達式傳遞信息來與系統調用通信. 這兒yield即是中斷, 也是傳遞信息給調度器(和從調度器傳遞出信息)的方法.
為了說明系統調用, 我們對可調用的系統調用做一個小小的封裝:
<?php
class SystemCall {
protected $callback;
public function __construct(callable $callback) {
$this->callback = $callback;
}
public function __invoke(Task $task, Scheduler $scheduler) {
$callback = $this->callback;
return $callback($task, $scheduler);
}
}
它和其他任何可調用的對象(使用_invoke)一樣的運行, 不過它要求調度器把正在調用的任務和自身傳遞給這個函數.
為了解決這個問題我們不得不微微的修改調度器的run方法:
<?php
public function run() {
while (!$this->taskQueue->isEmpty()) {
$task = $this->taskQueue->dequeue();
$retval = $task->run();
if ($retval instanceof SystemCall) {
$retval($task, $this);
continue;
}
if ($task->isFinished()) {
unset($this->taskMap[$task->getTaskId()]);
} else {
$this->schedule($task);
}
}
}
第一個系統調用除了返回任務ID外什麼都沒有做:
<?php
function getTaskId() {
return new SystemCall(function(Task $task, Scheduler $scheduler) {
$task->setSendValue($task->getTaskId());
$scheduler->schedule($task);
});
}
這個函數設置任務id為下一次發送的值, 並再次調度了這個任務 .由於使用了系統調用, 所以調度器不能自動調用任務, 我們需要手工調度任務(稍後你將明白為什麼這麼做). 要使用這個新的系統調用的話, 我們要重新編寫以前的例子:
<?php
function task($max) {
$tid = (yield getTaskId()); // <-- here's the syscall!
for ($i = 1; $i <= $max; ++$i) {
echo "This is task $tid iteration $i.\n";
yield;
}
}
$scheduler = new Scheduler;
$scheduler->newTask(task(10));
$scheduler->newTask(task(5));
$scheduler->run();
?>
這段代碼將給出與前一個例子相同的輸出. 請注意系統調用如何同其他任何調用一樣正常地運行, 只不過預先增加了yield.
要創建新的任務, 然後再殺死它們的話, 需要兩個以上的系統調用:
<?php
function newTask(Generator $coroutine) {
return new SystemCall(
function(Task $task, Scheduler $scheduler) use ($coroutine) {
$task->setSendValue($scheduler->newTask($coroutine));
$scheduler->schedule($task);
}
);
}
function killTask($tid) {
return new SystemCall(
function(Task $task, Scheduler $scheduler) use ($tid) {
$task->setSendValue($scheduler->killTask($tid));
$scheduler->schedule($task);
}
);
}
killTask函數需要在調度器裡增加一個方法:
<?php
public function killTask($tid) {
if (!isset($this->taskMap[$tid])) {
return false;
}
unset($this->taskMap[$tid]);
// This is a bit ugly and could be optimized so it does not have to walk the queue,
// but assuming that killing tasks is rather rare I won't bother with it now
foreach ($this->taskQueue as $i => $task) {
if ($task->getTaskId() === $tid) {
unset($this->taskQueue[$i]);
break;
}
}
return true;
}
用來測試新功能的微腳本:
<?php
function childTask() {
$tid = (yield getTaskId());
while (true) {
echo "Child task $tid still alive!\n";
yield;
}
}
function task() {
$tid = (yield getTaskId());
$childTid = (yield newTask(childTask()));
for ($i = 1; $i <= 6; ++$i) {
echo "Parent task $tid iteration $i.\n";
yield;
if ($i == 3) yield killTask($childTid);
}
}
$scheduler = new Scheduler;
$scheduler->newTask(task());
$scheduler->run();
?>
這段代碼將打印以下信息:
Parent task 1 iteration 1.
Child task 2 still alive!
Parent task 1 iteration 2.
Child task 2 still alive!
Parent task 1 iteration 3.
Child task 2 still alive!
Parent task 1 iteration 4.
Parent task 1 iteration 5.
Parent task 1 iteration 6.
經過三次迭代以後子任務將被殺死, 因此這就是”Child is still alive”消息結束的時候. 不過你要明白這還不是真正的父子關系. 因為在父任務結束後子任務仍然可以運行, 子任務甚至可以殺死父任務. 可以修改調度器使它具有更層級化的任務結構, 不過這個不是我們這個文章要繼續討論的范圍了.
現在你可以實現許多進程管理調用. 例如 wait(它一直等待到任務結束運行時), exec(它替代當前任務)和fork(它創建一個當前任務的克隆). fork非常酷,而 且你可以使用PHP的協程真正地實現它, 因為它們都支持克隆.
讓我們把這些留給有興趣的讀者吧,我們來看下一個議題.
非阻塞IO
很明顯, 我們的任務管理系統的真正很酷的應用應該是web服務器. 它有一個任務是在套接字上偵聽是否有新連接, 當有新連接要建立的時候, 它創建一個新任務來處理新連接.
Web服務器最難的部分通常是像讀數據這樣的套接字操作是阻塞的. 例如PHP將等待到客戶端完成發送為止. 對一個Web服務器來說, 這有點不太高效. 因為服務器在一個時間點上只能處理一個連接.
解決方案是確保在真正對套接字讀寫之前該套接字已經“准備就緒”. 為了查找哪個套接字已經准備好讀或者寫了, 可以使用 流選擇函數.
首先,讓我們添加兩個新的 syscall, 它們將等待直到指定socket 准備好:
<?php
function waitForRead($socket) {
return new SystemCall(
function(Task $task, Scheduler $scheduler) use ($socket) {
$scheduler->waitForRead($socket, $task);
}
);
}
function waitForWrite($socket) {
return new SystemCall(
function(Task $task, Scheduler $scheduler) use ($socket) {
$scheduler->waitForWrite($socket, $task);
}
);
}
這些 syscall 只是在調度器中代理其各自的方法:
<?php
// resourceID => [socket, tasks]
protected $waitingForRead = [];
protected $waitingForWrite = [];
public function waitForRead($socket, Task $task) {
if (isset($this->waitingForRead[(int) $socket])) {
$this->waitingForRead[(int) $socket][1][] = $task;
} else {
$this->waitingForRead[(int) $socket] = [$socket, [$task]];
}
}
public function waitForWrite($socket, Task $task) {
if (isset($this->waitingForWrite[(int) $socket])) {
$this->waitingForWrite[(int) $socket][1][] = $task;
} else {
$this->waitingForWrite[(int) $socket] = [$socket, [$task]];
}
}
waitingForRead 及 waitingForWrite 屬性是兩個承載等待的socket 及等待它們的任務的數組. 有趣的部分在於下面的方法,它將檢查 socket 是否可用, 並重新安排各自任務:
<?php
protected function ioPoll($timeout) {
$rSocks = [];
foreach ($this->waitingForRead as list($socket)) {
$rSocks[] = $socket;
}
$wSocks = [];
foreach ($this->waitingForWrite as list($socket)) {
$wSocks[] = $socket;
}
$eSocks = []; // dummy
if (!stream_select($rSocks, $wSocks, $eSocks, $timeout)) {
return;
}
foreach ($rSocks as $socket) {
list(, $tasks) = $this->waitingForRead[(int) $socket];
unset($this->waitingForRead[(int) $socket]);
foreach ($tasks as $task) {
$this->schedule($task);
}
}
foreach ($wSocks as $socket) {
list(, $tasks) = $this->waitingForWrite[(int) $socket];
unset($this->waitingForWrite[(int) $socket]);
foreach ($tasks as $task) {
$this->schedule($task);
}
}
}
stream_select 函數接受承載讀取、寫入以及待檢查的socket的數組(我們無需考慮最後一類). 數組將按引用傳遞, 函數只會保留那些狀態改變了的數組元素. 我們可以遍歷這些數組, 並重新安排與之相關的任務.
為了正常地執行上面的輪詢動作, 我們將在調度器裡增加一個特殊的任務:
<?php
protected function ioPollTask() {
while (true) {
if ($this->taskQueue->isEmpty()) {
$this->ioPoll(null);
} else {
$this->ioPoll(0);
}
yield;
}
}
?>
需要在某個地方注冊這個任務, 例如, 你可以在run()方法的開始增加$this->newTask($this->ioPollTask()). 然後就像其他任務一樣每執行完整任務循環一次就執行輪詢操作一次(這麼做一定不是最好的方法), ioPollTask將使用0秒的超時來調用ioPoll, 也就是stream_select將立即返回(而不是等待).
只有任務隊列為空時,我們才使用null超時,這意味著它一直等到某個套接口准備就緒.如果我們沒有這麼做,那麼輪詢任務將一而再, 再而三的循環運行, 直到有新的連接建立. 這將導致100%的CPU利用率. 相反, 讓操作系統做這種等待會更有效.
現在編寫服務器就相對容易了:
<?php
function server($port) {
echo "Starting server at port $port...\n";
$socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr);
if (!$socket) throw new Exception($errStr, $errNo);
stream_set_blocking($socket, 0);
while (true) {
yield waitForRead($socket);
$clientSocket = stream_socket_accept($socket, 0);
yield newTask(handleClient($clientSocket));
}
}
function handleClient($socket) {
yield waitForRead($socket);
$data = fread($socket, 8192);
$msg = "Received following request:\n\n$data";
$msgLength = strlen($msg);
$response = <<<RES
HTTP/1.1 200 OK\r
Content-Type: text/plain\r
Content-Length: $msgLength\r
Connection: close\r
\r
$msg
RES;
yield waitForWrite($socket);
fwrite($socket, $response);
fclose($socket);
}
$scheduler = new Scheduler;
$scheduler->newTask(server(8000));
$scheduler->run();
這段代碼實現了接收localhost:8000上的連接, 然後返回發送來的內容作為HTTP響應. 當然它還能處理真正的復雜HTTP請求, 上面的代碼片段只是演示了一般性的概念.
你可以使用類似於ab -n 10000 -c 100 localhost:8000/這樣命令來測試服務器. 這條命令將向服務器發送10000個請求, 並且其中100個請求將同時到達. 使用這樣的數目, 我得到了處於中間的10毫秒的響應時間. 不過還有一個問題:有少數幾個請求真正處理的很慢(如5秒), 這就是為什麼總吞吐量只有2000請求/秒(如果是10毫秒的響應時間的話, 總的吞吐量應該更像是10000請求/秒)
協程堆棧
如果你試圖用我們的調度系統建立更大的系統的話, 你將很快遇到問題:我們習慣了把代碼分解為更小的函數, 然後調用它們. 然而, 如果使用了協程的話, 就不能這麼做了. 例如,看下面代碼:
<?php
function echoTimes($msg, $max) {
for ($i = 1; $i <= $max; ++$i) {
echo "$msg iteration $i\n";
yield;
}
}
function task() {
echoTimes('foo', 10); // print foo ten times
echo "---\n";
echoTimes('bar', 5); // print bar five times
yield; // force it to be a coroutine
}
$scheduler = new Scheduler;
$scheduler->newTask(task());
$scheduler->run();
這段代碼試圖把重復循環“輸出n次“的代碼嵌入到一個獨立的協程裡,然後從主任務裡調用它. 然而它無法運行. 正如在這篇文章的開始所提到的, 調用生成器(或者協程)將沒有真正地做任何事情, 它僅僅返回一個對象.這 也出現在上面的例子裡:echoTimes調用除了放回一個(無用的)協程對象外不做任何事情.
為了仍然允許這麼做,我們需要在這個裸協程上寫一個小小的封裝.我們將調用它:“協程堆棧”. 因為它將管理嵌套的協程調用堆棧. 這將是通過生成協程來調用子協程成為可能:
$retval = (yield someCoroutine($foo, $bar));
使用yield,子協程也能再次返回值:
yield retval("I'm a return value!");
retval函數除了返回一個值的封裝外沒有做任何其他事情.這個封裝將表示它是一個返回值.
<?php
class CoroutineReturnValue {
protected $value;
public function __construct($value) {
$this->value = $value;
}
public function getValue() {
return $this->value;
}
}
function retval($value) {
return new CoroutineReturnValue($value);
}
為了把協程轉變為協程堆棧(它支持子調用),我們將不得不編寫另外一個函數(很明顯,它是另一個協程):
<?php
function stackedCoroutine(Generator $gen) {
$stack = new SplStack;
for (;;) {
$value = $gen->current();
if ($value instanceof Generator) {
$stack->push($gen);
$gen = $value;
continue;
}
$isReturnValue = $value instanceof CoroutineReturnValue;
if (!$gen->valid() || $isReturnValue) {
if ($stack->isEmpty()) {
return;
}
$gen = $stack->pop();
$gen->send($isReturnValue ? $value->getValue() : NULL);
continue;
}
$gen->send(yield $gen->key() => $value);
}
}
這個函數在調用者和當前正在運行的子協程之間扮演著簡單代理的角色.在$gen->send(yield $gen->key()=>$value);這行完成了代理功能.另外它檢查返回值是否是生成器,萬一是生成器的話,它將開始運行這個生成器,並把前一個協程壓入堆棧裡.一旦它獲得了CoroutineReturnValue的話,它將再次請求堆棧彈出,然後繼續執行前一個協程.
為了使協程堆棧在任務裡可用,任務構造器裡的$this-coroutine =$coroutine;這行需要替代為$this->coroutine = StackedCoroutine($coroutine);.
現在我們可以稍微改進上面web服務器例子:把wait+read(和wait+write和warit+accept)這樣的動作分組為函數.為了分組相關的 功能,我將使用下面類:
<?php
class CoSocket {
protected $socket;
public function __construct($socket) {
$this->socket = $socket;
}
public function accept() {
yield waitForRead($this->socket);
yield retval(new CoSocket(stream_socket_accept($this->socket, 0)));
}
public function read($size) {
yield waitForRead($this->socket);
yield retval(fread($this->socket, $size));
}
public function write($string) {
yield waitForWrite($this->socket);
fwrite($this->socket, $string);
}
public function close() {
@fclose($this->socket);
}
}
現在服務器可以編寫的稍微簡潔點了:
<?php
function server($port) {
echo "Starting server at port $port...\n";
$socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr);
if (!$socket) throw new Exception($errStr, $errNo);
stream_set_blocking($socket, 0);
$socket = new CoSocket($socket);
while (true) {
yield newTask(
handleClient(yield $socket->accept())
);
}
}
function handleClient($socket) {
$data = (yield $socket->read(8192));
$msg = "Received following request:\n\n$data";
$msgLength = strlen($msg);
$response = <<<RES
HTTP/1.1 200 OK\r
Content-Type: text/plain\r
Content-Length: $msgLength\r
Connection: close\r
\r
$msg
RES;
yield $socket->write($response);
yield $socket->close();
}
錯誤處理
作為一個優秀的程序員, 相信你已經察覺到上面的例子缺少錯誤處理. 幾乎所有的 socket 都是易出錯的. 我沒有這樣做的原因一方面固然是因為錯誤處理的乏味(特別是 socket), 另一方面也在於它很容易使代碼體積膨脹.
不過, 我仍然想講下常見的協程錯誤處理:協程允許使用 throw() 方法在其內部拋出一個錯誤.
throw() 方法接受一個 Exception, 並將其拋出到協程的當前懸掛點, 看看下面代碼:
<?php
function gen() {
echo "Foo\n";
try {
yield;
} catch (Exception $e) {
echo "Exception: {$e->getMessage()}\n";
}
echo "Bar\n";
}
$gen = gen();
$gen->rewind(); // echos "Foo"
$gen->throw(new Exception('Test')); // echos "Exception: Test"
// and "Bar"
這非常好, 有沒有? 因為我們現在可以使用系統調用以及子協程調用異常拋出了.
不過我們要對系統調用Scheduler::run() 方法做一些小調整:
<?php
if ($retval instanceof SystemCall) {
try {
$retval($task, $this);
} catch (Exception $e) {
$task->setException($e);
$this->schedule($task);
}
continue;
}
Task 類也要添加 throw 調用處理:
<?php
class Task {
// ...
protected $exception = null;
public function setException($exception) {
$this->exception = $exception;
}
public function run() {
if ($this->beforeFirstYield) {
$this->beforeFirstYield = false;
return $this->coroutine->current();
} elseif ($this->exception) {
$retval = $this->coroutine->throw($this->exception);
$this->exception = null;
return $retval;
} else {
$retval = $this->coroutine->send($this->sendValue);
$this->sendValue = null;
return $retval;
}
}
// ...
}
現在, 我們已經可以在系統調用中使用異常拋出了!例如,要調用 killTask,讓我們在傳遞 ID 不可用時拋出一個異常:
<?php
function killTask($tid) {
return new SystemCall(
function(Task $task, Scheduler $scheduler) use ($tid) {
if ($scheduler->killTask($tid)) {
$scheduler->schedule($task);
} else {
throw new InvalidArgumentException('Invalid task ID!');
}
}
);
}
試試看:
<?php
function task() {
try {
yield killTask(500);
} catch (Exception $e) {
echo 'Tried to kill task 500 but failed: ', $e->getMessage(), "\n";
}
}
這些代碼現在尚不能正常運作,因為 stackedCoroutine 函數無法正確處理異常.要修復需要做些調整:
<?php
function stackedCoroutine(Generator $gen) {
$stack = new SplStack;
$exception = null;
for (;;) {
try {
if ($exception) {
$gen->throw($exception);
$exception = null;
continue;
}
$value = $gen->current();
if ($value instanceof Generator) {
$stack->push($gen);
$gen = $value;
continue;
}
$isReturnValue = $value instanceof CoroutineReturnValue;
if (!$gen->valid() || $isReturnValue) {
if ($stack->isEmpty()) {
return;
}
$gen = $stack->pop();
$gen->send($isReturnValue ? $value->getValue() : NULL);
continue;
}
try {
$sendValue = (yield $gen->key() => $value);
} catch (Exception $e) {
$gen->throw($e);
continue;
}
$gen->send($sendValue);
} catch (Exception $e) {
if ($stack->isEmpty()) {
throw $e;
}
$gen = $stack->pop();
$exception = $e;
}
}
}
結束語
在這篇文章裡,我使用多任務協作構建了一個任務調度器, 其中包括執行“系統調用”, 做非阻塞操作和處理錯誤. 所有這些裡真正很酷的事情是任務的結果代碼看起來完全同步, 甚至任務正在執行大量的異步操作的時候也是這樣.
如果你打算從套接口讀取數據的話, 你將不需要傳遞某個回調函數或者注冊一個事件偵聽器. 相反, 你只要書寫yield $socket->read(). 這兒大部分都是你常常也要編寫的,只 在它的前面增加yield.
當我第一次聽到協程的時候, 我發現這個概念完全令人折服, 正是因為這個激勵我在PHP中實現了它. 同時我發現協程真正非常的令人驚歎:在令人敬畏的代碼和一大堆亂代碼之間只有一線之隔, 我認為協程恰好處在這條線上, 不多不少. 不過, 要說使用上面所述的方法書寫異步代碼是否真的有益, 這個就見仁見智了.