#!/usr/bin/python
"""Job-queue client: writes Job rows into the first database that accepts them."""

import logging
import time

import simplejson

from Schwartz import Schwartz
from Job import Job

logger = logging.getLogger(__name__)

# Substring of str(connection) -> statement returning the id generated by the
# most recent insert on that connection.  Checked in this order.
_LAST_ID_QUERIES = (
    ('_mysql', "select last_insert_id()"),
    ('pysqlite2', "select last_insert_rowid()"),
    ('pgdb', "select LASTVAL()"),
)


def _insert_statement(table, row, paramstyle):
    """Build a parameterised INSERT for *row*.

    row        -- list of (column, value) pairs; column names must be trusted
                  identifiers because they are interpolated into the SQL text.
    paramstyle -- one of the DB-API 2.0 (PEP 249) paramstyle names.  Unknown
                  names are treated as 'format'.

    Returns (sql, params) where params is a dict for the 'named' and
    'pyformat' styles and a tuple for the positional styles.
    """
    columns = [column for column, _ in row]
    if paramstyle == 'qmark':
        marks = ['?'] * len(columns)
    elif paramstyle == 'numeric':
        marks = [':%d' % (pos + 1) for pos in range(len(columns))]
    elif paramstyle == 'named':
        marks = [':%s' % column for column in columns]
    elif paramstyle == 'pyformat':
        marks = ['%%(%s)s' % column for column in columns]
    else:
        marks = ['%s'] * len(columns)

    sql = "insert into %s (%s) values (%s)" % (
        table, ",".join(columns), ",".join(marks))
    if paramstyle in ('named', 'pyformat'):
        return sql, dict(row)
    return sql, tuple(value for _, value in row)


def _quietly(action):
    """Run a cleanup callable, logging (not raising) any failure."""
    try:
        action()
    except Exception:
        logger.debug("cleanup step failed", exc_info=True)


class Client(object):
    """Holds one connection per configured database and inserts jobs.

    A job is written to the first connection whose insert commits; later
    connections are only tried when earlier ones fail.
    """

    # Class-level defaults only.  __init__ gives every instance its own
    # containers so that instances never share (or close) each other's
    # connections.
    conns = []
    dbmap = {}

    def __init__(self, databases):
        """Open a connection for every entry in *databases*.

        databases -- iterable of dicts.  Each must contain 'driver', the
                     importable name of a module exposing ``connect``; the
                     remaining items are passed to ``connect`` as keyword
                     arguments.  The dicts are not modified.
        """
        self.conns = []
        self.dbmap = {}
        # id(connection) -> paramstyle of the driver module that created it.
        self._paramstyles = {}

        for database in databases:
            settings = dict(database)  # copy: leave the caller's config intact
            driver_name = settings.pop('driver')
            # The non-empty fromlist makes __import__ return the named
            # (sub)module itself rather than its top-level package.
            module = __import__(driver_name, globals(), locals(), [''])
            conn = module.connect(**settings)
            self.conns.append(conn)
            # Assumes the driver follows DB-API 2.0 and exposes `paramstyle`;
            # 'format' is used when it does not.
            self._paramstyles[id(conn)] = getattr(module, 'paramstyle', 'format')
            self.dbmap.setdefault(str(conn), {})

    def insert_jobs(self, jobs):
        """Insert every job in *jobs*, one at a time, via insert()."""
        for job in jobs:
            self.insert(job)

    def insert(self, funcname, **param):
        """Insert one job and return its ``jobid`` attribute.

        funcname -- either a Job instance (keyword arguments are then
                    ignored) or a str function name, in which case a Job is
                    built from ``Job(funcname=funcname, **param)``.

        Columns whose value is None are left out of the INSERT.  When no
        database accepts the row, the job's existing ``jobid`` is returned
        unchanged.

        Raises TypeError when *funcname* is neither a Job nor a str.
        """
        if isinstance(funcname, Job):
            job = funcname
        elif isinstance(funcname, str):
            job = Job(funcname=funcname, **param)
        else:
            raise TypeError(
                "funcname must be a Job or a str, not %s"
                % type(funcname).__name__)

        row = [
            ('funcid', Schwartz.funcname_to_id(
                self.conns, self.dbmap, job.funcname)),
            ('arg', simplejson.dumps(job.arg, ensure_ascii=False)),
            ('uniqkey', job.uniqkey),
            ('insert_time', int(time.time())),
            ('run_after', job.run_after),
            ('grabbed_until', job.grabbed_until),
            ('priority', job.priority),
            ('coalesce', job.coalesce),
        ]
        row = [(column, value) for column, value in row if value is not None]

        for conn in self.conns:
            style = self._paramstyles.get(id(conn), 'format')
            sql, params = _insert_statement('job', row, style)

            cursor = None
            try:
                cursor = conn.cursor()
                # Values are bound by the driver, never spliced into the SQL.
                cursor.execute(sql, params)
                conn.commit()
            except Exception:
                logger.warning(
                    "insert into job failed on %s; trying next database",
                    conn, exc_info=True)
                # A broken connection may fail to roll back as well; that
                # must not stop the fail-over to the next database.
                _quietly(conn.rollback)
                if cursor is not None:
                    _quietly(cursor.close)
                continue

            # The row is committed.  From here on nothing may fall through to
            # another database, or the job would be stored twice.
            try:
                jobid = self._last_insert_id(conn, cursor)
                if jobid is not None:
                    job.jobid = jobid
            except Exception:
                logger.warning(
                    "job inserted on %s but its id could not be read",
                    conn, exc_info=True)
            _quietly(cursor.close)
            break

        return job.jobid

    def _last_insert_id(self, conn, cursor):
        """Return the id generated by the last insert, or None if unknown.

        The driver is recognised by a substring of ``str(conn)``; for an
        unrecognised driver no query is issued and None is returned.
        """
        description = str(conn)
        for marker, query in _LAST_ID_QUERIES:
            if marker in description:
                cursor.execute(query)
                rows = cursor.fetchall()
                if len(rows):
                    return rows[0][0]
                return None
        return None

    def __del__(self):
        """Close every connection, ignoring errors from individual closes."""
        for conn in self.conns:
            # Deliberately self-contained: module globals may already be
            # torn down when a destructor runs at interpreter exit.
            try:
                conn.close()
            except Exception:
                pass