上面我们大概知道了redis的事情模式,为了更好的认知它,我就开始思考如何自己去连接做事端呢?我想到利用Yii2时,用到redis我是没有安装官方供应的redis扩展,但是它仍旧可以与redis做事端通信,于是乎便去追踪了Yii2-redis组件的源代码,看完往后,深感作者的强大。
准备熟习redis协议:http://redis.cn/topics/protocolRedis在TCP端口6379上监听到来的连接,客户端连接到来时,Redis做事器为此创建一个TCP连接。在客户端与做事器端之间传输的每个Redis命令或者数据都以\r\n结尾新的统一协议已在Redis 1.2中引入,但是在Redis 2.0中,这就成为了与Redis做事器通讯的标准办法。代码实现
<?php/ Class SocketException /class SocketException extends \Exception{ / @var array / public $errorInfo = []; / @return string / public function getName() { return 39;Redis Socket Exception'; } / SocketException constructor. @param string $message @param array $errorInfo @param int $code @param Throwable|null $previous / public function __construct($message = "", $errorInfo = [], $code = 0, Throwable $previous = null) { $this->errorInfo = $errorInfo; parent::__construct($message, $code, $previous); } / @return string / public function __toString() { return parent::__toString() . PHP_EOL . 'Additional Information:' . PHP_EOL . print_r($this->errorInfo, true);; // TODO: Change the autogenerated stub }}/ Class RedisConnection /class RedisConnection{ / @var string / public $host = 'localhost'; / @var int / public $port = 6379; / @var null / public $password = null; / @var int / public $database = 0; / @var / public $connectionTimeout; / @var null / public $dataTimeout = null; / STREAM_CLIENT_ASYNC_CONNECT指示应打开每个后续连接,而不必等待上一个连接的完成 @var int / public $socketClientFlags = STREAM_CLIENT_CONNECT; / @var array / protected $pools = []; / @var int / protected $maxPoolSize = 10; / RedisConnection constructor. @param string $host @param int $port @param null $password / public function __construct($host = 'localhost', $port = 6379, $password = null) { $this->host = $host; $this->port = $port; $this->password = $password; } / @return string / public function getConnectionString() { return 'tcp://' . $this->host . ':' . $this->port; } / @param int $database / public function connect($database = 0) { $this->database = $database; $countSize = count($this->pools); if ($countSize > $this->maxPoolSize) { return; } if ($this->getSocket() !== false) { return; } $connId = $this->getConnectionString(); $connection = $connId . ', database=' . $this->database; try { $socket = stream_socket_client( $connId, $errorNumber, $errorDescription, $this->connectionTimeout ?? ini_get('default_socket_timeout'), $this->socketClientFlags ); if ($socket) { $this->pools[$connId] = $socket; if ($this->dataTimeout !== null) { $timeout = (int)$this->dataTimeout; $microTimeout = (int)(($this->dataTimeout - $timeout) 1000000); stream_set_timeout($socket, $timeout, $microTimeout); } if ($this->password !== null) { $this->exec('AUTH', [$this->password]); } if ($this->database !== null) { $this->exec('SELECT', [$this->database]); } } else { $message = "无法打开redis数据库连接 ($connection): $errorNumber - $errorDescription"; throw new Exception($message, $errorDescription, $errorNumber); } } catch (Exception $e) { exit($e->getMessage()); } } / 用单行回答,回答的第一个字节将是“+” 缺点,回答的第一个字节将是“-” 整型数字,回答的第一个字节将是“:” 批量回答,回答的第一个字节将是“$” 多个批量回答,回答的第一个字节将是“” 命令LRNGE须要返回多个值 LRANGE mylist 0 3 无值:0 有值: 3 $1 c $1 b $1 a sadd mylist a b c d :4 @link http://redis.cn/topics/protocol @param $cmd @param array $params / public function exec($cmd, $params = []) { //状态:-ERR Client sent AUTH, but no password is set //状态:+OK //get: $15\r\nlemon1024026382\r\n //del: :1 $params = array_merge(explode(' ', $cmd), $params); $command = ''; $paramsCount = 0; foreach ($params as $param) { if ($param === null) { continue; } $command .= '$' . mb_strlen($param, '8bit') . "\r\n" . $param . "\r\n"; $paramsCount++; } $command = '' . $paramsCount . "\r\n" . $command;// echo 'Executing Redis Command:', $cmd, PHP_EOL;// echo 'Yuan Shi Redis Cmd:', $command, PHP_EOL; return $this->send($command, $params); } / @param $cmd @param $params @return null @throws SocketException / private function send($cmd, $params) { $socket = $this->getSocket(); $written = fwrite($socket, $cmd); if ($written === false) { throw new SocketException("无法写入到socket.\nRedis命令是: " . $cmd); } return $this->parseData($params, $cmd); } / 用单行回答,回答的第一个字节将是“+” 缺点,回答的第一个字节将是“-” 整型数字,回答的第一个字节将是“:” 批量回答,回答的第一个字节将是“$” 多个批量回答,回答的第一个字节将是“” @link http://redis.cn/topics/protocol \r\n作为分割符号,利用fgets按行读取 如果是状态返回的命令或者缺点返回或者返回整数这些相对大略处理 批量返回时,如果是号,第一步先读取返回的计数器,比如keys 会返回有多少个key值,循环计数器 ,读取第二行便是$数字(批量回答),例如$9表示当前key的value值长度 代码里有 $length = intval($line)+2; +2实在是表示\r\n这个分隔符 @param array $params @throws SocketException @return null|string|mixed / private function parseData($params) { $socket = $this->getSocket(); $prettyCmd = implode(' ', $params); if (($line = fgets($socket)) === false) {// 从文件指针中读取一行,这里最得当,由于协议分割标识是\r\n throw new SocketException("无法从socket读取.\nRedis命令是: " . $prettyCmd); } echo '做事端相应数据:', $line, PHP_EOL; $type = $line[0]; $line = mb_substr($line, 1, -2, '8bit'); if ('+' === $type) { // eg:SET ping select quit auth... if (in_array($line, ['Ok', 'PONG'])) { return true; } else { return $line; } } if ('-' === $type) { throw new SocketException("Redis 缺点: " . $line . "\nRedis命令是: " . $prettyCmd); } // eg: hset zadd del if (':' === $type) { return $line; } / 例如: 输入存在的key:get jie 返回:$9\r\nyangjiecheng\r\n 输入:keys 返回: 9\r\n$9\r\nchat.user\r\n$3\r\njie\r\n$4\r\ntest\r\n 输入不存在的key: get 111 返回:$-1 对应redis返回:nil / if ('$' === $type) { if (-1 == $line) { return null; } $length = intval($line) + 2; //+2 表示后面的\r\n $data = ''; while ($length > 0) { $str = fread($socket, $length); if ($str === false) { throw new SocketException("无法从socket读取.\nRedis命令是: " . $prettyCmd); } $data .= $str; $length -= mb_strlen($str, '8bit');//此函数可以指定字符串编码并打算长度,8bit便是按照字节打算长度 } return !empty($data) ? mb_substr($data, 0, -2, '8bit') : ''; } if ('' === $type) { $count = intval($line); $data = []; for ($i = 0; $i < $count; $i++) { $data[] = $this->parseData($params); } return $data; } throw new SocketException('从Redis读取到未能解析的数据: ' . $line . "\nRedis命令是: " . $prettyCmd); } / @return bool|mixed / public function getSocket() { $id = $this->getConnectionString(); return isset($this->pools[$id]) ? $this->pools[$id] : false; } / / public function close() { $connectionString = $this->getConnectionString(); foreach ($this->pools as $socket) { $connection = $connectionString . ', database=' . $this->database; echo 'Closing DB connection: ' . $connection . PHP_EOL; try { $this->exec('QUIT'); } catch (SocketException $e) { // ignore errors when quitting a closed connection } fclose($socket); } $this->pools = []; } / / public function __destruct() { // TODO: Implement __destruct() method. $this->close(); }}
测试代码
/ @param $file @param $content /function writeLog($file, $content){ $str = '@@@@@@@@@@ Time Is ' . date('Y-m-d H:i:s') . ' @@@@@@@@@' . PHP_EOL; $str .= $content . PHP_EOL; $str .= '@@@@@@@@@@ End Block Log @@@@@@@@' . PHP_EOL; file_put_contents($file, $str, FILE_APPEND);}set_exception_handler(/ @param Exception $exception / function (\Exception $exception) { echoMsg($exception->getMessage());});/ @param $msg @param string $type /function echoMsg($msg, $type = 'error'){ if ($type === 'error') { $msg = "\e[" . "1;37m" . $msg . "\e[0m"; } else { $msg = "\e[" . "0;31m" . $msg . "\e[0m"; } echo "\033[" . "41m", "Exception: ", $msg, "\033[0m", PHP_EOL;}set_error_handler(/ @param $errno @param $errstr @param $errfile @param $errline / function ($errno, $errstr, $errfile, $errline){ echoMsg("custom error:[$errno] $errstr"); echoMsg(" Error on line $errline in $errfile");});//register_shutdown_function$redis = new RedisConnection();$redis->connect();echo $redis->getConnectionString(), PHP_EOL;//$redis->exec('SET', ['RedisClient', 'OK']);//$redis->exec('DEL', ['RedisClient']);//$redis->exec('KEYS', ['']);var_dump($redis->exec('GET', ['access_token']));//var_dump($redis->exec('LRANGE', ['qlist', 0, 3]));