Redis
 sql >> база данни >  >> NoSQL >> Redis

Колба по пример – Внедряване на опашка със задачи Redis

Тази част от урока подробно описва как да внедрите опашка от задачи Redis за обработка на текст.

Актуализации:

  • 12.02.2020 г.:Надстроен до версия 3.8.1 на Python, както и до най-новите версии на Redis, Python Redis и RQ. Вижте по-долу за подробности. Споменете грешка в най-новата версия на RQ и предоставете решение. Решена грешка http преди https.
  • 22.03.2016 г.:Надстроен до Python версия 3.5.1, както и най-новите версии на Redis, Python Redis и RQ. Вижте по-долу за подробности.
  • 22.02.2015 г.:Добавена поддръжка за Python 3.

Безплатен бонус: Щракнете тук, за да получите достъп до безплатен видеоурок за Flask + Python, който ви показва как да създадете уеб приложение за Flask, стъпка по стъпка.

Запомнете:Ето какво изграждаме – приложение Flask, което изчислява двойки думи-честота въз основа на текста от даден URL.

  1. Първа част:Настройте локална среда за разработка и след това разгръщайте както сценична, така и производствена среда на Heroku.
  2. Част втора:Настройте PostgreSQL база данни заедно с SQLAlchemy и Alembic за обработка на миграции.
  3. Част трета:Добавете логиката на задния край, за да изстържете и след това да обработите броя на думите от уеб страница, като използвате библиотеките със заявки, BeautifulSoup и Natural Language Toolkit (NLTK).
  4. Четвърта част:Внедряване на опашка от задачи Redis за обработка на текста. (текущи )
  5. Част пета:Настройте Angular на предния край, за да анкетира непрекъснато задния край, за да видите дали заявката е обработена.
  6. Част шеста:Пренасочване към сървъра за етапи на Heroku – настройка на Redis и подробно описание как да се стартират два процеса (уеб и работен) на един Dyno.
  7. Част седма:Актуализирайте предния край, за да го направите по-удобен за потребителя.
  8. Част осма:Създайте персонализирана ъглова директива за показване на диаграма на честотно разпределение с помощта на JavaScript и D3.

Имате нужда от кода? Вземете го от репо.


Изисквания за инсталиране

Използвани инструменти:

  • Redis (5.0.7)
  • Python Redis (3.4.1)
  • RQ (1.2.2) - проста библиотека за създаване на опашка от задачи

Започнете, като изтеглите и инсталирате Redis от официалния сайт или чрез Homebrew (brew install redis ). След като бъде инсталиран, стартирайте Redis сървъра:

$ redis-server

След това инсталирайте Python Redis и RQ в нов прозорец на терминала:

$ cd flask-by-example
$ python -m pip install redis==3.4.1 rq==1.2.2
$ python -m pip freeze > requirements.txt


Настройте Worker

Нека започнем със създаване на работен процес, който да слуша за задачи на опашка. Създайте нов файл worker.py и добавете този код:

import os

import redis
from rq import Worker, Queue, Connection

listen = ['default']

redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379')

conn = redis.from_url(redis_url)

if __name__ == '__main__':
    with Connection(conn):
        worker = Worker(list(map(Queue, listen)))
        worker.work()

Тук слушахме за опашка, наречена default и установи връзка със сървъра Redis на localhost:6379 .

Пуснете това в друг прозорец на терминала:

$ cd flask-by-example
$ python worker.py
17:01:29 RQ worker started, version 0.5.6
17:01:29
17:01:29 *** Listening on default...

Сега трябва да актуализираме нашия app.py за изпращане на задания на опашката...



Актуализирайте app.py

Добавете следните импортирания към app.py :

from rq import Queue
from rq.job import Job
from worker import conn

След това актуализирайте секцията за конфигурация:

app = Flask(__name__)
app.config.from_object(os.environ['APP_SETTINGS'])
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
db = SQLAlchemy(app)

q = Queue(connection=conn)

from models import *

q = Queue(connection=conn) настройте връзка с Redis и инициализирайте опашка въз основа на тази връзка.

Преместете функционалността за обработка на текст извън нашия индексен маршрут в нова функция, наречена count_and_save_words() . Тази функция приема един аргумент, URL, който ще й предадем, когато я извикаме от нашия индексен маршрут.

def count_and_save_words(url):

    errors = []

    try:
        r = requests.get(url)
    except:
        errors.append(
            "Unable to get URL. Please make sure it's valid and try again."
        )
        return {"error": errors}

    # text processing
    raw = BeautifulSoup(r.text).get_text()
    nltk.data.path.append('./nltk_data/')  # set the path
    tokens = nltk.word_tokenize(raw)
    text = nltk.Text(tokens)

    # remove punctuation, count raw words
    nonPunct = re.compile('.*[A-Za-z].*')
    raw_words = [w for w in text if nonPunct.match(w)]
    raw_word_count = Counter(raw_words)

    # stop words
    no_stop_words = [w for w in raw_words if w.lower() not in stops]
    no_stop_words_count = Counter(no_stop_words)

    # save the results
    try:
        result = Result(
            url=url,
            result_all=raw_word_count,
            result_no_stop_words=no_stop_words_count
        )
        db.session.add(result)
        db.session.commit()
        return result.id
    except:
        errors.append("Unable to add item to database.")
        return {"error": errors}


