An event aggregator.
Two components coupled by a database and a data model:
- ETL System
- Reports webapp
Two .INI-style config files:
- ``local.ini`` -- read by ``workhours.tasks`` and ``workhours.cli.main``
- ``development.ini`` -- read by pserve, gunicorn
$ workhours --help
Usage: workhours [-c conf] [--fs path] [--db uri] <options> [-s source path+] [-r report+]
event aggregation CLI
Options:
-h, --help show this help message and exit
-c CONFIG_FILE, --config=CONFIG_FILE
path to configuration file containing
db.uri, fs.uri, and TaskQueue
definitions
--db=EVENTSDB_URI, --eventsdb=EVENTSDB_URI
database uri for storing task results
ex: ``sqlite:///:memory:``
--fs=FS_URI, --task-storage=FS_URI
Path where task data will be copied
and reports files will be stored
-l, --list-source-types
List supported source (TaskQueue) types
-s SRC_QUEUES, --src=SRC_QUEUES
Type and filename tuples
(ex: ``-s shell.log ./.usrlog``)
-P, --parse Parse and extract all sources defined by
the ``-s`` option and the ``-c`` config
-u USERNAMES, --username=USERNAMES
Usernames to include
--list-report-types List supported report types
-r REPORTS, --report=REPORTS
Generate a report type
-o OUTPUT, --output-file=OUTPUT
Output file (default: '-' for stdout)
-O OUTPUT_FORMAT, --output-format=OUTPUT_FORMAT
Output format <csv|json> (default: None)
-G GAPTIME, --gaptime=GAPTIME
Minute gap to detect between entries
-p, --print-all Dump the events table to stdout
-v, --verbose
-q, --quiet
-t, --test
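For example, a single run that copies, parses, and reports could look like the following (the ``shell.log`` source type is taken from the ``-s`` example above; the report name ``events``, the filestore path, and the database file are placeholders)::

    $ workhours -c local.ini \
                -s shell.log ./.usrlog \
                --fs ./workhours-data \
                --db sqlite:///events.db \
                -P \
                -r events -o events.csv -O csv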
Parsing (``-P`` / ``--parse``) performs a one-pass copy and parse of each source listed in the ``-c`` / ``--config-file`` config as::

    [queue_type]
    uniqkey_n = file_uri_n

and on the commandline as ``source path`` pairs passed to ``-s`` / ``--src``::

    workhours -s log.shell ~/shell.log
Each source is copied into a filestore at ``fs.uri``, specified as either:

- config: ``fs.uri`` in the config file
- CLI: ``--fs`` on the commandline

and read into a SQL database wrapped by SQLAlchemy, specified as either:

- config: ``eventsdb.uri`` in the ``local.ini`` configuration file
- CLI: ``--db sqlite:///example.db``
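Putting these together, a minimal ``local.ini`` could look roughly like this (a sketch: the ``[workhours]`` section name grouping ``fs.uri`` and ``eventsdb.uri`` is an assumption; the source section follows the ``[queue_type]`` / ``uniqkey_n = file_uri_n`` form shown above)::

    ; enclosing section name is an assumption
    [workhours]
    fs.uri = ./workhours-data
    eventsdb.uri = sqlite:///events.db

    ; one section per source (TaskQueue) type
    [shell.log]
    usrlog_main = ./.usrlog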
- TODO: es indexing
Parse functions are imported ("registered") as named queues in ``workhours.tasks``, each linked to a ``parse_*`` function.
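A minimal sketch of how such a registry could be shaped (the ``QUEUES`` dict, the ``parse_shell_log`` name, the tab-separated line format, and the tuple layout are all assumptions for illustration, not the actual ``workhours.tasks`` API)::

    # hypothetical registry mapping a queue type name to its parse_ function
    def parse_shell_log(fileobj):
        """Yield (date, url, text) tuples, one per log line (format assumed)."""
        for line in fileobj:
            date, _, text = line.rstrip('\n').partition('\t')
            yield (date, None, text)

    QUEUES = {
        'shell.log': parse_shell_log,
    }

Parsed objects -- dicts, objects exposing ``to_event_row()``, or plain iterables -- are normalized into ``Event`` rows by ``Event.from_uhm``::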
    import logging
    log = logging.getLogger(__name__)


    class Event(object):
        # SQLAlchemy-mapped columns (.date, .url, .text, .task_id) elided here

        @classmethod
        def from_uhm(cls, source, obj, **kwargs):
            """Build an Event from a dict, an object with to_event_row(), or an iterable."""
            _kwargs = {}
            _kwargs['task_id'] = kwargs.get('task_id')
            try:
                if isinstance(obj, dict):
                    _kwargs.update(obj)
                    _obj = cls(source, **_kwargs)
                elif hasattr(obj, 'to_event_row'):
                    _obj = cls(source, *obj.to_event_row(), **_kwargs)
                # punt: any other iterable is splatted into positional args
                elif hasattr(obj, '__iter__'):
                    _obj = cls(source, *obj, **_kwargs)
                else:
                    raise Exception("uh")
            except Exception as e:
                log.error({'obj': obj,
                           'type': type(obj),
                           'dir': dir(obj)})
                log.exception(e)
                raise
            return _obj

- TODO: normalize parse function signatures: ``*args, **kwargs``
- TODO: ``workhours.interfaces.IDataSource``
- TODO: Tag Support
- TODO: IDataSource Interface
- TODO: Tests
- TODO: Standard bookmarks.html file
- TODO: HTTP common log
- TODO: Pyline column mappings: ``to_event_row(): tuple``
- TODO: IEventRecord Interface
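A sketch of what an ``IEventRecord``-style record with the ``to_event_row()`` hook and a normalized ``parse_*`` signature could look like (class and function names are hypothetical; only the ``to_event_row()`` hook comes from ``Event.from_uhm`` above)::

    class BookmarkRecord(object):
        """Hypothetical record exposing the to_event_row() hook."""
        def __init__(self, date, url, text):
            self.date, self.url, self.text = date, url, text

        def to_event_row(self):
            # consumed by Event.from_uhm as cls(source, *obj.to_event_row(), ...)
            return (self.date, self.url, self.text)


    def parse_bookmarks_html(fileobj, *args, **kwargs):
        """Hypothetical normalized signature: (fileobj, *args, **kwargs) -> iterable of records."""
        for line in fileobj:
            # naive extraction for illustration only; a real parser would walk <A> tags
            if 'HREF="' in line:
                url = line.split('HREF="', 1)[1].split('"', 1)[0]
                yield BookmarkRecord(None, url, line.strip())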
- sqlite:///:memory:
- mysql://...
- [...]://...
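These are standard SQLAlchemy URIs; a minimal sketch of the engine/session setup such a ``--db`` / ``eventsdb.uri`` value feeds into (not the project's actual wiring)::

    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker

    # eventsdb.uri / --db value; sqlite:///:memory: is handy for tests
    engine = create_engine('sqlite:///:memory:')
    Session = sessionmaker(bind=engine)
    session = Session()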
- TODO: connection timeouts configuration
- TODO: tasks configuration
- TODO: elasticsearch sqlalchemy event integration
- TODO: generate a ``pandas.DataFrame`` from event tables
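A sketch of that with ``pandas.read_sql`` (the ``events`` table name and the database path are assumptions; the columns follow the ``Event`` model described below)::

    import pandas as pd
    from sqlalchemy import create_engine

    engine = create_engine('sqlite:///example.db')
    # load the events table into a DataFrame, parsing the date column
    df = pd.read_sql('SELECT date, url, text, task_id FROM events',
                     engine, parse_dates=['date'])
    print(df.groupby(df['date'].dt.date).size())  # events per day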
Standard Python classes mapped to SQLAlchemy tables: the ``Event``, ``Place``, ``TaskQueue``, and ``Task`` models.

``Event``:

- ``.date``
- ``.url``
- ``.text``
- ``.task_id``
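A declarative sketch of the ``Event`` model under these assumptions (column types, the ``events`` table name, and the ``__init__`` signature are guesses based on ``Event.from_uhm`` above, not the actual mapping)::

    from sqlalchemy import Column, DateTime, Integer, Unicode, UnicodeText
    from sqlalchemy.ext.declarative import declarative_base

    Base = declarative_base()


    class Event(Base):
        __tablename__ = 'events'            # table name is an assumption

        id = Column(Integer, primary_key=True)
        source = Column(Unicode(64))        # source / queue type
        date = Column(DateTime, index=True)
        url = Column(Unicode(2048))
        text = Column(UnicodeText)
        task_id = Column(Integer)           # id of the Task that produced this event

        def __init__(self, source, date=None, url=None, text=None, task_id=None):
            self.source = source
            self.date = date
            self.url = url
            self.text = text
            self.task_id = task_id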
- TODO: sadisplay
- TODO: stdout norm (__{str,unicode}__)
- TODO: periodic tasks
- TODO: inotify throttling
- TODO: messaging middleware
- TODO: celery || zmq
- TODO: handle potentially frequently changing events.db files when
- TODO: or, manage n databases and n sets of models (see)
- TODO: tests: histograms with sqlalchemy date paging
- TODO: date aggregation
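A sketch of the kind of date-aggregation query this refers to (assuming the ``Event`` model and ``session`` from the sketches above; ``func.date`` truncates to a calendar day on SQLite-style backends)::

    from sqlalchemy import func

    day = func.date(Event.date).label('day')
    # count events per calendar day -- the raw material for a frequency histogram
    per_day = (session.query(day, func.count(Event.id))
               .group_by(day)
               .order_by(day)
               .all())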
- TODO: webapp configuration
- TODO: fulltext search
- TODO: faceted search and highlighting
- TODO: events HTML tables + paging
- TODO: frequency timeline histogram
- TODO: REST API
- TODO: js layer