1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 worker-side objects to handle worker clients
24 """
25
26 import os
27 import signal
28 import sys
29
30 from twisted.internet import defer, reactor
31
32 from flumotion.common import errors, log
33 from flumotion.common import messages
34 from flumotion.common.i18n import N_, gettexter
35 from flumotion.configure import configure
36 from flumotion.worker import base
37
38 __version__ = "$Rev: 8530 $"
39 T_ = gettexter()
40
41
43
45
46 def bootstrap(*args):
47 return self.mindCallRemote('bootstrap', *args)
48
49 def create(_, job):
50 self.debug("asking job to create component with avatarId %s,"
51 " type %s", job.avatarId, job.type)
52 return self.mindCallRemote('create', job.avatarId, job.type,
53 job.moduleName, job.methodName,
54 job.nice, job.conf)
55
56 def success(_, avatarId):
57 self.debug('job started component with avatarId %s',
58 avatarId)
59
60 self._heaven._startSet.createSuccess(avatarId)
61
62 def error(failure, job):
63 msg = log.getFailureMessage(failure)
64 if failure.check(errors.ComponentCreateError):
65 self.warning('could not create component %s of type %s:'
66 ' %s', job.avatarId, job.type, msg)
67 else:
68 self.warning('unhandled error creating component %s: %s',
69 job.avatarId, msg)
70
71 self._heaven._startSet.createFailed(job.avatarId, failure)
72
73 def gotPid(pid):
74 self.pid = pid
75 info = self._heaven.getManagerConnectionInfo()
76 if info.use_ssl:
77 transport = 'ssl'
78 else:
79 transport = 'tcp'
80 job = self._heaven.getJobInfo(pid)
81 workerName = self._heaven.getWorkerName()
82
83 d = bootstrap(workerName, info.host, info.port, transport,
84 info.authenticator, job.bundles)
85 d.addCallback(create, job)
86 d.addCallback(success, job.avatarId)
87 d.addErrback(error, job)
88 return d
89 d = self.mindCallRemote("getPid")
90 d.addCallback(gotPid)
91 return d
92
94 """
95 returns: a deferred marking completed stop.
96 """
97 if not self.mind:
98 self.debug('already logged out')
99 return defer.succeed(None)
100 else:
101 self.debug('stopping')
102 return self.mindCallRemote('stop')
103
def sendFeed(self, feedName, fd, eaterId):
    """
    Ask the component job to stream one of its feeds into a file descriptor.

    The fd is passed to the job together with a textual command of the
    form "sendFeed <feedName> <eaterId>".

    @param feedName: name of the feed the job should send
    @param fd:       file descriptor the feed should be written to
    @param eaterId:  identifier forwarded verbatim in the command
                     (presumably the consuming eater's id -- confirm)
    @returns: the result of handing the fd to the job via
              _sendFileDescriptor, i.e. whether the handoff succeeded;
              False when this avatar no longer has a mind.
    """
    self.debug('Sending FD %d to component job to feed %s to fd',
               fd, feedName)

    if not self.mind:
        # No remote reference left: nothing can receive the fd.
        self.debug('my mind is gone, trigger disconnect')
        return False

    command = "sendFeed %s %s" % (feedName, eaterId)
    return self._sendFileDescriptor(fd, command)
123
125 """
126 Tell the feeder to receive the given feed from the given fd.
127
128 @returns: whether the fd was successfully handed off to the component.
129 """
130 self.debug('Sending FD %d to component job to eat %s from fd',
131 fd, eaterAlias)
132
133
134 if self.mind:
135 message = "receiveFeed %s %s" % (eaterAlias, feedId)
136 return self._sendFileDescriptor(fd, message)
137 else:
138 self.debug('my mind is gone, trigger disconnect')
139 return False
140
142 """
143 This notification from the job process will be fired when it is
144 shutting down, so that although the process might still be
145 around, we know it's OK to accept new start requests for this
146 avatar ID.
147 """
148 self.info("component %s shutting down cleanly", self.avatarId)
149
150 self._heaven._startSet.shutdownStart(self.avatarId)
151
152
154 __slots__ = ('conf', )
155
156 - def __init__(self, pid, avatarId, type, moduleName, methodName,
157 nice, bundles, conf):
161
162
164 avatarClass = ComponentJobAvatar
165 logCategory = 'component-job-heaven'
166
168 """
169 Gets the L{flumotion.common.connection.PBConnectionInfo}
170 describing how to connect to the manager.
171
172 @rtype: L{flumotion.common.connection.PBConnectionInfo}
173 """
174 return self.brain.managerConnectionInfo
175
def spawn(self, avatarId, type, moduleName, methodName, nice,
          bundles, conf):
    """
    Spawn a new job.

    This will spawn a new flumotion-job process. When the job logs in,
    it will be told to load bundles and run a function, which is
    expected to return a component. The nice level is not applied here;
    it is recorded in the job's ComponentJobInfo so it can be handed to
    the job later.

    Setting the environment variable FLU_VALGRIND_JOB to a
    comma-separated list of avatar IDs runs the matching job processes
    under valgrind.

    @param avatarId: avatarId the component should use to log in
    @type avatarId: str
    @param type: type of component to start
    @type type: str
    @param moduleName: name of the module to create the component from
    @type moduleName: str
    @param methodName: the factory method to use to create the component
    @type methodName: str
    @param nice: nice level
    @type nice: int
    @param bundles: ordered list of (bundleName, bundlePath) for this
                    component
    @type bundles: list of (str, str)
    @param conf: component configuration
    @type conf: dict

    @returns: the deferred returned by self._startSet.createStart(avatarId)
    """
    d = self._startSet.createStart(avatarId)

    p = base.JobProcessProtocol(self, avatarId, self._startSet)
    executable = os.path.join(configure.bindir, 'flumotion-job')
    if not os.path.exists(executable):
        # NOTE(review): execution only continues past this point if
        # self.error() returns instead of aborting -- confirm the
        # logger's behaviour.
        self.error("Trying to spawn job process, but '%s' does not "
                   "exist", executable)
    argv = [executable, avatarId, self._socketPath]
    realexecutable = executable

    if 'FLU_VALGRIND_JOB' in os.environ:
        # Tolerate "a, b" as well as "a,b" in the avatar ID list.
        jobnames = [name.strip() for name in
                    os.environ['FLU_VALGRIND_JOB'].split(',')]
        if avatarId in jobnames:
            realexecutable = 'valgrind'
            # Valgrind the interpreter running flumotion-job. Use the
            # interpreter this worker runs under instead of whatever
            # 'python' happens to resolve to on PATH; sys.executable may
            # be empty/None when it cannot be determined, so fall back.
            argv = ['valgrind', '--leak-check=full', '--num-callers=24',
                    '--leak-resolution=high', '--show-reachable=yes',
                    sys.executable or 'python'] + argv

    # The child gets the worker's own stdin/stdout/stderr.
    childFDs = {0: 0, 1: 1, 2: 2}
    env = dict(os.environ)
    env['FLU_DEBUG'] = log.getDebug()
    process = reactor.spawnProcess(p, realexecutable, env=env, args=argv,
                                   childFDs=childFDs)

    p.setPid(process.pid)

    self.addJobInfo(process.pid,
                    ComponentJobInfo(process.pid, avatarId, type,
                                     moduleName, methodName, nice,
                                     bundles, conf))
    return d
242
243
245
253
254 d = self.mindCallRemote("getPid")
255 d.addCallback(gotPid)
256 return d
257
264
266 self.debug("job is stopping")
267
268
270 avatarClass = CheckJobAvatar
271 logCategory = 'check-job-heaven'
272
273 _checkCount = 0
274 _timeout = 45
275
282
284 if self.jobPool:
285 job, expireDC = self.jobPool.pop(0)
286 expireDC.cancel()
287 self.debug('running check in already-running job %s',
288 job.avatarId)
289 return defer.succeed(job)
290
291 avatarId = 'check-%d' % (self._checkCount, )
292 self._checkCount += 1
293
294 self.debug('spawning new job %s to run a check', avatarId)
295 d = self._startSet.createStart(avatarId)
296
297 p = base.JobProcessProtocol(self, avatarId, self._startSet)
298 executable = os.path.join(configure.bindir, 'flumotion-job')
299 argv = [executable, avatarId, self._socketPath]
300
301 childFDs = {0: 0, 1: 1, 2: 2}
302 env = {}
303 env.update(os.environ)
304 env['FLU_DEBUG'] = log.getDebug()
305 process = reactor.spawnProcess(p, executable, env=env, args=argv,
306 childFDs=childFDs)
307
308 p.setPid(process.pid)
309 jobInfo = base.JobInfo(process.pid, avatarId, type, None, None,
310 None, [])
311 self._jobInfos[process.pid] = jobInfo
312
313 def haveMind(_):
314
315 return self.avatars[avatarId]
316
317 d.addCallback(haveMind)
318 return d
319
320 - def runCheck(self, bundles, moduleName, methodName, *args, **kwargs):
327
328 def timeout(sig):
329 self.killJobByPid(job.pid, sig)
330
331 def haveResult(res):
332 if not termtimeout.active():
333 self.info("Discarding error %s", res)
334 res = messages.Result()
335 res.add(messages.Error(
336 T_(N_("Check timed out.")),
337 debug=("Timed out running %s."%methodName)))
338 else:
339
340 def expire():
341 if (job, expireDC) in self.jobPool:
342 self.debug('stopping idle check job process %s',
343 job.avatarId)
344 self.jobPool.remove((job, expireDC))
345 job.mindCallRemote('stop')
346 expireDC = reactor.callLater(self._timeout, expire)
347 self.jobPool.append((job, expireDC))
348
349 if termtimeout.active():
350 termtimeout.cancel()
351 if killtimeout.active():
352 killtimeout.cancel()
353 return res
354
355
356
357 termtimeout = reactor.callLater(self._timeout, timeout,
358 signal.SIGTERM)
359 killtimeout = reactor.callLater(self._timeout, timeout,
360 signal.SIGKILL)
361
362 d = job.mindCallRemote('bootstrap', self.getWorkerName(),
363 None, None, None, None, bundles)
364 d.addCallback(callProc)
365 d.addCallbacks(haveResult, haveResult)
366 return d
367
368 d = self.getCheckJobFromPool()
369 d.addCallback(haveJob)
370
371 return d
372