123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240 |
- <?php
- namespace Aws\S3;
- use Aws\AwsClientInterface;
- use Aws\S3\Exception\DeleteMultipleObjectsException;
- use GuzzleHttp\Promise;
- use GuzzleHttp\Promise\PromisorInterface;
- use GuzzleHttp\Promise\PromiseInterface;
/**
 * Deletes many objects from a single S3 bucket using batched DeleteObjects
 * commands.
 *
 * Objects are queued one at a time; whenever the queue reaches the configured
 * batch size a DeleteObjects command is sent, and whatever is left in the
 * queue is sent once the object source has been exhausted.
 *
 * Instances are created through {@see BatchDelete::fromListObjects()} or
 * {@see BatchDelete::fromIterator()}. Both accept the following options:
 *
 * - before: (callable) Invoked with each DeleteObjects command right before
 *   it is executed.
 * - batch_size: (int) Number of objects per DeleteObjects command. Must be
 *   greater than 0; values above 1000 are capped at 1000. Defaults to 1000.
 */
class BatchDelete implements PromisorInterface
{
    /** @var mixed Value sent as the "Bucket" parameter of every command. */
    private $bucket;

    /** @var AwsClientInterface|null Client used to build and run commands. */
    private $client;

    /** @var callable|null Optional hook called with each command. */
    private $before;

    /** @var PromiseInterface|null Promise memoized by promise(). */
    private $cachedPromise;

    /** @var callable|null Function that feeds objects into this instance. */
    private $promiseCreator;

    /** @var int Number of queued objects that triggers a flush. */
    private $batchSize = 1000;

    /** @var array|null Objects waiting to be sent in the next batch. */
    private $queue = [];

    /**
     * Creates a BatchDelete that deletes every object returned by a
     * paginated ListObjects operation.
     *
     * @param AwsClientInterface $client            Client used to list and delete.
     * @param array              $listObjectsParams ListObjects parameters; the
     *                                              "Bucket" entry is also used
     *                                              for the delete commands.
     * @param array              $options           See the class description.
     *
     * @return BatchDelete
     * @throws \InvalidArgumentException if an option is invalid.
     */
    public static function fromListObjects(
        AwsClientInterface $client,
        array $listObjectsParams,
        array $options = []
    ) {
        $iter = $client->getPaginator('ListObjects', $listObjectsParams);
        $bucket = $listObjectsParams['Bucket'];

        $fn = function (BatchDelete $that) use ($iter) {
            return $iter->each(function ($result) use ($that) {
                $promises = [];
                if (is_array($result['Contents'])) {
                    foreach ($result['Contents'] as $object) {
                        // enqueue() only returns a promise when it flushed.
                        if ($promise = $that->enqueue($object)) {
                            $promises[] = $promise;
                        }
                    }
                }

                return $promises ? Promise\Utils::all($promises) : null;
            });
        };

        return new self($client, $bucket, $fn, $options);
    }

    /**
     * Creates a BatchDelete that deletes every object yielded by an iterator.
     *
     * Each yielded value must be an array; only its "Key" and "VersionId"
     * entries are sent to S3.
     *
     * @param AwsClientInterface $client  Client used to delete.
     * @param mixed              $bucket  Value sent as the "Bucket" parameter.
     * @param \Iterator          $iter    Source of the objects to delete.
     * @param array              $options See the class description.
     *
     * @return BatchDelete
     * @throws \InvalidArgumentException if an option is invalid.
     */
    public static function fromIterator(
        AwsClientInterface $client,
        $bucket,
        \Iterator $iter,
        array $options = []
    ) {
        $fn = function (BatchDelete $that) use ($iter) {
            return Promise\Coroutine::of(function () use ($that, $iter) {
                foreach ($iter as $obj) {
                    // Yield each flush so it settles before iteration resumes.
                    if ($promise = $that->enqueue($obj)) {
                        yield $promise;
                    }
                }
            });
        };

        return new self($client, $bucket, $fn, $options);
    }

    /**
     * Returns the promise for the whole delete operation, creating it on the
     * first call and returning the same instance afterwards.
     *
     * @return PromiseInterface
     */
    public function promise(): PromiseInterface
    {
        if (!$this->cachedPromise) {
            $this->cachedPromise = $this->createPromise();
        }

        return $this->cachedPromise;
    }

