MongoDB
 sql >> база данни >  >> NoSQL >> MongoDB

System.TimeoutException:Възникна изчакване след 30000ms избиране на сървър с помощта на CompositeServerSelector

Това е много труден проблем, свързан с библиотеката със задачи. Накратко, има твърде много задачи, създадени и планирани, така че една от задачите, които драйверът на MongoDB чака, няма да може да бъде завършена. Отне ми много време, за да разбера, че не е задънена улица, въпреки че изглежда така.

Ето стъпката за възпроизвеждане:

  1. Изтеглете изходния код на драйвера CSharp на MongoDB .
  2. Отворете това решение и създайте конзолен проект вътре и препращайте към проекта на драйвера.
  3. Във основната функция създайте System.Threading.Timer, който ще извика TestTask навреме. Настройте таймера да стартира веднага веднъж. В края добавете Console.Read().
  4. В TestTask използвайте for цикъл, за да създадете 300 задачи, като извикате Task.Factory.StartNew(DoOneThing). Добавете всички тези задачи към списък и използвайте Task.WaitAll, за да изчакате всички да приключат.
  5. Във функцията DoOneThing създайте MongoClient и направете някаква проста заявка.
  6. Сега го стартирайте.

Това ще се провали на същото място, което споменахте:MongoDB.Driver.Core.Clusters.Cluster.WaitForDescriptionChangedHelper.HandleCompletedTask(Task completedTask)

Ако поставите някои точки на прекъсване, ще знаете, че WaitForDescriptionChangedHelper е създал задача за изчакване. След това изчаква някоя от задачите DescriptionUpdate или задачата за изчакване да завърши. Актуализацията на DescriptionUpdate обаче никога не се случва, но защо?

Сега, обратно към моя пример, има една интересна част:стартирах таймер. Ако извикате TestTask директно, той ще работи без проблем. Сравнявайки ги с прозореца Задачи на Visual Studio, ще забележите, че версията с таймер ще създаде много повече задачи от версията без таймер. Нека обясня тази част малко по-късно. Има още една важна разлика. Трябва да добавите редове за отстраняване на грешки в Cluster.cs :

    protected void UpdateClusterDescription(ClusterDescription newClusterDescription)
    {
        ClusterDescription oldClusterDescription = null;
        TaskCompletionSource<bool> oldDescriptionChangedTaskCompletionSource = null;

        Console.WriteLine($"Before UpdateClusterDescription {_descriptionChangedTaskCompletionSource?.Task.Id}, {_descriptionChangedTaskCompletionSource?.Task?.GetHashCode().ToString("F8")}");
        lock (_descriptionLock)
        {
            oldClusterDescription = _description;
            _description = newClusterDescription;

            oldDescriptionChangedTaskCompletionSource = _descriptionChangedTaskCompletionSource;
            _descriptionChangedTaskCompletionSource = new TaskCompletionSource<bool>();
        }

        OnDescriptionChanged(oldClusterDescription, newClusterDescription);
        Console.WriteLine($"Setting UpdateClusterDescription {oldDescriptionChangedTaskCompletionSource?.Task.Id}, {oldDescriptionChangedTaskCompletionSource?.Task?.GetHashCode().ToString("F8")}");
        oldDescriptionChangedTaskCompletionSource.TrySetResult(true);
        Console.WriteLine($"Set UpdateClusterDescription {oldDescriptionChangedTaskCompletionSource?.Task.Id}, {oldDescriptionChangedTaskCompletionSource?.Task?.GetHashCode().ToString("F8")}");
    }

    private void WaitForDescriptionChanged(IServerSelector selector, ClusterDescription description, Task descriptionChangedTask, TimeSpan timeout, CancellationToken cancellationToken)
    {
        using (var helper = new WaitForDescriptionChangedHelper(this, selector, description, descriptionChangedTask, timeout, cancellationToken))
        {
            Console.WriteLine($"Waiting {descriptionChangedTask?.Id}, {descriptionChangedTask?.GetHashCode().ToString("F8")}");
            var index = Task.WaitAny(helper.Tasks);
            helper.HandleCompletedTask(helper.Tasks[index]);
        }
    }

