AbstractUploader.php 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. <?php
  2. namespace Aws\Multipart;
  3. use Aws\AwsClientInterface as Client;
  4. use Aws\Exception\AwsException;
  5. use GuzzleHttp\Psr7;
  6. use InvalidArgumentException as IAE;
  7. use Psr\Http\Message\StreamInterface as Stream;
  /**
   * Base class for multipart uploaders that read their parts from a stream.
   *
   * It owns the source stream and provides the generator that turns the
   * not-yet-uploaded portions of that stream into upload-part commands.
   * Concrete uploaders supply the per-part parameters via createPart().
   */
  abstract class AbstractUploader extends AbstractUploadManager
  {
      /** @var Stream Source of the data to be uploaded. */
      protected $source;

      /**
       * @param Client $client Client handed to the parent upload manager.
       * @param mixed  $source Data to upload. A string is treated as a filename
       *                       and opened for reading; any other value is passed
       *                       to `Psr7\Utils::streamFor()`.
       * @param array  $config Configuration handed to the parent upload manager.
       *
       * @throws IAE If the resulting source stream is not readable.
       */
      public function __construct(Client $client, $source, array $config = [])
      {
          // The source is resolved before the parent constructor runs.
          $this->source = $this->determineSource($source);
          parent::__construct($client, $config);
      }

      /**
       * Create a stream for a part that starts at the current position and
       * has a length of the upload part size (or less with the final part).
       *
       * Note that the starting offset is taken from `$this->source`, not from
       * the `$stream` argument.
       *
       * @param Stream $stream
       *
       * @return Psr7\LimitStream
       */
      protected function limitPartStream(Stream $stream)
      {
          // Limit what is read from the stream to the part size.
          return new Psr7\LimitStream(
              $stream,
              $this->state->getPartSize(),
              $this->source->tell()
          );
      }

      /**
       * Generates an upload command for every part of the source that the
       * upload state does not already report as uploaded.
       *
       * Parts already recorded in the state are skipped by advancing the source
       * past them without creating a command.
       *
       * @param callable $resultHandler Handler attached to each command through
       *                                `appendSign()` with the `'mup'` label.
       *
       * @return \Generator Yields one command per part that still needs uploading.
       *
       * @throws \Exception An instance of `$this->config['exception_class']`
       *                    when the part number exceeds the number of parts
       *                    computed from the source size.
       */
      protected function getUploadCommands(callable $resultHandler)
      {
          // Determine if the source can be seeked.
          $seekable = $this->source->isSeekable()
              && $this->source->getMetadata('wrapper_type') === 'plainfile';

          // NOTE: despite its name, isEof() is true while data REMAINS, so it
          // serves as the loop's continuation condition.
          for ($partNumber = 1; $this->isEof($seekable); $partNumber++) {
              // If we haven't already uploaded this part, yield a new part.
              if (!$this->state->hasPartBeenUploaded($partNumber)) {
                  $partStartPos = $this->source->tell();
                  if (!($data = $this->createPart($seekable, $partNumber))) {
                      break;
                  }

                  $command = $this->client->getCommand(
                      $this->info['command']['upload'],
                      $data + $this->state->getId()
                  );
                  $command->getHandlerList()->appendSign($resultHandler, 'mup');

                  // The count is null when the source size is unknown (or zero),
                  // in which case the upper-bound check is skipped.
                  $numberOfParts = $this->getNumberOfParts($this->state->getPartSize());
                  if (isset($numberOfParts) && $partNumber > $numberOfParts) {
                      throw new $this->config['exception_class'](
                          $this->state,
                          new AwsException(
                              "Maximum part number for this job exceeded, file has likely been corrupted." .
                              " Please restart this upload.",
                              $command
                          )
                      );
                  }

                  yield $command;

                  // If the source offset moved forward since the part was
                  // started, the part has been consumed: go to the next one.
                  if ($this->source->tell() > $partStartPos) {
                      continue;
                  }
              }

              // Advance the source's offset if not already advanced.
              if ($seekable) {
                  $this->source->seek(min(
                      $this->source->tell() + $this->state->getPartSize(),
                      $this->source->getSize()
                  ));
              } else {
                  $this->source->read($this->state->getPartSize());
              }
          }
      }

      /**
       * Generates the parameters for an upload part by analyzing a range of the
       * source starting from the current offset up to the part size.
       *
       * @param bool $seekable
       * @param int  $number
       *
       * @return array|null
       */
      abstract protected function createPart($seekable, $number);

      /**
       * Checks whether the source still has data left to read.
       *
       * Despite its name, this returns TRUE while the source is NOT at its
       * end: for seekable sources when the current offset is below the source
       * size, otherwise when the stream does not report EOF.
       *
       * @param bool $seekable
       *
       * @return bool True if more data remains, false once the end is reached.
       */
      private function isEof($seekable)
      {
          return $seekable
              ? $this->source->tell() < $this->source->getSize()
              : !$this->source->eof();
      }

      /**
       * Turns the provided source into a stream and returns it.
       *
       * If a string is provided, it is assumed to be a filename, otherwise, it
       * passes the value directly to `Psr7\Utils::streamFor()`.
       *
       * @param mixed $source
       *
       * @return Stream
       *
       * @throws IAE If the resulting stream is not readable.
       */
      private function determineSource($source)
      {
          // Use the contents of a file as the data source.
          if (is_string($source)) {
              $source = Psr7\Utils::tryFopen($source, 'r');
          }

          // Create a source stream.
          $stream = Psr7\Utils::streamFor($source);
          if (!$stream->isReadable()) {
              throw new IAE('Source stream must be readable.');
          }

          return $stream;
      }

      /**
       * Computes how many parts of the given size are needed to cover the
       * whole source.
       *
       * @param int $partSize Size of a single part, in the same unit as the
       *                      source's reported size.
       *
       * @return int|null The part count rounded up to a whole number, or null
       *                  when the source reports no size (null or zero).
       */
      protected function getNumberOfParts($partSize)
      {
          $sourceSize = $this->source->getSize();
          if ($sourceSize) {
              // ceil() returns a float; a part count is a whole number.
              return (int) ceil($sourceSize / $partSize);
          }

          return null;
      }
  }