From 8570d2d761e445cba1ff4b18c72bcec4be7278fa Mon Sep 17 00:00:00 2001 From: Ryan Kerr Date: Fri, 6 Sep 2024 02:41:51 -0400 Subject: [PATCH 01/18] Get rid of :do_execute, since it now has an equivalent signature to :execute. --- .../clickhouse/schema_statements.rb | 26 ++++++++----------- .../connection_adapters/clickhouse_adapter.rb | 14 +++++----- lib/clickhouse-activerecord/tasks.rb | 4 +-- .../1_create_some_function.rb | 2 +- spec/single/model_spec.rb | 8 +++--- 5 files changed, 25 insertions(+), 29 deletions(-) diff --git a/lib/active_record/connection_adapters/clickhouse/schema_statements.rb b/lib/active_record/connection_adapters/clickhouse/schema_statements.rb index 9dfe973..a639e1f 100644 --- a/lib/active_record/connection_adapters/clickhouse/schema_statements.rb +++ b/lib/active_record/connection_adapters/clickhouse/schema_statements.rb @@ -10,18 +10,21 @@ module SchemaStatements DB_EXCEPTION_REGEXP = /\ACode:\s+\d+\.\s+DB::Exception:/.freeze - def execute(sql, name = nil, settings: {}) - do_execute(sql, name, settings: settings) + def execute(sql, name = nil, format: DEFAULT_RESPONSE_FORMAT, settings: {}) + log(sql, "#{adapter_name} #{name}") do + res = request(sql, format, settings) + process_response(res, format, sql) + end end def exec_insert(sql, name, _binds, _pk = nil, _sequence_name = nil, returning: nil) new_sql = sql.dup.sub(/ (DEFAULT )?VALUES/, " VALUES") - do_execute(new_sql, name, format: nil) + execute(new_sql, name, format: nil) true end def internal_exec_query(sql, name = nil, binds = [], prepare: false, async: false, allow_retry: false) - result = do_execute(sql, name) + result = execute(sql, name) columns = result['meta'].map { |m| m['name'] } types = {} result['meta'].each_with_index do |m, i| @@ -37,13 +40,13 @@ def internal_exec_query(sql, name = nil, binds = [], prepare: false, async: fals end def exec_insert_all(sql, name) - do_execute(sql, name, format: nil) + execute(sql, name, format: nil) true end # @link https://clickhouse.com/docs/en/sql-reference/statements/alter/update def exec_update(_sql, _name = nil, _binds = []) - do_execute(_sql, _name, format: nil) + execute(_sql, _name, format: nil) 0 end @@ -85,7 +88,7 @@ def functions end def show_create_function(function) - do_execute("SELECT create_query FROM system.functions WHERE origin = 'SQLUserDefined' AND name = '#{function}'", format: nil) + execute("SELECT create_query FROM system.functions WHERE origin = 'SQLUserDefined' AND name = '#{function}'", format: nil) end def table_options(table) @@ -117,13 +120,6 @@ def do_system_execute(sql, name = nil) end end - def do_execute(sql, name = nil, format: DEFAULT_RESPONSE_FORMAT, settings: {}) - log(sql, "#{adapter_name} #{name}") do - res = request(sql, format, settings) - process_response(res, format, sql) - end - end - if ::ActiveRecord::version >= Gem::Version.new('7.2') def schema_migration pool.schema_migration @@ -154,7 +150,7 @@ def assume_migrated_upto_version(version, migrations_paths = nil) if (duplicate = inserting.detect { |v| inserting.count(v) > 1 }) raise "Duplicate migration #{duplicate}. Please renumber your migrations to resolve the conflict." end - do_execute(insert_versions_sql(inserting), nil, format: nil, settings: {max_partitions_per_insert_block: [100, inserting.size].max}) + execute(insert_versions_sql(inserting), nil, format: nil, settings: {max_partitions_per_insert_block: [100, inserting.size].max}) end end diff --git a/lib/active_record/connection_adapters/clickhouse_adapter.rb b/lib/active_record/connection_adapters/clickhouse_adapter.rb index fb73993..9c32b37 100644 --- a/lib/active_record/connection_adapters/clickhouse_adapter.rb +++ b/lib/active_record/connection_adapters/clickhouse_adapter.rb @@ -324,7 +324,7 @@ def create_view(table_name, request_settings: {}, **options) drop_table(table_name, options.merge(if_exists: true)) end - do_execute(schema_creation.accept(td), format: nil, settings: request_settings) + execute(schema_creation.accept(td), format: nil) end def create_table(table_name, request_settings: {}, **options, &block) @@ -341,7 +341,7 @@ def create_table(table_name, request_settings: {}, **options, &block) drop_table(table_name, options.merge(if_exists: true)) end - do_execute(schema_creation.accept(td), format: nil, settings: request_settings) + execute(schema_creation.accept(td), format: nil) if options[:with_distributed] distributed_table_name = options.delete(:with_distributed) @@ -356,7 +356,7 @@ def create_table(table_name, request_settings: {}, **options, &block) def create_function(name, body, **options) fd = "CREATE#{' OR REPLACE' if options[:force]} FUNCTION #{apply_cluster(quote_table_name(name))} AS #{body}" - do_execute(fd, format: nil) + execute(fd, format: nil) end # Drops a ClickHouse database. @@ -375,7 +375,7 @@ def drop_functions end def rename_table(table_name, new_name) - do_execute apply_cluster "RENAME TABLE #{quote_table_name(table_name)} TO #{quote_table_name(new_name)}" + execute apply_cluster "RENAME TABLE #{quote_table_name(table_name)} TO #{quote_table_name(new_name)}" end def drop_table(table_name, options = {}) # :nodoc: @@ -385,7 +385,7 @@ def drop_table(table_name, options = {}) # :nodoc: query = apply_cluster(query) query = "#{query} SYNC" if options[:sync] - do_execute(query) + execute(query) if options[:with_distributed] distributed_table_name = options.delete(:with_distributed) @@ -400,7 +400,7 @@ def drop_function(name, options = {}) query = apply_cluster(query) query = "#{query} SYNC" if options[:sync] - do_execute(query, format: nil) + execute(query, format: nil) end def add_column(table_name, column_name, type, **options) @@ -418,7 +418,7 @@ def remove_column(table_name, column_name, type = nil, **options) end def change_column(table_name, column_name, type, **options) - result = do_execute("ALTER TABLE #{quote_table_name(table_name)} #{change_column_for_alter(table_name, column_name, type, **options)}", nil, settings: {wait_end_of_query: 1, send_progress_in_http_headers: 1}) + result = execute("ALTER TABLE #{quote_table_name(table_name)} #{change_column_for_alter(table_name, column_name, type, **options)}", nil, settings: {wait_end_of_query: 1, send_progress_in_http_headers: 1}) raise "Error parse json response: #{result}" if result.presence && !result.is_a?(Hash) end diff --git a/lib/clickhouse-activerecord/tasks.rb b/lib/clickhouse-activerecord/tasks.rb index 7440a17..55c5794 100644 --- a/lib/clickhouse-activerecord/tasks.rb +++ b/lib/clickhouse-activerecord/tasks.rb @@ -66,9 +66,9 @@ def structure_load(*args) if sql.gsub(/[a-z]/i, '').blank? next elsif sql =~ /^INSERT INTO/ - connection.do_execute(sql, nil, format: nil) + connection.execute(sql, nil, format: nil) elsif sql =~ /^CREATE .*?FUNCTION/ - connection.do_execute(sql, nil, format: nil) + connection.execute(sql, nil, format: nil) else connection.execute(sql) end diff --git a/spec/fixtures/migrations/plain_function_creation/1_create_some_function.rb b/spec/fixtures/migrations/plain_function_creation/1_create_some_function.rb index ede6a3e..26ea757 100644 --- a/spec/fixtures/migrations/plain_function_creation/1_create_some_function.rb +++ b/spec/fixtures/migrations/plain_function_creation/1_create_some_function.rb @@ -10,6 +10,6 @@ def up sql = <<~SQL CREATE FUNCTION addFun AS (x,y) -> x + y SQL - do_execute(sql, format: nil) + execute(sql, format: nil) end end diff --git a/spec/single/model_spec.rb b/spec/single/model_spec.rb index d32b332..bc6543d 100644 --- a/spec/single/model_spec.rb +++ b/spec/single/model_spec.rb @@ -36,16 +36,16 @@ class ModelPk < ActiveRecord::Base expect(Model.first.event_name).to eq('DB::Exception') end - describe '#do_execute' do + describe '#execute' do it 'returns formatted result' do - result = Model.connection.do_execute('SELECT 1 AS t') + result = Model.connection.execute('SELECT 1 AS t') expect(result['data']).to eq([[1]]) expect(result['meta']).to eq([{ 'name' => 't', 'type' => 'UInt8' }]) end context 'with JSONCompact format' do it 'returns formatted result' do - result = Model.connection.do_execute('SELECT 1 AS t', format: 'JSONCompact') + result = Model.connection.execute('SELECT 1 AS t', format: 'JSONCompact') expect(result['data']).to eq([[1]]) expect(result['meta']).to eq([{ 'name' => 't', 'type' => 'UInt8' }]) end @@ -53,7 +53,7 @@ class ModelPk < ActiveRecord::Base context 'with JSONCompactEachRowWithNamesAndTypes format' do it 'returns formatted result' do - result = Model.connection.do_execute('SELECT 1 AS t', format: 'JSONCompactEachRowWithNamesAndTypes') + result = Model.connection.execute('SELECT 1 AS t', format: 'JSONCompactEachRowWithNamesAndTypes') expect(result['data']).to eq([[1]]) expect(result['meta']).to eq([{ 'name' => 't', 'type' => 'UInt8' }]) end From 10a902c34aa163e2e8531a7bedc547b9ddefb5fa Mon Sep 17 00:00:00 2001 From: Ryan Kerr Date: Fri, 6 Sep 2024 02:56:00 -0400 Subject: [PATCH 02/18] Automagically infer `FORMAT` instead of explicitly passing it as an argument to `execute`. --- .../clickhouse/schema_statements.rb | 66 +++++++++---------- .../connection_adapters/clickhouse_adapter.rb | 9 +-- .../1_create_some_function.rb | 2 +- spec/single/model_spec.rb | 16 ----- 4 files changed, 39 insertions(+), 54 deletions(-) diff --git a/lib/active_record/connection_adapters/clickhouse/schema_statements.rb b/lib/active_record/connection_adapters/clickhouse/schema_statements.rb index a639e1f..d8d610b 100644 --- a/lib/active_record/connection_adapters/clickhouse/schema_statements.rb +++ b/lib/active_record/connection_adapters/clickhouse/schema_statements.rb @@ -6,20 +6,18 @@ module ActiveRecord module ConnectionAdapters module Clickhouse module SchemaStatements - DEFAULT_RESPONSE_FORMAT = 'JSONCompactEachRowWithNamesAndTypes'.freeze - DB_EXCEPTION_REGEXP = /\ACode:\s+\d+\.\s+DB::Exception:/.freeze - def execute(sql, name = nil, format: DEFAULT_RESPONSE_FORMAT, settings: {}) + def execute(sql, name = nil, settings: {}) log(sql, "#{adapter_name} #{name}") do - res = request(sql, format, settings) - process_response(res, format, sql) + res = request(sql, settings) + process_response(res, sql) end end def exec_insert(sql, name, _binds, _pk = nil, _sequence_name = nil, returning: nil) - new_sql = sql.dup.sub(/ (DEFAULT )?VALUES/, " VALUES") - execute(new_sql, name, format: nil) + new_sql = sql.sub(/ (DEFAULT )?VALUES/, " VALUES") + execute(new_sql, name) true end @@ -40,20 +38,20 @@ def internal_exec_query(sql, name = nil, binds = [], prepare: false, async: fals end def exec_insert_all(sql, name) - execute(sql, name, format: nil) + execute(sql, name) true end # @link https://clickhouse.com/docs/en/sql-reference/statements/alter/update - def exec_update(_sql, _name = nil, _binds = []) - execute(_sql, _name, format: nil) + def exec_update(sql, name = nil, _binds = []) + execute(sql, name) 0 end # @link https://clickhouse.com/docs/en/sql-reference/statements/delete - def exec_delete(_sql, _name = nil, _binds = []) - log(_sql, "#{adapter_name} #{_name}") do - res = request(_sql) + def exec_delete(sql, name = nil, _binds = []) + log(sql, "#{adapter_name} #{name}") do + res = request(sql) begin data = JSON.parse(res.header['x-clickhouse-summary']) data['result_rows'].to_i @@ -88,7 +86,7 @@ def functions end def show_create_function(function) - execute("SELECT create_query FROM system.functions WHERE origin = 'SQLUserDefined' AND name = '#{function}'", format: nil) + execute("SELECT create_query FROM system.functions WHERE origin = 'SQLUserDefined' AND name = '#{function}'") end def table_options(table) @@ -115,8 +113,8 @@ def data_sources def do_system_execute(sql, name = nil) log_with_debug(sql, "#{adapter_name} #{name}") do - res = request(sql, DEFAULT_RESPONSE_FORMAT) - process_response(res, DEFAULT_RESPONSE_FORMAT, sql) + res = request(sql) + process_response(res, sql) end end @@ -150,7 +148,7 @@ def assume_migrated_upto_version(version, migrations_paths = nil) if (duplicate = inserting.detect { |v| inserting.count(v) > 1 }) raise "Duplicate migration #{duplicate}. Please renumber your migrations to resolve the conflict." end - execute(insert_versions_sql(inserting), nil, format: nil, settings: {max_partitions_per_insert_block: [100, inserting.size].max}) + execute(insert_versions_sql(inserting), nil, settings: {max_partitions_per_insert_block: [100, inserting.size].max}) end end @@ -168,11 +166,10 @@ def with_yaml_fallback(value) # :nodoc: # Make HTTP request to ClickHouse server # @param [String] sql - # @param [String, nil] format # @param [Hash] settings # @return [Net::HTTPResponse] - def request(sql, format = nil, settings = {}) - formatted_sql = apply_format(sql, format) + def request(sql, settings = {}) + formatted_sql = apply_format(sql) request_params = @connection_config || {} @connection.post("/?#{request_params.merge(settings).to_param}", formatted_sql, { 'User-Agent' => "Clickhouse ActiveRecord #{ClickhouseActiverecord::VERSION}", @@ -180,11 +177,21 @@ def request(sql, format = nil, settings = {}) }) end - def apply_format(sql, format) - format ? "#{sql} FORMAT #{format}" : sql + def apply_format(sql) + return sql unless formattable?(sql) + + "#{sql} FORMAT #{ClickhouseAdapter::DEFAULT_RESPONSE_FORMAT}" + end + + def formattable?(sql) + !for_insert?(sql) end - def process_response(res, format, sql = nil) + def for_insert?(sql) + /^insert into/i.match?(sql) + end + + def process_response(res, sql = nil) case res.code.to_i when 200 body = res.body @@ -192,7 +199,7 @@ def process_response(res, format, sql = nil) if body.include?("DB::Exception") && body.match?(DB_EXCEPTION_REGEXP) raise ActiveRecord::ActiveRecordError, "Response code: #{res.code}:\n#{res.body}#{sql ? "\nQuery: #{sql}" : ''}" else - format_body_response(res.body, format) + format_body_response(res.body) end else case res.body @@ -263,17 +270,10 @@ def has_default_function?(default) # :nodoc: (%r{\w+\(.*\)} === default) end - def format_body_response(body, format) + def format_body_response(body) return body if body.blank? - case format - when 'JSONCompact' - format_from_json_compact(body) - when 'JSONCompactEachRowWithNamesAndTypes' - format_from_json_compact_each_row_with_names_and_types(body) - else - body - end + format_from_json_compact_each_row_with_names_and_types(body) end def format_from_json_compact(body) diff --git a/lib/active_record/connection_adapters/clickhouse_adapter.rb b/lib/active_record/connection_adapters/clickhouse_adapter.rb index 9c32b37..497fdb1 100644 --- a/lib/active_record/connection_adapters/clickhouse_adapter.rb +++ b/lib/active_record/connection_adapters/clickhouse_adapter.rb @@ -82,6 +82,7 @@ class ClickhouseAdapter < AbstractAdapter include Clickhouse::Quoting ADAPTER_NAME = 'Clickhouse'.freeze + DEFAULT_RESPONSE_FORMAT = 'JSONCompactEachRowWithNamesAndTypes'.freeze NATIVE_DATABASE_TYPES = { string: { name: 'String' }, integer: { name: 'UInt32' }, @@ -324,7 +325,7 @@ def create_view(table_name, request_settings: {}, **options) drop_table(table_name, options.merge(if_exists: true)) end - execute(schema_creation.accept(td), format: nil) + execute(schema_creation.accept(td)) end def create_table(table_name, request_settings: {}, **options, &block) @@ -341,7 +342,7 @@ def create_table(table_name, request_settings: {}, **options, &block) drop_table(table_name, options.merge(if_exists: true)) end - execute(schema_creation.accept(td), format: nil) + execute(schema_creation.accept(td)) if options[:with_distributed] distributed_table_name = options.delete(:with_distributed) @@ -356,7 +357,7 @@ def create_table(table_name, request_settings: {}, **options, &block) def create_function(name, body, **options) fd = "CREATE#{' OR REPLACE' if options[:force]} FUNCTION #{apply_cluster(quote_table_name(name))} AS #{body}" - execute(fd, format: nil) + execute(fd) end # Drops a ClickHouse database. @@ -400,7 +401,7 @@ def drop_function(name, options = {}) query = apply_cluster(query) query = "#{query} SYNC" if options[:sync] - execute(query, format: nil) + execute(query) end def add_column(table_name, column_name, type, **options) diff --git a/spec/fixtures/migrations/plain_function_creation/1_create_some_function.rb b/spec/fixtures/migrations/plain_function_creation/1_create_some_function.rb index 26ea757..bd5c268 100644 --- a/spec/fixtures/migrations/plain_function_creation/1_create_some_function.rb +++ b/spec/fixtures/migrations/plain_function_creation/1_create_some_function.rb @@ -10,6 +10,6 @@ def up sql = <<~SQL CREATE FUNCTION addFun AS (x,y) -> x + y SQL - execute(sql, format: nil) + execute(sql) end end diff --git a/spec/single/model_spec.rb b/spec/single/model_spec.rb index bc6543d..219f6a4 100644 --- a/spec/single/model_spec.rb +++ b/spec/single/model_spec.rb @@ -42,22 +42,6 @@ class ModelPk < ActiveRecord::Base expect(result['data']).to eq([[1]]) expect(result['meta']).to eq([{ 'name' => 't', 'type' => 'UInt8' }]) end - - context 'with JSONCompact format' do - it 'returns formatted result' do - result = Model.connection.execute('SELECT 1 AS t', format: 'JSONCompact') - expect(result['data']).to eq([[1]]) - expect(result['meta']).to eq([{ 'name' => 't', 'type' => 'UInt8' }]) - end - end - - context 'with JSONCompactEachRowWithNamesAndTypes format' do - it 'returns formatted result' do - result = Model.connection.execute('SELECT 1 AS t', format: 'JSONCompactEachRowWithNamesAndTypes') - expect(result['data']).to eq([[1]]) - expect(result['meta']).to eq([{ 'name' => 't', 'type' => 'UInt8' }]) - end - end end describe '#create' do From 8722d05e4c99b7178dfdf4c5ed8c4ab01beca344 Mon Sep 17 00:00:00 2001 From: Ryan Kerr Date: Fri, 6 Sep 2024 03:12:37 -0400 Subject: [PATCH 03/18] Extract all format handling logic into FormatManager object. --- .../clickhouse/format_manager.rb | 46 +++++++++++++++++++ .../clickhouse/schema_statements.rb | 12 +---- .../connection_adapters/clickhouse_adapter.rb | 1 + 3 files changed, 48 insertions(+), 11 deletions(-) create mode 100644 lib/active_record/connection_adapters/clickhouse/format_manager.rb diff --git a/lib/active_record/connection_adapters/clickhouse/format_manager.rb b/lib/active_record/connection_adapters/clickhouse/format_manager.rb new file mode 100644 index 0000000..0d76b4b --- /dev/null +++ b/lib/active_record/connection_adapters/clickhouse/format_manager.rb @@ -0,0 +1,46 @@ +# frozen_string_literal: true + +module ActiveRecord + module ConnectionAdapters + module Clickhouse + class FormatManager + def initialize(sql) + @sql = sql + end + + def apply + return @sql if skip_format? + + "#{@sql} FORMAT #{ClickhouseAdapter::DEFAULT_RESPONSE_FORMAT}" + end + + def skip_format? + for_insert? || system_command? || schema_command? || format_specified? || delete? + end + + private + + def for_insert? + /^insert into/i.match?(@sql) + end + + def system_command? + /^system|^optimize/i.match?(@sql) + end + + def schema_command? + /^create|^alter|^drop|^rename/i.match?(@sql) + end + + def format_specified? + /format [a-z]+\z/i.match?(@sql) + end + + def delete? + /^delete from/i.match?(@sql) + end + + end + end + end +end diff --git a/lib/active_record/connection_adapters/clickhouse/schema_statements.rb b/lib/active_record/connection_adapters/clickhouse/schema_statements.rb index d8d610b..be8601a 100644 --- a/lib/active_record/connection_adapters/clickhouse/schema_statements.rb +++ b/lib/active_record/connection_adapters/clickhouse/schema_statements.rb @@ -178,17 +178,7 @@ def request(sql, settings = {}) end def apply_format(sql) - return sql unless formattable?(sql) - - "#{sql} FORMAT #{ClickhouseAdapter::DEFAULT_RESPONSE_FORMAT}" - end - - def formattable?(sql) - !for_insert?(sql) - end - - def for_insert?(sql) - /^insert into/i.match?(sql) + FormatManager.new(sql).apply end def process_response(res, sql = nil) diff --git a/lib/active_record/connection_adapters/clickhouse_adapter.rb b/lib/active_record/connection_adapters/clickhouse_adapter.rb index 497fdb1..0d69236 100644 --- a/lib/active_record/connection_adapters/clickhouse_adapter.rb +++ b/lib/active_record/connection_adapters/clickhouse_adapter.rb @@ -13,6 +13,7 @@ require 'active_record/connection_adapters/clickhouse/oid/map' require 'active_record/connection_adapters/clickhouse/oid/uuid' require 'active_record/connection_adapters/clickhouse/column' +require 'active_record/connection_adapters/clickhouse/format_manager' require 'active_record/connection_adapters/clickhouse/quoting' require 'active_record/connection_adapters/clickhouse/schema_creation' require 'active_record/connection_adapters/clickhouse/schema_statements' From b04ca64d50156fb5167a05d859552634ba1e6205 Mon Sep 17 00:00:00 2001 From: Ryan Kerr Date: Fri, 6 Sep 2024 03:24:43 -0400 Subject: [PATCH 04/18] Use a transient settings block to avoid copypasta of underlying abstract ActiveRecord code. --- .../clickhouse/schema_statements.rb | 17 +++++++++++++++++ .../connection_adapters/clickhouse_adapter.rb | 10 ++-------- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/lib/active_record/connection_adapters/clickhouse/schema_statements.rb b/lib/active_record/connection_adapters/clickhouse/schema_statements.rb index be8601a..ce597e5 100644 --- a/lib/active_record/connection_adapters/clickhouse/schema_statements.rb +++ b/lib/active_record/connection_adapters/clickhouse/schema_statements.rb @@ -8,6 +8,15 @@ module Clickhouse module SchemaStatements DB_EXCEPTION_REGEXP = /\ACode:\s+\d+\.\s+DB::Exception:/.freeze + def with_settings(**settings) + @block_settings ||= {} + prev_settings = @block_settings + @block_settings.merge! settings + yield + ensure + @block_settings = prev_settings + end + def execute(sql, name = nil, settings: {}) log(sql, "#{adapter_name} #{name}") do res = request(sql, settings) @@ -290,6 +299,14 @@ def format_from_json_compact_each_row_with_names_and_types(body) def parse_json_payload(payload) JSON.parse(payload, decimal_class: BigDecimal) end + + def settings_params(settings = {}) + request_params = @config || {} + block_settings = @block_settings || {} + request_params.merge(block_settings) + .merge(settings) + .to_param + end end end end diff --git a/lib/active_record/connection_adapters/clickhouse_adapter.rb b/lib/active_record/connection_adapters/clickhouse_adapter.rb index 0d69236..50c42dc 100644 --- a/lib/active_record/connection_adapters/clickhouse_adapter.rb +++ b/lib/active_record/connection_adapters/clickhouse_adapter.rb @@ -406,17 +406,11 @@ def drop_function(name, options = {}) end def add_column(table_name, column_name, type, **options) - return if options[:if_not_exists] == true && column_exists?(table_name, column_name, type) - - at = create_alter_table table_name - at.add_column(column_name, type, **options) - execute(schema_creation.accept(at), nil, settings: {wait_end_of_query: 1, send_progress_in_http_headers: 1}) + with_settings(wait_end_of_query: 1, send_progress_in_http_headers: 1) { super } end def remove_column(table_name, column_name, type = nil, **options) - return if options[:if_exists] == true && !column_exists?(table_name, column_name) - - execute("ALTER TABLE #{quote_table_name(table_name)} #{remove_column_for_alter(table_name, column_name, type, **options)}", nil, settings: {wait_end_of_query: 1, send_progress_in_http_headers: 1}) + with_settings(wait_end_of_query: 1, send_progress_in_http_headers: 1) { super } end def change_column(table_name, column_name, type, **options) From 236575b287f9e304058c68cb670c2cc9ca2ea183 Mon Sep 17 00:00:00 2001 From: Ryan Kerr Date: Fri, 6 Sep 2024 04:00:46 -0400 Subject: [PATCH 05/18] Encapsulate format and SQL within a Statement object. --- .../clickhouse/format_manager.rb | 6 +- .../clickhouse/response_processor.rb | 94 ++++++++++++ .../clickhouse/schema_statements.rb | 136 ++++++------------ .../clickhouse/statement.rb | 30 ++++ .../connection_adapters/clickhouse_adapter.rb | 15 +- spec/single/model_spec.rb | 30 ++++ 6 files changed, 208 insertions(+), 103 deletions(-) create mode 100644 lib/active_record/connection_adapters/clickhouse/response_processor.rb create mode 100644 lib/active_record/connection_adapters/clickhouse/statement.rb diff --git a/lib/active_record/connection_adapters/clickhouse/format_manager.rb b/lib/active_record/connection_adapters/clickhouse/format_manager.rb index 0d76b4b..a875a8a 100644 --- a/lib/active_record/connection_adapters/clickhouse/format_manager.rb +++ b/lib/active_record/connection_adapters/clickhouse/format_manager.rb @@ -4,14 +4,16 @@ module ActiveRecord module ConnectionAdapters module Clickhouse class FormatManager - def initialize(sql) + + def initialize(sql, format:) @sql = sql + @format = format end def apply return @sql if skip_format? - "#{@sql} FORMAT #{ClickhouseAdapter::DEFAULT_RESPONSE_FORMAT}" + "#{@sql} FORMAT #{@format}" end def skip_format? diff --git a/lib/active_record/connection_adapters/clickhouse/response_processor.rb b/lib/active_record/connection_adapters/clickhouse/response_processor.rb new file mode 100644 index 0000000..fb65899 --- /dev/null +++ b/lib/active_record/connection_adapters/clickhouse/response_processor.rb @@ -0,0 +1,94 @@ +# frozen_string_literal: true + +module ActiveRecord + module ConnectionAdapters + module Clickhouse + class ResponseProcessor + + def initialize(raw_response, format) + @raw_response = raw_response + @format = format + end + + def process + if success? + process_successful_response + else + raise_database_error! + end + rescue JSON::ParserError + @raw_response.body + end + + private + + def success? + @raw_response.code.to_i == 200 + end + + def process_successful_response + raise_generic! if @raw_response.body.to_s.include?('DB::Exception') + + format_body_response + end + + def raise_generic! + raise ActiveRecord::ActiveRecordError, "Response code: #{@raw_response.code}:\n#{@raw_response.body}" + end + + def format_body_response + body = @raw_response.body + return body if body.blank? + + case @format + when 'JSONCompact' + format_from_json_compact(body) + when 'JSONCompactEachRowWithNamesAndTypes' + format_from_json_compact_each_row_with_names_and_types(body) + else + body + end + rescue JSON::ParserError + @raw_response.body + end + + def format_from_json_compact(body) + parse_json_payload(body) + end + + def format_from_json_compact_each_row_with_names_and_types(body) + rows = body.split("\n").map { |row| parse_json_payload(row) } + names, types, *data = rows + + meta = names.zip(types).map do |name, type| + { + 'name' => name, + 'type' => type + } + end + + { + 'meta' => meta, + 'data' => data + } + end + + def parse_json_payload(payload) + JSON.parse(payload, decimal_class: BigDecimal) + end + + def raise_database_error! + case @raw_response.body + when /DB::Exception:.*\(UNKNOWN_DATABASE\)/ + raise ActiveRecord::NoDatabaseError + when /DB::Exception:.*\(DATABASE_ALREADY_EXISTS\)/ + raise ActiveRecord::DatabaseAlreadyExists + else + raise_generic! + end + end + + end + end + end +end diff --git a/lib/active_record/connection_adapters/clickhouse/schema_statements.rb b/lib/active_record/connection_adapters/clickhouse/schema_statements.rb index ce597e5..750ef5d 100644 --- a/lib/active_record/connection_adapters/clickhouse/schema_statements.rb +++ b/lib/active_record/connection_adapters/clickhouse/schema_statements.rb @@ -17,10 +17,17 @@ def with_settings(**settings) @block_settings = prev_settings end + def with_response_format(format) + prev_format = @response_format + @response_format = format + yield + ensure + @response_format = prev_format + end + def execute(sql, name = nil, settings: {}) - log(sql, "#{adapter_name} #{name}") do - res = request(sql, settings) - process_response(res, sql) + log(sql, [adapter_name, name].compact.join(' ')) do + raw_execute(sql, settings: settings) end end @@ -60,11 +67,12 @@ def exec_update(sql, name = nil, _binds = []) # @link https://clickhouse.com/docs/en/sql-reference/statements/delete def exec_delete(sql, name = nil, _binds = []) log(sql, "#{adapter_name} #{name}") do - res = request(sql) + statement = Statement.new(sql) + res = request(statement) begin data = JSON.parse(res.header['x-clickhouse-summary']) data['result_rows'].to_i - rescue JSONError + rescue JSON::ParserError 0 end end @@ -120,10 +128,9 @@ def data_sources tables end - def do_system_execute(sql, name = nil) - log_with_debug(sql, "#{adapter_name} #{name}") do - res = request(sql) - process_response(res, sql) + def do_system_execute(sql, name = nil, except_params: []) + log_with_debug(sql, [adapter_name, name].compact.join(' ')) do + raw_execute(sql, except_params: except_params) end end @@ -171,53 +178,19 @@ def with_yaml_fallback(value) # :nodoc: end end - private - - # Make HTTP request to ClickHouse server - # @param [String] sql - # @param [Hash] settings - # @return [Net::HTTPResponse] - def request(sql, settings = {}) - formatted_sql = apply_format(sql) - request_params = @connection_config || {} - @connection.post("/?#{request_params.merge(settings).to_param}", formatted_sql, { - 'User-Agent' => "Clickhouse ActiveRecord #{ClickhouseActiverecord::VERSION}", - 'Content-Type' => 'application/x-www-form-urlencoded', - }) - end + protected - def apply_format(sql) - FormatManager.new(sql).apply - end + def table_structure(table_name) + result = do_system_execute("DESCRIBE TABLE `#{table_name}`", table_name) + data = result['data'] - def process_response(res, sql = nil) - case res.code.to_i - when 200 - body = res.body + return data unless data.empty? - if body.include?("DB::Exception") && body.match?(DB_EXCEPTION_REGEXP) - raise ActiveRecord::ActiveRecordError, "Response code: #{res.code}:\n#{res.body}#{sql ? "\nQuery: #{sql}" : ''}" - else - format_body_response(res.body) - end - else - case res.body - when /DB::Exception:.*\(UNKNOWN_DATABASE\)/ - raise ActiveRecord::NoDatabaseError - when /DB::Exception:.*\(DATABASE_ALREADY_EXISTS\)/ - raise ActiveRecord::DatabaseAlreadyExists - else - raise ActiveRecord::ActiveRecordError, "Response code: #{res.code}:\n#{res.body}" - end - end - rescue JSON::ParserError - res.body + raise ActiveRecord::StatementInvalid, "Could not find table '#{table_name}'" end + alias column_definitions table_structure - def log_with_debug(sql, name = nil) - return yield unless @debug - log(sql, "#{name} (system)") { yield } - end + private def schema_creation Clickhouse::SchemaCreation.new(self) @@ -236,20 +209,6 @@ def new_column_from_field(table_name, field, _definitions) Clickhouse::Column.new(field[0], default_value, type_metadata, field[1].include?('Nullable'), default_function, codec: field[5].presence) end - protected - - def table_structure(table_name) - result = do_system_execute("DESCRIBE TABLE `#{table_name}`", table_name) - data = result['data'] - - return data unless data.empty? - - raise ActiveRecord::StatementInvalid, "Could not find table '#{table_name}'" - end - alias column_definitions table_structure - - private - # Extracts the value from a PostgreSQL column default definition. def extract_value_from_default(default_expression, default_type) return nil if default_type != 'DEFAULT' || default_expression.blank? @@ -269,42 +228,35 @@ def has_default_function?(default) # :nodoc: (%r{\w+\(.*\)} === default) end - def format_body_response(body) - return body if body.blank? - - format_from_json_compact_each_row_with_names_and_types(body) - end - - def format_from_json_compact(body) - parse_json_payload(body) + def raw_execute(sql, settings: {}, except_params: []) + statement = Statement.new(sql) + statement.response = request(statement, settings: settings, except_params: except_params) + statement.processed_response end - def format_from_json_compact_each_row_with_names_and_types(body) - rows = body.split("\n").map { |row| parse_json_payload(row) } - names, types, *data = rows - - meta = names.zip(types).map do |name, type| - { - 'name' => name, - 'type' => type - } - end - - { - 'meta' => meta, - 'data' => data - } + # Make HTTP request to ClickHouse server + # @param [ActiveRecord::ConnectionAdapters::Clickhouse::Statement] statement + # @param [Hash] settings + # @param [Array] except_params + # @return [Net::HTTPResponse] + def request(statement, settings: {}, except_params: []) + @connection.post("/?#{settings_params(settings, except: except_params)}", + statement.formatted_sql, + 'Content-Type' => 'application/x-www-form-urlencoded', + 'User-Agent' => ClickhouseAdapter::USER_AGENT) end - def parse_json_payload(payload) - JSON.parse(payload, decimal_class: BigDecimal) + def log_with_debug(sql, name = nil) + return yield unless @debug + log(sql, "#{name} (system)") { yield } end - def settings_params(settings = {}) - request_params = @config || {} + def settings_params(settings = {}, except: []) + request_params = @connection_config || {} block_settings = @block_settings || {} request_params.merge(block_settings) .merge(settings) + .except(*except) .to_param end end diff --git a/lib/active_record/connection_adapters/clickhouse/statement.rb b/lib/active_record/connection_adapters/clickhouse/statement.rb new file mode 100644 index 0000000..d5d8a18 --- /dev/null +++ b/lib/active_record/connection_adapters/clickhouse/statement.rb @@ -0,0 +1,30 @@ +# frozen_string_literal: true + +require 'active_record/connection_adapters/clickhouse/response_processor' +require 'active_record/connection_adapters/clickhouse/format_manager' + +module ActiveRecord + module ConnectionAdapters + module Clickhouse + class Statement + + attr_reader :format + attr_writer :response + + def initialize(sql, format: nil) + @sql = sql + @format = format || ClickhouseAdapter::DEFAULT_RESPONSE_FORMAT + end + + def formatted_sql + @formatted_sql ||= FormatManager.new(@sql, format: @format).apply + end + + def processed_response + ResponseProcessor.new(@response, @format).process + end + + end + end + end +end diff --git a/lib/active_record/connection_adapters/clickhouse_adapter.rb b/lib/active_record/connection_adapters/clickhouse_adapter.rb index 50c42dc..4a12a88 100644 --- a/lib/active_record/connection_adapters/clickhouse_adapter.rb +++ b/lib/active_record/connection_adapters/clickhouse_adapter.rb @@ -17,6 +17,7 @@ require 'active_record/connection_adapters/clickhouse/quoting' require 'active_record/connection_adapters/clickhouse/schema_creation' require 'active_record/connection_adapters/clickhouse/schema_statements' +require 'active_record/connection_adapters/clickhouse/statement' require 'active_record/connection_adapters/clickhouse/table_definition' require 'net/http' require 'openssl' @@ -84,6 +85,7 @@ class ClickhouseAdapter < AbstractAdapter ADAPTER_NAME = 'Clickhouse'.freeze DEFAULT_RESPONSE_FORMAT = 'JSONCompactEachRowWithNamesAndTypes'.freeze + USER_AGENT = "ClickHouse ActiveRecord #{ClickhouseActiverecord::VERSION}" NATIVE_DATABASE_TYPES = { string: { name: 'String' }, integer: { name: 'UInt32' }, @@ -139,6 +141,7 @@ def initialize(config_or_deprecated_connection, deprecated_logger = nil, depreca @connection_config = { user: @config[:username], password: @config[:password], database: @config[:database] }.compact @debug = @config[:debug] || false + @response_format = @config[:format] || DEFAULT_RESPONSE_FORMAT @prepared_statements = false @@ -147,7 +150,7 @@ def initialize(config_or_deprecated_connection, deprecated_logger = nil, depreca # Return ClickHouse server version def server_version - @server_version ||= do_system_execute('SELECT version()')['data'][0][0] + @server_version ||= select_value('SELECT version()') end # Savepoints are not supported, noop @@ -310,10 +313,7 @@ def show_create_table(table) # Create a new ClickHouse database. def create_database(name) sql = apply_cluster "CREATE DATABASE #{quote_table_name(name)}" - log_with_debug(sql, adapter_name) do - res = @connection.post("/?#{@connection_config.except(:database).to_param}", sql) - process_response(res, DEFAULT_RESPONSE_FORMAT) - end + do_system_execute sql, adapter_name, except_params: [:database] end def create_view(table_name, request_settings: {}, **options) @@ -364,10 +364,7 @@ def create_function(name, body, **options) # Drops a ClickHouse database. def drop_database(name) #:nodoc: sql = apply_cluster "DROP DATABASE IF EXISTS #{quote_table_name(name)}" - log_with_debug(sql, adapter_name) do - res = @connection.post("/?#{@connection_config.except(:database).to_param}", sql) - process_response(res, DEFAULT_RESPONSE_FORMAT) - end + do_system_execute sql, adapter_name, except_params: [:database] end def drop_functions diff --git a/spec/single/model_spec.rb b/spec/single/model_spec.rb index 219f6a4..a64f533 100644 --- a/spec/single/model_spec.rb +++ b/spec/single/model_spec.rb @@ -44,6 +44,36 @@ class ModelPk < ActiveRecord::Base end end + describe '#with_response_format' do + it 'returns formatted result' do + result = Model.connection.execute('SELECT 1 AS t') + expect(result['data']).to eq([[1]]) + expect(result['meta']).to eq([{ 'name' => 't', 'type' => 'UInt8' }]) + end + + context 'with JSONCompact format' do + it 'returns formatted result' do + result = + Model.connection.with_response_format('JSONCompact') do + Model.connection.execute('SELECT 1 AS t') + end + expect(result['data']).to eq([[1]]) + expect(result['meta']).to eq([{ 'name' => 't', 'type' => 'UInt8' }]) + end + end + + context 'with JSONCompactEachRowWithNamesAndTypes format' do + it 'returns formatted result' do + result = + Model.connection.with_response_format('JSONCompactEachRowWithNamesAndTypes') do + Model.connection.execute('SELECT 1 AS t') + end + expect(result['data']).to eq([[1]]) + expect(result['meta']).to eq([{ 'name' => 't', 'type' => 'UInt8' }]) + end + end + end + describe '#create' do it 'creates a new record' do expect { From be7495bd3edbda385205a9e09a3ee67472f9b1db Mon Sep 17 00:00:00 2001 From: Ryan Kerr Date: Fri, 6 Sep 2024 04:21:39 -0400 Subject: [PATCH 06/18] Shim old usage of :format with execute, with a deprecation warning. --- .../clickhouse/schema_statements.rb | 26 ++++++++++++++++--- spec/single/model_spec.rb | 14 ++++++++++ 2 files changed, 36 insertions(+), 4 deletions(-) diff --git a/lib/active_record/connection_adapters/clickhouse/schema_statements.rb b/lib/active_record/connection_adapters/clickhouse/schema_statements.rb index 750ef5d..34f5a96 100644 --- a/lib/active_record/connection_adapters/clickhouse/schema_statements.rb +++ b/lib/active_record/connection_adapters/clickhouse/schema_statements.rb @@ -10,13 +10,20 @@ module SchemaStatements def with_settings(**settings) @block_settings ||= {} - prev_settings = @block_settings + prev_settings = @block_settings @block_settings.merge! settings yield ensure @block_settings = prev_settings end + # Request a specific format for the duration of the provided block + # @param [String] format + # + # @example Specify CSVWithNamesAndTypes format + # with_response_format('CSVWithNamesAndTypes') do + # Table.connection.execute('SELECT * FROM table') + # end def with_response_format(format) prev_format = @response_format @response_format = format @@ -25,9 +32,20 @@ def with_response_format(format) @response_format = prev_format end - def execute(sql, name = nil, settings: {}) - log(sql, [adapter_name, name].compact.join(' ')) do - raw_execute(sql, settings: settings) + def execute(sql, name = nil, format: nil, settings: {}) + if format + ActiveRecord.deprecator.warn(<<~MSG.squish) + Passing `format` to `execute` is deprecated and will be removed in an upcoming release. + Please wrap `execute` in `with_response_format` instead. + MSG + end + + format ||= @response_format + format ||= ClickhouseAdapter::DEFAULT_RESPONSE_FORMAT + with_response_format(format) do + log(sql, [adapter_name, name].compact.join(' ')) do + raw_execute(sql, settings: settings) + end end end diff --git a/spec/single/model_spec.rb b/spec/single/model_spec.rb index a64f533..2a4c9d4 100644 --- a/spec/single/model_spec.rb +++ b/spec/single/model_spec.rb @@ -42,6 +42,20 @@ class ModelPk < ActiveRecord::Base expect(result['data']).to eq([[1]]) expect(result['meta']).to eq([{ 'name' => 't', 'type' => 'UInt8' }]) end + + context 'when a different format is passed as a keyword' do + it 'prints a deprecation warning' do + expect(ActiveRecord.deprecator).to receive(:warn).with(/Passing `format` to `execute` is deprecated/) + Model.connection.execute('SELECT 1 AS t', format: 'JSONCompact') + end + + it 'still works' do + allow(ActiveRecord.deprecator).to receive(:warn) + result = Model.connection.execute('SELECT 1 AS t', format: 'JSONCompact') + expect(result['data']).to eq([[1]]) + expect(result['meta']).to eq([{ 'name' => 't', 'type' => 'UInt8' }]) + end + end end describe '#with_response_format' do From fd6f72ca0fc072a0b3b9d2884a3e711e0a7a192a Mon Sep 17 00:00:00 2001 From: Ryan Kerr Date: Fri, 6 Sep 2024 04:52:37 -0400 Subject: [PATCH 07/18] Allow users to pass `nil` to `with_response_format` in order to send a request without appending a `FORMAT` clause. --- .../clickhouse/format_manager.rb | 2 +- .../clickhouse/schema_statements.rb | 22 +++++++++++++------ .../clickhouse/statement.rb | 4 ++-- spec/single/model_spec.rb | 10 +++++++++ 4 files changed, 28 insertions(+), 10 deletions(-) diff --git a/lib/active_record/connection_adapters/clickhouse/format_manager.rb b/lib/active_record/connection_adapters/clickhouse/format_manager.rb index a875a8a..498eebd 100644 --- a/lib/active_record/connection_adapters/clickhouse/format_manager.rb +++ b/lib/active_record/connection_adapters/clickhouse/format_manager.rb @@ -11,7 +11,7 @@ def initialize(sql, format:) end def apply - return @sql if skip_format? + return @sql if skip_format? || @format.blank? "#{@sql} FORMAT #{@format}" end diff --git a/lib/active_record/connection_adapters/clickhouse/schema_statements.rb b/lib/active_record/connection_adapters/clickhouse/schema_statements.rb index 34f5a96..beec383 100644 --- a/lib/active_record/connection_adapters/clickhouse/schema_statements.rb +++ b/lib/active_record/connection_adapters/clickhouse/schema_statements.rb @@ -17,13 +17,21 @@ def with_settings(**settings) @block_settings = prev_settings end - # Request a specific format for the duration of the provided block + # Request a specific format for the duration of the provided block. + # Pass `nil` to explicitly send the SQL statement without a `FORMAT` clause. # @param [String] format # # @example Specify CSVWithNamesAndTypes format # with_response_format('CSVWithNamesAndTypes') do # Table.connection.execute('SELECT * FROM table') # end + # # sends and executes "SELECT * FROM table FORMAT CSVWithNamesAndTypes" + # + # @example Specify no format + # with_response_format(nil) do + # Table.connection.execute('SELECT * FROM table') + # end + # # sends and executes "SELECT * FROM table" def with_response_format(format) prev_format = @response_format @response_format = format @@ -32,16 +40,16 @@ def with_response_format(format) @response_format = prev_format end - def execute(sql, name = nil, format: nil, settings: {}) - if format + def execute(sql, name = nil, format: :deprecated, settings: {}) + if format == :deprecated + format = @response_format + else ActiveRecord.deprecator.warn(<<~MSG.squish) Passing `format` to `execute` is deprecated and will be removed in an upcoming release. Please wrap `execute` in `with_response_format` instead. MSG end - format ||= @response_format - format ||= ClickhouseAdapter::DEFAULT_RESPONSE_FORMAT with_response_format(format) do log(sql, [adapter_name, name].compact.join(' ')) do raw_execute(sql, settings: settings) @@ -85,7 +93,7 @@ def exec_update(sql, name = nil, _binds = []) # @link https://clickhouse.com/docs/en/sql-reference/statements/delete def exec_delete(sql, name = nil, _binds = []) log(sql, "#{adapter_name} #{name}") do - statement = Statement.new(sql) + statement = Statement.new(sql, format: @response_format) res = request(statement) begin data = JSON.parse(res.header['x-clickhouse-summary']) @@ -247,7 +255,7 @@ def has_default_function?(default) # :nodoc: end def raw_execute(sql, settings: {}, except_params: []) - statement = Statement.new(sql) + statement = Statement.new(sql, format: @response_format) statement.response = request(statement, settings: settings, except_params: except_params) statement.processed_response end diff --git a/lib/active_record/connection_adapters/clickhouse/statement.rb b/lib/active_record/connection_adapters/clickhouse/statement.rb index d5d8a18..0c95bcc 100644 --- a/lib/active_record/connection_adapters/clickhouse/statement.rb +++ b/lib/active_record/connection_adapters/clickhouse/statement.rb @@ -11,9 +11,9 @@ class Statement attr_reader :format attr_writer :response - def initialize(sql, format: nil) + def initialize(sql, format:) @sql = sql - @format = format || ClickhouseAdapter::DEFAULT_RESPONSE_FORMAT + @format = format end def formatted_sql diff --git a/spec/single/model_spec.rb b/spec/single/model_spec.rb index 2a4c9d4..b7b9954 100644 --- a/spec/single/model_spec.rb +++ b/spec/single/model_spec.rb @@ -86,6 +86,16 @@ class ModelPk < ActiveRecord::Base expect(result['meta']).to eq([{ 'name' => 't', 'type' => 'UInt8' }]) end end + + context 'with nil format' do + it 'omits the FORMAT clause' do + result = + Model.connection.with_response_format(nil) do + Model.connection.execute('SELECT 1 AS t') + end + expect(result.chomp).to eq('1') + end + end end describe '#create' do From 658180963307e4bb3086a8051b7d161160a412c4 Mon Sep 17 00:00:00 2001 From: Ryan Kerr Date: Fri, 6 Sep 2024 05:19:16 -0400 Subject: [PATCH 08/18] Move FormatManager and ResponseProcessor within the Statement namespace to indicate that they're internal/private. --- .../clickhouse/format_manager.rb | 48 ---------- .../clickhouse/response_processor.rb | 94 ------------------ .../clickhouse/statement.rb | 4 +- .../clickhouse/statement/format_manager.rb | 50 ++++++++++ .../statement/response_processor.rb | 96 +++++++++++++++++++ 5 files changed, 148 insertions(+), 144 deletions(-) delete mode 100644 lib/active_record/connection_adapters/clickhouse/format_manager.rb delete mode 100644 lib/active_record/connection_adapters/clickhouse/response_processor.rb create mode 100644 lib/active_record/connection_adapters/clickhouse/statement/format_manager.rb create mode 100644 lib/active_record/connection_adapters/clickhouse/statement/response_processor.rb diff --git a/lib/active_record/connection_adapters/clickhouse/format_manager.rb b/lib/active_record/connection_adapters/clickhouse/format_manager.rb deleted file mode 100644 index 498eebd..0000000 --- a/lib/active_record/connection_adapters/clickhouse/format_manager.rb +++ /dev/null @@ -1,48 +0,0 @@ -# frozen_string_literal: true - -module ActiveRecord - module ConnectionAdapters - module Clickhouse - class FormatManager - - def initialize(sql, format:) - @sql = sql - @format = format - end - - def apply - return @sql if skip_format? || @format.blank? - - "#{@sql} FORMAT #{@format}" - end - - def skip_format? - for_insert? || system_command? || schema_command? || format_specified? || delete? - end - - private - - def for_insert? - /^insert into/i.match?(@sql) - end - - def system_command? - /^system|^optimize/i.match?(@sql) - end - - def schema_command? - /^create|^alter|^drop|^rename/i.match?(@sql) - end - - def format_specified? - /format [a-z]+\z/i.match?(@sql) - end - - def delete? - /^delete from/i.match?(@sql) - end - - end - end - end -end diff --git a/lib/active_record/connection_adapters/clickhouse/response_processor.rb b/lib/active_record/connection_adapters/clickhouse/response_processor.rb deleted file mode 100644 index fb65899..0000000 --- a/lib/active_record/connection_adapters/clickhouse/response_processor.rb +++ /dev/null @@ -1,94 +0,0 @@ -# frozen_string_literal: true - -module ActiveRecord - module ConnectionAdapters - module Clickhouse - class ResponseProcessor - - def initialize(raw_response, format) - @raw_response = raw_response - @format = format - end - - def process - if success? - process_successful_response - else - raise_database_error! - end - rescue JSON::ParserError - @raw_response.body - end - - private - - def success? - @raw_response.code.to_i == 200 - end - - def process_successful_response - raise_generic! if @raw_response.body.to_s.include?('DB::Exception') - - format_body_response - end - - def raise_generic! - raise ActiveRecord::ActiveRecordError, "Response code: #{@raw_response.code}:\n#{@raw_response.body}" - end - - def format_body_response - body = @raw_response.body - return body if body.blank? - - case @format - when 'JSONCompact' - format_from_json_compact(body) - when 'JSONCompactEachRowWithNamesAndTypes' - format_from_json_compact_each_row_with_names_and_types(body) - else - body - end - rescue JSON::ParserError - @raw_response.body - end - - def format_from_json_compact(body) - parse_json_payload(body) - end - - def format_from_json_compact_each_row_with_names_and_types(body) - rows = body.split("\n").map { |row| parse_json_payload(row) } - names, types, *data = rows - - meta = names.zip(types).map do |name, type| - { - 'name' => name, - 'type' => type - } - end - - { - 'meta' => meta, - 'data' => data - } - end - - def parse_json_payload(payload) - JSON.parse(payload, decimal_class: BigDecimal) - end - - def raise_database_error! - case @raw_response.body - when /DB::Exception:.*\(UNKNOWN_DATABASE\)/ - raise ActiveRecord::NoDatabaseError - when /DB::Exception:.*\(DATABASE_ALREADY_EXISTS\)/ - raise ActiveRecord::DatabaseAlreadyExists - else - raise_generic! - end - end - - end - end - end -end diff --git a/lib/active_record/connection_adapters/clickhouse/statement.rb b/lib/active_record/connection_adapters/clickhouse/statement.rb index 0c95bcc..7892d1d 100644 --- a/lib/active_record/connection_adapters/clickhouse/statement.rb +++ b/lib/active_record/connection_adapters/clickhouse/statement.rb @@ -1,7 +1,7 @@ # frozen_string_literal: true -require 'active_record/connection_adapters/clickhouse/response_processor' -require 'active_record/connection_adapters/clickhouse/format_manager' +require 'active_record/connection_adapters/clickhouse/statement/format_manager' +require 'active_record/connection_adapters/clickhouse/statement/response_processor' module ActiveRecord module ConnectionAdapters diff --git a/lib/active_record/connection_adapters/clickhouse/statement/format_manager.rb b/lib/active_record/connection_adapters/clickhouse/statement/format_manager.rb new file mode 100644 index 0000000..0e8f1f9 --- /dev/null +++ b/lib/active_record/connection_adapters/clickhouse/statement/format_manager.rb @@ -0,0 +1,50 @@ +# frozen_string_literal: true + +module ActiveRecord + module ConnectionAdapters + module Clickhouse + class Statement + class FormatManager + + def initialize(sql, format:) + @sql = sql + @format = format + end + + def apply + return @sql if skip_format? || @format.blank? + + "#{@sql} FORMAT #{@format}" + end + + def skip_format? + for_insert? || system_command? || schema_command? || format_specified? || delete? + end + + private + + def for_insert? + /^insert into/i.match?(@sql) + end + + def system_command? + /^system|^optimize/i.match?(@sql) + end + + def schema_command? + /^create|^alter|^drop|^rename/i.match?(@sql) + end + + def format_specified? + /format [a-z]+\z/i.match?(@sql) + end + + def delete? + /^delete from/i.match?(@sql) + end + + end + end + end + end +end diff --git a/lib/active_record/connection_adapters/clickhouse/statement/response_processor.rb b/lib/active_record/connection_adapters/clickhouse/statement/response_processor.rb new file mode 100644 index 0000000..89ea4a0 --- /dev/null +++ b/lib/active_record/connection_adapters/clickhouse/statement/response_processor.rb @@ -0,0 +1,96 @@ +# frozen_string_literal: true + +module ActiveRecord + module ConnectionAdapters + module Clickhouse + class Statement + class ResponseProcessor + + def initialize(raw_response, format) + @raw_response = raw_response + @format = format + end + + def process + if success? + process_successful_response + else + raise_database_error! + end + rescue JSON::ParserError + @raw_response.body + end + + private + + def success? + @raw_response.code.to_i == 200 + end + + def process_successful_response + raise_generic! if @raw_response.body.to_s.include?('DB::Exception') + + format_body_response + end + + def raise_generic! + raise ActiveRecord::ActiveRecordError, "Response code: #{@raw_response.code}:\n#{@raw_response.body}" + end + + def format_body_response + body = @raw_response.body + return body if body.blank? + + case @format + when 'JSONCompact' + format_from_json_compact(body) + when 'JSONCompactEachRowWithNamesAndTypes' + format_from_json_compact_each_row_with_names_and_types(body) + else + body + end + rescue JSON::ParserError + @raw_response.body + end + + def format_from_json_compact(body) + parse_json_payload(body) + end + + def format_from_json_compact_each_row_with_names_and_types(body) + rows = body.split("\n").map { |row| parse_json_payload(row) } + names, types, *data = rows + + meta = names.zip(types).map do |name, type| + { + 'name' => name, + 'type' => type + } + end + + { + 'meta' => meta, + 'data' => data + } + end + + def parse_json_payload(payload) + JSON.parse(payload, decimal_class: BigDecimal) + end + + def raise_database_error! + case @raw_response.body + when /DB::Exception:.*\(UNKNOWN_DATABASE\)/ + raise ActiveRecord::NoDatabaseError + when /DB::Exception:.*\(DATABASE_ALREADY_EXISTS\)/ + raise ActiveRecord::DatabaseAlreadyExists + else + raise_generic! + end + end + + end + end + end + end +end From 4ccddc02dc54901db1eb89499b2a722606919e62 Mon Sep 17 00:00:00 2001 From: Ryan Kerr Date: Fri, 6 Sep 2024 05:23:16 -0400 Subject: [PATCH 09/18] Put do_execute back with a deprecation warning. --- .../connection_adapters/clickhouse/schema_statements.rb | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/lib/active_record/connection_adapters/clickhouse/schema_statements.rb b/lib/active_record/connection_adapters/clickhouse/schema_statements.rb index beec383..f8d140c 100644 --- a/lib/active_record/connection_adapters/clickhouse/schema_statements.rb +++ b/lib/active_record/connection_adapters/clickhouse/schema_statements.rb @@ -160,6 +160,14 @@ def do_system_execute(sql, name = nil, except_params: []) end end + def do_execute(sql, name = nil, format: DEFAULT_RESPONSE_FORMAT, settings: {}) + ActiveRecord.deprecator.warn(<<~MSG.squish) + `do_execute` is deprecated and will be removed in an upcoming release. + Please use `execute` instead. + MSG + execute(sql, name, format: format, settings: settings) + end + if ::ActiveRecord::version >= Gem::Version.new('7.2') def schema_migration pool.schema_migration From 90082f39d524e5bcf454835f00cbab0f98824bd5 Mon Sep 17 00:00:00 2001 From: Ryan Kerr Date: Fri, 6 Sep 2024 05:28:28 -0400 Subject: [PATCH 10/18] Allow adapter to receive :execute with other statements, too. --- spec/cluster/migration_spec.rb | 2 ++ 1 file changed, 2 insertions(+) diff --git a/spec/cluster/migration_spec.rb b/spec/cluster/migration_spec.rb index 09edf19..c1c72b6 100644 --- a/spec/cluster/migration_spec.rb +++ b/spec/cluster/migration_spec.rb @@ -129,6 +129,8 @@ it 'creates a table' do + allow_any_instance_of(ActiveRecord::ConnectionAdapters::ClickhouseAdapter).to receive(:execute).and_call_original + expect_any_instance_of(ActiveRecord::ConnectionAdapters::ClickhouseAdapter).to receive(:execute) .with('ALTER TABLE some ON CLUSTER ' + connection_config[:cluster_name] + ' DROP INDEX idx') .and_call_original From 7fb1a7f12d64ac6c038e19729b4de147f2d96c78 Mon Sep 17 00:00:00 2001 From: Ryan Kerr Date: Thu, 21 Nov 2024 19:29:25 -0500 Subject: [PATCH 11/18] Integrate updates into rebase. * Restore support for table/database creation with request settings. * Move improved "DB::Exception" handling into ResponseProcessor. --- .../clickhouse/schema_statements.rb | 5 +++-- .../statement/response_processor.rb | 22 ++++++++++--------- .../connection_adapters/clickhouse_adapter.rb | 5 ++--- spec/cluster/migration_spec.rb | 2 +- .../1_create_some_function.rb | 2 +- spec/single/migration_spec.rb | 2 +- 6 files changed, 20 insertions(+), 18 deletions(-) diff --git a/lib/active_record/connection_adapters/clickhouse/schema_statements.rb b/lib/active_record/connection_adapters/clickhouse/schema_statements.rb index f8d140c..229e856 100644 --- a/lib/active_record/connection_adapters/clickhouse/schema_statements.rb +++ b/lib/active_record/connection_adapters/clickhouse/schema_statements.rb @@ -6,7 +6,6 @@ module ActiveRecord module ConnectionAdapters module Clickhouse module SchemaStatements - DB_EXCEPTION_REGEXP = /\ACode:\s+\d+\.\s+DB::Exception:/.freeze def with_settings(**settings) @block_settings ||= {} @@ -129,7 +128,9 @@ def functions end def show_create_function(function) - execute("SELECT create_query FROM system.functions WHERE origin = 'SQLUserDefined' AND name = '#{function}'") + result = do_system_execute("SELECT create_query FROM system.functions WHERE origin = 'SQLUserDefined' AND name = '#{function}'") + return if result.nil? + result['data'].flatten.first end def table_options(table) diff --git a/lib/active_record/connection_adapters/clickhouse/statement/response_processor.rb b/lib/active_record/connection_adapters/clickhouse/statement/response_processor.rb index 89ea4a0..5288965 100644 --- a/lib/active_record/connection_adapters/clickhouse/statement/response_processor.rb +++ b/lib/active_record/connection_adapters/clickhouse/statement/response_processor.rb @@ -6,8 +6,11 @@ module Clickhouse class Statement class ResponseProcessor + DB_EXCEPTION_REGEXP = /\ACode:\s+\d+\.\s+DB::Exception:/.freeze + def initialize(raw_response, format) @raw_response = raw_response + @body = raw_response.body @format = format end @@ -18,7 +21,7 @@ def process raise_database_error! end rescue JSON::ParserError - @raw_response.body + @body end private @@ -28,29 +31,28 @@ def success? end def process_successful_response - raise_generic! if @raw_response.body.to_s.include?('DB::Exception') + raise_generic! if @body.include?('DB::Exception') && @body.match?(DB_EXCEPTION_REGEXP) format_body_response end def raise_generic! - raise ActiveRecord::ActiveRecordError, "Response code: #{@raw_response.code}:\n#{@raw_response.body}" + raise ActiveRecord::ActiveRecordError, "Response code: #{@raw_response.code}:\n#{@body}" end def format_body_response - body = @raw_response.body - return body if body.blank? + return @body if @body.blank? case @format when 'JSONCompact' - format_from_json_compact(body) + format_from_json_compact(@body) when 'JSONCompactEachRowWithNamesAndTypes' - format_from_json_compact_each_row_with_names_and_types(body) + format_from_json_compact_each_row_with_names_and_types(@body) else - body + @body end rescue JSON::ParserError - @raw_response.body + @body end def format_from_json_compact(body) @@ -79,7 +81,7 @@ def parse_json_payload(payload) end def raise_database_error! - case @raw_response.body + case @body when /DB::Exception:.*\(UNKNOWN_DATABASE\)/ raise ActiveRecord::NoDatabaseError when /DB::Exception:.*\(DATABASE_ALREADY_EXISTS\)/ diff --git a/lib/active_record/connection_adapters/clickhouse_adapter.rb b/lib/active_record/connection_adapters/clickhouse_adapter.rb index 4a12a88..144a188 100644 --- a/lib/active_record/connection_adapters/clickhouse_adapter.rb +++ b/lib/active_record/connection_adapters/clickhouse_adapter.rb @@ -13,7 +13,6 @@ require 'active_record/connection_adapters/clickhouse/oid/map' require 'active_record/connection_adapters/clickhouse/oid/uuid' require 'active_record/connection_adapters/clickhouse/column' -require 'active_record/connection_adapters/clickhouse/format_manager' require 'active_record/connection_adapters/clickhouse/quoting' require 'active_record/connection_adapters/clickhouse/schema_creation' require 'active_record/connection_adapters/clickhouse/schema_statements' @@ -326,7 +325,7 @@ def create_view(table_name, request_settings: {}, **options) drop_table(table_name, options.merge(if_exists: true)) end - execute(schema_creation.accept(td)) + execute(schema_creation.accept(td), settings: request_settings) end def create_table(table_name, request_settings: {}, **options, &block) @@ -343,7 +342,7 @@ def create_table(table_name, request_settings: {}, **options, &block) drop_table(table_name, options.merge(if_exists: true)) end - execute(schema_creation.accept(td)) + execute(schema_creation.accept(td), settings: request_settings) if options[:with_distributed] distributed_table_name = options.delete(:with_distributed) diff --git a/spec/cluster/migration_spec.rb b/spec/cluster/migration_spec.rb index c1c72b6..76cb2cf 100644 --- a/spec/cluster/migration_spec.rb +++ b/spec/cluster/migration_spec.rb @@ -70,7 +70,7 @@ let(:directory) { 'dsl_create_function' } it 'creates a function' do - ActiveRecord::Base.connection.do_execute('CREATE FUNCTION forced_fun AS (x, k, b) -> k*x + b', format: nil) + ActiveRecord::Base.connection.execute('CREATE FUNCTION forced_fun AS (x, k, b) -> k*x + b') subject diff --git a/spec/fixtures/migrations/plain_function_creation/1_create_some_function.rb b/spec/fixtures/migrations/plain_function_creation/1_create_some_function.rb index bd5c268..e500273 100644 --- a/spec/fixtures/migrations/plain_function_creation/1_create_some_function.rb +++ b/spec/fixtures/migrations/plain_function_creation/1_create_some_function.rb @@ -5,7 +5,7 @@ def up sql = <<~SQL CREATE FUNCTION multFun AS (x,y) -> x * y SQL - do_execute(sql, format: nil) + execute(sql) sql = <<~SQL CREATE FUNCTION addFun AS (x,y) -> x + y diff --git a/spec/single/migration_spec.rb b/spec/single/migration_spec.rb index 9b1c574..aca41e2 100644 --- a/spec/single/migration_spec.rb +++ b/spec/single/migration_spec.rb @@ -388,7 +388,7 @@ context 'dsl' do let(:directory) { 'dsl_create_function' } it 'creates a function' do - ActiveRecord::Base.connection.do_execute('CREATE FUNCTION forced_fun AS (x, k, b) -> k*x + b', format: nil) + ActiveRecord::Base.connection.execute('CREATE FUNCTION forced_fun AS (x, k, b) -> k*x + b') subject From 061a3e33cf340dbcbed731b083a42967e1280768 Mon Sep 17 00:00:00 2001 From: Ryan Kerr Date: Fri, 22 Nov 2024 20:25:25 -0500 Subject: [PATCH 12/18] Fix logic to make sure settings are correctly reset after using `with_settings`. --- .../clickhouse/schema_statements.rb | 2 +- spec/single/model_spec.rb | 49 +++++++++++++++++++ 2 files changed, 50 insertions(+), 1 deletion(-) diff --git a/lib/active_record/connection_adapters/clickhouse/schema_statements.rb b/lib/active_record/connection_adapters/clickhouse/schema_statements.rb index 229e856..552fff7 100644 --- a/lib/active_record/connection_adapters/clickhouse/schema_statements.rb +++ b/lib/active_record/connection_adapters/clickhouse/schema_statements.rb @@ -10,7 +10,7 @@ module SchemaStatements def with_settings(**settings) @block_settings ||= {} prev_settings = @block_settings - @block_settings.merge! settings + @block_settings = @block_settings.merge(settings) yield ensure @block_settings = prev_settings diff --git a/spec/single/model_spec.rb b/spec/single/model_spec.rb index b7b9954..597cbf8 100644 --- a/spec/single/model_spec.rb +++ b/spec/single/model_spec.rb @@ -276,6 +276,55 @@ class ModelPk < ActiveRecord::Base end end + describe 'block-style settings' do + let!(:record) { Model.create!(event_name: 'some event', date: Date.current, datetime: Time.now) } + + let(:last_query_finder) do + <<~SQL.squish + SELECT query, Settings, event_time_microseconds + FROM system.query_log + WHERE query ILIKE 'SELECT sample.* FROM sample FORMAT %' + ORDER BY event_date DESC, event_time DESC, event_time_microseconds DESC + LIMIT 1 + SQL + end + + it 'sends the settings to the server' do + expect_any_instance_of(Net::HTTP).to receive(:post).and_wrap_original do |original_method, *args, **kwargs| + resource, sql, * = args + if sql.include?('SELECT sample.*') + query = resource.split('?').second + params = query.split('&').to_h { |pair| pair.split('=').map { |s| CGI.unescape(s) } } + expect(params['cast_keep_nullable']).to eq('1') + expect(params['log_comment']).to eq('Log Comment!') + end + original_method.call(*args, **kwargs) + end + + Model.connection.with_settings(cast_keep_nullable: 1, log_comment: 'Log Comment!') do + Model.all.load + end + end + + it 'resets settings to default outside the block' do + Model.connection.with_settings(cast_keep_nullable: 1, log_comment: 'Log Comment!') do + Model.all.load + end + + expect_any_instance_of(Net::HTTP).to receive(:post).and_wrap_original do |original_method, *args, **kwargs| + resource, sql, * = args + if sql.include?('SELECT sample.*') + query = resource.split('?').second + params = query.split('&').to_h { |pair| pair.split('=').map { |s| CGI.unescape(s) } } + expect(params).not_to include('cast_keep_nullable', 'log_comment') + end + original_method.call(*args, **kwargs) + end + + Model.all.load + end + end + describe '#using' do it 'works' do sql = Model.joins(:joins).using(:event_name, :date).to_sql From a176fa2fb42c5acce34b12397d8002187bc07798 Mon Sep 17 00:00:00 2001 From: Ryan Kerr Date: Fri, 22 Nov 2024 20:29:05 -0500 Subject: [PATCH 13/18] Don't deprecate passing :format kwarg to :execute --- .../clickhouse/schema_statements.rb | 11 +---------- spec/single/model_spec.rb | 16 ++++------------ 2 files changed, 5 insertions(+), 22 deletions(-) diff --git a/lib/active_record/connection_adapters/clickhouse/schema_statements.rb b/lib/active_record/connection_adapters/clickhouse/schema_statements.rb index 552fff7..35b74a0 100644 --- a/lib/active_record/connection_adapters/clickhouse/schema_statements.rb +++ b/lib/active_record/connection_adapters/clickhouse/schema_statements.rb @@ -39,16 +39,7 @@ def with_response_format(format) @response_format = prev_format end - def execute(sql, name = nil, format: :deprecated, settings: {}) - if format == :deprecated - format = @response_format - else - ActiveRecord.deprecator.warn(<<~MSG.squish) - Passing `format` to `execute` is deprecated and will be removed in an upcoming release. - Please wrap `execute` in `with_response_format` instead. - MSG - end - + def execute(sql, name = nil, format: @response_format, settings: {}) with_response_format(format) do log(sql, [adapter_name, name].compact.join(' ')) do raw_execute(sql, settings: settings) diff --git a/spec/single/model_spec.rb b/spec/single/model_spec.rb index 597cbf8..b7d9f5c 100644 --- a/spec/single/model_spec.rb +++ b/spec/single/model_spec.rb @@ -43,18 +43,10 @@ class ModelPk < ActiveRecord::Base expect(result['meta']).to eq([{ 'name' => 't', 'type' => 'UInt8' }]) end - context 'when a different format is passed as a keyword' do - it 'prints a deprecation warning' do - expect(ActiveRecord.deprecator).to receive(:warn).with(/Passing `format` to `execute` is deprecated/) - Model.connection.execute('SELECT 1 AS t', format: 'JSONCompact') - end - - it 'still works' do - allow(ActiveRecord.deprecator).to receive(:warn) - result = Model.connection.execute('SELECT 1 AS t', format: 'JSONCompact') - expect(result['data']).to eq([[1]]) - expect(result['meta']).to eq([{ 'name' => 't', 'type' => 'UInt8' }]) - end + it 'also works when a different format is passed as a keyword' do + result = Model.connection.execute('SELECT 1 AS t', format: 'JSONCompact') + expect(result['data']).to eq([[1]]) + expect(result['meta']).to eq([{ 'name' => 't', 'type' => 'UInt8' }]) end end From dade0fa164b255cf16a71fa4452c5ed820f00544 Mon Sep 17 00:00:00 2001 From: Ryan Kerr Date: Fri, 22 Nov 2024 20:32:32 -0500 Subject: [PATCH 14/18] Get rid of unnecessary stub. --- spec/cluster/migration_spec.rb | 2 -- 1 file changed, 2 deletions(-) diff --git a/spec/cluster/migration_spec.rb b/spec/cluster/migration_spec.rb index 76cb2cf..e1451d7 100644 --- a/spec/cluster/migration_spec.rb +++ b/spec/cluster/migration_spec.rb @@ -129,8 +129,6 @@ it 'creates a table' do - allow_any_instance_of(ActiveRecord::ConnectionAdapters::ClickhouseAdapter).to receive(:execute).and_call_original - expect_any_instance_of(ActiveRecord::ConnectionAdapters::ClickhouseAdapter).to receive(:execute) .with('ALTER TABLE some ON CLUSTER ' + connection_config[:cluster_name] + ' DROP INDEX idx') .and_call_original From a65dffb1abda21d7bb9841ebc1f64a0e842bd702 Mon Sep 17 00:00:00 2001 From: Ryan Kerr Date: Fri, 22 Nov 2024 20:44:17 -0500 Subject: [PATCH 15/18] Pass SQL to ResponseProcessor for logging. --- .../connection_adapters/clickhouse/statement.rb | 2 +- .../clickhouse/statement/response_processor.rb | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/lib/active_record/connection_adapters/clickhouse/statement.rb b/lib/active_record/connection_adapters/clickhouse/statement.rb index 7892d1d..4a58794 100644 --- a/lib/active_record/connection_adapters/clickhouse/statement.rb +++ b/lib/active_record/connection_adapters/clickhouse/statement.rb @@ -21,7 +21,7 @@ def formatted_sql end def processed_response - ResponseProcessor.new(@response, @format).process + ResponseProcessor.new(@response, @format, @sql).process end end diff --git a/lib/active_record/connection_adapters/clickhouse/statement/response_processor.rb b/lib/active_record/connection_adapters/clickhouse/statement/response_processor.rb index 5288965..fcf9bea 100644 --- a/lib/active_record/connection_adapters/clickhouse/statement/response_processor.rb +++ b/lib/active_record/connection_adapters/clickhouse/statement/response_processor.rb @@ -8,10 +8,11 @@ class ResponseProcessor DB_EXCEPTION_REGEXP = /\ACode:\s+\d+\.\s+DB::Exception:/.freeze - def initialize(raw_response, format) + def initialize(raw_response, format, sql) @raw_response = raw_response @body = raw_response.body @format = format + @sql = sql end def process @@ -31,13 +32,13 @@ def success? end def process_successful_response - raise_generic! if @body.include?('DB::Exception') && @body.match?(DB_EXCEPTION_REGEXP) + raise_generic!(@sql) if @body.include?('DB::Exception') && @body.match?(DB_EXCEPTION_REGEXP) format_body_response end - def raise_generic! - raise ActiveRecord::ActiveRecordError, "Response code: #{@raw_response.code}:\n#{@body}" + def raise_generic!(sql = nil) + raise ActiveRecord::ActiveRecordError, "Response code: #{@raw_response.code}:\n#{@body}#{"\nQuery: #{sql}" if sql}" end def format_body_response From dc8756845d6cec1d367610b30f2a6194fb089163 Mon Sep 17 00:00:00 2001 From: Ryan Kerr Date: Fri, 6 Dec 2024 15:05:16 -0500 Subject: [PATCH 16/18] Specify no format in :exec_insert and :exec_insert_all instead of inferring within FormatManager --- .../clickhouse/schema_statements.rb | 6 +++--- .../clickhouse/statement/format_manager.rb | 14 +++++--------- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/lib/active_record/connection_adapters/clickhouse/schema_statements.rb b/lib/active_record/connection_adapters/clickhouse/schema_statements.rb index 35b74a0..5140384 100644 --- a/lib/active_record/connection_adapters/clickhouse/schema_statements.rb +++ b/lib/active_record/connection_adapters/clickhouse/schema_statements.rb @@ -18,7 +18,7 @@ def with_settings(**settings) # Request a specific format for the duration of the provided block. # Pass `nil` to explicitly send the SQL statement without a `FORMAT` clause. - # @param [String] format + # @param [String, nil] format # # @example Specify CSVWithNamesAndTypes format # with_response_format('CSVWithNamesAndTypes') do @@ -49,7 +49,7 @@ def execute(sql, name = nil, format: @response_format, settings: {}) def exec_insert(sql, name, _binds, _pk = nil, _sequence_name = nil, returning: nil) new_sql = sql.sub(/ (DEFAULT )?VALUES/, " VALUES") - execute(new_sql, name) + with_response_format(nil) { execute(new_sql, name) } true end @@ -70,7 +70,7 @@ def internal_exec_query(sql, name = nil, binds = [], prepare: false, async: fals end def exec_insert_all(sql, name) - execute(sql, name) + with_response_format(nil) { execute(sql, name) } true end diff --git a/lib/active_record/connection_adapters/clickhouse/statement/format_manager.rb b/lib/active_record/connection_adapters/clickhouse/statement/format_manager.rb index 0e8f1f9..b17da79 100644 --- a/lib/active_record/connection_adapters/clickhouse/statement/format_manager.rb +++ b/lib/active_record/connection_adapters/clickhouse/statement/format_manager.rb @@ -7,7 +7,7 @@ class Statement class FormatManager def initialize(sql, format:) - @sql = sql + @sql = sql.strip @format = format end @@ -18,21 +18,17 @@ def apply end def skip_format? - for_insert? || system_command? || schema_command? || format_specified? || delete? + system_command? || schema_command? || format_specified? || delete? end private - def for_insert? - /^insert into/i.match?(@sql) - end - def system_command? - /^system|^optimize/i.match?(@sql) + /\Asystem|\Aoptimize/i.match?(@sql) end def schema_command? - /^create|^alter|^drop|^rename/i.match?(@sql) + /\Acreate|\Aalter|\Adrop|\Arename/i.match?(@sql) end def format_specified? @@ -40,7 +36,7 @@ def format_specified? end def delete? - /^delete from/i.match?(@sql) + /\Adelete from/i.match?(@sql) end end From 79e1f24b4866ff59082d29e5faf3518e6454c344 Mon Sep 17 00:00:00 2001 From: Ryan Kerr Date: Fri, 6 Dec 2024 15:16:55 -0500 Subject: [PATCH 17/18] Use `each_line` instead of `split("\n")` Thanks, @nelson-vantage! --- .../clickhouse/statement/response_processor.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/active_record/connection_adapters/clickhouse/statement/response_processor.rb b/lib/active_record/connection_adapters/clickhouse/statement/response_processor.rb index fcf9bea..11cb854 100644 --- a/lib/active_record/connection_adapters/clickhouse/statement/response_processor.rb +++ b/lib/active_record/connection_adapters/clickhouse/statement/response_processor.rb @@ -61,7 +61,7 @@ def format_from_json_compact(body) end def format_from_json_compact_each_row_with_names_and_types(body) - rows = body.split("\n").map { |row| parse_json_payload(row) } + rows = body.each_line.map { |row| parse_json_payload(row) } names, types, *data = rows meta = names.zip(types).map do |name, type| From 40f13dcbe42ef648dcad2041e7d927d425feef04 Mon Sep 17 00:00:00 2001 From: Ryan Kerr Date: Mon, 9 Dec 2024 18:39:33 -0500 Subject: [PATCH 18/18] Add default values to :exec_insert to conform to base ActiveRecord definition --- .../connection_adapters/clickhouse/schema_statements.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/active_record/connection_adapters/clickhouse/schema_statements.rb b/lib/active_record/connection_adapters/clickhouse/schema_statements.rb index 5140384..ef87f7e 100644 --- a/lib/active_record/connection_adapters/clickhouse/schema_statements.rb +++ b/lib/active_record/connection_adapters/clickhouse/schema_statements.rb @@ -47,7 +47,7 @@ def execute(sql, name = nil, format: @response_format, settings: {}) end end - def exec_insert(sql, name, _binds, _pk = nil, _sequence_name = nil, returning: nil) + def exec_insert(sql, name = nil, _binds = [], _pk = nil, _sequence_name = nil, returning: nil) new_sql = sql.sub(/ (DEFAULT )?VALUES/, " VALUES") with_response_format(nil) { execute(new_sql, name) } true