Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Encapsulate format logic within Statement and helper classes #162

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
8570d2d
Get rid of :do_execute, since it now has an equivalent signature to :…
leboshi Sep 6, 2024
10a902c
Automagically infer `FORMAT` instead of explicitly passing it as an a…
leboshi Sep 6, 2024
8722d05
Extract all format handling logic into FormatManager object.
leboshi Sep 6, 2024
b04ca64
Use a transient settings block to avoid copypasta of underlying abstr…
leboshi Sep 6, 2024
236575b
Encapsulate format and SQL within a Statement object.
leboshi Sep 6, 2024
be7495b
Shim old usage of :format with execute, with a deprecation warning.
leboshi Sep 6, 2024
fd6f72c
Allow users to pass `nil` to `with_response_format` in order to send …
leboshi Sep 6, 2024
6581809
Move FormatManager and ResponseProcessor within the Statement namespa…
leboshi Sep 6, 2024
4ccddc0
Put do_execute back with a deprecation warning.
leboshi Sep 6, 2024
90082f3
Allow adapter to receive :execute with other statements, too.
leboshi Sep 6, 2024
7fb1a7f
Integrate updates into rebase.
leboshi Nov 22, 2024
061a3e3
Fix logic to make sure settings are correctly reset after using `with…
leboshi Nov 23, 2024
a176fa2
Don't deprecate passing :format kwarg to :execute
leboshi Nov 23, 2024
dade0fa
Get rid of unnecessary stub.
leboshi Nov 23, 2024
a65dffb
Pass SQL to ResponseProcessor for logging.
leboshi Nov 23, 2024
dc87568
Specify no format in :exec_insert and :exec_insert_all instead of inf…
leboshi Dec 6, 2024
79e1f24
Use `each_line` instead of `split("\n")`
leboshi Dec 6, 2024
40f13dc
Add default values to :exec_insert to conform to base ActiveRecord de…
leboshi Dec 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 93 additions & 112 deletions lib/active_record/connection_adapters/clickhouse/schema_statements.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,55 @@ module ActiveRecord
module ConnectionAdapters
module Clickhouse
module SchemaStatements
DEFAULT_RESPONSE_FORMAT = 'JSONCompactEachRowWithNamesAndTypes'.freeze

DB_EXCEPTION_REGEXP = /\ACode:\s+\d+\.\s+DB::Exception:/.freeze

def execute(sql, name = nil, settings: {})
do_execute(sql, name, settings: settings)
def with_settings(**settings)
@block_settings ||= {}
prev_settings = @block_settings
@block_settings = @block_settings.merge(settings)
yield
ensure
@block_settings = prev_settings
end

# Request a specific format for the duration of the provided block.
# Pass `nil` to explicitly send the SQL statement without a `FORMAT` clause.
# @param [String] format
#
# @example Specify CSVWithNamesAndTypes format
# with_response_format('CSVWithNamesAndTypes') do
# Table.connection.execute('SELECT * FROM table')
# end
# # sends and executes "SELECT * FROM table FORMAT CSVWithNamesAndTypes"
#
# @example Specify no format
# with_response_format(nil) do
# Table.connection.execute('SELECT * FROM table')
# end
# # sends and executes "SELECT * FROM table"
def with_response_format(format)
prev_format = @response_format
@response_format = format
yield
ensure
@response_format = prev_format
end

def execute(sql, name = nil, format: @response_format, settings: {})
with_response_format(format) do
log(sql, [adapter_name, name].compact.join(' ')) do
raw_execute(sql, settings: settings)
end
end
end

def exec_insert(sql, name, _binds, _pk = nil, _sequence_name = nil, returning: nil)
new_sql = sql.dup.sub(/ (DEFAULT )?VALUES/, " VALUES")
do_execute(new_sql, name, format: nil)
new_sql = sql.sub(/ (DEFAULT )?VALUES/, " VALUES")
execute(new_sql, name)
true
end

def internal_exec_query(sql, name = nil, binds = [], prepare: false, async: false, allow_retry: false)
result = do_execute(sql, name)
result = execute(sql, name)
columns = result['meta'].map { |m| m['name'] }
types = {}
result['meta'].each_with_index do |m, i|
Expand All @@ -37,24 +70,25 @@ def internal_exec_query(sql, name = nil, binds = [], prepare: false, async: fals
end

def exec_insert_all(sql, name)
do_execute(sql, name, format: nil)
execute(sql, name)
true
end

# @link https://clickhouse.com/docs/en/sql-reference/statements/alter/update
def exec_update(_sql, _name = nil, _binds = [])
do_execute(_sql, _name, format: nil)
def exec_update(sql, name = nil, _binds = [])
execute(sql, name)
0
end

# @link https://clickhouse.com/docs/en/sql-reference/statements/delete
def exec_delete(_sql, _name = nil, _binds = [])
log(_sql, "#{adapter_name} #{_name}") do
res = request(_sql)
def exec_delete(sql, name = nil, _binds = [])
log(sql, "#{adapter_name} #{name}") do
statement = Statement.new(sql, format: @response_format)
res = request(statement)
begin
data = JSON.parse(res.header['x-clickhouse-summary'])
data['result_rows'].to_i
rescue JSONError
rescue JSON::ParserError
0
end
end
Expand Down Expand Up @@ -85,7 +119,9 @@ def functions
end

def show_create_function(function)
do_execute("SELECT create_query FROM system.functions WHERE origin = 'SQLUserDefined' AND name = '#{function}'", format: nil)
result = do_system_execute("SELECT create_query FROM system.functions WHERE origin = 'SQLUserDefined' AND name = '#{function}'")
return if result.nil?
result['data'].flatten.first
end

def table_options(table)
Expand All @@ -110,18 +146,18 @@ def data_sources
tables
end

def do_system_execute(sql, name = nil)
log_with_debug(sql, "#{adapter_name} #{name}") do
res = request(sql, DEFAULT_RESPONSE_FORMAT)
process_response(res, DEFAULT_RESPONSE_FORMAT, sql)
def do_system_execute(sql, name = nil, except_params: [])
log_with_debug(sql, [adapter_name, name].compact.join(' ')) do
raw_execute(sql, except_params: except_params)
end
end

def do_execute(sql, name = nil, format: DEFAULT_RESPONSE_FORMAT, settings: {})
log(sql, "#{adapter_name} #{name}") do
res = request(sql, format, settings)
process_response(res, format, sql)
end
ActiveRecord.deprecator.warn(<<~MSG.squish)
`do_execute` is deprecated and will be removed in an upcoming release.
Please use `execute` instead.
MSG
execute(sql, name, format: format, settings: settings)
end

if ::ActiveRecord::version >= Gem::Version.new('7.2')
Expand Down Expand Up @@ -154,7 +190,7 @@ def assume_migrated_upto_version(version, migrations_paths = nil)
if (duplicate = inserting.detect { |v| inserting.count(v) > 1 })
raise "Duplicate migration #{duplicate}. Please renumber your migrations to resolve the conflict."
end
do_execute(insert_versions_sql(inserting), nil, format: nil, settings: {max_partitions_per_insert_block: [100, inserting.size].max})
execute(insert_versions_sql(inserting), nil, settings: {max_partitions_per_insert_block: [100, inserting.size].max})
end
end

Expand All @@ -168,54 +204,19 @@ def with_yaml_fallback(value) # :nodoc:
end
end

private

# Make HTTP request to ClickHouse server
# @param [String] sql
# @param [String, nil] format
# @param [Hash] settings
# @return [Net::HTTPResponse]
def request(sql, format = nil, settings = {})
formatted_sql = apply_format(sql, format)
request_params = @connection_config || {}
@connection.post("/?#{request_params.merge(settings).to_param}", formatted_sql, {
'User-Agent' => "Clickhouse ActiveRecord #{ClickhouseActiverecord::VERSION}",
'Content-Type' => 'application/x-www-form-urlencoded',
})
end
protected

def apply_format(sql, format)
format ? "#{sql} FORMAT #{format}" : sql
end
def table_structure(table_name)
result = do_system_execute("DESCRIBE TABLE `#{table_name}`", table_name)
data = result['data']

