Package Bio :: Package SeqIO :: Module InsdcIO
[hide private]
[frames] | no frames]

Source Code for Module Bio.SeqIO.InsdcIO

   1  # Copyright 2007-2010 by Peter Cock.  All rights reserved. 
   2  # 
   3  # This code is part of the Biopython distribution and governed by its 
   4  # license.  Please see the LICENSE file that should have been included 
   5  # as part of this package.. 
   6   
   7  """Bio.SeqIO support for the "genbank" and "embl" file formats. 
   8   
   9  You are expected to use this module via the Bio.SeqIO functions. 
  10  Note that internally this module calls Bio.GenBank to do the actual 
  11  parsing of both GenBank and EMBL files. 
  12   
  13  See also: 
  14   
  15  International Nucleotide Sequence Database Collaboration 
  16  http://www.insdc.org/ 
  17    
  18  GenBank 
  19  http://www.ncbi.nlm.nih.gov/Genbank/ 
  20   
  21  EMBL Nucleotide Sequence Database 
  22  http://www.ebi.ac.uk/embl/ 
  23   
  24  DDBJ (DNA Data Bank of Japan) 
  25  http://www.ddbj.nig.ac.jp/ 
  26  """ 
  27   
  28  from Bio.Seq import UnknownSeq 
  29  from Bio.GenBank.Scanner import GenBankScanner, EmblScanner 
  30  from Bio import Alphabet 
  31  from Interfaces import SequentialSequenceWriter 
  32  from Bio import SeqFeature 
  33   
  34  # NOTE 
  35  # ==== 
  36  # The "brains" for parsing GenBank and EMBL files (and any 
  37  # other flat file variants from the INSDC in future) is in 
  38  # Bio.GenBank.Scanner (plus the _FeatureConsumer in Bio.GenBank) 
  39  # However, all the writing code is in this file. 
  40   
  41   
