12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372 |
- <?php
- namespace Qii\Cache\Redis;
- /**
- * Credis_Client (a fork of Redisent)
- *
- * Most commands are compatible with phpredis library:
- * - use "pipeline()" to start a pipeline of commands instead of multi(Redis::PIPELINE)
- * - any arrays passed as arguments will be flattened automatically
- * - setOption and getOption are not supported in standalone mode
- * - order of arguments follows redis-cli instead of phpredis where they differ (lrem)
- *
- * - Uses phpredis library if extension is installed for better performance.
- * - Establishes connection lazily.
- * - Supports tcp and unix sockets.
- * - Reconnects automatically unless a watch or transaction is in progress.
- * - Can set automatic retry connection attempts for iffy Redis connections.
- *
- * @author Colin Mollenhour <colin@mollenhour.com>
- * @copyright 2011 Colin Mollenhour <colin@mollenhour.com>
- * @license http://www.opensource.org/licenses/mit-license.php The MIT License
- * @package Credis_Client
- */
// Line terminator (carriage return + line feed); only defined if no other code defined it first.
defined('CRLF') || define('CRLF', "\r\n");
/**
 * Exception type thrown by this client; also used to wrap native phpredis errors.
 *
 * When the wrapped exception is exactly a RedisException and the message is
 * "read error on connection", the code is forced to CODE_DISCONNECTED regardless
 * of the code that was passed in.
 */
class CredisException extends \Exception
{
    const CODE_TIMED_OUT = 1;
    const CODE_DISCONNECTED = 2;

