C# 8.0 搶先看 -- Async Stream (1)

C# 8.0 隨著 Visual Studio 2019 preview 版本的發布出世了,過去幾次 C# 的更版都帶來不少新功能,此次的 8.0 版也不例外。其中一個很有趣的功能就是 Async Stream,我打算花幾個篇幅來介紹這玩意兒。

本篇文章使用環境
開發環境 Visual Studio 2019 Preview 1 (16.0.0 Preview 1)
框架       .NET Core 3.0.0-preview-27122-01
編譯器    C# 8.0 beta
 Async Stream?

Async Stream 簡單說來是一種非同步的迭代器模式,說更白一點就是可以 await 的 foreach。在過去的 C# 中如果要回傳一個可迭代的 IEnumerable<T> ,我們可能會這樣寫:

    public class EnumerableProcess
    {
        async static public Task<IEnumerable<string>> ReadLineAsync(string path)
        {           
            List<string> list = new List<string>();
            using (StreamReader reader = File.OpenText(path))
            {
                while (await reader.ReadLineAsync() is string result)
                {
                    list.Add(result);
                    await Task.Delay(100);
                }
            }
            return list;
        }
    }

這是一個非同步一行一行讀取文字檔的例子,這個例子裡的回傳型別是一個 Task<IEnumerable<string>>  ,外部程式碼將會這樣呼叫這個方法:

 var r = await EnumerableProcess.ReadLineAsync(path);
 foreach (var item in r)
 {
     Console.WriteLine(item);
 }

這造成一個長時等待的問題,因為呼叫端必須等待 ReadLineAsync 這個 Task 整個完成後才能回傳;所以 C# 8.0 引入了 Async Stream 使得非同步的迭代得以實現, 這件事情不僅僅牽涉到編譯器,也需要一些新的型別,主要是以下三個:

(1) IAsyncDisposable -- IAsyncEnumerator<out T> 將會拓展這個介面

public interface IAsyncDisposable
{
    ValueTask DisposeAsync();
}

(2) IAsyncEnumerator<out T>
 

public interface IAsyncEnumerator<out T> : IAsyncDisposable
{
    T Current { get; }

    ValueTask<bool> MoveNextAsync();
}   

(3) IAsyncEnumerable<out T>

public interface IAsyncEnumerable<out T>
{
    IAsyncEnumerator<T> GetAsyncEnumerator();
}
實作 Async Stream

由於此時在框架中對於整個 Async Stream 的實作尚未完整,所以沒辦法直接使用 yield return,先示範最基本的寫法,建立一個類別,並且實作以上介面:

    sealed class AsyncFileProcess : IAsyncEnumerable<string>, IAsyncEnumerator<string>
    {
        private readonly StreamReader _reader;

        private bool _disposed;
        public AsyncFileProcess(string path)
        {
            _reader = File.OpenText(path);
            _disposed = false;
        }

        public string Current { get; private set; }
        public IAsyncEnumerator<string> GetAsyncEnumerator()
        {
            return this;
        }
        async public ValueTask<bool> MoveNextAsync()
        {
            await Task.Delay(100);            
            var result = await _reader.ReadLineAsync();
            Current = result;
            return result != null;
        }

        async public ValueTask DisposeAsync()
        {
            await Task.Run(() => Dispose());
        }

        private void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        private void Dispose(bool disposing)
        {
            if (!this._disposed)
            {
                if (_reader != null)
                {
                    _reader.Dispose();
                }
                _disposed = true;
            }
        }
    }

呼叫端就可以這樣呼叫它:

var process = new AsyncFileProcess("SourceFile.txt");
try
{
    await foreach (var s in process)
    {
        Console.WriteLine(s);
    }
   
    Console.ReadLine();
}
finally
{
    await process.DisposeAsync();
}

你可以感受到第一個例子是停頓了很久之後,蹦一下全跳出來;而第二的例子則會一行行跑出來 (為了強化這個效果在兩方都加了 Task.Delay )。在第二個例子的呼叫端可以看到 await foreach 的使用。

這一篇先介紹到此,下一篇來談目前無法使用 yield return 的問題以及暫時的解決方案。