diff options
Diffstat (limited to 'libgo/runtime/chan.c')
-rw-r--r-- | libgo/runtime/chan.c | 295 |
1 files changed, 225 insertions, 70 deletions
diff --git a/libgo/runtime/chan.c b/libgo/runtime/chan.c index 1e389a218de..de25e9fd834 100644 --- a/libgo/runtime/chan.c +++ b/libgo/runtime/chan.c @@ -4,8 +4,9 @@ #include "runtime.h" #include "arch.h" -#include "malloc.h" #include "go-type.h" +#include "race.h" +#include "malloc.h" #define NOSELGEN 1 @@ -24,6 +25,7 @@ struct SudoG G* g; // g and selgen constitute uint32 selgen; // a weak pointer to g SudoG* link; + int64 releasetime; byte* elem; // data element }; @@ -35,13 +37,13 @@ struct WaitQ struct Hchan { - uint32 qcount; // total data in the q - uint32 dataqsiz; // size of the circular q + uintgo qcount; // total data in the q + uintgo dataqsiz; // size of the circular q uint16 elemsize; bool closed; uint8 elemalign; - uint32 sendx; // send index - uint32 recvx; // receive index + uintgo sendx; // send index + uintgo recvx; // receive index WaitQ recvq; // list of recv waiters WaitQ sendq; // list of send waiters Lock; @@ -80,17 +82,22 @@ struct Select static void dequeueg(WaitQ*); static SudoG* dequeue(WaitQ*); static void enqueue(WaitQ*, SudoG*); +static void racesync(Hchan*, SudoG*); Hchan* runtime_makechan_c(ChanType *t, int64 hint) { Hchan *c; - int32 n; + uintptr n; const Type *elem; elem = t->__element_type; - if(hint < 0 || (int32)hint != hint || (elem->__size > 0 && (uintptr)hint > MaxMem / elem->__size)) + // compiler checks this but be safe. + if(elem->__size >= (1<<16)) + runtime_throw("makechan: invalid channel element type"); + + if(hint < 0 || (intgo)hint != hint || (elem->__size > 0 && (uintptr)hint > MaxMem / elem->__size)) runtime_panicstring("makechan: size out of range"); n = sizeof(*c); @@ -102,19 +109,19 @@ runtime_makechan_c(ChanType *t, int64 hint) c->dataqsiz = hint; if(debug) - runtime_printf("makechan: chan=%p; elemsize=%D; elemalign=%d; dataqsiz=%d\n", - c, (int64)elem->__size, elem->__align, c->dataqsiz); + runtime_printf("makechan: chan=%p; elemsize=%D; elemalign=%d; dataqsiz=%D\n", + c, (int64)elem->__size, elem->__align, (int64)c->dataqsiz); return c; } // For reflect -// func makechan(typ *ChanType, size uint32) (chan) -uintptr reflect_makechan(ChanType *, uint32) +// func makechan(typ *ChanType, size uint64) (chan) +uintptr reflect_makechan(ChanType *, uint64) asm ("reflect.makechan"); uintptr -reflect_makechan(ChanType *t, uint32 size) +reflect_makechan(ChanType *t, uint64 size) { void *ret; Hchan *c; @@ -153,11 +160,12 @@ __go_new_channel_big(ChanType *t, uint64 hint) * the operation; we'll see that it's now closed. */ void -runtime_chansend(ChanType *t, Hchan *c, byte *ep, bool *pres) +runtime_chansend(ChanType *t, Hchan *c, byte *ep, bool *pres, void *pc) { SudoG *sg; SudoG mysg; G* gp; + int64 t0; G* g; g = runtime_g(); @@ -168,9 +176,7 @@ runtime_chansend(ChanType *t, Hchan *c, byte *ep, bool *pres) *pres = false; return; } - g->status = Gwaiting; - g->waitreason = "chan send (nil chan)"; - runtime_gosched(); + runtime_park(nil, nil, "chan send (nil chan)"); return; // not reached } @@ -181,7 +187,17 @@ runtime_chansend(ChanType *t, Hchan *c, byte *ep, bool *pres) runtime_printf("chansend: chan=%p\n", c); } + t0 = 0; + mysg.releasetime = 0; + if(runtime_blockprofilerate > 0) { + t0 = runtime_cputicks(); + mysg.releasetime = -1; + } + runtime_lock(c); + // TODO(dvyukov): add similar instrumentation to select. + if(raceenabled) + runtime_racereadpc(c, pc); if(c->closed) goto closed; @@ -190,12 +206,16 @@ runtime_chansend(ChanType *t, Hchan *c, byte *ep, bool *pres) sg = dequeue(&c->recvq); if(sg != nil) { + if(raceenabled) + racesync(c, sg); runtime_unlock(c); gp = sg->g; gp->param = sg; if(sg->elem != nil) runtime_memmove(sg->elem, ep, c->elemsize); + if(sg->releasetime) + sg->releasetime = runtime_cputicks(); runtime_ready(gp); if(pres != nil) @@ -213,11 +233,8 @@ runtime_chansend(ChanType *t, Hchan *c, byte *ep, bool *pres) mysg.g = g; mysg.selgen = NOSELGEN; g->param = nil; - g->status = Gwaiting; - g->waitreason = "chan send"; enqueue(&c->sendq, &mysg); - runtime_unlock(c); - runtime_gosched(); + runtime_park(runtime_unlock, c, "chan send"); if(g->param == nil) { runtime_lock(c); @@ -226,6 +243,9 @@ runtime_chansend(ChanType *t, Hchan *c, byte *ep, bool *pres) goto closed; } + if(mysg.releasetime > 0) + runtime_blockevent(mysg.releasetime - t0, 2); + return; asynch: @@ -241,15 +261,16 @@ asynch: mysg.g = g; mysg.elem = nil; mysg.selgen = NOSELGEN; - g->status = Gwaiting; - g->waitreason = "chan send"; enqueue(&c->sendq, &mysg); - runtime_unlock(c); - runtime_gosched(); + runtime_park(runtime_unlock, c, "chan send"); runtime_lock(c); goto asynch; } + + if(raceenabled) + runtime_racerelease(chanbuf(c, c->sendx)); + runtime_memmove(chanbuf(c, c->sendx), ep, c->elemsize); if(++c->sendx == c->dataqsiz) c->sendx = 0; @@ -259,11 +280,15 @@ asynch: if(sg != nil) { gp = sg->g; runtime_unlock(c); + if(sg->releasetime) + sg->releasetime = runtime_cputicks(); runtime_ready(gp); } else runtime_unlock(c); if(pres != nil) *pres = true; + if(mysg.releasetime > 0) + runtime_blockevent(mysg.releasetime - t0, 2); return; closed: @@ -278,6 +303,7 @@ runtime_chanrecv(ChanType *t, Hchan* c, byte *ep, bool *selected, bool *received SudoG *sg; SudoG mysg; G *gp; + int64 t0; G *g; if(runtime_gcwaiting) @@ -294,12 +320,17 @@ runtime_chanrecv(ChanType *t, Hchan* c, byte *ep, bool *selected, bool *received *selected = false; return; } - g->status = Gwaiting; - g->waitreason = "chan receive (nil chan)"; - runtime_gosched(); + runtime_park(nil, nil, "chan receive (nil chan)"); return; // not reached } + t0 = 0; + mysg.releasetime = 0; + if(runtime_blockprofilerate > 0) { + t0 = runtime_cputicks(); + mysg.releasetime = -1; + } + runtime_lock(c); if(c->dataqsiz > 0) goto asynch; @@ -309,12 +340,16 @@ runtime_chanrecv(ChanType *t, Hchan* c, byte *ep, bool *selected, bool *received sg = dequeue(&c->sendq); if(sg != nil) { + if(raceenabled) + racesync(c, sg); runtime_unlock(c); if(ep != nil) runtime_memmove(ep, sg->elem, c->elemsize); gp = sg->g; gp->param = sg; + if(sg->releasetime) + sg->releasetime = runtime_cputicks(); runtime_ready(gp); if(selected != nil) @@ -334,11 +369,8 @@ runtime_chanrecv(ChanType *t, Hchan* c, byte *ep, bool *selected, bool *received mysg.g = g; mysg.selgen = NOSELGEN; g->param = nil; - g->status = Gwaiting; - g->waitreason = "chan receive"; enqueue(&c->recvq, &mysg); - runtime_unlock(c); - runtime_gosched(); + runtime_park(runtime_unlock, c, "chan receive"); if(g->param == nil) { runtime_lock(c); @@ -349,6 +381,8 @@ runtime_chanrecv(ChanType *t, Hchan* c, byte *ep, bool *selected, bool *received if(received != nil) *received = true; + if(mysg.releasetime > 0) + runtime_blockevent(mysg.releasetime - t0, 2); return; asynch: @@ -366,15 +400,16 @@ asynch: mysg.g = g; mysg.elem = nil; mysg.selgen = NOSELGEN; - g->status = Gwaiting; - g->waitreason = "chan receive"; enqueue(&c->recvq, &mysg); - runtime_unlock(c); - runtime_gosched(); + runtime_park(runtime_unlock, c, "chan receive"); runtime_lock(c); goto asynch; } + + if(raceenabled) + runtime_raceacquire(chanbuf(c, c->recvx)); + if(ep != nil) runtime_memmove(ep, chanbuf(c, c->recvx), c->elemsize); runtime_memclr(chanbuf(c, c->recvx), c->elemsize); @@ -386,6 +421,8 @@ asynch: if(sg != nil) { gp = sg->g; runtime_unlock(c); + if(sg->releasetime) + sg->releasetime = runtime_cputicks(); runtime_ready(gp); } else runtime_unlock(c); @@ -394,6 +431,8 @@ asynch: *selected = true; if(received != nil) *received = true; + if(mysg.releasetime > 0) + runtime_blockevent(mysg.releasetime - t0, 2); return; closed: @@ -403,7 +442,11 @@ closed: *selected = true; if(received != nil) *received = false; + if(raceenabled) + runtime_raceacquire(c); runtime_unlock(c); + if(mysg.releasetime > 0) + runtime_blockevent(mysg.releasetime - t0, 2); } // The compiler generates a call to __go_send_small to send a value 8 @@ -424,7 +467,7 @@ __go_send_small(ChanType *t, Hchan* c, uint64 val) #else p = u.b + sizeof(uint64) - t->__element_type->__size; #endif - runtime_chansend(t, c, p, nil); + runtime_chansend(t, c, p, nil, runtime_getcallerpc(&t)); } // The compiler generates a call to __go_send_big to send a value @@ -432,7 +475,7 @@ __go_send_small(ChanType *t, Hchan* c, uint64 val) void __go_send_big(ChanType *t, Hchan* c, byte* p) { - runtime_chansend(t, c, p, nil); + runtime_chansend(t, c, p, nil, runtime_getcallerpc(&t)); } // The compiler generates a call to __go_receive_small to receive a @@ -500,7 +543,7 @@ runtime_selectnbsend(ChanType *t, Hchan *c, byte *p) { bool res; - runtime_chansend(t, c, p, &res); + runtime_chansend(t, c, p, &res, runtime_getcallerpc(&t)); return res; } @@ -590,7 +633,7 @@ reflect_chansend(ChanType *t, Hchan *c, uintptr val, _Bool nb) vp = (byte*)&val; else vp = (byte*)val; - runtime_chansend(t, c, vp, sp); + runtime_chansend(t, c, vp, sp, runtime_getcallerpc(&t)); return selected; } @@ -643,10 +686,10 @@ static void newselect(int32, Select**); // newselect(size uint32) (sel *byte); -void* runtime_newselect(int) __asm__("runtime.newselect"); +void* runtime_newselect(int32) __asm__("runtime.newselect"); void* -runtime_newselect(int size) +runtime_newselect(int32 size) { Select *sel; @@ -688,11 +731,11 @@ static void selectsend(Select *sel, Hchan *c, int index, void *elem); // selectsend(sel *byte, hchan *chan any, elem *any) (selected bool); -void runtime_selectsend(Select *, Hchan *, void *, int) +void runtime_selectsend(Select *, Hchan *, void *, int32) __asm__("runtime.selectsend"); void -runtime_selectsend(Select *sel, Hchan *c, void *elem, int index) +runtime_selectsend(Select *sel, Hchan *c, void *elem, int32 index) { // nil cases do not compete if(c == nil) @@ -728,11 +771,11 @@ static void selectrecv(Select *sel, Hchan *c, int index, void *elem, bool*); // selectrecv(sel *byte, hchan *chan any, elem *any) (selected bool); -void runtime_selectrecv(Select *, Hchan *, void *, int) +void runtime_selectrecv(Select *, Hchan *, void *, int32) __asm__("runtime.selectrecv"); void -runtime_selectrecv(Select *sel, Hchan *c, void *elem, int index) +runtime_selectrecv(Select *sel, Hchan *c, void *elem, int32 index) { // nil cases do not compete if(c == nil) @@ -743,11 +786,11 @@ runtime_selectrecv(Select *sel, Hchan *c, void *elem, int index) // selectrecv2(sel *byte, hchan *chan any, elem *any, received *bool) (selected bool); -void runtime_selectrecv2(Select *, Hchan *, void *, bool *, int) +void runtime_selectrecv2(Select *, Hchan *, void *, bool *, int32) __asm__("runtime.selectrecv2"); void -runtime_selectrecv2(Select *sel, Hchan *c, void *elem, bool *received, int index) +runtime_selectrecv2(Select *sel, Hchan *c, void *elem, bool *received, int32 index) { // nil cases do not compete if(c == nil) @@ -784,16 +827,16 @@ static void selectdefault(Select*, int); // selectdefault(sel *byte) (selected bool); -void runtime_selectdefault(Select *, int) __asm__("runtime.selectdefault"); +void runtime_selectdefault(Select *, int32) __asm__("runtime.selectdefault"); void -runtime_selectdefault(Select *sel, int index) +runtime_selectdefault(Select *sel, int32 index) { selectdefault(sel, index); } static void -selectdefault(Select *sel, int index) +selectdefault(Select *sel, int32 index) { int32 i; Scase *cas; @@ -848,12 +891,7 @@ selunlock(Select *sel) void runtime_block(void) { - G *g; - - g = runtime_g(); - g->status = Gwaiting; // forever - g->waitreason = "select (no cases)"; - runtime_gosched(); + runtime_park(nil, nil, "select (no cases)"); // forever } static int selectgo(Select**); @@ -985,10 +1023,7 @@ loop: } g->param = nil; - g->status = Gwaiting; - g->waitreason = "select"; - selunlock(sel); - runtime_gosched(); + runtime_park((void(*)(Lock*))selunlock, (Lock*)sel, "select"); sellock(sel); sg = g->param; @@ -1029,6 +1064,8 @@ loop: asyncrecv: // can receive from buffer + if(raceenabled) + runtime_raceacquire(chanbuf(c, c->recvx)); if(cas->receivedp != nil) *cas->receivedp = true; if(cas->sg.elem != nil) @@ -1049,6 +1086,8 @@ asyncrecv: asyncsend: // can send to buffer + if(raceenabled) + runtime_racerelease(chanbuf(c, c->sendx)); runtime_memmove(chanbuf(c, c->sendx), cas->sg.elem, c->elemsize); if(++c->sendx == c->dataqsiz) c->sendx = 0; @@ -1065,6 +1104,8 @@ asyncsend: syncrecv: // can receive from sleeping sender (sg) + if(raceenabled) + racesync(c, sg); selunlock(sel); if(debug) runtime_printf("syncrecv: sel=%p c=%p o=%d\n", sel, c, o); @@ -1084,10 +1125,14 @@ rclose: *cas->receivedp = false; if(cas->sg.elem != nil) runtime_memclr(cas->sg.elem, c->elemsize); + if(raceenabled) + runtime_raceacquire(c); goto retc; syncsend: // can send to sleeping receiver (sg) + if(raceenabled) + racesync(c, sg); selunlock(sel); if(debug) runtime_printf("syncsend: sel=%p c=%p o=%d\n", sel, c, o); @@ -1110,6 +1155,102 @@ sclose: return 0; // not reached } +// This struct must match ../reflect/value.go:/runtimeSelect. +typedef struct runtimeSelect runtimeSelect; +struct runtimeSelect +{ + uintptr dir; + ChanType *typ; + Hchan *ch; + uintptr val; +}; + +// This enum must match ../reflect/value.go:/SelectDir. +enum SelectDir { + SelectSend = 1, + SelectRecv, + SelectDefault, +}; + +struct rselect_ret { + intgo chosen; + uintptr word; + bool recvOK; +}; + +// func rselect(cases []runtimeSelect) (chosen int, word uintptr, recvOK bool) + +struct rselect_ret reflect_rselect(Slice) + asm("reflect.rselect"); + +struct rselect_ret +reflect_rselect(Slice cases) +{ + struct rselect_ret ret; + int32 i; + Select *sel; + runtimeSelect* rcase, *rc; + void *elem; + void *recvptr; + uintptr maxsize; + bool onlyptr; + + ret.chosen = -1; + ret.word = 0; + ret.recvOK = false; + + maxsize = 0; + onlyptr = true; + rcase = (runtimeSelect*)cases.__values; + for(i=0; i<cases.__count; i++) { + rc = &rcase[i]; + if(rc->dir == SelectRecv && rc->ch != nil) { + if(maxsize < rc->typ->__element_type->__size) + maxsize = rc->typ->__element_type->__size; + if(!__go_is_pointer_type(rc->typ->__element_type)) + onlyptr = false; + } + } + + recvptr = nil; + if(!onlyptr) + recvptr = runtime_mal(maxsize); + + newselect(cases.__count, &sel); + for(i=0; i<cases.__count; i++) { + rc = &rcase[i]; + switch(rc->dir) { + case SelectDefault: + selectdefault(sel, i); + break; + case SelectSend: + if(rc->ch == nil) + break; + if(!__go_is_pointer_type(rc->typ->__element_type)) + elem = (void*)rc->val; + else + elem = (void*)&rc->val; + selectsend(sel, rc->ch, i, elem); + break; + case SelectRecv: + if(rc->ch == nil) + break; + if(!__go_is_pointer_type(rc->typ->__element_type)) + elem = recvptr; + else + elem = &ret.word; + selectrecv(sel, rc->ch, i, elem, &ret.recvOK); + break; + } + } + + ret.chosen = (intgo)(uintptr)selectgo(&sel); + if(rcase[ret.chosen].dir == SelectRecv && !__go_is_pointer_type(rcase[ret.chosen].typ->__element_type)) + ret.word = (uintptr)recvptr; + + return ret; +} + // closechan(sel *byte); void runtime_closechan(Hchan *c) @@ -1129,6 +1270,11 @@ runtime_closechan(Hchan *c) runtime_panicstring("close of closed channel"); } + if(raceenabled) { + runtime_racewritepc(c, runtime_getcallerpc(&c)); + runtime_racerelease(c); + } + c->closed = true; // release all readers @@ -1172,15 +1318,15 @@ reflect_chanclose(uintptr c) } // For reflect -// func chanlen(c chan) (len int32) +// func chanlen(c chan) (len int) -int32 reflect_chanlen(uintptr) __asm__("reflect.chanlen"); +intgo reflect_chanlen(uintptr) __asm__("reflect.chanlen"); -int32 +intgo reflect_chanlen(uintptr ca) { Hchan *c; - int32 len; + intgo len; c = (Hchan*)ca; if(c == nil) @@ -1190,22 +1336,22 @@ reflect_chanlen(uintptr ca) return len; } -int +intgo __go_chan_len(Hchan *c) { return reflect_chanlen((uintptr)c); } // For reflect -// func chancap(c chan) (cap int32) +// func chancap(c chan) (cap intgo) -int32 reflect_chancap(uintptr) __asm__("reflect.chancap"); +intgo reflect_chancap(uintptr) __asm__("reflect.chancap"); -int32 +intgo reflect_chancap(uintptr ca) { Hchan *c; - int32 cap; + intgo cap; c = (Hchan*)ca; if(c == nil) @@ -1215,7 +1361,7 @@ reflect_chancap(uintptr ca) return cap; } -int +intgo __go_chan_cap(Hchan *c) { return reflect_chancap((uintptr)c); @@ -1273,3 +1419,12 @@ enqueue(WaitQ *q, SudoG *sgp) q->last->link = sgp; q->last = sgp; } + +static void +racesync(Hchan *c, SudoG *sg) +{ + runtime_racerelease(chanbuf(c, 0)); + runtime_raceacquireg(sg->g, chanbuf(c, 0)); + runtime_racereleaseg(sg->g, chanbuf(c, 0)); + runtime_raceacquire(chanbuf(c, 0)); +} |