Bladeren bron

Fixed:redis query cost too much time

Zhu Jinhui 7 jaren geleden
bovenliggende
commit
89b11ff407
2 gewijzigde bestanden met toevoegingen van 1538 en 1040 verwijderingen
  1. 1225 877
      Qii/Cache/Redis/Client.php
  2. 313 163
      Qii/Cache/Redis/Cluster.php

+ 1225 - 877
Qii/Cache/Redis/Client.php

@@ -21,7 +21,7 @@ namespace Qii\Cache\Redis;
  * @package Credis_Client
  */
 
-if (!defined('CRLF')) define('CRLF', sprintf('%s%s', chr(13), chr(10)));
+if( ! defined('CRLF')) define('CRLF', sprintf('%s%s', chr(13), chr(10)));
 
 /**
  * Credis-specific errors, wraps native Redis errors
@@ -29,16 +29,16 @@ if (!defined('CRLF')) define('CRLF', sprintf('%s%s', chr(13), chr(10)));
 class CredisException extends \Exception
 {
 
-	const CODE_TIMED_OUT = 1;
-	const CODE_DISCONNECTED = 2;
+    const CODE_TIMED_OUT = 1;
+    const CODE_DISCONNECTED = 2;
 
-	public function __construct($message, $code = 0, $exception = NULL)
-	{
-		if ($exception && get_class($exception) == 'RedisException' && $message == 'read error on connection') {
-			$code = CredisException::CODE_DISCONNECTED;
-		}
-		parent::__construct($message, $code, $exception);
-	}
+    public function __construct($message, $code = 0, $exception = NULL)
+    {
+        if ($exception && get_class($exception) == 'RedisException' && $message == 'read error on connection') {
+            $code = CredisException::CODE_DISCONNECTED;
+        }
+        parent::__construct($message, $code, $exception);
+    }
 
 }
 
@@ -51,8 +51,10 @@ class CredisException extends \Exception
  * @method array         exec()
  * @method string        flushAll()
  * @method string        flushDb()
- * @method array         info()
+ * @method array         info(string $section)
  * @method bool|array    config(string $setGet, string $key, string $value = null)
+ * @method array         role()
+ * @method array         time()
  *
  * Keys:
  * @method int           del(string $key)
@@ -110,7 +112,7 @@ class CredisException extends \Exception
  * @method bool|int      hLen(string $key)
  * @method bool          hDel(string $key, string $field)
  * @method array         hKeys(string $key, string $field)
- * @method array         hVals(string $key, string $field)
+ * @method array         hVals(string $key)
  * @method array         hGetAll(string $key)
  * @method bool          hExists(string $key, string $field)
  * @method int           hIncrBy(string $key, string $field, int $value)
@@ -137,11 +139,15 @@ class CredisException extends \Exception
  * @method int           rPushX(string $key, mixed $value)
  *
  * Sorted Sets:
+ * @method int           zCard(string $key)
+ * @method array         zRangeByScore(string $key, mixed $start, mixed $stop, array $args = null)
+ * @method array         zRevRangeByScore(string $key, mixed $start, mixed $stop, array $args = null)
+ * @method int           zRemRangeByScore(string $key, mixed $start, mixed $stop)
+ * @method array         zRange(string $key, mixed $start, mixed $stop, array $args = null)
+ * @method array         zRevRange(string $key, mixed $start, mixed $stop, array $args = null)
  * TODO
  *
  * Pub/Sub
- * @method array         pUnsubscribe(mixed $pattern, string $patternN = NULL))
- * @method array         unsubscribe(mixed $channel, string $channelN = NULL))
  * @method int           publish(string $channel, string $message)
  * @method int|array     pubsub(string $subCommand, $arg = NULL)
  *
@@ -150,875 +156,1217 @@ class CredisException extends \Exception
  * @method string|int|array|bool eval(string $script, array $keys = NULL, array $args = NULL)
  * @method string|int|array|bool evalSha(string $script, array $keys = NULL, array $args = NULL)
  */
