Source code for s3tail.cache

from builtins import chr
from builtins import range
from builtins import object

import os
import logging
import zlib

from hashlib import sha256
from tempfile import NamedTemporaryFile

from .background_writer import BackgroundWriter
from .old_file_cleaner import OldFileCleaner

_logger = logging.getLogger(__name__)

class Cache(object):
    readers = []

    def __init__(self, path, hours):
        self.path = path
        self.enabled = True
        if not self.path or hours < 1:
            self.enabled = False
            return
        if not os.path.isdir(path):
            os.mkdir(path)
            # create shard buckets for sha hexstring names
            chars = list(range(ord('0'), ord('9')+1)) + list(range(ord('a'), ord('f')+1))
            for i in chars:
                for j in chars:
                    os.mkdir(os.path.join(path, chr(i)+chr(j)))
        else:
            cleaner = OldFileCleaner(path, hours)
            cleaner.start()

    def lookup(self, name):
        if self.enabled:
            cache_pn = self._cache_path_for(name)
            cached = os.path.exists(cache_pn)
            return (cache_pn, cached)
        return (None, False)

    def open(self, name, reader):
        if not self.enabled:
            return self._open_reader(name, reader)
        cache_pn, cached = self.lookup(name)
        if cached:
            if _logger.isEnabledFor(logging.DEBUG):
                _logger.debug('Found %s in cache: %s', name, cache_pn)
            else:
                _logger.info('Found %s in cache', name)
            return open(cache_pn)
        return self._Reader(name, self._open_reader(name, reader), cache_pn)

    def cleanup(self):
        # iterate over a copy: readers remove themselves from the list when
        # their background writer finishes moving the cache file into place
        for reader in list(self.readers):
            reader.cleanup()

    ######################################################################
    # private

    def _cache_path_for(self, name):
        safe_name = sha256(name.encode('utf-8')).hexdigest()
        return os.path.join(self.path, safe_name[0:2], safe_name)

    def _open_reader(self, name, reader):
        reader.open()
        if name.endswith('.gz'):
            # TODO: lame! use header magic numbers for decompression algorithm
            # return GzipFile(fileobj=reader)
            return self._Decompressor(reader)
        return reader

    class _Decompressor(object):
        def __init__(self, reader):
            self._reader = reader
            self._decompressor = zlib.decompressobj(32 + zlib.MAX_WBITS)

        def read(self, size=-1):
            data = self._reader.read(size)
            return self._decompressor.decompress(data)

        def close(self):
            self._reader.close()

    class _Reader(object):
        def __init__(self, name, reader, cache_pn):
            self.name = name
            self.closed = False
            self._logger = logging.getLogger(__name__ + 'reader')
            self._reader = reader
            self._at_eof = False
            self._cache_pn = cache_pn
            # write to a tempfile in case of failure; move into place when writing is complete
            head, tail = os.path.split(cache_pn)
            self._tempfile = NamedTemporaryFile(dir=head, prefix=tail+'_')
            self._writer = BackgroundWriter(self._tempfile, self._move_into_place)
            self._writer.start()
            Cache.readers.append(self)

        def read(self, size=-1):
            data = self._reader.read(size)
            if size < 1 or len(data) < size:
                self._at_eof = True
            self._writer.write(data)
            return data

        def close(self):
            self._reader.close()
            self._writer.mark_done()  # allow writer to finish async, not requiring caller to wait
            self.closed = True

        def cleanup(self):
            self._writer.join()

        def _move_into_place(self, _):
            self._tempfile.delete = False  # prevent removal on close
            self._tempfile.close()
            if self._at_eof:
                os.rename(self._tempfile.name, self._cache_pn)
                Cache.readers.remove(self)
                self._logger.debug('Placed: %s', self._cache_pn)
            else:
                os.remove(self._tempfile.name)
                self._logger.debug('Not keeping in cache (did not read all data): %s', self._tempfile.name)
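
A minimal usage sketch, not part of the module itself: it assumes only the reader protocol that _open_reader and _Reader rely on (open(), read(size), close()). The DummyReader class, the '/tmp/s3tail-cache' path, and the key name are hypothetical stand-ins for the S3 stream reader and cache directory that s3tail normally supplies.

class DummyReader(object):
    """Hypothetical stand-in for s3tail's S3 reader: open(), read(size), close()."""
    def __init__(self, data):
        self._data = data
        self._pos = 0

    def open(self):
        pass

    def read(self, size=-1):
        if size < 0:
            size = len(self._data) - self._pos
        chunk = self._data[self._pos:self._pos + size]
        self._pos += size
        return chunk

    def close(self):
        pass

cache = Cache('/tmp/s3tail-cache', hours=24)
reader = cache.open('logs/2017/01/01/access.log', DummyReader(b'line one\nline two\n'))
while True:
    chunk = reader.read(4096)
    if not chunk:
        break
    # ... process chunk ...
reader.close()
cache.cleanup()  # wait for the background writer so the cache file is moved into place

On the first call the data streams through a _Reader, which tees it into a temp file and renames it into the sharded cache directory once the writer drains; a second cache.open() for the same name would then return the cached file directly.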