1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import errno
23 import os
24 import stat
25 import tempfile
26 import threading
27 import time
28
29 from twisted.internet import defer, reactor, abstract
30
31 from flumotion.common import log, format, common, python
32 from flumotion.component.misc.httpserver import cachestats
33 from flumotion.component.misc.httpserver import fileprovider
34 from flumotion.component.misc.httpserver import localpath
35 from flumotion.component.misc.httpserver.fileprovider import FileClosedError
36 from flumotion.component.misc.httpserver.fileprovider import FileError
37 from flumotion.component.misc.httpserver.fileprovider import NotFoundError
38
39
# Absolute-positioning whence value for file.seek() (like os.SEEK_SET).
SEEK_SET = 0
# Default maximum cache size, in megabytes (converted to bytes at init time).
DEFAULT_CACHE_SIZE = 1000
# Fractions of the cache size that trigger / stop cache cleanup.
DEFAULT_CLEANUP_HIGH_WATERMARK = 1.0
DEFAULT_CLEANUP_LOW_WATERMARK = 0.6
# Block size used when copying files from the source to the cache.
FILE_COPY_BUFFER_SIZE = abstract.FileDescriptor.bufferSize
TEMP_FILE_POSTFIX = ".tmp"
# Maximum length of the shortened names used for logging.
MAX_LOGNAME_SIZE = 30
# Maximum number of entries kept in the path -> identifier cache.
ID_CACHE_MAX_SIZE = 1024


LOG_CATEGORY = "fileprovider-localcached"

# Maps OS error numbers to the file-provider exceptions raised to callers.
_errorLookup = {errno.ENOENT: NotFoundError,
                errno.EISDIR: fileprovider.CannotOpenError,
                errno.EACCES: fileprovider.AccessError}
