Package cloudfiles :: Module connection
[frames] | no frames]

Source Code for Module cloudfiles.connection

  1  """ 
  2  connection operations 
  3   
  4  Connection instances are used to communicate with the remote service at 
  5  the account level creating, listing and deleting Containers, and returning 
  6  Container instances. 
  7   
  8  See COPYING for license information. 
  9  """ 
 10   
 11  import  socket 
 12  import  os 
 13  from    urllib    import urlencode 
 14  from    httplib   import HTTPSConnection, HTTPConnection, HTTPException 
 15  from    container import Container, ContainerResults 
 16  from    utils     import unicode_quote, parse_url, THTTPConnection, THTTPSConnection 
 17  from    errors    import ResponseError, NoSuchContainer, ContainerNotEmpty, \ 
 18                           InvalidContainerName, CDNNotEnabled, ContainerExists 
 19  from    Queue     import Queue, Empty, Full 
 20  from    time      import time 
 21  import  consts 
 22  from    authentication import Authentication 
 23  from    fjson     import json_loads 
 24  from    sys       import version_info 
 25  # Because HTTPResponse objects *have* to have read() called on them 
 26  # before they can be used again ... 
 27  # pylint: disable-msg=W0612 
 28   
 29   
class Connection(object):
    """
    Manages the connection to the storage system and serves as a factory
    for Container instances.

    @undocumented: cdn_connect
    @undocumented: http_connect
    @undocumented: cdn_request
    @undocumented: make_request
    @undocumented: _check_container_name
    """

    def __init__(self, username=None, api_key=None, timeout=5, **kwargs):
        """
        Accepts keyword arguments for Mosso username and api key.
        Optionally, you can omit these keywords and supply an
        Authentication object using the auth keyword. Setting the argument
        servicenet to True will make use of Rackspace servicenet network.

        The constructor authenticates immediately and sets up the HTTP
        connection object(s) before returning.

        @type username: str
        @param username: a Mosso username
        @type api_key: str
        @param api_key: a Mosso API key
        @type timeout: int
        @param timeout: timeout value handed to the Authentication object
                        and to every HTTP connection object created
        @type servicenet: bool
        @param servicenet: Use Rackspace servicenet to access Cloud Files.
        @type auth: L{Authentication}
        @param auth: pre-built authentication object; when given, username,
                     api_key and authurl are not used
        @type authurl: str
        @param authurl: authentication endpoint, defaults to
                        consts.us_authurl
        @type debuglevel: int
        @param debuglevel: debug level set on the storage HTTP connection
        @type useragent: str
        @param useragent: User-Agent header value, defaults to
                          consts.user_agent
        @type cdn_log_retention: bool
        @param cdn_log_retention: set logs retention for this cdn enabled
                                  container (NOTE: this keyword is not read
                                  by the constructor itself).
        @raise TypeError: if no auth object is supplied and username,
                          api_key or authurl is missing
        """
        self.cdn_enabled = False
        self.cdn_args = None
        self.connection_args = None
        self.cdn_connection = None
        self.connection = None
        self.token = None
        self.debuglevel = int(kwargs.get('debuglevel', 0))
        self.servicenet = kwargs.get('servicenet', False)
        self.user_agent = kwargs.get('useragent', consts.user_agent)
        self.timeout = timeout

        # if the environment variable RACKSPACE_SERVICENET is set (to
        # anything) it will automatically set servicenet=True, unless the
        # caller passed an explicit servicenet keyword.
        if 'servicenet' not in kwargs \
                and 'RACKSPACE_SERVICENET' in os.environ:
            self.servicenet = True

        # A falsy auth value is treated the same as no auth at all.
        self.auth = kwargs.get('auth') or None

        if not self.auth:
            authurl = kwargs.get('authurl', consts.us_authurl)
            if username and api_key and authurl:
                self.auth = Authentication(username, api_key,
                                           authurl=authurl,
                                           useragent=self.user_agent,
                                           timeout=self.timeout)
            else:
                raise TypeError("Incorrect or invalid arguments supplied")

        self._authenticate()

    def _authenticate(self):
        """
        Authenticate and setup this instance with the values returned.

        Stores the CDN url and auth token, picks the connection class to
        use and (re)creates the storage connection, plus the CDN connection
        when a CDN url was returned.
        """
        (url, self.cdn_url, self.token) = self.auth.authenticate()
        url = self._set_storage_url(url)
        self.connection_args = parse_url(url)
        use_ssl = self.connection_args[3]

        # Interpreters older than 2.6 use the project's own connection
        # classes from utils instead of the stock httplib ones.
        if version_info[:2] < (2, 6):
            if use_ssl:
                self.conn_class = THTTPSConnection
            else:
                self.conn_class = THTTPConnection
        else:
            if use_ssl:
                self.conn_class = HTTPSConnection
            else:
                self.conn_class = HTTPConnection

        self.http_connect()
        if self.cdn_url:
            self.cdn_connect()

    def _set_storage_url(self, url):
        """
        Return the storage url to use, rewritten to the "snet-" prefixed
        host when servicenet access is enabled.
        """
        if self.servicenet:
            return "https://snet-%s" % url.replace("https://", "")
        return url

    def cdn_connect(self):
        """
        Setup the http connection instance for the CDN service.
        """
        # Only host and port are used; the parsed uri and ssl flag are
        # discarded here.
        (host, port, _cdn_uri, _is_ssl) = parse_url(self.cdn_url)
        self.cdn_connection = self.conn_class(host, port,
                                              timeout=self.timeout)
        self.cdn_enabled = True

    def http_connect(self):
        """
        Setup the http connection instance.
        """
        (host, port, self.uri, _is_ssl) = self.connection_args
        self.connection = self.conn_class(host, port=port,
                                          timeout=self.timeout)
        self.connection.set_debuglevel(self.debuglevel)

    def _build_path(self, path):
        """
        Join the account uri and the quoted path components into a
        request path.
        """
        quoted = '/'.join([unicode_quote(part) for part in path])
        return '/%s/%s' % (self.uri.rstrip('/'), quoted)

    def _build_headers(self, data, hdrs):
        """
        Return the standard request headers, updated with hdrs when hdrs
        is a dict.
        """
        headers = {'Content-Length': str(len(data)),
                   'User-Agent': self.user_agent,
                   'X-Auth-Token': self.token}
        if isinstance(hdrs, dict):
            headers.update(hdrs)
        return headers

    def _perform_request(self, connection_attr, reconnect, method, path,
                         data, headers):
        """
        Send a request on the connection stored under connection_attr.

        A connection-level failure triggers one reconnect and retry. A 401
        response triggers re-authentication, a refreshed auth token, and
        one more reconnect and retry.
        """
        def attempt():
            # Look the connection up on every attempt: reconnect() and
            # _authenticate() both replace the connection object.
            conn = getattr(self, connection_attr)
            conn.request(method, path, data, headers)
            return conn.getresponse()

        try:
            response = attempt()
        except (socket.error, IOError, HTTPException):
            reconnect()
            response = attempt()

        if response.status == 401:
            self._authenticate()
            headers['X-Auth-Token'] = self.token
            reconnect()
            response = attempt()

        return response

    def _check_status(response):
        """
        Raise ResponseError unless the response status is in the 2xx range.
        """
        if (response.status < 200) or (response.status > 299):
            raise ResponseError(response.status, response.reason)
    _check_status = staticmethod(_check_status)

    def _usage_from_headers(response, count_header, size_header):
        """
        Extract a (count, size) pair from the named response headers.

        Header names are compared in lower case. A value is None when its
        header is absent and 0 when the header is not a valid integer.
        """
        count = size = None
        for (name, value) in response.getheaders():
            name = name.lower()
            if name == count_header:
                try:
                    count = int(value)
                except ValueError:
                    count = 0
            if name == size_header:
                try:
                    size = int(value)
                except ValueError:
                    size = 0
        return (count, size)
    _usage_from_headers = staticmethod(_usage_from_headers)

    def _add_paging(parms, limit, marker):
        """
        Store truthy limit / marker values in the query parameter dict.
        """
        if limit:
            parms['limit'] = limit
        if marker:
            parms['marker'] = marker
        return parms
    _add_paging = staticmethod(_add_paging)

    def cdn_request(self, method, path=(), data='', hdrs=None):
        """
        Given a method (i.e. GET, PUT, POST, etc), a path, data, header and
        metadata dicts, performs an http request against the CDN service.

        @raise CDNNotEnabled: if no CDN connection has been set up
        """
        if not self.cdn_enabled:
            raise CDNNotEnabled()

        # NOTE(review): the path is built from the storage uri (self.uri);
        # the uri parsed from cdn_url in cdn_connect() is not used. This
        # presumably relies on both services sharing the same account path
        # -- confirm.
        return self._perform_request('cdn_connection', self.cdn_connect,
                                     method, self._build_path(path), data,
                                     self._build_headers(data, hdrs))

    def make_request(self, method, path=(), data='', hdrs=None, parms=None):
        """
        Given a method (i.e. GET, PUT, POST, etc), a path, data, header and
        metadata dicts, and an optional dictionary of query parameters,
        performs an http request.
        """
        full_path = self._build_path(path)
        if isinstance(parms, dict) and parms:
            full_path = '%s?%s' % (full_path, urlencode(parms))

        return self._perform_request('connection', self.http_connect,
                                     method, full_path, data,
                                     self._build_headers(data, hdrs))

    def get_info(self):
        """
        Return tuple for number of containers and total bytes in the account

        >>> connection.get_info()
        (5, 2309749)

        @rtype: tuple
        @return: a tuple containing the number of containers and total bytes
                 used by the account
        @raise ResponseError: if the service answers with a non-2xx status
        """
        response = self.make_request('HEAD')
        usage = self._usage_from_headers(response,
                                         'x-account-container-count',
                                         'x-account-bytes-used')
        # The response has to be read before the connection is reused.
        response.read()
        self._check_status(response)
        return usage

    def _check_container_name(self, container_name):
        """
        Raise InvalidContainerName if the name is empty, contains a '/' or
        is longer than consts.container_name_limit.
        """
        if not container_name or \
                '/' in container_name or \
                len(container_name) > consts.container_name_limit:
            raise InvalidContainerName(container_name)

    def create_container(self, container_name, error_on_existing=False):
        """
        Given a container name, returns a L{Container} item, creating a new
        Container if one does not already exist.

        >>> connection.create_container('new_container')
        <cloudfiles.container.Container object at 0xb77d628c>

        @param container_name: name of the container to create
        @type container_name: str
        @param error_on_existing: raise ContainerExists if container already
                                  exists
        @type error_on_existing: bool
        @rtype: L{Container}
        @return: an object representing the newly created container
        @raise InvalidContainerName: if the name fails validation
        @raise ResponseError: if the service answers with a non-2xx status
        @raise ContainerExists: if error_on_existing is set and the service
                                answers 202
        """
        self._check_container_name(container_name)

        response = self.make_request('PUT', [container_name])
        response.read()
        self._check_status(response)
        # A 202 answer is treated as "the container already existed".
        if error_on_existing and (response.status == 202):
            raise ContainerExists(container_name)
        return Container(self, container_name)

    def delete_container(self, container_name):
        """
        Given a container name, delete it.

        >>> connection.delete_container('old_container')

        @param container_name: name of the container to delete; a
                               L{Container} instance is also accepted
        @type container_name: str
        @raise InvalidContainerName: if the name fails validation
        @raise ContainerNotEmpty: if the service answers 409
        @raise NoSuchContainer: if the service answers 404
        @raise ResponseError: for any other non-2xx status
        """
        if isinstance(container_name, Container):
            container_name = container_name.name
        self._check_container_name(container_name)

        response = self.make_request('DELETE', [container_name])
        response.read()

        if response.status == 409:
            raise ContainerNotEmpty(container_name)
        elif response.status == 404:
            raise NoSuchContainer(container_name)
        self._check_status(response)

        if self.cdn_enabled:
            response = self.cdn_request('POST', [container_name],
                                        hdrs={'X-CDN-Enabled': 'False'})
            # Drain the response so the CDN connection can be reused; the
            # status of this call is deliberately not checked.
            response.read()

    def get_all_containers(self, limit=None, marker=None, **parms):
        """
        Returns a Container item result set.

        >>> connection.get_all_containers()
        ContainerResults: 4 containers
        >>> print ', '.join([container.name for container in
                             connection.get_all_containers()])
        new_container, old_container, pictures, music

        @rtype: L{ContainerResults}
        @return: an iterable set of objects representing all containers on the
                 account
        @param limit: number of results to return, up to 10,000
        @type limit: int
        @param marker: return only results whose name is greater than "marker"
        @type marker: str
        """
        self._add_paging(parms, limit, marker)
        return ContainerResults(self, self.list_containers_info(**parms))

    def get_container(self, container_name):
        """
        Return a single Container item for the given Container.

        >>> connection.get_container('old_container')
        <cloudfiles.container.Container object at 0xb77d628c>
        >>> container = connection.get_container('old_container')
        >>> container.size_used
        23074

        @param container_name: name of the container to retrieve
        @type container_name: str
        @rtype: L{Container}
        @return: an object representing the container
        @raise InvalidContainerName: if the name fails validation
        @raise NoSuchContainer: if the service answers 404
        @raise ResponseError: for any other non-2xx status
        """
        self._check_container_name(container_name)

        response = self.make_request('HEAD', [container_name])
        (count, size) = self._usage_from_headers(response,
                                                 'x-container-object-count',
                                                 'x-container-bytes-used')
        # The response has to be read before the connection is reused.
        response.read()
        if response.status == 404:
            raise NoSuchContainer(container_name)
        self._check_status(response)
        return Container(self, container_name, count, size)

    def list_public_containers(self):
        """
        Returns a list of containers that have been published to the CDN.

        >>> connection.list_public_containers()
        ['container1', 'container2', 'container3']

        @rtype: list(str)
        @return: a list of all CDN-enabled container names as strings
        @raise CDNNotEnabled: if no CDN connection has been set up
        @raise ResponseError: if the service answers with a non-2xx status
        """
        response = self.cdn_request('GET', [''])
        body = response.read()
        self._check_status(response)
        return body.splitlines()

    def list_containers_info(self, limit=None, marker=None, **parms):
        """
        Returns a list of Containers, including object count and size.

        >>> connection.list_containers_info()
        [{u'count': 510, u'bytes': 2081717, u'name': u'new_container'},
         {u'count': 12, u'bytes': 23074, u'name': u'old_container'},
         {u'count': 0, u'bytes': 0, u'name': u'container1'},
         {u'count': 0, u'bytes': 0, u'name': u'container2'},
         {u'count': 0, u'bytes': 0, u'name': u'container3'},
         {u'count': 3, u'bytes': 2306, u'name': u'test'}]

        @rtype: list({"name":"...", "count":..., "bytes":...})
        @return: a list of all container info as dictionaries with the
                 keys "name", "count", and "bytes"
        @param limit: number of results to return, up to 10,000
        @type limit: int
        @param marker: return only results whose name is greater than "marker"
        @type marker: str
        @raise ResponseError: if the service answers with a non-2xx status
        """
        self._add_paging(parms, limit, marker)
        # The listing is always requested as JSON, overriding any caller
        # supplied 'format' parameter.
        parms['format'] = 'json'
        response = self.make_request('GET', [''], parms=parms)
        body = response.read()
        self._check_status(response)
        return json_loads(body)

    def list_containers(self, limit=None, marker=None, **parms):
        """
        Returns a list of Containers.

        >>> connection.list_containers()
        ['new_container',
         'old_container',
         'container1',
         'container2',
         'container3',
         'test']

        @rtype: list(str)
        @return: a list of all containers names as strings
        @param limit: number of results to return, up to 10,000
        @type limit: int
        @param marker: return only results whose name is greater than "marker"
        @type marker: str
        @raise ResponseError: if the service answers with a non-2xx status
        """
        self._add_paging(parms, limit, marker)
        response = self.make_request('GET', [''], parms=parms)
        body = response.read()
        self._check_status(response)
        return body.splitlines()

    def __getitem__(self, key):
        """
        Container objects can be grabbed from a connection using index
        syntax.

        >>> container = conn['old_container']
        >>> container.size_used
        23074

        @rtype: L{Container}
        @return: an object representing the container
        """
        return self.get_container(key)


