1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import os
23 import random
24 import socket
25 import string
26 import time
27 from urllib2 import urlparse
28
29 from twisted.cred import portal
30 from twisted.internet import protocol, reactor, address, error, defer
31 from twisted.spread import pb
32 from zope.interface import implements
33
34 from flumotion.common import medium, log, messages, errors
35 from flumotion.common.i18n import N_, gettexter
36 from flumotion.component import component
37 from flumotion.component.component import moods
38 from flumotion.twisted import credentials, fdserver, checkers
39 from flumotion.twisted import reflect
40
41 __version__ = "$Rev: 7983 $"
42 T_ = gettexter()
43
44
46 """
47 An Avatar in the porter representing a streamer
48 """
49
50 - def __init__(self, avatarId, porter, mind):
57
def isAttached(self):
    """
    Tell whether a streamer is currently logged in behind this avatar.

    @returns: True while self.mind is set, False when it is None
    @rtype:   bool
    """
    # Identity test on purpose: "!= None" would be routed through the mind
    # object's own comparison methods, which need not treat None specially.
    return self.mind is not None
60
def logout(self):
    """
    Report the logout at debug level, then drop the reference to the mind.
    """
    # NOTE(review): the def line is not visible in this listing; the name
    # "logout" is reconstructed from the log message -- confirm.
    self.debug("porter client %s logging out", self.avatarId)
    self.mind = None
64
def perspective_registerPath(self, path):
    """
    Ask the porter to direct requests for a path to this avatar.

    @param path: The path to register
    @type  path: str
    """
    # NOTE(review): the def line is not visible in this listing; the name is
    # reconstructed from the log message and the PB "perspective_" naming
    # convention -- confirm.
    message = "Perspective called: registering path \"%s\"" % path
    self.log(message)
    self.porter.registerPath(path, self)
68
72
76
80
81
83 """
84 A Realm within the Porter that creates Avatars for streamers logging into
85 the porter.
86 """
87 implements(portal.IRealm)
88
def __init__(self, porter):
    """
    Remember the porter that this realm hands out avatars for.

    @param porter: The porter that avatars created from here should use.
    @type  porter: L{Porter}
    """
    self.porter = porter
95
104
105
107
def remote_getPorterDetails(self):
    """
    Report how to reach and log into the porter.

    @returns: a tuple (path, username, password, port, interface) holding
              the socket path, the login username and password, and the
              listening port and interface, all read from self.comp
    @rtype:   tuple
    """
    # NOTE(review): the def line is not visible in this listing; the method
    # name is reconstructed -- confirm.
    porter = self.comp
    details = (
        porter._socketPath,
        porter._username,
        porter._password,
        porter._iptablesPort,
        porter._interface,
    )
    return details
117
118
119 -class Porter(component.BaseComponent, log.Loggable):
120 """
121 The porter optionally sits in front of a set of streamer components.
122 The porter is what actually deals with incoming connections on a socket.
123 It decides which streamer to direct the connection to, then passes the FD
124 (along with some amount of already-read data) to the appropriate streamer.
125 """
126
127 componentMediumClass = PorterMedium
128
130
131
def init(self):
    """
    Put the porter into its unconfigured state: empty path and prefix
    tables, no listener, and no connection settings.
    """
    # NOTE(review): the def line is not visible in this listing; "init" is
    # reconstructed as the component initialisation hook -- confirm.

    # Lookup tables filled by registerPath() and registerPrefix().
    self._mappings = {}
    self._prefixes = {}

    # Set once the porter is listening on its socket path.
    self._socketlistener = None

    # Connection settings; filled in from the component properties.
    self._socketPath = self._username = self._password = None
    self._port = self._iptablesPort = None
    self._porterProtocol = None

    self._interface = ''
145
def registerPath(self, path, avatar):
    """
    Direct requests for a path to the streamer behind an avatar. An
    existing registration for the same path is replaced, with a warning.

    @param path:   The path to register
    @type  path:   str
    @param avatar: The avatar representing the streamer to direct this
                   path to
    @type  avatar: L{PorterAvatar}
    """
    self.debug("Registering porter path \"%s\" to %r" % (path, avatar))

    already_registered = path in self._mappings
    if already_registered:
        self.warning("Replacing existing mapping for path \"%s\"" % path)

    self._mappings[path] = avatar
