. ** ** dbi.class - UltraFlexible Database Interface 3000 */

/**
 * PostgreSQL backend for it_dbi.
 *
 * Overrides the driver hooks of the base class (connecting, querying,
 * escaping, schema introspection, JSON helpers) with implementations
 * built on PHP's pgsql extension.
 */
class it_dbi_postgres extends it_dbi
{
	static $_global_key = 'it_dbi_postgres'; # Override base class to get our own singleton

	/**
	 * Build the WHERE clause, translating a MySQL style "offset, count" LIMIT
	 * into PostgreSQL's OFFSET/LIMIT syntax before delegating to the base class.
	 * @param $params Query parameters, optionally containing a 'LIMIT' entry
	 * @return Whatever the base class _where() returns for the adjusted parameters
	 */
	function _where($params)
	{
		# empty() keeps this silent when no LIMIT was given at all
		if (!empty($params['LIMIT']) && ($m = it::match('^\s*(\d+)\s*,\s*(\d+)\s*$', $params['LIMIT'])))
		{
			unset($params['LIMIT']);
			$params[] = " OFFSET $m[0] LIMIT $m[1]";
		}

		return parent::_where($params);
	}

	/**
	 * Insert a record, overwriting all fields of an existing record that has
	 * the same key (INSERT ... ON CONFLICT (keyfield) DO UPDATE).
	 * @param $tags Field values to store
	 * @return Result of the underlying insert() call
	 */
	function replace($tags = [])
	{
		$strings = [];
		foreach (array_keys($this->_fields) as $k)
		{
			$escaped = $this->escape_name($k);
			$strings[] = $escaped . '= EXCLUDED.' . $escaped;
		}

		return $this->insert(array_merge($tags, [' ON CONFLICT (' . $this->escape_name($this->_p['keyfield']) . ') DO UPDATE SET ' . implode(', ', $strings)]));
	}

	/**
	 * List the tables of the configured database that live in the 'public' schema.
	 * @param $p Parameters handed through to query()
	 * @return Array of table names, empty if there are none or the query failed
	 */
	function _tables($p = array())
	{
		$result = [];
		$where = $this->_where(['table_catalog' => $this->_p['db'], 'table_schema' => 'public'], $p);

		# Guard against a failed query so we never fetch from a non-result
		for ($qr = $this->query('SELECT table_name FROM information_schema.tables ' . $where, $p); $qr && ($row = $this->_fetch_assoc($qr));)
			$result[] = $row['table_name'];

		return $result;
	}

	/**
	 * Convert PostgreSQL's textual boolean ('t' / 'f') to 1 / 0.
	 * @param $v Value as delivered by the pgsql extension
	 * @return 1 if $v equals 't', 0 otherwise
	 */
	static function _convertbool($v)
	{
		return $v == 't' ? 1 : 0;
	}

	/**
	 * Read the column definitions of the current table from information_schema
	 * and return them in the layout of MySQL's SHOW COLUMNS.
	 * @return Array indexed by column name with keys Field, Type, Extra and Key
	 *         (plus _convertfunc / _escapefunc for boolean columns); empty array
	 *         if no columns were found
	 */
	function _get_field_defs()
	{
		$keys = $result = [];

		# The table may be given as "schema.table"; pad so a missing schema yields null without a notice
		list($table_name, $table_schema) = array_pad(array_reverse(explode('.', $this->_p['table'], 2)), 2, null);
		$where = $this->_where(['t.table_name' => $table_name, 't.table_catalog' => $this->_p['db']] + ($table_schema ? ['t.table_schema' => $table_schema] : []));

		# Recreate Key column of mysql show columns
		$res = $this->query('SELECT column_name,constraint_type,ordinal_position FROM information_schema.table_constraints AS t JOIN information_schema.key_column_usage USING (constraint_name, constraint_schema, constraint_catalog) ' . $where);
		while ($res && ($row = $this->_fetch_assoc($res)))
		{
			if ($row['constraint_type'] == 'PRIMARY KEY')
				$keys[$row['column_name']] = 'PRI';
			else if ($row['ordinal_position'] == 1) # only the leading column of a constraint is flagged
				$keys[$row['column_name']] = $row['constraint_type'] == 'UNIQUE' ? 'UNI' : 'MUL';
		}

		$res = $this->query('SELECT * FROM information_schema.columns AS t ' . $where);
		while ($res && ($field = $this->_fetch_assoc($res)))
		{
			$name = $field['column_name'];
			$result[$name] = [
				'Field' => $name,
				'Type' => $field['data_type'],
				'Extra' => it::match('^nextval\(', $field['column_default']) ? 'auto_increment' : '',
				'Key' => isset($keys[$name]) ? $keys[$name] : null, # columns without a constraint have no key
			];

			if ($field['data_type'] == 'boolean')
				$result[$name] += ['_convertfunc' => 'it_dbi_postgres::_convertbool', '_escapefunc' => 'it_dbi::escape_bool'];
		}

		return $result;
	}

