path: root/daemon/logger/copier.go
blob: 30c68ea364d2b30cbadca581a2417adc98863a97
package logger // import "github.com/docker/docker/daemon/logger"

import (
	"bytes"
	"io"
	"sync"
	"time"

	types "github.com/docker/docker/api/types/backend"
	"github.com/docker/docker/pkg/stringid"
	"github.com/sirupsen/logrus"
)

const (
	// readSize is the maximum number of bytes read during a single read
	// operation.
	readSize = 2 * 1024

	// defaultBufSize provides a reasonable default for loggers that do
	// not have an external limit to impose on log line size.
	defaultBufSize = 16 * 1024
)
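// A single log line may therefore span several readSize reads; copySrc
// accumulates up to the buffer size before splitting an over-long line into
// partial messages.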

// Copier copies logs from the specified sources to a Logger, attaching a
// timestamp to each message. Writes to the Logger are concurrent, so the
// Logger implementation must provide its own synchronization.
type Copier struct {
	// srcs maps a source name (for example "stdout" or "stderr") to its reader
	srcs      map[string]io.Reader
	dst       Logger
	copyJobs  sync.WaitGroup
	closeOnce sync.Once
	closed    chan struct{}
}

// NewCopier creates a new Copier
func NewCopier(srcs map[string]io.Reader, dst Logger) *Copier {
	return &Copier{
		srcs:   srcs,
		dst:    dst,
		closed: make(chan struct{}),
	}
}
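// A minimal usage sketch. stdoutPipe, stderrPipe, and jsonLogger are
// hypothetical placeholders for a driver's real streams and Logger:
//
//	c := NewCopier(map[string]io.Reader{
//		"stdout": stdoutPipe,
//		"stderr": stderrPipe,
//	}, jsonLogger)
//	c.Run()   // one goroutine per source
//	// ... the container produces output ...
//	c.Close() // signal the copy goroutines to stop
//	c.Wait()  // block until they have drained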

// Run starts copying logs from each source in its own goroutine. It returns
// immediately; use Wait to block until copying finishes.
func (c *Copier) Run() {
	for src, w := range c.srcs {
		c.copyJobs.Add(1)
		go c.copySrc(src, w)
	}
}

func (c *Copier) copySrc(name string, src io.Reader) {
	defer c.copyJobs.Done()

	bufSize := defaultBufSize
	if sizedLogger, ok := c.dst.(SizedLogger); ok {
		size := sizedLogger.BufSize()
		// Loggers that wrap another logger implement BufSize(), but may not
		// be able to report a size when the wrapped logger doesn't implement
		// it; in that case, fall back to the default.
		if size > 0 {
			bufSize = size
		}
	}
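	// The buffer's capacity is the effective cap on a single log line; once
	// it fills without a newline, its contents are flushed as a partial
	// message (see below).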
	buf := make([]byte, bufSize)

	n := 0       // number of buffered bytes not yet logged
	eof := false // set once the source reports io.EOF

	// Partial-message state: a line that overflows the buffer, or that is
	// still unterminated at EOF, is emitted as a series of partial messages
	// sharing one ID and timestamp, with an increasing ordinal.
	var partialid string
	var partialTS time.Time
	var ordinal int
	firstPartial := true
	hasMorePartial := false

	for {
		select {
		case <-c.closed:
			return
		default:
			// Work out how much more data we are okay with reading this time.
			upto := n + readSize
			if upto > cap(buf) {
				upto = cap(buf)
			}
			// Try to read that data.
			if upto > n {
				read, err := src.Read(buf[n:upto])
				if err != nil {
					if err != io.EOF {
						logReadsFailedCount.Inc(1)
						logrus.Errorf("Error scanning log stream: %s", err)
						return
					}
					eof = true
				}
				n += read
			}
			// If we have no data to log, and there's no more coming, we're done.
			if n == 0 && eof {
				return
			}
			// Break up the data that we've buffered up into lines, and log each in turn.
			p := 0

			for q := bytes.IndexByte(buf[p:n], '\n'); q >= 0; q = bytes.IndexByte(buf[p:n], '\n') {
				select {
				case <-c.closed:
					return
				default:
					msg := NewMessage()
					msg.Source = name
					msg.Line = append(msg.Line, buf[p:p+q]...)

					if hasMorePartial {
						msg.PLogMetaData = &types.PartialLogMetaData{ID: partialid, Ordinal: ordinal, Last: true}

						// reset
						partialid = ""
						ordinal = 0
						firstPartial = true
						hasMorePartial = false
					}
					if msg.PLogMetaData == nil {
						msg.Timestamp = time.Now().UTC()
					} else {
						msg.Timestamp = partialTS
					}

					if logErr := c.dst.Log(msg); logErr != nil {
						logDriverError(c.dst.Name(), string(msg.Line), logErr)
					}
				}
				p += q + 1
			}
			// If there's no more coming, or the buffer is full but
			// has no newlines, log whatever we haven't logged yet,
			// noting that it's a partial log line.
			if eof || (p == 0 && n == len(buf)) {
				if p < n {
					msg := NewMessage()
					msg.Source = name
					msg.Line = append(msg.Line, buf[p:n]...)

					// Generate unique partialID for first partial. Use it across partials.
					// Record timestamp for first partial. Use it across partials.
					// Initialize Ordinal for first partial. Increment it across partials.
					if firstPartial {
						msg.Timestamp = time.Now().UTC()
						partialTS = msg.Timestamp
						partialid = stringid.GenerateRandomID()
						ordinal = 1
						firstPartial = false
						totalPartialLogs.Inc(1)
					} else {
						msg.Timestamp = partialTS
					}
					msg.PLogMetaData = &types.PartialLogMetaData{ID: partialid, Ordinal: ordinal, Last: false}
					ordinal++
					hasMorePartial = true // the next newline-terminated chunk closes this run with Last: true

					if logErr := c.dst.Log(msg); logErr != nil {
						logDriverError(c.dst.Name(), string(msg.Line), logErr)
					}
					p = 0
					n = 0
				}
				if eof {
					return
				}
			}
			// Move any unlogged data to the front of the buffer in preparation for another read.
			if p > 0 {
				copy(buf[0:], buf[p:n])
				n -= p
			}
		}
	}
}

// Wait blocks until all copy goroutines have finished.
func (c *Copier) Wait() {
	c.copyJobs.Wait()
}

// Close signals the copy goroutines to stop; it is safe to call more than once.
func (c *Copier) Close() {
	c.closeOnce.Do(func() {
		close(c.closed)
	})
}
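
// Illustrative framing of an over-long line, as implied by copySrc above
// (a sketch, not part of the package API): with the default 16 KiB buffer,
// a 40 KiB line reaches the destination Logger as three messages sharing one
// PLogMetaData.ID and timestamp:
//
//	Ordinal: 1, Last: false  // first 16 KiB, buffer filled without a newline
//	Ordinal: 2, Last: false  // next 16 KiB
//	Ordinal: 3, Last: true   // final 8 KiB, ended by the newline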