SqsClient.php 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. <?php
  2. namespace Aws\Sqs;
  3. use Aws\AwsClient;
  4. use Aws\CommandInterface;
  5. use Aws\Sqs\Exception\SqsException;
  6. use GuzzleHttp\Psr7\Uri;
  7. use GuzzleHttp\Psr7\UriResolver;
  8. use Psr\Http\Message\RequestInterface;
  9. /**
  10. * Client used to interact Amazon Simple Queue Service (Amazon SQS)
  11. *
  12. * @method \Aws\Result addPermission(array $args = [])
  13. * @method \GuzzleHttp\Promise\Promise addPermissionAsync(array $args = [])
  14. * @method \Aws\Result cancelMessageMoveTask(array $args = [])
  15. * @method \GuzzleHttp\Promise\Promise cancelMessageMoveTaskAsync(array $args = [])
  16. * @method \Aws\Result changeMessageVisibility(array $args = [])
  17. * @method \GuzzleHttp\Promise\Promise changeMessageVisibilityAsync(array $args = [])
  18. * @method \Aws\Result changeMessageVisibilityBatch(array $args = [])
  19. * @method \GuzzleHttp\Promise\Promise changeMessageVisibilityBatchAsync(array $args = [])
  20. * @method \Aws\Result createQueue(array $args = [])
  21. * @method \GuzzleHttp\Promise\Promise createQueueAsync(array $args = [])
  22. * @method \Aws\Result deleteMessage(array $args = [])
  23. * @method \GuzzleHttp\Promise\Promise deleteMessageAsync(array $args = [])
  24. * @method \Aws\Result deleteMessageBatch(array $args = [])
  25. * @method \GuzzleHttp\Promise\Promise deleteMessageBatchAsync(array $args = [])
  26. * @method \Aws\Result deleteQueue(array $args = [])
  27. * @method \GuzzleHttp\Promise\Promise deleteQueueAsync(array $args = [])
  28. * @method \Aws\Result getQueueAttributes(array $args = [])
  29. * @method \GuzzleHttp\Promise\Promise getQueueAttributesAsync(array $args = [])
  30. * @method \Aws\Result getQueueUrl(array $args = [])
  31. * @method \GuzzleHttp\Promise\Promise getQueueUrlAsync(array $args = [])
  32. * @method \Aws\Result listDeadLetterSourceQueues(array $args = [])
  33. * @method \GuzzleHttp\Promise\Promise listDeadLetterSourceQueuesAsync(array $args = [])
  34. * @method \Aws\Result listMessageMoveTasks(array $args = [])
  35. * @method \GuzzleHttp\Promise\Promise listMessageMoveTasksAsync(array $args = [])
  36. * @method \Aws\Result listQueueTags(array $args = [])
  37. * @method \GuzzleHttp\Promise\Promise listQueueTagsAsync(array $args = [])
  38. * @method \Aws\Result listQueues(array $args = [])
  39. * @method \GuzzleHttp\Promise\Promise listQueuesAsync(array $args = [])
  40. * @method \Aws\Result purgeQueue(array $args = [])
  41. * @method \GuzzleHttp\Promise\Promise purgeQueueAsync(array $args = [])
  42. * @method \Aws\Result receiveMessage(array $args = [])
  43. * @method \GuzzleHttp\Promise\Promise receiveMessageAsync(array $args = [])
  44. * @method \Aws\Result removePermission(array $args = [])
  45. * @method \GuzzleHttp\Promise\Promise removePermissionAsync(array $args = [])
  46. * @method \Aws\Result sendMessage(array $args = [])
  47. * @method \GuzzleHttp\Promise\Promise sendMessageAsync(array $args = [])
  48. * @method \Aws\Result sendMessageBatch(array $args = [])
  49. * @method \GuzzleHttp\Promise\Promise sendMessageBatchAsync(array $args = [])
  50. * @method \Aws\Result setQueueAttributes(array $args = [])
  51. * @method \GuzzleHttp\Promise\Promise setQueueAttributesAsync(array $args = [])
  52. * @method \Aws\Result startMessageMoveTask(array $args = [])
  53. * @method \GuzzleHttp\Promise\Promise startMessageMoveTaskAsync(array $args = [])
  54. * @method \Aws\Result tagQueue(array $args = [])
  55. * @method \GuzzleHttp\Promise\Promise tagQueueAsync(array $args = [])
  56. * @method \Aws\Result untagQueue(array $args = [])
  57. * @method \GuzzleHttp\Promise\Promise untagQueueAsync(array $args = [])
  58. */
  59. class SqsClient extends AwsClient
  60. {
  61. public function __construct(array $config)
  62. {
  63. parent::__construct($config);
  64. $list = $this->getHandlerList();
  65. $list->appendSign($this->validateMd5(), 'sqs.md5');
  66. }
  67. /**
  68. * Converts a queue URL into a queue ARN.
  69. *
  70. * @param string $queueUrl The queue URL to perform the action on.
  71. * Retrieved when the queue is first created.
  72. *
  73. * @return string An ARN representation of the queue URL.
  74. */
  75. public function getQueueArn($queueUrl)
  76. {
  77. $queueArn = strtr($queueUrl, [
  78. 'http://' => 'arn:aws:',
  79. 'https://' => 'arn:aws:',
  80. '.amazonaws.com' => '',
  81. '/' => ':',
  82. '.' => ':',
  83. ]);
  84. // Cope with SQS' .fifo / :fifo arn inconsistency
  85. if (substr($queueArn, -5) === ':fifo') {
  86. $queueArn = substr_replace($queueArn, '.fifo', -5);
  87. }
  88. return $queueArn;
  89. }
  90. /**
  91. * Calculates the expected md5 hash of message attributes according to the encoding
  92. * scheme detailed in SQS documentation.
  93. *
  94. * @param array $message Message containing attributes for validation.
  95. * Retrieved when using MessageAttributeNames on
  96. * ReceiveMessage.
  97. *
  98. * @return string|null The md5 hash of the message attributes according to
  99. * the encoding scheme. Returns null when there are no
  100. * attributes.
  101. * @link http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-attributes.html#message-attributes-items-validation
  102. */
  103. private static function calculateMessageAttributesMd5($message)
  104. {
  105. if (empty($message['MessageAttributes'])
  106. || !is_array($message['MessageAttributes'])
  107. ) {
  108. return null;
  109. }
  110. ksort($message['MessageAttributes']);
  111. $attributeValues = "";
  112. foreach ($message['MessageAttributes'] as $name => $details) {
  113. $attributeValues .= self::getEncodedStringPiece($name);
  114. $attributeValues .= self::getEncodedStringPiece($details['DataType']);
  115. if (substr($details['DataType'], 0, 6) === 'Binary') {
  116. $attributeValues .= pack('c', 0x02);
  117. $attributeValues .= self::getEncodedBinaryPiece(
  118. $details['BinaryValue']
  119. );
  120. } else {
  121. $attributeValues .= pack('c', 0x01);
  122. $attributeValues .= self::getEncodedStringPiece(
  123. $details['StringValue']
  124. );
  125. }
  126. }
  127. return md5($attributeValues);
  128. }
  129. private static function calculateBodyMd5($message)
  130. {
  131. return md5($message['Body']);
  132. }
  133. private static function getEncodedStringPiece($piece)
  134. {
  135. $utf8Piece = iconv(
  136. mb_detect_encoding($piece, mb_detect_order(), true),
  137. "UTF-8",
  138. $piece
  139. );
  140. return self::getFourBytePieceLength($utf8Piece) . $utf8Piece;
  141. }
  142. private static function getEncodedBinaryPiece($piece)
  143. {
  144. return self::getFourBytePieceLength($piece) . $piece;
  145. }
  146. private static function getFourBytePieceLength($piece)
  147. {
  148. return pack('N', (int)strlen($piece));
  149. }
  150. /**
  151. * Validates ReceiveMessage body and message attribute MD5s.
  152. *
  153. * @return callable
  154. */
  155. private function validateMd5()
  156. {
  157. return static function (callable $handler) {
  158. return function (
  159. CommandInterface $c,
  160. RequestInterface $r = null
  161. ) use ($handler) {
  162. if ($c->getName() !== 'ReceiveMessage') {
  163. return $handler($c, $r);
  164. }
  165. return $handler($c, $r)
  166. ->then(
  167. function ($result) use ($c, $r) {
  168. foreach ((array) $result['Messages'] as $msg) {
  169. $bodyMd5 = self::calculateBodyMd5($msg);
  170. if (isset($msg['MD5OfBody'])
  171. && $bodyMd5 !== $msg['MD5OfBody']
  172. ) {
  173. throw new SqsException(
  174. sprintf(
  175. 'MD5 mismatch. Expected %s, found %s',
  176. $msg['MD5OfBody'],
  177. $bodyMd5
  178. ),
  179. $c,
  180. [
  181. 'code' => 'ClientChecksumMismatch',
  182. 'request' => $r
  183. ]
  184. );
  185. }
  186. if (isset($msg['MD5OfMessageAttributes'])) {
  187. $messageAttributesMd5 = self::calculateMessageAttributesMd5($msg);
  188. if ($messageAttributesMd5 !== $msg['MD5OfMessageAttributes']) {
  189. throw new SqsException(
  190. sprintf(
  191. 'Attribute MD5 mismatch. Expected %s, found %s',
  192. $msg['MD5OfMessageAttributes'],
  193. $messageAttributesMd5
  194. ? $messageAttributesMd5
  195. : 'No Attributes'
  196. ),
  197. $c,
  198. [
  199. 'code' => 'ClientChecksumMismatch',
  200. 'request' => $r
  201. ]
  202. );
  203. }
  204. } else if (!empty($msg['MessageAttributes'])) {
  205. throw new SqsException(
  206. sprintf(
  207. 'No Attribute MD5 found. Expected %s',
  208. self::calculateMessageAttributesMd5($msg)
  209. ),
  210. $c,
  211. [
  212. 'code' => 'ClientChecksumMismatch',
  213. 'request' => $r
  214. ]
  215. );
  216. }
  217. }
  218. return $result;
  219. }
  220. );
  221. };
  222. };
  223. }
  224. }