ruby
class ConcurrencyV2
SLEEP_DURATION = 0.0000001
READY_STATE = 'ready'
PROCESSING_STATE = 'processing'
def initialize(**options)
# 经过测试,常见计算机20线程左右能比较好的工作
# 线程数量过大,CPU上下文切换成本增加,效率反而降低
@max_thread = options[:max] || 22
@sleep_duration = options[:sleep_duration] || SLEEP_DURATION
pool = Rails.configuration.database_configuration[Rails.env]["pool"]
Rails.logger.warn("You set max thread #@max_thread, but your database config pool is #{pool}") if pool < @max_thread
# @task_name = options[:task_name] || SecureRandom.uuid
@threads = []
while @threads.length < @max_thread
@threads << Thread.new do
thr_start = proc do
loop do
Thread.current[:status] = READY_STATE
Thread.stop
Thread.current[:status] = PROCESSING_STATE
Thread.current[:exec].call
end
end
if options[:with_db_query] || true
ActiveRecord::Base.connection_pool.with_connection(&thr_start)
else
thr_start.call
end
end
end
end
def exec(&blk)
t = nil
loop do
t = @threads.detect{ |thr| thr[:status] == READY_STATE }
sleep(@sleep_duration)
break if t
end
t[:exec] = blk
t.run
end
def promise
sleep(@sleep_duration) until @threads.all? { |t| t[:status] == READY_STATE }
@threads.each(&:kill)
yield if block_given?
end
end
发表于 2020.05.26
© 自由转载 - 非商用 - 非衍生 - 保持署名