Ruby 并发任务处理线程池模型

ruby

# Fixed-size pool of worker threads for running blocks concurrently.
#
# Usage:
#   pool = ConcurrencyV2.new(max: 10)
#   items.each { |item| pool.exec { handle(item) } }
#   pool.promise { puts 'all done' }
#
# Each worker owns a private inbox queue and announces itself on a shared
# idle queue whenever it is free. {#exec} takes one idle worker and hands it
# the block; {#promise} waits for every worker to become idle again and then
# shuts the pool down.
class ConcurrencyV2
  SLEEP_DURATION = 0.0000001
  READY_STATE = 'ready'
  PROCESSING_STATE = 'processing'

  # @param options [Hash]
  # @option options [Integer] :max number of worker threads (default 22)
  # @option options [Numeric] :sleep_duration polling interval, in seconds,
  #   used by {#promise} while waiting for workers (default SLEEP_DURATION)
  # @option options [Boolean] :with_db_query when true (the default) every
  #   worker runs inside ActiveRecord's `connection_pool.with_connection`;
  #   pass false for tasks that never touch the database
  def initialize(**options)
    # Per the original author's testing, about 20 threads works well on
    # typical machines; with too many threads the CPU context-switching cost
    # grows and throughput drops.
    @max_thread = options[:max] || 22
    @sleep_duration = options[:sleep_duration] || SLEEP_DURATION
    warn_if_pool_undersized

    # `fetch` rather than `||` so that an explicit `false` is honoured.
    with_db_query = options.fetch(:with_db_query, true)

    @closed = false
    @idle = Queue.new # worker threads that are waiting for a task
    @threads = []
    @threads << spawn_worker(with_db_query) while @threads.length < @max_thread
  end

  # Hands the block to an idle worker, blocking until one is available.
  #
  # @yield the task to run on a worker thread
  # @return [Thread] the worker that was given the task
  # @raise [ArgumentError] if no block is given
  # @raise [ThreadError] if the pool was already shut down by {#promise}
  def exec(&blk)
    raise ArgumentError, 'ConcurrencyV2#exec requires a block' unless blk
    raise ThreadError, 'ConcurrencyV2 pool has already been shut down' if @closed

    worker = @idle.pop
    # Flag the worker as busy before it even wakes up, so #promise can never
    # mistake a worker with a pending task for an idle one.
    worker[:status] = PROCESSING_STATE
    worker[:inbox] << blk
    worker
  end

  # Waits until every live worker is idle, shuts the pool down, then yields.
  #
  # @yield optional callback run once all submitted tasks have finished
  # @return [Object, nil] the callback's value, or nil without a block
  def promise
    # A worker that died is treated as finished; otherwise one dead thread
    # would keep this loop spinning forever.
    sleep(@sleep_duration) until @threads.all? { |t| !t.alive? || t[:status] == READY_STATE }
    @closed = true
    # Joining after the kill lets each worker finish unwinding (including
    # leaving its `with_connection` block) before the callback runs.
    @threads.each(&:kill).each(&:join)
    yield if block_given?
  end

  private

  # Logs a warning when the configured DB pool is smaller than the thread
  # count. Skipped when the configuration has no "pool" entry.
  def warn_if_pool_undersized
    pool = Rails.configuration.database_configuration.dig(Rails.env, 'pool')
    return unless pool && pool.to_i < @max_thread

    Rails.logger.warn("You set max thread #{@max_thread}, but your database config pool is #{pool}")
  end

  # Starts one worker thread, optionally wrapped in a pooled DB connection
  # that it keeps for its whole lifetime.
  def spawn_worker(with_db_query)
    Thread.new do
      inbox = Queue.new
      Thread.current[:inbox] = inbox
      if with_db_query
        ActiveRecord::Base.connection_pool.with_connection { worker_loop(inbox) }
      else
        worker_loop(inbox)
      end
    end
  end

  # Worker main loop: advertise as idle, wait for a task, run it, repeat.
  def worker_loop(inbox)
    loop do
      Thread.current[:status] = READY_STATE
      @idle << Thread.current
      run_job(inbox.pop)
    end
  end

  # Runs one task. A StandardError is logged instead of propagated so that a
  # failing task does not take its worker thread down with it.
  def run_job(job)
    job.call
  rescue StandardError => e
    Rails.logger.error("ConcurrencyV2 task failed: #{e.class}: #{e.message}")
  end
end

发表于 2020.05.26