    /**
     * Runs the delete operation and blocks until its promise has settled.
     *
     * @throws DeleteMultipleObjectsException if a DeleteObjects result
     *                                        contains errors.
     */
    public function delete()
    {
        $this->promise()->wait();
    }

    /**
     * @param AwsClientInterface $client    Client used to build and run commands.
     * @param mixed              $bucket    Value sent as the "Bucket" parameter.
     * @param callable           $promiseFn Receives this instance, feeds objects
     *                                      into it and returns a promise.
     * @param array              $options   See the class description.
     *
     * @throws \InvalidArgumentException if "before" is not callable or
     *                                   "batch_size" is not greater than 0.
     */
    private function __construct(
        AwsClientInterface $client,
        $bucket,
        callable $promiseFn,
        array $options = []
    ) {
        $this->client = $client;
        $this->bucket = $bucket;
        $this->promiseCreator = $promiseFn;

        if (isset($options['before'])) {
            if (!is_callable($options['before'])) {
                throw new \InvalidArgumentException('before must be callable');
            }
            $this->before = $options['before'];
        }

        if (isset($options['batch_size'])) {
            if ($options['batch_size'] <= 0) {
                throw new \InvalidArgumentException('batch_size is not > 0');
            }
            // Larger requests are silently reduced to 1000 per command.
            $this->batchSize = min($options['batch_size'], 1000);
        }
    }

    /**
     * Adds an object to the queue and flushes once the batch size is reached.
     *
     * @param array $obj Object description ("Key" and optional "VersionId").
     *
     * @return PromiseInterface|null Promise for the flush, or null when the
     *                               object was only queued.
     */
    private function enqueue(array $obj)
    {
        $this->queue[] = $obj;

        return count($this->queue) >= $this->batchSize
            ? $this->flushQueue()
            : null;
    }

    /**
     * Sends every queued object in one DeleteObjects command and empties the
     * queue.
     *
     * @return PromiseInterface|null Null when nothing is queued; otherwise a
     *                               promise fulfilled with the command result
     *                               or rejected with a
     *                               DeleteMultipleObjectsException when the
     *                               result contains errors.
     */
    private function flushQueue()
    {
        static $validKeys = ['Key' => true, 'VersionId' => true];

        // Truthiness covers both an empty array and the null left by cleanup.
        if (!$this->queue) {
            return null;
        }

        // Take the whole queue at once. Draining with
        // `while ($obj = array_shift(...))` stops at the first empty entry,
        // stranding everything queued behind it, and re-indexes the array on
        // every call.
        $pending = $this->queue;
        $this->queue = [];

        $batch = [];
        foreach ($pending as $obj) {
            // Strip everything except the identifiers kept in $validKeys.
            $batch[] = array_intersect_key($obj, $validKeys);
        }

        $command = $this->client->getCommand('DeleteObjects', [
            'Bucket' => $this->bucket,
            'Delete' => ['Objects' => $batch]
        ]);

        if ($this->before) {
            call_user_func($this->before, $command);
        }

        return $this->client->executeAsync($command)
            ->then(function ($result) {
                // A completed command can still report per-object failures.
                if (!empty($result['Errors'])) {
                    throw new DeleteMultipleObjectsException(
                        $result['Deleted'] ?: [],
                        $result['Errors']
                    );
                }

                return $result;
            });
    }

    /**
     * Builds the promise returned by promise(): runs the object source,
     * flushes whatever is still queued, then drops the references held by
     * this instance.
     *
     * @return PromiseInterface
     */
    private function createPromise()
    {
        // The creator is single-use; clear it once it has produced a promise.
        $promise = call_user_func($this->promiseCreator, $this);
        $this->promiseCreator = null;

        // Release the hook, the client and the queue when the work is over.
        $cleanup = function () {
            $this->before = $this->client = $this->queue = null;
        };

        return $promise->then(
            function () use ($cleanup) {
                // promiseFor() also accepts the null returned for an empty queue.
                return Promise\Create::promiseFor($this->flushQueue())
                    ->then($cleanup);
            },
            function ($reason) use ($cleanup) {
                // Clean up, then keep the chain rejected with the same reason.
                $cleanup();
                return Promise\Create::rejectionFor($reason);
            }
        );
    }
}
|