Package flumotion :: Package component :: Package misc :: Package httpserver :: Module cachedprovider
[hide private]

Source Code for Module flumotion.component.misc.httpserver.cachedprovider

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_component_providers -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import errno 
 23  import os 
 24  import stat 
 25  import tempfile 
 26  import threading 
 27  import time 
 28   
 29  from twisted.internet import defer, reactor, threads, abstract 
 30   
 31  from flumotion.common import log, format, common, python 
 32  from flumotion.component.misc.httpserver import cachestats 
 33  from flumotion.component.misc.httpserver import cachemanager 
 34  from flumotion.component.misc.httpserver import fileprovider 
 35  from flumotion.component.misc.httpserver import localpath 
 36  from flumotion.component.misc.httpserver.fileprovider import FileClosedError 
 37  from flumotion.component.misc.httpserver.fileprovider import FileError 
 38  from flumotion.component.misc.httpserver.fileprovider import NotFoundError 
 39   
 40   
 41  SEEK_SET = 0 # os.SEEK_SET is not defined in python 2.4 
 42  FILE_COPY_BUFFER_SIZE = abstract.FileDescriptor.bufferSize 
 43  MAX_LOGNAME_SIZE = 30 # maximum number of characters to use for logging a path 
 44   
 45   
 46  LOG_CATEGORY = "fileprovider-localcached" 
 47   
 48   
 49  errnoLookup = {errno.ENOENT: fileprovider.NotFoundError, 
 50                 errno.EISDIR: fileprovider.CannotOpenError, 
 51                 errno.EACCES: fileprovider.AccessError} 
 52   
 53   
