Connection.php 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  1. <?php
  2. /**
  3. * A parallel HTTP client written in pure PHP
  4. *
  5. * @author hightman <hightman@twomice.net>
  6. * @link http://hightman.cn
  7. * @copyright Copyright (c) 2015 Twomice Studio.
  8. */
  9. namespace hightman\http;
  /**
   * Connection manager
   *
   * Wraps one non-blocking stream socket and keeps a static, per-connection-string
   * pool of instances so that an idle socket can be handed out again instead of
   * opening a new one. Instances are only created through `connect()`.
   *
   * @author hightman
   * @since 1.0
   */
  class Connection
  {
      /**
       * The maximum number of concurrent connections for the same host and port pair.
       */
      const MAX_BURST = 3;

      /**
       * The connection socket flags
       */
      // Socket was just (re)opened; cleared by the first completed I/O call.
      const FLAG_NEW = 0x01;
      // Socket was re-opened from inside an I/O call; turned back into FLAG_NEW
      // by ioFlagReset() so the "new" state survives the end of that call.
      const FLAG_NEW2 = 0x02;
      // Connection is currently handed out by connect() and not yet close()d.
      const FLAG_BUSY = 0x04;
      // The underlying socket has been opened successfully.
      const FLAG_OPENED = 0x08;
      // An already opened socket was handed out again; cleared by the next I/O call.
      const FLAG_REUSED = 0x10;
      // The socket was fetched through getSock(); cleared by the next I/O call.
      const FLAG_SELECT = 0x20;

      /**
       * @var string|null pending output buffer, and the number of its bytes already sent
       */
      protected $outBuf, $outLen;

      /**
       * @var mixed external argument, socket (false when closed), connection string, FLAG_* bitmask
       */
      protected $arg, $sock, $conn, $flag = 0;

      /**
       * @var array pooled connection objects, indexed by connection string
       */
      private static $_objs = [];

      /**
       * @var array connection objects indexed by the string form of their socket
       */
      private static $_refs = [];

      /**
       * @var string the last error message
       */
      private static $_lastError;

      /**
       * Create connection, with built-in pool.
       *
       * An idle pooled connection for the same `$conn` is preferred; otherwise a new
       * one is created as long as fewer than MAX_BURST exist for that `$conn`.
       *
       * @param string $conn connection string, like `protocol://host:port`.
       * @param mixed $arg external argument, fetched by `[[getExArg()]]`
       * @return static the connection object, null if it reaches the upper limit of concurrent, or false on failure.
       */
      public static function connect($conn, $arg = null)
      {
          $obj = null;
          if (!isset(self::$_objs[$conn])) {
              self::$_objs[$conn] = [];
          }
          // Look for a pooled connection that is not in use.
          foreach (self::$_objs[$conn] as $tmp) {
              if (!($tmp->flag & self::FLAG_BUSY)) {
                  Client::debug('reuse conn \'', $tmp->conn, '\': ', $tmp->sock);
                  $obj = $tmp;
                  break;
              }
          }
          if ($obj === null && count(self::$_objs[$conn]) < self::MAX_BURST) {
              $obj = new self($conn);
              self::$_objs[$conn][] = $obj;
              Client::debug('create conn \'', $conn, '\'');
          }
          if ($obj !== null) {
              if ($obj->flag & self::FLAG_OPENED) {
                  // Remember that the socket may be stale, see ioEmptyError().
                  $obj->flag |= self::FLAG_REUSED;
              } else {
                  if (!$obj->openSock()) {
                      return false;
                  }
              }
              $obj->flag |= self::FLAG_BUSY;
              $obj->outBuf = null;
              $obj->outLen = 0;
              $obj->arg = $arg;
          }
          return $obj;
      }

      /**
       * Find connection object by socket, used after stream_select()
       * @param resource $sock
       * @return Connection the connection object or null if not found.
       */
      public static function findBySock($sock)
      {
          $sock = strval($sock);
          return isset(self::$_refs[$sock]) ? self::$_refs[$sock] : null;
      }

      /**
       * Get last error
       * @return string the last error message.
       */
      public static function getLastError()
      {
          return self::$_lastError;
      }

      /**
       * Close the connection
       * @param boolean $realClose whether to shutdown the connection, default is added to the pool for next request.
       */
      public function close($realClose = false)
      {
          $this->arg = null;
          $this->flag &= ~self::FLAG_BUSY;
          if ($realClose === true) {
              Client::debug('close conn \'', $this->conn, '\': ', $this->sock);
              $this->flag &= ~self::FLAG_OPENED;
              $this->delSockRef();
              // The socket is `false` when it was never opened, failed to open or was
              // already closed (the destructor always ends up here). Passing a
              // non-resource to fclose() throws a TypeError on PHP 8, which `@`
              // cannot silence, so only close a real resource.
              if (is_resource($this->sock)) {
                  @fclose($this->sock);
              }
              $this->sock = false;
          } else {
              Client::debug('free conn \'', $this->conn, '\': ', $this->sock);
          }
      }

      /**
       * Append writing cache
       * @param $buf string data content.
       */
      public function addWriteData($buf)
      {
          if ($this->outBuf === null) {
              $this->outBuf = $buf;
          } else {
              $this->outBuf .= $buf;
          }
      }

      /**
       * @return boolean if there is data to be written.
       */
      public function hasDataToWrite()
      {
          return ($this->outBuf !== null && strlen($this->outBuf) > $this->outLen);
      }

      /**
       * Write data to socket
       * @param string $buf the string to be written, passing null to flush cache.
       * @return mixed the number of bytes were written, 0 if the buffer is full, or false on error.
       */
      public function write($buf = null)
      {
          if ($buf === null) {
              // Flush mode: send the not yet written tail of the cache.
              $len = 0;
              if ($this->hasDataToWrite()) {
                  $buf = $this->outLen > 0 ? substr($this->outBuf, $this->outLen) : $this->outBuf;
                  $len = $this->write($buf);
                  if ($len !== false) {
                      $this->outLen += $len;
                  }
              }
              return $len;
          }
          $n = fwrite($this->sock, $buf);
          if ($n === 0) {
              if ($this->ioEmptyError()) {
                  $n = false;
              }
          } elseif ($n === false && ($this->flag & self::FLAG_SELECT) && !$this->ioEmptyError()) {
              // Since PHP 7.4 a failed fwrite() yields false instead of 0. When the
              // stale reused socket could be re-opened, report "nothing written"
              // exactly like the 0 case so the caller retries instead of failing.
              $n = 0;
          }
          $this->ioFlagReset();
          return $n;
      }

      /**
       * Read one line (not contains \r\n at the end)
       * @return mixed line string, null when has not data, or false on error.
       */
      public function getLine()
      {
          $line = stream_get_line($this->sock, 2048, "\n");
          if ($line === '' || $line === false) {
              $line = $this->ioEmptyError() ? false : null;
          } else {
              $line = rtrim($line, "\r");
          }
          $this->ioFlagReset();
          return $line;
      }

      /**
       * Read data from socket
       * @param int $size the max number of bytes to be read.
       * @return mixed the read string, null when has not data, or false on error.
       */
      public function read($size = 8192)
      {
          $buf = fread($this->sock, $size);
          if ($buf === '' || $buf === false) {
              $buf = $this->ioEmptyError() ? false : null;
          }
          $this->ioFlagReset();
          return $buf;
      }

      /**
       * Get the connection socket
       *
       * Also marks the connection with FLAG_SELECT, which makes the next I/O call
       * treat an empty result as an error (or as a reason to re-open the socket).
       *
       * @return resource the socket
       */
      public function getSock()
      {
          $this->flag |= self::FLAG_SELECT;
          return $this->sock;
      }

      /**
       * @return mixed the external argument
       */
      public function getExArg()
      {
          return $this->arg;
      }

      /**
       * Destructor.
       */
      public function __destruct()
      {
          $this->close(true);
      }

      /**
       * Open the socket (asynchronous connect, non-blocking mode)
       * @param boolean $repeat whether it is repeat connection
       * @return resource|false the connection socket, or false on failure
       */
      protected function openSock($repeat = false)
      {
          $this->delSockRef();
          $this->flag |= self::FLAG_NEW;
          if ($repeat === true) {
              $this->flag |= self::FLAG_NEW2;
          }
          // async-connect
          $this->sock = stream_socket_client($this->conn, $errno, $error, 1, STREAM_CLIENT_ASYNC_CONNECT);
          if ($this->sock === false) {
              Client::debug($repeat ? 're' : '', 'open \'', $this->conn, '\' failed: ', $error);
              self::$_lastError = $error;
              // A failed re-open must not leave the object marked as opened,
              // otherwise connect() would hand it out again with no socket.
              $this->flag &= ~self::FLAG_OPENED;
          } else {
              Client::debug($repeat ? 're' : '', 'open \'', $this->conn, '\' success: ', $this->sock);
              stream_set_blocking($this->sock, false);
              $this->flag |= self::FLAG_OPENED;
              $this->addSockRef();
          }
          $this->outBuf = null;
          $this->outLen = 0;
          return $this->sock;
      }

      /**
       * Decide whether an empty I/O result is an error
       *
       * Only a socket fetched through getSock() is considered. A reused socket gets
       * one attempt to be re-opened; when that succeeds the result is not an error.
       *
       * @return boolean true on error (the last error message is set), false otherwise
       */
      protected function ioEmptyError()
      {
          if ($this->flag & self::FLAG_SELECT) {
              if (!($this->flag & self::FLAG_REUSED) || !$this->openSock(true)) {
                  self::$_lastError = ($this->flag & self::FLAG_NEW) ? 'Fail to connect' : 'Reset by peer';
                  return true;
              }
          }
          return false;
      }

      /**
       * Clear the per-I/O flags, called at the end of every read/write
       */
      protected function ioFlagReset()
      {
          $this->flag &= ~(self::FLAG_NEW | self::FLAG_REUSED | self::FLAG_SELECT);
          if ($this->flag & self::FLAG_NEW2) {
              // The socket was re-opened during this I/O call: it is new again.
              $this->flag |= self::FLAG_NEW;
              $this->flag ^= self::FLAG_NEW2;
          }
      }

      /**
       * Register the current socket for findBySock()
       */
      protected function addSockRef()
      {
          if ($this->sock !== false) {
              $sock = strval($this->sock);
              self::$_refs[$sock] = $this;
          }
      }

      /**
       * Unregister the current socket from findBySock()
       */
      protected function delSockRef()
      {
          if ($this->sock !== false) {
              $sock = strval($this->sock);
              unset(self::$_refs[$sock]);
          }
      }

      /**
       * Constructor, use `connect()` to get an instance.
       * @param string $conn connection string
       */
      protected function __construct($conn)
      {
          $this->conn = $conn;
          $this->sock = false;
      }
  }