MySql Bulk Loader in C#.net


Create the table in your database.

/* Demo table for the bulk loader; its primary key is the composite (ID_MAIN, ID_SUB). */
CREATE TABLE TEST_PK
(
ID_MAIN BIGINT NOT NULL,
ID_SUB BIGINT NOT NULL,/* Don't use AUTO_INCREMENT here: it is not incremented correctly by the bulk loader. */
T_ID BIGINT,
T_TEXT VARCHAR(50),
T_CREATEDON TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
STATUS INT DEFAULT 1,
PRIMARY KEY (ID_MAIN,ID_SUB)
);
/* Inspect the current contents of the table. */
SELECT * FROM TEST_PK;

Here we are going to load bulk data from a specified source. First we write the data to a text file, then
load that data into the database from the text file using MySqlBulkLoader. The table has a composite primary key made up of two columns, so existing records are matched on both columns (like select * from TEST_PK where ID_MAIN = input1 and ID_SUB = input2); if a matching record exists, the incoming row is processed according to the ConflictOption (Replace, i.e. update, or None/Ignore).

Program.cs:
using System;
using System.Configuration;
using System.Data;
using System.Text;

namespace MySqlBulkLoader_CSharp
{
/// <summary>
/// Console entry point for the bulk-load sample.
/// </summary>
internal class Program
{
    /// <summary>
    /// Generates the sample rows and passes them to <see cref="Do_work.Update_Data"/>,
    /// printing a status line before and after.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    private static void Main(string[] args)
    {
        Console.WriteLine("Program Started.Please wait...");

        Do_work worker = new Do_work();
        DataTable sampleRows = MyFunction.Get_Dump();
        worker.Update_Data(sampleRows);

        Console.WriteLine("Program Completed.");
    }
}

/// <summary>
/// Helpers for the sample: builds the demo data set and drives one bulk-load pass.
/// </summary>
internal static class MyFunction
{
    // Holds the most recently printed status message.
    private static string Message = string.Empty;

    /// <summary>
    /// Builds an in-memory table of 100 sample rows with the columns
    /// ID_MAIN, ID_SUB, T_ID (all Int32) and T_TEXT (String).
    /// </summary>
    /// <returns>The populated table, with all changes accepted.</returns>
    public static DataTable Get_Dump()
    {
        DataTable table = new DataTable();

        table.Columns.Add("ID_MAIN", typeof(int));
        table.Columns.Add("ID_SUB", typeof(int));
        table.Columns.Add("T_ID", typeof(int));
        table.Columns.Add("T_TEXT", typeof(string));

        int n = 1;
        while (n <= 100)
        {
            // Column order: ID_MAIN, ID_SUB, T_ID, T_TEXT.
            table.Rows.Add(n, n + 2, n + 10000, "This is" + n);
            n++;
        }

        table.AcceptChanges();
        return table;
    }

    /// <summary>
    /// Writes <paramref name="query"/> to <paramref name="FileName"/>, asks
    /// <c>MySqlCon.MySqlBULK</c> to load that file into <paramref name="TableName"/>,
    /// and prints whether the load succeeded.
    /// </summary>
    /// <param name="FileName">Path of the text file to (over)write and load.</param>
    /// <param name="query">The already-delimited record data to write to the file.</param>
    /// <param name="TableName">Name of the destination table.</param>
    public static void Bulk_Update(string FileName, string query, string TableName)
    {
        System.IO.File.WriteAllText(FileName, query);

        string template = MySqlCon.MySqlBULK(FileName, TableName)
            ? "==========> Successfully MySqlBulkLoader executed for the TableName: {0},FileName: {1}<=========="
            : "==========> Failed to execute MySqlBulkLoader for the TableName: {0},FileName: {1}<==========";

        Message = string.Format(template, TableName, FileName);
        Console.WriteLine(Message);
    }
}

/// <summary>
/// Serialises a <see cref="DataTable"/> into delimited text, in batches, and hands each
/// batch to <c>MyFunction.Bulk_Update</c> for loading into the TEST_PK table.
/// </summary>
internal class Do_work
{
    // Maximum number of rows written to the file per bulk-load call.
    private const int Row_Split_Count = 500;

    // Separators written between fields and between records. They must match the
    // FieldTerminator / LineTerminator configured on the bulk loader in MySqlCon.MySqlBULK.
    private const string FieldTerminator = "(:;)";
    private const string LineTerminator = "{;:}";

    /// <summary>
    /// Writes the rows of <paramref name="dt"/> to the staging file in batches of at most
    /// <c>Row_Split_Count</c> rows and triggers one bulk load per batch. Does nothing when
    /// the table has no rows.
    /// </summary>
    /// <param name="dt">
    /// Source rows; must contain the columns ID_MAIN, ID_SUB, T_ID and T_TEXT.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="dt"/> is null.</exception>
    public void Update_Data(DataTable dt)
    {
        if (dt == null)
        {
            throw new ArgumentNullException("dt");
        }

        int totalRows = dt.Rows.Count;
        if (totalRows == 0)
        {
            return;
        }

        // Format with the invariant culture so the timestamp always has the
        // "yyyy-MM-dd HH:mm:ss" shape, whatever the machine's regional settings
        // (some cultures substitute the time separator or use another calendar).
        string MysqlCreatedDate = DateTime.Now.ToString(
            "yyyy-MM-dd HH:mm:ss",
            System.Globalization.CultureInfo.InvariantCulture);
        const string RecordStatus = "1";
        const string TableName = "TEST_PK";
        string FileName = ConfigurationManager.AppSettings["FILEPATH"] + TableName + ".txt";

        StringBuilder query = new StringBuilder();

        // One pass handles both the full batches and the final partial batch.
        for (int start = 0; start < totalRows; start += Row_Split_Count)
        {
            int end = Math.Min(start + Row_Split_Count, totalRows);

            query.Clear();
            for (int row = start; row < end; row++)
            {
                // The line terminator goes between records, not after the last one.
                if (row > start)
                {
                    query.Append(LineTerminator);
                }

                query.Append(Form_Query(dt.Rows[row], MysqlCreatedDate, RecordStatus));
            }

            // The same file is overwritten and loaded once per batch.
            MyFunction.Bulk_Update(FileName, query.ToString(), TableName);
        }
    }

