在現(xiàn)代軟件開發(fā)中,高效處理大數(shù)據(jù)量是一個常見且具有挑戰(zhàn)性的任務。SQLite作為一個輕量級、無服務器的關系型數(shù)據(jù)庫,在C#中提供了強大的數(shù)據(jù)處理能力。本文將深入探討如何使用SQLite優(yōu)化大數(shù)據(jù)量的存儲和檢索。
準備工作
首先,我們需要引入必要的NuGet包:
// 使用System.Data.SQLite進行SQLite數(shù)據(jù)庫操作
using System.Data.SQLite;

大數(shù)據(jù)量處理的關鍵技術
數(shù)據(jù)實體類
// 數(shù)據(jù)實體類
public class DataItem
{
public long Id { get; set; }
public string Name { get; set; }
public decimal Value { get; set; }
public DateTime Timestamp { get; set; }
}
批量插入優(yōu)化
using System;
using System.Collections.Generic;
using System.Data.SQLite;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace AppBigData
{
publicclass DataProcessor
{
private readonly string _connectionString;
public DataProcessor(string connectionString = "Data Source=large_database.db")
{
_connectionString = connectionString;
}
// 創(chuàng)建表結(jié)構(gòu)
public void CreateTableIfNotExists()
{
using (var connection = new SQLiteConnection(_connectionString))
{
connection.Open();
using (var command = new SQLiteCommand(connection))
{
command.CommandText = @"
CREATE TABLE IF NOT EXISTS DataTable (
Id INTEGER PRIMARY KEY,
Name TEXT NOT NULL,
Value REAL NOT NULL,
Timestamp TEXT NOT NULL
)";
command.ExecuteNonQuery();
}
}
}
// 批量插入數(shù)據(jù)的高效方法
public void BulkInsert(List<DataItem> items)
{
if (items == null || !items.Any())
return;
try
{
// 使用事務大幅提升插入性能
using (var connection = new SQLiteConnection(_connectionString))
{
connection.Open();
using (var transaction = connection.BeginTransaction())
{
try
{
using (var command = new SQLiteCommand(connection))
{
// 預編譯SQL語句
command.CommandText = @"
INSERT INTO DataTable
(Id, Name, Value, Timestamp)
VALUES
(@Id, @Name, @Value, @Timestamp)";
// 準備參數(shù)
command.Parameters.Add("@Id", System.Data.DbType.Int64);
command.Parameters.Add("@Name", System.Data.DbType.String);
command.Parameters.Add("@Value", System.Data.DbType.Decimal);
command.Parameters.Add("@Timestamp", System.Data.DbType.DateTime);
// 批量插入
foreach (var item in items)
{
command.Parameters["@Id"].Value = item.Id;
command.Parameters["@Name"].Value = item.Name;
command.Parameters["@Value"].Value = item.Value;
command.Parameters["@Timestamp"].Value = item.Timestamp;
command.ExecuteNonQuery();
}
}
// 提交事務
transaction.Commit();
}
catch (Exception)
{
// 出錯時回滾事務
transaction.Rollback();
throw;
}
}
}
}
catch (Exception ex)
{
Console.WriteLine($"批量插入數(shù)據(jù)時發(fā)生錯誤: {ex.Message}");
throw;
}
}
/// <summary>
/// 改進的批量插入方法 - 使用SQLite的批處理特性和事務
/// 這里batchSize很重要,太大了不行,太小了也不行
/// </summary>
/// <param name="items"></param>
/// <param name="batchSize"></param>
public void ImprovedBulkInsert(List<DataItem> items, int batchSize = 100)
{
if (items == null || !items.Any())
return;
try
{
using (var connection = new SQLiteConnection(_connectionString))
{
connection.Open();
// 保留性能優(yōu)化的pragmas
using (var pragmaCommand = new SQLiteCommand(connection))
{
pragmaCommand.CommandText = "PRAGMA synchronous = OFF; PRAGMA journal_mode = MEMORY;";
pragmaCommand.ExecuteNonQuery();
}
// 不使用using語句來管理事務,這樣我們可以重新創(chuàng)建它
SQLiteTransaction transaction = connection.BeginTransaction();
try
{
using (var command = new SQLiteCommand(connection))
{
command.CommandText = @"INSERT INTO DataTable (Id, Name, Value, Timestamp)
VALUES (@Id, @Name, @Value, @Timestamp)";
// 一次性準備參數(shù)
command.Parameters.Add("@Id", System.Data.DbType.Int64);
command.Parameters.Add("@Name", System.Data.DbType.String);
command.Parameters.Add("@Value", System.Data.DbType.Decimal);
command.Parameters.Add("@Timestamp", System.Data.DbType.DateTime);
// 以較小的批次處理
int count = 0;
foreach (var item in items)
{
command.Parameters["@Id"].Value = item.Id;
command.Parameters["@Name"].Value = item.Name;
command.Parameters["@Value"].Value = item.Value;
command.Parameters["@Timestamp"].Value = item.Timestamp;
command.ExecuteNonQuery();
count++;
// 分批提交,平衡內(nèi)存使用和事務開銷
if (count % batchSize == 0)
{
transaction.Commit();
// 創(chuàng)建新的事務
transaction = connection.BeginTransaction();
}
}
// 提交剩余的記錄
transaction.Commit();
}
}
catch
{
transaction.Rollback();
throw;
}
}
}
catch (Exception ex)
{
Console.WriteLine($"改進型批量插入數(shù)據(jù)時發(fā)生錯誤: {ex.Message}");
throw;
}
}
// 基準測試方法
public void PerformanceBenchmark(int itemCount = 10000)
{
// 清空表
ClearTable();
// 生成測試數(shù)據(jù)
var testData = GenerateTestData(itemCount);
// 測試標準批量插入
Console.WriteLine($"開始標準批量插入測試 ({itemCount} 條記錄)...");
var standardTimer = Stopwatch.StartNew();
BulkInsert(testData);
standardTimer.Stop();
Console.WriteLine($"標準批量插入完成: {standardTimer.ElapsedMilliseconds} ms");
// 清空表
ClearTable();
// 測試增強型批量插入
Console.WriteLine($"開始增強型批量插入測試 ({itemCount} 條記錄)...");
var enhancedTimer = Stopwatch.StartNew();
ImprovedBulkInsert(testData);
enhancedTimer.Stop();
Console.WriteLine($"增強型批量插入完成: {enhancedTimer.ElapsedMilliseconds} ms");
// 比較結(jié)果
double improvement = (double)standardTimer.ElapsedMilliseconds / enhancedTimer.ElapsedMilliseconds;
Console.WriteLine($"性能提升比例: {improvement:F2}");
}
// 生成測試數(shù)據(jù)
private List<DataItem> GenerateTestData(int count)
{
var random = new Random();
var result = new List<DataItem>(count);
for (int i = 0; i < count; i++)
{
result.Add(new DataItem
{
Id = i + 1,
Name = $"Item-{i}",
Value = (decimal)(random.NextDouble() * 1000),
Timestamp = DateTime.Now.AddSeconds(-random.Next(0, 86400))
});
}
return result;
}
// 清空表
private void ClearTable()
{
using (var connection = new SQLiteConnection(_connectionString))
{
connection.Open();
using (var command = new SQLiteCommand("DELETE FROM DataTable", connection))
{
command.ExecuteNonQuery();
}
}
}
}
}
調(diào)用
namespace AppBigData
{
internal class Program
{
static void Main(string[] args)
{
var processor = new DataProcessor();
// 確保表存在
processor.CreateTableIfNotExists();
// 執(zhí)行性能測試
processor.PerformanceBenchmark(500000);
Console.WriteLine("示例數(shù)據(jù)插入完成");
Console.ReadKey();
}
}
}

