123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151 |
- <?php
- namespace Aws;
- use GuzzleHttp\Promise\PromiseInterface;
- use GuzzleHttp\Promise\PromisorInterface;
- use GuzzleHttp\Promise\EachPromise;
- /**
- * Sends and iterator of commands concurrently using a capped pool size.
- *
- * The pool will read command objects from an iterator until it is cancelled or
- * until the iterator is consumed.
- */
- class CommandPool implements PromisorInterface
- {
- /** @var EachPromise */
- private $each;
- /**
- * The CommandPool constructor accepts a hash of configuration options:
- *
- * - concurrency: (callable|int) Maximum number of commands to execute
- * concurrently. Provide a function to resize the pool dynamically. The
- * function will be provided the current number of pending requests and
- * is expected to return an integer representing the new pool size limit.
- * - before: (callable) function to invoke before sending each command. The
- * before function accepts the command and the key of the iterator of the
- * command. You can mutate the command as needed in the before function
- * before sending the command.
- * - fulfilled: (callable) Function to invoke when a promise is fulfilled.
- * The function is provided the result object, id of the iterator that the
- * result came from, and the aggregate promise that can be resolved/rejected
- * if you need to short-circuit the pool.
- * - rejected: (callable) Function to invoke when a promise is rejected.
- * The function is provided an AwsException object, id of the iterator that
- * the exception came from, and the aggregate promise that can be
- * resolved/rejected if you need to short-circuit the pool.
- * - preserve_iterator_keys: (bool) Retain the iterator key when generating
- * the commands.
- *
- * @param AwsClientInterface $client Client used to execute commands.
- * @param array|\Iterator $commands Iterable that yields commands.
- * @param array $config Associative array of options.
- */
- public function __construct(
- AwsClientInterface $client,
- $commands,
- array $config = []
- ) {
- if (!isset($config['concurrency'])) {
- $config['concurrency'] = 25;
- }
- $before = $this->getBefore($config);
- $mapFn = function ($commands) use ($client, $before, $config) {
- foreach ($commands as $key => $command) {
- if (!($command instanceof CommandInterface)) {
- throw new \InvalidArgumentException('Each value yielded by '
- . 'the iterator must be an Aws\CommandInterface.');
- }
- if ($before) {
- $before($command, $key);
- }
- if (!empty($config['preserve_iterator_keys'])) {
- yield $key => $client->executeAsync($command);
- } else {
- yield $client->executeAsync($command);
- }
- }
- };
- $this->each = new EachPromise($mapFn($commands), $config);
- }
- /**
- * @return PromiseInterface
- */
- public function promise(): PromiseInterface
- {
- return $this->each->promise();
- }
- /**
- * Executes a pool synchronously and aggregates the results of the pool
- * into an indexed array in the same order as the passed in array.
- *
- * @param AwsClientInterface $client Client used to execute commands.
- * @param mixed $commands Iterable that yields commands.
- * @param array $config Configuration options.
- *
- * @return array
- * @see \Aws\CommandPool::__construct for available configuration options.
- */
- public static function batch(
- AwsClientInterface $client,
- $commands,
- array $config = []
- ) {
- $results = [];
- self::cmpCallback($config, 'fulfilled', $results);
- self::cmpCallback($config, 'rejected', $results);
- return (new self($client, $commands, $config))
- ->promise()
- ->then(static function () use (&$results) {
- ksort($results);
- return $results;
- })
- ->wait();
- }
- /**
- * @return callable
- */
- private function getBefore(array $config)
- {
- if (!isset($config['before'])) {
- return null;
- }
- if (is_callable($config['before'])) {
- return $config['before'];
- }
- throw new \InvalidArgumentException('before must be callable');
- }
- /**
- * Adds an onFulfilled or onRejected callback that aggregates results into
- * an array. If a callback is already present, it is replaced with the
- * composed function.
- *
- * @param array $config
- * @param $name
- * @param array $results
- */
- private static function cmpCallback(array &$config, $name, array &$results)
- {
- if (!isset($config[$name])) {
- $config[$name] = function ($v, $k) use (&$results) {
- $results[$k] = $v;
- };
- } else {
- $currentFn = $config[$name];
- $config[$name] = function ($v, $k) use (&$results, $currentFn) {
- $currentFn($v, $k);
- $results[$k] = $v;
- };
- }
- }
- }
|