From 5fa589691c780735fae153e7e6907906663349a1 Mon Sep 17 00:00:00 2001
From: frsyuki
Date: Wed, 26 May 2010 07:01:28 +0900
Subject: ruby: use malloc/realloc for stream buffer

---
 ruby/unpack.c | 178 ++++++++++++++++++++++++++++++++++++++--------------------
 1 file changed, 117 insertions(+), 61 deletions(-)

(limited to 'ruby')

diff --git a/ruby/unpack.c b/ruby/unpack.c
index ba20ae6..c4b5e29 100644
--- a/ruby/unpack.c
+++ b/ruby/unpack.c
@@ -27,16 +27,17 @@ static ID s_readpartial;
 int s_ascii_8bit;
 #endif
 
-static ID s_slice_bang;
-#ifdef RUBY_VM
-static ID s_clear;
-#endif
+struct unpack_buffer {
+	size_t size;  /* number of bytes currently stored at ptr */
+	size_t free;  /* unused capacity following the stored bytes */
+	char* ptr;    /* malloc'ed storage; NULL until the first feed */
+};
 
 typedef struct {
 	int finished;
 	VALUE source;
 	size_t offset;
-	VALUE buffer;
+	struct unpack_buffer buffer;
 	VALUE stream;
 	VALUE streambuf;
 	ID stream_append_method;
@@ -129,8 +130,16 @@ static inline int template_callback_map_item(unpack_user* u, VALUE* c, VALUE k,
 #endif
 
 static inline int template_callback_raw(unpack_user* u, const char* b, const char* p, unsigned int l, VALUE* o)
-{ *o = (l <= COW_MIN_SIZE) ? rb_str_new(p, l) : rb_str_substr(u->source, p - b, l); return 0; }
+//{ *o = (l <= COW_MIN_SIZE) ? rb_str_new(p, l) : rb_str_substr(u->source, p - b, l); return 0; }
 //{ *o = rb_str_new(p, l); return 0; }
+{
+	if(u->source == Qnil || l <= COW_MIN_SIZE) {
+		*o = rb_str_new(p, l);
+	} else {
+		*o = rb_str_substr(u->source, p - b, l);
+	}
+	return 0;
+}
 
 
 #include "msgpack/unpack_template.h"
@@ -209,6 +218,27 @@ static int template_execute_wrap(msgpack_unpack_t* mp,
 	return ret;
 }
 
+/* Run template_execute_do under rb_rescue with GC disabled for the call;
+ * GC is re-enabled afterwards only if it was enabled on entry. */
+static int template_execute_wrap_each(msgpack_unpack_t* mp,
+		const char* ptr, size_t dlen, size_t* from)
+{
+	VALUE args[4] = {
+		(VALUE)mp,
+		(VALUE)ptr,
+		(VALUE)dlen,
+		(VALUE)from,
+	};
+
+	// FIXME mp->top is not updated while execute is running, so GC mark does not work
+	VALUE gc_was_disabled = rb_gc_disable();
+	int ret = (int)rb_rescue(template_execute_do, (VALUE)args,
+			template_execute_rescue, Qnil);
+	if(!RTEST(gc_was_disabled)) { rb_gc_enable(); }
+
+	return ret;
+}
+
 
 static VALUE cUnpacker;
 
@@ -220,15 +250,26 @@ static VALUE cUnpacker;
 
 static VALUE eUnpackError;
 
+#ifndef MSGPACK_UNPACKER_BUFFER_INIT_SIZE
+#define MSGPACK_UNPACKER_BUFFER_INIT_SIZE (32*1024)
+#endif
+
+#ifndef MSGPACK_UNPACKER_BUFFER_RESERVE_SIZE
+#define MSGPACK_UNPACKER_BUFFER_RESERVE_SIZE (8*1024)
+#endif
+
 static void MessagePack_Unpacker_free(void* data)
 {
-	if(data) { free(data); }
+	if(data) {
+		msgpack_unpack_t* mp = (msgpack_unpack_t*)data;
+		free(mp->user.buffer.ptr);
+		free(mp);
+	}
 }
 
 static void MessagePack_Unpacker_mark(msgpack_unpack_t *mp)
 {
 	unsigned int i;
-	rb_gc_mark(mp->user.buffer);
 	rb_gc_mark(mp->user.stream);
 	rb_gc_mark(mp->user.streambuf);
 	for(i=0; i < mp->top; ++i) {
@@ -255,14 +296,6 @@ static ID append_method_of(VALUE stream)
 	}
 }
 
-#ifndef MSGPACK_UNPACKER_BUFFER_INIT_SIZE
-#define MSGPACK_UNPACKER_BUFFER_INIT_SIZE (32*1024)
-#endif
-
-#ifndef MSGPACK_UNPACKER_BUFFER_RESERVE_SIZE
-#define MSGPACK_UNPACKER_BUFFER_RESERVE_SIZE (8*1024)
-#endif
-
 /**
  * Document-method: MessagePack::Unpacker#initialize
  *
@@ -297,7 +330,9 @@ static VALUE MessagePack_Unpacker_initialize(int argc, VALUE *argv, VALUE self)
 	template_init(mp);
 	mp->user.finished = 0;
 	mp->user.offset = 0;
-	
mp->user.buffer = rb_str_buf_new(MSGPACK_UNPACKER_BUFFER_INIT_SIZE);
+	mp->user.buffer.size = 0;
+	mp->user.buffer.free = 0;
+	mp->user.buffer.ptr = NULL;
 	mp->user.stream = stream;
 	mp->user.streambuf = rb_str_buf_new(MSGPACK_UNPACKER_BUFFER_RESERVE_SIZE);
 	mp->user.stream_append_method = append_method_of(stream);
@@ -363,34 +398,66 @@ static VALUE MessagePack_Unpacker_stream_set(VALUE self, VALUE val)
 # define NEED_MORE_CAPA(s,size) (!FL_TEST(s,STR_NOCAPA) && RSTRING(s)->aux.capa < size)
 #endif
 
-static void try_rewind_buffer(msgpack_unpack_t* mp, size_t required)
+/* Append len bytes from ptr to the stream buffer. Data that is already
+ * parsed is discarded or compacted away, and the storage is grown when
+ * needed. Raises NoMemoryError via rb_memerror() if allocation fails. */
+static void feed_buffer(msgpack_unpack_t* mp, const char* ptr, size_t len)
 {
-	VALUE buffer = mp->user.buffer;
+	struct unpack_buffer* buffer = &mp->user.buffer;
+	if(buffer->ptr == NULL) {
+		char* tmp = (char*)malloc(MSGPACK_UNPACKER_BUFFER_INIT_SIZE);
+		if(tmp == NULL) { rb_memerror(); }
+		buffer->ptr = tmp;
+		buffer->free = MSGPACK_UNPACKER_BUFFER_INIT_SIZE;
+		buffer->size = 0;
+	} else if(buffer->size <= mp->user.offset) {
+		/* clear buffer and rewind offset */
+		buffer->free += buffer->size;
+		buffer->size = 0;
+		mp->user.offset = 0;
+	}
 
-	size_t need_capa = RSTRING_LEN(buffer) + required;
+	if(len <= buffer->free) {
+		/* enough free space: just copy */
+		memcpy(buffer->ptr+buffer->size, ptr, len);
+		buffer->size += len;
+		buffer->free -= len;
+		return;
+	}
 
-	if(NEED_MORE_CAPA(buffer, need_capa)) {
-		/* FIXME
-#ifdef RUBY_VM
-		if(RSTRING_LEN(buffer) <= mp->user.offset) {
-			rb_funcall(buffer, s_clear, 0);
-			mp->user.offset = 0;
-			return;
+	size_t csize = buffer->size + buffer->free;
+	if(mp->user.offset <= buffer->size / 2) {
+		/* parsed less than half: realloc and append */
+		csize *= 2;
+		while(csize < buffer->size + len) {
+			csize *= 2;
 		}
-#endif
-		rb_funcall(buffer, s_slice_bang, 2, LONG2FIX(0), LONG2FIX(mp->user.offset));
-		mp->user.offset = 0;
-		*/
-		size_t not_parsed = RSTRING_LEN(buffer) - mp->user.offset;
-		size_t nsize = MSGPACK_UNPACKER_BUFFER_INIT_SIZE * 2;
-		while(nsize < not_parsed + required) {
-			nsize *= 2;
+		char* tmp = (char*)realloc(buffer->ptr, csize);
+		if(tmp == NULL) { rb_memerror(); }
+		memcpy(tmp + buffer->size, ptr, len);
+		buffer->ptr = tmp;
+		buffer->size += len;
+		buffer->free = csize - buffer->size;
+		return;
+	}
+
+	size_t not_parsed = buffer->size - mp->user.offset;
+	if(csize < not_parsed + len) {
+		/* more buffer size */
+		csize *= 2;
+		while(csize < not_parsed + len) {
+			csize *= 2;
 		}
-		VALUE nbuffer = rb_str_buf_new(nsize);
-		rb_str_buf_cat(nbuffer, RSTRING_PTR(buffer)+mp->user.offset, not_parsed);
-		mp->user.buffer = nbuffer;
-		mp->user.offset = 0;
+		char* tmp = (char*)realloc(buffer->ptr, csize);
+		if(tmp == NULL) { rb_memerror(); }
+		buffer->ptr = tmp;
 	}
+	/* slide the unparsed bytes to the front, then append the new data */
+	memmove(buffer->ptr, buffer->ptr+mp->user.offset, not_parsed);
+	memcpy(buffer->ptr+not_parsed, ptr, len);
+	buffer->size = not_parsed + len;
+	buffer->free = csize - buffer->size;
+	mp->user.offset = 0;
 }
 
 /**
@@ -405,8 +472,7 @@ static VALUE MessagePack_Unpacker_feed(VALUE self, VALUE data)
 {
 	UNPACKER(self, mp);
 	StringValue(data);
-	try_rewind_buffer(mp, RSTRING_LEN(data));
-	rb_str_cat(mp->user.buffer, RSTRING_PTR(data), RSTRING_LEN(data));
+	feed_buffer(mp, RSTRING_PTR(data), RSTRING_LEN(data));
 	return Qnil;
 }
 
@@ -432,18 +498,12 @@ static VALUE MessagePack_Unpacker_fill(VALUE self)
 		return Qnil;
 	}
 
-	long len;
-	if(RSTRING_LEN(mp->user.buffer) == 0) {
-		rb_funcall(mp->user.stream, mp->user.stream_append_method, 2,
-				LONG2FIX(MSGPACK_UNPACKER_BUFFER_INIT_SIZE), mp->user.buffer);
-		len = RSTRING_LEN(mp->user.buffer);
-	} else {
-		rb_funcall(mp->user.stream, mp->user.stream_append_method, 2,
-				LONG2FIX(MSGPACK_UNPACKER_BUFFER_RESERVE_SIZE), mp->user.streambuf);
-		len = RSTRING_LEN(mp->user.streambuf);
-		try_rewind_buffer(mp, len);
-		rb_str_cat(mp->user.buffer, RSTRING_PTR(mp->user.streambuf), RSTRING_LEN(mp->user.streambuf));
-	}
+	rb_funcall(mp->user.stream, mp->user.stream_append_method, 2,
+			LONG2FIX(MSGPACK_UNPACKER_BUFFER_RESERVE_SIZE),
+			mp->user.streambuf);
+
+	size_t len = RSTRING_LEN(mp->user.streambuf);
+	feed_buffer(mp,
RSTRING_PTR(mp->user.streambuf), len); return LONG2FIX(len); } @@ -470,7 +530,7 @@ static VALUE MessagePack_Unpacker_each(VALUE self) #endif while(1) { - if(RSTRING_LEN(mp->user.buffer) <= mp->user.offset) { + if(mp->user.buffer.size <= mp->user.offset) { do_fill: { VALUE len = MessagePack_Unpacker_fill(self); @@ -480,8 +540,9 @@ static VALUE MessagePack_Unpacker_each(VALUE self) } } - ret = template_execute_wrap(mp, mp->user.buffer, - RSTRING_LEN(mp->user.buffer), &mp->user.offset); + ret = template_execute_wrap_each(mp, + mp->user.buffer.ptr, mp->user.buffer.size, + &mp->user.offset); if(ret < 0) { rb_raise(eUnpackError, "parse error."); @@ -689,11 +750,6 @@ void Init_msgpack_unpack(VALUE mMessagePack) s_ascii_8bit = rb_enc_find_index("ASCII-8BIT"); #endif - s_slice_bang = rb_intern("slice!"); -#ifdef RUBY_VM - s_clear = rb_intern("clear"); -#endif - eUnpackError = rb_define_class_under(mMessagePack, "UnpackError", rb_eStandardError); cUnpacker = rb_define_class_under(mMessagePack, "Unpacker", rb_cObject); rb_define_alloc_func(cUnpacker, MessagePack_Unpacker_alloc); -- cgit v1.2.1