class GRPC::Pool

Pool is a simple thread pool.

Private Class Methods

new(size, keep_alive: DEFAULT_KEEP_ALIVE) click to toggle source
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 76
# Sets up an idle pool. No worker threads are created here.
#
# @param size the number of workers the pool should run; must be > 0
# @param keep_alive stored in @keep_alive for later use as a wait timeout
def initialize(size, keep_alive: DEFAULT_KEEP_ALIVE)
  raise 'pool size must be positive' unless size > 0

  # Configuration.
  @size = size
  @keep_alive = keep_alive

  # Work queue and the threads that service it.
  @jobs = Queue.new
  @workers = []

  # Shutdown state; @stop_mutex must be held when accessing @stopped.
  @stopped = false
  @stop_mutex = Mutex.new
  @stop_cond = ConditionVariable.new
end

Private Instance Methods

forcibly_stop_workers() click to toggle source

Forcibly shut down any threads that are still alive.

# File src/ruby/lib/grpc/generic/rpc_server.rb, line 139
# Kills every worker thread that is still alive.
#
# Does nothing (and logs nothing) when no workers are registered. A failure
# while terminating one worker is logged and does not prevent the remaining
# workers from being terminated.
def forcibly_stop_workers
  return if @workers.empty?

  GRPC.logger.info("forcibly terminating #{@workers.size} worker(s)")
  @workers.each do |worker|
    # Threads that already finished need no action.
    if worker.alive?
      begin
        worker.kill
      rescue StandardError => e
        GRPC.logger.warn('error while terminating a worker')
        GRPC.logger.warn(e)
      end
    end
  end
end
jobs_waiting() click to toggle source

Returns the number of jobs waiting.

# File src/ruby/lib/grpc/generic/rpc_server.rb, line 88
# Reports how many jobs are currently sitting in the queue.
#
# @return the current number of entries in @jobs
def jobs_waiting
  @jobs.length
end
loop_execute_jobs() click to toggle source
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 162
# Repeatedly takes the next [block, args] pair off the job queue and calls
# the block with those args.
#
# A StandardError raised by a job is logged and the loop moves on to the
# next job. The loop has no exit condition of its own; it only ends when
# something non-local (such as a throw from inside a job) unwinds it.
def loop_execute_jobs
  loop do
    begin
      # Blocks until a job is available.
      job, job_args = @jobs.pop
      job.call(*job_args)
    rescue => e
      GRPC.logger.warn('Error in worker thread')
      GRPC.logger.warn(e)
    end
  end
end
remove_current_thread() click to toggle source

Removes the current thread from the workers and signals when all the threads are complete.

# File src/ruby/lib/grpc/generic/rpc_server.rb, line 155
# Deregisters the calling thread from @workers and, once no workers remain,
# signals @stop_cond. Both steps happen while holding @stop_mutex.
def remove_current_thread
  @stop_mutex.synchronize do
    @workers.delete(Thread.current)
    # Only the last worker to leave signals the condition variable.
    next unless @workers.empty?

    @stop_cond.signal
  end
end
schedule(*args, &blk) click to toggle source

Runs the given block on the queue with the provided args.

@param args the args passed to blk when it is called @param blk the block to call

# File src/ruby/lib/grpc/generic/rpc_server.rb, line 96
# Queues the given block, together with its args, for execution.
#
# Calls without a block are ignored. Once @stopped is set the job is
# rejected with a warning instead of being queued.
#
# @param args the args passed to blk when it is called
# @param blk the block to call
# @return nil when nothing was queued, otherwise the job queue
def schedule(*args, &blk)
  return unless blk

  # @stopped may only be read while holding @stop_mutex.
  @stop_mutex.synchronize do
    if @stopped
      GRPC.logger.warn('did not schedule job, already stopped')
      nil
    else
      GRPC.logger.info('schedule another job')
      @jobs.push([blk, args])
    end
  end
end
start() click to toggle source

Starts running the jobs in the thread pool.

# File src/ruby/lib/grpc/generic/rpc_server.rb, line 109
# Spawns worker threads until the pool holds @size of them.
#
# Each worker runs loop_execute_jobs inside a catch(:exit), so a job that
# does `throw :exit` ends that worker; the worker then deregisters itself
# via remove_current_thread.
#
# Raises a RuntimeError ('already stopped') if the pool has been stopped.
def start
  @stop_mutex.synchronize { raise 'already stopped' if @stopped }

  while @workers.size != @size.to_i
    @workers << Thread.new do
      catch(:exit) { loop_execute_jobs } # allows { throw :exit } to kill a thread
      remove_current_thread
    end
  end
end
stop() click to toggle source

Stops the jobs in the pool.

# File src/ruby/lib/grpc/generic/rpc_server.rb, line 125
# Stops the pool: rejects further scheduling, asks every worker to exit,
# waits up to @keep_alive for them to finish, then forcibly terminates any
# worker that is still registered.
#
# @stopped is set in the same critical section that enqueues the exit
# markers. Queuing the markers first and setting the flag afterwards would
# leave a window in which a concurrent #schedule call is accepted and placed
# behind the markers, where no worker would ever pick it up.
def stop
  GRPC.logger.info('stopping, will wait for all the workers to exit')
  @stop_mutex.synchronize do
    @stopped = true
    # Push the markers straight onto the queue: #schedule takes @stop_mutex
    # itself and refuses work once @stopped is set.
    @workers.size.times { @jobs << [proc { throw :exit }, []] }
    # Wait up to @keep_alive for the workers to stop; the last worker to
    # leave signals @stop_cond.
    @stop_cond.wait(@stop_mutex, @keep_alive) unless @workers.empty?
  end
  forcibly_stop_workers
  GRPC.logger.info('stopped, all workers are shutdown')
end