Skip to content

Commit

Permalink
Merge pull request #1115 from senid231/improve-calls-monitoring
Browse files Browse the repository at this point in the history
Improve calls monitoring
  • Loading branch information
Fivell authored Feb 22, 2022
2 parents eeed271 + 46166aa commit 78fdaa4
Show file tree
Hide file tree
Showing 15 changed files with 566 additions and 92 deletions.
8 changes: 5 additions & 3 deletions app/jobs/base_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ def scheduler_options
end

def call
after_start
execute
before_finish
logger.tagged(self.class.name) do
after_start
execute
before_finish
end
end

def execute
Expand Down
205 changes: 121 additions & 84 deletions app/jobs/jobs/calls_monitoring.rb
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,43 @@ def after_start
end

def execute
detect_customers_calls_to_reject
detect_customers_auth_calls_to_reject
detect_vendors_calls_to_reject
detect_gateway_calls_to_reject
detect_random_calls_to_reject
log_time('detect_customers_calls_to_reject') do
detect_customers_calls_to_reject
end

log_time('detect_customers_auth_calls_to_reject') do
detect_customers_auth_calls_to_reject
end

log_time('detect_vendors_calls_to_reject') do
detect_vendors_calls_to_reject
end

log_time('detect_gateway_calls_to_reject') do
detect_gateway_calls_to_reject
end

log_time('detect_random_calls_to_reject') do
detect_random_calls_to_reject
end
end

def before_finish
log_time('save_stats') do
save_stats
end

log_time('send_prometheus_metrics') do
send_prometheus_metrics
end

log_time('terminate_calls!') do
terminate_calls!
end
end

private

# random_disconnect_enable | f
# random_disconnect_length | 7000
def detect_random_calls_to_reject
Expand All @@ -172,9 +202,7 @@ def detect_customers_calls_to_reject
account = active_customers_balances[acc_id]

if account
call_collection = CallCollection.new(calls,
key: :destination,
account: account)
call_collection = CallCollection.new(calls, key: :destination, account: account)