54 -def open_stat(path, mode='rb'):
55 """ 56 @rtype: (file, statinfo) 57 """ 58 try: 59 file = open(path, mode) 60 fd = file.fileno() 61 except IOError, e: 62 cls = errnoLookup.get(e.errno, fileprovider.FileError) 63 raise cls("Failed to open file '%s': %s" % (path, str(e))) 64 try: 65 info = os.fstat(fd) 66 except OSError, e: 67 file.close() 68 cls = errnoLookup.get(e.errno, fileprovider.FileError) 69 raise cls("Failed to stat file '%s': %s" % (path, str(e))) 70 return file, info
71 72
class FileProviderLocalCachedPlug(fileprovider.FileProviderPlug,
                                  log.Loggable):
    """
    WARNING: Currently does not work properly in combination with
    rate-control.

    I'm caching the files taken from a mounted
    network file system to a shared local directory.
    Multiple instances can share the same cache directory,
    but it's recommended to use slightly different values
    for the property cleanup-high-watermark.
    I'm using the directory access time to know when
    the cache usage changed and keep an estimation
    of the cache usage for statistics.

    I'm creating a unique thread to do the file copying block by block,
    for all files to be copied to the cache.
    Using a thread instead of a reactor.callLater 'loop' allows for
    higher copy throughput and does not slow down the main loop when
    lots of files are copied at the same time.
    Simulations with real request logs show that using a thread
    gives better results than the equivalent asynchronous implementation.
    """

    logCategory = LOG_CATEGORY

    def __init__(self, args):
        props = args['properties']
        self._sourceDir = props.get('path')
        cacheDir = props.get('cache-dir')
        cacheSizeInMB = props.get('cache-size')
        if cacheSizeInMB is not None:
            cacheSize = cacheSizeInMB * 10 ** 6 # in bytes
        else:
            cacheSize = None
        cleanupEnabled = props.get('cleanup-enabled')
        cleanupHighWatermark = props.get('cleanup-high-watermark')
        cleanupLowWatermark = props.get('cleanup-low-watermark')

        self._sessions = {} # {CopySession: None} sessions being copied
        self._index = {} # {path: CopySession}

        self.stats = cachestats.CacheStatistics()

        self.cache = cachemanager.CacheManager(self.stats,
                                               cacheDir, cacheSize,
                                               cleanupEnabled,
                                               cleanupHighWatermark,
                                               cleanupLowWatermark)

        common.ensureDir(self._sourceDir, "source")

        # Create the copy thread; start() runs it once the cache is set up.
        self._thread = CopyThread(self)

    def start(self, component):
        """
        Set up the cache, then start the copy thread.

        @returns: the deferred returned by the cache set up.
        """
        self.debug('Starting cachedprovider plug for component %r', component)
        d = self.cache.setUp()
        d.addCallback(lambda x: self._thread.start())
        return d

    def stop(self, component):
        """
        Stop the copy thread and close every indexed copy session.

        @returns: a DeferredList over the session close deferreds,
            or None when no session returned one.
        """
        self.debug('Stopping cachedprovider plug for component %r', component)
        self._thread.stop()
        dl = []
        # Iterate over a copy: closing a session may remove it from the index.
        for s in list(self._index.values()):
            d = s.close()
            if d:
                dl.append(d)
        if dl:
            return defer.DeferredList(dl)

    def startStatsUpdates(self, updater):
        #FIXME: This is temporary. Should be done with plug UI.
        # Used for the UI to know which plug is used
        updater.update("provider-name", "fileprovider-localcached")
        self.stats.startUpdates(updater)

    def stopStatsUpdates(self):
        self.stats.stopUpdates()

    def getRootPath(self):
        """
        @returns: a LocalPath for the source directory, or None when no
            'path' property was configured.
        """
        if self._sourceDir is None:
            return None
        return LocalPath(self, self._sourceDir)


    ## Protected Methods ##

    def getLogName(self, path, id=None):
        """
        Returns a log name for a path, shortened to a maximum size
        specified by the global variable MAX_LOGNAME_SIZE.
        The log name will be the filename part of the path postfixed
        by the id in brackets if id is not None.
        The extension and id postfix are never truncated, so the result
        can exceed MAX_LOGNAME_SIZE when the postfix alone is too long.
        """
        filename = os.path.basename(path)
        basename, postfix = os.path.splitext(filename)
        if id is not None:
            postfix += "[%s]" % id
        prefixMaxLen = MAX_LOGNAME_SIZE - len(postfix)
        if len(basename) > prefixMaxLen:
            # Keep room for the "*" truncation marker. Clamp at zero so a
            # postfix longer than the limit cannot produce a negative
            # slice index, which would keep almost the whole basename.
            basename = basename[:max(prefixMaxLen - 1, 0)] + "*"
        return basename + postfix

    def getCopySession(self, path):
        return self._index.get(path, None)

    def createCopySession(self, path, file, info):
        # First outdate existing session for the path
        self.outdateCopySession(path)
        # Then create a new one
        session = CopySession(self, path, file, info)
        self._index[path] = session
        return session

    def outdateCopySession(self, path):
        session = self._index.get(path, None)
        if session is not None:
            session.outdate()

    def removeCopySession(self, session):
        path = session.sourcePath
        if path in self._index:
            del self._index[path]
        self.disableSession(session)

    def activateSession(self, session):
        """Register a session as copying and wake up the copy thread."""
        self.debug("Starting Copy Session '%s' (%d)",
                   session.logName, len(self._sessions))
        if session in self._sessions:
            return
        self._sessions[session] = None
        self._activateCopyLoop()

    def disableSession(self, session):
        """Unregister a copying session; idle the thread when none remain."""
        self.debug("Stopping Copy Session '%s' (%d)",
                   session.logName, len(self._sessions))
        if session in self._sessions:
            del self._sessions[session]
        if not self._sessions:
            self._disableCopyLoop()

    def _activateCopyLoop(self):
        self._thread.wakeup()

    def _disableCopyLoop(self):
        self._thread.sleep()


223 -class LocalPath(localpath.LocalPath, log.Loggable):
224 225 logCategory = LOG_CATEGORY 226
227 - def __init__(self, plug, path):
228 localpath.LocalPath.__init__(self, path) 229 self.logName = plug.getLogName(path) 230 self.plug = plug
231
232 - def child(self, name):
233 childpath = self._getChildPath(name) 234 return LocalPath(self.plug, childpath)
235
236 - def open(self):
237 f = CachedFile(self.plug, self._path, self.mimeType) 238 return f.open()
239 240 241 ## Private Methods ## 242
243 - def _removeCachedFile(self, sourcePath):
244 cachePath = self.plug.cache.getCachePath(sourcePath) 245 try: 246 os.remove(cachePath) 247 self.debug("Deleted cached file '%s'", cachePath) 248 except OSError, e: 249 if e.errno != errno.ENOENT: 250 self.warning("Error deleting file: %s", str(e))
251 252
253 -class CopyThread(threading.Thread, log.Loggable):
254 255 logCategory = LOG_CATEGORY 256
257 - def __init__(self, plug):
258 threading.Thread.__init__(self) 259 self.plug = plug 260 self._running = True 261 self._event = threading.Event()
262
263 - def stop(self):
264 self._running = False 265 self._event.set() 266 self.join()
267
268 - def wakeup(self):
269 self._event.set()
270
271 - def sleep(self):
272 self._event.clear()
273
274 - def run(self):
275 while self._running: 276 sessions = self.plug._sessions.keys() 277 for session in sessions: 278 try: 279 session.doServe() 280 except Exception, e: 281 log.warning("Error during async file serving: %s", 282 log.getExceptionMessage(e)) 283 try: 284 session.doCopy() 285 except Exception, e: 286 log.warning("Error during file copy: %s", 287 log.getExceptionMessage(e)) 288 self._event.wait()
289 290
class CopySessionCancelled(Exception):
    """
    Errback value given to pending source reads of a cancelled copy
    session when the source file is no longer open to serve them.
    """