分頁查詢與索引優(yōu)化
public class DataRetriever
{
// 分頁查詢大數(shù)據(jù)集
public List<DataItem> GetPaginatedData(int pageNumber, int pageSize)
{
var results = new List<DataItem>();
using (var connection = new SQLiteConnection("Data Source=large_database.db"))
{
connection.Open();
// 使用參數(shù)化查詢提高安全性和性能
using (var command = new SQLiteCommand(connection))
{
command.CommandText = @"
SELECT Id, Name, Value, Timestamp
FROM DataTable
ORDER BY Id
LIMIT @PageSize OFFSET @Offset";
command.Parameters.AddWithValue("@PageSize", pageSize);
command.Parameters.AddWithValue("@Offset", (pageNumber - 1) * pageSize);
using (var reader = command.ExecuteReader())
{
while (reader.Read())
{
results.Add(new DataItem
{
Id = Convert.ToInt64(reader["Id"]),
Name = reader["Name"].ToString(),
Value = Convert.ToDecimal(reader["Value"]),
Timestamp = Convert.ToDateTime(reader["Timestamp"])
});
}
}
}
}
return results;
}
// 創(chuàng)建性能索引
public void CreatePerformanceIndex()
{
using (var connection = new SQLiteConnection("Data Source=large_database.db"))
{
connection.Open();
using (var command = new SQLiteCommand(connection))
{
// 創(chuàng)建復合索引
command.CommandText = @"
CREATE INDEX IF NOT EXISTS idx_data_performance
ON DataTable (Timestamp, Value)";
command.ExecuteNonQuery();
}
}
}
}