162
def deregisterPath(self, path, avatar):
    """
    Remove the registration for a path, but only when that path is
    currently mapped to the avatar passed in. Anything else is reported
    with a warning and leaves the mappings untouched.

    @param path:   The path to deregister
    @type  path:   str
    @param avatar: The avatar representing the streamer being deregistered
    @type  avatar: L{PorterAvatar}
    """
    # NOTE(review): the def line is not visible in this listing; the name is
    # reconstructed by symmetry with registerPath -- confirm.
    if path not in self._mappings:
        self.warning("Mapping not removed: no mapping found")
        return

    if self._mappings[path] == avatar:
        self.debug("Removing porter mapping for \"%s\"" % path)
        del self._mappings[path]
        return

    self.warning(
        "Mapping not removed: refers to a different avatar")
182
def registerPrefix(self, prefix, avatar):
    """
    Register a destination for every request whose path begins with the
    given prefix. When several registered prefixes match a request, the
    longest one is selected. An existing registration for the same prefix
    is overwritten, with a warning.

    @param prefix: The path prefix to register
    @type  prefix: str
    @param avatar: The avatar being registered
    @type  avatar: L{PorterAvatar}
    """
    # NOTE(review): the def line is not visible in this listing; the name
    # and parameter order are reconstructed -- confirm.
    self.debug("Setting prefix \"%s\" for porter", prefix)

    overwriting = prefix in self._prefixes
    if overwriting:
        self.warning("Overwriting prefix")

    self._prefixes[prefix] = avatar
198
def deregisterPrefix(self, prefix, avatar):
    """
    Remove the destination registered for a prefix, but only when that
    prefix is currently registered to the avatar passed in. Anything else
    is reported with a warning and leaves the registrations untouched.

    @param prefix: The path prefix to deregister
    @type  prefix: str
    @param avatar: The avatar being deregistered
    @type  avatar: L{PorterAvatar}
    """
    # NOTE(review): the def line is not visible in this listing; the name
    # and parameter order are reconstructed -- confirm.
    if prefix in self._prefixes:
        if self._prefixes[prefix] == avatar:
            self.debug("Removing prefix destination from porter")
            del self._prefixes[prefix]
        else:
            self.warning(
                "Not removing prefix destination: expected avatar not found")
        return

    self.warning("Mapping not removed: no mapping found")
218
def findPrefixMatch(self, path):
    """
    Find the avatar registered for the longest prefix that path starts
    with.

    @param path: The path being requested
    @type  path: str
    @returns: The avatar registered for the longest matching prefix, or
              None when no registered prefix matches.
    """
    found = None

    for prefix in self._prefixes:
        self.log("Checking: %r, %r" % (prefix, path))
        if (path.startswith(prefix) and
                (found is None or len(found) < len(prefix))):
            found = prefix

    # Compare against None instead of relying on truthiness: the empty
    # prefix "" is falsy, yet it is a legitimate match for every path.
    if found is None:
        return None
    return self._prefixes[found]
231
def findDestination(self, path):
    """
    Look up the destination Avatar for a path. An exact path registration
    wins; otherwise the prefix registrations are consulted.

    @param path: The path being requested
    @returns: The Avatar for this mapping, or None.
    """
    try:
        return self._mappings[path]
    except KeyError:
        # No exact registration for this path.
        pass
    return self.findPrefixMatch(path)
242
def generateSocketPath(self):
    """
    Generate a socket pathname in an appropriate location.

    @returns: the name of a freshly created temporary file whose name
              starts with 'flumotion.porter.' and ends with '.<pid>'
    @rtype:   str
    """
    import tempfile

    pid_suffix = '.%d' % os.getpid()
    descriptor, socket_path = tempfile.mkstemp(pid_suffix,
                                               'flumotion.porter.')
    # Only the name is wanted; release the descriptor mkstemp opened.
    os.close(descriptor)

    return socket_path
254
def generateRandomString(self, numchars):
    """
    Generate a random US-ASCII string of length numchars, made up of
    upper- and lower-case letters.

    @param numchars: number of characters to generate
    @type  numchars: int
    @returns: the generated string; empty when numchars is 0
    @rtype:   str
    @raises NotImplementedError: if the platform offers no OS-level
                                 randomness source
    """
    chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"

    # The porter uses these strings as its generated login username and
    # password, so draw from the operating system's entropy source rather
    # than the predictable default generator.
    rng = random.SystemRandom()
    return "".join([rng.choice(chars) for _ in range(numchars)])
