Синхронизация потоков третьего уровня с потоком второго уровня

В одной из старых тем меня научили (кажется, AlexP), что для отображения прогресса в потоке можно использовать IProgress. Выглядеть это будет примерно так:

        private async void button1_Click(object sender, EventArgs e)
        {
            button1.Enabled = false;

            Progress<int> progress = new Progress<int>();
            progress.ProgressChanged += (s, n) =>
            {
                Text = n.ToString();
            };
            await Task.Run(() =>
            {
                IProgress<int> reporter = progress;
                int i = 0;
                while (i < 100)
                {
                    Thread.Sleep(100);
                    reporter.Report(i++);
                }
            });

            button1.Enabled = true;
        }

Здесь у нас форма - это поток первого уровня (скажем так). А таска - это поток второго уровня.
Если в этой таске нужно запустить ещё кучку потоков - это уже будут потоки третьего уровня. Они все должны синхронизироваться со вторым, а второй в этот момент должен синхронизироваться с первым. Правильно? :thinking:
Код усложняется:

        private async void button1_Click(object sender, EventArgs e)
        {
            button1.Enabled = false;

            Progress<int> progress = new Progress<int>();
            progress.ProgressChanged += (s, n) =>
            {
                Text = n.ToString();
            };
            await Task.Run(async () =>
            {
                IProgress<int> reporter = progress;

                int sum = 0;

                Progress<ProgressItem> progressTask = new Progress<ProgressItem>();
                progressTask.ProgressChanged += (s, item) =>
                {
                    sum += item.Value;
                    reporter.Report(sum);
                };

                int[] ids = new int[] { 0, 1, 2, 3, 4, 5 };
                var tasks = ids.Select(id => Task.Run(() =>
                {
                    IProgress<ProgressItem> reporterTask = progressTask;

                    int i = 0;
                    while (i < 100)
                    {
                        Thread.Sleep(100);
                        reporterTask.Report(new ProgressItem(id, i++));
                    }
                }));
                await Task.WhenAll(tasks);
            });

            button1.Enabled = true;
        }

Правильно ли я понимаю, что IProgress работает за счёт контекста синхонизации потока формы? А если перенести этот код, скажем, в библиотеку, то он перестанет работать, потому что по-умолчанию там никакого контекста нет?

Оно берет контекст из SynchronizationContext.Current, так что наверно будет.

Ну и работает не IProgress, а Progress. В сложных ситуациях можно сделать/найти другую реализацию.

Перенёс код в библиотеку. Там SynchronizationContext.Current равен null. Но при этом, Progress и IProgress как-то работают. Во всяком случае, крашей я не наблюдаю :man_shrugging:
Откуда тогда берётся контекст? :thinking:

Накидал новую скачиваалку.
FileDownloader:

using System;
using System.IO;
using System.Net;

namespace MultitaskerLib
{
    public class FileDownloader
    {
        public string Url { get; set; }
        public long DownloadedBytes { get; private set; }

        public const int ERROR_SIZE_MISMATCH = -1;
        public const int ERROR_NO_CONTENT = -2;

        public Action<object, long, long> Progressed;

        public int Download(Stream outputStream, long byteFrom, long byteTo)
        {
            try
            {
                DownloadedBytes = 0L;

                HttpWebRequest request = (HttpWebRequest)WebRequest.Create(Url);
                request.Method = "GET";
                request.AddRange(byteFrom, byteTo);

                using (HttpWebResponse response = (HttpWebResponse)request.GetResponse())
                {
                    int errorCode = (int)response.StatusCode;
                    if (errorCode != 200 && errorCode != 206) { return errorCode; }

                    Stream stream = response.GetResponseStream();
                    long contentLength = response.ContentLength;
                    if (contentLength <= 0L)
                    {
                        stream.Close();
                        return ERROR_NO_CONTENT;
                    }

                    byte[] buffer = new byte[4096];
                    int start = Environment.TickCount;
                    do
                    {
                        int bytesRead = stream.Read(buffer, 0, buffer.Length);
                        if (bytesRead <= 0) { break; }
                        outputStream.Write(buffer, 0, bytesRead);
                        DownloadedBytes += bytesRead;

                        int end = Environment.TickCount;
                        if (end - start >= 50)
                        {
                            Progressed?.Invoke(this, DownloadedBytes, contentLength);
                            start = end;
                        }
                    } while (true);

                    if (DownloadedBytes != contentLength)
                    {
                        return ERROR_SIZE_MISMATCH;
                    }

                    return errorCode;
                }
            } catch (Exception ex)
            {
                System.Diagnostics.Debug.WriteLine(ex.Message);
                return ex.HResult;
            }
        }

