1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 worker-side objects for components
24 """
25
26 import os
27 import time
28 import socket
29
30 from twisted.internet import reactor, error, defer
31 from twisted.spread import pb
32 from twisted.python import reflect
33 from zope.interface import implements
34
35 from flumotion.configure import configure
36 from flumotion.common import interfaces, errors, log, planet, medium
37 from flumotion.common import componentui, common, messages
38 from flumotion.common import interfaces, reflectcall, debug
39 from flumotion.common.i18n import N_, gettexter
40 from flumotion.common.planet import moods
41 from flumotion.common.poller import Poller
42 from flumotion.twisted import credentials
43 from flumotion.twisted import pb as fpb
44 from flumotion.twisted.flavors import IStateCacheableListener
45
46
47 __version__ = "$Rev: 8826 $"
48 T_ = gettexter()
49
50
52 """
53 I am a client factory for a component logging in to the manager.
54 """
55 logCategory = 'component'
56 perspectiveInterface = interfaces.IComponentMedium
57
76
80
81
82
84
85 def remoteDisconnected(remoteReference):
86 if reactor.killed:
87 self.log('Connection to manager lost due to shutdown')
88 else:
89 self.warning('Lost connection to manager, '
90 'will attempt to reconnect')
91
92 def loginCallback(reference):
93 self.info("Logged in to manager")
94 self.debug("remote reference %r" % reference)
95
96 self.medium.setRemoteReference(reference)
97 reference.notifyOnDisconnect(remoteDisconnected)
98
99 def loginFailedDisconnect(failure):
100
101
102 self.debug('Login failed, reason: %s, disconnecting', failure)
103 self.disconnect()
104 return failure
105
106 def accessDeniedErrback(failure):
107 failure.trap(errors.NotAuthenticatedError)
108 self.warning('Access denied.')
109
110 def connectionRefusedErrback(failure):
111 failure.trap(error.ConnectionRefusedError)
112 self.warning('Connection to manager refused.')
113
114 def alreadyLoggedInErrback(failure):
115 failure.trap(errors.AlreadyConnectedError)
116 self.warning('Component with id %s is already logged in.',
117 self.medium.authenticator.avatarId)
118
119 def loginFailedErrback(failure):
120 self.warning('Login failed, reason: %s' % failure)
121
122 d.addCallback(loginCallback)
123 d.addErrback(loginFailedDisconnect)
124 d.addErrback(accessDeniedErrback)
125 d.addErrback(connectionRefusedErrback)
126 d.addErrback(alreadyLoggedInErrback)
127 d.addErrback(loginFailedErrback)
128
129
130
134
135
137 """
138 Creates a deferred chain created by chaining calls to the given
139 procedures, each of them made with the given args and kwargs.
140 Only the result of the last procedure is returned; results for the
141 other procedures are discarded.
142
143 Failures triggered during any of the procedure short-circuit execution
144 of the other procedures and should be handled by the errbacks attached
145 to the deferred returned here.
146
147 @rtype: L{twisted.internet.defer.Deferred}
148 """
149
150 def call_proc(_, p):
151 log.debug('', 'calling %r', p)
152 return p(*args, **kwargs)
153 if not procs:
154 return defer.succeed(None)
155 p, procs = procs[0], procs[1:]
156 d = defer.maybeDeferred(call_proc, None, p)
157 for p in procs:
158 d.addCallback(call_proc, p)
159 return d
160
161
162
163
165 """
166 I am a medium interfacing with a manager-side avatar.
167 I implement a Referenceable for the manager's avatar to call on me.
168 I have a remote reference to the manager's avatar to call upon.
169 I am created by the L{ComponentClientFactory}.
170
171 @cvar authenticator: the authenticator used to log in to manager
172 @type authenticator: L{flumotion.twisted.pb.Authenticator}
173 """
174
175 implements(interfaces.IComponentMedium)
176 logCategory = 'basecompmed'
177
179 """
180 @param component: L{flumotion.component.component.BaseComponent}
181 """
182 self.comp = component
183 self.authenticator = None
184 self.broker = None
185
189
190
191
192 - def setup(self, config):
194
196 """
197 Return the manager IP as seen by us.
198 """
199 assert self.remote or self.broker
200 broker = self.broker or self.remote.broker
201 peer = broker.transport.getPeer()
202 try:
203 host = peer.host
204 except AttributeError:
205 host = peer[1]
206
207 res = socket.gethostbyname(host)
208 self.debug("getManagerIP(): we think the manager's IP is %r" % res)
209 return res
210
212 """
213 Return the IP of this component based on connection to the manager.
214
215 Note: this is insufficient in general, and should be replaced by
216 network mapping stuff later.
217 """
218 assert self.remote
219 host = self.remote.broker.transport.getHost()
220 self.debug("getIP(): using %r as our IP", host.host)
221 return host.host
222
224 """
225 Set the authenticator the client factory has used to log in to the
226 manager. Can be reused by the component's medium to make
227 feed connections which also get authenticated by the manager's
228 bouncer.
229
230 @type authenticator: L{flumotion.twisted.pb.Authenticator}
231 """
232 self.authenticator = authenticator
233
234
235
236
238 """
239 Return the state of the component, which will be serialized to a
240 L{flumotion.common.planet.ManagerJobState} object.
241
242 @rtype: L{flumotion.common.planet.WorkerJobState}
243 @returns: state of component
244 """
245
246
247 self.comp.state.set('manager-ip', self.getManagerIP())
248 return self.comp.state
249
251 """
252 Return the configuration of the component.
253
254 @rtype: dict
255 @returns: component's current configuration
256 """
257 return self.comp.config
258
260 self.info('Stopping component')
261 return self.comp.stop()
262
267
269 """Get a WorkerComponentUIState containing details needed to
270 present an admin-side UI state
271 """
272 return self.comp.uiState
273
275 """Get mood of the component
276 """
277 return self.comp.getMood()
278
280 """
281 Base implementation of getMasterClockInfo, can be overridden by
282 subclasses. By default, just returns None.
283 """
284 return None
285
288
290 """
291 Sets the Flumotion debugging levels based on the passed debug string.
292
293 @since: 0.6.0
294 """
295 self.debug('Setting Flumotion debug level to %s' % debug)
296 log.setDebug(debug)
297
298
300 """
301 I am the base class for all Flumotion components.
302
303 @ivar name: the name of the component
304 @type name: string
305 @ivar medium: the component's medium
306 @type medium: L{BaseComponentMedium}
307 @ivar uiState: state of the component to be shown in a UI.
308 Contains at least the following keys.
309 - cpu-percent: percentage of CPU use in last interval
310 - start-time: time when component was started, in epoch
311 seconds
312 - current-time: current time in epoch seconds, as seen on
313 component's machine, which might be out of
314 sync
315 - virtual-size: virtual memory size in bytes
316 Subclasses can add additional keys for their respective UI.
317 @type uiState: L{componentui.WorkerComponentUIState}
318
319 @cvar componentMediumClass: the medium class to use for this component
320 @type componentMediumClass: child class of L{BaseComponentMedium}
321 """
322
323 logCategory = 'basecomp'
324 componentMediumClass = BaseComponentMedium
325
326 implements(IStateCacheableListener)
327
328 - def __init__(self, config, haveError=None):
329 """
330 Subclasses should not override __init__ at all.
331
332 Instead, they should implement init(), which will be called
333 by this implementation automatically.
334
335 L{flumotion.common.common.InitMixin} for more details.
336 """
337 self.debug("initializing %r with config %r", type(self), config)
338 self.config = config
339 self._haveError = haveError
340
341
342 common.InitMixin.__init__(self)
343
344 self.setup()
345
346
347
349 """
350 A subclass should do as little as possible in its init method.
351 In particular, it should not try to access resources.
352
353 Failures during init are marshalled back to the manager through
354 the worker's remote_create method, since there is no component state
355 proxied to the manager yet at the time of init.
356 """
357 self.state = planet.WorkerJobState()
358
359 self.name = self.config['name']
360
361 self.state.set('pid', os.getpid())
362 self.setMood(moods.waking)
363
364 self.medium = None
365
366 self.uiState = componentui.WorkerComponentUIState()
367 self.uiState.addKey('cpu-percent')
368 self.uiState.addKey('start-time')
369 self.uiState.addKey('current-time')
370 self.uiState.addKey('virtual-size')
371 self.uiState.addKey('total-memory')
372
373 self.uiState.addHook(self)
374
375 self.plugs = {}
376
377 self._happyWaits = []
378
379
380 self._lastTime = time.time()
381 self._lastClock = time.clock()
382 self._cpuPoller = Poller(self._pollCPU, 5, start=False)
383 self._memoryPoller = Poller(self._pollMemory, 60, start=False)
384 self._cpuPollerDC = None
385 self._memoryPollerDC = None
386 self._shutdownHook = None
387
388
389
391 """
392 Triggered when a uiState observer was added.
393
394 Default implementation is to start the memory and cpu pollers.
395
396 Note:
397 Subclasses can override me but should chain me up to start these
398 pollers
399 """
400 self.debug("observer has started watching us, starting pollers")
401 if not self._cpuPoller.running and not self._cpuPollerDC:
402 self._cpuPollerDC = reactor.callLater(0,
403 self._cpuPoller.start,
404 immediately=True)
405 if not self._memoryPoller.running and not self._memoryPollerDC:
406 self._memoryPollerDC = reactor.callLater(0,
407 self._memoryPoller.start,
408 immediately=True)
409
411 """
412 Triggered when a uiState observer has left.
413
414 Default implementation is to stop the memory and cpu pollers
415 when the total number of observers denoted by the 'num'
416 argument becomes zero.
417
418 Note:
419 Subclasses can override me but should chain me up to stop these
420 pollers
421 """
422 if num == 0:
423 self.debug("no more observers left, shutting down pollers")
424
425 if self._cpuPollerDC:
426 self._cpuPollerDC.cancel()
427 self._cpuPollerDC = None
428 if self._memoryPollerDC:
429 self._memoryPollerDC.cancel()
430 self._memoryPollerDC = None
431
432 if self._cpuPoller:
433 self._cpuPoller.stop()
434 if self._memoryPoller:
435 self._memoryPoller.stop()
436
438 """
439 Subclasses can implement me to run any checks before the component
440 performs setup.
441
442 Messages can be added to the component state's 'messages' list key.
443 Any error messages added will trigger the component going to sad,
444 with L{flumotion.common.errors.ComponentSetupError} being raised
445 before getting to setup stage; do_setup() will not be called.
446
447 In the event of a fatal problem that can't be expressed through an
448 error message, this method should raise an exception or return a
449 failure.
450
451 It is not necessary to chain up in this function. The return
452 value may be a deferred.
453 """
454 return defer.maybeDeferred(self.check_properties,
455 self.config['properties'],
456 self.addMessage)
457
459 """
460 BaseComponent convenience vmethod for running checks.
461
462 A component implementation can override this method to run any
463 checks that it needs to. Typically, a check_properties
464 implementation will call the provided addMessage() callback to
465 note warnings or errors. For errors, addMessage() will set
466 component's mood to sad, which will abort the init process
467 before getting to do_setup().
468
469 @param properties: The component's properties
470 @type properties: dict of string => object
471 @param addMessage: Thunk to add a message to the component
472 state. Will raise an exception if the
473 message is of level ERROR.
474 @type addMessage: L{flumotion.common.messages.Message} -> None
475 """
476 pass
477
479 """
480 Subclasses can implement me to set up the component before it is
481 started. It should set up the component, possibly opening files
482 and resources.
483 Non-programming errors should not be raised, but returned as a
484 failing deferred.
485
486 The return value may be a deferred.
487 """
488 plug_starts = []
489 for socket, plugs in self.config['plugs'].items():
490 self.plugs[socket] = []
491 for plug in plugs:
492 entry = plug['entries']['default']
493 instance = reflectcall.reflectCall(entry['module-name'],
494 entry['function-name'],
495 plug)
496 self.plugs[socket].append(instance)
497 self.debug('Starting plug %r on socket %s',
498 instance, socket)
499 plug_starts.append(instance.start)
500
501
502
503 checks = common.get_all_methods(self, 'do_check', False)
504
505 def checkErrorCallback(result):
506
507
508
509
510 current = self.state.get('mood')
511 if current == moods.sad.value:
512 self.warning('Running checks made the component sad.')
513 raise errors.ComponentSetupHandledError()
514
515 checks.append(checkErrorCallback)
516
517 return _maybeDeferredChain(plug_starts + checks, self)
518
520 """
521 BaseComponent vmethod for stopping.
522 The component should do any cleanup it needs, but must not set the
523 component's mood to sleeping.
524
525 @Returns: L{twisted.internet.defer.Deferred}
526 """
527 plug_stops = []
528 for socket, plugs in self.plugs.items():
529 for plug in plugs:
530 self.debug('Stopping plug %r on socket %s', plug, socket)
531 plug_stops.append(plug.stop)
532
533 for message in self.state.get('messages'):
534
535 self.state.remove('messages', message)
536
537
538 if self._cpuPollerDC:
539 self._cpuPollerDC.cancel()
540 self._cpuPollerDC = None
541 if self._memoryPollerDC:
542 self._memoryPollerDC.cancel()
543 self._memoryPollerDC = None
544
545 if self._cpuPoller:
546 self._cpuPoller.stop()
547 self._cpuPoller = None
548 if self._memoryPoller:
549 self._memoryPoller.stop()
550 self._memoryPoller = None
551
552 if self._shutdownHook:
553 self.debug('_stoppedCallback: firing shutdown hook')
554 self._shutdownHook()
555
556 return _maybeDeferredChain(plug_stops, self)
557
558
559
561 """
562 Sets up the component. Called during __init__, so be sure not
563 to raise exceptions, instead adding messages to the component
564 state.
565 """
566
567 def run_setups():
568 setups = common.get_all_methods(self, 'do_setup', False)
569 return _maybeDeferredChain(setups, self)
570
571 def setup_complete(_):
572 self.debug('setup completed')
573 self.setup_completed()
574
575 def got_error(failure):
576 txt = log.getFailureMessage(failure)
577 self.debug('got_error: %s', txt)
578 if not failure.check(errors.ComponentSetupHandledError):
579 self.warning('Setup failed: %s', txt)
580 m = messages.Error(T_(N_("Could not setup component.")),
581 debug=txt,
582 mid="component-setup-%s" % self.name)
583
584 self.addMessage(m)
585
586
587 return None
588
589 self.setMood(moods.waking)
590 self.uiState.set('start-time', time.time())
591
592 self.uiState.set('total-memory', self._getTotalMemory())
593 d = run_setups()
594 d.addCallbacks(setup_complete, got_error)
595
596
598 self.debug('turning happy')
599 self.setMood(moods.happy)
600
602 """
603 Set the shutdown hook for this component (replacing any previous hook).
604 When a component is stopped, then this hook will be fired.
605 """
606 self._shutdownHook = shutdownHook
607
609 """
610 Tell the component to stop.
611 The connection to the manager will be closed.
612 The job process will also finish.
613 """
614 self.debug('BaseComponent.stop')
615
616
617 self.setMood(moods.waking)
618
619
620 stops = common.get_all_methods(self, 'do_stop', True)
621 return _maybeDeferredChain(stops, self)
622
623
624
627
629 self.state.set('workerName', workerName)
630
633
641
643 """
644 Set the given mood on the component if it's different from the current
645 one.
646 """
647 current = self.state.get('mood')
648
649 if current == mood.value:
650 self.log('already in mood %r' % mood)
651 return
652 elif current == moods.sad.value:
653 self.info('tried to set mood to %r, but already sad :-(' % mood)
654 return
655
656 self.doLog(log.DEBUG, -2, 'MOOD changed to %r by caller', mood)
657 self.state.set('mood', mood.value)
658
659 if mood == moods.happy:
660 while self._happyWaits:
661 self._happyWaits.pop(0).callback(None)
662 elif mood == moods.sad:
663 while self._happyWaits:
664 self._happyWaits.pop(0).errback(errors.ComponentStartError())
665
667 """
668 Gets the mood on the component.
669
670 @rtype: int
671 """
672 return self.state.get('mood')
673
684
686 """
687 Add a message to the component.
688 If any of the messages is an error, the component will turn sad.
689
690 @type message: L{flumotion.common.messages.Message}
691 """
692 self.state.append('messages', message)
693 if message.level == messages.ERROR:
694 self.debug('error message, turning sad')
695 self.setMood(moods.sad)
696 if self._haveError:
697 self._haveError(message)
698
700 """
701 Add a warning messages for deprecated properties.
702
703 @param list: list of property names.
704 @type list: list of str
705 """
706 m = messages.Warning(T_(N_(
707 "Your configuration uses deprecated properties. "
708 "Please update your configuration and correct them.\n")),
709 mid="deprecated")
710 for prop in list:
711 m.add(T_(N_(
712 "Please remove '%s' property.\n"), prop))
713 self.addMessage(m)
714
716 """
717 Fix properties that have been renamed from a previous version,
718 and add a warning for them.
719
720 @param properties: properties; will be modified as a result.
721 @type properties: dict
722 @param list: list of (old, new) tuples of property names.
723 @type list: list of tuple of (str, str)
724 """
725 found = []
726 for old, new in list:
727 if old in properties:
728 found.append((old, new))
729
730 if found:
731 m = messages.Warning(T_(N_(
732 "Your configuration uses deprecated properties. "
733 "Please update your configuration and correct them.\n")),
734 mid="deprecated")
735 for old, new in found:
736 m.add(T_(N_(
737 "Please rename '%s' to '%s'.\n"),
738 old, new))
739 self.debug("Setting new property '%s' to %r", new,
740 properties[old])
741 properties[new] = properties[old]
742 del properties[old]
743 self.addMessage(m)
744
746 """
747 Call a remote method on all admin client views on this component.
748
749 This gets serialized through the manager and multiplexed to all
750 admin clients, and from there on to all views connected to each
751 admin client model.
752
753 Because there can be any number of admin clients that this call
754 will go out do, it does not make sense to have one return value.
755 This function will return None always.
756 """
757 if self.medium:
758 self.medium.callRemote("adminCallRemote", methodName,
759 *args, **kwargs)
760 else:
761 self.debug('asked to adminCallRemote(%s, *%r, **%r), but '
762 'no manager.'
763 % (methodName, args, kwargs))
764
766 try:
767 namespace = plug.get_namespace()
768 except AttributeError:
769 self.debug("Plug %r does not provide namespace, "
770 "its interface will not be exposed", plug)
771 return
772
773 self.debug("Exposing plug's %r interface in namespace %r",
774 plug, namespace)
775 for method in filter(callable,
776 [getattr(plug, m) for m in dir(plug)
777 if m.startswith('remote_')]):
778 if namespace:
779 name = "".join(("remote_", namespace, "_",
780 method.__name__[len("remote_"):]))
781 else:
782 name = method.__name__
783 self.debug("Exposing method %r as %r in %r", method, name, medium)
784 setattr(medium, name, method)
785
787 self._cpuPollerDC = None
788
789 nowTime = time.time()
790 nowClock = time.clock()
791 deltaTime = nowTime - self._lastTime
792 deltaClock = nowClock - self._lastClock
793 self._lastTime = nowTime
794 self._lastClock = nowClock
795
796 if deltaClock >= 0:
797 CPU = deltaClock/deltaTime
798 self.log('latest CPU use: %r', CPU)
799 self.uiState.set('cpu-percent', CPU)
800
801 self.uiState.set('current-time', nowTime)
802
804 self._memoryPollerDC = None
805
806
807 handle = open('/proc/%d/stat' % os.getpid())
808 line = handle.read()
809 handle.close()
810 fields = line.split()
811
812
813 vsize = int(fields[22])
814 self.log('vsize is %d', vsize)
815 self.uiState.set('virtual-size', vsize)
816
818 f = open("/proc/meminfo")
819 memtotal = f.readline()
820 f.close()
821 return int(memtotal[memtotal.index(":") + 1: -3]) * 1024
822