265
def have_properties(self):
    """
    Derive the porter's settings from self.config['properties'].

    With a 'socket-path' property, the socket path, username and password
    are all read from the properties. Without one, random credentials and
    a fresh socket path are generated. The 'port' property is required;
    'iptables-port' defaults to the port, 'protocol' to the HTTP porter
    protocol and 'interface' to the empty string.

    @raises KeyError: if 'port' is missing, or if 'socket-path' is given
                      without 'username' or 'password'
    """
    properties = self.config['properties']

    # Presumably maps the legacy 'socket_path' key onto 'socket-path';
    # verify against fixRenamedProperties.
    self.fixRenamedProperties(properties,
                              [('socket_path', 'socket-path')])

    if 'socket-path' not in properties:
        # Nothing configured: make up credentials and a socket location.
        self._username = self.generateRandomString(12)
        self._password = self.generateRandomString(12)
        self._socketPath = self.generateSocketPath()
    else:
        # Explicitly configured socket: credentials must come with it.
        self._socketPath = properties['socket-path']
        self._username = properties['username']
        self._password = properties['password']

    self._port = int(properties['port'])
    self._iptablesPort = int(properties.get('iptables-port', self._port))

    default_protocol = (
        'flumotion.component.misc.porter.porter.HTTPPorterProtocol')
    self._porterProtocol = properties.get('protocol', default_protocol)
    self._interface = properties.get('interface', '')
293
def do_stop(self):
    """
    Stop listening on the porter's socket, if a listener exists.

    @returns: whatever the listener's stopListening() returned, or None
              when there was no listener
    """
    # NOTE(review): the def line and the indentation are not visible in
    # this listing; the name "do_stop" and the placement of the reset
    # below (after the if) are reconstructed -- confirm.
    result = None
    if self._socketlistener:
        result = self._socketlistener.stopListening()
    self._socketlistener = None
    return result
302
304
def do_setup(self):
    """
    Start the porter: open the PB channel on the socket path that
    streamers log into, then listen for client requests on the TCP port.

    On either listen failure a warning is logged, an error message is
    added to the component, the mood is set to sad and a failed deferred
    is returned.

    @returns: a deferred failing with L{errors.ComponentSetupHandledError}
              when one of the two listeners cannot be created; None
              otherwise
    """
    # NOTE(review): the def line and the indentation are not visible in
    # this listing; the name "do_setup" and the nesting of the try blocks
    # are reconstructed -- confirm.
    self.have_properties()
    realm = PorterRealm(self)
    checker = checkers.FlexibleCredentialsChecker()
    checker.addUser(self._username, self._password)

    p = portal.Portal(realm, [checker])
    serverfactory = pb.PBServerFactory(p)

    try:
        # Best effort: clear whatever is left at the socket path before
        # listening there; a missing file is not an error.
        try:
            os.unlink(self._socketPath)
        except OSError:
            pass

        self._socketlistener = reactor.listenWith(
            fdserver.FDPort, self._socketPath, serverfactory)
        self.debug("Now listening on socketPath %s" % self._socketPath)
    except error.CannotListenError:
        self.warning("Failed to create socket %s" % self._socketPath)
        m = messages.Error(T_(N_(
            "Network error: socket path %s is not available."),
            self._socketPath))
        self.addMessage(m)
        self.setMood(moods.sad)
        return defer.fail(errors.ComponentSetupHandledError())

    # Resolve the configured protocol class by name, falling back to HTTP
    # when it cannot be imported.
    try:
        proto = reflect.namedAny(self._porterProtocol)
        self.debug("Created proto %r" % proto)
    except (ImportError, AttributeError):
        self.warning("Failed to import protocol '%s', defaulting to HTTP" %
                     self._porterProtocol)
        proto = HTTPPorterProtocol

    # Listen for incoming client requests, spoken in that protocol.
    factory = PorterProtocolFactory(self, proto)
    try:
        reactor.listenWith(
            fdserver.PassableServerPort, self._port, factory,
            interface=self._interface)
        self.debug("Now listening on port %d" % self._port)
    except error.CannotListenError:
        self.warning("Failed to listen on port %d" % self._port)
        m = messages.Error(T_(N_(
            "Network error: TCP port %d is not available."), self._port))
        self.addMessage(m)
        self.setMood(moods.sad)
        return defer.fail(errors.ComponentSetupHandledError())
