Source code for koapy.backend.kiwoom_open_api_plus.utils.queue.QueueIterator
import atexit
from queue import Empty
[docs]class QueueIterator:
_check_timeout = 1
def __init__(self, queue):
self._queue = queue
self._should_stop = False
atexit.register(self.stop)
def __del__(self):
atexit.unregister(self.stop)
@property
[docs] def queue(self):
return self._queue
[docs] def next(self, block=True, timeout=None):
if block and timeout is None:
timeout = self._check_timeout
while not self._should_stop:
try:
item = self._queue.get(block=block, timeout=timeout)
except Empty:
pass
else:
self._queue.task_done()
return item
raise StopIteration
else:
item = self._queue.get(block=block, timeout=timeout)
self._queue.task_done()
return item
[docs] def next_nowait(self):
return self.next(block=False)
[docs] def has_next(self):
return not self._queue.empty()
def __iter__(self):
return self
def __next__(self):
return self.next()
[docs] def stop(self):
self._should_stop = True
[docs] def enable(self):
self._should_stop = False
[docs]class BufferedQueueIterator(QueueIterator):
def __init__(self, queue):
super().__init__(queue)
self._none = object()
self._head = self._none
[docs] def next(self, block=True, timeout=None):
if self._head is not self._none:
item = self._head
self._head = self._none
return item
else:
return super().next(block, timeout)
[docs] def has_next(self):
return self._head is not self._none or super().has_next()
[docs] def head(self):
if self._head is not self._none:
return self._head
else:
item = self.next_nowait() # raises queue.Empty if queue is empty
self._head = item
return self._head