Source code for s3tail.background_writer

import logging

from queue import Queue
from threading import Thread

_logger = logging.getLogger(__name__)

[docs]class BackgroundWriter(Thread):
[docs] class WriteAfterDone(Exception): '''Indicates when an action is taken after requested to stop.'''
def __init__(self, writer, done_callback=None): '''Wraps a writer I/O object with background write calls. Optionally, will call the done_callback just before the thread stops (to allow caller to close/operate on the writer) ''' super(BackgroundWriter, self).__init__() _logger = logging.getLogger('s3tail.writer') self._done = False self._done_callback = done_callback self._queue = Queue() self._writer = writer self.name = writer.name
[docs] def write(self, data): if self._done: raise self.WriteAfterDone('Refusing to write when stopping ' + self.name) self._queue.put(data)
[docs] def mark_done(self): if not self._done: self._done = True _logger.debug('Asked to stop writing to %s', self.name) self._queue.put(True)
[docs] def join(self, timeout=None): _logger.debug('Joining %s', self.name) self.mark_done() self._queue.join() super(BackgroundWriter, self).join(timeout)
[docs] def run(self): while True: data = self._queue.get() if data is True: _logger.debug('Stopping %s', self.name) self._queue.task_done() if self._done_callback: self._done_callback(self._writer) return self._writer.write(data) self._queue.task_done()