Flask例子-实现Redis Task Queue(任务队列)

Python部落组织翻译,禁止转载,欢迎转发


在本文中,我们加入基础的 Redis task queue来进行文本处理。

有许多的工具也能够实现,例如ReTask和HotQueue。我们将使用的是Python RQ。它是一个建立在Redis之上用来创建task queue(任务队列)的简单的库,并且容易设置和实现。

请牢记,我们是这样创建的:一个基于给定的URL中的文本用来统计单词的分布对的Flask应用。这是一个完整的教程。


1、 第一部分:设置本地开发环境,接着在Heroku云平台上部署staging环境和production环境。
2、 第二部分:设置一个带有SQLAlche和Alembic的PostgreSQL数据库来处理迁移。
3、 第三部分:加入后端逻辑到scrape,然后使用requests,BeautifulSoup和Natural Language Toolkit(NLTK)库来计算处理从页面中得到的单词。
4、 第四部分:实现一个Redis task queue 来进行文本处理。(本文描述的)
5、 第五部分:安装Angular前端框架来不断查看后端框架请求是否完成。
6、 第六部分:启动在Heroku云平台上的staging服务——设置Redis,详细介绍如何在单个Dyno上运行两个进程(web和worker)。(ps:在Heroku云平台上,对于每一个进程采用一个叫Dyno的单位来进行性能管理,增加这个值,则能提高应用的相应速度和吞吐量。有适用于Web server的Web dyno和作为后台进程的Worker dyno。)
7、 第七部分:更新前端使其用户界面更友好。
8、 第八部分:将D3库添加到混合频率分布图和柱状图。

需要代码吗?可以到repo(https://github.com/realpython/flask-by-example/releases)上抓取。

安装要求


工具:

  • Redis——http://redis.io/

  • Python Redis —— https://pypi.python.org/pypi/redis/2.10.5

  • RQ(Redis Queue)——http://python-rq.org/


首先从http://redis.io/下载和安装Redis(或者使用Homebrew命令——brew install redis),然后开启Redis服务。


$ redis-server


接下来在一个新的终端窗口安装Python Redis 库和RQ库。


$ workon wordcounts
$ pip install redis rq
$ pip freeze > requirements.txt


设置Worker


接下来我们开始创建一个worker进程来监听queued tasks(队列任务)。


import os
import redis
from rq import Worker, Queue, Connection
listen = ['default']
redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379')
conn = redis.from_url(redis_url)
if __name__ == '__main__':
   with Connection(conn):
      worker = Worker(list(map(Queue, listen)))
      worker.work()


保存以上文本为worker.py。在这段代码中,我们监听命名为default的queue(队列)并且与我们的Redis服务在localhost:6379 建立连接。

开启另一个终端窗口:


$ workon wordcounts
$ python worker.py
17:01:29 RQ worker started, version 0.4.6
17:01:29
17:01:29 *** Listening on default...


现在我们需要更新我们的app.py来向queue(队列)发送任务。

更新app.py

将下面一些模块import到app.py


from rq import Queue
from rq.job import Job
from worker import conn


然后更新配置部分:


app = Flask(__name__)
app.config.from_object(os.environ['APP_SETTINGS'])
db = SQLAlchemy(app)
q = Queue(connection=conn)
from models import *


q = Queue(connection=conn)设置一个Redis连接并基于连接初始化队列。
现在我们将文本处理功能 搬出我们的索引路径并且添加一个叫做count_and_save_words()的函数。当我们从索引路径调用这个函数时需要传入一个URL参数让它接收。


def count_and_save_words(url):

    errors = []

    try:
        r = requests.get(url)
    except:
        errors.append(
            "Unable to get URL. Please make sure it's valid and try again."
        )
        return {"error": errors}

    # text processing
    raw = BeautifulSoup(r.text).get_text()
    nltk.data.path.append('./nltk_data/')  # set the path
    tokens = nltk.word_tokenize(raw)
    text = nltk.Text(tokens)

    # remove punctuation, count raw words
    nonPunct = re.compile('.*[A-Za-z].*')
    raw_words = [w for w in text if nonPunct.match(w)]
    raw_word_count = Counter(raw_words)

    # stop words
    no_stop_words = [w for w in raw_words if w.lower() not in stops]
    no_stop_words_count = Counter(no_stop_words)

    # save the results
    try:
        result = Result(
            url=url,
            result_all=raw_word_count,
            result_no_stop_words=no_stop_words_count
        )
        db.session.add(result)
        db.session.commit()
        return result.id
    except:
        errors.append("Unable to add item to database.")
        return {"error": errors}


@app.route('/', methods=['GET', 'POST'])
def index():
    results = {}
    if request.method == "POST":
        # get url that the person has entered
        url = request.form['url']
        if 'http://' not in url[:7]:
            url = 'http://' + url
        job = q.enqueue_call(
            func=count_and_save_words, args=(url,), result_ttl=5000
        )
        print(job.get_id())

    return render_template('index.html', results=results)


注意下面的代码:


job = q.enqueue_call(
 func=count_and_save_words, args=(url,), result_ttl=5000
)
 print(job.get_id())


在这里我们使用我们之前初始化的queue(队列)和命名为enqueue_call() 的函数。这里添加了一个运行将URL作为参数的count_and_save_words() 函数的新job到我们的queue中。result_ttl=5000 这一行的参数告知RQ为job结果等待多久——5000秒。然后我们输出job的id到终端。这个是必要的,用来查看这个job是否处理完成。

让我们来设置一条新的route(路径)

新的Route(路径)


@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):

    job = Job.fetch(job_key, connection=conn)

    if job.is_finished:
        return str(job.result), 200
    else:
        return "Nay!", 202


让我们来测试这一点。


启动服务,进入http://localhost:5000/,使用URL http://realpython.com,然后从终端抓取这个job的id。接着使用这个id在 ‘/results/’末端。


只要例如:http://localhost:5000/results/ef600206-3503-4b87-a436-ddd9438f2197


只要不超过5000秒前检查运行状态,你就可以看到id数值,这是我们将结果添加到数据库时产生的。


# save the results
try:
    from models import Result
    result = Result(
        url=url,
        result_all=raw_word_count,
        result_no_stop_words=no_stop_words_count
    )
    db.session.add(result)
    db.session.commit()
    return result.id


现在,让我们稍微重构route(路径)来用JSON将数据库的结果集返回。


@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):

    job = Job.fetch(job_key, connection=conn)

    if job.is_finished:
        result = Result.query.filter_by(id=job.result).first()
        results = sorted(
            result.result_no_stop_words.items(),
            key=operator.itemgetter(1),
            reverse=True
        )[:10]
        return jsonify(results)
    else:
        return "Nay!", 202


确保import这个模块:


from flask import jsonify


再次测试这个点。如果一切顺利,你将在浏览器看到以下结果:


{
Course: 5,
Download: 4,
Python: 19,
Real: 11,
courses: 7,
development: 7,
return: 4,
sample: 4,
videos: 5,
web: 12
}


接下来是什么呢?


为了能够让所有整合到一起,我们将会添加AngularJS进行扩展,在下一个部分,创建一个基本轮询方式的服务端——每五秒发送一个请求或者直接在/results/<job_key> 末端请求更新。一旦数据是可用的,我们可以添加到DOM。


英文原文:https://realpython.com/blog/python/flask-by-example-implementing-a-redis-task-queue/

译者:Arvin