[SQLite]大量insert

[SQLite]大量insert

紀錄一下過程

問題描述

把通匯銀行檔(約五千多筆資料)匯入系統,發現寫入速度太慢,等到天荒地老…

解決方式

將要寫入資料庫的資料先暫存起來,再一次寫到資料庫表格,步驟如下:

1. 讀取sqlite資料庫表格的定義,使用的SQL如下:

讀取欄位名稱及型態

3

讀取欄位長度

沒有辦法直接下SQL取得,採用正規表示式將欄位取得。

表格定義


    [Filter1]       [CHAR](1),
    [UnitNo]        [CHAR](3),
    [GroupNo]       [CHAR](4),
    [Filter2]       [CHAR](1),
    [GroupName]     [CHAR](40),
    [GroupNickName] [CHAR](10),
    [Address]       [CHAR](40),
    [Filter3]       [CHAR](1),
    [PhoneSecNum]   [CHAR](3),
    [PhoneNum]      [CHAR](8),
    [Filter4]       [CHAR](1),
    [ValueDate]     [CHAR](7),
    [ChangeType]    [CHAR](1),
    [DefaultCode]   [CHAR](1),
    [Remark]        [CHAR](2),
    [Blank]         [CHAR](8) 
);

正規表示式


             ".*\\[(?<ColumnName>.*)\\]\\s+\\[(?<ColumnType>.*)\\]\\((?<Len" +
             "gth>\\d{1,})\\).*",
           RegexOptions.IgnoreCase
           | RegexOptions.CultureInvariant
           | RegexOptions.IgnorePatternWhitespace
           | RegexOptions.Compiled
           );

可以使用工具Expresso進行測試。

2. 使用交易(Transaction)將要寫入資料庫的資料先暫存起來,再一次寫到資料庫表格。

程式碼


using System.Collections.Generic;
using System.Data;
using System.Data.SQLite;
using System.Diagnostics;
using System.Text.RegularExpressions;
using Utility;

namespace DAL
{
    /// <summary>
    /// 資料存取層模組
    /// 用途:批次資料處理
    /// 作者:林大貓
    /// </summary>
    public class BatchWork
    {
        DataAccessLayerSqlite dal = new DataAccessLayerSqlite();
        /// <summary>
        /// 插入表格指令
        /// </summary>
        /// <param name="TableName">表格名</param>
        /// <param name="condition">where條件</param>
        /// <param name="InData">要輸入的資料陣列</param>
        public void InsertTable(string TableName, string[] InData)
        {
            string sql = string.Empty;
            try
            {
                if (InData.Length == 0)
                {
                    return;
                }
                string columnName = GetColumnName(TableName);
                string columnNameParm = GetColumnNameParm(TableName);
                string[] columnNameArray = GetColumnNameArrary(TableName);
                sql = " insert into " + TableName + "(" + columnName + " ) values ( " + columnNameParm + " )";
                List<SQLiteParameter> sqlParm = new List<SQLiteParameter>();
                for (int i = 0; i < InData.Length; i++)
                {
                    sqlParm.Add(new SQLiteParameter(columnNameArray[i], DbType.String));
                    sqlParm[i].Value = string.Copy(InData[i]);
                }
                dal.NonQueryResultNoTranc(sql, sqlParm.ToArray());
            }
            catch (Exception e)
            {
                throw e;
            }
        }



