summaryrefslogtreecommitdiff
path: root/libgo/runtime/chan.c
diff options
context:
space:
mode:
Diffstat (limited to 'libgo/runtime/chan.c')
-rw-r--r--libgo/runtime/chan.c295
1 files changed, 225 insertions, 70 deletions
diff --git a/libgo/runtime/chan.c b/libgo/runtime/chan.c
index 1e389a218de..de25e9fd834 100644
--- a/libgo/runtime/chan.c
+++ b/libgo/runtime/chan.c
@@ -4,8 +4,9 @@
#include "runtime.h"
#include "arch.h"
-#include "malloc.h"
#include "go-type.h"
+#include "race.h"
+#include "malloc.h"
#define NOSELGEN 1
@@ -24,6 +25,7 @@ struct SudoG
G* g; // g and selgen constitute
uint32 selgen; // a weak pointer to g
SudoG* link;
+ int64 releasetime;
byte* elem; // data element
};
@@ -35,13 +37,13 @@ struct WaitQ
struct Hchan
{
- uint32 qcount; // total data in the q
- uint32 dataqsiz; // size of the circular q
+ uintgo qcount; // total data in the q
+ uintgo dataqsiz; // size of the circular q
uint16 elemsize;
bool closed;
uint8 elemalign;
- uint32 sendx; // send index
- uint32 recvx; // receive index
+ uintgo sendx; // send index
+ uintgo recvx; // receive index
WaitQ recvq; // list of recv waiters
WaitQ sendq; // list of send waiters
Lock;
@@ -80,17 +82,22 @@ struct Select
static void dequeueg(WaitQ*);
static SudoG* dequeue(WaitQ*);
static void enqueue(WaitQ*, SudoG*);
+static void racesync(Hchan*, SudoG*);
Hchan*
runtime_makechan_c(ChanType *t, int64 hint)
{
Hchan *c;
- int32 n;
+ uintptr n;
const Type *elem;
elem = t->__element_type;
- if(hint < 0 || (int32)hint != hint || (elem->__size > 0 && (uintptr)hint > MaxMem / elem->__size))
+ // compiler checks this but be safe.
+ if(elem->__size >= (1<<16))
+ runtime_throw("makechan: invalid channel element type");
+
+ if(hint < 0 || (intgo)hint != hint || (elem->__size > 0 && (uintptr)hint > MaxMem / elem->__size))
runtime_panicstring("makechan: size out of range");
n = sizeof(*c);
@@ -102,19 +109,19 @@ runtime_makechan_c(ChanType *t, int64 hint)
c->dataqsiz = hint;
if(debug)
- runtime_printf("makechan: chan=%p; elemsize=%D; elemalign=%d; dataqsiz=%d\n",
- c, (int64)elem->__size, elem->__align, c->dataqsiz);
+ runtime_printf("makechan: chan=%p; elemsize=%D; elemalign=%d; dataqsiz=%D\n",
+ c, (int64)elem->__size, elem->__align, (int64)c->dataqsiz);
return c;
}
// For reflect
-// func makechan(typ *ChanType, size uint32) (chan)
-uintptr reflect_makechan(ChanType *, uint32)
+// func makechan(typ *ChanType, size uint64) (chan)
+uintptr reflect_makechan(ChanType *, uint64)
asm ("reflect.makechan");
uintptr
-reflect_makechan(ChanType *t, uint32 size)
+reflect_makechan(ChanType *t, uint64 size)
{
void *ret;
Hchan *c;
@@ -153,11 +160,12 @@ __go_new_channel_big(ChanType *t, uint64 hint)
* the operation; we'll see that it's now closed.
*/
void
-runtime_chansend(ChanType *t, Hchan *c, byte *ep, bool *pres)
+runtime_chansend(ChanType *t, Hchan *c, byte *ep, bool *pres, void *pc)
{
SudoG *sg;
SudoG mysg;
G* gp;
+ int64 t0;
G* g;
g = runtime_g();
@@ -168,9 +176,7 @@ runtime_chansend(ChanType *t, Hchan *c, byte *ep, bool *pres)
*pres = false;
return;
}
- g->status = Gwaiting;
- g->waitreason = "chan send (nil chan)";
- runtime_gosched();
+ runtime_park(nil, nil, "chan send (nil chan)");
return; // not reached
}
@@ -181,7 +187,17 @@ runtime_chansend(ChanType *t, Hchan *c, byte *ep, bool *pres)
runtime_printf("chansend: chan=%p\n", c);
}
+ t0 = 0;
+ mysg.releasetime = 0;
+ if(runtime_blockprofilerate > 0) {
+ t0 = runtime_cputicks();
+ mysg.releasetime = -1;
+ }
+
runtime_lock(c);
+ // TODO(dvyukov): add similar instrumentation to select.
+ if(raceenabled)
+ runtime_racereadpc(c, pc);
if(c->closed)
goto closed;
@@ -190,12 +206,16 @@ runtime_chansend(ChanType *t, Hchan *c, byte *ep, bool *pres)
sg = dequeue(&c->recvq);
if(sg != nil) {
+ if(raceenabled)
+ racesync(c, sg);
runtime_unlock(c);
gp = sg->g;
gp->param = sg;
if(sg->elem != nil)
runtime_memmove(sg->elem, ep, c->elemsize);
+ if(sg->releasetime)
+ sg->releasetime = runtime_cputicks();
runtime_ready(gp);
if(pres != nil)
@@ -213,11 +233,8 @@ runtime_chansend(ChanType *t, Hchan *c, byte *ep, bool *pres)
mysg.g = g;
mysg.selgen = NOSELGEN;
g->param = nil;
- g->status = Gwaiting;
- g->waitreason = "chan send";
enqueue(&c->sendq, &mysg);
- runtime_unlock(c);
- runtime_gosched();
+ runtime_park(runtime_unlock, c, "chan send");
if(g->param == nil) {
runtime_lock(c);
@@ -226,6 +243,9 @@ runtime_chansend(ChanType *t, Hchan *c, byte *ep, bool *pres)
goto closed;
}
+ if(mysg.releasetime > 0)
+ runtime_blockevent(mysg.releasetime - t0, 2);
+
return;
asynch:
@@ -241,15 +261,16 @@ asynch:
mysg.g = g;
mysg.elem = nil;
mysg.selgen = NOSELGEN;
- g->status = Gwaiting;
- g->waitreason = "chan send";
enqueue(&c->sendq, &mysg);
- runtime_unlock(c);
- runtime_gosched();
+ runtime_park(runtime_unlock, c, "chan send");
runtime_lock(c);
goto asynch;
}
+
+ if(raceenabled)
+ runtime_racerelease(chanbuf(c, c->sendx));
+
runtime_memmove(chanbuf(c, c->sendx), ep, c->elemsize);
if(++c->sendx == c->dataqsiz)
c->sendx = 0;
@@ -259,11 +280,15 @@ asynch:
if(sg != nil) {
gp = sg->g;
runtime_unlock(c);
+ if(sg->releasetime)
+ sg->releasetime = runtime_cputicks();
runtime_ready(gp);
} else
runtime_unlock(c);
if(pres != nil)
*pres = true;
+ if(mysg.releasetime > 0)
+ runtime_blockevent(mysg.releasetime - t0, 2);
return;
closed:
@@ -278,6 +303,7 @@ runtime_chanrecv(ChanType *t, Hchan* c, byte *ep, bool *selected, bool *received
SudoG *sg;
SudoG mysg;
G *gp;
+ int64 t0;
G *g;
if(runtime_gcwaiting)
@@ -294,12 +320,17 @@ runtime_chanrecv(ChanType *t, Hchan* c, byte *ep, bool *selected, bool *received
*selected = false;
return;
}
- g->status = Gwaiting;
- g->waitreason = "chan receive (nil chan)";
- runtime_gosched();
+ runtime_park(nil, nil, "chan receive (nil chan)");
return; // not reached
}
+ t0 = 0;
+ mysg.releasetime = 0;
+ if(runtime_blockprofilerate > 0) {
+ t0 = runtime_cputicks();
+ mysg.releasetime = -1;
+ }
+
runtime_lock(c);
if(c->dataqsiz > 0)
goto asynch;
@@ -309,12 +340,16 @@ runtime_chanrecv(ChanType *t, Hchan* c, byte *ep, bool *selected, bool *received
sg = dequeue(&c->sendq);
if(sg != nil) {
+ if(raceenabled)
+ racesync(c, sg);
runtime_unlock(c);
if(ep != nil)
runtime_memmove(ep, sg->elem, c->elemsize);
gp = sg->g;
gp->param = sg;
+ if(sg->releasetime)
+ sg->releasetime = runtime_cputicks();
runtime_ready(gp);
if(selected != nil)
@@ -334,11 +369,8 @@ runtime_chanrecv(ChanType *t, Hchan* c, byte *ep, bool *selected, bool *received
mysg.g = g;
mysg.selgen = NOSELGEN;
g->param = nil;
- g->status = Gwaiting;
- g->waitreason = "chan receive";
enqueue(&c->recvq, &mysg);
- runtime_unlock(c);
- runtime_gosched();
+ runtime_park(runtime_unlock, c, "chan receive");
if(g->param == nil) {
runtime_lock(c);
@@ -349,6 +381,8 @@ runtime_chanrecv(ChanType *t, Hchan* c, byte *ep, bool *selected, bool *received
if(received != nil)
*received = true;
+ if(mysg.releasetime > 0)
+ runtime_blockevent(mysg.releasetime - t0, 2);
return;
asynch:
@@ -366,15 +400,16 @@ asynch:
mysg.g = g;
mysg.elem = nil;
mysg.selgen = NOSELGEN;
- g->status = Gwaiting;
- g->waitreason = "chan receive";
enqueue(&c->recvq, &mysg);
- runtime_unlock(c);
- runtime_gosched();
+ runtime_park(runtime_unlock, c, "chan receive");
runtime_lock(c);
goto asynch;
}
+
+ if(raceenabled)
+ runtime_raceacquire(chanbuf(c, c->recvx));
+
if(ep != nil)
runtime_memmove(ep, chanbuf(c, c->recvx), c->elemsize);
runtime_memclr(chanbuf(c, c->recvx), c->elemsize);
@@ -386,6 +421,8 @@ asynch:
if(sg != nil) {
gp = sg->g;
runtime_unlock(c);
+ if(sg->releasetime)
+ sg->releasetime = runtime_cputicks();
runtime_ready(gp);
} else
runtime_unlock(c);
@@ -394,6 +431,8 @@ asynch:
*selected = true;
if(received != nil)
*received = true;
+ if(mysg.releasetime > 0)
+ runtime_blockevent(mysg.releasetime - t0, 2);
return;
closed:
@@ -403,7 +442,11 @@ closed:
*selected = true;
if(received != nil)
*received = false;
+ if(raceenabled)
+ runtime_raceacquire(c);
runtime_unlock(c);
+ if(mysg.releasetime > 0)
+ runtime_blockevent(mysg.releasetime - t0, 2);
}
// The compiler generates a call to __go_send_small to send a value 8
@@ -424,7 +467,7 @@ __go_send_small(ChanType *t, Hchan* c, uint64 val)
#else
p = u.b + sizeof(uint64) - t->__element_type->__size;
#endif
- runtime_chansend(t, c, p, nil);
+ runtime_chansend(t, c, p, nil, runtime_getcallerpc(&t));
}
// The compiler generates a call to __go_send_big to send a value
@@ -432,7 +475,7 @@ __go_send_small(ChanType *t, Hchan* c, uint64 val)
void
__go_send_big(ChanType *t, Hchan* c, byte* p)
{
- runtime_chansend(t, c, p, nil);
+ runtime_chansend(t, c, p, nil, runtime_getcallerpc(&t));
}
// The compiler generates a call to __go_receive_small to receive a
@@ -500,7 +543,7 @@ runtime_selectnbsend(ChanType *t, Hchan *c, byte *p)
{
bool res;
- runtime_chansend(t, c, p, &res);
+ runtime_chansend(t, c, p, &res, runtime_getcallerpc(&t));
return res;
}
@@ -590,7 +633,7 @@ reflect_chansend(ChanType *t, Hchan *c, uintptr val, _Bool nb)
vp = (byte*)&val;
else
vp = (byte*)val;
- runtime_chansend(t, c, vp, sp);
+ runtime_chansend(t, c, vp, sp, runtime_getcallerpc(&t));
return selected;
}
@@ -643,10 +686,10 @@ static void newselect(int32, Select**);
// newselect(size uint32) (sel *byte);
-void* runtime_newselect(int) __asm__("runtime.newselect");
+void* runtime_newselect(int32) __asm__("runtime.newselect");
void*
-runtime_newselect(int size)
+runtime_newselect(int32 size)
{
Select *sel;
@@ -688,11 +731,11 @@ static void selectsend(Select *sel, Hchan *c, int index, void *elem);
// selectsend(sel *byte, hchan *chan any, elem *any) (selected bool);
-void runtime_selectsend(Select *, Hchan *, void *, int)
+void runtime_selectsend(Select *, Hchan *, void *, int32)
__asm__("runtime.selectsend");
void
-runtime_selectsend(Select *sel, Hchan *c, void *elem, int index)
+runtime_selectsend(Select *sel, Hchan *c, void *elem, int32 index)
{
// nil cases do not compete
if(c == nil)
@@ -728,11 +771,11 @@ static void selectrecv(Select *sel, Hchan *c, int index, void *elem, bool*);
// selectrecv(sel *byte, hchan *chan any, elem *any) (selected bool);
-void runtime_selectrecv(Select *, Hchan *, void *, int)
+void runtime_selectrecv(Select *, Hchan *, void *, int32)
__asm__("runtime.selectrecv");
void
-runtime_selectrecv(Select *sel, Hchan *c, void *elem, int index)
+runtime_selectrecv(Select *sel, Hchan *c, void *elem, int32 index)
{
// nil cases do not compete
if(c == nil)
@@ -743,11 +786,11 @@ runtime_selectrecv(Select *sel, Hchan *c, void *elem, int index)
// selectrecv2(sel *byte, hchan *chan any, elem *any, received *bool) (selected bool);
-void runtime_selectrecv2(Select *, Hchan *, void *, bool *, int)
+void runtime_selectrecv2(Select *, Hchan *, void *, bool *, int32)
__asm__("runtime.selectrecv2");
void
-runtime_selectrecv2(Select *sel, Hchan *c, void *elem, bool *received, int index)
+runtime_selectrecv2(Select *sel, Hchan *c, void *elem, bool *received, int32 index)
{
// nil cases do not compete
if(c == nil)
@@ -784,16 +827,16 @@ static void selectdefault(Select*, int);
// selectdefault(sel *byte) (selected bool);
-void runtime_selectdefault(Select *, int) __asm__("runtime.selectdefault");
+void runtime_selectdefault(Select *, int32) __asm__("runtime.selectdefault");
void
-runtime_selectdefault(Select *sel, int index)
+runtime_selectdefault(Select *sel, int32 index)
{
selectdefault(sel, index);
}
static void
-selectdefault(Select *sel, int index)
+selectdefault(Select *sel, int32 index)
{
int32 i;
Scase *cas;
@@ -848,12 +891,7 @@ selunlock(Select *sel)
void
runtime_block(void)
{
- G *g;
-
- g = runtime_g();
- g->status = Gwaiting; // forever
- g->waitreason = "select (no cases)";
- runtime_gosched();
+ runtime_park(nil, nil, "select (no cases)"); // forever
}
static int selectgo(Select**);
@@ -985,10 +1023,7 @@ loop:
}
g->param = nil;
- g->status = Gwaiting;
- g->waitreason = "select";
- selunlock(sel);
- runtime_gosched();
+ runtime_park((void(*)(Lock*))selunlock, (Lock*)sel, "select");
sellock(sel);
sg = g->param;
@@ -1029,6 +1064,8 @@ loop:
asyncrecv:
// can receive from buffer
+ if(raceenabled)
+ runtime_raceacquire(chanbuf(c, c->recvx));
if(cas->receivedp != nil)
*cas->receivedp = true;
if(cas->sg.elem != nil)
@@ -1049,6 +1086,8 @@ asyncrecv:
asyncsend:
// can send to buffer
+ if(raceenabled)
+ runtime_racerelease(chanbuf(c, c->sendx));
runtime_memmove(chanbuf(c, c->sendx), cas->sg.elem, c->elemsize);
if(++c->sendx == c->dataqsiz)
c->sendx = 0;
@@ -1065,6 +1104,8 @@ asyncsend:
syncrecv:
// can receive from sleeping sender (sg)
+ if(raceenabled)
+ racesync(c, sg);
selunlock(sel);
if(debug)
runtime_printf("syncrecv: sel=%p c=%p o=%d\n", sel, c, o);
@@ -1084,10 +1125,14 @@ rclose:
*cas->receivedp = false;
if(cas->sg.elem != nil)
runtime_memclr(cas->sg.elem, c->elemsize);
+ if(raceenabled)
+ runtime_raceacquire(c);
goto retc;
syncsend:
// can send to sleeping receiver (sg)
+ if(raceenabled)
+ racesync(c, sg);
selunlock(sel);
if(debug)
runtime_printf("syncsend: sel=%p c=%p o=%d\n", sel, c, o);
@@ -1110,6 +1155,102 @@ sclose:
return 0; // not reached
}
+// This struct must match ../reflect/value.go:/runtimeSelect.
+typedef struct runtimeSelect runtimeSelect;
+struct runtimeSelect
+{
+ uintptr dir;
+ ChanType *typ;
+ Hchan *ch;
+ uintptr val;
+};
+
+// This enum must match ../reflect/value.go:/SelectDir.
+enum SelectDir {
+ SelectSend = 1,
+ SelectRecv,
+ SelectDefault,
+};
+
+struct rselect_ret {
+ intgo chosen;
+ uintptr word;
+ bool recvOK;
+};
+
+// func rselect(cases []runtimeSelect) (chosen int, word uintptr, recvOK bool)
+
+struct rselect_ret reflect_rselect(Slice)
+ asm("reflect.rselect");
+
+struct rselect_ret
+reflect_rselect(Slice cases)
+{
+ struct rselect_ret ret;
+ int32 i;
+ Select *sel;
+ runtimeSelect* rcase, *rc;
+ void *elem;
+ void *recvptr;
+ uintptr maxsize;
+ bool onlyptr;
+
+ ret.chosen = -1;
+ ret.word = 0;
+ ret.recvOK = false;
+
+ maxsize = 0;
+ onlyptr = true;
+ rcase = (runtimeSelect*)cases.__values;
+ for(i=0; i<cases.__count; i++) {
+ rc = &rcase[i];
+ if(rc->dir == SelectRecv && rc->ch != nil) {
+ if(maxsize < rc->typ->__element_type->__size)
+ maxsize = rc->typ->__element_type->__size;
+ if(!__go_is_pointer_type(rc->typ->__element_type))
+ onlyptr = false;
+ }
+ }
+
+ recvptr = nil;
+ if(!onlyptr)
+ recvptr = runtime_mal(maxsize);
+
+ newselect(cases.__count, &sel);
+ for(i=0; i<cases.__count; i++) {
+ rc = &rcase[i];
+ switch(rc->dir) {
+ case SelectDefault:
+ selectdefault(sel, i);
+ break;
+ case SelectSend:
+ if(rc->ch == nil)
+ break;
+ if(!__go_is_pointer_type(rc->typ->__element_type))
+ elem = (void*)rc->val;
+ else
+ elem = (void*)&rc->val;
+ selectsend(sel, rc->ch, i, elem);
+ break;
+ case SelectRecv:
+ if(rc->ch == nil)
+ break;
+ if(!__go_is_pointer_type(rc->typ->__element_type))
+ elem = recvptr;
+ else
+ elem = &ret.word;
+ selectrecv(sel, rc->ch, i, elem, &ret.recvOK);
+ break;
+ }
+ }
+
+ ret.chosen = (intgo)(uintptr)selectgo(&sel);
+ if(rcase[ret.chosen].dir == SelectRecv && !__go_is_pointer_type(rcase[ret.chosen].typ->__element_type))
+ ret.word = (uintptr)recvptr;
+
+ return ret;
+}
+
// closechan(sel *byte);
void
runtime_closechan(Hchan *c)
@@ -1129,6 +1270,11 @@ runtime_closechan(Hchan *c)
runtime_panicstring("close of closed channel");
}
+ if(raceenabled) {
+ runtime_racewritepc(c, runtime_getcallerpc(&c));
+ runtime_racerelease(c);
+ }
+
c->closed = true;
// release all readers
@@ -1172,15 +1318,15 @@ reflect_chanclose(uintptr c)
}
// For reflect
-// func chanlen(c chan) (len int32)
+// func chanlen(c chan) (len int)
-int32 reflect_chanlen(uintptr) __asm__("reflect.chanlen");
+intgo reflect_chanlen(uintptr) __asm__("reflect.chanlen");
-int32
+intgo
reflect_chanlen(uintptr ca)
{
Hchan *c;
- int32 len;
+ intgo len;
c = (Hchan*)ca;
if(c == nil)
@@ -1190,22 +1336,22 @@ reflect_chanlen(uintptr ca)
return len;
}
-int
+intgo
__go_chan_len(Hchan *c)
{
return reflect_chanlen((uintptr)c);
}
// For reflect
-// func chancap(c chan) (cap int32)
+// func chancap(c chan) (cap intgo)
-int32 reflect_chancap(uintptr) __asm__("reflect.chancap");
+intgo reflect_chancap(uintptr) __asm__("reflect.chancap");
-int32
+intgo
reflect_chancap(uintptr ca)
{
Hchan *c;
- int32 cap;
+ intgo cap;
c = (Hchan*)ca;
if(c == nil)
@@ -1215,7 +1361,7 @@ reflect_chancap(uintptr ca)
return cap;
}
-int
+intgo
__go_chan_cap(Hchan *c)
{
return reflect_chancap((uintptr)c);
@@ -1273,3 +1419,12 @@ enqueue(WaitQ *q, SudoG *sgp)
q->last->link = sgp;
q->last = sgp;
}
+
+static void
+racesync(Hchan *c, SudoG *sg)
+{
+ runtime_racerelease(chanbuf(c, 0));
+ runtime_raceacquireg(sg->g, chanbuf(c, 0));
+ runtime_racereleaseg(sg->g, chanbuf(c, 0));
+ runtime_raceacquire(chanbuf(c, 0));
+}