123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150 |
- <?php
- namespace Aws\Multipart;
- use Aws\AwsClientInterface as Client;
- use Aws\Exception\AwsException;
- use GuzzleHttp\Psr7;
- use InvalidArgumentException as IAE;
- use Psr\Http\Message\StreamInterface as Stream;
- abstract class AbstractUploader extends AbstractUploadManager
- {
- /** @var Stream Source of the data to be uploaded. */
- protected $source;
- /**
- * @param Client $client
- * @param mixed $source
- * @param array $config
- */
- public function __construct(Client $client, $source, array $config = [])
- {
- $this->source = $this->determineSource($source);
- parent::__construct($client, $config);
- }
- /**
- * Create a stream for a part that starts at the current position and
- * has a length of the upload part size (or less with the final part).
- *
- * @param Stream $stream
- *
- * @return Psr7\LimitStream
- */
- protected function limitPartStream(Stream $stream)
- {
- // Limit what is read from the stream to the part size.
- return new Psr7\LimitStream(
- $stream,
- $this->state->getPartSize(),
- $this->source->tell()
- );
- }
- protected function getUploadCommands(callable $resultHandler)
- {
- // Determine if the source can be seeked.
- $seekable = $this->source->isSeekable()
- && $this->source->getMetadata('wrapper_type') === 'plainfile';
- for ($partNumber = 1; $this->isEof($seekable); $partNumber++) {
- // If we haven't already uploaded this part, yield a new part.
- if (!$this->state->hasPartBeenUploaded($partNumber)) {
- $partStartPos = $this->source->tell();
- if (!($data = $this->createPart($seekable, $partNumber))) {
- break;
- }
- $command = $this->client->getCommand(
- $this->info['command']['upload'],
- $data + $this->state->getId()
- );
- $command->getHandlerList()->appendSign($resultHandler, 'mup');
- $numberOfParts = $this->getNumberOfParts($this->state->getPartSize());
- if (isset($numberOfParts) && $partNumber > $numberOfParts) {
- throw new $this->config['exception_class'](
- $this->state,
- new AwsException(
- "Maximum part number for this job exceeded, file has likely been corrupted." .
- " Please restart this upload.",
- $command
- )
- );
- }
- yield $command;
- if ($this->source->tell() > $partStartPos) {
- continue;
- }
- }
- // Advance the source's offset if not already advanced.
- if ($seekable) {
- $this->source->seek(min(
- $this->source->tell() + $this->state->getPartSize(),
- $this->source->getSize()
- ));
- } else {
- $this->source->read($this->state->getPartSize());
- }
- }
- }
- /**
- * Generates the parameters for an upload part by analyzing a range of the
- * source starting from the current offset up to the part size.
- *
- * @param bool $seekable
- * @param int $number
- *
- * @return array|null
- */
- abstract protected function createPart($seekable, $number);
- /**
- * Checks if the source is at EOF.
- *
- * @param bool $seekable
- *
- * @return bool
- */
- private function isEof($seekable)
- {
- return $seekable
- ? $this->source->tell() < $this->source->getSize()
- : !$this->source->eof();
- }
- /**
- * Turns the provided source into a stream and stores it.
- *
- * If a string is provided, it is assumed to be a filename, otherwise, it
- * passes the value directly to `Psr7\Utils::streamFor()`.
- *
- * @param mixed $source
- *
- * @return Stream
- */
- private function determineSource($source)
- {
- // Use the contents of a file as the data source.
- if (is_string($source)) {
- $source = Psr7\Utils::tryFopen($source, 'r');
- }
- // Create a source stream.
- $stream = Psr7\Utils::streamFor($source);
- if (!$stream->isReadable()) {
- throw new IAE('Source stream must be readable.');
- }
- return $stream;
- }
- protected function getNumberOfParts($partSize)
- {
- if ($sourceSize = $this->source->getSize()) {
- return ceil($sourceSize/$partSize);
- }
- return null;
- }
- }
|