Python: analyzing CSV files, part 2

Previously, we discussed analyzing CSV files by parsing each CSV into a native Python object that supports iteration while providing easy access to the data (such as a sum by column header).

For very large files this can be cumbersome, especially where more advanced analytics are desired.

I would like to keep the same simple interface but use an in-memory database connection, thus transforming the CSV files into database tables for deeper analysis.

For example, I would like to do the following (leveraging the builtin sqlite3 module):

>>> 
>>> reports = Reports('/home/user/reports-1109')
>>> reports.billing_detail.sum('Tax Amount', {'Fiscal Period':'2011-09'})
Decimal('123321.1')
>>> 
>>> reports.tax_summary.sum('Amount', {'Fiscal Period':'2011-09'})
Decimal('123321.1')
>>> 
>>> len(reports.billing_detail)
719153
>>> 
>>> curs = reports.conn.cursor()
>>> curs.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()
[(u'billing_detail',), (u'tax_summary',)]
>>> 

This approach can be orders of magnitude faster for even the most basic analysis. Furthermore, this allows OLAP cube analysis of the data from the CSV files, e.g.,

>>> 
>>> curs.execute('CREATE TABLE t_fact(id TEXT UNIQUE, b INT, t INT, r INT)').fetchall()
[]
>>> curs.execute('CREATE INDEX IF NOT EXISTS idxt ON t_fact(id)').fetchall()
[]
>>> 
>>> ## load some data into the fact table; query_dict maps id, tax_id and ref_id to the stored column names (c0, c1, ...)
>>> curs.execute('''INSERT OR REPLACE INTO t_fact(id,b,t,r)
                SELECT bd.%(id)s as id, bd.ROWID as b, ts.ROWID as t, rf.ROWID as r
                FROM billing_detail bd
                LEFT OUTER JOIN tax_summary ts ON bd.%(id)s = ts.%(tax_id)s
                LEFT OUTER JOIN refunds rf ON bd.%(id)s = rf.%(ref_id)s
                ''' % query_dict).fetchall()
[]
>>> 
>>> ## e.g., find billing records without tax summaries
>>> billings_without_tax = curs.execute('SELECT id FROM t_fact WHERE t IS NULL').fetchall()
>>> 

Using the same Report and Reports objects discussed previously, the code can be modified to leverage a database connection to support this type of analytics:

try:  # Python 3.3+ keeps the container ABCs in collections.abc ...
    from collections.abc import Mapping as _MappingBase
except ImportError:  # ... while Python 2 only has collections.Mapping
    from collections import Mapping as _MappingBase


