class GRPC::Pool
Pool is a simple thread pool.
Private Class Methods
new(size, keep_alive: DEFAULT_KEEP_ALIVE)
click to toggle source
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 76 def initialize(size, keep_alive: DEFAULT_KEEP_ALIVE) fail 'pool size must be positive' unless size > 0 @jobs = Queue.new @size = size @stopped = false @stop_mutex = Mutex.new # needs to be held when accessing @stopped @stop_cond = ConditionVariable.new @workers = [] @keep_alive = keep_alive end
Private Instance Methods
forcibly_stop_workers()
click to toggle source
Forcibly shutdown any threads that are still alive.
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 139 def forcibly_stop_workers return unless @workers.size > 0 GRPC.logger.info("forcibly terminating #{@workers.size} worker(s)") @workers.each do |t| next unless t.alive? begin t.exit rescue StandardError => e GRPC.logger.warn('error while terminating a worker') GRPC.logger.warn(e) end end end
jobs_waiting()
click to toggle source
Returns the number of jobs waiting
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 88 def jobs_waiting @jobs.size end
loop_execute_jobs()
click to toggle source
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 162 def loop_execute_jobs loop do begin blk, args = @jobs.pop blk.call(*args) rescue StandardError => e GRPC.logger.warn('Error in worker thread') GRPC.logger.warn(e) end end end
remove_current_thread()
click to toggle source
removes the threads from workers, and signal when all the threads are complete.
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 155 def remove_current_thread @stop_mutex.synchronize do @workers.delete(Thread.current) @stop_cond.signal if @workers.size.zero? end end
schedule(*args, &blk)
click to toggle source
Runs the given block on the queue with the provided args.
@param args the args passed blk when it is called @param blk the block to call
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 96 def schedule(*args, &blk) return if blk.nil? @stop_mutex.synchronize do if @stopped GRPC.logger.warn('did not schedule job, already stopped') return end GRPC.logger.info('schedule another job') @jobs << [blk, args] end end
start()
click to toggle source
Starts running the jobs in the thread pool.
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 109 def start @stop_mutex.synchronize do fail 'already stopped' if @stopped end until @workers.size == @size.to_i next_thread = Thread.new do catch(:exit) do # allows { throw :exit } to kill a thread loop_execute_jobs end remove_current_thread end @workers << next_thread end end
stop()
click to toggle source
Stops the jobs in the pool
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 125 def stop GRPC.logger.info('stopping, will wait for all the workers to exit') @workers.size.times { schedule { throw :exit } } @stop_mutex.synchronize do # wait @keep_alive for works to stop @stopped = true @stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0 end forcibly_stop_workers GRPC.logger.info('stopped, all workers are shutdown') end