359
360
362
def __init__(self, porter, protocol):
    """
    Remember the porter and the protocol class to instantiate for each
    incoming connection.

    @param porter:   the porter handed to every protocol instance
    @param protocol: callable that takes the porter and returns a
                     protocol instance
    """
    self.protocol = protocol
    self._porter = porter
366
def buildProtocol(self, addr):
    """
    Create a protocol instance for a new connection.

    @param addr: address of the connecting peer; not used here
    @returns: a new instance of self.protocol, constructed with the porter
              and with its factory attribute pointing back at this factory
    """
    # NOTE(review): the def line is not visible in this listing; the name
    # and signature follow Twisted's factory interface -- confirm.
    instance = self.protocol(self._porter)
    instance.factory = self
    return instance
371
372
374 """
375 The base porter is capable of accepting HTTP-like protocols (including
376 RTSP) - it reads the first line of a request, and makes the decision
377 solely on that.
378
379 We can't guarantee that we read precisely a line, so the buffer we
380 accumulate will actually be larger than what we actually parse.
381
382 @cvar MAX_SIZE: the maximum number of bytes allowed for the first line
383 @cvar delimiters: a list of valid line delimiters I check for
384 """
385
386 logCategory = 'porterprotocol'
387
388
389 MAX_SIZE = 4096
390
391
392
393 PORTER_CLIENT_TIMEOUT = 30
394
395
396
397
398
399 delimiters = ['\r\n', '\n', '\r']
400
408
410
def connectionMade(self):
    """
    Assign a request-id to the new connection, log that it was accepted,
    then let the base Protocol handle the event.
    """
    self.requestId = self.generateRequestId()

    descriptor = self.transport.fileno()
    timestamp = time.time()
    self.debug("[fd %5d] (ts %f) (request-id %r) accepted connection",
               descriptor, timestamp, self.requestId)

    protocol.Protocol.connectionMade(self)
417
def _timeout(self):
    """
    Drop a client connection whose timeout has expired.
    """
    # NOTE(review): the def line is not visible in this listing; the name
    # "_timeout" is reconstructed -- confirm.

    # Clear the stored delayed call first; connectionLost() only cancels
    # it when it is still set.
    self._timeoutDC = None
    seconds = self.PORTER_CLIENT_TIMEOUT
    self.debug("Timing out porter client after %d seconds", seconds)
    self.transport.loseConnection()
423
def connectionLost(self, reason):
    """
    Cancel the pending client timeout, if there is one.

    @param reason: why the connection was lost; not used here
    """
    # NOTE(review): the def line is not visible in this listing; the name
    # and signature follow Twisted's protocol interface -- confirm.
    if not self._timeoutDC:
        return
    self._timeoutDC.cancel()
    self._timeoutDC = None
