StarFile module
1.This Software copyright \u00A9 Australian Synchrotron Research Program Inc, ("ASRP").
2.Subject to ensuring that this copyright notice and licence terms appear on all copies and all modified versions, of PyCIFRW computer code ("this Software"), a royalty-free non-exclusive licence is hereby given (i) to use, copy and modify this Software including the use of reasonable portions of it in other software and (ii) to publish, bundle and otherwise re-distribute this Software or modified versions of this Software to third parties, provided that this copyright notice and terms are clearly shown as applying to all parts of software derived from this Software on each occasion it is published, bundled or re-distributed. You are encouraged to communicate useful modifications to ASRP for inclusion for future versions.
3.No part of this Software may be sold as a standalone package.
4.If any part of this Software is bundled with Software that is sold, a free copy of the relevant version of this Software must be made available through the same distribution channel (be that web server, tape, CD or otherwise).
5.It is a term of exercise of any of the above royalty free licence rights that ASRP gives no warranty, undertaking or representation whatsoever whether express or implied by statute, common law, custom or otherwise, in respect of this Software or any part of it. Without limiting the generality of the preceding sentence, ASRP will not be liable for any injury, loss or damage (including consequential loss or damage) or other loss, loss of profits, costs, charges or expenses however caused which may be suffered, incurred or arise directly or indirectly in respect of this Software.
- This Software is not licenced for use in medical applications.
"""
1.This Software copyright \u00A9 Australian Synchrotron Research Program Inc, ("ASRP").
2.Subject to ensuring that this copyright notice and licence terms
appear on all copies and all modified versions, of PyCIFRW computer
code ("this Software"), a royalty-free non-exclusive licence is hereby
given (i) to use, copy and modify this Software including the use of
reasonable portions of it in other software and (ii) to publish,
bundle and otherwise re-distribute this Software or modified versions
of this Software to third parties, provided that this copyright notice
and terms are clearly shown as applying to all parts of software
derived from this Software on each occasion it is published, bundled
or re-distributed. You are encouraged to communicate useful
modifications to ASRP for inclusion for future versions.
3.No part of this Software may be sold as a standalone package.
4.If any part of this Software is bundled with Software that is sold,
a free copy of the relevant version of this Software must be made
available through the same distribution channel (be that web server,
tape, CD or otherwise).
5.It is a term of exercise of any of the above royalty free licence
rights that ASRP gives no warranty, undertaking or representation
whatsoever whether express or implied by statute, common law, custom
or otherwise, in respect of this Software or any part of it. Without
limiting the generality of the preceding sentence, ASRP will not be
liable for any injury, loss or damage (including consequential loss or
damage) or other loss, loss of profits, costs, charges or expenses
however caused which may be suffered, incurred or arise directly or
indirectly in respect of this Software.
6. This Software is not licenced for use in medical applications.
"""
from types import *
from urllib import * # for arbitrary opening
import re
import copy
# For Python 2.6 or higher compatibility
try:
set
except NameError:
import sets
set = sets.Set
class StarList(list):
    """Marker subclass of list.

    Adds no behaviour; code in this file uses isinstance checks against it
    to tell STAR list values apart from plain Python lists.
    """
class StarDict(dict):
    """Marker subclass of dict.

    Adds no behaviour; code in this file uses isinstance checks against it
    to tell STAR table values apart from plain Python dicts.
    """
class LoopBlock(object):
def __init__(self,data = (), dimension = 0, maxoutlength=2048, wraplength=80, overwrite=True,
characterset='ascii'):
# print 'Creating new loop block, dimension %d' % dimension
self.block = {}
self.loops = []
self.no_packets = 0
self.item_order = []
self.formatting_hints = {}
self.lower_keys = [] #for efficiency
self.value_switch = False #prefer string version always
self.comment_list = {}
self.dimension = dimension
self.dictionary = None #DDLm dictionary
self.popout = False #used during load iteration
self.curitem = -1 #used during iteration
self.maxoutlength = maxoutlength
self.wraplength = wraplength
self.overwrite = overwrite
self.characterset = characterset
if not hasattr(self,'loopclass'): #in case are derived class
self.loopclass = LoopBlock #when making new loops
if self.characterset == 'ascii':
self.char_check = re.compile("[][ \n\r\t!%&\(\)*+,./:<=>?@0-9A-Za-z\\\\^`{}\|~\"#$';_-]+",re.M)
elif self.characterset == 'unicode':
self.char_check = re.compile(u"[][ \n\r\t!%&\(\)*+,./:<=>?@0-9A-Za-z\\\\^`{}\|~\"#$';_\u00A0-\uD7FF\uE000-\uFDCF\uFDF0-\uFFFD\U00010000-\U0010FFFD-]+",re.M)
else:
raise StarError("No character set specified")
if isinstance(data,(TupleType,ListType)):
for item in data:
self.AddLoopItem(item)
elif isinstance(data,LoopBlock):
self.block = data.block.copy()
self.item_order = data.item_order[:]
self.lower_keys = data.lower_keys[:]
self.comment_list = data.comment_list.copy()
self.dimension = data.dimension
# loops as well
for loopno in range(len(data.loops)):
try:
placeholder = self.item_order.index(data.loops[loopno])
except ValueError:
print "Warning: loop %s (%s) in loops, but not in item_order (%s)" % (`data.loops[loopno]`,str(data.loops[loopno]),`self.item_order`)
placeholder = -1
self.item_order.remove(data.loops[loopno]) #gone
# newobject = self.loopclass(data.loops[loopno])
# print "Recasting and adding loop %s -> %s" % (`data.loops[loopno]`,`newobject`)
self.insert_loop(data.loops[loopno],position=placeholder)
def __str__(self):
return self.printsection()
def __setitem__(self,key,value):
if key == "saves":
raise StarError("""Setting the saves key is deprecated. Add the save block to
an enclosing block collection (e.g. CIF or STAR file) with this block as child""")
self.AddLoopItem((key,value))
def __getitem__(self,key):
if isinstance(key,IntType): #return a packet!!
return self.GetPacket(key)
if key == "saves":
raise StarError("""The saves key is deprecated. Access the save block from
the enclosing block collection (e.g. CIF or STAR file object)""")
try:
rawitem,is_value = self.GetLoopItemValue(key)
except KeyError:
if self.dictionary:
# send the dictionary the required key and a pointer to us
new_value = self.dictionary.derive_item(key,self,store_value=True)
print 'Set %s to derived value %s' % (key, `new_value`)
return new_value
else:
raise KeyError, 'No such item: %s' % key
# we now have an item, we can try to convert it to a number if that is appropriate
# note numpy values are never stored but are converted to lists
if not self.dictionary or not self.dictionary.has_key(key) or is_value: return rawitem
return self.dictionary.change_type(key,rawitem)
def __delitem__(self,key):
self.RemoveLoopItem(key)
def __len__(self):
blen = len(self.block)
for aloop in self.loops:
# print 'Aloop is %s' % `aloop`
blen = blen + len(aloop) # also a LoopBlock
return blen
def __nonzero__(self):
if self.__len__() > 0: return 1
return 0
# keys returns all internal keys
def keys(self):
thesekeys = self.block.keys()
for aloop in self.loops:
thesekeys.extend(aloop.keys())
return thesekeys
def values(self):
ourkeys = self.keys()
return map(lambda a:self[a],ourkeys)
def items(self):
ourkeys = self.keys()
return map(lambda a,b:(a,b),self.keys(),self.values())
def has_key(self,key):
if isinstance(key,StringTypes) and key.lower() in self.lower_keys:
return 1
for aloop in self.loops:
if aloop.has_key(key): return 1
return 0
def get(self,key,default=None):
if self.has_key(key):
retval = self.GetLoopItem(key)
else:
retval = default
return retval
def clear(self):
self.block = {}
self.loops = []
self.item_order = []
self.lower_keys = []
self.no_packets = 0
# doesn't appear to work
def copy(self):
newcopy = LoopBlock(dimension = self.dimension)
newcopy.block = self.block.copy()
newcopy.loops = []
newcopy.no_packets = self.no_packets
newcopy.item_order = self.item_order[:]
newcopy.lower_keys = self.lower_keys[:]
for loop in self.loops:
try:
placeholder = self.item_order.index(loop)
except ValueError:
print "Warning: loop %s (%s) in loops, but not in item_order (%s)" % (`loop`,str(loop),`self.item_order`)
placeholder = -1
newcopy.item_order.remove(loop) #gone
newobject = loop.copy()
# print "Adding loop %s -> %s" % (`loop`,`newobject`)
newcopy.insert_loop(newobject,position=placeholder)
return newcopy
# this is not appropriate for subloops. Instead, the loop block
# should be accessed directly for update
def update(self,adict):
for key in adict.keys():
self.AddLoopItem((key,adict[key]))
def load_iter(self,coords=[]):
# Generator: keeps walking self.item_order, yielding (block, value) pairs,
# for as long as self.popout is false.
#   * a nested LoopBlock entry is iterated recursively with the current
#     packet count appended to coords, and its pairs are passed through;
#   * at dimension 0 a name yields (self, self[name]);
#   * otherwise the raw self.block[name] entry is indexed by each value in
#     coords in turn and (self, result) is yielded.
# Before each pass every nested loop gets new_enclosing_packet() called.
# When popout is seen true it is reset to False and one final
# (self, '###Blank###') pair is yielded.
# NOTE(review): nothing in the visible code sets self.popout to true;
# presumably the consumer of this generator does - confirm against caller.
# coords is never mutated here (coords+[count] builds a new list).
count = 0 #to create packet index
while not self.popout:
# ok, we have a new packet: append a list to our subloops
for aloop in self.loops:
aloop.new_enclosing_packet()
for iname in self.item_order:
if isinstance(iname,LoopBlock): #into a nested loop
for subitems in iname.load_iter(coords=coords+[count]):
# print 'Yielding %s' % `subitems`
yield subitems
# print 'End of internal loop'
else:
if self.dimension == 0:
# print 'Yielding %s' % `self[iname]`
yield self,self[iname]
else:
backval = self.block[iname]
for i in range(len(coords)):
# print 'backval, coords: %s, %s' % (`backval`,`coords`)
backval = backval[coords[i]]
yield self,backval
count = count + 1 # count packets
self.popout = False # reinitialise
# print 'Finished iterating'
yield self,'###Blank###' #this value should never be used
# an experimental fast iterator for level-1 loops (ie CIF)
def fast_load_iter(self):
targets = map(lambda a:self.block[a],self.item_order)
while targets:
for target in targets:
yield self,target
# Add another list of the required shape to take into account a new outer packet
def new_enclosing_packet(self):
if self.dimension > 1: #otherwise have a top-level list
for iname in self.keys(): #includes lower levels
target_list = self[iname]
for i in range(3,self.dimension): #dim 2 upwards are lists of lists of...
target_list = target_list[-1]
target_list.append([])
# print '%s now %s' % (iname,`self[iname]`)
def recursive_iter(self,dict_so_far={},coord=[]):
# Generator yielding StarPacket objects that combine this block's values
# with those of its nested loops.
#   dict_so_far -- not used anywhere in the body
#   coord       -- list of indices selecting the enclosing packets; only
#                  extended by building new lists (coord+[i])
# At dimension 0 each packet from each nested loop is combined with the
# top-level values; at other dimensions one packet is produced per index
# i in range(my_length), combined with nested-loop packets when there are
# nested loops.  Each name/value pair is also set as an attribute on the
# yielded packet.
# Raises StarError("Malformed loop packet ...") when drilling down finds
# an empty value list.
# NOTE(review): relies on Python 2 semantics (dict.items()/values()
# returning lists that can be indexed and concatenated) and on yielded
# packets offering .values()/.items() - StarPacket is defined outside
# this view, confirm.
# print "Recursive iter: coord %s, keys %s, dim %d" % (`coord`,`self.block.keys()`,self.dimension)
my_length = 0
top_items = self.block.items()
top_values = self.block.values() #same order as items
drill_values = self.block.values()
for dimup in range(0,self.dimension): #look higher in the tree
if len(drill_values)>0: #this block has values
drill_values=drill_values[0] #drill in
else:
raise StarError("Malformed loop packet %s" % `top_items[0]`)
my_length = len(drill_values[0]) #length of 'string' entry
if self.dimension == 0: #top level
for aloop in self.loops:
for apacket in aloop.recursive_iter():
# print "Recursive yielding %s" % `dict(top_items + apacket.items())`
prep_yield = StarPacket(top_values+apacket.values()) #straight list
for name,value in top_items + apacket.items():
setattr(prep_yield,name,value)
yield prep_yield
else: #in some loop
for i in range(my_length):
kvpairs = map(lambda a:(a,self.coord_to_group(a,coord)[i]),self.block.keys())
kvvals = map(lambda a:a[1],kvpairs) #just values
# print "Recursive kvpairs at %d: %s" % (i,`kvpairs`)
if self.loops:
for aloop in self.loops:
for apacket in aloop.recursive_iter(coord=coord+[i]):
# print "Recursive yielding %s" % `dict(kvpairs + apacket.items())`
prep_yield = StarPacket(kvvals+apacket.values())
for name,value in kvpairs + apacket.items():
setattr(prep_yield,name,value)
yield prep_yield
else: # we're at the bottom of the tree
# print "Recursive yielding %s" % `dict(kvpairs)`
prep_yield = StarPacket(kvvals)
for name,value in kvpairs:
setattr(prep_yield,name,value)
yield prep_yield
# small function to use the coordinates.
def coord_to_group(self,dataname,coords):
if not isinstance(dataname,StringTypes):
return dataname # flag inner loop processing
newm = self[dataname] # newm must be a list or tuple
for c in coords:
# print "Coord_to_group: %s ->" % (`newm`),
newm = newm[c]
# print `newm`
return newm
def flat_iterator(self):
if self.dimension == 0:
yield copy.copy(self)
else:
my_length = 0
top_keys = self.block.keys()
if len(top_keys)>0:
my_length = len(self.block[top_keys[0]])
for pack_no in range(my_length):
yield(self.collapse(pack_no))
def insert_loop(self,newloop,position=-1,audit=True):
# check that new loop is kosher
if newloop.dimension != self.dimension + 1:
raise StarError( 'Insertion of loop of wrong nesting level %d, should be %d' % (newloop.dimension, self.dimension+1))
self.loops.append(newloop)
if audit:
dupes = self.audit()
if dupes:
dupenames = map(lambda a:a[0],dupes)
raise StarError( 'Duplicate names: %s' % `dupenames`)
if position >= 0:
self.item_order.insert(position,newloop)
else:
self.item_order.append(newloop)
# print "Insert loop: item_order now" + `self.item_order`
def remove_loop(self,oldloop):
# print "Removing %s: item_order %s" % (`oldloop`,self.item_order)
# print "Length %d" % len(oldloop)
self.item_order.remove(oldloop)
self.loops.remove(oldloop)
def AddComment(self,itemname,comment):
self.comment_list[itemname.lower()] = comment
def RemoveComment(self,itemname):
del self.comment_list[itemname.lower()]
def GetLoopItem(self,itemname):
"""Return value of itemname in this loop block"""
return self.GetLoopItemValue(itemname)[0]
def GetLoopItemValue(self,itemname):
"""Return value of itemname and whether or not it is a native value"""
# assume case is correct first
try:
s,v = self.block[itemname]
except KeyError:
for loop in self.loops:
try:
return loop.GetLoopItemValue(itemname)
except KeyError:
pass
if itemname.lower() in self.lower_keys:
# it is there somewhere, now we need to find it
real_keys = self.block.keys()
lower_keys = map(lambda a:a.lower(),self.block.keys())
try:
k_index = lower_keys.index(itemname.lower())
except ValueError: #should never happen!!
raise KeyError, 'Bug: Item %s unexpectedly not in block' % itemname
s,v = self.block[real_keys[k_index]]
else:
raise KeyError, 'Item %s not in block' % itemname
# prefer string value unless all are None
if self.dimension == 0:
if s is not None:
return s,False
else:
return v,not isinstance(v,StarList)
elif None not in s: return s,False
else:
if len(v)>0:
return v,not isinstance(v[0],StarList)
return v,True
def RemoveLoopItem(self,itemname):
# Remove the item called itemname (matched case-insensitively) from this
# block or from the first nested loop that has it.
#   * Nothing is removed when has_key(itemname) is false.
#   * If the name is one of this block's own keys it is deleted from
#     self.block, self.lower_keys and (first matching string entry only)
#     self.item_order.
#   * Otherwise the first nested loop holding it is asked to delete it,
#     and that loop is detached with remove_loop when it becomes empty.
# self.no_packets is reset to 0 once self.block is empty.
# NOTE(review): the nesting of the final "if len(self.block)==0" test
# relative to the has_key test cannot be read from this copy (leading
# whitespace is missing) - confirm against the upstream source.
if self.has_key(itemname):
testkey = itemname.lower()
real_keys = self.block.keys()
lower_keys = map(lambda a:a.lower(),real_keys)
try:
k_index = lower_keys.index(testkey)
except ValueError: #must be in a lower loop
for aloop in self.loops:
if aloop.has_key(itemname):
# print "Deleting %s (%s)" % (itemname,aloop[itemname])
del aloop[itemname]
if len(aloop)==0: # all gone
self.remove_loop(aloop)
break
else:
del self.block[real_keys[k_index]]
self.lower_keys.remove(testkey)
# now remove the key in the order list
for i in range(len(self.item_order)):
if isinstance(self.item_order[i],StringTypes): #may be loop
if self.item_order[i].lower()==testkey:
del self.item_order[i]
break
if len(self.block)==0: #no items in loop, length -> 0
self.no_packets = 0
return #no duplicates, no more checking needed
def AddLoopItem(self,incomingdata,precheck=False,maxlength=-1):
    """Add a data item, or a whole new nested loop, to this block.

    incomingdata -- a (name, value) pair, or a (names, values) pair in
                    which names is a tuple/list: the latter creates a new
                    nested loop holding those names
    precheck     -- when true, the data name and value checks are skipped
    maxlength    -- passed to check_data_name as the maximum name length

    A value whose nesting depth (get_dim) does not match this block's
    dimension is retried: a one-element tuple/list is unwrapped and added
    again, anything else is wrapped up as a new nested loop.

    Raises StarError when a name already exists and overwriting is off,
    TypeError when a data name is not a string, and StarLengthError when
    a looped value has a different length from the loop's other items.
    """
    # we accept tuples, strings, lists and dicts!!
    # Direct insertion: we have a string-valued key, with an array
    # of values -> single-item into our loop
    if isinstance(incomingdata[0],(tuple,list)):
        # internal loop
        # first we remove any occurences of these datanames in
        # other loops
        for one_item in incomingdata[0]:
            if self.has_key(one_item):
                if not self.overwrite:
                    # report the single offending name; formatting the
                    # whole tuple of names with one %s raises TypeError
                    raise StarError( 'Attempt to insert duplicate item name %s' % one_item)
                else:
                    del self[one_item]
        newloop = self.loopclass(dimension = self.dimension+1,characterset=self.characterset)
        keyvals = zip(incomingdata[0],incomingdata[1])
        for key,val in keyvals:
            newloop.AddLoopItem((key,val))
        self.insert_loop(newloop)
    elif not isinstance(incomingdata[0],basestring):
        raise TypeError('Star datanames are strings only (got %s)' % repr(incomingdata[0]))
    else:
        data = list(incomingdata)        #copy
        if data[1] == [] or get_dim(data[1])[0] == self.dimension:
            if not precheck:
                self.check_data_name(data[0],maxlength)    # make sure no nasty characters
            # check that we can replace data
            if not self.overwrite:
                if self.has_key(data[0]):
                    raise StarError( 'Attempt to insert duplicate item name %s' % data[0])
            # put the data in the right container
            regval,empty_val = self.regularise_data(data[1])
            # check for pure string data
            pure_string = check_stringiness(regval)
            if not precheck:
                self.check_item_value(regval)
            if self.dimension > 0:
                if self.no_packets <= 0:
                    self.no_packets = len(data[1])   #first item in this loop
                if len(data[1]) != self.no_packets:
                    raise StarLengthError('Not enough values supplied for %s' % (data[0]))
            try:
                oldpos = self.GetItemPosition(data[0])
            except ValueError:
                oldpos = len(self.item_order)    #end of list
            self.RemoveLoopItem(data[0])     # may be different case (upper/lower), so have to do this
            if pure_string:
                self.block.update({data[0]:[regval,empty_val]})  # trust the data is OK
            else:
                self.block.update({data[0]:[empty_val,regval]})
            self.lower_keys.insert(oldpos,data[0].lower())
            self.item_order.insert(oldpos,data[0])
        else:            #dimension mismatch
            # single-member lists could be seen as bare lists...
            if isinstance(data[1],(tuple,list)) and len(data[1])==1:
                # the name and unwrapped value must travel as ONE pair;
                # passing them as two arguments made the name the whole
                # incomingdata and the value the precheck flag
                self.AddLoopItem((data[0],data[1][0]))
            # if that doesn't work, make the dataname list a compound item for inserting a loop
            else:
                self.AddLoopItem(((data[0],),(data[1],)))
def check_data_name(self,dataname,maxlength=-1):
if maxlength > 0:
if len(dataname)>maxlength:
raise StarError( 'Dataname %s exceeds maximum length %d' % (dataname,maxlength))
if dataname[0]!='_':
raise StarError( 'Dataname ' + dataname + ' does not begin with _')
if self.characterset=='ascii':
if len (filter (lambda a: ord(a) < 33 or ord(a) > 126, dataname)) > 0:
raise StarError( 'Dataname ' + dataname + ' contains forbidden characters')
else:
# print 'Checking %s for unicode characterset conformance' % dataname
if len (filter (lambda a: ord(a) < 33, dataname)) > 0:
raise StarError( 'Dataname ' + dataname + ' contains forbidden characters (below code point 33)')
if len (filter (lambda a: ord(a) > 126 and ord(a) < 160, dataname)) > 0:
raise StarError( 'Dataname ' + dataname + ' contains forbidden characters (between code point 127-159)')
if len (filter (lambda a: ord(a) > 0xD7FF and ord(a) < 0xE000, dataname)) > 0:
raise StarError( 'Dataname ' + dataname + ' contains unsupported characters (between U+D800 and U+E000)')
if len (filter (lambda a: ord(a) > 0xFDCF and ord(a) < 0xFDF0, dataname)) > 0:
raise StarError( 'Dataname ' + dataname + ' contains unsupported characters (between U+FDD0 and U+FDEF)')
if len (filter (lambda a: ord(a) == 0xFFFE or ord(a) == 0xFFFF, dataname)) > 0:
raise StarError( 'Dataname ' + dataname + ' contains unsupported characters (U+FFFE and/or U+FFFF)')
if len (filter (lambda a: ord(a) > 0x10000 and (ord(a) & 0xE == 0xE) , dataname)) > 0:
print '%s fails' % dataname
for a in dataname: print '%x' % ord(a),
print
raise StarError( u'Dataname ' + dataname + u' contains unsupported characters (U+xFFFE and/or U+xFFFF)')
def check_item_value(self,item):
test_item = item
if not isinstance(item,(list,dict,tuple)):
test_item = [item] #single item list
def check_one (it):
if isinstance(it,basestring):
if it=='': return
me = self.char_check.match(it)
if not me:
print "Fail value check: %s" % it
raise StarError, u'Bad character in %s' % it
else:
if me.span() != (0,len(it)):
print "Fail value check, match only %d-%d in string %s" % (me.span()[0],me.span()[1],`it`)
raise StarError,u'Data item "' + `it` + u'"... contains forbidden characters'
map(check_one,test_item)
def regularise_data(self,dataitem):
    """Place dataitem into a list if necessary

    Returns a (value, placeholder) pair.  Numbers, strings, StarList and
    StarDict values come back unchanged with None as the placeholder;
    tuples and lists come back unchanged with a list of None of equal
    length; any other iterable is converted to a list first.  Raises
    StarError when the value cannot be turned into a list.
    """
    from numbers import Number
    scalar_like = (Number,basestring,StarList,StarDict)
    if isinstance(dataitem,scalar_like):
        return dataitem,None
    if not isinstance(dataitem,(tuple,list)):
        # so try to make into a list
        try:
            dataitem = list(dataitem)
        except TypeError:
            raise StarError( str(dataitem) + ' is wrong type for data value\n' )
    return dataitem,[None]*len(dataitem)
def GetLoop(self,keyname):
if not self.has_key(keyname):
raise KeyError, 'Item %s does not exist' % keyname
if keyname.lower() in self.lower_keys: #python 2.2 or above
return self
for aloop in self.loops:
try:
return aloop.GetLoop(keyname)
except KeyError:
pass
raise KeyError, 'Item %s does not exist' % keyname
def GetPacket(self,index):
    """Build a StarPacket for packet number `index`.

    Entries follow item_order: a nested loop contributes the list of its
    items' values at `index`; at dimension 0 a name contributes self[name];
    otherwise a name contributes self[name][index] and the same value is
    also set on the packet as an attribute named after the item.
    """
    thispack = StarPacket([])
    at_top_level = self.dimension==0
    for myitem in self.item_order:
        if isinstance(myitem,LoopBlock):
            thispack.append([myitem[name][index] for name in myitem.item_order])
            continue
        if at_top_level:
            thispack.append(self[myitem])
            continue
        thispack.append(self[myitem][index])
        setattr(thispack,myitem,thispack[-1])
    return thispack
def AddPacket(self,packet):
# Append one packet of values to this loop.
#   packet -- object carrying one attribute per entry of item_order; each
#             value is read with packet.__getattribute__(name)
# Raises StarError at dimension 0 (the top-level block has no packets).
# Each item is first re-stored as a list (through __setitem__) and the
# new value is then appended to whatever self[name] returns.
# NOTE(review): leading whitespace is missing in this copy, so whether
# "self.no_packets +=1" runs once per call or once per item cannot be
# read from it; once per call is presumably intended - confirm.
if self.dimension==0:
raise StarError,"Attempt to add packet to top level block"
for myitem in self.item_order:
self[myitem] = list(self[myitem]) #in case we have stored a tuple
self[myitem].append(packet.__getattribute__(myitem))
self.no_packets +=1
# print "%s now %s" % (myitem,`self[myitem]`)
def RemoveKeyedPacket(self,keyname,keyvalue):
packet_coord = list(self[keyname]).index(keyvalue)
loophandle = self.GetLoop(keyname)
for dataname in loophandle.item_order:
loophandle.block[dataname][0] = list(loophandle.block[dataname][0])
del loophandle.block[dataname][0][packet_coord]
loophandle.block[dataname][1] = list(loophandle.block[dataname][1])
del loophandle.block[dataname][1][packet_coord]
self.no_packets -= 1
def GetKeyedPacket(self,keyname,keyvalue,no_case=False):
"""Return the loop packet where [[keyname]] has value [[keyvalue]]. Ignore case if no_case is true"""
#print "Looking for %s in %s" % (keyvalue, self[keyname])
my_loop = self.GetLoop(keyname)
if no_case:
one_pack= [a for a in my_loop if getattr(a,keyname).lower()==keyvalue.lower()]
else:
one_pack= [a for a in my_loop if getattr(a,keyname)==keyvalue]
if len(one_pack)!=1:
raise ValueError, "Bad packet key %s = %s: returned %d packets" % (keyname,keyvalue,len(one_pack))
#print "Keyed packet: %s" % one_pack[0]
return one_pack[0]
def GetKeyedSemanticPacket(self,keyvalue,cat_id):
    """Return a complete packet for category cat_id

    For every key data name that self.dictionary.cat_key_table lists for
    the category, the packet whose key equals `keyvalue` is looked up
    and merged into the result.  Keys are compared without regard to
    case when the dictionary gives the first key a '_type.contents' of
    'Code', 'Tag' or 'Name'.
    """
    target_keys = self.dictionary.cat_key_table[cat_id]
    p = StarPacket()
    # set case-sensitivity flag
    lcase = self.dictionary[target_keys[0]]['_type.contents'] in ['Code','Tag','Name']
    for cat_key in target_keys:
        try:
            extra_packet = self.GetKeyedPacket(cat_key,keyvalue,no_case=lcase)
        except KeyError:        #try to create the key
            self[cat_key]       #item access asks the dictionary to derive and store the column
            extra_packet = self.GetKeyedPacket(cat_key,keyvalue,no_case=lcase)
        # reuse the packet found above rather than searching a second time
        p.merge_packet(extra_packet)
    # the following attributes used to calculate missing values
    p.key = target_keys[0]
    p.cif_dictionary = self.dictionary
    p.fulldata = self
    return p
def GetItemOrder(self):
return self.item_order[:]
def ChangeItemOrder(self,itemname,newpos):
testpos = self.GetItemPosition(itemname)
del self.item_order[testpos]
# so we have an object ready for action
self.item_order.insert(newpos,itemname)
def GetItemPosition(self,itemname):
import string
def low_case(item):
try:
return string.lower(item)
except AttributeError:
return item
try:
testname = string.lower(itemname)
except AttributeError:
testname = itemname
lowcase_order = map(low_case,self.item_order)
return lowcase_order.index(testname)
def collapse(self,packet_no):
    """Return a new LoopBlock, one dimension lower, holding packet `packet_no`.

    Plain items contribute self[name][packet_no]; nested loops are
    collapsed recursively and inserted.  Raises StarError at dimension 0.
    """
    if self.dimension == 0:
        raise StarError( "Attempt to select non-existent packet")
    newlb = LoopBlock(dimension=self.dimension-1)
    for one_item in self.item_order:
        if not isinstance(one_item,LoopBlock):
            newlb[one_item] = self[one_item][packet_no]
            continue
        newlb.insert_loop(one_item.collapse(packet_no))
    return newlb
def audit(self):
allkeys = self.keys()
uniquenames = set(allkeys)
if len(uniquenames) == len(allkeys): return []
else:
keycount = map(lambda a:(a,allkeys.count(a)),uniquenames)
return filter(lambda a:a[1]>1,keycount)
def GetLoopNames(self,keyname):
if keyname in self:
return self.keys()
for aloop in self.loops:
try:
return aloop.GetLoopNames(keyname)
except KeyError:
pass
raise KeyError, 'Item does not exist'
def AddToLoop(self,dataname,loopdata):
thisloop = self.GetLoop(dataname)
for itemname,itemvalue in loopdata.items():
thisloop[itemname] = itemvalue
def Loopify(self,datanamelist):
thisloop = self.GetLoop(datanamelist[0])
badmatch = filter(lambda a:a in datanamelist,thisloop.keys())
if len(badmatch)==len(datanamelist): #all at same level so is OK
newloop = LoopBlock(dimension=self.dimension+1)
for name in datanamelist:
newloop[name]=[self[name]]
del self[name]
self.insert_loop(newloop)
def SetOutputLength(self,wraplength=80,maxoutlength=2048):
if wraplength > maxoutlength:
raise StarError("Wrap length (requested %d) must be <= Maximum line length (requested %d)" % (wraplength,maxoutlength))
self.wraplength = wraplength
self.maxoutlength = maxoutlength
for loop in self.loops:
loop.SetOutputLength(wraplength,maxoutlength)
def printsection(self,instring='',ordering=[],blockstart="",blockend="",indent=0,coord=[]):
# Write this block in STAR syntax.
#   instring -- an existing output object to write into; when empty a new
#               CIFStringIO(target_width=80) is created and its text is
#               returned at the end, otherwise None is returned
#   ordering -- passed to create_ordering; also searched for per-item
#               hint dicts (keys 'dataname' and optionally 'column')
#   indent   -- number of spaces written before each "loop_" keyword
#   coord    -- packet coordinates; defaults to [0]*(dimension-1)
# At dimension 0 each plain item is written as name, space, formatted
# value, and each nested loop as a "loop_" header, its names and its
# packets.  At other dimensions the block writes itself as a loop.
# NOTE(review): the line beginning "if(len(coords)0:" is truncated in this
# copy - the text between a "<" and a later ">" is missing, which
# presumably held the rest of the coordinate check and the header of the
# loop that pops names off self.output_order.  blockstart and blockend
# are not referenced in the text that survives.  Restore from upstream.
import string
# first make an ordering
self.create_ordering(ordering)
# now do it...
if not instring:
outstring = CIFStringIO(target_width=80) # the returned string
else:
outstring = instring
if not coord:
coords = [0]*(self.dimension-1)
else:
coords = coord
if(len(coords)0:
#print "Remaining to output " + `self.output_order`
itemname = self.output_order.pop(0)
item_spec = [i for i in ordering if i['dataname'].lower()==itemname.lower()]
if len(item_spec)>0:
col_pos = item_spec[0].get('column',-1)
else:
col_pos = -1
item_spec = {}
if self.dimension == 0: # ie value next to tag
if not isinstance(itemname,LoopBlock): #no loop
if col_pos < 0: col_pos = 40
outstring.set_tab(col_pos)
itemvalue = self[itemname]
outstring.write(itemname,mustbreak=True,do_tab=False)
outstring.write(' ',canbreak=True,do_tab=False) #space after itemname
self.format_value(itemvalue,outstring,hints=item_spec)
else: # we are asked to print an internal loop block
#first make sure we have sensible coords. Length should be one
#less than the current dimension
outstring.set_tab(10) #guess this is OK?
outstring.write(' '*indent,mustbreak=True,do_tab=False); outstring.write('loop_\n',do_tab=False)
itemname.format_names(outstring,indent+2)
itemname.format_packets(outstring,coords,indent+2)
else: # we are a nested loop
outstring.write(' '*indent,mustbreak=True,do_tab=False); outstring.write('loop_\n',do_tab=False)
self.format_names(outstring,indent+2)
self.format_packets(outstring,coords,indent+2)
if instring: return #inside a recursion
else:
returnstring = outstring.getvalue()
outstring.close()
return returnstring
def format_names(self,outstring,indent=0):
temp_order = self.item_order[:]
while len(temp_order)>0:
itemname = temp_order.pop(0)
if isinstance(itemname,StringTypes): #(not loop)
outstring.write(' ' * indent,do_tab=False)
outstring.write(itemname,do_tab=False)
outstring.write("\n",do_tab=False)
else: # a loop
outstring.write(' ' * indent,do_tab=False)
outstring.write("loop_\n",do_tab=False)
itemname.format_names(outstring,indent+2)
outstring.write(" stop_\n",do_tab=False)
def format_packets(self,outstring,coordinates,indent=0):
# Write the packets of this loop to outstring.
#   coordinates -- indices handed to coord_to_group to select the group
#                  of values for every entry of item_order
#   indent      -- passed through to format_packet_item
# The per-item value groups are transposed with zip into packets and
# every data point is written with format_packet_item.
# NOTE(review): leading whitespace is missing in this copy; the newline
# write is presumably issued once per packet (after the inner loop) -
# confirm.  cStringIO and string are imported but not referenced in this
# method, and the packstring result is not used.
import cStringIO
import string
# get our current group of data
# print 'Coords: %s' % `coordinates`
alldata = map(lambda a:self.coord_to_group(a,coordinates),self.item_order)
# print 'Alldata: %s' % `alldata`
packet_data = apply(zip,alldata)
# print 'Packet data: %s' % `packet_data`
for position in range(len(packet_data)):
for point in range(len(packet_data[position])):
datapoint = packet_data[position][point]
packstring = self.format_packet_item(datapoint,indent,outstring)
outstring.write("\n",do_tab=False)
def format_packet_item(self,pack_item,indent,outstring):
    """Write one packet entry, and recursively any nested packet, to `outstring`.

    pack_item -- a string, number, StarList or StarDict (written directly,
                 followed by a breakable space), or a sequence standing
                 for a nested packet
    indent    -- passed unchanged to recursive calls
    outstring -- output object offering write()

    A nested packet whose first element is itself a list/tuple is
    transposed with zip before its members are written, and is closed
    with ' stop_ '.  Nothing is returned; all output goes to outstring.
    """
    if isinstance(pack_item,(StringType,UnicodeType,IntType,FloatType,LongType,StarList,StarDict)):
        if isinstance(pack_item,StringTypes):
            outstring.write(self._formatstring(pack_item))
        else:
            self.format_value(pack_item,outstring)
        outstring.write(' ',canbreak=True,do_tab=False)
        return
    # Now, for each nested loop we call ourselves again
    has_inner_packets = isinstance(pack_item[0],(ListType,TupleType))
    if has_inner_packets:
        item_list = zip(*pack_item)
    else:                      #base packet
        item_list = pack_item
    for sub_item in item_list:
        # The recursive call writes to outstring itself and returns
        # None, so its result cannot be concatenated onto the separator
        # (that raised TypeError); write the separator, then recurse.
        outstring.write(' ',canbreak=True)
        self.format_packet_item(sub_item,indent,outstring)
    # stop_ is not issued at the end of each innermost packet
    if has_inner_packets:
        outstring.write(' stop_ ',canbreak=True)
def _formatstring(self,instring,delimiter=None,standard='CIF1',indent=0,
lbprotocol=True,pref_protocol=True):
# Choose a delimiter for instring and return the delimited text.
#   delimiter -- preferred delimiter; used when it is still allowed
#   standard  -- 'CIF2' additionally allows triple-quote delimiters
# Delimiters are ruled out step by step: empty strings cannot be bare;
# strings longer than maxoutlength-2 or containing a newline need ';' or
# a triple quote; strings containing space, tab, vertical tab, '_' or ','
# cannot be bare; a string containing a quote character cannot use that
# quote.  Without a usable preference the choice falls to "'", then '"',
# then ';'.
# NOTE(review): this span is truncated in this copy.  The line beginning
# "elif len(instring)0:" has lost everything between a "<" and a later
# ">": the rest of the semicolon-string handling of _formatstring and the
# header and opening of the next method (whose surviving tail below
# writes StarList/StarDict/number values to "stringsink" and is called
# elsewhere in this file as format_value).  indent, lbprotocol and
# pref_protocol are not referenced in the text that survives.  Restore
# from upstream before relying on either method.
import string
if standard == 'CIF2':
allowed_delimiters = set(['"',"'",";",None,'"""',"'''"])
else:
allowed_delimiters = set(['"',"'",";",None])
if len(instring)==0: allowed_delimiters.difference_update([None])
if len(instring) > (self.maxoutlength-2) or '\n' in instring:
allowed_delimiters.intersection_update([";","'''",'"""'])
if ' ' in instring or '\t' in instring or '\v' in instring or '_' in instring or ',' in instring:
allowed_delimiters.difference_update([None])
if '"' in instring: allowed_delimiters.difference_update(['"',None])
if "'" in instring: allowed_delimiters.difference_update(["'",None])
out_delimiter = ";" #default (most conservative)
if delimiter in allowed_delimiters:
out_delimiter = delimiter
elif "'" in allowed_delimiters: out_delimiter = "'"
elif '"' in allowed_delimiters: out_delimiter = '"'
if out_delimiter in ['"',"'",'"""',"'''"]: return out_delimiter + instring + out_delimiter
elif out_delimiter is None: return instring
# we are left with semicolon strings
outstring = "\n;"
# if there are returns in the string, try to work with them
while 1:
retin = string.find(instring,'\n')+1
if retin < self.maxoutlength and retin > 0: # honour this break
outstring = outstring + instring[:retin]
instring = instring[retin:]
elif len(instring)0:
self.format_value(itemvalue[0],stringsink)
for listval in itemvalue[1:]:
print 'Formatting %s' % `listval`
stringsink.write(', ',do_tab=False)
self.format_value(listval,stringsink,compound=True)
stringsink.write(']',unindent=True)
elif isinstance(itemvalue,StarDict):
stringsink.set_tab(0)
stringsink.write('{',newindent=True,mustbreak=compound) #start a new line inside
items = itemvalue.items()
if len(items)>0:
stringsink.write("'"+items[0][0]+"'"+':',canbreak=True)
self.format_value(items[0][1],stringsink)
for key,value in items[1:]:
stringsink.write(', ')
stringsink.write("'"+key+"'"+":",canbreak=True)
self.format_value(value,stringsink) #never break between key and value
stringsink.write('}',unindent=True)
else:
stringsink.write(str(itemvalue),canbreak=True) #numbers
def process_template(self,template_string):
    """Process a template datafile to formatting instructions

    template_string -- text of a data file laid out the way output
                       should look

    The template is parsed (StarFile, grammar "DDLm") and its first block
    is walked in input order.  For each data name a hint dictionary is
    appended to self.form_hints with the key 'dataname' and, when they
    can be read from the template text, 'delimiter' (the quote or ';'
    that starts the value) and 'column' (offset of the value from the
    start of its line).  Returns None.
    """
    template_as_cif = StarFile(StringIO(template_string),grammar="DDLm").first_block()
    self.form_hints = []   #ordered array of hint dictionaries
    for item in template_as_cif.item_order:  #order of input
        if not isinstance(item,LoopBlock):    #not nested
            hint_dict = {"dataname":item}
            # find the line in the file; the name is escaped so that
            # characters such as '.' are matched literally
            start_pos = re.search("(^[ \t]*" + re.escape(item) + "[ \t\n]+)(?P<spec>([\S]+)|(^;))",template_string,re.I|re.M)
            if start_pos is not None and start_pos.group("spec") != None:
                spec_pos = start_pos.start("spec")-start_pos.start(0)
                spec_char = template_string[start_pos.start("spec")]
                if spec_char in '\'";':
                    hint_dict.update({"delimiter":spec_char})
                if spec_char != ";":   #so we need to work out the column number
                    hint_dict.update({"column":spec_pos})
            print('%s: %s' % (item,repr(hint_dict)))
            self.form_hints.append(hint_dict)
        else:           #loop block
            testname = item.item_order[0]
            #find the loop spec line in the file
            loop_regex = "(^[ \t]*loop_[ \t\n\r]+" + re.escape(testname) + "([ \t\n\r]+_[\S]+){%d}[ \t]*$(?P<packet>(.(?!_loop|_[\S]+))*))" % (len(item.item_order) - 1)
            loop_line = re.search(loop_regex,template_string,re.I|re.M|re.S)
            loop_so_far = loop_line.end()
            packet_text = loop_line.group('packet')
            # group names 'all' and 'none' are the ones read below; the
            # two quote groups are named only for readability
            packet_regex = "[ \t]*(?P<all>(?P<sq>'([^\n\r\f']*)'+)|(?P<dq>\"([^\n\r\"]*)\"+)|(?P<none>[^\s]+))"
            packet_pos = re.finditer(packet_regex,packet_text)
            line_end_pos = re.finditer("^",packet_text,re.M)
            next_end = next(line_end_pos).end()
            last_end = next_end
            for loopname in item.item_order:
                hint_dict = {"dataname":loopname}
                thismatch = next(packet_pos)
                while thismatch.start('all') > next_end:
                    last_end = next_end
                    try:
                        next_end = next(line_end_pos).start()
                    except StopIteration:
                        # no more line starts: stop advancing, otherwise
                        # this loop would never end
                        break
                    print('next end %d' % next_end)
                print('Start %d, last_end %d' % (thismatch.start('all'),last_end))
                col_pos = thismatch.start('all') - last_end
                if thismatch.group('none') is None:
                    hint_dict.update({'delimiter':thismatch.groups()[0][0]})
                hint_dict.update({'column':col_pos})
                print('%s: %s' % (loopname,repr(hint_dict)))
                self.form_hints.append(hint_dict)
    return
def create_ordering(self,order_dict):
    """Create a canonical ordering that includes loops using our formatting hints dictionary.

    order_dict is a sequence of hint dictionaries, each with a 'dataname'
    entry.  Names found (lower-cased) in self.item_order are kept as given;
    names reported by self.has_key are replaced by the loop returned from
    self.GetLoop, after that loop has ordered itself with the same hints.
    Entries of self.item_order not already placed are appended.  The result
    is stored in self.output_order.
    """
    wanted = [hint['dataname'] for hint in order_dict]
    ordered = []
    for name in wanted:
        if isinstance(name,basestring) and name.lower() in self.item_order:
            ordered.append(name)
            continue
        if self.has_key(name):    #in a loop somewhere
            self.GetLoop(name).create_ordering(order_dict)
            ordered.append(self.GetLoop(name))
    leftovers = [entry for entry in self.item_order if entry not in ordered]
    self.output_order = ordered + leftovers
    print('Final order: ' + repr(self.output_order))
class StarBlock(LoopBlock):
    """A LoopBlock that can be merged with another block and can have a
    dictionary attached."""

    def copy(self):
        """Return a copy of this block wrapped in the class of self, so that
        subclasses get an instance of their own type."""
        newblock = super(StarBlock,self).copy()
        return self.__class__(newblock)   #catch inheritance

    def merge(self,new_block,mode="strict",match_att=[],match_function=None,
                   rel_keys = []):
        """Merge the contents of new_block into this block.

        mode 'strict' raises CifError if a key of new_block (other than those
        in match_att) is already present; 'replace' copies every item of
        new_block over; 'overlay' overwrites string-valued items and appends
        packets to loops whose names are all already present.  rel_keys names
        the key item of a loop: incoming packets whose key value already
        occurs in the existing loop are dropped.  match_function is accepted
        but not used here.  The default lists are never modified.
        """
        if mode == 'strict':
            for key in new_block.item_order:
                if self.has_key(key) and key not in match_att:
                    raise CifError( "Identical keys %s in strict merge mode" % key)
                elif key not in match_att:           #no change otherwise
                    if isinstance(key,StringTypes):
                        self[key] = new_block[key]
                    else:
                        self.insert_loop(key)
        elif mode == 'replace':
            # NOTE(review): newkeys is pruned but never consulted afterwards,
            # so match_att has no effect in this mode - confirm the intent
            newkeys = new_block.keys()
            for ma in match_att:
                try:
                    newkeys.remove(ma)        #don't touch the special ones
                except ValueError:
                    pass
            for key in new_block.item_order:
                if isinstance(key,StringTypes):
                    self[key] = new_block[key]
                else:
                    self.insert_loop(key)   #assume is a loop
        elif mode == 'overlay':
            print('Overlay mode, current overwrite is %s' % self.overwrite)
            save_overwrite = self.overwrite
            self.overwrite = True
            try:
                self._overlay(new_block,match_att,rel_keys)
            finally:
                # restore the flag even if the overlay fails part-way
                self.overwrite = save_overwrite

    def _overlay(self,new_block,match_att,rel_keys):
        """Carry out the 'overlay' part of merge; overwrite is already enabled."""
        for attribute in new_block.keys():
            if attribute in match_att: continue      #ignore this one
            new_value = new_block[attribute]
            #non-looped items
            if isinstance(new_value,StringTypes):
                self[attribute] = new_value
        these_atts = self.keys()
        for newloop in new_block.loops:
            newkeys = newloop.keys()
            # note that the following line determines packet item order
            overlaps = [a for a in newkeys if a in these_atts]
            if len(overlaps)< len(newloop):#completely new loop
                self.insert_loop(newloop)
            elif len(overlaps)==len(newloop):
                # appending packets; one key per loop for now
                loop_keys = [a for a in overlaps if a in rel_keys]
                if loop_keys:
                    newkeypos = overlaps.index(loop_keys[0])
                else:
                    newkeypos = None
                overlap_data = [listify(self[a]) for a in overlaps] #old packet data
                new_data = [new_block[a] for a in overlaps] #new packet data
                packet_data = transpose(overlap_data)
                new_p_data = transpose(new_data)
                # remove any packets for which the keys match between old and new; we
                # make the arbitrary choice that the old data stays
                if newkeypos is not None:   #position 0 is a valid key column
                    # get matching values in new list
                    print("Old, new data:\n%s\n%s" % (repr(overlap_data[newkeypos]),repr(new_data[newkeypos])))
                    key_matches = [a for a in new_data[newkeypos] if a in overlap_data[newkeypos]]
                    # filter out any new data with these key values
                    new_p_data = [a for a in new_p_data if a[newkeypos] not in key_matches]
                # wipe out the old data and enter the new stuff
                byebyeloop = self.GetLoop(overlaps[0])
                # Note that if, in the original dictionary, overlaps are not
                # looped, GetLoop will return the block itself.  So we check
                # for this case...
                if byebyeloop != self:
                    self.remove_loop(byebyeloop)
                self.AddLoopItem((overlaps,overlap_data))  #adding old packets
                for pd in new_p_data:                      #adding new packets
                    if pd not in packet_data:
                        for i in range(len(overlaps)):
                            #don't do this at home; we are appending
                            #to something in place
                            self[overlaps[i]].append(pd[i])

    def assign_dictionary(self,dic):
        """Attach dic to this block; dictionaries whose diclang is not "DDLm"
        are ignored with a printed warning."""
        if not dic.diclang=="DDLm":
            print("Warning: ignoring dictionary %s" % dic.dic_as_cif.my_uri)
            return
        self.dictionary = dic

    def unassign_dictionary(self):
        """Remove dictionary-dependent behaviour"""
        self.dictionary = None
class StarPacket(list):
    """A list of packet values that also carries the values as attributes."""

    def merge_packet(self,incoming):
        """Merge contents of incoming packet with this packet.

        The incoming packet is appended as a single element and every
        attribute of it whose name starts with exactly one underscore is
        copied onto this packet.
        """
        # a[1:2] (not a[1]) so that an attribute called "_" cannot raise IndexError
        new_attrs = [a for a in dir(incoming) if a[0] == '_' and a[1:2] != "_"]
        # NOTE(review): append nests the incoming packet rather than
        # extending this one - confirm that callers expect this
        self.append(incoming)
        for na in new_attrs:
            setattr(self,na,getattr(incoming,na))

    def __getattr__(self,att_name):
        """Derive a missing attribute.

        Asks self.cif_dictionary to derive att_name over self.fulldata, then
        returns the attribute from the packet of fulldata whose key item
        (self.key) matches this packet.  Raises AttributeError if one of
        cif_dictionary, fulldata or key is itself missing.
        """
        if att_name in ('cif_dictionary','fulldata','key'):
            raise AttributeError('Programming error: cannot compute value of %s' % att_name)
        d = self.cif_dictionary
        c = self.fulldata
        k = self.key
        d.derive_item(att_name,c,store_value=True)
        #
        # now pick out the new value
        keyval = getattr(self,k)
        full_pack = c.GetKeyedPacket(k,keyval)
        return getattr(full_pack,att_name)
class BlockCollection(object):
def __init__(self,datasource=None,standard='CIF',
             characterset='ascii',scoping='instance',parent_id=None,**kwargs):
    """Set up an empty collection and optionally fill it from datasource.

    A BlockCollection datasource is absorbed with merge_fast; a dictionary
    datasource has each of its entries added as a top-level block.  Any other
    datasource is ignored here.  Extra keyword arguments are accepted and
    not used.
    """
    from collections import namedtuple
    self.dictionary = {}
    self.standard = standard
    self.lower_keys = set()           # short_cuts
    self.renamed = {}
    self.characterset = characterset
    self.PC = namedtuple('PC',['block_id','parent'])
    self.child_table = {}
    self.visible_keys = []            # for efficiency
    self.parent_id = parent_id
    # assigning scoping recomputes visible_keys, so the tables above must
    # already exist at this point
    self.scoping = scoping
    if isinstance(datasource,BlockCollection):
        self.merge_fast(datasource)
        self.scoping = scoping   #reset visibility
    elif isinstance(datasource,DictType):
        for blockname,blockvalue in datasource.items():
            self[blockname] = blockvalue
    self.header_comment = ''
def unlock(self):
    """Allow overwriting of all blocks in this collection"""
    for blockkey in self.lower_keys:
        setattr(self[blockkey],'overwrite',True)
def lock(self):
    """Disallow overwriting for all blocks in this collection"""
    for blockkey in self.lower_keys:
        setattr(self[blockkey],'overwrite',False)
def __str__(self):
    """Return the text produced by WriteOut() called with its defaults."""
    rendered = self.WriteOut()
    return rendered
def __setitem__(self,key,value):
    """Store value as a block named key with no parent, via NewBlock."""
    self.NewBlock(key,value,parent=None)
def __getitem__(self,key):
# Return the block stored under key.  String keys are lower-cased and
# looked up among ALL keys in lower_keys, not just the visible ones.
# A string key that is not present raises KeyError.
# NOTE(review): indentation is lost in this listing, so it cannot be told
# from here whether a non-string key also reaches the raise below or falls
# through and returns None - confirm against the original file.
if isinstance(key,StringTypes):
lowerkey = key.lower()
if lowerkey in self.lower_keys:
return self.dictionary[lowerkey]
#print 'Visible keys:' + `self.visible_keys`
#print 'All keys' + `self.lower_keys`
#print 'Child table' + `self.child_table`
raise KeyError,'No such item %s' % key
# we have to get an ordered list of the current keys,
# as we'll have to delete one of them anyway.
# Deletion will delete any key regardless of visibility
def __delitem__(self,key):
    """Delete the block stored under key together with all of its children.

    Deletion ignores visibility.  Raises KeyError (from the lookup) when a
    string key is not present.
    """
    self[key]                 #raise error if not present
    lowerkey = key.lower()
    # get rid of all children recursively as well
    children = [name for name,info in self.child_table.items() if info.parent == lowerkey]
    for child in children:
        del self[child]   #recursive call
    del self.dictionary[lowerkey]
    del self.child_table[lowerkey]
    try:
        self.visible_keys.remove(lowerkey)
    except ValueError:
        # visible_keys is a list, so an absent (non-visible) block shows up
        # as ValueError, not KeyError
        pass
    self.lower_keys.remove(lowerkey)
def __len__(self):
    """Return the number of visible blocks."""
    visible = self.visible_keys
    return len(visible)
def __contains__(self,item):
    """Support the 'in' operator by delegating to has_key."""
    found = self.has_key(item)
    return found
# We iterate over all visible
def __iter__(self):
    """Yield each block listed by keys(), in that order."""
    for blockkey in self.keys():
        current = self[blockkey]
        yield current
# TODO: handle different case
def keys(self):
    """Return the list of visible (lower-case) block keys.

    The internal list itself is returned, not a copy.
    """
    visible = self.visible_keys
    return visible
# changes to take case independence into account
def has_key(self,key):
    """Return 1 if key (compared without regard to case) names a visible
    block, otherwise 0.  Keys that are not strings always give 0."""
    if isinstance(key,StringTypes) and key.lower() in self.visible_keys:
        return 1
    return 0
def get(self,key,default=None):
    """Return the block for key when has_key reports it, else default."""
    if not self.has_key(key):       # take account of case
        return default
    return self.__getitem__(key)
def clear(self):
    """Forget every block: empty the block store in place and replace the
    key set, child table and visible-key list with fresh empty ones."""
    self.dictionary.clear()
    self.lower_keys, self.child_table, self.visible_keys = set(), {}, []
def copy(self):
    """Return a new BlockCollection holding the same blocks.

    The copy is shallow: the block objects are shared, but the block store,
    child table and key set are independent containers.  The standard,
    parent_id, characterset and scoping of this collection are carried over.
    """
    # build an empty collection first; handing the blocks to the constructor
    # would re-add them as top-level blocks and lose the hierarchy
    newcopy = BlockCollection(standard=self.standard,parent_id=self.parent_id)
    newcopy.dictionary = self.dictionary.copy()   #all blocks
    newcopy.child_table = self.child_table.copy()
    newcopy.lower_keys = set(self.lower_keys)     #own set, not an alias
    newcopy.characterset = self.characterset
    newcopy.scoping = self.scoping  #this sets visible keys
    return newcopy
def update(self,adict):
    """Add every entry of adict (anything offering keys() and indexing) to
    this collection by item assignment."""
    for blockname in adict.keys():
        incoming = adict[blockname]
        self[blockname] = incoming
def items(self):
    """Return a list of (key, block) pairs for the keys given by keys()."""
    pairs = []
    for blockkey in self.keys():
        pairs.append((blockkey,self[blockkey]))
    return pairs
def first_block(self):
    """Return the 'first' block.  This is not necessarily the first block in the file.

    The block for the first entry of keys() is returned, or None when there
    are no keys.
    """
    visible = self.keys()
    if not visible:
        return None
    return self[visible[0]]
def NewBlock(self,blockname,blockcontents=None,fix=True,parent=None):
# Add blockcontents (a new StarBlock when None) to the collection under
# blockname and return the lower-case key actually used.  With fix set,
# spaces and tabs in the name are replaced by underscores.  parent, when
# given, names the enclosing block.  Raises StarError for an over-long
# block name or when an existing block would have to be replaced.
# NOTE(review): indentation is lost in this listing, so it is not certain
# which of the three checks below are guarded by "standard is not None" /
# "standard == 'CIF'" - confirm against the original file.
if blockcontents is None:
blockcontents = StarBlock()
if self.standard is not None:
if self.standard == 'CIF':
self.checknamelengths(blockcontents,maxlength=75) #
self.checkloopnesting(blockcontents)
if len(blockname)>75:
raise StarError , 'Blockname %s is longer than 75 characters' % blockname
if fix:
newblockname = re.sub('[ \t]','_',blockname)
else: newblockname = blockname
new_lowerbn = newblockname.lower()
# The key is already in use: when a standard is set, one of the two blocks
# is given a key with '+' appended; otherwise the old block is deleted.
if new_lowerbn in self.lower_keys:
if self.standard is not None: #already there
toplevelnames = [a[0] for a in self.child_table.items() if a[1].parent==None]
if parent is None and new_lowerbn not in toplevelnames: #can give a new key to this one
while new_lowerbn in self.lower_keys: new_lowerbn = new_lowerbn + '+'
elif parent is not None and new_lowerbn in toplevelnames: #can fix a different one
replace_name = new_lowerbn
while replace_name in self.lower_keys: replace_name = replace_name + '+'
self._rekey(new_lowerbn,replace_name)
# now continue on to add in the new block
if parent.lower() == new_lowerbn: #the new block's requested parent just got renamed!!
parent = replace_name
else:
raise StarError( "Attempt to replace existing block " + blockname)
else:
del self[new_lowerbn]
# Register the block; the printed name stays newblockname even if the key
# was changed above.
self.dictionary.update({new_lowerbn:blockcontents})
self.lower_keys.add(new_lowerbn)
if parent is None:
self.child_table[new_lowerbn]=self.PC(newblockname,None)
self.visible_keys.append(new_lowerbn)
else:
if parent.lower() in self.lower_keys:
# children are only made visible when scoping is not 'instance'
if self.scoping == 'instance':
self.child_table[new_lowerbn]=self.PC(newblockname,parent.lower())
else:
self.child_table[new_lowerbn]=self.PC(newblockname,parent.lower())
self.visible_keys.append(new_lowerbn)
else:
# NOTE(review): at this point the block is already in dictionary and
# lower_keys but gets no child_table entry - confirm this is intended
print 'Warning:Parent block %s does not exist for child %s' % (parent,newblockname)
return new_lowerbn #in case calling routine wants to know
def _rekey(self,oldname,newname,block_id=''):
    """The block with key [[oldname]] gets [[newname]] as a new key, but the printed name
    does not change unless [[block_id]] is given.  Prefer [[rename]] for a safe version."""
    moved_block = self[oldname]                #old block
    was_visible = oldname in self.visible_keys
    old_info = self.child_table[oldname]       #old info
    # repoint the children before deleting, otherwise the recursive delete
    # would take them away as well
    for child_key,child_info in list(self.child_table.items()):
        if child_info.parent == oldname:
            self.child_table[child_key] = self.PC(child_info.block_id,newname)
    del self[oldname]
    self.dictionary[newname] = moved_block
    self.lower_keys.add(newname)
    if block_id == '':
        self.child_table[newname] = old_info
    else:
        self.child_table[newname] = self.PC(block_id,old_info.parent)
    if was_visible:
        self.visible_keys += [newname]
def rename(self,oldname,newname):
    """Rename datablock from [[oldname]] to [[newname]]. Both key and printed name are changed.  No
    conformance checks are conducted.

    Raises StarError if a block is already stored under newname and KeyError
    if no block is stored under oldname (both compared in lower case).
    """
    realoldname = oldname.lower()
    realnewname = newname.lower()
    if realnewname in self.lower_keys:
        raise StarError('Cannot change blockname %s to %s as %s already present' % (oldname,newname,newname))
    if realoldname not in self.lower_keys:
        raise KeyError('Cannot find old block %s' % realoldname)
    self._rekey(realoldname,realnewname,block_id=newname)
def merge_fast(self,new_bc,parent=None):
    """Do a fast merge of the blocks of new_bc into this collection.

    The merge is 'replace' when self.standard is None and 'strict'
    otherwise.  In strict mode a key present on both sides is resolved by
    giving one of the two blocks a key with '+' appended: ours when parent
    is given or ours is top-level and theirs is nested, theirs (inside
    new_bc) when ours is nested and theirs is top-level.  Any other clash
    raises StarError.  When parent is given, the top-level blocks of new_bc
    become its children.
    """
    mode = 'replace' if self.standard is None else 'strict'
    overlap_flag = not self.lower_keys.isdisjoint(new_bc.lower_keys)
    if overlap_flag and mode != 'replace':
        double_keys = self.lower_keys.intersection(new_bc.lower_keys)
        for dup_key in double_keys:
            our_parent = self.child_table[dup_key].parent
            their_parent = new_bc.child_table[dup_key].parent
            if parent is not None or (our_parent is None and their_parent is not None):
                # rename our block
                start_key = dup_key
                while start_key in self.lower_keys: start_key = start_key+'+'
                self._rekey(dup_key,start_key)
                # parent may legitimately be None in this branch
                if parent is not None and parent.lower() == dup_key:
                    parent = start_key   #we just renamed the prospective parent!
            elif our_parent is not None and their_parent is None and parent is None:
                start_key = dup_key
                while start_key in new_bc.lower_keys: start_key = start_key+'+'
                new_bc._rekey(dup_key,start_key)
            else:
                raise StarError("In strict merge mode:duplicate keys %s" % dup_key)
    self.dictionary.update(new_bc.dictionary)
    self.lower_keys.update(new_bc.lower_keys)
    self.visible_keys += (list(new_bc.lower_keys))
    self.child_table.update(new_bc.child_table)
    if parent is not None:     #redo the child_table entries
        reparent_list = [(a[0],a[1].block_id) for a in new_bc.child_table.items() if a[1].parent==None]
        reparent_dict = [(a[0],self.PC(a[1],parent.lower())) for a in reparent_list]
        self.child_table.update(dict(reparent_dict))
def merge(self,new_bc,mode=None,parent=None,single_block=[],
idblock="",match_att=[],match_function=None):
# Merge the blocks of new_bc into this collection.
#   mode         'strict', 'replace' or 'overlay'; when None it becomes
#                'replace' if self.standard is None, else 'strict'.
#   parent       parent given to incoming blocks that have none of their own.
#   single_block when non-empty, only new_bc[single_block[1]] is merged into
#                self[single_block[0]] and None is returned.
#   idblock      name of an incoming block that is skipped.
#   match_att / match_function
#                used to pair incoming blocks with existing ones by the value
#                of an attribute rather than by block name.
# Raises StarError for a clash in strict mode or for an unknown mode.
# The default lists are not modified in the code below.
if mode is None:
if self.standard is None:
mode = 'replace'
else:
mode = 'strict'
if single_block:
self[single_block[0]].merge(new_bc[single_block[1]],mode,
match_att=match_att,
match_function=match_function)
return None
base_keys = [a[1].block_id for a in self.child_table.items()]
# NOTE(review): without match_att this is the same list object as base_keys
block_to_item = base_keys #default
new_keys = [a[1].block_id for a in new_bc.child_table.items()] #get list of incoming blocks
if match_att:
#make a blockname -> item name map
# NOTE(review): this map follows self.keys() order while base_keys follows
# child_table order; the index ii below is used on both - confirm they agree
if match_function:
block_to_item = map(lambda a:match_function(self[a]),self.keys())
else:
block_to_item = map(lambda a:self[a].get(match_att[0],None),self.keys())
#print `block_to_item`
for key in new_keys: #run over incoming blocknames
if key == idblock: continue #skip dictionary id
basekey = key #default value
if len(match_att)>0:
attval = new_bc[key].get(match_att[0],0) #0 if ignoring matching
else:
attval = 0
# look for an existing block whose attribute value equals (or, for list
# values, contains) attval; if found, merge into that block instead
for ii in range(len(block_to_item)): #do this way to get looped names
thisatt = block_to_item[ii] #keyname in old block
#print "Looking for %s in %s" % (attval,thisatt)
if attval == thisatt or \
(isinstance(thisatt,ListType) and attval in thisatt):
basekey = base_keys.pop(ii)
block_to_item.remove(thisatt)
break
if not self.has_key(basekey) or mode=="replace":
new_parent = new_bc.get_parent(key)
if parent is not None and new_parent is None:
new_parent = parent
self.NewBlock(basekey,new_bc[key],parent=new_parent) #add the block
else:
if mode=="strict":
raise StarError( "In strict merge mode: block %s in old and block %s in new files" % (basekey,key))
elif mode=="overlay":
# print "Merging block %s with %s" % (basekey,key)
self[basekey].merge(new_bc[key],mode,match_att=match_att)
else:
raise StarError( "Merge called with unknown mode %s" % mode)
def checknamelengths(self,target_block,maxlength=-1):
    """Raise StarError listing every data name of target_block longer than
    maxlength.  A negative maxlength disables the check."""
    if maxlength < 0:
        return
    toolong = [name for name in target_block.keys() if len(name)>maxlength]
    outstring = ""
    for name in toolong:
        outstring += "\n" + name
    if toolong:
        raise StarError( 'Following data names too long:' + outstring)
def checkloopnesting(self,target_block):
    """Check that block doesn't contain nested loops"""
    for outer_loop in target_block.loops:
        if len(outer_loop.loops) > 0:
            raise StarError('Block contains nested loops')
def get_all(self,item_name):
    """Return the distinct values of item_name over all blocks in keys().

    List values are flattened one level; blocks without the item are
    skipped.  Values keep the order in which they are first seen.
    """
    found = [self[blockkey].get(item_name) for blockkey in self.keys()]
    found = [value for value in found if value != None]
    ret_vals = []
    for value in found:
        candidates = value if isinstance(value,list) else [value]
        for candidate in candidates:
            if candidate not in ret_vals:
                ret_vals.append(candidate)
    return ret_vals
def __setattr__(self,attr_name,newval):
    """Set an attribute; assigning 'scoping' also recomputes visible_keys.

    'dictionary' scoping makes every key visible, 'instance' scoping only
    the keys whose child_table entry has no parent.  Any other scoping value
    raises StarError.
    """
    if attr_name == 'scoping':
        if newval not in ('dictionary','instance'):
            raise StarError("Star file may only have 'dictionary' or 'instance' scoping, not %s" % newval)
        if newval == 'dictionary':
            self.visible_keys = list(self.lower_keys)
        else:
            #only top-level datablocks visible
            self.visible_keys = [name for name,info in self.child_table.items() if info.parent==None]
    object.__setattr__(self,attr_name,newval)
def get_parent(self,blockname):
    """Return the name of the block enclosing [[blockname]] in canonical form (lower case)

    None is returned for a top-level block.  Raises StarError when
    blockname has no entry in the child table.
    """
    # child_table is keyed by lower-case name, so there is at most one entry
    # and a direct lookup replaces the scan over all items
    try:
        entry = self.child_table[blockname.lower()]
    except (KeyError,AttributeError):
        raise StarError('no parent for %s' % blockname)
    return entry.parent
def get_roots(self):
    """Get the top-level blocks as a list of (key, child-table entry) pairs"""
    roots = []
    for entry in self.child_table.items():
        if entry[1].parent==None:
            roots.append(entry)
    return roots
def get_children(self,blockname,include_parent=False,scoping='dictionary'):
    """Get all children of [[blockname]] as a block collection. If [[include_parent]] is
    True, the parent block will also be included in the block collection as the root.

    Without the parent, its immediate children become the top-level blocks
    of the result.  The returned collection shares the block objects and is
    given the requested scoping.
    """
    newbc = BlockCollection()
    block_lower = blockname.lower()
    # descent is tested with the table key (a[0]), as get_child_list does;
    # the printed block_id can differ from the key once a block was re-keyed
    proto_child_table = [a for a in self.child_table.items() if self.is_child_of_parent(block_lower,a[0])]
    newbc.child_table = dict(proto_child_table)
    if not include_parent:
        newbc.child_table.update(dict([(a[0],self.PC(a[1].block_id,None)) for a in proto_child_table if a[1].parent == block_lower]))
    newbc.lower_keys = set([a[0] for a in proto_child_table])
    newbc.dictionary = dict((a[0],self.dictionary[a[0]]) for a in proto_child_table)
    if include_parent:
        newbc.child_table.update({block_lower:self.PC(self.child_table[block_lower].block_id,None)})
        newbc.lower_keys.add(block_lower)
        newbc.dictionary.update({block_lower:self.dictionary[block_lower]})
    newbc.scoping = scoping
    return newbc
def get_immediate_children(self,parentname):
    """Get the next level of children of the given block as a list, without nested levels

    Each element is a (key, child-table entry) pair.
    """
    wanted_parent = parentname.lower()
    return [entry for entry in self.child_table.items() if entry[1].parent == wanted_parent]
def get_child_list(self,parentname):
    """Get a list of all child categories

    The keys of every block for which is_child_of_parent reports descent
    from parentname are returned.
    """
    wanted_parent = parentname.lower()
    descendants = []
    for entry in self.child_table.items():
        if self.is_child_of_parent(wanted_parent,entry[0]):
            descendants.append(entry[0])
    return descendants
def is_child_of_parent(self,parentname,blockname):
    """Return True if blockname is a child, at any depth, of parentname.

    Both names are lower-cased before being compared with the child table.
    """
    wanted_parent = parentname.lower()
    direct = [name for name,info in self.child_table.items() if info.parent == wanted_parent]
    if blockname.lower() in direct:
        return True
    for descendant in direct:
        if self.is_child_of_parent(descendant,blockname):
            return True
    return False
def set_parent(self,parentname,childname):
    """Set the parent block

    Raises KeyError when either block is not in the collection.  The
    visible keys are recomputed afterwards.
    """
    parent_key = parentname.lower()
    child_key = childname.lower()
    # first check that both blocks exist
    if parent_key not in self.lower_keys:
        raise KeyError('Parent block %s does not exist' % parentname)
    if child_key not in self.lower_keys:
        raise KeyError('Child block %s does not exist' % childname)
    old_entry = self.child_table[child_key]
    self.child_table[child_key] = self.PC(old_entry.block_id,parent_key)
    self.scoping = self.scoping       #reset visibility
def WriteOut(self,comment='',wraplength=80,maxoutlength=2048):
    """Return the whole collection as a string.

    comment (self.header_comment when empty) comes first.  Every top-level
    block follows as a data_ block with its children written as save
    frames.  For the 'Dic' standard the contents of a block precede its save
    frames, otherwise they follow them.  wraplength and maxoutlength are
    passed to each top-level block through SetOutputLength.
    """
    try:
        from cStringIO import StringIO as _buffer_class
    except ImportError:          #no cStringIO on this interpreter
        from io import StringIO as _buffer_class
    if not comment:
        comment = self.header_comment
    outstring = _buffer_class()
    outstring.write(comment)
    # loop over top-level
    top_block_names = [(a[0],a[1].block_id) for a in self.child_table.items() if a[1].parent is None]
    for blockref,blockname in top_block_names:
        outstring.write('\n' + 'data_' +blockname+'\n')
        child_names = [(a[0],a[1].block_id) for a in self.child_table.items() if a[1].parent==blockref]
        if self.standard == 'Dic':              #put contents before save frames
            self[blockref].SetOutputLength(wraplength,maxoutlength)
            outstring.write(str(self[blockref]))
        for child_ref,child_name in child_names:
            outstring.write('\n' + 'save_' + child_name + '\n')
            self.block_to_string(child_ref,child_name,outstring,4)
            outstring.write('\n' + 'save_'+ '\n')
        if self.standard != 'Dic':              #put contents after save frames
            self[blockref].SetOutputLength(wraplength,maxoutlength)
            outstring.write(str(self[blockref]))
    returnstring = outstring.getvalue()
    outstring.close()
    return returnstring
def block_to_string(self,block_ref,block_id,outstring,indentlevel=0):
    """Output a complete datablock indexed by [[block_ref]] and named [[block_id]], including children"""
    nested = [(key,info.block_id) for key,info in self.child_table.items() if info.parent==block_ref]
    contents_first = self.standard == 'Dic'
    if contents_first:
        outstring.write(str(self[block_ref]))
    for child_ref,child_name in nested:
        outstring.write('\n' + 'save_' + child_name + '\n')
        self.block_to_string(child_ref,child_name,outstring,indentlevel)
        # only the closing save_ is indented
        outstring.write('\n' + '  '*indentlevel + 'save_' + '\n')
    if not contents_first:
        outstring.write(str(self[block_ref]))
class StarFile(BlockCollection):
    """A BlockCollection that can be read from a STAR file and remembers
    the URI it came from."""

    def __init__(self,datasource=None,maxinlength=-1,maxoutlength=0,
                 scoping='instance',grammar='1.1',scantype='standard',**kwargs):
        """Create the collection and, when datasource is a string or offers
        read(), fill it by calling ReadStar with the given grammar and
        scantype.  A maxoutlength of 0 stands for 2048.  Remaining keyword
        arguments go to BlockCollection."""
        super(StarFile,self).__init__(datasource=datasource,**kwargs)
        self.my_uri = getattr(datasource,'my_uri','')
        self.maxinlength = maxinlength      #no restriction
        self.maxoutlength = 2048 if maxoutlength == 0 else maxoutlength
        self.scoping = scoping
        if type(datasource) in StringTypes or hasattr(datasource,"read"):
            ReadStar(datasource,prepared=self,maxlength=self.maxinlength,
                     grammar=grammar,scantype=scantype)
        self.header_comment = """#\\#STAR
##########################################################################
# STAR Format file
# Produced by PySTARRW module
#
# This is a STAR file. STAR is a superset of the CIF file type. For
# more information, please refer to International Tables for Crystallography,
# Volume G, Chapter 2.1
#
##########################################################################
"""

    def set_uri(self,my_uri):
        """Record my_uri as the origin of this file."""
        self.my_uri = my_uri
from StringIO import StringIO
import math
class CIFStringIO(StringIO):
    """A StringIO that tracks the current output column so that writes can
    be wrapped, indented and aligned to tab stops."""

    def __init__(self,target_width=80,**kwargs):
        """target_width is the column past which a breakable write starts a
        new line; other keyword arguments go to StringIO."""
        StringIO.__init__(self,**kwargs)
        self.currentpos = 0
        self.target_width = target_width
        self.tabwidth = -1          #no tab stops until set_tab is called
        self.indentlist = [0]       #stack of indents, innermost last

    def write(self,outstring,canbreak=False,mustbreak=False,do_tab=True,newindent=False,unindent=False,startcol=-1):
        """Write a string with correct linebreak, tabs and indents

        mustbreak always starts a new line first; canbreak does so only when
        outstring would run past target_width.  newindent pushes a new
        indent, unindent pops one.  With a positive tab width and do_tab set,
        output is padded with spaces to the next tab stop.  startcol is
        accepted but not used.
        """
        # do we need to break?
        if mustbreak:    #insert a new line and indent
            StringIO.write(self,'\n' + ' '*self.indentlist[-1])
            self.currentpos = self.indentlist[-1]
        if self.currentpos+len(outstring)>self.target_width: #try to break
            if canbreak:
                StringIO.write(self,'\n'+' '*self.indentlist[-1])
                self.currentpos = self.indentlist[-1]
        if newindent:           #indent by current amount
            if self.indentlist[-1] == 0:    #first time
                self.indentlist.append(self.currentpos)
                print('Indentlist: ' + repr(self.indentlist))
            else:
                self.indentlist.append(self.indentlist[-1]+2)
        elif unindent:
            if len(self.indentlist)>1:
                self.indentlist.pop()
            else:
                print('Warning: cannot unindent any further')
        #handle tabs
        if self.tabwidth >0 and do_tab:
            next_stop = ((self.currentpos//self.tabwidth)+1)*self.tabwidth
            if self.currentpos < next_stop:
                StringIO.write(self,(next_stop-self.currentpos)*' ')
                self.currentpos = next_stop
        #now output the string
        StringIO.write(self,outstring)
        last_line_break = outstring.rfind('\n')
        if last_line_break >=0:
            # NOTE(review): this count includes the newline itself, i.e. it is
            # one more than the characters on the new line - confirm intended
            self.currentpos = len(outstring)-last_line_break
        else:
            self.currentpos = self.currentpos + len(outstring)

    def set_tab(self,tabwidth):
        """Set the tab stop position"""
        self.tabwidth = tabwidth
class StarError(Exception):
    """Raised for problems with the content or structure of a STAR file."""

    def __init__(self,value):
        # hand the value to Exception as well so that args is populated
        Exception.__init__(self,value)
        self.value = value

    def __str__(self):
        # %-formatting, not +, so that a non-string value cannot raise here
        return '\nStar Format error: %s' % (self.value,)
class StarLengthError(Exception):
    """Raised when a length limit of the STAR format is exceeded."""

    def __init__(self,value):
        # hand the value to Exception as well so that args is populated
        Exception.__init__(self,value)
        self.value = value

    def __str__(self):
        # %-formatting, not +, so that a non-string value cannot raise here
        return '\nStar length error: %s' % (self.value,)
def ReadStar(filename,prepared=None,maxlength=2048,scantype='standard',grammar='1.1',CBF=False):
    """Parse a STAR file into prepared and return the result.

    filename  a URL/file name (opened with urlopen and closed again here)
              or an already opened object offering read().
    prepared  the StarFile to fill; a fresh one is created for each call
              when omitted.
    maxlength longest line accepted; values of 0 or less disable the check.
    scantype  'standard' or anything else for the flex scanner.
    grammar   "1.1", "1.0" or "DDLm".
    CBF       when true, sections enclosed in "-BINARY-FORMAT-SECTION-" are
              replaced by " (binary omitted)".

    Raises StarError for an unknown grammar, an over-long line, a syntax
    error or duplicate keys in a block.
    """
    import sys
    if prepared is None:
        # created per call: a default built in the signature would be one
        # shared object collecting the blocks of every file read
        prepared = StarFile()
    # save desired scoping
    save_scoping = prepared.scoping
    if grammar=="1.1":
        import YappsStarParser_1_1 as Y
    elif grammar=="1.0":
        import YappsStarParser_1_0 as Y
    elif grammar=="DDLm":
        import YappsStarParser_DDLm as Y
    else:
        raise StarError('Unrecognised grammar %s' % grammar)
    we_opened = isinstance(filename,basestring)
    if we_opened:
        filestream = urlopen(filename)
    else:
        filestream = filename   #already opened for us
    try:
        my_uri = ""
        if hasattr(filestream,"geturl"):
            my_uri = filestream.geturl()
        text = filestream.read()
    finally:
        if we_opened:           #we opened it, we close it
            filestream.close()
    if not isinstance(text,unicode):   #already decoded text must not be decoded again
        text = unicode(text,"utf8")
    if not text:      # empty file, return empty block
        prepared.set_uri(my_uri)
        return prepared
    # filter out non-ASCII characters in CBF files if required.  We assume
    # that the binary is enclosed in a fixed string that occurs
    # nowhere else.
    if CBF:
        text_bits = text.split("-BINARY-FORMAT-SECTION-")
        text = text_bits[0]
        for section in range(2,len(text_bits),2):
            text = text+" (binary omitted)"+text_bits[section]
    # we recognise ctrl-Z as end of file
    endoffile = text.find('\x1a')
    if endoffile >= 0:
        text = text[:endoffile]
    if maxlength > 0:
        for lineno,line in enumerate(text.split('\n')):
            if len(line)>maxlength:
                raise StarError( 'Line %d contains more than %d characters' % (lineno+1,maxlength))
    if scantype == 'standard':
        parser = Y.StarParser(Y.StarParserScanner(text))
    else:
        parser = Y.StarParser(Y.yappsrt.Scanner(None,[],text,scantype='flex'))
    proto_star = None
    try:
        proto_star = parser.input(prepared)
    except Y.yappsrt.SyntaxError as e:
        scanner_input = parser._scanner.input
        Y.yappsrt.print_error(scanner_input, e, parser._scanner)
    except Y.yappsrt.NoMoreTokens:
        sys.stderr.write('Could not complete parsing; stopped around here:\n')
        sys.stderr.write('%s\n' % (parser._scanner,))
    if proto_star is None:
        errorstring = 'Syntax error in input file: last value parsed was %s' % Y.lastval
        errorstring = errorstring + '\nParser status: %s' % repr(parser._scanner)
        raise StarError( errorstring)
    # set visibility correctly
    proto_star.scoping = 'dictionary'
    # duplication check on all blocks
    audit_result = [(a,proto_star[a].audit()) for a in proto_star.keys()]
    audit_result = [a for a in audit_result if len(a[1])>0]
    if audit_result:
        raise StarError( 'Duplicate keys as follows: %s' % repr(audit_result))
    proto_star.set_uri(my_uri)
    proto_star.scoping = save_scoping
    return proto_star
def get_dim(dataitem,current=0,packlen=0):
    """Return (depth, length) describing the nesting of dataitem.

    depth is the number of list/tuple levels found by following the first
    element downwards, added to current; length is the length of the
    innermost list or tuple reached (packlen when dataitem is not one).
    Only objects whose class is exactly list or tuple count as nested.
    """
    # everything that is not exactly a list or tuple is a leaf, so no table
    # of scalar types is needed
    while dataitem.__class__ in (tuple,list):
        current = current + 1
        packlen = len(dataitem)
        if packlen == 0:
            break
        dataitem = dataitem[0]
    return current,packlen
def apply_line_folding(instring,minwraplength=60,maxwraplength=80):
"""Insert line folding characters into instring between min/max wraplength"""
# first check that we need to do this
lines = instring.split('\n')
line_len = [len(l) for l in lines]
if max(line_len) < maxwraplength and re.match("\\[ \v\t\f]*\n",instring) is None:
return instring
outstring = "\\\n" #header
for l in lines:
if len(l) < maxwraplength:
outstring = outstring + l
if len(l) > 0 and l[-1]=='\\': #who'da thunk it? A line ending with a backslash
outstring = outstring + "\\\n" #
outstring = outstring + "\n" # put back the split character
else:
current_bit = l
while len(current_bit) > maxwraplength:
space_pos = re.search('[ \v\f\t]+',current_bit[minwraplength:])
if space_pos is not None and space_pos.start()[^;\\\n][^\n\\\\]+)(?P\\\\{1,2}[ \t\v\f]*\n)",instring)
if prefix_match is not None:
prefix_text = prefix_match.group('prefix')
print 'Found prefix %s' % prefix_text
prefix_end = prefix_match.end('folding')
# keep any line folding instructions
if prefix_match.group('folding')[:2]=='\\\\': #two backslashes
outstring = instring[prefix_match.end('folding')-1:].replace("\n"+prefix_text,"\n")
return "\\" + outstring #keep line folding first line
else:
outstring = instring[prefix_match.end('folding')-1:].replace("\n"+prefix_text,"\n")
return outstring[1:] #drop first line ending, no longer necessary
else:
return instring
def listify(item):
    """Return a string wrapped in a one-element list; anything else is
    returned unchanged."""
    return [item] if isinstance(item,StringTypes) else item
#Transpose the list of lists passed to us
def transpose(base_list):
    """Transpose a list of lists.

    Element i of the result collects item i of every inner list; the number
    of result rows is the length of the first inner list.  An empty
    base_list gives an empty list.  IndexError is raised if a later inner
    list is shorter than the first.
    """
    if len(base_list) == 0:
        return []
    return [[row[col] for row in base_list] for col in range(len(base_list[0]))]
def check_stringiness(data):
    """Check that the contents of data are all strings

    Returns True for a string and for any nesting of iterables that holds
    only strings, False for numbers and None.  Objects with a dtype
    attribute are treated as numpy data and judged by dtype kind 'S' or 'U'.
    """
    try:
        text_types = basestring     #Python 2: covers str and unicode
    except NameError:
        text_types = str
    if not hasattr(data,'dtype'):   #so not Numpy
        from numbers import Number
        if isinstance(data,Number): return False
        elif isinstance(data,text_types): return True
        elif data is None:return False  #should be data are None
        else:
            for one_item in data:
                if not check_stringiness(one_item): return False
            return True   #all must be strings
    else:   #numerical python
        import numpy
        if data.ndim == 0:    #a bare value
            if data.dtype.kind in ['S','U']: return True
            else: return False
        else:
            for one_item in numpy.nditer(data):
                print('numpy data: ' + repr(one_item))
                if not check_stringiness(one_item): return False
            return True
Module variables
var StringTypes
Functions
def ReadStar(
filename, prepared=<__pdoc_file_module__.StarFile object at 0xb6e0590c>, maxlength=2048, scantype='standard', grammar='1.1', CBF=False)
def ReadStar(filename,prepared=None,maxlength=2048,scantype='standard',grammar='1.1',CBF=False):
    """Read a STAR file and return the object filled in by the parser.

    filename  -- a string handed to urlopen(), or an already-open stream
                 providing read() (and optionally geturl()).
    prepared  -- object passed to the parser to be populated; a new
                 StarFile is created for each call when omitted.
    maxlength -- when positive, StarError is raised for the first line
                 longer than this many characters.
    scantype  -- 'standard' selects the grammar module's StarParserScanner;
                 any other value selects the yappsrt 'flex' scanner.
    grammar   -- '1.0', '1.1' or 'DDLm'; selects the parser module.
    CBF       -- when true, text between "-BINARY-FORMAT-SECTION-" markers
                 is replaced by " (binary omitted)".

    Raises StarError for an unknown grammar, an over-long line, a failed
    parse, or duplicate keys reported by a block's audit().
    """
    # A default created in the signature would be a single object shared
    # (and refilled) by every call, so it is built here instead.
    if prepared is None:
        prepared = StarFile()
    # save desired scoping
    save_scoping = prepared.scoping
    if grammar == "1.1":
        import YappsStarParser_1_1 as Y
    elif grammar == "1.0":
        import YappsStarParser_1_0 as Y
    elif grammar == "DDLm":
        import YappsStarParser_DDLm as Y
    else:
        raise StarError('Unrecognised grammar %s: expected "1.0", "1.1" or "DDLm"' % grammar)
    opened_here = isinstance(filename, basestring)
    if opened_here:
        filestream = urlopen(filename)
    else:
        filestream = filename       # already opened for us
    my_uri = ""
    try:
        if hasattr(filestream, "geturl"):
            my_uri = filestream.geturl()
        text = unicode(filestream.read(), "utf8")
    finally:
        if opened_here:             # we opened it, we close it
            filestream.close()
    if not text:                    # empty file, return empty block
        # NOTE(review): this returns whatever set_uri() returns - confirm
        # that it hands back the StarFile rather than None.
        return StarFile().set_uri(my_uri)
    # filter out non-ASCII characters in CBF files if required.  We assume
    # that the binary is enclosed in a fixed string that occurs
    # nowhere else.
    if CBF:
        text_bits = text.split("-BINARY-FORMAT-SECTION-")
        text = text_bits[0]
        for section in range(2, len(text_bits), 2):
            text = text + " (binary omitted)" + text_bits[section]
    # we recognise ctrl-Z as end of file
    endoffile = text.find('\x1a')
    if endoffile >= 0:
        text = text[:endoffile]
    if maxlength > 0:
        for lineno, line in enumerate(text.split('\n')):
            if len(line) > maxlength:
                raise StarError('Line %d contains more than %d characters' % (lineno + 1, maxlength))
    if scantype == 'standard':
        parser = Y.StarParser(Y.StarParserScanner(text))
    else:
        parser = Y.StarParser(Y.yappsrt.Scanner(None, [], text, scantype='flex'))
    proto_star = None
    try:
        proto_star = parser.input(prepared)
    except Y.yappsrt.SyntaxError as e:
        # Report the position; the StarError below is raised because
        # proto_star is still None.
        scanner_input = parser._scanner.input
        Y.yappsrt.print_error(scanner_input, e, parser._scanner)
    except Y.yappsrt.NoMoreTokens:
        sys.stderr.write('Could not complete parsing; stopped around here:\n')
        sys.stderr.write('%s\n' % (parser._scanner,))
    if proto_star is None:
        errorstring = 'Syntax error in input file: last value parsed was %s' % Y.lastval
        errorstring = errorstring + '\nParser status: %s' % repr(parser._scanner)
        raise StarError(errorstring)
    # set visibility correctly
    proto_star.scoping = 'dictionary'
    # duplication check on all blocks
    audit_result = [(a, proto_star[a].audit()) for a in proto_star.keys()]
    audit_result = [a for a in audit_result if len(a[1]) > 0]
    if audit_result:
        raise StarError('Duplicate keys as follows: %s' % repr(audit_result))
    proto_star.set_uri(my_uri)
    proto_star.scoping = save_scoping
    return proto_star
def apply_line_folding(
instring, minwraplength=60, maxwraplength=80)
Insert line folding characters into instring between min/max wraplength
def apply_line_folding(instring,minwraplength=60,maxwraplength=80):
"""Insert line folding characters into instring between min/max wraplength"""
# first check that we need to do this
lines = instring.split('\n')
line_len = [len(l) for l in lines]
if max(line_len) < maxwraplength and re.match("\\[ \v\t\f]*\n",instring) is None:
return instring
outstring = "\\\n" #header
for l in lines:
if len(l) < maxwraplength:
outstring = outstring + l
if len(l) > 0 and l[-1]=='\\': #who'da thunk it? A line ending with a backslash
outstring = outstring + "\\\n" #
outstring = outstring + "\n" # put back the split character
else:
current_bit = l
while len(current_bit) > maxwraplength:
space_pos = re.search('[ \v\f\t]+',current_bit[minwraplength:])
if space_pos is not None and space_pos.start()
def apply_line_prefix(
instring, prefix)
Prefix every line in instring with prefix
def apply_line_prefix(instring,prefix):
    """Prefix every line in instring with prefix.

    The output begins with a header line made of the prefix followed by one
    backslash, or by two backslashes when ``instring`` itself starts with a
    line-folding header (a backslash, optional blanks, newline); that
    folding header is dropped from the body. Raises StarError when the
    prefix starts with a semicolon or contains a backslash.
    """
    if prefix[0] == ";" or "\\" in prefix:
        raise StarError("Requested prefix starts with semicolon or contains a backslash: " + prefix)
    header = re.match(r"(\\[ \v\t\f]*" +"\n)",instring)
    if header is not None:
        # folded input: the doubled backslash records that folding is present
        not_header = instring[header.end():]
        outstring = prefix + "\\\\\n" + prefix
    else:
        not_header = instring
        outstring = prefix + "\\\n" + prefix
    return outstring + not_header.replace("\n","\n"+prefix)
def check_stringiness(
data)
Check that the contents of data are all strings
def check_stringiness(data):
    """Check that the contents of data are all strings.

    Returns True for a string, for a numpy value whose dtype kind is 'S' or
    'U', and for any iterable whose elements (checked recursively) all pass.
    Returns False for numbers, for None, and as soon as any element fails.
    """
    if hasattr(data, 'dtype'):      # numerical python
        import numpy
        if data.ndim == 0:          # a bare value
            return data.dtype.kind in ('S', 'U')
        # nditer hands back zero-dimensional arrays, which the recursive
        # call resolves through the bare-value branch above.
        for one_item in numpy.nditer(data):
            if not check_stringiness(one_item):
                return False
        return True
    from numbers import Number
    if isinstance(data, Number):
        return False
    if isinstance(data, basestring):
        return True
    if data is None:
        return False
    for one_item in data:
        if not check_stringiness(one_item):
            return False
    return True                     # all must be strings
def get_dim(
dataitem, current=0, packlen=0)
def get_dim(dataitem,current=0,packlen=0):
    """Return ``(depth, length)`` for a nested list/tuple structure.

    Follows the first element of each level: ``depth`` counts the list or
    tuple levels passed through (added to ``current``) and ``length`` is the
    length of the innermost list/tuple reached, or 0 if that one is empty.
    Anything whose class is not exactly ``list`` or ``tuple`` ends the
    descent and returns the values accumulated so far.
    """
    # Only exact list/tuple instances are descended into, so no separate
    # table of scalar types is needed.
    while dataitem.__class__ in (tuple, list):
        current += 1
        if len(dataitem) == 0:
            return current, 0
        packlen = len(dataitem)
        dataitem = dataitem[0]
    return current, packlen
def listify(
item)
def listify(item):
    """Return ``[item]`` when ``item`` is one of ``StringTypes``, else ``item`` itself."""
    return [item] if isinstance(item, StringTypes) else item
def remove_line_folding(
instring)
Remove line folding from instring
def remove_line_folding(instring):
    """Remove line folding from instring.

    Folding is only undone when the text opens with a folding header (a
    backslash, optional blanks, then a newline); otherwise the input is
    returned as it was given.
    """
    folding_header = re.match(r"\\[ \v\t\f]*" +"\n",instring)
    if folding_header is None:
        return instring
    # strip every line-final backslash (plus trailing blanks) and the
    # newline after it, joining the folded pieces back together
    return re.sub(r"\\[ \v\t\f]*$" + "\n?","",instring,flags=re.M)
def remove_line_prefix(
instring)
Remove prefix from every line if present
def remove_line_prefix(instring):
    """Remove prefix from every line if present.

    A prefixed string starts with the prefix (two or more characters, not
    beginning with a semicolon and containing no backslash) followed by one
    or two backslashes, optional blanks and a newline. Two backslashes mean
    the text is also line-folded, in which case a folding header is kept at
    the start of the result. Text without such a header is returned as is.
    """
    # The named groups are required: the code below looks the match up by
    # the names 'prefix' and 'folding'.
    prefix_match = re.match("(?P<prefix>[^;\\\n][^\n\\\\]+)(?P<folding>\\\\{1,2}[ \t\v\f]*\n)",instring)
    if prefix_match is None:
        return instring
    prefix_text = prefix_match.group('prefix')
    # start from the newline that ends the header so that the first real
    # line is stripped of its prefix by the same replace() as the others
    outstring = instring[prefix_match.end('folding')-1:].replace("\n"+prefix_text,"\n")
    if prefix_match.group('folding')[:2]=='\\\\':   #two backslashes
        return "\\" + outstring     #keep line folding first line
    return outstring[1:]            #drop first line ending, no longer necessary
def transpose(
base_list)
def transpose(base_list):
    """Transpose a list of rows into a list of columns.

    The number of columns is taken from the first row; every row must have
    at least that many entries (extra entries in later rows are ignored).
    Each column is returned as a new list. An empty ``base_list`` yields an
    empty list instead of raising IndexError.
    """
    if not base_list:
        return []
    width = len(base_list[0])
    return [[row[col] for row in base_list] for col in range(width)]
Classes
class BlockCollection
class BlockCollection(object):
def __init__(self,datasource=None,standard='CIF',
characterset='ascii',scoping='instance',parent_id=None,**kwargs):
"""Create an empty collection and load ``datasource`` when it is usable.

A BlockCollection ``datasource`` is absorbed with merge_fast(); a
dictionary ``datasource`` is added one item at a time through item
assignment. Any other ``datasource`` value is ignored, and ``**kwargs``
is accepted but not used here.
"""
import collections
# lower-case key -> block object
self.dictionary = {}
self.standard = standard
self.lower_keys = set() # short_cuts
self.renamed = {}
self.characterset = characterset
# record stored in child_table: printed block name and key of the parent block
self.PC = collections.namedtuple('PC',['block_id','parent'])
# lower-case key -> PC record
self.child_table = {}
self.visible_keys = [] # for efficiency
self.parent_id = parent_id
# assigning scoping goes through __setattr__, which rebuilds visible_keys
# from lower_keys/child_table, so those must already exist at this point
self.scoping = scoping #will trigger setting of child table
if isinstance(datasource,BlockCollection):
self.merge_fast(datasource)
self.scoping = scoping #reset visibility
elif isinstance(datasource,DictType):
for key,value in datasource.items():
self[key]= value
self.header_comment = ''
def unlock(self):
    """Allow overwriting of all blocks in this collection"""
    for block_key in self.lower_keys:
        block = self[block_key]
        block.overwrite = True
def lock(self):
    """Disallow overwriting for all blocks in this collection"""
    for block_key in self.lower_keys:
        block = self[block_key]
        block.overwrite = False
def __str__(self):
    """Return the text produced by WriteOut() called with its default arguments."""
    rendered = self.WriteOut()
    return rendered
def __setitem__(self, key, value):
    """Add ``value`` as a block named ``key`` with no parent, via NewBlock()."""
    self.NewBlock(key, value, parent=None)
def __getitem__(self,key):
"""Return the block stored under ``key``, compared case-insensitively.

The lookup uses lower_keys, so a block is found whether or not it is
currently visible. KeyError is raised for a string key that is not
present.
"""
# NOTE(review): the original nesting of the final raise is not recoverable
# here; if it sits inside the isinstance() test, a non-string key returns
# None instead of raising - confirm against the real source.
if isinstance(key,StringTypes):
lowerkey = key.lower()
if lowerkey in self.lower_keys:
return self.dictionary[lowerkey]
#print 'Visible keys:' + `self.visible_keys`
#print 'All keys' + `self.lower_keys`
#print 'Child table' + `self.child_table`
raise KeyError,'No such item %s' % key
# we have to get an ordered list of the current keys,
# as we'll have to delete one of them anyway.
# Deletion will delete any key regardless of visibility
def __delitem__(self,key):
    """Delete the block ``key`` together with all of its descendants.

    The block is removed regardless of whether it is currently visible.
    The initial lookup raises the same error as item access when ``key``
    is not present.
    """
    dummy = self[key]   #raise error if not present
    lowerkey = key.lower()
    # get rid of all children recursively as well
    children = [name for name, info in self.child_table.items() if info.parent == lowerkey]
    for child in children:
        del self[child]   #recursive call
    del self.dictionary[lowerkey]
    del self.child_table[lowerkey]
    # visible_keys is a list: list.remove() raises ValueError (not KeyError)
    # for a missing entry, so membership is tested first to let blocks that
    # are not currently visible be deleted too.
    if lowerkey in self.visible_keys:
        self.visible_keys.remove(lowerkey)
    self.lower_keys.remove(lowerkey)
def __len__(self):
return len(self.visible_keys)
def __contains__(self, item):
    """Support the 'in' operator"""
    present = self.has_key(item)
    return present
# We iterate over all visible
def __iter__(self):
    """Yield each visible block object (not its key), in keys() order."""
    for block_key in self.keys():
        block = self[block_key]
        yield block
# TODO: handle different case
def keys(self):
return self.visible_keys
# changes to take case independence into account
def has_key(self, key):
    """Return 1 if ``key`` is a string naming a visible block (any case), else 0."""
    if isinstance(key, StringTypes) and key.lower() in self.visible_keys:
        return 1
    return 0
def get(self, key, default=None):
    """Return the visible block named ``key`` (any case), or ``default`` if there is none."""
    if not self.has_key(key):   # take account of case
        return default
    return self.__getitem__(key)
def clear(self):
self.dictionary.clear()
self.lower_keys = set()
self.child_table = {}
self.visible_keys = []
def copy(self):
    """Return a new BlockCollection holding the same block objects.

    The containers (block dictionary, key set, child table) are copied so
    that adding or removing blocks in one collection does not affect the
    other; the block objects themselves are shared, not duplicated.
    """
    # Settings are passed by keyword: positionally, the second argument of
    # the constructor is ``standard``, not the block dictionary.
    newcopy = BlockCollection(standard=self.standard,
                              characterset=self.characterset,
                              scoping=self.scoping,
                              parent_id=self.parent_id)
    newcopy.dictionary = self.dictionary.copy()    #all blocks
    newcopy.child_table = self.child_table.copy()
    newcopy.lower_keys = set(self.lower_keys)      #own set, not an alias
    newcopy.scoping = self.scoping                 #this sets visible keys
    return newcopy
def update(self, adict):
    """Add every entry of ``adict`` to this collection through item assignment."""
    for name in adict.keys():
        block = adict[name]
        self[name] = block
def items(self):
    """Return a list of ``(key, block)`` pairs for the visible blocks."""
    pairs = []
    for block_key in self.keys():
        pairs.append((block_key, self[block_key]))
    return pairs
def first_block(self):
    """Return the 'first' block.  This is not necessarily the first block in the file."""
    visible = self.keys()
    if not visible:
        return None     # nothing visible
    return self[visible[0]]
def NewBlock(self,blockname,blockcontents=None,fix=True,parent=None):
"""Add a block to the collection and return the lower-case key it is stored under.

blockname     -- printed name of the block; at most 75 characters.
blockcontents -- the block object; a new StarBlock is created when None.
fix           -- when true, spaces and tabs in the name become underscores.
parent        -- name of the enclosing block, or None for a top-level block.

Raises StarError when the name is too long, when the name/loop checks
applied for a non-None ``standard`` fail, or when the name clashes with
an existing block in a way that cannot be resolved by rekeying.
"""
if blockcontents is None:
blockcontents = StarBlock()
if self.standard is not None:
if self.standard == 'CIF':
self.checknamelengths(blockcontents,maxlength=75) #
self.checkloopnesting(blockcontents)
if len(blockname)>75:
raise StarError , 'Blockname %s is longer than 75 characters' % blockname
if fix:
newblockname = re.sub('[ \t]','_',blockname)
else: newblockname = blockname
new_lowerbn = newblockname.lower()
if new_lowerbn in self.lower_keys:
if self.standard is not None: #already there
toplevelnames = [a[0] for a in self.child_table.items() if a[1].parent==None]
if parent is None and new_lowerbn not in toplevelnames: #can give a new key to this one
# append '+' until the key is unused
while new_lowerbn in self.lower_keys: new_lowerbn = new_lowerbn + '+'
elif parent is not None and new_lowerbn in toplevelnames: #can fix a different one
replace_name = new_lowerbn
while replace_name in self.lower_keys: replace_name = replace_name + '+'
self._rekey(new_lowerbn,replace_name)
# now continue on to add in the new block
if parent.lower() == new_lowerbn: #the new block's requested parent just got renamed!!
parent = replace_name
else:
raise StarError( "Attempt to replace existing block " + blockname)
else:
# no standard set: the existing block is simply replaced
del self[new_lowerbn]
self.dictionary.update({new_lowerbn:blockcontents})
self.lower_keys.add(new_lowerbn)
if parent is None:
self.child_table[new_lowerbn]=self.PC(newblockname,None)
self.visible_keys.append(new_lowerbn)
else:
if parent.lower() in self.lower_keys:
# both scoping modes record the same child_table entry; only the
# non-'instance' mode also makes the child visible
if self.scoping == 'instance':
self.child_table[new_lowerbn]=self.PC(newblockname,parent.lower())
else:
self.child_table[new_lowerbn]=self.PC(newblockname,parent.lower())
self.visible_keys.append(new_lowerbn)
else:
# NOTE(review): by this point the block appears to be in self.dictionary
# and self.lower_keys already, yet it gets no child_table entry here -
# confirm that this state is intended.
print 'Warning:Parent block %s does not exist for child %s' % (parent,newblockname)
return new_lowerbn #in case calling routine wants to know
def _rekey(self,oldname,newname,block_id=''):
    """The block with key [[oldname]] gets [[newname]] as a new key, but the printed name
    does not change unless [[block_id]] is given.  Prefer [[rename]] for a safe version."""
    moved_block = self[oldname]                 #old block
    was_visible = oldname in self.visible_keys
    old_info = self.child_table[oldname]        #old info
    # Re-point the children at the new key before deleting the old entry,
    # so the recursive delete does not take the children with it.
    for child_key, child_info in list(self.child_table.items()):
        if child_info.parent == oldname:
            self.child_table[child_key] = self.PC(child_info.block_id, newname)
    del self[oldname]
    self.dictionary[newname] = moved_block
    self.lower_keys.add(newname)
    if block_id == '':
        self.child_table[newname] = old_info
    else:
        self.child_table[newname] = self.PC(block_id, old_info.parent)
    if was_visible:
        self.visible_keys.append(newname)
def rename(self,oldname,newname):
    """Rename datablock from [[oldname]] to [[newname]]. Both key and printed name are changed.  No
    conformance checks are conducted."""
    old_key = oldname.lower()
    new_key = newname.lower()
    if new_key in self.lower_keys:
        raise StarError('Cannot change blockname %s to %s as %s already present' % (oldname,newname,newname))
    if old_key not in self.lower_keys:
        raise KeyError('Cannot find old block %s' % old_key)
    self._rekey(old_key, new_key, block_id=newname)
def merge_fast(self,new_bc,parent=None):
    """Do a fast merge of all blocks of ``new_bc`` into this collection.

    With ``standard`` set to None, incoming blocks replace existing blocks
    of the same key. Otherwise a key clash is resolved by rekeying one of
    the two blocks (appending '+' to its key) when exactly one of them is
    top-level or when ``parent`` is given; any other clash raises
    StarError. When ``parent`` is given, the incoming top-level blocks
    become children of that block. Note that ``new_bc`` itself may be
    rekeyed by this call.
    """
    if self.standard is None:
        mode = 'replace'
    else:
        mode = 'strict'
    overlap_flag = not self.lower_keys.isdisjoint(new_bc.lower_keys)
    if overlap_flag and mode != 'replace':
        double_keys = self.lower_keys.intersection(new_bc.lower_keys)
        for dup_key in double_keys:
            our_parent = self.child_table[dup_key].parent
            their_parent = new_bc.child_table[dup_key].parent
            if (our_parent is None and their_parent is not None and parent is None) or\
                parent is not None:  #rename our block
                start_key = dup_key
                while start_key in self.lower_keys: start_key = start_key+'+'
                self._rekey(dup_key,start_key)
                # This branch is also entered with parent None, so guard
                # the lower() call against that.
                if parent is not None and parent.lower() == dup_key:  #we just renamed the prospective parent!
                    parent = start_key
            elif our_parent is not None and their_parent is None and parent is None:
                start_key = dup_key
                while start_key in new_bc.lower_keys: start_key = start_key+'+'
                new_bc._rekey(dup_key,start_key)
            else:
                raise StarError("In strict merge mode:duplicate keys %s" % dup_key)
    self.dictionary.update(new_bc.dictionary)
    self.lower_keys.update(new_bc.lower_keys)
    self.visible_keys += (list(new_bc.lower_keys))
    self.child_table.update(new_bc.child_table)
    if parent is not None:     #redo the child_table entries
        reparent_list = [(a[0],a[1].block_id) for a in new_bc.child_table.items() if a[1].parent==None]
        reparent_dict = [(a[0],self.PC(a[1],parent.lower())) for a in reparent_list]
        self.child_table.update(dict(reparent_dict))
def merge(self,new_bc,mode=None,parent=None,single_block=[],
idblock="",match_att=[],match_function=None):
"""Merge the blocks of ``new_bc`` into this collection.

mode           -- 'strict', 'replace' or 'overlay'; when None it becomes
                  'replace' if ``standard`` is None and 'strict' otherwise.
parent         -- parent given to incoming blocks that have none.
single_block   -- when non-empty, only self[single_block[0]] is merged
                  with new_bc[single_block[1]] and None is returned.
idblock        -- incoming block name to skip.
match_att      -- when non-empty, blocks are paired by the value of
                  match_att[0] instead of by block name.
match_function -- optional callable applied to each of our blocks to
                  obtain the value to match against.

Raises StarError for a clash in 'strict' mode or for an unknown mode.
"""
if mode is None:
if self.standard is None:
mode = 'replace'
else:
mode = 'strict'
if single_block:
self[single_block[0]].merge(new_bc[single_block[1]],mode,
match_att=match_att,
match_function=match_function)
return None
base_keys = [a[1].block_id for a in self.child_table.items()]
# NOTE(review): without match_att this is an alias of base_keys, not a copy
block_to_item = base_keys #default
new_keys = [a[1].block_id for a in new_bc.child_table.items()] #get list of incoming blocks
if match_att:
#make a blockname -> item name map
# NOTE(review): this list is built from self.keys() while base_keys comes
# from child_table.items(); the index ii used below assumes both lists are
# in the same order and of the same length - confirm.
if match_function:
block_to_item = map(lambda a:match_function(self[a]),self.keys())
else:
block_to_item = map(lambda a:self[a].get(match_att[0],None),self.keys())
#print `block_to_item`
for key in new_keys: #run over incoming blocknames
if key == idblock: continue #skip dictionary id
basekey = key #default value
if len(match_att)>0:
attval = new_bc[key].get(match_att[0],0) #0 if ignoring matching
else:
attval = 0
for ii in range(len(block_to_item)): #do this way to get looped names
thisatt = block_to_item[ii] #keyname in old block
#print "Looking for %s in %s" % (attval,thisatt)
if attval == thisatt or \
(isinstance(thisatt,ListType) and attval in thisatt):
basekey = base_keys.pop(ii)
block_to_item.remove(thisatt)
break
if not self.has_key(basekey) or mode=="replace":
new_parent = new_bc.get_parent(key)
if parent is not None and new_parent is None:
new_parent = parent
self.NewBlock(basekey,new_bc[key],parent=new_parent) #add the block
else:
if mode=="strict":
raise StarError( "In strict merge mode: block %s in old and block %s in new files" % (basekey,key))
elif mode=="overlay":
# print "Merging block %s with %s" % (basekey,key)
self[basekey].merge(new_bc[key],mode,match_att=match_att)
else:
raise StarError( "Merge called with unknown mode %s" % mode)
def checknamelengths(self,target_block,maxlength=-1):
    """Raise StarError listing every key of ``target_block`` longer than ``maxlength``.

    A negative ``maxlength`` disables the check.
    """
    if maxlength < 0:
        return
    toolong = [name for name in target_block.keys() if len(name) > maxlength]
    if toolong:
        outstring = "".join("\n" + name for name in toolong)
        raise StarError( 'Following data names too long:' + outstring)
def checkloopnesting(self,target_block):
    """Check that block doesn't contain nested loops"""
    has_nested = any(len(one_loop.loops) > 0 for one_loop in target_block.loops)
    if has_nested:
        raise StarError('Block contains nested loops')
def get_all(self,item_name):
    """Collect the distinct values of ``item_name`` across all visible blocks.

    Blocks without the item are skipped; a list value contributes each of
    its elements separately. Values appear in first-seen order.
    """
    found = [self[block_key].get(item_name) for block_key in self.keys()]
    ret_vals = []
    for value in found:
        if not (value != None):
            continue
        candidates = value if isinstance(value,ListType) else [value]
        for candidate in candidates:
            if candidate not in ret_vals:
                ret_vals.append(candidate)
    return ret_vals
def __setattr__(self,attr_name,newval):
if attr_name == 'scoping':
if newval not in ('dictionary','instance'):
raise StarError("Star file may only have 'dictionary' or 'instance' scoping, not %s" % newval)
if newval == 'dictionary':
self.visible_keys = [a for a in self.lower_keys]
else:
#only top-level datablocks visible
self.visible_keys = [a[0] for a in self.child_table.items() if a[1].parent==None]
object.__setattr__(self,attr_name,newval)
def get_parent(self,blockname):
"""Return the name of the block enclosing [[blockname]] in canonical form (lower case)"""
possibles = (a for a in self.child_table.items() if a[0] == blockname.lower())
try:
first = possibles.next() #get first one
except:
raise StarError('no parent for %s' % blockname)
try:
second = possibles.next()
except StopIteration:
return first[1].parent
raise StarError('More than one parent for %s' % blockname)
def get_roots(self):
"""Get the top-level blocks"""
return [a for a in self.child_table.items() if a[1].parent==None]
def get_children(self,blockname,include_parent=False,scoping='dictionary'):
    """Get all children of [[blockname]] as a block collection.  If [[include_parent]] is
    True, the parent block will also be included in the block collection as the root.

    Without [[include_parent]] the immediate children become top-level
    blocks of the returned collection. The block objects are shared with
    this collection. [[scoping]] is applied to the returned collection.
    """
    newbc = BlockCollection()
    block_lower = blockname.lower()
    # Descent is tested with the table key: is_child_of_parent() compares
    # against child_table keys, which can differ from the printed block_id
    # once a block has been rekeyed.
    proto_child_table = [a for a in self.child_table.items() if self.is_child_of_parent(block_lower,a[0])]
    newbc.child_table = dict(proto_child_table)
    if not include_parent:
        # immediate children lose their parent and become roots
        newbc.child_table.update(dict([(a[0],self.PC(a[1].block_id,None)) for a in proto_child_table if a[1].parent == block_lower]))
    newbc.lower_keys = set([a[0] for a in proto_child_table])
    newbc.dictionary = dict((a[0],self.dictionary[a[0]]) for a in proto_child_table)
    if include_parent:
        newbc.child_table.update({block_lower:self.PC(self.child_table[block_lower].block_id,None)})
        newbc.lower_keys.add(block_lower)
        newbc.dictionary.update({block_lower:self.dictionary[block_lower]})
    newbc.scoping = scoping     #rebuilds the visible keys
    return newbc
def get_immediate_children(self,parentname):
"""Get the next level of children of the given block as a list, without nested levels"""
child_handles = [a for a in self.child_table.items() if a[1].parent == parentname.lower()]
return child_handles
def get_child_list(self,parentname):
"""Get a list of all child categories"""
child_handles = [a[0] for a in self.child_table.items() if self.is_child_of_parent(parentname.lower(),a[0])]
return child_handles
def is_child_of_parent(self,parentname,blockname):
"""Recursively search for children of blockname, case is important for now"""
checkname = parentname.lower()
more_children = [a[0] for a in self.child_table.items() if a[1].parent == checkname]
if blockname.lower() in more_children:
return True
else:
for one_child in more_children:
if self.is_child_of_parent(one_child,blockname): return True
return False
def set_parent(self,parentname,childname):
"""Set the parent block"""
# first check that both blocks exist
if parentname.lower() not in self.lower_keys:
raise KeyError('Parent block %s does not exist' % parentname)
if childname.lower() not in self.lower_keys:
raise KeyError('Child block %s does not exist' % childname)
old_entry = self.child_table[childname.lower()]
self.child_table[childname.lower()]=self.PC(old_entry.block_id,
parentname.lower())
self.scoping = self.scoping #reset visibility
def WriteOut(self,comment='',wraplength=80,maxoutlength=2048):
    """Return the whole collection as text.

    ``comment`` (or ``header_comment`` when it is empty) comes first. Each
    top-level block is then written as a ``data_`` block with its children
    as ``save_`` frames; the block's own contents precede the frames when
    ``standard`` is 'Dic' and follow them otherwise. ``wraplength`` and
    ``maxoutlength`` are passed to SetOutputLength() of each top-level block.
    """
    import cStringIO
    header = comment or self.header_comment
    sink = cStringIO.StringIO()
    sink.write(header)
    contents_first = self.standard == 'Dic'
    # loop over top-level
    top_blocks = [(key, info.block_id) for key, info in self.child_table.items() if info.parent is None]
    for blockref, blockname in top_blocks:
        sink.write('\n' + 'data_' +blockname+'\n')
        save_frames = [(key, info.block_id) for key, info in self.child_table.items() if info.parent==blockref]
        if contents_first:       #put contents before save frames
            self[blockref].SetOutputLength(wraplength,maxoutlength)
            sink.write(str(self[blockref]))
        for child_ref, child_name in save_frames:
            sink.write('\n' + 'save_' + child_name + '\n')
            self.block_to_string(child_ref,child_name,sink,4)
            sink.write('\n' + 'save_'+ '\n')
        if not contents_first:   #put contents after save frames
            self[blockref].SetOutputLength(wraplength,maxoutlength)
            sink.write(str(self[blockref]))
    text = sink.getvalue()
    sink.close()
    return text
def block_to_string(self,block_ref,block_id,outstring,indentlevel=0):
    """Output a complete datablock indexed by [[block_ref]] and named [[block_id]], including children"""
    children = []
    for ref, info in self.child_table.items():
        if info.parent==block_ref:
            children.append((ref, info.block_id))
    contents_first = self.standard == 'Dic'
    if contents_first:
        outstring.write(str(self[block_ref]))
    for child_ref, child_name in children:
        outstring.write('\n' + 'save_' + child_name + '\n')
        self.block_to_string(child_ref,child_name,outstring,indentlevel)
        # only the closing save_ line carries the indent
        outstring.write('\n' + '  '*indentlevel + 'save_' + '\n')
    if not contents_first:
        outstring.write(str(self[block_ref]))
Ancestors (in MRO)
- BlockCollection
- __builtin__.object
Instance variables
var PC
var characterset
var child_table
var dictionary
var header_comment
var lower_keys
var parent_id
var renamed
var scoping
var standard
var visible_keys
Methods
def __init__(
self, datasource=None, standard='CIF', characterset='ascii', scoping='instance', parent_id=None, **kwargs)
def __init__(self,datasource=None,standard='CIF',
characterset='ascii',scoping='instance',parent_id=None,**kwargs):
"""Create an empty collection and load ``datasource`` when it is usable.

A BlockCollection ``datasource`` is absorbed with merge_fast(); a
dictionary ``datasource`` is added one item at a time through item
assignment. Any other ``datasource`` value is ignored, and ``**kwargs``
is accepted but not used here.
"""
import collections
# lower-case key -> block object
self.dictionary = {}
self.standard = standard
self.lower_keys = set() # short_cuts
self.renamed = {}
self.characterset = characterset
# record stored in child_table: printed block name and key of the parent block
self.PC = collections.namedtuple('PC',['block_id','parent'])
# lower-case key -> PC record
self.child_table = {}
self.visible_keys = [] # for efficiency
self.parent_id = parent_id
# assigning scoping goes through __setattr__, which rebuilds visible_keys
# from lower_keys/child_table, so those must already exist at this point
self.scoping = scoping #will trigger setting of child table
if isinstance(datasource,BlockCollection):
self.merge_fast(datasource)
self.scoping = scoping #reset visibility
elif isinstance(datasource,DictType):
for key,value in datasource.items():
self[key]= value
self.header_comment = ''
def NewBlock(
self, blockname, blockcontents=None, fix=True, parent=None)
def NewBlock(self,blockname,blockcontents=None,fix=True,parent=None):
"""Add a block to the collection and return the lower-case key it is stored under.

blockname     -- printed name of the block; at most 75 characters.
blockcontents -- the block object; a new StarBlock is created when None.
fix           -- when true, spaces and tabs in the name become underscores.
parent        -- name of the enclosing block, or None for a top-level block.

Raises StarError when the name is too long, when the name/loop checks
applied for a non-None ``standard`` fail, or when the name clashes with
an existing block in a way that cannot be resolved by rekeying.
"""
if blockcontents is None:
blockcontents = StarBlock()
if self.standard is not None:
if self.standard == 'CIF':
self.checknamelengths(blockcontents,maxlength=75) #
self.checkloopnesting(blockcontents)
if len(blockname)>75:
raise StarError , 'Blockname %s is longer than 75 characters' % blockname
if fix:
newblockname = re.sub('[ \t]','_',blockname)
else: newblockname = blockname
new_lowerbn = newblockname.lower()
if new_lowerbn in self.lower_keys:
if self.standard is not None: #already there
toplevelnames = [a[0] for a in self.child_table.items() if a[1].parent==None]
if parent is None and new_lowerbn not in toplevelnames: #can give a new key to this one
# append '+' until the key is unused
while new_lowerbn in self.lower_keys: new_lowerbn = new_lowerbn + '+'
elif parent is not None and new_lowerbn in toplevelnames: #can fix a different one
replace_name = new_lowerbn
while replace_name in self.lower_keys: replace_name = replace_name + '+'
self._rekey(new_lowerbn,replace_name)
# now continue on to add in the new block
if parent.lower() == new_lowerbn: #the new block's requested parent just got renamed!!
parent = replace_name
else:
raise StarError( "Attempt to replace existing block " + blockname)
else:
# no standard set: the existing block is simply replaced
del self[new_lowerbn]
self.dictionary.update({new_lowerbn:blockcontents})
self.lower_keys.add(new_lowerbn)
if parent is None:
self.child_table[new_lowerbn]=self.PC(newblockname,None)
self.visible_keys.append(new_lowerbn)
else:
if parent.lower() in self.lower_keys:
# both scoping modes record the same child_table entry; only the
# non-'instance' mode also makes the child visible
if self.scoping == 'instance':
self.child_table[new_lowerbn]=self.PC(newblockname,parent.lower())
else:
self.child_table[new_lowerbn]=self.PC(newblockname,parent.lower())
self.visible_keys.append(new_lowerbn)
else:
# NOTE(review): by this point the block appears to be in self.dictionary
# and self.lower_keys already, yet it gets no child_table entry here -
# confirm that this state is intended.
print 'Warning:Parent block %s does not exist for child %s' % (parent,newblockname)
return new_lowerbn #in case calling routine wants to know
def WriteOut(
self, comment='', wraplength=80, maxoutlength=2048)
def WriteOut(self,comment='',wraplength=80,maxoutlength=2048):
    """Return the whole collection as text.

    ``comment`` (or ``header_comment`` when it is empty) comes first. Each
    top-level block is then written as a ``data_`` block with its children
    as ``save_`` frames; the block's own contents precede the frames when
    ``standard`` is 'Dic' and follow them otherwise. ``wraplength`` and
    ``maxoutlength`` are passed to SetOutputLength() of each top-level block.
    """
    import cStringIO
    header = comment or self.header_comment
    sink = cStringIO.StringIO()
    sink.write(header)
    contents_first = self.standard == 'Dic'
    # loop over top-level
    top_blocks = [(key, info.block_id) for key, info in self.child_table.items() if info.parent is None]
    for blockref, blockname in top_blocks:
        sink.write('\n' + 'data_' +blockname+'\n')
        save_frames = [(key, info.block_id) for key, info in self.child_table.items() if info.parent==blockref]
        if contents_first:       #put contents before save frames
            self[blockref].SetOutputLength(wraplength,maxoutlength)
            sink.write(str(self[blockref]))
        for child_ref, child_name in save_frames:
            sink.write('\n' + 'save_' + child_name + '\n')
            self.block_to_string(child_ref,child_name,sink,4)
            sink.write('\n' + 'save_'+ '\n')
        if not contents_first:   #put contents after save frames
            self[blockref].SetOutputLength(wraplength,maxoutlength)
            sink.write(str(self[blockref]))
    text = sink.getvalue()
    sink.close()
    return text
def block_to_string(
self, block_ref, block_id, outstring, indentlevel=0)
Output a complete datablock indexed by [[block_ref]] and named [[block_id]], including children
def block_to_string(self,block_ref,block_id,outstring,indentlevel=0):
    """Output a complete datablock indexed by [[block_ref]] and named [[block_id]], including children"""
    children = []
    for ref, info in self.child_table.items():
        if info.parent==block_ref:
            children.append((ref, info.block_id))
    contents_first = self.standard == 'Dic'
    if contents_first:
        outstring.write(str(self[block_ref]))
    for child_ref, child_name in children:
        outstring.write('\n' + 'save_' + child_name + '\n')
        self.block_to_string(child_ref,child_name,outstring,indentlevel)
        # only the closing save_ line carries the indent
        outstring.write('\n' + '  '*indentlevel + 'save_' + '\n')
    if not contents_first:
        outstring.write(str(self[block_ref]))
def checkloopnesting(
self, target_block)
Check that block doesn't contain nested loops
def checkloopnesting(self,target_block):
    """Check that block doesn't contain nested loops"""
    has_nested = any(len(one_loop.loops) > 0 for one_loop in target_block.loops)
    if has_nested:
        raise StarError('Block contains nested loops')
def checknamelengths(
self, target_block, maxlength=-1)
def checknamelengths(self,target_block,maxlength=-1):
    """Raise StarError listing every key of ``target_block`` longer than ``maxlength``.

    A negative ``maxlength`` disables the check.
    """
    if maxlength < 0:
        return
    toolong = [name for name in target_block.keys() if len(name) > maxlength]
    if toolong:
        outstring = "".join("\n" + name for name in toolong)
        raise StarError( 'Following data names too long:' + outstring)
def clear(
self)
def clear(self):
self.dictionary.clear()
self.lower_keys = set()
self.child_table = {}
self.visible_keys = []
def copy(
self)
def copy(self):
    """Return a new BlockCollection holding the same block objects.

    The containers (block dictionary, key set, child table) are copied so
    that adding or removing blocks in one collection does not affect the
    other; the block objects themselves are shared, not duplicated.
    """
    # Settings are passed by keyword: positionally, the second argument of
    # the constructor is ``standard``, not the block dictionary.
    newcopy = BlockCollection(standard=self.standard,
                              characterset=self.characterset,
                              scoping=self.scoping,
                              parent_id=self.parent_id)
    newcopy.dictionary = self.dictionary.copy()    #all blocks
    newcopy.child_table = self.child_table.copy()
    newcopy.lower_keys = set(self.lower_keys)      #own set, not an alias
    newcopy.scoping = self.scoping                 #this sets visible keys
    return newcopy
def first_block(
self)
Return the 'first' block. This is not necessarily the first block in the file.
def first_block(self):
    """Return the 'first' block.  This is not necessarily the first block in the file."""
    visible = self.keys()
    if not visible:
        return None     # nothing visible
    return self[visible[0]]
def get(
self, key, default=None)
def get(self, key, default=None):
    """Return the visible block named ``key`` (any case), or ``default`` if there is none."""
    if not self.has_key(key):   # take account of case
        return default
    return self.__getitem__(key)
def get_all(
self, item_name)
def get_all(self,item_name):
    """Collect the distinct values of ``item_name`` across all visible blocks.

    Blocks without the item are skipped; a list value contributes each of
    its elements separately. Values appear in first-seen order.
    """
    found = [self[block_key].get(item_name) for block_key in self.keys()]
    ret_vals = []
    for value in found:
        if not (value != None):
            continue
        candidates = value if isinstance(value,ListType) else [value]
        for candidate in candidates:
            if candidate not in ret_vals:
                ret_vals.append(candidate)
    return ret_vals
def get_child_list(
self, parentname)
Get a list of all child categories
def get_child_list(self,parentname):
"""Get a list of all child categories"""
child_handles = [a[0] for a in self.child_table.items() if self.is_child_of_parent(parentname.lower(),a[0])]
return child_handles
def get_children(
self, blockname, include_parent=False, scoping='dictionary')
Get all children of [[blockname]] as a block collection. If [[include_parent]] is True, the parent block will also be included in the block collection as the root.
def get_children(self,blockname,include_parent=False,scoping='dictionary'):
    """Get all children of [[blockname]] as a block collection.  If [[include_parent]] is
    True, the parent block will also be included in the block collection as the root.

    Without [[include_parent]] the immediate children become top-level
    blocks of the returned collection. The block objects are shared with
    this collection. [[scoping]] is applied to the returned collection.
    """
    newbc = BlockCollection()
    block_lower = blockname.lower()
    # Descent is tested with the table key: is_child_of_parent() compares
    # against child_table keys, which can differ from the printed block_id
    # once a block has been rekeyed.
    proto_child_table = [a for a in self.child_table.items() if self.is_child_of_parent(block_lower,a[0])]
    newbc.child_table = dict(proto_child_table)
    if not include_parent:
        # immediate children lose their parent and become roots
        newbc.child_table.update(dict([(a[0],self.PC(a[1].block_id,None)) for a in proto_child_table if a[1].parent == block_lower]))
    newbc.lower_keys = set([a[0] for a in proto_child_table])
    newbc.dictionary = dict((a[0],self.dictionary[a[0]]) for a in proto_child_table)
    if include_parent:
        newbc.child_table.update({block_lower:self.PC(self.child_table[block_lower].block_id,None)})
        newbc.lower_keys.add(block_lower)
        newbc.dictionary.update({block_lower:self.dictionary[block_lower]})
    newbc.scoping = scoping     #rebuilds the visible keys
    return newbc
def get_immediate_children(
self, parentname)
Get the next level of children of the given block as a list, without nested levels
def get_immediate_children(self,parentname):
"""Get the next level of children of the given block as a list, without nested levels"""
child_handles = [a for a in self.child_table.items() if a[1].parent == parentname.lower()]
return child_handles
def get_parent(
self, blockname)
Return the name of the block enclosing [[blockname]] in canonical form (lower case)
def get_parent(self,blockname):
"""Return the name of the block enclosing [[blockname]] in canonical form (lower case)"""
possibles = (a for a in self.child_table.items() if a[0] == blockname.lower())
try:
first = possibles.next() #get first one
except:
raise StarError('no parent for %s' % blockname)
try:
second = possibles.next()
except StopIteration:
return first[1].parent
raise StarError('More than one parent for %s' % blockname)
def get_roots(
self)
Get the top-level blocks
def get_roots(self):
"""Get the top-level blocks"""
return [a for a in self.child_table.items() if a[1].parent==None]
def has_key(
self, key)
def has_key(self, key):
    """Return 1 if ``key`` is a string naming a visible block (any case), else 0."""
    if isinstance(key, StringTypes) and key.lower() in self.visible_keys:
        return 1
    return 0
def is_child_of_parent(
self, parentname, blockname)
Recursively search for children of blockname, case is important for now
def is_child_of_parent(self, parentname, blockname):
    """Return True if `blockname` is a descendant (at any depth) of `parentname`.

    Both names are lower-cased before being compared with the keys and
    parent fields of `self.child_table`.
    """
    wanted_parent = parentname.lower()
    direct = [name for name, entry in self.child_table.items()
              if entry.parent == wanted_parent]
    if blockname.lower() in direct:
        return True
    # not a direct child: look below each direct child in turn
    for child in direct:
        if self.is_child_of_parent(child, blockname):
            return True
    return False
def items(
self)
def items(self):
    """Return a list of `(key, self[key])` pairs for every key in `self.keys()`."""
    pairs = []
    for name in self.keys():
        pairs.append((name, self[name]))
    return pairs
def keys(
self)
def keys(self):
    """Return `self.visible_keys` (the object itself, not a copy)."""
    return self.visible_keys
def lock(
self)
Disallow overwriting for all blocks in this collection
def lock(self):
    """Disallow overwriting for all blocks in this collection"""
    for block_key in self.lower_keys:
        block = self[block_key]
        block.overwrite = False
def merge(
self, new_bc, mode=None, parent=None, single_block=[], idblock='', match_att=[], match_function=None)
def merge(self,new_bc,mode=None,parent=None,single_block=[],
idblock="",match_att=[],match_function=None):
"""Merge the blocks of `new_bc` into this collection.

mode -- 'replace', 'strict' or 'overlay'.  When None it defaults to
'replace' if `self.standard` is None, otherwise 'strict'.
parent -- fallback parent given to incoming blocks whose own parent in
`new_bc` is None.
single_block -- if non-empty, a pair (name here, name in new_bc): only
those two blocks are merged and the method returns None.
idblock -- name of an incoming block that is skipped.
match_att -- if non-empty, blocks are paired by the value of the item
named match_att[0] rather than by block name.
match_function -- optional callable applied to each existing block to
produce the value used for matching (only consulted when match_att is set).

Raises StarError in 'strict' mode when a block exists on both sides, and
for an unknown mode.

NOTE(review): the list defaults for single_block/match_att are shared
between calls; they are only read in this method, never mutated.
"""
if mode is None:
if self.standard is None:
mode = 'replace'
else:
mode = 'strict'
# Single-block shortcut: delegate to the block-level merge and stop
if single_block:
self[single_block[0]].merge(new_bc[single_block[1]],mode,
match_att=match_att,
match_function=match_function)
return None
base_keys = [a[1].block_id for a in self.child_table.items()]
block_to_item = base_keys #default
new_keys = [a[1].block_id for a in new_bc.child_table.items()] #get list of incoming blocks
if match_att:
#make a blockname -> item name map
# NOTE(review): the result is indexed and .remove()d below, so this
# relies on map() returning a list (Python 2 behaviour)
if match_function:
block_to_item = map(lambda a:match_function(self[a]),self.keys())
else:
block_to_item = map(lambda a:self[a].get(match_att[0],None),self.keys())
#print `block_to_item`
for key in new_keys: #run over incoming blocknames
if key == idblock: continue #skip dictionary id
basekey = key #default value
if len(match_att)>0:
attval = new_bc[key].get(match_att[0],0) #0 if ignoring matching
else:
attval = 0
# Look for an existing block whose matching value equals (or, for a
# list value, contains) the incoming block's value
for ii in range(len(block_to_item)): #do this way to get looped names
thisatt = block_to_item[ii] #keyname in old block
#print "Looking for %s in %s" % (attval,thisatt)
if attval == thisatt or \
(isinstance(thisatt,ListType) and attval in thisatt):
basekey = base_keys.pop(ii)
block_to_item.remove(thisatt)
break
# Block is new to us, or we are replacing: add it under basekey
if not self.has_key(basekey) or mode=="replace":
new_parent = new_bc.get_parent(key)
if parent is not None and new_parent is None:
new_parent = parent
self.NewBlock(basekey,new_bc[key],parent=new_parent) #add the block
else:
if mode=="strict":
raise StarError( "In strict merge mode: block %s in old and block %s in new files" % (basekey,key))
elif mode=="overlay":
# print "Merging block %s with %s" % (basekey,key)
self[basekey].merge(new_bc[key],mode,match_att=match_att)
else:
raise StarError( "Merge called with unknown mode %s" % mode)
def merge_fast(
self, new_bc, parent=None)
Do a fast merge
def merge_fast(self, new_bc, parent=None):
    """Do a fast merge of all blocks of `new_bc` into this collection.

    The mode is 'replace' when `self.standard` is None and 'strict'
    otherwise.  In strict mode a block name present on both sides is
    resolved by renaming one of the two blocks (a '+' is appended until
    the name is free), or StarError is raised if neither rule applies.

    new_bc -- collection whose dictionary, lower_keys and child_table are
    merged into ours.
    parent -- if given, incoming blocks that have no parent are re-parented
    to this block.

    Raises StarError for an unresolvable duplicate name in strict mode.
    """
    mode = 'replace' if self.standard is None else 'strict'
    if mode != 'replace' and not self.lower_keys.isdisjoint(new_bc.lower_keys):
        for dup_key in self.lower_keys.intersection(new_bc.lower_keys):
            our_parent = self.child_table[dup_key].parent
            their_parent = new_bc.child_table[dup_key].parent
            if parent is not None or (our_parent is None and their_parent is not None):
                # rename our block
                start_key = dup_key
                while start_key in self.lower_keys:
                    start_key = start_key + '+'
                self._rekey(dup_key, start_key)
                # This branch is also taken with parent None; guard the
                # comparison so that case no longer raises AttributeError.
                if parent is not None and parent.lower() == dup_key:
                    parent = start_key  # we just renamed the prospective parent!
            elif our_parent is not None and their_parent is None:
                # parent is necessarily None here: rename the incoming block
                start_key = dup_key
                while start_key in new_bc.lower_keys:
                    start_key = start_key + '+'
                new_bc._rekey(dup_key, start_key)
            else:
                raise StarError("In strict merge mode:duplicate keys %s" % dup_key)
    self.dictionary.update(new_bc.dictionary)
    self.lower_keys.update(new_bc.lower_keys)
    self.visible_keys += (list(new_bc.lower_keys))
    self.child_table.update(new_bc.child_table)
    if parent is not None:
        # redo the child_table entries of incoming top-level blocks
        new_parent = parent.lower()
        reparented = dict((key, self.PC(entry.block_id, new_parent))
                          for key, entry in new_bc.child_table.items()
                          if entry.parent is None)
        self.child_table.update(reparented)
def rename(
self, oldname, newname)
Rename datablock from [[oldname]] to [[newname]]. Both key and printed name are changed. No conformance checks are conducted.
def rename(self, oldname, newname):
    """Rename datablock from `oldname` to `newname`.

    Both the key and the printed name are changed.  No conformance checks
    are conducted.  Names are compared in lower case.

    Raises StarError if `newname` is already present and KeyError if
    `oldname` cannot be found.
    """
    realoldname = oldname.lower()
    realnewname = newname.lower()
    if realnewname in self.lower_keys:
        raise StarError('Cannot change blockname %s to %s as %s already present' % (oldname, newname, newname))
    if realoldname not in self.lower_keys:
        raise KeyError('Cannot find old block %s' % realoldname)
    self._rekey(realoldname, realnewname, block_id=newname)
def set_parent(
self, parentname, childname)
Set the parent block
def set_parent(self, parentname, childname):
    """Set the parent block of `childname` to `parentname`.

    Both blocks must already exist (names are compared in lower case),
    otherwise KeyError is raised.
    """
    parent_key = parentname.lower()
    if parent_key not in self.lower_keys:
        raise KeyError('Parent block %s does not exist' % parentname)
    child_key = childname.lower()
    if child_key not in self.lower_keys:
        raise KeyError('Child block %s does not exist' % childname)
    previous = self.child_table[child_key]
    self.child_table[child_key] = self.PC(previous.block_id, parent_key)
    # re-assigning scoping to itself is how visibility gets reset
    self.scoping = self.scoping
def unlock(
self)
Allow overwriting of all blocks in this collection
def unlock(self):
    """Allow overwriting of all blocks in this collection"""
    for block_key in self.lower_keys:
        block = self[block_key]
        block.overwrite = True
def update(
self, adict)
def update(self, adict):
    """Assign every key/value pair of `adict` to this collection via item assignment."""
    incoming = adict.keys()
    for name in incoming:
        self[name] = adict[name]
class CIFStringIO
class CIFStringIO(StringIO):
    """StringIO that tracks the current output column so that text can be
    wrapped at a target width, indented, and aligned to tab stops."""

    def __init__(self, target_width=80, **kwargs):
        """Create the buffer.

        target_width -- column beyond which `write` breaks the line when
        the caller allows it.
        kwargs -- passed unchanged to StringIO.__init__.
        """
        StringIO.__init__(self, **kwargs)
        self.currentpos = 0               # running output column
        self.target_width = target_width
        self.tabwidth = -1                # values <= 0 disable tab stops
        self.indentlist = [0]             # stack of indents; last one is active

    def write(self, outstring, canbreak=False, mustbreak=False, do_tab=True,
              newindent=False, unindent=False, startcol=-1):
        """Write a string with correct linebreak, tabs and indents.

        canbreak -- a line break may be inserted before `outstring` if it
        would run past `target_width`.
        mustbreak -- always start a new (indented) line first.
        do_tab -- pad to the next tab stop when a tab width is set.
        newindent -- push a new indent level; the first level is the
        current column, later ones are 2 deeper than the previous level.
        unindent -- pop one indent level (ignored when newindent is set).
        startcol -- accepted but not used by this method.
        """
        # do we need to break?
        if mustbreak:
            StringIO.write(self, '\n' + ' ' * self.indentlist[-1])
            self.currentpos = self.indentlist[-1]
        if canbreak and self.currentpos + len(outstring) > self.target_width:
            StringIO.write(self, '\n' + ' ' * self.indentlist[-1])
            self.currentpos = self.indentlist[-1]
        if newindent:
            if self.indentlist[-1] == 0:  # first time: indent to current column
                self.indentlist.append(self.currentpos)
                print('Indentlist: ' + repr(self.indentlist))
            else:
                self.indentlist.append(self.indentlist[-1] + 2)
        elif unindent:
            if len(self.indentlist) > 1:
                self.indentlist.pop()
            else:
                print('Warning: cannot unindent any further')
        # handle tabs
        if self.tabwidth > 0 and do_tab:
            next_stop = ((self.currentpos // self.tabwidth) + 1) * self.tabwidth
            if self.currentpos < next_stop:
                StringIO.write(self, (next_stop - self.currentpos) * ' ')
                self.currentpos = next_stop
        # now output the string
        StringIO.write(self, outstring)
        last_line_break = outstring.rfind('\n')
        if last_line_break >= 0:
            # NOTE(review): this counts the newline itself, so the column is
            # one more than the number of characters after it, whereas the
            # break branches above use a 0-based column -- confirm intent.
            self.currentpos = len(outstring) - last_line_break
        else:
            self.currentpos = self.currentpos + len(outstring)

    def set_tab(self, tabwidth):
        """Set the tab stop position"""
        self.tabwidth = tabwidth
Ancestors (in MRO)
- CIFStringIO
- StringIO.StringIO
Instance variables
var currentpos
var indentlist
var tabwidth
var target_width
Methods
def __init__(
self, target_width=80, **kwargs)
def __init__(self, target_width=80, **kwargs):
    """Initialise the underlying StringIO (with `kwargs`) and the layout state."""
    StringIO.__init__(self, **kwargs)
    self.target_width = target_width   # column at which write() tries to wrap
    self.currentpos = 0                # running output column
    self.indentlist = [0]              # stack of indents; last one is active
    self.tabwidth = -1                 # values <= 0 disable tab stops
def close(
self)
Free the memory buffer.
def close(self):
    """Free the memory buffer.
    """
    if self.closed:
        return
    self.closed = True
    del self.buf, self.pos
def flush(
self)
Flush the internal buffer
def flush(self):
    """Flush the internal buffer.

    The only action taken is the closed-file check.
    """
    _complain_ifclosed(self.closed)
def getvalue(
self)
Retrieve the entire contents of the "file" at any time before the StringIO object's close() method is called.
The StringIO object can accept either Unicode or 8-bit strings, but mixing the two may take some care. If both are used, 8-bit strings that cannot be interpreted as 7-bit ASCII (that use the 8th bit) will cause a UnicodeError to be raised when getvalue() is called.
def getvalue(self):
    """
    Retrieve the entire contents of the "file" at any time before
    the StringIO object's close() method is called.
    The StringIO object can accept either Unicode or 8-bit strings,
    but mixing the two may take some care. If both are used, 8-bit
    strings that cannot be interpreted as 7-bit ASCII (that use the
    8th bit) will cause a UnicodeError to be raised when getvalue()
    is called.
    """
    _complain_ifclosed(self.closed)
    pending = self.buflist
    if pending:
        # fold the not-yet-joined writes into the main buffer
        self.buf += ''.join(pending)
        self.buflist = []
    return self.buf
def isatty(
self)
Returns False because StringIO objects are not connected to a tty-like device.
def isatty(self):
    """Returns False because StringIO objects are not connected to a
    tty-like device (after checking that the file is not closed).
    """
    _complain_ifclosed(self.closed)
    return False
def next(
self)
A file object is its own iterator, for example iter(f) returns f (unless f is closed). When a file is used as an iterator, typically in a for loop (for example, for line in f: print line), the next() method is called repeatedly. This method returns the next input line, or raises StopIteration when EOF is hit.
def next(self):
    """A file object is its own iterator, for example iter(f) returns f
    (unless f is closed). When a file is used as an iterator, typically
    in a for loop (for example, for line in f: print line), the next()
    method is called repeatedly. This method returns the next input line,
    or raises StopIteration when EOF is hit.
    """
    _complain_ifclosed(self.closed)
    line = self.readline()
    if line:
        return line
    raise StopIteration
def read(
self, n=-1)
Read at most size bytes from the file (less if the read hits EOF before obtaining size bytes).
If the size argument is negative or omitted, read all data until EOF is reached. The bytes are returned as a string object. An empty string is returned when EOF is encountered immediately.
def read(self, n=-1):
    """Read at most n bytes from the file
    (less if the read hits EOF before obtaining n bytes).
    If n is negative, None or omitted, read all data until EOF
    is reached. The bytes are returned as a string object. An empty
    string is returned when EOF is encountered immediately.
    """
    _complain_ifclosed(self.closed)
    if self.buflist:
        self.buf += ''.join(self.buflist)
        self.buflist = []
    start = self.pos
    if n is None or n < 0:
        stop = self.len
    else:
        stop = min(start + n, self.len)
    chunk = self.buf[start:stop]
    self.pos = stop
    return chunk
def readline(
self, length=None)
Read one entire line from the file.
A trailing newline character is kept in the string (but may be absent when a file ends with an incomplete line). If the size argument is present and non-negative, it is a maximum byte count (including the trailing newline) and an incomplete line may be returned.
An empty string is returned only when EOF is encountered immediately.
Note: Unlike stdio's fgets(), the returned string contains null characters ('\0') if they occurred in the input.
def readline(self, length=None):
    r"""Read one entire line from the file.
    A trailing newline character is kept in the string (but may be absent
    when a file ends with an incomplete line). If the length argument is
    present and non-negative, it is a maximum byte count (including the
    trailing newline) and an incomplete line may be returned.
    An empty string is returned only when EOF is encountered immediately.
    Note: Unlike stdio's fgets(), the returned string contains null
    characters ('\0') if they occurred in the input.
    """
    _complain_ifclosed(self.closed)
    if self.buflist:
        self.buf += ''.join(self.buflist)
        self.buflist = []
    newline_at = self.buf.find('\n', self.pos)
    if newline_at < 0:
        end = self.len
    else:
        end = newline_at + 1
    if length is not None and length >= 0 and self.pos + length < end:
        end = self.pos + length
    line = self.buf[self.pos:end]
    self.pos = end
    return line
def readlines(
self, sizehint=0)
Read until EOF using readline() and return a list containing the lines thus read.
If the optional sizehint argument is present, instead of reading up to EOF, whole lines totalling approximately sizehint bytes (or more, to accommodate a final whole line) are read.
def readlines(self, sizehint=0):
    """Read until EOF using readline() and return a list containing the
    lines thus read.
    If the optional sizehint argument is present and positive, reading
    stops once the lines read so far total at least sizehint bytes.
    """
    collected = []
    consumed = 0
    while True:
        line = self.readline()
        if not line:
            break
        collected.append(line)
        consumed += len(line)
        if 0 < sizehint <= consumed:
            break
    return collected
def seek(
self, pos, mode=0)
Set the file's current position.
The mode argument is optional and defaults to 0 (absolute file positioning); other values are 1 (seek relative to the current position) and 2 (seek relative to the file's end).
There is no return value.
def seek(self, pos, mode=0):
    """Set the file's current position.
    The mode argument is optional and defaults to 0 (absolute file
    positioning); other values are 1 (seek relative to the current
    position) and 2 (seek relative to the file's end).
    There is no return value.
    """
    _complain_ifclosed(self.closed)
    if self.buflist:
        self.buf += ''.join(self.buflist)
        self.buflist = []
    target = pos
    if mode == 1:
        target = pos + self.pos
    elif mode == 2:
        target = pos + self.len
    # positions before the start are clamped to 0
    self.pos = max(0, target)
def set_tab(
self, tabwidth)
Set the tab stop position
def set_tab(self, tabwidth):
    """Set the tab stop position (stored in `self.tabwidth`)."""
    self.tabwidth = tabwidth
def tell(
self)
Return the file's current position.
def tell(self):
    """Return the file's current position (after the closed-file check)."""
    _complain_ifclosed(self.closed)
    return self.pos
def truncate(
self, size=None)
Truncate the file's size.
If the optional size argument is present, the file is truncated to (at most) that size. The size defaults to the current position. The current file position is not changed unless the position is beyond the new file size.
If the specified size exceeds the file's current size, the file remains unchanged.
def truncate(self, size=None):
    """Truncate the file's size.
    If the optional size argument is present, the buffer is cut to
    (at most) that size. The size defaults to the current position.
    The current file position is only changed when it lies beyond the
    requested size.
    A negative size raises IOError.
    """
    _complain_ifclosed(self.closed)
    if size is None:
        size = self.pos
    else:
        if size < 0:
            raise IOError(EINVAL, "Negative size not allowed")
        if size < self.pos:
            self.pos = size
    self.buf = self.getvalue()[:size]
    self.len = size
def write(
self, outstring, canbreak=False, mustbreak=False, do_tab=True, newindent=False, unindent=False, startcol=-1)
Write a string with correct linebreak, tabs and indents
def write(self, outstring, canbreak=False, mustbreak=False, do_tab=True,
          newindent=False, unindent=False, startcol=-1):
    """Write a string with correct linebreak, tabs and indents.

    canbreak -- a line break may be inserted before `outstring` if it
    would run past `self.target_width`.
    mustbreak -- always start a new (indented) line first.
    do_tab -- pad to the next tab stop when a tab width is set.
    newindent -- push a new indent level; the first level is the current
    column, later ones are 2 deeper than the previous level.
    unindent -- pop one indent level (ignored when newindent is set).
    startcol -- accepted but not used by this method.
    """
    # do we need to break?
    if mustbreak:
        StringIO.write(self, '\n' + ' ' * self.indentlist[-1])
        self.currentpos = self.indentlist[-1]
    if canbreak and self.currentpos + len(outstring) > self.target_width:
        StringIO.write(self, '\n' + ' ' * self.indentlist[-1])
        self.currentpos = self.indentlist[-1]
    if newindent:
        if self.indentlist[-1] == 0:  # first time: indent to current column
            self.indentlist.append(self.currentpos)
            print('Indentlist: ' + repr(self.indentlist))
        else:
            self.indentlist.append(self.indentlist[-1] + 2)
    elif unindent:
        if len(self.indentlist) > 1:
            self.indentlist.pop()
        else:
            print('Warning: cannot unindent any further')
    # handle tabs
    if self.tabwidth > 0 and do_tab:
        next_stop = ((self.currentpos // self.tabwidth) + 1) * self.tabwidth
        if self.currentpos < next_stop:
            StringIO.write(self, (next_stop - self.currentpos) * ' ')
            self.currentpos = next_stop
    # now output the string
    StringIO.write(self, outstring)
    last_line_break = outstring.rfind('\n')
    if last_line_break >= 0:
        # NOTE(review): this counts the newline itself, so the column is one
        # more than the number of characters after it, whereas the break
        # branches above use a 0-based column -- confirm intent.
        self.currentpos = len(outstring) - last_line_break
    else:
        self.currentpos = self.currentpos + len(outstring)
def writelines(
self, iterable)
Write a sequence of strings to the file. The sequence can be any iterable object producing strings, typically a list of strings. There is no return value.
(The name is intended to match readlines(); writelines() does not add line separators.)
def writelines(self, iterable):
    """Write a sequence of strings to the file. The sequence can be any
    iterable object producing strings, typically a list of strings. There
    is no return value.
    (The name is intended to match readlines(); writelines() does not add
    line separators.)
    """
    emit = self.write   # looked up once, as each string goes through write()
    for chunk in iterable:
        emit(chunk)
class LoopBlock
class LoopBlock(object):
def __init__(self,data = (), dimension = 0, maxoutlength=2048, wraplength=80, overwrite=True,
characterset='ascii'):
"""Create a loop block, optionally filled from `data`.

data -- a tuple/list of items, each passed to AddLoopItem, or another
LoopBlock whose contents are copied.
dimension -- nesting level of this block (0 is used for the top level
elsewhere in this class).
maxoutlength, wraplength, overwrite -- stored on the instance unchanged.
characterset -- 'ascii' or 'unicode'; selects the regular expression used
to validate values.  Any other value raises StarError.
"""
# print 'Creating new loop block, dimension %d' % dimension
self.block = {}
self.loops = []
self.no_packets = 0
self.item_order = []
self.formatting_hints = {}
self.lower_keys = [] #for efficiency
self.value_switch = False #prefer string version always
self.comment_list = {}
self.dimension = dimension
self.dictionary = None #DDLm dictionary
self.popout = False #used during load iteration
self.curitem = -1 #used during iteration
self.maxoutlength = maxoutlength
self.wraplength = wraplength
self.overwrite = overwrite
self.characterset = characterset
if not hasattr(self,'loopclass'): #in case are derived class
self.loopclass = LoopBlock #when making new loops
# char_check matches a run of characters permitted in data values
if self.characterset == 'ascii':
self.char_check = re.compile("[][ \n\r\t!%&\(\)*+,./:<=>?@0-9A-Za-z\\\\^`{}\|~\"#$';_-]+",re.M)
elif self.characterset == 'unicode':
self.char_check = re.compile(u"[][ \n\r\t!%&\(\)*+,./:<=>?@0-9A-Za-z\\\\^`{}\|~\"#$';_\u00A0-\uD7FF\uE000-\uFDCF\uFDF0-\uFFFD\U00010000-\U0010FFFD-]+",re.M)
else:
raise StarError("No character set specified")
if isinstance(data,(TupleType,ListType)):
for item in data:
self.AddLoopItem(item)
elif isinstance(data,LoopBlock):
# shallow copies of the source block's containers
self.block = data.block.copy()
self.item_order = data.item_order[:]
self.lower_keys = data.lower_keys[:]
self.comment_list = data.comment_list.copy()
self.dimension = data.dimension
# loops as well
for loopno in range(len(data.loops)):
try:
placeholder = self.item_order.index(data.loops[loopno])
except ValueError:
print "Warning: loop %s (%s) in loops, but not in item_order (%s)" % (`data.loops[loopno]`,str(data.loops[loopno]),`self.item_order`)
placeholder = -1
# NOTE(review): if the loop was not found above, this remove() looks like
# it would raise ValueError again -- confirm against the indented source
self.item_order.remove(data.loops[loopno]) #gone
# newobject = self.loopclass(data.loops[loopno])
# print "Recasting and adding loop %s -> %s" % (`data.loops[loopno]`,`newobject`)
self.insert_loop(data.loops[loopno],position=placeholder)
def __str__(self):
    """Return the text produced by `self.printsection()`."""
    return self.printsection()
def __setitem__(self, key, value):
    """Store `value` under `key` by calling AddLoopItem((key, value)).

    The key "saves" is rejected with StarError.
    """
    if key != "saves":
        self.AddLoopItem((key, value))
        return
    raise StarError("""Setting the saves key is deprecated. Add the save block to
an enclosing block collection (e.g. CIF or STAR file) with this block as child""")
def __getitem__(self, key):
    """Return a packet (integer key) or the value of a data item (string key).

    For a data name the stored value is fetched with GetLoopItemValue.  If
    the name is missing and a dictionary is attached, the dictionary is
    asked to derive (and store) the value.  A found value is passed through
    `self.dictionary.change_type` when a dictionary that knows the key is
    attached and the stored value is not already flagged as a native value.

    Raises StarError for the deprecated key "saves" and KeyError when the
    item is absent and cannot be derived.
    """
    if isinstance(key, IntType):  # return a packet!!
        return self.GetPacket(key)
    if key == "saves":
        raise StarError("""The saves key is deprecated. Access the save block from
the enclosing block collection (e.g. CIF or STAR file object)""")
    try:
        rawitem, is_value = self.GetLoopItemValue(key)
    except KeyError:
        if self.dictionary:
            # send the dictionary the required key and a pointer to us
            new_value = self.dictionary.derive_item(key, self, store_value=True)
            print('Set %s to derived value %s' % (key, repr(new_value)))
            return new_value
        raise KeyError('No such item: %s' % key)
    # we now have an item, we can try to convert it to a number if that is appropriate
    # note numpy values are never stored but are converted to lists
    if not self.dictionary or not self.dictionary.has_key(key) or is_value:
        return rawitem
    return self.dictionary.change_type(key, rawitem)
def __delitem__(self, key):
    """Delete the data item `key` by delegating to RemoveLoopItem."""
    self.RemoveLoopItem(key)
def __len__(self):
    """Return the number of entries in `self.block` plus the lengths of all nested loops."""
    nested_total = sum(len(inner) for inner in self.loops)   # each is also a LoopBlock
    return len(self.block) + nested_total
def __nonzero__(self):
    """Return 1 when the block holds at least one item (per __len__), else 0."""
    return 1 if self.__len__() > 0 else 0
# keys returns all internal keys
def keys(self):
    """Return a new list of the data names in this block followed by
    those of every nested loop, in `self.loops` order."""
    collected = list(self.block.keys())
    for inner in self.loops:
        collected.extend(inner.keys())
    return collected
def values(self):
    """Return a list of `self[name]` for every name in `self.keys()`."""
    return [self[name] for name in self.keys()]
def items(self):
    """Return a list of `(name, self[name])` pairs for every name in `self.keys()`.

    The key list is computed once and each value is looked up directly,
    rather than going through keys() and values() separately.
    """
    return [(name, self[name]) for name in self.keys()]
def has_key(self, key):
    """Return 1 if `key` names an item in this block (case-insensitive for
    strings) or in any nested loop, otherwise 0."""
    found_here = isinstance(key, StringTypes) and key.lower() in self.lower_keys
    if found_here:
        return 1
    for inner in self.loops:
        if inner.has_key(key):
            return 1
    return 0
def get(self, key, default=None):
    """Return GetLoopItem(key) if has_key(key) is true, otherwise `default`."""
    if not self.has_key(key):
        return default
    return self.GetLoopItem(key)
def clear(self):
    """Reset block, loops, item_order and lower_keys to fresh empty
    containers and set no_packets to 0."""
    self.no_packets = 0
    self.block = {}
    self.loops, self.item_order, self.lower_keys = [], [], []
# doesn't appear to work
def copy(self):
    """Return a new LoopBlock with copies of this block's items and loops.

    `block` is copied shallowly, `item_order` and `lower_keys` are sliced,
    and each nested loop is copied with its own copy() and inserted at the
    position the original occupied in `item_order`.  Only `dimension` is
    passed to the LoopBlock constructor.
    """
    newcopy = LoopBlock(dimension=self.dimension)
    newcopy.block = self.block.copy()
    newcopy.loops = []
    newcopy.no_packets = self.no_packets
    newcopy.item_order = self.item_order[:]
    newcopy.lower_keys = self.lower_keys[:]
    for loop in self.loops:
        try:
            placeholder = self.item_order.index(loop)
        except ValueError:
            print("Warning: loop %s (%s) in loops, but not in item_order (%s)" % (repr(loop), str(loop), repr(self.item_order)))
            placeholder = -1
        else:
            # Only remove what is actually there; removing a loop that was
            # not found used to raise ValueError right after the warning.
            newcopy.item_order.remove(loop)
        newcopy.insert_loop(loop.copy(), position=placeholder)
    return newcopy
# this is not appropriate for subloops. Instead, the loop block
# should be accessed directly for update
def update(self, adict):
    """Add every key/value pair of `adict` to this block via AddLoopItem."""
    incoming = adict.keys()
    for name in incoming:
        self.AddLoopItem((name, adict[name]))
def load_iter(self,coords=[]):
"""Generator yielding `(loopblock, value)` pairs for this block and its nested loops.

Iteration continues while `self.popout` is false; `popout` is reset to
False afterwards and a final `(self, '###Blank###')` pair is yielded.

coords -- packet indices of the enclosing loops, used to index into the
stored value when `self.dimension` is not 0.  The default list is never
mutated here; nested calls receive a new list `coords+[count]`.
"""
count = 0 #to create packet index
while not self.popout:
# ok, we have a new packet: append a list to our subloops
for aloop in self.loops:
aloop.new_enclosing_packet()
for iname in self.item_order:
# item_order holds both data names and nested LoopBlock objects
if isinstance(iname,LoopBlock): #into a nested loop
for subitems in iname.load_iter(coords=coords+[count]):
# print 'Yielding %s' % `subitems`
yield subitems
# print 'End of internal loop'
else:
if self.dimension == 0:
# print 'Yielding %s' % `self[iname]`
yield self,self[iname]
else:
# walk down the stored value, one index per enclosing packet
backval = self.block[iname]
for i in range(len(coords)):
# print 'backval, coords: %s, %s' % (`backval`,`coords`)
backval = backval[coords[i]]
yield self,backval
count = count + 1 # count packets
self.popout = False # reinitialise
# print 'Finished iterating'
yield self,'###Blank###' #this value should never be used
# an experimental fast iterator for level-1 loops (ie CIF)
def fast_load_iter(self):
    """Generator cycling endlessly over `(self, self.block[name])` for the
    names in `self.item_order`; yields nothing if `item_order` is empty."""
    targets = [self.block[name] for name in self.item_order]
    while targets:
        for slot in targets:
            yield self, slot
# Add another list of the required shape to take into account a new outer packet
def new_enclosing_packet(self):
    """Append an empty list to every item's value to receive a new outer packet.

    Nothing is done unless `self.dimension` is greater than 1.  For
    dimensions above 3 the list that is extended is found by following the
    last element `dimension - 3` times.
    """
    if self.dimension <= 1:   # otherwise have a top-level list
        return
    descend = range(3, self.dimension)   # dim 2 upwards are lists of lists of...
    for iname in self.keys():            # includes lower levels
        innermost = self[iname]
        for _ in descend:
            innermost = innermost[-1]
        innermost.append([])
# print '%s now %s' % (iname,`self[iname]`)
def recursive_iter(self,dict_so_far={},coord=[]):
"""Generator yielding one StarPacket per combination of this block's
values with the packets of its nested loops.

Each packet is built from a list of values and also gets every value set
as an attribute named after its data name.

dict_so_far -- not referenced in this method body.
coord -- indices of the enclosing packets; passed on to coord_to_group
and extended with the current index for nested loops.

Raises StarError("Malformed loop packet ...") if the block has no values
to drill into at a nesting level.
"""
# print "Recursive iter: coord %s, keys %s, dim %d" % (`coord`,`self.block.keys()`,self.dimension)
my_length = 0
top_items = self.block.items()
top_values = self.block.values() #same order as items
# NOTE(review): drill_values is indexed below, which needs values() to
# return a list (Python 2 behaviour)
drill_values = self.block.values()
for dimup in range(0,self.dimension): #look higher in the tree
if len(drill_values)>0: #this block has values
drill_values=drill_values[0] #drill in
else:
raise StarError("Malformed loop packet %s" % `top_items[0]`)
my_length = len(drill_values[0]) #length of 'string' entry
if self.dimension == 0: #top level
for aloop in self.loops:
for apacket in aloop.recursive_iter():
# print "Recursive yielding %s" % `dict(top_items + apacket.items())`
prep_yield = StarPacket(top_values+apacket.values()) #straight list
for name,value in top_items + apacket.items():
setattr(prep_yield,name,value)
yield prep_yield
else: #in some loop
for i in range(my_length):
# (name, value at packet i) for each item stored directly in this block
kvpairs = map(lambda a:(a,self.coord_to_group(a,coord)[i]),self.block.keys())
kvvals = map(lambda a:a[1],kvpairs) #just values
# print "Recursive kvpairs at %d: %s" % (i,`kvpairs`)
if self.loops:
for aloop in self.loops:
for apacket in aloop.recursive_iter(coord=coord+[i]):
# print "Recursive yielding %s" % `dict(kvpairs + apacket.items())`
prep_yield = StarPacket(kvvals+apacket.values())
for name,value in kvpairs + apacket.items():
setattr(prep_yield,name,value)
yield prep_yield
else: # we're at the bottom of the tree
# print "Recursive yielding %s" % `dict(kvpairs)`
prep_yield = StarPacket(kvvals)
for name,value in kvpairs:
setattr(prep_yield,name,value)
yield prep_yield
# small function to use the coordinates.
def coord_to_group(self, dataname, coords):
    """Return `self[dataname]` indexed successively by each entry of `coords`.

    A non-string `dataname` is returned unchanged (flags inner loop
    processing).
    """
    if isinstance(dataname, StringTypes):
        group = self[dataname]   # must be a list or tuple
        for index in coords:
            group = group[index]
        return group
    return dataname
def flat_iterator(self):
    """Generator over flattened views of this block.

    At dimension 0 a single shallow copy of the block is yielded.
    Otherwise `self.collapse(n)` is yielded for n from 0 up to the length
    of the entry stored under the first key of `self.block` (nothing is
    yielded when the block is empty).
    """
    if self.dimension == 0:
        yield copy.copy(self)
        return
    first_key = next(iter(self.block), None)
    packet_count = 0
    if first_key is not None:
        packet_count = len(self.block[first_key])
    for pack_no in range(packet_count):
        yield self.collapse(pack_no)
def insert_loop(self, newloop, position=-1, audit=True):
    """Add the nested loop `newloop` to this block.

    newloop -- loop whose `dimension` must be exactly one more than ours.
    position -- index in `item_order` at which to insert the loop; a
    negative value appends it.
    audit -- when true, `self.audit()` is run after adding the loop and
    any reported duplicates cause StarError.

    Raises StarError for a wrong nesting level or duplicate names.  On a
    failed audit the loop is taken out of `self.loops` again, so the block
    is not left with a loop that is missing from `item_order`.
    """
    # check that new loop is kosher
    if newloop.dimension != self.dimension + 1:
        raise StarError('Insertion of loop of wrong nesting level %d, should be %d' % (newloop.dimension, self.dimension + 1))
    self.loops.append(newloop)
    if audit:
        dupes = self.audit()
        if dupes:
            if self.loops and self.loops[-1] is newloop:
                self.loops.pop()   # undo the tentative insertion
            dupenames = [dupe[0] for dupe in dupes]
            raise StarError('Duplicate names: %s' % repr(dupenames))
    if position >= 0:
        self.item_order.insert(position, newloop)
    else:
        self.item_order.append(newloop)
# print "Insert loop: item_order now" + `self.item_order`
def remove_loop(self, oldloop):
    """Remove `oldloop` from `item_order` and then from `loops`
    (list.remove raises ValueError if it is absent)."""
    for holder in (self.item_order, self.loops):
        holder.remove(oldloop)
def AddComment(self, itemname, comment):
    """Store `comment` for `itemname` in `comment_list`, keyed by the lower-cased name."""
    comment_key = itemname.lower()
    self.comment_list[comment_key] = comment
def RemoveComment(self, itemname):
    """Delete the comment stored for `itemname` (lower-cased); KeyError if none exists."""
    comment_key = itemname.lower()
    del self.comment_list[comment_key]
def GetLoopItem(self, itemname):
    """Return value of itemname in this loop block"""
    value_and_flag = self.GetLoopItemValue(itemname)
    return value_and_flag[0]
def GetLoopItemValue(self,itemname):
"""Return value of itemname and whether or not it is a native value

The result is a pair `(value, flag)`.  Each entry of `self.block` is
unpacked into two parts `s` and `v`; `s` is returned with flag False when
it is usable, otherwise `v` is returned with a flag derived from whether
it is a StarList.

Raises KeyError if the item is in neither this block nor a nested loop.
"""
# assume case is correct first
try:
s,v = self.block[itemname]
except KeyError:
# not stored here under this exact spelling: ask the nested loops
for loop in self.loops:
try:
return loop.GetLoopItemValue(itemname)
except KeyError:
pass
if itemname.lower() in self.lower_keys:
# it is there somewhere, now we need to find it
real_keys = self.block.keys()
lower_keys = map(lambda a:a.lower(),self.block.keys())
try:
k_index = lower_keys.index(itemname.lower())
except ValueError: #should never happen!!
raise KeyError, 'Bug: Item %s unexpectedly not in block' % itemname
s,v = self.block[real_keys[k_index]]
else:
raise KeyError, 'Item %s not in block' % itemname
# prefer string value unless all are None
if self.dimension == 0:
if s is not None:
return s,False
else:
return v,not isinstance(v,StarList)
elif None not in s: return s,False
else:
# looped item: judge the flag from the first element only
if len(v)>0:
return v,not isinstance(v[0],StarList)
return v,True
def RemoveLoopItem(self,itemname):
"""Remove the data item `itemname` (matched case-insensitively) from
this block or from the nested loop holding it.

Nothing happens if has_key(itemname) is false.  A nested loop that
becomes empty after the deletion is removed with remove_loop.
"""
if self.has_key(itemname):
testkey = itemname.lower()
real_keys = self.block.keys()
lower_keys = map(lambda a:a.lower(),real_keys)
try:
k_index = lower_keys.index(testkey)
except ValueError: #must be in a lower loop
for aloop in self.loops:
if aloop.has_key(itemname):
# print "Deleting %s (%s)" % (itemname,aloop[itemname])
del aloop[itemname]
if len(aloop)==0: # all gone
self.remove_loop(aloop)
break
else:
# found directly in this block: drop the value and its bookkeeping
del self.block[real_keys[k_index]]
self.lower_keys.remove(testkey)
# now remove the key in the order list
for i in range(len(self.item_order)):
if isinstance(self.item_order[i],StringTypes): #may be loop
if self.item_order[i].lower()==testkey:
del self.item_order[i]
break
if len(self.block)==0: #no items in loop, length -> 0
self.no_packets = 0
return #no duplicates, no more checking needed
def AddLoopItem(self, incomingdata, precheck=False, maxlength=-1):
    """Add one data item, or a whole new nested loop, to this block.

    incomingdata -- either `(name, value)` for a single item, or
    `(names, values)` with `names` a tuple/list, which creates a nested
    loop holding those items.
    precheck -- when true, the data name and value checks are skipped.
    maxlength -- passed to check_data_name as the maximum name length.

    A single item whose value has the wrong nesting depth is retried:
    a one-element list/tuple is unwrapped, anything else is inserted as a
    new nested loop.

    Raises StarError for a duplicate name when overwriting is disabled,
    TypeError for a non-string data name and StarLengthError when a looped
    value has a different number of entries from the rest of the loop.
    """
    names = incomingdata[0]
    if isinstance(names, (TupleType, ListType)):
        # internal loop
        # first we remove any occurences of these datanames in other loops
        for one_item in names:
            if self.has_key(one_item):
                if not self.overwrite:
                    # report the offending name; formatting the whole
                    # tuple with % breaks for tuples of 2+ names
                    raise StarError('Attempt to insert duplicate item name %s' % one_item)
                del self[one_item]
        newloop = self.loopclass(dimension=self.dimension + 1, characterset=self.characterset)
        for key, val in zip(names, incomingdata[1]):
            newloop.AddLoopItem((key, val))
        self.insert_loop(newloop)
        return
    if not isinstance(names, StringTypes):
        raise TypeError('Star datanames are strings only (got %s)' % repr(names))
    data = list(incomingdata)  # copy
    if not (data[1] == [] or get_dim(data[1])[0] == self.dimension):
        # dimension mismatch
        # single-member lists could be seen as bare lists...
        if isinstance(data[1], (TupleType, ListType)) and len(data[1]) == 1:
            # the item must be passed as one (name, value) pair; passing
            # two positional arguments put the value in `precheck`
            self.AddLoopItem((data[0], data[1][0]), precheck, maxlength)
        # if that doesn't work, make the dataname list a compound item for inserting a loop
        else:
            self.AddLoopItem(((data[0],), (data[1],)))
        return
    if not precheck:
        self.check_data_name(data[0], maxlength)  # make sure no nasty characters
    # check that we can replace data
    if not self.overwrite and self.has_key(data[0]):
        raise StarError('Attempt to insert duplicate item name %s' % data[0])
    # put the data in the right container
    regval, empty_val = self.regularise_data(data[1])
    # check for pure string data
    pure_string = check_stringiness(regval)
    if not precheck:
        self.check_item_value(regval)
    if self.dimension > 0:
        if self.no_packets <= 0:
            self.no_packets = len(data[1])  # first item in this loop
        if len(data[1]) != self.no_packets:
            raise StarLengthError('Not enough values supplied for %s' % (data[0]))
    try:
        oldpos = self.GetItemPosition(data[0])
    except ValueError:
        oldpos = len(self.item_order)  # end of list
    self.RemoveLoopItem(data[0])  # may be different case (upper/lower), so have to do this
    # each entry is a [string form, native form] pair; the unused slot
    # holds the placeholder returned by regularise_data
    if pure_string:
        self.block.update({data[0]: [regval, empty_val]})  # trust the data is OK
    else:
        self.block.update({data[0]: [empty_val, regval]})
    self.lower_keys.insert(oldpos, data[0].lower())
    self.item_order.insert(oldpos, data[0])
def check_data_name(self, dataname, maxlength=-1):
    """Validate a data name, raising StarError if it is not acceptable.

    dataname -- the name to check; it must begin with an underscore.
    maxlength -- if positive, the maximum permitted length of the name.

    With `self.characterset == 'ascii'` every character must lie in code
    points 33-126.  Otherwise control characters, code points 127-159,
    surrogates, U+FDD0-U+FDEF and the code points ending in FFFE/FFFF of
    every plane are rejected.
    """
    if maxlength > 0 and len(dataname) > maxlength:
        raise StarError('Dataname %s exceeds maximum length %d' % (dataname, maxlength))
    # startswith also copes with an empty name (indexing [0] would raise IndexError)
    if not dataname.startswith('_'):
        raise StarError('Dataname ' + dataname + ' does not begin with _')
    codes = [ord(ch) for ch in dataname]
    if self.characterset == 'ascii':
        if any(c < 33 or c > 126 for c in codes):
            raise StarError('Dataname ' + dataname + ' contains forbidden characters')
        return
    if any(c < 33 for c in codes):
        raise StarError('Dataname ' + dataname + ' contains forbidden characters (below code point 33)')
    if any(126 < c < 160 for c in codes):
        raise StarError('Dataname ' + dataname + ' contains forbidden characters (between code point 127-159)')
    if any(0xD7FF < c < 0xE000 for c in codes):
        raise StarError('Dataname ' + dataname + ' contains unsupported characters (between U+D800 and U+E000)')
    if any(0xFDCF < c < 0xFDF0 for c in codes):
        raise StarError('Dataname ' + dataname + ' contains unsupported characters (between U+FDD0 and U+FDEF)')
    if any(c == 0xFFFE or c == 0xFFFF for c in codes):
        raise StarError('Dataname ' + dataname + ' contains unsupported characters (U+FFFE and/or U+FFFF)')
    # Plane noncharacters end in FFFE/FFFF.  The mask must cover the low 16
    # bits; testing only the low nibble rejected valid characters.
    if any(c > 0xFFFF and (c & 0xFFFE) == 0xFFFE for c in codes):
        print('%s fails' % dataname)
        print(' '.join('%x' % c for c in codes))
        raise StarError(u'Dataname ' + dataname + u' contains unsupported characters (U+xFFFE and/or U+xFFFF)')
def check_item_value(self,item):
    """Check that every string in item consists only of permitted characters.

    item -- a single value, or a list/tuple/dict of values (for a dict the
            keys are what gets iterated).  Non-string members and empty
            strings are accepted without inspection.

    Raises StarError if self.char_check does not match a string in full.
    """
    test_item = item
    if not isinstance(item,(list,dict,tuple)):
        test_item = [item]         #single item list
    # explicit loop: map() is lazy on Python 3 and would check nothing
    for it in test_item:
        if not isinstance(it,basestring) or it=='':
            continue
        me = self.char_check.match(it)
        if not me:
            raise StarError(u'Bad character in %s' % it)
        if me.span() != (0,len(it)):
            # the match stopped early, so a forbidden character follows
            raise StarError(u'Data item "' + repr(it) + u'"... contains forbidden characters')
def regularise_data(self,dataitem):
    """Place dataitem into a list if necessary

    Returns a pair (value, placeholder):
      * numbers, strings, StarList and StarDict are returned unchanged with
        a placeholder of None;
      * tuples and lists are returned unchanged with a list of None of the
        same length;
      * anything else is converted with list().

    Raises StarError if the value cannot be converted to a list.
    """
    from numbers import Number
    if isinstance(dataitem,(Number,basestring,StarList,StarDict)):
        return dataitem,None
    if isinstance(dataitem,(tuple,list)):
        return dataitem,[None]*len(dataitem)
    # so try to make into a list
    try:
        regval = list(dataitem)
    except TypeError:
        raise StarError( str(dataitem) + ' is wrong type for data value\n' )
    return regval,[None]*len(regval)
def GetLoop(self,keyname):
    """Return the loop block that directly holds keyname.

    Returns self when the lower-cased name is one of this block's own
    keys, otherwise the first successful result from the nested loops.

    Raises KeyError if the name is not present at any level.
    """
    if not self.has_key(keyname):
        raise KeyError('Item %s does not exist' % keyname)
    if keyname.lower() in self.lower_keys:
        return self
    for aloop in self.loops:
        try:
            return aloop.GetLoop(keyname)
        except KeyError:
            pass       # not in this nested loop, try the next one
    raise KeyError('Item %s does not exist' % keyname)
def GetPacket(self,index):
    """Build a StarPacket holding entry number index of every item.

    For each entry of item_order, in order:
      * a nested LoopBlock contributes a list with the index-th value of
        each of its items;
      * a plain item contributes its value (dimension 0) or the index-th
        element of its value (looped), and is also made available as an
        attribute of the packet named after the item.
    """
    thispack = StarPacket([])
    for myitem in self.item_order:
        if isinstance(myitem,LoopBlock):
            thispack.append([myitem[b][index] for b in myitem.item_order])
            # a LoopBlock cannot serve as an attribute name
            continue
        if self.dimension==0:
            thispack.append(self[myitem])
        else:
            thispack.append(self[myitem][index])
        setattr(thispack,myitem,thispack[-1])
    return thispack
def AddPacket(self,packet):
    """Append one packet to this loop.

    packet -- object carrying one attribute per name in item_order; each
              attribute value is appended to the matching column.

    Raises StarError when called on a block of dimension 0.
    """
    if self.dimension==0:
        raise StarError("Attempt to add packet to top level block")
    for myitem in self.item_order:
        self[myitem] = list(self[myitem])   #in case we have stored a tuple
        # appended after the assignment, on the value read back from self
        self[myitem].append(getattr(packet,myitem))
    self.no_packets +=1
def RemoveKeyedPacket(self,keyname,keyvalue):
    """Delete the packet whose keyname item equals keyvalue.

    The position is that of the first occurrence of keyvalue in
    self[keyname]; the entry at that position is removed from both stored
    lists of every item in the loop holding keyname.

    Raises ValueError if keyvalue does not occur.
    """
    packet_coord = list(self[keyname]).index(keyvalue)
    loophandle = self.GetLoop(keyname)
    for dataname in loophandle.item_order:
        stored = loophandle.block[dataname]
        for slot in (0,1):
            stored[slot] = list(stored[slot])
            del stored[slot][packet_coord]
    # the packet left loophandle, which need not be this block
    loophandle.no_packets -= 1
def GetKeyedPacket(self,keyname,keyvalue,no_case=False):
    """Return the loop packet where [[keyname]] has value [[keyvalue]]. Ignore case if no_case is true

    Raises ValueError unless exactly one packet matches.
    """
    my_loop = self.GetLoop(keyname)
    if no_case:
        wanted = keyvalue.lower()
        one_pack = [a for a in my_loop if getattr(a,keyname).lower()==wanted]
    else:
        one_pack = [a for a in my_loop if getattr(a,keyname)==keyvalue]
    if len(one_pack)!=1:
        raise ValueError("Bad packet key %s = %s: returned %d packets" % (keyname,keyvalue,len(one_pack)))
    return one_pack[0]
def GetKeyedSemanticPacket(self,keyvalue,cat_id):
    """Return a complete packet for category cat_id

    The keys of the category are read from self.dictionary.cat_key_table;
    the packet matching keyvalue for each key is merged into the result.
    """
    target_keys = self.dictionary.cat_key_table[cat_id]
    p = StarPacket()
    # set case-sensitivity flag from the type of the first key
    lcase = self.dictionary[target_keys[0]]['_type.contents'] in ['Code','Tag','Name']
    for cat_key in target_keys:
        try:
            extra_packet = self.GetKeyedPacket(cat_key,keyvalue,no_case=lcase)
        except KeyError:        #try to create the key
            self[cat_key]       #will create a key column
            extra_packet = self.GetKeyedPacket(cat_key,keyvalue,no_case=lcase)
        p.merge_packet(extra_packet)
    # the following attributes used to calculate missing values
    p.key = target_keys[0]
    p.cif_dictionary = self.dictionary
    p.fulldata = self
    return p
def GetItemOrder(self):
    """Return a new list with the entries of item_order, in order."""
    return list(self.item_order)
def ChangeItemOrder(self,itemname,newpos):
    """Move the entry matching itemname to position newpos of item_order.

    The entry is located with GetItemPosition and the stored entry itself
    is re-inserted, so its original spelling is kept whatever the case of
    itemname.
    """
    testpos = self.GetItemPosition(itemname)
    moved = self.item_order.pop(testpos)
    self.item_order.insert(newpos,moved)
def GetItemPosition(self,itemname):
    """Return the index of itemname in item_order, ignoring case.

    Entries (and itemname itself) that have no lower() method are compared
    unchanged.

    Raises ValueError when there is no matching entry.
    """
    def low_case(item):
        try:
            return item.lower()
        except AttributeError:
            return item
    testname = low_case(itemname)
    return [low_case(entry) for entry in self.item_order].index(testname)
def collapse(self,packet_no):
    """Return a block one dimension lower holding packet number packet_no.

    Nested loops are collapsed recursively with the same packet number.

    Raises StarError when this block has dimension 0.
    """
    if self.dimension == 0:
        raise StarError( "Attempt to select non-existent packet")
    # carry the character set over so values accepted here are not
    # rejected by the new block's default ascii check
    newlb = LoopBlock(dimension=self.dimension-1,characterset=self.characterset)
    for one_item in self.item_order:
        if isinstance(one_item,LoopBlock):
            newlb.insert_loop(one_item.collapse(packet_no))
        else:
            newlb[one_item] = self[one_item][packet_no]
    return newlb
def audit(self):
    """Return a list of (name, count) for every key that occurs more than once.

    An empty list means all keys returned by self.keys() are unique.
    """
    tally = {}
    for key in self.keys():
        tally[key] = tally.get(key,0) + 1
    return [(key,count) for key,count in tally.items() if count > 1]
def GetLoopNames(self,keyname):
    """Return the keys of the block in which keyname is found.

    If keyname is in this block, self.keys() is returned; otherwise the
    nested loops are asked in turn.

    Raises KeyError if no block reports the name.
    """
    if keyname in self:
        return self.keys()
    for aloop in self.loops:
        try:
            return aloop.GetLoopNames(keyname)
        except KeyError:
            pass       # try the next nested loop
    raise KeyError('Item does not exist')
def AddToLoop(self,dataname,loopdata):
    """Assign every name/value pair of loopdata into the loop holding dataname."""
    target = self.GetLoop(dataname)
    for newname in loopdata:
        target[newname] = loopdata[newname]
def Loopify(self,datanamelist):
    """Move the named items into a new single-packet loop.

    Nothing happens unless every name in datanamelist is a key of the loop
    that holds the first name.  Otherwise each item is wrapped in a
    one-element list, stored in a new LoopBlock one dimension deeper,
    deleted from this block, and the new loop is inserted.
    """
    thisloop = self.GetLoop(datanamelist[0])
    badmatch = [a for a in thisloop.keys() if a in datanamelist]
    if len(badmatch)==len(datanamelist):   #all at same level so is OK
        # keep the character set so existing values pass the new loop's checks
        newloop = LoopBlock(dimension=self.dimension+1,characterset=self.characterset)
        for name in datanamelist:
            newloop[name]=[self[name]]
            del self[name]
        self.insert_loop(newloop)
def SetOutputLength(self,wraplength=80,maxoutlength=2048):
    """Record the wrap and maximum line lengths on this block and all nested loops.

    Raises StarError when wraplength exceeds maxoutlength.
    """
    if wraplength > maxoutlength:
        message = "Wrap length (requested %d) must be <= Maximum line length (requested %d)" % (wraplength,maxoutlength)
        raise StarError(message)
    self.wraplength, self.maxoutlength = wraplength, maxoutlength
    for nested in self.loops:
        nested.SetOutputLength(wraplength,maxoutlength)
def printsection(self,instring='',ordering=[],blockstart="",blockend="",indent=0,coord=[]):
import string
# first make an ordering
self.create_ordering(ordering)
# now do it...
if not instring:
outstring = CIFStringIO(target_width=80) # the returned string
else:
outstring = instring
if not coord:
coords = [0]*(self.dimension-1)
else:
coords = coord
if(len(coords)0:
#print "Remaining to output " + `self.output_order`
itemname = self.output_order.pop(0)
item_spec = [i for i in ordering if i['dataname'].lower()==itemname.lower()]
if len(item_spec)>0:
col_pos = item_spec[0].get('column',-1)
else:
col_pos = -1
item_spec = {}
if self.dimension == 0: # ie value next to tag
if not isinstance(itemname,LoopBlock): #no loop
if col_pos < 0: col_pos = 40
outstring.set_tab(col_pos)
itemvalue = self[itemname]
outstring.write(itemname,mustbreak=True,do_tab=False)
outstring.write(' ',canbreak=True,do_tab=False) #space after itemname
self.format_value(itemvalue,outstring,hints=item_spec)
else: # we are asked to print an internal loop block
#first make sure we have sensible coords. Length should be one
#less than the current dimension
outstring.set_tab(10) #guess this is OK?
outstring.write(' '*indent,mustbreak=True,do_tab=False); outstring.write('loop_\n',do_tab=False)
itemname.format_names(outstring,indent+2)
itemname.format_packets(outstring,coords,indent+2)
else: # we are a nested loop
outstring.write(' '*indent,mustbreak=True,do_tab=False); outstring.write('loop_\n',do_tab=False)
self.format_names(outstring,indent+2)
self.format_packets(outstring,coords,indent+2)
if instring: return #inside a recursion
else:
returnstring = outstring.getvalue()
outstring.close()
return returnstring
def format_names(self,outstring,indent=0):
    """Write the names in item_order to outstring, one per line.

    Each line is preceded by indent spaces.  A nested loop is written as a
    'loop_' line, its own names at indent+2, then ' stop_'.
    """
    padding = ' ' * indent
    for entry in list(self.item_order):
        outstring.write(padding,do_tab=False)
        if isinstance(entry,StringTypes):      #(not loop)
            outstring.write(entry,do_tab=False)
            outstring.write("\n",do_tab=False)
            continue
        # a loop
        outstring.write("loop_\n",do_tab=False)
        entry.format_names(outstring,indent+2)
        outstring.write(" stop_\n",do_tab=False)
def format_packets(self,outstring,coordinates,indent=0):
    """Write every packet of this loop to outstring, one packet per line.

    coordinates -- passed to coord_to_group to select the group of values
                   for each entry of item_order
    """
    # one sequence per item, then transpose so each row is one packet
    alldata = [self.coord_to_group(a,coordinates) for a in self.item_order]
    packet_data = list(zip(*alldata))
    for packet in packet_data:
        for datapoint in packet:
            self.format_packet_item(datapoint,indent,outstring)
        outstring.write("\n",do_tab=False)
def format_packet_item(self,pack_item,indent,outstring):
    """Write one packet entry to outstring; returns None.

    Strings go through _formatstring, other simple values (numbers,
    StarList, StarDict) through format_value; each is followed by a
    space.  Anything else is treated as a nested packet and each of its
    members is written recursively, preceded by a space.
    """
    if isinstance(pack_item,(StringType,UnicodeType,IntType,FloatType,LongType,StarList,StarDict)):
        if isinstance(pack_item,StringTypes):
            outstring.write(self._formatstring(pack_item))
        else:
            self.format_value(pack_item,outstring)
        outstring.write(' ',canbreak=True,do_tab=False)
        return
    # Now, for each nested loop we call ourselves again
    is_base_packet = not isinstance(pack_item[0],(ListType,TupleType))
    if is_base_packet:
        item_list = pack_item
    else:
        item_list = zip(*pack_item)
    for sub_item in item_list:
        # the recursive call writes to outstring itself and returns None,
        # so the separator is written on its own
        outstring.write(' ',canbreak=True)
        self.format_packet_item(sub_item,indent,outstring)
    # stop_ is not issued at the end of each innermost packet
    if not is_base_packet:
        outstring.write(' stop_ ',canbreak=True)
def _formatstring(self,instring,delimiter=None,standard='CIF1',indent=0,
lbprotocol=True,pref_protocol=True):
import string
if standard == 'CIF2':
allowed_delimiters = set(['"',"'",";",None,'"""',"'''"])
else:
allowed_delimiters = set(['"',"'",";",None])
if len(instring)==0: allowed_delimiters.difference_update([None])
if len(instring) > (self.maxoutlength-2) or '\n' in instring:
allowed_delimiters.intersection_update([";","'''",'"""'])
if ' ' in instring or '\t' in instring or '\v' in instring or '_' in instring or ',' in instring:
allowed_delimiters.difference_update([None])
if '"' in instring: allowed_delimiters.difference_update(['"',None])
if "'" in instring: allowed_delimiters.difference_update(["'",None])
out_delimiter = ";" #default (most conservative)
if delimiter in allowed_delimiters:
out_delimiter = delimiter
elif "'" in allowed_delimiters: out_delimiter = "'"
elif '"' in allowed_delimiters: out_delimiter = '"'
if out_delimiter in ['"',"'",'"""',"'''"]: return out_delimiter + instring + out_delimiter
elif out_delimiter is None: return instring
# we are left with semicolon strings
outstring = "\n;"
# if there are returns in the string, try to work with them
while 1:
retin = string.find(instring,'\n')+1
if retin < self.maxoutlength and retin > 0: # honour this break
outstring = outstring + instring[:retin]
instring = instring[retin:]
elif len(instring)0:
self.format_value(itemvalue[0],stringsink)
for listval in itemvalue[1:]:
print 'Formatting %s' % `listval`
stringsink.write(', ',do_tab=False)
self.format_value(listval,stringsink,compound=True)
stringsink.write(']',unindent=True)
elif isinstance(itemvalue,StarDict):
stringsink.set_tab(0)
stringsink.write('{',newindent=True,mustbreak=compound) #start a new line inside
items = itemvalue.items()
if len(items)>0:
stringsink.write("'"+items[0][0]+"'"+':',canbreak=True)
self.format_value(items[0][1],stringsink)
for key,value in items[1:]:
stringsink.write(', ')
stringsink.write("'"+key+"'"+":",canbreak=True)
self.format_value(value,stringsink) #never break between key and value
stringsink.write('}',unindent=True)
else:
stringsink.write(str(itemvalue),canbreak=True) #numbers
def process_template(self,template_string):
    """Process a template datafile to formatting instructions

    template_string is parsed as a DDLm StarFile and its first block is
    walked in input order.  For every dataname a hint dictionary is
    appended to self.form_hints with the keys:
      'dataname'  -- the name
      'delimiter' -- first character of the template value, when that is a
                     quote or semicolon (non-looped) or the value was
                     quoted (looped)
      'column'    -- offset of the value from the start of the match
                     (non-looped, not set for semicolon values) or from the
                     last recorded line start (looped)
    """
    template_as_cif = StarFile(StringIO(template_string),grammar="DDLm").first_block()
    self.form_hints = []   #ordered array of hint dictionaries
    for item in template_as_cif.item_order:  #order of input
        if not isinstance(item,LoopBlock):    #not nested
            hint_dict = {"dataname":item}
            # find the line in the file; the name is escaped so that '.'
            # in a dataname is matched literally
            start_pos = re.search("(^[ \t]*" + re.escape(item) + "[ \t\n]+)(?P<spec>([\S]+)|(^;))",template_string,re.I|re.M)
            if start_pos is not None and start_pos.group("spec") is not None:
                spec_pos = start_pos.start("spec")-start_pos.start(0)
                spec_char = template_string[start_pos.start("spec")]
                if spec_char in '\'";':
                    hint_dict.update({"delimiter":spec_char})
                if spec_char != ";":   #so we need to work out the column number
                    hint_dict.update({"column":spec_pos})
            self.form_hints.append(hint_dict)
        else:           #loop block
            testname = item.item_order[0]
            #find the loop spec line in the file
            loop_regex = "(^[ \t]*loop_[ \t\n\r]+" + testname + "([ \t\n\r]+_[\S]+){%d}[ \t]*$(?P<packet>(.(?!_loop|_[\S]+))*))" % (len(item.item_order) - 1)
            loop_line = re.search(loop_regex,template_string,re.I|re.M|re.S)
            packet_text = loop_line.group('packet')
            # 'all' and 'none' are read below; the two quote groups are
            # never read by name
            packet_regex = "[ \t]*(?P<all>(?P<sqstring>'([^\n\r\f']*)'+)|(?P<dqstring>\"([^\n\r\"]*)\"+)|(?P<none>[^\s]+))"
            packet_pos = re.finditer(packet_regex,packet_text)
            line_end_pos = re.finditer("^",packet_text,re.M)
            next_end = next(line_end_pos).end()
            last_end = next_end
            for loopname in item.item_order:
                hint_dict = {"dataname":loopname}
                thismatch = next(packet_pos)
                while thismatch.start('all') > next_end:
                    last_end = next_end
                    try:
                        next_end = next(line_end_pos).start()
                    except StopIteration:
                        # no further line starts: stop here, otherwise the
                        # loop condition could never become false
                        break
                col_pos = thismatch.start('all') - last_end
                if thismatch.group('none') is None:
                    hint_dict.update({'delimiter':thismatch.groups()[0][0]})
                hint_dict.update({'column':col_pos})
                self.form_hints.append(hint_dict)
    return
def create_ordering(self,order_dict):
    """Create a canonical ordering that includes loops using our formatting hints dictionary

    order_dict -- sequence of hint dictionaries, each with a 'dataname' key

    Sets self.output_order to the requested items that are present (a
    nested loop stands in for any name found inside it), followed by the
    remaining entries of item_order.  Each entry appears once.
    """
    requested_order = [i['dataname'] for i in order_dict]
    new_order = []
    for item in requested_order:
        if isinstance(item,basestring) and item.lower() in self.lower_keys:
            # item_order keeps the original spelling, so look the entry up
            # case-insensitively and reuse the stored name
            stored = self.item_order[self.GetItemPosition(item)]
            if stored not in new_order:
                new_order.append(stored)
        elif self.has_key(item):    #in a loop somewhere
            target_loop = self.GetLoop(item)
            # several requested names may share a loop; order it once only
            if not any(entry is target_loop for entry in new_order):
                target_loop.create_ordering(order_dict)
                new_order.append(target_loop)
    extras = [i for i in self.item_order if i not in new_order]
    self.output_order = new_order + extras
Ancestors (in MRO)
- LoopBlock
- __builtin__.object
Instance variables
var block
var characterset
var comment_list
var curitem
var dictionary
var dimension
var formatting_hints
var item_order
var loops
var lower_keys
var maxoutlength
var no_packets
var overwrite
var popout
var value_switch
var wraplength
Methods
def __init__(
self, data=(), dimension=0, maxoutlength=2048, wraplength=80, overwrite=True, characterset='ascii')
def __init__(self,data = (), dimension = 0, maxoutlength=2048, wraplength=80, overwrite=True,
             characterset='ascii'):
    """Create a loop block.

    data         -- a tuple/list of items, each passed to AddLoopItem, or
                    another LoopBlock whose contents are taken over
    dimension    -- nesting depth of this block (0 is the top level)
    maxoutlength -- stored as self.maxoutlength
    wraplength   -- stored as self.wraplength
    overwrite    -- stored as self.overwrite
    characterset -- 'ascii' or 'unicode'; selects the value-check regex

    Raises StarError for any other characterset.
    """
    self.block = {}
    self.loops = []
    self.no_packets = 0
    self.item_order = []
    self.formatting_hints = {}
    self.lower_keys = []    #for efficiency
    self.value_switch = False    #prefer string version always
    self.comment_list = {}
    self.dimension = dimension
    self.dictionary = None   #DDLm dictionary
    self.popout = False         #used during load iteration
    self.curitem = -1           #used during iteration
    self.maxoutlength = maxoutlength
    self.wraplength = wraplength
    self.overwrite = overwrite
    self.characterset = characterset
    if not hasattr(self,'loopclass'):  #in case are derived class
        self.loopclass = LoopBlock  #when making new loops
    if self.characterset == 'ascii':
        self.char_check = re.compile("[][ \n\r\t!%&\(\)*+,./:<=>?@0-9A-Za-z\\\\^`{}\|~\"#$';_-]+",re.M)
    elif self.characterset == 'unicode':
        self.char_check = re.compile(u"[][ \n\r\t!%&\(\)*+,./:<=>?@0-9A-Za-z\\\\^`{}\|~\"#$';_\u00A0-\uD7FF\uE000-\uFDCF\uFDF0-\uFFFD\U00010000-\U0010FFFD-]+",re.M)
    else:
        raise StarError("No character set specified")
    if isinstance(data,(TupleType,ListType)):
        for item in data:
            self.AddLoopItem(item)
    elif isinstance(data,LoopBlock):
        self.block = data.block.copy()
        self.item_order = data.item_order[:]
        self.lower_keys = data.lower_keys[:]
        self.comment_list = data.comment_list.copy()
        self.dimension = data.dimension
        # loops as well
        for oldloop in data.loops:
            try:
                placeholder = self.item_order.index(oldloop)
            except ValueError:
                print("Warning: loop %s (%s) in loops, but not in item_order (%s)" % (repr(oldloop),str(oldloop),repr(self.item_order)))
                placeholder = -1
            else:
                # only remove what was found; removing a missing entry
                # would raise a second, uncaught ValueError
                self.item_order.remove(oldloop)   #gone
            self.insert_loop(oldloop,position=placeholder)
def AddComment(
self, itemname, comment)
def AddComment(self,itemname,comment):
    """Store comment in comment_list under the lower-cased itemname."""
    key = itemname.lower()
    self.comment_list[key] = comment
def AddLoopItem(
self, incomingdata, precheck=False, maxlength=-1)
def AddLoopItem(self,incomingdata,precheck=False,maxlength=-1):
    """Insert a data item, or a whole nested loop, into this block.

    incomingdata -- (dataname, value) for a single item, or
                    (sequence_of_datanames, sequence_of_values) to build a
                    nested loop one dimension deeper than this block
    precheck     -- when true, the dataname and value character checks
                    are skipped
    maxlength    -- passed to check_data_name

    Raises TypeError for a non-string dataname, StarError for a duplicate
    name when self.overwrite is false, and StarLengthError when a looped
    value's length differs from self.no_packets.
    """
    if isinstance(incomingdata[0],(TupleType,ListType)):
        # internal loop
        # first we remove any occurences of these datanames in
        # other loops
        for one_item in incomingdata[0]:
            if self.has_key(one_item):
                if not self.overwrite:
                    # report the offending name; formatting the whole name
                    # tuple with a single %s would itself fail
                    raise StarError( 'Attempt to insert duplicate item name %s' % (one_item,))
                del self[one_item]
        newloop = self.loopclass(dimension = self.dimension+1,characterset=self.characterset)
        for key,val in zip(incomingdata[0],incomingdata[1]):
            newloop.AddLoopItem((key,val))
        self.insert_loop(newloop)
        return
    if not isinstance(incomingdata[0],StringTypes):
        raise TypeError('Star datanames are strings only (got %s)' % repr(incomingdata[0]))
    data = list(incomingdata)    #copy
    if not (data[1] == [] or get_dim(data[1])[0] == self.dimension):
        #dimension mismatch
        # single-member lists could be seen as bare lists...
        if isinstance(data[1],(TupleType,ListType)) and len(data[1])==1:
            # name and value must travel together as one argument
            self.AddLoopItem((data[0],data[1][0]),precheck,maxlength)
        # if that doesn't work, make the dataname list a compound item for inserting a loop
        else:
            self.AddLoopItem(((data[0],),(data[1],)))
        return
    if not precheck:
        self.check_data_name(data[0],maxlength)    # make sure no nasty characters
    # check that we can replace data
    if not self.overwrite and self.has_key(data[0]):
        raise StarError( 'Attempt to insert duplicate item name %s' % data[0])
    # put the data in the right container
    regval,empty_val = self.regularise_data(data[1])
    # check for pure string data
    pure_string = check_stringiness(regval)
    if not precheck:
        self.check_item_value(regval)
    if self.dimension > 0:
        if self.no_packets <= 0:
            self.no_packets = len(data[1])  #first item in this loop
        if len(data[1]) != self.no_packets:
            raise StarLengthError('Not enough values supplied for %s' % (data[0]))
    try:
        oldpos = self.GetItemPosition(data[0])
    except ValueError:
        oldpos = len(self.item_order)    #end of list
    self.RemoveLoopItem(data[0])     # may be different case (upper/lower), so have to do this
    # block entries are [string values, native values]; the unused half
    # holds the placeholder from regularise_data
    if pure_string:
        self.block.update({data[0]:[regval,empty_val]})  # trust the data is OK
    else:
        self.block.update({data[0]:[empty_val,regval]})
    self.lower_keys.insert(oldpos,data[0].lower())
    self.item_order.insert(oldpos,data[0])
def AddPacket(
self, packet)
def AddPacket(self,packet):
    """Append one packet to this loop.

    packet -- object carrying one attribute per name in item_order; each
              attribute value is appended to the matching column.

    Raises StarError when called on a block of dimension 0.
    """
    if self.dimension==0:
        raise StarError("Attempt to add packet to top level block")
    for myitem in self.item_order:
        self[myitem] = list(self[myitem])   #in case we have stored a tuple
        # appended after the assignment, on the value read back from self
        self[myitem].append(getattr(packet,myitem))
    self.no_packets +=1
def AddToLoop(
self, dataname, loopdata)
def AddToLoop(self,dataname,loopdata):
    """Assign every name/value pair of loopdata into the loop holding dataname."""
    target = self.GetLoop(dataname)
    for newname in loopdata:
        target[newname] = loopdata[newname]
def ChangeItemOrder(
self, itemname, newpos)
def ChangeItemOrder(self,itemname,newpos):
    """Move the entry matching itemname to position newpos of item_order.

    The entry is located with GetItemPosition and the stored entry itself
    is re-inserted, so its original spelling is kept whatever the case of
    itemname.
    """
    testpos = self.GetItemPosition(itemname)
    moved = self.item_order.pop(testpos)
    self.item_order.insert(newpos,moved)
def GetItemOrder(
self)
def GetItemOrder(self):
    """Return a new list with the entries of item_order, in order."""
    return list(self.item_order)
def GetItemPosition(
self, itemname)
def GetItemPosition(self,itemname):
    """Return the index of itemname in item_order, ignoring case.

    Entries (and itemname itself) that have no lower() method are compared
    unchanged.

    Raises ValueError when there is no matching entry.
    """
    def low_case(item):
        try:
            return item.lower()
        except AttributeError:
            return item
    testname = low_case(itemname)
    return [low_case(entry) for entry in self.item_order].index(testname)
def GetKeyedPacket(
self, keyname, keyvalue, no_case=False)
Return the loop packet where [[keyname]] has value [[keyvalue]]. Ignore case if no_case is true
def GetKeyedPacket(self,keyname,keyvalue,no_case=False):
    """Return the loop packet where [[keyname]] has value [[keyvalue]]. Ignore case if no_case is true

    Raises ValueError unless exactly one packet matches.
    """
    my_loop = self.GetLoop(keyname)
    if no_case:
        wanted = keyvalue.lower()
        one_pack = [a for a in my_loop if getattr(a,keyname).lower()==wanted]
    else:
        one_pack = [a for a in my_loop if getattr(a,keyname)==keyvalue]
    if len(one_pack)!=1:
        raise ValueError("Bad packet key %s = %s: returned %d packets" % (keyname,keyvalue,len(one_pack)))
    return one_pack[0]
def GetKeyedSemanticPacket(
self, keyvalue, cat_id)
Return a complete packet for category cat_id
def GetKeyedSemanticPacket(self,keyvalue,cat_id):
    """Return a complete packet for category cat_id

    The keys of the category are read from self.dictionary.cat_key_table;
    the packet matching keyvalue for each key is merged into the result.
    """
    target_keys = self.dictionary.cat_key_table[cat_id]
    p = StarPacket()
    # set case-sensitivity flag from the type of the first key
    lcase = self.dictionary[target_keys[0]]['_type.contents'] in ['Code','Tag','Name']
    for cat_key in target_keys:
        try:
            extra_packet = self.GetKeyedPacket(cat_key,keyvalue,no_case=lcase)
        except KeyError:        #try to create the key
            self[cat_key]       #will create a key column
            extra_packet = self.GetKeyedPacket(cat_key,keyvalue,no_case=lcase)
        p.merge_packet(extra_packet)
    # the following attributes used to calculate missing values
    p.key = target_keys[0]
    p.cif_dictionary = self.dictionary
    p.fulldata = self
    return p
def GetLoop(
self, keyname)
def GetLoop(self,keyname):
    """Return the loop block that directly holds keyname.

    Returns self when the lower-cased name is one of this block's own
    keys, otherwise the first successful result from the nested loops.

    Raises KeyError if the name is not present at any level.
    """
    if not self.has_key(keyname):
        raise KeyError('Item %s does not exist' % keyname)
    if keyname.lower() in self.lower_keys:
        return self
    for aloop in self.loops:
        try:
            return aloop.GetLoop(keyname)
        except KeyError:
            pass       # not in this nested loop, try the next one
    raise KeyError('Item %s does not exist' % keyname)
def GetLoopItem(
self, itemname)
Return value of itemname in this loop block
def GetLoopItem(self,itemname):
    """Return value of itemname in this loop block"""
    value, _is_native = self.GetLoopItemValue(itemname)
    return value
def GetLoopItemValue(
self, itemname)
Return value of itemname and whether or not it is a native value
def GetLoopItemValue(self,itemname):
    """Return value of itemname and whether or not it is a native value

    Lookup order: exact key in this block, then the nested loops, then a
    case-insensitive match in this block.  The result is a pair
    (value, flag); the string form is preferred and returned with a false
    flag whenever it is available.

    Raises KeyError if the item cannot be found.
    """
    # assume case is correct first
    try:
        s,v = self.block[itemname]
    except KeyError:
        for loop in self.loops:
            try:
                return loop.GetLoopItemValue(itemname)
            except KeyError:
                pass
        lowname = itemname.lower()
        if lowname not in self.lower_keys:
            raise KeyError('Item %s not in block' % itemname)
        # it is there somewhere, now we need to find it
        for real_key in self.block:
            if real_key.lower() == lowname:
                s,v = self.block[real_key]
                break
        else:    #should never happen!!
            raise KeyError('Bug: Item %s unexpectedly not in block' % itemname)
    # prefer string value unless all are None
    if self.dimension == 0:
        if s is not None:
            return s,False
        return v,not isinstance(v,StarList)
    if None not in s:
        return s,False
    if len(v)>0:
        return v,not isinstance(v[0],StarList)
    return v,True
def GetLoopNames(
self, keyname)
def GetLoopNames(self,keyname):
    """Return the keys of the block in which keyname is found.

    If keyname is in this block, self.keys() is returned; otherwise the
    nested loops are asked in turn.

    Raises KeyError if no block reports the name.
    """
    if keyname in self:
        return self.keys()
    for aloop in self.loops:
        try:
            return aloop.GetLoopNames(keyname)
        except KeyError:
            pass       # try the next nested loop
    raise KeyError('Item does not exist')
def GetPacket(
self, index)
def GetPacket(self,index):
    """Build a StarPacket holding entry number index of every item.

    For each entry of item_order, in order:
      * a nested LoopBlock contributes a list with the index-th value of
        each of its items;
      * a plain item contributes its value (dimension 0) or the index-th
        element of its value (looped), and is also made available as an
        attribute of the packet named after the item.
    """
    thispack = StarPacket([])
    for myitem in self.item_order:
        if isinstance(myitem,LoopBlock):
            thispack.append([myitem[b][index] for b in myitem.item_order])
            # a LoopBlock cannot serve as an attribute name
            continue
        if self.dimension==0:
            thispack.append(self[myitem])
        else:
            thispack.append(self[myitem][index])
        setattr(thispack,myitem,thispack[-1])
    return thispack
def Loopify(
self, datanamelist)
def Loopify(self,datanamelist):
    """Move the named items into a new single-packet loop.

    Nothing happens unless every name in datanamelist is a key of the loop
    that holds the first name.  Otherwise each item is wrapped in a
    one-element list, stored in a new LoopBlock one dimension deeper,
    deleted from this block, and the new loop is inserted.
    """
    thisloop = self.GetLoop(datanamelist[0])
    badmatch = [a for a in thisloop.keys() if a in datanamelist]
    if len(badmatch)==len(datanamelist):   #all at same level so is OK
        # keep the character set so existing values pass the new loop's checks
        newloop = LoopBlock(dimension=self.dimension+1,characterset=self.characterset)
        for name in datanamelist:
            newloop[name]=[self[name]]
            del self[name]
        self.insert_loop(newloop)
def RemoveComment(
self, itemname)
def RemoveComment(self,itemname):
    """Delete the comment stored under the lower-cased itemname.

    Raises KeyError if no comment is stored for that name.
    """
    key = itemname.lower()
    del self.comment_list[key]
def RemoveKeyedPacket(
self, keyname, keyvalue)
def RemoveKeyedPacket(self,keyname,keyvalue):
    """Delete the packet whose keyname item equals keyvalue.

    The position is that of the first occurrence of keyvalue in
    self[keyname]; the entry at that position is removed from both stored
    lists of every item in the loop holding keyname.

    Raises ValueError if keyvalue does not occur.
    """
    packet_coord = list(self[keyname]).index(keyvalue)
    loophandle = self.GetLoop(keyname)
    for dataname in loophandle.item_order:
        stored = loophandle.block[dataname]
        for slot in (0,1):
            stored[slot] = list(stored[slot])
            del stored[slot][packet_coord]
    # the packet left loophandle, which need not be this block
    loophandle.no_packets -= 1
def RemoveLoopItem(
self, itemname)
def RemoveLoopItem(self,itemname):
    """Remove itemname (matched without regard to case) from this block.

    If the name is a key of this block it is dropped from block,
    lower_keys and item_order.  Otherwise it is deleted from the first
    nested loop that has it, and that loop is removed once it is empty.
    no_packets is reset to 0 when this block has no items left.  Nothing
    happens when the name is not present.
    """
    if not self.has_key(itemname):
        return
    testkey = itemname.lower()
    own_keys = [k for k in self.block if k.lower() == testkey]
    if own_keys:
        del self.block[own_keys[0]]
        self.lower_keys.remove(testkey)
        # now remove the key in the order list
        for i, entry in enumerate(self.item_order):
            if isinstance(entry,StringTypes) and entry.lower()==testkey:  #may be loop
                del self.item_order[i]
                break
    else:    #must be in a lower loop
        for aloop in self.loops:
            if aloop.has_key(itemname):
                del aloop[itemname]
                if len(aloop)==0:  # all gone
                    self.remove_loop(aloop)
                break
    if len(self.block)==0:    #no items in loop, length -> 0
        self.no_packets = 0
def SetOutputLength(
self, wraplength=80, maxoutlength=2048)
def SetOutputLength(self,wraplength=80,maxoutlength=2048):
    """Record the wrap and maximum line lengths on this block and all nested loops.

    Raises StarError when wraplength exceeds maxoutlength.
    """
    if wraplength > maxoutlength:
        message = "Wrap length (requested %d) must be <= Maximum line length (requested %d)" % (wraplength,maxoutlength)
        raise StarError(message)
    self.wraplength, self.maxoutlength = wraplength, maxoutlength
    for nested in self.loops:
        nested.SetOutputLength(wraplength,maxoutlength)
def audit(
self)
def audit(self):
    """Return a list of (name, count) for every key that occurs more than once.

    An empty list means all keys returned by self.keys() are unique.
    """
    tally = {}
    for key in self.keys():
        tally[key] = tally.get(key,0) + 1
    return [(key,count) for key,count in tally.items() if count > 1]
def check_data_name(
self, dataname, maxlength=-1)
def check_data_name(self,dataname,maxlength=-1):
    """Validate a dataname against length and character-set rules.

    dataname  -- candidate name; must begin with an underscore
    maxlength -- maximum permitted length; values <= 0 disable the check

    Raises StarError on the first rule violated, otherwise returns None.
    """
    if maxlength > 0 and len(dataname) > maxlength:
        raise StarError( 'Dataname %s exceeds maximum length %d' % (dataname,maxlength))
    # slice instead of index so an empty name is reported as a StarError
    # rather than escaping as an IndexError
    if dataname[:1] != '_':
        raise StarError( 'Dataname ' + dataname + ' does not begin with _')
    codes = [ord(a) for a in dataname]
    if self.characterset=='ascii':
        if any(c < 33 or c > 126 for c in codes):
            raise StarError( 'Dataname ' + dataname + ' contains forbidden characters')
        return
    if any(c < 33 for c in codes):
        raise StarError( 'Dataname ' + dataname + ' contains forbidden characters (below code point 33)')
    if any(126 < c < 160 for c in codes):
        raise StarError( 'Dataname ' + dataname + ' contains forbidden characters (between code point 127-159)')
    if any(0xD7FF < c < 0xE000 for c in codes):
        raise StarError( 'Dataname ' + dataname + ' contains unsupported characters (between U+D800 and U+E000)')
    if any(0xFDCF < c < 0xFDF0 for c in codes):
        raise StarError( 'Dataname ' + dataname + ' contains unsupported characters (between U+FDD0 and U+FDEF)')
    if any(c == 0xFFFE or c == 0xFFFF for c in codes):
        raise StarError( 'Dataname ' + dataname + ' contains unsupported characters (U+FFFE and/or U+FFFF)')
    # Only the last two code points of each supplementary plane are excluded;
    # the mask must therefore cover the full low 16 bits, not just bits 1-3.
    if any(c > 0x10000 and (c & 0xFFFE) == 0xFFFE for c in codes):
        raise StarError( u'Dataname ' + dataname + u' contains unsupported characters (U+xFFFE and/or U+xFFFF)')
def check_item_value(
self, item)
def check_item_value(self,item):
    """Check that every string in item consists only of permitted characters.

    item -- a single value, or a list/tuple/dict of values (for a dict the
            keys are what gets iterated).  Non-string members and empty
            strings are accepted without inspection.

    Raises StarError if self.char_check does not match a string in full.
    """
    test_item = item
    if not isinstance(item,(list,dict,tuple)):
        test_item = [item]         #single item list
    # explicit loop: map() is lazy on Python 3 and would check nothing
    for it in test_item:
        if not isinstance(it,basestring) or it=='':
            continue
        me = self.char_check.match(it)
        if not me:
            raise StarError(u'Bad character in %s' % it)
        if me.span() != (0,len(it)):
            # the match stopped early, so a forbidden character follows
            raise StarError(u'Data item "' + repr(it) + u'"... contains forbidden characters')
def clear(
self)
def clear(self):
    """Reset block, loops, item_order and lower_keys to empty and no_packets to 0."""
    self.no_packets = 0
    self.block = {}
    self.loops, self.item_order, self.lower_keys = [], [], []
def collapse(
self, packet_no)
def collapse(self,packet_no):
    """Return a block one dimension lower holding packet number packet_no.

    Nested loops are collapsed recursively with the same packet number.

    Raises StarError when this block has dimension 0.
    """
    if self.dimension == 0:
        raise StarError( "Attempt to select non-existent packet")
    # carry the character set over so values accepted here are not
    # rejected by the new block's default ascii check
    newlb = LoopBlock(dimension=self.dimension-1,characterset=self.characterset)
    for one_item in self.item_order:
        if isinstance(one_item,LoopBlock):
            newlb.insert_loop(one_item.collapse(packet_no))
        else:
            newlb[one_item] = self[one_item][packet_no]
    return newlb
def coord_to_group(
self, dataname, coords)
def coord_to_group(self,dataname,coords):
    """Index self[dataname] successively by each element of coords.

    A dataname that is not a string is returned unchanged (it flags inner
    loop processing).
    """
    if isinstance(dataname,StringTypes):
        group = self[dataname]    # must be a list or tuple
        for position in coords:
            group = group[position]
        return group
    return dataname          # flag inner loop processing
def copy(
self)
def copy(self):
    """Return a new LoopBlock with the same items and copies of the nested loops.

    block is copied with dict.copy(); item_order and lower_keys are
    copied as lists; every nested loop is replaced by its own copy().
    """
    newcopy = LoopBlock(dimension = self.dimension,characterset=self.characterset)
    newcopy.block = self.block.copy()
    newcopy.loops = []
    newcopy.no_packets = self.no_packets
    newcopy.item_order = self.item_order[:]
    newcopy.lower_keys = self.lower_keys[:]
    for loop in self.loops:
        try:
            placeholder = self.item_order.index(loop)
        except ValueError:
            print("Warning: loop %s (%s) in loops, but not in item_order (%s)" % (repr(loop),str(loop),repr(self.item_order)))
            placeholder = -1
        else:
            # only remove what was found; removing a missing entry would
            # raise a second, uncaught ValueError
            newcopy.item_order.remove(loop)   #gone
        newcopy.insert_loop(loop.copy(),position=placeholder)
    return newcopy
def create_ordering(
self, order_dict)
Create a canonical ordering that includes loops using our formatting hints dictionary
def create_ordering(self,order_dict):
    """Create a canonical ordering that includes loops using our formatting hints dictionary

    order_dict -- sequence of hint dictionaries, each with a 'dataname' key

    Sets self.output_order to the requested items that are present (a
    nested loop stands in for any name found inside it), followed by the
    remaining entries of item_order.  Each entry appears once.
    """
    requested_order = [i['dataname'] for i in order_dict]
    new_order = []
    for item in requested_order:
        if isinstance(item,basestring) and item.lower() in self.lower_keys:
            # item_order keeps the original spelling, so look the entry up
            # case-insensitively and reuse the stored name
            stored = self.item_order[self.GetItemPosition(item)]
            if stored not in new_order:
                new_order.append(stored)
        elif self.has_key(item):    #in a loop somewhere
            target_loop = self.GetLoop(item)
            # several requested names may share a loop; order it once only
            if not any(entry is target_loop for entry in new_order):
                target_loop.create_ordering(order_dict)
                new_order.append(target_loop)
    extras = [i for i in self.item_order if i not in new_order]
    self.output_order = new_order + extras
def fast_load_iter(
self)
def fast_load_iter(self):
    """Cycle endlessly over (self, stored entry) for each name in item_order.

    The stored entries are collected once, when iteration starts; with no
    items the generator finishes immediately.
    """
    # a real list is required: it is tested for emptiness and re-iterated
    # on every pass
    targets = [self.block[a] for a in self.item_order]
    while targets:
        for target in targets:
            yield self,target
def flat_iterator(
self)
def flat_iterator(self):
    """Yield this block one packet at a time.

    A dimension-0 block yields a single shallow copy of itself.  Otherwise
    the length of the first entry in self.block sets the number of packets,
    and self.collapse(n) is yielded for each packet number n.
    """
    if self.dimension == 0:
        yield copy.copy(self)
        return
    top_keys = list(self.block.keys())
    my_length = len(self.block[top_keys[0]]) if len(top_keys)>0 else 0
    for pack_no in range(my_length):
        yield self.collapse(pack_no)
def format_names(
self, outstring, indent=0)
def format_names(self,outstring,indent=0):
    """Write the names held in item_order to outstring, one per line.

    String entries are written after `indent` spaces.  Any other entry is
    treated as a nested loop: a "loop_" line is written, the entry formats
    its own names two columns further in, and a " stop_" line closes it.
    """
    for itemname in self.item_order[:]:
        is_plain_name = isinstance(itemname,StringTypes)
        outstring.write(' ' * indent,do_tab=False)
        if is_plain_name:                       # not a loop
            outstring.write(itemname,do_tab=False)
            outstring.write("\n",do_tab=False)
        else:                                   # a loop
            outstring.write("loop_\n",do_tab=False)
            itemname.format_names(outstring,indent+2)
            outstring.write(" stop_\n",do_tab=False)
def format_packet_item(
self, pack_item, indent, outstring)
# Write one member of a loop packet to outstring.
# Values of the scalar/compound types listed in the first isinstance test are
# written directly: strings through self._formatstring, everything else
# through self.format_value.  Any other value is treated as a nested packet
# and each of its members is passed back to this method.
# NOTE(review): indentation was lost in this listing, so the nesting of the
# statements below must be checked against the original source.
def format_packet_item(self,pack_item,indent,outstring):
# print 'Formatting %s' % `pack_item`
if isinstance(pack_item,(StringType,UnicodeType,IntType,FloatType,LongType,StarList,StarDict)):
if isinstance(pack_item,StringTypes):
outstring.write(self._formatstring(pack_item))
else:
self.format_value(pack_item,outstring)
outstring.write(' ',canbreak=True,do_tab=False)
# Now, for each nested loop we call ourselves again
else: # a nested packet
if not isinstance(pack_item[0],(ListType,TupleType)): #base packet
item_list = pack_item
else:
item_list = apply(zip,pack_item)
for sub_item in item_list:
# NOTE(review): this method contains no return statement, so the recursive
# call below evaluates to None and ' ' + None would raise TypeError -- confirm
# the intended behaviour for nested packets.
outstring.write(' ' + self.format_packet_item(sub_item,indent,outstring),canbreak=True)
# stop_ is not issued at the end of each innermost packet
if isinstance(pack_item[0],(ListType,TupleType)):
outstring.write(' stop_ ',canbreak=True)
def format_packets(
self, outstring, coordinates, indent=0)
# Write the packets of this loop to outstring.
# Each entry of item_order is resolved with coord_to_group at the given
# coordinates, the resulting columns are zipped into packets, and every member
# of every packet is written with format_packet_item.
# NOTE(review): indentation was lost in this listing; presumably the newline
# is written once per packet (after the inner loop) -- confirm.
def format_packets(self,outstring,coordinates,indent=0):
# NOTE(review): neither cStringIO nor string is referenced below.
import cStringIO
import string
# get our current group of data
# print 'Coords: %s' % `coordinates`
alldata = map(lambda a:self.coord_to_group(a,coordinates),self.item_order)
# print 'Alldata: %s' % `alldata`
packet_data = apply(zip,alldata)
# print 'Packet data: %s' % `packet_data`
for position in range(len(packet_data)):
for point in range(len(packet_data[position])):
datapoint = packet_data[position][point]
# the value assigned to packstring is not used afterwards
packstring = self.format_packet_item(datapoint,indent,outstring)
outstring.write("\n",do_tab=False)
def format_value(
self, itemvalue, stringsink, compound=False, hints={})
Format a Star data value
def format_value(self,itemvalue,stringsink,compound=False,hints={}):
    """Format a Star data value"""
    # hints is only read, never modified, so the shared default is harmless
    delimiter = hints.get('delimiter',None)
    if isinstance(itemvalue,StringTypes):       # strings need sanitising
        cleaned = self._formatstring(itemvalue,delimiter=delimiter)
        stringsink.write(cleaned,canbreak = True)
    elif isinstance(itemvalue,StarList):
        stringsink.set_tab(0)
        stringsink.write('[',canbreak=True,newindent=True,mustbreak=compound)
        if len(itemvalue)>0:
            self.format_value(itemvalue[0],stringsink)
            for member in itemvalue[1:]:
                print('Formatting %s' % repr(member))
                stringsink.write(', ',do_tab=False)
                self.format_value(member,stringsink,compound=True)
        stringsink.write(']',unindent=True)
    elif isinstance(itemvalue,StarDict):
        stringsink.set_tab(0)
        stringsink.write('{',newindent=True,mustbreak=compound)  # start a new line inside
        pairs = list(itemvalue.items())
        for position,(key,value) in enumerate(pairs):
            if position > 0:
                stringsink.write(', ')
            # never break between key and value
            stringsink.write("'"+key+"'"+':',canbreak=True)
            self.format_value(value,stringsink)
        stringsink.write('}',unindent=True)
    else:                                       # numbers
        stringsink.write(str(itemvalue),canbreak=True)
def get(
self, key, default=None)
def get(self,key,default=None):
    """Return the value of key via GetLoopItem, or default if key is absent."""
    if not self.has_key(key):
        return default
    return self.GetLoopItem(key)
def has_key(
self, key)
def has_key(self,key):
    """Return 1 if key is held by this block or by any nested loop, else 0.

    String keys are compared against this block's lower-case key list
    (case-insensitively); every nested loop is then asked in turn.
    """
    held_here = isinstance(key,StringTypes) and key.lower() in self.lower_keys
    if held_here:
        return 1
    for aloop in self.loops:
        if aloop.has_key(key):
            return 1
    return 0
def insert_loop(
self, newloop, position=-1, audit=True)
def insert_loop(self,newloop,position=-1,audit=True):
    """Attach newloop as a nested loop of this block.

    newloop  -- loop to attach; its dimension must be self.dimension + 1
    position -- index in item_order at which to place the loop; a negative
                value appends it
    audit    -- if true, run self.audit() and refuse the loop when duplicate
                datanames are reported

    Raises StarError for a wrong nesting level or for duplicate names.  When
    the audit fails the loop is taken back out of self.loops before raising,
    so a rejected insertion no longer leaves the block in an inconsistent
    state.
    """
    # check that new loop is kosher
    if newloop.dimension != self.dimension + 1:
        raise StarError( 'Insertion of loop of wrong nesting level %d, should be %d' % (newloop.dimension, self.dimension+1))
    # the loop has to be registered first so that audit() can see its names
    self.loops.append(newloop)
    if audit:
        dupes = self.audit()
        if dupes:
            self.loops.pop()        # roll back the registration made above
            dupenames = [a[0] for a in dupes]
            raise StarError( 'Duplicate names: %s' % repr(dupenames))
    if position >= 0:
        self.item_order.insert(position,newloop)
    else:
        self.item_order.append(newloop)
def items(
self)
def items(self):
    """Return a list of (name, value) pairs built from keys() and values()."""
    return list(zip(self.keys(),self.values()))
def keys(
self)
def keys(self):
    """Return the names held in this block followed by those of each nested loop."""
    collected = list(self.block.keys())
    for aloop in self.loops:
        collected += aloop.keys()
    return collected
def load_iter(
self, coords=[])
# Generator used while loading data: yields (block, container) pairs telling
# the consumer where the next value belongs.  It keeps producing packets until
# self.popout becomes true, then resets popout and yields a final placeholder.
# coords is the list of packet indices of the enclosing loops; it is extended
# with the local packet counter when descending into a nested loop.
# NOTE(review): indentation was lost in this listing; presumably `count` is
# advanced once per pass of the while loop and the popout reset follows the
# loop -- confirm against the original source.
def load_iter(self,coords=[]):
count = 0 #to create packet index
while not self.popout:
# ok, we have a new packet: append a list to our subloops
for aloop in self.loops:
aloop.new_enclosing_packet()
for iname in self.item_order:
if isinstance(iname,LoopBlock): #into a nested loop
# coords+[count] builds a new list, so the default argument is never mutated
for subitems in iname.load_iter(coords=coords+[count]):
# print 'Yielding %s' % `subitems`
yield subitems
# print 'End of internal loop'
else:
if self.dimension == 0:
# print 'Yielding %s' % `self[iname]`
yield self,self[iname]
else:
# walk down the stored structure using the enclosing packet indices
backval = self.block[iname]
for i in range(len(coords)):
# print 'backval, coords: %s, %s' % (`backval`,`coords`)
backval = backval[coords[i]]
yield self,backval
count = count + 1 # count packets
self.popout = False # reinitialise
# print 'Finished iterating'
yield self,'###Blank###' #this value should never be used
def new_enclosing_packet(
self)
def new_enclosing_packet(self):
    """Append an empty list to every item's storage for a new outer packet.

    Nothing happens for dimension 0 or 1.  For deeper loops the last element
    is followed once for each level from 3 up to (not including) the
    dimension before the empty list is appended.
    """
    if self.dimension <= 1:         # otherwise have a top-level list
        return
    for iname in self.keys():       # includes lower levels
        target_list = self[iname]
        level = 3
        while level < self.dimension:
            target_list = target_list[-1]
            level += 1
        target_list.append([])
def printsection(
self, instring='', ordering=[], blockstart='', blockend='', indent=0, coord=[])
# Produce the text form of this block.
# An output ordering is first built with create_ordering(ordering).  Output
# goes to `instring` when one is supplied (the recursive case, nothing is
# returned) and otherwise to a new CIFStringIO whose contents are returned.
# `coord` selects the enclosing packet; when empty, zeros are used.
# blockstart and blockend are accepted but not referenced in the lines that
# survive in this listing.
# NOTE(review): indentation was lost in this listing; nesting must be checked
# against the original source.
def printsection(self,instring='',ordering=[],blockstart="",blockend="",indent=0,coord=[]):
import string
# first make an ordering
self.create_ordering(ordering)
# now do it...
if not instring:
outstring = CIFStringIO(target_width=80) # the returned string
else:
outstring = instring
if not coord:
coords = [0]*(self.dimension-1)
else:
coords = coord
# NOTE(review): the next line is not valid Python as it stands; a comparison
# and the statements that followed it (presumably up to a loop over
# self.output_order) appear to have been lost when this listing was
# generated.  Restore from the original source before use.
if(len(coords)0:
#print "Remaining to output " + `self.output_order`
itemname = self.output_order.pop(0)
item_spec = [i for i in ordering if i['dataname'].lower()==itemname.lower()]
if len(item_spec)>0:
col_pos = item_spec[0].get('column',-1)
else:
col_pos = -1
item_spec = {}
if self.dimension == 0: # ie value next to tag
if not isinstance(itemname,LoopBlock): #no loop
if col_pos < 0: col_pos = 40
outstring.set_tab(col_pos)
itemvalue = self[itemname]
outstring.write(itemname,mustbreak=True,do_tab=False)
outstring.write(' ',canbreak=True,do_tab=False) #space after itemname
# NOTE(review): when a hint was found, item_spec is still a list here while
# format_value calls hints.get() -- confirm.
self.format_value(itemvalue,outstring,hints=item_spec)
else: # we are asked to print an internal loop block
#first make sure we have sensible coords. Length should be one
#less than the current dimension
outstring.set_tab(10) #guess this is OK?
outstring.write(' '*indent,mustbreak=True,do_tab=False); outstring.write('loop_\n',do_tab=False)
itemname.format_names(outstring,indent+2)
itemname.format_packets(outstring,coords,indent+2)
else: # we are a nested loop
outstring.write(' '*indent,mustbreak=True,do_tab=False); outstring.write('loop_\n',do_tab=False)
self.format_names(outstring,indent+2)
self.format_packets(outstring,coords,indent+2)
if instring: return #inside a recursion
else:
returnstring = outstring.getvalue()
outstring.close()
return returnstring
def process_template(
self, template_string)
Process a template datafile to formatting instructions
def process_template(self,template_string):
    """Process a template datafile to formatting instructions

    The template text is parsed as a DDLm-grammar StarFile and its first
    block is walked in input order.  For every data name a hint dictionary
    {"dataname": name, ["delimiter": char], ["column": n]} is appended to
    self.form_hints; delimiter and column are taken from where and how the
    example value appears in the template text.

    The named groups in the three regular expressions ('spec', 'packet',
    'all', 'none') are the ones the code below looks up; they were missing
    from this listing and have been restored.  The group names 'sq' and 'dq'
    are not referenced anywhere and were chosen here.
    """
    template_as_cif = StarFile(StringIO(template_string),grammar="DDLm").first_block()
    self.form_hints = []        # ordered array of hint dictionaries
    for item in template_as_cif.item_order:     # order of input
        if not isinstance(item,LoopBlock):      # not nested
            hint_dict = {"dataname":item}
            # find the line in the file
            start_pos = re.search("(^[ \t]*" + item + "[ \t\n]+)(?P<spec>([\S]+)|(^;))",template_string,re.I|re.M)
            if start_pos.group("spec") != None:
                spec_pos = start_pos.start("spec")-start_pos.start(0)
                spec_char = template_string[start_pos.start("spec")]
                if spec_char in '\'";':
                    hint_dict.update({"delimiter":spec_char})
                if spec_char != ";":    # so we need to work out the column number
                    hint_dict.update({"column":spec_pos})
            print('%s: %s' % (item,repr(hint_dict)))
            self.form_hints.append(hint_dict)
        else:                                   # loop block
            testname = item.item_order[0]
            # find the loop spec line in the file; % binds to the last literal only
            loop_regex = "(^[ \t]*loop_[ \t\n\r]+" + testname + "([ \t\n\r]+_[\S]+){%d}[ \t]*$(?P<packet>(.(?!_loop|_[\S]+))*))" % (len(item.item_order) - 1)
            loop_line = re.search(loop_regex,template_string,re.I|re.M|re.S)
            packet_text = loop_line.group('packet')
            packet_regex = "[ \t]*(?P<all>(?P<sq>'([^\n\r\f']*)'+)|(?P<dq>\"([^\n\r\"]*)\"+)|(?P<none>[^\s]+))"
            packet_pos = re.finditer(packet_regex,packet_text)
            line_end_pos = re.finditer("^",packet_text,re.M)
            next_end = next(line_end_pos).end()
            last_end = next_end
            for loopname in item.item_order:
                hint_dict = {"dataname":loopname}
                thismatch = next(packet_pos)
                # advance to the line that holds this value
                while thismatch.start('all') > next_end:
                    try:
                        last_end = next_end
                        next_end = next(line_end_pos).start()
                        print('next end %d' % next_end)
                    except StopIteration:
                        # no more line starts: without leaving the loop here
                        # the condition never changes and we would spin forever
                        break
                print('Start %d, last_end %d' % (thismatch.start('all'),last_end))
                col_pos = thismatch.start('all') - last_end
                if thismatch.group('none') is None:
                    # first character of the whole match is the delimiter
                    hint_dict.update({'delimiter':thismatch.groups()[0][0]})
                hint_dict.update({'column':col_pos})
                print('%s: %s' % (loopname,repr(hint_dict)))
                self.form_hints.append(hint_dict)
    return
def recursive_iter(
self, dict_so_far={}, coord=[])
# Generator yielding one StarPacket per innermost packet, combining the values
# of this level with those produced by recursive_iter on each nested loop.
# Every yielded packet also carries the values as attributes named after the
# data names (set with setattr).
# dict_so_far is accepted but not referenced in this body; coord is the list
# of packet indices of the enclosing loops.
# NOTE(review): indentation was lost in this listing; nesting must be checked
# against the original source.
def recursive_iter(self,dict_so_far={},coord=[]):
# print "Recursive iter: coord %s, keys %s, dim %d" % (`coord`,`self.block.keys()`,self.dimension)
my_length = 0
top_items = self.block.items()
top_values = self.block.values() #same order as items
drill_values = self.block.values()
# descend one level per dimension to reach the innermost list of values
for dimup in range(0,self.dimension): #look higher in the tree
if len(drill_values)>0: #this block has values
drill_values=drill_values[0] #drill in
else:
raise StarError("Malformed loop packet %s" % `top_items[0]`)
my_length = len(drill_values[0]) #length of 'string' entry
if self.dimension == 0: #top level
for aloop in self.loops:
for apacket in aloop.recursive_iter():
# print "Recursive yielding %s" % `dict(top_items + apacket.items())`
prep_yield = StarPacket(top_values+apacket.values()) #straight list
for name,value in top_items + apacket.items():
setattr(prep_yield,name,value)
yield prep_yield
else: #in some loop
for i in range(my_length):
# values of this level for packet i, located through the enclosing coordinates
kvpairs = map(lambda a:(a,self.coord_to_group(a,coord)[i]),self.block.keys())
kvvals = map(lambda a:a[1],kvpairs) #just values
# print "Recursive kvpairs at %d: %s" % (i,`kvpairs`)
if self.loops:
for aloop in self.loops:
for apacket in aloop.recursive_iter(coord=coord+[i]):
# print "Recursive yielding %s" % `dict(kvpairs + apacket.items())`
prep_yield = StarPacket(kvvals+apacket.values())
for name,value in kvpairs + apacket.items():
setattr(prep_yield,name,value)
yield prep_yield
else: # we're at the bottom of the tree
# print "Recursive yielding %s" % `dict(kvpairs)`
prep_yield = StarPacket(kvvals)
for name,value in kvpairs:
setattr(prep_yield,name,value)
yield prep_yield
def regularise_data(
self, dataitem)
Place dataitem into a list if necessary
def regularise_data(self,dataitem):
    """Place dataitem into a list if necessary"""
    from numbers import Number
    single_value_types = (Number,basestring,StarList,StarDict)
    if isinstance(dataitem,single_value_types):
        return dataitem,None
    if isinstance(dataitem,(tuple,list)):
        regval = dataitem
    else:
        # anything else has to be convertible to a list
        try:
            regval = list(dataitem)
        except TypeError:
            raise StarError( str(dataitem) + ' is wrong type for data value\n' )
    return regval,[None]*len(regval)
def remove_loop(
self, oldloop)
def remove_loop(self,oldloop):
    """Remove oldloop from item_order and then from the list of nested loops.

    list.remove raises ValueError if oldloop is missing from either list.
    """
    for holder in (self.item_order,self.loops):
        holder.remove(oldloop)
def update(
self, adict)
def update(self,adict):
    """Add every key/value pair of adict to this block through AddLoopItem."""
    for name in adict.keys():
        pair = (name,adict[name])
        self.AddLoopItem(pair)
def values(
self)
def values(self):
    """Return the value of every name reported by keys(), in the same order."""
    return [self[name] for name in self.keys()]
class StarBlock
# StarBlock: a LoopBlock subclass adding block merging (merge) and the
# assignment/removal of a dictionary object.
# NOTE(review): indentation was lost in this listing; the nesting of the
# statements in merge() must be checked against the original source.
class StarBlock(LoopBlock):
# Copy through LoopBlock.copy(), then pass the result to the class reported by
# the bound method so that derived classes get an instance of their own type.
def copy(self):
newblock = super(StarBlock,self).copy()
return self.copy.im_class(newblock) #catch inheritance
# Merge the contents of new_block into this block.
#   mode 'strict'  - a key already present here (and not in match_att) raises
#                    CifError; other items are copied, loops are inserted
#   mode 'replace' - every item of new_block is copied/inserted
#   mode 'overlay' - string-valued items are copied, loops are either inserted
#                    or have their packets combined with the existing ones
# match_att lists names to leave alone; rel_keys lists names used as packet
# keys when combining loops.  match_function is not referenced in this body.
def merge(self,new_block,mode="strict",match_att=[],match_function=None,
rel_keys = []):
if mode == 'strict':
for key in new_block.item_order:
if self.has_key(key) and key not in match_att:
raise CifError( "Identical keys %s in strict merge mode" % key)
elif key not in match_att: #no change otherwise
if isinstance(key,StringTypes):
self[key] = new_block[key]
else:
self.insert_loop(key)
elif mode == 'replace':
# NOTE(review): newkeys is filtered here but not referenced again in this branch
newkeys = new_block.keys()
for ma in match_att:
try:
newkeys.remove(ma) #don't touch the special ones
except ValueError:
pass
for key in new_block.item_order:
if isinstance(key,StringTypes):
self[key] = new_block[key]
else:
self.insert_loop(key) #assume is a loop
elif mode == 'overlay':
print 'Overlay mode, current overwrite is %s' % self.overwrite
# overwrite is forced on for the duration of the merge and restored at the end
save_overwrite = self.overwrite
self.overwrite = True
for attribute in new_block.keys():
if attribute in match_att: continue #ignore this one
new_value = new_block[attribute]
#non-looped items
if isinstance(new_value,StringTypes):
self[attribute] = new_value
these_atts = self.keys()
for newloop in new_block.loops:
newkeys = newloop.keys()
# note that the following line determines packet item order
overlaps = filter(lambda a: a in these_atts,newkeys)
if len(overlaps)< len(newloop):#completely new loop
self.insert_loop(newloop)
elif len(overlaps)==len(newloop):
# appending packets
# print "In overlay merge mode, found extra packet items:"
# print `overlaps`
# get key position
loop_keys = filter(lambda a:a in rel_keys,overlaps)
try:
newkeypos = map(lambda a:newkeys.index(a),loop_keys)
newkeypos = newkeypos[0] #one key per loop for now
loop_keys = loop_keys[0]
except (ValueError,IndexError):
newkeypos = []
overlap_data = map(lambda a:listify(self[a]),overlaps) #old packet data
new_data = map(lambda a:new_block[a],overlaps) #new packet data
packet_data = transpose(overlap_data)
new_p_data = transpose(new_data)
# remove any packets for which the keys match between old and new; we
# make the arbitrary choice that the old data stays
# NOTE(review): a key found at position 0 makes newkeypos falsy, so the
# filtering below is skipped for it -- confirm this is intended.
if newkeypos:
# get matching values in new list
print "Old, new data:\n%s\n%s" % (`overlap_data[newkeypos]`,`new_data[newkeypos]`)
key_matches = filter(lambda a:a in overlap_data[newkeypos],new_data[newkeypos])
# filter out any new data with these key values
new_p_data = filter(lambda a:a[newkeypos] not in key_matches,new_p_data)
if new_p_data:
new_data = transpose(new_p_data)
else: new_data = []
# wipe out the old data and enter the new stuff
byebyeloop = self.GetLoop(overlaps[0])
# print "Removing '%s' with overlaps '%s'" % (`byebyeloop`,`overlaps`)
# Note that if, in the original dictionary, overlaps are not
# looped, GetLoop will return the block itself. So we check
# for this case...
if byebyeloop != self:
self.remove_loop(byebyeloop)
self.AddLoopItem((overlaps,overlap_data)) #adding old packets
for pd in new_p_data: #adding new packets
if pd not in packet_data:
for i in range(len(overlaps)):
#don't do this at home; we are appending
#to something in place
self[overlaps[i]].append(pd[i])
self.overwrite = save_overwrite
# Remember dic as this block's dictionary; dictionaries whose diclang is not
# "DDLm" are ignored with a printed warning.
def assign_dictionary(self,dic):
if not dic.diclang=="DDLm":
print "Warning: ignoring dictionary %s" % dic.dic_as_cif.my_uri
return
self.dictionary = dic
def unassign_dictionary(self):
"""Remove dictionary-dependent behaviour"""
self.dictionary = None
Ancestors (in MRO)
Instance variables
Methods
def __init__(
self, data=(), dimension=0, maxoutlength=2048, wraplength=80, overwrite=True, characterset='ascii')
Inheritance:
LoopBlock.__init__
def __init__(self,data = (), dimension = 0, maxoutlength=2048, wraplength=80, overwrite=True,
             characterset='ascii'):
    """Create a block, optionally filled from data.

    data         -- a tuple/list of items, each passed to AddLoopItem, or an
                    existing LoopBlock whose contents are taken over
    dimension    -- nesting level of this block (replaced by data.dimension
                    when data is a LoopBlock)
    maxoutlength, wraplength -- output line length settings
    overwrite    -- whether existing data names may be replaced
    characterset -- 'ascii' or 'unicode'; selects the pattern used to check
                    values.  Any other value raises StarError.

    When data is a LoopBlock, a loop found in data.loops but missing from
    its item_order is reported and appended at the end.  Previously the
    warning was followed by an unconditional item_order.remove(), which
    raised ValueError for exactly that case.
    """
    self.block = {}
    self.loops = []
    self.no_packets = 0
    self.item_order = []
    self.formatting_hints = {}
    self.lower_keys = []            # for efficiency
    self.value_switch = False       # prefer string version always
    self.comment_list = {}
    self.dimension = dimension
    self.dictionary = None          # DDLm dictionary
    self.popout = False             # used during load iteration
    self.curitem = -1               # used during iteration
    self.maxoutlength = maxoutlength
    self.wraplength = wraplength
    self.overwrite = overwrite
    self.characterset = characterset
    if not hasattr(self,'loopclass'):   # in case are derived class
        self.loopclass = LoopBlock      # when making new loops
    if self.characterset == 'ascii':
        self.char_check = re.compile("[][ \n\r\t!%&\(\)*+,./:<=>?@0-9A-Za-z\\\\^`{}\|~\"#$';_-]+",re.M)
    elif self.characterset == 'unicode':
        self.char_check = re.compile(u"[][ \n\r\t!%&\(\)*+,./:<=>?@0-9A-Za-z\\\\^`{}\|~\"#$';_\u00A0-\uD7FF\uE000-\uFDCF\uFDF0-\uFFFD\U00010000-\U0010FFFD-]+",re.M)
    else:
        raise StarError("No character set specified")
    if isinstance(data,(TupleType,ListType)):
        for item in data:
            self.AddLoopItem(item)
    elif isinstance(data,LoopBlock):
        self.block = data.block.copy()
        self.item_order = data.item_order[:]
        self.lower_keys = data.lower_keys[:]
        self.comment_list = data.comment_list.copy()
        self.dimension = data.dimension
        # loops as well
        for oldloop in data.loops:
            try:
                placeholder = self.item_order.index(oldloop)
            except ValueError:
                print("Warning: loop %s (%s) in loops, but not in item_order (%s)" % (repr(oldloop),str(oldloop),repr(self.item_order)))
                placeholder = -1
            else:
                # insert_loop puts the loop back at the same position
                self.item_order.remove(oldloop)
            self.insert_loop(oldloop,position=placeholder)
def AddComment(
self, itemname, comment)
Inheritance:
LoopBlock.AddComment
def AddComment(self,itemname,comment):
    """Store comment for itemname, keyed by the lower-cased name."""
    lowered = itemname.lower()
    self.comment_list[lowered] = comment
def AddLoopItem(
self, incomingdata, precheck=False, maxlength=-1)
Inheritance:
LoopBlock.AddLoopItem
def AddLoopItem(self,incomingdata,precheck=False,maxlength=-1):
    """Add a data item, or a whole loop of data items, to this block.

    incomingdata is a two-element sequence (names, values):
      * names is a tuple/list of data names and values the matching sequence
        of columns: a new nested loop holding them is created and inserted;
      * names is a single string: values is stored under that name, provided
        its nesting depth (get_dim) matches this block's dimension.  A
        one-element list/tuple at the wrong depth is unwrapped and retried;
        any other mismatch is retried as a one-item loop.

    precheck  -- if true, skip the data name and value character checks
    maxlength -- maximum data name length handed to check_data_name

    Raises StarError when a name already exists and self.overwrite is false,
    TypeError when a data name is not a string, and StarLengthError when a
    looped item does not supply self.no_packets values.

    Two defects are corrected here.  The single-member retry used to pass
    the name and value as two positional arguments, so the value landed in
    `precheck` and the name string was picked apart character by character.
    The duplicate-name message used to format the whole name sequence with a
    single %s, which raises TypeError for tuples; it now reports the
    offending name.
    """
    if isinstance(incomingdata[0],(TupleType,ListType)):
        # internal loop
        # first we remove any occurrences of these datanames in other loops
        for one_item in incomingdata[0]:
            if self.has_key(one_item):
                if not self.overwrite:
                    raise StarError( 'Attempt to insert duplicate item name %s' % one_item)
                else:
                    del self[one_item]
        newloop = self.loopclass(dimension = self.dimension+1,characterset=self.characterset)
        for key,val in zip(incomingdata[0],incomingdata[1]):
            newloop.AddLoopItem((key,val))
        self.insert_loop(newloop)
    elif not isinstance(incomingdata[0],StringTypes):
        raise TypeError('Star datanames are strings only (got %s)' % repr(incomingdata[0]))
    else:
        data = list(incomingdata)       # copy
        if data[1] == [] or get_dim(data[1])[0] == self.dimension:
            if not precheck:
                self.check_data_name(data[0],maxlength)     # make sure no nasty characters
            # check that we can replace data
            if not self.overwrite:
                if self.has_key(data[0]):
                    raise StarError( 'Attempt to insert duplicate item name %s' % data[0])
            # put the data in the right container
            regval,empty_val = self.regularise_data(data[1])
            # check for pure string data
            pure_string = check_stringiness(regval)
            if not precheck:
                self.check_item_value(regval)
            if self.dimension > 0:
                if self.no_packets <= 0:
                    self.no_packets = len(data[1])      # first item in this loop
                if len(data[1]) != self.no_packets:
                    raise StarLengthError('Not enough values supplied for %s' % (data[0]))
            try:
                oldpos = self.GetItemPosition(data[0])
            except ValueError:
                oldpos = len(self.item_order)           # end of list
            # may be different case (upper/lower), so have to do this
            self.RemoveLoopItem(data[0])
            # storage is a [string values, other values] pair
            if pure_string:
                self.block.update({data[0]:[regval,empty_val]})     # trust the data is OK
            else:
                self.block.update({data[0]:[empty_val,regval]})
            self.lower_keys.insert(oldpos,data[0].lower())
            self.item_order.insert(oldpos,data[0])
        else:                           # dimension mismatch
            # single-member lists could be seen as bare lists...
            if isinstance(data[1],(TupleType,ListType)) and len(data[1])==1:
                self.AddLoopItem((data[0],data[1][0]))
            # if that doesn't work, make the dataname list a compound item for inserting a loop
            else:
                self.AddLoopItem(((data[0],),(data[1],)))
def AddPacket(
self, packet)
Inheritance:
LoopBlock.AddPacket
def AddPacket(self,packet):
    """Append one packet to this loop.

    For every name in item_order the stored values are converted to a list
    and the attribute of that name is read from packet and appended.
    Raises StarError when called on a dimension-0 block.
    """
    if self.dimension==0:
        raise StarError("Attempt to add packet to top level block")
    for name in self.item_order:
        self[name] = list(self[name])       # in case we have stored a tuple
        new_value = packet.__getattribute__(name)
        self[name].append(new_value)
    self.no_packets +=1
def AddToLoop(
self, dataname, loopdata)
Inheritance:
LoopBlock.AddToLoop
def AddToLoop(self,dataname,loopdata):
    """Assign every name/value pair of loopdata into the loop holding dataname."""
    target = self.GetLoop(dataname)
    for name,value in loopdata.items():
        target[name] = value
def ChangeItemOrder(
self, itemname, newpos)
Inheritance:
LoopBlock.ChangeItemOrder
def ChangeItemOrder(self,itemname,newpos):
    """Move itemname to index newpos of item_order.

    The current entry is located with GetItemPosition and removed, then
    itemname (exactly as passed in) is inserted at newpos.
    """
    current = self.GetItemPosition(itemname)
    self.item_order.pop(current)
    self.item_order.insert(newpos,itemname)
def GetItemOrder(
self)
Inheritance:
LoopBlock.GetItemOrder
def GetItemOrder(self):
    """Return a copy of item_order; changing the result leaves the block untouched."""
    snapshot = self.item_order[:]
    return snapshot
def GetItemPosition(
self, itemname)
Inheritance:
LoopBlock.GetItemPosition
def GetItemPosition(self,itemname):
    """Return the index of itemname in item_order, ignoring case for strings.

    Entries without a lower() method are compared as they are.  ValueError
    is raised when there is no match.
    """
    def low_case(item):
        # anything without lower() is left untouched
        try:
            return item.lower()
        except AttributeError:
            return item
    testname = low_case(itemname)
    lowcase_order = [low_case(entry) for entry in self.item_order]
    return lowcase_order.index(testname)
def GetKeyedPacket(
self, keyname, keyvalue, no_case=False)
Inheritance:
LoopBlock.GetKeyedPacket
Return the loop packet where [[keyname]] has value [[keyvalue]]. Ignore case if no_case is true
def GetKeyedPacket(self,keyname,keyvalue,no_case=False):
    """Return the loop packet where [[keyname]] has value [[keyvalue]]. Ignore case if no_case is true"""
    my_loop = self.GetLoop(keyname)
    if no_case:
        def is_wanted(packet):
            return getattr(packet,keyname).lower()==keyvalue.lower()
    else:
        def is_wanted(packet):
            return getattr(packet,keyname)==keyvalue
    one_pack = [packet for packet in my_loop if is_wanted(packet)]
    # exactly one packet has to match
    if len(one_pack)!=1:
        raise ValueError("Bad packet key %s = %s: returned %d packets" % (keyname,keyvalue,len(one_pack)))
    return one_pack[0]
def GetKeyedSemanticPacket(
self, keyvalue, cat_id)
Inheritance:
LoopBlock.GetKeyedSemanticPacket
Return a complete packet for category cat_id
def GetKeyedSemanticPacket(self,keyvalue,cat_id):
    """Return a complete packet for category cat_id"""
    target_keys = self.dictionary.cat_key_table[cat_id]
    p = StarPacket()
    # set case-sensitivity flag from the type of the first key
    first_key_type = self.dictionary[target_keys[0]]['_type.contents']
    lcase = first_key_type in ['Code','Tag','Name']
    for cat_key in target_keys:
        try:
            self.GetKeyedPacket(cat_key,keyvalue,no_case=lcase)
        except KeyError:            # try to create the key
            self[cat_key]           # will create a key column
        p.merge_packet(self.GetKeyedPacket(cat_key,keyvalue,no_case=lcase))
    # the following attributes used to calculate missing values
    p.key = target_keys[0]
    p.cif_dictionary = self.dictionary
    p.fulldata = self
    return p
def GetLoop(
self, keyname)
Inheritance:
LoopBlock.GetLoop
def GetLoop(self,keyname):
    """Return the block or nested loop that directly holds keyname.

    Raises KeyError when keyname is not found anywhere in this block.
    """
    if not self.has_key(keyname):
        raise KeyError('Item %s does not exist' % keyname)
    held_here = keyname.lower() in self.lower_keys
    if held_here:
        return self
    # otherwise the first nested loop that knows the name answers
    for aloop in self.loops:
        try:
            found = aloop.GetLoop(keyname)
        except KeyError:
            continue
        return found
    raise KeyError('Item %s does not exist' % keyname)
def GetLoopItem(
self, itemname)
Inheritance:
LoopBlock.GetLoopItem
Return value of itemname in this loop block
def GetLoopItem(self,itemname):
    """Return value of itemname in this loop block"""
    value_and_flag = self.GetLoopItemValue(itemname)
    return value_and_flag[0]
def GetLoopItemValue(
self, itemname)
Inheritance:
LoopBlock.GetLoopItemValue
Return value of itemname and whether or not it is a native value
# Look up itemname and return a (value, flag) pair.
# Each entry of self.block is unpacked into two parts, s and v.  s is returned
# with flag False when it is usable (not None at dimension 0, free of None
# otherwise); v is returned in the remaining cases, flagged by whether it (or
# its first element) is not a StarList.
# Lookup order: exact key in this block, then each nested loop, then a
# case-insensitive match in this block.  KeyError is raised if all fail.
# NOTE(review): indentation was lost in this listing; presumably the loop
# search and the case-insensitive search both sit inside the first
# `except KeyError` -- confirm against the original source.
def GetLoopItemValue(self,itemname):
"""Return value of itemname and whether or not it is a native value"""
# assume case is correct first
try:
s,v = self.block[itemname]
except KeyError:
for loop in self.loops:
try:
return loop.GetLoopItemValue(itemname)
except KeyError:
pass
if itemname.lower() in self.lower_keys:
# it is there somewhere, now we need to find it
real_keys = self.block.keys()
lower_keys = map(lambda a:a.lower(),self.block.keys())
try:
k_index = lower_keys.index(itemname.lower())
except ValueError: #should never happen!!
raise KeyError, 'Bug: Item %s unexpectedly not in block' % itemname
s,v = self.block[real_keys[k_index]]
else:
raise KeyError, 'Item %s not in block' % itemname
# prefer string value unless all are None
if self.dimension == 0:
if s is not None:
return s,False
else:
return v,not isinstance(v,StarList)
elif None not in s: return s,False
else:
if len(v)>0:
return v,not isinstance(v[0],StarList)
return v,True
def GetLoopNames(
self, keyname)
Inheritance:
LoopBlock.GetLoopNames
def GetLoopNames(self,keyname):
    """Return the names of the loop that contains keyname.

    If keyname is `in` this block the result of self.keys() is returned;
    otherwise each nested loop is asked in turn.  Raises KeyError when no
    loop reports the name.
    """
    if keyname in self:
        return self.keys()
    for aloop in self.loops:
        try:
            names = aloop.GetLoopNames(keyname)
        except KeyError:
            continue
        return names
    raise KeyError('Item does not exist')
def GetPacket(
self, index)
Inheritance:
LoopBlock.GetPacket
# Build a StarPacket for packet number `index`.
# For every entry of item_order one element is appended: for a nested
# LoopBlock the list of its items' values at `index`, for a dimension-0 block
# the whole stored value, otherwise the value at `index`.
# NOTE(review): indentation was lost in this listing, so it cannot be told
# from here whether the setattr call belongs to the final else branch only or
# runs for every entry; setattr with a LoopBlock as the attribute name would
# fail, so presumably it is limited to named items -- confirm.
def GetPacket(self,index):
thispack = StarPacket([])
for myitem in self.item_order:
if isinstance(myitem,LoopBlock):
pack_list = [myitem[b][index] for b in myitem.item_order]
# print 'Pack_list -> %s' % `pack_list`
thispack.append(pack_list)
elif self.dimension==0:
thispack.append(self[myitem])
else:
thispack.append(self[myitem][index])
# expose the value just appended as an attribute named after the item
setattr(thispack,myitem,thispack[-1])
return thispack
def Loopify(
self, datanamelist)
Inheritance:
LoopBlock.Loopify
def Loopify(self,datanamelist):
    """Move the named items into a new one-packet loop.

    The conversion only happens when every name in datanamelist is among the
    keys of the loop that holds the first name; otherwise nothing changes.
    Each value becomes a single-element list in a new LoopBlock one level
    below this block.
    """
    thisloop = self.GetLoop(datanamelist[0])
    same_level = [name for name in thisloop.keys() if name in datanamelist]
    if len(same_level)!=len(datanamelist):
        return
    # all at same level so is OK
    newloop = LoopBlock(dimension=self.dimension+1)
    for name in datanamelist:
        newloop[name]=[self[name]]
        del self[name]
    self.insert_loop(newloop)
def RemoveComment(
self, itemname)
Inheritance:
LoopBlock.RemoveComment
def RemoveComment(self,itemname):
    """Delete the comment stored for itemname (looked up in lower case).

    Raises KeyError if no comment is stored under that name.
    """
    lowered = itemname.lower()
    del self.comment_list[lowered]
def RemoveKeyedPacket(
self, keyname, keyvalue)
Inheritance:
LoopBlock.RemoveKeyedPacket
def RemoveKeyedPacket(self,keyname,keyvalue):
    """Remove the packet in which keyname has the value keyvalue.

    The packet position is the index of keyvalue among the values of
    keyname; that position is deleted from both halves of the stored entry
    of every item in the loop holding keyname.  Raises ValueError when
    keyvalue is not present.

    The packet count is now reduced on the loop that lost the packet.
    Previously it was reduced on self, which is the wrong object whenever
    keyname lives in a nested loop.
    """
    packet_coord = list(self[keyname]).index(keyvalue)
    loophandle = self.GetLoop(keyname)
    for dataname in loophandle.item_order:
        entry = loophandle.block[dataname]
        for half in (0,1):
            # stored sequences may be tuples, so rebuild them as lists first
            entry[half] = list(entry[half])
            del entry[half][packet_coord]
    loophandle.no_packets -= 1
def RemoveLoopItem(
self, itemname)
Inheritance:
LoopBlock.RemoveLoopItem
# Remove itemname (matched without regard to case) from this block or from
# the nested loop that holds it.  Nothing happens if the name is unknown.
# When the name is held directly, it is deleted from self.block, lower_keys
# and item_order.  When it is held by a nested loop, it is deleted there and
# the loop itself is removed once it is empty.
# NOTE(review): indentation was lost in this listing; presumably the `else`
# below pairs with the try/except (name found at this level) and the final
# no_packets reset and return sit inside the has_key test -- confirm.
def RemoveLoopItem(self,itemname):
if self.has_key(itemname):
testkey = itemname.lower()
real_keys = self.block.keys()
lower_keys = map(lambda a:a.lower(),real_keys)
try:
k_index = lower_keys.index(testkey)
except ValueError: #must be in a lower loop
for aloop in self.loops:
if aloop.has_key(itemname):
# print "Deleting %s (%s)" % (itemname,aloop[itemname])
del aloop[itemname]
if len(aloop)==0: # all gone
self.remove_loop(aloop)
break
else:
del self.block[real_keys[k_index]]
self.lower_keys.remove(testkey)
# now remove the key in the order list
for i in range(len(self.item_order)):
if isinstance(self.item_order[i],StringTypes): #may be loop
if self.item_order[i].lower()==testkey:
del self.item_order[i]
break
if len(self.block)==0: #no items in loop, length -> 0
self.no_packets = 0
return #no duplicates, no more checking needed
def SetOutputLength(
self, wraplength=80, maxoutlength=2048)
Inheritance:
LoopBlock.SetOutputLength
def SetOutputLength(self,wraplength=80,maxoutlength=2048):
    """Set the wrap length and maximum line length on this block and all nested loops.

    Raises StarError if wraplength exceeds maxoutlength.
    """
    if wraplength > maxoutlength:
        raise StarError("Wrap length (requested %d) must be <= Maximum line length (requested %d)" % (wraplength,maxoutlength))
    self.wraplength,self.maxoutlength = wraplength,maxoutlength
    for nested in self.loops:
        nested.SetOutputLength(wraplength,maxoutlength)
def assign_dictionary(
self, dic)
def assign_dictionary(self,dic):
    """Store dic as self.dictionary if its diclang is "DDLm".

    Any other dictionary is ignored and a warning naming its URI is printed.
    """
    if dic.diclang=="DDLm":
        self.dictionary = dic
    else:
        print("Warning: ignoring dictionary %s" % dic.dic_as_cif.my_uri)
def audit(
self)
def audit(self):
    """Return a list of (name, count) pairs for names occurring more than once in keys().

    An empty list means that no name is duplicated.
    """
    allkeys = self.keys()
    uniquenames = set(allkeys)
    if len(uniquenames) == len(allkeys):
        return []
    keycount = [(name,allkeys.count(name)) for name in uniquenames]
    return [pair for pair in keycount if pair[1]>1]
def check_data_name(
self, dataname, maxlength=-1)
Inheritance:
LoopBlock.check_data_name
def check_data_name(self,dataname,maxlength=-1):
    """Raise StarError unless dataname is an acceptable data name.

    dataname  -- the name to check; it must start with an underscore
    maxlength -- if positive, the maximum number of characters allowed

    With characterset 'ascii' only code points 33-126 are accepted.  For any
    other characterset, code points below 33, 127-159, U+D800-U+DFFF,
    U+FDD0-U+FDEF, U+FFFE/U+FFFF and, beyond the basic plane, code points
    ending in FFFE or FFFF are rejected.

    The last test used the mask 0xE, which rejected every supplementary
    code point whose lowest hex digit is E or F; it now uses 0xFFFE so that
    only U+xFFFE and U+xFFFF are caught, as the error message states.
    """
    if maxlength > 0:
        if len(dataname)>maxlength:
            raise StarError( 'Dataname %s exceeds maximum length %d' % (dataname,maxlength))
    if dataname[0]!='_':
        raise StarError( 'Dataname ' + dataname + ' does not begin with _')
    codes = [ord(a) for a in dataname]
    if self.characterset=='ascii':
        if any(c < 33 or c > 126 for c in codes):
            raise StarError( 'Dataname ' + dataname + ' contains forbidden characters')
    else:
        if any(c < 33 for c in codes):
            raise StarError( 'Dataname ' + dataname + ' contains forbidden characters (below code point 33)')
        if any(c > 126 and c < 160 for c in codes):
            raise StarError( 'Dataname ' + dataname + ' contains forbidden characters (between code point 127-159)')
        if any(c > 0xD7FF and c < 0xE000 for c in codes):
            raise StarError( 'Dataname ' + dataname + ' contains unsupported characters (between U+D800 and U+E000)')
        if any(c > 0xFDCF and c < 0xFDF0 for c in codes):
            raise StarError( 'Dataname ' + dataname + ' contains unsupported characters (between U+FDD0 and U+FDEF)')
        if any(c == 0xFFFE or c == 0xFFFF for c in codes):
            raise StarError( 'Dataname ' + dataname + ' contains unsupported characters (U+FFFE and/or U+FFFF)')
        # the last two code points of every supplementary plane
        if any(c > 0xFFFF and (c & 0xFFFE) == 0xFFFE for c in codes):
            print('%s fails' % dataname)
            print(' '.join(['%x' % c for c in codes]))
            raise StarError( u'Dataname ' + dataname + u' contains unsupported characters (U+xFFFE and/or U+xFFFF)')
def check_item_value(
self, item)
Inheritance:
LoopBlock.check_item_value
def check_item_value(self,item):
    """Check that string values contain only characters matched by self.char_check.

    item may be a single value or a list, tuple or dict of values (iterating
    a dict visits its keys).  Non-string members and empty strings are not
    checked.  A failure is printed and StarError is raised.
    """
    if isinstance(item,(list,dict,tuple)):
        test_item = item
    else:
        test_item = [item]          # single item list
    for it in test_item:
        if not isinstance(it,basestring):
            continue
        if it=='':
            continue
        me = self.char_check.match(it)
        if not me:
            print("Fail value check: %s" % it)
            raise StarError(u'Bad character in %s' % it)
        # the match has to cover the whole string
        if me.span() != (0,len(it)):
            print("Fail value check, match only %d-%d in string %s" % (me.span()[0],me.span()[1],repr(it)))
            raise StarError(u'Data item "' + repr(it) + u'"... contains forbidden characters')
def clear(
self)
def clear(self):
    """Reset the block to empty: no items, no loops, no ordering, zero packets."""
    self.no_packets = 0
    self.lower_keys = []
    self.item_order = []
    self.loops = []
    self.block = {}
def collapse(
self, packet_no)
Inheritance:
LoopBlock.collapse
def collapse(self,packet_no):
    """Return a new loop block, one nesting level shallower, for one packet.

    Plain data names contribute the ``packet_no``-th entry of their value;
    nested loops are collapsed recursively with the same packet number and
    inserted into the result.

    Raises:
        StarError: if this block has dimension 0.
    """
    if self.dimension == 0:
        raise StarError( "Attempt to select non-existent packet")
    collapsed = LoopBlock(dimension=self.dimension-1)
    for entry in self.item_order:
        if not isinstance(entry,LoopBlock):
            # plain data name: keep only the requested packet's value
            collapsed[entry] = self[entry][packet_no]
            continue
        collapsed.insert_loop(entry.collapse(packet_no))
    return collapsed
def coord_to_group(
self, dataname, coords)
Inheritance:
LoopBlock.coord_to_group
def coord_to_group(self,dataname,coords):
    """Return the value of ``dataname`` after indexing it by each coordinate.

    A ``dataname`` that is not a string is handed back unchanged, which
    flags inner-loop processing to the caller.  Otherwise ``self[dataname]``
    (which must be a list or tuple) is indexed successively by every entry
    of ``coords``.
    """
    if isinstance(dataname,StringTypes):
        selected = self[dataname]
        for index in coords:
            selected = selected[index]
        return selected
    return dataname     # flag inner loop processing
def copy(
self)
def copy(self):
    """Return a copy of this block wrapped in the class of ``self``.

    The parent-class ``copy`` result is passed to the class that the bound
    ``copy`` method reports, so subclasses get an instance of their own type.
    """
    duplicate = super(StarBlock,self).copy()
    wrapper_class = self.copy.im_class     # catch inheritance
    return wrapper_class(duplicate)
def create_ordering(
self, order_dict)
Inheritance:
LoopBlock.create_ordering
Create a canonical ordering that includes loops using our formatting hints dictionary
def create_ordering(self,order_dict):
    """Create a canonical ordering that includes loops using our formatting hints dictionary

    ``order_dict`` is a sequence of hint dictionaries, each with a
    ``'dataname'`` entry.  The result is stored in ``self.output_order``:
    hinted items first, in hint order, followed by whatever remains of
    ``self.item_order``.

    Each entry appears exactly once: non-looped names are stored in the
    lower-case form that ``item_order`` holds, and a loop is added only
    the first time one of its members is hinted.
    """
    requested_order = [hint['dataname'] for hint in order_dict]
    new_order = []
    for item in requested_order:
        if isinstance(item,basestring) and item.lower() in self.item_order:
            # Use the spelling stored in item_order so that the "extras"
            # test below recognises the name and does not emit it twice.
            canonical = item.lower()
            if canonical not in new_order:
                new_order.append(canonical)
        elif self.has_key(item):    #in a loop somewhere
            target_loop = self.GetLoop(item)
            if target_loop not in new_order:
                target_loop.create_ordering(order_dict)
                new_order.append(target_loop)
    extras = [i for i in self.item_order if i not in new_order]
    self.output_order = new_order + extras
def fast_load_iter(
self)
Inheritance:
LoopBlock.fast_load_iter
def fast_load_iter(self):
    """Endlessly cycle over the storage of every entry in ``item_order``.

    Yields ``(self, target)`` pairs, where ``target`` is
    ``self.block[name]``, restarting from the first name after the last.
    Nothing is yielded when ``item_order`` is empty.
    """
    targets = [self.block[name] for name in self.item_order]
    while targets:
        for storage in targets:
            yield self,storage
def flat_iterator(
self)
Inheritance:
LoopBlock.flat_iterator
def flat_iterator(self):
    """Iterate over this block one packet at a time.

    At dimension 0 a single shallow copy of the block is yielded.
    Otherwise the packet count is taken from the length of the first value
    in ``self.block`` (zero when the block is empty) and
    ``self.collapse(n)`` is yielded for each packet number ``n``.
    """
    if self.dimension == 0:
        yield copy.copy(self)
        return
    names = list(self.block.keys())
    packet_count = len(self.block[names[0]]) if len(names)>0 else 0
    for packet_index in range(packet_count):
        yield self.collapse(packet_index)
def format_names(
self, outstring, indent=0)
Inheritance:
LoopBlock.format_names
def format_names(self,outstring,indent=0):
    """Write the data names of this block to ``outstring``, one per line.

    Every entry of ``item_order`` is preceded by ``indent`` spaces.  A
    string entry is written followed by a newline.  Any other entry is
    treated as a nested loop: a ``loop_`` line is written, the loop's own
    ``format_names`` is called with the indent increased by two, and a
    ``stop_`` line closes it.  All writes use ``do_tab=False``.
    """
    for entry in self.item_order[:]:       # iterate over a snapshot
        if isinstance(entry,StringTypes):  #(not loop)
            pieces = (' ' * indent, entry, "\n")
            for piece in pieces:
                outstring.write(piece,do_tab=False)
            continue
        # a loop
        outstring.write(' ' * indent,do_tab=False)
        outstring.write("loop_\n",do_tab=False)
        entry.format_names(outstring,indent+2)
        outstring.write(" stop_\n",do_tab=False)
def format_packet_item(
self, pack_item, indent, outstring)
Inheritance:
LoopBlock.format_packet_item
def format_packet_item(self,pack_item,indent,outstring):
    """Write one packet item, or recursively a nested packet, to ``outstring``.

    Strings go through ``self._formatstring``; numbers, ``StarList`` and
    ``StarDict`` values go through ``self.format_value``; each is followed
    by a breakable space.  Anything else is treated as a nested packet
    whose members are written by recursive calls.  When the first member
    is itself a list or tuple the members are transposed first and the
    group is closed with ``stop_``.

    The method writes directly to ``outstring`` and returns None.  The
    previous code concatenated that None result with ``' '``, which raised
    TypeError for every nested packet; the recursive call is now made for
    its side effect only.
    """
    simple_types = (StringType,UnicodeType,IntType,FloatType,LongType,StarList,StarDict)
    if isinstance(pack_item,simple_types):
        if isinstance(pack_item,StringTypes):
            outstring.write(self._formatstring(pack_item))
        else:
            self.format_value(pack_item,outstring)
        outstring.write(' ',canbreak=True,do_tab=False)
        return

    # Now, for each nested loop we call ourselves again
    holds_sub_packets = isinstance(pack_item[0],(ListType,TupleType))
    if holds_sub_packets:
        item_list = zip(*pack_item)
    else:                                   #base packet
        item_list = pack_item
    for sub_item in item_list:
        # each simple value already emits its own trailing separator
        self.format_packet_item(sub_item,indent,outstring)
    # stop_ is not issued at the end of each innermost packet
    if holds_sub_packets:
        outstring.write(' stop_ ',canbreak=True)
def format_packets(
self, outstring, coordinates, indent=0)
Inheritance:
LoopBlock.format_packets
def format_packets(self,outstring,coordinates,indent=0):
    """Write every packet of this loop to ``outstring``, one packet per line.

    The data for each entry of ``item_order`` is fetched with
    ``coord_to_group`` at ``coordinates``; the resulting columns are
    transposed into packets.  Each value is written by
    ``format_packet_item`` and each packet is terminated by a newline
    written with ``do_tab=False``.
    """
    # get our current group of data
    columns = [self.coord_to_group(name,coordinates) for name in self.item_order]
    for packet in zip(*columns):
        for datapoint in packet:
            self.format_packet_item(datapoint,indent,outstring)
        outstring.write("\n",do_tab=False)
def format_value(
self, itemvalue, stringsink, compound=False, hints={})
Inheritance:
LoopBlock.format_value
Format a Star data value
def format_value(self,itemvalue,stringsink,compound=False,hints={}):
    """Format a Star data value

    Strings are passed through ``self._formatstring`` with the
    ``'delimiter'`` entry of ``hints`` (None when absent).  ``StarList``
    values are written as ``[a, b, ...]`` and ``StarDict`` values as
    ``{'key':value, ...}``, recursing for each member.  Any other value is
    written as ``str(itemvalue)``.

    ``compound`` is forwarded as ``mustbreak`` when an opening bracket is
    written.  ``hints`` is only read, never modified, so the shared default
    dictionary is safe.  Nothing is printed to stdout.
    """
    delimiter = hints.get('delimiter',None)
    if isinstance(itemvalue,StringTypes):  #need to sanitize
        stringsink.write(self._formatstring(itemvalue,delimiter=delimiter),canbreak = True)
    elif isinstance(itemvalue,StarList):
        stringsink.set_tab(0)
        stringsink.write('[',canbreak=True,newindent=True,mustbreak=compound)
        if len(itemvalue)>0:
            self.format_value(itemvalue[0],stringsink)
            for listval in itemvalue[1:]:
                stringsink.write(', ',do_tab=False)
                self.format_value(listval,stringsink,compound=True)
        stringsink.write(']',unindent=True)
    elif isinstance(itemvalue,StarDict):
        stringsink.set_tab(0)
        stringsink.write('{',newindent=True,mustbreak=compound)  #start a new line inside
        items = list(itemvalue.items())
        if len(items)>0:
            first_key,first_value = items[0]
            stringsink.write("'"+first_key+"'"+':',canbreak=True)
            self.format_value(first_value,stringsink)
            for key,value in items[1:]:
                stringsink.write(', ')
                stringsink.write("'"+key+"'"+":",canbreak=True)
                self.format_value(value,stringsink)   #never break between key and value
        stringsink.write('}',unindent=True)
    else:
        stringsink.write(str(itemvalue),canbreak=True)   #numbers
def get(
self, key, default=None)
def get(self,key,default=None):
    """Return the value stored for ``key``, or ``default`` when it is absent.

    Presence is decided by ``self.has_key``; the value itself comes from
    ``self.GetLoopItem``.
    """
    if not self.has_key(key):
        return default
    return self.GetLoopItem(key)
def has_key(
self, key)
Inheritance:
LoopBlock.has_key
def has_key(self,key):
    """Return 1 if ``key`` is held by this block or any of its loops, else 0.

    A string key is compared in lower case against ``self.lower_keys``.
    Failing that, each loop in ``self.loops`` is asked in turn.
    """
    held_here = isinstance(key,StringTypes) and key.lower() in self.lower_keys
    if held_here:
        return 1
    if any(inner.has_key(key) for inner in self.loops):
        return 1
    return 0
def insert_loop(
self, newloop, position=-1, audit=True)
Inheritance:
LoopBlock.insert_loop
def insert_loop(self,newloop,position=-1,audit=True):
    """Attach ``newloop`` to this block as a directly nested loop.

    Args:
        newloop: loop whose ``dimension`` must be one more than this
            block's.
        position: index in ``item_order`` at which to insert the loop; a
            negative value appends it.
        audit: when true, ``self.audit()`` is run after the loop has been
            added and any duplicates it reports cause a failure.

    Raises:
        StarError: for a wrong nesting level, or when the audit reports
            duplicate names.  In the latter case the loop is removed again
            so that ``loops`` and ``item_order`` stay consistent.
    """
    # check that new loop is kosher
    if newloop.dimension != self.dimension + 1:
        raise StarError( 'Insertion of loop of wrong nesting level %d, should be %d' % (newloop.dimension, self.dimension+1))
    self.loops.append(newloop)
    if audit:
        dupes = self.audit()
        if dupes:
            # undo the append above: the loop never reached item_order
            self.loops.pop()
            dupenames = [dupe[0] for dupe in dupes]
            raise StarError( 'Duplicate names: %s' % repr(dupenames))
    if position >= 0:
        self.item_order.insert(position,newloop)
    else:
        self.item_order.append(newloop)
def items(
self)
def items(self):
    """Return a list of ``(key, value)`` tuples for every item in this block.

    Keys come from ``self.keys()`` and values from ``self.values()``; the
    two are paired positionally.
    """
    return list(zip(self.keys(),self.values()))
def keys(
self)
def keys(self):
    """Return the names in this block followed by those of each nested loop.

    The result is a new list: first the keys of ``self.block``, then the
    ``keys()`` of every loop in ``self.loops`` in order.
    """
    collected = list(self.block.keys())
    for inner in self.loops:
        collected.extend(inner.keys())
    return collected
def load_iter(
self, coords=[])
Inheritance:
LoopBlock.load_iter
# Generator used while loading data: it yields (block, target) pairs,
# one for each storage slot to be filled, packet after packet.
#
# coords is the list of packet indices of the enclosing loops.  It is never
# modified here (nested calls receive coords+[count], a new list), so the
# shared default list is harmless.
#
# The loop runs until self.popout becomes true.  Nothing in this method sets
# it to true, so it is presumably set by the consumer of the generator
# (TODO confirm against the caller).  It is then reset to False and one
# final placeholder pair is yielded.
# NOTE(review): indentation was lost in this listing, so the exact nesting
# of the trailing statements cannot be confirmed from here.
def load_iter(self,coords=[]):
count = 0 #to create packet index
while not self.popout:
# ok, we have a new packet: append a list to our subloops
for aloop in self.loops:
aloop.new_enclosing_packet()
for iname in self.item_order:
if isinstance(iname,LoopBlock): #into a nested loop
# re-yield everything the nested loop produces for packet number count
for subitems in iname.load_iter(coords=coords+[count]):
# print 'Yielding %s' % `subitems`
yield subitems
# print 'End of internal loop'
else:
if self.dimension == 0:
# print 'Yielding %s' % `self[iname]`
yield self,self[iname]
else:
# walk down the stored value using the enclosing packet indices
backval = self.block[iname]
for i in range(len(coords)):
# print 'backval, coords: %s, %s' % (`backval`,`coords`)
backval = backval[coords[i]]
yield self,backval
count = count + 1 # count packets
self.popout = False # reinitialise
# print 'Finished iterating'
yield self,'###Blank###' #this value should never be used
def merge(
self, new_block, mode='strict', match_att=[], match_function=None, rel_keys=[])
# Merge the contents of new_block into this block.
#
# mode selects the behaviour; three values are handled here:
#   'strict'  - every entry of new_block.item_order not listed in match_att
#               is added; a key already present raises CifError.
#   'replace' - every entry of new_block.item_order is assigned or inserted.
#   'overlay' - non-looped string values are assigned, and loops are either
#               inserted whole or have their packets combined.
# No branch for any other mode value is visible in this listing.
#
# match_att   - names exempt from the strict check and skipped in overlay.
# rel_keys    - names used in overlay mode to pick a key column per loop.
# match_function is accepted but not referenced anywhere in this listing.
# None of the list defaults is mutated here.
# NOTE(review): indentation was lost in this listing; the nesting described
# in the comments below is the most plausible reading, not a certainty.
def merge(self,new_block,mode="strict",match_att=[],match_function=None,
rel_keys = []):
if mode == 'strict':
for key in new_block.item_order:
if self.has_key(key) and key not in match_att:
raise CifError( "Identical keys %s in strict merge mode" % key)
elif key not in match_att: #no change otherwise
if isinstance(key,StringTypes):
self[key] = new_block[key]
else:
self.insert_loop(key)
elif mode == 'replace':
# NOTE(review): newkeys is pruned of match_att here but is not read again
# in this branch; the loop below runs over new_block.item_order.
newkeys = new_block.keys()
for ma in match_att:
try:
newkeys.remove(ma) #don't touch the special ones
except ValueError:
pass
for key in new_block.item_order:
if isinstance(key,StringTypes):
self[key] = new_block[key]
else:
self.insert_loop(key) #assume is a loop
elif mode == 'overlay':
print 'Overlay mode, current overwrite is %s' % self.overwrite
# overwriting is forced on for the duration and restored on the last line
save_overwrite = self.overwrite
self.overwrite = True
for attribute in new_block.keys():
if attribute in match_att: continue #ignore this one
new_value = new_block[attribute]
#non-looped items
if isinstance(new_value,StringTypes):
self[attribute] = new_value
these_atts = self.keys()
for newloop in new_block.loops:
newkeys = newloop.keys()
# note that the following line determines packet item order
overlaps = filter(lambda a: a in these_atts,newkeys)
if len(overlaps)< len(newloop):#completely new loop
self.insert_loop(newloop)
elif len(overlaps)==len(newloop):
# appending packets
# print "In overlay merge mode, found extra packet items:"
# print `overlaps`
# get key position
loop_keys = filter(lambda a:a in rel_keys,overlaps)
try:
newkeypos = map(lambda a:newkeys.index(a),loop_keys)
newkeypos = newkeypos[0] #one key per loop for now
loop_keys = loop_keys[0]
except (ValueError,IndexError):
newkeypos = []
# NOTE(review): a key found at index 0 is falsy, so the "if newkeypos:"
# test below treats it like "no key found" - confirm this is intended.
overlap_data = map(lambda a:listify(self[a]),overlaps) #old packet data
new_data = map(lambda a:new_block[a],overlaps) #new packet data
packet_data = transpose(overlap_data)
new_p_data = transpose(new_data)
# remove any packets for which the keys match between old and new; we
# make the arbitrary choice that the old data stays
if newkeypos:
# get matching values in new list
print "Old, new data:\n%s\n%s" % (`overlap_data[newkeypos]`,`new_data[newkeypos]`)
key_matches = filter(lambda a:a in overlap_data[newkeypos],new_data[newkeypos])
# filter out any new data with these key values
new_p_data = filter(lambda a:a[newkeypos] not in key_matches,new_p_data)
if new_p_data:
new_data = transpose(new_p_data)
else: new_data = []
# wipe out the old data and enter the new stuff
byebyeloop = self.GetLoop(overlaps[0])
# print "Removing '%s' with overlaps '%s'" % (`byebyeloop`,`overlaps`)
# Note that if, in the original dictionary, overlaps are not
# looped, GetLoop will return the block itself. So we check
# for this case...
if byebyeloop != self:
self.remove_loop(byebyeloop)
self.AddLoopItem((overlaps,overlap_data)) #adding old packets
for pd in new_p_data: #adding new packets
if pd not in packet_data:
for i in range(len(overlaps)):
#don't do this at home; we are appending
#to something in place
self[overlaps[i]].append(pd[i])
self.overwrite = save_overwrite
def new_enclosing_packet(
self)
Inheritance:
LoopBlock.new_enclosing_packet
def new_enclosing_packet(self):
    """Open a fresh, empty packet list for every item of this nested loop.

    Does nothing unless the dimension is greater than 1.  Otherwise, for
    each name from ``self.keys()`` (which includes lower levels), the last
    element is followed ``dimension - 3`` times from ``self[name]`` and an
    empty list is appended to the list reached.
    """
    if not self.dimension > 1:      # a top-level list needs no new packet
        return
    for name in self.keys():
        innermost = self[name]
        # dim 2 upwards are lists of lists of...
        for _level in range(3,self.dimension):
            innermost = innermost[-1]
        innermost.append([])
def printsection(
self, instring='', ordering=[], blockstart='', blockend='', indent=0, coord=[])
Inheritance:
LoopBlock.printsection
def printsection(self,instring='',ordering=[],blockstart="",blockend="",indent=0,coord=[]):
    """Write the contents of this block in STAR syntax.

    Args:
        instring: an existing output object to write into.  When it is
            given (inside a recursion) nothing is returned; otherwise a
            new ``CIFStringIO`` is created and its text is returned.
        ordering: sequence of formatting-hint dictionaries, each with a
            ``'dataname'`` entry and optional ``'column'`` / ``'delimiter'``.
        blockstart, blockend: accepted for interface compatibility; not
            used in the code visible here.
        indent: number of spaces placed before ``loop_`` lines.
        coord: packet coordinates selecting the data of nested loops;
            zeros are used when it is empty.

    Raises:
        StarError: if fewer coordinates are supplied than the nesting
            depth requires.

    ``self.output_order`` is rebuilt by ``create_ordering`` and consumed
    item by item while printing.
    """
    # first make an ordering
    self.create_ordering(ordering)
    # now do it...
    if not instring:
        outstring = CIFStringIO(target_width=80)       # the returned string
    else:
        outstring = instring
    if not coord:
        coords = [0]*(self.dimension-1)
    else:
        coords = coord
    # NOTE(review): the text between "len(coords)" and the loop header was
    # lost when this listing was extracted; the length check and its message
    # are a reconstruction - confirm against the original source.
    if len(coords) < self.dimension-1:
        raise StarError("Not enough block packet coordinates to uniquely define data")
    while len(self.output_order)>0:
        itemname = self.output_order.pop(0)
        is_loop = isinstance(itemname,LoopBlock)
        # Formatting hints apply to plain data names only.  Exactly one hint
        # dictionary (the first match) is used, because format_value() calls
        # .get() on whatever it is handed.
        item_spec = {}
        col_pos = -1
        if not is_loop:
            matches = [i for i in ordering if i['dataname'].lower()==itemname.lower()]
            if len(matches)>0:
                item_spec = matches[0]
                col_pos = item_spec.get('column',-1)
        if self.dimension == 0:            # ie value next to tag
            if not is_loop:                # no loop
                if col_pos < 0: col_pos = 40
                outstring.set_tab(col_pos)
                itemvalue = self[itemname]
                outstring.write(itemname,mustbreak=True,do_tab=False)
                outstring.write(' ',canbreak=True,do_tab=False)    #space after itemname
                self.format_value(itemvalue,outstring,hints=item_spec)
            else:   # we are asked to print an internal loop block
                #first make sure we have sensible coords.  Length should be one
                #less than the current dimension
                outstring.set_tab(10)       #guess this is OK?
                outstring.write(' '*indent,mustbreak=True,do_tab=False)
                outstring.write('loop_\n',do_tab=False)
                itemname.format_names(outstring,indent+2)
                itemname.format_packets(outstring,coords,indent+2)
        else:       # we are a nested loop
            outstring.write(' '*indent,mustbreak=True,do_tab=False)
            outstring.write('loop_\n',do_tab=False)
            self.format_names(outstring,indent+2)
            self.format_packets(outstring,coords,indent+2)
    if instring:
        return   #inside a recursion
    returnstring = outstring.getvalue()
    outstring.close()
    return returnstring
def process_template(
self, template_string)
Inheritance:
LoopBlock.process_template
Process a template datafile to formatting instructions
def process_template(self,template_string):
    """Process a template datafile to formatting instructions

    The template is parsed with ``StarFile`` (DDLm grammar) and its first
    block is walked in input order.  For every data name a hint dictionary
    is appended to ``self.form_hints``:

    * ``'dataname'``  - the data name;
    * ``'delimiter'`` - the quote character or ``;`` that starts the value
      in the template text, when there is one;
    * ``'column'``    - the offset of the value from the start of its line
      (omitted for ``;``-delimited values).

    Looped names are located through the packet text that follows the
    loop header.  A name that cannot be found in the template text gets a
    hint holding only its ``'dataname'``.
    """
    template_as_cif = StarFile(StringIO(template_string),grammar="DDLm").first_block()
    self.form_hints = []   #ordered array of hint dictionaries
    for item in template_as_cif.item_order:  #order of input
        if not isinstance(item,LoopBlock):    #not nested
            hint_dict = {"dataname":item}
            # find the line in the file; the name is escaped because data
            # names may contain regex metacharacters such as '.'
            start_pos = re.search("(^[ \t]*" + re.escape(item) + "[ \t\n]+)(?P<spec>([\S]+)|(^;))",template_string,re.I|re.M)
            if start_pos is not None and start_pos.group("spec") is not None:
                spec_pos = start_pos.start("spec")-start_pos.start(0)
                spec_char = template_string[start_pos.start("spec")]
                if spec_char in '\'";':
                    hint_dict.update({"delimiter":spec_char})
                if spec_char != ";":   #so we need to work out the column number
                    hint_dict.update({"column":spec_pos})
            self.form_hints.append(hint_dict)
        else:           #loop block
            testname = item.item_order[0]
            #find the loop spec line in the file
            loop_regex = "(^[ \t]*loop_[ \t\n\r]+" + re.escape(testname) + "([ \t\n\r]+_[\S]+){%d}[ \t]*$(?P<packet>(.(?!_loop|_[\S]+))*))" % (len(item.item_order) - 1)
            loop_line = re.search(loop_regex,template_string,re.I|re.M|re.S)
            packet_text = loop_line.group('packet')
            packet_regex = "[ \t]*(?P<all>(?P<sq>'([^\n\r\f']*)'+)|(?P<dq>\"([^\n\r\"]*)\"+)|(?P<none>[^\s]+))"
            packet_pos = re.finditer(packet_regex,packet_text)
            line_end_pos = re.finditer("^",packet_text,re.M)
            next_end = next(line_end_pos).end()
            last_end = next_end
            for loopname in item.item_order:
                hint_dict = {"dataname":loopname}
                thismatch = next(packet_pos)
                # advance through the line starts until one lies at or
                # beyond this value; last_end is then the start of its line
                while thismatch.start('all') > next_end:
                    last_end = next_end
                    try:
                        next_end = next(line_end_pos).start()
                    except StopIteration:
                        # no further lines: the value sits on the last one
                        # (without this break the loop would never end)
                        break
                col_pos = thismatch.start('all') - last_end
                if thismatch.group('none') is None:
                    # first character of the whole match is the quote used
                    hint_dict.update({'delimiter':thismatch.groups()[0][0]})
                hint_dict.update({'column':col_pos})
                self.form_hints.append(hint_dict)
    return
def recursive_iter(
self, dict_so_far={}, coord=[])
Inheritance:
LoopBlock.recursive_iter
# Generator yielding one StarPacket for every combination of this level's
# packet with the packets of its nested loops.
#
# coord is the list of packet indices of the enclosing loops; nested calls
# receive coord+[i] (a new list), so the shared default is never mutated.
# dict_so_far is accepted but not referenced anywhere in this listing.
#
# Each yielded StarPacket is built from a plain list of values and then has
# every (name, value) pair attached as an attribute with setattr.
# NOTE(review): indentation was lost in this listing, so the nesting of the
# statements below cannot be confirmed from here.
def recursive_iter(self,dict_so_far={},coord=[]):
# print "Recursive iter: coord %s, keys %s, dim %d" % (`coord`,`self.block.keys()`,self.dimension)
my_length = 0
top_items = self.block.items()
top_values = self.block.values() #same order as items
drill_values = self.block.values()
# take the first element once per nesting level to reach the packet list
for dimup in range(0,self.dimension): #look higher in the tree
if len(drill_values)>0: #this block has values
drill_values=drill_values[0] #drill in
else:
raise StarError("Malformed loop packet %s" % `top_items[0]`)
my_length = len(drill_values[0]) #length of 'string' entry
if self.dimension == 0: #top level
# combine the top-level values with every packet of every loop
for aloop in self.loops:
for apacket in aloop.recursive_iter():
# print "Recursive yielding %s" % `dict(top_items + apacket.items())`
prep_yield = StarPacket(top_values+apacket.values()) #straight list
for name,value in top_items + apacket.items():
setattr(prep_yield,name,value)
yield prep_yield
else: #in some loop
for i in range(my_length):
# values of this level's own names for packet i at the given coordinates
kvpairs = map(lambda a:(a,self.coord_to_group(a,coord)[i]),self.block.keys())
kvvals = map(lambda a:a[1],kvpairs) #just values
# print "Recursive kvpairs at %d: %s" % (i,`kvpairs`)
if self.loops:
for aloop in self.loops:
for apacket in aloop.recursive_iter(coord=coord+[i]):
# print "Recursive yielding %s" % `dict(kvpairs + apacket.items())`
prep_yield = StarPacket(kvvals+apacket.values())
for name,value in kvpairs + apacket.items():
setattr(prep_yield,name,value)
yield prep_yield
else: # we're at the bottom of the tree
# print "Recursive yielding %s" % `dict(kvpairs)`
prep_yield = StarPacket(kvvals)
for name,value in kvpairs:
setattr(prep_yield,name,value)
yield prep_yield
def regularise_data(
self, dataitem)
Inheritance:
LoopBlock.regularise_data
Place dataitem into a list if necessary
def regularise_data(self,dataitem):
    """Place dataitem into a list if necessary

    Returns a ``(value, companion)`` pair.  Numbers, strings, ``StarList``
    and ``StarDict`` values are returned unchanged with ``None``.  Tuples
    and lists are returned unchanged with a list of ``None`` of the same
    length.  Anything else is converted with ``list()`` and paired with a
    matching list of ``None``.

    Raises:
        StarError: if the value cannot be converted to a list.
    """
    from numbers import Number
    if isinstance(dataitem,(Number,basestring,StarList,StarDict)):
        return dataitem,None
    if not isinstance(dataitem,(tuple,list)):
        # so try to make into a list
        try:
            dataitem = list(dataitem)
        except TypeError:
            raise StarError( str(dataitem) + ' is wrong type for data value\n' )
    return dataitem,[None]*len(dataitem)
def remove_loop(
self, oldloop)
Inheritance:
LoopBlock.remove_loop
def remove_loop(self,oldloop):
    """Detach ``oldloop`` from this block.

    The loop is removed first from ``item_order`` and then from ``loops``;
    ``list.remove`` raises ValueError if it is missing from either.
    """
    for registry in (self.item_order, self.loops):
        registry.remove(oldloop)
def unassign_dictionary(
self)
Remove dictionary-dependent behaviour
def unassign_dictionary(self):
    """Remove dictionary-dependent behaviour

    Sets ``self.dictionary`` to ``None``.
    """
    setattr(self, 'dictionary', None)
def update(
self, adict)
def update(self,adict):
    """Add every key/value pair of ``adict`` to this block.

    Each key from ``adict.keys()`` is passed, together with its value, to
    ``self.AddLoopItem`` as a ``(key, value)`` tuple.
    """
    for name in adict.keys():
        pair = (name, adict[name])
        self.AddLoopItem(pair)
def values(
self)
def values(self):
    """Return the values of this block as a list, in ``self.keys()`` order."""
    return [self[name] for name in self.keys()]
class StarDict
class StarDict(dict):
    """A plain ``dict`` subclass that adds no behaviour of its own.

    It serves as a distinct type for ``isinstance`` checks.
    """
Ancestors (in MRO)
- StarDict
- __builtin__.dict
- __builtin__.object
class StarError
class StarError(Exception):
    """Exception raised for STAR format problems.

    The message is kept in ``value`` and is also passed to ``Exception``,
    so ``args``, ``repr()`` and pickling work as for any other exception.
    """
    def __init__(self,value):
        Exception.__init__(self,value)
        self.value = value
    def __str__(self):
        # %-formatting rather than "+" so that a non-string value does not
        # raise TypeError while the error is being reported
        return '\nStar Format error: %s' % (self.value,)
Ancestors (in MRO)
- StarError
- exceptions.Exception
- exceptions.BaseException
- __builtin__.object
Class variables
var args
var message
Instance variables
var value
Methods
def __init__(
self, value)
def __init__(self,value):
    """Store the error message in ``value`` and initialise the base exception.

    Passing the message to ``Exception.__init__`` fills in ``args``.
    """
    Exception.__init__(self,value)
    self.value = value
class StarFile
class StarFile(BlockCollection):
    """A collection of STAR blocks, optionally read from a data source."""
    def __init__(self,datasource=None,maxinlength=-1,maxoutlength=0,
                 scoping='instance',grammar='1.1',scantype='standard',**kwargs):
        """Create the collection and, if possible, read ``datasource`` into it.

        Args:
            datasource: a string or an object with a ``read`` attribute;
                anything else is not read.  Its ``my_uri`` attribute, when
                present, is recorded.
            maxinlength: passed to ``ReadStar`` as ``maxlength``.
            maxoutlength: maximum output line length; 0 selects 2048.
            scoping: stored as ``self.scoping``.
            grammar, scantype: passed through to ``ReadStar``.
            **kwargs: forwarded to ``BlockCollection.__init__``.
        """
        super(StarFile,self).__init__(datasource=datasource,**kwargs)
        self.my_uri = getattr(datasource,'my_uri','')
        self.maxinlength = maxinlength      #no restriction
        if maxoutlength == 0:
            self.maxoutlength = 2048
        else:
            self.maxoutlength = maxoutlength
        self.scoping = scoping
        # isinstance, so that subclasses of the string types are read too
        if isinstance(datasource,StringTypes) or hasattr(datasource,"read"):
            ReadStar(datasource,prepared=self,maxlength=self.maxinlength,
                     grammar=grammar,scantype=scantype)
        self.header_comment = \
"""#\\#STAR
##########################################################################
# STAR Format file
# Produced by PySTARRW module
#
# This is a STAR file. STAR is a superset of the CIF file type. For
# more information, please refer to International Tables for Crystallography,
# Volume G, Chapter 2.1
#
##########################################################################
"""
    def set_uri(self,my_uri):
        """Record ``my_uri`` as the URI of this file."""
        self.my_uri = my_uri
Ancestors (in MRO)
- StarFile
- BlockCollection
- __builtin__.object
Instance variables
var maxinlength
var my_uri
Methods
def __init__(
self, datasource=None, maxinlength=-1, maxoutlength=0, scoping='instance', grammar='1.1', scantype='standard', **kwargs)
Inheritance:
BlockCollection.__init__
def __init__(self,datasource=None,maxinlength=-1,maxoutlength=0,
             scoping='instance',grammar='1.1',scantype='standard',**kwargs):
    """Create the collection and, if possible, read ``datasource`` into it.

    Args:
        datasource: a string or an object with a ``read`` attribute;
            anything else is not read.  Its ``my_uri`` attribute, when
            present, is recorded.
        maxinlength: passed to ``ReadStar`` as ``maxlength``.
        maxoutlength: maximum output line length; 0 selects 2048.
        scoping: stored as ``self.scoping``.
        grammar, scantype: passed through to ``ReadStar``.
        **kwargs: forwarded to ``BlockCollection.__init__``.
    """
    super(StarFile,self).__init__(datasource=datasource,**kwargs)
    self.my_uri = getattr(datasource,'my_uri','')
    self.maxinlength = maxinlength      #no restriction
    if maxoutlength == 0:
        self.maxoutlength = 2048
    else:
        self.maxoutlength = maxoutlength
    self.scoping = scoping
    # isinstance, so that subclasses of the string types are read too
    if isinstance(datasource,StringTypes) or hasattr(datasource,"read"):
        ReadStar(datasource,prepared=self,maxlength=self.maxinlength,
                 grammar=grammar,scantype=scantype)
    self.header_comment = \
"""#\\#STAR
##########################################################################
# STAR Format file
# Produced by PySTARRW module
#
# This is a STAR file. STAR is a superset of the CIF file type. For
# more information, please refer to International Tables for Crystallography,
# Volume G, Chapter 2.1
#
##########################################################################
"""
def NewBlock(
self, blockname, blockcontents=None, fix=True, parent=None)
Inheritance:
BlockCollection.NewBlock
# Add a block to this collection and return the lower-case key under which
# it was stored.
#
# blockname     - requested name; when fix is true, spaces and tabs in it
#                 are replaced by underscores.
# blockcontents - the block to store; a new StarBlock is created if None.
# parent        - name of the enclosing block, or None for a top-level block.
#
# Raises StarError when self.standard is set and the name exceeds 75
# characters, or when an existing block would be replaced and neither of the
# two renaming cases below applies.
# NOTE(review): indentation was lost in this listing, so the exact nesting
# (in particular of the length check and of the parent re-pointing) cannot
# be confirmed from here.
def NewBlock(self,blockname,blockcontents=None,fix=True,parent=None):
if blockcontents is None:
blockcontents = StarBlock()
if self.standard is not None:
if self.standard == 'CIF':
self.checknamelengths(blockcontents,maxlength=75) #
self.checkloopnesting(blockcontents)
if len(blockname)>75:
raise StarError , 'Blockname %s is longer than 75 characters' % blockname
if fix:
newblockname = re.sub('[ \t]','_',blockname)
else: newblockname = blockname
new_lowerbn = newblockname.lower()
# name clash: either rename one of the two blocks by appending '+' until
# the key is unused, refuse, or (when no standard is set) delete the old one
if new_lowerbn in self.lower_keys:
if self.standard is not None: #already there
toplevelnames = [a[0] for a in self.child_table.items() if a[1].parent==None]
if parent is None and new_lowerbn not in toplevelnames: #can give a new key to this one
while new_lowerbn in self.lower_keys: new_lowerbn = new_lowerbn + '+'
elif parent is not None and new_lowerbn in toplevelnames: #can fix a different one
replace_name = new_lowerbn
while replace_name in self.lower_keys: replace_name = replace_name + '+'
self._rekey(new_lowerbn,replace_name)
# now continue on to add in the new block
if parent.lower() == new_lowerbn: #the new block's requested parent just got renamed!!
parent = replace_name
else:
raise StarError( "Attempt to replace existing block " + blockname)
else:
del self[new_lowerbn]
self.dictionary.update({new_lowerbn:blockcontents})
self.lower_keys.add(new_lowerbn)
if parent is None:
self.child_table[new_lowerbn]=self.PC(newblockname,None)
self.visible_keys.append(new_lowerbn)
else:
if parent.lower() in self.lower_keys:
# both scoping branches record the same child_table entry; only the
# second one also makes the new key visible
if self.scoping == 'instance':
self.child_table[new_lowerbn]=self.PC(newblockname,parent.lower())
else:
self.child_table[new_lowerbn]=self.PC(newblockname,parent.lower())
self.visible_keys.append(new_lowerbn)
else:
# NOTE(review): here the block has already been stored in dictionary and
# lower_keys, but it gets no child_table entry - confirm this is intended.
print 'Warning:Parent block %s does not exist for child %s' % (parent,newblockname)
return new_lowerbn #in case calling routine wants to know
def WriteOut(
self, comment='', wraplength=80, maxoutlength=2048)
Inheritance:
BlockCollection.WriteOut
def WriteOut(self,comment='',wraplength=80,maxoutlength=2048):
    """Return the whole collection as a single string.

    The text starts with ``comment`` (or ``self.header_comment`` when that
    is empty).  Every top-level block follows as a ``data_`` section with
    its child blocks written as ``save_`` frames.  When ``self.standard``
    is ``'Dic'`` the block's own contents come before its save frames,
    otherwise after them.  ``wraplength`` and ``maxoutlength`` are handed
    to each top-level block's ``SetOutputLength`` before it is converted.
    """
    import cStringIO
    header = comment
    if not header:
        header = self.header_comment
    buffer = cStringIO.StringIO()
    buffer.write(header)

    def emit_contents(ref):
        # write the block's own items at the current position
        self[ref].SetOutputLength(wraplength,maxoutlength)
        buffer.write(str(self[ref]))

    # loop over top-level
    roots = [(key,entry.block_id) for key,entry in self.child_table.items() if entry.parent is None]
    for blockref,blockname in roots:
        buffer.write('\n' + 'data_' +blockname+'\n')
        frames = [(key,entry.block_id) for key,entry in self.child_table.items() if entry.parent==blockref]
        if self.standard == 'Dic':              #put contents before save frames
            emit_contents(blockref)
        for child_ref,child_name in frames:
            buffer.write('\n' + 'save_' + child_name + '\n')
            self.block_to_string(child_ref,child_name,buffer,4)
            buffer.write('\n' + 'save_'+ '\n')
        if self.standard != 'Dic':              #put contents after save frames
            emit_contents(blockref)
    text = buffer.getvalue()
    buffer.close()
    return text
def block_to_string(
self, block_ref, block_id, outstring, indentlevel=0)
Inheritance:
BlockCollection.block_to_string
Output a complete datablock indexed by [[block_ref]] and named [[block_id]], including children
def block_to_string(self,block_ref,block_id,outstring,indentlevel=0):
    """Output a complete datablock indexed by [[block_ref]] and named [[block_id]], including children

    The block's own text is written before its children when
    ``self.standard`` is ``'Dic'`` and after them otherwise.  Each child is
    wrapped in a ``save_<name>`` line and a closing ``save_`` line, the
    latter preceded by ``indentlevel`` spaces, and is written recursively.
    """
    children = [(key,entry.block_id) for key,entry in self.child_table.items() if entry.parent==block_ref]
    if self.standard == 'Dic':
        outstring.write(str(self[block_ref]))
    for child_ref,child_name in children:
        opening = '\n' + 'save_' + child_name + '\n'
        outstring.write(opening)
        self.block_to_string(child_ref,child_name,outstring,indentlevel)
        closing = '\n' + ' '*indentlevel + 'save_' + '\n'
        outstring.write(closing)
    if self.standard != 'Dic':
        outstring.write(str(self[block_ref]))
def checkloopnesting(
self, target_block)
Inheritance:
BlockCollection.checkloopnesting
Check that block doesn't contain nested loops
def checkloopnesting(self,target_block):
    """Check that block doesn't contain nested loops

    Raises:
        StarError: if any loop of ``target_block`` has loops of its own.
    """
    if any(len(loop.loops) > 0 for loop in target_block.loops):
        raise StarError('Block contains nested loops')
def checknamelengths(
self, target_block, maxlength=-1)
Inheritance:
BlockCollection.checknamelengths
def checknamelengths(self,target_block,maxlength=-1):
    """Check that no data name in ``target_block`` exceeds ``maxlength``.

    A negative ``maxlength`` disables the check.

    Raises:
        StarError: listing, one per line, every name longer than
            ``maxlength`` characters.
    """
    if maxlength < 0:
        return
    toolong = [name for name in target_block.keys() if len(name)>maxlength]
    if toolong:
        listing = "".join("\n" + name for name in toolong)
        raise StarError( 'Following data names too long:' + listing)
def clear(
self)
Inheritance:
BlockCollection.clear
def clear(self):
    """Remove every block from this collection.

    The block dictionary is emptied in place; the lower-case key set, the
    child table and the visible-key list are rebound to new empty
    containers.
    """
    self.visible_keys = []
    self.child_table = {}
    self.lower_keys = set()
    self.dictionary.clear()
def copy(
self)
Inheritance:
BlockCollection.copy
def copy(self):
    """Return a new ``BlockCollection`` holding the same blocks.

    The block dictionary, the child table and the lower-case key set are
    copied shallowly: the blocks themselves are shared, but adding or
    removing a block in the copy does not disturb this collection's
    bookkeeping.  ``characterset`` and ``scoping`` are carried over.
    """
    all_blocks = self.dictionary.copy()   #all blocks
    newcopy = BlockCollection('',all_blocks,parent_id=self.parent_id)
    newcopy.child_table = self.child_table.copy()
    # an independent set: NewBlock() adds to lower_keys in place, which
    # would otherwise leak into the collection being copied
    newcopy.lower_keys = set(self.lower_keys)
    newcopy.characterset = self.characterset
    newcopy.scoping = self.scoping  #this sets visible keys
    return newcopy
def first_block(
self)
Inheritance:
BlockCollection.first_block
Return the 'first' block. This is not necessarily the first block in the file.
def first_block(self):
    """Return the 'first' block.  This is not necessarily the first block in the file.

    The block returned is the one stored under the first entry of
    ``self.keys()``; ``None`` is returned when there are no keys.
    """
    if not self.keys():
        return None
    leading_key = self.keys()[0]
    return self[leading_key]
def get(
self, key, default=None)
Inheritance:
BlockCollection.get
def get(self,key,default=None):
    """Return the block stored under ``key``, or ``default`` if there is none.

    Presence is decided by ``self.has_key``, which takes account of case.
    """
    if not self.has_key(key):
        return default
    return self.__getitem__(key)
def get_all(
self, item_name)
Inheritance:
BlockCollection.get_all
def get_all(self,item_name):
    """Return the distinct values of ``item_name`` across all blocks.

    Each block is asked with ``get(item_name)``.  Results equal to
    ``None`` are dropped, list results contribute their elements
    individually, and values are kept in first-seen order without
    repeats.
    """
    found = [self[block_name].get(item_name) for block_name in self.keys()]
    found = [value for value in found if value != None]
    distinct = []
    for value in found:
        members = value if isinstance(value,ListType) else [value]
        for member in members:
            if member not in distinct:
                distinct.append(member)
    return distinct
def get_child_list(
self, parentname)
Inheritance:
BlockCollection.get_child_list
Get a list of all child categories
def get_child_list(self,parentname):
    """Get a list of all child categories

    Returns the ``child_table`` keys for which
    ``self.is_child_of_parent(parentname.lower(), key)`` is true.
    """
    child_handles = []
    for handle,_entry in self.child_table.items():
        if self.is_child_of_parent(parentname.lower(),handle):
            child_handles.append(handle)
    return child_handles
def get_children(
self, blockname, include_parent=False, scoping='dictionary')
Inheritance:
BlockCollection.get_children
Get all children of [[blockname]] as a block collection. If [[include_parent]] is True, the parent block will also be included in the block collection as the root.
def get_children(self,blockname,include_parent=False,scoping='dictionary'):
    """Get all children of [[blockname]] as a block collection. If [[include_parent]] is
    True, the parent block will also be included in the block collection as the root.

    Without the parent, the immediate children of ``blockname`` are
    re-registered with a parent of ``None`` in the new collection.
    ``scoping`` is assigned to the new collection last.
    """
    newbc = BlockCollection()
    block_lower = blockname.lower()
    descendants = [pair for pair in self.child_table.items() if self.is_child_of_parent(block_lower,pair[1].block_id)]
    newbc.child_table = dict(descendants)
    if not include_parent:
        # immediate children become the roots of the new collection
        promoted = {}
        for key,entry in descendants:
            if entry.parent == block_lower:
                promoted[key] = self.PC(entry.block_id,None)
        newbc.child_table.update(promoted)
    newbc.lower_keys = set([pair[0] for pair in descendants])
    newbc.dictionary = dict((pair[0],self.dictionary[pair[0]]) for pair in descendants)
    if include_parent:
        root_entry = self.PC(self.child_table[block_lower].block_id,None)
        newbc.child_table.update({block_lower:root_entry})
        newbc.lower_keys.add(block_lower)
        newbc.dictionary.update({block_lower:self.dictionary[block_lower]})
    newbc.scoping = scoping
    return newbc
def get_immediate_children(
self, parentname)
Inheritance:
BlockCollection.get_immediate_children
Get the next level of children of the given block as a list, without nested levels
def get_immediate_children(self,parentname):
    """Get the next level of children of the given block as a list, without nested levels

    Returns the ``(key, entry)`` pairs of ``child_table`` whose entry
    names ``parentname`` (lower-cased) as its parent.
    """
    direct = []
    for pair in self.child_table.items():
        if pair[1].parent == parentname.lower():
            direct.append(pair)
    return direct
def get_parent(
self, blockname)
Inheritance:
BlockCollection.get_parent
Return the name of the block enclosing [[blockname]] in canonical form (lower case)
def get_parent(self,blockname):
    """Return the name of the block enclosing [[blockname]] in canonical form (lower case)

    The lookup is made in ``child_table`` under the lower-cased block
    name; the stored ``parent`` is returned (``None`` for a top-level
    block).

    Raises:
        StarError: if ``blockname`` has no entry in ``child_table``.
    """
    # child_table is a dictionary, so a key can match at most one entry and
    # a direct lookup replaces the former scan over all items.
    try:
        entry = self.child_table[blockname.lower()]
    except KeyError:
        raise StarError('no parent for %s' % blockname)
    return entry.parent
def get_roots(
self)
Inheritance:
BlockCollection.get_roots
Get the top-level blocks
def get_roots(self):
    """Get the top-level blocks

    Returns the ``(key, entry)`` pairs of ``child_table`` whose entry has
    a parent equal to ``None``.
    """
    roots = []
    for pair in self.child_table.items():
        if pair[1].parent==None:
            roots.append(pair)
    return roots
def has_key(
self, key)
Inheritance:
BlockCollection.has_key
def has_key(self,key):
    """Return 1 if ``key`` names a visible block, else 0.

    Keys that are not strings are never present; string keys are compared
    in lower case against ``self.visible_keys``.
    """
    if isinstance(key,StringTypes) and key.lower() in self.visible_keys:
        return 1
    return 0
def is_child_of_parent(
self, parentname, blockname)
Inheritance:
BlockCollection.is_child_of_parent
Recursively search for children of blockname, case is important for now
def is_child_of_parent(self,parentname,blockname):
    """Recursively search for children of blockname, case is important for now

    Returns True when ``blockname`` (lower-cased) is a direct child of
    ``parentname`` (lower-cased) in ``child_table``, or a descendant of
    one of its direct children; False otherwise.
    """
    wanted_parent = parentname.lower()
    direct = [key for key,entry in self.child_table.items() if entry.parent == wanted_parent]
    if blockname.lower() in direct:
        return True
    for child in direct:
        if self.is_child_of_parent(child,blockname):
            return True
    return False
def items(
self)
Inheritance:
BlockCollection.items
def items(self):
    """Return a list of ``(key, block)`` tuples, in ``self.keys()`` order."""
    pairs = []
    for key in self.keys():
        pairs.append((key,self[key]))
    return pairs
def lock(
self)
Inheritance:
BlockCollection.lock
Disallow overwriting for all blocks in this collection
def lock(self):
    """Disallow overwriting for all blocks in this collection

    Sets ``overwrite`` to False on the block stored under each entry of
    ``self.lower_keys``.
    """
    for block_key in self.lower_keys:
        target = self[block_key]
        target.overwrite = False
def merge(
self, new_bc, mode=None, parent=None, single_block=[], idblock='', match_att=[], match_function=None)
Inheritance:
BlockCollection.merge
# Merge the blocks of new_bc into this collection.
#
# mode         - 'strict', 'replace' or 'overlay'.  When None it defaults to
#                'replace' if self.standard is None and to 'strict' otherwise.
# parent       - parent assigned to incoming blocks that have none of their own.
# single_block - when non-empty, a pair: only self[single_block[0]] is merged
#                with new_bc[single_block[1]] and the method returns None.
# idblock      - name of an incoming block to skip.
# match_att    - when non-empty, blocks are paired by the value of
#                match_att[0] (or by match_function applied to each block)
#                instead of by block name.
#
# Raises StarError for a name clash in strict mode and for an unknown mode.
# NOTE(review): indentation was lost in this listing, so the exact nesting
# cannot be confirmed from here.
def merge(self,new_bc,mode=None,parent=None,single_block=[],
idblock="",match_att=[],match_function=None):
if mode is None:
if self.standard is None:
mode = 'replace'
else:
mode = 'strict'
if single_block:
self[single_block[0]].merge(new_bc[single_block[1]],mode,
match_att=match_att,
match_function=match_function)
return None
base_keys = [a[1].block_id for a in self.child_table.items()]
# NOTE(review): without match_att, block_to_item is the very same list
# object as base_keys, so the pop() and remove() further down both act on it.
block_to_item = base_keys #default
new_keys = [a[1].block_id for a in new_bc.child_table.items()] #get list of incoming blocks
if match_att:
#make a blockname -> item name map
if match_function:
block_to_item = map(lambda a:match_function(self[a]),self.keys())
else:
block_to_item = map(lambda a:self[a].get(match_att[0],None),self.keys())
#print `block_to_item`
for key in new_keys: #run over incoming blocknames
if key == idblock: continue #skip dictionary id
basekey = key #default value
if len(match_att)>0:
attval = new_bc[key].get(match_att[0],0) #0 if ignoring matching
else:
attval = 0
# look for an existing block whose matching value equals, or contains, attval
for ii in range(len(block_to_item)): #do this way to get looped names
thisatt = block_to_item[ii] #keyname in old block
#print "Looking for %s in %s" % (attval,thisatt)
if attval == thisatt or \
(isinstance(thisatt,ListType) and attval in thisatt):
basekey = base_keys.pop(ii)
block_to_item.remove(thisatt)
break
# a block that is not present yet (or any block in replace mode) is added whole
if not self.has_key(basekey) or mode=="replace":
new_parent = new_bc.get_parent(key)
if parent is not None and new_parent is None:
new_parent = parent
self.NewBlock(basekey,new_bc[key],parent=new_parent) #add the block
else:
if mode=="strict":
raise StarError( "In strict merge mode: block %s in old and block %s in new files" % (basekey,key))
elif mode=="overlay":
# print "Merging block %s with %s" % (basekey,key)
self[basekey].merge(new_bc[key],mode,match_att=match_att)
else:
raise StarError( "Merge called with unknown mode %s" % mode)
def merge_fast(
self, new_bc, parent=None)
Inheritance:
BlockCollection.merge_fast
Do a fast merge
def merge_fast(self,new_bc,parent=None):
    """Do a fast merge of collection [[new_bc]] into this collection.

    The merge mode is 'replace' if `self.standard` is None, otherwise
    'strict'. In strict mode, block names present in both collections are
    resolved by appending '+' to one of them until it is unused: our block
    is renamed when [[parent]] is given or when only the incoming block
    has a parent; the incoming block is renamed when only our block has a
    parent; any other clash raises StarError.

    Afterwards the dictionary, key set, visible keys and child table of
    [[new_bc]] are added to ours. If [[parent]] is given, every incoming
    block without a parent is re-parented to it.

    Note that both this collection and [[new_bc]] may be modified.
    """
    if self.standard is None:
        mode = 'replace'
    else:
        mode = 'strict'
    overlap_flag = not self.lower_keys.isdisjoint(new_bc.lower_keys)
    if overlap_flag and mode != 'replace':
        double_keys = self.lower_keys.intersection(new_bc.lower_keys)
        for dup_key in double_keys:
            our_parent = self.child_table[dup_key].parent
            their_parent = new_bc.child_table[dup_key].parent
            if (our_parent is None and their_parent is not None and parent is None) or\
                parent is not None:  #rename our block
                start_key = dup_key
                while start_key in self.lower_keys: start_key = start_key+'+'
                self._rekey(dup_key,start_key)
                # parent may legitimately be None in this branch, so only
                # compare when a prospective parent was actually supplied
                if parent is not None and parent.lower() == dup_key:
                    parent = start_key  #we just renamed the prospective parent!
            elif our_parent is not None and their_parent is None and parent is None:
                start_key = dup_key
                while start_key in new_bc.lower_keys: start_key = start_key+'+'
                new_bc._rekey(dup_key,start_key)
            else:
                raise StarError("In strict merge mode:duplicate keys %s" % dup_key)
    self.dictionary.update(new_bc.dictionary)
    self.lower_keys.update(new_bc.lower_keys)
    self.visible_keys += (list(new_bc.lower_keys))
    self.child_table.update(new_bc.child_table)
    if parent is not None:     #redo the child_table entries
        reparent_list = [(a[0],a[1].block_id) for a in new_bc.child_table.items() if a[1].parent is None]
        reparent_dict = [(a[0],self.PC(a[1],parent.lower())) for a in reparent_list]
        self.child_table.update(dict(reparent_dict))
def rename(
self, oldname, newname)
Inheritance:
BlockCollection.rename
Rename datablock from [[oldname]] to [[newname]]. Both key and printed name are changed. No conformance checks are conducted.
def rename(self,oldname,newname):
    """Rename datablock from [[oldname]] to [[newname]]. Both key and printed name are changed.  No
    conformance checks are conducted.

    Names are matched in lower case. Raises StarError if [[newname]] is
    already present and KeyError if [[oldname]] cannot be found."""
    realoldname = oldname.lower()
    realnewname = newname.lower()
    if realnewname in self.lower_keys:
        raise StarError('Cannot change blockname %s to %s as %s already present' % (oldname,newname,newname))
    if realoldname not in self.lower_keys:
        raise KeyError('Cannot find old block %s' % realoldname)
    # the key is lower-cased, the printed name keeps the caller's spelling
    self._rekey(realoldname,realnewname,block_id=newname)
def set_parent(
self, parentname, childname)
Inheritance:
BlockCollection.set_parent
Set the parent block
def set_parent(self,parentname,childname):
    """Make block [[parentname]] the parent of block [[childname]].

    Both names are looked up in lower case; KeyError is raised if either
    block is missing from this collection."""
    parent_key = parentname.lower()
    child_key = childname.lower()
    # both blocks have to be present before anything is changed
    if parent_key not in self.lower_keys:
        raise KeyError('Parent block %s does not exist' % parentname)
    if child_key not in self.lower_keys:
        raise KeyError('Child block %s does not exist' % childname)
    # keep the child's printed name, swap in the new parent
    current_entry = self.child_table[child_key]
    self.child_table[child_key] = self.PC(current_entry.block_id, parent_key)
    # self-assignment is deliberate: it resets visibility
    # (presumably through a property setter on scoping - confirm)
    self.scoping = self.scoping
def set_uri(
self, my_uri)
def set_uri(self,my_uri):
    """Store [[my_uri]] in the `my_uri` attribute of this object."""
    self.my_uri = my_uri
def unlock(
self)
Inheritance:
BlockCollection.unlock
Allow overwriting of all blocks in this collection
def unlock(self):
    """Allow overwriting of all blocks in this collection by setting the
    `overwrite` attribute of each block to True."""
    for name in self.lower_keys:
        block = self[name]
        block.overwrite = True
def update(
self, adict)
Inheritance:
BlockCollection.update
def update(self,adict):
    """Copy every entry of [[adict]] into this collection using item
    assignment, one key at a time."""
    for name in adict.keys():
        value = adict[name]
        self[name] = value
class StarLengthError
class StarLengthError(Exception):
    """Exception whose string form is [[value]] prefixed with
    'Star length error: ' on a new line."""
    def __init__(self,value):
        self.value = value
    def __str__(self):
        # %-formatting accepts any value; plain '+' raised TypeError when
        # value was not a string. The 1-tuple keeps tuple values intact.
        return '\nStar length error: %s' % (self.value,)
Ancestors (in MRO)
- StarLengthError
- exceptions.Exception
- exceptions.BaseException
- __builtin__.object
Class variables
var args
var message
Instance variables
var value
Methods
def __init__(
self, value)
def __init__(self,value):
    """Keep [[value]] on the instance as the `value` attribute."""
    self.value = value
class StarList
class StarList(list):
    """A [[list]] subclass that adds no behaviour of its own."""
Ancestors (in MRO)
- StarList
- __builtin__.list
- __builtin__.object
class StarPacket
class StarPacket(list):
    """A list that also carries values as attributes, and that tries to
    derive an attribute on demand when it is missing."""
    def merge_packet(self,incoming):
        """Merge contents of incoming packet with this packet.

        [[incoming]] is appended to this packet as a single element, and
        each attribute of [[incoming]] whose name starts with exactly one
        underscore is copied onto this packet."""
        new_attrs = [a for a in dir(incoming) if a[0] == '_' and a[1] != "_"]
        self.append(incoming)
        for na in new_attrs:
            setattr(self,na,getattr(incoming,na))

    def __getattr__(self,att_name):
        """Derive a missing attribute.

        Only called when normal lookup fails. Uses the `cif_dictionary`,
        `fulldata` and `key` attributes of this packet; if one of those is
        itself missing, AttributeError is raised rather than recursing
        forever."""
        if att_name in ('cif_dictionary','fulldata','key'):
            raise AttributeError('Programming error: cannot compute value of %s' % att_name)
        d = self.cif_dictionary
        c = self.fulldata
        k = self.key
        d.derive_item(att_name,c,store_value=True)
        #
        # now pick out the new value
        keyval = getattr(self,k)
        full_pack = c.GetKeyedPacket(k,keyval)
        return getattr(full_pack,att_name)
Ancestors (in MRO)
- StarPacket
- __builtin__.list
- __builtin__.object
Methods
def merge_packet(
self, incoming)
Merge contents of incoming packet with this packet
def merge_packet(self,incoming):
    """Merge contents of incoming packet with this packet.

    [[incoming]] is appended as a single element, then every attribute of
    [[incoming]] whose name begins with exactly one underscore is copied
    onto this packet."""
    # collect the names first, before this packet is modified
    single_underscore_names = []
    for name in dir(incoming):
        if name[0] == '_' and name[1] != "_":
            single_underscore_names.append(name)
    self.append(incoming)
    for name in single_underscore_names:
        setattr(self, name, getattr(incoming, name))