summaryrefslogtreecommitdiff
path: root/libgo/go/netchan/common.go
blob: 87981ca860387dfdb9ca5d4b5e54a8f60b434dd0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
// Copyright 2010 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package netchan

import (
	"gob"
	"net"
	"os"
	"reflect"
	"sync"
	"time"
)

// The direction of a connection from the client's perspective.
type Dir int

const (
	Recv Dir = iota
	Send
)

func (dir Dir) String() string {
	switch dir {
	case Recv:
		return "Recv"
	case Send:
		return "Send"
	}
	return "???"
}

// Payload types
const (
	payRequest = iota // request structure follows
	payError          // error structure follows
	payData           // user payload follows
	payAck            // acknowledgement; no payload
	payClosed         // channel is now closed
)

// A header is sent as a prefix to every transmission.  It will be followed by
// a request structure, an error structure, or an arbitrary user payload structure.
type header struct {
	name        string
	payloadType int
	seqNum      int64
}

// Sent with a header once per channel from importer to exporter to report
// that it wants to bind to a channel with the specified direction for count
// messages.  If count is -1, it means unlimited.
type request struct {
	count int64
	dir   Dir
}

// Sent with a header to report an error.
type error struct {
	error string
}

// Used to unify management of acknowledgements for import and export.
type unackedCounter interface {
	unackedCount() int64
	ack() int64
	seq() int64
}

// A channel and its direction.
type chanDir struct {
	ch  *reflect.ChanValue
	dir Dir
}

// clientSet contains the objects and methods needed for tracking
// clients of an exporter and draining outstanding messages.
type clientSet struct {
	mu      sync.Mutex // protects access to channel and client maps
	chans   map[string]*chanDir
	clients map[unackedCounter]bool
}

// Mutex-protected encoder and decoder pair.
type encDec struct {
	decLock sync.Mutex
	dec     *gob.Decoder
	encLock sync.Mutex
	enc     *gob.Encoder
}

func newEncDec(conn net.Conn) *encDec {
	return &encDec{
		dec: gob.NewDecoder(conn),
		enc: gob.NewEncoder(conn),
	}
}

// Decode an item from the connection.
func (ed *encDec) decode(value reflect.Value) os.Error {
	ed.decLock.Lock()
	err := ed.dec.DecodeValue(value)
	if err != nil {
		// TODO: tear down connection?
	}
	ed.decLock.Unlock()
	return err
}

// Encode a header and payload onto the connection.
func (ed *encDec) encode(hdr *header, payloadType int, payload interface{}) os.Error {
	ed.encLock.Lock()
	hdr.payloadType = payloadType
	err := ed.enc.Encode(hdr)
	if err == nil {
		if payload != nil {
			err = ed.enc.Encode(payload)
		}
	}
	if err != nil {
		// TODO: tear down connection if there is an error?
	}
	ed.encLock.Unlock()
	return err
}

// See the comment for Exporter.Drain.
func (cs *clientSet) drain(timeout int64) os.Error {
	startTime := time.Nanoseconds()
	for {
		pending := false
		cs.mu.Lock()
		// Any messages waiting for a client?
		for _, chDir := range cs.chans {
			if chDir.ch.Len() > 0 {
				pending = true
			}
		}
		// Any unacknowledged messages?
		for client := range cs.clients {
			n := client.unackedCount()
			if n > 0 { // Check for > rather than != just to be safe.
				pending = true
				break
			}
		}
		cs.mu.Unlock()
		if !pending {
			break
		}
		if timeout > 0 && time.Nanoseconds()-startTime >= timeout {
			return os.ErrorString("timeout")
		}
		time.Sleep(100 * 1e6) // 100 milliseconds
	}
	return nil
}

// See the comment for Exporter.Sync.
func (cs *clientSet) sync(timeout int64) os.Error {
	startTime := time.Nanoseconds()
	// seq remembers the clients and their seqNum at point of entry.
	seq := make(map[unackedCounter]int64)
	for client := range cs.clients {
		seq[client] = client.seq()
	}
	for {
		pending := false
		cs.mu.Lock()
		// Any unacknowledged messages?  Look only at clients that existed
		// when we started and are still in this client set.
		for client := range seq {
			if _, ok := cs.clients[client]; ok {
				if client.ack() < seq[client] {
					pending = true
					break
				}
			}
		}
		cs.mu.Unlock()
		if !pending {
			break
		}
		if timeout > 0 && time.Nanoseconds()-startTime >= timeout {
			return os.ErrorString("timeout")
		}
		time.Sleep(100 * 1e6) // 100 milliseconds
	}
	return nil
}