stream = $stream;
        if ($this->stream->isSeekable()) {
            throw new \InvalidArgumentException('The stream provided must be not seekable.');
        }

        $this->tempBuffer = [];
    }

    /**
     * @inheritDoc
     *
     * Parses a single event from the stream: the prelude, the headers, the
     * payload and finally the trailing 4-byte message CRC, which is compared
     * against a crc32b digest accumulated through readAndHashBytes().
     *
     * @return array Event with the self::HEADERS and self::PAYLOAD entries set;
     *               the payload is wrapped in a stream via Psr7\Utils::streamFor().
     *
     * @throws ParserException When the computed CRC differs from the one in the message.
     */
    protected function parseEvent(): array
    {
        $event = [];
        // A fresh digest per event; readAndHashBytes() feeds it.
        $this->hashContext = hash_init('crc32b');
        $prelude = $this->parsePrelude()[0];
        list(
            $event[self::HEADERS],
            $numBytes
        ) = $this->parseHeaders($prelude[self::LENGTH_HEADERS]);
        // Payload length = total length minus prelude, header bytes and trailer.
        $event[self::PAYLOAD] = Psr7\Utils::streamFor(
            $this->readAndHashBytes(
                $prelude[self::LENGTH_TOTAL] - self::BYTES_PRELUDE
                - $numBytes - self::BYTES_TRAILING
            )
        );
        // Raw (binary) digest so it can be compared with the raw bytes on the wire.
        $calculatedCrc = hash_final($this->hashContext, true);
        // The trailing CRC is not part of the digest, so it is read without hashing.
        // A single read() may legally return fewer than 4 bytes, hence the helper.
        $messageCrc = $this->readUntilFilled(4);
        if ($calculatedCrc !== $messageCrc) {
            throw new ParserException('Message checksum mismatch.');
        }

        return $event;
    }

    /**
     * Reads $num bytes, taking them first from the look-ahead buffer filled by
     * next() and then from the underlying stream, and feeds everything that
     * was read into the running hash context.
     *
     * @param int $num Number of bytes to read.
     *
     * @return string The bytes read; shorter than $num only if the stream
     *                ends or stops delivering data.
     */
    protected function readAndHashBytes($num): string
    {
        $bytes = '';
        // Drain look-ahead bytes first; each buffer entry accounts for one byte.
        while (!empty($this->tempBuffer) && $num > 0) {
            $bytes .= array_shift($this->tempBuffer);
            $num = $num - 1;
        }

        $bytes .= $this->readUntilFilled($num);
        hash_update($this->hashContext, $bytes);

        return $bytes;
    }

    /**
     * Reads from the underlying stream until $length bytes were collected,
     * the stream reports EOF, or a read yields no data.
     *
     * StreamInterface::read() is allowed to return fewer bytes than requested
     * (typical for network streams), so a single call is not enough to
     * guarantee a complete frame segment.
     *
     * @param int $length Number of bytes wanted.
     *
     * @return string The bytes that could be read.
     */
    private function readUntilFilled($length): string
    {
        // The first read is unconditional so the stream's own handling of
        // zero or invalid lengths stays exactly as it was before.
        $data = $this->stream->read($length);
        $remaining = $length - strlen($data);

        while ($remaining > 0 && !$this->stream->eof()) {
            $chunk = $this->stream->read($remaining);
            if ($chunk === '') {
                // No progress: stop instead of spinning on a stalled stream.
                break;
            }

            $data .= $chunk;
            $remaining -= strlen($chunk);
        }

        return $data;
    }

    // Iterator Functionality

    /**
     * Parses the first event. The stream itself is not repositioned.
     */
    #[\ReturnTypeWillChange]
    public function rewind()
    {
        $this->currentEvent = $this->parseEvent();
    }

    /**
     * Advances to the next event, if any.
     *
     * One byte is read ahead so that eof() reflects whether more data exists;
     * that byte is kept in the look-ahead buffer and consumed by
     * readAndHashBytes() when the next event is parsed.
     */
    #[\ReturnTypeWillChange]
    public function next()
    {
        $lookAhead = $this->stream->read(1);
        // Only buffer real data: an empty entry would be counted as one byte
        // by readAndHashBytes() and shorten a later read.
        if ($lookAhead !== '') {
            $this->tempBuffer[] = $lookAhead;
        }

        if ($this->valid()) {
            $this->key++;
            $this->currentEvent = $this->parseEvent();
        }
    }

    /**
     * Reports whether the underlying stream has not reached EOF yet.
     *
     * @return bool
     */
    #[\ReturnTypeWillChange]
    public function valid()
    {
        return !$this->stream->eof();
    }
}