-class Client
-{
-
-	const TYPE_STRING = 'string';
-	const TYPE_LIST = 'list';
-	const TYPE_SET = 'set';
-	const TYPE_ZSET = 'zset';
-	const TYPE_HASH = 'hash';
-	const TYPE_NONE = 'none';
-	const FREAD_BLOCK_SIZE = 8192;
-
-	/**
-	 * Socket connection to the Redis server or Redis library instance
-	 * @var resource|Redis
-	 */
-	protected $redis;
-	protected $redisMulti;
-
-	/**
-	 * Host of the Redis server
-	 * @var string
-	 */
-	protected $host;
-
-	/**
-	 * Port on which the Redis server is running
-	 * @var integer
-	 */
-	protected $port;
-
-	/**
-	 * Timeout for connecting to Redis server
-	 * @var float
-	 */
-	protected $timeout;
-
-	/**
-	 * Timeout for reading response from Redis server
-	 * @var float
-	 */
-	protected $readTimeout;
-
-	/**
-	 * Unique identifier for persistent connections
-	 * @var string
-	 */
-	protected $persistent;
-
-	/**
-	 * @var bool
-	 */
-	protected $closeOnDestruct = TRUE;
-
-	/**
-	 * @var bool
-	 */
-	protected $connected = FALSE;
-
-	/**
-	 * @var bool
-	 */
-	protected $standalone;
-
-	/**
-	 * @var int
-	 */
-	protected $maxConnectRetries = 0;
-
-	/**
-	 * @var int
-	 */
-	protected $connectFailures = 0;
-
-	/**
-	 * @var bool
-	 */
-	protected $usePipeline = FALSE;
-
-	/**
-	 * @var array
-	 */
-	protected $commandNames;
-
-	/**
-	 * @var string
-	 */
-	protected $commands;
-
-	/**
-	 * @var bool
-	 */
-	protected $isMulti = FALSE;
-
-	/**
-	 * @var bool
-	 */
-	protected $isWatching = FALSE;
-
-	/**
-	 * @var string
-	 */
-	protected $authPassword;
-
-	/**
-	 * @var int
-	 */
-	protected $selectedDb = 0;
-
-	/**
-	 * Aliases for backwards compatibility with phpredis
-	 * @var array
-	 */
-	protected $wrapperMethods = array('delete' => 'del', 'getkeys' => 'keys', 'sremove' => 'srem');
-
-	/**
-	 * @var array
-	 */
-	protected $renamedCommands;
-
-	/**
-	 * Creates a Redisent connection to the Redis server on host {@link $host} and port {@link $port}.
-	 * $host may also be a path to a unix socket or a string in the form of tcp://[hostname]:[port] or unix://[path]
-	 *
-	 * @param string $host The hostname of the Redis server
-	 * @param integer $port The port number of the Redis server
-	 * @param float $timeout Timeout period in seconds
-	 * @param string $persistent Flag to establish persistent connection
-	 */
-	public function __construct($host = '127.0.0.1', $port = 6379, $timeout = null, $persistent = '')
-	{
-		$this->host = (string)$host;
-		$this->port = (int)$port;
-		$this->timeout = $timeout;
-		$this->persistent = (string)$persistent;
-		$this->standalone = !extension_loaded('redis');
-	}
-
-	public function __destruct()
-	{
-		if ($this->closeOnDestruct) {
-			$this->close();
-		}
-	}
-
-	/**
-	 * @throws CredisException
-	 * @return Credis_Client
-	 */
-	public function forceStandalone()
-	{
-		if ($this->connected) {
-			throw new CredisException('Cannot force Credis_Client to use standalone PHP driver after a connection has already been established.');
-		}
-		$this->standalone = TRUE;
-		return $this;
-	}
-
-	/**
-	 * @param int $retries
-	 * @return Credis_Client
-	 */
-	public function setMaxConnectRetries($retries)
-	{
-		$this->maxConnectRetries = $retries;
-		return $this;
-	}
-
-	/**
-	 * @param bool $flag
-	 * @return Credis_Client
-	 */
-	public function setCloseOnDestruct($flag)
-	{
-		$this->closeOnDestruct = $flag;
-		return $this;
-	}
-
-	/**
-	 * @throws CredisException
-	 * @return Credis_Client
-	 */
-	public function connect()
-	{
-		if ($this->connected) {
-			return $this;
-		}
-		if (preg_match('#^(tcp|unix)://(.*)$#', $this->host, $matches)) {
-			if ($matches[1] == 'tcp') {
-				if (!preg_match('#^(.*)(?::(\d+))?(?:/(.*))?$#', $matches[2], $matches)) {
-					throw new CredisException('Invalid host format; expected tcp://host[:port][/persistent]');
-				}
-				$this->host = $matches[1];
-				$this->port = (int)(isset($matches[2]) ? $matches[2] : 6379);
-				$this->persistent = isset($matches[3]) ? $matches[3] : '';
-			} else {
-				$this->host = $matches[2];
-				$this->port = NULL;
-				if (substr($this->host, 0, 1) != '/') {
-					throw new CredisException('Invalid unix socket format; expected unix:///path/to/redis.sock');
-				}
-			}
-		}
-		if ($this->port !== NULL && substr($this->host, 0, 1) == '/') {
-			$this->port = NULL;
-		}
-		if ($this->standalone) {
-			$flags = STREAM_CLIENT_CONNECT;
-			$remote_socket = $this->port === NULL
-				? 'unix://' . $this->host
-				: 'tcp://' . $this->host . ':' . $this->port;
-			if ($this->persistent) {
-				if ($this->port === NULL) { // Unix socket
-					throw new CredisException('Persistent connections to UNIX sockets are not supported in standalone mode.');
-				}
-				$remote_socket .= '/' . $this->persistent;
-				$flags = $flags | STREAM_CLIENT_PERSISTENT;
-			}
-			$result = $this->redis = @stream_socket_client($remote_socket, $errno, $errstr, $this->timeout !== null ? $this->timeout : 2.5, $flags);
-		} else {
-			if (!$this->redis) {
-				$this->redis = new \Redis;
-			}
-			$result = $this->persistent
-				? $this->redis->pconnect($this->host, $this->port, $this->timeout, $this->persistent)
-				: $this->redis->connect($this->host, $this->port, $this->timeout);
-		}
-
-		// Use recursion for connection retries
-		if (!$result) {
-			$this->connectFailures++;
-			if ($this->connectFailures <= $this->maxConnectRetries) {
-				return $this->connect();
-			}
-			$failures = $this->connectFailures;
-			$this->connectFailures = 0;
-			throw new CredisException("Connection to Redis failed after $failures failures." . (isset($errno) && isset($errstr) ? "Last Error : ({$errno}) {$errstr}" : ""));
-		}
-
-		$this->connectFailures = 0;
-		$this->connected = TRUE;
-
-		// Set read timeout
-		if ($this->readTimeout) {
-			$this->setReadTimeout($this->readTimeout);
-		}
-
-		return $this;
-	}
-
-	/**
-	 * Set the read timeout for the connection. Use 0 to disable timeouts entirely (or use a very long timeout
-	 * if not supported).
-	 *
-	 * @param int $timeout 0 (or -1) for no timeout, otherwise number of seconds
-	 * @throws CredisException
-	 * @return Credis_Client
-	 */
-	public function setReadTimeout($timeout)
-	{
-		if ($timeout < -1) {
-			throw new CredisException('Timeout values less than -1 are not accepted.');
-		}
-		$this->readTimeout = $timeout;
-		if ($this->connected) {
-			if ($this->standalone) {
-				$timeout = $timeout <= 0 ? 315360000 : $timeout; // Ten-year timeout
-				stream_set_blocking($this->redis, TRUE);
-				stream_set_timeout($this->redis, (int)floor($timeout), ($timeout - floor($timeout)) * 1000000);
-			} else if (defined('Redis::OPT_READ_TIMEOUT')) {
-				// supported in phpredis 2.2.3
-				// a timeout value of -1 means reads will not timeout
-				$timeout = $timeout == 0 ? -1 : $timeout;
-				$this->redis->setOption(Redis::OPT_READ_TIMEOUT, $timeout);
-			}
-		}
-		return $this;
-	}
-
-	/**
-	 * @return bool
-	 */
-	public function close()
-	{
-		$result = TRUE;
-		if ($this->connected && !$this->persistent) {
-			try {
-				$result = $this->standalone ? fclose($this->redis) : $this->redis->close();
-				$this->connected = FALSE;
-			} catch (\Exception $e) {
-				; // Ignore exceptions on close
-			}
-		}
-		return $result;
-	}
-
-	/**
-	 * Enabled command renaming and provide mapping method. Supported methods are:
-	 *
-	 * 1. renameCommand('foo') // Salted md5 hash for all commands -> md5('foo'.$command)
-	 * 2. renameCommand(function($command){ return 'my'.$command; }); // Callable
-	 * 3. renameCommand('get', 'foo') // Single command -> alias
-	 * 4. renameCommand(['get' => 'foo', 'set' => 'bar']) // Full map of [command -> alias]
-	 *
-	 * @param string|callable|array $command
-	 * @param string|null $alias
-	 * @return $this
-	 */
-	public function renameCommand($command, $alias = NULL)
-	{
-		if (!$this->standalone) {
-			$this->forceStandalone();
-		}
-		if ($alias === NULL) {
-			$this->renamedCommands = $command;
-		} else {
-			if (!$this->renamedCommands) {
-				$this->renamedCommands = array();
-			}
-			$this->renamedCommands[$command] = $alias;
-		}
-		return $this;
-	}
-
-	/**
-	 * @param $command
+class Client {
+
+    const TYPE_STRING      = 'string';
+    const TYPE_LIST        = 'list';
+    const TYPE_SET         = 'set';
+    const TYPE_ZSET        = 'zset';
+    const TYPE_HASH        = 'hash';
+    const TYPE_NONE        = 'none';
+    const FREAD_BLOCK_SIZE = 8192;
+
+    /**
+     * Socket connection to the Redis server or Redis library instance
+     * @var resource|Redis
+     */
+    protected $redis;
+    protected $redisMulti;
+
+    /**
+     * Host of the Redis server
+     * @var string
+     */
+    protected $host;
+    
+    /**
+     * Port on which the Redis server is running
+     * @var integer
+     */
+    protected $port;
+
+    /**
+     * Timeout for connecting to Redis server
+     * @var float
+     */
+    protected $timeout;
+
+    /**
+     * Timeout for reading response from Redis server
+     * @var float
+     */
+    protected $readTimeout;
+
+    /**
+     * Unique identifier for persistent connections
+     * @var string
+     */
+    protected $persistent;
+
+    /**
+     * @var bool
+     */
+    protected $closeOnDestruct = TRUE;
+
+    /**
+     * @var bool
+     */
+    protected $connected = FALSE;
+
+    /**
+     * @var bool
+     */
+    protected $standalone;
+
+    /**
+     * @var int
+     */
+    protected $maxConnectRetries = 0;
+
+    /**
+     * @var int
+     */
+    protected $connectFailures = 0;
+
+    /**
+     * @var bool
+     */
+    protected $usePipeline = FALSE;
+
+    /**
+     * @var array
+     */
+    protected $commandNames;
+
+    /**
+     * @var string
+     */
+    protected $commands;
+
+    /**
+     * @var bool
+     */
+    protected $isMulti = FALSE;
+
+    /**
+     * @var bool
+     */
+    protected $isWatching = FALSE;
+
+    /**
+     * @var string
+     */
+    protected $authPassword;
+
+    /**
+     * @var int
+     */
+    protected $selectedDb = 0;
+
+    /**
+     * Aliases for backwards compatibility with phpredis
+     * @var array
+     */
+    protected $wrapperMethods = array('delete' => 'del', 'getkeys' => 'keys', 'sremove' => 'srem');
+
+    /**
+     * @var array
+     */
+    protected $renamedCommands;
+
+    /**
+     * @var int
+     */
+    protected $requests = 0;
+    
+    /**
+     * @var bool
+     */
+    protected $subscribed = false;
+    
+
+    /**
+     * Creates a Redisent connection to the Redis server on host {@link $host} and port {@link $port}.
+     * $host may also be a path to a unix socket or a string in the form of tcp://[hostname]:[port] or unix://[path]
+     *
+     * @param string $host The hostname of the Redis server
+     * @param integer $port The port number of the Redis server
+     * @param float $timeout  Timeout period in seconds
+     * @param string $persistent  Flag to establish persistent connection
+     * @param int $db The selected datbase of the Redis server
+     * @param string $password The authentication password of the Redis server
+     */
+    public function __construct($host = '127.0.0.1', $port = 6379, $timeout = null, $persistent = '', $db = 0, $password = null)
+    {
+        $this->host = (string) $host;
+        $this->port = (int) $port;
+        $this->timeout = $timeout;
+        $this->persistent = (string) $persistent;
+        $this->standalone = ! extension_loaded('redis');
+        $this->authPassword = $password;
+        $this->selectedDb = (int)$db;
+        $this->convertHost();
+    }
+
+    public function __destruct()
+    {
+        if ($this->closeOnDestruct) {
+            $this->close();
+        }
+    }
+    
+    /**
+     * @return bool
+     */
+    public function isSubscribed()
+    {
+    	return $this->subscribed;
+    }
+    
+    /**
+     * Return the host of the Redis instance
+     * @return string
+     */
+    public function getHost()
+    {
+        return $this->host;
+    }
+    /**
+     * Return the port of the Redis instance
+     * @return int
+     */
+    public function getPort()
+    {
+        return $this->port;
+    }
+
+    /**
+     * Return the selected database
+     * @return int
+     */
+    public function getSelectedDb()
+    {
+        return $this->selectedDb;
+    }
+    /**
+     * @return string
+     */
+    public function getPersistence()
+    {
+        return $this->persistent;
+    }
+    /**
+     * @throws CredisException
+     * @return Credis_Client
+     */
+    public function forceStandalone()
+    {
+        if ($this->standalone) {
+            return $this;
+        }
+        if($this->connected) {
+            throw new CredisException('Cannot force Credis_Client to use standalone PHP driver after a connection has already been established.');
+        }
+        $this->standalone = TRUE;
+        return $this;
+    }
+
+    /**
+     * @param int $retries
+     * @return Credis_Client
+     */
+    public function setMaxConnectRetries($retries)
+    {
+        $this->maxConnectRetries = $retries;
+        return $this;
+    }
+
+    /**
+     * @param bool $flag
+     * @return Credis_Client
+     */
+    public function setCloseOnDestruct($flag)
+    {
+        $this->closeOnDestruct = $flag;
+        return $this;
+    }
+    protected function convertHost()
+    {
+        if (preg_match('#^(tcp|unix)://(.*)$#', $this->host, $matches)) {
+            if($matches[1] == 'tcp') {
+                if ( ! preg_match('#^([^:]+)(:([0-9]+))?(/(.+))?$#', $matches[2], $matches)) {
+                    throw new CredisException('Invalid host format; expected tcp://host[:port][/persistence_identifier]');
+                }
+                $this->host = $matches[1];
+                $this->port = (int) (isset($matches[3]) ? $matches[3] : 6379);
+                $this->persistent = isset($matches[5]) ? $matches[5] : '';
+            } else {
+                $this->host = $matches[2];
+                $this->port = NULL;
+                if (substr($this->host,0,1) != '/') {
+                    throw new CredisException('Invalid unix socket format; expected unix:///path/to/redis.sock');
+                }
+            }
+        }
+        if ($this->port !== NULL && substr($this->host,0,1) == '/') {
+            $this->port = NULL;
+        }
+    }
+    /**
+     * @throws CredisException
+     * @return Credis_Client
+     */
+    public function connect()
+    {
+        if ($this->connected) {
+            return $this;
+        }
+        if ($this->standalone) {
+            $flags = STREAM_CLIENT_CONNECT;
+            $remote_socket = $this->port === NULL
+                ? 'unix://'.$this->host
+                : 'tcp://'.$this->host.':'.$this->port;
+            if ($this->persistent && $this->port !== NULL) {
+                // Persistent connections to UNIX sockets are not supported
+                $remote_socket .= '/'.$this->persistent;
+                $flags = $flags | STREAM_CLIENT_PERSISTENT;
+            }
+            $result = $this->redis = @stream_socket_client($remote_socket, $errno, $errstr, $this->timeout !== null ? $this->timeout : 2.5, $flags);
+        }
+        else {
+            if ( ! $this->redis) {
+                $this->redis = new \Redis;
+            }
+            try
+            {
+                $socketTimeout = $this->timeout ? $this->timeout : 0.0;
+                $result = $this->persistent
+                    ? $this->redis->pconnect($this->host, $this->port, $socketTimeout, $this->persistent)
+                    : $this->redis->connect($this->host, $this->port, $socketTimeout);
+            }
+            catch(Exception $e)
+            {
+                // Some applications will capture the php error that phpredis can sometimes generate and throw it as an Exception
+                $result = false;
+                $errno = 1;
+                $errstr = $e->getMessage();
+            }
+        }
+
+        // Use recursion for connection retries
+        if ( ! $result) {
+            $this->connectFailures++;
+            if ($this->connectFailures <= $this->maxConnectRetries) {
+                return $this->connect();
+            }
+            $failures = $this->connectFailures;
+            $this->connectFailures = 0;
+            throw new CredisException("Connection to Redis {$this->host}:{$this->port} failed after $failures failures." . (isset($errno) && isset($errstr) ? "Last Error : ({$errno}) {$errstr}" : ""));
+        }
+
+        $this->connectFailures = 0;
+        $this->connected = TRUE;
+
+        // Set read timeout
+        if ($this->readTimeout) {
+            $this->setReadTimeout($this->readTimeout);
+        }
+
+        if($this->authPassword) {
+            $this->auth($this->authPassword);
+        }
+        if($this->selectedDb !== 0) {
+            $this->select($this->selectedDb);
+        }
+        return $this;
+    }
+    /**
+     * @return bool
+     */
+    public function isConnected()
+    {
+        return $this->connected;
+    }
+    /**
+     * Set the read timeout for the connection. Use 0 to disable timeouts entirely (or use a very long timeout
+     * if not supported).
+     *
+     * @param int $timeout 0 (or -1) for no timeout, otherwise number of seconds
+     * @throws CredisException
+     * @return Credis_Client
+     */
+    public function setReadTimeout($timeout)
+    {
+        if ($timeout < -1) {
+            throw new CredisException('Timeout values less than -1 are not accepted.');
+        }
+        $this->readTimeout = $timeout;
+        if ($this->connected) {
+            if ($this->standalone) {
+                $timeout = $timeout <= 0 ? 315360000 : $timeout; // Ten-year timeout
+                stream_set_blocking($this->redis, TRUE);
+                stream_set_timeout($this->redis, (int) floor($timeout), ($timeout - floor($timeout)) * 1000000);
+            } else if (defined('Redis::OPT_READ_TIMEOUT')) {
+                // supported in phpredis 2.2.3
+                // a timeout value of -1 means reads will not timeout
+                $timeout = $timeout == 0 ? -1 : $timeout;
+                $this->redis->setOption(Redis::OPT_READ_TIMEOUT, $timeout);
+            }
+        }
+        return $this;
+    }
+
+    /**
+     * @return bool
+     */
+    public function close()
+    {
+        $result = TRUE;
+        if ($this->connected && ! $this->persistent) {
+            try {
+                $result = $this->standalone ? fclose($this->redis) : $this->redis->close();
+                $this->connected = FALSE;
+            } catch (Exception $e) {
+                ; // Ignore exceptions on close
+            }
+        }
+        return $result;
+    }
+
+    /**
+     * Enabled command renaming and provide mapping method. Supported methods are:
+     *
+     * 1. renameCommand('foo') // Salted md5 hash for all commands -> md5('foo'.$command)
+     * 2. renameCommand(function($command){ return 'my'.$command; }); // Callable
+     * 3. renameCommand('get', 'foo') // Single command -> alias
+     * 4. renameCommand(['get' => 'foo', 'set' => 'bar']) // Full map of [command -> alias]
+     *
+     * @param string|callable|array $command
+     * @param string|null $alias
+     * @return $this
+     */
+    public function renameCommand($command, $alias = NULL)
+    {
+        if ( ! $this->standalone) {
+            $this->forceStandalone();
+        }
+        if ($alias === NULL) {
+            $this->renamedCommands = $command;
+        } else {
+            if ( ! $this->renamedCommands) {
+                $this->renamedCommands = array();
+            }
+            $this->renamedCommands[$command] = $alias;
+        }
+        return $this;
+    }
+
+    /**
+     * @param $command
+     */
+    public function getRenamedCommand($command)
+    {
+        static $map;
+
+        // Command renaming not enabled
+        if ($this->renamedCommands === NULL) {
+            return $command;
+        }
+
+        // Initialize command map
+        if ($map === NULL) {
+            if (is_array($this->renamedCommands)) {
+                $map = $this->renamedCommands;
+            } else {
+                $map = array();
+            }
+        }
+
+        // Generate and return cached result
+        if ( ! isset($map[$command])) {
+            // String means all commands are hashed with salted md5
+            if (is_string($this->renamedCommands)) {
+                $map[$command] = md5($this->renamedCommands.$command);
+            }
+            // Would already be set in $map if it was intended to be renamed
+            else if (is_array($this->renamedCommands)) {
+                return $command;
+            }
+            // User-supplied function
+            else if (is_callable($this->renamedCommands)) {
+                $map[$command] = call_user_func($this->renamedCommands, $command);
+            }
+        }
+        return $map[$command];
+    }
+
+    /**
+     * @param string $password
+     * @return bool
+     */
+    public function auth($password)
+    {
+        $response = $this->__call('auth', array($password));
+        $this->authPassword = $password;
+        return $response;
+    }
+
+    /**
+     * @param int $index
+     * @return bool
+     */
+    public function select($index)
+    {
+        $response = $this->__call('select', array($index));
+        $this->selectedDb = (int) $index;
+        return $response;
+    }
+    
+    /**
+     * @param string|array $pattern
+     * @return array
+     */
+    public function pUnsubscribe()
+    {
+    	list($command, $channel, $subscribedChannels) = $this->__call('punsubscribe', func_get_args());
+    	$this->subscribed = $subscribedChannels > 0;
+    	return array($command, $channel, $subscribedChannels);
+    }
+
+    /**
+     * @param int $Iterator
+     * @param string $pattern
+     * @param int $count
+     * @return bool | Array
+     */    
+    public function scan(&$Iterator, $pattern = null, $count = null)
+    {
+        return $this->__call('scan', array(&$Iterator, $pattern, $count));
+    }
+
+    /**
+	 * @param int $Iterator
+	 * @param string $field
+	 * @param string $pattern
+	 * @param int $count
+	 * @return bool | Array
 	 */
-	public function getRenamedCommand($command)
+	public function hscan(&$Iterator, $field, $pattern = null, $count = null)
 	{
-		static $map;
-
-		// Command renaming not enabled
-		if ($this->renamedCommands === NULL) {
-			return $command;
-		}
-
-		// Initialize command map
-		if ($map === NULL) {
-			if (is_array($this->renamedCommands)) {
-				$map = $this->renamedCommands;
-			} else {
-				$map = array();
-			}
-		}
-
-		// Generate and return cached result
-		if (!isset($map[$command])) {
-			// String means all commands are hashed with salted md5
-			if (is_string($this->renamedCommands)) {
-				$map[$command] = md5($this->renamedCommands . $command);
-			} // Would already be set in $map if it was intended to be renamed
-			else if (is_array($this->renamedCommands)) {
-				return $command;
-			} // User-supplied function
-			else if (is_callable($this->renamedCommands)) {
-				$map[$command] = call_user_func($this->renamedCommands, $command);
-			}
-		}
-		return $map[$command];
+		return $this->__call('hscan', array($field, &$Iterator, $pattern, $count));
 	}
-
-	/**
-	 * @param string $password
-	 * @return bool
-	 */
-	public function auth($password)
-	{
-		$this->authPassword = $password;
-		$response = $this->__call('auth', array($this->authPassword));
-		return $response;
-	}
-
-	/**
-	 * @param int $index
-	 * @return bool
-	 */
-	public function select($index)
-	{
-		$this->selectedDb = (int)$index;
-		$response = $this->__call('select', array($this->selectedDb));
-		return $response;
-	}
-
-	/**
-	 * @param string|array $patterns
-	 * @param $callback
-	 * @return $this|array|bool|Credis_Client|mixed|null|string
-	 * @throws CredisException
-	 */
-	public function pSubscribe($patterns, $callback)
-	{
-		if (!$this->standalone) {
-			return $this->__call('pSubscribe', array((array)$patterns, $callback));
-		}
-
-		// Standalone mode: use infinite loop to subscribe until timeout
-		$patternCount = is_array($patterns) ? count($patterns) : 1;
-		while ($patternCount--) {
-			if (isset($status)) {
-				list($command, $pattern, $status) = $this->read_reply();
-			} else {
-				list($command, $pattern, $status) = $this->__call('psubscribe', array($patterns));
-			}
-			if (!$status) {
-				throw new CredisException('Invalid pSubscribe response.');
-			}
-		}
-		try {
-			while (1) {
-				list($type, $pattern, $channel, $message) = $this->read_reply();
-				if ($type != 'pmessage') {
-					throw new CredisException('Received non-pmessage reply.');
-				}
-				$callback($this, $pattern, $channel, $message);
-			}
-		} catch (CredisException $e) {
-			if ($e->getCode() == CredisException::CODE_TIMED_OUT) {
-				try {
-					list($command, $pattern, $status) = $this->pUnsubscribe($patterns);
-					while ($status !== 0) {
-						list($command, $pattern, $status) = $this->read_reply();
+    
+    /**
+     * @param int $Iterator
+     * @param string $field
+     * @param string $pattern
+     * @param int $Iterator
+     * @return bool | Array
+     */    
+    public function sscan(&$Iterator, $field, $pattern = null, $count = null)
+    {
+        return $this->__call('sscan', array($field, &$Iterator, $pattern, $count));
+    }
+    
+    /**
+     * @param int $Iterator
+     * @param string $field
+     * @param string $pattern
+     * @param int $Iterator
+     * @return bool | Array
+     */    
+    public function zscan(&$Iterator, $field, $pattern = null, $count = null)
+    {
+        return $this->__call('zscan', array($field, &$Iterator, $pattern, $count));
+    }
+
+    /**
+     * @param string|array $patterns
+     * @param $callback
+     * @return $this|array|bool|Credis_Client|mixed|null|string
+     * @throws CredisException
+     */
+    public function pSubscribe($patterns, $callback)
+    {
+        if ( ! $this->standalone) {
+            return $this->__call('pSubscribe', array((array)$patterns, $callback));
+        }
+
+        // Standalone mode: use infinite loop to subscribe until timeout
+        $patternCount = is_array($patterns) ? count($patterns) : 1;
+        while ($patternCount--) {
+            if (isset($status)) {
+                list($command, $pattern, $status) = $this->read_reply();
+            } else {
+                list($command, $pattern, $status) = $this->__call('psubscribe', array($patterns));
+            }
+            $this->subscribed = $status > 0;
+            if ( ! $status) {
+                throw new CredisException('Invalid pSubscribe response.');
+            }
+        }
+        try {
+            while ($this->subscribed) {
+                list($type, $pattern, $channel, $message) = $this->read_reply();
+                if ($type != 'pmessage') {
+                    throw new CredisException('Received non-pmessage reply.');
+                }
+                $callback($this, $pattern, $channel, $message);
+            }
+        } catch (CredisException $e) {
+            if ($e->getCode() == CredisException::CODE_TIMED_OUT) {
+                try {
+                    list($command, $pattern, $status) = $this->pUnsubscribe($patterns);
+                    while ($status !== 0) {
+                        list($command, $pattern, $status) = $this->read_reply();
+                    }
+                } catch (CredisException $e2) {
+                    throw $e2;
+                }
+            }
+            throw $e;
+        }
+    }
+
+    /**
+     * @param string|array $pattern
+     * @return array
+     */
+    public function unsubscribe()
+    {
+    	list($command, $channel, $subscribedChannels) = $this->__call('unsubscribe', func_get_args());
+    	$this->subscribed = $subscribedChannels > 0;
+    	return array($command, $channel, $subscribedChannels);
+    }
+
+    /**
+     * @param string|array $channels
+     * @param $callback
+     * @throws CredisException
+     * @return $this|array|bool|Credis_Client|mixed|null|string
+     */
+    public function subscribe($channels, $callback)
+    {
+        if ( ! $this->standalone) {
+            return $this->__call('subscribe', array((array)$channels, $callback));
+        }
+
+        // Standalone mode: use infinite loop to subscribe until timeout
+        $channelCount = is_array($channels) ? count($channels) : 1;
+        while ($channelCount--) {
+            if (isset($status)) {
+                list($command, $channel, $status) = $this->read_reply();
+            } else {
+                list($command, $channel, $status) = $this->__call('subscribe', array($channels));
+            }
+            $this->subscribed = $status > 0;
+            if ( ! $status) {
+                throw new CredisException('Invalid subscribe response.');
+            }
+        }
+        try {
+            while ($this->subscribed) {
+                list($type, $channel, $message) = $this->read_reply();
+                if ($type != 'message') {
+                    throw new CredisException('Received non-message reply.');
+                }
+                $callback($this, $channel, $message);
+            }
+        } catch (CredisException $e) {
+            if ($e->getCode() == CredisException::CODE_TIMED_OUT) {
+                try {
+                    list($command, $channel, $status) = $this->unsubscribe($channels);
+                    while ($status !== 0) {
+                        list($command, $channel, $status) = $this->read_reply();
+                    }
+                } catch (CredisException $e2) {
+                    throw $e2;
+                }
+            }
+            throw $e;
+        }
+    }
+
+    public function __call($name, $args)
+    {
+        // Lazy connection
+        $this->connect();
+
+        $name = strtolower($name);
+
+        // Send request via native PHP
+        if($this->standalone)
+        {
+            switch ($name) {
+                case 'eval':
+                case 'evalsha':
+                    $script = array_shift($args);
+                    $keys = (array) array_shift($args);
+                    $eArgs = (array) array_shift($args);
+                    $args = array($script, count($keys), $keys, $eArgs);
+                    break;
+                case 'zunionstore':
+                    $dest = array_shift($args);
+                    $keys = (array) array_shift($args);
+                    $weights = array_shift($args);
+                    $aggregate = array_shift($args);
+                    $args = array($dest, count($keys), $keys);
+                    if ($weights) {
+                        $args[] = (array) $weights;
+                    }
+                    if ($aggregate) {
+                        $args[] = $aggregate;
+                    }
+                    break;
+                case 'set':
+                    // The php redis module has different behaviour with ttl
+                    // https://github.com/phpredis/phpredis#set
+                    if (count($args) === 3 && is_int($args[2])) { 
+                        $args = array($args[0], $args[1], array('EX', $args[2]));
+                    } elseif (count($args) === 3 && is_array($args[2])) {
+                        $tmp_args = $args;
+                        $args = array($tmp_args[0], $tmp_args[1]);
+                        foreach ($tmp_args[2] as $k=>$v) {
+                            if (is_string($k)) {
+                                $args[] = array($k,$v);
+                            } elseif (is_int($k)) {
+                                $args[] = $v;
+                            }
+                        }
+                        unset($tmp_args);
+                    }
+                    break;
+                case 'scan':
+                    $ref =& $args[0];
+                    if (empty($ref))
+                    {
+                        $ref = 0;
+                    }
+                    $eArgs = array($ref);
+                    if (!empty($args[1]))
+                    {
+                        $eArgs[] = 'MATCH';
+                        $eArgs[] = $args[1];
+                    }
+                    if (!empty($args[2]))
+                    {
+                        $eArgs[] = 'COUNT';
+                        $eArgs[] = $args[2];
+                    }
+                    $args = $eArgs;
+                    break;
+                case 'sscan':
+                case 'zscan':
+                case 'hscan':
+					$ref =& $args[1];
+					if (empty($ref))
+					{
+						$ref = 0;
 					}
-				} catch (CredisException $e2) {
-					throw $e2;
-				}
-			}
-			throw $e;
-		}
-	}
-
-	/**
-	 * @param string|array $channels
-	 * @param $callback
-	 * @throws CredisException
-	 * @return $this|array|bool|Credis_Client|mixed|null|string
-	 */
-	public function subscribe($channels, $callback)
-	{
-		if (!$this->standalone) {
-			return $this->__call('subscribe', array((array)$channels, $callback));
-		}
-
-		// Standalone mode: use infinite loop to subscribe until timeout
-		$channelCount = is_array($channels) ? count($channels) : 1;
-		while ($channelCount--) {
-			if (isset($status)) {
-				list($command, $channel, $status) = $this->read_reply();
-			} else {
-				list($command, $channel, $status) = $this->__call('subscribe', array($channels));
-			}
-			if (!$status) {
-				throw new CredisException('Invalid subscribe response.');
-			}
-		}
-		try {
-			while (1) {
-				list($type, $channel, $message) = $this->read_reply();
-				if ($type != 'message') {
-					throw new CredisException('Received non-message reply.');
-				}
-				$callback($this, $channel, $message);
-			}
-		} catch (CredisException $e) {
-			if ($e->getCode() == CredisException::CODE_TIMED_OUT) {
-				try {
-					list($command, $channel, $status) = $this->unsubscribe($channels);
-					while ($status !== 0) {
-						list($command, $channel, $status) = $this->read_reply();
+					$eArgs = array($args[0],$ref);
+					if (!empty($args[2]))
+					{
+						$eArgs[] = 'MATCH';
+						$eArgs[] = $args[2];
 					}
-				} catch (CredisException $e2) {
-					throw $e2;
-				}
-			}
-			throw $e;
-		}
-	}
-
-	public function __call($name, $args)
-	{
-		// Lazy connection
-		$this->connect();
-
-		$name = strtolower($name);
-
-		// Send request via native PHP
-		if ($this->standalone) {
-			switch ($name) {
-				case 'eval':
-				case 'evalsha':
-					$script = array_shift($args);
-					$keys = (array)array_shift($args);
-					$eArgs = (array)array_shift($args);
-					$args = array($script, count($keys), $keys, $eArgs);
-					break;
-			}
-			// Flatten arguments
-			$argsFlat = NULL;
-			foreach ($args as $index => $arg) {
-				if (is_array($arg)) {
-					if ($argsFlat === NULL) {
-						$argsFlat = array_slice($args, 0, $index);
-					}
-					if ($name == 'mset' || $name == 'msetnx' || $name == 'hmset') {
-						foreach ($arg as $key => $value) {
-							$argsFlat[] = $key;
-							$argsFlat[] = $value;
-						}
-					} else {
-						$argsFlat = array_merge($argsFlat, $arg);
-					}
-				} else if ($argsFlat !== NULL) {
-					$argsFlat[] = $arg;
-				}
-			}
-			if ($argsFlat !== NULL) {
-				$args = $argsFlat;
-				$argsFlat = NULL;
-			}
-
-			// In pipeline mode
-			if ($this->usePipeline) {
-				if ($name == 'pipeline') {
-					throw new CredisException('A pipeline is already in use and only one pipeline is supported.');
-				} else if ($name == 'exec') {
-					if ($this->isMulti) {
-						$this->commandNames[] = $name;
-						$this->commands .= self::_prepare_command(array($this->getRenamedCommand($name)));
-					}
-
-					// Write request
-					if ($this->commands) {
-						$this->write_command($this->commands);
-					}
-					$this->commands = NULL;
-
-					// Read response
-					$response = array();
-					foreach ($this->commandNames as $command) {
-						$response[] = $this->read_reply($command);
+					if (!empty($args[3]))
+					{
+						$eArgs[] = 'COUNT';
+						$eArgs[] = $args[3];
 					}
-					$this->commandNames = NULL;
-
-					if ($this->isMulti) {
-						$response = array_pop($response);
-					}
-					$this->usePipeline = $this->isMulti = FALSE;
-					return $response;
-				} else {
-					if ($name == 'multi') {
-						$this->isMulti = TRUE;
-					}
-					array_unshift($args, $this->getRenamedCommand($name));
-					$this->commandNames[] = $name;
-					$this->commands .= self::_prepare_command($args);
-					return $this;
-				}
-			}
-
-			// Start pipeline mode
-			if ($name == 'pipeline') {
-				$this->usePipeline = TRUE;
-				$this->commandNames = array();
-				$this->commands = '';
-				return $this;
-			}
-
-			// If unwatching, allow reconnect with no error thrown
-			if ($name == 'unwatch') {
-				$this->isWatching = FALSE;
-			}
-
-			// Non-pipeline mode
-			array_unshift($args, $this->getRenamedCommand($name));
-			$command = self::_prepare_command($args);
-			$this->write_command($command);
-			$response = $this->read_reply($name);
-
-			// Watch mode disables reconnect so error is thrown
-			if ($name == 'watch') {
-				$this->isWatching = TRUE;
-			} // Transaction mode
-			else if ($this->isMulti && ($name == 'exec' || $name == 'discard')) {
-				$this->isMulti = FALSE;
-			} // Started transaction
-			else if ($this->isMulti || $name == 'multi') {
-				$this->isMulti = TRUE;
-				$response = $this;
-			}
-		} // Send request via phpredis client
-		else {
-			// Tweak arguments
-			switch ($name) {
-				case 'get':   // optimize common cases
-				case 'set':
-				case 'hget':
-				case 'hset':
-				case 'setex':
-				case 'mset':
-				case 'msetnx':
-				case 'hmset':
-				case 'hmget':
-				case 'del':
-					break;
-				case 'mget':
-					if (isset($args[0]) && !is_array($args[0])) {
-						$args = array($args);
-					}
-					break;
-				case 'lrem':
-					$args = array($args[0], $args[2], $args[1]);
-					break;
-				case 'eval':
-				case 'evalsha':
-					if (isset($args[1]) && is_array($args[1])) {
-						$cKeys = $args[1];
-					} elseif (isset($args[1]) && is_string($args[1])) {
-						$cKeys = array($args[1]);
-					} else {
-						$cKeys = array();
-					}
-					if (isset($args[2]) && is_array($args[2])) {
-						$cArgs = $args[2];
-					} elseif (isset($args[2]) && is_string($args[2])) {
-						$cArgs = array($args[2]);
-					} else {
-						$cArgs = array();
-					}
-					$args = array($args[0], array_merge($cKeys, $cArgs), count($cKeys));
-					break;
-				case 'subscribe':
-				case 'psubscribe':
+					$args = $eArgs;
 					break;
-				default:
-					// Flatten arguments
-					$argsFlat = NULL;
-					foreach ($args as $index => $arg) {
-						if (is_array($arg)) {
-							if ($argsFlat === NULL) {
-								$argsFlat = array_slice($args, 0, $index);
-							}
-							$argsFlat = array_merge($argsFlat, $arg);
-						} else if ($argsFlat !== NULL) {
-							$argsFlat[] = $arg;
-						}
-					}
-					if ($argsFlat !== NULL) {
-						$args = $argsFlat;
-						$argsFlat = NULL;
-					}
-			}
-
-			try {
-				// Proxy pipeline mode to the phpredis library
-				if ($name == 'pipeline' || $name == 'multi') {
-					if ($this->isMulti) {
-						return $this;
-					} else {
-						$this->isMulti = TRUE;
-						$this->redisMulti = call_user_func_array(array($this->redis, $name), $args);
-					}
-				} else if ($name == 'exec' || $name == 'discard') {
-					$this->isMulti = FALSE;
-					$response = $this->redisMulti->$name();
-					$this->redisMulti = NULL;
-					#echo "> $name : ".substr(print_r($response, TRUE),0,100)."\n";
-					return $response;
-				}
-
-				// Use aliases to be compatible with phpredis wrapper
-				if (isset($this->wrapperMethods[$name])) {
-					$name = $this->wrapperMethods[$name];
-				}
-
-				// Multi and pipeline return self for chaining
-				if ($this->isMulti) {
-					call_user_func_array(array($this->redisMulti, $name), $args);
-					return $this;
-				}
-
-				$response = call_user_func_array(array($this->redis, $name), $args);
-			} // Wrap exceptions
-			catch (RedisException $e) {
-				$code = 0;
-				if (!($result = $this->redis->IsConnected())) {
-					$this->connected = FALSE;
-					$code = CredisException::CODE_DISCONNECTED;
-				}
-				throw new CredisException($e->getMessage(), $code, $e);
-			}
-
-			#echo "> $name : ".substr(print_r($response, TRUE),0,100)."\n";
-
-			// change return values where it is too difficult to minim in standalone mode
-			switch ($name) {
-				case 'hmget':
-					$response = array_values($response);
-					break;
-
-				case 'type':
-					$typeMap = array(
-						self::TYPE_NONE,
-						self::TYPE_STRING,
-						self::TYPE_SET,
-						self::TYPE_LIST,
-						self::TYPE_ZSET,
-						self::TYPE_HASH,
-					);
-					$response = $typeMap[$response];
-					break;
-
-				// Handle scripting errors
-				case 'eval':
-				case 'evalsha':
-				case 'script':
-					$error = $this->redis->getLastError();
-					$this->redis->clearLastError();
-					if ($error && substr($error, 0, 8) == 'NOSCRIPT') {
-						$response = NULL;
-					} else if ($error) {
-						throw new CredisException($error);
-					}
+                case 'zrangebyscore':
+                case 'zrevrangebyscore':
+                case 'zrange':
+                case 'zrevrange':
+                    if (isset($args[3]) && is_array($args[3])) {
+                        // map options
+                        $cArgs = array();
+                        if (!empty($args[3]['withscores'])) {
+                            $cArgs[] = 'withscores';
+                        }
+                        if (($name == 'zrangebyscore' || $name == 'zrevrangebyscore') && array_key_exists('limit', $args[3])) {
+                            $cArgs[] = array('limit' => $args[3]['limit']);
+                        }
+                        $args[3] = $cArgs;
+                    }
+                    break;
+                case 'mget':
+                    if (isset($args[0]) && is_array($args[0])) 
+                    {
+                        $args = array_values($args[0]);
+                    }
+                    break;
+            }
+            // Flatten arguments
+            $args = self::_flattenArguments($args);
+
+            // In pipeline mode
+            if($this->usePipeline)
+            {
+                if($name == 'pipeline') {
+                    throw new CredisException('A pipeline is already in use and only one pipeline is supported.');
+                }
+                else if($name == 'exec') {
+                    if($this->isMulti) {
+                        $this->commandNames[] = $name;
+                        $this->commands .= self::_prepare_command(array($this->getRenamedCommand($name)));
+                    }
+
+                    // Write request
+                    if($this->commands) {
+                        $this->write_command($this->commands);
+                    }
+                    $this->commands = NULL;
+
+                    // Read response
+                    $response = array();
+                    foreach($this->commandNames as $command) {
+                        $response[] = $this->read_reply($command);
+                    }
+                    $this->commandNames = NULL;
+
+                    if($this->isMulti) {
+                        $response = array_pop($response);
+                    }
+                    $this->usePipeline = $this->isMulti = FALSE;
+                    return $response;
+                }
+                else {
+                    if($name == 'multi') {
+                        $this->isMulti = TRUE;
+                    }
+                    array_unshift($args, $this->getRenamedCommand($name));
+                    $this->commandNames[] = $name;
+                    $this->commands .= self::_prepare_command($args);
+                    return $this;
+                }
+            }
+
+            // Start pipeline mode
+            if($name == 'pipeline')
+            {
+                $this->usePipeline = TRUE;
+                $this->commandNames = array();
+                $this->commands = '';
+                return $this;
+            }
+
+            // If unwatching, allow reconnect with no error thrown
+            if($name == 'unwatch') {
+                $this->isWatching = FALSE;
+            }
+
+            // Non-pipeline mode
+            array_unshift($args, $this->getRenamedCommand($name));
+            $command = self::_prepare_command($args);
+            $this->write_command($command);
+            $response = $this->read_reply($name);
+
+            switch($name)
+            {
+                case 'scan':
+                case 'sscan':
+                    $ref = array_shift($response);
+                    $response = empty($response[0]) ? array() : $response[0];
+                    break;
+                case 'hscan':
+                case 'zscan':
+                    $ref = array_shift($response);
+                    $response = empty($response[0]) ? array() : $response[0];
+                    if (!empty($response) && is_array($response))
+                    {
+                        $count  = count($response);
+                        $out    = array();
+                        for($i  = 0;$i < $count;$i+=2){
+                            $out[$response[$i]] = $response[$i+1];
+                        }
+                        $response = $out;
+                    }
 					break;
-			}
-		}
-
-		return $response;
-	}
-
-	protected function write_command($command)
-	{
-		// Reconnect on lost connection (Redis server "timeout" exceeded since last command)
-		if (feof($this->redis)) {
-			$this->close();
-			// If a watch or transaction was in progress and connection was lost, throw error rather than reconnect
-			// since transaction/watch state will be lost.
-			if (($this->isMulti && !$this->usePipeline) || $this->isWatching) {
-				$this->isMulti = $this->isWatching = FALSE;
-				throw new CredisException('Lost connection to Redis server during watch or transaction.');
-			}
-			$this->connected = FALSE;
-			$this->connect();
-			if ($this->authPassword) {
-				$this->auth($this->authPassword);
-			}
-			if ($this->selectedDb != 0) {
-				$this->select($this->selectedDb);
-			}
-		}
-
-		$commandLen = strlen($command);
-		for ($written = 0; $written < $commandLen; $written += $fwrite) {
-			$fwrite = fwrite($this->redis, substr($command, $written));
-			if ($fwrite === FALSE || $fwrite == 0) {
-				$this->connected = FALSE;
-				throw new CredisException('Failed to write entire command to stream');
-			}
-		}
-	}
-
-	protected function read_reply($name = '')
-	{
-		$reply = fgets($this->redis);
-		if ($reply === FALSE) {
-			$info = stream_get_meta_data($this->redis);
-			if ($info['timed_out']) {
-				throw new CredisException('Read operation timed out.', CredisException::CODE_TIMED_OUT);
-			} else {
-				$this->connected = FALSE;
-				throw new CredisException('Lost connection to Redis server.', CredisException::CODE_DISCONNECTED);
-			}
-		}
-		$reply = rtrim($reply, CRLF);
-		#echo "> $name: $reply\n";
-		$replyType = substr($reply, 0, 1);
-		switch ($replyType) {
-			/* Error reply */
-			case '-':
-				if ($this->isMulti || $this->usePipeline) {
-					$response = FALSE;
-				} else if ($name == 'evalsha' && substr($reply, 0, 9) == '-NOSCRIPT') {
-					$response = NULL;
-				} else {
-					throw new CredisException(substr($reply, 0, 4) == '-ERR' ? substr($reply, 5) : substr($reply, 1));
-				}
-				break;
-			/* Inline reply */
-			case '+':
-				$response = substr($reply, 1);
-				if ($response == 'OK' || $response == 'QUEUED') {
-					return TRUE;
-				}
-				break;
-			/* Bulk reply */
-			case '$':
-				if ($reply == '$-1') return FALSE;
-				$size = (int)substr($reply, 1);
-				$response = stream_get_contents($this->redis, $size + 2);
-				if (!$response) {
-					$this->connected = FALSE;
-					throw new CredisException('Error reading reply.');
-				}
-				$response = substr($response, 0, $size);
-				break;
-			/* Multi-bulk reply */
-			case '*':
-				$count = substr($reply, 1);
-				if ($count == '-1') return FALSE;
-
-				$response = array();
-				for ($i = 0; $i < $count; $i++) {
-					$response[] = $this->read_reply();
-				}
-				break;
-			/* Integer reply */
-			case ':':
-				$response = intval(substr($reply, 1));
-				break;
-			default:
-				throw new CredisException('Invalid response: ' . print_r($reply, TRUE));
-				break;
-		}
-
-		// Smooth over differences between phpredis and standalone response
-		switch ($name) {
-			case '': // Minor optimization for multi-bulk replies
-				break;
-			case 'config':
-			case 'hgetall':
-				$keys = $values = array();
-				while ($response) {
-					$keys[] = array_shift($response);
-					$values[] = array_shift($response);
-				}
-				$response = count($keys) ? array_combine($keys, $values) : array();
-				break;
-			case 'info':
-				$lines = explode(CRLF, trim($response, CRLF));
-				$response = array();
-				foreach ($lines as $line) {
-					if (!$line || substr($line, 0, 1) == '#') {
-						continue;
-					}
-					list($key, $value) = explode(':', $line, 2);
-					$response[$key] = $value;
-				}
-				break;
-			case 'ttl':
-				if ($response === -1) {
-					$response = FALSE;
-				}
-				break;
-		}
-
-		return $response;
-	}
-
-	/**
-	 * Build the Redis unified protocol command
-	 *
-	 * @param array $args
-	 * @return string
-	 */
-	private static function _prepare_command($args)
-	{
-		return sprintf('*%d%s%s%s', count($args), CRLF, implode(array_map(array('self', '_map'), $args), CRLF), CRLF);
-	}
-
-	private static function _map($arg)
-	{
-		return sprintf('$%d%s%s', strlen($arg), CRLF, $arg);
-	}
-
+                case 'zrangebyscore':
+                case 'zrevrangebyscore':
+                    if (in_array('withscores', $args, true)) {
+                        // Map array of values into key=>score list like phpRedis does
+                        $item = null;
+                        $out = array();
+                        foreach ($response as $value) {
+                            if ($item == null) {
+                                $item = $value;
+                            } else {
+                                // 2nd value is the score
+                                $out[$item] = (float) $value;
+                                $item = null;
+                            }
+                        }
+                        $response = $out;
+                    }
+                    break;
+            }
+
+            // Watch mode disables reconnect so error is thrown
+            if($name == 'watch') {
+                $this->isWatching = TRUE;
+            }
+            // Transaction mode
+            else if($this->isMulti && ($name == 'exec' || $name == 'discard')) {
+                $this->isMulti = FALSE;
+            }
+            // Started transaction
+            else if($this->isMulti || $name == 'multi') {
+                $this->isMulti = TRUE;
+                $response = $this;
+            }
+        }
+
+        // Send request via phpredis client
+        else
+        {
+            // Tweak arguments
+            switch($name) {
+                case 'get':   // optimize common cases
+                case 'set':
+                case 'hget':
+                case 'hset':
+                case 'setex':
+                case 'mset':
+                case 'msetnx':
+                case 'hmset':
+                case 'hmget':
+                case 'del':
+                case 'zrangebyscore':
+                case 'zrevrangebyscore':
+                case 'zrange':
+                case 'zrevrange':
+                   break;
+                case 'zunionstore':
+                    $cArgs = array();
+                    $cArgs[] = array_shift($args); // destination
+                    $cArgs[] = array_shift($args); // keys
+                    if(isset($args[0]) and isset($args[0]['weights'])) {
+                        $cArgs[] = (array) $args[0]['weights'];
+                    } else {
+                        $cArgs[] = null;
+                    }
+                    if(isset($args[0]) and isset($args[0]['aggregate'])) {
+                        $cArgs[] = strtoupper($args[0]['aggregate']);
+                    }
+                    $args = $cArgs;
+                    break;
+                case 'mget':
+                    if(isset($args[0]) && ! is_array($args[0])) {
+                        $args = array($args);
+                    }
+                    break;
+                case 'lrem':
+                    $args = array($args[0], $args[2], $args[1]);
+                    break;
+                case 'eval':
+                case 'evalsha':
+                    if (isset($args[1]) && is_array($args[1])) {
+                        $cKeys = $args[1];
+                    } elseif (isset($args[1]) && is_string($args[1])) {
+                        $cKeys = array($args[1]);
+                    } else {
+                        $cKeys = array();
+                    }
+                    if (isset($args[2]) && is_array($args[2])) {
+                        $cArgs = $args[2];
+                    } elseif (isset($args[2]) && is_string($args[2])) {
+                        $cArgs = array($args[2]);
+                    } else {
+                        $cArgs = array();
+                    }
+                    $args = array($args[0], array_merge($cKeys, $cArgs), count($cKeys));
+                    break;
+                case 'subscribe':
+                case 'psubscribe':
+                    break;
+                case 'scan':
+                case 'sscan':
+                case 'hscan':
+                case 'zscan':
+                    // allow phpredis to see the caller's reference
+                    //$param_ref =& $args[0];
+                    break;
+                default:
+                    // Flatten arguments
+                    $args = self::_flattenArguments($args);
+            }
+
+            try {
+                // Proxy pipeline mode to the phpredis library
+                if($name == 'pipeline' || $name == 'multi') {
+                    if($this->isMulti) {
+                        return $this;
+                    } else {
+                        $this->isMulti = TRUE;
+                        $this->redisMulti = call_user_func_array(array($this->redis, $name), $args);
+                        return $this;
+                    }
+                }
+                else if($name == 'exec' || $name == 'discard') {
+                    $this->isMulti = FALSE;
+                    $response = $this->redisMulti->$name();
+                    $this->redisMulti = NULL;
+                    #echo "> $name : ".substr(print_r($response, TRUE),0,100)."\n";
+                    return $response;
+                }
+
+                // Use aliases to be compatible with phpredis wrapper
+                if(isset($this->wrapperMethods[$name])) {
+                    $name = $this->wrapperMethods[$name];
+                }
+
+                // Multi and pipeline return self for chaining
+                if($this->isMulti) {
+                    call_user_func_array(array($this->redisMulti, $name), $args);
+                    return $this;
+                }
+
+                // Send request, retry one time when using persistent connections on the first request only
+                $this->requests++;
+                try {
+                    $response = call_user_func_array(array($this->redis, $name), $args);
+                } catch (RedisException $e) {
+                    if ($this->persistent && $this->requests == 1 && $e->getMessage() == 'read error on connection') {
+                        $this->connected = FALSE;
+                        $this->connect();
+                        $response = call_user_func_array(array($this->redis, $name), $args);
+                    } else {
+                        throw $e;
+                    }
+                }
+            }
+            // Wrap exceptions
+            catch(RedisException $e) {
+                $code = 0;
+                if ( ! ($result = $this->redis->IsConnected())) {
+                    $this->connected = FALSE;
+                    $code = CredisException::CODE_DISCONNECTED;
+                }
+                throw new CredisException($e->getMessage(), $code, $e);
+            }
+
+            #echo "> $name : ".substr(print_r($response, TRUE),0,100)."\n";
+
+            // change return values where it is too difficult to minim in standalone mode
+            switch($name)
+            {
+                case 'hmget':
+                    $response = array_values($response);
+                    break;
+
+                case 'type':
+                    $typeMap = array(
+                      self::TYPE_NONE,
+                      self::TYPE_STRING,
+                      self::TYPE_SET,
+                      self::TYPE_LIST,
+                      self::TYPE_ZSET,
+                      self::TYPE_HASH,
+                    );
+                    $response = $typeMap[$response];
+                    break;
+
+                // Handle scripting errors
+                case 'eval':
+                case 'evalsha':
+                case 'script':
+                    $error = $this->redis->getLastError();
+                    $this->redis->clearLastError();
+                    if ($error && substr($error,0,8) == 'NOSCRIPT') {
+                        $response = NULL;
+                    } else if ($error) {
+                        throw new CredisException($error);
+                    }
+                    break;
+                default:
+                    $error = $this->redis->getLastError();
+                    $this->redis->clearLastError();
+                    if ($error) {
+                        throw new CredisException($error);
+                    }
+                    break;
+            }
+        }
+
+        return $response;
+    }
+
+    protected function write_command($command)
+    {
+        // Reconnect on lost connection (Redis server "timeout" exceeded since last command)
+        if(feof($this->redis)) {
+            $this->close();
+            // If a watch or transaction was in progress and connection was lost, throw error rather than reconnect
+            // since transaction/watch state will be lost.
+            if(($this->isMulti && ! $this->usePipeline) || $this->isWatching) {
+                $this->isMulti = $this->isWatching = FALSE;
+                throw new CredisException('Lost connection to Redis server during watch or transaction.');
+            }
+            $this->connected = FALSE;
+            $this->connect();
+            if($this->authPassword) {
+                $this->auth($this->authPassword);
+            }
+            if($this->selectedDb != 0) {
+                $this->select($this->selectedDb);
+            }
+        }
+
+        $commandLen = strlen($command);
+        $lastFailed = FALSE;
+        for ($written = 0; $written < $commandLen; $written += $fwrite) {
+            $fwrite = fwrite($this->redis, substr($command, $written));
+            if ($fwrite === FALSE || ($fwrite == 0 && $lastFailed)) {
+                $this->connected = FALSE;
+                throw new CredisException('Failed to write entire command to stream');
+            }
+            $lastFailed = $fwrite == 0;
+        }
+    }
+
+    protected function read_reply($name = '')
+    {
+        $reply = fgets($this->redis);
+        if($reply === FALSE) {
+            $info = stream_get_meta_data($this->redis);
+            if ($info['timed_out']) {
+                throw new CredisException('Read operation timed out.', CredisException::CODE_TIMED_OUT);
+            } else {
+                $this->connected = FALSE;
+                throw new CredisException('Lost connection to Redis server.', CredisException::CODE_DISCONNECTED);
+            }
+        }
+        $reply = rtrim($reply, CRLF);
+        #echo "> $name: $reply\n";
+        $replyType = substr($reply, 0, 1);
+        switch ($replyType) {
+            /* Error reply */
+            case '-':
+                if($this->isMulti || $this->usePipeline) {
+                    $response = FALSE;
+                } else if ($name == 'evalsha' && substr($reply,0,9) == '-NOSCRIPT') {
+                    $response = NULL;
+                } else {
+                    throw new CredisException(substr($reply,0,4) == '-ERR' ? substr($reply, 5) : substr($reply,1));
+                }
+                break;
+            /* Inline reply */
+            case '+':
+                $response = substr($reply, 1);
+                if($response == 'OK' || $response == 'QUEUED') {
+                  return TRUE;
+                }
+                break;
+            /* Bulk reply */
+            case '$':
+                if ($reply == '$-1') return FALSE;
+                $size = (int) substr($reply, 1);
+                $response = stream_get_contents($this->redis, $size + 2);
+                if( ! $response) {
+                    $this->connected = FALSE;
+                    throw new CredisException('Error reading reply.');
+                }
+                $response = substr($response, 0, $size);
+                break;
+            /* Multi-bulk reply */
+            case '*':
+                $count = substr($reply, 1);
+                if ($count == '-1') return FALSE;
+
+                $response = array();
+                for ($i = 0; $i < $count; $i++) {
+                        $response[] = $this->read_reply();
+                }
+                break;
+            /* Integer reply */
+            case ':':
+                $response = intval(substr($reply, 1));
+                break;
+            default:
+                throw new CredisException('Invalid response: '.print_r($reply, TRUE));
+                break;
+        }
+
+        // Smooth over differences between phpredis and standalone response
+        switch($name)
+        {
+            case '': // Minor optimization for multi-bulk replies
+                break;
+            case 'config':
+            case 'hgetall':
+                $keys = $values = array();
+                while($response) {
+                    $keys[] = array_shift($response);
+                    $values[] = array_shift($response);
+                }
+                $response = count($keys) ? array_combine($keys, $values) : array();
+                break;
+            case 'info':
+                $lines = explode(CRLF, trim($response,CRLF));
+                $response = array();
+                foreach($lines as $line) {
+                    if ( ! $line || substr($line, 0, 1) == '#') {
+                      continue;
+                    }
+                    list($key, $value) = explode(':', $line, 2);
+                    $response[$key] = $value;
+                }
+                break;
+            case 'ttl':
+                if($response === -1) {
+                    $response = FALSE;
+                }
+                break;
+        }
+
+        return $response;
+    }
+
+    /**
+     * Build the Redis unified protocol command
+     *
+     * @param array $args
+     * @return string
+     */
+    private static function _prepare_command($args)
+    {
+        return sprintf('*%d%s%s%s', count($args), CRLF, implode(array_map(array('self', '_map'), $args), CRLF), CRLF);
+    }
+
+    private static function _map($arg)
+    {
+        return sprintf('$%d%s%s', strlen($arg), CRLF, $arg);
+    }
+
+    /**
+     * Flatten arguments
+     *
+     * If an argument is an array, the key is inserted as argument followed by the array values
+     *  array('zrangebyscore', '-inf', 123, array('limit' => array('0', '1')))
+     * becomes
+     *  array('zrangebyscore', '-inf', 123, 'limit', '0', '1')
+     *
+     * @param array $in
+     * @return array
+     */
+    private static function _flattenArguments(array $arguments, &$out = array())
+    {
+        foreach ($arguments as $key => $arg) {
+            if (!is_int($key)) {
+                $out[] = $key;
+            }
+            
+            if (is_array($arg)) {
+                self::_flattenArguments($arg, $out);
+            } else {
+                $out[] = $arg;
+            }
+        }
+
+        return $out;
+    }
 }

+ 313 - 163
Qii/Cache/Redis/Cluster.php

@@ -9,182 +9,332 @@ namespace Qii\Cache\Redis;
  * @package Credis
  */
 
-#require_once 'Credis/Client.php';
-
 /**
- * A generalized Credis_Client interface for a cluster of Redis servers
+ * A generalized \Qii\Cache\Redis\Client interface for a cluster of Redis servers
+ *
+ * @deprecated
  */
 class Cluster
 {
+  /**
+   * Collection of \Qii\Cache\Redis\Client objects attached to Redis servers
+   * @var \Qii\Cache\Redis\Client[]
+   */
+  protected $clients;
+  /**
+   * If a server is set as master, all write commands go to that one
+   * @var \Qii\Cache\Redis\Client
+   */
+  protected $masterClient;
+  /**
+   * Aliases of \Qii\Cache\Redis\Client objects attached to Redis servers, used to route commands to specific servers
+   * @see Credis_Cluster::to
+   * @var array
+   */
+  protected $aliases;
+  
+  /**
+   * Hash ring of Redis server nodes
+   * @var array
+   */
+  protected $ring;
+  
+  /**
+   * Individual nodes of pointers to Redis servers on the hash ring
+   * @var array
+   */
+  protected $nodes;
+  
+  /**
+   * The commands that are not subject to hashing
+   * @var array
+   * @access protected
+   */
+  protected $dont_hash;
 
-	/**
-	 * Collection of Credis_Client objects attached to Redis servers
-	 * @var Credis_Client[]
-	 */
-	protected $clients;
-
-	/**
-	 * Aliases of Credis_Client objects attached to Redis servers, used to route commands to specific servers
-	 * @see Credis_Cluster::to
-	 * @var array
-	 */
-	protected $aliases;
-
-	/**
-	 * Hash ring of Redis server nodes
-	 * @var array
-	 */
-	protected $ring;
-
-	/**
-	 * Individual nodes of pointers to Redis servers on the hash ring
-	 * @var array
-	 */
-	protected $nodes;
-
-	/**
-	 * The commands that are not subject to hashing
-	 * @var array
-	 * @access protected
-	 */
-	protected $dont_hash;
+  /**
+   * Currently working cluster-wide database number.
+   * @var int
+   */
+  protected $selectedDb = 0;
 
-	/**
-	 * Creates an interface to a cluster of Redis servers
-	 * Each server should be in the format:
-	 *  array(
-	 *   'host' => hostname,
-	 *   'port' => port,
-	 *   'timeout' => timeout,
-	 *   'alias' => alias
-	 * )
-	 *
-	 * @param array $servers The Redis servers in the cluster.
-	 * @param int $replicas
-	 */
-	public function __construct($servers, $replicas = 128)
-	{
-		$this->clients = array();
-		$this->aliases = array();
-		$this->ring = array();
-		$clientNum = 0;
-		foreach ($servers as $server) {
-			$client = new \Qii\Cache\Redis\Client($server['host'], $server['port'], isset($server['timeout']) ? $server['timeout'] : 2.5, isset($server['persistent']) ? $server['persistent'] : '');
-			$this->clients[] = $client;
-			if (isset($server['alias'])) {
-				$this->aliases[$server['alias']] = $client;
-			}
-			for ($replica = 0; $replica <= $replicas; $replica++) {
-				$this->ring[crc32($server['host'] . ':' . $server['port'] . '-' . $replica)] = $clientNum;
-			}
-			$clientNum++;
-		}
-		ksort($this->ring, SORT_NUMERIC);
-		$this->nodes = array_keys($this->ring);
-		$this->dont_hash = array_flip(array(
-			'RANDOMKEY', 'DBSIZE', 'PIPELINE', 'EXEC',
-			'SELECT', 'MOVE', 'FLUSHDB', 'FLUSHALL',
-			'SAVE', 'BGSAVE', 'LASTSAVE', 'SHUTDOWN',
-			'INFO', 'MONITOR', 'SLAVEOF'
-		));
-	}
+  /**
+   * Creates an interface to a cluster of Redis servers
+   * Each server should be in the format:
+   *  array(
+   *   'host' => hostname,
+   *   'port' => port,
+   *   'db' => db,
+   *   'password' => password,
+   *   'timeout' => timeout,
+   *   'alias' => alias,
+   *   'persistent' => persistence_identifier,
+   *   'master' => master
+   *   'write_only'=> true/false
+   * )
+   *
+   * @param array $servers The Redis servers in the cluster.
+   * @param int $replicas
+   * @param bool $standAlone
+   * @throws CredisException
+   */
+  public function __construct($servers, $replicas = 128, $standAlone = false)
+  {
+    $this->clients = array();
+    $this->masterClient = null;
+    $this->aliases = array();
+    $this->ring = array();
+    $this->replicas = (int)$replicas;
+    $client = null;
+    foreach ($servers as $server)
+    {
+      if(is_array($server)){
+          $client = new \Qii\Cache\Redis\Client(
+            $server['host'],
+            $server['port'],
+            isset($server['timeout']) ? $server['timeout'] : 2.5,
+            isset($server['persistent']) ? $server['persistent'] : '',
+            isset($server['db']) ? $server['db'] : 0,
+            isset($server['password']) ? $server['password'] : null
+          );
+          if (isset($server['alias'])) {
+            $this->aliases[$server['alias']] = $client;
+          }
+          if(isset($server['master']) && $server['master'] === true){
+            $this->masterClient = $client;
+            if(isset($server['write_only']) && $server['write_only'] === true){
+                continue;
+            }
+          }
+      } elseif($server instanceof \Qii\Cache\Redis\Client){
+        $client = $server;
+      } else {
+          throw new CredisException('Server should either be an array or an instance of \Qii\Cache\Redis\Client');
+      }
+      if($standAlone) {
+          $client->forceStandalone();
+      }
+      $this->clients[] = $client;
+      for ($replica = 0; $replica <= $this->replicas; $replica++) {
+          $md5num = hexdec(substr(md5($client->getHost().':'.$client->getPort().'-'.$replica),0,7));
+          $this->ring[$md5num] = count($this->clients)-1;
+      }
+    }
+    ksort($this->ring, SORT_NUMERIC);
+    $this->nodes = array_keys($this->ring);
+    $this->dont_hash = array_flip(array(
+      'RANDOMKEY', 'DBSIZE', 'PIPELINE', 'EXEC',
+      'SELECT',    'MOVE',    'FLUSHDB',  'FLUSHALL',
+      'SAVE',      'BGSAVE',  'LASTSAVE', 'SHUTDOWN',
+      'INFO',      'MONITOR', 'SLAVEOF'
+    ));
+    if($this->masterClient !== null && count($this->clients()) == 0){
+        $this->clients[] = $this->masterClient;
+        for ($replica = 0; $replica <= $this->replicas; $replica++) {
+            $md5num = hexdec(substr(md5($this->masterClient->getHost().':'.$this->masterClient->getHost().'-'.$replica),0,7));
+            $this->ring[$md5num] = count($this->clients)-1;
+        }
+        $this->nodes = array_keys($this->ring);
+    }
+  }
 
-	/**
-	 * Get a client by index or alias.
-	 *
-	 * @param string|int $alias
-	 * @throws CredisException
-	 * @return Credis_Client
-	 */
-	public function client($alias)
-	{
-		if (is_int($alias) && isset($this->clients[$alias])) {
-			return $this->clients[$alias];
-		} else if (isset($this->aliases[$alias])) {
-			return $this->aliases[$alias];
-		}
-		throw new CredisException("Client $alias does not exist.");
-	}
+  /**
+   * @param \Qii\Cache\Redis\Client $masterClient
+   * @param bool $writeOnly
+   * @return Credis_Cluster
+   */
+  public function setMasterClient(\Qii\Cache\Redis\Client $masterClient, $writeOnly=false)
+  {
+    if(!$masterClient instanceof \Qii\Cache\Redis\Client){
+        throw new CredisException('Master client should be an instance of \Qii\Cache\Redis\Client');
+    }
+    $this->masterClient = $masterClient;
+    if (!isset($this->aliases['master'])) {
+        $this->aliases['master'] = $masterClient;
+    }
+    if(!$writeOnly){
+        $this->clients[] = $this->masterClient;
+        for ($replica = 0; $replica <= $this->replicas; $replica++) {
+            $md5num = hexdec(substr(md5($this->masterClient->getHost().':'.$this->masterClient->getHost().'-'.$replica),0,7));
+            $this->ring[$md5num] = count($this->clients)-1;
+        }
+        $this->nodes = array_keys($this->ring);
+    }
+    return $this;
+  }
+  /**
+   * Get a client by index or alias.
+   *
+   * @param string|int $alias
+   * @throws CredisException
+   * @return \Qii\Cache\Redis\Client
+   */
+  public function client($alias)
+  {
+    if (is_int($alias) && isset($this->clients[$alias])) {
+      return $this->clients[$alias];
+    }
+    else if (isset($this->aliases[$alias])) {
+      return $this->aliases[$alias];
+    }
+    throw new CredisException("Client $alias does not exist.");
+  }
 
-	/**
-	 * Get an array of all clients
-	 *
-	 * @return array|Credis_Client[]
-	 */
-	public function clients()
-	{
-		return $this->clients;
-	}
+  /**
+   * Get an array of all clients
+   *
+   * @return array|\Qii\Cache\Redis\Client[]
+   */
+  public function clients()
+  {
+    return $this->clients;
+  }
 
-	/**
-	 * Execute a command on all clients
-	 *
-	 * @return array
-	 */
-	public function all()
-	{
-		$args = func_get_args();
-		$name = array_shift($args);
-		$results = array();
-		foreach ($this->clients as $client) {
-			$results[] = $client->__call($name, $args);
-		}
-		return $results;
-	}
+  /**
+   * Execute a command on all clients
+   *
+   * @return array
+   */
+  public function all()
+  {
+    $args = func_get_args();
+    $name = array_shift($args);
+    $results = array();
+    foreach($this->clients as $client) {
+      $results[] = call_user_func_array([$client, $name], $args);
+    }
+    return $results;
+  }
 
-	/**
-	 * Get the client that the key would hash to.
-	 *
-	 * @param string $key
-	 * @return \Credis_Client
-	 */
-	public function byHash($key)
-	{
-		return $this->clients[$this->hash($key)];
-	}
+  /**
+   * Get the client that the key would hash to.
+   *
+   * @param string $key
+   * @return \\Qii\Cache\Redis\Client
+   */
+  public function byHash($key)
+  {
+    return $this->clients[$this->hash($key)];
+  }
 
-	/**
-	 * Execute a Redis command on the cluster with automatic consistent hashing
-	 *
-	 * @param string $name
-	 * @param array $args
-	 * @return mixed
-	 */
-	public function __call($name, $args)
-	{
-		if (isset($this->dont_hash[strtoupper($name)])) {
-			$client = $this->clients[0];
-		} else {
-			$client = $this->byHash($args[0]);
-		}
+  /**
+   * @param int $index
+   * @return void
+   */
+  public function select($index)
+  {
+      $this->selectedDb = (int) $index;
+  }
 
-		return $client->__call($name, $args);
-	}
+  /**
+   * Execute a Redis command on the cluster with automatic consistent hashing and read/write splitting
+   *
+   * @param string $name
+   * @param array $args
+   * @return mixed
+   */
+  public function __call($name, $args)
+  {
+    if($this->masterClient !== null && !$this->isReadOnlyCommand($name)){
+        $client = $this->masterClient;
+    }elseif (count($this->clients()) == 1 || isset($this->dont_hash[strtoupper($name)]) || !isset($args[0])) {
+      $client = $this->clients[0];
+    }
+    else {
+      $client = $this->byHash($args[0]);
+    }
+    // Ensure that current client is working on the same database as expected.
+    if ($client->getSelectedDb() != $this->selectedDb) {
+      $client->select($this->selectedDb);
+    }
+    return call_user_func_array([$client, $name], $args);
+  }
 
-	/**
-	 * Get client index for a key by searching ring with binary search
-	 *
-	 * @param string $key The key to hash
-	 * @return int The index of the client object associated with the hash of the key
-	 */
-	public function hash($key)
-	{
-		$needle = crc32($key);
-		$server = $min = 0;
-		$max = count($this->nodes) - 1;
-		while ($max >= $min) {
-			$position = (int)(($min + $max) / 2);
-			$server = $this->nodes[$position];
-			if ($needle < $server) {
-				$max = $position - 1;
-			} else if ($needle > $server) {
-				$min = $position + 1;
-			} else {
-				break;
-			}
-		}
-		return $this->ring[$server];
-	}
+  /**
+   * Get client index for a key by searching ring with binary search
+   *
+   * @param string $key The key to hash
+   * @return int The index of the client object associated with the hash of the key
+   */
+  public function hash($key)
+  {
+    $needle = hexdec(substr(md5($key),0,7));
+    $server = $min = 0;
+    $max = count($this->nodes) - 1;
+    while ($max >= $min) {
+      $position = (int) (($min + $max) / 2);
+      $server = $this->nodes[$position];
+      if ($needle < $server) {
+        $max = $position - 1;
+      }
+      else if ($needle > $server) {
+        $min = $position + 1;
+      }
+      else {
+        break;
+      }
+    }
+    return $this->ring[$server];
+  }
 
+  public function isReadOnlyCommand($command)
+  {
+      $readOnlyCommands = array(
+          'DBSIZE',
+          'INFO',
+          'MONITOR',
+          'EXISTS',
+          'TYPE',
+          'KEYS',
+          'SCAN',
+          'RANDOMKEY',
+          'TTL',
+          'GET',
+          'MGET',
+          'SUBSTR',
+          'STRLEN',
+          'GETRANGE',
+          'GETBIT',
+          'LLEN',
+          'LRANGE',
+          'LINDEX',
+          'SCARD',
+          'SISMEMBER',
+          'SINTER',
+          'SUNION',
+          'SDIFF',
+          'SMEMBERS',
+          'SSCAN',
+          'SRANDMEMBER',
+          'ZRANGE',
+          'ZREVRANGE',
+          'ZRANGEBYSCORE',
+          'ZREVRANGEBYSCORE',
+          'ZCARD',
+          'ZSCORE',
+          'ZCOUNT',
+          'ZRANK',
+          'ZREVRANK',
+          'ZSCAN',
+          'HGET',
+          'HMGET',
+          'HEXISTS',
+          'HLEN',
+          'HKEYS',
+          'HVALS',
+          'HGETALL',
+          'HSCAN',
+          'PING',
+          'AUTH',
+          'SELECT',
+          'ECHO',
+          'QUIT',
+          'OBJECT',
+          'BITCOUNT',
+          'TIME',
+          'SORT'
+      );
+      return in_array(strtoupper($command),$readOnlyCommands);
+  }
 }