diff options
author | Luke Gruber <luke.gruber@shopify.com> | 2025-06-23 14:33:52 -0400 |
---|---|---|
committer | John Hawthorn <john@hawthorn.email> | 2025-08-08 13:37:31 -0700 |
commit | 07878ebe787843f510be460738ff02dd883bf9ad (patch) | |
tree | 8eec4e32addd1ceb9ddc83eb3d254f7451a755f7 | |
parent | e639e5fd1af51e2462879d6db862ee5320914ba7 (diff) |
Fix lock ordering issue for rb_ractor_sched_wait() and rb_ractor_sched_wakeup()
In rb_ractor_sched_wait() (ex: Ractor.receive), we acquire
RACTOR_LOCK(cr) and then thread_sched_lock(cur_th). However, on wakeup
if we're a dnt, in thread_sched_wait_running_turn() we acquire
thread_sched_lock(cur_th) after condvar wakeup and then RACTOR_LOCK(cr).
This lock inversion can cause a deadlock with rb_ractor_wakeup_all()
(ex: port.send(obj)), where we acquire RACTOR_LOCK(other_r) and then
thread_sched_lock(other_th).
So, the error happens:
nt 1: Ractor.receive
rb_ractor_sched_wait() after condvar wakeup in thread_sched_wait_running_turn():
- thread_sched_lock(cur_th) (condvar) # acquires lock
- rb_ractor_lock_self(cr) # deadlock here: tries to acquire, HANGS
nt 2: port.send
ractor_wakeup_all()
- RACTOR_LOCK(port_r) # acquires lock
- thread_sched_lock # tries to acquire, HANGS
To fix it, we now unlock the thread_sched_lock before acquiring the
ractor_lock in rb_ractor_sched_wait().
Script that reproduces issue:
```ruby
require "async"
class RactorWrapper
def initialize
@ractor = Ractor.new do
Ractor.recv # Ractor doesn't start until explicitly told to
# Do some calculations
fib = ->(x) { x < 2 ? 1 : fib.call(x - 1) + fib.call(x - 2) }
fib.call(20)
end
end
def take_async
@ractor.send(nil)
Thread.new { @ractor.value }.value
end
end
Async do |task|
10_000.times do |i|
task.async do
RactorWrapper.new.take_async
puts i
end
end
end
exit 0
```
Fixes [Bug #21398]
Co-authored-by: John Hawthorn <john.hawthorn@shopify.com>
-rw-r--r-- | test/ruby/test_ractor.rb | 39 | ||||
-rw-r--r-- | thread_pthread.c | 13 |
2 files changed, 44 insertions, 8 deletions
diff --git a/test/ruby/test_ractor.rb b/test/ruby/test_ractor.rb index 0a456a1d0f..74de2bf9cd 100644 --- a/test/ruby/test_ractor.rb +++ b/test/ruby/test_ractor.rb @@ -162,6 +162,45 @@ class TestRactor < Test::Unit::TestCase RUBY end + # [Bug #21398] + def test_port_receive_dnt_with_port_send + assert_ractor(<<~'RUBY', timeout: 30) + THREADS = 10 + JOBS_PER_THREAD = 50 + ARRAY_SIZE = 20_000 + def ractor_job(job_count, array_size) + port = Ractor::Port.new + workers = (1..4).map do |i| + Ractor.new(port) do |job_port| + while job = Ractor.receive + result = job.map { |x| x * 2 }.sum + job_port.send result + end + end + end + jobs = Array.new(job_count) { Array.new(array_size) { rand(1000) } } + jobs.each_with_index do |job, i| + w_idx = i % 4 + workers[w_idx].send(job) + end + results = [] + jobs.size.times do + result = port.receive # dnt receive + results << result + end + results + end + threads = [] + # creates 40 ractors (THREADSx4) + THREADS.times do + threads << Thread.new do + ractor_job(JOBS_PER_THREAD, ARRAY_SIZE) + end + end + threads.each(&:join) + RUBY + end + def assert_make_shareable(obj) refute Ractor.shareable?(obj), "object was already shareable" Ractor.make_shareable(obj) diff --git a/thread_pthread.c b/thread_pthread.c index 377e1d9f64..730ecb5416 100644 --- a/thread_pthread.c +++ b/thread_pthread.c @@ -1351,6 +1351,7 @@ rb_ractor_sched_wait(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_fun } thread_sched_lock(sched, th); + rb_ractor_unlock_self(cr); { // setup sleep bool can_direct_transfer = !th_has_dedicated_nt(th); @@ -1358,16 +1359,12 @@ rb_ractor_sched_wait(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_fun th->status = THREAD_STOPPED_FOREVER; RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_SUSPENDED, th); thread_sched_wakeup_next_thread(sched, th, can_direct_transfer); - - rb_ractor_unlock_self(cr); - { - // sleep - thread_sched_wait_running_turn(sched, th, can_direct_transfer); - th->status = THREAD_RUNNABLE; - } - rb_ractor_lock_self(cr); + // sleep + thread_sched_wait_running_turn(sched, th, can_direct_transfer); + th->status = THREAD_RUNNABLE; } thread_sched_unlock(sched, th); + rb_ractor_lock_self(cr); ubf_clear(th); |