Security Virtual Roundtable: Join DZone SMEs as they discuss software supply chains, the role of AI/ML in security, CNAPP, and more.

Data Engineering: Work with DBs? Build data pipelines? Or maybe you're exploring AI-driven data capabilities? We want to hear your insights.

Enterprise Security: Now is the time to ensure your systems are secure. Expand your org's tactics and put future attackers in their place.

Secrets Management: Learn key strategies for secrets management to enhance visibility. Identify and mitigate potential security risks.

Kubernetes: (Graceful) Sidekiq Worker Lifecycle

Here is a scripted solution to achieve graceful shutdown of Sidekiq workers via Pre-Stop hooks, part of the Kubernetes pod shutdown lifecycle.

By  · Tutorial
Save
5.0K Views

With our recent release of Container Stacks v2 into public beta, we're totally loving Kubernetes. But as with all love affairs, there are some bothersome aspects that we have to accept and work with. One such aspect is in the inflexibility of the vanilla shutdown sequence provided by Kubernetes.

We're also prolific users of Sidekiq for the parts of our backend that are Ruby-based (we're running a bunch of other technologies, but we think Sidekiq is hands-down the best for running Ruby jobs). As with any background workers, Sidekiq is sensitive to its shutdown sequence. We need to have more control over this.

Problem

There is a lot of documentation out there around the current Kubernetes pod shutdown sequence (see appendices for some starting points). NOTE: I say current as this information is only valid as of now... this might change (though I think at this point that is fairly unlikely). The current shutdown sequence looks like the following:

  1. POD marked as *terminating*
  2. Optional: PreStop hook called synchronously
  3. SIGTERM sent to container process if still present
  4. Kubernetes waits upto *grace-period* for container to exit
  5. SIGKILL sent to container process if still present

Kubernetes allows you to specify the terminationGracePeriodSeconds (ie. how long it will wait for shutdown after SIGTERM sent) in your spec. Unfortunately, Kubernetes doesn't allow you to specify the shutdown sequence itself.

At Cloud 66, we were previously lucky enough to be controlling the shutdown process via our own homegrown scheduler, this enabled us to expose the shutdown sequence to our users directly (in the form of USR1;1h;TERM;10s;KILL , for example). But now we need another solution.

Furthermore (and specific to Sidekiq) as we have some very long running jobs (dependent on external resources), we want to have a long wait time; but also want to terminate the workers as soon as they are no longer busy. So our ideal Sidekiq shutdown sequence looks like the following:

  1. Send USR1 (or TSTP for sidekiq > 5.0.0) to workers 
  2. Wait until they are no longer processing jobs
  3. Send TERM

Solution: Use a Pre-Stop Hook

Looking at the shutdown sequence above, you'll see that there is a Pre-Stop hook point called during the sequence. More on this can be found in the Kubernetes Container Lifecycle Hooks documentation. The salient bit of information is, essentially, that Kubernetes will execute some command of your choosing at that hook point, and it will execute it synchronously, waiting for the command to complete before resuming the shutdown sequence.

Using this hook point, we can inject the graceful shutdown behavior we want for our Sidekiq workers. And because we need this ourselves (and given that Sidekiq is Ruby-based) I put together the following ruby script to do just that!

#! /usr/bin/env ruby
# encoding: utf-8

## PURPOSE: this script will quiet any sidekiq workers it finds,
## and then shut them down when they are no longer handling jobs

# utility class for logging and running commands
class Utils
	require 'open3'
	attr_accessor :output

	def initialize(output)
		@output = output
	end

	def run_command(command)
		log("RUNNING: \"#{command}\"")
		stdout, stderr, status = Open3.capture3(command)
		return stdout.strip if status.success?
		# handle errors
		stderr = stderr.strip
		if stderr.empty?
			log("FATAL:\nCommand: \"#{command}\"")
		else
			log("FATAL:\nCommand: \"#{command}\"\nError: #{stderr}")
		end
		exit(-1)
	end

	def log(message)
		line = "[#{Time.now}] #{message}"
		@output == 'stdout' ? puts(line) : File.open(@output, 'a') {|file| file.puts(line)}
	end

	def log_underline
		log('-' * 70)
	end
end

# class to encapsulate the worker manager
class WorkerManager
	attr_accessor :timeout, :utils

	STATUS_WAITING_THREADS = :waiting_threads
	STATUS_CAN_BE_TERMINATED = :can_be_terminated
	STATUS_CAN_BE_QUIETED = :can_be_quieted

	POLL_FREQUENCY = 10

	def initialize(timeout, utils)
		@timeout = timeout
		@utils = utils
	end

	def initiate_shutdown
		@utils.log('*******************************')
		@utils.log('** STARTED SHUTDOWN SEQUENCE **')
		@utils.log('*******************************')
		# figure out the timeout time
		current_time = Time.now
		timeout_time = current_time + @timeout
		# fetch latest worker info
		workers = materialize_workers
		while Time.now <= timeout_time && !workers.empty?
			# do what is needed for each worker
			workers.each {|worker| worker.handle_shutdown(false)}
			# sleep for the poll time
			@utils.log("...sleeping for #{POLL_FREQUENCY} seconds...")
			sleep(POLL_FREQUENCY)
			# fetch latest worker info
			workers = materialize_workers
		end
		if Time.now > timeout_time && !workers.empty?
			@utils.log('[[ TIMED-OUT ]]')
			# fetch latest worker info
			workers = materialize_workers
			# do what is needed for each worker
			workers.each {|worker| worker.handle_shutdown(true)}
			# give process time to respond to the signals
			@utils.log("...sleeping for #{POLL_FREQUENCY} seconds...")
			sleep(POLL_FREQUENCY)
		end
	end

	private

	def materialize_workers
		workers = []
		stdout = @utils.run_command('ps aux | grep [s]idekiq | grep busy\] || true')
		stdout.lines.each do |line|
			line = line.strip
			if line =~ Worker::WORKER_REGEX
				pid = $~[:pid].to_i
				version = $~[:version]
				active_threads = $~[:worker_count].to_i
				total_threads = $~[:total_threads].to_i
				is_quiet = line =~ /stopping$/
				worker = Worker.new(pid, version, active_threads, total_threads, is_quiet, @utils)
				workers << worker
			end
		end
		@utils.log_underline
		if workers.empty?
			@utils.log('CURRENT STATE: No workers found!')
		else
			@utils.log('CURRENT STATE:')
			workers.each {|worker| @utils.log(worker.status_text)}
		end
		@utils.log_underline
		return workers
	end

