IProgress, В словарь попадает null

	public async Task<int> Download(
		string outputFilePath,
		int firstChunkId,
		int lastChunkId,
		ChunkDownloadProgressedDelegate chunkDownloadProgressed,
        GroupDownloadCompletedDelegate groupDownloadCompleted,
		bool saveChunkInfo)
	{
		return await Task.Run(async () =>
		{
			const int groupSize = 3;
			string streamRootUrl = VodPlaylist.StreamRoot.EndsWith("/") ?
				VodPlaylist.StreamRoot : $"{VodPlaylist.StreamRoot}/";
			int currentChunkId = firstChunkId;
			while (currentChunkId <= lastChunkId && !_cancellationToken.IsCancellationRequested)
			{
				List<TwitchVodChunk> chunkGroup = new List<TwitchVodChunk>();
				for (int i = 0; i < groupSize && i <= lastChunkId; ++i)
				{
					chunkGroup.Add(VodPlaylist.GetChunk(currentChunkId + i));
				}
				if (chunkGroup.Count > 0)
				{
					Dictionary<int, DownloadItem> dict = new Dictionary<int, DownloadItem>();

					Progress<DownloadItem> progress = new Progress<DownloadItem>();
					progress.ProgressChanged += (s, pi) =>
					{
						dict[pi.TaskId] = pi;

						var items = dict.Values.ToList();
						items.Sort((x, y) => x.TaskId < y.TaskId ? -1 : 1);
						chunkDownloadProgressed?.Invoke(this, items);
					};

					var tasks = chunkGroup.Select((chunk, taskId) => Task.Run(() =>
					{
						Stream chunkStream = null;
						IProgress<DownloadItem> reporter = progress;

						FileDownloader d = new FileDownloader() { Url = streamRootUrl + chunk.FileName };
						d.WorkProgress += (sender, downloadedBytes, contentLength) =>
						{
							DownloadItem downloadItem = new DownloadItem(
								taskId, chunk, contentLength, downloadedBytes, chunkStream,
								(sender as FileDownloader).LastErrorCode);
							reporter.Report(downloadItem);
						};

						d.WorkFinished += (sender, downloadedBytes, contentLength, e) =>
						{
							DownloadItem downloadItem = new DownloadItem(
								taskId, chunk, contentLength, downloadedBytes, chunkStream,
								(sender as FileDownloader).LastErrorCode);
							reporter.Report(downloadItem);
						};

						chunkStream = new MemoryStream();
						return d.Download(chunkStream, 4096, _cancellationTokenSource);
					}));

					try
					{
						await Task.WhenAll(tasks);

						var items = dict.Values.ToList();
						items.Sort((x, y) => x.TaskId < y.TaskId ? -1 : 1);
						groupDownloadCompleted?.Invoke(this, items);
					} catch (Exception ex)
					{
						System.Diagnostics.Debug.WriteLine(ex.Message);
						return ex.HResult;
					}
				}
 
				currentChunkId += groupSize;
			}

			return 0;
		});
	}

Иногда выдаёт NullReferenceException при сортировке.

	progress.ProgressChanged += (s, pi) =>
	{
		dict[pi.TaskId] = pi;

		var items = dict.Values.ToList();
		items.Sort((x, y) => x.TaskId < y.TaskId ? -1 : 1); //здесь
		chunkDownloadProgressed?.Invoke(this, items);
	};

Один из элементов равен null. Я не пойму, откуда он там берётся :thinking: IProgress ведь синхронизирует потоки.

Ранее было.

IProgress ведь синхронизирует потоки

Объясни, что ты вкладываешь в эту фразу. Интерфейс - это таблица в памяти. Элементами таблицы являются адреса функций, являющихся реализациями методов интерфейса. Поясни, где ты видишь код инструкций, отвечающих за синхронизацию? Наверное такие инструкции должны быть в какой-то имплементации этого иннтерфейса, в каком-то классе. Ну назови имя этого класса.

«В этом блоке кода reporter.Report(downloadItem) используется для отправки информации о прогрессе загрузки каждого из фрагментов видео. Этот вызов метода Report вызывается каждый раз, когда происходит обновление прогресса загрузки»

Any handler provided to the constructor or event handlers registered with the ProgressChanged event are invoked through a SynchronizationContext instance captured when the instance is constructed. If there is no current SynchronizationContext at the time of construction, the callbacks will be invoked on the ThreadPool.

Итак, название класса я за тебя выяснил, это SynchronizationContext.

Я не вижу, чтобы в твоём коде использовался один (общий) объект класса SynchronizationContext (про который бы знала каждая нить на момент создания объектов типа Progrеss<T>).

SynchronizationContext context = new SynchronizationContext();

это создаст экземпляр базового класса SynchronizationContext, который не предоставляет никакой функциональности.

Смотрим: https://referencesource.microsoft.com/mscorlib/system/threading/synchronizationcontext.cs.html

        public virtual void Send(SendOrPostCallback d, Object state)
        {
            d(state);
        }
 
        public virtual void Post(SendOrPostCallback d, Object state)
        {
            ThreadPool.QueueUserWorkItem(new WaitCallback(d), state);
        }