295 -class CopySession(log.Loggable):
296 """ 297 I'm serving a file at the same time I'm copying it 298 from the network file system to the cache. 299 If the client ask for data not yet copied, the source file 300 read operation is delegated the the copy thread as an asynchronous 301 operation because file seeking/reading is not thread safe. 302 303 The copy session have to open two times the temporary file, 304 one for read-only and one for write only, 305 because closing a read/write file change the modification time. 306 We want the modification time to be set to a known value 307 when the copy is finished even keeping read access to the file. 308 309 The session manage a reference counter to know how many TempFileDelegate 310 instances are using the session to delegate read operations. 311 This is done for two reasons: 312 - To avoid circular references by have the session manage 313 a list of delegate instances. 314 - If not cancelled, sessions should not be deleted 315 when no delegates reference them anymore. So weakref cannot be used. 316 """ 317 318 logCategory = LOG_CATEGORY 319
320 - def __init__(self, plug, sourcePath, sourceFile, sourceInfo):
321 self.plug = plug 322 self.logName = plug.getLogName(sourcePath, sourceFile.fileno()) 323 self.copying = None # Not yet started 324 self.sourcePath = sourcePath 325 self.tempPath = plug.cache.getTempPath(sourcePath) 326 self.cachePath = plug.cache.getCachePath(sourcePath) 327 # The size and modification time is not supposed to change over time 328 self.mtime = sourceInfo[stat.ST_MTIME] 329 self.size = sourceInfo[stat.ST_SIZE] 330 self._sourceFile = sourceFile 331 self._cancelled = False # True when a session has been outdated 332 self._wTempFile = None 333 self._rTempFile = None 334 self._allocTag = None # Tag used to identify cache allocations 335 self._waitCancel = None 336 # List of the pending read from source file 337 self._pending = [] # [(position, size, defer),] 338 self._refCount = 0 339 self._copied = 0 # None when the file is fully copied 340 self._correction = 0 # Used to take into account copies data for stats 341 self._startCopyingDefer = self._startCopying()
342
343 - def outdate(self):
344 self.log("Copy session outdated") 345 self._cancelSession()
346
347 - def read(self, position, size, stats):
348 # If the temporary file is open for reading 349 if self._rTempFile: 350 # And the needed data is already downloaded 351 # Safe to read because it's not used by the copy thread 352 if (self._copied is None) or ((position + size) <= self._copied): 353 try: 354 self._rTempFile.seek(position) 355 data = self._rTempFile.read(size) 356 # Adjust the cache/source values to take copy into account 357 size = len(data) 358 # It's safe to use and modify self._correction even if 359 # it's used by the copy thread because the copy thread 360 # only add and the main thread only subtract. 361 # The only thing that could append it's a less accurate 362 # correction... 363 diff = min(self._correction, size) 364 self._correction -= diff 365 stats.onBytesRead(0, size, diff) 366 return data 367 except Exception, e: 368 self.warning("Failed to read from temporary file: %s", 369 log.getExceptionMessage(e)) 370 self._cancelSession() 371 # If the source file is not open anymore, we can't continue 372 if self._sourceFile is None: 373 raise FileError("File caching error, cannot proceed") 374 # Otherwise read the data directly from the source 375 try: 376 # It's safe to not use Lock, because simple type operations 377 # are thread safe, and even if the copying state change 378 # from True to False, _onCopyFinished will be called 379 # later in the same thread and will process pending reads. 380 if self.copying: 381 # If we are currently copying the source file, 382 # we defer the file read to the copying thread 383 # because we can't read a file from two threads. 384 d = defer.Deferred() 385 386 def updateStats(data): 387 stats.onBytesRead(len(data), 0, 0) 388 return data
389 390 d.addCallback(updateStats) 391 self._pending.append((position, size, d)) 392 return d 393 # Not copying, it's safe to read directly 394 self._sourceFile.seek(position) 395 data = self._sourceFile.read(size) 396 stats.onBytesRead(len(data), 0, 0) 397 return data 398 except IOError, e: 399 cls = errnoLookup.get(e.errno, FileError) 400 raise cls("Failed to read source file: %s" % str(e))
401
402 - def incRef(self):
403 self._refCount += 1
404
405 - def decRef(self):
406 self._refCount -= 1 407 # If there is only one client and the session has been cancelled, 408 # stop copying and and serve the source file directly 409 if (self._refCount == 1) and self._cancelled: 410 # Cancel the copy and close the writing temporary file. 411 self._cancelCopy(False, True) 412 # We close if the copy is finished (if _copied is None) 413 if (self._refCount == 0) and (self._copied is None): 414 self.close()
415
416 - def _close(self):
417 self.log("Closing copy session") 418 # Cancel the copy, close the source file and the writing temp file. 419 self._cancelCopy(True, True) 420 self._closeReadTempFile() 421 self.plug.removeCopySession(self) 422 self.plug = None
423
424 - def close(self):
425 if self._startCopyingDefer: 426 d = self._startCopyingDefer 427 self._startCopyingDefer = None 428 d.addCallback(lambda _: self._close()) 429 return d
430
431 - def doServe(self):
432 if not (self.copying and self._pending): 433 # Nothing to do anymore. 434 return False 435 # We have pending source file read operations 436 position, size, d = self._pending.pop(0) 437 self._sourceFile.seek(position) 438 data = self._sourceFile.read(size) 439 # Call the deferred in the main thread 440 reactor.callFromThread(d.callback, data) 441 return len(self._pending) > 0
442
443 - def doCopy(self):
444 # Called in the copy thread context. 445 if not self.copying: 446 # Nothing to do anymore. 447 return False 448 # Copy a buffer from the source file to the temporary writing file 449 cont = True 450 try: 451 # It's safe to use self._copied, because it's only set 452 # by the copy thread during copy. 453 self._sourceFile.seek(self._copied) 454 self._wTempFile.seek(self._copied) 455 data = self._sourceFile.read(FILE_COPY_BUFFER_SIZE) 456 self._wTempFile.write(data) 457 self._wTempFile.flush() 458 except IOError, e: 459 self.warning("Failed to copy source file: %s", 460 log.getExceptionMessage(e)) 461 # Abort copy and cancel the session 462 self.copying = False 463 reactor.callFromThread(self.plug.disableSession, self) 464 reactor.callFromThread(self._cancelSession) 465 # Do not continue 466 cont = False 467 else: 468 size = len(data) 469 self._copied += size 470 self._correction += size 471 if size < FILE_COPY_BUFFER_SIZE: 472 # Stop copying 473 self.copying = False 474 reactor.callFromThread(self.plug.disableSession, self) 475 reactor.callFromThread(self._onCopyFinished) 476 cont = False 477 # Check for cancellation 478 if self._waitCancel and self.copying: 479 # Copy has been cancelled 480 self.copying = False 481 reactor.callFromThread(self.plug.disableSession, self) 482 reactor.callFromThread(self._onCopyCancelled, *self._waitCancel) 483 return False 484 return cont
485 486 487 ## Private Methods ## 488
489 - def _allocCacheSpace(self):
490 # Retrieve a cache allocation tag, used to track the cache free space 491 return self.plug.cache.allocateCacheSpace(self.size)
492
493 - def _releaseCacheSpace(self):
494 if not (self._cancelled or self._allocTag is None): 495 self.plug.cache.releaseCacheSpace(self._allocTag) 496 self._allocTag = None
497
498 - def _cancelSession(self):#
499 if not self._cancelled: 500 self.log("Canceling copy session") 501 # Not a valid copy session anymore 502 self._cancelled = True 503 # If there is no more than 1 client using the session, 504 # stop copying and and serve the source file directly 505 if self._refCount <= 1: 506 # Cancel and close the temp write file. 507 self._cancelCopy(False, True) 508
509 - def _gotCacheSpace(self, tag):
510 self._allocTag = tag 511 512 if not tag: 513 # No free space, proxying source file directly 514 self._cancelSession() 515 return 516 self.plug.stats.onCopyStarted() 517 # Then open a transient temporary files 518 try: 519 fd, transientPath = tempfile.mkstemp(".tmp", LOG_CATEGORY) 520 self.log("Created transient file '%s'", transientPath) 521 self._wTempFile = os.fdopen(fd, "wb") 522 self.log("Opened temporary file for writing [fd %d]", 523 self._wTempFile.fileno()) 524 self._rTempFile = file(transientPath, "rb") 525 self.log("Opened temporary file for reading [fd %d]", 526 self._rTempFile.fileno()) 527 except IOError, e: 528 self.warning("Failed to open temporary file: %s", 529 log.getExceptionMessage(e)) 530 self._cancelSession() 531 return 532 # Truncate it to the source size 533 try: 534 self.log("Truncating temporary file to size %d", self.size) 535 self._wTempFile.truncate(self.size) 536 except IOError, e: 537 self.warning("Failed to truncate temporary file: %s", 538 log.getExceptionMessage(e)) 539 self._cancelSession() 540 return 541 # And move it to the real temporary file path 542 try: 543 self.log("Renaming transient file to '%s'", self.tempPath) 544 os.rename(transientPath, self.tempPath) 545 except IOError, e: 546 self.warning("Failed to rename transient temporary file: %s", 547 log.getExceptionMessage(e)) 548 # And start copying 549 self.debug("Start caching '%s' [fd %d]", 550 self.sourcePath, self._sourceFile.fileno()) 551 # Activate the copy 552 self.copying = True 553 self.plug.activateSession(self)
554
555 - def _startCopying(self):
556 self.log("Start copy session") 557 # First ensure there is not already a temporary file 558 self._removeTempFile() 559 # Reserve cache space, may trigger a cache cleanup 560 d = self._allocCacheSpace() 561 d.addCallback(self._gotCacheSpace) 562 return d
563
564 - def _cancelCopy(self, closeSource, closeTempWrite):
565 if self.copying: 566 self.log("Canceling file copy") 567 if self._waitCancel: 568 # Already waiting for cancellation. 569 return 570 self.debug("Cancel caching '%s' [fd %d]", 571 self.sourcePath, self._sourceFile.fileno()) 572 # Disable the copy, we do not modify copying directly 573 # to let the copying thread terminate current operations. 574 # The file close operation are deferred. 575 self._waitCancel = (closeSource, closeTempWrite) 576 return 577 # No pending copy, we can close the files 578 if closeSource: 579 self._closeSourceFile() 580 if closeTempWrite: 581 self._closeWriteTempFile()
582
583 - def _onCopyCancelled(self, closeSource, closeTempWrite):
584 self.log("Copy session cancelled") 585 # Called when the copy thread really stopped to read/write 586 self._waitCancel = None 587 self.plug.stats.onCopyCancelled(self.size, self._copied) 588 # Resolve all pending source read operations 589 for position, size, d in self._pending: 590 if self._sourceFile is None: 591 d.errback(CopySessionCancelled()) 592 else: 593 try: 594 self._sourceFile.seek(position) 595 data = self._sourceFile.read(size) 596 d.callback(data) 597 except Exception, e: 598 self.warning("Failed to read from source file: %s", 599 log.getExceptionMessage(e)) 600 d.errback(e) 601 self._pending = [] 602 # then we can safely close files 603 if closeSource: 604 self._closeSourceFile() 605 if closeTempWrite: 606 self._closeWriteTempFile()
607
608 - def _onCopyFinished(self):
609 if self._sourceFile is None: 610 return 611 # Called when the copy thread really stopped to read/write 612 self.debug("Finished caching '%s' [fd %d]", 613 self.sourcePath, self._sourceFile.fileno()) 614 self.plug.stats.onCopyFinished(self.size) 615 # Set the copy as finished to prevent the temporary file 616 # to be deleted when closed 617 self._copied = None 618 # Closing source and write files 619 self._closeSourceFile() 620 self._closeWriteTempFile() 621 # Setting the modification time on the temporary file 622 try: 623 mtime = self.mtime 624 atime = int(time.time()) 625 self.log("Setting temporary file modification time to %d", mtime) 626 # FIXME: Should use futimes, but it's not wrapped by python 627 os.utime(self.tempPath, (atime, mtime)) 628 except OSError, e: 629 if e.errno == errno.ENOENT: 630 # The file may have been deleted by another process 631 self._releaseCacheSpace() 632 else: 633 self.warning("Failed to update modification time of temporary " 634 "file: %s", log.getExceptionMessage(e)) 635 self._cancelSession() 636 try: 637 self.log("Renaming temporary file to '%s'", self.cachePath) 638 os.rename(self.tempPath, self.cachePath) 639 except OSError, e: 640 if e.errno == errno.ENOENT: 641 self._releaseCacheSpace() 642 else: 643 self.warning("Failed to rename temporary file: %s", 644 log.getExceptionMessage(e)) 645 self._cancelSession() 646 # Complete all pending source read operations with the temporary file. 647 for position, size, d in self._pending: 648 try: 649 self._rTempFile.seek(position) 650 data = self._rTempFile.read(size) 651 d.callback(data) 652 except Exception, e: 653 self.warning("Failed to read from temporary file: %s", 654 log.getExceptionMessage(e)) 655 d.errback(e) 656 self._pending = [] 657 if self._refCount == 0: 658 # We were waiting for the file to be copied to close it. 659 self.close()
660
661 - def _removeTempFile(self):
662 try: 663 os.remove(self.tempPath) 664 self.log("Deleted temporary file '%s'", self.tempPath) 665 # Inform the plug that cache space has been released 666 self._releaseCacheSpace() 667 except OSError, e: 668 if e.errno == errno.ENOENT: 669 if self._wTempFile is not None: 670 # Already deleted but inform the plug anyway 671 self._releaseCacheSpace() 672 else: 673 self.warning("Error deleting temporary file: %s", 674 log.getExceptionMessage(e))
675
676 - def _closeSourceFile(self):
677 if self._sourceFile is not None: 678 self.log("Closing source file [fd %d]", self._sourceFile.fileno()) 679 try: 680 try: 681 self._sourceFile.close() 682 finally: 683 self._sourceFile = None 684 except IOError, e: 685 self.warning("Failed to close source file: %s", 686 log.getExceptionMessage(e))
687
688 - def _closeReadTempFile(self):
689 if self._rTempFile is not None: 690 self.log("Closing temporary file for reading [fd %d]", 691 self._rTempFile.fileno()) 692 try: 693 try: 694 self._rTempFile.close() 695 finally: 696 self._rTempFile = None 697 except IOError, e: 698 self.warning("Failed to close temporary file for reading: %s", 699 log.getExceptionMessage(e))
700
701 - def _closeWriteTempFile(self):
702 if self._wTempFile is not None: 703 # If the copy is not finished, remove the temporary file 704 if not self._cancelled and self._copied is not None: 705 self._removeTempFile() 706 self.log("Closing temporary file for writing [fd %d]", 707 self._wTempFile.fileno()) 708 try: 709 try: 710 self._wTempFile.close() 711 finally: 712 self._wTempFile = None 713 except Exception, e: 714 self.warning("Failed to close temporary file for writing: %s", 715 log.getExceptionMessage(e))
716 717
class TempFileDelegate(log.Loggable):
    """
    Delegate reading through a CopySession. It keeps its own read
    position and holds a reference on the session until closed.
    """

    logCategory = LOG_CATEGORY

    def __init__(self, plug, session):
        self.logName = plug.getLogName(session.sourcePath)
        self.mtime = session.mtime
        self.size = session.size
        self._session = session
        self._reading = False
        self._position = 0
        session.incRef()

    def tell(self):
        return self._position

    def seek(self, offset):
        self._position = offset

    def read(self, size, stats):
        """
        Read up to size bytes at the current position.

        @returns: the data, or a deferred firing with it when the session
            delegates the read to the copy thread.
        """
        assert not self._reading, "Simultaneous read not supported"
        d = self._session.read(self._position, size, stats)
        if isinstance(d, defer.Deferred):
            self._reading = True
            # Reset the reading flag on failure too; otherwise every later
            # read would trip the assertion above.
            return d.addCallbacks(self._cbGotData, self._ebReadFailed)
        self._position += len(d)
        return d

    def close(self):
        if self._session is not None:
            self._session.decRef()
            self._session = None


    ## Private Methods ##

    def _cbGotData(self, data):
        self._reading = False
        self._position += len(data)
        return data

    def _ebReadFailed(self, failure):
        # The position is left unchanged; the failure is propagated.
        self._reading = False
        return failure