class ConnectionPool(Queue):
    """
    A thread-safe connection pool object.

    This component isn't required when using the cloudfiles library, but it may
    be useful when building threaded applications.
    """

    def __init__(self, username=None, api_key=None, **kwargs):
        """
        Remember the arguments used to build new connections and size the
        underlying queue.

        @type username: str
        @param username: username passed to every L{Connection} created
        @type api_key: str
        @param api_key: API key passed to every L{Connection} created
        @type poolsize: int
        @param poolsize: maximum number of idle connections kept in the
                         pool, defaults to 10

        Any other keyword argument is passed through unchanged to
        L{Connection}.
        """
        poolsize = kwargs.pop('poolsize', 10)
        self.connargs = {'username': username, 'api_key': api_key}
        self.connargs.update(kwargs)
        Queue.__init__(self, poolsize)

    def get(self):
        """
        Return a cloudfiles connection object.

        An idle connection is taken from the pool when one is available;
        otherwise a new one is created. This call never blocks on the queue.

        @rtype: L{Connection}
        @return: a cloudfiles connection object
        """
        try:
            # Entries are (timestamp, connection) pairs; only the
            # connection is handed back.
            connobj = Queue.get(self, block=False)[1]
        except Empty:
            connobj = Connection(**self.connargs)
        return connobj

    def put(self, connobj):
        """
        Place a cloudfiles connection object back into the pool.

        If the pool is already full the connection is simply not stored.

        @param connobj: a cloudfiles connection object
        @type connobj: L{Connection}
        """
        try:
            Queue.put(self, (time(), connobj), block=False)
        except Full:
            # Pool is at capacity: drop this connection instead of blocking.
            pass
475 # vim:set ai sw=4 ts=4 tw=0 expandtab: 476