Python: django中定期执行任务

python scott 391℃ 0评论

最近需要考虑如何在django环境中跑定时任务. 这个在 stackoverflow也有对应的 讨论, 方法也有不少, 这边简单尝试和总结下.

假设我们现在的定期任务就是睡眠 n秒, 然后往数据库里面写一条记录, 记录这个任务的起始以及结束时间, 并且我们 不关心该任务的返回结果. 项目名称为 mmtest, 应用名称为 mma_cron(说实话我也不知道自己怎么取这样的名称….), 数据库使用 sqlite, model代码如下:

mma_cron/models.py
1
2
3
4
5
6
7
8
9
from  django.db  import  models
 from  django.utils  import  timezone
 
 # Create your models here.
 
 class  SimpleTask ( models . Model ):
  task_name  =  models . CharField ( max_length = 200 )
  start_time  =  models . DateTimeField ( 'task begin time' ,  default = timezone . now )
  finish_time  =  models . DateTimeField ( 'task end time' ,  blank = True ,  null = True )

最朴素的方法

考虑一个最最朴素的实现: 起一个简单的daemon程序, 每隔一定时间进行任务处理; 或者是写一个简单的程序, 利用 cron来定期执行该脚本. 一个好处是足够简单. 但很多时候写这样的程序需要花不少时间.

简单的改进

自己重新写一个程序有时候代价还是挺大的, 尤其是业务逻辑十分复杂的时候, 所以可以考虑复用已有的代码. 如果考虑 cron定期执行的策略, 我们可以在 django中实现对应逻辑, 方法也有几个:

  1. 实现一个api, 定期请求
  2. 扩展manage.py

方法一没什么好说的, 但是需要考虑访问控制; 方法二的话也很简单, 可以参考 自定义commands. 无论用哪种方式, 我们可以先实现对应的任务处理逻辑:

mma_cron/models.py
1
2
3
4
5
6
7
8
9
10
11
12
def  run_simple_task ( task_name ):
  task  =  SimpleTask ( task_name  =  task_name )
  task . save ()
 
  ## do a lot lot of stuff......
  seconds  =  random . randint ( 5 ,  15 )
  time . sleep ( seconds )
 
  task . finish_time  =  timezone . now ()
  task . save ()
 
  return  seconds

针对方法一, 实现对应的view:

mma_cron/views.py
1
2
3
4
5
6
7
from  django.http  import  HttpResponse
 
 from  .models  import  run_simple_task
 
 def  do_task ( request ,  task_name ):
  seconds  =  run_simple_task ( task_name )
  return  HttpResponse ( "I have done task in  %d  sec:  %s >"  %  ( seconds ,  task_name ))

添加对应的route:

mma_cron/urls.py
1
2
3
4
urlpatterns  =  [
  ## add one
  url ( r'^task/(?P<task_name>w+)' ,  views . do_task ,  name = 'do_task' ),
 ]

我们手动访问试一下: curl 127.0.0.1:8000/mma_cron/task/ddddd, 可以看到在经过一段时间后数据库中有结果了. 可行~

针对方法二, 扩展 manage.py, 实现对应的command:

mma_cron/management/commands/yy.py
1
2
3
4
5
6
7
8
9
10
11
12
from  django.core.management.base  import  BaseCommand ,  CommandError
 from  mma_cron.models  import  run_simple_task
 
 class  Command ( BaseCommand ):
  help  =  'Run the simple command'
 
  def  add_arguments ( self ,  parser ):
  pass
 
  def  handle ( self ,  * args ,  ** options ):
  seconds  =  run_simple_task ( 'run from manage.py' )
  self . stdout . write ( "task done>" )

再手动试一下: python manage.py yy, 发现也有对应的结果, 可行~

至于定期可以通过设定cron来实现.

插件

上面的实现比较简便, 也很有效. django的插件也有对应的实现, 这里着重介绍一下 django-cron. 它类似使用了上面提到的方法二, 在此之上增加了 任务执行检查, 日志记录等功能. 我们也来看一下;)

安装的话推荐直接把源码下的 django_cron(相当于一个django的应用)目录放到当前工程目录. 使用可以参考 这里. 接下来需要执行下它的迁移操作(migration):

1
python manage.py migrate django_cron

该操作是创建对应的数据库, 之后执行定时任务时会把 时间和 相关结果保存到数据库中. 接下来我们需要在 mma_cron中定义一个对应的定时任务:

mma_cron/cron.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from  django.utils  import  timezone
 from  django_cron  import  CronJobBase ,  Schedule
 from  .models  import  run_simple_task
 
 class  SimpleTaskCronJob ( CronJobBase ):
  RUN_EVERY_MINS  =  1
 
  schedule  =  Schedule ( run_every_mins = RUN_EVERY_MINS )
  code  =  'mma_cron.cron.simple_task_cron_job'
 
  def  do ( self ):
  seconds  =  run_simple_task ( 'task from django-cron' )
 
  msg  =  "I have done task in  %s  sec:  %s >"
 
  return  ( "I have done task in  %s  sec:  %s >"  %  ( seconds ,  task_name ))

可以看到里面定义了该定时任务的 处理间隔与相应动作. 最后我们还需要在 settings.py注册一下我们的定时任务:

mmtest/settings.py
1
2
3
CRON_CLASSES  =  [
  "mma_cron.cron.SimpleTaskCronJob>" ,
 ]

大功告成, 接下来我们只需在执行 python manage.py runcrons即可跑我们的任务了. 运行时 django_cron会根据任务的执行时间与数据库中记录的时间做对比, 如果没到对应时间会直接退出. 每次执行完都会在 django_cron_cronjoblog这张表中记录对应的时间以及返回的结果(原来python不是默认把最后一个语句的结果当成返回值的, 好吧). 同样的, 为了让其定时跑, 我们也需要在crontab中添加对应的命令.

