首页 > 编程笔记

Python常用装饰器汇总

Python自带了一个装饰器,如在类中,可以使用 staticmethod 装饰器来表明某个函数是静态成员函数,使用 classmethod 来表明该函数是一个类成员函数。另外,可以自己定义自己的装饰,或者使用第三方提供的装饰器。

本节将介绍一些比较常用的装饰器。

1、类装饰器

下面的代码演示了 staticmethod 和 classmethod 两个类装饰器的用法,分别用来装饰类成员函数,用来表示某个方法是静态方法或类方法。
class DemoClass1(object):
    attr1 = 1                   # 类属性
    @staticmethod               # 静态方法
    def static_func1():
        print(DemoClass1.attr1)
        DemoClass1.attr1 += 1
    @classmethod                # 类方法
    def class_func1(cls):
        print(cls.attr1)
        cls.attr1 += 2
DemoClass1.static_func1()
DemoClass1.class_func1()
DemoClass1.static_func1()
DemoClass1.class_func1()
运行后输出结果如下:

$ python staticclassdemo1.py
1
2
4
5


另外一个常见的装饰器是 property,该装饰器表明被装饰函数会在读取与函数同名的属性时被调用,并且得到的是被装饰函数的返回值。注意,该属性是只读的,不能被赋值。下面是一个例子。
>>> class PropertyDemo1(object):
...     def __init__(self):
...         self.field1 = 0
...         self.field2 = 0
...     @property
...     def count(self):
...         return self.field1
...                                       # 类定义结束
>>> obj1 = PropertyDemo1()
>>> obj1.count
0
>>> obj1.count = 4                        # 只读属性,不能被赋值
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: can't set attribute
如果希望可读也可写,那么需要先定义一个被 property 装饰的函数,然后定义一个被 setter 装饰的函数。下面是一个例子。
>>> class GetterSetter(object):
...     def __init__(self):
...         self.field1 = 0
...     @property                               # 表明是一个属性,默认是只读的
...     def count(self):
...         return self.field1
...     @count.setter                           # 添加设置写操作函数
...     def count(self, value):
...         if not isinstance(value, int):
...             raise ValueError('count must be an integer!')
...         self.field1 = value
...                                            # 类定义结束
>>> obj1 = GetterSetter()                      # 创建一个对象
>>> obj1.count = 3                             # 赋值
>>> obj1.count                                 # 读出
3

2、退出时执行的函数

接下来介绍的装饰器用来装饰在程序退出时执行的函数。演示例子如下:
import time
def onexit(f):                            # 装饰器
    import atexit
    atexit.register(f)                    # 将被装饰函数注册到系统中
    return f
@onexit
def on_exit_func1():
    print("on_exit_func1() is running")
@onexit
def on_exit_func2():
    print("on_exit_func2() is running")
print("Starting running")                  # 开始执行脚本
time.sleep(3)
print("Ending")                            # 脚本执行完毕
运行后的输出结果如下:

$ python onexit.py
Starting running
Ending
on_exit_func2() is running
on_exit_func1() is running

3、单例模式

这时装饰器装饰的类,表示该类最多只能有一个实例对象。演示例子如下:
>>> def singleton(cls):                # 装饰器,不过传入的是一个类对象而不是函数对象
...     instances = {}
...     def getinstance():
...         if cls not in instances:
...             instances[cls] = cls()      # 生成一个对象
...         return instances[cls]
...     return getinstance
...                                         # 装饰器定义结束
>>> @singleton
... class DemoClass1:
...     def __init__(self):
...         self.field1 = 8
...         self.field2 = "python"
...                                          # 被装饰的类定义结束
>>> obj1 = DemoClass1()                      # 创建两个对象
>>> obj2 = DemoClass1()                      # 第二个对象
>>> obj1 is obj2                             # 这两个对象是否是同一个对象
True                                         # 是同一个对象

4、执行时间限制

有时需要限定某个函数的执行时间,如果超出该时间,需要强制结束运行。提供这种装饰器的 Python 包很多,这里仅介绍其中一个,即 timeout_decorator。

使用该 timeout_decorator 之前,需要自行安装,方法是运行下面的命令行:

pip install timeout-decorator

