1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
|
# -*- test-case-name: twisted.test.test_htb -*-
#
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
# See LICENSE for details.
"""Hierarchical Token Bucket traffic shaping.
Patterned after U{Martin Devera's Hierarchical Token Bucket traffic
shaper for the Linux kernel<http://luxik.cdi.cz/~devik/qos/htb/>}.
@seealso: U{HTB Linux queuing discipline manual - user guide
<http://luxik.cdi.cz/~devik/qos/htb/manual/userg.htm>}
@seealso: U{Token Bucket Filter in Linux Advanced Routing & Traffic Control
HOWTO<http://lartc.org/howto/lartc.qdisc.classless.html#AEN682>}
@author: Kevin Turner
"""
from __future__ import nested_scopes
__version__ = '$Revision: 1.5 $'[11:-2]
# TODO: Investigate whether we should be using os.times()[-1] instead of
# time.time. time.time, it has been pointed out, can go backwards. Is
# the same true of os.times?
from time import time
from zope.interface import implements, Interface
from twisted.protocols import pcp
class Bucket:
    """Token bucket, or something like it.

    I can hold up to a certain number of tokens, and I drain over time.

    @cvar maxburst: Size of the bucket, in bytes.  If None, the bucket is
        never full.
    @type maxburst: int
    @cvar rate: Rate the bucket drains, in bytes per second.  If None,
        the bucket drains instantaneously.
    @type rate: int
    @ivar content: Number of tokens currently held.
    @type content: int
    @ivar parentBucket: Optional enclosing bucket; every token I accept
        must also fit in it.
    @ivar lastDrip: Timestamp (from C{time()}) of my most recent drain.
    """

    maxburst = None
    rate = None

    # Number of consumers currently holding this bucket.  Bucket filters
    # only throw a bucket away when this is zero.
    _refcount = 0

    def __init__(self, parentBucket=None):
        """Create an empty bucket.

        @param parentBucket: A bucket that also has to accept whatever is
            added to me, or None if I stand alone.
        @type parentBucket: L{Bucket}
        """
        self.content = 0
        self.parentBucket = parentBucket
        self.lastDrip = time()

    def add(self, amount):
        """Add tokens to me.

        @param amount: A quantity of tokens to add.
        @type amount: int

        @returns: The number of tokens that fit.
        @returntype: int
        """
        # Drain first so the free space reflects the elapsed time.
        self.drip()
        if self.maxburst is None:
            allowable = amount
        else:
            allowable = min(amount, self.maxburst - self.content)

        # The parent gets the final say: it may accept fewer tokens than
        # I have room for.
        if self.parentBucket is not None:
            allowable = self.parentBucket.add(allowable)
        self.content += allowable
        return allowable

    def drip(self):
        """Let some of the bucket drain.

        How much of the bucket drains depends on how long it has been
        since I was last called.

        @returns: True if I am now empty.
        @returntype: bool
        """
        if self.parentBucket is not None:
            self.parentBucket.drip()

        if self.rate is None:
            # No rate means everything drains at once.
            self.content = 0
        else:
            now = time()
            # time() may step backwards; treat that as "no time elapsed"
            # rather than letting a negative interval refill the bucket.
            deltaT = max(0, now - self.lastDrip)
            # int() yields a long automatically when needed on Python 2
            # and is the only integer type on Python 3.
            self.content = int(max(0, self.content - deltaT * self.rate))
            self.lastDrip = now
        # Report the real state, so rate-limited buckets that have fully
        # drained are recognised as empty too.
        return self.content == 0
class IBucketFilter(Interface):
    """Interface for objects that hand out buckets."""

    def getBucketFor(*somethings, **some_kw):
        """Return the bucket that applies to the given arguments.

        The interface places no restriction on the positional or keyword
        arguments; what they mean is up to the implementation.

        @returntype: L{Bucket}
        """
class HierarchicalBucketFilter:
    """I filter things into buckets, and I am nestable.

    @cvar bucketFactory: Class of buckets to make.
    @type bucketFactory: L{Bucket} class
    @cvar sweepInterval: Seconds between sweeping out the bucket cache.
        If None, the cache is never swept automatically.
    @type sweepInterval: int
    @ivar buckets: Cache mapping bucket keys to the buckets handed out.
    @type buckets: dict
    @ivar parentFilter: Optional filter that supplies parent buckets for
        the buckets I create.
    @ivar lastSweep: Timestamp (from C{time()}) of the last sweep.
    """

    implements(IBucketFilter)

    bucketFactory = Bucket
    sweepInterval = None

    def __init__(self, parentFilter=None):
        """Create a filter with an empty bucket cache.

        @param parentFilter: A filter to ask for parent buckets, or None.
        @type parentFilter: L{HierarchicalBucketFilter}
        """
        self.buckets = {}
        self.parentFilter = parentFilter
        self.lastSweep = time()

    def getBucketFor(self, *a, **kw):
        """You want a bucket for that?  I'll give you a bucket.

        Any parameters are passed on to L{getBucketKey}, from them it
        decides which bucket you get.

        @returntype: L{Bucket}
        """
        if ((self.sweepInterval is not None)
                and ((time() - self.lastSweep) > self.sweepInterval)):
            self.sweep()

        # The parent filter is asked with me as the first argument, ahead
        # of the caller's own arguments.
        if self.parentFilter:
            parentBucket = self.parentFilter.getBucketFor(self, *a, **kw)
        else:
            parentBucket = None

        key = self.getBucketKey(*a, **kw)
        bucket = self.buckets.get(key)
        if bucket is None:
            # First request for this key: make a bucket and remember it.
            bucket = self.bucketFactory(parentBucket)
            self.buckets[key] = bucket
        return bucket

    def getBucketKey(self, *a, **kw):
        """I determine who gets which bucket.

        Unless I'm overridden, everything gets the same bucket.

        @returns: something to be used as a key in the bucket cache.
        """
        return None

    def sweep(self):
        """I throw away references to empty buckets."""
        # Iterate over a snapshot: entries are deleted inside the loop,
        # and a live dictionary view must not change size while it is
        # being iterated.
        for key, bucket in list(self.buckets.items()):
            # Only buckets nobody holds any more, and that report being
            # empty after draining, are dropped.
            if (bucket._refcount == 0) and bucket.drip():
                del self.buckets[key]

        self.lastSweep = time()
