Вярвам в TypeError
идва от multiprocessing
get
на .
Изтрих целия DB код от вашия скрипт. Погледнете това:
import multiprocessing
import sqlalchemy.exc
def do(kwargs):
i = kwargs['i']
print i
raise sqlalchemy.exc.ProgrammingError("", {}, None)
return i
pool = multiprocessing.Pool(processes=5) # start 4 worker processes
results = []
arglist = []
for i in range(10):
arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append) # evaluate "f(10)" asynchronously
# Use get or wait?
# r.get()
r.wait()
pool.close()
pool.join()
print results
Използване на r.wait
връща очаквания резултат, но използвайки r.get
повдига TypeError
. Както е описано в документите на python
, използвайте r.wait
след map_async
.
Редактиране :Трябва да променя предишния си отговор. Вече вярвам на TypeError
идва от SQLAlchemy. Промених скрипта си, за да възпроизведа грешката.
Редактиране 2 :Изглежда, че проблемът е, че multiprocessing.pool
не работи добре, ако някой работник повдигне изключение, чийто конструктор изисква параметър (вижте също тук
).
Промених скрипта си, за да подчертая това.
import multiprocessing
class BadExc(Exception):
def __init__(self, a):
'''Non-optional param in the constructor.'''
self.a = a
class GoodExc(Exception):
def __init__(self, a=None):
'''Optional param in the constructor.'''
self.a = a
def do(kwargs):
i = kwargs['i']
print i
raise BadExc('a')
# raise GoodExc('a')
return i
pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
# set a timeout in order to be able to catch C-c
r.get(1e100)
except KeyboardInterrupt:
pass
print results
Във вашия случай, като се има предвид, че вашият код предизвиква изключение SQLAlchemy, единственото решение, за което мога да се сетя, е да уловя всички изключения в do
функция и повторно повдигане на нормално Exception
вместо. Нещо подобно:
import multiprocessing
class BadExc(Exception):
def __init__(self, a):
'''Non-optional param in the constructor.'''
self.a = a
def do(kwargs):
try:
i = kwargs['i']
print i
raise BadExc('a')
return i
except Exception as e:
raise Exception(repr(e))
pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
# set a timeout in order to be able to catch C-c
r.get(1e100)
except KeyboardInterrupt:
pass
print results
Редактиране 3 :така че изглежда, че е бъг с Python , но подходящи изключения в SQLAlchemy биха го заобиколили:следователно, аз повдигнах проблема с SQLAlchemy също.
Като заобиколно решение на проблема мисля, че решението в края на Редактиране 2 ще свърши работа (обвиване на обратни извиквания в try-except и re-raise).