Package flumotion :: Package worker :: Module medium
[hide private]

Source Code for Module flumotion.worker.medium

  1  # -*- Mode: Python; test-case-name:flumotion.test.test_worker_worker -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  worker-side objects to handle worker clients 
 24  """ 
 25   
 26  import signal 
 27   
 28  from twisted.internet import reactor, error 
 29  from twisted.spread import flavors 
 30  from zope.interface import implements 
 31   
 32  from flumotion.common import errors, interfaces, debug 
 33  from flumotion.common import medium 
 34  from flumotion.common.vfs import listDirectory, registerVFSJelly 
 35  from flumotion.twisted.pb import ReconnectingFPBClientFactory 
 36   
 37  __version__ = "$Rev: 8208 $" 
 38  JOB_SHUTDOWN_TIMEOUT = 5 
 39   
 40   
41 -class WorkerClientFactory(ReconnectingFPBClientFactory):
42 """ 43 I am a client factory for the worker to log in to the manager. 44 """ 45 logCategory = 'worker' 46 perspectiveInterface = interfaces.IWorkerMedium 47
48 - def __init__(self, medium, host, port):
49 """ 50 @type medium: L{flumotion.worker.medium.WorkerMedium} 51 @type host: str 52 @type port: int 53 """ 54 self._managerHost = host 55 self._managerPort = port 56 self.medium = medium 57 # doing this as a class method triggers a doc error 58 ReconnectingFPBClientFactory.__init__(self) 59 # maximum 10 second delay for workers to attempt to log in again 60 self.maxDelay = 10
61
62 - def clientConnectionFailed(self, connector, reason):
63 """ 64 @param reason: L{twisted.spread.pb.failure.Failure} 65 """ 66 # this method exists so that we log the failure 67 ReconnectingFPBClientFactory.clientConnectionFailed(self, 68 connector, reason) 69 # delay is now updated 70 self.debug("failed to connect, will try to reconnect in %f seconds" % 71 self.delay)
72 73 ### ReconnectingPBClientFactory methods 74
75 - def gotDeferredLogin(self, d):
76 # the deferred from the login is now available 77 # add some of our own to it 78 79 def remoteDisconnected(remoteReference): 80 if reactor.killed: 81 self.log('Connection to manager lost due to shutdown') 82 else: 83 self.warning('Lost connection to manager, ' 84 'will attempt to reconnect')
85 86 def loginCallback(reference): 87 self.info("Logged in to manager") 88 self.debug("remote reference %r" % reference) 89 90 self.medium.setRemoteReference(reference) 91 reference.notifyOnDisconnect(remoteDisconnected)
92 93 def alreadyConnectedErrback(failure): 94 failure.trap(errors.AlreadyConnectedError) 95 self.warning('A worker with the name "%s" is already connected.' % 96 failure.value) 97 98 def accessDeniedErrback(failure): 99 failure.trap(errors.NotAuthenticatedError) 100 self.warning('Access denied.') 101 102 def connectionRefusedErrback(failure): 103 failure.trap(error.ConnectionRefusedError) 104 self.warning('Connection to %s:%d refused.' % (self._managerHost, 105 self._managerPort)) 106 107 def NoSuchMethodErrback(failure): 108 failure.trap(flavors.NoSuchMethod) 109 # failure.value is a str 110 if failure.value.find('remote_getKeycardClasses') > -1: 111 self.warning( 112 "Manager %s:%d is older than version 0.3.0. " 113 "Please upgrade." % (self._managerHost, self._managerPort)) 114 return 115 116 return failure 117 118 def loginFailedErrback(failure): 119 self.warning('Login failed, reason: %s' % str(failure)) 120 121 d.addCallback(loginCallback) 122 d.addErrback(accessDeniedErrback) 123 d.addErrback(connectionRefusedErrback) 124 d.addErrback(alreadyConnectedErrback) 125 d.addErrback(NoSuchMethodErrback) 126 d.addErrback(loginFailedErrback) 127 128
129 -class WorkerMedium(medium.PingingMedium):
130 """ 131 I am a medium interfacing with the manager-side WorkerAvatar. 132 133 @ivar brain: the worker brain 134 @type brain: L{worker.WorkerBrain} 135 @ivar factory: the worker client factory 136 @type factory: L{WorkerClientFactory} 137 """ 138 139 logCategory = 'workermedium' 140 141 implements(interfaces.IWorkerMedium) 142
143 - def __init__(self, brain):
144 """ 145 @type brain: L{worker.WorkerBrain} 146 """ 147 self.brain = brain 148 self.factory = None 149 registerVFSJelly()
150
151 - def startConnecting(self, connectionInfo):
152 info = connectionInfo 153 154 self.factory = WorkerClientFactory(self, info.host, info.port) 155 self.factory.startLogin(info.authenticator) 156 157 if info.use_ssl: 158 from flumotion.common import common 159 common.assertSSLAvailable() 160 from twisted.internet import ssl 161 reactor.connectSSL(info.host, info.port, self.factory, 162 ssl.ClientContextFactory()) 163 else: 164 reactor.connectTCP(info.host, info.port, self.factory)
165
166 - def stopConnecting(self):
167 # only called by test suites 168 self.factory.disconnect() 169 self.factory.stopTrying()
170 171 ### pb.Referenceable method for the manager's WorkerAvatar 172
173 - def remote_getPorts(self):
174 """ 175 Gets the set of TCP ports that this worker is configured to use. 176 177 @rtype: 2-tuple: (list of int, bool) 178 @return: list of ports, and a boolean if we allocate ports 179 randomly 180 """ 181 return self.brain.getPorts()
182
183 - def remote_getFeedServerPort(self):
184 """ 185 Return the TCP port the Feed Server is listening on. 186 187 @rtype: int, or NoneType 188 @return: TCP port number, or None if there is no feed server 189 """ 190 return self.brain.getFeedServerPort()
191
192 - def remote_create(self, avatarId, type, moduleName, methodName, 193 nice, conf):
194 """ 195 Start a component of the given type with the given nice level. 196 Will spawn a new job process to run the component in. 197 198 @param avatarId: avatar identification string 199 @type avatarId: str 200 @param type: type of the component to create 201 @type type: str 202 @param moduleName: name of the module to create the component from 203 @type moduleName: str 204 @param methodName: the factory method to use to create the component 205 @type methodName: str 206 @param nice: nice level 207 @type nice: int 208 @param conf: component config 209 @type conf: dict 210 211 @returns: a deferred fired when the process has started and created 212 the component 213 """ 214 return self.brain.create(avatarId, type, moduleName, methodName, 215 nice, conf)
216
217 - def remote_checkElements(self, elementNames):
218 """ 219 Checks if one or more GStreamer elements are present and can be 220 instantiated. 221 222 @param elementNames: names of the Gstreamer elements 223 @type elementNames: list of str 224 225 @rtype: list of str 226 @returns: a list of instantiatable element names 227 """ 228 return self.brain.runCheck('flumotion.worker.checks.check', 229 'checkElements', elementNames)
230
231 - def remote_checkImport(self, moduleName):
232 """ 233 Checks if the given module can be imported. 234 235 @param moduleName: name of the module to check 236 @type moduleName: str 237 238 @returns: None or Failure 239 """ 240 return self.brain.runCheck( 241 'flumotion.worker.checks.check', 'checkImport', 242 moduleName)
243
244 - def remote_runCheck(self, module, function, *args, **kwargs):
245 """ 246 Runs the given function in the given module with the given arguments. 247 248 @param module: module the function lives in 249 @type module: str 250 @param function: function to run 251 @type function: str 252 253 @returns: the return value of the given function in the module. 254 """ 255 return self.brain.runCheck(module, function, *args, **kwargs)
256 remote_runFunction = remote_runCheck 257
258 - def remote_getComponents(self):
259 """ 260 I return a list of componentAvatarIds, I have. I am called by the 261 manager soon after I attach to it. This is needed on reconnects 262 so that the manager knows what components it needs to start on me. 263 264 @returns: a list of componentAvatarIds 265 """ 266 return self.brain.getComponents()
267
268 - def remote_killJob(self, avatarId, signum=signal.SIGKILL):
269 """Kill one of the worker's jobs. 270 271 This method is intended for exceptional purposes only; a normal 272 component shutdown is performed by the manager via calling 273 remote_stop() on the component avatar. 274 275 Raises L{flumotion.common.errors.UnknownComponentError} if the 276 job is unknown. 277 278 @param avatarId: the avatar Id of the component, e.g. 279 '/default/audio-encoder' 280 @type avatarId: string 281 @param signum: Signal to send, optional. Defaults to SIGKILL. 282 @type signum: int 283 """ 284 self.brain.killJob(avatarId, signum)
285
286 - def remote_getVersions(self):
287 return debug.getVersions()
288
289 - def remote_listDirectory(self, directoryName):
290 """List the directory called path. 291 292 Raises L{flumotion.common.errors.NotDirectoryError} if directoryName is 293 not a directory. 294 295 @param directoryName: the name of the directory to list 296 @type directoryName: string 297 @returns: the directory 298 @rtype: deferred that will fire an object implementing L{IDirectory} 299 """ 300 return listDirectory(directoryName)
301