Pool.php 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. <?php
  2. declare(strict_types=1);
  3. /**
  4. * This file is part of Hyperf.
  5. *
  6. * @link https://www.hyperf.io
  7. * @document https://hyperf.wiki
  8. * @contact group@hyperf.io
  9. * @license https://github.com/hyperf/hyperf/blob/master/LICENSE
  10. */
  11. namespace Hyperf\Pool;
  12. use Hyperf\Contract\ConnectionInterface;
  13. use Hyperf\Contract\FrequencyInterface;
  14. use Hyperf\Contract\PoolInterface;
  15. use Hyperf\Contract\PoolOptionInterface;
  16. use Hyperf\Contract\StdoutLoggerInterface;
  17. use Psr\Container\ContainerInterface;
  18. use RuntimeException;
  19. use Throwable;
  /**
   * Base class for connection pools.
   *
   * Idle connections are kept in a Channel; `currentConnections` counts every
   * connection this pool has created and not yet closed. Concrete pools only
   * have to implement createConnection().
   */
  abstract class Pool implements PoolInterface
  {
      /**
       * Queue of idle connections waiting to be handed out by get().
       *
       * @var Channel
       */
      protected $channel;

      /**
       * @var ContainerInterface
       */
      protected $container;

      /**
       * Pool settings built by initOption().
       *
       * @var PoolOptionInterface
       */
      protected $option;

      /**
       * Number of connections created by this pool that have not been closed
       * by flush() / flushOne().
       *
       * @var int
       */
      protected $currentConnections = 0;

      /**
       * Optional usage tracker. It is never assigned in this class; get() only
       * uses it when it is an instance of FrequencyInterface and/or
       * LowFrequencyInterface.
       *
       * @var null|FrequencyInterface|LowFrequencyInterface
       */
      protected $frequency;

      /**
       * @param ContainerInterface $container container used to look up the optional stdout logger
       * @param array $config pool settings, see initOption() for the recognised keys
       */
      public function __construct(ContainerInterface $container, array $config = [])
      {
          $this->container = $container;
          $this->initOption($config);

          // The channel is sized to the configured maximum number of connections.
          $this->channel = make(Channel::class, ['size' => $this->option->getMaxConnections()]);
      }

      /**
       * Hand out a connection, creating a new one when the pool may still grow.
       *
       * After the connection has been obtained the frequency tracker (if any) is
       * updated, and the pool is flushed when the tracker reports low frequency.
       * Failures in that bookkeeping are logged and never prevent the connection
       * from being returned.
       *
       * @throws RuntimeException when no connection becomes available within the wait timeout
       * @throws Throwable whatever createConnection() throws
       */
      public function get(): ConnectionInterface
      {
          $connection = $this->getConnection();

          try {
              if ($this->frequency instanceof FrequencyInterface) {
                  $this->frequency->hit();
              }

              if ($this->frequency instanceof LowFrequencyInterface && $this->frequency->isLowFrequency()) {
                  $this->flush();
              }
          } catch (Throwable $exception) {
              $this->logThrowable($exception);
          }

          return $connection;
      }

      /**
       * Put a connection back into the idle channel.
       */
      public function release(ConnectionInterface $connection): void
      {
          $this->channel->push($connection);
      }

      /**
       * Close idle connections until only the configured minimum remains.
       *
       * At most as many connections are closed as were idle when the call
       * started, so connections queued while flushing are left alone.
       */
      public function flush(): void
      {
          // Snapshot of the idle count; used as the budget for this flush.
          $remaining = $this->getConnectionsInChannel();

          while (
              $remaining > 0
              && $this->currentConnections > $this->option->getMinConnections()
              && $conn = $this->channel->pop(0.001)
          ) {
              try {
                  $conn->close();
              } catch (Throwable $exception) {
                  $this->logThrowable($exception);
              } finally {
                  // The connection left the pool whether or not close() succeeded.
                  --$this->currentConnections;
                  --$remaining;
              }
          }
      }

      /**
       * Inspect a single idle connection and close it when required.
       *
       * The connection is closed when $must is true or when its check() does not
       * pass; otherwise it is released back into the channel. A check() that
       * throws is logged and treated as a failed check, so the popped connection
       * is always either released or closed and counted down.
       *
       * @param bool $must close the connection without calling check()
       */
      public function flushOne(bool $must = false): void
      {
          if ($this->getConnectionsInChannel() <= 0) {
              return;
          }

          $conn = $this->channel->pop(0.001);
          if (! $conn) {
              return;
          }

          if (! $must) {
              try {
                  $healthy = $conn->check();
              } catch (Throwable $exception) {
                  $this->logThrowable($exception);
                  $healthy = false;
              }

              if ($healthy) {
                  $this->release($conn);
                  return;
              }
          }

          try {
              $conn->close();
          } catch (Throwable $exception) {
              $this->logThrowable($exception);
          } finally {
              // The connection left the pool whether or not close() succeeded.
              --$this->currentConnections;
          }
      }

      /**
       * @return int the value of the pool's connection counter
       */
      public function getCurrentConnections(): int
      {
          return $this->currentConnections;
      }

      public function getOption(): PoolOptionInterface
      {
          return $this->option;
      }

      /**
       * @return int number of idle connections currently queued in the channel
       */
      public function getConnectionsInChannel(): int
      {
          return $this->channel->length();
      }

      /**
       * Build the PoolOption from a snake_case config array.
       *
       * Recognised keys and the defaults used when a key is absent:
       * min_connections (1), max_connections (10), connect_timeout (10.0),
       * wait_timeout (3.0), heartbeat (-1), max_idle_time (60.0).
       */
      protected function initOption(array $options = []): void
      {
          $this->option = make(PoolOption::class, [
              'minConnections' => $options['min_connections'] ?? 1,
              'maxConnections' => $options['max_connections'] ?? 10,
              'connectTimeout' => $options['connect_timeout'] ?? 10.0,
              'waitTimeout' => $options['wait_timeout'] ?? 3.0,
              'heartbeat' => $options['heartbeat'] ?? -1,
              'maxIdleTime' => $options['max_idle_time'] ?? 60.0,
          ]);
      }

      /**
       * Create a brand-new connection for this pool.
       */
      abstract protected function createConnection(): ConnectionInterface;

      /**
       * Create a connection when none is idle and the pool is below its maximum,
       * otherwise wait on the channel for one to be released.
       *
       * @throws RuntimeException when the channel yields no connection within the wait timeout
       * @throws Throwable whatever createConnection() throws
       */
      private function getConnection(): ConnectionInterface
      {
          $idle = $this->getConnectionsInChannel();

          if ($idle === 0 && $this->currentConnections < $this->option->getMaxConnections()) {
              // Reserve the slot before creating the connection, and give it back
              // only if the creation itself fails.
              ++$this->currentConnections;
              try {
                  return $this->createConnection();
              } catch (Throwable $throwable) {
                  --$this->currentConnections;
                  throw $throwable;
              }
          }

          $connection = $this->channel->pop($this->option->getWaitTimeout());
          if (! $connection instanceof ConnectionInterface) {
              throw new RuntimeException('Connection pool exhausted. Cannot establish new connection before wait_timeout.');
          }

          return $connection;
      }

      /**
       * Report a throwable through StdoutLoggerInterface::error() when the
       * container has such an entry; otherwise the throwable is dropped.
       */
      private function logThrowable(Throwable $throwable): void
      {
          if (! $this->container->has(StdoutLoggerInterface::class)) {
              return;
          }

          $logger = $this->container->get(StdoutLoggerInterface::class);
          if ($logger) {
              $logger->error((string) $throwable);
          }
      }
  }