array('db_name' => 'bda', 'table_name' => 'sess', 'index_name' => 'PRIMARY', 'fields' => array('data')), protected $r_indexes; protected $w_indexes; function __construct(array $params) { $this->host = $params['host']; $this->r_port = $params['r_port']; $this->w_port = $params['w_port']; $this->persistent = $params['persistent']; $this->conn_timeout = $params['conn_timeout']; $this->r_conn_timeout = $params['r_conn_timeout']; $this->w_conn_timeout = $params['w_conn_timeout']; $this->r_indexes = $params['r_indexes']; $this->w_indexes = $params['w_indexes']; } /** * Connect to R or W HS socket * * @param const $mode * @throws ConnectionException */ function connect($mode) { $conn = $mode == self::R_MODE? $this->r_conn: $this->w_conn; if ($conn) return true; $uri = sprintf('tcp://%s:%d/', $this->host, $mode == self::R_MODE? $this->r_port: $this->w_port); $connectFlags = STREAM_CLIENT_CONNECT; if ($this->persistent) $connectFlags |= STREAM_CLIENT_PERSISTENT; $conn = @stream_socket_client($uri, $errno, $errstr, $this->conn_timeout, $connectFlags); if (!$conn) throw new ConnectionException(trim($errstr), $errno); $io_timeout = $mode == self::R_MODE? $this->r_conn_timeout: $this->w_conn_timeout; $timeoutSeconds = floor($io_timeout); $timeoutUSeconds = ($io_timeout - $timeoutSeconds) * 1000000; stream_set_timeout($conn, $timeoutSeconds, $timeoutUSeconds); if ($mode == self::R_MODE) $this->r_conn = $conn; else $this->w_conn = $conn; } /** * Closes connection */ function close($mode = false) { if ($mode == self::R_MODE || $mode === false) { stream_socket_shutdown($this->r_conn, STREAM_SHUT_RDWR); @fclose($this->r_conn); $this->r_conn = null; } if ($mode == self::W_MODE || $mode === false) { stream_socket_shutdown($this->w_conn, STREAM_SHUT_RDWR); @fclose($this->w_conn); $this->w_conn = null; } } /** * Encode string for HS protocol * * @param $str */ static function encode($data) { /* * - Characters in the range [0x10 - 0xff] are not encoded. * - A character in the range [0x00 - 0x0f] is prefixed by 0x01 and * shifted by 0x40. For example, 0x03 is encoded as 0x01 0x43. * - NULL is expressed as a single NUL(0x00). */ if ($data === null) return 0x00; return preg_replace('/[\x00-\x0f]/es', 'chr(0x01).chr(ord("$0")+0x40)', $data); } /** * Decode string for HS protocol * @param $str */ static function decode($data) { if ($data === 0x00) return null; return preg_replace('/[\x01]([\x40-\x4f])/es', 'chr(ord("$1")-0x40)', $data); } /** * Open index * * @param string $mode * @param int $index_num * @throws IndexNotFoundException, CommunicationException */ function open_index($mode, $index_num) { $index_params = $this->get_index_params($mode, $index_num); $cmd_open = array('P', $index_num, $index_params['db_name'], $index_params['table_name'], $index_params['index_name'], implode(',', $index_params['fields'])); //open index if (!$this->send($mode, $cmd_open)) { $this->close($mode); throw new CommunicationException('Error sending cmd', 1); } $r = $this->receive($mode); if ($r != array('0', '1')) { $this->close($mode); throw new CommunicationException("Failed opening index $index_num".(isset($r[2])? ", err: {$r[2]}": ''), 3); } } /** * Returns index params * * @throws IndexNotFoundException * @param string $mode * @param int $index_num * @return array */ protected function get_index_params($mode, $index_num) { $indexes = $mode == self::R_MODE ? $this->r_indexes : $this->w_indexes; if (empty($indexes[$index_num])) throw new IndexNotFoundException("Index not found: $index_num", 5); $index_params = $indexes[$index_num]; return $index_params; } /** * Get entry * * @param string|array $id * @param int $index_num * @throws CommunicationException */ function select($id, $index_num) { #connect r $this->connect(self::R_MODE); #non-persistent conn-s: surely open index if (!$this->persistent) $this->open_index(self::R_MODE, $index_num); $id = (array) $id; $cmd_get = array_merge(array($index_num, '=', sizeof($id)), $id); //get key $try = 0; while ($try <= 1) { //second try when opening index #try get if (!$this->send(self::R_MODE, $cmd_get)) { $this->close(self::R_MODE); throw new CommunicationException('Error sending cmd', 1); } $r = $this->receive(self::R_MODE); if ($r == array('2', '1', 'stmtnum')) { //index not opened #open index & +1 attempt $this->open_index(self::R_MODE, $index_num); $try++; continue; } if ($r[0] != 0 || sizeof($r) < 2) { $this->close(self::R_MODE); throw new CommunicationException('Failed getting values from DB'.(isset($r[2])? ", err: {$r[2]}": ''), 4); } return array_slice($r, 2, $r[1]); } return false; //impossible } /** * Insert entry * * @param string|array $id * @param string|array $values * @param int $index_num * @throws KeyAlreadyExistsException, CommunicationException */ function insert($id, $values, $index_num, $autoinc_mode=false) { #connect w $this->connect(self::W_MODE); #non-persistent conn-s: surely open index if (!$this->persistent) $this->open_index(self::W_MODE, $index_num); if ($autoinc_mode) $cmd_insrt = array_merge(array($index_num, '+', sizeof($values)), (array) $values); //insert without key else { $id = (array) $id; $cmd_insrt = array_merge(array($index_num, '+', sizeof($id)+sizeof($values)), $id, (array) $values); //insert key } $try = 0; while ($try <= 1) { //second try when opening index #try upd if (!$this->send(self::W_MODE, $cmd_insrt)) { $this->close(self::W_MODE); throw new CommunicationException('Error sending cmd', 1); } $r = $this->receive(self::W_MODE); if ($r == array('2', '1', 'stmtnum')) { //index not opened #open index & +1 attempt $this->open_index(self::W_MODE, $index_num); $try++; continue; } if (array_slice($r, 0, 2) == array('1', '1')) throw new KeyAlreadyExistsException('Failed inserting values: key exists'.(isset($r[2])? ", err: {$r[2]}": ''), 5); if (array_slice($r, 0, 2) != array('0', '1')) { // "0 1 {last_insert_id}" for autoincrement $this->close(self::W_MODE); throw new CommunicationException('Failed inserting values in DB ('.var_export($r ,1).')'.(isset($r[2])? ", err: {$r[2]}": ''), 4); } if (sizeof($r) == 3) return $r[2]; //last_insert_id return true; } return false; //impossible } /** * Update entry * * @param string|array $id * @param string|array $values * @param int $index_num * @return boolean|string true on success, false if no key found * @throws CommunicationException */ function update($id, $values, $index_num) { #connect w $this->connect(self::W_MODE); #non-persistent conn-s: surely open index if (!$this->persistent) $this->open_index(self::W_MODE, $index_num); $id = (array) $id; $cmd_upd = array_merge(array($index_num, '=', sizeof($id)), $id, array(1, 0, 'U'), (array) $values); //update key $try = 0; while ($try <= 1) { //second try when opening index #try upd if (!$this->send(self::W_MODE, $cmd_upd)) { $this->close(self::W_MODE); throw new CommunicationException('Error sending cmd', 1); } $r = $this->receive(self::W_MODE); if ($r == array('2', '1', 'stmtnum')) { //index not opened #open index & +1 attempt $this->open_index(self::W_MODE, $index_num); $try++; continue; } if ($r[0] != 0 || sizeof($r) < 3) { $this->close(self::W_MODE); throw new CommunicationException('Failed updating values in DB'.(isset($r[2])? ", err: {$r[2]}": ''), 4); } return (bool) $r[2]; } return false; //impossible } /** * Delete entry * * @param int|string $id * @param string|array $values * @param int $index_num * @return boolean|string true on success, false if no key found * @throws CommunicationException */ function delete($id, $index_num) { #connect w $this->connect(self::W_MODE); #non-persistent conn-s: surely open index if (!$this->persistent) $this->open_index(self::W_MODE, $index_num); $cmd_upd = array($index_num, '=', 1, $id, 1, 0, 'D'); //delete key $try = 0; while ($try <= 1) { //second try when opening index #try upd if (!$this->send(self::W_MODE, $cmd_upd)) { $this->close(self::W_MODE); throw new CommunicationException('Error sending cmd', 1); } $r = $this->receive(self::W_MODE); if ($r == array('2', '1', 'stmtnum')) { //index not opened #open index & +1 attempt $this->open_index(self::W_MODE, $index_num); $try++; continue; } if ($r[0] != 0 || sizeof($r) < 3) { $this->close(self::W_MODE); throw new CommunicationException('Failed deleting values from DB'.(isset($r[2])? ", err: {$r[2]}": ''), 4); } return (bool) $r[2]; } return false; //impossible } /** * Send cmd * * @param const $mode * @param array $cmd * @return bool */ protected function send($mode, array $cmd) { $conn = $mode == self::R_MODE? $this->r_conn: $this->w_conn; foreach ($cmd as &$i) $i = self::encode($i); $data = implode("\t", $cmd)."\n"; //error_log(($mode == self::R_MODE? 'R':'W')."=> $data\n", 3, '/tmp/hs.log'); try { while (($len = mb_orig_strlen($data)) > 0) { $wrtn = fwrite($conn, $data); if ($len === $wrtn) return true; if ($wrtn === false || $wrtn === 0) throw new CommunicationException('Socket timed out or got an error', 6); $data = mb_orig_substr($data, $wrtn); } } catch (\Exception $e) { $this->close($mode); throw $e; //rethrow } return true; } /** * Receive cmd * * @param const $mode * @throws CommunicationException */ protected function receive($mode) { $conn = $mode == self::R_MODE? $this->r_conn: $this->w_conn; try { $str = fgets($conn); //error_log(($mode == self::R_MODE? 'R':'W')."<= $str\n", 3, '/tmp/hs.log'); if ($str === false) throw new CommunicationException('Socket timed out or got an error', 6); if (mb_orig_substr($str, -1) !== "\n") throw new CommunicationException('Received malformed response: '.rawurlencode($str), 2); } catch (\Exception $e) { $this->close($mode); throw $e; //rethrow } return explode("\t", mb_orig_substr($str, 0, -1)); } } /** * Client for BDA session DBs * Lazy generates index config from template */ class SessionClient extends Client { /** * Returns index params * * @throws IndexNotFoundException * @param string $mode * @param int $index_num * @return array */ protected function get_index_params($mode, $index_num) { #under 10000 -> original scheme if ($index_num < 10000) return parent::get_index_params($mode, $index_num); $indexes = $mode == self::R_MODE ? $this->r_indexes : $this->w_indexes; if (empty($indexes['session_template'])) throw new IndexNotFoundException("Index not found: $index_num", 5); #10000—19999 -> R, 20000—29999 -> W $num = $index_num - ($mode == self::R_MODE ? 10000: 20000); if ($num > 9999) throw new IndexNotFoundException("Index not found: $index_num", 5); $db_num = floor($num / 100); $tbl_num = floor($num % 100); $index_params = $indexes['session_template']; $index_params['db_name'] = sprintf($index_params['db_name'], $db_num); $index_params['table_name'] = sprintf($index_params['table_name'], $tbl_num); return $index_params; } }