42 -def GenBankIterator(handle):
43 """Breaks up a Genbank file into SeqRecord objects. 44 45 Every section from the LOCUS line to the terminating // becomes 46 a single SeqRecord with associated annotation and features. 47 48 Note that for genomes or chromosomes, there is typically only 49 one record.""" 50 #This calls a generator function: 51 return GenBankScanner(debug=0).parse_records(handle)
52
53 -def EmblIterator(handle):
54 """Breaks up an EMBL file into SeqRecord objects. 55 56 Every section from the LOCUS line to the terminating // becomes 57 a single SeqRecord with associated annotation and features. 58 59 Note that for genomes or chromosomes, there is typically only 60 one record.""" 61 #This calls a generator function: 62 return EmblScanner(debug=0).parse_records(handle)
63
64 -def GenBankCdsFeatureIterator(handle, alphabet=Alphabet.generic_protein):
65 """Breaks up a Genbank file into SeqRecord objects for each CDS feature. 66 67 Every section from the LOCUS line to the terminating // can contain 68 many CDS features. These are returned as with the stated amino acid 69 translation sequence (if given). 70 """ 71 #This calls a generator function: 72 return GenBankScanner(debug=0).parse_cds_features(handle, alphabet)
73
74 -def EmblCdsFeatureIterator(handle, alphabet=Alphabet.generic_protein):
75 """Breaks up a EMBL file into SeqRecord objects for each CDS feature. 76 77 Every section from the LOCUS line to the terminating // can contain 78 many CDS features. These are returned as with the stated amino acid 79 translation sequence (if given). 80 """ 81 #This calls a generator function: 82 return EmblScanner(debug=0).parse_cds_features(handle, alphabet)
83
84 -def _insdc_feature_position_string(pos, offset=0):
85 """Build a GenBank/EMBL position string (PRIVATE). 86 87 Use offset=1 to add one to convert a start position from python counting. 88 """ 89 if isinstance(pos, SeqFeature.ExactPosition): 90 return "%i" % (pos.position+offset) 91 elif isinstance(pos, SeqFeature.WithinPosition): 92 return "(%i.%i)" % (pos.position + offset, 93 pos.position + pos.extension + offset) 94 elif isinstance(pos, SeqFeature.BetweenPosition): 95 return "(%i^%i)" % (pos.position + offset, 96 pos.position + pos.extension + offset) 97 elif isinstance(pos, SeqFeature.BeforePosition): 98 return "<%i" % (pos.position + offset) 99 elif isinstance(pos, SeqFeature.AfterPosition): 100 return ">%i" % (pos.position + offset) 101 elif isinstance(pos, SeqFeature.OneOfPosition): 102 return "one-of(%s)" \ 103 % ",".join([_insdc_feature_position_string(p,offset) \ 104 for p in pos.position_choices]) 105 elif isinstance(pos, SeqFeature.AbstractPosition): 106 raise NotImplementedError("Please report this as a bug in Biopython.") 107 else: 108 raise ValueError("Expected a SeqFeature position object.")
109 110
111 -def _insdc_location_string_ignoring_strand_and_subfeatures(feature):
112 if feature.ref: 113 ref = "%s:" % feature.ref 114 else: 115 ref = "" 116 assert not feature.ref_db 117 if isinstance(feature.location.start, SeqFeature.ExactPosition) \ 118 and isinstance(feature.location.end, SeqFeature.ExactPosition) \ 119 and feature.location.start.position == feature.location.end.position: 120 #Special case, for 12:12 return 12^13 121 #(a zero length slice, meaning the point between two letters) 122 return "%s%i^%i" % (ref, feature.location.end.position, 123 feature.location.end.position+1) 124 if isinstance(feature.location.start, SeqFeature.ExactPosition) \ 125 and isinstance(feature.location.end, SeqFeature.ExactPosition) \ 126 and feature.location.start.position+1 == feature.location.end.position: 127 #Special case, for 11:12 return 12 rather than 12..12 128 #(a length one slice, meaning a single letter) 129 return "%s%i" % (ref, feature.location.end.position) 130 else: 131 #Typical case, e.g. 12..15 gets mapped to 11:15 132 return ref \ 133 + _insdc_feature_position_string(feature.location.start, +1) \ 134 + ".." + \ 135 _insdc_feature_position_string(feature.location.end)
136
137 -def _insdc_feature_location_string(feature):
138 """Build a GenBank/EMBL location string from a SeqFeature (PRIVATE). 139 140 There is a choice of how to show joins on the reverse complement strand, 141 GenBank used "complement(join(1,10),(20,100))" while EMBL used to use 142 "join(complement(20,100),complement(1,10))" instead (but appears to have 143 now adopted the GenBank convention). Notice that the order of the entries 144 is reversed! This function therefore uses the first form. In this situation 145 we expect the parent feature and the two children to all be marked as 146 strand == -1, and in the order 0:10 then 19:100. 147 148 Also need to consider dual-strand examples like these from the Arabidopsis 149 thaliana chloroplast NC_000932: join(complement(69611..69724),139856..140650) 150 gene ArthCp047, GeneID:844801 or its CDS (protein NP_051038.1 GI:7525057) 151 which is further complicated by a splice: 152 join(complement(69611..69724),139856..140087,140625..140650) 153 154 For mixed this mixed strand feature, the parent SeqFeature should have 155 no strand (either 0 or None) while the child features should have either 156 strand +1 or -1 as appropriate, and be listed in the order given here. 157 """ 158 159 if not feature.sub_features: 160 #Non-recursive. 161 #assert feature.location_operator == "", \ 162 # "%s has no subfeatures but location_operator %s" \ 163 # % (repr(feature), feature.location_operator) 164 location = _insdc_location_string_ignoring_strand_and_subfeatures(feature) 165 if feature.strand == -1: 166 location = "complement(%s)" % location 167 return location 168 # As noted above, treat reverse complement strand features carefully: 169 if feature.strand == -1: 170 for f in feature.sub_features: 171 assert f.strand == -1 172 return "complement(%s(%s))" \ 173 % (feature.location_operator, 174 ",".join(_insdc_location_string_ignoring_strand_and_subfeatures(f) \ 175 for f in feature.sub_features)) 176 #if feature.strand == +1: 177 # for f in feature.sub_features: 178 # assert f.strand == +1 179 #This covers typical forward strand features, and also an evil mixed strand: 180 assert feature.location_operator != "" 181 return "%s(%s)" % (feature.location_operator, 182 ",".join([_insdc_feature_location_string(f) \ 183 for f in feature.sub_features]))
184 185
186 -class _InsdcWriter(SequentialSequenceWriter):
187 """Base class for GenBank and EMBL writers (PRIVATE).""" 188 MAX_WIDTH = 80 189 QUALIFIER_INDENT = 21 190 QUALIFIER_INDENT_STR = " "*QUALIFIER_INDENT 191 QUALIFIER_INDENT_TMP = " %s " # 21 if %s is empty 192
193 - def _write_feature_qualifier(self, key, value=None, quote=None):
194 if not value: 195 self.handle.write("%s/%s\n" % (self.QUALIFIER_INDENT_STR, key)) 196 return 197 #Quick hack with no line wrapping, may be useful for testing: 198 #self.handle.write('%s/%s="%s"\n' % (self.QUALIFIER_INDENT_STR, key, value)) 199 if quote is None: 200 #Try to mimic unwritten rules about when quotes can be left out: 201 if isinstance(value, int) or isinstance(value, long): 202 quote = False 203 else: 204 quote = True 205 if quote: 206 line = '%s/%s="%s"' % (self.QUALIFIER_INDENT_STR, key, value) 207 else: 208 line = '%s/%s=%s' % (self.QUALIFIER_INDENT_STR, key, value) 209 if len(line) <= self.MAX_WIDTH: 210 self.handle.write(line+"\n") 211 return 212 while line.lstrip(): 213 if len(line) <= self.MAX_WIDTH: 214 self.handle.write(line+"\n") 215 return 216 #Insert line break... 217 for index in range(min(len(line)-1, self.MAX_WIDTH), 218 self.QUALIFIER_INDENT+1,-1): 219 if line[index] == " " : break 220 if line[index] != " ": 221 #No nice place to break... 222 index = self.MAX_WIDTH 223 assert index <= self.MAX_WIDTH 224 self.handle.write(line[:index] + "\n") 225 line = self.QUALIFIER_INDENT_STR + line[index:].lstrip()
226
227 - def _wrap_location(self, location):
228 """Split a feature location into lines (break at commas).""" 229 #TODO - Rewrite this not to recurse! 230 length = self.MAX_WIDTH - self.QUALIFIER_INDENT 231 if len(location) <= length: 232 return location 233 index = location[:length].rfind(",") 234 if index == -1: 235 #No good place to split (!) 236 import warnings 237 warnings.warn("Couldn't split location:\n%s" % location) 238 return location 239 return location[:index+1] + "\n" + \ 240 self.QUALIFIER_INDENT_STR + self._wrap_location(location[index+1:])
241
242 - def _write_feature(self, feature):
243 """Write a single SeqFeature object to features table.""" 244 assert feature.type, feature 245 location = _insdc_feature_location_string(feature) 246 line = (self.QUALIFIER_INDENT_TMP % feature.type)[:self.QUALIFIER_INDENT] \ 247 + self._wrap_location(location) + "\n" 248 self.handle.write(line) 249 #Now the qualifiers... 250 for key, values in feature.qualifiers.iteritems(): 251 if isinstance(values, list) or isinstance(values, tuple): 252 for value in values: 253 self._write_feature_qualifier(key, value) 254 elif values: 255 #String, int, etc 256 self._write_feature_qualifier(key, values) 257 else: 258 #e.g. a /psuedo entry 259 self._write_feature_qualifier(key)
260
261 - def _get_annotation_str(self, record, key, default=".", just_first=False):
262 """Get an annotation dictionary entry (as a string). 263 264 Some entries are lists, in which case if just_first=True the first entry 265 is returned. If just_first=False (default) this verifies there is only 266 one entry before returning it.""" 267 try: 268 answer = record.annotations[key] 269 except KeyError: 270 return default 271 if isinstance(answer, list): 272 if not just_first : assert len(answer) == 1 273 return str(answer[0]) 274 else: 275 return str(answer)
276
277 - def _split_multi_line(self, text, max_len):
278 "Returns a list of strings.""" 279 #TODO - Do the line spliting while preserving white space? 280 text = text.strip() 281 if len(text) < max_len: 282 return [text] 283 284 words = text.split() 285 assert max([len(w) for w in words]) < max_len, \ 286 "Your description cannot be broken into nice lines!:\n%s" \ 287 % repr(text) 288 text = "" 289 while words and len(text) + 1 + len(words[0]) < max_len: 290 text += " " + words.pop(0) 291 text = text.strip() 292 assert len(text) < max_len 293 answer = [text] 294 while words: 295 text = "" 296 while words and len(text) + 1 + len(words[0]) < max_len: 297 text += " " + words.pop(0) 298 text = text.strip() 299 assert len(text) < max_len 300 answer.append(text) 301 assert not words 302 return answer
303
304 - def _split_contig(self, record, max_len):
305 "Returns a list of strings, splits on commas.""" 306 #TODO - Merge this with _write_multi_line method? 307 #It would need the addition of the comma splitting logic... 308 #are there any other cases where that would be sensible? 309 contig = record.annotations.get("contig", "") 310 if isinstance(contig, list) or isinstance(contig, tuple): 311 contig = "".join(contig) 312 contig = self.clean(contig) 313 i = 0 314 answer = [] 315 while contig: 316 if len(contig) > max_len: 317 #Split lines at the commas 318 pos = contig[:max_len-1].rfind(",") 319 if pos == -1: 320 raise ValueError("Could not break up CONTIG") 321 text, contig = contig[:pos+1], contig[pos+1:] 322 else: 323 text, contig = contig, "" 324 answer.append(text) 325 return answer
326
327 -class GenBankWriter(_InsdcWriter):
328 HEADER_WIDTH = 12 329 QUALIFIER_INDENT = 21 330
331 - def _write_single_line(self, tag, text):
332 "Used in the the 'header' of each GenBank record.""" 333 assert len(tag) < self.HEADER_WIDTH 334 assert len(text) < self.MAX_WIDTH - self.HEADER_WIDTH, \ 335 "Annotation %s too long for %s line" % (repr(text), tag) 336 self.handle.write("%s%s\n" % (tag.ljust(self.HEADER_WIDTH), 337 text.replace("\n", " ")))
338
339 - def _write_multi_line(self, tag, text):
340 "Used in the the 'header' of each GenBank record.""" 341 #TODO - Do the line spliting while preserving white space? 342 max_len = self.MAX_WIDTH - self.HEADER_WIDTH 343 lines = self._split_multi_line(text, max_len) 344 assert len(tag) < self.HEADER_WIDTH 345 self._write_single_line(tag, lines[0]) 346 for line in lines[1:] : 347 self._write_single_line("", line)
348
349 - def _write_multi_entries(self, tag, text_list):
350 #used for DBLINK and any similar later line types. 351 #If the list of strings is empty, nothing is written. 352 for i, text in enumerate(text_list): 353 if i == 0: 354 self._write_single_line(tag, text) 355 else: 356 self._write_single_line("", text)
357
358 - def _get_date(self, record) :
359 default = "01-JAN-1980" 360 try : 361 date = record.annotations["date"] 362 except KeyError : 363 return default 364 #Cope with a list of one string: 365 if isinstance(date, list) and len(date)==1 : 366 date = date[0] 367 #TODO - allow a Python date object 368 if not isinstance(date, str) or len(date) != 11 \ 369 or date[2] != "-" or date[6] != "-" \ 370 or not date[:2].isdigit() or not date[7:].isdigit() \ 371 or int(date[:2]) > 31 \ 372 or date[3:6] not in ["JAN", "FEB", "MAR", "APR", "MAY", "JUN", 373 "JUL", "AUG", "SEP", "OCT", "NOV", "DEC"] : 374 #TODO - Check is a valid date (e.g. not 31 Feb) 375 return default 376 return date
377
378 - def _get_data_division(self, record):
379 try: 380 division = record.annotations["data_file_division"] 381 except KeyError: 382 division = "UNK" 383 if division in ["PRI", "ROD", "MAM", "VRT", "INV", "PLN", "BCT", 384 "VRL", "PHG", "SYN", "UNA", "EST", "PAT", "STS", 385 "GSS", "HTG", "HTC", "ENV", "CON"]: 386 #Good, already GenBank style 387 # PRI - primate sequences 388 # ROD - rodent sequences 389 # MAM - other mammalian sequences 390 # VRT - other vertebrate sequences 391 # INV - invertebrate sequences 392 # PLN - plant, fungal, and algal sequences 393 # BCT - bacterial sequences [plus archea] 394 # VRL - viral sequences 395 # PHG - bacteriophage sequences 396 # SYN - synthetic sequences 397 # UNA - unannotated sequences 398 # EST - EST sequences (expressed sequence tags) 399 # PAT - patent sequences 400 # STS - STS sequences (sequence tagged sites) 401 # GSS - GSS sequences (genome survey sequences) 402 # HTG - HTGS sequences (high throughput genomic sequences) 403 # HTC - HTC sequences (high throughput cDNA sequences) 404 # ENV - Environmental sampling sequences 405 # CON - Constructed sequences 406 # 407 #(plus UNK for unknown) 408 pass 409 else: 410 #See if this is in EMBL style: 411 # Division Code 412 # ----------------- ---- 413 # Bacteriophage PHG - common 414 # Environmental Sample ENV - common 415 # Fungal FUN - map to PLN (plants + fungal) 416 # Human HUM - map to PRI (primates) 417 # Invertebrate INV - common 418 # Other Mammal MAM - common 419 # Other Vertebrate VRT - common 420 # Mus musculus MUS - map to ROD (rodent) 421 # Plant PLN - common 422 # Prokaryote PRO - map to BCT (poor name) 423 # Other Rodent ROD - common 424 # Synthetic SYN - common 425 # Transgenic TGN - ??? map to SYN ??? 426 # Unclassified UNC - map to UNK 427 # Viral VRL - common 428 # 429 #(plus XXX for submiting which we can map to UNK) 430 embl_to_gbk = {"FUN":"PLN", 431 "HUM":"PRI", 432 "MUS":"ROD", 433 "PRO":"BCT", 434 "UNC":"UNK", 435 "XXX":"UNK", 436 } 437 try: 438 division = embl_to_gbk[division] 439 except KeyError: 440 division = "UNK" 441 assert len(division)==3 442 return division
443
444 - def _write_the_first_line(self, record):
445 """Write the LOCUS line.""" 446 447 locus = record.name 448 if not locus or locus == "<unknown name>": 449 locus = record.id 450 if not locus or locus == "<unknown id>": 451 locus = self._get_annotation_str(record, "accession", just_first=True) 452 if len(locus) > 16: 453 raise ValueError("Locus identifier %s is too long" % repr(locus)) 454 455 if len(record) > 99999999999: 456 #Currently GenBank only officially support up to 350000, but 457 #the length field can take eleven digits 458 raise ValueError("Sequence too long!") 459 460 #Get the base alphabet (underneath any Gapped or StopCodon encoding) 461 a = Alphabet._get_base_alphabet(record.seq.alphabet) 462 if not isinstance(a, Alphabet.Alphabet): 463 raise TypeError("Invalid alphabet") 464 elif isinstance(a, Alphabet.ProteinAlphabet): 465 units = "aa" 466 elif isinstance(a, Alphabet.NucleotideAlphabet): 467 units = "bp" 468 else: 469 #Must be something like NucleotideAlphabet or 470 #just the generic Alphabet (default for fasta files) 471 raise ValueError("Need a Nucleotide or Protein alphabet") 472 473 #Get the molecule type 474 #TODO - record this explicitly in the parser? 475 if isinstance(a, Alphabet.ProteinAlphabet): 476 mol_type = "" 477 elif isinstance(a, Alphabet.DNAAlphabet): 478 mol_type = "DNA" 479 elif isinstance(a, Alphabet.RNAAlphabet): 480 mol_type = "RNA" 481 else: 482 #Must be something like NucleotideAlphabet or 483 #just the generic Alphabet (default for fasta files) 484 raise ValueError("Need a DNA, RNA or Protein alphabet") 485 486 division = self._get_data_division(record) 487 488 assert len(units) == 2 489 assert len(division) == 3 490 #TODO - date 491 #TODO - mol_type 492 line = "LOCUS %s %s %s %s %s %s\n" \ 493 % (locus.ljust(16), 494 str(len(record)).rjust(11), 495 units, 496 mol_type.ljust(6), 497 division, 498 self._get_date(record)) 499 assert len(line) == 79+1, repr(line) #plus one for new line 500 501 assert line[12:28].rstrip() == locus, \ 502 'LOCUS line does not contain the locus at the expected position:\n' + line 503 assert line[28:29] == " " 504 assert line[29:40].lstrip() == str(len(record)), \ 505 'LOCUS line does not contain the length at the expected position:\n' + line 506 507 #Tests copied from Bio.GenBank.Scanner 508 assert line[40:44] in [' bp ', ' aa '] , \ 509 'LOCUS line does not contain size units at expected position:\n' + line 510 assert line[44:47] in [' ', 'ss-', 'ds-', 'ms-'], \ 511 'LOCUS line does not have valid strand type (Single stranded, ...):\n' + line 512 assert line[47:54].strip() == "" \ 513 or line[47:54].strip().find('DNA') != -1 \ 514 or line[47:54].strip().find('RNA') != -1, \ 515 'LOCUS line does not contain valid sequence type (DNA, RNA, ...):\n' + line 516 assert line[54:55] == ' ', \ 517 'LOCUS line does not contain space at position 55:\n' + line 518 assert line[55:63].strip() in ['', 'linear', 'circular'], \ 519 'LOCUS line does not contain valid entry (linear, circular, ...):\n' + line 520 assert line[63:64] == ' ', \ 521 'LOCUS line does not contain space at position 64:\n' + line 522 assert line[67:68] == ' ', \ 523 'LOCUS line does not contain space at position 68:\n' + line 524 assert line[70:71] == '-', \ 525 'LOCUS line does not contain - at position 71 in date:\n' + line 526 assert line[74:75] == '-', \ 527 'LOCUS line does not contain - at position 75 in date:\n' + line 528 529 self.handle.write(line)
530
531 - def _write_references(self, record):
532 number = 0 533 for ref in record.annotations["references"]: 534 if not isinstance(ref, SeqFeature.Reference): 535 continue 536 number += 1 537 data = str(number) 538 #TODO - support more complex record reference locations? 539 if ref.location and len(ref.location)==1: 540 a = Alphabet._get_base_alphabet(record.seq.alphabet) 541 if isinstance(a, Alphabet.ProteinAlphabet): 542 units = "residues" 543 else: 544 units = "bases" 545 data += " (%s %i to %i)" % (units, 546 ref.location[0].nofuzzy_start+1, 547 ref.location[0].nofuzzy_end) 548 self._write_single_line("REFERENCE", data) 549 if ref.authors: 550 #We store the AUTHORS data as a single string 551 self._write_multi_line(" AUTHORS", ref.authors) 552 if ref.consrtm: 553 #We store the consortium as a single string 554 self._write_multi_line(" CONSRTM", ref.consrtm) 555 if ref.title: 556 #We store the title as a single string 557 self._write_multi_line(" TITLE", ref.title) 558 if ref.journal: 559 #We store this as a single string - holds the journal name, 560 #volume, year, and page numbers of the citation 561 self._write_multi_line(" JOURNAL", ref.journal) 562 if ref.medline_id: 563 #This line type is obsolete and was removed from the GenBank 564 #flatfile format in April 2005. Should we write it? 565 #Note this has a two space indent: 566 self._write_multi_line(" MEDLINE", ref.medline_id) 567 if ref.pubmed_id: 568 #Note this has a THREE space indent: 569 self._write_multi_line(" PUBMED", ref.pubmed_id) 570 if ref.comment: 571 self._write_multi_line(" REMARK", ref.comment)
572 573
574 - def _write_comment(self, record):
575 #This is a bit complicated due to the range of possible 576 #ways people might have done their annotation... 577 #Currently the parser uses a single string with newlines. 578 #A list of lines is also reasonable. 579 #A single (long) string is perhaps the most natural of all. 580 #This means we may need to deal with line wrapping. 581 comment = record.annotations["comment"] 582 if isinstance(comment, basestring): 583 lines = comment.split("\n") 584 elif isinstance(comment, list) or isinstance(comment, tuple): 585 lines = comment 586 else: 587 raise ValueError("Could not understand comment annotation") 588 self._write_multi_line("COMMENT", lines[0]) 589 for line in lines[1:]: 590 self._write_multi_line("", line)
591
592 - def _write_contig(self, record):
593 max_len = self.MAX_WIDTH - self.HEADER_WIDTH 594 lines = self._split_contig(record, max_len) 595 self._write_single_line("CONTIG", lines[0]) 596 for text in lines[1:] : 597 self._write_single_line("", text)
598
599 - def _write_sequence(self, record):
600 #Loosely based on code from Howard Salis 601 #TODO - Force lower case? 602 LETTERS_PER_LINE = 60 603 SEQUENCE_INDENT = 9 604 605 if isinstance(record.seq, UnknownSeq): 606 #We have already recorded the length, and there is no need 607 #to record a long sequence of NNNNNNN...NNN or whatever. 608 if "contig" in record.annotations: 609 self._write_contig(record) 610 else: 611 self.handle.write("ORIGIN\n") 612 return 613 614 data = self._get_seq_string(record) #Catches sequence being None 615 seq_len = len(data) 616 #TODO - Should we change the case? 617 self.handle.write("ORIGIN\n") 618 for line_number in range(0, seq_len, LETTERS_PER_LINE): 619 self.handle.write(str(line_number+1).rjust(SEQUENCE_INDENT)) 620 for words in range(line_number, 621 min(line_number+LETTERS_PER_LINE, seq_len), 10): 622 self.handle.write(" %s" % data[words:words+10]) 623 self.handle.write("\n")
624
625 - def write_record(self, record):
626 """Write a single record to the output file.""" 627 handle = self.handle 628 self._write_the_first_line(record) 629 630 accession = self._get_annotation_str(record, "accession", 631 record.id.split(".", 1)[0], 632 just_first=True) 633 acc_with_version = accession 634 if record.id.startswith(accession+"."): 635 try: 636 acc_with_version = "%s.%i" \ 637 % (accession, 638 int(record.id.split(".", 1)[1])) 639 except ValueError: 640 pass 641 gi = self._get_annotation_str(record, "gi", just_first=True) 642 643 descr = record.description 644 if descr == "<unknown description>" : descr = "." 645 self._write_multi_line("DEFINITION", descr) 646 647 self._write_single_line("ACCESSION", accession) 648 if gi != ".": 649 self._write_single_line("VERSION", "%s GI:%s" \ 650 % (acc_with_version, gi)) 651 else: 652 self._write_single_line("VERSION", "%s" % (acc_with_version)) 653 654 #The NCBI only expect two types of link so far, 655 #e.g. "Project:28471" and "Trace Assembly Archive:123456" 656 #TODO - Filter the dbxrefs list to just these? 657 self._write_multi_entries("DBLINK", record.dbxrefs) 658 659 try: 660 #List of strings 661 #Keywords should be given separated with semi colons, 662 keywords = "; ".join(record.annotations["keywords"]) 663 #with a trailing period: 664 if not keywords.endswith(".") : 665 keywords += "." 666 except KeyError: 667 #If no keywords, there should be just a period: 668 keywords = "." 669 self._write_multi_line("KEYWORDS", keywords) 670 671 if "segment" in record.annotations: 672 #Deal with SEGMENT line found only in segmented records, 673 #e.g. AH000819 674 segment = record.annotations["segment"] 675 if isinstance(segment, list): 676 assert len(segment)==1, segment 677 segment = segment[0] 678 self._write_single_line("SEGMENT", segment) 679 680 self._write_multi_line("SOURCE", \ 681 self._get_annotation_str(record, "source")) 682 #The ORGANISM line MUST be a single line, as any continuation is the taxonomy 683 org = self._get_annotation_str(record, "organism") 684 if len(org) > self.MAX_WIDTH - self.HEADER_WIDTH: 685 org = org[:self.MAX_WIDTH - self.HEADER_WIDTH-4]+"..." 686 self._write_single_line(" ORGANISM", org) 687 try: 688 #List of strings 689 #Taxonomy should be given separated with semi colons, 690 taxonomy = "; ".join(record.annotations["taxonomy"]) 691 #with a trailing period: 692 if not taxonomy.endswith(".") : 693 taxonomy += "." 694 except KeyError: 695 taxonomy = "." 696 self._write_multi_line("", taxonomy) 697 698 if "references" in record.annotations: 699 self._write_references(record) 700 701 if "comment" in record.annotations: 702 self._write_comment(record) 703 704 handle.write("FEATURES Location/Qualifiers\n") 705 for feature in record.features: 706 self._write_feature(feature) 707 self._write_sequence(record) 708 handle.write("//\n")
709
710 -class EmblWriter(_InsdcWriter):
711 HEADER_WIDTH = 5 712 QUALIFIER_INDENT = 21 713 QUALIFIER_INDENT_STR = "FT" + " "*(QUALIFIER_INDENT-2) 714 QUALIFIER_INDENT_TMP = "FT %s " # 21 if %s is empty 715
716 - def _write_contig(self, record):
717 max_len = self.MAX_WIDTH - self.HEADER_WIDTH 718 lines = self._split_contig(record, max_len) 719 for text in lines: 720 self._write_single_line("CO", text)
721
722 - def _write_sequence(self, record):
723 LETTERS_PER_BLOCK = 10 724 BLOCKS_PER_LINE = 6 725 LETTERS_PER_LINE = LETTERS_PER_BLOCK * BLOCKS_PER_LINE 726 POSITION_PADDING = 10 727 handle = self.handle #save looking up this multiple times 728 729 if isinstance(record.seq, UnknownSeq): 730 #We have already recorded the length, and there is no need 731 #to record a long sequence of NNNNNNN...NNN or whatever. 732 if "contig" in record.annotations: 733 self._write_contig(record) 734 else: 735 #TODO - Can the sequence just be left out as in GenBank files? 736 handle.write("SQ \n") 737 return 738 739 data = self._get_seq_string(record) #Catches sequence being None 740 seq_len = len(data) 741 #TODO - Should we change the case? 742 #TODO - What if we have RNA? 743 a_count = data.count('A') + data.count('a') 744 c_count = data.count('C') + data.count('c') 745 g_count = data.count('G') + data.count('g') 746 t_count = data.count('T') + data.count('t') 747 other = seq_len - (a_count + c_count + g_count + t_count) 748 handle.write("SQ Sequence %i BP; %i A; %i C; %i G; %i T; %i other;\n" \ 749 % (seq_len, a_count, c_count, g_count, t_count, other)) 750 751 for line_number in range(0, seq_len // LETTERS_PER_LINE): 752 handle.write(" ") #Just four, not five 753 for block in range(BLOCKS_PER_LINE) : 754 index = LETTERS_PER_LINE*line_number + LETTERS_PER_BLOCK*block 755 handle.write((" %s" % data[index:index+LETTERS_PER_BLOCK])) 756 handle.write(str((line_number+1) 757 *LETTERS_PER_LINE).rjust(POSITION_PADDING)) 758 handle.write("\n") 759 if seq_len % LETTERS_PER_LINE: 760 #Final (partial) line 761 line_number = (seq_len // LETTERS_PER_LINE) 762 handle.write(" ") #Just four, not five 763 for block in range(BLOCKS_PER_LINE) : 764 index = LETTERS_PER_LINE*line_number + LETTERS_PER_BLOCK*block 765 handle.write((" %s" % data[index:index+LETTERS_PER_BLOCK]).ljust(11)) 766 handle.write(str(seq_len).rjust(POSITION_PADDING)) 767 handle.write("\n")
768
769 - def _write_single_line(self, tag, text):
770 assert len(tag)==2 771 line = tag+" "+text 772 assert len(line) <= self.MAX_WIDTH, line 773 self.handle.write(line+"\n")
774
775 - def _write_multi_line(self, tag, text):
776 max_len = self.MAX_WIDTH - self.HEADER_WIDTH 777 lines = self._split_multi_line(text, max_len) 778 for line in lines : 779 self._write_single_line(tag, line)
780
781 - def _write_the_first_lines(self, record):
782 """Write the ID and AC lines.""" 783 if "." in record.id and record.id.rsplit(".", 1)[1].isdigit(): 784 version = "SV " + record.id.rsplit(".", 1)[1] 785 accession = self._get_annotation_str(record, "accession", 786 record.id.rsplit(".", 1)[0], 787 just_first=True) 788 else : 789 version = "" 790 accession = self._get_annotation_str(record, "accession", 791 record.id, 792 just_first=True) 793 794 if ";" in accession : 795 raise ValueError("Cannot have semi-colon in EMBL accession, %s" \ 796 % repr(accession)) 797 if " " in accession : 798 #This is out of practicallity... might it be allowed? 799 raise ValueError("Cannot have spaces in EMBL accession, %s" \ 800 % repr(accession)) 801 802 #Get the molecule type 803 #TODO - record this explicitly in the parser? 804 #Get the base alphabet (underneath any Gapped or StopCodon encoding) 805 a = Alphabet._get_base_alphabet(record.seq.alphabet) 806 if not isinstance(a, Alphabet.Alphabet): 807 raise TypeError("Invalid alphabet") 808 elif not isinstance(a, Alphabet.NucleotideAlphabet): 809 raise ValueError("Need a Nucleotide alphabet") 810 elif isinstance(a, Alphabet.DNAAlphabet): 811 mol_type = "DNA" 812 elif isinstance(a, Alphabet.RNAAlphabet): 813 mol_type = "RNA" 814 else: 815 #Must be something like NucleotideAlphabet 816 raise ValueError("Need a DNA or RNA alphabet") 817 818 #Get the taxonomy division 819 division = self._get_data_division(record) 820 821 #TODO - Full ID line 822 handle = self.handle 823 #ID <1>; SV <2>; <3>; <4>; <5>; <6>; <7> BP. 824 #1. Primary accession number 825 #2. Sequence version number 826 #3. Topology: 'circular' or 'linear' 827 #4. Molecule type 828 #5. Data class 829 #6. Taxonomic division 830 #7. Sequence length 831 self._write_single_line("ID", "%s; %s; ; %s; ; %s; %i BP." \ 832 % (accession, version, mol_type, 833 division, len(record))) 834 handle.write("XX\n") 835 self._write_single_line("AC", accession+";") 836 handle.write("XX\n")
837
838 - def _get_data_division(self, record):
839 try: 840 division = record.annotations["data_file_division"] 841 except KeyError: 842 division = "UNC" 843 if division in ["PHG", "ENV", "FUN", "HUM", "INV", "MAM", "VRT", 844 "MUS", "PLN", "PRO", "ROD", "SYN", "TGN", "UNC", 845 "VRL", "XXX"]: 846 #Good, already EMBL style 847 # Division Code 848 # ----------------- ---- 849 # Bacteriophage PHG 850 # Environmental Sample ENV 851 # Fungal FUN 852 # Human HUM 853 # Invertebrate INV 854 # Other Mammal MAM 855 # Other Vertebrate VRT 856 # Mus musculus MUS 857 # Plant PLN 858 # Prokaryote PRO 859 # Other Rodent ROD 860 # Synthetic SYN 861 # Transgenic TGN 862 # Unclassified UNC (i.e. unknown) 863 # Viral VRL 864 # 865 #(plus XXX used for submiting data to EMBL) 866 pass 867 else: 868 #See if this is in GenBank style & can be converted. 869 #Generally a problem as the GenBank groups are wider 870 #than those of EMBL. Note that GenBank use "BCT" for 871 #both bacteria and acherea thus this maps to EMBL's 872 #"PRO" nicely. 873 gbk_to_embl = {"BCT":"PRO", 874 "UNK":"UNC", 875 } 876 try: 877 division = gbk_to_embl[division] 878 except KeyError: 879 division = "UNC" 880 assert len(division)==3 881 return division
882
883 - def _write_references(self, record):
884 #The order should be RN, RC, RP, RX, RG, RA, RT, RL 885 number = 0 886 for ref in record.annotations["references"]: 887 if not isinstance(ref, SeqFeature.Reference): 888 continue 889 number += 1 890 self._write_single_line("RN", "[%i]" % number) 891 #TODO - support for RC line (needed in parser too) 892 #TODO - support more complex record reference locations? 893 if ref.location and len(ref.location)==1: 894 self._write_single_line("RP", "%i-%i" % (ref.location[0].nofuzzy_start+1, 895 ref.location[0].nofuzzy_end)) 896 #TODO - record any DOI or AGRICOLA identifier in the reference object? 897 if ref.pubmed_id: 898 self._write_single_line("RX", "PUBMED; %s." % ref.pubmed_id) 899 if ref.consrtm: 900 self._write_single_line("RG", "%s" % ref.consrtm) 901 if ref.authors: 902 #We store the AUTHORS data as a single string 903 self._write_multi_line("RA", ref.authors+";") 904 if ref.title: 905 #We store the title as a single string 906 self._write_multi_line("RT", '"%s";' % ref.title) 907 if ref.journal: 908 #We store this as a single string - holds the journal name, 909 #volume, year, and page numbers of the citation 910 self._write_multi_line("RL", ref.journal) 911 self.handle.write("XX\n")
912
913 - def _write_comment(self, record):
914 #This is a bit complicated due to the range of possible 915 #ways people might have done their annotation... 916 #Currently the parser uses a single string with newlines. 917 #A list of lines is also reasonable. 918 #A single (long) string is perhaps the most natural of all. 919 #This means we may need to deal with line wrapping. 920 comment = record.annotations["comment"] 921 if isinstance(comment, basestring): 922 lines = comment.split("\n") 923 elif isinstance(comment, list) or isinstance(comment, tuple): 924 lines = comment 925 else: 926 raise ValueError("Could not understand comment annotation") 927 #TODO - Merge this with the GenBank comment code? 928 if not lines : return 929 for line in lines: 930 self._write_multi_line("CC", line) 931 self.handle.write("XX\n")
932
933 - def write_record(self, record):
934 """Write a single record to the output file.""" 935 936 handle = self.handle 937 self._write_the_first_lines(record) 938 939 #PR line (0 or 1 lines only), project identifier 940 for xref in record.dbxrefs: 941 if xref.startswith("Project:"): 942 self._write_single_line("PR", xref+";") 943 handle.write("XX\n") 944 break 945 946 #TODO - DT lines (date) 947 948 descr = record.description 949 if descr == "<unknown description>" : descr = "." 950 self._write_multi_line("DE", descr) 951 handle.write("XX\n") 952 953 #Should this be "source" or "organism"? 954 self._write_multi_line("OS", self._get_annotation_str(record, "organism")) 955 try: 956 #List of strings 957 taxonomy = "; ".join(record.annotations["taxonomy"]) + "." 958 except KeyError: 959 taxonomy = "." 960 self._write_multi_line("OC", taxonomy) 961 handle.write("XX\n") 962 963 if "references" in record.annotations: 964 self._write_references(record) 965 966 if "comment" in record.annotations: 967 self._write_comment(record) 968 969 handle.write("FH Key Location/Qualifiers\n") 970 for feature in record.features: 971 self._write_feature(feature) 972 973 self._write_sequence(record) 974 handle.write("//\n")
975 976 977 if __name__ == "__main__": 978 print "Quick self test" 979 import os 980 from StringIO import StringIO 981
982 - def compare_record(old, new):
983 if old.id != new.id and old.name != new.name: 984 raise ValueError("'%s' or '%s' vs '%s' or '%s' records" \ 985 % (old.id, old.name, new.id, new.name)) 986 if len(old.seq) != len(new.seq): 987 raise ValueError("%i vs %i" % (len(old.seq), len(new.seq))) 988 if str(old.seq).upper() != str(new.seq).upper(): 989 if len(old.seq) < 200: 990 raise ValueError("'%s' vs '%s'" % (old.seq, new.seq)) 991 else: 992 raise ValueError("'%s...' vs '%s...'" % (old.seq[:100], new.seq[:100])) 993 if old.features and new.features: 994 return compare_features(old.features, new.features) 995 #Just insist on at least one word in common: 996 if (old.description or new.description) \ 997 and not set(old.description.split()).intersection(new.description.split()): 998 raise ValueError("%s versus %s" \ 999 % (repr(old.description), repr(new.description))) 1000 #TODO - check annotation 1001 if "contig" in old.annotations: 1002 assert old.annotations["contig"] == \ 1003 new.annotations["contig"] 1004 return True
1005
1006 - def compare_records(old_list, new_list):
1007 """Check two lists of SeqRecords agree, raises a ValueError if mismatch.""" 1008 if len(old_list) != len(new_list): 1009 raise ValueError("%i vs %i records" % (len(old_list), len(new_list))) 1010 for old, new in zip(old_list, new_list): 1011 if not compare_record(old, new): 1012 return False 1013 return True
1014
1015 - def compare_feature(old, new, ignore_sub_features=False):
1016 """Check two SeqFeatures agree.""" 1017 if old.type != new.type: 1018 raise ValueError("Type %s versus %s" % (old.type, new.type)) 1019 if old.location.nofuzzy_start != new.location.nofuzzy_start \ 1020 or old.location.nofuzzy_end != new.location.nofuzzy_end: 1021 raise ValueError("%s versus %s:\n%s\nvs:\n%s" \ 1022 % (old.location, new.location, str(old), str(new))) 1023 if old.strand != new.strand: 1024 raise ValueError("Different strand:\n%s\nvs:\n%s" % (str(old), str(new))) 1025 if old.location.start != new.location.start: 1026 raise ValueError("Start %s versus %s:\n%s\nvs:\n%s" \ 1027 % (old.location.start, new.location.start, str(old), str(new))) 1028 if old.location.end != new.location.end: 1029 raise ValueError("End %s versus %s:\n%s\nvs:\n%s" \ 1030 % (old.location.end, new.location.end, str(old), str(new))) 1031 if not ignore_sub_features: 1032 if len(old.sub_features) != len(new.sub_features): 1033 raise ValueError("Different sub features") 1034 for a, b in zip(old.sub_features, new.sub_features): 1035 if not compare_feature(a, b): 1036 return False 1037 #This only checks key shared qualifiers 1038 #Would a white list be easier? 1039 #for key in ["name", "gene", "translation", "codon_table", "codon_start", "locus_tag"]: 1040 for key in set(old.qualifiers.keys()).intersection(new.qualifiers.keys()): 1041 if key in ["db_xref", "protein_id", "product", "note"]: 1042 #EMBL and GenBank files are use different references/notes/etc 1043 continue 1044 if old.qualifiers[key] != new.qualifiers[key]: 1045 raise ValueError("Qualifier mis-match for %s:\n%s\n%s" \ 1046 % (key, old.qualifiers[key], new.qualifiers[key])) 1047 return True
1048
1049 - def compare_features(old_list, new_list, ignore_sub_features=False):
1050 """Check two lists of SeqFeatures agree, raises a ValueError if mismatch.""" 1051 if len(old_list) != len(new_list): 1052 raise ValueError("%i vs %i features" % (len(old_list), len(new_list))) 1053 for old, new in zip(old_list, new_list): 1054 #This assumes they are in the same order 1055 if not compare_feature(old, new, ignore_sub_features): 1056 return False 1057 return True
1058
1059 - def check_genbank_writer(records):
1060 handle = StringIO() 1061 GenBankWriter(handle).write_file(records) 1062 handle.seek(0) 1063 1064 records2 = list(GenBankIterator(handle)) 1065 assert compare_records(records, records2)
1066
1067 - def check_embl_writer(records):
1068 handle = StringIO() 1069 try: 1070 EmblWriter(handle).write_file(records) 1071 except ValueError, err: 1072 print err 1073 return 1074 handle.seek(0) 1075 1076 records2 = list(EmblIterator(handle)) 1077 assert compare_records(records, records2)
1078 1079 for filename in os.listdir("../../Tests/GenBank"): 1080 if not filename.endswith(".gbk") and not filename.endswith(".gb"): 1081 continue 1082 print filename 1083 1084 handle = open("../../Tests/GenBank/%s" % filename) 1085 records = list(GenBankIterator(handle)) 1086 handle.close() 1087 1088 check_genbank_writer(records) 1089 check_embl_writer(records) 1090 1091 for filename in os.listdir("../../Tests/EMBL"): 1092 if not filename.endswith(".embl"): 1093 continue 1094 print filename 1095 1096 handle = open("../../Tests/EMBL/%s" % filename) 1097 records = list(EmblIterator(handle)) 1098 handle.close() 1099 1100 check_genbank_writer(records) 1101 check_embl_writer(records) 1102 1103 from Bio import SeqIO 1104 for filename in os.listdir("../../Tests/SwissProt"): 1105 if not filename.startswith("sp"): 1106 continue 1107 print filename 1108 1109 handle = open("../../Tests/SwissProt/%s" % filename) 1110 records = list(SeqIO.parse(handle, "swiss")) 1111 handle.close() 1112 1113 check_genbank_writer(records) 1114