值得一提的是如果注册了多个任务, 这些任务默认是串行执行的(考虑到安全性). 如果想要并行执行需要改一些设置, 详见文档.

Celery

上述几种方法都比较简单, 但是我们还有其他的方式;) 比如很多人都提到的
Celery
. 给我的感觉它很像 gearmansidekiq, 类似一个任务分发和处理框架. 利用它的 Periodic Tasks, 可以实现定期发布任务让worker取执行任务. 我们也来简单看看.

简单尝试

我们先不管django, 先感性的了解下Celery. Celery的 任务分发和 结果存储需要依赖外部组件, 可选的有 RabbitMQ, Redis, 数据库等等. 简单起见, 这里选择使用 redis( RabbitMQ部署起来还是稍微复杂了点).

首先根据官方的 教程, 创建个分发 加法和 乘法任务的系统. 先来看看 worker的代码:

proj/tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from  __future__  import  absolute_import
 
 from  proj.celery  import  app
 
 @app.task
 def  add ( x ,  y ):
  return  x  +  y
 
 @app.task
 def  mul ( x ,  y ):
  return  x  *  y
 
 @app.task
 def  xsum ( numbers ):
  return  sum ( numbers )

再来看看 celery的设置代码:

proj/celery.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from  __future__  import  absolute_import
 
 from  celery  import  Celery
 from  datetime  import  timedelta
 
 app  =  Celery ( 'proj' ,
  broker = 'redis://localhost' ,
  backend = 'redis://localhost' ,
  include = [ 'proj.tasks' ])
 
 ## 这段代码可以先忽略;)
 app . conf . update (
  CELERY_TASK_RESULT_EXPIRES  =  3600 ,
  CELERYBEAT_SCHEDULE  =  {
  'add-every-30-seconds' :  {
  'task' :  'proj.tasks.add' ,
  'schedule' :  timedelta ( seconds = 30 ),
  'args' :  ( 16 ,  32 ),
  },
  },
 )
 
 if  __name__  ==  '__main__' :
  app . start ()

我们先启动 redis, 然后通过执行 celery -A proj worker -l info来启动一个 worker. 接下来我们手动来创建任务, 然后丢给worker:

python
1
2
3
4
>>> import proj.tasks
 >>> result  =  proj.tasks.add.delay( 2,2)
 >>> result.get()
 4

这样就创建一个任务, 我们在celery的控制台(worker进程)可以看到这样的输出:

celery
1
2
Received task: proj.tasks.add[ a24a9792-a7c6-4f64-994d-ca6903b3182c]
 Task proj.tasks.add[ a24a9792-a7c6-4f64-994d-ca6903b3182c]  succeeded in 0.0018904690005s: 4

很直观~ 我们同时也可以看看它在redis里面是什么个样子:

redis-cli
1
2
3
4
5
6
7
8
127.0.0.1:6379> keys *
 1)  "celery-task-meta-a24a9792-a7c6-4f64-994d-ca6903b3182c>"
 2)  "_kombu.binding.celery.pidbox>"
 3)  "_kombu.binding.celery>"
 4)  "_kombu.binding.celeryev>"
 
 127.0.0.1:6379> TYPE _kombu.binding.celery
 set

现在我们是手动来创建任务, 我们可以启动一个定时生产任务的生产者 celery -A proj beat -l info. 它会定期创建我们在 proj/celery.py代码中指定的 CELERYBEAT_SCHEDULE中的任务.

结合django

理解了celery, 再结合django就相对比较简单了. 推荐阅读一下官方的 文档. 结合我们的SimpleTask的例子, 先设定下celery的任务:

mmtest/celery.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from  __future__  import  absolute_import
 
 import  os
 
 from  celery  import  Celery
 from  datetime  import  timedelta
 
 os . environ . setdefault ( 'DJANGO_SETTINGS_MODULE' ,  'mmtest.settings' )
 
 from  django.conf  import  settings
 
 app  =  Celery ( 'mmtest' ,  broker = 'redis://localhost' )
 
 app . config_from_object ( 'django.conf:settings' )
 
 ## 自动发现任务: APP/tasks.py
 app . autodiscover_tasks ( lambda :  settings . INSTALLED_APPS )
 
 ## 设置定时任务参数
 app . conf . update (
  CELERYBEAT_SCHEDULE  =  {
  'do-task-every-30-seconds' :  {
  'task' :  'mma_cron.tasks.do_task' ,
  'schedule' :  timedelta ( seconds = 30 ),
  },
  },
 )
 
 @app.task ( bind = True )
 def  debug_task ( self ):
  print ( 'Request: {0!r}' . format ( self . request ))

接下来我们需要实现一下我们的worker:

mma_cron/tasks.py
1
2
3
4
5
6
7
8
from  __future__  import  absolute_import
 
 from  celery  import  shared_task
 from  .cron  import  run_simple_task
 
 @shared_task
 def  do_task ():
  run_simple_task ( 'run by celery' )

不需要其他的工作, 我们就完成了我们的目的. 接下来就是分别启动 worker和 beat了. 这也实现了我们的定时任务的目的(就我们这个例子有点大材小用了).

选择

就我个人观点, 我比较倾向于改进后的朴素方法. 原因就是业务比较简单, 比较轻量级.

整个过程还是学到了不少东西. 继续学习python~

原文:http://blog.aka-cool.net/blog/2015/05/14/cron-job-in-django/

转载请注明:osetc.com » Python: django中定期执行任务

喜欢 (0)or分享 (0)
发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址