428
def dataReceived(self, data):
    """
    Accumulate incoming data until the first request line is complete,
    then work out which streamer the request is for and pass the
    connection's file descriptor, together with the buffered data, to it.

    The connection is dropped when the buffer grows past MAX_SIZE without
    a line delimiter, when the first line cannot be parsed, when no
    identifier can be extracted from it, when no attached destination
    avatar is found (after writing a not-found response), or when sending
    the descriptor fails (after writing a service-unavailable response).

    @param data: the data just read from the connection
    """
    # NOTE(review): the def line and the indentation are not visible in
    # this listing; the name follows Twisted's protocol interface and the
    # block nesting below is reconstructed -- confirm.
    self._buffer = self._buffer + data
    self.log("Got data, buffer now \"%s\"" % self._buffer)

    # Try the delimiters in list order; the first one present in the
    # buffer splits it into the request line and everything after it.
    for delim in self.delimiters:
        try:
            line, remaining = self._buffer.split(delim, 1)
            break
        except ValueError:
            # This delimiter is not in the buffer; try the next one.
            pass
    else:
        # Reached only when no delimiter matched, i.e. the first line is
        # not complete yet.
        self.log("No valid delimiter found")
        if len(self._buffer) > self.MAX_SIZE:
            # Too much data without a line end: give up on this client.
            self.debug("[fd %5d] (ts %f) (request-id %r) dropping, "
                       "buffer exceeded",
                       self.transport.fileno(), time.time(),
                       self.requestId)

            return self.transport.loseConnection()
        else:
            # Keep the buffer and wait for more data.
            return

    # Got a complete first line; everything past it stays untouched in
    # "remaining".
    parsed = self.parseLine(line)
    if not parsed:
        self.log("Couldn't parse the first line")
        return self.transport.loseConnection()

    identifier = self.extractIdentifier(parsed)
    if not identifier:
        self.log("Couldn't find identifier in first line")
        return self.transport.loseConnection()

    if self.requestId:
        self.log("Injecting request-id %r", self.requestId)
        parsed = self.injectRequestId(parsed, self.requestId)
        # The parsed line may have changed, so rebuild the buffer from the
        # unparsed line, the delimiter that was split on and the rest.
        # NOTE(review): assumed to sit inside the "if" -- confirm.
        self._buffer = delim.join((self.unparseLine(parsed), remaining))

    self.debug("[fd %5d] (ts %f) (request-id %r) identifier %s",
               self.transport.fileno(), time.time(), self.requestId,
               identifier)

    # Ask the porter which avatar serves this identifier.
    destinationAvatar = self._porter.findDestination(identifier)

    if not destinationAvatar or not destinationAvatar.isAttached():
        if destinationAvatar:
            self.debug("There was an avatar, but it logged out?")

        self.debug(
            "[fd %5d] (ts %f) (request-id %r) no destination avatar found",
            self.transport.fileno(), time.time(), self.requestId)

        self.writeNotFoundResponse()
        return self.transport.loseConnection()

    self.debug("[fd %5d] (ts %f) (request-id %r) send fd to avatarId %s",
               self.transport.fileno(), time.time(), self.requestId,
               destinationAvatar.avatarId)

    # Hand our descriptor and the data read so far to the streamer over
    # the transport of its mind's broker.
    try:
        destinationAvatar.mind.broker.transport.sendFileDescriptor(
            self.transport.fileno(), self._buffer)
    except OSError, e:
        self.warning("[fd %5d] failed to send FD: %s",
                     self.transport.fileno(), log.getExceptionMessage(e))
        self.writeServiceUnavailableResponse()
        return self.transport.loseConnection()

    self.debug("[fd %5d] (ts %f) (request-id %r) sent fd to avatarId %s",
               self.transport.fileno(), time.time(), self.requestId,
               destinationAvatar.avatarId)

    # Flag the transport before dropping our side of the connection;
    # presumably keepSocketAlive stops the transport from shutting down
    # the socket that was just handed over -- verify against fdserver.
    self.transport.keepSocketAlive = True
    self.transport.loseConnection()
533
def parseLine(self, line):
    """
    Parse the initial line of the request.

    Subclasses should override this.

    @param line: the first line of the request
    @returns: an object that can be passed to extractIdentifier to
              uniquely identify the requested stream, or None if the
              request is unreadable
    @raises NotImplementedError: always, in this base implementation
    """
    raise NotImplementedError
543
def unparseLine(self, parsed):
    """
    Recreate the initial request line from its parsed representation.

    The recreated line does not have to be identical to the line that was
    parsed, but it must not lose information: parsing the recreated line
    should give the same information as the original parsed
    representation. The returned line has to be valid from the point of
    view of the porter protocol's scheme (for instance, HTTP).

    Subclasses should override this.

    @param parsed: the parsed representation returned by parseLine
    @raises NotImplementedError: always, in this base implementation
    """
    raise NotImplementedError
557
def extractIdentifier(self, parsed):
    """
    Extract the string that uniquely identifies the requested stream from
    the parsed representation of the first request line.

    Subclasses should override this, depending on how they implemented
    parseLine.

    @param parsed: the parsed representation returned by parseLine
    @raises NotImplementedError: always, in this base implementation
    """
    raise NotImplementedError
567
def generateRequestId(self):
    """
    Return a string that will uniquely identify the request.

    Subclasses should override this if they want to use request-ids, and
    should then implement injectRequestId as well.

    @raises NotImplementedError: always, in this base implementation
    """
    raise NotImplementedError
576
def injectRequestId(self, parsed, requestId):
    """
    Mix a request-id into the parsed representation of the first request
    line.

    Subclasses should override this if they generate request-ids.

    @param parsed:    the parsed representation of the first request line
    @param requestId: the string token identifying the request
    @returns: a parsed representation of the request line, possibly with
              the request-id mixed into it; this base implementation
              returns parsed unchanged
    """
    # Nothing to inject at this level.
    return parsed
