# Source code for gemma.gemmaclass
from readfuncs import *
from sampleclass import sample
import os
import numpy as np
import pandas
class gemma(object):
    '''Class to read out gemma output.bin

    Reads in header information and further acts as an iterator object.
    Data from samples are not read by default.
    Samples can be accessed as gemma[samplename] or gemma[sampleindex]

    Parameters
    ----------
    fname : str
        filename of binary gemma output file, default gemma_output.bin
    version : int
        version of gemma executable, default 34
    persistent : bool
        keep samples in memory if True (default), otherwise re-read from file

    Attributes
    ----------
    fname : str
        filename
    nsamples : int
        number of samples
    niterations : int
        number of iterations
    diag : bool
        g-emma diagnostics mode
    useLLS : bool
        linear least squares approximation used in g-emma
    withintriangle : bool
        behaviourals constrained within individual trapezoids/triangles
    stopmax : bool
        stopped iterations when maximum behaviourals were reached
        (and nbehave is therefore an approximation)
    short : bool
        flag read from the file header (stored as-is, not used in this class)
    x64 : bool
        first flag in the file header; passed to readlong when reading
        niterations
    nendmembers : int
        number of end-members
    endmembers : list of strings
        end-member names
    nsolutes : int
        number of solutes
    solutes : list of strings
        solute names
    columns : list of strings
        column names built from the header (see _read_header)
    samples : dict
        sample objects, with samplenames as keys
    samplenames : list
        sample names, ordered by sample date
    persistent : bool
        keep samples in memory
    version : int
        version of G-EMMA executable
    '''

    def __init__(self, fname='gemma_output.bin', version=34, persistent=True):
        # Bind the handle first so __del__ stays safe when open() fails.
        self.f = None
        self.data_pos = 0
        self.cur_pos = 0
        self.cur_s = 0
        self.fname = fname
        self.nsamples = 0
        self.niterations = 0
        self.nendmembers = 0
        self.diag = False
        self.useLLS = False
        self.withintriangle = False
        self.stopmax = False
        self.short = False
        self.endmembers = []
        self.nsolutes = 0
        self.solutes = []
        self.columns = ['ITER', 'NENDM', 'NSOL']
        self.samples = {}
        self.samplenames = []
        self.samplepos = {}
        self.readsamples = False
        self.persistent = persistent  # keep samples in memory
        self.version = version  # version of G-EMMA executable
        try:
            self.f = open(self.fname, 'rb')
        except IOError:
            # Single-argument print() behaves the same on Python 2 and 3.
            print('IOError reading %s' % self.fname)
            raise
        self._read_header()
        # Triggers _read_all() so the samples are indexed right away.
        iter(self)

    def _read_header(self):
        '''Read the file header and build the list of column names.

        Rewinds the file, reads the run settings, and stores the file
        position at which the sample data starts in self.data_pos.
        Name lists and columns are rebuilt from scratch, so calling this
        more than once does not duplicate entries.

        NOTE(review): the nesting of this method was reconstructed from a
        listing without indentation. It is assumed that only useLLS is
        guarded by the version check and that the Res_* columns are all
        diagnostics-only; confirm against the G-EMMA binary layout. For the
        default version (34) the version assumption makes no difference.
        '''
        f = self.f
        f.seek(0, os.SEEK_SET)
        self.x64 = readbool(f)
        self.nsamples = readint(f)
        self.niterations = readlong(f, self.x64)
        self.nendmembers = readint(f)
        self.endmembers = [readstring(f) for _ in range(self.nendmembers)]
        self.nsolutes = readint(f)
        self.solutes = [readstring(f) for _ in range(self.nsolutes)]
        self.diag = readbool(f)
        if self.version > 33:
            self.useLLS = readbool(f)
        self.withintriangle = readbool(f)
        self.stopmax = readbool(f)
        self.short = readbool(f)

        # create columns for dataframe
        columns = ['ITER', 'NENDM', 'NSOL']
        for e in self.endmembers + ['MIX']:  # solute concentrations
            columns += [e + '_' + s for s in self.solutes]
        columns += self.endmembers  # fractions
        if self.diag:
            columns.append('Res_WB')
            columns += ['Res_' + s for s in self.solutes]
        columns.append('LH')
        if self.diag:
            columns.append('Behavioural')
        self.columns = columns

        # store position of cursor at beginning of data
        self.data_pos = f.tell()

    def _read_all(self):
        '''From v39: samples no longer in order,
        order sample list based on sample dates.

        Reads sample objects from self.data_pos until constructing one
        raises IOError or struct.error, then sorts self.samplenames by
        (date, name).
        Function must be called before initialisation of iterator.
        '''
        # Local import: this module does not import struct itself, so the
        # except clause must not depend on a name leaked by a star-import.
        import struct

        sampledates = []
        self.cur_pos = self.data_pos
        while True:
            self.f.seek(self.cur_pos)
            try:
                s = sample(self, False)
                self.cur_pos = self.f.tell()
            except (IOError, struct.error):
                # A failed read is the only end-of-data signal used here.
                break
            self.samplenames.append(s.name)
            sampledates.append(s.date)
            self.samples[s.name] = s
        self.samplenames = [name for (_, name)
                            in sorted(zip(sampledates, self.samplenames))]
        self.readsamples = True

    def __iter__(self):
        '''Reset the iterator to the first sample (indexing samples once).'''
        if not self.readsamples:
            self._read_all()
        self.cur_pos = self.data_pos
        self.cur_s = -1
        return self

    def next(self):
        '''Iterator over samples. Only through ordered samples list/dict'''
        self.cur_s += 1
        if self.cur_s < len(self.samples):
            return self.samples[self.samplenames[self.cur_s]]
        raise StopIteration

    # Python 3 iterator protocol; `next` is kept for Python 2 callers.
    __next__ = next

    def read_data(self, nrows=None):
        '''Read data from samples. Not done by default

        Parameters
        ----------
        nrows : int, no of rows to read from sample data.
            Use None (default) for all
        '''
        for s in self:
            s.read_data(nrows)

    def print_summary(self):
        '''Print summary data on gemma run'''
        nb = pandas.Series([s.nbehave for s in self])
        # Count of samples with a non-zero number of behavioural runs
        # (Series.nonzero() is no longer available in current pandas).
        n_with_behave = int((nb != 0).sum())
        summary = 'G-EMMA run\n-----------------\n'
        summary += '%i samples, %i yielded behavioural runs\n' % (
            self.nsamples, n_with_behave)
        summary += ('Distribution of behaviourals (out of %.2e runs):'
                    % self.niterations)
        print(summary)
        print(nb.describe())

    def em_contrib(self):
        '''Calculate end-member contributions for all samples, constructs
        DataFrame

        Only samples with behavioural runs are included. The collected
        per-sample results are passed to pandas.concat, so at least one
        such sample is required.

        Returns
        -------
        emc : pandas.DataFrame with end-member contributions, one row per
            sample date, missing values filled with 0
        '''
        emc = []
        dts = []
        for s in self:
            if s.nbehave:
                emc.append(s.em_contrib())
                dts.append(s.date)
        return pandas.concat(emc, keys=dts, axis=1).T.fillna(0)

    def percentiles(self, percentiles, params, mask=None):
        '''Create DataFrame with percentiles of parameters for all samples.

        For subsequent use sets self.pctresult with the result. Uses
        self.pctresult if requested parameters / percentiles are in
        self.pctresult

        Parameters
        ----------
        percentiles : list-like of numbers. If any value is larger than 1
            the values are treated as 0-100 and reset to the 0-1 range.
        params : list of parameters to calculate percentiles of.
        mask : tuple or list with first item parameter name and second item
            the value it must equal. The row mask is built separately for
            every sample. Any other value is ignored.

        Returns
        -------
        pctresult : pandas.DataFrame
        '''
        # Accept percentages (0-100) as well as fractions (0-1); checking
        # the maximum makes this independent of the order of the values.
        if len(percentiles) and max(percentiles) > 1:
            percentiles = [x / 100. for x in percentiles]

        have_cache = hasattr(self, 'pctresult')
        if have_cache:
            # Only recompute parameters for which at least one requested
            # percentile is missing from the cached result.
            params = [p for p in params
                      if any((p, pct) not in self.pctresult
                             for pct in percentiles)]

        # still params to do?
        if len(params):
            use_mask = isinstance(mask, (tuple, list))
            ids = []
            dates = []
            data = []
            for s in self:
                if not s.nbehave:
                    continue
                if not s.has_data:
                    s.read_data()
                ids.append(s.name)
                dates.append(s.date)
                if use_mask:
                    # Keep the caller's (name, value) spec untouched so it
                    # is applied to every sample, not just the first one.
                    sample_mask = s.data[mask[0]] == mask[1]
                    pcts = s.percentile(pct=percentiles, params=params,
                                        mask=sample_mask)
                else:
                    pcts = s.percentile(pct=percentiles, params=params)
                data.append(pcts.T.stack())
            A = pandas.concat(data, keys=ids, axis=1).T
            A['Date'] = dates
            A['Sample'] = ids
            if have_cache:
                try:
                    # Drop columns already cached (e.g. Date/Sample) so the
                    # merge does not produce suffixed duplicates.
                    for c in list(A.columns):
                        if c in self.pctresult.columns:
                            A.pop(c)
                    self.pctresult = pandas.merge(self.pctresult, A,
                                                  how='outer',
                                                  left_index=True,
                                                  right_index=True)
                except Exception:
                    # Show both frames to help diagnose the failed merge.
                    print('%s %s' % (self.pctresult, A))
                    raise
            else:
                self.pctresult = A
        return self.pctresult

    # overload []
    def __getitem__(self, index):
        '''Return a sample by position in samplenames (int) or by name.'''
        if isinstance(index, int):
            return self.samples[self.samplenames[index]]
        return self.samples[index]

    def __del__(self):
        # The handle is missing or None when open() failed or when the
        # instance was created without running __init__.
        f = getattr(self, 'f', None)
        if f is not None:
            f.close()
if __name__ == '__main__':
    # Script entry point: open the default 'gemma_output.bin' from the
    # current working directory and read its header and sample index.
    g = gemma()