760 -class DirectFileDelegate(log.Loggable):
761 762 logCategory = LOG_CATEGORY 763 764 # Default values 765 _file = None 766
767 - def __init__(self, plug, path, file, info):
768 self.logName = plug.getLogName(path, file.fileno()) 769 self._file = file 770 # The size and modification time is not supposed to change over time 771 self.mtime = info[stat.ST_MTIME] 772 self.size = info[stat.ST_SIZE]
773
774 - def tell(self):
775 try: 776 return self._file.tell() 777 except IOError, e: 778 cls = errnoLookup.get(e.errno, FileError) 779 raise cls("Failed to tell position in file: %s" % str(e))
780
781 - def seek(self, offset):
782 try: 783 self._file.seek(offset, SEEK_SET) 784 except IOError, e: 785 cls = errnoLookup.get(e.errno, FileError) 786 raise cls("Failed to seek in cached file: %s" % str(e))
787
788 - def read(self, size):
789 try: 790 return self._file.read(size) 791 except IOError, e: 792 cls = errnoLookup.get(e.errno, FileError) 793 raise cls("Failed to read data from file: %s" % str(e))
794
795 - def close(self):
796 if self._file is not None: 797 try: 798 try: 799 self._file.close() 800 finally: 801 self._file = None 802 except IOError, e: 803 cls = errnoLookup.get(e.errno, FileError) 804 raise cls("Failed to close file: %s" % str(e))
805 806
class CachedFileDelegate(DirectFileDelegate):
    """
    DirectFileDelegate over a file of the local cache; every read is
    reported to the request statistics in onBytesRead's second slot,
    the same slot CopySession.read uses for temporary-file reads.
    """

    def read(self, size, stats):
        chunk = DirectFileDelegate.read(self, size)
        byteCount = len(chunk)
        stats.onBytesRead(0, byteCount, 0)
        return chunk

    def close(self):
        openFile = self._file
        if openFile is not None:
            self.log("Closing cached file [fd %d]", openFile.fileno())
        DirectFileDelegate.close(self)


