LockingSessionConnection.php 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
  1. <?php
  2. namespace Aws\DynamoDb;
  3. use Aws\DynamoDb\Exception\DynamoDbException;
  4. /**
  5. * The locking connection adds locking logic to the read operation.
  6. */
  7. class LockingSessionConnection extends StandardSessionConnection
  8. {
  9. public function __construct(DynamoDbClient $client, array $config = [])
  10. {
  11. parent::__construct($client, $config);
  12. }
  13. /**
  14. * {@inheritdoc}
  15. * Retries the request until the lock can be acquired
  16. */
  17. public function read($id)
  18. {
  19. // Create the params for the UpdateItem operation so that a lock can be
  20. // set and item returned (via ReturnValues) in a one, atomic operation.
  21. $params = [
  22. 'TableName' => $this->getTableName(),
  23. 'Key' => $this->formatKey($id),
  24. 'Expected' => ['lock' => ['Exists' => false]],
  25. 'AttributeUpdates' => ['lock' => ['Value' => ['N' => '1']]],
  26. 'ReturnValues' => 'ALL_NEW',
  27. ];
  28. // Acquire the lock and fetch the item data.
  29. $timeout = time() + $this->getMaxLockWaitTime();
  30. while (true) {
  31. try {
  32. $item = [];
  33. $result = $this->client->updateItem($params);
  34. if (isset($result['Attributes'])) {
  35. foreach ($result['Attributes'] as $key => $value) {
  36. $item[$key] = current($value);
  37. }
  38. }
  39. return $item;
  40. } catch (DynamoDbException $e) {
  41. if ($e->getAwsErrorCode() === 'ConditionalCheckFailedException'
  42. && time() < $timeout
  43. ) {
  44. usleep(rand(
  45. $this->getMinLockRetryMicrotime(),
  46. $this->getMaxLockRetryMicrotime()
  47. ));
  48. } else {
  49. break;
  50. }
  51. }
  52. }
  53. }
  54. }