123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150 |
- <?php
- namespace Aws\Multipart;
- use Aws\AwsClientInterface as Client;
- use Aws\Exception\AwsException;
- use GuzzleHttp\Psr7;
- use InvalidArgumentException as IAE;
- use Psr\Http\Message\StreamInterface as Stream;
/**
 * Base class for multipart uploaders that read the data to upload from a
 * source stream and emit one upload command per part.
 */
abstract class AbstractUploader extends AbstractUploadManager
{
    /** @var Stream Readable stream the parts are taken from. */
    protected $source;

    /**
     * @param Client $client Client used to build the upload commands.
     * @param mixed  $source A string (opened as a file path for reading) or
     *                       any value accepted by `Psr7\Utils::streamFor()`.
     * @param array  $config Configuration forwarded to the parent manager.
     *
     * @throws IAE If the resulting source stream is not readable.
     */
    public function __construct(Client $client, $source, array $config = [])
    {
        // The source is resolved before the parent constructor runs; this
        // order is kept as-is (presumably the parent may consult
        // $this->source -- verify against AbstractUploadManager).
        $this->source = $this->determineSource($source);
        parent::__construct($client, $config);
    }

    /**
     * Wraps a stream so that reads are limited to a single part.
     *
     * The window is one part size long and begins at the source's current
     * position.
     *
     * @param Stream $stream Stream to wrap.
     *
     * @return Psr7\LimitStream
     */
    protected function limitPartStream(Stream $stream)
    {
        $partLength = $this->state->getPartSize();
        $startOffset = $this->source->tell();

        return new Psr7\LimitStream($stream, $partLength, $startOffset);
    }

    /**
     * Lazily yields an upload command for every part that the state does not
     * report as already uploaded.
     *
     * @param callable $resultHandler Handler appended to each command's
     *                                handler list under the name 'mup'.
     *
     * @return \Generator Yields one command per part to upload.
     *
     * @throws \Exception An instance of the configured 'exception_class' when
     *                    the part number exceeds the number of parts computed
     *                    from the source size.
     */
    protected function getUploadCommands(callable $resultHandler)
    {
        // The source only counts as seekable when the stream says so AND it
        // is backed by a plain file.
        $canSeek = $this->source->isSeekable()
            && $this->source->getMetadata('wrapper_type') === 'plainfile';

        for ($partNumber = 1; $this->isEof($canSeek); $partNumber++) {
            // Whether the source offset still has to be moved past this part
            // by hand at the end of the iteration.
            $mustAdvance = true;

            if (!$this->state->hasPartBeenUploaded($partNumber)) {
                $offsetBeforePart = $this->source->tell();

                $partParams = $this->createPart($canSeek, $partNumber);
                if (!$partParams) {
                    // Nothing left to build a part from: stop generating.
                    break;
                }

                $command = $this->client->getCommand(
                    $this->info['command']['upload'],
                    $partParams + $this->state->getId()
                );
                $command->getHandlerList()->appendSign($resultHandler, 'mup');

                // Guard against producing more parts than the source size
                // allows for (skipped when the part count is unknown).
                $expectedParts = $this->getNumberOfParts(
                    $this->state->getPartSize()
                );
                if ($expectedParts !== null && $partNumber > $expectedParts) {
                    $exceptionClass = $this->config['exception_class'];
                    throw new $exceptionClass(
                        $this->state,
                        new AwsException(
                            "Maximum part number for this job exceeded, file has likely been corrupted." .
                            " Please restart this upload.",
                            $command
                        )
                    );
                }

                yield $command;

                // If the offset moved forward while the part was created or
                // consumed, there is nothing left to skip for this part.
                $mustAdvance = !($this->source->tell() > $offsetBeforePart);
            }

            if (!$mustAdvance) {
                continue;
            }

            // Move the source past this part: read and discard one part size
            // when seeking is not possible, otherwise seek forward without
            // going beyond the end of the source.
            if (!$canSeek) {
                $this->source->read($this->state->getPartSize());
            } else {
                $this->source->seek(min(
                    $this->source->tell() + $this->state->getPartSize(),
                    $this->source->getSize()
                ));
            }
        }
    }

    /**
     * Builds the parameters of a single upload part.
     *
     * A falsy return value makes getUploadCommands() stop producing parts.
     *
     * @param bool $seekable Whether the source is treated as seekable.
     * @param int  $number   Part number being created.
     *
     * @return mixed Parameters merged with the upload ID to build the command.
     */
    abstract protected function createPart($seekable, $number);

    /**
     * Loop condition for part generation.
     *
     * NOTE: despite its name, this returns true while data REMAINS to be
     * read and false once the end of the source has been reached.
     *
     * @param bool $seekable Whether the source is treated as seekable.
     *
     * @return bool
     */
    private function isEof($seekable)
    {
        if ($seekable) {
            return $this->source->tell() < $this->source->getSize();
        }

        return !$this->source->eof();
    }

    /**
     * Converts the given source into a readable stream.
     *
     * @param mixed $source A string is opened as a file in read mode; any
     *                      other value is passed straight to
     *                      `Psr7\Utils::streamFor()`.
     *
     * @return Stream
     *
     * @throws IAE If the resulting stream is not readable.
     */
    private function determineSource($source)
    {
        $input = is_string($source)
            ? Psr7\Utils::tryFopen($source, 'r')
            : $source;

        $stream = Psr7\Utils::streamFor($input);
        if ($stream->isReadable()) {
            return $stream;
        }

        throw new IAE('Source stream must be readable.');
    }

    /**
     * Computes how many parts of the given size the source is split into.
     *
     * @param int $partSize Size of a single part.
     *
     * @return float|null The rounded-up part count, or null when the source
     *                    size is unknown or zero.
     */
    protected function getNumberOfParts($partSize)
    {
        $sourceSize = $this->source->getSize();
        if (!$sourceSize) {
            return null;
        }

        return ceil($sourceSize / $partSize);
    }
}
|