class Report(_MappingBase):
    '''A CSV report loaded into a SQLite table for querying.

    The CSV's first line is kept as ``info`` and its second line as
    ``headers``; every following line becomes one row of ``table``.  Columns
    are declared TEXT and named ``c0``, ``c1``, ... in header order; use
    ``_column`` to translate a (case-insensitive) header into its SQL name.

    NOTE: the ``conn`` default is evaluated once, when the class is defined,
    so every Report created without an explicit ``conn`` shares one
    in-memory database.  That lets tables be joined with each other, but a
    reused ``table`` name then keeps the existing table's rows instead of
    loading the CSV again.
    '''

    def __init__(self, filehint, table=None, conn=sqlite3.connect(':memory:')):
        '''Locate the CSV for *filehint* and load it into *table* on *conn*.

        :param filehint: passed to ``Reports.find_report`` to resolve the CSV
            file name.
        :param table: name of the SQLite table to create or reuse.  It is
            interpolated into SQL unquoted, so it must be a plain identifier
            (``None`` literally becomes a table called ``None``).
        :param conn: sqlite3 connection that holds the table (see the class
            NOTE about the shared default).
        '''
        self.filename = Reports.find_report(filehint)
        self.info = []       # first CSV line
        self.headers = []    # second CSV line: column titles
        self.table = table
        self.conn = conn
        self.indexes = []    # names of indexes created through create_index
        self._load()

    def _fetchall(self, sql, params=()):
        '''Run *sql* with bound *params* and return all rows; always closes the cursor.'''
        curs = self.conn.cursor()
        try:
            return curs.execute(sql, params).fetchall()
        finally:
            curs.close()

    def _load(self):
        '''Read ``self.filename`` and copy its data rows into ``self.table``.

        ``info`` and ``headers`` are always refreshed from the file.  If the
        CREATE TABLE fails with OperationalError (typically because the table
        already exists) the error is logged and no rows are inserted.
        '''
        logging.debug('loading %s', self.filename)
        # NOTE(review): the csv docs recommend newline='' on Python 3 (and 'rb'
        # on Python 2); plain open() is kept so both interpreters accept it.
        with open(self.filename) as fh:
            reader = csv.reader(fh)
            self.info = next(reader)
            self.headers = next(reader)
            names = ['c%d' % i for i in range(len(self.headers))]
            columns = ', '.join(names)
            column_types = ', '.join(name + ' TEXT' for name in names)
            placeholders = ', '.join('?' * len(names))
            curs = self.conn.cursor()
            try:
                curs.execute('CREATE TABLE %s(%s)' % (self.table, column_types))
            except sqlite3.OperationalError as e:
                logging.debug('%s -- using existing table', e)
            else:
                # Stream the remaining CSV rows straight into SQLite; the file
                # must still be open here, hence the enclosing ``with``.
                curs.executemany('INSERT INTO %s (%s) VALUES(%s)' % (
                    self.table, columns, placeholders), reader)
                self.conn.commit()
            finally:
                curs.close()

    def _column(self, key):
        '''Map a header title (case-insensitive) to its SQL column name ``cN``.

        Raises IndexError if no header matches *key*.
        '''
        lowered = [header.lower() for header in self.headers]
        try:
            return 'c%d' % lowered.index(key.lower())
        except ValueError:
            raise IndexError('%s not in %s' % (key, self.table))

    def create_index(self, col):
        '''Create (once) an index on the column whose header is *col*.

        SQLite index names are global to the database, so the name includes
        the table.  Otherwise a second table on the same connection would hit
        ``IF NOT EXISTS`` on the first table's index and never be indexed.
        '''
        col = self._column(col)
        icol = 'i_%s_%s' % (self.table, col)
        if icol in self.indexes:
            return
        logging.debug('adding index %s to %s(%s)', icol, self.table, col)
        curs = self.conn.cursor()
        try:
            curs.execute('CREATE INDEX IF NOT EXISTS %s ON %s(%s)' % (icol, self.table, col))
        finally:
            curs.close()
        self.indexes.append(icol)

    def __getitem__(self, key):
        '''Return the row whose SQLite ROWID is ``key + 1``, as a list.

        Raises IndexError when no such row exists.
        '''
        rows = self._fetchall('SELECT * FROM %s WHERE ROWID = ?' % (self.table,), (key + 1,))
        if not rows:
            raise IndexError('row %r not in %s' % (key, self.table))
        return list(rows[0])

    def __iter__(self):
        '''Return a new iterator over every row of the table (as tuples).

        Each call uses its own cursor, so the report can be iterated more than
        once or in nested loops.  The cursor is also remembered so that the
        legacy ``report.next()`` call keeps working after ``iter(report)``.
        '''
        # The cursor must stay open while it is iterated: closing it here
        # would make the very first fetch raise ProgrammingError.
        curs = self.conn.cursor()
        curs.execute('SELECT * FROM %s' % (self.table,))
        self.__iter = curs
        return curs

    def next(self):
        '''Return the next row from the cursor opened by the last ``__iter__``.'''
        return next(self.__iter)

    __next__ = next  # Python 3 spelling of the iterator protocol

    def __len__(self):
        '''Return the number of rows in the table.'''
        return self._fetchall('SELECT COUNT(*) FROM %s' % (self.table,))[0][0]

    def get(self, column, value):
        '''get rows where column matches value

        *column* is a header title; *value* is formatted with ``%s`` and bound
        as a parameter, so it is compared as text (how the CSV cells were
        stored) and may safely contain quotes.  Returns a list of row tuples.
        '''
        sql = 'SELECT * FROM %s WHERE %s = ?' % (self.table, self._column(column))
        return self._fetchall(sql, ('%s' % (value,),))

    def sum(self, col, filter=None):
        '''Return the SQL SUM of column *col* as a Decimal.

        :param col: header title of the column to add up.
        :param filter: optional ``{header: value}`` mapping; only rows where
            every header equals its value (compared as text) are summed.
        :returns: ``Decimal(0)`` when no row matches (SQL SUM yields NULL).
        :raises IndexError: if *col* or a filter key is not a known header.
        '''
        clauses = []
        params = []
        for key, value in (filter or {}).items():
            # Bind values instead of embedding them in "..." -- SQLite reads
            # double-quoted text as an identifier first, so a value such as
            # "c1" would silently compare against column c1.
            clauses.append('%s = ?' % self._column(key))
            params.append('%s' % (value,))
        sql = 'SELECT SUM(%s) FROM %s' % (self._column(col), self.table)
        if clauses:
            sql += ' WHERE ' + ' AND '.join(clauses)
        total = self._fetchall(sql, params)[0][0]
        if total is None:
            return Decimal(0)
        return Decimal(str(total))
This entry was posted in data arch., python, software arch.. Bookmark the permalink.

Comments are closed.