<?php
namespace Aws\S3;

use Aws;
use Aws\CommandInterface;
use Aws\Exception\AwsException;
use GuzzleHttp\Promise;
use GuzzleHttp\Promise\PromiseInterface;
use GuzzleHttp\Promise\PromisorInterface;
use Iterator;

/**
 * Transfers files from the local filesystem to S3 or from S3 to the local
 * filesystem.
 *
 * This class does not support copying from the local filesystem to somewhere
 * else on the local filesystem or from one S3 bucket to another.
 */
class Transfer implements PromisorInterface
{
    private $client;
    private $promise;
    private $source;
    private $sourceMetadata;
    private $destination;
    private $concurrency;
    private $mupThreshold;
    private $before;
    private $s3Args = [];
    private $addContentMD5;

    /**
     * When providing the $source argument, you may provide a string referencing
     * the path to a directory on disk to upload, an s3 scheme URI that contains
     * the bucket and key (e.g., "s3://bucket/key"), or an \Iterator object
     * that yields strings containing filenames that are the path to a file on
     * disk or an s3 scheme URI. The bucket portion of the s3 URI may be an S3
     * access point ARN. The "/key" portion of an s3 URI is optional.
     *
     * When providing an iterator for the $source argument, you must also
     * provide a 'base_dir' key value pair in the $options argument.
     *
     * The $dest argument can be the path to a directory on disk or an s3
     * scheme URI (e.g., "s3://bucket/key").
     *
     * The options array can contain the following key value pairs:
     *
     * - base_dir: (string) Base directory of the source, if $source is an
     *   iterator. If $source is not an iterator, then this option is
     *   ignored.
     * - before: (callable) A callback to invoke before each transfer. The
     *   callback accepts a single argument: Aws\CommandInterface $command.
     *   The provided command will be a GetObject, PutObject,
     *   CreateMultipartUpload, UploadPart, or CompleteMultipartUpload
     *   command.
     * - mup_threshold: (int) Size in bytes at which a multipart upload should
     *   be used instead of PutObject. Defaults to 16777216 (16 MB). Must be
     *   at least MultipartUploader::PART_MIN_SIZE.
     * - concurrency: (int) Number of files to transfer concurrently. Defaults
     *   to MultipartUploader::DEFAULT_CONCURRENCY. The value is also passed
     *   as the concurrency of each multipart upload. The ideal concurrency
     *   value will vary based on the number of files being uploaded and the
     *   average size of each file. Generally speaking, smaller files benefit
     *   from a higher concurrency while larger files will not.
     * - debug: (bool|resource) Set to true to print out debug information for
     *   transfers to the PHP output stream (php://output). Set to an fopen()
     *   resource to write to a specific stream instead.
     * - add_content_md5: (bool) Set to true to pass AddContentMD5 to PutObject
     *   commands and add_content_md5 to multipart uploads. Only the boolean
     *   true enables it.
     *
     * @param S3ClientInterface $client  Client used for transfers.
     * @param string|Iterator   $source  Where the files are transferred from.
     * @param string            $dest    Where the files are transferred to.
     * @param array             $options Hash of options.
     *
     * @throws \InvalidArgumentException on invalid source, destination,
     *                                   schemes, or options.
     */
    public function __construct(
        S3ClientInterface $client,
        $source,
        $dest,
        array $options = []
    ) {
        $this->client = $client;

        // Prepare the destination.
        $this->destination = $this->prepareTarget($dest);
        if ($this->destination['scheme'] === 's3') {
            $this->s3Args = $this->getS3Args($this->destination['path']);
        }

        // Prepare the source.
        if (is_string($source)) {
            $this->sourceMetadata = $this->prepareTarget($source);
            $this->source = $source;
        } elseif ($source instanceof Iterator) {
            if (empty($options['base_dir'])) {
                throw new \InvalidArgumentException('You must provide the source'
                    . ' argument as a string or provide the "base_dir" option.');
            }

            $this->sourceMetadata = $this->prepareTarget($options['base_dir']);
            $this->source = $source;
        } else {
            throw new \InvalidArgumentException('source must be the path to a '
                . 'directory or an iterator that yields file names.');
        }

        // Validate schemes: file->file and s3->s3 copies are not supported.
        if ($this->sourceMetadata['scheme'] === $this->destination['scheme']) {
            throw new \InvalidArgumentException("You cannot copy from"
                . " {$this->sourceMetadata['scheme']} to"
                . " {$this->destination['scheme']}."
            );
        }

        // Handle multipart-related options.
        $this->concurrency = $options['concurrency']
            ?? MultipartUploader::DEFAULT_CONCURRENCY;
        $this->mupThreshold = $options['mup_threshold'] ?? 16777216;
        if ($this->mupThreshold < MultipartUploader::PART_MIN_SIZE) {
            throw new \InvalidArgumentException('mup_threshold must be >= 5MB');
        }

        // Handle "before" callback option.
        if (isset($options['before'])) {
            $this->before = $options['before'];
            if (!is_callable($this->before)) {
                throw new \InvalidArgumentException('before must be a callable.');
            }
        }

        // Handle "debug" option. This must run after "before" is set, because
        // the debug hook wraps the user callback.
        if (isset($options['debug'])) {
            if ($options['debug'] === true) {
                $options['debug'] = fopen('php://output', 'w');
            }
            if (is_resource($options['debug'])) {
                $this->addDebugToBefore($options['debug']);
            }
        }

        // Handle "add_content_md5" option; only a strict boolean true enables it.
        $this->addContentMD5 = ($options['add_content_md5'] ?? null) === true;
    }