	/**
	 * Escape a value for use in SQL.
	 * @param $str Raw value
	 * @return Result of pg_escape_literal() for the current connection
	 */
	function _escape_string($str)
	{
		return pg_escape_literal($this->_link, $str);
	}

	/**
	 * Escape an identifier (table or column name) for use in SQL.
	 * @param $str Raw identifier
	 * @return Result of pg_escape_identifier() for the current connection
	 */
	function _escape_name($str)
	{
		return pg_escape_identifier($this->_link, $str);
	}

	/**
	 * Open a new connection to the database server.
	 * @param $p Connection parameters: server, user, db, pw and optionally charset
	 * @return Array of (connection or false, error text which is empty on success)
	 */
	function _connect_db($p)
	{
		$parts = [];

		# NOTE(review): server is passed on verbatim as before, in case existing
		# configurations append further keywords to it - confirm whether it can be quoted too
		if (isset($p['server']) && (string)$p['server'] !== '')
			$parts[] = "host=$p[server]";

		# Quote the remaining values so that blanks, quotes or backslashes in them
		# cannot corrupt the connection string; skip empty settings because an
		# unquoted empty value makes libpq swallow the following keyword
		foreach (['user' => 'user', 'dbname' => 'db', 'password' => 'pw'] as $keyword => $param)
		{
			if (isset($p[$param]) && (string)$p[$param] !== '')
				$parts[] = $keyword . "='" . addcslashes($p[$param], "'\\") . "'";
		}

		$result = @pg_connect(implode(' ', $parts), PGSQL_CONNECT_FORCE_NEW);

		# set charset used for this connection
		if ($result && !empty($p['charset']) && !pg_set_client_encoding($result, $p['charset']))
			$this->_fatal("_connect(): can't set charset \"{$p['charset']}\"");

		return [$result, $result ? '' : 'Could not connect'];
	}

	/**
	 * Send a query to the server. If a timeout is given or interruptible queries
	 * are configured, the query is sent asynchronously and polled once per second
	 * at most, otherwise it is executed synchronously.
	 * @param $query SQL to execute
	 * @param $p Parameters, optionally containing 'timeout' in seconds
	 * @return Result resource/object, or false on error or timeout
	 */
	function __query($query, $p)
	{
		$timeout = empty($p['timeout']) ? 0 : $p['timeout'];

		if (!$timeout && empty($this->_p['interruptible_queries']))
			return @pg_query($this->_link, $query);

		$result = false;
		$starttime = microtime(true);
		pg_send_query($this->_link, $query);

		while (pg_connection_busy($this->_link))
		{
			if ($timeout && (microtime(true) - $starttime) > $timeout)
			{
				# Do not leave the query running on this connection, later queries would have to wait for it
				@pg_cancel_query($this->_link);
				return false;
			}

			# Wait up to one second for the socket to become readable
			$read = $error = [pg_socket($this->_link)];
			$write = [];
			stream_select($read, $write, $error, 1, 0);
		}

		# Drain all pending results, the last one is the one we report
		while ($newresult = @pg_get_result($this->_link))
			$result = $newresult;

		if ($result && pg_result_error($result))
			$result = false;

		return $result;
	}