        /// <summary>
        /// 從文字檔寫入表格.
        /// </summary>
        /// <param name="tableName">Name of the table.</param>
        /// <param name="fileName">Name of the file.</param>
        public void InsertTableFromTextFile(string tableName, string fileName)
        {
            string sql = string.Empty;
            if (fileName.Length == 0)
            {
                return;
            }
            int[] format = GetColumnLengths(tableName);
            int remainder = 1000;
            List<string> textContent = fileName.ImportText();
            string columnName = GetColumnName(tableName);
            string columnNameParm = GetColumnNameParm(tableName);
            string[] columnNameArray = GetColumnNameArrary(tableName);
            sql = " insert into " + tableName + "(" + columnName + " ) values ( " + columnNameParm + " )";
            using (SQLiteConnection Sql_Conn = new SQLiteConnection(dal.connectionString))
            {
                Sql_Conn.Open();
                SQLiteTransaction Sql_Transaction;
                Sql_Transaction = Sql_Conn.BeginTransaction(System.Data.IsolationLevel.Serializable);
                for (int i = 0; i < textContent.Count; i++)
                {
                    List<string> InData = textContent[i].SplitString(format);
                    List<SQLiteParameter> sqlParm = new List<SQLiteParameter>();
                    for (int j = 0; j < format.Length; j++)
                    {
                        sqlParm.Add(new SQLiteParameter(columnNameArray[j], DbType.String));
                        sqlParm[j].Value = string.Copy(InData[j]);
                    }
                    SQLiteCommand Sql_Command = new SQLiteCommand(sql, Sql_Conn);
                    Sql_Command.Transaction = Sql_Transaction;
                    dal.PrepareCommand(ref Sql_Command, Sql_Conn, sql, sqlParm.ToArray());
                    Sql_Command.ExecuteNonQuery();
                    try
                    {
                        if (i == textContent.Count - 1)
                        {
                            Sql_Transaction.Commit();
                        }
                        else if (i > remainder && i % remainder == 0)
                        {
                            Sql_Transaction.Commit();
                            Sql_Transaction = Sql_Conn.BeginTransaction(System.Data.IsolationLevel.Serializable);
                        }
                        else
                        {
                            ;
                        }
                    }
                    catch (Exception ex)
                    {
                        Debug.Write(ex.ToString());
                        Sql_Transaction.Rollback();
                    }
                }

            }
        }


        /// <summary>
        /// 取得欄位名稱
        /// </summary>
        /// <param name="TableName">表格名</param>
        /// <returns>表格名稱字串用逗號分隔</returns>
        public static string GetColumnName(string TableName)
        {
            DAL.DataAccessLayerSqlite dal = new DAL.DataAccessLayerSqlite();
            DataTable dtSource = dal.GetQueryResultToDataTable("PRAGMA table_info( '" + TableName + "' );", null);
            List<string> columnNames = new List<string>();
            foreach (DataRow tableInfo in dtSource.Rows)
            {
                columnNames.Add(tableInfo["name"].ToString());
            }
            return string.Join(" , ", columnNames.ToArray());
        }

        /// <summary>
        /// 取得欄位名稱
        /// </summary>
        /// <param name="TableName">表格名</param>
        /// <returns>表格名稱字串用逗號分隔</returns>
        public string[] GetColumnNameArrary(string TableName)
        {
            DAL.DataAccessLayerSqlite dal = new DAL.DataAccessLayerSqlite();
            DataTable dtSource = dal.GetQueryResultToDataTable("PRAGMA table_info( '" + TableName + "' );", null);
            List<string> columnNames = new List<string>();
            foreach (DataRow tableInfo in dtSource.Rows)
            {
                columnNames.Add(tableInfo["name"].ToString());
            }
            return columnNames.ToArray();
        }
        /// <summary>
        /// 取得欄位名稱參數
        /// </summary>
        /// <param name="TableName">表格名</param>
        /// <returns>表格名稱字串用逗號分隔</returns>
        public string GetColumnNameParm(string TableName)
        {
            DAL.DataAccessLayerSqlite dal = new DAL.DataAccessLayerSqlite();
            DataTable dtSource = dal.GetQueryResultToDataTable("PRAGMA table_info( '" + TableName + "' );", null);
            List<string> columnNames = new List<string>();
            foreach (DataRow tableInfo in dtSource.Rows)
            {
                columnNames.Add("@" + tableInfo["name"].ToString());
            }
            return string.Join(" , ", columnNames.ToArray());
        }

