Package flumotion :: Package common :: Module worker
[hide private]

Source Code for Module flumotion.common.worker

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_common_worker -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """objects related to the state of workers. 
 23  """ 
 24   
 25  import os 
 26  import signal 
 27   
 28  from twisted.spread import pb 
 29  from twisted.internet import protocol 
 30   
 31  from flumotion.common import log, errors, messages 
 32  from flumotion.common.i18n import N_, gettexter 
 33  from flumotion.twisted import flavors 
 34   
 35  __version__ = "$Rev: 7162 $" 
 36  T_ = gettexter() 
 37   
 38   
39 -class ProcessProtocol(protocol.ProcessProtocol):
40
41 - def __init__(self, loggable, avatarId, processType, where):
42 self.loggable = loggable 43 self.avatarId = avatarId 44 self.processType = processType # e.g., 'component' 45 self.where = where # e.g., 'worker 1' 46 47 self.setPid(None)
48
49 - def setPid(self, pid):
50 self.pid = pid
51
52 - def sendMessage(self, message):
53 raise NotImplementedError
54
55 - def processEnded(self, status):
56 # vmethod implementation 57 # status is an instance of failure.Failure 58 # status.value is a twisted.internet.error.ProcessTerminated 59 # status.value.status is the os.WAIT-like status value 60 message = None 61 obj = self.loggable 62 pid = None 63 # if we have a pid, then set pid to string value of pid 64 # otherwise set to "unknown" 65 if self.pid: 66 pid = str(self.pid) 67 else: 68 pid = "unknown" 69 if status.value.exitCode is not None: 70 obj.info("Reaped child with pid %s, exit value %d.", 71 pid, status.value.exitCode) 72 signum = status.value.signal 73 74 # SIGKILL is an explicit kill, and never generates a core dump. 75 # For any other signal we want to see if there is a core dump, 76 # and warn if not. 77 if signum is not None: 78 if signum == signal.SIGKILL: 79 obj.warning("Child with pid %s killed.", pid) 80 message = messages.Error(T_(N_("The %s was killed.\n"), 81 self.processType)) 82 else: 83 message = messages.Error(T_(N_("The %s crashed.\n"), 84 self.processType), 85 debug='Terminated with signal number %d' % signum) 86 87 # use some custom logging depending on signal 88 if signum == signal.SIGSEGV: 89 obj.warning("Child with pid %s segfaulted.", pid) 90 elif signum == signal.SIGTRAP: 91 # SIGTRAP occurs when registry is corrupt 92 obj.warning("Child with pid %s received a SIGTRAP.", 93 pid) 94 else: 95 # if we find any of these, possibly special-case them too 96 obj.info("Reaped child with pid %s signaled by " 97 "signal %d.", pid, signum) 98 99 if not os.WCOREDUMP(status.value.status): 100 obj.warning("No core dump generated. " 101 "Were core dumps enabled at the start ?") 102 message.add(T_(N_( 103 "However, no core dump was generated. " 104 "You may need to configure the environment " 105 "if you want to further debug this problem."))) 106 #message.description = T_(N_( 107 # "Learn how to enable core dumps.")) 108 else: 109 obj.info("Core dumped.") 110 corepath = os.path.join(os.getcwd(), 'core.%s' % pid) 111 if os.path.exists(corepath): 112 obj.info("Core file is probably '%s'." % corepath) 113 message.add(T_(N_( 114 "The core dump is '%s' on the host running '%s'."), 115 corepath, self.where)) 116 # FIXME: add an action that runs gdb and produces a 117 # backtrace; or produce it here and attach to the 118 # message as debug info. 119 message.description = T_(N_( 120 "Learn how to analyze core dumps.")) 121 message.section = 'chapter-debug' 122 message.anchor = 'section-os-analyze-core-dumps' 123 124 if message: 125 obj.debug('sending message to manager/admin') 126 self.sendMessage(message) 127 128 self.setPid(None)
129 130
131 -class PortSet(log.Loggable):
132 """ 133 A list of ports that keeps track of which are available for use on a 134 given machine. 135 """ 136 # not very efficient mkay 137
138 - def __init__(self, logName, ports, randomPorts=False):
139 self.logName = logName 140 self.ports = ports 141 self.used = [0] * len(ports) 142 self.random = randomPorts
143
144 - def reservePorts(self, numPorts):
145 ret = [] 146 while numPorts > 0: 147 if self.random: 148 ret.append(0) 149 numPorts -= 1 150 continue 151 if not 0 in self.used: 152 raise errors.ComponentStartError( 153 'could not allocate port on worker %s' % self.logName) 154 i = self.used.index(0) 155 ret.append(self.ports[i]) 156 self.used[i] = 1 157 numPorts -= 1 158 return ret
159
160 - def setPortsUsed(self, ports):
161 for port in ports: 162 try: 163 i = self.ports.index(port) 164 except ValueError: 165 self.warning('portset does not include port %d', port) 166 else: 167 if self.used[i]: 168 self.warning('port %d already in use!', port) 169 else: 170 self.used[i] = 1
171
172 - def releasePorts(self, ports):
173 """ 174 @param ports: list of ports to release 175 @type ports: list of int 176 """ 177 for p in ports: 178 try: 179 i = self.ports.index(p) 180 if self.used[i]: 181 self.used[i] = 0 182 else: 183 self.warning('releasing unallocated port: %d' % p) 184 except ValueError: 185 self.warning('releasing unknown port: %d' % p)
186
187 - def numFree(self):
188 return len(self.ports) - self.numUsed()
189
190 - def numUsed(self):
191 return len(filter(None, self.used))
192 193 # worker heaven state proxy objects 194 195
196 -class ManagerWorkerHeavenState(flavors.StateCacheable):
197 """ 198 I represent the state of the worker heaven on the manager. 199 200 I have the following keys: 201 202 - names (list): list of worker names that we have state for 203 - workers (list): list of L{ManagerWorkerState} 204 """ 205
206 - def __init__(self):
207 flavors.StateCacheable.__init__(self) 208 self.addListKey('names', []) 209 self.addListKey('workers', []) # should be a dict
210
211 - def __repr__(self):
212 return "%r" % self._dict
213 214
215 -class AdminWorkerHeavenState(flavors.StateRemoteCache):
216 """ 217 I represent the state of the worker heaven in the admin. 218 See L{ManagerWorkerHeavenState} 219 """ 220 pass
221 222 pb.setUnjellyableForClass(ManagerWorkerHeavenState, AdminWorkerHeavenState) 223 224
225 -class ManagerWorkerState(flavors.StateCacheable):
226 """ 227 I represent the state of a worker in the manager. 228 229 - name: name of the worker 230 - host: the IP address of the worker as seen by the manager 231 """ 232
233 - def __init__(self, **kwargs):
234 flavors.StateCacheable.__init__(self) 235 self.addKey('name') 236 self.addKey('host') 237 for k, v in kwargs.items(): 238 self.set(k, v)
239
240 - def __repr__(self):
241 return ("<ManagerWorkerState for %s on %s>" 242 % (self.get('name'), self.get('host')))
243 244
245 -class AdminWorkerState(flavors.StateRemoteCache):
246 """ 247 I represent the state of a worker in the admin. 248 249 See L{ManagerWorkerState} 250 """ 251 pass
252 253 pb.setUnjellyableForClass(ManagerWorkerState, AdminWorkerState) 254