587
def writeNotFoundResponse(self):
    """
    Write a response telling the client, in this protocol, that the
    requested resource was not found.

    Subclasses should override this to use the correct protocol.

    @raises NotImplementedError: always, in this base implementation
    """
    raise NotImplementedError
596
def writeServiceUnavailableResponse(self):
    """
    Write a response telling the client, in this protocol, that the
    requested resource was temporarily unavailable.

    Subclasses should override this to use the correct protocol.

    @raises NotImplementedError: always, in this base implementation
    """
    raise NotImplementedError
605
606
608 scheme = 'http'
609 protos = ["HTTP/1.0", "HTTP/1.1"]
610 requestIdParameter = 'FLUREQID'
611 requestIdBitsNo = 256
612
def parseLine(self, line):
    """
    Split a request line of the form "METHOD location PROTOCOL" into its
    three parts and parse the location as a URL.

    @param line: the first line of the request, without its delimiter
    @type  line: str
    @returns: a (method, parsed_url, proto) tuple, where parsed_url is
              what urlparse.urlparse() returns for the location; None if
              the line does not split into three parts, names a protocol
              that is not listed in self.protos, or URL parsing raises
              ValueError
    """
    try:
        # str.strip replaces the string-module function of the same name,
        # which was deprecated and no longer exists on Python 3.
        (method, location, proto) = [
            part.strip() for part in line.split(' ', 2)]

        if proto not in self.protos:
            return None

        parsed_url = urlparse.urlparse(location)

        return method, parsed_url, proto

    except ValueError:
        # Fewer than three space-separated parts, or an unparseable URL.
        return None
627
def unparseLine(self, parsed):
    """
    Rebuild the request line from its parsed representation.

    @param parsed: a (method, parsed_url, proto) tuple
    @returns: the method, the URL reassembled by urlparse.urlunparse()
              and the protocol, joined by single spaces
    @rtype:   str
    """
    method, parsed_url, proto = parsed
    location = urlparse.urlunparse(parsed_url)
    return ' '.join([method, location, proto])
631
637
def injectRequestId(self, parsed, requestId):
    """
    Append the request-id to the query string of the parsed URL, as the
    value of the parameter named by self.requestIdParameter.

    @param parsed:    a (method, parsed_url, proto) tuple
    @param requestId: the request-id to add
    @type  requestId: str
    @returns: a (method, parsed_url, proto) tuple in which parsed_url is a
              plain tuple whose fifth element is the extended query string
    """
    method, url, proto = parsed
    query = url[4]

    # Separate from an existing query with '&'; an empty query needs
    # nothing in front of the new parameter.
    if query != '':
        separator = '&'
    else:
        separator = ''

    extended = query + separator + self.requestIdParameter + '=' + requestId
    rebuilt = url[:4] + (extended, ) + url[5:]
    return method, rebuilt, proto
651
def extractIdentifier(self, parsed):
    """
    Pick the stream identifier out of a parsed request line.

    @param parsed: a (method, parsed_url, proto) tuple
    @returns: the third element of parsed_url, i.e. the path component of
              a urlparse result
    """
    _method, url, _proto = parsed
    path = url[2]
    return path
656
def writeNotFoundResponse(self):
    """
    Write an HTTP/1.0 404 response with a short plain body to the
    transport.
    """
    response = "HTTP/1.0 404 Not Found\r\n\r\nResource unknown"
    self.transport.write(response)
659
def writeServiceUnavailableResponse(self):
    """
    Write an HTTP/1.0 503 response with a short plain body to the
    transport.
    """
    response = ("HTTP/1.0 503 Service Unavailable\r\n\r\n"
                "Service temporarily unavailable")
    self.transport.write(response)
663
664
666 scheme = 'rtsp'
667 protos = ["RTSP/1.0"]
668
def writeNotFoundResponse(self):
    """
    Write an RTSP/1.0 404 response with a short plain body to the
    transport.
    """
    response = "RTSP/1.0 404 Not Found\r\n\r\nResource unknown"
    self.transport.write(response)
671
def writeServiceUnavailableResponse(self):
    """
    Write an RTSP/1.0 503 response with a short plain body to the
    transport.
    """
    response = ("RTSP/1.0 503 Service Unavailable\r\n\r\n"
                "Service temporarily unavailable")
    self.transport.write(response)
675