@app.route('/', methods=['GET', 'POST'])
def index():
    results = {}
    if request.method == "POST":
        # this import solves a rq bug which currently exists
        from app import count_and_save_words

        # get url that the person has entered
        url = request.form['url']
        if not url[:8].startswith(('https://', 'http://')):
            url = 'http://' + url
        job = q.enqueue_call(
            func=count_and_save_words, args=(url,), result_ttl=5000
        )
        print(job.get_id())

    return render_template('index.html', results=results)

Обърнете внимание на следния код:

job = q.enqueue_call(
    func=count_and_save_words, args=(url,), result_ttl=5000
)
print(job.get_id())

Забележка: Трябва да импортираме count_and_save_words функция в нашата функция index тъй като пакетът RQ в момента има грешка, при която няма да намери функции в същия модул.

Тук използвахме опашката, която инициализирахме по-рано и нарекохме enqueue_call() функция. Това добави ново задание към опашката и това задание стартира count_and_save_words() функция с URL като аргумент. result_ttl=5000 Аргументът на линията казва на RQ колко дълго да задържи резултата от заданието - в този случай 5000 секунди. След това изведохме идентификатора на заданието в терминала. Този идентификатор е необходим, за да се види дали работата е приключила с обработката.

Нека настроим нов маршрут за това...



Вземете резултати

@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):

    job = Job.fetch(job_key, connection=conn)

    if job.is_finished:
        return str(job.result), 200
    else:
        return "Nay!", 202

Нека тестваме това.

Стартирайте сървъра, отидете до http://localhost:5000/, използвайте URL адреса https://realpython.com и вземете идентификатора на заданието от терминала. След това използвайте този идентификатор в крайната точка „/results/“, т.е. http://localhost:5000/results/ef600206-3503-4b87-a436-ddd9438f2197.

Докато са изминали по-малко от 5000 секунди, преди да проверите състоянието, тогава трябва да видите идентификационен номер, който се генерира, когато добавим резултатите към базата данни:

# save the results
try:
    from models import Result
    result = Result(
        url=url,
        result_all=raw_word_count,
        result_no_stop_words=no_stop_words_count
    )
    db.session.add(result)
    db.session.commit()
    return result.id

Сега нека преработим леко маршрута, за да върнем действителните резултати от базата данни в JSON:

@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):

    job = Job.fetch(job_key, connection=conn)

    if job.is_finished:
        result = Result.query.filter_by(id=job.result).first()
        results = sorted(
            result.result_no_stop_words.items(),
            key=operator.itemgetter(1),
            reverse=True
        )[:10]
        return jsonify(results)
    else:
        return "Nay!", 202

Не забравяйте да добавите импортирането:

from flask import jsonify

Тествайте това отново. Ако всичко е минало добре, трябва да видите нещо подобно на вашия браузър:

[
  [
    "Python", 
    315
  ], 
  [
    "intermediate", 
    167
  ], 
  [
    "python", 
    161
  ], 
  [
    "basics", 
    118
  ], 
  [
    "web-dev", 
    108
  ], 
  [
    "data-science", 
    51
  ], 
  [
    "best-practices", 
    49
  ], 
  [
    "advanced", 
    45
  ], 
  [
    "django", 
    43
  ], 
  [
    "flask", 
    41
  ]
]


Какво следва?

Безплатен бонус: Щракнете тук, за да получите достъп до безплатен видеоурок за Flask + Python, който ви показва как да създадете уеб приложение за Flask, стъпка по стъпка.

В част 5 ще обединим клиента и сървъра, като добавим Angular към микса, за да създадем полер, който ще изпраща заявка на всеки пет секунди до /results/<job_key> крайна точка, която иска актуализации. След като данните са налични, ще ги добавим към DOM.

Наздраве!

Това е част от сътрудничеството между Cam Linke, съосновател на Startup Edmonton, и хората от Real Python



  1. Redis
  2.   
  3. MongoDB
  4.   
  5. Memcached
  6.   
  7. HBase
  8.   
  9. CouchDB
  1. Съхраняване на ключове с префикс, които изтичат в redis

  2. Елемент от списъка Redis Pop По брой елементи

  3. Главният винаги ли редис екземпляр с най-малък приоритет?

  4. Региони в стил Azure Cache/DataCache в Redis

  5. Грешка при изпълнение на Lua скрипт от Redis клиент