[Windows Azure][IT鐵人賽系列] Day 16 - Storage Service (3): Queue Storage

學過資料結構的人一定都聽過Stack和Queue吧,Stack是後進先出(LIFO),而Queue則是先進先出(FIFO)的資料結構,商用系統的實務開發上,Queue的應用範圍比Stack要大的多了,因為在實務上會用到先進先出的案例太多了,舉凡線上訂位(購買)、抽號碼牌、選位等等商用的需求都會要求先進先出的條件,故Queue的應用範圍會比Stack大的多,微軟當然也很清楚這一點,所以在Storage中也實作了一個專門處理Queue的服務,即為Queue Storage。

學過資料結構的人一定都聽過Stack和Queue吧,Stack是後進先出(LIFO),而Queue則是先進先出(FIFO)的資料結構,商用系統的實務開發上,Queue的應用範圍比Stack要大的多了,因為在實務上會用到先進先出的案例太多了,舉凡線上訂位(購買)、抽號碼牌、選位等等商用的需求都會要求先進先出的條件,故Queue的應用範圍會比Stack大的多,微軟當然也很清楚這一點,所以在Storage中也實作了一個專門處理Queue的服務,即為Queue Storage。

Queue Storage和BLOB、Table一樣,都有容器的概念,Queue的容器被稱為通道(tunnel),每個通道內的訊息都是先進先出,一支應用程式可以同時生成多個通道,並且在通道內放不同的訊息,通道的另一端可以是程式,也可以是在不同虛擬機器內的不同角色,像是Web Role傳遞訊息給後面負責處理的Worker Role,就能運用Queue Storage來實作,所以Queue Storage也會用在不同角色或不同虛擬機器間程式的通訊。

clip_image002[4]

佇列訊息(Queue Message)則是在通道內流通的資料,為佇列通訊的主體,它可以裝載可二進位序列化(Binary Serializable)的物件資料,這代表在佇列中可用的資料類型非常彈性,我們可以直接將物件裝到佇列中,只要兩端應用程式都擁有相同的型別資訊的話,就可以很容易的將物件還原回原始的物件型別。但佇列內的資料最長為8KB,所以不能將完整的資料放到佇列中,但能配合BLOB或Table來存放資料,再以佇列訊息搭載參數與資料所在的URL給處理程式即可。

Queue通道提供了不同的方法,以支援佇列通道內的處理訊息插入與擷取的各項作業,Queue通道支援兩種不同的訊息擷取方法:

  • GetMessage(s):將訊息由佇列通道中取出,並且將訊息的NextVisibleTime設為指定的下次的有效時間(可由程式控制,或是預設30秒後)。
  • PeekMessage(s):將訊息由佇列通道中取出,但是不會檢查NextVisibleTime,也不會修改NextVisibleTime以控制訊息的可見性。

 

每一個佇列訊息都有一個有效期間,預設為7天,只要程式沒有下指令將訊息明確自通道中刪除,則訊息會一直存在,直到有效期間到期為止(GetMessage()會讓訊息狀態設為不可見,但並不會刪除訊息),這樣的作法是基於佇列的強固性(Reliability),讓佇列另一端的處理程式一定可以得到訊息。但副作用就是毒訊息(Poison Message),當處理程式有多個個體時,當其中一個個體取回訊息但在處理時當機了,其他的個體會在訊息恢復可見(visible)時再次由佇列取得,如果沒有做檢查的話,很容易讓訊息被重覆處理,所以應用程式必須要做檢查,確認訊息沒有被處理,否則就會被毒訊息的副作用影響。

Queue的程式處理非常簡單,服務的入口為CloudQueueClient,首先要先建立訊息通道,使用CloudQueueClient.GetQueueReference()得到CloudQueue的參考後,再用CloudQueue.CreateIfNotExist()建立通道:

public override bool OnStart()
{
    ...

    this._storageAccount = CloudStorageAccount.FromConfigurationSetting("QueueDataSource");
    this._queueClient = this._storageAccount.CreateCloudQueueClient();
    this._queue = this._queueClient.GetQueueReference("cmdqueue");
    this._queue.CreateIfNotExist();
    this._queueOutput = this._queueClient.GetQueueReference("resultqueue");
    this._queueOutput.CreateIfNotExist();

    return base.OnStart();
}

 

 

然後,使用CloudQueueMessage物件生成佇列訊息後,用AddMessage()將訊息推入通道內,而處理程式則可用GetMessage()取回訊息,處理完後呼叫DeleteMessage()刪除它,以避免被重覆處理:

protected void cmdCalc_Click(object sender, EventArgs e)
{
    CloudStorageAccount storageAccount =
CloudStorageAccount.FromConfigurationSetting("QueueDataSource");
    CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
    CloudQueue queue = queueClient.GetQueueReference("cmdqueue");

    CloudQueueMessage queueMessage =
new CloudQueueMessage(this.txtPrimeFrom.Text + ";" + this.txtPrimeTo.Text);
    queue.AddMessage(queueMessage);

    storageAccount = null;
    queueClient = null;
    queue = null;
    queueMessage = null;

    Response.Redirect("~/default.aspx");
}

protected void Page_Load(object sender, EventArgs e)
{
    DataTable queueResults = null;

    if (Session["lastQueueResult"] == null)
    {
        queueResults = new DataTable();
        queueResults.Columns.Add("ReturnTime", typeof(string));
        queueResults.Columns.Add("Range", typeof(string));
        queueResults.Columns.Add("Result", typeof(string));

        Session["lastQueueResult"] = queueResults;
    }

    CloudStorageAccount storageAccount =
CloudStorageAccount.FromConfigurationSetting("QueueDataSource");
    CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
    CloudQueue queue = queueClient.GetQueueReference("resultqueue");

    queue.RetrieveApproximateMessageCount();

    if (queue.ApproximateMessageCount > 0)
    {
        queueResults = Session["lastQueueResult"] as DataTable;
        IEnumerable<CloudQueueMessage> queueResultMessages = queue.GetMessages(32);

        foreach (CloudQueueMessage queueResultMessage in queueResultMessages)
        {
            queueResults.Rows.Add(new object[] {
                DateTime.Now.ToString(),
                queueResultMessage.AsString.Split('$')[0],
                queueResultMessage.AsString.Split('$')[1]
            });

            queue.DeleteMessage(queueResultMessage);
        }               

        Session["lastQueueResult"] = queueResults;
    }

    this.gvPrimeQueueResult.DataSource = (Session["lastQueueResult"] as DataTable);
    this.gvPrimeQueueResult.DataBind();
}

與BLOB和Table Storage一樣,Queue Storage也有一些使用限制:

  • 容器必須全部小寫字母組成,且只能用英文半型字,數字字元以及分隔符號 (“-“) 。
  • 容器名稱的第一個字和最後一個字元必須是英文字,分隔符號 (“-“) 不可以出現在第一個或最後一個字元。
  • 長度必須要是 3-63 個字元。

 

Reference:

http://msdn.microsoft.com/en-us/windowsazure/wazplatformtrainingcourse_exploringwindowsazurestoragevs2010_topic5#_Toc303848598