55
56
58 """
59
60 WARNING: Currently does not work properly in combination with rate-control.
61
62 I'm caching the files taken from a mounted
63 network file system to a shared local directory.
64 Multiple instances can share the same cache directory,
65 but it's recommended to use slightly different values
66 for the property cleanup-high-watermark.
67 I'm using the directory access time to know when
68 the cache usage changed and keep an estimation
69 of the cache usage for statistics.
70
71 I'm creating a unique thread to do the file copying block by block,
72 for all files to be copied to the cache.
73 Using a thread instead of a reactor.callLater 'loop' allows for
74 higher copy throughput and does not slow down the main loop when
75 lots of files are copied at the same time.
76 Simulations with real request logs show that using a thread
77 gives better results than the equivalent asynchronous implementation.
78 """
79
80 logCategory = LOG_CATEGORY
81
83 props = args['properties']
84 self._sourceDir = props.get('path')
85 self._cacheDir = props.get('cache-dir', "/tmp")
86 cacheSizeInMB = int(props.get('cache-size', DEFAULT_CACHE_SIZE))
87 self._cacheSize = cacheSizeInMB * 10 ** 6
88 self._cleanupEnabled = props.get('cleanup-enabled', True)
89 highWatermark = props.get('cleanup-high-watermark',
90 DEFAULT_CLEANUP_HIGH_WATERMARK)
91 highWatermark = max(0.0, min(1.0, float(highWatermark)))
92 lowWatermark = props.get('cleanup-low-watermark',
93 DEFAULT_CLEANUP_LOW_WATERMARK)
94 lowWatermark = max(0.0, min(1.0, float(lowWatermark)))
95 self._identifiers = {}
96 self._sessions = {}
97 self._index = {}
98
99 self.info("Cached file provider initialized")
100 self.debug("Source directory: '%s'", self._sourceDir)
101 self.debug("Cache directory: '%s'", self._cacheDir)
102 self.debug("Cache size: %d bytes", self._cacheSize)
103 self.debug("Cache cleanup enabled: %s", self._cleanupEnabled)
104
105 common.ensureDir(self._sourceDir, "source")
106 common.ensureDir(self._cacheDir, "cache")
107
108 self._cacheUsage = None
109 self._cacheUsageLastUpdate = None
110 self._lastCacheTime = None
111 self._cacheMaxUsage = self._cacheSize * highWatermark
112 self._cacheMinUsage = self._cacheSize * lowWatermark
113 self.stats = cachestats.CacheStatistics()
114
115
116 self.updateCacheUsage()
117
118
119 self._thread = CopyThread(self)
120
121 - def start(self, component):
123
124 - def stop(self, component):
126
128
129
130 updater.update("provider-name", "fileprovider-localcached")
131 self.stats.startUpdates(updater)
132
135
137 if self._sourceDir is None:
138 return None
139 return LocalPath(self, self._sourceDir)
140
141
142
143
145 """
146 Returns a log name for a path, shortened to a maximum size
147 specified by the global variable MAX_LOGNAME_SIZE.
148 The log name will be the filename part of the path postfixed
149 by the id in brackets if id is not None.
150 """
151 filename = os.path.basename(path)
152 basename, postfix = os.path.splitext(filename)
153 if id is not None:
154 postfix += "[%s]" % id
155 prefixMaxLen = MAX_LOGNAME_SIZE - len(postfix)
156 if len(basename) > prefixMaxLen:
157 basename = basename[:prefixMaxLen-1] + "*"
158 return basename + postfix
159
161 """
162 Returns an identifier for a path.
163 The identifier is a digest of the path encoded in hex string.
164 The hash function used is SHA1.
165 It caches the identifiers in a dictionary indexed by path and with
166 a maximum number of entry specified by the constant ID_CACHE_MAX_SIZE.
167 """
168 ident = self._identifiers.get(path, None)
169 if ident is None:
170 hash = python.sha1()
171 hash.update(path)
172 ident = hash.digest().encode("hex").strip('\n')
173
174 if len(self._identifiers) >= ID_CACHE_MAX_SIZE:
175 self._identifiers.clear()
176 self._identifiers[path] = ident
177 return ident
178
180 ident = self.getIdentifier(path)
181 return os.path.join(self._cacheDir, ident)
182
186
189
191 """
192 @returns: the cache usage, in bytes
193 """
194
195
196 cacheTime = os.path.getmtime(self._cacheDir)
197 if ((self._cacheUsage is None) or (self._lastCacheTime < cacheTime)):
198 self._lastCacheTime = cacheTime
199 os.chdir(self._cacheDir)
200
201
202
203
204
205
206 sizes = []
207 for f in os.listdir('.'):
208 try:
209 sizes.append(os.path.getsize(f))
210 except OSError, e:
211 if e.errno == errno.ENOENT:
212 pass
213 else:
214 raise
215
216 self._cacheUsage = sum(sizes)
217 self.updateCacheUsageStatistics()
218 self._cacheUsageLastUpdate = time.time()
219 return self._cacheUsage
220
222 """
223 Try to reserve cache space.
224
225 If there is not enough space and the cache cleanup is enabled,
226 it will delete files from the cache starting with the ones
227 with oldest access time until the cache usage drops below
228 the fraction specified by the property cleanup-low-threshold.
229
230 Returns a 'tag' that should be used to 'free' the cache space
231 using releaseCacheSpace.
232 This tag is needed to better estimate the cache usage,
233 if the cache usage has been updated since cache space
234 has been allocated, freeing up the space should not change
235 the cache usage estimation.
236
237 @param size: size to reserve, in bytes
238 @type size: int
239
240 @returns: an allocation tag or None if the allocation failed.
241 @rtype: tuple
242 """
243 usage = self.updateCacheUsage()
244 if (usage + size) < self._cacheMaxUsage:
245 self._cacheUsage += size
246 self.updateCacheUsageStatistics()
247 return (self._cacheUsageLastUpdate, size)
248
249 self.debug('cache usage will be %sbytes, need more cache',
250 format.formatStorage(usage + size))
251
252 if not self._cleanupEnabled:
253 self.debug('not allowed to clean up cache, so cannot cache')
254
255 return None
256
257
258 self.stats.onCleanup()
259
260 os.chdir(self._cacheDir)
261
262 files = []
263 for f in os.listdir('.'):
264
265
266 try:
267 files.append((f, os.stat(f)))
268 except OSError, e:
269 if e.errno == errno.ENOENT:
270 pass
271 else:
272 raise
273
274
275 usage = sum([d[1].st_size for d in files])
276
277 files.sort(key=lambda d: d[1].st_atime)
278 for path, info in files:
279 try:
280 os.remove(path)
281 usage -= info.st_size
282 except OSError, e:
283 if e.errno == errno.ENOENT:
284
285
286 usage -= info.st_size
287 else:
288 self.warning("Error cleaning cached file: %s", str(e))
289 if usage <= self._cacheMinUsage:
290
291 self.debug('cleaned up, cache use is now %sbytes',
292 format.formatStorage(usage))
293 break
294
295
296 self._cacheUsage = usage
297 self._cacheUsageLastUpdate = time.time()
298 if (usage + size) < self._cacheSize:
299
300 self._cacheUsage += size
301 self.updateCacheUsageStatistics()
302 return (self._cacheUsageLastUpdate, size)
303
304 self.updateCacheUsageStatistics()
305 return None
306
308 lastUpdate, size = tag
309 if lastUpdate == self._cacheUsageLastUpdate:
310 self._cacheUsage -= size
311 self.updateCacheUsageStatistics()
312
314 return self._index.get(path, None)
315
323
325 session = self._index.get(path, None)
326 if session is not None:
327 session.outdate()
328
330 path = session.sourcePath
331 if path in self._index:
332 del self._index[path]
333 self.disableSession(session)
334
336 self.debug("Starting Copy Session '%s' (%d)",
337 session.logName, len(self._sessions))
338 if session in self._sessions:
339 return
340 self._sessions[session] = None
341 self._activateCopyLoop()
342
344 self.debug("Stopping Copy Session '%s' (%d)",
345 session.logName, len(self._sessions))
346 if session in self._sessions:
347 del self._sessions[session]
348 if not self._sessions:
349 self._disableCopyLoop()
350
353
356
357
358 -class LocalPath(localpath.LocalPath, log.Loggable):
390
391
393
394 logCategory = LOG_CATEGORY
395
397 threading.Thread.__init__(self)
398 self.plug = plug
399 self._running = True
400 self._event = threading.Event()
401
403 self._running = False
404 self._event.set()
405 self.join()
406
409
412
428
429
432
433
435 """
436 I'm serving a file at the same time I'm copying it
437 from the network file system to the cache.
438 If the client asks for data not yet copied, the source file
439 read operation is delegated to the copy thread as an asynchronous
440 operation because file seeking/reading is not thread safe.
441
442 The copy session has to open the temporary file twice,
443 one for read-only and one for write-only,
444 because closing a read/write file changes the modification time.
445 We want the modification time to be set to a known value
446 when the copy is finished even keeping read access to the file.
447
448 The session manages a reference counter to know how many TempFileDelegate
449 instances are using the session to delegate read operations.
450 This is done for two reasons:
451 - To avoid circular references by having the session manage
452 a list of delegate instances.
453 - If not cancelled, sessions should not be deleted
454 when no delegates reference them anymore. So weakref cannot be used.
455 """
456
457 logCategory = LOG_CATEGORY
458
def __init__(self, plug, sourcePath, sourceFile, sourceInfo):
    """
    Creates a copy session and starts copying the source file
    to a temporary cache file right away.

    @param plug:       the cache plug, used for paths, stats and sessions
    @param sourcePath: path of the source file being cached
    @param sourceFile: already opened source file object
    @param sourceInfo: os.stat() result for the source file
    """
    self.plug = plug
    self.logName = plug.getLogName(sourcePath, sourceFile.fileno())
    self.copying = None  # None: not started, True: running, False: stopped
    self.sourcePath = sourcePath
    self.tempPath = plug.getTempPath(sourcePath)
    self.cachePath = plug.getCachePath(sourcePath)
    # Source metadata, remembered so staleness can be detected later.
    self.mtime = sourceInfo[stat.ST_MTIME]
    self.size = sourceInfo[stat.ST_SIZE]
    self._sourceFile = sourceFile
    self._cancelled = False
    self._wTempFile = None  # write-only handle on the temporary file
    self._rTempFile = None  # read-only handle on the same temporary file
    self._allocTag = None   # tag returned by allocateCacheSpace()
    self._waitCancel = None
    self._pending = []      # queued (position, size, deferred) reads
    self._refCount = 0      # number of delegates using this session
    self._copied = 0        # bytes copied so far; None once the copy finished
    self._correction = 0
    self._startCopying()