        /// <summary>
        /// 根據表格的Create sql取得欄位長度.
        /// 
        /// </summary>
        /// <param name="TableName">Name of the table.</param>
        /// <returns></returns>
        public int[] GetColumnLengths(string TableName)
        {
            /* 格式如下
             * CREATE TABLE Bank ( 
                [Filter1]       [CHAR](1),
                [UnitNo]        [CHAR](3),
                [GroupNo]       [CHAR](4),
                [Filter2]       [CHAR](1),
                [GroupName]     [CHAR](40),
                [GroupNickName] [CHAR](10),
                [Address]       [CHAR](40),
                [Filter3]       [CHAR](1),
                [PhoneSecNum]   [CHAR](3),
                [PhoneNum]      [CHAR](8),
                [Filter4]       [CHAR](1),
                [ValueDate]     [CHAR](7),
                [ChangeType]    [CHAR](1),
                [DefaultCode]   [CHAR](1),
                [Remark]        [CHAR](2),
                [Blank]         [CHAR](8) 
            );*/
            Regex MyRegex = new Regex(
             ".*\\[(?<ColumnName>.*)\\]\\s+\\[(?<ColumnType>.*)\\]\\((?<Len" +
             "gth>\\d{1,})\\).*",
           RegexOptions.IgnoreCase
           | RegexOptions.CultureInvariant
           | RegexOptions.IgnorePatternWhitespace
           | RegexOptions.Compiled
           );

            SplitItems items = new SplitItems(MyRegex);

            DAL.DataAccessLayerSqlite dao = new DAL.DataAccessLayerSqlite();

            DataTable dt = dao.GetQueryResultToDataTable(DAL.DbInfoDao.GetAllDbInfromation(), null);
            //return;
            DataRow[] selectedrows = dt.Select("Trim(TABLE_NAME)='" + TableName + "'");
            if (selectedrows.Length > 1 || selectedrows.Length == 0)
            {
                throw new Exception("找不到表格名稱!");
            }
            List<int> columnLengths = new List<int>();

            foreach (DataRow dr in selectedrows)
            {
                DataTable dbinformation = items.StartSplit(dr["sql"].ToString());
                foreach (DataRow dvitem in dbinformation.Rows)
                {
                    columnLengths.Add(int.Parse(dvitem["Length"].ToString()));
                }
            }
            return columnLengths.ToArray();
        }
    }
}

using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Data.SQLite;
using System.Configuration;
using System.Data;
using System.Diagnostics;

namespace DAL
{
    /// <summary>
    /// 讀取資料庫連線設定(使用sqlite)
    /// </summary>
    public class DataAccessLayerSqlite
    {
        /// <summary>
        /// 連線字串
        /// </summary>
        public string connectionString = string.Empty;
     
        public DataAccessLayerSqlite()
        {         
            connectionString = ConfigurationManager.ConnectionStrings["PCD_RTC_WINFORM.Properties.Settings.PCDDBConnectionString"].ConnectionString;
        }

        /// <summary>
        /// Prepares the command.
        /// </summary>
        /// <param name="cmd">The CMD.</param>
        /// <param name="conn">The conn.</param>
        /// <param name="cmdtext">The cmdtext.</param>
        /// <param name="cmdparams">The cmdparams.</param>
        public void PrepareCommand(ref SQLiteCommand cmd, SQLiteConnection conn, string cmdtext, SQLiteParameter[] cmdparams)
        {
            if ((conn.State != ConnectionState.Open))
            {
                conn.Open();
            }
            cmd.Connection = conn;
            cmd.CommandText = cmdtext;
            cmd.CommandType = CommandType.Text;
            if ((cmdparams != null))
            {
                foreach (SQLiteParameter paraitem in cmdparams)
                {
                    cmd.Parameters.Add(paraitem);
                }
            }
        }       

        //取得查詢的結果(輸出Datatable)
        public DataTable GetQueryResultToDataTable(string sqlcmd, SQLiteParameter[] sqlparams)
        {
            DataTable dtResult = new DataTable();
            using (SQLiteConnection Sql_Conn = new SQLiteConnection(connectionString))
            {
                try
                {
                    SQLiteCommand Sql_Command = new SQLiteCommand(sqlcmd, Sql_Conn);
                    PrepareCommand(ref Sql_Command, Sql_Conn, sqlcmd, sqlparams);
                    SQLiteDataAdapter Sql_dataaddapter = new SQLiteDataAdapter(Sql_Command);
                    Sql_dataaddapter.Fill(dtResult);
                    Sql_dataaddapter.Dispose();
                    Sql_Command.Dispose();
                    return dtResult;

                }
                catch (Exception ex)
                {
                    Debug.Write(ex.ToString());
                    return dtResult;
                }

            }
        }

