NonSeekableStreamDecodingEventStreamIterator.php 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. <?php
  2. namespace Aws\Api\Parser;
  3. use GuzzleHttp\Psr7;
  4. use Psr\Http\Message\StreamInterface;
  5. use Aws\Api\Parser\Exception\ParserException;
  /**
   * Event-stream decoder for streams that cannot be rewound.
   *
   * The constructor rejects seekable streams. Because the underlying stream
   * cannot be seeked, next() has to consume one byte to find out whether more
   * data is available; that byte is parked in $tempBuffer and handed back by
   * readAndHashBytes() before any further data is pulled from the stream.
   *
   * Message layout helpers (parsePrelude(), parseHeaders()) and the iterator
   * state ($stream, $hashContext, $currentEvent, $key) come from the parent
   * class.
   */
  class NonSeekableStreamDecodingEventStreamIterator extends DecodingEventStreamIterator
  {
      /**
       * Single bytes already taken off the stream by next() but not yet
       * consumed by readAndHashBytes(), in arrival order.
       *
       * @var string[] $tempBuffer
       */
      private $tempBuffer;

      /**
       * NonSeekableStreamDecodingEventStreamIterator constructor.
       *
       * @param StreamInterface $stream Stream to decode; must not be seekable.
       *
       * @throws \InvalidArgumentException When the stream reports itself as seekable.
       */
      public function __construct(StreamInterface $stream)
      {
          $this->stream = $stream;
          if ($this->stream->isSeekable()) {
              throw new \InvalidArgumentException('The stream provided must be not seekable.');
          }

          $this->tempBuffer = [];
      }

      /**
       * Reads one complete message from the stream and verifies its checksum.
       *
       * A fresh crc32b context is started, the prelude and headers are parsed
       * (both feed the running hash through readAndHashBytes()), the payload is
       * read, and finally the 4-byte trailer is compared with the computed CRC.
       *
       * @return array Event with the self::HEADERS and self::PAYLOAD entries set.
       *
       * @throws ParserException When the lengths in the prelude are inconsistent
       *                         or the message checksum does not match.
       */
      protected function parseEvent(): array
      {
          $event = [];
          $this->hashContext = hash_init('crc32b');

          $prelude = $this->parsePrelude()[0];
          list(
              $event[self::HEADERS],
              $numBytes
          ) = $this->parseHeaders($prelude[self::LENGTH_HEADERS]);

          // Whatever is left after prelude, headers and trailer is payload.
          $payloadLength = $prelude[self::LENGTH_TOTAL] - self::BYTES_PRELUDE
              - $numBytes - self::BYTES_TRAILING;
          if ($payloadLength < 0) {
              // A negative length can only come from a malformed message;
              // fail explicitly instead of handing it to the stream.
              throw new ParserException('Message payload length is invalid.');
          }

          $event[self::PAYLOAD] = Psr7\Utils::streamFor(
              $this->readAndHashBytes($payloadLength)
          );

          $calculatedCrc = hash_final($this->hashContext, true);
          // The trailer is not part of the hashed data, so it is read without
          // going through readAndHashBytes(). It still has to be read in full:
          // a short read here would look like a checksum mismatch.
          $messageCrc = $this->readFullyFromStream(4);
          if ($calculatedCrc !== $messageCrc) {
              throw new ParserException('Message checksum mismatch.');
          }

          return $event;
      }

      /**
       * Returns the next $num bytes of the message and adds them to the
       * running checksum.
       *
       * Bytes parked in $tempBuffer by next() are consumed first; the rest is
       * read from the stream. Fewer than $num bytes are returned only when the
       * stream runs out of data.
       *
       * @param int $num Number of bytes wanted.
       *
       * @return string The bytes read (binary).
       */
      protected function readAndHashBytes($num): string
      {
          $bytes = '';
          while (!empty($this->tempBuffer) && $num > 0) {
              $bytes .= array_shift($this->tempBuffer);
              $num = $num - 1;
          }

          $bytes .= $this->readFullyFromStream($num);
          hash_update($this->hashContext, $bytes);

          return $bytes;
      }

      /**
       * Reads from the stream until $num bytes were collected.
       *
       * A single read() call on a stream may return fewer bytes than asked
       * for, so the call is repeated until the request is satisfied. Reading
       * stops early when read() returns an empty string, which avoids spinning
       * forever on an exhausted stream. No read() call is issued when $num is
       * not positive.
       *
       * @param int $num Number of bytes wanted.
       *
       * @return string The bytes read; shorter than $num only if the stream
       *                stopped delivering data.
       */
      private function readFullyFromStream($num): string
      {
          $bytes = '';
          $remaining = $num;
          while ($remaining > 0) {
              $chunk = $this->stream->read($remaining);
              if ($chunk === '') {
                  break;
              }

              $bytes .= $chunk;
              $remaining -= strlen($chunk);
          }

          return $bytes;
      }

      // Iterator Functionality

      /**
       * Parses the first event from the stream's current position.
       *
       * The stream itself is not rewound (it is not seekable).
       */
      #[\ReturnTypeWillChange]
      public function rewind()
      {
          $this->currentEvent = $this->parseEvent();
      }

      /**
       * Advances to the next event, if the stream has one.
       *
       * One byte is read ahead before valid() is consulted, so that the
       * stream's EOF state is checked after a read attempt. The byte is kept
       * in $tempBuffer so readAndHashBytes() can return it as the first byte
       * of the next message.
       */
      #[\ReturnTypeWillChange]
      public function next()
      {
          $byte = $this->stream->read(1);
          if ($byte !== '') {
              // Only real data is buffered: an empty read must not be counted
              // as a consumed byte by readAndHashBytes().
              $this->tempBuffer[] = $byte;
          }

          if ($this->valid()) {
              $this->key++;
              $this->currentEvent = $this->parseEvent();
          }
      }

      /**
       * Tells whether the stream has not reached its end yet.
       *
       * @return bool
       */
      #[\ReturnTypeWillChange]
      public function valid()
      {
          return !$this->stream->eof();
      }
  }
  87. }