481
485
486 - def read(self, position, size, stats):
487
488 if self._rTempFile:
489
490
491 if (self._copied is None) or ((position + size) <= self._copied):
492 try:
493 self._rTempFile.seek(position)
494 data = self._rTempFile.read(size)
495
496 size = len(data)
497
498
499
500
501
502 diff = min(self._correction, size)
503 self._correction -= diff
504 stats.onBytesRead(0, size, diff)
505 return data
506 except Exception, e:
507 self.warning("Failed to read from temporary file: %s",
508 log.getExceptionMessage(e))
509 self._cancelSession()
510
511 if self._sourceFile is None:
512 raise FileError("File caching error, cannot proceed")
513
514 try:
515
516
517
518
519 if self.copying:
520
521
522
523 d = defer.Deferred()
524
525 def updateStats(data):
526 stats.onBytesRead(len(data), 0, 0)
527 return data
528
529 d.addCallback(updateStats)
530 self._pending.append((position, size, d))
531 return d
532
533 self._sourceFile.seek(position)
534 data = self._sourceFile.read(size)
535 stats.onBytesRead(len(data), 0, 0)
536 return data
537 except IOError, e:
538 cls = _errorLookup.get(e.errno, FileError)
539 raise cls("Failed to read source file: %s" % str(e))
540
543
545 self._refCount -= 1
546
547
548 if (self._refCount == 1) and self._cancelled:
549
550 self._cancelCopy(False, True)
551
552 if (self._refCount == 0) and (self._wTempFile is None):
553 self.close()
554
563
565 if not (self.copying and self._pending):
566
567 return False
568
569 position, size, d = self._pending.pop(0)
570 self._sourceFile.seek(position)
571 data = self._sourceFile.read(size)
572
573 reactor.callFromThread(d.callback, data)
574 return len(self._pending) > 0
575
577
578 if not self.copying:
579
580 return False
581
582 cont = True
583 try:
584
585
586 self._sourceFile.seek(self._copied)
587 self._wTempFile.seek(self._copied)
588 data = self._sourceFile.read(FILE_COPY_BUFFER_SIZE)
589 self._wTempFile.write(data)
590 self._wTempFile.flush()
591 except IOError, e:
592 self.warning("Failed to copy source file: %s",
593 log.getExceptionMessage(e))
594
595 self.copying = False
596 reactor.callFromThread(self.plug.disableSession, self)
597 reactor.callFromThread(self._cancelSession)
598
599 cont = False
600 else:
601 size = len(data)
602 self._copied += size
603 self._correction += size
604 if size < FILE_COPY_BUFFER_SIZE:
605
606 self.copying = False
607 reactor.callFromThread(self.plug.disableSession, self)
608 reactor.callFromThread(self._onCopyFinished)
609 cont = False
610
611 if self._waitCancel:
612
613 self.copying = False
614 reactor.callFromThread(self.plug.disableSession, self)
615 reactor.callFromThread(self._onCopyCancelled, *self._waitCancel)
616 return False
617 return cont
618
619
620
621
623
624 tag = self.plug.allocateCacheSpace(self.size)
625 if tag is None:
626 return False
627 self._allocTag = tag
628 return True
629
631 if not (self._cancelled or self._allocTag is None):
632 self.plug.releaseCacheSpace(self._allocTag)
633 self._allocTag = None
634
636 if not self._cancelled:
637 self.log("Canceling copy session")
638
639 self._cancelled = True
640
641
642 if self._refCount <= 1:
643
644 self._cancelCopy(False, True)
645
647 self.log("Start copy session")
648
649 self._removeTempFile()
650
651 if not self._allocCacheSpace():
652
653 self._cancelSession()
654 return
655 self.plug.stats.onCopyStarted()
656
657 try:
658 fd, transientPath = tempfile.mkstemp(".tmp", LOG_CATEGORY)
659 self.log("Created transient file '%s'", transientPath)
660 self._wTempFile = os.fdopen(fd, "wb")
661 self.log("Opened temporary file for writing [fd %d]",
662 self._wTempFile.fileno())
663 self._rTempFile = file(transientPath, "rb")
664 self.log("Opened temporary file for reading [fd %d]",
665 self._rTempFile.fileno())
666 except IOError, e:
667 self.warning("Failed to open temporary file: %s",
668 log.getExceptionMessage(e))
669 self._cancelSession()
670 return
671
672 try:
673 self.log("Truncating temporary file to size %d", self.size)
674 self._wTempFile.truncate(self.size)
675 except IOError, e:
676 self.warning("Failed to truncate temporary file: %s",
677 log.getExceptionMessage(e))
678 self._cancelSession()
679 return
680
681 try:
682 self.log("Renaming transient file to '%s'", self.tempPath)
683 os.rename(transientPath, self.tempPath)
684 except IOError, e:
685 self.warning("Failed to rename transient temporary file: %s",
686 log.getExceptionMessage(e))
687
688 self.debug("Start caching '%s' [fd %d]",
689 self.sourcePath, self._sourceFile.fileno())
690
691 self.copying = True
692 self.plug.activateSession(self)
693
695 if self.copying:
696 self.log("Canceling file copy")
697 if self._waitCancel:
698
699 return
700 self.debug("Cancel caching '%s' [fd %d]",
701 self.sourcePath, self._sourceFile.fileno())
702
703
704
705 self._waitCancel = (closeSource, closeTempWrite)
706 return
707
708 if closeSource:
709 self._closeSourceFile()
710 if closeTempWrite:
711 self._closeWriteTempFile()
712
737
739
740 self.debug("Finished caching '%s' [fd %d]",
741 self.sourcePath, self._sourceFile.fileno())
742 self.plug.stats.onCopyFinished(self.size)
743
744
745 self._copied = None
746
747 self._closeSourceFile()
748 self._closeWriteTempFile()
749
750 try:
751 mtime = self.mtime
752 atime = int(time.time())
753 self.log("Setting temporary file modification time to %d", mtime)
754
755 os.utime(self.tempPath, (atime, mtime))
756 except OSError, e:
757 if e.errno == errno.ENOENT:
758
759 self._releaseCacheSpace()
760 else:
761 self.warning("Failed to update modification time of temporary "
762 "file: %s", log.getExceptionMessage(e))
763 self._cancelSession()
764 try:
765 self.log("Renaming temporary file to '%s'", self.cachePath)
766 os.rename(self.tempPath, self.cachePath)
767 except OSError, e:
768 if e.errno == errno.ENOENT:
769 self._releaseCacheSpace()
770 else:
771 self.warning("Failed to rename temporary file: %s",
772 log.getExceptionMessage(e))
773 self._cancelSession()
774
775 for position, size, d in self._pending:
776 try:
777 self._rTempFile.seek(position)
778 data = self._rTempFile.read(size)
779 d.callback(data)
780 except Exception, e:
781 self.warning("Failed to read from temporary file: %s",
782 log.getExceptionMessage(e))
783 d.errback(e)
784 self._pending = []
785 if self._refCount == 0:
786
787 self.close()
788
803
805 if self._sourceFile is not None:
806 self.log("Closing source file [fd %d]", self._sourceFile.fileno())
807 try:
808 try:
809 self._sourceFile.close()
810 finally:
811 self._sourceFile = None
812 except IOError, e:
813 self.warning("Failed to close source file: %s",
814 log.getExceptionMessage(e))
815
817 if self._rTempFile is not None:
818 self.log("Closing temporary file for reading [fd %d]",
819 self._rTempFile.fileno())
820 try:
821 try:
822 self._rTempFile.close()
823 finally:
824 self._rTempFile = None
825 except IOError, e:
826 self.warning("Failed to close temporary file for reading: %s",
827 log.getExceptionMessage(e))
828
830 if self._wTempFile is not None:
831
832 if not self._cancelled and self._copied is not None:
833 self._removeTempFile()
834 self.log("Closing temporary file for writing [fd %d]",
835 self._wTempFile.fileno())
836 try:
837 try:
838 self._wTempFile.close()
839 finally:
840 self._wTempFile = None
841 except Exception, e:
842 self.warning("Failed to close temporary file for writing: %s",
843 log.getExceptionMessage(e))
844
845
847
848 logCategory = LOG_CATEGORY
849
851 self.logName = plug.getLogName(session.sourcePath)
852 self.mtime = session.mtime
853 self.size = session.size
854 self._session = session
855 self._reading = False
856 self._position = 0
857 session.incRef()
858
860 return self._position
861
def seek(self, offset):
    """Moves the read position; the actual seek happens at read time."""
    self._position = offset