Като добавите тези редове, ще разберете също, че версията без таймер ще се актуализира два пъти, но версията с таймер ще се актуализира само веднъж. А вторият идва от "MonitorServerAsync" в ServerMonitor.cs. Оказа се, че във версията на таймера MontiorServerAsync е изпълнен, но след като минава през ServerMonitor.HeartbeatAsync, BinaryConnection.OpenAsync, BinaryConnection.OpenHelperAsync и TcpStreamFactory.CreateStreamAsync, най-накрая достига до TcpStreamFactory.ResolveEndPointsAsync. Лошото се случва тук:Dns.GetHostAddressesAsync . Този никога не се екзекутира. Ако леко промените кода и го превърнете в:

    var task = Dns.GetHostAddressesAsync(dnsInitial.Host).ConfigureAwait(false);

    return (await task)
        .Select(x => new IPEndPoint(x, dnsInitial.Port))
        .OrderBy(x => x, new PreferredAddressFamilyComparer(preferred))
        .ToArray();

Ще можете да намерите идентификатора на задачата. Поглеждайки в прозореца със задачи на Visual Studio, е съвсем очевидно, че пред него има около 300 задачи. Само няколко от тях се изпълняват, но са блокирани. Ако добавите Console.Writeline във функцията DoOneThing, ще видите, че планировчикът на задачи стартира няколко от тях почти по едно и също време, но след това се забавя до около една в секунда. Така че това означава, че трябва да изчакате около 300 секунди, преди задачата за разрешаване на dns да започне да се изпълнява. Ето защо надвишава времето за изчакване от 30 секунди.

Ето едно бързо решение, ако не правите луди неща:

Task.Factory.StartNew(DoOneThing, TaskCreationOptions.LongRunning);

Това ще принуди ThreadPoolScheduler да стартира нишка незабавно, вместо да чака една секунда, преди да създаде нова.

Това обаче няма да работи, ако правите наистина луди неща като мен. Нека променим for цикъла от 300 на 30 000, дори това решение също може да се провали. Причината е, че създава твърде много нишки. Това отнема ресурси и време. И може да започне процеса на GC. Като цяло може да не успее да завърши създаването на всички тези нишки, преди да изтече времето.

Идеалният начин е да спрете да създавате много задачи и да използвате планировчика по подразбиране, за да ги планирате. Можете да опитате да създадете работен елемент и да го поставите в ConcurrentQueue и след това да създадете няколко нишки като работници, които да консумират елементите.

Ако обаче не искате да променяте твърде много оригиналната структура, можете да опитате следния начин:

Създайте ThrottledTaskScheduler, извлечен от TaskScheduler.

  1. Този ThrottledTaskScheduler приема TaskScheduler като основен, който ще изпълнява действителната задача.
  2. Изхвърлете задачите към основния планировчик, но ако надхвърли ограничението, вместо това го поставете в опашка.
  3. Ако някоя от задачите приключи, проверете опашката и се опитайте да ги изхвърлите в основния планировчик в рамките на лимита.
  4. Използвайте следния код, за да започнете всички тези луди нови задачи:

·

var taskScheduler = new ThrottledTaskScheduler(
    TaskScheduler.Default,
    128,
    TaskCreationOptions.LongRunning | TaskCreationOptions.HideScheduler,
    logger
    );
var taskFactory = new TaskFactory(taskScheduler);
for (var i = 0; i < 30000; i++)
{
    tasks.Add(taskFactory.StartNew(DoOneThing))
}
Task.WaitAll(tasks.ToArray());

Можете да вземете System.Threading.Tasks.ConcurrentExclusiveSchedulerPair.ConcurrentExclusiveTaskScheduler като справка. Това е малко по-сложно от това, от което се нуждаем. Това е за някаква друга цел. Така че не се притеснявайте за онези части, които се движат напред и назад с функцията вътре в класа ConcurrentExclusiveSchedulerPair. Въпреки това не можете да го използвате директно, тъй като той не предава TaskCreationOptions.LongRunning, когато създава задачата за опаковане.

При мен работи. Успех!

P.S.:Причината за наличието на много задачи във версията на таймера вероятно е в TaskScheduler.TryExecuteTaskInline. Ако е в основната нишка, където е създаден ThreadPool, той ще може да изпълни някои от задачите, без да ги поставя в опашката.




  1. Redis
  2.   
  3. MongoDB
  4.   
  5. Memcached
  6.   
  7. HBase
  8.   
  9. CouchDB
  1. Pymongo агрегат:филтриране по брой полета номер (динамично)

  2. Как мога да използвам mongodump за изхвърляне на записи, съответстващи на определен период от време?

  3. Как да моделираме система за гласуване с харесвания с MongoDB

  4. Използвайте LIKE/regex с променлива в mongoid

  5. findOneAndUpdate не е функция