    /**
     * Transfers the files.
     *
     * The promise is created lazily on the first call and reused afterwards.
     *
     * @return PromiseInterface
     */
    public function promise(): PromiseInterface
    {
        // If the promise has been created, just return it.
        if (!$this->promise) {
            // Create an upload/download promise for the transfer.
            $this->promise = $this->sourceMetadata['scheme'] === 'file'
                ? $this->createUploadPromise()
                : $this->createDownloadPromise();
        }

        return $this->promise;
    }

    /**
     * Transfers the files synchronously.
     */
    public function transfer()
    {
        $this->promise()->wait();
    }

    /**
     * Builds the normalized path/scheme pair for a source or destination.
     *
     * @param string $targetPath Local path or s3:// URI.
     *
     * @return array{path: string, scheme: string}
     *
     * @throws \InvalidArgumentException if the scheme is neither "s3" nor "file".
     */
    private function prepareTarget($targetPath)
    {
        $target = [
            'path'   => $this->normalizePath($targetPath),
            'scheme' => $this->determineScheme($targetPath),
        ];

        if ($target['scheme'] !== 's3' && $target['scheme'] !== 'file') {
            throw new \InvalidArgumentException('Scheme must be "s3" or "file".');
        }

        return $target;
    }

    /**
     * Creates an array that contains Bucket and Key by parsing the filename.
     *
     * Only a leading "s3://" is removed, so keys that themselves contain the
     * substring "s3://" are preserved intact.
     *
     * @param string $path Path to parse.
     *
     * @return array
     */
    private function getS3Args($path)
    {
        if (strpos($path, 's3://') === 0) {
            $path = substr($path, strlen('s3://'));
        }

        $parts = explode('/', $path, 2);
        $args = ['Bucket' => $parts[0]];
        if (isset($parts[1])) {
            $args['Key'] = $parts[1];
        }

        return $args;
    }

    /**
     * Parses the scheme from a filename.
     *
     * Paths without "://" are treated as local files. Note that a path that
     * starts with "://" (strpos() returns 0) is also treated as a local file.
     *
     * @param string $path Path to parse.
     *
     * @return string
     */
    private function determineScheme($path)
    {
        return !strpos($path, '://') ? 'file' : explode('://', $path)[0];
    }

    /**
     * Normalize a path so that it has UNIX-style directory separators and no trailing /
     *
     * @param string $path
     *
     * @return string
     */
    private function normalizePath($path)
    {
        return rtrim(str_replace('\\', '/', $path), '/');
    }

    /**
     * Determines whether a download sink escapes the destination directory
     * because the object key contains ".." segments.
     *
     * Only the last segment of the destination directory plus the key's own
     * segments are resolved. Any ".." that pops back to (or past) that
     * directory is reported as an escape.
     *
     * @param string $sink      Destination path joined with the object key.
     * @param string $objectKey Object key relative to the download prefix.
     *
     * @return bool True if the resolved path leaves the target directory.
     */
    private function resolvesOutsideTargetDirectory($sink, $objectKey)
    {
        // On Windows, "\" is also a directory separator for dirname(), mkdir()
        // and the sink write. Treat it like "/" here so that keys such as
        // "..\..\x" cannot bypass the check.
        if (DIRECTORY_SEPARATOR === '\\') {
            $sink = str_replace('\\', '/', $sink);
            $objectKey = str_replace('\\', '/', $objectKey);
        }

        $resolved = [];
        $sections = explode('/', $sink);
        $targetSectionsLength = count(explode('/', $objectKey));
        $targetSections = array_slice($sections, -($targetSectionsLength + 1));
        $targetDirectory = $targetSections[0];

        foreach ($targetSections as $section) {
            if ($section === '.' || $section === '') {
                continue;
            }
            if ($section === '..') {
                array_pop($resolved);
                if (empty($resolved) || $resolved[0] !== $targetDirectory) {
                    return true;
                }
            } else {
                $resolved[] = $section;
            }
        }

        return false;
    }

    /**
     * Builds a CommandPool of GetObject commands, one per object to download.
     *
     * Local directories are created eagerly. The first rejected command
     * rejects the aggregate promise.
     *
     * @return PromiseInterface
     *
     * @throws AwsException      if an object key resolves outside the destination.
     * @throws \RuntimeException if a local directory cannot be created.
     */
    private function createDownloadPromise()
    {
        $parts = $this->getS3Args($this->sourceMetadata['path']);
        $prefix = "s3://{$parts['Bucket']}/"
            . (isset($parts['Key']) ? $parts['Key'] . '/' : '');

        $commands = [];
        foreach ($this->getDownloadsIterator() as $object) {
            // Prepare the sink.
            $objectKey = preg_replace('/^' . preg_quote($prefix, '/') . '/', '', $object);
            $sink = $this->destination['path'] . '/' . $objectKey;

            $command = $this->client->getCommand(
                'GetObject',
                $this->getS3Args($object) + ['@http'  => ['sink'  => $sink]]
            );

            if ($this->resolvesOutsideTargetDirectory($sink, $objectKey)) {
                throw new AwsException(
                    'Cannot download key ' . $objectKey
                    . ', its relative path resolves outside the'
                    . ' parent directory', $command);
            }

            // Create the directory if needed.
            $dir = dirname($sink);
            if (!is_dir($dir) && !mkdir($dir, 0777, true)) {
                throw new \RuntimeException("Could not create dir: {$dir}");
            }

            // Create the command.
            $commands[] = $command;
        }

        // Create a GetObject command pool and return the promise.
        return (new Aws\CommandPool($this->client, $commands, [
            'concurrency' => $this->concurrency,
            'before'      => $this->before,
            'rejected'    => function ($reason, $idx, Promise\PromiseInterface $p) {
                $p->reject($reason);
            }
        ]))->promise();
    }

    /**
     * Maps every local file to a PutObject or multipart upload promise and
     * runs them with bounded concurrency.
     *
     * @return PromiseInterface
     */
    private function createUploadPromise()
    {
        // Map each file into a promise that performs the actual transfer.
        $files = \Aws\map($this->getUploadsIterator(), function ($file) {
            return (filesize($file) >= $this->mupThreshold)
                ? $this->uploadMultipart($file)
                : $this->upload($file);
        });

        // Create an EachPromise, that will concurrently handle the upload
        // operations' yielded promises from the iterator.
        return Promise\Each::ofLimitAll($files, $this->concurrency);
    }

    /**
     * Yields local files to upload: every non-directory entry found under
     * the source path, or the user-supplied iterator as-is.
     *
     * @return Iterator
     */
    private function getUploadsIterator()
    {
        if (is_string($this->source)) {
            return Aws\filter(
                Aws\recursive_dir_iterator($this->sourceMetadata['path']),
                function ($file) { return !is_dir($file); }
            );
        }

        return $this->source;
    }

    /**
     * Yields s3:// URIs of objects to download: a ListObjects listing under
     * the source prefix (skipping "directory" keys ending in "/"), or the
     * user-supplied iterator as-is.
     *
     * @return Iterator
     */
    private function getDownloadsIterator()
    {
        if (is_string($this->source)) {
            $listArgs = $this->getS3Args($this->sourceMetadata['path']);
            if (isset($listArgs['Key'])) {
                $listArgs['Prefix'] = $listArgs['Key'] . '/';
                unset($listArgs['Key']);
            }

            $files = $this->client
                ->getPaginator('ListObjects', $listArgs)
                ->search('Contents[].Key');
            $files = Aws\map($files, function ($key) use ($listArgs) {
                return "s3://{$listArgs['Bucket']}/$key";
            });
            return Aws\filter($files, function ($key) {
                return substr($key, -1, 1) !== '/';
            });
        }

        return $this->source;
    }

