summaryrefslogtreecommitdiff
path: root/go/vendor/gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer/payload.go
blob: 65ce37fecb76e0a6493ff712bd50bb43f10073a6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package tracer

import (
	"bytes"
	"encoding/binary"
	"io"

	"github.com/tinylib/msgp/msgp"
)

// payload is a wrapper on top of the msgpack encoder which allows constructing an
// encoded array by pushing its entries sequentially, one at a time. It basically
// allows us to encode as we would with a stream, except that the contents of the stream
// can be read as a slice by the msgpack decoder at any time. It follows the guidelines
// from the msgpack array spec:
// https://github.com/msgpack/msgpack/blob/master/spec.md#array-format-family
//
// payload implements io.Reader and can be used with the decoder directly. To create
// a new payload use the newPayload function.
//
// payload is not safe for concurrent use.
//
// This structure basically allows us to push traces into the payload one at a time
// in order to always have knowledge of the payload size, but also making it possible
// for the agent to decode it as an array.
type payload struct {
	// header specifies the first few bytes in the msgpack stream
	// indicating the type of array (fixarray, array16 or array32)
	// and the number of items contained in the stream. The encoded header
	// is right-aligned in the slice: only header[off:] is ever read.
	header []byte

	// off specifies the current read position on the header. It is moved back
	// to the first byte of the encoded header whenever the header is rewritten,
	// and equals len(header) when there is no header left to read.
	off int

	// count specifies the number of items in the stream.
	count uint64

	// buf holds the sequence of msgpack-encoded items.
	buf bytes.Buffer
}

// Compile-time check that *payload implements io.Reader.
var _ io.Reader = (*payload)(nil)

// newPayload allocates an empty, ready to use payload. The 8-byte header starts
// out marked as fully consumed (off == len(header)), so reads go straight to the
// item buffer until a push writes a header.
func newPayload() *payload {
	return &payload{
		header: make([]byte, 8),
		off:    8,
	}
}

// push encodes t and appends it to the stream as a new item, then refreshes the
// header so that it reflects the new item count.
//
// If encoding fails the error is returned, the item count is left unchanged and
// any bytes the encoder may already have written to the buffer are discarded.
// Without that rollback a partially encoded item would stay in the stream and
// corrupt the array seen by whoever reads the payload next.
func (p *payload) push(t spanList) error {
	// Remember how much unread data is buffered so a failed encode can be rolled
	// back. Len and Truncate both operate on the unread portion of the buffer.
	mark := p.buf.Len()
	if err := msgp.Encode(&p.buf, t); err != nil {
		p.buf.Truncate(mark)
		return err
	}
	p.count++
	p.updateHeader()
	return nil
}

// itemCount reports how many items have been pushed into the stream since the
// payload was created or last reset.
func (p *payload) itemCount() int {
	items := p.count
	return int(items)
}

// size reports the payload size in bytes: the unread bytes held in the item
// buffer plus the part of the header that has not been read yet. After the first
// read the value becomes inaccurate by up to 8 bytes.
func (p *payload) size() int {
	headerLeft := len(p.header) - p.off
	return headerLeft + p.buf.Len()
}

// reset returns the payload to its empty state: the item buffer is cleared, the
// item counter is zeroed and the header read offset is set to 8, which is the
// header length allocated by newPayload (i.e. no header left to read).
func (p *payload) reset() {
	p.buf.Reset()
	p.count = 0
	p.off = 8
}

// Leading marker bytes of the msgpack array format family, see:
// https://github.com/msgpack/msgpack/blob/master/spec.md#array-format-family
const (
	msgpackArrayFix byte = 0x90 // fixarray: up to 15 items, count is added to the marker itself
	msgpackArray16       = 0xdc // array16: up to 2^16-1 items, followed by size in 2 bytes
	msgpackArray32       = 0xdd // array32: up to 2^32-1 items, followed by size in 4 bytes
)

// updateHeader rewrites the payload header based on the number of items currently
// present in the stream and moves the header read offset back to the header's
// first byte.
//
// The header is right-aligned in the header slice, ending at index 7: the marker
// byte sits immediately before the big-endian item count (for fixarray the count
// is folded into the marker itself) and off points at the marker. Only the bytes
// that belong to the header are written.
func (p *payload) updateHeader() {
	n := p.count
	switch {
	case n <= 15:
		// fixarray: a single byte carrying both the marker and the count.
		p.header[7] = msgpackArrayFix + byte(n)
		p.off = 7
	case n <= 1<<16-1:
		// array16: marker followed by the count in 2 bytes.
		p.header[5] = msgpackArray16
		binary.BigEndian.PutUint16(p.header[6:], uint16(n))
		p.off = 5
	default: // n <= 1<<32-1
		// array32: marker followed by the count in 4 bytes. Larger counts do
		// not fit the format; only their low 32 bits are encoded.
		p.header[3] = msgpackArray32
		binary.BigEndian.PutUint32(p.header[4:], uint32(n))
		p.off = 3
	}
}

// Read implements io.Reader. It first serves whatever part of the header has not
// been read yet and, once the header is exhausted, reads from the buffer of
// msgpack-encoded items. A call that serves header bytes returns only those bytes;
// the items follow on subsequent calls.
func (p *payload) Read(b []byte) (n int, err error) {
	if p.off >= len(p.header) {
		// header fully consumed, continue with the encoded items
		return p.buf.Read(b)
	}
	n = copy(b, p.header[p.off:])
	p.off += n
	return n, nil
}