	/**
	 * Execute a query and record affected rows and insert id.
	 * INSERT statements get a RETURNING clause for the key field appended so the
	 * id of the new record can be determined.
	 * @param $query SQL to execute
	 * @param $p Parameters, optionally containing 'safety' and 'timeout'
	 * @return Query result, false on error, null for a duplicate key if safety is 1
	 */
	function _query($query, $p)
	{
		$isinsert = false;
		$keyfield = empty($this->_p['keyfield']) ? '' : $this->_p['keyfield'];
		$safety = empty($p['safety']) ? 0 : $p['safety'];

		if ($keyfield && it::match('^INSERT ', $query))
		{
			$isinsert = true;
			$query .= ' RETURNING ' . $this->_escape_name($keyfield);
		}

		if (!($result = $this->__query($query, $p)) && $safety)
		{
			if (($safety < 2) && it::match('duplicate key value', pg_last_error($this->_link))) # Duplicate entry
				return null;

			/* TODO
			if ($errno == 2006) # mysql server has gone away: retry
			{
				it::log('sqllog', "it_dbi(): reconnecting mysqli_connect {$p['server']}, {$p['db']}");
				$this->_connect(array('reconnect' => true));
				$result = $this->__query($query, $p);
			}
			*/
		}

		if ($result)
		{
			$this->_affectedrows = pg_affected_rows($result);

			# RETURNING may deliver no row at all, fall back to 0 then instead of indexing false
			$row = $isinsert ? $this->_fetch_assoc($result) : false;
			$this->_insertid = $row ? $row[$keyfield] : 0;

			/* TODO probably PGSQl_NOTICE_ALL
			if (($warning = $this->_link->get_warnings()))
			{
				do {
					if (!it::match(trim($this->_p['ignored_warnings'] . "|1364|1261|1051|1062", "|"), $warning->errno))
						$messages[] = $warning->message . " [error $warning->errno]";
				} while ($warning->next() && ++$checked < 20);

				if ($messages)
					it::error(['title' => "Mysql warning: " . $messages[0], 'body' => "$query\n\n" . implode("\n", $messages) . "\n"]);
			}
			*/
		}

		return $result;
	}

	/**
	 * SQL expression extracting a field of a JSON column as text.
	 * @param $col Column expression
	 * @param $field Name of the field to extract
	 * @return SQL fragment using the ->> operator
	 */
	function _json_extract($col, $field)
	{
		return "$col->>" . $this->escape_string($field);
	}

	/**
	 * SQL expression building a JSONB object.
	 * @param $tags Array of field name => SQL expression for the value
	 * @return JSONB_BUILD_OBJECT(...) call; the values are inserted as given
	 */
	function _json_object($tags)
	{
		$strings = [];
		foreach ((array)$tags as $f => $v)
			$strings[] = $this->escape_string($f) . ', ' . $v;

		return "JSONB_BUILD_OBJECT(" . implode(', ', $strings) . ")";
	}

	/**
	 * SQL expression setting fields in a JSON value.
	 * @param $source SQL expression of the JSON value to modify
	 * @param $tags Array of field name => SQL expression for the value
	 * @return SQL fragment concatenating $source with the new object
	 */
	function _json_set($source, $tags)
	{
		return "$source || " . $this->_json_object($tags);
	}

	/**
	 * SQL expression removing fields from a JSON value.
	 * @param $source SQL expression of the JSON value to modify
	 * @param $fields Field name or array of field names to remove
	 * @return SQL fragment; just the parenthesized source if no fields are given
	 */
	function _json_remove($source, $fields)
	{
		$strings = [];
		foreach ((array)$fields as $f)
			$strings[] = $this->escape_string($f);

		# Without fields "($source - )" would be invalid SQL
		if (!$strings)
			return "($source)";

		return "($source - " . implode(" - ", $strings) . ")";
	}

	/**
	 * Fetch the next row of a result as associative array.
	 * @param $res Query result
	 * @return Row, or false if there are no more rows
	 */
	function _fetch_assoc($res)
	{
		return pg_fetch_assoc($res);
	}

	/**
	 * Number of rows in a result.
	 * @param $res Query result
	 * @return Row count as reported by pg_num_rows()
	 */
	function _num_rows($res)
	{
		return pg_num_rows($res);
	}

	/**
	 * Move the internal row pointer of a result.
	 * @param $res Query result
	 * @param $offset Row to move to
	 */
	function _seek($res, $offset)
	{
		pg_result_seek($res, $offset);
	}

	/**
	 * Decorate an error text with the class name and, if available, the last
	 * error message of the connection.
	 * @param $text Error description
	 * @return Decorated error text
	 */
	function _error($text)
	{
		$text = get_class($this) . '::' . $text;

		if ($this->_link && ($errstr = pg_last_error($this->_link)))
			$text = "\"$errstr\" in $text";

		return $text;
	}
}