author     Jeff Widman <jeff@jeffwidman.com>    2017-08-30 15:09:25 -0700
committer  Jeff Widman <jeff@jeffwidman.com>    2020-02-05 15:57:01 -0800
commit     6babefab60b05e405fdebde89e27fd8b02848633 (patch)
tree       d09fb906fabd104e456418a1c3045438af926e97
parent     66f9750e0fc5617ebc38a76144aa49d94b442f35 (diff)
download   kafka-python-6babefab60b05e405fdebde89e27fd8b02848633.tar.gz
Remove some dead code
-rw-r--r--  kafka/producer/buffer.py   179
-rw-r--r--  kafka/protocol/struct.py     8
-rw-r--r--  kafka/util.py                9
3 files changed, 0 insertions, 196 deletions
diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py
index 8a8d717..1008017 100644
--- a/kafka/producer/buffer.py
+++ b/kafka/producer/buffer.py
@@ -113,182 +113,3 @@ class SimpleBufferPool(object):
"""The number of threads blocked waiting on memory."""
with self._lock:
return len(self._waiters)
-
-'''
-class BufferPool(object):
- """
- A pool of ByteBuffers kept under a given memory limit. This class is fairly
- specific to the needs of the producer. In particular it has the following
- properties:
-
- * There is a special "poolable size" and buffers of this size are kept in a
- free list and recycled
- * It is fair. That is all memory is given to the longest waiting thread
- until it has sufficient memory. This prevents starvation or deadlock when
- a thread asks for a large chunk of memory and needs to block until
- multiple buffers are deallocated.
- """
- def __init__(self, memory, poolable_size):
- """Create a new buffer pool.
-
- Arguments:
- memory (int): maximum memory that this buffer pool can allocate
- poolable_size (int): memory size per buffer to cache in the free
- list rather than deallocating
- """
- self._poolable_size = poolable_size
- self._lock = threading.RLock()
- self._free = collections.deque()
- self._waiters = collections.deque()
- self._total_memory = memory
- self._available_memory = memory
- #self.metrics = metrics;
- #self.waitTime = this.metrics.sensor("bufferpool-wait-time");
- #MetricName metricName = metrics.metricName("bufferpool-wait-ratio", metricGrpName, "The fraction of time an appender waits for space allocation.");
- #this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
-
- def allocate(self, size, max_time_to_block_ms):
- """
- Allocate a buffer of the given size. This method blocks if there is not
- enough memory and the buffer pool is configured with blocking mode.
-
- Arguments:
- size (int): The buffer size to allocate in bytes
- max_time_to_block_ms (int): The maximum time in milliseconds to
- block for buffer memory to be available
-
- Returns:
- buffer
-
- Raises:
- InterruptedException If the thread is interrupted while blocked
- IllegalArgumentException if size is larger than the total memory
- controlled by the pool (and hence we would block forever)
- """
- assert size <= self._total_memory, (
- "Attempt to allocate %d bytes, but there is a hard limit of %d on"
- " memory allocations." % (size, self._total_memory))
-
- with self._lock:
- # check if we have a free buffer of the right size pooled
- if (size == self._poolable_size and len(self._free) > 0):
- return self._free.popleft()
-
- # now check if the request is immediately satisfiable with the
- # memory on hand or if we need to block
- free_list_size = len(self._free) * self._poolable_size
- if self._available_memory + free_list_size >= size:
- # we have enough unallocated or pooled memory to immediately
- # satisfy the request
- self._free_up(size)
- self._available_memory -= size
- raise NotImplementedError()
- #return ByteBuffer.allocate(size)
- else:
- # we are out of memory and will have to block
- accumulated = 0
- buf = None
- more_memory = threading.Condition(self._lock)
- self._waiters.append(more_memory)
- # loop over and over until we have a buffer or have reserved
- # enough memory to allocate one
- while (accumulated < size):
- start_wait = time.time()
- if not more_memory.wait(max_time_to_block_ms / 1000.0):
- raise Errors.KafkaTimeoutError(
- "Failed to allocate memory within the configured"
- " max blocking time")
- end_wait = time.time()
- #this.waitTime.record(endWait - startWait, time.milliseconds());
-
- # check if we can satisfy this request from the free list,
- # otherwise allocate memory
- if (accumulated == 0
- and size == self._poolable_size
- and self._free):
-
- # just grab a buffer from the free list
- buf = self._free.popleft()
- accumulated = size
- else:
- # we'll need to allocate memory, but we may only get
- # part of what we need on this iteration
- self._free_up(size - accumulated)
- got = min(size - accumulated, self._available_memory)
- self._available_memory -= got
- accumulated += got
-
- # remove the condition for this thread to let the next thread
- # in line start getting memory
- removed = self._waiters.popleft()
- assert removed is more_memory, 'Wrong condition'
-
- # signal any additional waiters if there is more memory left
- # over for them
- if (self._available_memory > 0 or len(self._free) > 0):
- if len(self._waiters) > 0:
- self._waiters[0].notify()
-
- # unlock and return the buffer
- if buf is None:
- raise NotImplementedError()
- #return ByteBuffer.allocate(size)
- else:
- return buf
-
- def _free_up(self, size):
- """
- Attempt to ensure we have at least the requested number of bytes of
- memory for allocation by deallocating pooled buffers (if needed)
- """
- while self._free and self._available_memory < size:
- self._available_memory += self._free.pop().capacity
-
- def deallocate(self, buffer_, size=None):
- """
- Return buffers to the pool. If they are of the poolable size add them
- to the free list, otherwise just mark the memory as free.
-
- Arguments:
- buffer (io.BytesIO): The buffer to return
- size (int): The size of the buffer to mark as deallocated, note
- that this maybe smaller than buffer.capacity since the buffer
- may re-allocate itself during in-place compression
- """
- with self._lock:
- if size is None:
- size = buffer_.capacity
- if (size == self._poolable_size and size == buffer_.capacity):
- buffer_.seek(0)
- buffer_.truncate()
- self._free.append(buffer_)
- else:
- self._available_memory += size
-
- if self._waiters:
- more_mem = self._waiters[0]
- more_mem.notify()
-
- def available_memory(self):
- """The total free memory both unallocated and in the free list."""
- with self._lock:
- return self._available_memory + len(self._free) * self._poolable_size
-
- def unallocated_memory(self):
- """Get the unallocated memory (not in the free list or in use)."""
- with self._lock:
- return self._available_memory
-
- def queued(self):
- """The number of threads blocked waiting on memory."""
- with self._lock:
- return len(self._waiters)
-
- def poolable_size(self):
- """The buffer size that will be retained in the free list after use."""
- return self._poolable_size
-
- def total_memory(self):
- """The total memory managed by this pool."""
- return self._total_memory
-'''
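
Note: the docstring of the removed BufferPool describes its two key behaviors: a "poolable size" free list and FIFO fairness, where memory goes to the longest-waiting thread and only the head of the waiter queue is notified as memory is returned. A minimal, illustrative sketch of that fairness scheme follows. FairMemoryPool and its use of bytearray are invented for illustration (the removed code raised NotImplementedError where Java's ByteBuffer.allocate would have been); the only kafka-python dependency assumed is kafka.errors.KafkaTimeoutError, which the removed code also used.

    import collections
    import threading

    import kafka.errors as Errors  # KafkaTimeoutError, as in the removed code


    class FairMemoryPool(object):
        """Illustrative sketch of the removed BufferPool's FIFO fairness scheme."""

        def __init__(self, memory):
            self._lock = threading.RLock()
            self._total = memory
            self._available = memory
            self._waiters = collections.deque()  # longest-waiting thread is at the head

        def allocate(self, size, max_time_to_block_ms):
            assert size <= self._total, 'Request exceeds total pool memory'
            with self._lock:
                if self._available >= size:
                    self._available -= size
                    return bytearray(size)

                # Out of memory: join the FIFO of waiters and accumulate memory
                # as deallocations trickle in.
                accumulated = 0
                more_memory = threading.Condition(self._lock)
                self._waiters.append(more_memory)
                while accumulated < size:
                    # Python 3 Condition.wait() returns False on timeout.
                    if not more_memory.wait(max_time_to_block_ms / 1000.0):
                        # Give back what we reserved, leave the queue, and let
                        # the next waiter (if any) try for the returned memory.
                        self._available += accumulated
                        self._waiters.remove(more_memory)
                        if self._waiters:
                            self._waiters[0].notify()
                        raise Errors.KafkaTimeoutError(
                            'Failed to allocate memory within the configured'
                            ' max blocking time')
                    got = min(size - accumulated, self._available)
                    self._available -= got
                    accumulated += got

                # Done accumulating; hand the head-of-queue role to the next waiter.
                removed = self._waiters.popleft()
                assert removed is more_memory, 'Wrong condition'
                if self._available > 0 and self._waiters:
                    self._waiters[0].notify()
                return bytearray(size)

        def deallocate(self, size):
            with self._lock:
                self._available += size
                if self._waiters:
                    # Fairness: only the longest-waiting thread is woken.
                    self._waiters[0].notify()
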
diff --git a/kafka/protocol/struct.py b/kafka/protocol/struct.py
index 676de1b..693e2a2 100644
--- a/kafka/protocol/struct.py
+++ b/kafka/protocol/struct.py
@@ -64,11 +64,3 @@ class Struct(AbstractType):
if self.__dict__[attr] != other.__dict__[attr]:
return False
return True
-
-"""
-class MetaStruct(type):
- def __new__(cls, clsname, bases, dct):
- nt = namedtuple(clsname, [name for (name, _) in dct['SCHEMA']])
- bases = tuple([Struct, nt] + list(bases))
- return super(MetaStruct, cls).__new__(cls, clsname, bases, dct)
-"""
diff --git a/kafka/util.py b/kafka/util.py
index 9f65b81..ca1e531 100644
--- a/kafka/util.py
+++ b/kafka/util.py
@@ -65,12 +65,3 @@ class Dict(dict):
See: https://docs.python.org/2/library/weakref.html
"""
pass
-
-
-def try_method_on_system_exit(obj, method, *args, **kwargs):
- def wrapper(_obj, _meth, *args, **kwargs):
- try:
- getattr(_obj, _meth)(*args, **kwargs)
- except (ReferenceError, AttributeError):
- pass
- atexit.register(wrapper, weakref.proxy(obj), method, *args, **kwargs)
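
Note: the removed try_method_on_system_exit registered an atexit hook through a weakref.proxy, so the hook neither keeps its target alive nor fails if the object is collected before interpreter shutdown. A standalone sketch of that pattern follows; close_on_exit and Closeable are invented names for illustration.

    import atexit
    import weakref


    def close_on_exit(obj, method, *args, **kwargs):
        """Call obj.method(*args, **kwargs) at interpreter exit, if obj still exists."""
        def wrapper(_proxy, _meth, *args, **kwargs):
            try:
                getattr(_proxy, _meth)(*args, **kwargs)
            except (ReferenceError, AttributeError):
                # The referent was already garbage-collected (or lacks the
                # method); silently skip, exactly as the removed helper did.
                pass
        # A weakref.proxy keeps the atexit registry from pinning obj in memory.
        atexit.register(wrapper, weakref.proxy(obj), method, *args, **kwargs)


    class Closeable(object):
        def close(self):
            print('closing')


    c = Closeable()
    close_on_exit(c, 'close')  # prints 'closing' at exit while c is still referenced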