    /// <summary>
    /// Builds one delimited record with six fields, in table-column order:
    /// ID_MAIN, ID_SUB, T_ID, T_TEXT, created-on timestamp, status.
    /// </summary>
    /// <remarks>
    /// No field terminator is written after the last field: a trailing terminator gives
    /// every record a seventh, empty field, which is more than the table has columns.
    /// Values are written verbatim, so text that contains either terminator sequence
    /// would be split incorrectly.
    /// </remarks>
    /// <param name="dr">Row supplying ID_MAIN, ID_SUB, T_ID and T_TEXT.</param>
    /// <param name="MysqlCreatedDate">Pre-formatted timestamp for T_CREATEDON.</param>
    /// <param name="RecordStatus">Value for the STATUS column.</param>
    /// <returns>The record text, without a line terminator.</returns>
    private string Form_Query(DataRow dr, string MysqlCreatedDate, string RecordStatus)
    {
        System.Globalization.CultureInfo invariant = System.Globalization.CultureInfo.InvariantCulture;

        string[] fields =
        {
            Convert.ToString(dr["ID_MAIN"], invariant),
            Convert.ToString(dr["ID_SUB"], invariant),
            Convert.ToString(dr["T_ID"], invariant),
            Convert.ToString(dr["T_TEXT"], invariant),
            MysqlCreatedDate,
            RecordStatus
        };

        return string.Join(FieldTerminator, fields);
    }
}
}


MySqlCon.cs:

using System;
using System.Configuration;
using System.Data;
using MySql.Data.MySqlClient;

namespace MySqlBulkLoader_CSharp
{
/// <summary>
/// MySQL access helpers for the sample; the main entry point is <see cref="MySqlBULK"/>.
/// </summary>
public static class MySqlCon
{
    // Shared command/connection pair set up by CreateConnection. Nothing in this class
    // currently calls CreateConnection, so both stay null unless that changes.
    private static MySqlCommand cmd;
    private static MySqlConnection con;

    /// <summary>
    /// Closes the shared connection if one has been created and is not already closed.
    /// Safe to call when no shared connection exists.
    /// </summary>
    public static void Closeconnection()
    {
        if (con != null && con.State != ConnectionState.Closed)
        {
            con.Close();
        }
    }

    /// <summary>
    /// Loads the delimited text file <paramref name="FileName"/> into
    /// <paramref name="TableName"/> with <c>MySqlBulkLoader</c>, using the connection
    /// string named "MySqlConnection" from the application configuration.
    /// </summary>
    /// <param name="FileName">Path of the file to load.</param>
    /// <param name="TableName">Destination table.</param>
    /// <returns>
    /// <c>true</c> when the load completed; <c>false</c> when the connection string is
    /// missing or any exception occurred (the error is written to standard error).
    /// </returns>
    public static bool MySqlBULK(string FileName, string TableName)
    {
        try
        {
            ConnectionStringSettings settings = ConfigurationManager.ConnectionStrings["MySqlConnection"];
            if (settings == null)
            {
                Console.Error.WriteLine("MySqlBULK: connection string 'MySqlConnection' is not configured.");
                return false;
            }

            // 'using' guarantees the connection is closed and disposed even when
            // Open() or Load() throws.
            using (MySqlConnection conn = new MySqlConnection(settings.ConnectionString))
            {
                MySqlBulkLoader bl = new MySqlBulkLoader(conn);

                bl.TableName = TableName;
                // These terminators must match the ones used to write the file.
                bl.FieldTerminator = "(:;)";
                bl.LineTerminator = "{;:}";
                bl.FileName = FileName;
                bl.NumberOfLinesToSkip = 0;

                // Replace: an incoming row whose primary/unique key already exists
                //          replaces the existing row.
                // Ignore:  an incoming row whose key already exists is skipped and the
                //          existing row is left untouched.
                // None:    no duplicate-handling clause is sent to the server; see the
                //          MySQL LOAD DATA documentation for how duplicates are then treated.
                bl.ConflictOption = MySqlBulkLoaderConflictOption.Replace;

                conn.Open();
                bl.Load();
            }

            return true;
        }
        catch (Exception ex)
        {
            // Keep the bool-returning contract, but report the failure instead of hiding it.
            Console.Error.WriteLine("MySqlBULK failed for table {0}, file {1}: {2}", TableName, FileName, ex);
            return false;
        }
    }

    /// <summary>
    /// Creates the shared connection and a command bound to it. Failures (for example a
    /// missing or malformed connection string) propagate to the caller rather than
    /// leaving the shared fields silently unset.
    /// </summary>
    private static void CreateConnection()
    {
        con = new MySqlConnection(ConfigurationManager.ConnectionStrings["MySqlConnection"].ConnectionString);
        cmd = new MySqlCommand();
        cmd.Connection = con;
    }
}
}

You can download the working code from here.