下面是一个使用该装饰器的例子。
import time
import timeout_decorator                           # 引入装饰器
@timeout_decorator.timeout(5)                      # 不能超时5秒
def func_with_time_limit():
    print("Start")
    for i in range(1,10):                          # 循环10次
        time.sleep(1)
        print("{} seconds have passed".format(i))
if __name__ == '__main__':
    func_with_time_limit()
运行该脚本,输出如下:

$ python timeoutdemo1.py
Start
1 seconds have passed
2 seconds have passed
3 seconds have passed
4 seconds have passed
Traceback (most recent call last):                      # 打印出来调用栈
  File "timeoutdemo1.py", line 12, in <module>
    func_with_time_limit()
  File
"/anaconda3/lib/python3.7/site-packages/timeout_decorator/timeout_
        decorator.py",
line 81, in new_function
    return function(*args, **kwargs)
  File "timeoutdemo1.py", line 8, in func_with_time_limit
    time.sleep(1)
  File
"/anaconda3/lib/python3.7/site-packages/timeout_decorator/timeout_
        decorator.py",
line 72, in handler
    _raise_exception(timeout_exception, exception_message)
  File
"/anaconda3/lib/python3.7/site-packages/timeout_decorator/timeout_
        decorator.py",
line 45, in _raise_exception
    raise exception()
timeout_decorator.timeout_decorator.TimeoutError: 'Timed Out'


可以发现,在超时后它会自动抛出异常,我们可以捕捉该异常。下面是修改后的代码:
import time
import timeout_decorator
@timeout_decorator.timeout(5)                   # 不能超时5秒
def func_with_time_limit():
    print("Start")
    for i in range(1,10):
        time.sleep(1)
        print("{} seconds have passed".format(i))
if __name__ == '__main__':
    try:
        func_with_time_limit()
    except:
        print("Time-out")
运行后的输出如下:

$ python timeoutdemo2.py
Start
1 seconds have passed
2 seconds have passed
3 seconds have passed
4 seconds have passed
Time-out

5、执行时间标定

有时需要统计出某个函数每次执行的时间,此时既可以使用在函数执行之前和之后获取系统时间,然后通过计算得到实际运行时间,也可以使用现成的装饰器来达到这个目的。

这里介绍的装饰器是 benchmark-decorator,这个装饰器也是需要读者自行安装的。安装命令如下:

pip install benchmark-decorator


下面是一个使用benchmark-decorator装饰器的例子。
import time
from benchmark import Benchmark         # 引入标定模块
@Benchmark()                            # 标定该函数的执行时间
def do_real_work1():
    time.sleep(0.1)
    return 1
@Benchmark()                            # 标定该函数的执行时间
def do_real_work2():
    time.sleep(0.2)
    return 2
@Benchmark()                            # 标定该函数的执行时间
def func_benchmarked():
    print("Start")
    for i in range(1,10):
        do_real_work1()
        do_real_work2()
if __name__ == '__main__':
    func_benchmarked ()
运行该脚本后会发现,在当前目录下多了一个文件 bench.log,标定数据都记录在该文件中了。下面是该文件的内容:

2019-07-28 06:10:05,921 - BENCHMARK - INFO - do_real_work1: 0.10025286674499512
2019-07-28 06:10:06,123 - BENCHMARK - INFO - do_real_work2: 0.20151209831237793
2019-07-28 06:10:06,227 - BENCHMARK - INFO - do_real_work1: 0.10357093811035156
2019-07-28 06:10:06,431 - BENCHMARK - INFO - do_real_work2: 0.2039330005645752
......

6、自动重新运行

如果执行过程中抛出了异常,那么可以尝试让 retry() 函数再次运行。该软件包也需要我们自行安装,安装命令如下:

pip install retry_decorator

下面是一个使用该装饰器的例子:
from retry_decorator import *                 # 引入重试模块
@retry(Exception, tries=3, timeout_secs=0.3)  # 自动重试3次,间隔0.3秒
def retry_demo():                             # 如果抛出了任意类型的异常
    import sys,time
    print('working now')
    raise Exception('Testing retry')
if __name__ == '__main__':
    try:
        retry_demo()
    except Exception as e:
        print('Received the last exception')
运行后的输出如下:

$ python retrydemo1.py
working now                             #第5行代码的输出
ERROR:root:Retrying in 0.32 seconds ... # 会在0.32秒后再次尝试运行
Traceback (most recent call last):
  File "/anaconda3/lib/python3.7/site-packages/retry_decorator/retry_decorator.py",
line 26, in f_retry
    return f(*args, **kwargs)
  File "retrydemo1.py", line 7, in retry_demo
    raise Exception('Testing retry')
Exception: Testing retry
working now                              # 第2轮运行,第5行代码的输出
ERROR:root:Retrying in 0.60 seconds ...
Traceback (most recent call last):
  File "/anaconda3/lib/python3.7/site-packages/retry_decorator/retry_
          decorator.py",
line 26, in f_retry
    return f(*args, **kwargs)
  File "retrydemo1.py", line 7, in retry_demo
    raise Exception('Testing retry')
Exception: Testing retry
working now                               # 第3轮运行
Received the last exception

7、状态机

对于一个状态机来说,需要定义状态和状态迁移函数,在某个事件发生时,会调用对应的状态迁移函数,如果迁移函数执行失败,其会调用用户定义的失败处理函数。

下面是 pysta temachine 包的源代码:
import functools
import inspect
class InvalidStateTransition(Exception):
    pass
class StateInfo(object):
    @staticmethod
    def get_states(cls):
        if not inspect.isclass(cls):
            raise TypeError('"{0}" is no class object!'.format(cls))
        if not hasattr(cls, '___pystatemachine_cls_states'):
            states = tuple(state for _, state  in
                inspect.getmembers(cls, lambda member: isinstance(member,
                        State)))
            setattr(cls, '___pystatemachine_cls_states', states)
        return getattr(cls, '___pystatemachine_cls_states')
    @staticmethod
    def get_initial_state(cls):
        if not inspect.isclass(cls):
            raise TypeError('"{0}" is no class object!'.format(cls))
        if not hasattr(cls, '___pystatemachine_cls_initial_state'):
            states = StateInfo.get_states(cls)
            initial_states = [state for state in states if state.is_
                    initial]
            assert initial_states
            assert len(initial_states) == 1
            initial_state = initial_states[0]
            setattr(cls, '___pystatemachine_cls_initial_state', initial_
                    state)
        return getattr(cls, '___pystatemachine_cls_initial_state')
    @staticmethod
    def get_current_state(obj):
        if not hasattr(obj, '___pystatemachine_obj_current_state'):
            initial_state = StateInfo.get_initial_state(obj.__class__)
            setattr(obj, '___pystatemachine_obj_current_state', initial_
                    state)
        return getattr(obj, '___pystatemachine_obj_current_state')
    @staticmethod
    def set_current_state(obj, state):
        assert isinstance(state, State), 'invalid state type!'
        setattr(obj, '___pystatemachine_obj_current_state', state)
class State(object):                                                    # 定义状态类
    def __init__(self, name, initial=False):
        super(State, self).__init__()
        self.is_initial = True if initial else False
        self.name = name.upper()
    def __str__(self):
        return '<{0}.State[{1}] object at 0x{2:X}>'.format(__name__,
                self.name, id(self))
def event(from_states=None, to_state=None):     # 装饰器
    """ a decorator for transitioning from certain states to a target
            state.
       must be used on bound methods of a class instance, only. """
    from_states_tuple = (from_states, ) if isinstance(from_states,
            State) \
           else tuple(from_states or [])
    if not len(from_states_tuple) >= 1:
        raise ValueError()
    if not all(isinstance(state, State) for state in from_states_tuple):
        raise TypeError()
    if not isinstance(to_state, State):
        raise TypeError()
    def wrapper(wrapped):
        @functools.wraps(wrapped)
        def transition(instance, *a, **kw):
            if instance.current_state not in from_states_tuple:
                raise InvalidStateTransition()
            try:
                result = wrapped(instance, *a, **kw)
            except Exception as error:
                error_handlers = getattr(instance,
                     '___pystatemachine_transition_failure_handlers', [])
                for error_handler in error_handlers:
                    error_handler(instance, wrapped, instance.current_state,
                       to_state, error)
                if not error_handlers:
                    raise error
            else:
                StateInfo.set_current_state(instance, to_state)
                return result
        return transition
    return wrapper
def transition_failure_handler(calling_sequence=0):
    def wrapper(wrapped):
        setattr(wrapped, '___pystatemachine_is_transition_failure_
                handler', True)
        setattr(wrapped,
              '___pystatemachine_transition_failure_handler_calling_
                      sequence',
              int(calling_sequence))
        return wrapped
    return wrapper
def acts_as_state_machine(cls):
    """
    a decorator which sets two properties on a class:
        * the 'current_state' property: a read-only property,
          returning the state machine's current state, as 'State' object
        * the 'states' property: a tuple of all valid state machine states,
          as 'State' objects
    class objects may use current_state and states freely
    :param cls:
    :return:
    """
    assert not hasattr(cls, 'current_state')
    assert not hasattr(cls, 'states'), '{0} already has a "states"
            attribute!'.format(cls)
    def get_states(obj):
        return StateInfo.get_states(obj.__class__)
    def is_transition_failure_handler(obj):
        return all([
           any([
               inspect.ismethod(obj),  # Python 2
               inspect.isfunction(obj),  # python3
           ]),
           getattr(obj, '___pystatemachine_is_transition_failure_
                    handler', False),
       ])
   transition_failure_handlers = sorted(
       [value for name, value in
      inspect.getmembers(cls, is_transition_failure_handler)],
       key=lambda m:
         getattr(m,
              '___pystatemachine_transition_failure_handler_calling_
                       sequence', 0),
      )
   setattr(cls,
         '___pystatemachine_transition_failure_handlers',
         transition_failure_handlers)
   cls.current_state = property(fget=StateInfo.get_current_state)
   cls.states = property(fget=get_states)
   return cls
下面是使用该状态机装饰器的例子,该例子描述了闸机的状态。闸机只有两个状态,一个是锁死,一个是可以推动的非锁死状态。事件也有两个,一个是让闸机锁死,一个是让闸机解锁。状态转换函数应该有4个,但有两个可以和别的共享,所以实际上只有两个状态转换函数,一个是解锁处理函数,一个是锁定处理函数。

由于任意状态转换的过程中可能发生异常情况,所以需要定义异常,该函数会在转换函数抛出异常时被调用。下面是一个例子:

@acts_as_state_machine
class Turnstile(object):                                        # 这个类对应一个状态机
    locked = State('locked', initial=True)      # 定义两个状态
    unlocked = State('unlocked')
    @event(from_states=(locked, unlocked), to_state=unlocked)
    def coin(self):                                                     # 解锁的处理函数
        assert random.random() > .5, 'failing for demonstration
               purposes, only ..'
        print('*blingbling* .. unlocked!')
    @event(from_states=(locked, unlocked), to_state=locked)
    def push(self):                                                     # 锁定处理函数
        print('*push* .. locked!')
    @transition_failure_handler(calling_sequence=2)
    def turnstile_malfunction(self, method, from_state, to_state,
            error):
        print(" *** turnstile_malfunction() is Running")
    @transition_failure_handler(calling_sequence=1)
    def before_turnstile_malfunction(self, method, from_state, to_state,
            error):
        print(" *** before_turnstile_malfunction() is Running")
import random
turnstile = Turnstile()                                         # 创建状态机
for _ in range(10):                                                     # 产生10个事件
    handler = random.choice([turnstile.coin, turnstile.push])
    handler()                                                           # 触发事件
    print("=======================").           # 分隔符
运行后的输出如下:

$ python statemachine.py
*blingbling* .. unlocked!
=======================
*push* .. locked!
=======================
*** before_turnstile_malfunction() is Running
*** turnstile_malfunction() is Running
=======================
*** before_turnstile_malfunction() is Running
*** turnstile_malfunction() is Running
=======================
*** before_turnstile_malfunction() is Running
*** turnstile_malfunction() is Running
=======================
*** before_turnstile_malfunction() is Running
*** turnstile_malfunction() is Running
=======================
*push* .. locked!
=======================
*push* .. locked!
=======================
*push* .. locked!
=======================
*push* .. locked!
=======================

可以发现,错误处理函数是按照 calling_sequence 属性所指定的顺序执行的。每次在执行状态转换函数的过程中,如果抛出了异常,那么被 transition_failure_handler 装饰的函数都会被调用,我们可以从参数中得知错误的详细情况。

优秀文章