[.NET] Thread Safe Collection

[.NET] : Thread Safe Collection


前言 :

最近為了多執行緒程式分享資料集合,搞的焦頭爛額。
主要的問題點卡在,
當有一條執行緒使用 foreach列舉資料集合的時候,另外一條執行緒去變更資料集合。
這時候會發生Exception,通知說在列舉的同時資料集合被變更。

當下最先想到的解決方案是,使用lock在讀寫資料集合的時候做鎖定。
這樣的確可以解決問題,
但是因為不論讀寫都先lock,這樣會降低程式執行的效能。
並且這樣的寫法,要求使用資料集合的程式碼必須要記得做lock動作,不然會發生錯誤。


上網搜尋到這篇「再談程式多工(III)─執行緒安全與資料集合」,剛好解決了我遇到的問題點。:D

整篇文章主要的思路就是,
使用 ReaderWriterLockSlim,建立IEnumerable、IEnumerator介面封裝lock動作。
在讀取、列舉資料集合的時候,做唯讀的Lock。
在寫入資料集合的時候,做寫入的Lock並且不允許唯讀的的Lock。

依照這樣的方式建立起來的資料集合,
在使用外觀上因為是建立IEnumerable、IEnumerator介面封裝lock動作,使用資料集合的程式碼可以不用注意lock動作。
並且讀寫兩種lock分離,可以在大量取資料,少量寫資料的情景下,達成最大的效率。


本篇文章依照「再談程式多工(III)─執行緒安全與資料集合」的思路,整理出可擴展的Thread safe Collection。
留個紀錄,也希望能幫助到有同樣問題的開發人員。 :D


程式碼 :

ThreadSafeReaderWriterLock :
首先建立 ThreadSafeReaderWriterLock用來完善 ReaderWriterLock的Dipose功能。
參考資料 : 「再談程式多工(I)──升級版的資料鎖定和等待機制

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

namespace CLK.Threading
{
    public class ThreadSafeReaderWriterLock : IDisposable
    {
        // Fields
        private readonly ReaderWriterLockSlim _readerWriterLock = new ReaderWriterLockSlim();

        private readonly CountdownEvent _countdownEvent = new CountdownEvent(1);


        // Constructor
        public ThreadSafeReaderWriterLock() 
        { 

        }

        public void Dispose()
        {
            _countdownEvent.Signal();
            _countdownEvent.Wait();
            _countdownEvent.Dispose();
            _readerWriterLock.Dispose();
        }
        

        // Methods
        public void EnterReadLock()
        {
            _countdownEvent.AddCount();
            _readerWriterLock.EnterReadLock();
        }

        public void ExitReadLock()
        {
            _readerWriterLock.ExitReadLock();
            _countdownEvent.Signal();
        }

        public void EnterWriteLock()
        {
            _countdownEvent.AddCount();
            _readerWriterLock.EnterWriteLock();
        }

        public void ExitWriteLock()
        {
            _readerWriterLock.ExitWriteLock();
            _countdownEvent.Signal();
        }
    }
}

ThreadSafeEnumerator<T> :
接著建立 ThreadSafeEnumerator<T>,用以處理使用foeach做列舉的lock動作。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using CLK.Threading;

namespace CLK.Collections.Concurrent
{
    public sealed class ThreadSafeEnumerator<T> : IEnumerator<T>, IDisposable
    {
        // Fields         
        private readonly ThreadSafeReaderWriterLock _readerWriterLock = null;

        private readonly IEnumerator<T> _component = null;        


        private readonly object _syncRoot = new object();

        private bool _disposed = false;


        // Constructor
        public ThreadSafeEnumerator(Func getEnumeratorDelegate, ThreadSafeReaderWriterLock readerWriterLock)
        {
            #region Require

            if (getEnumeratorDelegate == null) throw new ArgumentNullException();
            if (readerWriterLock == null) throw new ArgumentNullException();

            #endregion

            // ReaderWriterLock
            _readerWriterLock = readerWriterLock;
            _readerWriterLock.EnterReadLock();

            // Component
            _component = getEnumeratorDelegate();
        }

        public void Dispose()
        {
            // Require
            lock (_syncRoot)
            {
                if (_disposed == true) return;
                _disposed = true;
            }

            // Component
            _component.Dispose();

            // ReaderWriterLock            
            _readerWriterLock.ExitReadLock();
        }


        // Properties
        public T Current
        {
            get { return _component.Current; }
        }

        object System.Collections.IEnumerator.Current
        {
            get { return this.Current; }
        }


        // Methods
        public bool MoveNext()
        {
            return _component.MoveNext();
        }

        public void Reset()
        {
            _component.Reset();
        }
    }
}

ThreadSafeEnumerable<T> :
再來建立 ThreadSafeEnumerable<T>,來處理讀寫資料 lock動作。並且他也是後續擴展資料集合的基礎物件。

using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using CLK.Threading;

namespace CLK.Collections.Concurrent
{
    public class ThreadSafeEnumerable<T> : IEnumerable<T>, IDisposable
    {
        // Fields         
        private readonly ThreadSafeReaderWriterLock _readerWriterLock = null;

        private readonly IEnumerable<T> _component = null;       


        private readonly object _syncRoot = new object();

        private bool _disposed = false;