        public static int GetContentLength(string url, out long contentLength)
        {
            try
            {
                HttpWebRequest request = (HttpWebRequest)WebRequest.Create(url);
                request.Method = "HEAD";

                using (HttpWebResponse response = (HttpWebResponse)request.GetResponse())
                {
                    contentLength = response.ContentLength;
                    return (int)response.StatusCode;
                }
            }
            catch (Exception ex)
            {
                System.Diagnostics.Debug.WriteLine(ex.Message);
                contentLength = -1L;
                return ex.HResult;
            }
        }
    }
}

Multitasker:

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;

namespace MultitaskerLib
{
	public class Multitasker
	{
		public long RangeFrom { get; set; } = 0L;
		public long RangeTo { get; set; } = -1L;

		public Action<object, long, long> Progressed;
		public Action<object> Finished;
		public Action<object> PostFinished;

		public async Task<int> Download(string url)
		{
			System.Diagnostics.Debug.WriteLine(SynchronizationContext.Current);
			Progress<Dictionary<int, ProgressItem>> progress = new Progress<Dictionary<int, ProgressItem>>();
			progress.ProgressChanged += (s, item) =>
			{
				long max = item.Sum(o => o.Value.Size);
				long downloaded = item.Sum(o => o.Value.Value);
				Progressed?.Invoke(this, downloaded, max);
			};

			int e = FileDownloader.GetContentLength(url, out long contentLength);
			if ((e != 200 && e != 206) || contentLength <= 0L)
			{
				return FileDownloader.ERROR_NO_CONTENT;
			}

			Dictionary<int, ProgressItem> dictionary = new Dictionary<int, ProgressItem>();

			IProgress<Dictionary<int, ProgressItem>> reporter = progress;

			Progress<ProgressItem> progressTask = new Progress<ProgressItem>();
			progressTask.ProgressChanged += (s, item) =>
			{
				dictionary[item.TaskId] = item;
				reporter.Report(dictionary);
			};

			var ranges = SplitContentToChunks(contentLength, 25);
			var tasks = ranges.Select((range, taskId) => Task.Run(() =>
			{
				long byteStart = range.Item1;
				long byteEnd = range.Item2;

				Stream streamTask = new MemoryStream();
				System.Diagnostics.Debug.WriteLine($"{taskId}: {SynchronizationContext.Current}");
				IProgress<ProgressItem> reporterTask = progressTask;

				FileDownloader d = new FileDownloader() { Url = url };
				d.Progressed += (s, n, max) =>
				{
					double percent = 100.0 / max * n;
					string percentFormatted = string.Format("{0:F2}", percent);
					string t = $"{taskId}: {n} / {max} ({percentFormatted}%)";
					System.Diagnostics.Debug.WriteLine(t);
					ProgressItem progressItem = new ProgressItem(taskId, max, n);
					reporterTask.Report(progressItem);
				};

				int errorCode = d.Download(streamTask, byteStart, byteEnd);
				if (errorCode != 200 && errorCode != 206)
				{
					throw new Exception("WRONG RESPONSE CODE");
				}
			}));

			try
			{
				await Task.WhenAll(tasks);
			}
			catch (Exception ex)
			{
				System.Diagnostics.Debug.WriteLine("WhenAll() is crashed");
				System.Diagnostics.Debug.WriteLine(ex.Message);
				return ex.HResult;
			}
			Finished?.Invoke(this);
			PostFinished?.Invoke(this);

			return 200;
		}

