From e15eceef8ed8df93d11900e26c635ab12fa71f3f Mon Sep 17 00:00:00 2001 From: Jochen Topf Date: Thu, 12 Dec 2024 11:14:52 +0100 Subject: [PATCH] Use protocol level prepare instead of PREPARE sql commands Introduce a new pg_conn_t::prepare() function which is now used in several places instead of pg_conn_t::exec() with a SQL PREPARE command. This does not yet replace all places where PREPARE is used, the rest will come in a later commit. This is to make osm2pgsql work with some connection poolers that have problems with prepared statements. For some background see https://github.com/osm2pgsql-dev/osm2pgsql/discussions/2118 and https://www.crunchydata.com/blog/prepared-statements-in-transaction-mode-for-pgbouncer --- src/expire-output.cpp | 20 +++++++++++--------- src/flex-table.cpp | 14 ++++++-------- src/middle-pgsql.cpp | 9 +++++---- src/pgsql.cpp | 14 ++++++++++++++ src/pgsql.hpp | 20 ++++++++++++++++++++ src/properties.cpp | 6 +++--- src/table.cpp | 5 ++--- 7 files changed, 61 insertions(+), 27 deletions(-) diff --git a/src/expire-output.cpp b/src/expire-output.cpp index 5ec48d9f2..e9b9dbde4 100644 --- a/src/expire-output.cpp +++ b/src/expire-output.cpp @@ -65,17 +65,19 @@ std::size_t expire_output_t::output_tiles_to_table( if (result.num_fields() == 3) { // old format with fields: zoom, x, y - db_connection.exec("PREPARE insert_tiles(int4, int4, int4) AS" - " INSERT INTO {} (zoom, x, y) VALUES ($1, $2, $3)" - " ON CONFLICT DO NOTHING", - qn); + db_connection.prepare("insert_tiles", + "INSERT INTO {} (zoom, x, y)" + " VALUES ($1::int4, $2::int4, $3::int4)" + " ON CONFLICT DO NOTHING", + qn); } else { // new format with fields: zoom, x, y, first, last - db_connection.exec("PREPARE insert_tiles(int4, int4, int4) AS" - " INSERT INTO {} (zoom, x, y) VALUES ($1, $2, $3)" - " ON CONFLICT (zoom, x, y)" - " DO UPDATE SET last = CURRENT_TIMESTAMP(0)", - qn); + db_connection.prepare("insert_tiles", + "INSERT INTO {} (zoom, x, y)" + " VALUES ($1::int4, $2::int4, $3::int4)" + " ON CONFLICT (zoom, x, y)" + " DO UPDATE SET last = CURRENT_TIMESTAMP(0)", + qn); } auto const count = for_each_tile( diff --git a/src/flex-table.cpp b/src/flex-table.cpp index 80a2195b6..23a94b2a9 100644 --- a/src/flex-table.cpp +++ b/src/flex-table.cpp @@ -167,15 +167,12 @@ std::string flex_table_t::build_sql_prepare_get_wkb() const if (has_multicolumn_id_index()) { return fmt::format( - R"(PREPARE get_wkb_{}(char(1), bigint) AS)" - R"( SELECT {} FROM {} WHERE "{}" = $1 AND "{}" = $2)", - m_table_num, columns, full_name(), m_columns[0].name(), - m_columns[1].name()); + R"(SELECT {} FROM {} WHERE "{}" = $1::char(1) AND "{}" = $2::bigint)", + columns, full_name(), m_columns[0].name(), m_columns[1].name()); } - return fmt::format(R"(PREPARE get_wkb_{}(bigint) AS)" - R"( SELECT {} FROM {} WHERE "{}" = $1)", - m_table_num, columns, full_name(), id_column_names()); + return fmt::format(R"(SELECT {} FROM {} WHERE "{}" = $1::bigint)", columns, + full_name(), id_column_names()); } std::string @@ -246,7 +243,8 @@ bool flex_table_t::has_columns_with_expire() const noexcept void flex_table_t::prepare(pg_conn_t const &db_connection) const { if (has_id_column() && has_columns_with_expire()) { - db_connection.exec(build_sql_prepare_get_wkb()); + auto const stmt = fmt::format("get_wkb_{}", m_table_num); + db_connection.prepare(stmt, build_sql_prepare_get_wkb()); } } diff --git a/src/middle-pgsql.cpp b/src/middle-pgsql.cpp index 0f075dde9..24c88dcf1 100644 --- a/src/middle-pgsql.cpp +++ b/src/middle-pgsql.cpp @@ -1105,10 +1105,11 @@ void middle_pgsql_t::update_users_table() log_info("Writing {} entries to table '{}'...", m_users.size(), m_users_table.name()); - m_db_connection.exec("PREPARE insert_user(int8, text) AS" - " INSERT INTO {}.\"{}\" (id, name) VALUES ($1, $2)" - " ON CONFLICT (id) DO UPDATE SET id=EXCLUDED.id", - m_users_table.schema(), m_users_table.name()); + m_db_connection.prepare( + "insert_user", + "INSERT INTO {}.\"{}\" (id, name) VALUES ($1::int8, $2::text)" + " ON CONFLICT (id) DO UPDATE SET id=EXCLUDED.id", + m_users_table.schema(), m_users_table.name()); for (auto const &[id, name] : m_users) { m_db_connection.exec_prepared("insert_user", id, name); diff --git a/src/pgsql.cpp b/src/pgsql.cpp index 04c5c033d..048dc73ff 100644 --- a/src/pgsql.cpp +++ b/src/pgsql.cpp @@ -194,6 +194,20 @@ void pg_conn_t::copy_end(std::string_view context) const } } +void pg_conn_t::prepare_internal(std::string_view stmt, + std::string_view sql) const +{ + if (get_logger().log_sql()) { + log_sql("(C{}) PREPARE {} AS {}", m_connection_id, stmt, sql); + } + + pg_result_t const res{ + PQprepare(m_conn.get(), stmt.data(), sql.data(), 0, nullptr)}; + if (res.status() != PGRES_COMMAND_OK) { + throw fmt_error("Prepare failed for '{}': {}.", sql, error_msg()); + } +} + pg_result_t pg_conn_t::exec_prepared_internal(char const *stmt, int num_params, char const *const *param_values, int *param_lengths, diff --git a/src/pgsql.hpp b/src/pgsql.hpp index 9712e7a4d..8279e42a9 100644 --- a/src/pgsql.hpp +++ b/src/pgsql.hpp @@ -179,6 +179,24 @@ class pg_conn_t return exec(fmt::format(sql, std::forward(params)...)); } + /** + * Prepare SQL query. + * + * \param stmt Name of the prepared query. + * \param sql SQL query. + * \param params Any number of arguments for the fmt lib. + * \throws std::runtime_exception If the command failed (didn't return + * status code PGRES_COMMAND_OK). + */ + template + void prepare(std::string_view stmt, fmt::format_string sql, + TArgs... params) const + { + std::string const query = + fmt::format(sql, std::forward(params)...); + prepare_internal(stmt, query); + } + /** * Run the named prepared SQL statement and return the results in text * format. @@ -228,6 +246,8 @@ class pg_conn_t void close(); private: + void prepare_internal(std::string_view stmt, std::string_view sql) const; + pg_result_t exec_prepared_internal(char const *stmt, int num_params, char const *const *param_values, int *param_lengths, int *param_formats, diff --git a/src/properties.cpp b/src/properties.cpp index a2e454d94..602817ab2 100644 --- a/src/properties.cpp +++ b/src/properties.cpp @@ -121,9 +121,9 @@ void properties_t::store() pg_conn_t const db_connection{m_connection_params, "prop.store"}; - db_connection.exec( - "PREPARE set_property(text, text) AS" - " INSERT INTO {} (property, value) VALUES ($1, $2)" + db_connection.prepare( + "set_property", + "INSERT INTO {} (property, value) VALUES ($1::text, $2::text)" " ON CONFLICT (property) DO UPDATE SET value = EXCLUDED.value", table); diff --git a/src/table.cpp b/src/table.cpp index 9d707497f..9c41b3eb2 100644 --- a/src/table.cpp +++ b/src/table.cpp @@ -143,9 +143,8 @@ void table_t::prepare() { //let postgres cache this query as it will presumably happen a lot auto const qual_name = qualified_name(m_target->schema(), m_target->name()); - m_db_connection->exec("PREPARE get_wkb(int8) AS" - " SELECT way FROM {} WHERE osm_id = $1", - qual_name); + m_db_connection->prepare( + "get_wkb", "SELECT way FROM {} WHERE osm_id = $1::int8", qual_name); } void table_t::generate_copy_column_list()