        //取得查詢的結果(輸出DataSet)  
        public DataSet GetQueryResultToDataset(string sqlcmd, SQLiteParameter[] sqlparams)
        {
            DataSet dtResult = new DataSet();
            using (SQLiteConnection Sql_Conn = new SQLiteConnection(connectionString))
            {
                try
                {
                    SQLiteCommand Sql_Command = new SQLiteCommand(sqlcmd, Sql_Conn);
                    PrepareCommand(ref Sql_Command, Sql_Conn, sqlcmd, sqlparams);
                    SQLiteDataAdapter Sql_dataaddapter = new SQLiteDataAdapter(Sql_Command);
                    Sql_dataaddapter.Fill(dtResult);
                    Sql_dataaddapter.Dispose();
                    Sql_Command.Dispose();
                    return dtResult;
                }
                catch (Exception ex)
                {
                    Debug.Write(ex.ToString());
                    return dtResult;
                }
            }
            
        }

        //取得查詢的結果(輸出單個資料)    
        public string GetQueryResultSingleValue(string sqlcmd, SQLiteParameter[] sqlparams)
        {
            using (SQLiteConnection Sql_Conn = new SQLiteConnection(connectionString))
            {
                try
                {
                SQLiteCommand Sql_Command = new SQLiteCommand(sqlcmd, Sql_Conn);
                PrepareCommand(ref Sql_Command, Sql_Conn, sqlcmd, sqlparams);
                return (Sql_Command.ExecuteScalar() == null) ? string.Empty : Sql_Command.ExecuteScalar().ToString();
                }
                catch (Exception ex)
                {
                    Debug.Write(ex.ToString());
                    return string.Empty;
                }
            }
            //return string.Empty;
        }

        //執行sql指令(沒包Transaction)    
        public string NonQueryResultNoTranc(string sqlcmd, SQLiteParameter[] sqlparams)
        {
            using (SQLiteConnection Sql_Conn = new SQLiteConnection(connectionString))
            {
                try
                {
                    SQLiteCommand Sql_Command = new SQLiteCommand(sqlcmd, Sql_Conn);
                    PrepareCommand(ref Sql_Command, Sql_Conn, sqlcmd, sqlparams);
                    Sql_Command.ExecuteNonQuery();
                    Sql_Command.Dispose();
                    return "OK";
                }
                catch (Exception ex)
                {
                    throw ex;
                }
            }          
        }

        //執行sql指令(包Transaction)    
        public string NonQueryResultTranc(string sqlcmd, SQLiteParameter[] sqlparams)
        {
            using (SQLiteConnection Sql_Conn = new SQLiteConnection(connectionString))
            {
                SQLiteCommand Sql_Command = new SQLiteCommand(sqlcmd, Sql_Conn);
                PrepareCommand(ref Sql_Command, Sql_Conn, sqlcmd, sqlparams);
                SQLiteTransaction Sql_Transaction;
                Sql_Transaction = Sql_Conn.BeginTransaction();
                Sql_Command.Transaction = Sql_Transaction;
                try
                {
                    Sql_Command.ExecuteNonQuery();
                    Sql_Transaction.Commit();
                    return "OK";
                }
                catch (Exception ex)
                {
                    Sql_Transaction.Rollback();
                    return ex.ToString();
                }

            }
        }
    }
}

取得資料庫欄位長度物件

