Transfer.php 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436
  1. <?php
  2. namespace Aws\S3;
  3. use Aws;
  4. use Aws\CommandInterface;
  5. use Aws\Exception\AwsException;
  6. use GuzzleHttp\Promise;
  7. use GuzzleHttp\Promise\PromiseInterface;
  8. use GuzzleHttp\Promise\PromisorInterface;
  9. use Iterator;
  10. /**
  11. * Transfers files from the local filesystem to S3 or from S3 to the local
  12. * filesystem.
  13. *
  14. * This class does not support copying from the local filesystem to somewhere
  15. * else on the local filesystem or from one S3 bucket to another.
  16. */
  17. class Transfer implements PromisorInterface
  18. {
  19. private $client;
  20. private $promise;
  21. private $source;
  22. private $sourceMetadata;
  23. private $destination;
  24. private $concurrency;
  25. private $mupThreshold;
  26. private $before;
  27. private $s3Args = [];
  28. private $addContentMD5;
  29. /**
  30. * When providing the $source argument, you may provide a string referencing
  31. * the path to a directory on disk to upload, an s3 scheme URI that contains
  32. * the bucket and key (e.g., "s3://bucket/key"), or an \Iterator object
  33. * that yields strings containing filenames that are the path to a file on
  34. * disk or an s3 scheme URI. The bucket portion of the s3 URI may be an S3
  35. * access point ARN. The "/key" portion of an s3 URI is optional.
  36. *
  37. * When providing an iterator for the $source argument, you must also
  38. * provide a 'base_dir' key value pair in the $options argument.
  39. *
  40. * The $dest argument can be the path to a directory on disk or an s3
  41. * scheme URI (e.g., "s3://bucket/key").
  42. *
  43. * The options array can contain the following key value pairs:
  44. *
  45. * - base_dir: (string) Base directory of the source, if $source is an
  46. * iterator. If the $source option is not an array, then this option is
  47. * ignored.
  48. * - before: (callable) A callback to invoke before each transfer. The
  49. * callback accepts a single argument: Aws\CommandInterface $command.
  50. * The provided command will be either a GetObject, PutObject,
  51. * InitiateMultipartUpload, or UploadPart command.
  52. * - mup_threshold: (int) Size in bytes in which a multipart upload should
  53. * be used instead of PutObject. Defaults to 20971520 (20 MB).
  54. * - concurrency: (int, default=5) Number of files to upload concurrently.
  55. * The ideal concurrency value will vary based on the number of files
  56. * being uploaded and the average size of each file. Generally speaking,
  57. * smaller files benefit from a higher concurrency while larger files
  58. * will not.
  59. * - debug: (bool) Set to true to print out debug information for
  60. * transfers. Set to an fopen() resource to write to a specific stream
  61. * rather than writing to STDOUT.
  62. *
  63. * @param S3ClientInterface $client Client used for transfers.
  64. * @param string|Iterator $source Where the files are transferred from.
  65. * @param string $dest Where the files are transferred to.
  66. * @param array $options Hash of options.
  67. */
  68. public function __construct(
  69. S3ClientInterface $client,
  70. $source,
  71. $dest,
  72. array $options = []
  73. ) {
  74. $this->client = $client;
  75. // Prepare the destination.
  76. $this->destination = $this->prepareTarget($dest);
  77. if ($this->destination['scheme'] === 's3') {
  78. $this->s3Args = $this->getS3Args($this->destination['path']);
  79. }
  80. // Prepare the source.
  81. if (is_string($source)) {
  82. $this->sourceMetadata = $this->prepareTarget($source);
  83. $this->source = $source;
  84. } elseif ($source instanceof Iterator) {
  85. if (empty($options['base_dir'])) {
  86. throw new \InvalidArgumentException('You must provide the source'
  87. . ' argument as a string or provide the "base_dir" option.');
  88. }
  89. $this->sourceMetadata = $this->prepareTarget($options['base_dir']);
  90. $this->source = $source;
  91. } else {
  92. throw new \InvalidArgumentException('source must be the path to a '
  93. . 'directory or an iterator that yields file names.');
  94. }
  95. // Validate schemes.
  96. if ($this->sourceMetadata['scheme'] === $this->destination['scheme']) {
  97. throw new \InvalidArgumentException("You cannot copy from"
  98. . " {$this->sourceMetadata['scheme']} to"
  99. . " {$this->destination['scheme']}."
  100. );
  101. }
  102. // Handle multipart-related options.
  103. $this->concurrency = isset($options['concurrency'])
  104. ? $options['concurrency']
  105. : MultipartUploader::DEFAULT_CONCURRENCY;
  106. $this->mupThreshold = isset($options['mup_threshold'])
  107. ? $options['mup_threshold']
  108. : 16777216;
  109. if ($this->mupThreshold < MultipartUploader::PART_MIN_SIZE) {
  110. throw new \InvalidArgumentException('mup_threshold must be >= 5MB');
  111. }
  112. // Handle "before" callback option.
  113. if (isset($options['before'])) {
  114. $this->before = $options['before'];
  115. if (!is_callable($this->before)) {
  116. throw new \InvalidArgumentException('before must be a callable.');
  117. }
  118. }
  119. // Handle "debug" option.
  120. if (isset($options['debug'])) {
  121. if ($options['debug'] === true) {
  122. $options['debug'] = fopen('php://output', 'w');
  123. }
  124. if (is_resource($options['debug'])) {
  125. $this->addDebugToBefore($options['debug']);
  126. }
  127. }
  128. // Handle "add_content_md5" option.
  129. $this->addContentMD5 = isset($options['add_content_md5'])
  130. && $options['add_content_md5'] === true;
  131. }
  132. /**
  133. * Transfers the files.
  134. *
  135. * @return PromiseInterface
  136. */
  137. public function promise(): PromiseInterface
  138. {
  139. // If the promise has been created, just return it.
  140. if (!$this->promise) {
  141. // Create an upload/download promise for the transfer.
  142. $this->promise = $this->sourceMetadata['scheme'] === 'file'
  143. ? $this->createUploadPromise()
  144. : $this->createDownloadPromise();
  145. }
  146. return $this->promise;
  147. }
  148. /**
  149. * Transfers the files synchronously.
  150. */
  151. public function transfer()
  152. {
  153. $this->promise()->wait();
  154. }
  155. private function prepareTarget($targetPath)
  156. {
  157. $target = [
  158. 'path' => $this->normalizePath($targetPath),
  159. 'scheme' => $this->determineScheme($targetPath),
  160. ];
  161. if ($target['scheme'] !== 's3' && $target['scheme'] !== 'file') {
  162. throw new \InvalidArgumentException('Scheme must be "s3" or "file".');
  163. }
  164. return $target;
  165. }
  166. /**
  167. * Creates an array that contains Bucket and Key by parsing the filename.
  168. *
  169. * @param string $path Path to parse.
  170. *
  171. * @return array
  172. */
  173. private function getS3Args($path)
  174. {
  175. $parts = explode('/', str_replace('s3://', '', $path), 2);
  176. $args = ['Bucket' => $parts[0]];
  177. if (isset($parts[1])) {
  178. $args['Key'] = $parts[1];
  179. }
  180. return $args;
  181. }
  182. /**
  183. * Parses the scheme from a filename.
  184. *
  185. * @param string $path Path to parse.
  186. *
  187. * @return string
  188. */
  189. private function determineScheme($path)
  190. {
  191. return !strpos($path, '://') ? 'file' : explode('://', $path)[0];
  192. }
  193. /**
  194. * Normalize a path so that it has UNIX-style directory separators and no trailing /
  195. *
  196. * @param string $path
  197. *
  198. * @return string
  199. */
  200. private function normalizePath($path)
  201. {
  202. return rtrim(str_replace('\\', '/', $path), '/');
  203. }
  204. private function resolvesOutsideTargetDirectory($sink, $objectKey)
  205. {
  206. $resolved = [];
  207. $sections = explode('/', $sink);
  208. $targetSectionsLength = count(explode('/', $objectKey));
  209. $targetSections = array_slice($sections, -($targetSectionsLength + 1));
  210. $targetDirectory = $targetSections[0];
  211. foreach ($targetSections as $section) {
  212. if ($section === '.' || $section === '') {
  213. continue;
  214. }
  215. if ($section === '..') {
  216. array_pop($resolved);
  217. if (empty($resolved) || $resolved[0] !== $targetDirectory) {
  218. return true;
  219. }
  220. } else {
  221. $resolved []= $section;
  222. }
  223. }
  224. return false;
  225. }
  226. private function createDownloadPromise()
  227. {
  228. $parts = $this->getS3Args($this->sourceMetadata['path']);
  229. $prefix = "s3://{$parts['Bucket']}/"
  230. . (isset($parts['Key']) ? $parts['Key'] . '/' : '');
  231. $commands = [];
  232. foreach ($this->getDownloadsIterator() as $object) {
  233. // Prepare the sink.
  234. $objectKey = preg_replace('/^' . preg_quote($prefix, '/') . '/', '', $object);
  235. $sink = $this->destination['path'] . '/' . $objectKey;
  236. $command = $this->client->getCommand(
  237. 'GetObject',
  238. $this->getS3Args($object) + ['@http' => ['sink' => $sink]]
  239. );
  240. if ($this->resolvesOutsideTargetDirectory($sink, $objectKey)) {
  241. throw new AwsException(
  242. 'Cannot download key ' . $objectKey
  243. . ', its relative path resolves outside the'
  244. . ' parent directory', $command);
  245. }
  246. // Create the directory if needed.
  247. $dir = dirname($sink);
  248. if (!is_dir($dir) && !mkdir($dir, 0777, true)) {
  249. throw new \RuntimeException("Could not create dir: {$dir}");
  250. }
  251. // Create the command.
  252. $commands []= $command;
  253. }
  254. // Create a GetObject command pool and return the promise.
  255. return (new Aws\CommandPool($this->client, $commands, [
  256. 'concurrency' => $this->concurrency,
  257. 'before' => $this->before,
  258. 'rejected' => function ($reason, $idx, Promise\PromiseInterface $p) {
  259. $p->reject($reason);
  260. }
  261. ]))->promise();
  262. }
  263. private function createUploadPromise()
  264. {
  265. // Map each file into a promise that performs the actual transfer.
  266. $files = \Aws\map($this->getUploadsIterator(), function ($file) {
  267. return (filesize($file) >= $this->mupThreshold)
  268. ? $this->uploadMultipart($file)
  269. : $this->upload($file);
  270. });
  271. // Create an EachPromise, that will concurrently handle the upload
  272. // operations' yielded promises from the iterator.
  273. return Promise\Each::ofLimitAll($files, $this->concurrency);
  274. }
  275. /** @return Iterator */
  276. private function getUploadsIterator()
  277. {
  278. if (is_string($this->source)) {
  279. return Aws\filter(
  280. Aws\recursive_dir_iterator($this->sourceMetadata['path']),
  281. function ($file) { return !is_dir($file); }
  282. );
  283. }
  284. return $this->source;
  285. }
  286. /** @return Iterator */
  287. private function getDownloadsIterator()
  288. {
  289. if (is_string($this->source)) {
  290. $listArgs = $this->getS3Args($this->sourceMetadata['path']);
  291. if (isset($listArgs['Key'])) {
  292. $listArgs['Prefix'] = $listArgs['Key'] . '/';
  293. unset($listArgs['Key']);
  294. }
  295. $files = $this->client
  296. ->getPaginator('ListObjects', $listArgs)
  297. ->search('Contents[].Key');
  298. $files = Aws\map($files, function ($key) use ($listArgs) {
  299. return "s3://{$listArgs['Bucket']}/$key";
  300. });
  301. return Aws\filter($files, function ($key) {
  302. return substr($key, -1, 1) !== '/';
  303. });
  304. }
  305. return $this->source;
  306. }
  307. private function upload($filename)
  308. {
  309. $args = $this->s3Args;
  310. $args['SourceFile'] = $filename;
  311. $args['Key'] = $this->createS3Key($filename);
  312. $args['AddContentMD5'] = $this->addContentMD5;
  313. $command = $this->client->getCommand('PutObject', $args);
  314. $this->before and call_user_func($this->before, $command);
  315. return $this->client->executeAsync($command);
  316. }
  317. private function uploadMultipart($filename)
  318. {
  319. $args = $this->s3Args;
  320. $args['Key'] = $this->createS3Key($filename);
  321. $filename = $filename instanceof \SplFileInfo ? $filename->getPathname() : $filename;
  322. return (new MultipartUploader($this->client, $filename, [
  323. 'bucket' => $args['Bucket'],
  324. 'key' => $args['Key'],
  325. 'before_initiate' => $this->before,
  326. 'before_upload' => $this->before,
  327. 'before_complete' => $this->before,
  328. 'concurrency' => $this->concurrency,
  329. 'add_content_md5' => $this->addContentMD5
  330. ]))->promise();
  331. }
  332. private function createS3Key($filename)
  333. {
  334. $filename = $this->normalizePath($filename);
  335. $relative_file_path = ltrim(
  336. preg_replace('#^' . preg_quote($this->sourceMetadata['path']) . '#', '', $filename),
  337. '/\\'
  338. );
  339. if (isset($this->s3Args['Key'])) {
  340. return rtrim($this->s3Args['Key'], '/').'/'.$relative_file_path;
  341. }
  342. return $relative_file_path;
  343. }
  344. private function addDebugToBefore($debug)
  345. {
  346. $before = $this->before;
  347. $sourcePath = $this->sourceMetadata['path'];
  348. $s3Args = $this->s3Args;
  349. $this->before = static function (
  350. CommandInterface $command
  351. ) use ($before, $debug, $sourcePath, $s3Args) {
  352. // Call the composed before function.
  353. $before and $before($command);
  354. // Determine the source and dest values based on operation.
  355. switch ($operation = $command->getName()) {
  356. case 'GetObject':
  357. $source = "s3://{$command['Bucket']}/{$command['Key']}";
  358. $dest = $command['@http']['sink'];
  359. break;
  360. case 'PutObject':
  361. $source = $command['SourceFile'];
  362. $dest = "s3://{$command['Bucket']}/{$command['Key']}";
  363. break;
  364. case 'UploadPart':
  365. $part = $command['PartNumber'];
  366. case 'CreateMultipartUpload':
  367. case 'CompleteMultipartUpload':
  368. $sourceKey = $command['Key'];
  369. if (isset($s3Args['Key']) && strpos($sourceKey, $s3Args['Key']) === 0) {
  370. $sourceKey = substr($sourceKey, strlen($s3Args['Key']) + 1);
  371. }
  372. $source = "{$sourcePath}/{$sourceKey}";
  373. $dest = "s3://{$command['Bucket']}/{$command['Key']}";
  374. break;
  375. default:
  376. throw new \UnexpectedValueException(
  377. "Transfer encountered an unexpected operation: {$operation}."
  378. );
  379. }
  380. // Print the debugging message.
  381. $context = sprintf('%s -> %s (%s)', $source, $dest, $operation);
  382. if (isset($part)) {
  383. $context .= " : Part={$part}";
  384. }
  385. fwrite($debug, "Transferring {$context}\n");
  386. };
  387. }
  388. }