diff options
Diffstat (limited to 'thread_sync.c')
-rw-r--r-- | thread_sync.c | 70 |
1 files changed, 39 insertions, 31 deletions
diff --git a/thread_sync.c b/thread_sync.c index bd60231789..c0a61554c1 100644 --- a/thread_sync.c +++ b/thread_sync.c @@ -946,25 +946,29 @@ queue_do_pop(VALUE self, struct rb_queue *q, int should_block) check_array(self, q->que); while (RARRAY_LEN(q->que) == 0) { - if (!should_block) { - rb_raise(rb_eThreadError, "queue empty"); - } - else if (queue_closed_p(self)) { - return queue_closed_result(self, q); - } - else { - struct queue_waiter qw; + if (!should_block) { + rb_raise(rb_eThreadError, "queue empty"); + } + else if (queue_closed_p(self)) { + return queue_closed_result(self, q); + } + else { + rb_execution_context_t *ec = GET_EC(); + struct queue_waiter qw; - assert(RARRAY_LEN(q->que) == 0); - assert(queue_closed_p(self) == 0); + assert(RARRAY_LEN(q->que) == 0); + assert(queue_closed_p(self) == 0); - qw.w.th = GET_THREAD(); - qw.as.q = q; - list_add_tail(queue_waitq(qw.as.q), &qw.w.node); - qw.as.q->num_waiting++; + qw.w.self = self; + qw.w.th = ec->thread_ptr; + qw.w.fiber = ec->fiber_ptr; - rb_ensure(queue_sleep, self, queue_sleep_done, (VALUE)&qw); - } + qw.as.q = q; + list_add_tail(queue_waitq(qw.as.q), &qw.w.node); + qw.as.q->num_waiting++; + + rb_ensure(queue_sleep, self, queue_sleep_done, (VALUE)&qw); + } } return rb_ary_shift(q->que); @@ -1188,27 +1192,31 @@ rb_szqueue_push(int argc, VALUE *argv, VALUE self) int should_block = szqueue_push_should_block(argc, argv); while (queue_length(self, &sq->q) >= sq->max) { - if (!should_block) { - rb_raise(rb_eThreadError, "queue full"); - } - else if (queue_closed_p(self)) { + if (!should_block) { + rb_raise(rb_eThreadError, "queue full"); + } + else if (queue_closed_p(self)) { break; - } - else { - struct queue_waiter qw; - struct list_head *pushq = szqueue_pushq(sq); + } + else { + rb_execution_context_t *ec = GET_EC(); + struct queue_waiter qw; + struct list_head *pushq = szqueue_pushq(sq); - qw.w.th = GET_THREAD(); - qw.as.sq = sq; - list_add_tail(pushq, &qw.w.node); - sq->num_waiting_push++; + qw.w.self = self; + qw.w.th = ec->thread_ptr; + qw.w.fiber = ec->fiber_ptr; - rb_ensure(queue_sleep, self, szqueue_sleep_done, (VALUE)&qw); - } + qw.as.sq = sq; + list_add_tail(pushq, &qw.w.node); + sq->num_waiting_push++; + + rb_ensure(queue_sleep, self, szqueue_sleep_done, (VALUE)&qw); + } } if (queue_closed_p(self)) { - raise_closed_queue_error(self); + raise_closed_queue_error(self); } return queue_do_push(self, &sq->q, argv[0]); |