        // Constructor
        protected ThreadSafeEnumerable(IEnumerable<T> component)
        {
            #region Require
           
            if (component == null) throw new ArgumentNullException();

            #endregion

            // ReaderWriterLock
            _readerWriterLock = new ThreadSafeReaderWriterLock();

            // Component
            _component = component;            
        }

        public void Dispose()
        {
            // Require
            lock (_syncRoot)
            {
                if (_disposed == true) return;
                _disposed = true;
            }

            // ReaderWriterLock            
            _readerWriterLock.Dispose();

            // Component
            if (_component is IDisposable)
            {
                ((IDisposable)_component).Dispose();
            }
        }


        // Methods
        protected void EnterReadLock()
        {
            _readerWriterLock.EnterReadLock();
        }

        protected void ExitReadLock()
        {
            _readerWriterLock.ExitReadLock();
        }

        protected void EnterWriteLock()
        {
            _readerWriterLock.EnterWriteLock();
        }

        protected void ExitWriteLock()
        {
            _readerWriterLock.ExitWriteLock();
        }


        public IEnumerator<T> GetEnumerator()
        {
            return new ThreadSafeEnumerator<T>(_component.GetEnumerator, _readerWriterLock);
        }

        System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
        {
            return this.GetEnumerator();
        }
    }
}

ThreadSafeCollection<T> :
這是第一個擴展的資料集合,是以ThreadSafeEnumerable<T>為基礎,並且實作ICollection<T>的介面功能。
特別要注意的是,讀寫資料集合式採用不同的Lock動作。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.ComponentModel;
using System.Threading;

namespace CLK.Collections.Concurrent
{
    public class ThreadSafeCollection<T> : ThreadSafeEnumerable<T>, ICollection<T>
    {
        // Fields         
        private readonly ICollection<T> _component = null;


        // Constructor
        public ThreadSafeCollection() : this(new List<T>()) { }

        protected ThreadSafeCollection(ICollection<T> component) : base(component)
        {
            #region Require
           
            if (component == null) throw new ArgumentNullException();

            #endregion

            // Component
            _component = component;            
        }

        
        // Properties
        public int Count
        {
            get
            {
                try
                {
                    this.EnterReadLock();
                    return _component.Count;
                }
                finally
                {
                    this.ExitReadLock();
                }
            }
        }

        public bool IsReadOnly
        {
            get
            {
                try
                {
                    this.EnterReadLock();
                    return _component.IsReadOnly;
                }
                finally
                {
                    this.ExitReadLock();
                }
            }
        }


        // Methods
        public void Add(T item)
        {
            try
            {
                this.EnterWriteLock();
                _component.Add(item);
            }
            finally
            {
                this.ExitWriteLock();
            }
        }

        public bool Remove(T item)
        {
            try
            {
                this.EnterWriteLock();
                return _component.Remove(item);
            }
            finally
            {
                this.ExitWriteLock();
            }
        }

        public void Clear()
        {
            try
            {
                this.EnterWriteLock();
                _component.Clear();
            }
            finally
            {
                this.ExitWriteLock();
            }
        }       


        public bool Contains(T item)
        {
            try
            {
                this.EnterReadLock();
                return _component.Contains(item);
            }
            finally
            {
                this.ExitReadLock();
            }
        }

        public void CopyTo(T[] array, int arrayIndex)
        {
            try
            {
                this.EnterReadLock();
                _component.CopyTo(array, arrayIndex);
            }
            finally
            {
                this.ExitReadLock();
            }
        }        
    }
}


ThreadSafeList<T> :
最後再擴展一個資料集合ThreadSafeList<T> ,是以ThreadSafeCollection<T>為基礎,並且實作IList<T>的介面功能。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace CLK.Collections.Concurrent
{
    public class ThreadSafeList<T> : ThreadSafeCollection<T>, IList<T>
    {
        // Fields         
        private readonly IList<T> _component = null;


        // Constructor
        public ThreadSafeList() : this(new List<T>()) { }

        protected ThreadSafeList(IList<T> component)
            : base(component)
        {
            #region Require

            if (component == null) throw new ArgumentNullException();

            #endregion

            // Component
            _component = component;
        }


        // Properties
        public T this[int index]
        {
            get
            {
                try
                {
                    this.EnterReadLock();
                    return _component[index];
                }
                finally
                {
                    this.ExitReadLock();
                }
            }
            set
            {
                try
                {
                    this.EnterWriteLock();
                    _component[index] = value;
                }
                finally
                {
                    this.ExitWriteLock();
                }
            }
        }


        // Methods
        public void Insert(int index, T item)
        {
            try
            {
                this.EnterWriteLock();
                _component.Insert(index, item);
            }
            finally
            {
                this.ExitWriteLock();
            }
        }

        public void RemoveAt(int index)
        {
            try
            {
                this.EnterWriteLock();
                _component.RemoveAt(index);
            }
            finally
            {
                this.ExitWriteLock();
            }
        }


        public int IndexOf(T item)
        {
            try
            {
                this.EnterReadLock();
                return _component.IndexOf(item);
            }
            finally
            {
                this.ExitReadLock();
            }
        }
    }
}


後記 :

依照上面擴展資料集合的方式,可以擴展出例如 IDictionary等等的各種Thread Safe Collection。 :D

期許自己
能以更簡潔的文字與程式碼,傳達出程式設計背後的精神。
真正做到「以形寫神」的境界。