864
def read(self, size, stats):
    """
    Reads up to size bytes from the copy session at the current position.

    Returns the data directly, or a Deferred if the session delegated
    the read to the copy thread. Only one read may be outstanding
    at a time.
    """
    assert not self._reading, "Simultaneous read not supported"
    result = self._session.read(self._position, size, stats)
    if isinstance(result, defer.Deferred):
        # The position is advanced by _cbGotData when the data arrives.
        self._reading = True
        return result.addCallback(self._cbGotData)
    self._position += len(result)
    return result
873
875 if self._session is not None:
876 self._session.decRef()
877 self._session = None
878
879
880
881
883 self._reading = False
884 self._position += len(data)
885 return data
886
887
933
934
936
937 - def read(self, size, stats):
941
946
947
948 -class CachedFile(fileprovider.File, log.Loggable):
949
950 logCategory = LOG_CATEGORY
951
952
953 mimeType = None
954
955
956 _delegate = None
957
958 - def __init__(self, plug, path, mimeType):
965
967 return "<CachedFile '%s'>" % self._path
968
973
978
983
984 - def seek(self, offset):
988
989 - def read(self, size):
1002
1008
1011
1014
1015
1016
1017
1034
1042
1044 sourcePath = self._path
1045 cachedPath = self.plug.getCachePath(sourcePath)
1046
1047 try:
1048 sourceFile, sourceInfo = self._open(sourcePath)
1049 self.log("Opened source file [fd %d]", sourceFile.fileno())
1050 except NotFoundError:
1051 self.debug("Source file not found")
1052 self.plug.outdateCopySession(sourcePath)
1053 self._removeCachedFile(cachedPath)
1054 raise
1055
1056 self.logName = self.plug.getLogName(self._path, sourceFile.fileno())
1057
1058 try:
1059 cachedFile, cachedInfo = self._open(cachedPath)
1060 self.log("Opened cached file [fd %d]", cachedFile.fileno())
1061 except NotFoundError:
1062 self.debug("Did not find cached file '%s'", cachedPath)
1063 return self._tryTempFile(sourcePath, sourceFile, sourceInfo)
1064 except FileError, e:
1065 self.debug("Failed to open cached file: %s", str(e))
1066 self._removeCachedFile(cachedPath)
1067 return self._tryTempFile(sourcePath, sourceFile, sourceInfo)
1068
1069 self.debug("Found cached file '%s'", cachedPath)
1070 sourceTime = sourceInfo[stat.ST_MTIME]
1071 cacheTime = cachedInfo[stat.ST_MTIME]
1072 if sourceTime != cacheTime:
1073
1074 self.debug("Cached file out-of-date (%d != %d)",
1075 sourceTime, cacheTime)
1076 self.stats.onCacheOutdated()
1077 self.plug.outdateCopySession(sourcePath)
1078 self._removeCachedFile(cachedPath)
1079 return self._cacheFile(sourcePath, sourceFile, sourceInfo)
1080 self._closeSourceFile(sourceFile)
1081
1082 self.debug("Serving cached file '%s'", cachedPath)
1083 delegate = CachedFileDelegate(self.plug, cachedPath,
1084 cachedFile, cachedInfo)
1085 self.stats.onStarted(delegate.size, cachestats.CACHE_HIT)
1086 return delegate
1087
1089 try:
1090 os.remove(cachePath)
1091 self.debug("Deleted cached file '%s'", cachePath)
1092 except OSError, e:
1093 if e.errno != errno.ENOENT:
1094 self.warning("Error deleting cached file: %s", str(e))
1095
def _tryTempFile(self, sourcePath, sourceFile, sourceInfo):
    """
    Tries to serve the file from an in-progress copy session's
    temporary file. Falls back to starting a new caching session
    when no session exists or the existing one is out of date.
    """
    session = self.plug.getCopySession(sourcePath)
    if session is None:
        self.debug("No copy sessions found")
        return self._cacheFile(sourcePath, sourceFile, sourceInfo)
    self.debug("Copy session found")
    srcMTime = sourceInfo[stat.ST_MTIME]
    if srcMTime != session.mtime:
        # The source changed since the session started copying it.
        self.debug("Copy session out-of-date (%d != %d)",
                   srcMTime, session.mtime)
        self.stats.onCacheOutdated()
        session.outdate()
        return self._cacheFile(sourcePath, sourceFile, sourceInfo)
    self._closeSourceFile(sourceFile)

    self.debug("Serving temporary file '%s'", session.tempPath)
    delegate = TempFileDelegate(self.plug, session)
    self.stats.onStarted(delegate.size, cachestats.TEMP_HIT)
    return delegate
1114
1115 - def _cacheFile(self, sourcePath, sourceFile, sourceInfo):
1122