diff --git a/apisix/plugins/response-rewrite.lua b/apisix/plugins/response-rewrite.lua index ee9746fa37fc..adf630fe52c9 100644 --- a/apisix/plugins/response-rewrite.lua +++ b/apisix/plugins/response-rewrite.lua @@ -27,9 +27,7 @@ local pairs = pairs local ipairs = ipairs local type = type local pcall = pcall -local zlib = require("ffi-zlib") -local str_buffer = require("string.buffer") -local is_br_libs_loaded, brotli = pcall(require, "brotli") +local content_decode = require("apisix.utils.content-decode") local lrucache = core.lrucache.new({ @@ -203,83 +201,6 @@ local function check_set_headers(headers) end -local function inflate_gzip(data) - local inputs = str_buffer.new():set(data) - local outputs = str_buffer.new() - - local read_inputs = function(size) - local data = inputs:get(size) - if data == "" then - return nil - end - return data - end - - local write_outputs = function(data) - return outputs:put(data) - end - - local ok, err = zlib.inflateGzip(read_inputs, write_outputs) - if not ok then - return nil, err - end - - return outputs:get() -end - - -local function brotli_stream_decode(read_inputs, write_outputs) - -- read 64k data per times - local read_size = 64 * 1024 - local decompressor = brotli.decompressor:new() - - local chunk, ok, res - repeat - chunk = read_inputs(read_size) - if chunk then - ok, res = pcall(function() - return decompressor:decompress(chunk) - end) - else - ok, res = pcall(function() - return decompressor:finish() - end) - end - if not ok then - return false, res - end - write_outputs(res) - until not chunk - - return true, nil -end - - -local function brotli_decode(data) - local inputs = str_buffer.new():set(data) - local outputs = str_buffer.new() - - local read_inputs = function(size) - local data = inputs:get(size) - if data == "" then - return nil - end - return data - end - - local write_outputs = function(data) - return outputs:put(data) - end - - local ok, err = brotli_stream_decode(read_inputs, write_outputs) - if not ok then - return nil, err - end - - return outputs:get() -end - - function _M.check_schema(conf) local ok, err = core.schema.check(schema, conf) if not ok then @@ -341,23 +262,19 @@ function _M.body_filter(conf, ctx) end local err - if ctx.response_encoding == "gzip" then - body, err = inflate_gzip(body) - if err ~= nil then - core.log.error("filters may not work as expected, inflate gzip err: ", err) + if ctx.response_encoding ~= nil then + local decoder = content_decode.dispatch_decoder(ctx.response_encoding) + if not decoder then + core.log.error("filters may not work as expected ", + "due to unsupported compression encoding type: ", + ctx.response_encoding) return end - elseif ctx.response_encoding == "br" and is_br_libs_loaded then - body, err = brotli_decode(body) + body, err = decoder(body) if err ~= nil then - core.log.error("filters may not work as expected, brotli decode err: ", err) + core.log.error("filters may not work as expected: ", err) return end - elseif ctx.response_encoding ~= nil then - core.log.error("filters may not work as expected ", - "due to unsupported compression encoding type: ", - ctx.response_encoding) - return end for _, filter in ipairs(conf.filters) do diff --git a/apisix/utils/content-decode.lua b/apisix/utils/content-decode.lua new file mode 100644 index 000000000000..c22c965fd865 --- /dev/null +++ b/apisix/utils/content-decode.lua @@ -0,0 +1,112 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +local pcall = pcall +local zlib = require("ffi-zlib") +local str_buffer = require("string.buffer") +local is_br_libs_loaded, brotli = pcall(require, "brotli") +local content_decode_funcs = {} +local _M = {} + + +local function inflate_gzip(data) + local inputs = str_buffer.new():set(data) + local outputs = str_buffer.new() + + local read_inputs = function(size) + local data = inputs:get(size) + if data == "" then + return nil + end + return data + end + + local write_outputs = function(data) + return outputs:put(data) + end + + local ok, err = zlib.inflateGzip(read_inputs, write_outputs) + if not ok then + return nil, "inflate gzip err: " .. err + end + + return outputs:get() +end +content_decode_funcs.gzip = inflate_gzip + + +local function brotli_stream_decode(read_inputs, write_outputs) + -- read 64k data per times + local read_size = 64 * 1024 + local decompressor = brotli.decompressor:new() + + local chunk, ok, res + repeat + chunk = read_inputs(read_size) + if chunk then + ok, res = pcall(function() + return decompressor:decompress(chunk) + end) + else + ok, res = pcall(function() + return decompressor:finish() + end) + end + if not ok then + return false, res + end + write_outputs(res) + until not chunk + + return true, nil +end + + +local function brotli_decode(data) + local inputs = str_buffer.new():set(data) + local outputs = str_buffer.new() + + local read_inputs = function(size) + local data = inputs:get(size) + if data == "" then + return nil + end + return data + end + + local write_outputs = function(data) + return outputs:put(data) + end + + local ok, err = brotli_stream_decode(read_inputs, write_outputs) + if not ok then + return nil, "brotli decode err: " .. err + end + + return outputs:get() +end + +if is_br_libs_loaded then + content_decode_funcs.br = brotli_decode +end + + +function _M.dispatch_decoder(response_encoding) + return content_decode_funcs[response_encoding] +end + + +return _M diff --git a/apisix/utils/log-util.lua b/apisix/utils/log-util.lua index f724c2c51099..a3ff834ee9f4 100644 --- a/apisix/utils/log-util.lua +++ b/apisix/utils/log-util.lua @@ -17,9 +17,11 @@ local core = require("apisix.core") local plugin = require("apisix.plugin") local expr = require("resty.expr.v1") -local ngx = ngx +local content_decode = require("apisix.utils.content-decode") +local ngx = ngx local pairs = pairs local ngx_now = ngx.now +local ngx_header = ngx.header local os_date = os.date local str_byte = string.byte local math_floor = math.floor @@ -47,6 +49,7 @@ local function gen_log_format(format) return log_format end + local function get_custom_format_log(ctx, format) local log_format = lru_log_format(format or "", nil, gen_log_format, format) local entry = core.table.new(0, core.table.nkeys(log_format)) @@ -311,7 +314,29 @@ function _M.collect_body(conf, ctx) if not final_body then return end - ctx.resp_body = final_body + + local response_encoding = ngx_header["Content-Encoding"] + if not response_encoding then + ctx.resp_body = final_body + return + end + + local decoder = content_decode.dispatch_decoder(response_encoding) + if not decoder then + core.log.warn("unsupported compression encoding type: ", + response_encoding) + ctx.resp_body = final_body + return + end + + local decoded_body, err = decoder(final_body) + if err ~= nil then + core.log.warn("try decode compressed data err: ", err) + ctx.resp_body = final_body + return + end + + ctx.resp_body = decoded_body end end end diff --git a/t/plugin/http-logger2.t b/t/plugin/http-logger2.t index 12ee8b437d78..e8cee411f1e1 100644 --- a/t/plugin/http-logger2.t +++ b/t/plugin/http-logger2.t @@ -95,6 +95,60 @@ add_block_preprocessor(sub { } } } + + server { + listen 11451; + gzip on; + gzip_types *; + gzip_min_length 1; + location /gzip_hello { + content_by_lua_block { + ngx.req.read_body() + local s = "gzip hello world" + ngx.header['Content-Length'] = #s + 1 + ngx.say(s) + } + } + } + + server { + listen 11452; + location /brotli_hello { + content_by_lua_block { + ngx.req.read_body() + local s = "brotli hello world" + ngx.header['Content-Length'] = #s + 1 + ngx.say(s) + } + header_filter_by_lua_block { + local conf = { + comp_level = 6, + http_version = 1.1, + lgblock = 0, + lgwin = 19, + min_length = 1, + mode = 0, + types = "*", + } + local brotli = require("apisix.plugins.brotli") + brotli.header_filter(conf, ngx.ctx) + } + body_filter_by_lua_block { + local conf = { + comp_level = 6, + http_version = 1.1, + lgblock = 0, + lgwin = 19, + min_length = 1, + mode = 0, + types = "*", + } + local brotli = require("apisix.plugins.brotli") + brotli.body_filter(conf, ngx.ctx) + } + } + } + _EOC_ $block->set_value("http_config", $http_config); @@ -265,7 +319,109 @@ response.body:test-http-logger-response -=== TEST 8: test default Authorization header sent to the log server +=== TEST 8: set fetch request body and response body route - gzip +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "methods": ["GET"], + "plugins": { + "http-logger": { + "uri": "http://127.0.0.1:12001/http-logger/center?query[]=response.body", + "batch_max_size": 1, + "max_retry_count": 1, + "retry_delay": 2, + "buffer_duration": 2, + "inactive_timeout": 2, + "include_resp_body": true + } + }, + "upstream": { + "nodes": { + "127.0.0.1:11451": 1 + }, + "type": "roundrobin" + }, + "uri": "/gzip_hello" + }]]) + + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- response_body +passed + + + +=== TEST 9: test fetch request body and response body route +--- request +GET /gzip_hello +--- more_headers +Accept-Encoding: gzip +--- error_log +response.body:gzip hello world +--- wait: 1.5 + + + +=== TEST 10: set fetch request body and response body route - brotli +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "methods": ["GET"], + "plugins": { + "http-logger": { + "uri": "http://127.0.0.1:12001/http-logger/center?query[]=response.body", + "batch_max_size": 1, + "max_retry_count": 1, + "retry_delay": 2, + "buffer_duration": 2, + "inactive_timeout": 2, + "include_resp_body": true + } + }, + "upstream": { + "nodes": { + "127.0.0.1:11452": 1 + }, + "type": "roundrobin" + }, + "uri": "/brotli_hello" + }]]) + + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- response_body +passed + + + +=== TEST 11: test fetch request body and response body route +--- request +GET /brotli_hello +--- more_headers +Accept-Encoding: br +--- error_log +response.body:brotli hello world +--- wait: 1.5 + + + +=== TEST 12: test default Authorization header sent to the log server --- config location /t { content_by_lua_block { @@ -304,7 +460,7 @@ passed -=== TEST 9: hit +=== TEST 13: hit --- request POST /http-logger/test test-http-logger-request @@ -314,7 +470,7 @@ received Authorization header: [nil] -=== TEST 10: add default path +=== TEST 14: add default path --- config location /t { content_by_lua_block { @@ -352,7 +508,7 @@ passed -=== TEST 11: hit +=== TEST 15: hit --- request GET /http-logger/test --- error_log diff --git a/t/plugin/kafka-logger2.t b/t/plugin/kafka-logger2.t index 7319a2db3624..84b6f90cd4cb 100644 --- a/t/plugin/kafka-logger2.t +++ b/t/plugin/kafka-logger2.t @@ -633,7 +633,166 @@ done -=== TEST 12: set route(id: 1,include_resp_body = true,include_resp_body_expr = array) +=== TEST 12: set route include_resp_body - gzip +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [=[{ + "plugins": { + "kafka-logger": { + "broker_list" : + { + "127.0.0.1":9092 + }, + "kafka_topic" : "test2", + "key" : "key1", + "timeout" : 1, + "include_resp_body": true, + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:11451": 1 + }, + "type": "roundrobin" + }, + "uri": "/gzip_hello" + }]=] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } + +--- response_body +passed + + + +=== TEST 13: hit +--- http_config +server { + listen 11451; + gzip on; + gzip_types *; + gzip_min_length 1; + location /gzip_hello { + content_by_lua_block { + ngx.req.read_body() + local s = "gzip hello world" + ngx.header['Content-Length'] = #s + 1 + ngx.say(s) + } + } +} +--- request +GET /gzip_hello +--- more_headers +Accept-Encoding: gzip +--- error_log eval +qr/send data to kafka: \{.*"body":"gzip hello world\\n"/ +--- wait: 2 + + + +=== TEST 14: set route include_resp_body - brotli +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [=[{ + "plugins": { + "kafka-logger": { + "broker_list" : + { + "127.0.0.1":9092 + }, + "kafka_topic" : "test2", + "key" : "key1", + "timeout" : 1, + "include_resp_body": true, + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:11452": 1 + }, + "type": "roundrobin" + }, + "uri": "/brotli_hello" + }]=] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } + +--- response_body +passed + + + +=== TEST 15: hit +--- http_config +server { + listen 11452; + location /brotli_hello { + content_by_lua_block { + ngx.req.read_body() + local s = "brotli hello world" + ngx.header['Content-Length'] = #s + 1 + ngx.say(s) + } + header_filter_by_lua_block { + local conf = { + comp_level = 6, + http_version = 1.1, + lgblock = 0, + lgwin = 19, + min_length = 1, + mode = 0, + types = "*", + } + local brotli = require("apisix.plugins.brotli") + brotli.header_filter(conf, ngx.ctx) + } + body_filter_by_lua_block { + local conf = { + comp_level = 6, + http_version = 1.1, + lgblock = 0, + lgwin = 19, + min_length = 1, + mode = 0, + types = "*", + } + local brotli = require("apisix.plugins.brotli") + brotli.body_filter(conf, ngx.ctx) + } + } +} +--- request +GET /brotli_hello +--- more_headers +Accept-Encoding: br +--- error_log eval +qr/send data to kafka: \{.*"body":"brotli hello world\\n"/ +--- wait: 2 + + + +=== TEST 16: set route(id: 1,include_resp_body = true,include_resp_body_expr = array) --- config location /t { content_by_lua_block { @@ -682,7 +841,7 @@ passed -=== TEST 13: hit route, expr eval success +=== TEST 17: hit route, expr eval success --- request POST /hello?name=qwerty abcdef @@ -694,7 +853,7 @@ qr/send data to kafka: \{.*"body":"hello world\\n"/ -=== TEST 14: hit route,expr eval fail +=== TEST 18: hit route,expr eval fail --- request POST /hello?name=zcxv abcdef @@ -706,7 +865,7 @@ qr/send data to kafka: \{.*"body":"hello world\\n"/ -=== TEST 15: multi level nested expr conditions +=== TEST 19: multi level nested expr conditions --- config location /t { content_by_lua_block { @@ -758,7 +917,7 @@ passed -=== TEST 16: hit route, req_body_expr and resp_body_expr both eval success +=== TEST 20: hit route, req_body_expr and resp_body_expr both eval success --- request POST /hello?name=qwerty abcdef @@ -771,7 +930,7 @@ qr/send data to kafka: \{.*"body":"hello world\\n"/] -=== TEST 17: hit route, req_body_expr eval success, resp_body_expr both eval failed +=== TEST 21: hit route, req_body_expr eval success, resp_body_expr both eval failed --- request POST /hello?name=asdfgh abcdef @@ -785,7 +944,7 @@ qr/send data to kafka: \{.*"body":"hello world\\n"/ -=== TEST 18: hit route, req_body_expr eval failed, resp_body_expr both eval success +=== TEST 22: hit route, req_body_expr eval failed, resp_body_expr both eval success --- request POST /hello?name=zxcvbn abcdef @@ -799,7 +958,7 @@ qr/send data to kafka: \{.*"body":"abcdef"/ -=== TEST 19: hit route, req_body_expr eval success, resp_body_expr both eval failed +=== TEST 23: hit route, req_body_expr eval success, resp_body_expr both eval failed --- request POST /hello?name=xxxxxx abcdef @@ -812,7 +971,7 @@ qr/send data to kafka: \{.*"body":"hello world\\n"/] -=== TEST 20: update route(id: 1,include_req_body = true,include_req_body_expr = array) +=== TEST 24: update route(id: 1,include_req_body = true,include_req_body_expr = array) --- config location /t { content_by_lua_block { @@ -862,7 +1021,7 @@ passed -=== TEST 21: hit route, expr eval success +=== TEST 25: hit route, expr eval success --- request POST /hello?name=qwerty abcdef @@ -874,7 +1033,7 @@ qr/send data to kafka: \{.*"body":"abcdef"/ -=== TEST 22: setup route with meta_refresh_interval +=== TEST 26: setup route with meta_refresh_interval --- config location /t { content_by_lua_block { @@ -918,7 +1077,7 @@ passed -=== TEST 23: hit route, send data to kafka successfully +=== TEST 27: hit route, send data to kafka successfully --- request POST /hello abcdef diff --git a/t/plugin/rocketmq-logger2.t b/t/plugin/rocketmq-logger2.t index 7f3d054fa6ed..3b6087c16f3f 100644 --- a/t/plugin/rocketmq-logger2.t +++ b/t/plugin/rocketmq-logger2.t @@ -391,7 +391,6 @@ POST /hello?name=qwerty abcdef --- response_body hello world - --- error_log eval qr/send data to rocketmq: \{.*"body":"hello world\\n"/ --- wait: 2 @@ -410,7 +409,160 @@ qr/send data to rocketmq: \{.*"body":"hello world\\n"/ -=== TEST 13: multi level nested expr conditions +=== TEST 13: set route include_resp_body = true - gzip +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [=[{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : [ "127.0.0.1:9876" ], + "topic" : "test2", + "key" : "key1", + "timeout" : 1, + "include_resp_body": true, + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:11451": 1 + }, + "type": "roundrobin" + }, + "uri": "/gzip_hello" + }]=] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } + +--- response_body +passed + + + +=== TEST 14: hit +--- http_config +server { + listen 11451; + gzip on; + gzip_types *; + gzip_min_length 1; + location /gzip_hello { + content_by_lua_block { + ngx.req.read_body() + local s = "gzip hello world" + ngx.header['Content-Length'] = #s + 1 + ngx.say(s) + } + } +} +--- request +GET /gzip_hello +--- more_headers +Accept-Encoding: gzip +--- error_log eval +qr/send data to rocketmq: \{.*"body":"gzip hello world\\n"/ +--- wait: 2 + + + +=== TEST 15: set route include_resp_body - brotli +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [=[{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : [ "127.0.0.1:9876" ], + "topic" : "test2", + "key" : "key1", + "timeout" : 1, + "include_resp_body": true, + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:11452": 1 + }, + "type": "roundrobin" + }, + "uri": "/brotli_hello" + }]=] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } + +--- response_body +passed + + + +=== TEST 16: hit route, expr eval success +--- http_config +server { + listen 11452; + location /brotli_hello { + content_by_lua_block { + ngx.req.read_body() + local s = "brotli hello world" + ngx.header['Content-Length'] = #s + 1 + ngx.say(s) + } + header_filter_by_lua_block { + local conf = { + comp_level = 6, + http_version = 1.1, + lgblock = 0, + lgwin = 19, + min_length = 1, + mode = 0, + types = "*", + } + local brotli = require("apisix.plugins.brotli") + brotli.header_filter(conf, ngx.ctx) + } + body_filter_by_lua_block { + local conf = { + comp_level = 6, + http_version = 1.1, + lgblock = 0, + lgwin = 19, + min_length = 1, + mode = 0, + types = "*", + } + local brotli = require("apisix.plugins.brotli") + brotli.body_filter(conf, ngx.ctx) + } + } +} +--- request +GET /brotli_hello +--- more_headers +Accept-Encoding: br +--- error_log eval +qr/send data to rocketmq: \{.*"body":"brotli hello world\\n"/ +--- wait: 2 + + + +=== TEST 17: multi level nested expr conditions --- config location /t { content_by_lua_block { @@ -443,7 +595,7 @@ done -=== TEST 14: data encryption for secret_key +=== TEST 18: data encryption for secret_key --- yaml_config apisix: data_encryption: