RateLimiter.php 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. <?php
  2. namespace Aws\Retry;
  3. /**
  4. * @internal
  5. */
  6. class RateLimiter
  7. {
  8. // User-configurable constants
  9. private $beta;
  10. private $minCapacity;
  11. private $minFillRate;
  12. private $scaleConstant;
  13. private $smooth;
  14. // Optional callable time provider
  15. private $timeProvider;
  16. // Pre-set state variables
  17. private $currentCapacity = 0;
  18. private $enabled = false;
  19. private $lastMaxRate = 0;
  20. private $measuredTxRate = 0;
  21. private $requestCount = 0;
  22. // Other state variables
  23. private $fillRate;
  24. private $lastThrottleTime;
  25. private $lastTimestamp;
  26. private $lastTxRateBucket;
  27. private $maxCapacity;
  28. private $timeWindow;
  29. public function __construct($options = [])
  30. {
  31. $this->beta = isset($options['beta'])
  32. ? $options['beta']
  33. : 0.7;
  34. $this->minCapacity = isset($options['min_capacity'])
  35. ? $options['min_capacity']
  36. : 1;
  37. $this->minFillRate = isset($options['min_fill_rate'])
  38. ? $options['min_fill_rate']
  39. : 0.5;
  40. $this->scaleConstant = isset($options['scale_constant'])
  41. ? $options['scale_constant']
  42. : 0.4;
  43. $this->smooth = isset($options['smooth'])
  44. ? $options['smooth']
  45. : 0.8;
  46. $this->timeProvider = isset($options['time_provider'])
  47. ? $options['time_provider']
  48. : null;
  49. $this->lastTxRateBucket = floor($this->time());
  50. $this->lastThrottleTime = $this->time();
  51. }
  52. public function isEnabled()
  53. {
  54. return $this->enabled;
  55. }
  56. public function getSendToken()
  57. {
  58. $this->acquireToken(1);
  59. }
  60. public function updateSendingRate($isThrottled)
  61. {
  62. $this->updateMeasuredRate();
  63. if ($isThrottled) {
  64. if (!$this->isEnabled()) {
  65. $rateToUse = $this->measuredTxRate;
  66. } else {
  67. $rateToUse = min($this->measuredTxRate, $this->fillRate);
  68. }
  69. $this->lastMaxRate = $rateToUse;
  70. $this->calculateTimeWindow();
  71. $this->lastThrottleTime = $this->time();
  72. $calculatedRate = $this->cubicThrottle($rateToUse);
  73. $this->enableTokenBucket();
  74. } else {
  75. $this->calculateTimeWindow();
  76. $calculatedRate = $this->cubicSuccess($this->time());
  77. }
  78. $newRate = min($calculatedRate, 2 * $this->measuredTxRate);
  79. $this->updateTokenBucketRate($newRate);
  80. return $newRate;
  81. }
  82. private function acquireToken($amount)
  83. {
  84. if (!$this->enabled) {
  85. return true;
  86. }
  87. $this->refillTokenBucket();
  88. if ($amount > $this->currentCapacity) {
  89. usleep((int) (1000000 * ($amount - $this->currentCapacity) / $this->fillRate));
  90. }
  91. $this->currentCapacity -= $amount;
  92. return true;
  93. }
  94. private function calculateTimeWindow()
  95. {
  96. $this->timeWindow = pow(($this->lastMaxRate * (1 - $this->beta) / $this->scaleConstant), 0.333);
  97. }
  98. private function cubicSuccess($timestamp)
  99. {
  100. $dt = $timestamp - $this->lastThrottleTime;
  101. return $this->scaleConstant * pow($dt - $this->timeWindow, 3) + $this->lastMaxRate;
  102. }
  103. private function cubicThrottle($rateToUse)
  104. {
  105. return $rateToUse * $this->beta;
  106. }
  107. private function enableTokenBucket()
  108. {
  109. $this->enabled = true;
  110. }
  111. private function refillTokenBucket()
  112. {
  113. $timestamp = $this->time();
  114. if (!isset($this->lastTimestamp)) {
  115. $this->lastTimestamp = $timestamp;
  116. return;
  117. }
  118. $fillAmount = ($timestamp - $this->lastTimestamp) * $this->fillRate;
  119. $this->currentCapacity = $this->currentCapacity + $fillAmount;
  120. if (!is_null($this->maxCapacity)) {
  121. $this->currentCapacity = min(
  122. $this->maxCapacity,
  123. $this->currentCapacity
  124. );
  125. }
  126. $this->lastTimestamp = $timestamp;
  127. }
  128. private function time()
  129. {
  130. if (is_callable($this->timeProvider)) {
  131. $provider = $this->timeProvider;
  132. $time = $provider();
  133. return $time;
  134. }
  135. return microtime(true);
  136. }
  137. private function updateMeasuredRate()
  138. {
  139. $timestamp = $this->time();
  140. $timeBucket = floor(round($timestamp, 3) * 2) / 2;
  141. $this->requestCount++;
  142. if ($timeBucket > $this->lastTxRateBucket) {
  143. $currentRate = $this->requestCount / ($timeBucket - $this->lastTxRateBucket);
  144. $this->measuredTxRate = ($currentRate * $this->smooth)
  145. + ($this->measuredTxRate * (1 - $this->smooth));
  146. $this->requestCount = 0;
  147. $this->lastTxRateBucket = $timeBucket;
  148. }
  149. }
  150. private function updateTokenBucketRate($newRps)
  151. {
  152. $this->refillTokenBucket();
  153. $this->fillRate = max($newRps, $this->minFillRate);
  154. $this->maxCapacity = max($newRps, $this->minCapacity);
  155. $this->currentCapacity = min($this->currentCapacity, $this->maxCapacity);
  156. }
  157. }