summaryrefslogtreecommitdiff
path: root/daemon/logger/adapter.go
blob: 97d59be5e0ebc9c110cc3d2ff80ae5b8b07e73a9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package logger // import "github.com/docker/docker/daemon/logger"

import (
	"io"
	"os"
	"path/filepath"
	"sync"
	"time"

	"github.com/docker/docker/api/types/plugins/logdriver"
	"github.com/docker/docker/pkg/plugingetter"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
)

// pluginAdapter takes a plugin and implements the Logger interface for logger
// instances
type pluginAdapter struct {
	driverName   string
	id           string
	plugin       logPlugin
	fifoPath     string
	capabilities Capability
	logInfo      Info

	// synchronize access to the log stream and shared buffer
	mu     sync.Mutex
	enc    logdriver.LogEntryEncoder
	stream io.WriteCloser
	// buf is shared for each `Log()` call to reduce allocations.
	// buf must be protected by mutex
	buf logdriver.LogEntry
}

func (a *pluginAdapter) Log(msg *Message) error {
	a.mu.Lock()

	a.buf.Line = msg.Line
	a.buf.TimeNano = msg.Timestamp.UnixNano()
	a.buf.Partial = msg.PLogMetaData != nil
	a.buf.Source = msg.Source
	if msg.PLogMetaData != nil {
		a.buf.PartialLogMetadata = &logdriver.PartialLogEntryMetadata{
			Id:      msg.PLogMetaData.ID,
			Last:    msg.PLogMetaData.Last,
			Ordinal: int32(msg.PLogMetaData.Ordinal),
		}
	}

	err := a.enc.Encode(&a.buf)
	a.buf.Reset()

	a.mu.Unlock()

	PutMessage(msg)
	return err
}

func (a *pluginAdapter) Name() string {
	return a.driverName
}

func (a *pluginAdapter) Close() error {
	a.mu.Lock()
	defer a.mu.Unlock()

	if err := a.plugin.StopLogging(filepath.Join("/", "run", "docker", "logging", a.id)); err != nil {
		return err
	}

	if err := a.stream.Close(); err != nil {
		logrus.WithError(err).Error("error closing plugin fifo")
	}
	if err := os.Remove(a.fifoPath); err != nil && !os.IsNotExist(err) {
		logrus.WithError(err).Error("error cleaning up plugin fifo")
	}

	// may be nil, especially for unit tests
	if pluginGetter != nil {
		pluginGetter.Get(a.Name(), extName, plugingetter.Release)
	}
	return nil
}

type pluginAdapterWithRead struct {
	*pluginAdapter
}

func (a *pluginAdapterWithRead) ReadLogs(config ReadConfig) *LogWatcher {
	watcher := NewLogWatcher()

	go func() {
		defer close(watcher.Msg)
		stream, err := a.plugin.ReadLogs(a.logInfo, config)
		if err != nil {
			watcher.Err <- errors.Wrap(err, "error getting log reader")
			return
		}
		defer stream.Close()

		dec := logdriver.NewLogEntryDecoder(stream)
		for {
			var buf logdriver.LogEntry
			if err := dec.Decode(&buf); err != nil {
				if err == io.EOF {
					return
				}
				watcher.Err <- errors.Wrap(err, "error decoding log message")
				return
			}

			msg := &Message{
				Timestamp: time.Unix(0, buf.TimeNano),
				Line:      buf.Line,
				Source:    buf.Source,
			}

			// plugin should handle this, but check just in case
			if !config.Since.IsZero() && msg.Timestamp.Before(config.Since) {
				continue
			}
			if !config.Until.IsZero() && msg.Timestamp.After(config.Until) {
				return
			}

			// send the message unless the consumer is gone
			select {
			case watcher.Msg <- msg:
			case <-watcher.WatchConsumerGone():
				return
			}
		}
	}()

	return watcher
}