    /**
     * Uploads a single file with PutObject.
     *
     * @param string|\SplFileInfo $filename Local file to upload.
     *
     * @return PromiseInterface
     */
    private function upload($filename)
    {
        // Accept \SplFileInfo entries from a user-supplied iterator, as
        // uploadMultipart() does.
        $filename = $filename instanceof \SplFileInfo ? $filename->getPathname() : $filename;

        $args = $this->s3Args;
        $args['SourceFile'] = $filename;
        $args['Key'] = $this->createS3Key($filename);
        $args['AddContentMD5'] = $this->addContentMD5;
        $command = $this->client->getCommand('PutObject', $args);
        if ($this->before) {
            ($this->before)($command);
        }

        return $this->client->executeAsync($command);
    }

    /**
     * Uploads a single file with a MultipartUploader. The "before" callback
     * is wired into every stage of the multipart upload.
     *
     * @param string|\SplFileInfo $filename Local file to upload.
     *
     * @return PromiseInterface
     */
    private function uploadMultipart($filename)
    {
        $args = $this->s3Args;
        $args['Key'] = $this->createS3Key($filename);
        $filename = $filename instanceof \SplFileInfo ? $filename->getPathname() : $filename;

        return (new MultipartUploader($this->client, $filename, [
            'bucket'          => $args['Bucket'],
            'key'             => $args['Key'],
            'before_initiate' => $this->before,
            'before_upload'   => $this->before,
            'before_complete' => $this->before,
            'concurrency'     => $this->concurrency,
            'add_content_md5' => $this->addContentMD5
        ]))->promise();
    }

    /**
     * Computes the destination key for a local file. It strips the source
     * base path, then prepends the destination key prefix (if any).
     *
     * @param string|\SplFileInfo $filename Local file path.
     *
     * @return string
     */
    private function createS3Key($filename)
    {
        $filename = $this->normalizePath($filename);
        $relativePath = ltrim(
            preg_replace('#^' . preg_quote($this->sourceMetadata['path'], '#') . '#', '', $filename),
            '/\\'
        );

        if (isset($this->s3Args['Key'])) {
            return rtrim($this->s3Args['Key'], '/') . '/' . $relativePath;
        }

        return $relativePath;
    }

    /**
     * Wraps the current "before" callback so that each command also writes a
     * "Transferring <source> -> <dest> (<operation>)" line to $debug.
     *
     * @param resource $debug Stream that debug lines are written to.
     *
     * @throws \UnexpectedValueException (from the wrapper) for unknown operations.
     */
    private function addDebugToBefore($debug)
    {
        $before = $this->before;
        $sourcePath = $this->sourceMetadata['path'];
        $s3Args = $this->s3Args;

        $this->before = static function (
            CommandInterface $command
        ) use ($before, $debug, $sourcePath, $s3Args) {
            // Call the composed before function.
            $before and $before($command);

            // Determine the source and dest values based on operation.
            switch ($operation = $command->getName()) {
                case 'GetObject':
                    $source = "s3://{$command['Bucket']}/{$command['Key']}";
                    $dest = $command['@http']['sink'];
                    break;
                case 'PutObject':
                    $source = $command['SourceFile'];
                    $dest = "s3://{$command['Bucket']}/{$command['Key']}";
                    break;
                case 'UploadPart':
                    $part = $command['PartNumber'];
                    // no break: UploadPart shares the multipart source/dest logic below.
                case 'CreateMultipartUpload':
                case 'CompleteMultipartUpload':
                    // Rebuild the local source path by removing the
                    // destination key prefix (and its "/") from the key.
                    $sourceKey = $command['Key'];
                    if (isset($s3Args['Key']) && strpos($sourceKey, $s3Args['Key']) === 0) {
                        $sourceKey = substr($sourceKey, strlen($s3Args['Key']) + 1);
                    }
                    $source = "{$sourcePath}/{$sourceKey}";
                    $dest = "s3://{$command['Bucket']}/{$command['Key']}";
                    break;
                default:
                    throw new \UnexpectedValueException(
                        "Transfer encountered an unexpected operation: {$operation}."
                    );
            }

            // Print the debugging message.
            $context = sprintf('%s -> %s (%s)', $source, $dest, $operation);
            if (isset($part)) {
                $context .= " : Part={$part}";
            }
            fwrite($debug, "Transferring {$context}\n");
        };
    }
}