PostgreSQL
 sql >> база данни >  >> RDS >> PostgreSQL

Задържайте скрипта на Python, използвайки SQLAlchemy и многопроцесорна обработка

Вярвам в TypeError идва от multiprocessing get на .

Изтрих целия DB код от вашия скрипт. Погледнете това:

import multiprocessing
import sqlalchemy.exc

def do(kwargs):
    i = kwargs['i']
    print i
    raise sqlalchemy.exc.ProgrammingError("", {}, None)
    return i


pool = multiprocessing.Pool(processes=5)               # start 4 worker processes
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append) # evaluate "f(10)" asynchronously

# Use get or wait?
# r.get()
r.wait()

pool.close()
pool.join()
print results

Използване на r.wait връща очаквания резултат, но използвайки r.get повдига TypeError . Както е описано в документите на python , използвайте r.wait след map_async .

Редактиране :Трябва да променя предишния си отговор. Вече вярвам на TypeError идва от SQLAlchemy. Промених скрипта си, за да възпроизведа грешката.

Редактиране 2 :Изглежда, че проблемът е, че multiprocessing.pool не работи добре, ако някой работник повдигне изключение, чийто конструктор изисква параметър (вижте също тук ).

Промених скрипта си, за да подчертая това.

import multiprocessing

class BadExc(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        self.a = a

class GoodExc(Exception):
    def __init__(self, a=None):
        '''Optional param in the constructor.'''
        self.a = a

def do(kwargs):
    i = kwargs['i']
    print i
    raise BadExc('a')
    # raise GoodExc('a')
    return i

pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
    # set a timeout in order to be able to catch C-c
    r.get(1e100)
except KeyboardInterrupt:
    pass
print results

Във вашия случай, като се има предвид, че вашият код предизвиква изключение SQLAlchemy, единственото решение, за което мога да се сетя, е да уловя всички изключения в do функция и повторно повдигане на нормално Exception вместо. Нещо подобно:

import multiprocessing

class BadExc(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        self.a = a

def do(kwargs):
    try:
        i = kwargs['i']
        print i
        raise BadExc('a')
        return i
    except Exception as e:
        raise Exception(repr(e))

pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
    # set a timeout in order to be able to catch C-c
    r.get(1e100)
except KeyboardInterrupt:
    pass
print results

Редактиране 3 :така че изглежда, че е бъг с Python , но подходящи изключения в SQLAlchemy биха го заобиколили:следователно, аз повдигнах проблема с SQLAlchemy също.

Като заобиколно решение на проблема мисля, че решението в края на Редактиране 2 ще свърши работа (обвиване на обратни извиквания в try-except и re-raise).



  1. Database
  2.   
  3. Mysql
  4.   
  5. Oracle
  6.   
  7. Sqlserver
  8.   
  9. PostgreSQL
  10.   
  11. Access
  12.   
  13. SQLite
  14.   
  15. MariaDB
  1. Трябва ли да добавя колона тип за проектиране на наследяване в postgreSQL?

  2. Динамична колона в оператор SELECT postgres

  3. Подобрете производителността на заявките за агрегиране на PostgresSQL

  4. Управление на замръзване в PostgreSQL

  5. Postgresql - Как да извлека първото появяване на подниз в низ, използвайки модел на регулярен израз?