Тази част от урока подробно описва как да внедрите опашка от задачи Redis за обработка на текст.
Актуализации:
- 12.02.2020 г.:Надстроен до версия 3.8.1 на Python, както и до най-новите версии на Redis, Python Redis и RQ. Вижте по-долу за подробности. Споменете грешка в най-новата версия на RQ и предоставете решение. Решена грешка http преди https.
- 22.03.2016 г.:Надстроен до Python версия 3.5.1, както и най-новите версии на Redis, Python Redis и RQ. Вижте по-долу за подробности.
- 22.02.2015 г.:Добавена поддръжка за Python 3.
Безплатен бонус: Щракнете тук, за да получите достъп до безплатен видеоурок за Flask + Python, който ви показва как да създадете уеб приложение за Flask, стъпка по стъпка.
Запомнете:Ето какво изграждаме – приложение Flask, което изчислява двойки думи-честота въз основа на текста от даден URL.
- Първа част:Настройте локална среда за разработка и след това разгръщайте както сценична, така и производствена среда на Heroku.
- Част втора:Настройте PostgreSQL база данни заедно с SQLAlchemy и Alembic за обработка на миграции.
- Част трета:Добавете логиката на задния край, за да изстържете и след това да обработите броя на думите от уеб страница, като използвате библиотеките със заявки, BeautifulSoup и Natural Language Toolkit (NLTK).
- Четвърта част:Внедряване на опашка от задачи Redis за обработка на текста. (текущи )
- Част пета:Настройте Angular на предния край, за да анкетира непрекъснато задния край, за да видите дали заявката е обработена.
- Част шеста:Пренасочване към сървъра за етапи на Heroku – настройка на Redis и подробно описание как да се стартират два процеса (уеб и работен) на един Dyno.
- Част седма:Актуализирайте предния край, за да го направите по-удобен за потребителя.
- Част осма:Създайте персонализирана ъглова директива за показване на диаграма на честотно разпределение с помощта на JavaScript и D3.
Имате нужда от кода? Вземете го от репо.
Изисквания за инсталиране
Използвани инструменти:
- Redis (5.0.7)
- Python Redis (3.4.1)
- RQ (1.2.2) - проста библиотека за създаване на опашка от задачи
Започнете, като изтеглите и инсталирате Redis от официалния сайт или чрез Homebrew (brew install redis
). След като бъде инсталиран, стартирайте Redis сървъра:
$ redis-server
След това инсталирайте Python Redis и RQ в нов прозорец на терминала:
$ cd flask-by-example
$ python -m pip install redis==3.4.1 rq==1.2.2
$ python -m pip freeze > requirements.txt
Настройте Worker
Нека започнем със създаване на работен процес, който да слуша за задачи на опашка. Създайте нов файл worker.py и добавете този код:
import os
import redis
from rq import Worker, Queue, Connection
listen = ['default']
redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379')
conn = redis.from_url(redis_url)
if __name__ == '__main__':
with Connection(conn):
worker = Worker(list(map(Queue, listen)))
worker.work()
Тук слушахме за опашка, наречена default
и установи връзка със сървъра Redis на localhost:6379
.
Пуснете това в друг прозорец на терминала:
$ cd flask-by-example
$ python worker.py
17:01:29 RQ worker started, version 0.5.6
17:01:29
17:01:29 *** Listening on default...
Сега трябва да актуализираме нашия app.py за изпращане на задания на опашката...
Актуализирайте app.py
Добавете следните импортирания към app.py :
from rq import Queue
from rq.job import Job
from worker import conn
След това актуализирайте секцията за конфигурация:
app = Flask(__name__)
app.config.from_object(os.environ['APP_SETTINGS'])
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
db = SQLAlchemy(app)
q = Queue(connection=conn)
from models import *
q = Queue(connection=conn)
настройте връзка с Redis и инициализирайте опашка въз основа на тази връзка.
Преместете функционалността за обработка на текст извън нашия индексен маршрут в нова функция, наречена count_and_save_words()
. Тази функция приема един аргумент, URL, който ще й предадем, когато я извикаме от нашия индексен маршрут.
def count_and_save_words(url):
errors = []
try:
r = requests.get(url)
except:
errors.append(
"Unable to get URL. Please make sure it's valid and try again."
)
return {"error": errors}
# text processing
raw = BeautifulSoup(r.text).get_text()
nltk.data.path.append('./nltk_data/') # set the path
tokens = nltk.word_tokenize(raw)
text = nltk.Text(tokens)
# remove punctuation, count raw words
nonPunct = re.compile('.*[A-Za-z].*')
raw_words = [w for w in text if nonPunct.match(w)]
raw_word_count = Counter(raw_words)
# stop words
no_stop_words = [w for w in raw_words if w.lower() not in stops]
no_stop_words_count = Counter(no_stop_words)
# save the results
try:
result = Result(
url=url,
result_all=raw_word_count,
result_no_stop_words=no_stop_words_count
)
db.session.add(result)
db.session.commit()
return result.id
except:
errors.append("Unable to add item to database.")
return {"error": errors}
@app.route('/', methods=['GET', 'POST'])
def index():
results = {}
if request.method == "POST":
# this import solves a rq bug which currently exists
from app import count_and_save_words
# get url that the person has entered
url = request.form['url']
if not url[:8].startswith(('https://', 'http://')):
url = 'http://' + url
job = q.enqueue_call(
func=count_and_save_words, args=(url,), result_ttl=5000
)
print(job.get_id())
return render_template('index.html', results=results)
Обърнете внимание на следния код:
job = q.enqueue_call(
func=count_and_save_words, args=(url,), result_ttl=5000
)
print(job.get_id())
Забележка: Трябва да импортираме
count_and_save_words
функция в нашата функцияindex
тъй като пакетът RQ в момента има грешка, при която няма да намери функции в същия модул.
Тук използвахме опашката, която инициализирахме по-рано и нарекохме enqueue_call()
функция. Това добави ново задание към опашката и това задание стартира count_and_save_words()
функция с URL като аргумент. result_ttl=5000
Аргументът на линията казва на RQ колко дълго да задържи резултата от заданието - в този случай 5000 секунди. След това изведохме идентификатора на заданието в терминала. Този идентификатор е необходим, за да се види дали работата е приключила с обработката.
Нека настроим нов маршрут за това...
Вземете резултати
@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):
job = Job.fetch(job_key, connection=conn)
if job.is_finished:
return str(job.result), 200
else:
return "Nay!", 202
Нека тестваме това.
Стартирайте сървъра, отидете до http://localhost:5000/, използвайте URL адреса https://realpython.com и вземете идентификатора на заданието от терминала. След това използвайте този идентификатор в крайната точка „/results/“, т.е. http://localhost:5000/results/ef600206-3503-4b87-a436-ddd9438f2197.
Докато са изминали по-малко от 5000 секунди, преди да проверите състоянието, тогава трябва да видите идентификационен номер, който се генерира, когато добавим резултатите към базата данни:
# save the results
try:
from models import Result
result = Result(
url=url,
result_all=raw_word_count,
result_no_stop_words=no_stop_words_count
)
db.session.add(result)
db.session.commit()
return result.id
Сега нека преработим леко маршрута, за да върнем действителните резултати от базата данни в JSON:
@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):
job = Job.fetch(job_key, connection=conn)
if job.is_finished:
result = Result.query.filter_by(id=job.result).first()
results = sorted(
result.result_no_stop_words.items(),
key=operator.itemgetter(1),
reverse=True
)[:10]
return jsonify(results)
else:
return "Nay!", 202
Не забравяйте да добавите импортирането:
from flask import jsonify
Тествайте това отново. Ако всичко е минало добре, трябва да видите нещо подобно на вашия браузър:
[
[
"Python",
315
],
[
"intermediate",
167
],
[
"python",
161
],
[
"basics",
118
],
[
"web-dev",
108
],
[
"data-science",
51
],
[
"best-practices",
49
],
[
"advanced",
45
],
[
"django",
43
],
[
"flask",
41
]
]
Какво следва?
Безплатен бонус: Щракнете тук, за да получите достъп до безплатен видеоурок за Flask + Python, който ви показва как да създадете уеб приложение за Flask, стъпка по стъпка.
В част 5 ще обединим клиента и сървъра, като добавим Angular към микса, за да създадем полер, който ще изпраща заявка на всеки пет секунди до /results/<job_key>
крайна точка, която иска актуализации. След като данните са налични, ще ги добавим към DOM.
Наздраве!
Това е част от сътрудничеството между Cam Linke, съосновател на Startup Edmonton, и хората от Real Python