#!/usr/bin/python import sys import time import simplejson from Job import Job from Worker import Worker class Schwartz(object): conns = [] dbmap = {} workermap = {} def __init__(self, databases): for database in databases: m = __import__(database['driver'], globals(), locals(), ['']) del database['driver'] conn = m.connect(**database) self.conns.append(conn) if not self.dbmap.has_key(str(conn)): self.dbmap[str(conn)] = {} @staticmethod def funcname_from_id(conns, dbmap, id): funcname = '' for conn in conns: if dbmap.has_key(str(conn)): funcmap = dbmap[str(conn)] for func in funcmap: if id == funcmap[func]: return funcname cursor = None try: cursor = conn.cursor() cursor.execute("select funcname from funcmap where funcid = '%s'" % funcid) row = cursor.fetchone() if len(row) and isinstance(row, list): funcname = row[0] cursor.close() except: pass finally: if cursor: cursor.close() cursor = None if funcname: break return funcname @staticmethod def funcname_to_id(conns, dbmap, funcname): funcid = '' fix = lambda x: x.replace("\\","\\\\").replace("'","\\'") for conn in conns: if dbmap.has_key(str(conn)): funcmap = dbmap[str(conn)] if funcmap.has_key(funcname): funcid = funcmap[funcname] if not funcid: cursor = None try: cursor = conn.cursor() cursor.execute("select funcid from funcmap where funcname = '%s'" % fix(funcname)) row = cursor.fetchone() if row and len(row): funcid = row[0] dbmap[str(conn)][funcname] = funcid if not funcid: cursor.execute("insert into funcmap (funcname) values ('%s')" % fix(funcname)) conn.commit() cursor.execute("select funcid, funcname from funcmap where funcname = '%s'" % fix(funcname)) row = cursor.fetchone() if len(row) and isinstance(row, list): funcid = row[0] dbmap[str(conn)][funcname] = funcid cursor.close() except: conn.rollback() pass finally: if cursor: cursor.close() cursor = None if funcid: break return funcid def can_do(self, funcname, worker): if isinstance(worker, Worker): if not self.workermap.has_key(funcname): self.workermap[funcname] = [] self.workermap[funcname].append(worker) def find_job_with_coalescing_value(self, funcname, value): jobs = [] funcid = Schwartz.funcname_to_id(self.conns, self.dbmap, funcname) fix = lambda x: x.replace("\\","\\\\").replace("'","\\'") for conn in self.conns: cursor = None try: cursor = conn.cursor() cursor.execute("select jobid, funcid, arg, run_after, grabbed_until, uniqkey, priority, coalesce from job where funcid = '%s' and coalesce = '%s'" % (funcid, fix(value))) rows = cursor.fetchall() cursor.close() for row in rows: jobs.append( Job( jobid = row[0], funcname = Schwartz.funcname_from_id(self.conns, self.dbmap, row[1]), arg = row[2], run_after = row[3], grabbed_until = row[4], uniqkey = row[5], priority = row[6], coalesce = row[7] )) finally: if cursor: cursor.close() return jobs def find_job_for_workers(self, funcnames): jobs = [] for funcname in funcnames: funcid = Schwartz.funcname_to_id(self.conns, self.dbmap, funcname) for conn in self.conns: cursor = None try: cursor = conn.cursor() cursor.execute("select jobid, funcid, arg, run_after, grabbed_until, uniqkey, priority, coalesce from job where funcid = '%s'" % funcid) rows = cursor.fetchall() cursor.close() for row in rows: jobs.append( Job( jobid = row[0], funcname = Schwartz.funcname_from_id(self.conns, self.dbmap, row[1]), arg = simplejson.loads(row[2]), run_after = row[3], grabbed_until = row[4], uniqkey = row[5], priority = row[6], coalesce = row[7] )) finally: if cursor: cursor.close() return jobs def work_once(self): for funcname in self.workermap: funcid = Schwartz.funcname_to_id(self.conns, self.dbmap, funcname) for worker in self.workermap[funcname]: for conn in self.conns: cursor = None try: cursor = conn.cursor() cursor.execute("select jobid, funcid, arg, run_after, grabbed_until, uniqkey, priority, coalesce from job where funcid = '%s' order by priority desc" % funcid) rows = cursor.fetchall() for row in rows: worker.work( Job( jobid = row[0], funcname = Schwartz.funcname_from_id(self.conns, self.dbmap, row[1]), arg = simplejson.loads(row[2]), run_after = row[3], grabbed_until = row[4], uniqkey = row[5], priority = row[6], coalesce = row[7] )) cursor.execute("delete from job where funcid = '%s'" % funcid) cursor.close() conn.commit() except: pass finally: if cursor: cursor.close() conn.rollback() def work(self, delay=3): while True: self.work_once() time.sleep(delay) def __del__(self): for conn in self.conns: try: conn.close() except: pass