123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248 |
- <?php
- declare(strict_types=1);
- namespace GuzzleHttp\Promise;
/**
 * Works through an iterable of promises and exposes the whole run as a single
 * aggregate promise.
 *
 * Every item taken from the iterator is passed through Create::promiseFor().
 * As each item settles, the matching optional callback ('fulfilled' or
 * 'rejected') is invoked with the value/reason, the iterator key, and the
 * aggregate promise. A rejected item does not by itself reject the aggregate;
 * it only triggers the 'rejected' callback.
 *
 * The aggregate is resolved with null once nothing is pending and the
 * iterator is no longer valid. It is rejected when rewinding, reading from,
 * or advancing the iterator throws.
 */
class EachPromise implements PromisorInterface
{
    /**
     * Promises currently in flight, keyed by an internal running counter
     * (not by the iterator key). Set to null once the aggregate has settled.
     *
     * @var array|null
     */
    private $pending = [];

    /** @var int Key the next entry added to $pending will receive. */
    private $nextPendingIndex = 0;

    /**
     * Value returned by Create::iterFor(); used here only through
     * rewind()/valid()/current()/key()/next(). Null after the run is cleared.
     *
     * @var \Iterator|null
     */
    private $iterable;

    /**
     * Maximum number of pending promises, or a callable that receives the
     * current pending count and returns that maximum. A falsy value means
     * "no limit".
     *
     * @var callable|int|null
     */
    private $concurrency;

    /** @var callable|null Called as ($value, $key, $aggregate) when an item fulfills. */
    private $onFulfilled;

    /** @var callable|null Called as ($reason, $key, $aggregate) when an item rejects. */
    private $onRejected;

    /** @var Promise|null Aggregate promise; created lazily by promise(). */
    private $aggregate;

    /** @var bool|null True while the iterator is being advanced. */
    private $mutex;

    /**
     * @param mixed $iterable Source of promises/values; normalised via Create::iterFor().
     * @param array $config   Optional settings, each ignored when absent or null:
     *                        - 'concurrency': int or callable (see $concurrency);
     *                        - 'fulfilled': callable($value, $key, $aggregate);
     *                        - 'rejected': callable($reason, $key, $aggregate).
     */
    public function __construct($iterable, array $config = [])
    {
        $this->iterable = Create::iterFor($iterable);

        // Absent and null entries both leave the property at null.
        $this->concurrency = $config['concurrency'] ?? null;
        $this->onFulfilled = $config['fulfilled'] ?? null;
        $this->onRejected = $config['rejected'] ?? null;
    }

    /**
     * Returns the aggregate promise, starting the iteration on the first call.
     *
     * Later calls return the same promise instance without restarting anything.
     */
    public function promise(): PromiseInterface
    {
        if (!$this->aggregate) {
            try {
                $this->createPromise();

                $this->iterable->rewind();
                $this->refillPending();
            } catch (\Throwable $failure) {
                // Anything thrown while starting up becomes the rejection reason.
                $this->aggregate->reject($failure);
            }
        }

        return $this->aggregate;
    }

    /**
     * Builds the aggregate promise and registers the cleanup that runs once
     * it settles.
     */
    private function createPromise(): void
    {
        $this->mutex = false;

        // Handed to the Promise constructor; presumably invoked when the
        // aggregate is waited on (confirm against Promise).
        $waitFn = function (): void {
            if ($this->checkIfFinished()) {
                return;
            }

            reset($this->pending);

            // $pending changes while we wait (step() removes entries,
            // refillPending() appends new ones), so walk it with the array's
            // internal pointer instead of iterating over a snapshot.
            while (true) {
                $promise = current($this->pending);
                if (!$promise) {
                    return;
                }

                // Move on before waiting: settling this promise leads to
                // step() removing its entry.
                next($this->pending);
                $promise->wait();

                if (Is::settled($this->aggregate)) {
                    return;
                }
            }
        };

        $this->aggregate = new Promise($waitFn);

        // Release everything held for the run once the aggregate settles,
        // whichever way it settles.
        $releaseFn = function (): void {
            $this->pending = null;
            $this->concurrency = null;
            $this->iterable = null;
            $this->onRejected = null;
            $this->onFulfilled = null;
            $this->nextPendingIndex = 0;
        };

        $this->aggregate->then($releaseFn, $releaseFn);
    }