def process_response(res, format, sql = nil)
case res.code.to_i
when 200
body = res.body
return data unless data.empty?

if body.include?("DB::Exception") && body.match?(DB_EXCEPTION_REGEXP)
raise ActiveRecord::ActiveRecordError, "Response code: #{res.code}:\n#{res.body}#{sql ? "\nQuery: #{sql}" : ''}"
else
format_body_response(res.body, format)
end
else
case res.body
when /DB::Exception:.*\(UNKNOWN_DATABASE\)/
raise ActiveRecord::NoDatabaseError
when /DB::Exception:.*\(DATABASE_ALREADY_EXISTS\)/
raise ActiveRecord::DatabaseAlreadyExists
else
raise ActiveRecord::ActiveRecordError, "Response code: #{res.code}:\n#{res.body}"
end
end
rescue JSON::ParserError
res.body
raise ActiveRecord::StatementInvalid, "Could not find table '#{table_name}'"
end
alias column_definitions table_structure

def log_with_debug(sql, name = nil)
return yield unless @debug
log(sql, "#{name} (system)") { yield }
end
private

def schema_creation
Clickhouse::SchemaCreation.new(self)
Expand All @@ -234,20 +235,6 @@ def new_column_from_field(table_name, field, _definitions)
Clickhouse::Column.new(field[0], default_value, type_metadata, field[1].include?('Nullable'), default_function, codec: field[5].presence)
end