    /**
     * @param string          $message   Error message
     * @param int             $code      Error code (one of the CODE_* constants, or 0)
     * @param \Exception|null $exception Previous exception being wrapped, if any
     */
    public function __construct($message, $code = 0, $exception = NULL)
    {
        // Same evaluation order as a single && chain: previous exception, its class, then the message.
        $lostConnection = $exception
            && get_class($exception) == 'RedisException'
            && $message == 'read error on connection';

        parent::__construct(
            $message,
            $lostConnection ? self::CODE_DISCONNECTED : $code,
            $exception
        );
    }
}
- /**
- * Credis_Client, a lightweight Redis PHP standalone client and phpredis wrapper
- *
- * Server/Connection:
- * @method Credis_Client pipeline()
- * @method Credis_Client multi()
- * @method array exec()
- * @method string flushAll()
- * @method string flushDb()
- * @method array info(string $section)
- * @method bool|array config(string $setGet, string $key, string $value = null)
- * @method array role()
- * @method array time()
- *
- * Keys:
- * @method int del(string $key)
- * @method int exists(string $key)
- * @method int expire(string $key, int $seconds)
- * @method int expireAt(string $key, int $timestamp)
- * @method array keys(string $key)
- * @method int persist(string $key)
- * @method bool rename(string $key, string $newKey)
- * @method bool renameNx(string $key, string $newKey)
- * @method array sort(string $key, string $arg1, string $valueN = null)
- * @method int ttl(string $key)
- * @method string type(string $key)
- *
- * Scalars:
- * @method int append(string $key, string $value)
- * @method int decr(string $key)
- * @method int decrBy(string $key, int $decrement)
- * @method bool|string get(string $key)
- * @method int getBit(string $key, int $offset)
- * @method string getRange(string $key, int $start, int $end)
- * @method string getSet(string $key, string $value)
- * @method int incr(string $key)
- * @method int incrBy(string $key, int $increment)
- * @method array mGet(array $keys)
- * @method bool mSet(array $keysValues)
- * @method int mSetNx(array $keysValues)
- * @method bool set(string $key, string $value)
- * @method int setBit(string $key, int $offset, int $value)
- * @method bool setEx(string $key, int $seconds, string $value)
- * @method int setNx(string $key, string $value)
- * @method int setRange(string $key, int $offset, string $value)
- * @method int strLen(string $key)
- *
- * Sets:
- * @method int sAdd(string $key, mixed $value, string $valueN = null)
- * @method int sRem(string $key, mixed $value, string $valueN = null)
- * @method array sMembers(string $key)
- * @method array sUnion(mixed $keyOrArray, string $valueN = null)
- * @method array sInter(mixed $keyOrArray, string $valueN = null)
- * @method array sDiff(mixed $keyOrArray, string $valueN = null)
- * @method string sPop(string $key)
- * @method int sCard(string $key)
- * @method int sIsMember(string $key, string $member)
- * @method int sMove(string $source, string $dest, string $member)
- * @method string|array sRandMember(string $key, int $count = null)
- * @method int sUnionStore(string $dest, string $key1, string $key2 = null)
- * @method int sInterStore(string $dest, string $key1, string $key2 = null)
- * @method int sDiffStore(string $dest, string $key1, string $key2 = null)
- *
- * Hashes:
- * @method bool|int hSet(string $key, string $field, string $value)
- * @method bool hSetNx(string $key, string $field, string $value)
- * @method bool|string hGet(string $key, string $field)
- * @method bool|int hLen(string $key)
- * @method bool hDel(string $key, string $field)
- * @method array hKeys(string $key)
- * @method array hVals(string $key)
- * @method array hGetAll(string $key)
- * @method bool hExists(string $key, string $field)
- * @method int hIncrBy(string $key, string $field, int $value)
- * @method bool hMSet(string $key, array $keysValues)
- * @method array hMGet(string $key, array $fields)
- *
- * Lists:
- * @method array|null blPop(string $keyN, int $timeout)
- * @method array|null brPop(string $keyN, int $timeout)
- * @method array|null brPoplPush(string $source, string $destination, int $timeout)
- * @method string|null lIndex(string $key, int $index)
- * @method int lInsert(string $key, string $beforeAfter, string $pivot, string $value)
- * @method int lLen(string $key)
- * @method string|null lPop(string $key)
- * @method int lPush(string $key, mixed $value, mixed $valueN = null)
- * @method int lPushX(string $key, mixed $value)
- * @method array lRange(string $key, int $start, int $stop)
- * @method int lRem(string $key, int $count, mixed $value)
- * @method bool lSet(string $key, int $index, mixed $value)
- * @method bool lTrim(string $key, int $start, int $stop)
- * @method string|null rPop(string $key)
- * @method string|null rPoplPush(string $source, string $destination)
- * @method int rPush(string $key, mixed $value, mixed $valueN = null)
- * @method int rPushX(string $key, mixed $value)
- *
- * Sorted Sets:
- * @method int zCard(string $key)
- * @method array zRangeByScore(string $key, mixed $start, mixed $stop, array $args = null)
- * @method array zRevRangeByScore(string $key, mixed $start, mixed $stop, array $args = null)
- * @method int zRemRangeByScore(string $key, mixed $start, mixed $stop)
- * @method array zRange(string $key, mixed $start, mixed $stop, array $args = null)
- * @method array zRevRange(string $key, mixed $start, mixed $stop, array $args = null)
- * TODO
- *
- * Pub/Sub
- * @method int publish(string $channel, string $message)
- * @method int|array pubsub(string $subCommand, $arg = NULL)
- *
- * Scripting:
- * @method string|int script(string $command, string $arg1 = null)
- * @method string|int|array|bool eval(string $script, array $keys = NULL, array $args = NULL)
- * @method string|int|array|bool evalSha(string $script, array $keys = NULL, array $args = NULL)
- */
- class Client {
- const TYPE_STRING = 'string';
- const TYPE_LIST = 'list';
- const TYPE_SET = 'set';
- const TYPE_ZSET = 'zset';
- const TYPE_HASH = 'hash';
- const TYPE_NONE = 'none';
- const FREAD_BLOCK_SIZE = 8192;
- /**
- * Socket connection to the Redis server or Redis library instance
- * @var resource|Redis
- */
- protected $redis;
- protected $redisMulti;
- /**
- * Host of the Redis server
- * @var string
- */
- protected $host;
-
- /**
- * Port on which the Redis server is running
- * @var integer
- */
- protected $port;
- /**
- * Timeout for connecting to Redis server
- * @var float
- */
- protected $timeout;
- /**
- * Timeout for reading response from Redis server
- * @var float
- */
- protected $readTimeout;
- /**
- * Unique identifier for persistent connections
- * @var string
- */
- protected $persistent;
- /**
- * @var bool
- */
- protected $closeOnDestruct = TRUE;
- /**
- * @var bool
- */
- protected $connected = FALSE;
- /**
- * @var bool
- */
- protected $standalone;
- /**
- * @var int
- */
- protected $maxConnectRetries = 0;
- /**
- * @var int
- */
- protected $connectFailures = 0;
- /**
- * @var bool
- */
- protected $usePipeline = FALSE;
- /**
- * @var array
- */
- protected $commandNames;
- /**
- * @var string
- */
- protected $commands;
- /**
- * @var bool
- */
- protected $isMulti = FALSE;
- /**
- * @var bool
- */
- protected $isWatching = FALSE;
- /**
- * @var string
- */
- protected $authPassword;
- /**
- * @var int
- */
- protected $selectedDb = 0;
- /**
- * Aliases for backwards compatibility with phpredis
- * @var array
- */
- protected $wrapperMethods = array('delete' => 'del', 'getkeys' => 'keys', 'sremove' => 'srem');
- /**
- * @var array
- */
- protected $renamedCommands;
- /**
- * @var int
- */
- protected $requests = 0;
-
- /**
- * @var bool
- */
- protected $subscribed = false;
-
/**
 * Prepare a client for the Redis server at {@link $host}:{@link $port}; no connection is opened here.
 * $host may also be a path to a unix socket or a string in the form of tcp://[hostname]:[port] or unix://[path]
 *
 * @param string $host The hostname of the Redis server
 * @param integer $port The port number of the Redis server
 * @param float $timeout Timeout period in seconds
 * @param string $persistent Identifier used to establish a persistent connection ('' for none)
 * @param int $db The selected database of the Redis server
 * @param string $password The authentication password of the Redis server
 */
public function __construct($host = '127.0.0.1', $port = 6379, $timeout = null, $persistent = '', $db = 0, $password = null)
{
    // Driver selection: fall back to the pure-PHP implementation when phpredis is missing.
    $this->standalone = ! extension_loaded('redis');

    // Session settings applied right after connecting.
    $this->authPassword = $password;
    $this->selectedDb = (int)$db;
    $this->timeout = $timeout;

    // Connection target; convertHost() may rewrite all three from a tcp:// or unix:// URI.
    $this->host = (string) $host;
    $this->port = (int) $port;
    $this->persistent = (string) $persistent;
    $this->convertHost();
}
/**
 * Close the connection when the object is destroyed, unless that was disabled via setCloseOnDestruct().
 */
public function __destruct()
{
    if ( ! $this->closeOnDestruct) {
        return;
    }
    $this->close();
}
-
/**
 * Tell whether the client currently holds at least one channel or pattern subscription.
 *
 * @return bool
 */
public function isSubscribed()
{
    return $this->subscribed;
}
-
/**
 * Host name (or unix socket path) this client targets.
 *
 * @return string
 */
public function getHost()
{
    return $this->host;
}
/**
 * Port this client targets; NULL when a unix socket is used.
 *
 * @return int
 */
public function getPort()
{
    return $this->port;
}
/**
 * Index of the database that is (or will be, once connected) selected.
 *
 * @return int
 */
public function getSelectedDb()
{
    return $this->selectedDb;
}
/**
 * Persistent connection identifier; an empty string means the connection is not persistent.
 *
 * @return string
 */
public function getPersistence()
{
    return $this->persistent;
}
/**
 * Switch to the pure-PHP driver even when the phpredis extension is available.
 * Must be called before connecting; it is a no-op when standalone mode is already active.
 *
 * @throws CredisException when a connection has already been established with phpredis
 * @return Credis_Client
 */
public function forceStandalone()
{
    if ( ! $this->standalone) {
        if ($this->connected) {
            throw new CredisException('Cannot force Credis_Client to use standalone PHP driver after a connection has already been established.');
        }
        $this->standalone = TRUE;
    }
    return $this;
}
/**
 * Set how many extra connection attempts connect() may make before giving up.
 *
 * @param int $retries Number of retries after the first failed attempt
 * @return Credis_Client
 */
public function setMaxConnectRetries($retries)
{
    $this->maxConnectRetries = $retries;

    return $this;
}
/**
 * Choose whether the destructor closes the connection.
 *
 * @param bool $flag TRUE to close on destruction, FALSE to leave the connection open
 * @return Credis_Client
 */
public function setCloseOnDestruct($flag)
{
    $this->closeOnDestruct = $flag;

    return $this;
}
/**
 * Normalise the configured host: split tcp://host[:port][/persistence_identifier] and
 * unix:///path URIs into host, port and persistence id, and clear the port whenever
 * the host is a filesystem path (unix socket).
 *
 * @throws CredisException when the URI does not have one of the expected shapes
 */
protected function convertHost()
{
    $uri = array();
    if (preg_match('#^(tcp|unix)://(.*)$#', $this->host, $uri)) {
        $scheme = $uri[1];
        $target = $uri[2];
        if ($scheme == 'tcp') {
            $tcp = array();
            if ( ! preg_match('#^([^:]+)(:([0-9]+))?(/(.+))?$#', $target, $tcp)) {
                throw new CredisException('Invalid host format; expected tcp://host[:port][/persistence_identifier]');
            }
            $this->host = $tcp[1];
            // Group 3 is the port digits, group 5 the persistence identifier; both are optional.
            $this->port = (int) (isset($tcp[3]) ? $tcp[3] : 6379);
            $this->persistent = isset($tcp[5]) ? $tcp[5] : '';
        } else {
            $this->host = $target;
            $this->port = NULL;
            if (substr($this->host,0,1) != '/') {
                throw new CredisException('Invalid unix socket format; expected unix:///path/to/redis.sock');
            }
        }
    }

    // A bare absolute path given as host is a unix socket too, so it has no port.
    $hostIsPath = substr($this->host,0,1) == '/';
    if ($this->port !== NULL && $hostIsPath) {
        $this->port = NULL;
    }
}
/**
 * Open the connection to the Redis server unless it is already open.
 *
 * Standalone mode opens a stream socket (unix socket when the port is NULL, TCP otherwise);
 * extension mode uses phpredis connect()/pconnect(). A failed attempt is retried until the
 * failure count exceeds the configured maximum number of retries. After a successful connect
 * the stored read timeout, password and database index are applied.
 *
 * @throws CredisException when all connection attempts failed
 * @return Credis_Client
 */
public function connect()
{
    if ($this->connected) {
        return $this;
    }
    if ($this->standalone) {
        $flags = STREAM_CLIENT_CONNECT;
        $remote_socket = $this->port === NULL
            ? 'unix://'.$this->host
            : 'tcp://'.$this->host.':'.$this->port;
        if ($this->persistent && $this->port !== NULL) {
            // Persistent connections to UNIX sockets are not supported
            $remote_socket .= '/'.$this->persistent;
            $flags = $flags | STREAM_CLIENT_PERSISTENT;
        }
        // Warnings are suppressed on purpose: failures are reported through $errno/$errstr below.
        $result = $this->redis = @stream_socket_client($remote_socket, $errno, $errstr, $this->timeout !== null ? $this->timeout : 2.5, $flags);
    }
    else {
        if ( ! $this->redis) {
            $this->redis = new \Redis;
        }
        try
        {
            $socketTimeout = $this->timeout ? $this->timeout : 0.0;
            $result = $this->persistent
                ? $this->redis->pconnect($this->host, $this->port, $socketTimeout, $this->persistent)
                : $this->redis->connect($this->host, $this->port, $socketTimeout);
        }
        catch(\Exception $e)
        {
            // The class name must be fully qualified: inside this namespace a bare "Exception"
            // names a class that does not exist, so nothing would ever be caught here.
            // Some applications will capture the php error that phpredis can sometimes generate and throw it as an Exception
            $result = false;
            $errno = 1;
            $errstr = $e->getMessage();
        }
    }
    // Use recursion for connection retries
    if ( ! $result) {
        $this->connectFailures++;
        if ($this->connectFailures <= $this->maxConnectRetries) {
            return $this->connect();
        }
        // Reset the counter before throwing so a later connect() starts a fresh retry sequence.
        $failures = $this->connectFailures;
        $this->connectFailures = 0;
        throw new CredisException("Connection to Redis {$this->host}:{$this->port} failed after $failures failures." . (isset($errno) && isset($errstr) ? "Last Error : ({$errno}) {$errstr}" : ""));
    }
    $this->connectFailures = 0;
    $this->connected = TRUE;
    // Set read timeout
    if ($this->readTimeout) {
        $this->setReadTimeout($this->readTimeout);
    }
    if($this->authPassword) {
        $this->auth($this->authPassword);
    }
    if($this->selectedDb !== 0) {
        $this->select($this->selectedDb);
    }
    return $this;
}
/**
 * Tell whether connect() has succeeded and the connection has not been closed since.
 *
 * @return bool
 */
public function isConnected()
{
    return $this->connected;
}
/**
 * Set the read timeout for the connection. Use 0 to disable timeouts entirely (or use a very long timeout
 * if not supported).
 *
 * The value is always remembered so it can be applied on the next connect; it is pushed to the
 * underlying socket or phpredis instance immediately only when a connection is open.
 *
 * @param int|float $timeout 0 (or -1) for no timeout, otherwise number of seconds
 * @throws CredisException when $timeout is below -1
 * @return Credis_Client
 */
public function setReadTimeout($timeout)
{
    if ($timeout < -1) {
        throw new CredisException('Timeout values less than -1 are not accepted.');
    }
    $this->readTimeout = $timeout;
    if ($this->connected) {
        if ($this->standalone) {
            $timeout = $timeout <= 0 ? 315360000 : $timeout; // Ten-year timeout
            stream_set_blocking($this->redis, TRUE);
            // stream_set_timeout() takes integer seconds and microseconds; cast explicitly
            // instead of relying on an implicit float-to-int conversion of the fraction.
            $seconds = (int) floor($timeout);
            $microseconds = (int) (($timeout - floor($timeout)) * 1000000);
            stream_set_timeout($this->redis, $seconds, $microseconds);
        } else if (defined('Redis::OPT_READ_TIMEOUT')) {
            // supported in phpredis 2.2.3
            // a timeout value of -1 means reads will not timeout
            $timeout = $timeout == 0 ? -1 : $timeout;
            // Fully qualified: a bare "Redis" would be looked up inside this file's namespace.
            $this->redis->setOption(\Redis::OPT_READ_TIMEOUT, $timeout);
        }
    }
    return $this;
}
/**
 * Close the connection if one is open and it is not persistent.
 *
 * Closing is best effort: an exception raised while closing is swallowed, in which case
 * the connected flag is left untouched and TRUE is returned.
 *
 * @return bool Result of the underlying close call, or TRUE when nothing had to be closed
 */
public function close()
{
    $result = TRUE;
    if ($this->connected && ! $this->persistent) {
        try {
            $result = $this->standalone ? fclose($this->redis) : $this->redis->close();
            $this->connected = FALSE;
        } catch (\Exception $e) {
            // Ignore exceptions on close. The class name is fully qualified because a bare
            // "Exception" would resolve inside this namespace and never match anything.
        }
    }
    return $result;
}
/**
 * Enable command renaming and provide the mapping. Renaming is only available with the
 * standalone driver, so the client is switched to it first. Supported forms are:
 *
 * 1. renameCommand('foo') // Salted md5 hash for all commands -> md5('foo'.$command)
 * 2. renameCommand(function($command){ return 'my'.$command; }); // Callable
 * 3. renameCommand('get', 'foo') // Single command -> alias
 * 4. renameCommand(['get' => 'foo', 'set' => 'bar']) // Full map of [command -> alias]
 *
 * @param string|callable|array $command
 * @param string|null $alias
 * @return $this
 */
public function renameCommand($command, $alias = NULL)
{
    if ( ! $this->standalone) {
        $this->forceStandalone();
    }

    // Forms 1, 2 and 4: the single argument is the complete renaming configuration.
    if ($alias === NULL) {
        $this->renamedCommands = $command;
        return $this;
    }

    // Form 3: add one entry, creating the map on first use.
    if ( ! $this->renamedCommands) {
        $this->renamedCommands = array();
    }
    $this->renamedCommands[$command] = $alias;

    return $this;
}
/**
 * Translate a command name into the name that is actually sent to the server.
 *
 * The answer is derived from this instance's current renaming configuration on every call.
 * No function-level static cache is used, because a static is shared by every Client
 * instance and would also keep serving aliases from a configuration that has since changed.
 *
 * @param string $command Command name
 * @return string The alias, or $command itself when no renaming applies
 */
public function getRenamedCommand($command)
{
    $renaming = $this->renamedCommands;

    // Command renaming not enabled
    if ($renaming === NULL) {
        return $command;
    }
    // Explicit [command => alias] map: commands missing from the map keep their name
    if (is_array($renaming)) {
        return isset($renaming[$command]) ? $renaming[$command] : $command;
    }
    // String means all commands are hashed with salted md5
    if (is_string($renaming)) {
        return md5($renaming.$command);
    }
    // User-supplied function
    if (is_callable($renaming)) {
        return call_user_func($renaming, $command);
    }
    // Unsupported configuration type: leave the command untouched
    return $command;
}
/**
 * Send AUTH to the server and, once the call has returned, remember the password
 * so that it is sent again after a reconnect.
 *
 * @param string $password
 * @return bool
 */
public function auth($password)
{
    $reply = $this->__call('auth', array($password));
    $this->authPassword = $password;

    return $reply;
}
/**
 * Send SELECT to the server and, once the call has returned, remember the database
 * index so that it is selected again after a reconnect.
 *
 * @param int $index
 * @return bool
 */
public function select($index)
{
    $reply = $this->__call('select', array($index));
    $this->selectedDb = (int) $index;

    return $reply;
}
-
/**
 * Send PUNSUBSCRIBE with whatever arguments were given (the method takes a variable
 * number of arguments, forwarded as-is) and refresh the subscribed flag from the
 * subscription count in the reply.
 *
 * @return array array($command, $pattern, $remainingSubscriptionCount)
 */
public function pUnsubscribe()
{
    $reply = $this->__call('punsubscribe', func_get_args());
    list($command, $channel, $subscribedChannels) = $reply;

    $this->subscribed = $subscribedChannels > 0;

    return array($command, $channel, $subscribedChannels);
}
/**
 * Iterate over the keyspace with SCAN.
 *
 * @param int $Iterator Cursor, passed by reference and forwarded by reference to __call
 * @param string $pattern Optional MATCH pattern
 * @param int $count Optional COUNT hint
 * @return bool | Array
 */
public function scan(&$Iterator, $pattern = null, $count = null)
{
    // The cursor stays a reference inside the argument list so __call can update the caller's variable.
    $arguments = array(&$Iterator, $pattern, $count);

    return $this->__call('scan', $arguments);
}
/**
 * Iterate over a hash with HSCAN.
 *
 * @param int $Iterator Cursor, passed by reference and forwarded by reference to __call
 * @param string $field Sent as the first command argument, ahead of the cursor
 * @param string $pattern Optional MATCH pattern
 * @param int $count Optional COUNT hint
 * @return bool | Array
 */
public function hscan(&$Iterator, $field, $pattern = null, $count = null)
{
    // The cursor stays a reference inside the argument list so __call can update the caller's variable.
    $arguments = array($field, &$Iterator, $pattern, $count);

    return $this->__call('hscan', $arguments);
}
-
/**
 * Iterate over a set with SSCAN.
 *
 * @param int $Iterator Cursor, passed by reference and forwarded by reference to __call
 * @param string $field Sent as the first command argument, ahead of the cursor
 * @param string $pattern Optional MATCH pattern
 * @param int $count Optional COUNT hint
 * @return bool | Array
 */
public function sscan(&$Iterator, $field, $pattern = null, $count = null)
{
    // The cursor stays a reference inside the argument list so __call can update the caller's variable.
    $arguments = array($field, &$Iterator, $pattern, $count);

    return $this->__call('sscan', $arguments);
}
-
/**
 * Iterate over a sorted set with ZSCAN.
 *
 * @param int $Iterator Cursor, passed by reference and forwarded by reference to __call
 * @param string $field Sent as the first command argument, ahead of the cursor
 * @param string $pattern Optional MATCH pattern
 * @param int $count Optional COUNT hint
 * @return bool | Array
 */
public function zscan(&$Iterator, $field, $pattern = null, $count = null)
{
    // The cursor stays a reference inside the argument list so __call can update the caller's variable.
    $arguments = array($field, &$Iterator, $pattern, $count);

    return $this->__call('zscan', $arguments);
}
/**
 * Subscribe to one or more channel patterns and dispatch incoming messages to $callback.
 *
 * With phpredis the call is delegated to the extension. In standalone mode the method reads
 * one confirmation per pattern, then loops for as long as the client stays subscribed,
 * invoking $callback($client, $pattern, $channel, $message) for every pmessage received.
 * When a read times out, the patterns are unsubscribed and the pending confirmations are
 * drained before the timeout exception is rethrown.
 *
 * @param string|array $patterns
 * @param $callback
 * @return $this|array|bool|Credis_Client|mixed|null|string
 * @throws CredisException
 */
public function pSubscribe($patterns, $callback)
{
    if ( ! $this->standalone) {
        return $this->__call('pSubscribe', array((array)$patterns, $callback));
    }

    // Standalone mode: one confirmation reply is expected per requested pattern.
    $expectedReplies = is_array($patterns) ? count($patterns) : 1;
    for ($i = 0; $i < $expectedReplies; $i++) {
        // The first reply comes back from sending the command, the rest are read off the socket.
        $reply = $i === 0
            ? $this->__call('psubscribe', array($patterns))
            : $this->read_reply();
        list($command, $pattern, $status) = $reply;
        $this->subscribed = $status > 0;
        if ( ! $status) {
            throw new CredisException('Invalid pSubscribe response.');
        }
    }

    try {
        // Runs until the subscribed flag is cleared or a read fails.
        while ($this->subscribed) {
            list($type, $pattern, $channel, $message) = $this->read_reply();
            if ($type != 'pmessage') {
                throw new CredisException('Received non-pmessage reply.');
            }
            $callback($this, $pattern, $channel, $message);
        }
    } catch (CredisException $e) {
        if ($e->getCode() == CredisException::CODE_TIMED_OUT) {
            // Leave the connection in a clean state; any failure here propagates as-is.
            list($command, $pattern, $status) = $this->pUnsubscribe($patterns);
            while ($status !== 0) {
                list($command, $pattern, $status) = $this->read_reply();
            }
        }
        throw $e;
    }
}
/**
 * Send UNSUBSCRIBE with whatever arguments were given (the method takes a variable
 * number of arguments, forwarded as-is) and refresh the subscribed flag from the
 * subscription count in the reply.
 *
 * @return array array($command, $channel, $remainingSubscriptionCount)
 */
public function unsubscribe()
{
    $reply = $this->__call('unsubscribe', func_get_args());
    list($command, $channel, $subscribedChannels) = $reply;

    $this->subscribed = $subscribedChannels > 0;

    return array($command, $channel, $subscribedChannels);
}
/**
 * Subscribe to one or more channels and dispatch incoming messages to $callback.
 *
 * With phpredis the call is delegated to the extension. In standalone mode the method reads
 * one confirmation per channel, then loops for as long as the client stays subscribed,
 * invoking $callback($client, $channel, $message) for every message received. When a read
 * times out, the channels are unsubscribed and the pending confirmations are drained before
 * the timeout exception is rethrown.
 *
 * @param string|array $channels
 * @param $callback
 * @throws CredisException
 * @return $this|array|bool|Credis_Client|mixed|null|string
 */
public function subscribe($channels, $callback)
{
    if ( ! $this->standalone) {
        return $this->__call('subscribe', array((array)$channels, $callback));
    }

    // Standalone mode: one confirmation reply is expected per requested channel.
    $expectedReplies = is_array($channels) ? count($channels) : 1;
    for ($i = 0; $i < $expectedReplies; $i++) {
        // The first reply comes back from sending the command, the rest are read off the socket.
        $reply = $i === 0
            ? $this->__call('subscribe', array($channels))
            : $this->read_reply();
        list($command, $channel, $status) = $reply;
        $this->subscribed = $status > 0;
        if ( ! $status) {
            throw new CredisException('Invalid subscribe response.');
        }
    }

    try {
        // Runs until the subscribed flag is cleared or a read fails.
        while ($this->subscribed) {
            list($type, $channel, $message) = $this->read_reply();
            if ($type != 'message') {
                throw new CredisException('Received non-message reply.');
            }
            $callback($this, $channel, $message);
        }
    } catch (CredisException $e) {
        if ($e->getCode() == CredisException::CODE_TIMED_OUT) {
            // Leave the connection in a clean state; any failure here propagates as-is.
            list($command, $channel, $status) = $this->unsubscribe($channels);
            while ($status !== 0) {
                list($command, $channel, $status) = $this->read_reply();
            }
        }
        throw $e;
    }
}
- public function __call($name, $args)
- {
- // Lazy connection
- $this->connect();
- $name = strtolower($name);
- // Send request via native PHP
- if($this->standalone)
- {
- switch ($name) {
- case 'eval':
- case 'evalsha':
- $script = array_shift($args);
- $keys = (array) array_shift($args);
- $eArgs = (array) array_shift($args);
- $args = array($script, count($keys), $keys, $eArgs);
- break;
- case 'zunionstore':
- $dest = array_shift($args);
- $keys = (array) array_shift($args);
- $weights = array_shift($args);
- $aggregate = array_shift($args);
- $args = array($dest, count($keys), $keys);
- if ($weights) {
- $args[] = (array) $weights;
- }
- if ($aggregate) {
- $args[] = $aggregate;
- }
- break;
- case 'set':
- // The php redis module has different behaviour with ttl
- // https://github.com/phpredis/phpredis#set
- if (count($args) === 3 && is_int($args[2])) {
- $args = array($args[0], $args[1], array('EX', $args[2]));
- } elseif (count($args) === 3 && is_array($args[2])) {
- $tmp_args = $args;
- $args = array($tmp_args[0], $tmp_args[1]);
- foreach ($tmp_args[2] as $k=>$v) {
- if (is_string($k)) {
- $args[] = array($k,$v);
- } elseif (is_int($k)) {
- $args[] = $v;
- }
- }
- unset($tmp_args);
- }
- break;
- case 'scan':
- $ref =& $args[0];
- if (empty($ref))
- {
- $ref = 0;
- }
- $eArgs = array($ref);
- if (!empty($args[1]))
- {
- $eArgs[] = 'MATCH';
- $eArgs[] = $args[1];
- }
- if (!empty($args[2]))
- {
- $eArgs[] = 'COUNT';
- $eArgs[] = $args[2];
- }
- $args = $eArgs;
- break;
- case 'sscan':
- case 'zscan':
- case 'hscan':
- $ref =& $args[1];
- if (empty($ref))
- {
- $ref = 0;
- }
- $eArgs = array($args[0],$ref);
- if (!empty($args[2]))
- {
- $eArgs[] = 'MATCH';
- $eArgs[] = $args[2];
- }
- if (!empty($args[3]))
- {
- $eArgs[] = 'COUNT';
- $eArgs[] = $args[3];
- }
- $args = $eArgs;
- break;
- case 'zrangebyscore':
- case 'zrevrangebyscore':
- case 'zrange':
- case 'zrevrange':
- if (isset($args[3]) && is_array($args[3])) {
- // map options
- $cArgs = array();
- if (!empty($args[3]['withscores'])) {
- $cArgs[] = 'withscores';
- }
- if (($name == 'zrangebyscore' || $name == 'zrevrangebyscore') && array_key_exists('limit', $args[3])) {
- $cArgs[] = array('limit' => $args[3]['limit']);
- }
- $args[3] = $cArgs;
- }
- break;
- case 'mget':
- if (isset($args[0]) && is_array($args[0]))
- {
- $args = array_values($args[0]);
- }
- break;
- }
- // Flatten arguments
- $args = self::_flattenArguments($args);
- // In pipeline mode
- if($this->usePipeline)
- {
- if($name == 'pipeline') {
- throw new CredisException('A pipeline is already in use and only one pipeline is supported.');
- }
- else if($name == 'exec') {
- if($this->isMulti) {
- $this->commandNames[] = $name;
- $this->commands .= self::_prepare_command(array($this->getRenamedCommand($name)));
- }
- // Write request
- if($this->commands) {
- $this->write_command($this->commands);
- }
- $this->commands = NULL;
- // Read response
- $response = array();
- foreach($this->commandNames as $command) {
- $response[] = $this->read_reply($command);
- }
- $this->commandNames = NULL;
- if($this->isMulti) {
- $response = array_pop($response);
- }
- $this->usePipeline = $this->isMulti = FALSE;
- return $response;
- }
- else {
- if($name == 'multi') {
- $this->isMulti = TRUE;
- }
- array_unshift($args, $this->getRenamedCommand($name));
- $this->commandNames[] = $name;
- $this->commands .= self::_prepare_command($args);
- return $this;
- }
- }
- // Start pipeline mode
- if($name == 'pipeline')
- {
- $this->usePipeline = TRUE;
- $this->commandNames = array();
- $this->commands = '';
- return $this;
- }
- // If unwatching, allow reconnect with no error thrown
- if($name == 'unwatch') {
- $this->isWatching = FALSE;
- }
- // Non-pipeline mode
- array_unshift($args, $this->getRenamedCommand($name));
- $command = self::_prepare_command($args);
- $this->write_command($command);
- $response = $this->read_reply($name);
- switch($name)
- {
- case 'scan':
- case 'sscan':
- $ref = array_shift($response);
- $response = empty($response[0]) ? array() : $response[0];
- break;
- case 'hscan':
- case 'zscan':
- $ref = array_shift($response);
- $response = empty($response[0]) ? array() : $response[0];
- if (!empty($response) && is_array($response))
- {
- $count = count($response);
- $out = array();
- for($i = 0;$i < $count;$i+=2){
- $out[$response[$i]] = $response[$i+1];
- }
- $response = $out;
- }
- break;
- case 'zrangebyscore':
- case 'zrevrangebyscore':
- if (in_array('withscores', $args, true)) {
- // Map array of values into key=>score list like phpRedis does
- $item = null;
- $out = array();
- foreach ($response as $value) {
- if ($item == null) {
- $item = $value;
- } else {
- // 2nd value is the score
- $out[$item] = (float) $value;
- $item = null;
- }
- }
- $response = $out;
- }
- break;
- }
- // Watch mode disables reconnect so error is thrown
- if($name == 'watch') {
- $this->isWatching = TRUE;
- }
- // Transaction mode
- else if($this->isMulti && ($name == 'exec' || $name == 'discard')) {
- $this->isMulti = FALSE;
- }
- // Started transaction
- else if($this->isMulti || $name == 'multi') {
- $this->isMulti = TRUE;
- $response = $this;
- }
- }
- // Send request via phpredis client
- else
- {
- // Tweak arguments
- switch($name) {
- case 'get': // optimize common cases
- case 'set':
- case 'hget':
- case 'hset':
- case 'setex':
- case 'mset':
- case 'msetnx':
- case 'hmset':
- case 'hmget':
- case 'del':
- case 'zrangebyscore':
- case 'zrevrangebyscore':
- case 'zrange':
- case 'zrevrange':
- break;
- case 'zunionstore':
- $cArgs = array();
- $cArgs[] = array_shift($args); // destination
- $cArgs[] = array_shift($args); // keys
- if(isset($args[0]) and isset($args[0]['weights'])) {
- $cArgs[] = (array) $args[0]['weights'];
- } else {
- $cArgs[] = null;
- }
- if(isset($args[0]) and isset($args[0]['aggregate'])) {
- $cArgs[] = strtoupper($args[0]['aggregate']);
- }
- $args = $cArgs;
- break;
- case 'mget':
- if(isset($args[0]) && ! is_array($args[0])) {
- $args = array($args);
- }
- break;
- case 'lrem':
- $args = array($args[0], $args[2], $args[1]);
- break;
- case 'eval':
- case 'evalsha':
- if (isset($args[1]) && is_array($args[1])) {
- $cKeys = $args[1];
- } elseif (isset($args[1]) && is_string($args[1])) {
- $cKeys = array($args[1]);
- } else {
- $cKeys = array();
- }
- if (isset($args[2]) && is_array($args[2])) {
- $cArgs = $args[2];
- } elseif (isset($args[2]) && is_string($args[2])) {
- $cArgs = array($args[2]);
- } else {
- $cArgs = array();
- }
- $args = array($args[0], array_merge($cKeys, $cArgs), count($cKeys));
- break;
- case 'subscribe':
- case 'psubscribe':
- break;
- case 'scan':
- case 'sscan':
- case 'hscan':
- case 'zscan':
- // allow phpredis to see the caller's reference
- //$param_ref =& $args[0];
- break;
- default:
- // Flatten arguments
- $args = self::_flattenArguments($args);
- }
- try {
- // Proxy pipeline mode to the phpredis library
- if($name == 'pipeline' || $name == 'multi') {
- if($this->isMulti) {
- return $this;
- } else {
- $this->isMulti = TRUE;
- $this->redisMulti = call_user_func_array(array($this->redis, $name), $args);
- return $this;
- }
- }
- else if($name == 'exec' || $name == 'discard') {
- $this->isMulti = FALSE;
- $response = $this->redisMulti->$name();
- $this->redisMulti = NULL;
- #echo "> $name : ".substr(print_r($response, TRUE),0,100)."\n";
- return $response;
- }
- // Use aliases to be compatible with phpredis wrapper
- if(isset($this->wrapperMethods[$name])) {
- $name = $this->wrapperMethods[$name];
- }
- // Multi and pipeline return self for chaining
- if($this->isMulti) {
- call_user_func_array(array($this->redisMulti, $name), $args);
- return $this;
- }
- // Send request, retry one time when using persistent connections on the first request only
- $this->requests++;
- try {
- $response = call_user_func_array(array($this->redis, $name), $args);
- } catch (RedisException $e) {
- if ($this->persistent && $this->requests == 1 && $e->getMessage() == 'read error on connection') {
- $this->connected = FALSE;
- $this->connect();
- $response = call_user_func_array(array($this->redis, $name), $args);
- } else {
- throw $e;
- }
- }
- }
- // Wrap exceptions
- catch(RedisException $e) {
- $code = 0;
- if ( ! ($result = $this->redis->IsConnected())) {
- $this->connected = FALSE;
- $code = CredisException::CODE_DISCONNECTED;
- }
- throw new CredisException($e->getMessage(), $code, $e);
- }
- #echo "> $name : ".substr(print_r($response, TRUE),0,100)."\n";
- // change return values where it is too difficult to minim in standalone mode
- switch($name)
- {
- case 'hmget':
- $response = array_values($response);
- break;
- case 'type':
- $typeMap = array(
- self::TYPE_NONE,
- self::TYPE_STRING,
- self::TYPE_SET,
- self::TYPE_LIST,
- self::TYPE_ZSET,
- self::TYPE_HASH,
- );
- $response = $typeMap[$response];
- break;
- // Handle scripting errors
- case 'eval':
- case 'evalsha':
- case 'script':
- $error = $this->redis->getLastError();
- $this->redis->clearLastError();
- if ($error && substr($error,0,8) == 'NOSCRIPT') {
- $response = NULL;
- } else if ($error) {
- throw new CredisException($error);
- }
- break;
- default:
- $error = $this->redis->getLastError();
- $this->redis->clearLastError();
- if ($error) {
- throw new CredisException($error);
- }
- break;
- }
- }
- return $response;
- }
/**
 * Write a serialized command to the Redis stream (standalone mode).
 *
 * If the stream reports EOF the connection is closed and re-established,
 * replaying AUTH and SELECT when a password or a non-zero database were in
 * use. A reconnect is refused while a (non-pipelined) transaction or a WATCH
 * is active, because that state cannot survive a new connection.
 *
 * @param string $command Command text already encoded for the wire
 * @return void
 * @throws CredisException If the connection was lost during a watch/transaction,
 *                         or the command could not be written completely
 */
protected function write_command($command)
{
    // Reconnect on lost connection (Redis server "timeout" exceeded since last command)
    if (feof($this->redis)) {
        $this->close();

        // Transaction/watch state would be lost by reconnecting, so raise an error instead.
        $inTransaction = $this->isMulti && ! $this->usePipeline;
        if ($inTransaction || $this->isWatching) {
            $this->isMulti = $this->isWatching = FALSE;
            throw new CredisException('Lost connection to Redis server during watch or transaction.');
        }

        $this->connected = FALSE;
        $this->connect();
        if ($this->authPassword) {
            $this->auth($this->authPassword);
        }
        if ($this->selectedDb != 0) {
            $this->select($this->selectedDb);
        }
    }

    $total = strlen($command);
    $sent = 0;
    $previousWriteEmpty = FALSE;
    while ($sent < $total) {
        // Always hand the not-yet-written remainder to fwrite()
        $chunk = fwrite($this->redis, substr($command, $sent));

        // Give up on a hard failure, or when two consecutive writes moved zero bytes
        if ($chunk === FALSE || ($previousWriteEmpty && $chunk == 0)) {
            $this->connected = FALSE;
            throw new CredisException('Failed to write entire command to stream');
        }

        $previousWriteEmpty = ($chunk == 0);
        $sent += $chunk;
    }
}
/**
 * Read one reply from the Redis stream and decode it (standalone mode).
 *
 * The first byte of the status line selects the reply type:
 *   '-' error, '+' inline/status, '$' bulk, '*' multi-bulk, ':' integer.
 * After decoding, a few commands get their result reshaped so that it matches
 * what the phpredis extension returns for the same command.
 *
 * @param string $name Command name the reply belongs to. The empty string is
 *                     used for the nested elements of a multi-bulk reply and
 *                     skips all command-specific post-processing.
 * @return mixed TRUE for "+OK"/"+QUEUED"; FALSE for a nil bulk/multi-bulk reply,
 *               for an error reply received in multi/pipeline mode, and for a
 *               "ttl" of -1; NULL for a NOSCRIPT error on "evalsha"; otherwise
 *               the decoded string, integer or array.
 * @throws CredisException On read timeout, lost connection, truncated bulk
 *                         payload, an error reply outside multi/pipeline mode,
 *                         or an unrecognised reply type
 */
protected function read_reply($name = '')
{
    $reply = fgets($this->redis);
    if ($reply === FALSE) {
        $info = stream_get_meta_data($this->redis);
        if ($info['timed_out']) {
            throw new CredisException('Read operation timed out.', CredisException::CODE_TIMED_OUT);
        }
        $this->connected = FALSE;
        throw new CredisException('Lost connection to Redis server.', CredisException::CODE_DISCONNECTED);
    }
    $reply = rtrim($reply, CRLF);
    $replyType = substr($reply, 0, 1);
    switch ($replyType) {
        /* Error reply */
        case '-':
            if ($this->isMulti || $this->usePipeline) {
                // Inside a transaction/pipeline the error is reported as a FALSE result
                $response = FALSE;
            } else if ($name == 'evalsha' && substr($reply, 0, 9) == '-NOSCRIPT') {
                // Unknown script SHA is reported as NULL rather than as an exception
                $response = NULL;
            } else {
                throw new CredisException(substr($reply, 0, 4) == '-ERR' ? substr($reply, 5) : substr($reply, 1));
            }
            break;

        /* Inline reply */
        case '+':
            $response = substr($reply, 1);
            if ($response === 'OK' || $response === 'QUEUED') {
                return TRUE;
            }
            break;

        /* Bulk reply */
        case '$':
            if ($reply == '$-1') {
                return FALSE;
            }
            $size = (int) substr($reply, 1);
            // Payload is followed by a trailing CRLF, hence the two extra bytes
            $response = stream_get_contents($this->redis, $size + 2);
            // A short read would otherwise be returned as silently truncated data
            // and leave the stream out of sync with the protocol.
            if ($response === FALSE || strlen($response) !== $size + 2) {
                $this->connected = FALSE;
                throw new CredisException('Error reading reply.');
            }
            $response = substr($response, 0, $size);
            break;

        /* Multi-bulk reply */
        case '*':
            $count = substr($reply, 1);
            if ($count == '-1') {
                return FALSE;
            }
            $count = (int) $count;
            $response = array();
            for ($i = 0; $i < $count; $i++) {
                // Nested elements are read without a command name (no post-processing)
                $response[] = $this->read_reply();
            }
            break;

        /* Integer reply */
        case ':':
            $response = intval(substr($reply, 1));
            break;

        default:
            throw new CredisException('Invalid response: '.print_r($reply, TRUE));
    }

    // Smooth over differences between phpredis and standalone response
    switch ($name) {
        case '': // Minor optimization for multi-bulk replies
            break;

        case 'config':
        case 'hgetall':
            // The flat list alternates key, value, key, value, ... - pair it up
            // by index; repeatedly shifting the array would re-index it each time.
            $keys = $values = array();
            if (is_array($response)) {
                $total = count($response);
                for ($i = 0; $i < $total; $i += 2) {
                    $keys[] = $response[$i];
                    $values[] = ($i + 1 < $total) ? $response[$i + 1] : NULL;
                }
            }
            $response = count($keys) ? array_combine($keys, $values) : array();
            break;

        case 'info':
            $lines = explode(CRLF, trim($response, CRLF));
            $response = array();
            foreach ($lines as $line) {
                // Skip blank lines and "# Section" headers
                if ( ! $line || substr($line, 0, 1) == '#') {
                    continue;
                }
                $parts = explode(':', $line, 2);
                // A line without ':' maps to NULL instead of raising an undefined-offset notice
                $response[$parts[0]] = isset($parts[1]) ? $parts[1] : NULL;
            }
            break;

        case 'ttl':
            if ($response === -1) {
                $response = FALSE;
            }
            break;
    }

    return $response;
}
/**
 * Build the Redis unified protocol command
 *
 * The result is "*<argument count>" followed by CRLF, then every argument
 * encoded by _map() ("$<byte length>" CRLF "<argument>"), joined and
 * terminated by CRLF.
 *
 * @param array $args Command name followed by its arguments
 * @return string
 */
private static function _prepare_command($args)
{
    // implode() must receive the separator first: the legacy (array, string)
    // argument order is deprecated since PHP 7.4 and rejected by PHP 8.
    $encoded = array_map(array(__CLASS__, '_map'), $args);
    return sprintf('*%d%s%s%s', count($args), CRLF, implode(CRLF, $encoded), CRLF);
}
/**
 * Encode a single command argument as "$<byte length>" CRLF "<argument>".
 *
 * @param mixed $arg Argument value; it is rendered in its string form
 * @return string
 */
private static function _map($arg)
{
    $byteLength = strlen($arg);
    return sprintf('$%d', $byteLength) . CRLF . $arg;
}
/**
 * Flatten arguments
 *
 * Nested arrays are expanded in place, depth first. Whenever an entry has a
 * non-integer (string) key, the key itself is emitted right before the
 * entry's value(s):
 * array('zrangebyscore', '-inf', 123, array('limit' => array('0', '1')))
 * becomes
 * array('zrangebyscore', '-inf', 123, 'limit', '0', '1')
 *
 * @param array $arguments Possibly nested argument list
 * @param array $out       Accumulator the flattened values are appended to (by reference)
 * @return array The accumulated flat list
 */
private static function _flattenArguments(array $arguments, &$out = array())
{
    foreach ($arguments as $key => $arg) {
        // Array keys are either integers or strings; only string keys are emitted
        if (is_string($key)) {
            $out[] = $key;
        }

        if ( ! is_array($arg)) {
            $out[] = $arg;
            continue;
        }

        // Recurse into the nested array, appending to the same accumulator
        self::_flattenArguments($arg, $out);
    }

    return $out;
}
- }
|