萬盛學電腦網

 萬盛學電腦網 >> 網絡編程 >> 編程語言綜合 >> python實用代碼片段收集貼

python實用代碼片段收集貼

   這篇文章主要介紹了python實用代碼片段收集貼,本文收集了如獲取一個類的所有子類、計算運行時間、SQLAlchemy簡單使用、實現類似Java或C中的枚舉等實用功能代碼,需要的朋友可以參考下

  獲取一個類的所有子類

  復制代碼 代碼如下:

  def itersubclasses(cls, _seen=None):

  """Generator over all subclasses of a given class in depth first order."""

  if not isinstance(cls, type):

  raise TypeError(_('itersubclasses must be called with '

  'new-style classes, not %.100r') % cls)

  _seen = _seen or set()

  try:

  subs = cls.__subclasses__()

  except TypeError: # fails only when cls is type

  subs = cls.__subclasses__(cls)

  for sub in subs:

  if sub not in _seen:

  _seen.add(sub)

  yield sub

  for sub in itersubclasses(sub, _seen):

  yield sub

  簡單的線程配合

  復制代碼 代碼如下:

  import threading

  is_done = threading.Event()

  consumer = threading.Thread(

  target=self.consume_results,

  args=(key, self.task, runner.result_queue, is_done))

  consumer.start()

  self.duration = runner.run(

  name, kw.get("context", {}), kw.get("args", {}))

  is_done.set()

  consumer.join() #主線程堵塞,直到consumer運行結束

  多說一點,threading.Event()也可以被替換為threading.Condition(),condition有notify(), wait(), notifyAll()。解釋如下:

  復制代碼 代碼如下:

  The wait() method releases the lock, and then blocks until it is awakened by a notify() or notifyAll() call for the same condition variable in another thread. Once awakened, it re-acquires the lock and returns. It is also possible to specify a timeout.

  The notify() method wakes up one of the threads waiting for the condition variable, if any are waiting. The notifyAll() method wakes up all threads waiting for the condition variable.

  Note: the notify() and notifyAll() methods don't release the lock; this means that the thread or threads awakened will not return from their wait() call immediately, but only when the thread that called notify() or notifyAll() finally relinquishes ownership of the lock.

  復制代碼 代碼如下:

  # Consume one item

  cv.acquire()

  while not an_item_is_available():

  cv.wait()

  get_an_available_item()

  cv.release()

  # Produce one item

  cv.acquire()

  make_an_item_available()

  cv.notify()

  cv.release()

  計算運行時間

  復制代碼 代碼如下:

  class Timer(object):

  def __enter__(self):

  self.error = None

  self.start = time.time()

  return self

  def __exit__(self, type, value, tb):

  self.finish = time.time()

  if type:

  self.error = (type, value, tb)

  def duration(self):

  return self.finish - self.start

  with Timer() as timer:

  func()

  return timer.duration()

  元類

  __new__()方法接收到的參數依次是:

  當前准備創建的類的對象;

  類的名字;

  類繼承的父類集合;

  類的方法集合;

  復制代碼 代碼如下:

  class ModelMetaclass(type):

  def __new__(cls, name, bases, attrs):

  if name=='Model':

  return type.__new__(cls, name, bases, attrs)

  mappings = dict()

  for k, v in attrs.iteritems():

  if isinstance(v, Field):

  print('Found mapping: %s==>%s' % (k, v))

  mappings[k] = v

  for k in mappings.iterkeys():

  attrs.pop(k)

  attrs['__table__'] = name # 假設表名和類名一致

  attrs['__mappings__'] = mappings # 保存屬性和列的映射關系

  return type.__new__(cls, name, bases, attrs)

  class Model(dict):

  __metaclass__ = ModelMetaclass

  def __init__(self, **kw):

  super(Model, self).__init__(**kw)

  def __getattr__(self, key):

  try:

  return self[key]

  except KeyError:

  raise AttributeError(r"'Model' object has no attribute '%s'" % key)

  def __setattr__(self, key, value):

  self[key] = value

  def save(self):

  fields = []

  params = []

  args = []

  for k, v in self.__mappings__.iteritems():

  fields.append(v.name)

  params.append('?')

  args.append(getattr(self, k, None))

  sql = 'insert into %s (%s) values (%s)' % (self.__table__, ','.join(fields), ','.join(params))

  print('SQL: %s' % sql)

  print('ARGS: %s' % str(args))

  class Field(object):

  def __init__(self, name, column_type):

  self.name = name

  self.column_type = column_type

  def __str__(self):

  return '<%s:%s>

copyright © 萬盛學電腦網 all rights reserved