異步數(shù)據(jù)處理
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data.SQLite;
using System.Threading;
using System.Threading.Tasks;
namespace AppBigData
{
publicclass AsyncDataProcessor
{
private readonly string _connectionString;
public AsyncDataProcessor(string connectionString = "Data Source=large_database.db")
{
_connectionString = connectionString;
}
// 異步批量處理數(shù)據(jù) - 修改為線程安全方式
public async Task ProcessLargeDataSetAsync(List<DataItem> items)
{
if (items == null || items.Count == 0)
return;
// 分區(qū)處理數(shù)據(jù)以提高性能
var partitionSize = Math.Max(1, items.Count / Environment.ProcessorCount);
var partitions = new List<List<DataItem>>();
for (int i = 0; i < items.Count; i += partitionSize)
{
var partition = items.GetRange(
i,
Math.Min(partitionSize, items.Count - i));
partitions.Add(partition);
}
// 記錄處理錯誤
var exceptions = new ConcurrentBag<Exception>();
await Task.Run(() =>
{
// 每個分區(qū)用單獨的連接處理
Parallel.ForEach(partitions, new ParallelOptions { MaxDegreeOfParallelism = 4 }, partition =>
{
try
{
// 每個線程創(chuàng)建自己的連接
using (var connection = new SQLiteConnection(_connectionString))
{
connection.Open();
// 使用事務提高性能
using (var transaction = connection.BeginTransaction())
{
try
{
foreach (var item in partition)
{
ProcessSingleItem(connection, item);
}
// 提交此分區(qū)的所有更改
transaction.Commit();
}
catch (Exception ex)
{
// 如果處理過程中出錯,回滾事務
transaction.Rollback();
exceptions.Add(ex);
}
}
}
}
catch (Exception ex)
{
exceptions.Add(ex);
}
});
});
// 如果有錯誤,拋出聚合異常
if (exceptions.Count > 0)
{
thrownew AggregateException("處理數(shù)據(jù)時發(fā)生一個或多個錯誤", exceptions);
}
}
private void ProcessSingleItem(SQLiteConnection connection, DataItem item)
{
// 處理單個數(shù)據(jù)項的邏輯
using (var command = new SQLiteCommand(connection))
{
// 首先檢查記錄是否存在
command.CommandText = "SELECT COUNT(*) FROM DataTable WHERE Id = @Id";
command.Parameters.AddWithValue("@Id", item.Id);
int exists = Convert.ToInt32(command.ExecuteScalar());
if (exists > 0)
{
// 更新現(xiàn)有記錄
command.CommandText = @"
UPDATE DataTable
SET Name = @Name, Value = @Value, Timestamp = @Timestamp
WHERE Id = @Id";
}
else
{
// 插入新記錄
command.CommandText = @"
INSERT INTO DataTable (Id, Name, Value, Timestamp)
VALUES (@Id, @Name, @Value, @Timestamp)";
}
// 設置參數(shù)
command.Parameters.Clear();
command.Parameters.AddWithValue("@Id", item.Id);
command.Parameters.AddWithValue("@Name", item.Name);
command.Parameters.AddWithValue("@Value", item.Value);
command.Parameters.AddWithValue("@Timestamp", item.Timestamp);
// 執(zhí)行命令
command.ExecuteNonQuery();
}
}
}
}
調(diào)用
using System.Data.SQLite;
namespace AppBigData
{
internal class Program
{
static async Task Main(string[] args)
{
Console.WriteLine("正在啟動數(shù)據(jù)庫測試...");
var processor = new AsyncDataProcessor("Data Source=large_database.db");
var testData = GenerateTestData(50000);
Console.WriteLine($"已生成 {testData.Count} 條測試記錄");
try
{
Console.WriteLine("正在異步處理數(shù)據(jù)...");
var startTime = DateTime.Now;
await processor.ProcessLargeDataSetAsync(testData);
var endTime = DateTime.Now;
Console.WriteLine($"處理完成,耗時 {(endTime - startTime).TotalSeconds:F2} 秒");
}
catch (AggregateException ex)
{
Console.WriteLine("處理過程中發(fā)生錯誤:");
foreach (var innerEx in ex.InnerExceptions)
{
Console.WriteLine($"- {innerEx.Message}");
}
}
catch (Exception ex)
{
Console.WriteLine($"錯誤: {ex.Message}");
}
Console.WriteLine("測試完成。按任意鍵退出。");
Console.ReadKey();
}
privatestatic List<DataItem> GenerateTestData(int count)
{
var random = new Random();
var items = new List<DataItem>();
for (int i = 1; i <= count; i++)
{
items.Add(new DataItem
{
Id = i,
Name = $"測試項目 {i}",
Value = (decimal)Math.Round(random.NextDouble() * 1000, 2),
Timestamp = DateTime.Now.AddMinutes(-random.Next(0, 1000))
});
}
return items;
}
}
}
?性能優(yōu)化建議
- 使用事務批量處理數(shù)據(jù)
- 為常用查詢創(chuàng)建適當?shù)乃饕?/span>
- 使用參數(shù)化查詢
- 分頁獲取大數(shù)據(jù)集
- 考慮使用異步和并行處理
注意事項
- SQLite對并發(fā)寫入支持有限
- 大數(shù)據(jù)量時考慮分庫分表
- 定期進行數(shù)據(jù)庫維護和vacuum操作
結(jié)論
通過合理運用SQLite的特性和C#的高級特性,我們可以高效地處理大數(shù)據(jù)量,保證系統(tǒng)的性能和穩(wěn)定性。
這篇文章全面展示了在C#中使用SQLite處理大數(shù)據(jù)量的關鍵技術和最佳實踐。文章涵蓋了批量插入、分頁查詢、索引優(yōu)化和異步處理等重要技術點。
該文章在 2025/3/24 18:31:54 編輯過