| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182 | <?phpnamespace Aws\Retry;/** * @internal */class RateLimiter{    // User-configurable constants    private $beta;    private $minCapacity;    private $minFillRate;    private $scaleConstant;    private $smooth;    // Optional callable time provider    private $timeProvider;    // Pre-set state variables    private $currentCapacity = 0;    private $enabled = false;    private $lastMaxRate = 0;    private $measuredTxRate = 0;    private $requestCount = 0;    // Other state variables    private $fillRate;    private $lastThrottleTime;    private $lastTimestamp;    private $lastTxRateBucket;    private $maxCapacity;    private $timeWindow;    public function __construct($options = [])    {        $this->beta = isset($options['beta'])            ? $options['beta']            : 0.7;        $this->minCapacity = isset($options['min_capacity'])            ? $options['min_capacity']            : 1;        $this->minFillRate = isset($options['min_fill_rate'])            ? $options['min_fill_rate']            : 0.5;        $this->scaleConstant = isset($options['scale_constant'])            ? $options['scale_constant']            : 0.4;        $this->smooth = isset($options['smooth'])            ? $options['smooth']            : 0.8;        $this->timeProvider = isset($options['time_provider'])            ? $options['time_provider']            : null;        $this->lastTxRateBucket = floor($this->time());        $this->lastThrottleTime = $this->time();    }    public function isEnabled()    {        return $this->enabled;    }    public function getSendToken()    {        $this->acquireToken(1);    }    public function updateSendingRate($isThrottled)    {        $this->updateMeasuredRate();        if ($isThrottled) {            if (!$this->isEnabled()) {                $rateToUse = $this->measuredTxRate;            } else {                $rateToUse = min($this->measuredTxRate, $this->fillRate);            }            $this->lastMaxRate = $rateToUse;            $this->calculateTimeWindow();            $this->lastThrottleTime = $this->time();            $calculatedRate = $this->cubicThrottle($rateToUse);            $this->enableTokenBucket();        } else {            $this->calculateTimeWindow();            $calculatedRate = $this->cubicSuccess($this->time());        }        $newRate = min($calculatedRate, 2 * $this->measuredTxRate);        $this->updateTokenBucketRate($newRate);        return $newRate;    }    private function acquireToken($amount)    {        if (!$this->enabled) {            return true;        }        $this->refillTokenBucket();        if ($amount > $this->currentCapacity) {            usleep((int) (1000000 * ($amount - $this->currentCapacity) / $this->fillRate));        }        $this->currentCapacity -= $amount;        return true;    }    private function calculateTimeWindow()    {        $this->timeWindow = pow(($this->lastMaxRate * (1 - $this->beta) / $this->scaleConstant), 0.333);    }    private function cubicSuccess($timestamp)    {        $dt = $timestamp - $this->lastThrottleTime;        return $this->scaleConstant * pow($dt - $this->timeWindow, 3) + $this->lastMaxRate;    }    private function cubicThrottle($rateToUse)    {        return $rateToUse * $this->beta;    }    private function enableTokenBucket()    {        $this->enabled = true;    }    private function refillTokenBucket()    {        $timestamp = $this->time();        if (!isset($this->lastTimestamp)) {            $this->lastTimestamp = $timestamp;            return;        }        $fillAmount = ($timestamp - $this->lastTimestamp) * $this->fillRate;        $this->currentCapacity = $this->currentCapacity + $fillAmount;        if (!is_null($this->maxCapacity)) {            $this->currentCapacity = min(                $this->maxCapacity,                $this->currentCapacity            );        }        $this->lastTimestamp = $timestamp;    }    private function time()    {        if (is_callable($this->timeProvider)) {            $provider = $this->timeProvider;            $time = $provider();            return $time;        }        return microtime(true);    }    private function updateMeasuredRate()    {        $timestamp = $this->time();        $timeBucket = floor(round($timestamp, 3) * 2) / 2;        $this->requestCount++;        if ($timeBucket > $this->lastTxRateBucket) {            $currentRate = $this->requestCount / ($timeBucket - $this->lastTxRateBucket);            $this->measuredTxRate = ($currentRate * $this->smooth)                + ($this->measuredTxRate * (1 - $this->smooth));            $this->requestCount = 0;            $this->lastTxRateBucket = $timeBucket;        }    }    private function updateTokenBucketRate($newRps)    {        $this->refillTokenBucket();        $this->fillRate = max($newRps, $this->minFillRate);        $this->maxCapacity = max($newRps, $this->minCapacity);        $this->currentCapacity = min($this->currentCapacity, $this->maxCapacity);    }}
 |