DecodingEventStreamIterator.php 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345
  1. <?php
  2. namespace Aws\Api\Parser;
  3. use \Iterator;
  4. use Aws\Api\DateTimeResult;
  5. use GuzzleHttp\Psr7;
  6. use Psr\Http\Message\StreamInterface;
  7. use Aws\Api\Parser\Exception\ParserException;
  8. /**
  9. * @internal Implements a decoder for a binary encoded event stream that will
  10. * decode, validate, and provide individual events from the stream.
  11. */
  12. class DecodingEventStreamIterator implements Iterator
  13. {
  14. const HEADERS = 'headers';
  15. const PAYLOAD = 'payload';
  16. const LENGTH_TOTAL = 'total_length';
  17. const LENGTH_HEADERS = 'headers_length';
  18. const CRC_PRELUDE = 'prelude_crc';
  19. const BYTES_PRELUDE = 12;
  20. const BYTES_TRAILING = 4;
  21. private static $preludeFormat = [
  22. self::LENGTH_TOTAL => 'decodeUint32',
  23. self::LENGTH_HEADERS => 'decodeUint32',
  24. self::CRC_PRELUDE => 'decodeUint32',
  25. ];
  26. private static $lengthFormatMap = [
  27. 1 => 'decodeUint8',
  28. 2 => 'decodeUint16',
  29. 4 => 'decodeUint32',
  30. 8 => 'decodeUint64',
  31. ];
  32. private static $headerTypeMap = [
  33. 0 => 'decodeBooleanTrue',
  34. 1 => 'decodeBooleanFalse',
  35. 2 => 'decodeInt8',
  36. 3 => 'decodeInt16',
  37. 4 => 'decodeInt32',
  38. 5 => 'decodeInt64',
  39. 6 => 'decodeBytes',
  40. 7 => 'decodeString',
  41. 8 => 'decodeTimestamp',
  42. 9 => 'decodeUuid',
  43. ];
  44. /** @var StreamInterface Stream of eventstream shape to parse. */
  45. protected $stream;
  46. /** @var array Currently parsed event. */
  47. protected $currentEvent;
  48. /** @var int Current in-order event key. */
  49. protected $key;
  50. /** @var resource|\HashContext CRC32 hash context for event validation */
  51. protected $hashContext;
  52. /** @var int $currentPosition */
  53. protected $currentPosition;
  54. /**
  55. * DecodingEventStreamIterator constructor.
  56. *
  57. * @param StreamInterface $stream
  58. */
  59. public function __construct(StreamInterface $stream)
  60. {
  61. $this->stream = $stream;
  62. $this->rewind();
  63. }
  64. protected function parseHeaders($headerBytes)
  65. {
  66. $headers = [];
  67. $bytesRead = 0;
  68. while ($bytesRead < $headerBytes) {
  69. list($key, $numBytes) = $this->decodeString(1);
  70. $bytesRead += $numBytes;
  71. list($type, $numBytes) = $this->decodeUint8();
  72. $bytesRead += $numBytes;
  73. $f = self::$headerTypeMap[$type];
  74. list($value, $numBytes) = $this->{$f}();
  75. $bytesRead += $numBytes;
  76. if (isset($headers[$key])) {
  77. throw new ParserException('Duplicate key in event headers.');
  78. }
  79. $headers[$key] = $value;
  80. }
  81. return [$headers, $bytesRead];
  82. }
  83. protected function parsePrelude()
  84. {
  85. $prelude = [];
  86. $bytesRead = 0;
  87. $calculatedCrc = null;
  88. foreach (self::$preludeFormat as $key => $decodeFunction) {
  89. if ($key === self::CRC_PRELUDE) {
  90. $hashCopy = hash_copy($this->hashContext);
  91. $calculatedCrc = hash_final($this->hashContext, true);
  92. $this->hashContext = $hashCopy;
  93. }
  94. list($value, $numBytes) = $this->{$decodeFunction}();
  95. $bytesRead += $numBytes;
  96. $prelude[$key] = $value;
  97. }
  98. if (unpack('N', $calculatedCrc)[1] !== $prelude[self::CRC_PRELUDE]) {
  99. throw new ParserException('Prelude checksum mismatch.');
  100. }
  101. return [$prelude, $bytesRead];
  102. }
  103. /**
  104. * This method decodes an event from the stream.
  105. *
  106. * @return array
  107. */
  108. protected function parseEvent()
  109. {
  110. $event = [];
  111. if ($this->stream->tell() < $this->stream->getSize()) {
  112. $this->hashContext = hash_init('crc32b');
  113. $bytesLeft = $this->stream->getSize() - $this->stream->tell();
  114. list($prelude, $numBytes) = $this->parsePrelude();
  115. if ($prelude[self::LENGTH_TOTAL] > $bytesLeft) {
  116. throw new ParserException('Message length too long.');
  117. }
  118. $bytesLeft -= $numBytes;
  119. if ($prelude[self::LENGTH_HEADERS] > $bytesLeft) {
  120. throw new ParserException('Headers length too long.');
  121. }
  122. list(
  123. $event[self::HEADERS],
  124. $numBytes
  125. ) = $this->parseHeaders($prelude[self::LENGTH_HEADERS]);
  126. $event[self::PAYLOAD] = Psr7\Utils::streamFor(
  127. $this->readAndHashBytes(
  128. $prelude[self::LENGTH_TOTAL] - self::BYTES_PRELUDE
  129. - $numBytes - self::BYTES_TRAILING
  130. )
  131. );
  132. $calculatedCrc = hash_final($this->hashContext, true);
  133. $messageCrc = $this->stream->read(4);
  134. if ($calculatedCrc !== $messageCrc) {
  135. throw new ParserException('Message checksum mismatch.');
  136. }
  137. }
  138. return $event;
  139. }
  140. // Iterator Functionality
  141. /**
  142. * @return array
  143. */
  144. #[\ReturnTypeWillChange]
  145. public function current()
  146. {
  147. return $this->currentEvent;
  148. }
  149. /**
  150. * @return int
  151. */
  152. #[\ReturnTypeWillChange]
  153. public function key()
  154. {
  155. return $this->key;
  156. }
  157. #[\ReturnTypeWillChange]
  158. public function next()
  159. {
  160. $this->currentPosition = $this->stream->tell();
  161. if ($this->valid()) {
  162. $this->key++;
  163. $this->currentEvent = $this->parseEvent();
  164. }
  165. }
  166. #[\ReturnTypeWillChange]
  167. public function rewind()
  168. {
  169. $this->stream->rewind();
  170. $this->key = 0;
  171. $this->currentPosition = 0;
  172. $this->currentEvent = $this->parseEvent();
  173. }
  174. /**
  175. * @return bool
  176. */
  177. #[\ReturnTypeWillChange]
  178. public function valid()
  179. {
  180. return $this->currentPosition < $this->stream->getSize();
  181. }
  182. // Decoding Utilities
  183. protected function readAndHashBytes($num)
  184. {
  185. $bytes = $this->stream->read($num);
  186. hash_update($this->hashContext, $bytes);
  187. return $bytes;
  188. }
  189. private function decodeBooleanTrue()
  190. {
  191. return [true, 0];
  192. }
  193. private function decodeBooleanFalse()
  194. {
  195. return [false, 0];
  196. }
  197. private function uintToInt($val, $size)
  198. {
  199. $signedCap = pow(2, $size - 1);
  200. if ($val > $signedCap) {
  201. $val -= (2 * $signedCap);
  202. }
  203. return $val;
  204. }
  205. private function decodeInt8()
  206. {
  207. $val = (int)unpack('C', $this->readAndHashBytes(1))[1];
  208. return [$this->uintToInt($val, 8), 1];
  209. }
  210. private function decodeUint8()
  211. {
  212. return [unpack('C', $this->readAndHashBytes(1))[1], 1];
  213. }
  214. private function decodeInt16()
  215. {
  216. $val = (int)unpack('n', $this->readAndHashBytes(2))[1];
  217. return [$this->uintToInt($val, 16), 2];
  218. }
  219. private function decodeUint16()
  220. {
  221. return [unpack('n', $this->readAndHashBytes(2))[1], 2];
  222. }
  223. private function decodeInt32()
  224. {
  225. $val = (int)unpack('N', $this->readAndHashBytes(4))[1];
  226. return [$this->uintToInt($val, 32), 4];
  227. }
  228. private function decodeUint32()
  229. {
  230. return [unpack('N', $this->readAndHashBytes(4))[1], 4];
  231. }
  232. private function decodeInt64()
  233. {
  234. $val = $this->unpackInt64($this->readAndHashBytes(8))[1];
  235. return [$this->uintToInt($val, 64), 8];
  236. }
  237. private function decodeUint64()
  238. {
  239. return [$this->unpackInt64($this->readAndHashBytes(8))[1], 8];
  240. }
  241. private function unpackInt64($bytes)
  242. {
  243. if (version_compare(PHP_VERSION, '5.6.3', '<')) {
  244. $d = unpack('N2', $bytes);
  245. return [1 => $d[1] << 32 | $d[2]];
  246. }
  247. return unpack('J', $bytes);
  248. }
  249. private function decodeBytes($lengthBytes=2)
  250. {
  251. if (!isset(self::$lengthFormatMap[$lengthBytes])) {
  252. throw new ParserException('Undefined variable length format.');
  253. }
  254. $f = self::$lengthFormatMap[$lengthBytes];
  255. list($len, $bytes) = $this->{$f}();
  256. return [$this->readAndHashBytes($len), $len + $bytes];
  257. }
  258. private function decodeString($lengthBytes=2)
  259. {
  260. if (!isset(self::$lengthFormatMap[$lengthBytes])) {
  261. throw new ParserException('Undefined variable length format.');
  262. }
  263. $f = self::$lengthFormatMap[$lengthBytes];
  264. list($len, $bytes) = $this->{$f}();
  265. return [$this->readAndHashBytes($len), $len + $bytes];
  266. }
  267. private function decodeTimestamp()
  268. {
  269. list($val, $bytes) = $this->decodeInt64();
  270. return [
  271. DateTimeResult::createFromFormat('U.u', $val / 1000),
  272. $bytes
  273. ];
  274. }
  275. private function decodeUuid()
  276. {
  277. $val = unpack('H32', $this->readAndHashBytes(16))[1];
  278. return [
  279. substr($val, 0, 8) . '-'
  280. . substr($val, 8, 4) . '-'
  281. . substr($val, 12, 4) . '-'
  282. . substr($val, 16, 4) . '-'
  283. . substr($val, 20, 12),
  284. 16
  285. ];
  286. }
  287. }