summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYuichi Bando <bando.yuichi@lab.ntt.co.jp>2015-03-26 14:08:26 +0900
committerSergey Shepelev <temotor@gmail.com>2017-03-17 22:42:27 +0300
commit654a271b82d28d3888a5a12358dea2130cfac928 (patch)
tree3bf06757a9df9d5b061b6ad7ef487d43f7072c04
parent86607d38203d050a9be88c4d6b82af74950d3df0 (diff)
downloadeventlet-654a271b82d28d3888a5a12358dea2130cfac928.tar.gz
New feature: Add zipkin tracing to eventlet
Zipkin is a trend distributed tracing framewrok developed at Twitter. Such tracing is useful for both developers and operatos to understand the behavior of complex distributed systems and find performance bottlenecks. This patch provides a WSGI application using eventlet with tracing facility that complies with Zipkin. Signed-off-by: Yuichi Bando <bando.yuichi@lab.ntt.co.jp> Original commit modified for PEP-8 fixes. https://github.com/eventlet/eventlet/pull/218
-rw-r--r--eventlet/zipkin/README.rst135
-rw-r--r--eventlet/zipkin/__init__.py0
-rw-r--r--eventlet/zipkin/_thrift/README.rst8
-rw-r--r--eventlet/zipkin/_thrift/__init__.py0
-rw-r--r--eventlet/zipkin/_thrift/zipkinCore.thrift55
-rw-r--r--eventlet/zipkin/_thrift/zipkinCore/__init__.py1
-rw-r--r--eventlet/zipkin/_thrift/zipkinCore/constants.py14
-rw-r--r--eventlet/zipkin/_thrift/zipkinCore/ttypes.py452
-rw-r--r--eventlet/zipkin/api.py186
-rw-r--r--eventlet/zipkin/client.py56
-rwxr-xr-xeventlet/zipkin/example/ex1.pngbin0 -> 53179 bytes
-rwxr-xr-xeventlet/zipkin/example/ex2.pngbin0 -> 40482 bytes
-rwxr-xr-xeventlet/zipkin/example/ex3.pngbin0 -> 73175 bytes
-rw-r--r--eventlet/zipkin/greenthread.py33
-rw-r--r--eventlet/zipkin/http.py61
-rw-r--r--eventlet/zipkin/log.py19
-rw-r--r--eventlet/zipkin/patcher.py41
-rw-r--r--eventlet/zipkin/wsgi.py78
18 files changed, 1139 insertions, 0 deletions
diff --git a/eventlet/zipkin/README.rst b/eventlet/zipkin/README.rst
new file mode 100644
index 0000000..a0dab00
--- /dev/null
+++ b/eventlet/zipkin/README.rst
@@ -0,0 +1,135 @@
+eventlet.zipkin
+===============
+
+`Zipkin <http://twitter.github.io/zipkin/>`_ is a distributed tracing system developed at Twitter.
+This package provides a WSGI application using eventlet
+with tracing facility that complies with Zipkin.
+
+Why use it?
+From the http://twitter.github.io/zipkin/:
+
+"Collecting traces helps developers gain deeper knowledge about how
+certain requests perform in a distributed system. Let's say we're having
+problems with user requests timing out. We can look up traced requests
+that timed out and display it in the web UI. We'll be able to quickly
+find the service responsible for adding the unexpected response time. If
+the service has been annotated adequately we can also find out where in
+that service the issue is happening."
+
+
+Screenshot
+----------
+
+Zipkin web ui screenshots obtained when applying this module to
+`OpenStack swift <https://github.com/openstack/swift>`_ are in example/.
+
+
+Requirement
+-----------
+
+A eventlet.zipkin needs `python scribe client <https://pypi.python.org/pypi/facebook-scribe/>`_
+and `thrift <https://thrift.apache.org/>`_ (>=0.9),
+because the zipkin collector speaks `scribe <https://github.com/facebookarchive/scribe>`_ protocol.
+Below command will install both scribe client and thrift.
+
+Install facebook-scribe:
+
+::
+
+ pip install facebook-scribe
+
+
+
+
+**Python**: ``2.6``, ``2.7`` (Because the current Python Thrift release doesn't
+support Python 3)
+
+
+
+How to use
+----------
+
+Add tracing facility to your application
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+Apply the monkey patch before you start wsgi server.
+
+.. code:: python
+
+ # Add only 2 lines to your code
+ from eventlet.zipkin import patcher
+ patcher.enable_trace_patch()
+
+ # existing code
+ from eventlet import wsgi
+ wsgi.server(sock, app)
+
+You can pass some parameters to ``enable_trace_patch()``
+
+* host: Scribe daemon IP address (default: '127.0.0.1')
+* port: Scribe daemon port (default: 9410)
+* trace_app_log: A Boolean indicating if the tracer will trace application log together or not. This facility assume that your application uses python standard logging library. (default: False)
+* sampling_rate: A Float value (0.0~1.0) that indicates the tracing frequency. If you specify 1.0, all requests are traced and sent to Zipkin collecotr. If you specify 0.1, only 1/10 requests are traced. (defult: 1.0)
+
+
+(Option) Annotation API
+~~~~~~~~~~~~~~~~~~~~~~~
+If you want to record additional information,
+you can use below API from anywhere in your code.
+
+.. code:: python
+
+ from eventlet.zipkin import api
+
+ api.put_annotation('Cache miss for %s' % request)
+ api.put_key_value('key', 'value')
+
+
+
+
+Zipkin simple setup
+-------------------
+
+::
+
+ $ git clone https://github.com/twitter/zipkin.git
+ $ cd zipkin
+ # Open 3 terminals
+ (terminal1) $ bin/collector
+ (terminal2) $ bin/query
+ (terminal3) $ bin/web
+
+Access http://localhost:8080 from your browser.
+
+
+(Option) fluentd
+----------------
+If you want to buffer the tracing data for performance,
+`fluentd scribe plugin <http://docs.fluentd.org/articles/in_scribe>`_ is available.
+Since ``out_scribe plugin`` extends `Buffer Plugin <http://docs.fluentd.org/articles/buffer-plugin-overview>`_ ,
+you can customize buffering parameters in the manner of fluentd.
+Scribe plugin is included in td-agent by default.
+
+
+Sample: ``/etc/td-agent/td-agent.conf``
+
+::
+
+ # in_scribe
+ <source>
+ type scribe
+ port 9999
+ </source>
+
+ # out_scribe
+ <match zipkin.**>
+ type scribe
+ host Zipkin_collector_IP
+ port 9410
+ flush_interval 60s
+ buffer_chunk_limit 256m
+ </match>
+
+| And, you need to specify ``patcher.enable_trace_patch(port=9999)`` for in_scribe.
+| In this case, trace data is passed like below.
+| Your application => Local fluentd in_scribe (9999) => Local fluentd out_scribe <buffering> =====> Remote zipkin collector (9410)
+
diff --git a/eventlet/zipkin/__init__.py b/eventlet/zipkin/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/eventlet/zipkin/__init__.py
diff --git a/eventlet/zipkin/_thrift/README.rst b/eventlet/zipkin/_thrift/README.rst
new file mode 100644
index 0000000..0317d50
--- /dev/null
+++ b/eventlet/zipkin/_thrift/README.rst
@@ -0,0 +1,8 @@
+_thrift
+========
+
+* This directory is auto-generated by Thrift Compiler by using
+ https://github.com/twitter/zipkin/blob/master/zipkin-thrift/src/main/thrift/com/twitter/zipkin/zipkinCore.thrift
+
+* Do not modify this directory.
+
diff --git a/eventlet/zipkin/_thrift/__init__.py b/eventlet/zipkin/_thrift/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/eventlet/zipkin/_thrift/__init__.py
diff --git a/eventlet/zipkin/_thrift/zipkinCore.thrift b/eventlet/zipkin/_thrift/zipkinCore.thrift
new file mode 100644
index 0000000..0787ca8
--- /dev/null
+++ b/eventlet/zipkin/_thrift/zipkinCore.thrift
@@ -0,0 +1,55 @@
+# Copyright 2012 Twitter Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+namespace java com.twitter.zipkin.gen
+namespace rb Zipkin
+
+//************** Collection related structs **************
+
+// these are the annotations we always expect to find in a span
+const string CLIENT_SEND = "cs"
+const string CLIENT_RECV = "cr"
+const string SERVER_SEND = "ss"
+const string SERVER_RECV = "sr"
+
+// this represents a host and port in a network
+struct Endpoint {
+ 1: i32 ipv4,
+ 2: i16 port // beware that this will give us negative ports. some conversion needed
+ 3: string service_name // which service did this operation happen on?
+}
+
+// some event took place, either one by the framework or by the user
+struct Annotation {
+ 1: i64 timestamp // microseconds from epoch
+ 2: string value // what happened at the timestamp?
+ 3: optional Endpoint host // host this happened on
+}
+
+enum AnnotationType { BOOL, BYTES, I16, I32, I64, DOUBLE, STRING }
+
+struct BinaryAnnotation {
+ 1: string key,
+ 2: binary value,
+ 3: AnnotationType annotation_type,
+ 4: optional Endpoint host
+}
+
+struct Span {
+ 1: i64 trace_id // unique trace id, use for all spans in trace
+ 3: string name, // span name, rpc method for example
+ 4: i64 id, // unique span id, only used for this span
+ 5: optional i64 parent_id, // parent span id
+ 6: list<Annotation> annotations, // list of all annotations/events that occured
+ 8: list<BinaryAnnotation> binary_annotations // any binary annotations
+}
diff --git a/eventlet/zipkin/_thrift/zipkinCore/__init__.py b/eventlet/zipkin/_thrift/zipkinCore/__init__.py
new file mode 100644
index 0000000..adefd8e
--- /dev/null
+++ b/eventlet/zipkin/_thrift/zipkinCore/__init__.py
@@ -0,0 +1 @@
+__all__ = ['ttypes', 'constants']
diff --git a/eventlet/zipkin/_thrift/zipkinCore/constants.py b/eventlet/zipkin/_thrift/zipkinCore/constants.py
new file mode 100644
index 0000000..3e04f77
--- /dev/null
+++ b/eventlet/zipkin/_thrift/zipkinCore/constants.py
@@ -0,0 +1,14 @@
+#
+# Autogenerated by Thrift Compiler (0.8.0)
+#
+# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+#
+#
+
+from thrift.Thrift import TType, TMessageType, TException
+from ttypes import *
+
+CLIENT_SEND = "cs"
+CLIENT_RECV = "cr"
+SERVER_SEND = "ss"
+SERVER_RECV = "sr"
diff --git a/eventlet/zipkin/_thrift/zipkinCore/ttypes.py b/eventlet/zipkin/_thrift/zipkinCore/ttypes.py
new file mode 100644
index 0000000..418911f
--- /dev/null
+++ b/eventlet/zipkin/_thrift/zipkinCore/ttypes.py
@@ -0,0 +1,452 @@
+#
+# Autogenerated by Thrift Compiler (0.8.0)
+#
+# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+#
+#
+
+from thrift.Thrift import TType, TMessageType, TException
+
+from thrift.transport import TTransport
+from thrift.protocol import TBinaryProtocol, TProtocol
+try:
+ from thrift.protocol import fastbinary
+except:
+ fastbinary = None
+
+
+class AnnotationType:
+ BOOL = 0
+ BYTES = 1
+ I16 = 2
+ I32 = 3
+ I64 = 4
+ DOUBLE = 5
+ STRING = 6
+
+ _VALUES_TO_NAMES = {
+ 0: "BOOL",
+ 1: "BYTES",
+ 2: "I16",
+ 3: "I32",
+ 4: "I64",
+ 5: "DOUBLE",
+ 6: "STRING",
+ }
+
+ _NAMES_TO_VALUES = {
+ "BOOL": 0,
+ "BYTES": 1,
+ "I16": 2,
+ "I32": 3,
+ "I64": 4,
+ "DOUBLE": 5,
+ "STRING": 6,
+ }
+
+
+class Endpoint:
+ """
+ Attributes:
+ - ipv4
+ - port
+ - service_name
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.I32, 'ipv4', None, None, ), # 1
+ (2, TType.I16, 'port', None, None, ), # 2
+ (3, TType.STRING, 'service_name', None, None, ), # 3
+ )
+
+ def __init__(self, ipv4=None, port=None, service_name=None,):
+ self.ipv4 = ipv4
+ self.port = port
+ self.service_name = service_name
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.I32:
+ self.ipv4 = iprot.readI32();
+ else:
+ iprot.skip(ftype)
+ elif fid == 2:
+ if ftype == TType.I16:
+ self.port = iprot.readI16();
+ else:
+ iprot.skip(ftype)
+ elif fid == 3:
+ if ftype == TType.STRING:
+ self.service_name = iprot.readString();
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('Endpoint')
+ if self.ipv4 is not None:
+ oprot.writeFieldBegin('ipv4', TType.I32, 1)
+ oprot.writeI32(self.ipv4)
+ oprot.writeFieldEnd()
+ if self.port is not None:
+ oprot.writeFieldBegin('port', TType.I16, 2)
+ oprot.writeI16(self.port)
+ oprot.writeFieldEnd()
+ if self.service_name is not None:
+ oprot.writeFieldBegin('service_name', TType.STRING, 3)
+ oprot.writeString(self.service_name)
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ return
+
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class Annotation:
+ """
+ Attributes:
+ - timestamp
+ - value
+ - host
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.I64, 'timestamp', None, None, ), # 1
+ (2, TType.STRING, 'value', None, None, ), # 2
+ (3, TType.STRUCT, 'host', (Endpoint, Endpoint.thrift_spec), None, ), # 3
+ )
+
+ def __init__(self, timestamp=None, value=None, host=None,):
+ self.timestamp = timestamp
+ self.value = value
+ self.host = host
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.I64:
+ self.timestamp = iprot.readI64();
+ else:
+ iprot.skip(ftype)
+ elif fid == 2:
+ if ftype == TType.STRING:
+ self.value = iprot.readString();
+ else:
+ iprot.skip(ftype)
+ elif fid == 3:
+ if ftype == TType.STRUCT:
+ self.host = Endpoint()
+ self.host.read(iprot)
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('Annotation')
+ if self.timestamp is not None:
+ oprot.writeFieldBegin('timestamp', TType.I64, 1)
+ oprot.writeI64(self.timestamp)
+ oprot.writeFieldEnd()
+ if self.value is not None:
+ oprot.writeFieldBegin('value', TType.STRING, 2)
+ oprot.writeString(self.value)
+ oprot.writeFieldEnd()
+ if self.host is not None:
+ oprot.writeFieldBegin('host', TType.STRUCT, 3)
+ self.host.write(oprot)
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ return
+
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class BinaryAnnotation:
+ """
+ Attributes:
+ - key
+ - value
+ - annotation_type
+ - host
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.STRING, 'key', None, None, ), # 1
+ (2, TType.STRING, 'value', None, None, ), # 2
+ (3, TType.I32, 'annotation_type', None, None, ), # 3
+ (4, TType.STRUCT, 'host', (Endpoint, Endpoint.thrift_spec), None, ), # 4
+ )
+
+ def __init__(self, key=None, value=None, annotation_type=None, host=None,):
+ self.key = key
+ self.value = value
+ self.annotation_type = annotation_type
+ self.host = host
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.STRING:
+ self.key = iprot.readString();
+ else:
+ iprot.skip(ftype)
+ elif fid == 2:
+ if ftype == TType.STRING:
+ self.value = iprot.readString();
+ else:
+ iprot.skip(ftype)
+ elif fid == 3:
+ if ftype == TType.I32:
+ self.annotation_type = iprot.readI32();
+ else:
+ iprot.skip(ftype)
+ elif fid == 4:
+ if ftype == TType.STRUCT:
+ self.host = Endpoint()
+ self.host.read(iprot)
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('BinaryAnnotation')
+ if self.key is not None:
+ oprot.writeFieldBegin('key', TType.STRING, 1)
+ oprot.writeString(self.key)
+ oprot.writeFieldEnd()
+ if self.value is not None:
+ oprot.writeFieldBegin('value', TType.STRING, 2)
+ oprot.writeString(self.value)
+ oprot.writeFieldEnd()
+ if self.annotation_type is not None:
+ oprot.writeFieldBegin('annotation_type', TType.I32, 3)
+ oprot.writeI32(self.annotation_type)
+ oprot.writeFieldEnd()
+ if self.host is not None:
+ oprot.writeFieldBegin('host', TType.STRUCT, 4)
+ self.host.write(oprot)
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ return
+
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class Span:
+ """
+ Attributes:
+ - trace_id
+ - name
+ - id
+ - parent_id
+ - annotations
+ - binary_annotations
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.I64, 'trace_id', None, None, ), # 1
+ None, # 2
+ (3, TType.STRING, 'name', None, None, ), # 3
+ (4, TType.I64, 'id', None, None, ), # 4
+ (5, TType.I64, 'parent_id', None, None, ), # 5
+ (6, TType.LIST, 'annotations', (TType.STRUCT,(Annotation, Annotation.thrift_spec)), None, ), # 6
+ None, # 7
+ (8, TType.LIST, 'binary_annotations', (TType.STRUCT,(BinaryAnnotation, BinaryAnnotation.thrift_spec)), None, ), # 8
+ )
+
+ def __init__(self, trace_id=None, name=None, id=None, parent_id=None, annotations=None, binary_annotations=None,):
+ self.trace_id = trace_id
+ self.name = name
+ self.id = id
+ self.parent_id = parent_id
+ self.annotations = annotations
+ self.binary_annotations = binary_annotations
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.I64:
+ self.trace_id = iprot.readI64();
+ else:
+ iprot.skip(ftype)
+ elif fid == 3:
+ if ftype == TType.STRING:
+ self.name = iprot.readString();
+ else:
+ iprot.skip(ftype)
+ elif fid == 4:
+ if ftype == TType.I64:
+ self.id = iprot.readI64();
+ else:
+ iprot.skip(ftype)
+ elif fid == 5:
+ if ftype == TType.I64:
+ self.parent_id = iprot.readI64();
+ else:
+ iprot.skip(ftype)
+ elif fid == 6:
+ if ftype == TType.LIST:
+ self.annotations = []
+ (_etype3, _size0) = iprot.readListBegin()
+ for _i4 in xrange(_size0):
+ _elem5 = Annotation()
+ _elem5.read(iprot)
+ self.annotations.append(_elem5)
+ iprot.readListEnd()
+ else:
+ iprot.skip(ftype)
+ elif fid == 8:
+ if ftype == TType.LIST:
+ self.binary_annotations = []
+ (_etype9, _size6) = iprot.readListBegin()
+ for _i10 in xrange(_size6):
+ _elem11 = BinaryAnnotation()
+ _elem11.read(iprot)
+ self.binary_annotations.append(_elem11)
+ iprot.readListEnd()
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('Span')
+ if self.trace_id is not None:
+ oprot.writeFieldBegin('trace_id', TType.I64, 1)
+ oprot.writeI64(self.trace_id)
+ oprot.writeFieldEnd()
+ if self.name is not None:
+ oprot.writeFieldBegin('name', TType.STRING, 3)
+ oprot.writeString(self.name)
+ oprot.writeFieldEnd()
+ if self.id is not None:
+ oprot.writeFieldBegin('id', TType.I64, 4)
+ oprot.writeI64(self.id)
+ oprot.writeFieldEnd()
+ if self.parent_id is not None:
+ oprot.writeFieldBegin('parent_id', TType.I64, 5)
+ oprot.writeI64(self.parent_id)
+ oprot.writeFieldEnd()
+ if self.annotations is not None:
+ oprot.writeFieldBegin('annotations', TType.LIST, 6)
+ oprot.writeListBegin(TType.STRUCT, len(self.annotations))
+ for iter12 in self.annotations:
+ iter12.write(oprot)
+ oprot.writeListEnd()
+ oprot.writeFieldEnd()
+ if self.binary_annotations is not None:
+ oprot.writeFieldBegin('binary_annotations', TType.LIST, 8)
+ oprot.writeListBegin(TType.STRUCT, len(self.binary_annotations))
+ for iter13 in self.binary_annotations:
+ iter13.write(oprot)
+ oprot.writeListEnd()
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ return
+
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
diff --git a/eventlet/zipkin/api.py b/eventlet/zipkin/api.py
new file mode 100644
index 0000000..cd03ec0
--- /dev/null
+++ b/eventlet/zipkin/api.py
@@ -0,0 +1,186 @@
+import os
+import sys
+import time
+import struct
+import socket
+import random
+
+from eventlet.green import threading
+from eventlet.zipkin._thrift.zipkinCore import ttypes
+from eventlet.zipkin._thrift.zipkinCore.constants import SERVER_SEND
+
+
+client = None
+_tls = threading.local() # thread local storage
+
+
+def put_annotation(msg, endpoint=None):
+ """ This is annotation API.
+ You can add your own annotation from in your code.
+ Annotation is recorded with timestamp automatically.
+ e.g.) put_annotation('cache hit for %s' % request)
+
+ :param msg: String message
+ :param endpoint: host info
+ """
+ if is_sample():
+ a = ZipkinDataBuilder.build_annotation(msg, endpoint)
+ trace_data = get_trace_data()
+ trace_data.add_annotation(a)
+
+
+def put_key_value(key, value, endpoint=None):
+ """ This is binary annotation API.
+ You can add your own key-value extra information from in your code.
+ Key-value doesn't have a time component.
+ e.g.) put_key_value('http.uri', '/hoge/index.html')
+
+ :param key: String
+ :param value: String
+ :param endpoint: host info
+ """
+ if is_sample():
+ b = ZipkinDataBuilder.build_binary_annotation(key, value, endpoint)
+ trace_data = get_trace_data()
+ trace_data.add_binary_annotation(b)
+
+
+def is_tracing():
+ """ Return whether the current thread is tracking or not """
+ return hasattr(_tls, 'trace_data')
+
+
+def is_sample():
+ """ Return whether it should record trace information
+ for the request or not
+ """
+ return is_tracing() and _tls.trace_data.sampled
+
+
+def get_trace_data():
+ if is_tracing():
+ return _tls.trace_data
+
+
+def set_trace_data(trace_data):
+ _tls.trace_data = trace_data
+
+
+def init_trace_data():
+ if is_tracing():
+ del _tls.trace_data
+
+
+def _uniq_id():
+ """
+ Create a random 64-bit signed integer appropriate
+ for use as trace and span IDs.
+ XXX: By experimentation zipkin has trouble recording traces with ids
+ larger than (2 ** 56) - 1
+ """
+ return random.randint(0, (2 ** 56) - 1)
+
+
+def generate_trace_id():
+ return _uniq_id()
+
+
+def generate_span_id():
+ return _uniq_id()
+
+
+class TraceData(object):
+
+ END_ANNOTATION = SERVER_SEND
+
+ def __init__(self, name, trace_id, span_id, parent_id, sampled, endpoint):
+ """
+ :param name: RPC name (String)
+ :param trace_id: int
+ :param span_id: int
+ :param parent_id: int or None
+ :param sampled: lets the downstream servers know
+ if I should record trace data for the request (bool)
+ :param endpoint: zipkin._thrift.zipkinCore.ttypes.EndPoint
+ """
+ self.name = name
+ self.trace_id = trace_id
+ self.span_id = span_id
+ self.parent_id = parent_id
+ self.sampled = sampled
+ self.endpoint = endpoint
+ self.annotations = []
+ self.bannotations = []
+ self._done = False
+
+ def add_annotation(self, annotation):
+ if annotation.host is None:
+ annotation.host = self.endpoint
+ if not self._done:
+ self.annotations.append(annotation)
+ if annotation.value == self.END_ANNOTATION:
+ self.flush()
+
+ def add_binary_annotation(self, bannotation):
+ if bannotation.host is None:
+ bannotation.host = self.endpoint
+ if not self._done:
+ self.bannotations.append(bannotation)
+
+ def flush(self):
+ span = ZipkinDataBuilder.build_span(name=self.name,
+ trace_id=self.trace_id,
+ span_id=self.span_id,
+ parent_id=self.parent_id,
+ annotations=self.annotations,
+ bannotations=self.bannotations)
+ client.send_to_collector(span)
+ self.annotations = []
+ self.bannotations = []
+ self._done = True
+
+
+class ZipkinDataBuilder:
+ @staticmethod
+ def build_span(name, trace_id, span_id, parent_id,
+ annotations, bannotations):
+ return ttypes.Span(
+ name=name,
+ trace_id=trace_id,
+ id=span_id,
+ parent_id=parent_id,
+ annotations=annotations,
+ binary_annotations=bannotations
+ )
+
+ @staticmethod
+ def build_annotation(value, endpoint=None):
+ if isinstance(value, unicode):
+ value = value.encode('utf-8')
+ return ttypes.Annotation(time.time() * 1000 * 1000,
+ str(value), endpoint)
+
+ @staticmethod
+ def build_binary_annotation(key, value, endpoint=None):
+ annotation_type = ttypes.AnnotationType.STRING
+ return ttypes.BinaryAnnotation(key, value, annotation_type, endpoint)
+
+ @staticmethod
+ def build_endpoint(ipv4=None, port=None, service_name=None):
+ if ipv4 is not None:
+ ipv4 = ZipkinDataBuilder._ipv4_to_int(ipv4)
+ if service_name is None:
+ service_name = ZipkinDataBuilder._get_script_name()
+ return ttypes.Endpoint(
+ ipv4=ipv4,
+ port=port,
+ service_name=service_name
+ )
+
+ @staticmethod
+ def _ipv4_to_int(ipv4):
+ return struct.unpack('!i', socket.inet_aton(ipv4))[0]
+
+ @staticmethod
+ def _get_script_name():
+ return os.path.basename(sys.argv[0])
diff --git a/eventlet/zipkin/client.py b/eventlet/zipkin/client.py
new file mode 100644
index 0000000..c94070a
--- /dev/null
+++ b/eventlet/zipkin/client.py
@@ -0,0 +1,56 @@
+import base64
+import warnings
+
+from scribe import scribe
+from thrift.transport import TTransport, TSocket
+from thrift.protocol import TBinaryProtocol
+
+from eventlet import GreenPile
+
+
+CATEGORY = 'zipkin'
+
+
+class ZipkinClient(object):
+
+ def __init__(self, host='127.0.0.1', port=9410):
+ """
+ :param host: zipkin collector IP addoress (default '127.0.0.1')
+ :param port: zipkin collector port (default 9410)
+ """
+ self.host = host
+ self.port = port
+ self.pile = GreenPile(1)
+ self._connect()
+
+ def _connect(self):
+ socket = TSocket.TSocket(self.host, self.port)
+ self.transport = TTransport.TFramedTransport(socket)
+ protocol = TBinaryProtocol.TBinaryProtocol(self.transport,
+ False, False)
+ self.scribe_client = scribe.Client(protocol)
+ try:
+ self.transport.open()
+ except TTransport.TTransportException as e:
+ warnings.warn(e.message)
+
+ def _build_message(self, thrift_obj):
+ trans = TTransport.TMemoryBuffer()
+ protocol = TBinaryProtocol.TBinaryProtocolAccelerated(trans=trans)
+ thrift_obj.write(protocol)
+ return base64.b64encode(trans.getvalue())
+
+ def send_to_collector(self, span):
+ self.pile.spawn(self._send, span)
+
+ def _send(self, span):
+ log_entry = scribe.LogEntry(CATEGORY, self._build_message(span))
+ try:
+ self.scribe_client.Log([log_entry])
+ except Exception as e:
+ msg = 'ZipkinClient send error %s' % str(e)
+ warnings.warn(msg)
+ self._connect()
+
+ def close(self):
+ self.transport.close()
diff --git a/eventlet/zipkin/example/ex1.png b/eventlet/zipkin/example/ex1.png
new file mode 100755
index 0000000..7f7a049
--- /dev/null
+++ b/eventlet/zipkin/example/ex1.png
Binary files differ
diff --git a/eventlet/zipkin/example/ex2.png b/eventlet/zipkin/example/ex2.png
new file mode 100755
index 0000000..19dbc3a
--- /dev/null
+++ b/eventlet/zipkin/example/ex2.png
Binary files differ
diff --git a/eventlet/zipkin/example/ex3.png b/eventlet/zipkin/example/ex3.png
new file mode 100755
index 0000000..5ff9860
--- /dev/null
+++ b/eventlet/zipkin/example/ex3.png
Binary files differ
diff --git a/eventlet/zipkin/greenthread.py b/eventlet/zipkin/greenthread.py
new file mode 100644
index 0000000..37e12d6
--- /dev/null
+++ b/eventlet/zipkin/greenthread.py
@@ -0,0 +1,33 @@
+from eventlet import greenthread
+
+from eventlet.zipkin import api
+
+
+__original_init__ = greenthread.GreenThread.__init__
+__original_main__ = greenthread.GreenThread.main
+
+
+def _patched__init(self, parent):
+ # parent thread saves current TraceData from tls to self
+ if api.is_tracing():
+ self.trace_data = api.get_trace_data()
+
+ __original_init__(self, parent)
+
+
+def _patched_main(self, function, args, kwargs):
+ # child thread inherits TraceData
+ if hasattr(self, 'trace_data'):
+ api.set_trace_data(self.trace_data)
+
+ __original_main__(self, function, args, kwargs)
+
+
+def patch():
+ greenthread.GreenThread.__init__ = _patched__init
+ greenthread.GreenThread.main = _patched_main
+
+
+def unpatch():
+ greenthread.GreenThread.__init__ = __original_init__
+ greenthread.GreenThread.main = __original_main__
diff --git a/eventlet/zipkin/http.py b/eventlet/zipkin/http.py
new file mode 100644
index 0000000..668c3f9
--- /dev/null
+++ b/eventlet/zipkin/http.py
@@ -0,0 +1,61 @@
+import warnings
+
+from eventlet.support import six
+from eventlet.green import httplib
+from eventlet.zipkin import api
+
+
+# see https://twitter.github.io/zipkin/Instrumenting.html
+HDR_TRACE_ID = 'X-B3-TraceId'
+HDR_SPAN_ID = 'X-B3-SpanId'
+HDR_PARENT_SPAN_ID = 'X-B3-ParentSpanId'
+HDR_SAMPLED = 'X-B3-Sampled'
+
+
+if six.PY2:
+ __org_endheaders__ = httplib.HTTPConnection.endheaders
+ __org_begin__ = httplib.HTTPResponse.begin
+
+ def _patched_endheaders(self):
+ if api.is_tracing():
+ trace_data = api.get_trace_data()
+ new_span_id = api.generate_span_id()
+ self.putheader(HDR_TRACE_ID, hex_str(trace_data.trace_id))
+ self.putheader(HDR_SPAN_ID, hex_str(new_span_id))
+ self.putheader(HDR_PARENT_SPAN_ID, hex_str(trace_data.span_id))
+ self.putheader(HDR_SAMPLED, int(trace_data.sampled))
+ api.put_annotation('Client Send')
+
+ __org_endheaders__(self)
+
+ def _patched_begin(self):
+ __org_begin__(self)
+
+ if api.is_tracing():
+ api.put_annotation('Client Recv (%s)' % self.status)
+
+
+def patch():
+ if six.PY2:
+ httplib.HTTPConnection.endheaders = _patched_endheaders
+ httplib.HTTPResponse.begin = _patched_begin
+ if six.PY3:
+ warnings.warn("Since current Python thrift release \
+ doesn't support Python 3, eventlet.zipkin.http \
+ doesn't also support Python 3 (http.client)")
+
+
+def unpatch():
+ if six.PY2:
+ httplib.HTTPConnection.endheaders = __org_endheaders__
+ httplib.HTTPResponse.begin = __org_begin__
+ if six.PY3:
+ pass
+
+
+def hex_str(n):
+ """
+ Thrift uses a binary representation of trace and span ids
+ HTTP headers use a hexadecimal representation of the same
+ """
+ return '%0.16x' % (n,)
diff --git a/eventlet/zipkin/log.py b/eventlet/zipkin/log.py
new file mode 100644
index 0000000..b7f9d32
--- /dev/null
+++ b/eventlet/zipkin/log.py
@@ -0,0 +1,19 @@
+import logging
+
+from eventlet.zipkin import api
+
+
+__original_handle__ = logging.Logger.handle
+
+
+def _patched_handle(self, record):
+ __original_handle__(self, record)
+ api.put_annotation(record.getMessage())
+
+
+def patch():
+ logging.Logger.handle = _patched_handle
+
+
+def unpatch():
+ logging.Logger.handle = __original_handle__
diff --git a/eventlet/zipkin/patcher.py b/eventlet/zipkin/patcher.py
new file mode 100644
index 0000000..8e7d8ad
--- /dev/null
+++ b/eventlet/zipkin/patcher.py
@@ -0,0 +1,41 @@
+from eventlet.zipkin import http
+from eventlet.zipkin import wsgi
+from eventlet.zipkin import greenthread
+from eventlet.zipkin import log
+from eventlet.zipkin import api
+from eventlet.zipkin.client import ZipkinClient
+
+
+def enable_trace_patch(host='127.0.0.1', port=9410,
+ trace_app_log=False, sampling_rate=1.0):
+ """ Apply monkey patch to trace your WSGI application.
+
+ :param host: Scribe daemon IP address (default: '127.0.0.1')
+ :param port: Scribe daemon port (default: 9410)
+ :param trace_app_log: A Boolean indicating if the tracer will trace
+ application log together or not. This facility assume that
+ your application uses python standard logging library.
+ (default: False)
+ :param sampling_rate: A Float value (0.0~1.0) that indicates
+ the tracing frequency. If you specify 1.0, all request
+ are traced (and sent to Zipkin collecotr).
+ If you specify 0.1, only 1/10 requests are traced. (default: 1.0)
+ """
+ api.client = ZipkinClient(host, port)
+
+ # monkey patch for adding tracing facility
+ wsgi.patch(sampling_rate)
+ http.patch()
+ greenthread.patch()
+
+ # monkey patch for capturing application log
+ if trace_app_log:
+ log.patch()
+
+
+def disable_trace_patch():
+ http.unpatch()
+ wsgi.unpatch()
+ greenthread.unpatch()
+ log.unpatch()
+ api.client.close()
diff --git a/eventlet/zipkin/wsgi.py b/eventlet/zipkin/wsgi.py
new file mode 100644
index 0000000..3d52911
--- /dev/null
+++ b/eventlet/zipkin/wsgi.py
@@ -0,0 +1,78 @@
+import random
+
+from eventlet import wsgi
+from eventlet.zipkin import api
+from eventlet.zipkin._thrift.zipkinCore.constants import \
+ SERVER_RECV, SERVER_SEND
+from eventlet.zipkin.http import \
+ HDR_TRACE_ID, HDR_SPAN_ID, HDR_PARENT_SPAN_ID, HDR_SAMPLED
+
+
+_sampler = None
+__original_handle_one_response__ = wsgi.HttpProtocol.handle_one_response
+
+
+def _patched_handle_one_response(self):
+ api.init_trace_data()
+ trace_id = int_or_none(self.headers.getheader(HDR_TRACE_ID))
+ span_id = int_or_none(self.headers.getheader(HDR_SPAN_ID))
+ parent_id = int_or_none(self.headers.getheader(HDR_PARENT_SPAN_ID))
+ sampled = bool_or_none(self.headers.getheader(HDR_SAMPLED))
+ if trace_id is None: # front-end server
+ trace_id = span_id = api.generate_trace_id()
+ parent_id = None
+ sampled = _sampler.sampling()
+ ip, port = self.request.getsockname()[:2]
+ ep = api.ZipkinDataBuilder.build_endpoint(ip, port)
+ trace_data = api.TraceData(name=self.command,
+ trace_id=trace_id,
+ span_id=span_id,
+ parent_id=parent_id,
+ sampled=sampled,
+ endpoint=ep)
+ api.set_trace_data(trace_data)
+ api.put_annotation(SERVER_RECV)
+ api.put_key_value('http.uri', self.path)
+
+ __original_handle_one_response__(self)
+
+ if api.is_sample():
+ api.put_annotation(SERVER_SEND)
+
+
+class Sampler(object):
+ def __init__(self, sampling_rate):
+ self.sampling_rate = sampling_rate
+
+ def sampling(self):
+ # avoid generating unneeded random numbers
+ if self.sampling_rate == 1.0:
+ return True
+ r = random.random()
+ if r < self.sampling_rate:
+ return True
+ return False
+
+
+def int_or_none(val):
+ if val is None:
+ return None
+ return int(val, 16)
+
+
+def bool_or_none(val):
+ if val == '1':
+ return True
+ if val == '0':
+ return False
+ return None
+
+
+def patch(sampling_rate):
+ global _sampler
+ _sampler = Sampler(sampling_rate)
+ wsgi.HttpProtocol.handle_one_response = _patched_handle_one_response
+
+
+def unpatch():
+ wsgi.HttpProtocol.handle_one_response = __original_handle_one_response__