PostgreSQL
 sql >> база данни >  >> RDS >> PostgreSQL

Django ORM пропуска връзки при използване на ThreadPoolExecutor

Предполагам, че ThreadPoolExecutor не е това, което създава връзката с DB, но задачите с нишка са тези, които поддържат връзката. Вече трябваше да се справям с това.

В крайна сметка създадох тази обвивка, за да гарантирам, че нишките се затварят ръчно, когато се изпълняват задачи в ThreadPoolExecutor. Това би трябвало да е полезно, за да се гарантира, че връзките не са изтекли, досега не съм виждал изтичане, докато използвам този код.

from functools import wraps
from concurrent.futures import ThreadPoolExecutor
from django.db import connection

class DjangoConnectionThreadPoolExecutor(ThreadPoolExecutor):
    """
    When a function is passed into the ThreadPoolExecutor via either submit() or map(), 
    this will wrap the function, and make sure that close_django_db_connection() is called 
    inside the thread when it's finished so Django doesn't leak DB connections.

    Since map() calls submit(), only submit() needs to be overwritten.
    """
    def close_django_db_connection(self):
        connection.close()

    def generate_thread_closing_wrapper(self, fn):
        @wraps(fn)
        def new_func(*args, **kwargs):
            try:
                return fn(*args, **kwargs)
            finally:
                self.close_django_db_connection()
        return new_func

    def submit(*args, **kwargs):
        """
        I took the args filtering/unpacking logic from 
   
        https://github.com/python/cpython/blob/3.7/Lib/concurrent/futures/thread.py 
        
        so I can properly get the function object the same way it was done there.
        """
        if len(args) >= 2:
            self, fn, *args = args
            fn = self.generate_thread_closing_wrapper(fn=fn)
        elif not args:
            raise TypeError("descriptor 'submit' of 'ThreadPoolExecutor' object "
                        "needs an argument")
        elif 'fn' in kwargs:
            fn = self.generate_thread_closing_wrapper(fn=kwargs.pop('fn'))
            self, *args = args
    
        return super(self.__class__, self).submit(fn, *args, **kwargs)

Тогава можете просто да използвате това:

    with DjangoConnectionThreadPoolExecutor(max_workers=15) as executor:
        results = list(executor.map(func, args_list))

...и бъдете уверени, че връзките ще се затворят.




  1. Database
  2.   
  3. Mysql
  4.   
  5. Oracle
  6.   
  7. Sqlserver
  8.   
  9. PostgreSQL
  10.   
  11. Access
  12.   
  13. SQLite
  14.   
  15. MariaDB
  1. Репликация на Londiste с PostgreSQL 9.0

  2. Как мога да обединя връзки с помощта на psycopg и gevent?

  3. Можем ли да използваме DDL команди в подготвен израз (PostgreSQL)?

  4. SQL подзаявки в ограничение за проверка

  5. Как да извикам функцията на Postgres, връщаща SETOF запис?