        private IEnumerable<Tuple<long, long>> SplitContentToChunks(long contentLength, int chunkCount)
        {
			const int MEGABYTE = 1048576; //1024 * 1024;
			long contentLengthRanged = RangeTo >= 0L ? RangeTo - RangeFrom : contentLength - RangeFrom;
            if (chunkCount <= 1 || contentLengthRanged <= MEGABYTE)
            {
                long byteTo = RangeTo >= 0L ? RangeTo : contentLengthRanged + RangeFrom - 1;
                yield return new Tuple<long, long>(RangeFrom, byteTo);
                yield break;
            }

            long chunkSize = contentLengthRanged / chunkCount;
            long startPos = RangeFrom;
            for (int i = 0; i < chunkCount; ++i)
            {
                bool lastChunk = i == chunkCount - 1;
                long endPos = lastChunk ? (RangeTo >= 0 ? RangeTo : contentLength - 1) : (startPos + chunkSize);

                yield return new Tuple<long, long>(startPos, endPos);

                if (!lastChunk) { startPos += chunkSize + 1; }
            }
        }
        
		public static void SetMaximumConnectionsLimit(int limit)
		{
			ServicePointManager.DefaultConnectionLimit = limit;
		}
	}
}

Форма:

        private void Form1_Load(object sender, EventArgs e)
        {
            Multitasker.SetMaximumConnectionsLimit(100);
        }

        private async void button1_Click(object sender, EventArgs e)
        {
            button1.Enabled = false;
            progressBar1.Value = 0;
            progressBar1.Maximum = 100;

            Multitasker m = new Multitasker();
            m.Progressed += (s, n, max) =>
            {
                Invoke(new MethodInvoker(() =>
                {
                    Text = $"{n} / {max}";
                }));
            };
            m.Finished += (s) =>
            {
                Invoke(new MethodInvoker(() => progressBar1.Value = 100));
            };
            m.PostFinished += (s) =>
            {
                Invoke(new MethodInvoker(() => progressBar1.Maximum = 25));
            };

            string url = "https://введите ссылку";
            int err = await Task.Run(() => m.Download(url));
            System.Diagnostics.Debug.WriteLine(err);

            button1.Enabled = true;
        }

Через какое-то время после начала скачивания, выдаёт ошибку:


SynchronizationContext.Current в тасках равен null. Не важно, в библиотеке или нет.
Получается, что надо вручную создать контекст и использовать его вместо Progress? :thinking:

Переписал вот так:

	public class Multitasker
	{
		public long RangeFrom { get; set; } = 0L;
		public long RangeTo { get; set; } = -1L;

		private SynchronizationContext _synchronizationContext;
		public Action<object, long, long> Progressed;
		public Action<object> Finished;
		public Action<object> PostFinished;

		public Multitasker(SynchronizationContext synchronizationContext)
		{
			_synchronizationContext = synchronizationContext;
		}

		public async Task<int> Download(string url)
		{
			int e = FileDownloader.GetContentLength(url, out long contentLength);
			if ((e != 200 && e != 206) || contentLength <= 0L)
			{
				return FileDownloader.ERROR_NO_CONTENT;
			}

            SynchronizationContext context = new SynchronizationContext();
            
			Dictionary<int, ProgressItem> dictionary = new Dictionary<int, ProgressItem>();

			var ranges = SplitContentToChunks(contentLength, 25);
			var tasks = ranges.Select((range, taskId) => Task.Run(() =>
			{
				long byteStart = range.Item1;
				long byteEnd = range.Item2;

				Stream streamTask = new MemoryStream();

				FileDownloader d = new FileDownloader() { Url = url };
				d.Progressed += (s, n, max) =>
				{
					context.Send(obj =>
					{
						double percent = 100.0 / max * n;
						string percentFormatted = string.Format("{0:F2}", percent);
						string t = $"{taskId}: {n} / {max} ({percentFormatted}%)";
						System.Diagnostics.Debug.WriteLine(t);
						ProgressItem progressItem = new ProgressItem(taskId, max, n);
                        _synchronizationContext.Send(obj2 =>
                        {
                            dictionary[progressItem.TaskId] = progressItem;
                            long fileSize = dictionary.Sum(o => o.Value.Size);
                            long downloaded = dictionary.Sum(o => o.Value.Value);
                            Progressed?.Invoke(this, downloaded, fileSize);
                        }, null);
					}, null);
				};

				int errorCode = d.Download(streamTask, byteStart, byteEnd);
				if (errorCode != 200 && errorCode != 206)
				{
					throw new Exception("WRONG RESPONSE CODE");
				}
			}));

			try
			{
				await Task.WhenAll(tasks);
			}
			catch (Exception ex)
			{
				System.Diagnostics.Debug.WriteLine("WhenAll() is crashed");
				System.Diagnostics.Debug.WriteLine(ex.Message);
				return ex.HResult;
			}
			Finished?.Invoke(this);
			PostFinished?.Invoke(this);

			return 200;
		}

Теперь в конструктор передаю SynchronizationContext от потока формы и в методе Download() его использую.
Я же правильно это понял? :thinking:
Краши сразу пропали :man_shrugging: Хотя, я всё-равно не уверен, что код правильный.

Но что если я не хочу передавать контекст потока формы, чтобы не напрягать этот поток постоянными синхронизациями?
То есть, ещё раз. Метод Download() вызван в таске. Значит, своего SynchronizationContext.Current у этого потока нет. Значит, контекст надо создать вручную вызовом

			SynchronizationContext context = new SynchronizationContext();

а потом таски третьего уровня должны использовать этот контекст, вызывая у него метод Send(). Таким образом и происходит синхронизация, правильно? :thinking:
Вот код:

		public async Task<int> Download(string url)
		{
			int e = FileDownloader.GetContentLength(url, out long contentLength);
			if ((e != 200 && e != 206) || contentLength <= 0L)
			{
				return FileDownloader.ERROR_NO_CONTENT;
			}
            
			Dictionary<int, ProgressItem> dictionary = new Dictionary<int, ProgressItem>();
			SynchronizationContext context = new SynchronizationContext();

			var ranges = SplitContentToChunks(contentLength, 25);
			var tasks = ranges.Select((range, taskId) => Task.Run(() =>
			{
				long byteStart = range.Item1;
				long byteEnd = range.Item2;

				Stream streamTask = new MemoryStream();

				FileDownloader d = new FileDownloader() { Url = url };
				d.Progressed += (s, n, max) =>
				{
					double percent = 100.0 / max * n;
					string percentFormatted = string.Format("{0:F2}", percent);
					string t = $"{taskId}: {n} / {max} ({percentFormatted}%)";
					System.Diagnostics.Debug.WriteLine(t);
					ProgressItem progressItem = new ProgressItem(taskId, max, n);
					context.Send(obj =>
					{
						try
						{
							dictionary[progressItem.TaskId] = progressItem;
							long downloaded = dictionary.Sum(o => o.Value.Value);
							Progressed?.Invoke(this, downloaded, contentLength);
						} catch (Exception ex)
						{
							System.Diagnostics.Debug.WriteLine("synchronization is crashed");
							System.Diagnostics.Debug.WriteLine(ex.Message);
						}
					}, null);
				};

				int errorCode = d.Download(streamTask, byteStart, byteEnd);
				if (errorCode != 200 && errorCode != 206)
				{
					throw new Exception($"WRONG RESPONSE CODE: {errorCode}");
				}
			}));

			try
			{
				await Task.WhenAll(tasks);
			}
			catch (Exception ex)
			{
				System.Diagnostics.Debug.WriteLine("WhenAll() is crashed");
				System.Diagnostics.Debug.WriteLine(ex.Message);
				return ex.HResult;
			}
			Finished?.Invoke(this);
			PostFinished?.Invoke(this);

			return 200;
		}

Но это не работает. Довольно часто выбрасывает в экскепшен и выводит в консоль:

synchronization is crashed
Коллекция была изменена; невозможно выполнить операцию перечисления.

То есть, как будто синхронизация вообще не работает, если контекст создан вручную :thinking:
Ещё у класса SynchronizationContext есть метод SetSynchronizationContext(). Может надо созданный контекст туда положить, он там к чему-то привяжется и начнёт работать? :thinking: Это я ещё не пробовал. Попробовал - не работает :man_shrugging:
Но почему работает с контекстом, взятым из потока формы? :thinking: