'12345' (flag=0) * 12.345 => '12.345' (flag=0) * true => '1' (flag=0) * false => '' (flag=0) * null => 'N;' (serialized, flag=1) * '12345' => '12345' (flag=0) * 'abc' => 'abc' (flag=0) * array() => 'a:0:{}' (serialized, flag=1) * array(1, 2, 3) => 'a:3:{i:0;i:1;i:1;i:2;i:2;i:3;}' (serialized, flag=1) * array('b'=>1, 'a'=>2, 'c'=>3) => 'a:3:{s:1:"a";i:1;s:1:"b";i:2;s:1:"c";i:3;}' (serialized, flag=1) */ namespace PHPMemcache; class MCException extends \Exception {} class MCBadParamError extends MCException {} class MCConnectionException extends MCException {} class MCCommunicationException extends MCException {} class MCUnknownCommandError extends MCException {} class MCClientError extends MCException {} class MCServerError extends MCException {} class Client { protected $conn; //params protected $host; protected $port; protected $persistent; protected $io_timeout; protected $use_json; function __construct(array $params) { $this->host = $params['host']; $this->port = (int) $params['port']; $this->persistent = isset($params['persistent'])? (bool) $params['persistent']: false; $this->conn_timeout = isset($params['timeout'])? (int) $params['timeout']: 1; $this->io_timeout = isset($params['io_timeout'])? (int) $params['io_timeout']: 1; $this->use_json = isset($params['json_mode'])? (bool) $params['json_mode']: false; if ($this->use_json && !function_exists('json_encode')) dl('json.so'); } /** * Connect to server * * @throws MCConnectionException */ function connect() { if ($this->conn) return; $uri = sprintf('tcp://%s:%d/', $this->host, $this->port); $connectFlags = STREAM_CLIENT_CONNECT; if ($this->persistent) $connectFlags |= STREAM_CLIENT_PERSISTENT; $conn = stream_socket_client($uri, $errno, $errstr, $this->conn_timeout, $connectFlags); if (!$conn) throw new MCConnectionException(trim($errstr), $errno); $io_timeout = $this->io_timeout; $timeoutSeconds = floor($io_timeout); $timeoutUSeconds = ($io_timeout - $timeoutSeconds) * 1000000; stream_set_timeout($conn, $timeoutSeconds, $timeoutUSeconds); $this->conn = $conn; } /** * Closes connection */ function close() { if ($this->conn === null) return; stream_socket_shutdown($this->conn, STREAM_SHUT_RDWR); @fclose($this->conn); $this->conn = null; } /** * Encode value to flag & data block * * @throws MCBadParamError * @param $value * @return array */ function encode($value) { if (is_string($value)) return array(0, $value); if (is_bool($value) || is_numeric($value)) return array(0, (string) $value); if (is_array($value) || $value === null) return array(1, $this->use_json? json_encode($value, JSON_NO_UNICODE): serialize($value)); throw new MCBadParamError('Values of type '.gettype($value).' are not supported'); } /** * Decode data block * * @throws MCCommunicationException * @param int $flags * @param string $value * @return mixed */ function decode($flags, $value) { if ($flags == 0) return $value; if ($flags == 1) return $this->use_json? json_decode($value, true): unserialize($value); throw new MCCommunicationException("Unknown flags: $flags"); } /** * Get entry * * @param string $key * @return mixed|bool|null */ function get($key) { $cmd = array('get', $key); try { #connect $this->connect(); #try get if (!$this->send($cmd)) { $this->close(); throw new MCCommunicationException('Error sending cmd'); } $r = $this->receive(); if ($r == 'END') //no such key return false; $r = explode(' ', $r); if ($r[0] != 'VALUE') { $this->close(); throw new MCCommunicationException("Failed getting key $key"); } #receive value list(, $read_key, $flags, $bytes) = $r; $data_block = $this->receiveDataBlock($bytes); #get end line $r = $this->receive(); if ($r != 'END') { $this->close(); throw new MCCommunicationException('No END command received'); } if ($read_key != $key) throw new MCCommunicationException("Read wrong key: $read_key instead of $key"); $data = $this->decode($flags, $data_block); return $data; } catch (MCException $e) { $this->close(); trigger_error($e->getMessage()." (cmd:get, key:$key, srv:{$this->host}:{$this->port})", E_USER_WARNING); return null; } } /** * Set entry * * @param string $key * @param mixed $value * @param int $exptime * @return bool|null */ function set($key, $value, $exptime=0) { if (!preg_match('/[a-z0-9_\-]+/i', $key)) throw new MCBadParamError("Bad key: $key"); try { list($flags, $data) = $this->encode($value); $cmd = array('set', (string) $key, $flags, (int) $exptime, mb_orig_strlen($data)); #connect $this->connect(); #try upd if (!$this->send($cmd, $data)) { $this->close(); throw new MCCommunicationException('Error sending cmd'); } $r = $this->receive(); if ($r !== 'STORED') { $this->close(); throw new MCCommunicationException("Failed setting key $key: ".mb_orig_substr($r, 0, 100)); } return true; } catch (MCException $e) { $this->close(); trigger_error($e->getMessage()." (cmd:get, key:$key, srv:{$this->host}:{$this->port})", E_USER_WARNING); return null; } } /** * Send cmd * * @param array $cmd * @param string $data_block * @return bool */ protected function send(array $cmd, $data_block='') { $data = implode(' ', $cmd)."\r\n"; if ($data_block !== '') $data .= $data_block."\r\n"; //error_log("=> $data\n", 3, '/tmp/mc.log'); //echo "=> $data\n"; while (($len = mb_orig_strlen($data)) > 0) { $wrtn = fwrite($this->conn, $data); if ($len === $wrtn) return true; if ($wrtn === false || $wrtn === 0) throw new CommunicationException('Socket timed out or got an error'); $data = mb_orig_substr($data, $wrtn); } } /** * Receive text line * * @throws \Exception|MCClientError|MCCommunicationException|MCServerError|MCUnknownCommandError * @return array|string */ protected function receive() { $str = fgets($this->conn); //stream_get_line is sooooooooo sloooooow //error_log("<= $str\n", 3, '/tmp/mc.log'); //echo "<= $str\n"; if ($str === false) throw new MCCommunicationException('Socket timed out or got an error'); if (mb_orig_substr($str, -2) !== "\r\n") throw new MCCommunicationException('Received non-string response: '.bin2hex(mb_orig_substr($str, 0, 100))); $str = mb_orig_substr($str, 0, -2); if (in_array($str, array('STORED', 'NOT_STORED', 'EXISTS', 'NOT_FOUND', 'END'))) return $str; if ($str == 'ERROR') throw new MCUnknownCommandError('Server reported command as unknown'); if (mb_orig_substr($str, 0, 12) == 'CLIENT_ERROR') throw new MCClientError(mb_orig_substr($str, 13)); if (mb_orig_substr($str, 0, 12) == 'SERVER_ERROR') throw new MCServerError(mb_orig_substr($str, 13)); if (mb_orig_substr($str, 0, 5) == 'VALUE') return $str; throw new MCUnknownCommandError('Unknown command received'); } /** * Receive binary data block * * @throws MCCommunicationException * @param int $block_len * @return string */ protected function receiveDataBlock($block_len) { #read data block + 2 bytes $buffer = ''; $bytes_to_read = $block_len + 2; do { $tmp = fread($this->conn, $bytes_to_read); //stream_socket_recvfrom just don't work if ($tmp === false) //failure throw new MCCommunicationException('Socket timed out or got an error'); $buffer .= $tmp; $bytes_to_read -= mb_orig_strlen($tmp); } while ($bytes_to_read > 0); //also limited by io_timeout if ($bytes_to_read < 0) throw new MCCommunicationException('Read more than needed'); if (mb_orig_substr($buffer, -2) !== "\r\n") throw new MCCommunicationException('Malformed data block'); //error_log("