class FilterByHost(HierarchicalBucketFilter):
    """Bucket filter that keeps one bucket per peer host.

    The bucket cache is swept every twenty minutes.
    """

    sweepInterval = 20 * 60

    def getBucketKey(self, transport):
        """Key the bucket on element 1 of the transport's peer address.

        @param transport: Object whose C{getPeer()} result is indexable.
            Element 1 is presumably the host part of the address; confirm
            against the transport in use.
        @returns: the value used as key in the bucket cache.
        """
        peerAddress = transport.getPeer()
        return peerAddress[1]
class FilterByServer(HierarchicalBucketFilter):
    """Bucket filter that keeps one bucket per service.

    Automatic sweeping of the bucket cache is disabled.
    """

    sweepInterval = None

    def getBucketKey(self, transport):
        """Key the bucket on element 2 of the transport's own address.

        @param transport: Object whose C{getHost()} result is indexable.
            Element 2 is presumably the local port identifying the
            service; confirm against the transport in use.
        @returns: the value used as key in the bucket cache.
        """
        hostAddress = transport.getHost()
        return hostAddress[2]
class ShapedConsumer(pcp.ProducerConsumerProxy):
    """I sit in front of a Consumer and limit how fast data reaches it.

    @ivar bucket: The bucket that decides how much of each write may
        pass.
    """

    # Acting as a pull producer spares me from scheduling traffic with
    # callLaters.
    iAmStreaming = False

    def __init__(self, consumer, bucket):
        """Wrap C{consumer} and register myself as a user of C{bucket}.

        @param consumer: The consumer to be shaped.
        @param bucket: The bucket that meters my writes.
        @type bucket: L{Bucket}
        """
        pcp.ProducerConsumerProxy.__init__(self, consumer)
        self.bucket = bucket
        # Mark the bucket as in use for as long as I am producing.
        self.bucket._refcount = self.bucket._refcount + 1

    def _writeSomeData(self, data):
        """Write as much of C{data} as the bucket currently admits.

        @returns: whatever the proxied C{_writeSomeData} returns for the
            admitted prefix of C{data}.
        """
        # In practice this causes obscene amounts of overhead, because it
        # generates lots and lots of packets with twelve-byte payloads.
        # A version using scheduled writes may be needed after all.
        admitted = self.bucket.add(len(data))
        portion = data[:admitted]
        return pcp.ProducerConsumerProxy._writeSomeData(self, portion)

    def stopProducing(self):
        """Stop producing and give up my claim on the bucket."""
        pcp.ProducerConsumerProxy.stopProducing(self)
        self.bucket._refcount = self.bucket._refcount - 1
class ShapedTransport(ShapedConsumer):
    """I sit in front of a Transport and limit how fast data reaches it.

    I am a L{ShapedConsumer} with one extra trick: the consumer I wrap is
    also a Transport, and callers will ask me for attributes that I do
    not proxy as a Consumer (e.g. loseConnection).  Those lookups are
    passed through to the wrapped object.
    """

    # Ugh.  The intent was only to filter IConsumer, not ITransport.
    iAmStreaming = False

    def __getattr__(self, name):
        """Look up an attribute I lack on the wrapped consumer.

        Callers will do things like .getPeer and .loseConnection on me.
        """
        wrapped = self.consumer
        return getattr(wrapped, name)
class ShapedProtocolFactory:
    """I hand out Protocols whose transports are traffic-shaped.

    Usage::

        myserver = SomeFactory()
        myserver.protocol = ShapedProtocolFactory(myserver.protocol,
                                                  bucketFilter)

    Here SomeFactory is a L{twisted.internet.protocol.Factory}, and
    bucketFilter is an instance of L{HierarchicalBucketFilter}.
    """

    def __init__(self, protoClass, bucketFilter):
        """Remember what to wrap and where buckets come from.

        @param protoClass: The class of Protocol I will generate
            wrapped instances of.
        @type protoClass: L{Protocol<twisted.internet.interfaces.IProtocol>}
            class
        @param bucketFilter: The filter which will determine how
            traffic is shaped.
        @type bucketFilter: L{HierarchicalBucketFilter}.
        """
        # Strictly speaking, protoClass may be any callable returning
        # instances of something that implements IProtocol.
        self.protocol = protoClass
        self.bucketFilter = bucketFilter

    def __call__(self, *a, **kw):
        """Build a Protocol instance that will get a shaped transport.

        Any parameters will be passed on to the protocol's initializer.

        @returns: a Protocol instance with a L{ShapedTransport}.
        """
        protocol = self.protocol(*a, **kw)
        unshapedMakeConnection = protocol.makeConnection

        def makeConnection(transport):
            # The bucket is chosen when the connection is made, from the
            # filter I hold at that moment.
            tokens = self.bucketFilter.getBucketFor(transport)
            return unshapedMakeConnection(ShapedTransport(transport, tokens))

        # Shadow the protocol's own makeConnection on this one instance.
        protocol.makeConnection = makeConnection
        return protocol
|