diff options
author | Jeff Widman <jeff@jeffwidman.com> | 2017-08-30 15:09:25 -0700 |
---|---|---|
committer | Jeff Widman <jeff@jeffwidman.com> | 2020-02-05 15:57:01 -0800 |
commit | 6babefab60b05e405fdebde89e27fd8b02848633 (patch) | |
tree | d09fb906fabd104e456418a1c3045438af926e97 | |
parent | 66f9750e0fc5617ebc38a76144aa49d94b442f35 (diff) | |
download | kafka-python-6babefab60b05e405fdebde89e27fd8b02848633.tar.gz |
Remove some dead code
-rw-r--r-- | kafka/producer/buffer.py | 179 | ||||
-rw-r--r-- | kafka/protocol/struct.py | 8 | ||||
-rw-r--r-- | kafka/util.py | 9 |
3 files changed, 0 insertions, 196 deletions
diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py index 8a8d717..1008017 100644 --- a/kafka/producer/buffer.py +++ b/kafka/producer/buffer.py @@ -113,182 +113,3 @@ class SimpleBufferPool(object): """The number of threads blocked waiting on memory.""" with self._lock: return len(self._waiters) - -''' -class BufferPool(object): - """ - A pool of ByteBuffers kept under a given memory limit. This class is fairly - specific to the needs of the producer. In particular it has the following - properties: - - * There is a special "poolable size" and buffers of this size are kept in a - free list and recycled - * It is fair. That is all memory is given to the longest waiting thread - until it has sufficient memory. This prevents starvation or deadlock when - a thread asks for a large chunk of memory and needs to block until - multiple buffers are deallocated. - """ - def __init__(self, memory, poolable_size): - """Create a new buffer pool. - - Arguments: - memory (int): maximum memory that this buffer pool can allocate - poolable_size (int): memory size per buffer to cache in the free - list rather than deallocating - """ - self._poolable_size = poolable_size - self._lock = threading.RLock() - self._free = collections.deque() - self._waiters = collections.deque() - self._total_memory = memory - self._available_memory = memory - #self.metrics = metrics; - #self.waitTime = this.metrics.sensor("bufferpool-wait-time"); - #MetricName metricName = metrics.metricName("bufferpool-wait-ratio", metricGrpName, "The fraction of time an appender waits for space allocation."); - #this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS)); - - def allocate(self, size, max_time_to_block_ms): - """ - Allocate a buffer of the given size. This method blocks if there is not - enough memory and the buffer pool is configured with blocking mode. - - Arguments: - size (int): The buffer size to allocate in bytes - max_time_to_block_ms (int): The maximum time in milliseconds to - block for buffer memory to be available - - Returns: - buffer - - Raises: - InterruptedException If the thread is interrupted while blocked - IllegalArgumentException if size is larger than the total memory - controlled by the pool (and hence we would block forever) - """ - assert size <= self._total_memory, ( - "Attempt to allocate %d bytes, but there is a hard limit of %d on" - " memory allocations." % (size, self._total_memory)) - - with self._lock: - # check if we have a free buffer of the right size pooled - if (size == self._poolable_size and len(self._free) > 0): - return self._free.popleft() - - # now check if the request is immediately satisfiable with the - # memory on hand or if we need to block - free_list_size = len(self._free) * self._poolable_size - if self._available_memory + free_list_size >= size: - # we have enough unallocated or pooled memory to immediately - # satisfy the request - self._free_up(size) - self._available_memory -= size - raise NotImplementedError() - #return ByteBuffer.allocate(size) - else: - # we are out of memory and will have to block - accumulated = 0 - buf = None - more_memory = threading.Condition(self._lock) - self._waiters.append(more_memory) - # loop over and over until we have a buffer or have reserved - # enough memory to allocate one - while (accumulated < size): - start_wait = time.time() - if not more_memory.wait(max_time_to_block_ms / 1000.0): - raise Errors.KafkaTimeoutError( - "Failed to allocate memory within the configured" - " max blocking time") - end_wait = time.time() - #this.waitTime.record(endWait - startWait, time.milliseconds()); - - # check if we can satisfy this request from the free list, - # otherwise allocate memory - if (accumulated == 0 - and size == self._poolable_size - and self._free): - - # just grab a buffer from the free list - buf = self._free.popleft() - accumulated = size - else: - # we'll need to allocate memory, but we may only get - # part of what we need on this iteration - self._free_up(size - accumulated) - got = min(size - accumulated, self._available_memory) - self._available_memory -= got - accumulated += got - - # remove the condition for this thread to let the next thread - # in line start getting memory - removed = self._waiters.popleft() - assert removed is more_memory, 'Wrong condition' - - # signal any additional waiters if there is more memory left - # over for them - if (self._available_memory > 0 or len(self._free) > 0): - if len(self._waiters) > 0: - self._waiters[0].notify() - - # unlock and return the buffer - if buf is None: - raise NotImplementedError() - #return ByteBuffer.allocate(size) - else: - return buf - - def _free_up(self, size): - """ - Attempt to ensure we have at least the requested number of bytes of - memory for allocation by deallocating pooled buffers (if needed) - """ - while self._free and self._available_memory < size: - self._available_memory += self._free.pop().capacity - - def deallocate(self, buffer_, size=None): - """ - Return buffers to the pool. If they are of the poolable size add them - to the free list, otherwise just mark the memory as free. - - Arguments: - buffer (io.BytesIO): The buffer to return - size (int): The size of the buffer to mark as deallocated, note - that this maybe smaller than buffer.capacity since the buffer - may re-allocate itself during in-place compression - """ - with self._lock: - if size is None: - size = buffer_.capacity - if (size == self._poolable_size and size == buffer_.capacity): - buffer_.seek(0) - buffer_.truncate() - self._free.append(buffer_) - else: - self._available_memory += size - - if self._waiters: - more_mem = self._waiters[0] - more_mem.notify() - - def available_memory(self): - """The total free memory both unallocated and in the free list.""" - with self._lock: - return self._available_memory + len(self._free) * self._poolable_size - - def unallocated_memory(self): - """Get the unallocated memory (not in the free list or in use).""" - with self._lock: - return self._available_memory - - def queued(self): - """The number of threads blocked waiting on memory.""" - with self._lock: - return len(self._waiters) - - def poolable_size(self): - """The buffer size that will be retained in the free list after use.""" - return self._poolable_size - - def total_memory(self): - """The total memory managed by this pool.""" - return self._total_memory -''' diff --git a/kafka/protocol/struct.py b/kafka/protocol/struct.py index 676de1b..693e2a2 100644 --- a/kafka/protocol/struct.py +++ b/kafka/protocol/struct.py @@ -64,11 +64,3 @@ class Struct(AbstractType): if self.__dict__[attr] != other.__dict__[attr]: return False return True - -""" -class MetaStruct(type): - def __new__(cls, clsname, bases, dct): - nt = namedtuple(clsname, [name for (name, _) in dct['SCHEMA']]) - bases = tuple([Struct, nt] + list(bases)) - return super(MetaStruct, cls).__new__(cls, clsname, bases, dct) -""" diff --git a/kafka/util.py b/kafka/util.py index 9f65b81..ca1e531 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -65,12 +65,3 @@ class Dict(dict): See: https://docs.python.org/2/library/weakref.html """ pass - - -def try_method_on_system_exit(obj, method, *args, **kwargs): - def wrapper(_obj, _meth, *args, **kwargs): - try: - getattr(_obj, _meth)(*args, **kwargs) - except (ReferenceError, AttributeError): - pass - atexit.register(wrapper, weakref.proxy(obj), method, *args, **kwargs) |