if call_collection.exceed_min_balance?
@terminate_calls.merge!(
Expand Down Expand Up @@ -254,6 +282,87 @@ def detect_gateway_calls_to_reject
end
end

def send_prometheus_metrics
return unless PrometheusConfig.enabled?

metrics = []

total = active_calls.values.sum(&:count)
metrics << ActiveCallsProcessor.collect(total: total)

customers_active_calls.each do |account_id, calls|
account_external_id = calls.first[:customer_acc_external_id]
collection = CallCollection.new(calls, key: :destination, account: [])
src_prefixes = calls.map { |c| c[:src_prefix_routing] }
dst_prefixes = calls.map { |c| c[:dst_prefix_routing] }

metrics << ActiveCallsProcessor.collect(
account_originated: calls.count,
account_originated_unique_src: src_prefixes.uniq.count,
account_originated_unique_dst: dst_prefixes.uniq.count,
account_price_originated: collection.total_calls_cost,
labels: { account_external_id: account_external_id, account_id: account_id }
)
end

vendors_active_calls.each do |account_id, calls|
account_external_id = calls.first[:vendor_acc_external_id]
collection = CallCollection.new(calls, key: :dialpeer, account: [])

metrics << ActiveCallsProcessor.collect(
account_terminated: calls.count,
account_price_terminated: collection.total_calls_cost,
labels: { account_external_id: account_external_id, account_id: account_id }
)
end

client = PrometheusExporter::Client.default
metrics.each { |metric| client.send_json(metric) }
end

def save_stats
Stats::ActiveCall.transaction do
ActiveCalls::CreateStats.call(
calls: active_calls,
current_time: now
)

if YetiConfig.calls_monitoring.write_account_stats
ActiveCalls::CreateAccountStats.call(
customer_calls: customers_active_calls,
vendor_calls: vendors_active_calls,
current_time: now
)
end
if YetiConfig.calls_monitoring.write_gateway_stats
ActiveCalls::CreateOriginationGatewayStats.call(
calls: flatten_calls.group_by { |c| c[:orig_gw_id] },
current_time: now
)
ActiveCalls::CreateTerminationGatewayStats.call(
calls: flatten_calls.group_by { |c| c[:term_gw_id] },
current_time: now
)
end
end
end

def terminate_calls!
logger.info { "Going to terminate #{@terminate_calls.keys.size} call(s)." }
nodes = Node.all.index_by(&:id)
@terminate_calls.each do |local_tag, call|
logger.warn { "Terminate call Node: #{call[:node_id]}, local_tag :#{local_tag}" }
begin
node_id = call[:node_id].to_i
nodes[node_id].drop_call(local_tag)
rescue StandardError => e
node_id = call.is_a?(Hash) ? call[:node_id] : nil
capture_error(e, extra: { local_tag: local_tag, node_id: node_id })
logger.error "#{e.class} #{e.message}"
end
end
end

def flatten_calls
@flatten_calls ||= active_calls.values.flatten
end
Expand Down Expand Up @@ -336,82 +445,10 @@ def active_calls
end
end

def before_finish
save_stats
send_prometheus_metrics
terminate_calls!
end

private

def send_prometheus_metrics
return unless PrometheusConfig.enabled?

metrics = []

total = active_calls.values.sum(&:count)
metrics << ActiveCallsProcessor.collect(total: total)

customers_active_calls.each do |account_id, calls|
account_external_id = calls.first[:customer_acc_external_id]
collection = CallCollection.new(calls, key: :destination, account: [])
src_prefixes = calls.map { |c| c[:src_prefix_routing] }
dst_prefixes = calls.map { |c| c[:dst_prefix_routing] }

metrics << ActiveCallsProcessor.collect(
account_originated: calls.count,
account_originated_unique_src: src_prefixes.uniq.count,
account_originated_unique_dst: dst_prefixes.uniq.count,
account_price_originated: collection.total_calls_cost,
labels: { account_external_id: account_external_id, account_id: account_id }
)
end

vendors_active_calls.each do |account_id, calls|
account_external_id = calls.first[:vendor_acc_external_id]
collection = CallCollection.new(calls, key: :dialpeer, account: [])

metrics << ActiveCallsProcessor.collect(
account_terminated: calls.count,
account_price_terminated: collection.total_calls_cost,
labels: { account_external_id: account_external_id, account_id: account_id }
)
end

client = PrometheusExporter::Client.default
metrics.each { |metric| client.send_json(metric) }
end

def save_stats
Stats::ActiveCall.transaction do
Stats::ActiveCall.create_stats(active_calls, now)
if YetiConfig.calls_monitoring.write_account_stats
ActiveCalls::CreateAccountStats.call(
customer_calls: customers_active_calls,
vendor_calls: vendors_active_calls,
current_time: now
)
end
orig_gw_grouped_calls = flatten_calls.group_by { |c| c[:orig_gw_id] }
Stats::ActiveCallOrigGateway.create_stats(orig_gw_grouped_calls, now)
term_gw_grouped_calls = flatten_calls.group_by { |c| c[:term_gw_id] }
Stats::ActiveCallTermGateway.create_stats(term_gw_grouped_calls, now)
end
end

def terminate_calls!
nodes = Node.all.index_by(&:id)
@terminate_calls.each do |local_tag, call|
logger.warn { "CallsMonitoring#terminate_calls! Node #{call[:node_id]}, local_tag :#{local_tag}" }
begin
node_id = call[:node_id].to_i
nodes[node_id].drop_call(local_tag)
rescue StandardError => e
node_id = call.is_a?(Hash) ? call[:node_id] : nil
capture_error(e, extra: { local_tag: local_tag, node_id: node_id })
logger.error e.message
end
end
def log_time(name, &block)
logger.info { "Operation #{name} started." }
seconds = logger.tagged(name) { ::Benchmark.realtime(&block) }
logger.info { format("Operation #{name} finished %.6f sec.", seconds) }
end
end
end
4 changes: 0 additions & 4 deletions app/models/concerns/chart.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@ module Chart
end

class_methods do
def create_stats(calls = {}, now_time)
super calls, now_time, chart_entity_klass.all, chart_entity_column
end

def to_chart(id, options = {})
time_column = options.delete(:time_column) || :created_at
count_column = options.delete(:count_column) || :count
Expand Down
39 changes: 39 additions & 0 deletions app/services/active_calls/create_origination_gateway_stats.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# frozen_string_literal: true

module ActiveCalls
class CreateOriginationGatewayStats < ApplicationService
parameter :calls, required: true
parameter :current_time, required: true

def call
attrs_list = build_calls_attrs_list
missing_gateway_ids = Gateway.where.not(id: calls.keys).pluck(:id)
attrs_list.concat build_empty_attrs_list(missing_gateway_ids)
return if attrs_list.empty?

Stats::ActiveCallOrigGateway.insert_all!(attrs_list)
end

private

def build_calls_attrs_list
calls.map do |gateway_id, sub_calls|
{
count: sub_calls.count,
created_at: current_time,
gateway_id: gateway_id
}
end
end

def build_empty_attrs_list(gateway_ids)
gateway_ids.map do |gateway_id|
{
count: 0,
created_at: current_time,
gateway_id: gateway_id
}
end
end
end
end
39 changes: 39 additions & 0 deletions app/services/active_calls/create_stats.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# frozen_string_literal: true

module ActiveCalls
class CreateStats < ApplicationService
parameter :calls, required: true
parameter :current_time, required: true

def call
attrs_list = build_calls_attrs_list
missing_node_ids = Node.where.not(id: calls.keys).pluck(:id)
attrs_list.concat build_empty_attrs_list(missing_node_ids)
return if attrs_list.empty?

Stats::ActiveCall.insert_all!(attrs_list)
end

private

def build_calls_attrs_list
calls.map do |node_id, sub_calls|
{
count: sub_calls.count,
created_at: current_time,
node_id: node_id
}
end
end

def build_empty_attrs_list(node_ids)
node_ids.map do |node_id|
{
count: 0,
created_at: current_time,
node_id: node_id
}
end
end
end
end
39 changes: 39 additions & 0 deletions app/services/active_calls/create_termination_gateway_stats.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# frozen_string_literal: true

module ActiveCalls
class CreateTerminationGatewayStats < ApplicationService
parameter :calls, required: true
parameter :current_time, required: true

def call
attrs_list = build_calls_attrs_list
missing_gateway_ids = Gateway.where.not(id: calls.keys).pluck(:id)
attrs_list.concat build_empty_attrs_list(missing_gateway_ids)
return if attrs_list.empty?

Stats::ActiveCallTermGateway.insert_all!(attrs_list)
end

private

def build_calls_attrs_list
calls.map do |gateway_id, sub_calls|
{
count: sub_calls.count,
created_at: current_time,
gateway_id: gateway_id
}
end
end

def build_empty_attrs_list(gateway_ids)
gateway_ids.map do |gateway_id|
{
count: 0,
created_at: current_time,
gateway_id: gateway_id
}
end
end
end
end
1 change: 1 addition & 0 deletions config/initializers/_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def self.setting_files(config_root, _env)

required(:calls_monitoring).schema do
required(:write_account_stats).value(:bool?)
required(:write_gateway_stats).value(:bool?)
end

required(:api).schema do
Expand Down
1 change: 1 addition & 0 deletions config/yeti_web.yml.ci
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ site_title: "Yeti Admin"
site_title_image: "yeti.png"
calls_monitoring:
write_account_stats: true
write_gateway_stats: true
api:
token_lifetime: 600 # jwt token lifetime in seconds, empty string means permanent tokens
cdr_export:
Expand Down
Loading

0 comments on commit 78fdaa4

Please sign in to comment.