summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLars Kanis <lars@greiz-reinsdorf.de>2023-04-18 10:06:44 +0200
committerLars Kanis <lars@greiz-reinsdorf.de>2023-04-18 13:13:25 +0200
commit8f77740e3bae97eae50ac17132b133db50da197b (patch)
tree12aa1cc06bae0e026dc4274febb190e61955ac39
parent5247d3e736f77ce19bbb3e69cf5186fa5a7084f4 (diff)
downloadffi-8f77740e3bae97eae50ac17132b133db50da197b.tar.gz
Use a Ractor-local callback dispatcher for calls from non-ruby threads
The callback is now routet to the Ractor, that created the FFI::Function . Within this Ractor a new ruby thread is created to execute the Proc, like before.
-rw-r--r--ext/ffi_c/Function.c212
-rw-r--r--spec/ffi/async_callback_spec.rb23
2 files changed, 171 insertions, 64 deletions
diff --git a/ext/ffi_c/Function.c b/ext/ffi_c/Function.c
index a74136a..f88bc93 100644
--- a/ext/ffi_c/Function.c
+++ b/ext/ffi_c/Function.c
@@ -42,6 +42,10 @@
#include <ruby.h>
#include <ruby/thread.h>
+#if HAVE_RB_EXT_RACTOR_SAFE
+#include <ruby/ractor.h>
+#endif
+
#include <ffi.h>
#if defined(HAVE_NATIVETHREAD) && !defined(_WIN32)
#include <pthread.h>
@@ -65,6 +69,9 @@
#include "MethodHandle.h"
#include "Function.h"
+#define DEFER_ASYNC_CALLBACK 1
+
+struct async_cb_dispatcher;
typedef struct Function_ {
Pointer base;
FunctionType* info;
@@ -73,6 +80,9 @@ typedef struct Function_ {
Closure* closure;
VALUE rbProc;
VALUE rbFunctionInfo;
+#if defined(DEFER_ASYNC_CALLBACK)
+ struct async_cb_dispatcher *dispatcher;
+#endif
} Function;
static void function_mark(void *data);
@@ -86,9 +96,6 @@ static void* callback_with_gvl(void* data);
static VALUE invoke_callback(VALUE data);
static VALUE save_callback_exception(VALUE data, VALUE exc);
-#define DEFER_ASYNC_CALLBACK 1
-
-
#if defined(DEFER_ASYNC_CALLBACK)
static VALUE async_cb_event(void *);
static VALUE async_cb_call(void *);
@@ -113,10 +120,6 @@ static const rb_data_type_t function_data_type = {
VALUE rbffi_FunctionClass = Qnil;
-#if defined(DEFER_ASYNC_CALLBACK)
-static VALUE async_cb_thread = Qnil;
-#endif
-
static ID id_call = 0, id_to_native = 0, id_from_native = 0, id_cbtable = 0, id_cb_ref = 0;
struct gvl_callback {
@@ -126,7 +129,10 @@ struct gvl_callback {
bool done;
rbffi_frame_t *frame;
#if defined(DEFER_ASYNC_CALLBACK)
+ struct async_cb_dispatcher *dispatcher;
struct gvl_callback* next;
+
+ /* Signal when the callback has finished and retval is set */
# ifndef _WIN32
pthread_cond_t async_cond;
pthread_mutex_t async_mutex;
@@ -138,16 +144,74 @@ struct gvl_callback {
#if defined(DEFER_ASYNC_CALLBACK)
-static struct gvl_callback* async_cb_list = NULL;
+struct async_cb_dispatcher {
+ /* the Ractor-local dispatcher thread */
+ VALUE thread;
+
+ /* single linked list of pending callbacks */
+ struct gvl_callback* async_cb_list;
+
+ /* Signal new entries in async_cb_list */
# ifndef _WIN32
- static pthread_mutex_t async_cb_mutex = PTHREAD_MUTEX_INITIALIZER;
- static pthread_cond_t async_cb_cond = PTHREAD_COND_INITIALIZER;
+ pthread_mutex_t async_cb_mutex;
+ pthread_cond_t async_cb_cond;
# else
- static HANDLE async_cb_cond;
- static CRITICAL_SECTION async_cb_lock;
+ HANDLE async_cb_cond;
+ CRITICAL_SECTION async_cb_lock;
# endif
-#endif
+};
+#if HAVE_RB_EXT_RACTOR_SAFE
+static void
+async_cb_dispatcher_mark(void *ptr)
+{
+ struct async_cb_dispatcher *ctx = (struct async_cb_dispatcher *)ptr;
+ rb_gc_mark(ctx->thread);
+}
+
+static void
+async_cb_dispatcher_free(void *ptr)
+{
+ struct async_cb_dispatcher *ctx = (struct async_cb_dispatcher *)ptr;
+ xfree(ctx);
+}
+
+struct rb_ractor_local_storage_type async_cb_dispatcher_key_type = {
+ async_cb_dispatcher_mark,
+ async_cb_dispatcher_free,
+};
+
+static rb_ractor_local_key_t async_cb_dispatcher_key;
+
+static struct async_cb_dispatcher *
+async_cb_dispatcher_get(void)
+{
+ struct async_cb_dispatcher *ctx = (struct async_cb_dispatcher *)rb_ractor_local_storage_ptr(async_cb_dispatcher_key);
+ return ctx;
+}
+
+static void
+async_cb_dispatcher_set(struct async_cb_dispatcher *ctx)
+{
+ rb_ractor_local_storage_ptr_set(async_cb_dispatcher_key, ctx);
+}
+#else
+// for ruby 2.x
+static struct async_cb_dispatcher *async_cb_dispatcher = NULL;
+
+static struct async_cb_dispatcher *
+async_cb_dispatcher_get(void)
+{
+ return async_cb_dispatcher;
+}
+
+static void
+async_cb_dispatcher_set(struct async_cb_dispatcher *ctx)
+{
+ async_cb_dispatcher = ctx;
+}
+#endif
+#endif
static VALUE
function_allocate(VALUE klass)
@@ -328,9 +392,7 @@ static void
after_fork_callback(void)
{
/* Ensure that a new dispatcher thread is started in a forked process */
- async_cb_thread = Qnil;
- pthread_mutex_init(&async_cb_mutex, NULL);
- pthread_cond_init(&async_cb_cond, NULL);
+ async_cb_dispatcher_set(NULL);
}
#endif
@@ -360,17 +422,30 @@ function_init(VALUE self, VALUE rbFunctionInfo, VALUE rbProc)
}
#if defined(DEFER_ASYNC_CALLBACK)
- if (async_cb_thread == Qnil) {
+ {
+ struct async_cb_dispatcher *ctx = async_cb_dispatcher_get();
+ if (ctx == NULL) {
+ ctx = (struct async_cb_dispatcher*)ALLOC(struct async_cb_dispatcher);
+ ctx->async_cb_list = NULL;
#if !defined(_WIN32)
- if( pthread_atfork(NULL, NULL, after_fork_callback) ){
- rb_warn("FFI: unable to register fork callback");
- }
+ pthread_mutex_init(&ctx->async_cb_mutex, NULL);
+ pthread_cond_init(&ctx->async_cb_cond, NULL);
+ if( pthread_atfork(NULL, NULL, after_fork_callback) ){
+ rb_warn("FFI: unable to register fork callback");
+ }
+#else
+ InitializeCriticalSection(&ctx->async_cb_lock);
+ ctx->async_cb_cond = CreateEvent(NULL, FALSE, FALSE, NULL);
#endif
+ ctx->thread = rb_thread_create(async_cb_event, ctx);
- async_cb_thread = rb_thread_create(async_cb_event, NULL);
- /* Name thread, for better debugging */
- rb_funcall(async_cb_thread, rb_intern("name="), 1, rb_str_new2("FFI Callback Dispatcher"));
+ /* Name thread, for better debugging */
+ rb_funcall(ctx->thread, rb_intern("name="), 1, rb_str_new2("FFI Callback Dispatcher"));
+
+ async_cb_dispatcher_set(ctx);
+ }
+ fn->dispatcher = ctx;
}
#endif
@@ -505,6 +580,7 @@ function_release(VALUE self)
static void
callback_invoke(ffi_cif* cif, void* retval, void** parameters, void* user_data)
{
+ Function* fn;
struct gvl_callback cb = { 0 };
cb.closure = (Closure *) user_data;
@@ -512,6 +588,7 @@ callback_invoke(ffi_cif* cif, void* retval, void** parameters, void* user_data)
cb.parameters = parameters;
cb.done = false;
cb.frame = rbffi_frame_current();
+ fn = (Function *) cb.closure->info;
if (cb.frame != NULL) cb.frame->exc = Qnil;
@@ -524,18 +601,19 @@ callback_invoke(ffi_cif* cif, void* retval, void** parameters, void* user_data)
#if defined(DEFER_ASYNC_CALLBACK) && !defined(_WIN32)
} else {
bool empty = false;
+ struct async_cb_dispatcher *ctx = fn->dispatcher;
pthread_mutex_init(&cb.async_mutex, NULL);
pthread_cond_init(&cb.async_cond, NULL);
- /* Now signal the async callback thread */
- pthread_mutex_lock(&async_cb_mutex);
- empty = async_cb_list == NULL;
- cb.next = async_cb_list;
- async_cb_list = &cb;
+ /* Now signal the async callback dispatcher thread */
+ pthread_mutex_lock(&ctx->async_cb_mutex);
+ empty = ctx->async_cb_list == NULL;
+ cb.next = ctx->async_cb_list;
+ ctx->async_cb_list = &cb;
- pthread_cond_signal(&async_cb_cond);
- pthread_mutex_unlock(&async_cb_mutex);
+ pthread_cond_signal(&ctx->async_cb_cond);
+ pthread_mutex_unlock(&ctx->async_cb_mutex);
/* Wait for the thread executing the ruby callback to signal it is done */
pthread_mutex_lock(&cb.async_mutex);
@@ -549,17 +627,18 @@ callback_invoke(ffi_cif* cif, void* retval, void** parameters, void* user_data)
#elif defined(DEFER_ASYNC_CALLBACK) && defined(_WIN32)
} else {
bool empty = false;
+ struct async_cb_dispatcher *ctx = fn->dispatcher;
cb.async_event = CreateEvent(NULL, FALSE, FALSE, NULL);
- /* Now signal the async callback thread */
- EnterCriticalSection(&async_cb_lock);
- empty = async_cb_list == NULL;
- cb.next = async_cb_list;
- async_cb_list = &cb;
- LeaveCriticalSection(&async_cb_lock);
+ /* Now signal the async callback dispatcher thread */
+ EnterCriticalSection(&ctx->async_cb_lock);
+ empty = ctx->async_cb_list == NULL;
+ cb.next = ctx->async_cb_list;
+ ctx->async_cb_list = &cb;
+ LeaveCriticalSection(&ctx->async_cb_lock);
- SetEvent(async_cb_cond);
+ SetEvent(ctx->async_cb_cond);
/* Wait for the thread executing the ruby callback to signal it is done */
WaitForSingleObject(cb.async_event, INFINITE);
@@ -570,6 +649,7 @@ callback_invoke(ffi_cif* cif, void* retval, void** parameters, void* user_data)
#if defined(DEFER_ASYNC_CALLBACK)
struct async_wait {
+ struct async_cb_dispatcher *dispatcher;
void* cb;
bool stop;
};
@@ -578,9 +658,10 @@ static void * async_cb_wait(void *);
static void async_cb_stop(void *);
static VALUE
-async_cb_event(void* unused)
+async_cb_event(void* ptr)
{
- struct async_wait w = { 0 };
+ struct async_cb_dispatcher *ctx = (struct async_cb_dispatcher *)ptr;
+ struct async_wait w = { ctx };
w.stop = false;
while (!w.stop) {
@@ -601,23 +682,24 @@ static void *
async_cb_wait(void *data)
{
struct async_wait* w = (struct async_wait *) data;
+ struct async_cb_dispatcher *ctx = w->dispatcher;
w->cb = NULL;
- EnterCriticalSection(&async_cb_lock);
+ EnterCriticalSection(&ctx->async_cb_lock);
- while (!w->stop && async_cb_list == NULL) {
- LeaveCriticalSection(&async_cb_lock);
- WaitForSingleObject(async_cb_cond, INFINITE);
- EnterCriticalSection(&async_cb_lock);
+ while (!w->stop && ctx->async_cb_list == NULL) {
+ LeaveCriticalSection(&ctx->async_cb_lock);
+ WaitForSingleObject(ctx->async_cb_cond, INFINITE);
+ EnterCriticalSection(&ctx->async_cb_lock);
}
- if (async_cb_list != NULL) {
- w->cb = async_cb_list;
- async_cb_list = async_cb_list->next;
+ if (ctx->async_cb_list != NULL) {
+ w->cb = ctx->async_cb_list;
+ ctx->async_cb_list = ctx->async_cb_list->next;
}
- LeaveCriticalSection(&async_cb_lock);
+ LeaveCriticalSection(&ctx->async_cb_lock);
return NULL;
}
@@ -626,11 +708,12 @@ static void
async_cb_stop(void *data)
{
struct async_wait* w = (struct async_wait *) data;
+ struct async_cb_dispatcher *ctx = w->dispatcher;
- EnterCriticalSection(&async_cb_lock);
+ EnterCriticalSection(&ctx->async_cb_lock);
w->stop = true;
- LeaveCriticalSection(&async_cb_lock);
- SetEvent(async_cb_cond);
+ LeaveCriticalSection(&ctx->async_cb_lock);
+ SetEvent(ctx->async_cb_cond);
}
#else
@@ -638,21 +721,22 @@ static void *
async_cb_wait(void *data)
{
struct async_wait* w = (struct async_wait *) data;
+ struct async_cb_dispatcher *ctx = w->dispatcher;
w->cb = NULL;
- pthread_mutex_lock(&async_cb_mutex);
+ pthread_mutex_lock(&ctx->async_cb_mutex);
- while (!w->stop && async_cb_list == NULL) {
- pthread_cond_wait(&async_cb_cond, &async_cb_mutex);
+ while (!w->stop && ctx->async_cb_list == NULL) {
+ pthread_cond_wait(&ctx->async_cb_cond, &ctx->async_cb_mutex);
}
- if (async_cb_list != NULL) {
- w->cb = async_cb_list;
- async_cb_list = async_cb_list->next;
+ if (ctx->async_cb_list != NULL) {
+ w->cb = ctx->async_cb_list;
+ ctx->async_cb_list = ctx->async_cb_list->next;
}
- pthread_mutex_unlock(&async_cb_mutex);
+ pthread_mutex_unlock(&ctx->async_cb_mutex);
return NULL;
}
@@ -661,11 +745,12 @@ static void
async_cb_stop(void *data)
{
struct async_wait* w = (struct async_wait *) data;
+ struct async_cb_dispatcher *ctx = w->dispatcher;
- pthread_mutex_lock(&async_cb_mutex);
+ pthread_mutex_lock(&ctx->async_cb_mutex);
w->stop = true;
- pthread_cond_signal(&async_cb_cond);
- pthread_mutex_unlock(&async_cb_mutex);
+ pthread_cond_signal(&ctx->async_cb_cond);
+ pthread_mutex_unlock(&ctx->async_cb_mutex);
}
#endif
@@ -961,8 +1046,7 @@ rbffi_Function_Init(VALUE moduleFFI)
id_cb_ref = rb_intern("@__ffi_callback__");
id_to_native = rb_intern("to_native");
id_from_native = rb_intern("from_native");
-#if defined(_WIN32)
- InitializeCriticalSection(&async_cb_lock);
- async_cb_cond = CreateEvent(NULL, FALSE, FALSE, NULL);
+#if defined(DEFER_ASYNC_CALLBACK) && defined(HAVE_RB_EXT_RACTOR_SAFE)
+ async_cb_dispatcher_key = rb_ractor_local_storage_ptr_newkey(&async_cb_dispatcher_key_type);
#endif
}
diff --git a/spec/ffi/async_callback_spec.rb b/spec/ffi/async_callback_spec.rb
index 51a4886..7bb9f70 100644
--- a/spec/ffi/async_callback_spec.rb
+++ b/spec/ffi/async_callback_spec.rb
@@ -50,4 +50,27 @@ describe "async callback" do
expect(callback_runner_thread.name).to eq("FFI Callback Runner")
end
+
+ it "works in Ractor", :ractor do
+ skip "not yet supported on TruffleRuby" if RUBY_ENGINE == "truffleruby"
+
+ res = Ractor.new do
+ v = 0xdeadbeef
+ correct_ractor = false
+ correct_thread = false
+ thread = Thread.current
+ rac = Ractor.current
+ cb = Proc.new do |i|
+ v = i
+ correct_ractor = rac == Ractor.current
+ correct_thread = thread != Thread.current
+ end
+ LibTest.testAsyncCallback(cb, 0x7fffffff)
+
+ [v, correct_ractor, correct_thread]
+ end.take
+
+ expect(res).to eq([0x7fffffff, true, true])
+ end
+
end