Package flumotion :: Package component :: Module feeder
[hide private]

Source Code for Module flumotion.component.feeder

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import time 
 23   
 24  import gst 
 25   
 26  from twisted.internet import reactor 
 27   
 28  from flumotion.common import componentui 
 29   
 30  __version__ = "$Rev: 7162 $" 
 31   
 32   
33 -class Feeder:
34 """ 35 This class groups feeder-related information as used by a Feed Component. 36 37 @ivar feederName: name of the feeder 38 @ivar uiState: the serializable UI State for this feeder 39 """ 40
41 - def __init__(self, feederName):
42 self.feederName = feederName 43 self.elementName = 'feeder:' + feederName 44 self.payName = self.elementName + '-pay' 45 self.uiState = componentui.WorkerComponentUIState() 46 self.uiState.addKey('feederName') 47 self.uiState.set('feederName', feederName) 48 self.uiState.addListKey('clients') 49 self._fdToClient = {} # fd -> (FeederClient, cleanupfunc) 50 self._clients = {} # id -> FeederClient
51
52 - def __repr__(self):
53 return ('<Feeder %s (%d client(s))>' 54 % (self.feederName, len(self._clients)))
55
56 - def clientConnected(self, clientId, fd, cleanup):
57 """ 58 The given client has connected on the given file descriptor, and is 59 being added to multifdsink. This is called solely from the reactor 60 thread. 61 62 @param clientId: id of the client of the feeder 63 @param fd: file descriptor representing the client 64 @param cleanup: callable to be called when the given fd is removed 65 """ 66 if clientId not in self._clients: 67 # first time we see this client, create an object 68 client = FeederClient(clientId) 69 self._clients[clientId] = client 70 self.uiState.append('clients', client.uiState) 71 72 client = self._clients[clientId] 73 self._fdToClient[fd] = (client, cleanup) 74 75 client.connected(fd) 76 77 return client
78
79 - def clientDisconnected(self, fd):
80 """ 81 The client has been entirely removed from multifdsink, and we may 82 now close its file descriptor. 83 The client object stays around so we can track over multiple 84 connections. 85 86 Called from GStreamer threads. 87 88 @type fd: file descriptor 89 """ 90 (client, cleanup) = self._fdToClient.pop(fd) 91 client.disconnected(fd=fd) 92 93 # To avoid races between this thread (a GStreamer thread) closing the 94 # FD, and the reactor thread reusing this FD, we only actually perform 95 # the close in the reactor thread. 96 reactor.callFromThread(cleanup, fd)
97
98 - def getClients(self):
99 """ 100 @rtype: list of all L{FeederClient}s ever seen, including currently 101 disconnected clients 102 """ 103 return self._clients.values()
104 105
106 -class FeederClient:
107 """ 108 This class groups information related to the client of a feeder. 109 The client is identified by an id. 110 The information remains valid for the lifetime of the feeder, so it 111 can track reconnects of the client. 112 113 @ivar clientId: id of the client of the feeder 114 @ivar fd: file descriptor the client is currently using, or None. 115 """ 116
117 - def __init__(self, clientId):
118 self.uiState = componentui.WorkerComponentUIState() 119 self.uiState.addKey('client-id', clientId) 120 self.fd = None 121 self.uiState.addKey('fd', None) 122 123 # these values can be set to None, which would mean 124 # Unknown, not supported 125 # these are supported 126 for key in ( 127 'bytes-read-current', # bytes read over current connection 128 'bytes-read-total', # bytes read over all connections 129 'reconnects', # number of connections made by this client 130 'last-connect', # last client connection, in epoch seconds 131 'last-disconnect', # last client disconnect, in epoch seconds 132 'last-activity', # last time client read or connected 133 ): 134 self.uiState.addKey(key, 0) 135 # these are possibly unsupported 136 for key in ( 137 'buffers-dropped-current', # buffers dropped over current conn 138 'buffers-dropped-total', # buffers dropped over all connections 139 ): 140 self.uiState.addKey(key, None) 141 142 # internal state allowing us to track global numbers 143 self._buffersDroppedBefore = 0 144 self._bytesReadBefore = 0
145
146 - def setStats(self, stats):
147 """ 148 @type stats: list 149 """ 150 bytesSent = stats[0] 151 #timeAdded = stats[1] 152 #timeRemoved = stats[2] 153 #timeActive = stats[3] 154 timeLastActivity = float(stats[4]) / gst.SECOND 155 if len(stats) > 5: 156 # added in gst-plugins-base 0.10.11 157 buffersDropped = stats[5] 158 else: 159 # We don't know, but we cannot use None 160 # since that would break integer addition below 161 buffersDropped = 0 162 163 self.uiState.set('bytes-read-current', bytesSent) 164 self.uiState.set('buffers-dropped-current', buffersDropped) 165 self.uiState.set('bytes-read-total', self._bytesReadBefore + bytesSent) 166 self.uiState.set('last-activity', timeLastActivity) 167 if buffersDropped is not None: 168 self.uiState.set('buffers-dropped-total', 169 self._buffersDroppedBefore + buffersDropped)
170
171 - def connected(self, fd, when=None):
172 """ 173 The client has connected on this fd. 174 Update related stats. 175 176 Called only from the reactor thread. 177 """ 178 if not when: 179 when = time.time() 180 181 if self.fd: 182 # It's normal to receive a reconnection before we notice 183 # that an old connection has been closed. Perform the 184 # disconnection logic for the old FD if necessary. See #591. 185 self._updateUIStateForDisconnect(self.fd, when) 186 187 self.fd = fd 188 self.uiState.set('fd', fd) 189 self.uiState.set('last-connect', when) 190 self.uiState.set('reconnects', self.uiState.get('reconnects', 0) + 1)
191
192 - def _updateUIStateForDisconnect(self, fd, when):
193 if self.fd == fd: 194 self.fd = None 195 self.uiState.set('fd', None) 196 self.uiState.set('last-disconnect', when) 197 198 # update our internal counters and reset current counters to 0 199 self._bytesReadBefore += self.uiState.get('bytes-read-current') 200 self.uiState.set('bytes-read-current', 0) 201 if self.uiState.get('buffers-dropped-current') is not None: 202 self._buffersDroppedBefore += self.uiState.get( 203 'buffers-dropped-current') 204 self.uiState.set('buffers-dropped-current', 0)
205
206 - def disconnected(self, when=None, fd=None):
207 """ 208 The client has disconnected. 209 Update related stats. 210 211 Called from GStreamer threads. 212 """ 213 if self.fd != fd: 214 # assume that connected() already called 215 # _updateUIStateForDisconnect for us 216 return 217 218 if not when: 219 when = time.time() 220 221 reactor.callFromThread(self._updateUIStateForDisconnect, fd, 222 when)
223