    /**
     * Queues more promises from the iterator, respecting the concurrency limit.
     */
    private function refillPending(): void
    {
        if (!$this->concurrency) {
            // No limit: keep queueing until the iterator runs dry or cannot
            // be advanced.
            while ($this->addPending()) {
                if (!$this->advanceIterator()) {
                    break;
                }
            }

            return;
        }

        if (is_callable($this->concurrency)) {
            $limit = ($this->concurrency)(count($this->pending));
        } else {
            $limit = $this->concurrency;
        }

        // Free slots left under the limit; never negative.
        $slots = max($limit - count($this->pending), 0);
        if (!$slots) {
            return;
        }

        // The first promise is queued without advancing the iterator, so when
        // only one slot is free the iterator is left in place and is advanced
        // later, from step().
        $this->addPending();

        while (--$slots) {
            if (!$this->advanceIterator() || !$this->addPending()) {
                break;
            }
        }
    }

    /**
     * Queues the iterator's current item as a pending promise.
     *
     * @return bool False when there is no iterator or it has no current item;
     *              true once the item has been queued.
     */
    private function addPending(): bool
    {
        if (!$this->iterable) {
            return false;
        }
        if (!$this->iterable->valid()) {
            return false;
        }

        $promise = Create::promiseFor($this->iterable->current());
        $key = $this->iterable->key();

        // Pending entries are tracked under a private counter instead of the
        // iterator key.
        $slot = $this->nextPendingIndex++;

        $this->pending[$slot] = $promise->then(
            function ($value) use ($slot, $key): void {
                $handler = $this->onFulfilled;
                if ($handler) {
                    $handler($value, $key, $this->aggregate);
                }
                $this->step($slot);
            },
            function ($reason) use ($slot, $key): void {
                $handler = $this->onRejected;
                if ($handler) {
                    $handler($reason, $key, $this->aggregate);
                }
                $this->step($slot);
            }
        );

        return true;
    }

    /**
     * Moves the iterator forward by one item.
     *
     * @return bool True when the iterator was advanced; false when an advance
     *              is already in progress or next() threw (in which case the
     *              aggregate is rejected with the thrown error).
     */
    private function advanceIterator(): bool
    {
        // Refuse to re-enter while next() is already running; presumably
        // this protects iterators that cannot be resumed while they are
        // executing (e.g. generators).
        if ($this->mutex) {
            return false;
        }

        $this->mutex = true;
        $advanced = true;

        try {
            $this->iterable->next();
        } catch (\Throwable $failure) {
            $this->aggregate->reject($failure);
            $advanced = false;
        }

        $this->mutex = false;

        return $advanced;
    }

    /**
     * Runs after the pending promise stored under $idx has settled: drops it,
     * advances the iterator, and tops the pending set back up.
     */
    private function step(int $idx): void
    {
        // Nothing more to do once the aggregate has settled.
        if (Is::settled($this->aggregate)) {
            return;
        }

        unset($this->pending[$idx]);

        // Stop here when the iterator could not be advanced (an advance is
        // already in progress, or next() failed).
        if (!$this->advanceIterator()) {
            return;
        }

        if ($this->checkIfFinished()) {
            return;
        }

        $this->refillPending();
    }

    /**
     * Resolves the aggregate with null when nothing is pending and the
     * iterator has no further items.
     *
     * @return bool True when the aggregate was resolved by this call.
     */
    private function checkIfFinished(): bool
    {
        if ($this->pending || $this->iterable->valid()) {
            return false;
        }

        $this->aggregate->resolve(null);

        return true;
    }
}
|