using System.Collections.Generic;
using System.Data;
using System.Text.RegularExpressions;
namespace Utility
{
    /// <summary>
    /// 工具模組
    /// 用途:儲存正規表示式切割後的字串群組
    /// 作者:林大貓
    /// </summary>
    public class SplitItems
    {
        //格式
        private Dictionary<string, string> format = new Dictionary<string, string>();
        //輸出結果
        private Dictionary<string, string> output = new Dictionary<string, string>();
        private Regex MyRegex;
        private int matchcount;
        public SplitItems() { }
        public SplitItems(Regex inMyRegex)
        {
            SetDictionary(inMyRegex);
            MyRegex = inMyRegex;
        }
        private void SetValue(int keyindex, string value)
        {
            int remainder = keyindex % matchcount;
            foreach (KeyValuePair<string, string> tempvalue in format)
            {
                if (string.Compare(tempvalue.Value, remainder.ToString()) == 0)
                {
                    output.Add(tempvalue.Key, value);
                }
            }
        }
        private DataTable CreateEmptyTable()
        {
            List<string> tablecolname = new List<string>();
            foreach (KeyValuePair<string, string> tempresult in format)
            {
                if (string.Compare(tempresult.Value, "0") != 0)
                {
                    tablecolname.Add(tempresult.Key);
                }
            }
            return CreateDataTable(tablecolname.ToArray(), null);
        }
        private List<string> GetColumnValues()
        {
            List<string> tablecolvalue = new List<string>();
            foreach (KeyValuePair<string, string> tempresult in output)
            {
                if (string.Compare(tempresult.Key, "0") != 0)
                {
                    tablecolvalue.Add(tempresult.Value);
                }
            }
            return tablecolvalue;
        }
        /// <summary>
        /// 建立datatable
        /// </summary>
        /// <param name="ColumnName">欄位名</param>
        /// <param name="value">data陣列 , rowmajor</param>
        /// <returns>DataTable</returns>
        private DataTable CreateDataTable(string[] ColumnName, string[,] value)
        {
            /*  輸入範例
            string[] cname = new string[]{"name", "sex "};
            string[,] aaz = new string[4, 2];
            for (int q = 0; q < 4; q++)
                for (int r = 0; r < 2; r++)
                    aaz[q, r] = "1";
            dataGridView1.DataSource = DataUtility.CreateDataTable(cname, aaz);
            */
            int i, j;
            DataTable kk = new DataTable();
            for (i = 0; i < ColumnName.Length; i++)
            {
                DataColumn c1 = new DataColumn(ColumnName[i].ToString().Trim(), typeof(string));
                kk.Columns.Add(c1);
            }
            if (value != null)
            {
                for (i = 0; i < value.GetLength(0); i++)
                {
                    DataRow newrow = kk.NewRow();
                    for (j = 0; j < ColumnName.Length; j++)
                    {
                        newrow[j] = string.Copy(value[i, j].ToString());

                    }
                    kk.Rows.Add(newrow);
                }
            }
            return kk;
        }
        private void SetDictionary(Regex MyRegex)
        {
            // Get the names of all the named and numbered capture groups
            string[] GroupNames = MyRegex.GetGroupNames();

            // Get the numbers of all the named and numbered capture groups
            int[] GroupNumbers = MyRegex.GetGroupNumbers();
            matchcount = GroupNames.Length;
            for (int i = 0; i < GroupNames.Length; i++)
            {
                format.Add(GroupNames[i].ToString(), i.ToString());
            }
        }
        public DataTable StartSplit(string inputtext)
        {
            MatchCollection ms = MyRegex.Matches(inputtext);
            int count = 0;
            DataTable dt = CreateEmptyTable();
            // 將切割後的結果丟到輸出物件當中
            foreach (Match temp in ms)
            {
                string searchresult = string.Empty;
                SplitItems tempitem = new SplitItems(MyRegex);
                foreach (Group grp in temp.Groups)
                {

                    foreach (Capture cap in grp.Captures)
                    {
                        tempitem.SetValue(count, cap.Value);
                        count++;
                    }

                }
                DataRow dr = dt.NewRow();
                for (int i = 0; i < dt.Columns.Count; i++)
                {
                    dr[i] = tempitem.GetColumnValues()[i];
                }
                dt.Rows.Add(dr);
            }

            return dt;
        }
    }
}

檔案匯入


using System.IO;
using System;
using System.Text;
namespace Utility
{
    /// <summary>
    /// 工具模組
    /// 用途:文字檔處理
    /// 作者:林大貓
    /// </summary>
    public static class TextFileUtil
    {
        /// <summary>
        /// Imports the text.
        /// </summary>
        /// <param name="InFName">Name of the input File Name.</param>
        /// <returns></returns>
        public static List<string> ImportText(this string InFName)
        {
            try
            {
                FileStream fs = new FileStream(InFName, FileMode.Open, FileAccess.Read);
                StreamReader sr = new StreamReader(fs, UnicodeEncoding.GetEncoding("Big5"));//轉成中文碼                                
                
                List<string> result = new List<string>();
                while (sr.Peek() >= 0)
                {
                    string temp = sr.ReadLine();
                    result.Add(temp);
                }
                sr.Close();
                return result;
            }
            catch (Exception e)
            {
                Console.Write(e.ToString());
                return null;
            }
        }
    }
}

輸出結果

真的變快了!從兩分多鐘減少到不到兩秒鐘。

修改前

1

修改後

2

參考資料

深度解析SQLite批量插入,修改数据库慢的问题

SQLite大量插入的效率问题

[工具]使用Expresso設計正規表示式

How to get a list of column names on sqlite3 / iPhone?