1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 worker-side objects for components
24 """
25
26 import os
27 import time
28 import socket
29
30 from twisted.internet import reactor, error, defer
31 from twisted.spread import pb
32 from twisted.python import reflect
33 from zope.interface import implements
34
35 from flumotion.configure import configure
36 from flumotion.common import interfaces, errors, log, planet, medium
37 from flumotion.common import componentui, common, messages
38 from flumotion.common import interfaces, reflectcall, debug
39 from flumotion.common.i18n import N_, gettexter
40 from flumotion.common.planet import moods
41 from flumotion.common.poller import Poller
42 from flumotion.twisted import credentials
43 from flumotion.twisted import pb as fpb
44
45
46 __version__ = "$Rev: 7162 $"
47 T_ = gettexter()
48
49
51 """
52 I am a client factory for a component logging in to the manager.
53 """
54 logCategory = 'component'
55 perspectiveInterface = interfaces.IComponentMedium
56
75
79
80
81
83
def remoteDisconnected(remoteReference):
    """
    Log that the connection to the manager went away.

    When C{reactor.killed} is set the loss is only logged; otherwise it
    is reported as a warning.

    @param remoteReference: the reference that got disconnected (unused)
    """
    if not reactor.killed:
        self.warning('Lost connection to manager, '
                     'will attempt to reconnect')
        return
    self.log('Connection to manager lost due to shutdown')
90
def loginCallback(reference):
    """
    Pass the reference obtained on login to the medium and register for
    notification when that reference disconnects.

    @param reference: the value the deferred fired with
    """
    self.info("Logged in to manager")
    self.debug("remote reference %r" % reference)

    medium = self.medium
    medium.setRemoteReference(reference)
    reference.notifyOnDisconnect(remoteDisconnected)
97
def loginFailedDisconnect(failure):
    """
    Disconnect on any login failure.

    @param failure: the login failure
    @returns: the same failure, unchanged, so that errbacks added after
              this one still get to see it
    """
    self.debug('Login failed, reason: %s, disconnecting', failure)
    self.disconnect()
    return failure
104
def accessDeniedErrback(failure):
    """
    Warn that the manager rejected the login.

    Only handles L{errors.NotAuthenticatedError}; C{trap} re-raises any
    other failure.
    """
    failure.trap(errors.NotAuthenticatedError)
    self.warning('Access denied.')
108
def connectionRefusedErrback(failure):
    """
    Warn that the manager refused the connection.

    Only handles L{error.ConnectionRefusedError}; C{trap} re-raises any
    other failure.
    """
    failure.trap(error.ConnectionRefusedError)
    self.warning('Connection to manager refused.')
112
def alreadyLoggedInErrback(failure):
    """
    Warn that a component with the same avatar id is already logged in.

    Only handles L{errors.AlreadyConnectedError}; C{trap} re-raises any
    other failure.
    """
    failure.trap(errors.AlreadyConnectedError)
    avatarId = self.medium.authenticator.avatarId
    self.warning('Component with id %s is already logged in.', avatarId)
117
def loginFailedErrback(failure):
    """
    Catch-all errback: warn about a login failure, whatever its type.

    Nothing is trapped here, so every failure that reaches this errback
    is logged and consumed.
    """
    self.warning('Login failed, reason: %s' % failure)
120
121 d.addCallback(loginCallback)
122 d.addErrback(loginFailedDisconnect)
123 d.addErrback(accessDeniedErrback)
124 d.addErrback(connectionRefusedErrback)
125 d.addErrback(alreadyLoggedInErrback)
126 d.addErrback(loginFailedErrback)
127
128
129
133
134
def _maybeDeferredChain(procs, *args, **kwargs):
    """
    Creates a deferred chain created by chaining calls to the given
    procedures, each of them made with the given args and kwargs.
    Only the result of the last procedure is returned; results for the
    other procedures are discarded.

    Failures triggered during any of the procedure short-circuit execution
    of the other procedures and should be handled by the errbacks attached
    to the deferred returned here.

    An empty list of procedures yields a deferred that has already fired
    with None, so callers always get a deferred back.

    @param procs: the procedures to call, in order
    @type  procs: sequence of callables

    @rtype: L{twisted.internet.defer.Deferred}
    """

    def call_proc(_, p):
        # the first argument is the previous procedure's result, which is
        # deliberately dropped
        log.debug('', 'calling %r', p)
        return p(*args, **kwargs)

    if not procs:
        # procs[0] would raise IndexError here instead of handing the
        # caller something to attach errbacks to
        return defer.succeed(None)

    d = defer.maybeDeferred(call_proc, None, procs[0])
    for p in procs[1:]:
        d.addCallback(call_proc, p)
    return d
157
158
159
160
162 """
163 I am a medium interfacing with a manager-side avatar.
164 I implement a Referenceable for the manager's avatar to call on me.
165 I have a remote reference to the manager's avatar to call upon.
166 I am created by the L{ComponentClientFactory}.
167
168 @cvar authenticator: the authenticator used to log in to manager
169 @type authenticator: L{flumotion.twisted.pb.Authenticator}
170 """
171
172 implements(interfaces.IComponentMedium)
173 logCategory = 'basecompmed'
174
176 """
177 @param component: L{flumotion.component.component.BaseComponent}
178 """
179 self.comp = component
180 self.authenticator = None
181 self.broker = None
182
186
187
188
189 - def setup(self, config):
191
193 """
194 Return the manager IP as seen by us.
195 """
196 assert self.remote or self.broker
197 broker = self.broker or self.remote.broker
198 peer = broker.transport.getPeer()
199 try:
200 host = peer.host
201 except AttributeError:
202 host = peer[1]
203
204 res = socket.gethostbyname(host)
205 self.debug("getManagerIP(): we think the manager's IP is %r" % res)
206 return res
207
209 """
210 Return the IP of this component based on connection to the manager.
211
212 Note: this is insufficient in general, and should be replaced by
213 network mapping stuff later.
214 """
215 assert self.remote
216 host = self.remote.broker.transport.getHost()
217 self.debug("getIP(): using %r as our IP", host.host)
218 return host.host
219
221 """
222 Set the authenticator the client factory has used to log in to the
223 manager. Can be reused by the component's medium to make
224 feed connections which also get authenticated by the manager's
225 bouncer.
226
227 @type authenticator: L{flumotion.twisted.pb.Authenticator}
228 """
229 self.authenticator = authenticator
230
231
232
233
235 """
236 Return the state of the component, which will be serialized to a
237 L{flumotion.common.planet.ManagerJobState} object.
238
239 @rtype: L{flumotion.common.planet.WorkerJobState}
240 @returns: state of component
241 """
242
243
244 self.comp.state.set('manager-ip', self.getManagerIP())
245 return self.comp.state
246
248 """
249 Return the configuration of the component.
250
251 @rtype: dict
252 @returns: component's current configuration
253 """
254 return self.comp.config
255
257 self.info('Stopping component')
258 return self.comp.stop()
259
264
266 """Get a WorkerComponentUIState containing details needed to
267 present an admin-side UI state
268 """
269 return self.comp.uiState
270
272 """
273 Base implementation of getMasterClockInfo, can be overridden by
274 subclasses. By default, just returns None.
275 """
276 return None
277
280
282 """
283 Sets the Flumotion debugging levels based on the passed debug string.
284
285 @since: 0.6.0
286 """
287 self.debug('Setting Flumotion debug level to %s' % debug)
288 log.setDebug(debug)
289
290
292 """
293 I am the base class for all Flumotion components.
294
295 @ivar name: the name of the component
296 @type name: string
297 @ivar medium: the component's medium
298 @type medium: L{BaseComponentMedium}
299 @ivar uiState: state of the component to be shown in a UI.
300 Contains at least the following keys.
301 - cpu-percent: percentage of CPU use in last interval
302 - start-time: time when component was started, in epoch
303 seconds
304 - current-time: current time in epoch seconds, as seen on
305 component's machine, which might be out of
306 sync
307 - virtual-size: virtual memory size in bytes
308 Subclasses can add additional keys for their respective UI.
309 @type uiState: L{componentui.WorkerComponentUIState}
310
311 @cvar componentMediumClass: the medium class to use for this component
312 @type componentMediumClass: child class of L{BaseComponentMedium}
313 """
314
315 logCategory = 'basecomp'
316 componentMediumClass = BaseComponentMedium
317
def __init__(self, config, haveError=None):
    """
    Store the configuration, run the init() implementations and start
    setup.

    Subclasses should not override __init__ at all.

    Instead, they should implement init(), which will be called
    by this implementation automatically.

    See L{flumotion.common.common.InitMixin} for more details.

    @param config:    the component's configuration; kept as
                      C{self.config}
    @param haveError: optional; kept as C{self._haveError}
    """
    self.debug("initializing %r with config %r", type(self), config)
    self.config, self._haveError = config, haveError

    # the mixin is what calls the init() implementations
    common.InitMixin.__init__(self)

    self.setup()
335
336
337
339 """
340 A subclass should do as little as possible in its init method.
341 In particular, it should not try to access resources.
342
343 Failures during init are marshalled back to the manager through
344 the worker's remote_create method, since there is no component state
345 proxied to the manager yet at the time of init.
346 """
347 self.state = planet.WorkerJobState()
348
349 self.name = self.config['name']
350
351 self.state.set('pid', os.getpid())
352 self.setMood(moods.waking)
353
354 self.medium = None
355
356 self.uiState = componentui.WorkerComponentUIState()
357 self.uiState.addKey('cpu-percent')
358 self.uiState.addKey('start-time')
359 self.uiState.addKey('current-time')
360 self.uiState.addKey('virtual-size')
361
362 self.plugs = {}
363
364 self._happyWaits = []
365
366
367 self._lastTime = time.time()
368 self._lastClock = time.clock()
369 self._cpuPoller = Poller(self._pollCPU, 5)
370 self._memoryPoller = Poller(self._pollMemory, 60)
371
372 self._shutdownHook = None
373
375 """
376 Subclasses can implement me to run any checks before the component
377 performs setup.
378
379 Messages can be added to the component state's 'messages' list key.
380 Any error messages added will trigger the component going to sad,
381 with L{flumotion.common.errors.ComponentSetupError} being raised
382 before getting to setup stage; do_setup() will not be called.
383
384 In the event of a fatal problem that can't be expressed through an
385 error message, this method should raise an exception or return a
386 failure.
387
388 It is not necessary to chain up in this function. The return
389 value may be a deferred.
390 """
391 return defer.maybeDeferred(self.check_properties,
392 self.config['properties'],
393 self.addMessage)
394
396 """
397 BaseComponent convenience vmethod for running checks.
398
399 A component implementation can override this method to run any
400 checks that it needs to. Typically, a check_properties
401 implementation will call the provided addMessage() callback to
402 note warnings or errors. For errors, addMessage() will set
403 component's mood to sad, which will abort the init process
404 before getting to do_setup().
405
406 @param properties: The component's properties
407 @type properties: dict of string => object
408 @param addMessage: Thunk to add a message to the component
409 state. Will raise an exception if the
410 message is of level ERROR.
411 @type addMessage: L{flumotion.common.messages.Message} -> None
412 """
413 pass
414
416 """
417 Subclasses can implement me to set up the component before it is
418 started. It should set up the component, possibly opening files
419 and resources.
420 Non-programming errors should not be raised, but returned as a
421 failing deferred.
422
423 The return value may be a deferred.
424 """
425 for socket, plugs in self.config['plugs'].items():
426 self.plugs[socket] = []
427 for plug in plugs:
428 entry = plug['entries']['default']
429 instance = reflectcall.reflectCall(entry['module-name'],
430 entry['function-name'],
431 plug)
432 self.plugs[socket].append(instance)
433 self.debug('Starting plug %r on socket %s',
434 instance, socket)
435 instance.start(self)
436
437
438
439 checks = common.get_all_methods(self, 'do_check', False)
440
def checkErrorCallback(result):
    """
    Abort the chain if the component's mood is sad.

    @param result: ignored
    @raises errors.ComponentSetupHandledError: if the 'mood' state key
                                               holds the sad mood
    """
    if self.state.get('mood') != moods.sad.value:
        return
    self.warning('Running checks made the component sad.')
    raise errors.ComponentSetupHandledError()
450
451 checks.append(checkErrorCallback)
452 return _maybeDeferredChain(checks, self)
453
def do_stop(self):
    """
    BaseComponent vmethod for stopping.
    The component should do any cleanup it needs, but must not set the
    component's mood to sleeping.

    This implementation stops all plugs, removes all messages from the
    component state, stops the CPU and memory pollers and finally fires
    the shutdown hook, if one is set.

    @returns: None
    """
    # 'socketName' rather than 'socket', which would shadow the socket
    # module imported by this file
    for socketName, plugs in self.plugs.items():
        for plug in plugs:
            self.debug('Stopping plug %r on socket %s', plug, socketName)
            plug.stop(self)

    # iterate over a snapshot: removing from the list that is being
    # iterated would skip every other message
    for message in list(self.state.get('messages')):
        self.state.remove('messages', message)

    if self._cpuPoller:
        self._cpuPoller.stop()
        self._cpuPoller = None
    if self._memoryPoller:
        self._memoryPoller.stop()
        self._memoryPoller = None

    if self._shutdownHook:
        self.debug('_stoppedCallback: firing shutdown hook')
        self._shutdownHook()
481
482
483
485 """
486 Sets up the component. Called during __init__, so be sure not
487 to raise exceptions, instead adding messages to the component
488 state.
489 """
490
def run_setups():
    """
    Chain the do_setup methods returned by common.get_all_methods, each
    called with this component as its only argument.

    @rtype: L{twisted.internet.defer.Deferred}
    """
    return _maybeDeferredChain(
        common.get_all_methods(self, 'do_setup', False), self)
494
def setup_complete(_):
    """
    Success callback of the setup chain: log and call setup_completed().

    @param _: result of the setup chain; ignored
    """
    self.debug('setup completed')
    self.setup_completed()
498
def got_error(failure):
    """
    Errback of the setup chain.

    Every failure is logged at debug level. Unless it is a
    L{errors.ComponentSetupHandledError}, a warning is logged as well and
    an error message is added to the component.

    @returns: None, so the failure is consumed here
    """
    txt = log.getFailureMessage(failure)
    self.debug('got_error: %s', txt)

    if failure.check(errors.ComponentSetupHandledError):
        # presumably reported already by whoever raised it
        return None

    self.warning('Setup failed: %s', txt)
    self.addMessage(messages.Error(
        T_(N_("Could not setup component.")),
        debug=txt,
        mid="component-setup-%s" % self.name))
    return None
512
513 self.setMood(moods.waking)
514 self.uiState.set('start-time', time.time())
515
516 d = run_setups()
517 d.addCallbacks(setup_complete, got_error)
518
519
521 self.debug('turning happy')
522 self.setMood(moods.happy)
523
525 """
526 Set the shutdown hook for this component (replacing any previous hook).
527 When a component is stopped, then this hook will be fired.
528 """
529 self._shutdownHook = shutdownHook
530
532 """
533 Tell the component to stop.
534 The connection to the manager will be closed.
535 The job process will also finish.
536 """
537 self.debug('BaseComponent.stop')
538
539
540 self.setMood(moods.waking)
541
542
543 stops = common.get_all_methods(self, 'do_stop', True)
544 return _maybeDeferredChain(stops, self)
545
546
547
550
552 self.state.set('workerName', workerName)
553
556
561
563 """
564 Set the given mood on the component if it's different from the current
565 one.
566 """
567 current = self.state.get('mood')
568
569 if current == mood.value:
570 self.log('already in mood %r' % mood)
571 return
572 elif current == moods.sad.value:
573 self.info('tried to set mood to %r, but already sad :-(' % mood)
574 return
575
576 self.doLog(log.DEBUG, -2, 'MOOD changed to %r by caller', mood)
577 self.state.set('mood', mood.value)
578
579 if mood == moods.happy:
580 while self._happyWaits:
581 self._happyWaits.pop(0).callback(None)
582 elif mood == moods.sad:
583 while self._happyWaits:
584 self._happyWaits.pop(0).errback(errors.ComponentStartError())
585
587 """
588 Gets the mood on the component.
589
590 @rtype: int
591 """
592 return self.state.get('mood')
593
604
606 """
607 Add a message to the component.
608 If any of the messages is an error, the component will turn sad.
609
610 @type message: L{flumotion.common.messages.Message}
611 """
612 self.state.append('messages', message)
613 if message.level == messages.ERROR:
614 self.debug('error message, turning sad')
615 self.setMood(moods.sad)
616 if self._haveError:
617 self._haveError(message)
618
620 """
621 Fix properties that have been renamed from a previous version,
622 and add a warning for them.
623
624 @param properties: properties; will be modified as a result.
625 @type properties: dict
626 @param list: list of (old, new) tuples of property names.
627 @type list: list of tuple of (str, str)
628 """
629 found = []
630 for old, new in list:
631 if old in properties:
632 found.append((old, new))
633
634 if found:
635 m = messages.Warning(T_(N_(
636 "Your configuration uses deprecated properties. "
637 "Please update your configuration and correct them.\n")),
638 mid="deprecated")
639 for old, new in found:
640 m.add(T_(N_(
641 "Please rename '%s' to '%s'.\n"),
642 old, new))
643 self.debug("Setting new property '%s' to %r", new,
644 properties[old])
645 properties[new] = properties[old]
646 del properties[old]
647 self.addMessage(m)
648
650 """
651 Call a remote method on all admin client views on this component.
652
653 This gets serialized through the manager and multiplexed to all
654 admin clients, and from there on to all views connected to each
655 admin client model.
656
657 Because there can be any number of admin clients that this call
658 will go out to, it does not make sense to have one return value.
659 This function will return None always.
660 """
661 if self.medium:
662 self.medium.callRemote("adminCallRemote", methodName,
663 *args, **kwargs)
664 else:
665 self.debug('asked to adminCallRemote(%s, *%r, **%r), but '
666 'no manager.'
667 % (methodName, args, kwargs))
668
670
def _pollCPU(self):
    """
    Update the 'cpu-percent' and 'current-time' keys of the UI state.

    'cpu-percent' is the CPU time used since the previous poll divided
    by the wall-clock time elapsed since then. It is left untouched when
    either interval is unusable: a negative CPU clock delta, or a
    wall-clock delta that is zero or negative (for example after the
    system clock was stepped back). 'current-time' is always updated.
    """
    # time.clock is used wherever it exists; interpreters that dropped
    # it provide time.process_time instead
    cpuClock = getattr(time, 'clock', None) or time.process_time

    nowTime = time.time()
    nowClock = cpuClock()
    deltaTime = nowTime - self._lastTime
    deltaClock = nowClock - self._lastClock
    self._lastTime = nowTime
    self._lastClock = nowClock

    # deltaTime must be checked too: dividing by it unguarded raises
    # ZeroDivisionError or yields a negative percentage
    if deltaClock >= 0 and deltaTime > 0:
        CPU = deltaClock / deltaTime
        self.log('latest CPU use: %r', CPU)
        self.uiState.set('cpu-percent', CPU)

    self.uiState.set('current-time', nowTime)
684
686
687
def _pollMemory(self):
    """
    Read this process's virtual memory size from /proc/<pid>/stat and
    store it in the UI state under 'virtual-size'.

    Errors opening or parsing the stat file are not caught here.
    """
    handle = open('/proc/%d/stat' % os.getpid())
    try:
        line = handle.read()
    finally:
        # close the file even when reading fails
        handle.close()

    # The second field is the command name in parentheses and may itself
    # contain spaces, so only what follows the last ')' is split.
    # That remainder starts at field 3, which puts vsize (field 23) at
    # index 20.
    fields = line[line.rindex(')') + 1:].split()
    vsize = int(fields[20])
    self.log('vsize is %d', vsize)
    self.uiState.set('virtual-size', vsize)
697