end

# class to encapsulate workers
class Worker
	attr_accessor :utils, :pid, :status, :active_threads, :total_threads, :version

	STATUS_WAITING_THREADS = :waiting_threads
	STATUS_CAN_BE_TERMINATED = :can_be_terminated
	STATUS_CAN_BE_QUIETED = :can_be_quieted

	WORKER_REGEX = /^.*?\s+(?<pid>\d+).*sidekiq\s+(?<version>[\d\.]+).*?\[(?<worker_count>\d+)\sof\s(?<total_threads>\d+) busy\]/

	def initialize(pid, version, active_threads, total_threads, is_quiet, utils)
		@utils = utils
		@pid = pid
		@version = version
		@active_threads = active_threads
		@total_threads = total_threads
		@status = parse_status(active_threads, is_quiet)
	end

	def status_text
		output = @status == STATUS_CAN_BE_QUIETED ? '[ACTIVE]' : '[QUIET]'
		output = "#{output} [PID:#{@pid}] [VERSION:#{@version}] [#{@active_threads} of #{@total_threads}]"
		return "#{output} - waiting for threads to complete" if status == STATUS_WAITING_THREADS
		return "#{output} - can be terminated" if status == STATUS_CAN_BE_TERMINATED
		return "#{output} - can be quieted" if status == STATUS_CAN_BE_QUIETED
	end

	def handle_shutdown(aggressive)
		if aggressive
			# kill worker
			@utils.run_command("kill -9 #{@pid}")
		else
			if @status == STATUS_CAN_BE_QUIETED
				major_version = @version.gsub(/\..*/,'').to_i
				if major_version < 5
					# quiet worker
					@utils.run_command("kill -USR1 #{@pid}")
				else
					# quiet worker
					@utils.run_command("kill -TSTP #{@pid}")
				end
			elsif @status == STATUS_CAN_BE_TERMINATED
				# stop worker
				@utils.run_command("kill -TERM #{@pid}")
			end
		end
	end

	private

	def parse_status(active_threads, is_quiet)
		return STATUS_CAN_BE_QUIETED unless is_quiet
		return STATUS_WAITING_THREADS if active_threads > 0
		STATUS_CAN_BE_TERMINATED
	end
end

# parse arguments
require 'optparse'
require 'ostruct'
options = OpenStruct.new
options.timeout = 120
options.output = 'stdout'
OptionParser.new do |opts|
	opts.banner = 'Usage: sidekiq_safe_shutdown.rb [options]'
	opts.on('-o [ARG]', '--output [ARG]', 'File-path or stdout (default: stdout)') {|v| options.output = v}
	opts.on('-t [ARG]', '--timeout [ARG]', 'Timeout in seconds (default: 120)') {|v| options.timeout = v}
	opts.on('-h', '--help', 'Display this help') do
		puts opts
		exit
	end
end.parse!


# handle timeou
utils = Utils.new(options.output)
options.timeout = options.timeout.to_i
if options.timeout < 10
	utils.log("FATAL:\nTimeout #{options.timeout} too short!")
	exit(-1)
end

# initiate shutdown
WorkerManager.new(options.timeout, utils).initiate_shutdown


As the hook command executes in the context of your image, you'll need to include this script inside your image (simply put it in your source code if you're using Cloud 66 SkyCap). Note that the script is executed with the following arguments:

Usage: sidekiq_safe_shutdown.rb [options]  
    -o, --output [ARG]     File-path or stdout (default: stdout)
    -t, --timeout [ARG]    Timeout in seconds (default: 120)
    -h, --help             Display this help


For the example, below we're putting this script in our image in the path: 

 /tmp/sidekiq_safe_shutdown.rb 

And don't forget to make it executable with:

chmod +x /tmp/sidekiq_safe_shutdown.rb

Invoking via Kubernetes Manually

If you're running Kubes directly, then you'll need to manually modify your pod spec to include terminationGracePeriodSeconds and invoking the Pre-Stop hook:

spec:  
  #with default timeout
  terminationGracePeriodSeconds: 15

  #or with specific timeout
  terminationGracePeriodSeconds: 3605
lifecycle:  
  preStop: 
    exec: 
      #with default timeout
      command: ["/tmp/sidekiq_safe_shutdown.rb"]

      #or with specific timeout
      command: ["/tmp/sidekiq_safe_shutdown.rb", "-t", "3600"]


Invoking via Cloud 66

If you're running via our awesome Container Stacks v2, then simply add this script to your service.yml with the following line:

#with default timeout
pre_stop_command: /tmp/sidekiq_safe_shutdown.rb  
stop_grace: 15s

#or with specific timeout
pre_stop_command: /tmp/sidekiq_safe_shutdown.rb -t 3600  
stop_grace: 3605s  


And that should be all you need — now when your Sidekiq workers shut down, they will do so gracefully!

Appendices (Further Reading)

Published at DZone with permission of Vic van Gool, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.


Comments