protected

def table_structure(table_name)
result = do_system_execute("DESCRIBE TABLE `#{table_name}`", table_name)
data = result['data']

return data unless data.empty?

raise ActiveRecord::StatementInvalid, "Could not find table '#{table_name}'"
end
alias column_definitions table_structure

private

# Extracts the value from a PostgreSQL column default definition.
def extract_value_from_default(default_expression, default_type)
return nil if default_type != 'DEFAULT' || default_expression.blank?
Expand All @@ -267,42 +254,36 @@ def has_default_function?(default) # :nodoc:
(%r{\w+\(.*\)} === default)
end

def format_body_response(body, format)
return body if body.blank?

case format
when 'JSONCompact'
format_from_json_compact(body)
when 'JSONCompactEachRowWithNamesAndTypes'
format_from_json_compact_each_row_with_names_and_types(body)
else
body
end
def raw_execute(sql, settings: {}, except_params: [])
statement = Statement.new(sql, format: @response_format)
statement.response = request(statement, settings: settings, except_params: except_params)
statement.processed_response
end

def format_from_json_compact(body)
parse_json_payload(body)
# Make HTTP request to ClickHouse server
# @param [ActiveRecord::ConnectionAdapters::Clickhouse::Statement] statement
# @param [Hash] settings
# @param [Array] except_params
# @return [Net::HTTPResponse]
def request(statement, settings: {}, except_params: [])
@connection.post("/?#{settings_params(settings, except: except_params)}",
statement.formatted_sql,
'Content-Type' => 'application/x-www-form-urlencoded',
'User-Agent' => ClickhouseAdapter::USER_AGENT)
end

def format_from_json_compact_each_row_with_names_and_types(body)
rows = body.split("\n").map { |row| parse_json_payload(row) }
names, types, *data = rows

meta = names.zip(types).map do |name, type|
{
'name' => name,
'type' => type
}
end

{
'meta' => meta,
'data' => data
}
def log_with_debug(sql, name = nil)
return yield unless @debug
log(sql, "#{name} (system)") { yield }
end

def parse_json_payload(payload)
JSON.parse(payload, decimal_class: BigDecimal)
def settings_params(settings = {}, except: [])
request_params = @connection_config || {}
block_settings = @block_settings || {}
request_params.merge(block_settings)
.merge(settings)
.except(*except)
.to_param
end
end
end
Expand Down
30 changes: 30 additions & 0 deletions lib/active_record/connection_adapters/clickhouse/statement.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# frozen_string_literal: true

require 'active_record/connection_adapters/clickhouse/statement/format_manager'
require 'active_record/connection_adapters/clickhouse/statement/response_processor'

module ActiveRecord
module ConnectionAdapters
module Clickhouse
class Statement

attr_reader :format
attr_writer :response

def initialize(sql, format:)
@sql = sql
@format = format
end

def formatted_sql
@formatted_sql ||= FormatManager.new(@sql, format: @format).apply
end

def processed_response
ResponseProcessor.new(@response, @format, @sql).process
end

end
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# frozen_string_literal: true

module ActiveRecord
module ConnectionAdapters
module Clickhouse
class Statement
class FormatManager
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This incorrect logic for request:

SELECT * FROM table WHERE column LIKE 'insert into%'

We have a method exec_insert which clearly indicates that this is an insert and what response format should be used.


def initialize(sql, format:)
@sql = sql
@format = format
end

def apply
return @sql if skip_format? || @format.blank?

"#{@sql} FORMAT #{@format}"
end

def skip_format?
for_insert? || system_command? || schema_command? || format_specified? || delete?
end

private

def for_insert?
/^insert into/i.match?(@sql)
end

def system_command?
/^system|^optimize/i.match?(@sql)
end

def schema_command?
/^create|^alter|^drop|^rename/i.match?(@sql)
end

def format_specified?
/format [a-z]+\z/i.match?(@sql)
end

def delete?
/^delete from/i.match?(@sql)
end

end
end
end
end
end
Loading
Loading