И правда, работает через ThreadPool, никакой синхронизации.

Для создания объекта SynchronizationContext, который будет использоваться для синхронизации операций, нужно использовать класс, который наследуется от SynchronizationContext и переопределяет его методы.

В .NET Framework есть несколько классов, которые отнаследованы от SynchronizationContext. Вот некоторые из них:

  1. WindowsFormsSynchronizationContext: Этот класс используется в приложениях Windows Forms для синхронизации вызовов в потоке пользовательского интерфейса. Он предоставляет методы для отправки делегатов в очередь на выполнение в потоке пользовательского интерфейса.

  2. DispatcherSynchronizationContext: Этот класс используется в приложениях WPF и Silverlight для синхронизации вызовов в потоке пользовательского интерфейса. Он предоставляет методы для отправки делегатов в очередь на выполнение в потоке пользовательского интерфейса.

Понятно, что это не наш случай (потому что хочется синхронизировать доступ не к интерфейсу, а к коллекции).

Поэтому есть такие решения:
№1) использовать синхронизированные коллекции;
№2) реализовать свой класс-контекст, куда вписать синхронизацию доступа;
№3) попробовать использовать тот же контекст, который используется UI приложения.

Я так понимаю, что хочется пойти третьим путём.

Чтобы получить объект класса WindowsFormsSynchronizationContext, можно использовать свойство SynchronizationContext.Current в главном потоке приложения. Это свойство возвращает текущий контекст синхронизации, который обычно является экземпляром WindowsFormsSynchronizationContext для приложений Windows Forms. Если вызывать SynchronizationContext.Current не в главном потоке приложения, то он вернет контекст синхронизации для этого потока, если он был установлен. Если контекст синхронизации не был установлен для этого потока, то SynchronizationContext.Current вернет null.

Осталось придумать, как этот объект передать в каждую нить.
Тут два варианта:
№3.1) конструировать объекты Progress в главной нити и передавать их;
№3.2) передавать объект контекста, чтобы он стал известен на момент конструирования Progress.

https://referencesource.microsoft.com/#mscorlib/system/progress.cs,52

        public Progress()
        {
            // Capture the current synchronization context.  "current" is determined by CurrentNoFlow,
            // which doesn't consider the sync ctx flown with an ExecutionContext, avoiding
            // sync ctx reference identity issues where the sync ctx for one thread could be Current on another.
            // If there is no current context, we use a default instance targeting the ThreadPool.
            m_synchronizationContext = SynchronizationContext.CurrentNoFlow ?? ProgressStatics.DefaultContext;
            Contract.Assert(m_synchronizationContext != null);
            m_invokeHandlers = new SendOrPostCallback(InvokeHandlers);
        }

Я думаю, что из этих двух вариантов работоспособный только первый (№3.1), потому что в комментарии к коду же написано “боремся с вами и будем бороться” (я так понимаю, что в конструкторе читается значение SynchronizationContext.CurrentNoFlow, а не SynchronizationContext.Current для текущей нити).

Рекомендуют читать:

Может показаться, что код после открывающей скобки

		return await Task.Run(async () =>
		{
				...

работает как-бы в главной нити, потому что замыкание захватывает значения переменных из главной нити. Но к значению свойства SynchronizationContext.Current это не относится. Объекты Progress фактически конструируются в какой-попало нити, и доступа к контексту синхронизации главной нити у них нет.

А как же нам запихнуть всё-таки что-нибудь в лямбду?

«Use variable capture to “pass in” parameters»

var x = ...;
Task.Run(() =>
{
    // Do something with 'x'
});

Сможем ли мы такое (конструировать снаружи и передавать внутрь) проделать именно с Progress в данном конкретном сценарии? Был бы объект один, не было бы проблем. Но почему-то их хочется много.
(на этом месте я задолбался и пошел лесом, потому что зачем, ну просто зачем? Если можно взять синхронизировнную коллекцию, как я предлагал 2 месяца и 8 дней назад)

Два года назад AlexP скидывал мне пример кода и говорил, что связка Progress<> + IProgress<> специально предназначена, чтобы избежать необходимость синхронизации. То есть, код:

                        progress.ProgressChanged += (s, p) =>
                        {
                            label1.Text = p.ToString();
                        };

Выполнится в том потоке, в котором был создан экземпляр Progress. То есть, проще говоря, синхронизируется автоматически. То есть, можно просто вызвать IProgress.Report() и забить на синхронизацию. Разве я не так понял? :thinking: Не помню, в какой теме это было.
Потом я пытался использовать Progress без многопоточности. И обнаружилось, что в одном потоке это не работает. Были какие-то лютые тормоза и глюки.

Конкретно в коде, который выше, в стартовом посте этой темы, экземпляры Process создаются в разных потоках. AlexP всё правильно говорит, но ты неправильно используешь.

Выполнится в том потоке, в котором был создан экземпляр Progress.

Ну если создаются экземпляры в разных потоках, то и код выполняется в разных потоках. И все эти потоки, разные, значит лезут в одну и ту же несинхронизированную коллекцию.

Давай ты всё-таки сосредоточишься и напишешь, что ты хочешь сделать. Потому что мне лень читать ВСЕ твои сообщения, чтобы составить документ требований.

Так разве не в этом и смысл, чтобы создать в одном, а вызывать из другого? :thinking: А как тогда правильно? Ты предыдущий пост вообще читал? :man_shrugging:

Ну. У меня же коллекция и Progress созданы в одном потоке. Значит

это неправильно, получается? :thinking:

Но читающий тоже должен сосредоточиться, иначе фигня какая-то получается. Каждое новое предложение противоречит предыдущему.

Так, пробую сосредоточиться ещё раз.

  1. ты не предоставляешь твою личную реализацию класса SynchronizationContext;
  2. контекст синхронизации не задан (равен null);
  3. поэтому используется реализация по-умолчанию, которая синхронизацию не делает;
  4. системная-по-умолчанию тебе ничем не обязана и может использовать
    любые нити из ThreadPool, какие захочет, в том числе разные;

Контекст синхронизации равен null, потому что ты создаёшь объекты Progress не в интерфейсной нити приложения.

Так-так-так. Уже что-то интересное. То есть, для правильной работы Progress обязательно нужен SynchronizationContext? То есть, с помощью IProgress нельзя просто так синхронизировать, например, две таски, потому что у них нет SynchronizationContext?

можно, но в типовом случае. твой случай не типовой, поэтому его надо доработать.

для правильной работы Progress обязательно нужен SynchronizationContext?

Здесь у тебя семантика (смысл) словосочетания “правильной работы” не совпадает с пониманием разработчиков. Ты хочешь, чтобы синхронизация была обязательно, а они не хотят.

Выше есть стена текста, там это всё очень подробно описано.

Почему нельзя сразу так и говорить - двумя простыми и короткими предложениями, а не писать вместо этого горы текста, тратя при этом своё и чужое время? :man_facepalming:

Потому что для правильной работы Progress не обязательно нужно принудительно устанавливать SynchronizationContext.

А нельзя просто обращаться к нужным объектам через lock? :thinking: Это же, вроде, делает то же самое, что синхронизация.

Можно. Учитывая, что lock это синтаксический сахар, то можно и без него. Но можно и с ним.

Но разве тебе не интересно доделать реализацию с SynchronizationContext?

Что-нибудь типа

public class ThreadSynchronizationContext : SynchronizationContext
{
   private readonly Thread targetThread;

   public ThreadSynchronizationContext(Thread targetThread)
   {
       this.targetThread = targetThread;
   }

   public override void Post(SendOrPostCallback d, object state)
   {
       if (Thread.CurrentThread == targetThread)
       {
           d(state);
       }
       else
       {
           targetThread.QueueUserWorkItem(s => d(s), state);
       }
   }

   public override void Send(SendOrPostCallback d, object state)
   {
       if (Thread.CurrentThread == targetThread)
       {
           d(state);
       }
       else
       {
           targetThread.Invoke(d, state);
       }
   }
}

Хотя, это, конечно, офигенские накладные расходы. чтобы Invoke работал ведь наверное нужно там event loop в той нити иметь и отдельную специальную нить держать.

https://www.codeproject.com/Articles/31971/Understanding-SynchronizationContext-Part-I
перевод

Можно ли обойтись без перехода управления в другую нить? Просто заблокировать весь доступ:

public class LockSynchronizationContext : SynchronizationContext
{
   private readonly object _lock = new object();

   public LockSynchronizationContext()
   {
   }

   public override void Post(SendOrPostCallback d, object state)
   {
       lock (_lock)
       {
           d(state);
       }
   }

   public override void Send(SendOrPostCallback d, object state)
   {
       lock (_lock)
       {
           d(state);
       }
   }
}

Можно, но в предыдущем варианте другие нити не ждали и в очередь можно несколько сообщений добавить, а тут все нити будут критическую секцию проходить по одной штуке.

Научи меня понимать такие конструкции предложения.
У нас есть объект, к которому нужен доступ из нескольких потоков. Но делать так, понятно, нельзя. У нас есть оператор lock, который позволяет нам это делать. То есть, мы должны его использовать. Но при этом мы знаем, что lock это синтаксический сахар. А это значит, что мы можем не использовать lock. Но при этом, не использовать lock мы не можем.


При чём тут синтаксический сахар?

У нас есть объект, к которому нужен доступ из нескольких потоков. Поэтому мы будем использовать примитивы синхронизации. Есть разные варианты - какие. Один из них Monitor (класс такой). Методы этого класса можно вызывать вручную, а можно использовать синтаксическую конструкцию lock, которая тоже вызывает методы этого класса Monitor, но делает это внутри.

Ну так понятнее. А когда говоришь про что-то одно, то мысль надо строить правильно, в соответствии с российским языком, чтобы в ней противоречий не было.

И вот опять же. То он нужен, то необязателен. Контекст Шрёдингера какой-то.