GEMMA python module home

Source code for gemma.gemmaclass

from readfuncs import *
from sampleclass import sample
import os
import numpy as np
import pandas
        
class gemma(object):
    '''Class to read out gemma output.bin

    Reads in header information and further acts as an iterator object.
    Data from samples are not read by default.
    Samples can be accessed as gemma[samplename] or gemma[sampleindex]

    Parameters
    ----------
    fname : str, filename of binary gemma output file,
        default gemma_output.bin
    version : version of gemma executable, default 34
    persistent : keep samples in memory if True (default), otherwise
        re-read from file

    Attributes
    ----------
    fname : str
        filename
    nsamples : int
        number of samples
    niterations : int
        number of iterations
    diag : bool
        g-emma diagnostics mode
    useLLS : bool
        linear least squares approximation used in g-emma
    withintriangle : bool
        behaviourals constrained within individual trapezoids/triangles
    stopmax : bool
        stopped iterations when maximum behaviourals were reached
        (and nbehave is therefore an approximation)
    nendmembers : int
        number of end-members
    endmembers : list of strings
        end-member names
    nsolutes : int
        number of solutes
    solutes : list of strings
        solute names
    columns : list of strings
        column names for the sample data, built from the header
    samples : dict
        sample objects, with samplenames as keys
    samplenames : list
        sample names
    persistent : bool
        keep samples in memory
    version : int
        version of G-EMMA executable
    '''

    def __init__(self, fname='gemma_output.bin', version=34,
                 persistent=True):
        # Define the file handle first so __del__ is safe if open() fails.
        self.f = None
        self.data_pos = 0
        self.cur_pos = 0
        self.cur_s = 0
        self.fname = fname
        self.nsamples = 0
        self.niterations = 0
        self.nendmembers = 0
        self.diag = False
        self.useLLS = False
        self.withintriangle = False
        self.stopmax = False
        self.short = False
        self.endmembers = []
        self.nsolutes = 0
        self.solutes = []
        self.columns = ['ITER', 'NENDM', 'NSOL']
        self.samples = {}
        self.samplenames = []
        self.samplepos = {}
        self.readsamples = False
        self.persistent = persistent  # keep samples in memory
        self.version = version  # version of G-EMMA executable
        try:
            self.f = open(self.fname, 'rb')
        except IOError:
            print('IOError reading %s' % self.fname)
            raise
        self._read_header()
        # Initialising the iterator also builds the sample index
        # (see __iter__ / _read_all).
        iter(self)

    def _read_header(self):
        '''Read the header from the start of the file.

        Fills the header attributes, rebuilds the column list and stores
        the file position where the sample data begins in self.data_pos.
        '''
        f = self.f
        f.seek(0, os.SEEK_SET)
        self.x64 = readbool(f)
        self.nsamples = readint(f)
        self.niterations = readlong(f, self.x64)
        self.nendmembers = readint(f)
        # Lists are rebuilt (not appended to) so that re-reading the header
        # cannot duplicate entries.
        self.endmembers = [readstring(f) for _ in range(self.nendmembers)]
        self.nsolutes = readint(f)
        self.solutes = [readstring(f) for _ in range(self.nsolutes)]
        self.diag = readbool(f)
        # NOTE(review): the original indentation was lost; it is assumed
        # that only useLLS is gated by the version check - confirm for
        # files written by executables older than version 34.
        if self.version > 33:
            self.useLLS = readbool(f)
        self.withintriangle = readbool(f)
        self.stopmax = readbool(f)
        self.short = readbool(f)

        # create columns for dataframe
        columns = ['ITER', 'NENDM', 'NSOL']
        for e in self.endmembers + ['MIX']:  # solute concentrations
            for s in self.solutes:
                columns.append(e + '_' + s)
        columns.extend(self.endmembers)  # fractions
        # NOTE(review): assumed that only 'Res_WB' depends on diag and the
        # per-solute residual columns are always present - confirm.
        if self.diag:
            columns.append('Res_WB')
        for s in self.solutes:
            columns.append('Res_' + s)
        columns.append('LH')
        if self.diag:
            columns.append('Behavioural')
        self.columns = columns

        # store position of cursor at beginning of data
        self.data_pos = f.tell()

    def _read_all(self):
        '''From v39: samples no longer in order, order sample list based
        on sample dates

        Function must be called before initialisation of iterator'''
        # Imported here because the file does not import struct itself;
        # the except clause below needs the name.
        import struct

        sampledates = []
        self.cur_pos = self.data_pos
        while True:
            self.f.seek(self.cur_pos)
            try:
                s = sample(self, False)
                self.cur_pos = self.f.tell()
                self.samplenames.append(s.name)
                sampledates.append(s.date)
                self.samples[s.name] = s
            except (IOError, struct.error):
                # A failed read is taken as the end of the sample records.
                break
        # Order names by sample date (ties fall back to the name).
        self.samplenames = [name for (_date, name)
                            in sorted(zip(sampledates, self.samplenames))]
        self.readsamples = True

    def __iter__(self):
        '''Reset the sample cursor and return self as the iterator.

        The object is its own iterator, so nested loops over the same
        instance share one cursor.
        '''
        if not self.readsamples:
            self._read_all()
        self.cur_pos = self.data_pos
        self.cur_s = -1
        return self

    def next(self):
        '''Iterator over samples. Only through ordered samples list/dict'''
        self.cur_s += 1
        if self.cur_s < len(self.samples):
            return self.samples[self.samplenames[self.cur_s]]
        raise StopIteration

    # Python 3 spelling of the iterator protocol.
    __next__ = next

    def read_data(self, nrows=None):
        '''
        Read data from samples. Not done by default

        Parameters
        ----------
        nrows : int, no of rows to read from sample data.
            Use None (default) for all
        '''
        for s in self:
            s.read_data(nrows)

    def print_summary(self):
        '''Print summary data on gemma run'''
        nb = pandas.Series([s.nbehave for s in self])
        # Series.nonzero() no longer exists in current pandas; count the
        # non-zero entries directly.
        n_behavioural = int((nb != 0).sum())
        summary = 'G-EMMA run\n-----------------\n'
        summary += '%i samples, %i yielded behavioural runs\n' % (
            self.nsamples, n_behavioural)
        summary += ('Distribution of behaviourals (out of %.2e runs):'
                    % self.niterations)
        print(summary)
        print(nb.describe())

    def em_contrib(self):
        '''Calculate end-member contributions for all samples,
        constructs DataFrame

        Returns
        -------
        emc : pandas.DataFrame with end-member contributions, one row per
            sample date, missing values filled with 0. Empty DataFrame if
            no sample has behavioural runs.
        '''
        emc = []
        dts = []
        for s in self:
            if s.nbehave:
                emc.append(s.em_contrib())
                dts.append(s.date)
        if not emc:
            # pandas.concat raises on an empty list
            return pandas.DataFrame()
        return pandas.concat(emc, keys=dts, axis=1).T.fillna(0)

    def percentiles(self, percentiles, params, mask=None):
        '''Create DataFrame with percentiles of parameters for all samples.

        For subsequent use sets self.pctresult with the result.
        Uses self.pctresult if requested parameters / percentiles are in
        self.pctresult

        Parameters
        ----------
        percentiles : list-like of integers. Percentiles in the 0-100
            range are reset to the 0-1 range.
        params : list of parameters to calculate percentiles of.
        mask : tuple with first item parameter name and second item
            equality. Applied to every sample.

        Returns
        -------
        pctresult : pandas.DataFrame
        '''
        # Rescale if any value is given on the 0-100 scale.
        if any(p > 1 for p in percentiles):
            percentiles = [x / 100. for x in percentiles]

        if hasattr(self, 'pctresult'):
            # keep only params for which some percentile is not cached yet
            params = [p for p in params
                      if any((p, pct) not in self.pctresult
                             for pct in percentiles)]

        # still params to do?
        if len(params) > 0:
            ids = []
            dates = []
            data = []
            use_mask = isinstance(mask, (tuple, list))
            for s in self:
                if not s.nbehave:
                    continue
                if not s.has_data:
                    s.read_data()
                ids.append(s.name)
                dates.append(s.date)
                if use_mask:
                    # Build the boolean mask per sample; the mask argument
                    # itself must stay untouched for the next sample.
                    smask = s.data[mask[0]] == mask[1]
                    pct = s.percentile(pct=percentiles, params=params,
                                       mask=smask)
                else:
                    pct = s.percentile(pct=percentiles, params=params)
                data.append(pct.T.stack())
            A = pandas.concat(data, keys=ids, axis=1).T
            A['Date'] = dates
            A['Sample'] = ids
            if hasattr(self, 'pctresult'):
                try:
                    # drop columns that are already in the cached result
                    for c in list(A.columns):
                        if c in self.pctresult.columns:
                            A.pop(c)
                    self.pctresult = pandas.merge(
                        self.pctresult, A, how='outer',
                        left_index=True, right_index=True)
                except Exception:
                    # show both frames to help debugging, then re-raise
                    print('%s %s' % (self.pctresult, A))
                    raise
            else:
                self.pctresult = A
        return self.pctresult

    # overload []
    def __getitem__(self, index):
        '''Return a sample by position in samplenames (integer index)
        or by sample name (any other key).'''
        if isinstance(index, (int, np.integer)):
            return self.samples[self.samplenames[index]]
        return self.samples[index]

    def __del__(self):
        # self.f is missing or None when the file could not be opened
        f = getattr(self, 'f', None)
        if f is not None:
            f.close()
if __name__ == '__main__':
    # Run as a script: construct a reader with the default arguments.
    g = gemma()