CommandPool.php 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. <?php
  2. namespace Aws;
  3. use GuzzleHttp\Promise\PromiseInterface;
  4. use GuzzleHttp\Promise\PromisorInterface;
  5. use GuzzleHttp\Promise\EachPromise;
  /**
   * Sends an iterator of commands concurrently using a capped pool size.
   *
   * The pool will read command objects from an iterator until it is cancelled or
   * until the iterator is consumed.
   */
  class CommandPool implements PromisorInterface
  {
      /** @var EachPromise Promise pool that drives the mapped command promises. */
      private $each;

      /**
       * The CommandPool constructor accepts a hash of configuration options:
       *
       * - concurrency: (callable|int) Maximum number of commands to execute
       *   concurrently. Provide a function to resize the pool dynamically. The
       *   function will be provided the current number of pending requests and
       *   is expected to return an integer representing the new pool size limit.
       *   Defaults to 25 when not set.
       * - before: (callable) function to invoke before sending each command. The
       *   before function accepts the command and the key of the iterator of the
       *   command. You can mutate the command as needed in the before function
       *   before sending the command.
       * - fulfilled: (callable) Function to invoke when a promise is fulfilled.
       *   The function is provided the result object, id of the iterator that the
       *   result came from, and the aggregate promise that can be resolved/rejected
       *   if you need to short-circuit the pool.
       * - rejected: (callable) Function to invoke when a promise is rejected.
       *   The function is provided an AwsException object, id of the iterator that
       *   the exception came from, and the aggregate promise that can be
       *   resolved/rejected if you need to short-circuit the pool.
       * - preserve_iterator_keys: (bool) Retain the iterator key when generating
       *   the commands.
       *
       * The full configuration array is also handed to the underlying EachPromise.
       *
       * @param AwsClientInterface $client   Client used to execute commands.
       * @param array|\Iterator    $commands Iterable that yields commands.
       * @param array              $config   Associative array of options.
       *
       * @throws \InvalidArgumentException if "before" is set but is not callable.
       *         A non-command value in $commands also raises this exception, but
       *         only once the pool reaches that value, because commands are
       *         mapped lazily.
       */
      public function __construct(
          AwsClientInterface $client,
          $commands,
          array $config = []
      ) {
          if (!isset($config['concurrency'])) {
              $config['concurrency'] = 25;
          }

          $before = $this->getBefore($config);
          // The flag cannot change while iterating, so evaluate it only once.
          $preserveKeys = !empty($config['preserve_iterator_keys']);

          // Lazily turns each command into a promise; nothing is sent until the
          // pool pulls the next value from this generator.
          $mapFn = static function ($commands) use ($client, $before, $preserveKeys) {
              foreach ($commands as $key => $command) {
                  if (!($command instanceof CommandInterface)) {
                      throw new \InvalidArgumentException('Each value yielded by '
                          . 'the iterator must be an Aws\CommandInterface.');
                  }
                  if ($before) {
                      $before($command, $key);
                  }
                  if ($preserveKeys) {
                      yield $key => $client->executeAsync($command);
                  } else {
                      yield $client->executeAsync($command);
                  }
              }
          };

          $this->each = new EachPromise($mapFn($commands), $config);
      }

      /**
       * Returns the aggregate promise of the underlying pool.
       *
       * @return PromiseInterface
       */
      public function promise(): PromiseInterface
      {
          return $this->each->promise();
      }

      /**
       * Executes a pool synchronously and aggregates the results of the pool
       * into an indexed array in the same order as the passed in array.
       *
       * Both fulfilled values and rejection reasons are collected under the key
       * they were delivered with, and the array is sorted by key before being
       * returned. Any "fulfilled"/"rejected" callbacks supplied in $config are
       * still invoked, before the value is recorded.
       *
       * @param AwsClientInterface $client   Client used to execute commands.
       * @param mixed              $commands Iterable that yields commands.
       * @param array              $config   Configuration options.
       *
       * @return array
       * @see \Aws\CommandPool::__construct for available configuration options.
       */
      public static function batch(
          AwsClientInterface $client,
          $commands,
          array $config = []
      ) {
          $results = [];
          self::cmpCallback($config, 'fulfilled', $results);
          self::cmpCallback($config, 'rejected', $results);

          return (new self($client, $commands, $config))
              ->promise()
              ->then(static function () use (&$results) {
                  // Completion order is arbitrary; restore key order.
                  ksort($results);
                  return $results;
              })
              ->wait();
      }

      /**
       * Extracts and validates the optional "before" callback.
       *
       * @param array $config Pool configuration options.
       *
       * @return callable|null The callback, or null when none is configured.
       *
       * @throws \InvalidArgumentException if "before" is set but is not callable.
       */
      private function getBefore(array $config)
      {
          if (!isset($config['before'])) {
              return null;
          }

          if (is_callable($config['before'])) {
              return $config['before'];
          }

          throw new \InvalidArgumentException('before must be callable');
      }

      /**
       * Adds an onFulfilled or onRejected callback that aggregates results into
       * an array. If a callback is already present, it is replaced with the
       * composed function.
       *
       * The composed function calls the existing callback first and forwards
       * every argument it was given, including the aggregate promise described
       * in the constructor options, so the callback can still short-circuit
       * the pool.
       *
       * @param array  $config  Configuration array, modified in place.
       * @param string $name    Option to wrap: "fulfilled" or "rejected".
       * @param array  $results Array that receives each value under its key.
       */
      private static function cmpCallback(array &$config, $name, array &$results)
      {
          $currentFn = $config[$name] ?? null;

          $config[$name] = static function ($v, $k, ...$rest) use (&$results, $currentFn) {
              if ($currentFn !== null) {
                  // Pass trailing arguments through untouched so the callback
                  // sees exactly what the pool supplied.
                  $currentFn($v, $k, ...$rest);
              }
              $results[$k] = $v;
          };
      }
  }