Skip to content

Commit

Permalink
Clear job locks on generic workers after grace period is exceeded (#477)
Browse files Browse the repository at this point in the history
Introduces a configurable grace period after which the generic worker processes will be killed.
This allows workers to finish their current job (e.g. during an update) instead of being killed after 15 seconds (the bpm default).
After the worker processes are stopped/killed, pending locks will be cleared, which allows other workers to pick up pending jobs. Previously, the locks would only be cleared after the job timeout had been exceeded (default 4 hours).
This is based on the assumption that jobs processed on the generic workers are idempotent.
  • Loading branch information
johha authored Oct 16, 2024
1 parent 763bafc commit e3ee946
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,6 @@ jobs:
<% if_p("cc.jobs.priorities") do |priorities| %>
priorities: <%= priorities.to_json %>
<% end %>
<% if_p("cc.jobs.number_of_worker_threads") do |number_of_worker_threads| %>
number_of_worker_threads: <%= number_of_worker_threads %>
<% end %>


cpu_weight_min_memory: <%= p("cc.cpu_weight_min_memory") %>
Expand Down
10 changes: 7 additions & 3 deletions jobs/cloud_controller_worker/spec
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ templates:
droplets_ca_cert.pem.erb: config/certs/droplets_ca_cert.pem
packages_ca_cert.pem.erb: config/certs/packages_ca_cert.pem
resource_pool_ca_cert.pem.erb: config/certs/resource_pool_ca_cert.pem
shutdown_drain.rb.erb: bin/shutdown_drain
mutual_tls_ca.crt.erb: config/certs/mutual_tls_ca.crt
mutual_tls.crt.erb: config/certs/mutual_tls.crt
mutual_tls.key.erb: config/certs/mutual_tls.key
Expand Down Expand Up @@ -123,9 +124,6 @@ properties:
description: "Maximum health check timeout (in seconds) that can be set for the app"
cc.jobs.priorities:
description: "List of hashes containing delayed jobs 'display_name' and its desired priority. This will overwrite the default priority of ccng"
cc.jobs.number_of_worker_threads:
description: "If set multiple delayed job workers will be started as threads in the same process. If not set there will be one delayed job worker per process."

cc.staging_upload_user:
description: "User name used to access internal endpoints of Cloud Controller to upload files when staging"
cc.staging_upload_password:
Expand Down Expand Up @@ -418,6 +416,12 @@ properties:
cc.jobs.generic.number_of_workers:
default: 1
description: "Number of generic cloud_controller_worker workers"
cc.jobs.generic.number_of_worker_threads:
description: "Optional. Number of worker threads to start for each generic cloud_controller_worker worker process"
cc.jobs.generic.worker_grace_period_seconds:
description: "The number of seconds to wait for each generic cloud_controller_worker worker process to finish processing jobs before forcefully shutting it down"
default: 15


uaa.clients.cc-service-dashboards.secret:
description: "Used for generating SSO clients for service brokers."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,8 @@ export LD_PRELOAD=/var/vcap/packages/jemalloc/lib/libjemalloc.so
<% end %>

cd /var/vcap/packages/cloud_controller_ng/cloud_controller_ng
exec bundle exec rake jobs:generic[cc_global_worker.<%= spec.job.name %>.<%= spec.index %>.${INDEX}]

<% num_threads = p("cc.jobs.generic.number_of_worker_threads", nil) %>
<% grace_period = p("cc.jobs.generic.worker_grace_period_seconds") %>

exec bundle exec rake jobs:generic[cc_global_worker.<%= spec.job.name %>.<%= spec.index %>.${INDEX}<%= ",#{num_threads},#{grace_period.to_i - 1}" if num_threads %>]
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,6 @@ jobs:
<% if_p("cc.jobs.priorities") do |priorities| %>
priorities: <%= priorities.to_json %>
<% end %>
<% if_p("cc.jobs.number_of_worker_threads") do |number_of_worker_threads| %>
number_of_worker_threads: <%= number_of_worker_threads %>
<% end %>

default_app_memory: <%= p("cc.default_app_memory") %>
default_app_disk_in_mb: <%= p("cc.default_app_disk_in_mb") %>
Expand Down
14 changes: 11 additions & 3 deletions jobs/cloud_controller_worker/templates/drain.sh.erb
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
#!/usr/bin/env bash
# Drain script for the cloud_controller_worker job: stops the generic worker
# processes and then clears any pending job locks they still hold.
# Every command's output is redirected to stderr; the only thing written to
# stdout is the final `echo 0`.

# Stop each generic worker process through bpm.
# NOTE(review): in the commit view this loop looks like the pre-change
# implementation that the shutdown_drain call below supersedes — confirm
# whether it is still meant to be part of the script.
for i in {1..<%=p("cc.jobs.generic.number_of_workers")%>}; do
/var/vcap/jobs/bpm/bin/bpm stop cloud_controller_worker -p "worker_${i}" 1>&2
done
# Presumably selects the packaged Ruby for the commands below — verify
# against ruby_version.sh.
source /var/vcap/jobs/cloud_controller_worker/bin/ruby_version.sh
# Path of this job's rendered Cloud Controller config; presumably read by the
# rake task invoked below — confirm.
export CLOUD_CONTROLLER_NG_CONFIG=/var/vcap/jobs/cloud_controller_worker/config/cloud_controller_ng.yml

# Shut down all workers, giving them the configured grace period
# (cc.jobs.generic.worker_grace_period_seconds) before they are killed.
/var/vcap/jobs/cloud_controller_worker/bin/shutdown_drain 1>&2

# The rake task has to be run from the cloud_controller_ng package directory.
pushd /var/vcap/packages/cloud_controller_ng/cloud_controller_ng > /dev/null || exit 1
# Clear pending locks once per worker, addressing each worker by the name
# cc_global_worker.JOB_NAME.INSTANCE_INDEX.WORKER_NUMBER.
for i in {1..<%=p("cc.jobs.generic.number_of_workers")%>}; do
# shellcheck disable=SC2093
bundle exec rake jobs:clear_pending_locks[cc_global_worker.<%= spec.job.name %>.<%= spec.index %>."${i}"] 1>&2
done
popd > /dev/null || exit 1

echo 0 # tell bosh not to wait for anything
exit 0
16 changes: 16 additions & 0 deletions jobs/cloud_controller_worker/templates/shutdown_drain.rb.erb
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#!/var/vcap/packages/ruby-3.2/bin/ruby --disable-all
# Shuts down all generic cloud_controller_worker processes of this instance in
# parallel, passing each shutdown the configured grace period.
#
# NOTE: spec/cloud_controller_worker/drain_spec.rb asserts on the literal
# rendered text of the `@grace_period = ...` and `(1..N).each do |i|` lines,
# so keep their wording in sync with that spec.

# Put cloud_controller_ng's app/ and lib/ directories on the load path so that
# 'cloud_controller/drain' can be required.
$LOAD_PATH.unshift('/var/vcap/packages/cloud_controller_ng/cloud_controller_ng/app')
$LOAD_PATH.unshift('/var/vcap/packages/cloud_controller_ng/cloud_controller_ng/lib')

require 'cloud_controller/drain'

@threads = []
# Rendered from cc.jobs.generic.worker_grace_period_seconds.
@grace_period = <%= p("cc.jobs.generic.worker_grace_period_seconds") %>
# The argument is this job's log directory; presumably where Drain writes its
# log output — confirm against VCAP::CloudController::Drain.
@drain = VCAP::CloudController::Drain.new('/var/vcap/sys/log/cloud_controller_worker')

# One thread per worker (cc.jobs.generic.number_of_workers), so the workers are
# shut down concurrently rather than one after another.
(1..<%= p("cc.jobs.generic.number_of_workers") %>).each do |i|
# Each worker is identified by its bpm pid file, worker_1.pid .. worker_N.pid.
@threads << Thread.new { @drain.shutdown_delayed_worker("/var/vcap/sys/run/bpm/cloud_controller_worker/worker_#{i}.pid", @grace_period.to_i) }
end

# Block until every shutdown thread has finished.
@threads.each(&:join)
11 changes: 0 additions & 11 deletions spec/cloud_controller_ng/cloud_controller_ng_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -711,17 +711,6 @@ module Test
end
end

describe 'cc_jobs_number_of_worker_threads' do
context "when 'cc.jobs.number_of_worker_threads' is set" do
before { merged_manifest_properties['cc']['jobs'] = { 'number_of_worker_threads' => 7 } }

it 'renders the correct value into the ccng config' do
template_hash = YAML.safe_load(template.render(merged_manifest_properties, consumes: links))
expect(template_hash['jobs']['number_of_worker_threads']).to eq(7)
end
end
end

describe 'cc_jobs_queues' do
context 'when cc.jobs.queues is not set' do
it 'does not render ccng config' do
Expand Down
11 changes: 0 additions & 11 deletions spec/cloud_controller_worker/cloud_controller_worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -250,17 +250,6 @@ module Test
end
end

describe 'cc_jobs_number_of_worker_threads' do
context "when 'cc.jobs.number_of_worker_threads' is set" do
before { manifest_properties['cc']['jobs'] = { 'number_of_worker_threads' => 7 } }

it 'renders the correct value into the ccng config' do
template_hash = YAML.safe_load(template.render(manifest_properties, consumes: links))
expect(template_hash['jobs']['number_of_worker_threads']).to eq(7)
end
end
end

describe 'cc_jobs_queues' do
context 'when cc.jobs.queues is not set' do
it 'does not render ccng config' do
Expand Down
66 changes: 66 additions & 0 deletions spec/cloud_controller_worker/drain_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# frozen_string_literal: true

require 'rspec'
require 'bosh/template/test'

module Bosh
  module Template
    module Test
      # Rendering checks for the drain-related templates of the
      # cloud_controller_worker job (bin/shutdown_drain and bin/drain).
      describe 'drain template rendering' do
        let(:release_path) { File.join(File.dirname(__FILE__), '../..') }
        let(:release) { ReleaseDir.new(release_path) }
        let(:job) { release.job('cloud_controller_worker') }

        # Renders the template under test with the given manifest properties
        # and without any consumed links.
        def render_with(properties = {})
          template.render(properties, consumes: {})
        end

        # Nests +settings+ under the cc.jobs.generic property namespace.
        def generic_worker_properties(settings)
          { 'cc' => { 'jobs' => { 'generic' => settings } } }
        end

        describe 'bin/shutdown_drain' do
          let(:template) { job.template('bin/shutdown_drain') }

          it 'renders the default value' do
            expect(render_with).to include('@grace_period = 15')
          end

          context "when 'worker_grace_period_seconds' is provided" do
            it 'renders the provided value' do
              properties = generic_worker_properties('worker_grace_period_seconds' => 60)
              expect(render_with(properties)).to include('@grace_period = 60')
            end
          end

          it 'renders the default number of workers' do
            expect(render_with).to include('(1..1).each do |i|')
          end

          context "when 'number_of_workers' is provided" do
            it 'renders the provided number of workers' do
              properties = generic_worker_properties('number_of_workers' => 5)
              expect(render_with(properties)).to include('(1..5).each do |i|')
            end
          end
        end

        describe 'bin/drain' do
          let(:template) { job.template('bin/drain') }

          it 'renders the default number of workers' do
            expect(render_with).to include('for i in {1..1}; do')
          end

          context "when 'number_of_workers' is provided" do
            it 'renders the provided number of workers' do
              properties = generic_worker_properties('number_of_workers' => 5)
              expect(render_with(properties)).to include('for i in {1..5}; do')
            end
          end

          it 'renders the job name and index' do
            # Default job name is 'me' in tests (bosh-template)
            expected_task = 'bundle exec rake jobs:clear_pending_locks[cc_global_worker.me.0."${i}"]'
            expect(render_with({ 'job_name' => 'cc-worker' })).to include(expected_task)
          end
        end
      end
    end
  end
end

0 comments on commit e3ee946

Please sign in to comment.