From 8f77740e3bae97eae50ac17132b133db50da197b Mon Sep 17 00:00:00 2001 From: Lars Kanis Date: Tue, 18 Apr 2023 10:06:44 +0200 Subject: Use a Ractor-local callback dispatcher for calls from non-ruby threads The callback is now routet to the Ractor, that created the FFI::Function . Within this Ractor a new ruby thread is created to execute the Proc, like before. --- ext/ffi_c/Function.c | 212 ++++++++++++++++++++++++++++------------ spec/ffi/async_callback_spec.rb | 23 +++++ 2 files changed, 171 insertions(+), 64 deletions(-) diff --git a/ext/ffi_c/Function.c b/ext/ffi_c/Function.c index a74136a..f88bc93 100644 --- a/ext/ffi_c/Function.c +++ b/ext/ffi_c/Function.c @@ -42,6 +42,10 @@ #include #include +#if HAVE_RB_EXT_RACTOR_SAFE +#include +#endif + #include #if defined(HAVE_NATIVETHREAD) && !defined(_WIN32) #include @@ -65,6 +69,9 @@ #include "MethodHandle.h" #include "Function.h" +#define DEFER_ASYNC_CALLBACK 1 + +struct async_cb_dispatcher; typedef struct Function_ { Pointer base; FunctionType* info; @@ -73,6 +80,9 @@ typedef struct Function_ { Closure* closure; VALUE rbProc; VALUE rbFunctionInfo; +#if defined(DEFER_ASYNC_CALLBACK) + struct async_cb_dispatcher *dispatcher; +#endif } Function; static void function_mark(void *data); @@ -86,9 +96,6 @@ static void* callback_with_gvl(void* data); static VALUE invoke_callback(VALUE data); static VALUE save_callback_exception(VALUE data, VALUE exc); -#define DEFER_ASYNC_CALLBACK 1 - - #if defined(DEFER_ASYNC_CALLBACK) static VALUE async_cb_event(void *); static VALUE async_cb_call(void *); @@ -113,10 +120,6 @@ static const rb_data_type_t function_data_type = { VALUE rbffi_FunctionClass = Qnil; -#if defined(DEFER_ASYNC_CALLBACK) -static VALUE async_cb_thread = Qnil; -#endif - static ID id_call = 0, id_to_native = 0, id_from_native = 0, id_cbtable = 0, id_cb_ref = 0; struct gvl_callback { @@ -126,7 +129,10 @@ struct gvl_callback { bool done; rbffi_frame_t *frame; #if defined(DEFER_ASYNC_CALLBACK) + struct async_cb_dispatcher *dispatcher; struct gvl_callback* next; + + /* Signal when the callback has finished and retval is set */ # ifndef _WIN32 pthread_cond_t async_cond; pthread_mutex_t async_mutex; @@ -138,16 +144,74 @@ struct gvl_callback { #if defined(DEFER_ASYNC_CALLBACK) -static struct gvl_callback* async_cb_list = NULL; +struct async_cb_dispatcher { + /* the Ractor-local dispatcher thread */ + VALUE thread; + + /* single linked list of pending callbacks */ + struct gvl_callback* async_cb_list; + + /* Signal new entries in async_cb_list */ # ifndef _WIN32 - static pthread_mutex_t async_cb_mutex = PTHREAD_MUTEX_INITIALIZER; - static pthread_cond_t async_cb_cond = PTHREAD_COND_INITIALIZER; + pthread_mutex_t async_cb_mutex; + pthread_cond_t async_cb_cond; # else - static HANDLE async_cb_cond; - static CRITICAL_SECTION async_cb_lock; + HANDLE async_cb_cond; + CRITICAL_SECTION async_cb_lock; # endif -#endif +}; +#if HAVE_RB_EXT_RACTOR_SAFE +static void +async_cb_dispatcher_mark(void *ptr) +{ + struct async_cb_dispatcher *ctx = (struct async_cb_dispatcher *)ptr; + rb_gc_mark(ctx->thread); +} + +static void +async_cb_dispatcher_free(void *ptr) +{ + struct async_cb_dispatcher *ctx = (struct async_cb_dispatcher *)ptr; + xfree(ctx); +} + +struct rb_ractor_local_storage_type async_cb_dispatcher_key_type = { + async_cb_dispatcher_mark, + async_cb_dispatcher_free, +}; + +static rb_ractor_local_key_t async_cb_dispatcher_key; + +static struct async_cb_dispatcher * +async_cb_dispatcher_get(void) +{ + struct async_cb_dispatcher *ctx = (struct async_cb_dispatcher *)rb_ractor_local_storage_ptr(async_cb_dispatcher_key); + return ctx; +} + +static void +async_cb_dispatcher_set(struct async_cb_dispatcher *ctx) +{ + rb_ractor_local_storage_ptr_set(async_cb_dispatcher_key, ctx); +} +#else +// for ruby 2.x +static struct async_cb_dispatcher *async_cb_dispatcher = NULL; + +static struct async_cb_dispatcher * +async_cb_dispatcher_get(void) +{ + return async_cb_dispatcher; +} + +static void +async_cb_dispatcher_set(struct async_cb_dispatcher *ctx) +{ + async_cb_dispatcher = ctx; +} +#endif +#endif static VALUE function_allocate(VALUE klass) @@ -328,9 +392,7 @@ static void after_fork_callback(void) { /* Ensure that a new dispatcher thread is started in a forked process */ - async_cb_thread = Qnil; - pthread_mutex_init(&async_cb_mutex, NULL); - pthread_cond_init(&async_cb_cond, NULL); + async_cb_dispatcher_set(NULL); } #endif @@ -360,17 +422,30 @@ function_init(VALUE self, VALUE rbFunctionInfo, VALUE rbProc) } #if defined(DEFER_ASYNC_CALLBACK) - if (async_cb_thread == Qnil) { + { + struct async_cb_dispatcher *ctx = async_cb_dispatcher_get(); + if (ctx == NULL) { + ctx = (struct async_cb_dispatcher*)ALLOC(struct async_cb_dispatcher); + ctx->async_cb_list = NULL; #if !defined(_WIN32) - if( pthread_atfork(NULL, NULL, after_fork_callback) ){ - rb_warn("FFI: unable to register fork callback"); - } + pthread_mutex_init(&ctx->async_cb_mutex, NULL); + pthread_cond_init(&ctx->async_cb_cond, NULL); + if( pthread_atfork(NULL, NULL, after_fork_callback) ){ + rb_warn("FFI: unable to register fork callback"); + } +#else + InitializeCriticalSection(&ctx->async_cb_lock); + ctx->async_cb_cond = CreateEvent(NULL, FALSE, FALSE, NULL); #endif + ctx->thread = rb_thread_create(async_cb_event, ctx); - async_cb_thread = rb_thread_create(async_cb_event, NULL); - /* Name thread, for better debugging */ - rb_funcall(async_cb_thread, rb_intern("name="), 1, rb_str_new2("FFI Callback Dispatcher")); + /* Name thread, for better debugging */ + rb_funcall(ctx->thread, rb_intern("name="), 1, rb_str_new2("FFI Callback Dispatcher")); + + async_cb_dispatcher_set(ctx); + } + fn->dispatcher = ctx; } #endif @@ -505,6 +580,7 @@ function_release(VALUE self) static void callback_invoke(ffi_cif* cif, void* retval, void** parameters, void* user_data) { + Function* fn; struct gvl_callback cb = { 0 }; cb.closure = (Closure *) user_data; @@ -512,6 +588,7 @@ callback_invoke(ffi_cif* cif, void* retval, void** parameters, void* user_data) cb.parameters = parameters; cb.done = false; cb.frame = rbffi_frame_current(); + fn = (Function *) cb.closure->info; if (cb.frame != NULL) cb.frame->exc = Qnil; @@ -524,18 +601,19 @@ callback_invoke(ffi_cif* cif, void* retval, void** parameters, void* user_data) #if defined(DEFER_ASYNC_CALLBACK) && !defined(_WIN32) } else { bool empty = false; + struct async_cb_dispatcher *ctx = fn->dispatcher; pthread_mutex_init(&cb.async_mutex, NULL); pthread_cond_init(&cb.async_cond, NULL); - /* Now signal the async callback thread */ - pthread_mutex_lock(&async_cb_mutex); - empty = async_cb_list == NULL; - cb.next = async_cb_list; - async_cb_list = &cb; + /* Now signal the async callback dispatcher thread */ + pthread_mutex_lock(&ctx->async_cb_mutex); + empty = ctx->async_cb_list == NULL; + cb.next = ctx->async_cb_list; + ctx->async_cb_list = &cb; - pthread_cond_signal(&async_cb_cond); - pthread_mutex_unlock(&async_cb_mutex); + pthread_cond_signal(&ctx->async_cb_cond); + pthread_mutex_unlock(&ctx->async_cb_mutex); /* Wait for the thread executing the ruby callback to signal it is done */ pthread_mutex_lock(&cb.async_mutex); @@ -549,17 +627,18 @@ callback_invoke(ffi_cif* cif, void* retval, void** parameters, void* user_data) #elif defined(DEFER_ASYNC_CALLBACK) && defined(_WIN32) } else { bool empty = false; + struct async_cb_dispatcher *ctx = fn->dispatcher; cb.async_event = CreateEvent(NULL, FALSE, FALSE, NULL); - /* Now signal the async callback thread */ - EnterCriticalSection(&async_cb_lock); - empty = async_cb_list == NULL; - cb.next = async_cb_list; - async_cb_list = &cb; - LeaveCriticalSection(&async_cb_lock); + /* Now signal the async callback dispatcher thread */ + EnterCriticalSection(&ctx->async_cb_lock); + empty = ctx->async_cb_list == NULL; + cb.next = ctx->async_cb_list; + ctx->async_cb_list = &cb; + LeaveCriticalSection(&ctx->async_cb_lock); - SetEvent(async_cb_cond); + SetEvent(ctx->async_cb_cond); /* Wait for the thread executing the ruby callback to signal it is done */ WaitForSingleObject(cb.async_event, INFINITE); @@ -570,6 +649,7 @@ callback_invoke(ffi_cif* cif, void* retval, void** parameters, void* user_data) #if defined(DEFER_ASYNC_CALLBACK) struct async_wait { + struct async_cb_dispatcher *dispatcher; void* cb; bool stop; }; @@ -578,9 +658,10 @@ static void * async_cb_wait(void *); static void async_cb_stop(void *); static VALUE -async_cb_event(void* unused) +async_cb_event(void* ptr) { - struct async_wait w = { 0 }; + struct async_cb_dispatcher *ctx = (struct async_cb_dispatcher *)ptr; + struct async_wait w = { ctx }; w.stop = false; while (!w.stop) { @@ -601,23 +682,24 @@ static void * async_cb_wait(void *data) { struct async_wait* w = (struct async_wait *) data; + struct async_cb_dispatcher *ctx = w->dispatcher; w->cb = NULL; - EnterCriticalSection(&async_cb_lock); + EnterCriticalSection(&ctx->async_cb_lock); - while (!w->stop && async_cb_list == NULL) { - LeaveCriticalSection(&async_cb_lock); - WaitForSingleObject(async_cb_cond, INFINITE); - EnterCriticalSection(&async_cb_lock); + while (!w->stop && ctx->async_cb_list == NULL) { + LeaveCriticalSection(&ctx->async_cb_lock); + WaitForSingleObject(ctx->async_cb_cond, INFINITE); + EnterCriticalSection(&ctx->async_cb_lock); } - if (async_cb_list != NULL) { - w->cb = async_cb_list; - async_cb_list = async_cb_list->next; + if (ctx->async_cb_list != NULL) { + w->cb = ctx->async_cb_list; + ctx->async_cb_list = ctx->async_cb_list->next; } - LeaveCriticalSection(&async_cb_lock); + LeaveCriticalSection(&ctx->async_cb_lock); return NULL; } @@ -626,11 +708,12 @@ static void async_cb_stop(void *data) { struct async_wait* w = (struct async_wait *) data; + struct async_cb_dispatcher *ctx = w->dispatcher; - EnterCriticalSection(&async_cb_lock); + EnterCriticalSection(&ctx->async_cb_lock); w->stop = true; - LeaveCriticalSection(&async_cb_lock); - SetEvent(async_cb_cond); + LeaveCriticalSection(&ctx->async_cb_lock); + SetEvent(ctx->async_cb_cond); } #else @@ -638,21 +721,22 @@ static void * async_cb_wait(void *data) { struct async_wait* w = (struct async_wait *) data; + struct async_cb_dispatcher *ctx = w->dispatcher; w->cb = NULL; - pthread_mutex_lock(&async_cb_mutex); + pthread_mutex_lock(&ctx->async_cb_mutex); - while (!w->stop && async_cb_list == NULL) { - pthread_cond_wait(&async_cb_cond, &async_cb_mutex); + while (!w->stop && ctx->async_cb_list == NULL) { + pthread_cond_wait(&ctx->async_cb_cond, &ctx->async_cb_mutex); } - if (async_cb_list != NULL) { - w->cb = async_cb_list; - async_cb_list = async_cb_list->next; + if (ctx->async_cb_list != NULL) { + w->cb = ctx->async_cb_list; + ctx->async_cb_list = ctx->async_cb_list->next; } - pthread_mutex_unlock(&async_cb_mutex); + pthread_mutex_unlock(&ctx->async_cb_mutex); return NULL; } @@ -661,11 +745,12 @@ static void async_cb_stop(void *data) { struct async_wait* w = (struct async_wait *) data; + struct async_cb_dispatcher *ctx = w->dispatcher; - pthread_mutex_lock(&async_cb_mutex); + pthread_mutex_lock(&ctx->async_cb_mutex); w->stop = true; - pthread_cond_signal(&async_cb_cond); - pthread_mutex_unlock(&async_cb_mutex); + pthread_cond_signal(&ctx->async_cb_cond); + pthread_mutex_unlock(&ctx->async_cb_mutex); } #endif @@ -961,8 +1046,7 @@ rbffi_Function_Init(VALUE moduleFFI) id_cb_ref = rb_intern("@__ffi_callback__"); id_to_native = rb_intern("to_native"); id_from_native = rb_intern("from_native"); -#if defined(_WIN32) - InitializeCriticalSection(&async_cb_lock); - async_cb_cond = CreateEvent(NULL, FALSE, FALSE, NULL); +#if defined(DEFER_ASYNC_CALLBACK) && defined(HAVE_RB_EXT_RACTOR_SAFE) + async_cb_dispatcher_key = rb_ractor_local_storage_ptr_newkey(&async_cb_dispatcher_key_type); #endif } diff --git a/spec/ffi/async_callback_spec.rb b/spec/ffi/async_callback_spec.rb index 51a4886..7bb9f70 100644 --- a/spec/ffi/async_callback_spec.rb +++ b/spec/ffi/async_callback_spec.rb @@ -50,4 +50,27 @@ describe "async callback" do expect(callback_runner_thread.name).to eq("FFI Callback Runner") end + + it "works in Ractor", :ractor do + skip "not yet supported on TruffleRuby" if RUBY_ENGINE == "truffleruby" + + res = Ractor.new do + v = 0xdeadbeef + correct_ractor = false + correct_thread = false + thread = Thread.current + rac = Ractor.current + cb = Proc.new do |i| + v = i + correct_ractor = rac == Ractor.current + correct_thread = thread != Thread.current + end + LibTest.testAsyncCallback(cb, 0x7fffffff) + + [v, correct_ractor, correct_thread] + end.take + + expect(res).to eq([0x7fffffff, true, true]) + end + end -- cgit v1.2.1