BatchDelete.php 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. <?php
  2. namespace Aws\S3;
  3. use Aws\AwsClientInterface;
  4. use Aws\S3\Exception\DeleteMultipleObjectsException;
  5. use GuzzleHttp\Promise;
  6. use GuzzleHttp\Promise\PromisorInterface;
  7. use GuzzleHttp\Promise\PromiseInterface;
  /**
   * Efficiently deletes many objects from a single Amazon S3 bucket using an
   * iterator that yields keys. Deletes are made using the DeleteObjects API
   * operation.
   *
   *     $s3 = new Aws\S3\Client([
   *         'region'  => 'us-west-2',
   *         'version' => 'latest'
   *     ]);
   *
   *     $listObjectsParams = ['Bucket' => 'foo', 'Prefix' => 'starts/with/'];
   *     $delete = Aws\S3\BatchDelete::fromListObjects($s3, $listObjectsParams);
   *     // Asynchronously delete
   *     $promise = $delete->promise();
   *     // Force synchronous completion
   *     $delete->delete();
   *
   * When using one of the batch delete creational static methods, you can supply
   * an associative array of options:
   *
   * - before: Function invoked before executing a command. The function is
   *   passed the command that is about to be executed. This can be useful
   *   for logging, adding custom request headers, etc.
   * - batch_size: The size of each delete batch. Defaults to 1000; larger
   *   values are capped at 1000.
   *
   * @link https://docs.aws.amazon.com/AmazonS3/latest/API/multiobjectdeleteapi.html
   */
  class BatchDelete implements PromisorInterface
  {
      /** @var string Bucket the objects are deleted from. */
      private $bucket;

      /** @var AwsClientInterface|null Client used to send commands; released once the promise settles. */
      private $client;

      /** @var callable|null Optional hook invoked with each command before it is executed. */
      private $before;

      /** @var PromiseInterface|null Promise memoized by promise(). */
      private $cachedPromise;

      /** @var callable|null Factory that builds the aggregate promise; consumed by createPromise(). */
      private $promiseCreator;

      /** @var int Number of queued objects that triggers a DeleteObjects request. */
      private $batchSize = 1000;

      /** @var array[] Objects waiting to be sent in the next batch. */
      private $queue = [];

      /**
       * Creates a BatchDelete object from all of the paginated results of a
       * ListObjects operation. Each result that is returned by the ListObjects
       * operation will be deleted.
       *
       * @param AwsClientInterface $client            AWS Client to use.
       * @param array              $listObjectsParams ListObjects API parameters;
       *                                              its 'Bucket' entry is also
       *                                              used as the delete target.
       * @param array              $options           BatchDelete options.
       *
       * @return BatchDelete
       */
      public static function fromListObjects(
          AwsClientInterface $client,
          array $listObjectsParams,
          array $options = []
      ) {
          $iter = $client->getPaginator('ListObjects', $listObjectsParams);
          $bucket = $listObjectsParams['Bucket'];

          $fn = function (BatchDelete $that) use ($iter) {
              return $iter->each(function ($result) use ($that) {
                  $promises = [];
                  // A page without objects has no array under 'Contents'.
                  if (is_array($result['Contents'])) {
                      foreach ($result['Contents'] as $object) {
                          // enqueue() only returns a promise when a batch
                          // was actually flushed.
                          if ($promise = $that->enqueue($object)) {
                              $promises[] = $promise;
                          }
                      }
                  }

                  return $promises ? Promise\Utils::all($promises) : null;
              });
          };

          return new self($client, $bucket, $fn, $options);
      }

      /**
       * Creates a BatchDelete object from an iterator that yields results.
       *
       * @param AwsClientInterface $client  AWS Client to use to execute commands
       * @param string             $bucket  Bucket where the objects are stored
       * @param \Iterator          $iter    Iterator that yields assoc arrays;
       *                                    only their 'Key' and 'VersionId'
       *                                    entries are sent.
       * @param array              $options BatchDelete options
       *
       * @return BatchDelete
       */
      public static function fromIterator(
          AwsClientInterface $client,
          $bucket,
          \Iterator $iter,
          array $options = []
      ) {
          $fn = function (BatchDelete $that) use ($iter) {
              return Promise\Coroutine::of(function () use ($that, $iter) {
                  foreach ($iter as $obj) {
                      // Yield each flushed batch to the coroutine before
                      // pulling more items from the iterator.
                      if ($promise = $that->enqueue($obj)) {
                          yield $promise;
                      }
                  }
              });
          };

          return new self($client, $bucket, $fn, $options);
      }

      /**
       * Returns the promise for the whole batch delete, creating it on the
       * first call and returning the same instance afterwards.
       *
       * @return PromiseInterface
       */
      public function promise(): PromiseInterface
      {
          if (!$this->cachedPromise) {
              $this->cachedPromise = $this->createPromise();
          }

          return $this->cachedPromise;
      }

      /**
       * Synchronously deletes all of the objects.
       *
       * @throws DeleteMultipleObjectsException on error.
       */
      public function delete()
      {
          $this->promise()->wait();
      }

      /**
       * @param AwsClientInterface $client    Client used to transfer the requests
       * @param string             $bucket    Bucket to delete from.
       * @param callable           $promiseFn Creates a promise.
       * @param array              $options   Hash of options used with the batch
       *
       * @throws \InvalidArgumentException if the provided batch_size is not a
       *                                   number greater than 0, or if the
       *                                   provided before option is not callable
       */
      private function __construct(
          AwsClientInterface $client,
          $bucket,
          callable $promiseFn,
          array $options = []
      ) {
          $this->client = $client;
          $this->bucket = $bucket;
          $this->promiseCreator = $promiseFn;

          if (isset($options['before'])) {
              if (!is_callable($options['before'])) {
                  throw new \InvalidArgumentException('before must be callable');
              }
              $this->before = $options['before'];
          }

          if (isset($options['batch_size'])) {
              $batchSize = $options['batch_size'];
              // Reject non-numeric input explicitly: a bare `<= 0` comparison
              // lets non-numeric strings through on PHP 8.
              if (!is_numeric($batchSize) || $batchSize <= 0) {
                  throw new \InvalidArgumentException('batch_size is not > 0');
              }
              // Cap at 1000 (the class default) and store a real integer of
              // at least 1, so the threshold check in enqueue() is well defined
              // even for fractional or string input.
              $this->batchSize = max(1, (int) min($batchSize, 1000));
          }
      }

      /**
       * Queues one object and flushes the queue once it reaches the batch size.
       *
       * @param array $obj Object description; 'Key' and 'VersionId' are the
       *                   entries later sent to DeleteObjects.
       *
       * @return PromiseInterface|null Promise for the flushed batch, or null
       *                               when the batch is not full yet.
       */
      private function enqueue(array $obj)
      {
          $this->queue[] = $obj;

          return count($this->queue) >= $this->batchSize
              ? $this->flushQueue()
              : null;
      }

      /**
       * Sends every queued object in a single DeleteObjects command and
       * empties the queue.
       *
       * @return PromiseInterface|null Promise fulfilled with the command result
       *                               and rejected with a
       *                               DeleteMultipleObjectsException when the
       *                               result reports errors; null when nothing
       *                               is queued.
       */
      private function flushQueue()
      {
          static $validKeys = ['Key' => true, 'VersionId' => true];

          if (!$this->queue) {
              return null;
          }

          // Take the whole queue in one pass. Draining with array_shift() in a
          // truthiness-tested loop would stop at an empty entry (stranding the
          // objects behind it) and reindexes the array on every call.
          $batch = [];
          foreach ($this->queue as $obj) {
              // Forward only the entries DeleteObjects is given here.
              $batch[] = array_intersect_key($obj, $validKeys);
          }
          $this->queue = [];

          $command = $this->client->getCommand('DeleteObjects', [
              'Bucket' => $this->bucket,
              'Delete' => ['Objects' => $batch]
          ]);

          if ($this->before) {
              ($this->before)($command);
          }

          return $this->client->executeAsync($command)
              ->then(static function ($result) {
                  // The request can succeed while individual keys fail;
                  // surface those failures as an exception.
                  if (!empty($result['Errors'])) {
                      throw new DeleteMultipleObjectsException(
                          $result['Deleted'] ?: [],
                          $result['Errors']
                      );
                  }

                  return $result;
              });
      }

      /**
       * Returns a promise that will clean up any references when it completes.
       *
       * @return PromiseInterface
       */
      private function createPromise()
      {
          // Create the promise; the creator is single-use.
          $promise = ($this->promiseCreator)($this);
          $this->promiseCreator = null;

          // Cleans up the promise state and references. The queue is reset to
          // an empty array so the property keeps its array type.
          $cleanup = function () {
              $this->before = $this->client = null;
              $this->queue = [];
          };

          // When done, ensure cleanup and that any remaining are processed.
          return $promise->then(
              function () use ($cleanup) {
                  // Send the final, partially filled batch before releasing
                  // the client.
                  return Promise\Create::promiseFor($this->flushQueue())
                      ->then($cleanup);
              },
              function ($reason) use ($cleanup) {
                  $cleanup();

                  return Promise\Create::rejectionFor($reason);
              }
          );
      }
  }