Utils.php 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. <?php
  2. declare(strict_types=1);
  3. namespace GuzzleHttp\Promise;
  4. final class Utils
  5. {
  6. /**
  7. * Get the global task queue used for promise resolution.
  8. *
  9. * This task queue MUST be run in an event loop in order for promises to be
  10. * settled asynchronously. It will be automatically run when synchronously
  11. * waiting on a promise.
  12. *
  13. * <code>
  14. * while ($eventLoop->isRunning()) {
  15. * GuzzleHttp\Promise\Utils::queue()->run();
  16. * }
  17. * </code>
  18. *
  19. * @param TaskQueueInterface|null $assign Optionally specify a new queue instance.
  20. */
  21. public static function queue(TaskQueueInterface $assign = null): TaskQueueInterface
  22. {
  23. static $queue;
  24. if ($assign) {
  25. $queue = $assign;
  26. } elseif (!$queue) {
  27. $queue = new TaskQueue();
  28. }
  29. return $queue;
  30. }
  31. /**
  32. * Adds a function to run in the task queue when it is next `run()` and
  33. * returns a promise that is fulfilled or rejected with the result.
  34. *
  35. * @param callable $task Task function to run.
  36. */
  37. public static function task(callable $task): PromiseInterface
  38. {
  39. $queue = self::queue();
  40. $promise = new Promise([$queue, 'run']);
  41. $queue->add(function () use ($task, $promise): void {
  42. try {
  43. if (Is::pending($promise)) {
  44. $promise->resolve($task());
  45. }
  46. } catch (\Throwable $e) {
  47. $promise->reject($e);
  48. }
  49. });
  50. return $promise;
  51. }
  52. /**
  53. * Synchronously waits on a promise to resolve and returns an inspection
  54. * state array.
  55. *
  56. * Returns a state associative array containing a "state" key mapping to a
  57. * valid promise state. If the state of the promise is "fulfilled", the
  58. * array will contain a "value" key mapping to the fulfilled value of the
  59. * promise. If the promise is rejected, the array will contain a "reason"
  60. * key mapping to the rejection reason of the promise.
  61. *
  62. * @param PromiseInterface $promise Promise or value.
  63. */
  64. public static function inspect(PromiseInterface $promise): array
  65. {
  66. try {
  67. return [
  68. 'state' => PromiseInterface::FULFILLED,
  69. 'value' => $promise->wait(),
  70. ];
  71. } catch (RejectionException $e) {
  72. return ['state' => PromiseInterface::REJECTED, 'reason' => $e->getReason()];
  73. } catch (\Throwable $e) {
  74. return ['state' => PromiseInterface::REJECTED, 'reason' => $e];
  75. }
  76. }
  77. /**
  78. * Waits on all of the provided promises, but does not unwrap rejected
  79. * promises as thrown exception.
  80. *
  81. * Returns an array of inspection state arrays.
  82. *
  83. * @see inspect for the inspection state array format.
  84. *
  85. * @param PromiseInterface[] $promises Traversable of promises to wait upon.
  86. */
  87. public static function inspectAll($promises): array
  88. {
  89. $results = [];
  90. foreach ($promises as $key => $promise) {
  91. $results[$key] = self::inspect($promise);
  92. }
  93. return $results;
  94. }
  95. /**
  96. * Waits on all of the provided promises and returns the fulfilled values.
  97. *
  98. * Returns an array that contains the value of each promise (in the same
  99. * order the promises were provided). An exception is thrown if any of the
  100. * promises are rejected.
  101. *
  102. * @param iterable<PromiseInterface> $promises Iterable of PromiseInterface objects to wait on.
  103. *
  104. * @throws \Throwable on error
  105. */
  106. public static function unwrap($promises): array
  107. {
  108. $results = [];
  109. foreach ($promises as $key => $promise) {
  110. $results[$key] = $promise->wait();
  111. }
  112. return $results;
  113. }
  114. /**
  115. * Given an array of promises, return a promise that is fulfilled when all
  116. * the items in the array are fulfilled.
  117. *
  118. * The promise's fulfillment value is an array with fulfillment values at
  119. * respective positions to the original array. If any promise in the array
  120. * rejects, the returned promise is rejected with the rejection reason.
  121. *
  122. * @param mixed $promises Promises or values.
  123. * @param bool $recursive If true, resolves new promises that might have been added to the stack during its own resolution.
  124. */
  125. public static function all($promises, bool $recursive = false): PromiseInterface
  126. {
  127. $results = [];
  128. $promise = Each::of(
  129. $promises,
  130. function ($value, $idx) use (&$results): void {
  131. $results[$idx] = $value;
  132. },
  133. function ($reason, $idx, Promise $aggregate): void {
  134. $aggregate->reject($reason);
  135. }
  136. )->then(function () use (&$results) {
  137. ksort($results);
  138. return $results;
  139. });
  140. if (true === $recursive) {
  141. $promise = $promise->then(function ($results) use ($recursive, &$promises) {
  142. foreach ($promises as $promise) {
  143. if (Is::pending($promise)) {
  144. return self::all($promises, $recursive);
  145. }
  146. }
  147. return $results;
  148. });
  149. }
  150. return $promise;
  151. }
  152. /**
  153. * Initiate a competitive race between multiple promises or values (values
  154. * will become immediately fulfilled promises).
  155. *
  156. * When count amount of promises have been fulfilled, the returned promise
  157. * is fulfilled with an array that contains the fulfillment values of the
  158. * winners in order of resolution.
  159. *
  160. * This promise is rejected with a {@see AggregateException} if the number
  161. * of fulfilled promises is less than the desired $count.
  162. *
  163. * @param int $count Total number of promises.
  164. * @param mixed $promises Promises or values.
  165. */
  166. public static function some(int $count, $promises): PromiseInterface
  167. {
  168. $results = [];
  169. $rejections = [];
  170. return Each::of(
  171. $promises,
  172. function ($value, $idx, PromiseInterface $p) use (&$results, $count): void {
  173. if (Is::settled($p)) {
  174. return;
  175. }
  176. $results[$idx] = $value;
  177. if (count($results) >= $count) {
  178. $p->resolve(null);
  179. }
  180. },
  181. function ($reason) use (&$rejections): void {
  182. $rejections[] = $reason;
  183. }
  184. )->then(
  185. function () use (&$results, &$rejections, $count) {
  186. if (count($results) !== $count) {
  187. throw new AggregateException(
  188. 'Not enough promises to fulfill count',
  189. $rejections
  190. );
  191. }
  192. ksort($results);
  193. return array_values($results);
  194. }
  195. );
  196. }
  197. /**
  198. * Like some(), with 1 as count. However, if the promise fulfills, the
  199. * fulfillment value is not an array of 1 but the value directly.
  200. *
  201. * @param mixed $promises Promises or values.
  202. */
  203. public static function any($promises): PromiseInterface
  204. {
  205. return self::some(1, $promises)->then(function ($values) {
  206. return $values[0];
  207. });
  208. }
  209. /**
  210. * Returns a promise that is fulfilled when all of the provided promises have
  211. * been fulfilled or rejected.
  212. *
  213. * The returned promise is fulfilled with an array of inspection state arrays.
  214. *
  215. * @see inspect for the inspection state array format.
  216. *
  217. * @param mixed $promises Promises or values.
  218. */
  219. public static function settle($promises): PromiseInterface
  220. {
  221. $results = [];
  222. return Each::of(
  223. $promises,
  224. function ($value, $idx) use (&$results): void {
  225. $results[$idx] = ['state' => PromiseInterface::FULFILLED, 'value' => $value];
  226. },
  227. function ($reason, $idx) use (&$results): void {
  228. $results[$idx] = ['state' => PromiseInterface::REJECTED, 'reason' => $reason];
  229. }
  230. )->then(function () use (&$results) {
  231. ksort($results);
  232. return $results;
  233. });
  234. }
  235. }