EventParsingIterator.php 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. <?php
  2. namespace Aws\Api\Parser;
  3. use \Iterator;
  4. use Aws\Exception\EventStreamDataException;
  5. use Aws\Api\Parser\Exception\ParserException;
  6. use Aws\Api\StructureShape;
  7. use Psr\Http\Message\StreamInterface;
  8. /**
  9. * @internal Implements a decoder for a binary encoded event stream that will
  10. * decode, validate, and provide individual events from the stream.
  11. */
  class EventParsingIterator implements Iterator
  {
      /**
       * Iterator returned by chooseDecodingIterator(); every Iterator method of
       * this class delegates to it.
       *
       * @var Iterator
       */
      private $decodingIterator;

      /** @var StructureShape Shape whose members are looked up by event type. */
      private $shape;

      /** @var AbstractParser Parser used to deserialize non-blob payloads. */
      private $parser;

      /**
       * @param StreamInterface $stream Stream handed to the decoding iterator.
       * @param StructureShape  $shape  Shape queried with getMember($eventType).
       * @param AbstractParser  $parser Parser whose parseMemberFromStream() is
       *                                used for payload members.
       */
      public function __construct(
          StreamInterface $stream,
          StructureShape $shape,
          AbstractParser $parser
      ) {
          $this->decodingIterator = $this->chooseDecodingIterator($stream);
          $this->shape = $shape;
          $this->parser = $parser;
      }

      /**
       * Chooses a decoding iterator implementation depending on whether the
       * stream is seekable or not.
       *
       * @param StreamInterface $stream
       *
       * @return Iterator A DecodingEventStreamIterator for seekable streams,
       *                  otherwise a NonSeekableStreamDecodingEventStreamIterator.
       */
      private function chooseDecodingIterator(StreamInterface $stream)
      {
          if ($stream->isSeekable()) {
              return new DecodingEventStreamIterator($stream);
          }

          return new NonSeekableStreamDecodingEventStreamIterator($stream);
      }

      /**
       * Returns the current decoded event after running it through parseEvent().
       *
       * @return array
       */
      #[\ReturnTypeWillChange]
      public function current()
      {
          return $this->parseEvent($this->decodingIterator->current());
      }

      /**
       * Returns the key reported by the underlying decoding iterator.
       */
      #[\ReturnTypeWillChange]
      public function key()
      {
          return $this->decodingIterator->key();
      }

      /**
       * Advances the underlying decoding iterator.
       */
      #[\ReturnTypeWillChange]
      public function next()
      {
          $this->decodingIterator->next();
      }

      /**
       * Rewinds the underlying decoding iterator.
       */
      #[\ReturnTypeWillChange]
      public function rewind()
      {
          $this->decodingIterator->rewind();
      }

      /**
       * Reports whether the underlying decoding iterator is still valid.
       */
      #[\ReturnTypeWillChange]
      public function valid()
      {
          return $this->decodingIterator->valid();
      }

      /**
       * Converts one decoded event (an array with 'headers' and 'payload'
       * entries) into a single-entry array keyed by its event type.
       *
       * @param array $event
       *
       * @return array
       *
       * @throws EventStreamDataException When ':message-type' is 'error'.
       * @throws ParserException          When ':message-type' is set to anything
       *                                  other than 'error' or 'event', or when
       *                                  ':event-type' is missing or empty.
       */
      private function parseEvent(array $event)
      {
          // A missing/empty ':message-type' header is tolerated and treated
          // like a regular event; only explicit values are validated.
          if (!empty($event['headers'][':message-type'])) {
              $messageType = $event['headers'][':message-type'];

              if ($messageType === 'error') {
                  return $this->parseError($event);
              }

              if ($messageType !== 'event') {
                  throw new ParserException('Failed to parse unknown message type.');
              }
          }

          $eventType = $event['headers'][':event-type'] ?? null;
          if (empty($eventType)) {
              throw new ParserException('Failed to parse without event type.');
          }

          $eventPayload = $event['payload'];
          if ($eventType === 'initial-response') {
              return $this->parseInitialResponseEvent($eventPayload);
          }

          $eventShape = $this->shape->getMember($eventType);

          // Payload entries are merged last, so they win on a name collision.
          return [
              $eventType => array_merge(
                  $this->parseEventHeaders($event['headers'], $eventShape),
                  $this->parseEventPayload($eventPayload, $eventShape)
              )
          ];
      }

      /**
       * Collects the values of every shape member flagged with 'eventheader'.
       *
       * @param $headers    Decoded event headers, indexed by header name.
       * @param $eventShape Shape of the event; its members are inspected.
       *
       * @return array Member name => header value. A member whose header is not
       *               present in the event maps to null.
       */
      private function parseEventHeaders($headers, $eventShape): array
      {
          $parsedHeaders = [];
          foreach ($eventShape->getMembers() as $memberName => $memberProps) {
              if (isset($memberProps['eventheader'])) {
                  // `?? null` keeps the resulting array identical while avoiding
                  // an "Undefined array key" warning when the header is absent.
                  $parsedHeaders[$memberName] = $headers[$memberName] ?? null;
              }
          }

          return $parsedHeaders;
      }

      /**
       * Extracts the payload of an event.
       *
       * Only the first member flagged with 'eventpayload' is considered. A blob
       * member receives the payload as-is; any other type is deserialized with
       * the configured parser. When nothing was produced that way and the
       * payload has content, the payload is deserialized against the event
       * shape itself.
       *
       * @param $payload    Event payload; getContents() is called on it.
       * @param $eventShape Shape of the event.
       *
       * @return array
       */
      private function parseEventPayload($payload, $eventShape): array
      {
          $parsedPayload = [];
          foreach ($eventShape->getMembers() as $memberName => $memberProps) {
              if (!isset($memberProps['eventpayload'])) {
                  continue;
              }

              // Resolve the member shape only for the member that needs it.
              $memberShape = $eventShape->getMember($memberName);
              if ($memberShape->getType() === 'blob') {
                  $parsedPayload[$memberName] = $payload;
              } else {
                  $parsedPayload[$memberName] = $this->parser->parseMemberFromStream(
                      $payload,
                      $memberShape,
                      null
                  );
              }

              // Stop at the first 'eventpayload' member.
              break;
          }

          if (empty($parsedPayload) && !empty($payload->getContents())) {
              /**
               * If we did not find a member with an eventpayload trait, then we should deserialize the payload
               * using the event's shape.
               */
              $parsedPayload = $this->parser->parseMemberFromStream($payload, $eventShape, null);
          }

          return $parsedPayload;
      }

      /**
       * Raises the error described by an event whose ':message-type' is 'error'.
       *
       * This method never returns normally. The ':error-code' and
       * ':error-message' headers are read directly from the event.
       *
       * @param array $event
       *
       * @throws EventStreamDataException Always.
       */
      private function parseError(array $event)
      {
          throw new EventStreamDataException(
              $event['headers'][':error-code'],
              $event['headers'][':error-message']
          );
      }

      /**
       * Decodes the payload of an 'initial-response' event as JSON.
       *
       * @param $payload Passed straight to json_decode(); presumably a string or
       *                 a stringable stream - TODO confirm against the decoders.
       *
       * @return array Single entry 'initial-response' => decoded value, using
       *               associative arrays for JSON objects.
       */
      private function parseInitialResponseEvent($payload): array
      {
          return ['initial-response' => json_decode($payload, true)];
      }
  }