820 -class CachedFile(fileprovider.File, log.Loggable):
821 822 logCategory = LOG_CATEGORY 823 824 # Overriding parent class properties to become attribute 825 mimeType = None 826 827 # Default values 828 _delegate = None 829
830 - def __init__(self, plug, path, mimeType):
831 self.logName = plug.getLogName(path) 832 self.plug = plug 833 self._path = path 834 self.mimeType = mimeType 835 self.stats = cachestats.RequestStatistics(plug.stats) 836 self._delegate = None
837
838 - def open(self):
839 # Opening source file in a separate thread, as it usually involves 840 # accessing a network filesystem (which would block the reactor) 841 d = threads.deferToThread(open_stat, self._path) 842 d.addCallbacks(self._selectDelegate, self._sourceOpenFailed) 843 844 def _setDelegate(delegate): 845 self._delegate = delegate
846 d.addCallback(_setDelegate) 847 d.addCallback(lambda _: self) 848 return d
849
850 - def _sourceOpenFailed(self, failure):
851 failure.trap(NotFoundError) 852 self.debug("Source file %r not found", self._path) 853 self.plug.outdateCopySession(self._path) 854 cachedPath = self.plug.cache.getCachePath(self._path) 855 self._removeCachedFile(cachedPath) 856 raise failure
857
858 - def __str__(self):
859 return "<CachedFile '%s'>" % self._path
860
861 - def getmtime(self):
862 if self._delegate is None: 863 raise FileClosedError("File closed") 864 return self._delegate.mtime
865
866 - def getsize(self):
867 if self._delegate is None: 868 raise FileClosedError("File closed") 869 return self._delegate.size
870
871 - def tell(self):
872 if self._delegate is None: 873 raise FileClosedError("File closed") 874 return self._delegate.tell()
875
876 - def seek(self, offset):
877 if self._delegate is None: 878 raise FileClosedError("File closed") 879 return self._delegate.seek(offset)
880
881 - def read(self, size):
882 if self._delegate is None: 883 raise FileClosedError("File closed") 884 try: 885 d = self._delegate.read(size, self.stats) 886 if isinstance(d, defer.Deferred): 887 return d 888 return defer.succeed(d) 889 except IOError, e: 890 cls = errnoLookup.get(e.errno, FileError) 891 return defer.fail(cls("Failed to read cached data: %s", str(e))) 892 except: 893 return defer.fail()
894
895 - def close(self):
896 if self._delegate: 897 self.stats.onClosed() 898 self._delegate.close() 899 self._delegate = None
900
901 - def __del__(self):
902 self.close()
903
904 - def getLogFields(self):
905 return self.stats.getLogFields()
906 907 908 ## Private Methods ## 909
910 - def _closeSourceFile(self, sourceFile):
911 self.log("Closing source file [fd %d]", sourceFile.fileno()) 912 try: 913 sourceFile.close() 914 except Exception, e: 915 self.warning("Failed to close source file: %s", 916 log.getExceptionMessage(e))
917
918 - def _selectDelegate(self, (sourceFile, sourceInfo)):
919 sourcePath = self._path 920 self.log("Selecting delegate for source file %r [fd %d]", 921 sourcePath, sourceFile.fileno()) 922 # Update the log name 923 self.logName = self.plug.getLogName(self._path, sourceFile.fileno()) 924 # Opening cached file 925 cachedPath = self.plug.cache.getCachePath(sourcePath) 926 try: 927 cachedFile, cachedInfo = open_stat(cachedPath) 928 self.log("Opened cached file [fd %d]", cachedFile.fileno()) 929 except NotFoundError: 930 self.debug("Did not find cached file '%s'", cachedPath) 931 return self._tryTempFile(sourcePath, sourceFile, sourceInfo) 932 except FileError, e: 933 self.debug("Failed to open cached file: %s", str(e)) 934 self._removeCachedFile(cachedPath) 935 return self._tryTempFile(sourcePath, sourceFile, sourceInfo) 936 # Found a cached file, now check the modification time 937 self.debug("Found cached file '%s'", cachedPath) 938 sourceTime = sourceInfo[stat.ST_MTIME] 939 cacheTime = cachedInfo[stat.ST_MTIME] 940 if sourceTime != cacheTime: 941 # Source file changed, remove file and start caching again 942 self.debug("Cached file out-of-date (%d != %d)", 943 sourceTime, cacheTime) 944 self.stats.onCacheOutdated() 945 self.plug.outdateCopySession(sourcePath) 946 self._removeCachedFile(cachedPath) 947 return self._cacheFile(sourcePath, sourceFile, sourceInfo) 948 self._closeSourceFile(sourceFile) 949 # We have a valid cached file, just delegate to it. 950 self.debug("Serving cached file '%s'", cachedPath) 951 delegate = CachedFileDelegate(self.plug, cachedPath, 952 cachedFile, cachedInfo) 953 self.stats.onStarted(delegate.size, cachestats.CACHE_HIT) 954 return delegate
955
956 - def _removeCachedFile(self, cachePath):
957 try: 958 os.remove(cachePath) 959 self.debug("Deleted cached file '%s'", cachePath) 960 except OSError, e: 961 if e.errno != errno.ENOENT: 962 self.warning("Error deleting cached file: %s", str(e))
963
964 - def _tryTempFile(self, sourcePath, sourceFile, sourceInfo):
965 session = self.plug.getCopySession(sourcePath) 966 if session is None: 967 self.debug("No copy sessions found") 968 return self._cacheFile(sourcePath, sourceFile, sourceInfo) 969 self.debug("Copy session found") 970 if sourceInfo[stat.ST_MTIME] != session.mtime: 971 self.debug("Copy session out-of-date (%d != %d)", 972 sourceInfo[stat.ST_MTIME], session.mtime) 973 self.stats.onCacheOutdated() 974 session.outdate() 975 return self._cacheFile(sourcePath, sourceFile, sourceInfo) 976 self._closeSourceFile(sourceFile) 977 # We have a valid session, just delegate to it. 978 self.debug("Serving temporary file '%s'", session.tempPath) 979 delegate = TempFileDelegate(self.plug, session) 980 self.stats.onStarted(delegate.size, cachestats.TEMP_HIT) 981 return delegate
982
983 - def _cacheFile(self, sourcePath, sourceFile, sourceInfo):
984 session = self.plug.createCopySession(sourcePath, sourceFile, 985 sourceInfo) 986 self.debug("Serving temporary file '%s'", session.tempPath) 987 delegate = TempFileDelegate(self.plug, session) 988 self.stats.onStarted(delegate.size, cachestats.CACHE_MISS) 989 return delegate
990