1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 Feed components, participating in the stream
24 """
25
26 import os
27
28 import gst
29 import gst.interfaces
30 import gobject
31
32 from twisted.internet import reactor, defer
33 from twisted.spread import pb
34 from zope.interface import implements
35
36 from flumotion.configure import configure
37 from flumotion.component import component as basecomponent
38 from flumotion.component import feed
39 from flumotion.common import common, interfaces, errors, log, pygobject, \
40 messages
41 from flumotion.common import gstreamer
42 from flumotion.common.i18n import N_, gettexter
43 from flumotion.common.planet import moods
44 from flumotion.common.pygobject import gsignal
45
46 __version__ = "$Rev: 8808 $"
47 T_ = gettexter()
48
49
51 """
52 I am a component-side medium for a FeedComponent to interface with
53 the manager-side ComponentAvatar.
54 """
55 implements(interfaces.IComponentMedium)
56 logCategory = 'feedcompmed'
57 remoteLogName = 'feedserver'
58
60 """
61 @param component: L{flumotion.component.feedcomponent.FeedComponent}
62 """
63 basecomponent.BaseComponentMedium.__init__(self, component)
64
65 self._feederFeedServer = {}
66
67 self._feederPendingConnections = {}
68 self._eaterFeedServer = {}
69
70 self._eaterPendingConnections = {}
71 self.logName = component.name
72
73
74
77
79 """
80 Sets the GStreamer debugging levels based on the passed debug string.
81
82 @since: 0.4.2
83 """
84 self.debug('Setting GStreamer debug level to %s' % debug)
85 if not debug:
86 return
87
88 for part in debug.split(','):
89 glob = None
90 value = None
91 pair = part.split(':')
92 if len(pair) == 1:
93
94 value = int(pair[0])
95 elif len(pair) == 2:
96 glob, value = pair
97 value = int(value)
98 else:
99 self.warning("Cannot parse GStreamer debug setting '%s'." %
100 part)
101 continue
102
103 if glob:
104 try:
105
106 gst.debug_set_threshold_for_name(glob, value)
107 except TypeError:
108 self.warning("Cannot set glob %s to value %s" % (
109 glob, value))
110 else:
111 gst.debug_set_default_threshold(value)
112
114 """
115 Tell the component the host and port for the FeedServer through which
116 it can connect a local eater to a remote feeder to eat the given
117 fullFeedId.
118
119 Called on by the manager-side ComponentAvatar.
120 """
121 if self._feederFeedServer.get(eaterAlias):
122 if self._feederFeedServer[eaterAlias] == (fullFeedId, host, port):
123 self.debug("Feed:%r is the same as the current one. "\
124 "Request ignored.", (fullFeedId, host, port))
125 return
126 self._feederFeedServer[eaterAlias] = (fullFeedId, host, port)
127 return self.connectEater(eaterAlias)
128
141
143 """
144 Connect one of the medium's component's eaters to a remote feed.
145 Called by the component, both on initial connection and for
146 reconnecting.
147
148 @returns: deferred that will fire with a value of None
149 """
150
151
152 def gotFeed((feedId, fd)):
153 self._feederPendingConnections.pop(eaterAlias, None)
154 self.comp.eatFromFD(eaterAlias, feedId, fd)
155
156 if eaterAlias not in self._feederFeedServer:
157 self.debug("eatFrom() hasn't been called yet for eater %s",
158 eaterAlias)
159
160
161 return defer.succeed(None)
162
163 (fullFeedId, host, port) = self._feederFeedServer[eaterAlias]
164
165 cancel = self._feederPendingConnections.pop(eaterAlias, None)
166 if cancel:
167 self.debug('cancelling previous connection attempt on %s',
168 eaterAlias)
169 cancel()
170
171 client = feed.FeedMedium(logName=self.comp.name)
172
173 d = client.requestFeed(host, port,
174 self._getAuthenticatorForFeed(eaterAlias),
175 fullFeedId)
176 self._feederPendingConnections[eaterAlias] = client.stopConnecting
177 d.addCallback(gotFeed)
178 return d
179
181 """
182 Tell the component to feed the given feed to the receiving component
183 accessible through the FeedServer on the given host and port.
184
185 Called on by the manager-side ComponentAvatar.
186 """
187 self._eaterFeedServer[fullFeedId] = (host, port)
188 self.connectFeeder(feederName, fullFeedId)
189
191 """
192 Tell the component to feed the given feed to the receiving component
193 accessible through the FeedServer on the given host and port.
194
195 Called on by the manager-side ComponentAvatar.
196 """
197
198 def gotFeed((fullFeedId, fd)):
199 self._eaterPendingConnections.pop(feederName, None)
200 self.comp.feedToFD(feederName, fd, os.close, fullFeedId)
201
202 if fullFeedId not in self._eaterFeedServer:
203 self.debug("feedTo() hasn't been called yet for feeder %s",
204 feederName)
205
206
207 return defer.succeed(None)
208
209 host, port = self._eaterFeedServer[fullFeedId]
210
211
212 cancel = self._eaterPendingConnections.pop(fullFeedId, None)
213 if cancel:
214 self.debug('cancelling previous connection attempt on %s',
215 feederName)
216 cancel()
217
218 client = feed.FeedMedium(logName=self.comp.name)
219
220 d = client.sendFeed(host, port,
221 self._getAuthenticatorForFeed(feederName),
222 fullFeedId)
223 self._eaterPendingConnections[feederName] = client.stopConnecting
224 d.addCallback(gotFeed)
225 return d
226
228 """
229 Tells the component to start providing a master clock on the given
230 UDP port.
231 Can only be called if setup() has been called on the component.
232
233 The IP address returned is the local IP the clock is listening on.
234
235 @returns: (ip, port, base_time)
236 @rtype: tuple of (str, int, long)
237 """
238 self.debug('remote_provideMasterClock(port=%r)' % port)
239 return self.comp.provide_master_clock(port)
240
242 """
243 Return the clock master info created by a previous call
244 to provideMasterClock.
245
246 @returns: (ip, port, base_time)
247 @rtype: tuple of (str, int, long)
248 """
249 return self.comp.get_master_clock()
250
253
def remote_effect(self, effectName, methodName, *args, **kwargs):
    """
    Dispatch a remote call to one of this component's effects.

    The effect registered under effectName in the component's effects
    must expose an attribute called effect_<methodName>; that attribute
    is called with the remaining positional and keyword arguments.

    @param effectName: key of the effect in the component's effects
    @param methodName: name of the method, without the "effect_" prefix
    @returns: whatever the effect method returns
    @raises errors.UnknownEffectError: when no effect has that name
    @raises errors.NoMethodError: when the effect lacks the method
    @raises errors.RemoteRunError: when the call raises a TypeError
    """
    self.debug("calling %s on effect %s" % (methodName, effectName))

    if effectName not in self.comp.effects:
        raise errors.UnknownEffectError(effectName)
    target = self.comp.effects[effectName]

    attrName = "effect_%s" % methodName
    if not hasattr(target, attrName):
        raise errors.NoMethodError("%s on effect %s" % (methodName,
                                                        effectName))
    handler = getattr(target, attrName)

    try:
        outcome = handler(*args, **kwargs)
    except TypeError:
        # Any TypeError raised by the call is reported to the remote
        # side as an argument mismatch.
        msg = "effect method %s did not accept %s and %s" % (
            methodName, args, kwargs)
        self.debug(msg)
        raise errors.RemoteRunError(msg)
    else:
        self.debug("effect: result: %r" % outcome)
        return outcome
276
279
280 from feedcomponent010 import FeedComponent
281
282 FeedComponent.componentMediumClass = FeedComponentMedium
283
284
286 """A component using gst-launch syntax
287
288 @cvar checkTimestamp: whether to check continuity of timestamps for eaters
289 @cvar checkOffset: whether to check continuity of offsets for
290 eaters
291 """
292
293 DELIMITER = '@'
294
295
296 checkTimestamp = False
297 checkOffset = False
298
299
300 FDSRC_TMPL = 'fdsrc name=%(name)s'
301 DEPAY_TMPL = 'gdpdepay name=%(name)s-depay'
302 FEEDER_TMPL = 'gdppay name=%(name)s-pay ! multifdsink sync=false '\
303 'name=%(name)s buffers-max=500 buffers-soft-max=450 '\
304 'recover-policy=1'
305 EATER_TMPL = None
306
326
327
328
354
363
364
365
367 """
368 Method that must be implemented by subclasses to produce the
369 gstparse string for the component's pipeline. Subclasses should
370 not chain up; this method raises a NotImplemented error.
371
372 Returns: a new pipeline string representation.
373 """
374 raise NotImplementedError('subclasses should implement '
375 'get_pipeline_string')
376
386
387
388
399
401 """
402 Expand the given pipeline string representation by substituting
403 blocks between '@' with a filled-in template.
404
405 @param pipeline: a pipeline string representation with variables
406 @param templatizers: A dict of prefix => procedure. Template
407 blocks in the pipeline will be replaced
408 with the result of calling the procedure
409 with what is left of the template after
410 taking off the prefix.
411 @returns: a new pipeline string representation.
412 """
413 assert pipeline != ''
414
415
416 if pipeline.count(self.DELIMITER) % 2 != 0:
417 raise TypeError("'%s' contains an odd number of '%s'"
418 % (pipeline, self.DELIMITER))
419
420 out = []
421 for i, block in enumerate(pipeline.split(self.DELIMITER)):
422
423
424 if i % 2 == 0:
425 out.append(block)
426 else:
427 block = block.strip()
428 try:
429 pos = block.index(':')
430 except ValueError:
431 raise TypeError("Template %r has no colon" % (block, ))
432 prefix = block[:pos+1]
433 if prefix not in templatizers:
434 raise TypeError("Template %r has invalid prefix %r"
435 % (block, prefix))
436 out.append(templatizers[prefix](block[pos+1:]))
437 return ''.join(out)
438
460
462 queue = self.get_queue_string(eaterAlias)
463 elementName = self.eaters[eaterAlias].elementName
464
465 return self.EATER_TMPL % {'name': elementName, 'queue': queue}
466
468 elementName = self.feeders[feederName].elementName
469 return self.FEEDER_TMPL % {'name': elementName}
470
472 """
473 Return a parse-launch string to join the fdsrc eater element and
474 the depayer, for example '!' or '! queue !'. The string may have
475 no format strings.
476 """
477 return '!'
478
480 """
481 Method that returns the source pad of the final element in an eater.
482
483 @returns: the GStreamer source pad of the final element in an eater
484 @rtype: L{gst.Pad}
485 """
486 e = self.eaters[eaterAlias]
487 identity = self.get_element(e.elementName + '-identity')
488 depay = self.get_element(e.depayName)
489 srcpad = depay.get_pad("src")
490 if identity:
491 srcpad = identity.get_pad("src")
492 return srcpad
493
494
496 """
497 I am a part of a feed component for a specific group
498 of functionality.
499
500 @ivar name: name of the effect
501 @type name: string
502 @ivar component: component owning the effect
503 @type component: L{FeedComponent}
504 """
505 logCategory = "effect"
506
508 """
509 @param name: the name of the effect
510 """
511 self.name = name
512 self.setComponent(None)
513
515 """
516 Set the given component as the effect's owner.
517
518 @param component: the component to set as an owner of this effect
519 @type component: L{FeedComponent}
520 """
521 self.component = component
522 self.setUIState(component and component.uiState or None)
523
525 """
526 Set the given UI state on the effect. This method is ideal for
527 adding keys to the UI state.
528
529 @param state: the UI state for the component to use.
530 @type state: L{flumotion.common.componentui.WorkerComponentUIState}
531 """
532 self.uiState = state
533
535 """
536 Get the component owning this effect.
537
538 @rtype: L{FeedComponent}
539 """
540 return self.component
541
542
543 -class PostProcEffect (Effect):
544 """
545 I am an effect that is plugged in the pipeline to do a post processing
546 job and can be chained to other effect of the same class.
547
548 @ivar name: name of the effect
549 @type name: string
550 @ivar component: component owning the effect
551 @type component: L{FeedComponent}
552 @ivar sourcePad: pad of the source after which I'm plugged
553 @type sourcePad: L{GstPad}
554 @ivar effectBin: gstreamer bin doing the post processing effect
555 @type source: L{GstBin}
556 @ivar pipeline: pipeline holding the gstreamer elements
557 @type pipeline: L{GstPipeline}
558
559 """
560 logCategory = "effect"
561
def __init__(self, name, sourcePad, effectBin, pipeline):
    """
    Create a post-processing effect; it starts out unplugged.

    @param name:      the name of the effect
    @param sourcePad: pad of the source after which the effect is plugged
    @param effectBin: gstreamer bin doing the post processing effect
    @param pipeline:  pipeline holding the gstreamer elements
    """
    Effect.__init__(self, name)
    # The constructor only records its arguments; the plugged flag
    # starts out False.
    self.plugged = False
    self.pipeline = pipeline
    self.effectBin = effectBin
    self.sourcePad = sourcePad
574
576 """
577 Plug the effect in the pipeline unlinking the source element with it's
578 downstream peer
579 """
580 if self.plugged == True:
581 return
582
583
584 peerSinkPad = self.sourcePad
585 peerSrcPad = peerSinkPad.get_peer()
586 peerSinkPad.unlink(peerSrcPad)
587
588
589 self.effectBin.set_state(gst.STATE_PLAYING)
590 self.pipeline.add(self.effectBin)
591
592
593 peerSinkPad.link(self.effectBin.get_pad('sink'))
594 self.effectBin.get_pad('src').link(peerSrcPad)
595 self.plugged = True
596
597
671
672 signalid = queue.connect("underrun", _underrun_cb)
673
674
676
677 disconnectedPads = False
678
680 """Should be overrided by subclasses to provide the pipeline the
681 component uses.
682 """
683 return ""
684
686 self.EATER_TMPL += ' ! queue name=input-%(name)s'
687 self._reset_count = 0
688
689 self.uiState.addKey('reset-count', 0)
690
694
695
696
698 return [self.get_element(f.elementName + '-pay')
699 for f in self.feeders.values()]
700
704
706 raise NotImplementedError('Subclasses should implement '
707 'get_base_pipeline_string')
708
710 e = self.eaters[eaterAlias]
711 inputq = self.get_element('input-' + e.elementName)
712 return inputq.get_pad('src')
713
714
715
717 """
718 Add the event probes that will check for the flumotion-reset event.
719
720 Those will trigger the pipeline's blocking and posterior reload
721 """
722
723
724 def input_reset_event(pad, event):
725 if event.type != gst.EVENT_CUSTOM_DOWNSTREAM:
726 return True
727 if event.get_structure().get_name() != 'flumotion-reset':
728 return True
729 if self.disconnectedPads:
730 return False
731
732 self.log('RESET: in reset event received on input pad %r', pad)
733 self._reset_count = len(self.feeders)
734
735
736
737 self._block_eaters()
738
739
740 return False
741
742 def output_reset_event(pad, event):
743 if event.type != gst.EVENT_EOS:
744 return True
745
746 self.log('RESET: out reset event received on output pad %r', pad)
747
748
749
750
751
752
753 self._reset_count -= 1
754 if self._reset_count > 0:
755 return False
756
757 self._send_reset_event()
758 reactor.callFromThread(self._on_pipeline_drained)
759
760 return False
761
762 self.log('RESET: installing event probes for detecting changes')
763
764 for elem in self.get_input_elements():
765 self.debug('RESET: adding event probe for %s', elem.get_name())
766 elem.get_pad('sink').add_event_probe(input_reset_event)
767
768 for elem in self.get_output_elements():
769 self.debug('RESET: adding event probe for %s', elem.get_name())
770 elem.get_pad('sink').add_event_probe(output_reset_event)
771
773 """
774 Function that blocks all the identities of the eaters
775 """
776 for elem in self.get_input_elements():
777 pad = elem.get_pad('src')
778 self.debug("RESET: Blocking pad %s", pad)
779 pad.set_blocked_async(True, self._on_eater_blocked)
780
786
788 event = gst.event_new_custom(gst.EVENT_CUSTOM_DOWNSTREAM,
789 gst.Structure('flumotion-reset'))
790
791 for elem in self.get_output_elements():
792 pad = elem.get_pad('sink')
793 pad.send_event(event)
794
796 for pad in element.pads():
797 ppad = pad.get_peer()
798 if not ppad:
799 continue
800 if (pad.get_direction() in directions and
801 pad.get_direction() == gst.PAD_SINK):
802 self.debug('RESET: unlink %s with %s', pad, ppad)
803 ppad.unlink(pad)
804 elif (pad.get_direction() in directions and
805 pad.get_direction() == gst.PAD_SRC):
806 self.debug('RESET: unlink %s with %s', pad, ppad)
807 pad.unlink(ppad)
808
810 if done is None:
811 done = []
812 if not element:
813 return
814 if element in done:
815 return
816 if element in end:
817 return
818
819 for src in element.src_pads():
820 self.log('going to start by pad %r', src)
821 if not src.get_peer():
822 continue
823 peer = src.get_peer().get_parent()
824 self._remove_pipeline(pipeline, peer, end, done)
825 done.append(peer)
826 element.unlink(peer)
827
828 self.log("RESET: removing old element %s from pipeline", element)
829 element.set_state(gst.STATE_NULL)
830 pipeline.remove(element)
831
833
834
835
836
837 self.log('RESET: Going to rebuild the pipeline')
838
839 base_pipe = self._get_base_pipeline_string()
840
841
842
843 fake_pipeline = 'fakesrc name=start ! %s' % base_pipe
844 pipeline = gst.parse_launch(fake_pipeline)
845
846 def move_element(element, orig, dest):
847 if not element:
848 return
849 if element in done:
850 return
851
852 to_link = []
853 done.append(element)
854 self.log("RESET: going to remove %s", element)
855 for src in element.src_pads():
856 self.log("RESET: got src pad element %s", src)
857 if not src.get_peer():
858 continue
859 peer = src.get_peer().get_parent()
860 to_link.append(peer)
861
862 move_element(to_link[-1], orig, dest)
863
864 self._unlink_pads(element, [gst.PAD_SRC, gst.PAD_SINK])
865 orig.remove(element)
866 dest.add(element)
867
868 self.log("RESET: new element %s added to the pipeline", element)
869 for peer in to_link:
870 self.log("RESET: linking peers %s -> %s", element, peer)
871 element.link(peer)
872
873 done = []
874 start = pipeline.get_by_name('start').get_pad('src').get_peer()
875 move_element(start.get_parent(), pipeline, self.pipeline)
876
877
878
879
880
881
882 if len(self.get_input_elements()) == 1:
883 elem = self.get_input_elements()[0]
884 self.log("RESET: linking eater %r to %r", elem, done[0])
885 elem.link(done[0])
886
887
888 if len(self.get_output_elements()) == 1:
889 elem = self.get_output_elements()[0]
890 self.log("RESET: linking %r to feeder %r", done[-1], elem)
891 done[-1].link(elem)
892
893 self.configure_pipeline(self.pipeline, self.config['properties'])
894 self.pipeline.set_state(gst.STATE_PLAYING)
895 self._unblock_eaters()
896
897 resets = self.uiState.get('reset-count')
898 self.uiState.set('reset-count', resets+1)
899
900
901
903 self.log("RESET: Pad %s %s", pad,
904 (blocked and "blocked") or "unblocked")
905
907 self._on_pad_blocked(pad, blocked)
908 if blocked:
909 peer = pad.get_peer()
910 peer.send_event(gst.event_new_eos())
911
912
922
923
925 """
926 Component that is reconfigured when new changes arrive through the
927 flumotion-reset event (sent by the fms producer).
928 """
929 pass
930
931
933 """
934 This class provides for multi-input ParseLaunchComponents, such as muxers,
935 that handle flumotion-reset events for reconfiguration.
936 """
937
938 LINK_MUXER = False
939
941 return muxer.get_compatible_pad(srcpad, caps)
942
982
983 for e in self.eaters:
984 depay = self.get_element(self.eaters[e].depayName)
985 self._probes[depay] = \
986 depay.get_pad("src").add_buffer_probe(
987 buffer_probe_cb, depay, e)
988
990 if is_blocked:
991 self.fired_eaters = self.fired_eaters + 1
992 if self.fired_eaters == len(self.eaters):
993 self.debug("All pads are now blocked")
994 self.disconnectedPads = False
995 for e in self.eaters:
996 srcpad = self.get_eater_srcpad(e)
997 srcpad.set_blocked_async(False, self.is_blocked_cb)
998