Create
the table in your DataBase.
CREATE
TABLE
TEST_PK
(
ID_MAIN
BIGINT NOT
NULL,
ID_SUB
BIGINT NOT
NULL,/*
Dont Use: AUTO_INCREMENT This will not correctly incremented with
Bulkloader*/
T_ID
BIGINT,
T_TEXT
VARCHAR(50),
T_CREATEDON
TIMESTAMP
NOT
NULL
DEFAULT
CURRENT_TIMESTAMP,
STATUS
INT
DEFAULT
1,
PRIMARY
KEY
(ID_MAIN,ID_SUB)
);
SELECT
*
FROM
TEST_PK;
Here
we are going load bulk data from specified source.First we will load
the data into a text file then
loads
the data in to database from text file using MySqlBulkLoader.Here
table consists two primary keys so it will check for existing records
based on two primary keys (like select * from TEST_PK where ID_MAIN
=input1 and ID_SUB=input2),if they are exists base on
ConflictOption(Replace
i.e update or None/Ignore) it process the data.
Program.cs:
using
System;
using
System.Configuration;
using
System.Data;
using
System.Text;
namespace
MySqlBulkLoader_CSharp
{
internal
class
Program
{
private
static
void
Main(string[]
args)
{
Console.WriteLine("Program
Started.Please wait...");
Do_work
obj = new
Do_work();
obj.Update_Data(MyFunction.Get_Dump());
Console.WriteLine("Program
Completed.");
}
}
internal
static
class
MyFunction
{
private
static
string
Message = string.Empty;
public
static
DataTable
Get_Dump()
{
DataTable
dt = new
DataTable();
DataColumn
dc1 = new
DataColumn();
dc1.ColumnName
= "ID_MAIN";
dc1.DataType
= System.Type.GetType("System.Int32");
dt.Columns.Add(dc1);
DataColumn
dc2 = new
DataColumn();
dc2.ColumnName
= "ID_SUB";
dc2.DataType
= System.Type.GetType("System.Int32");
dt.Columns.Add(dc2);
DataColumn
dc3 = new
DataColumn();
dc3.ColumnName
= "T_ID";
dc3.DataType
= System.Type.GetType("System.Int32");
dt.Columns.Add(dc3);
DataColumn
dc4 = new
DataColumn();
dc4.ColumnName
= "T_TEXT";
dc4.DataType
= System.Type.GetType("System.String");
dt.Columns.Add(dc4);
for
(int
i = 1; i <= 100; i++)
{
DataRow
dr = dt.NewRow();
dr["ID_MAIN"]
= i;
dr["ID_SUB"]
= i + 2;
dr["T_ID"]
= i + 10000;
dr["T_TEXT"]
= "This
is"
+ (i);
dt.Rows.Add(dr);
}
dt.AcceptChanges();
return
dt;
}
public
static
void
Bulk_Update(string
FileName, string
query, string
TableName)
{
System.IO.File.WriteAllText(FileName,
query);
bool
retval = MySqlCon.MySqlBULK(FileName,
TableName);
if
(retval)
{
Message
= string.Format("==========>
Successfully MySqlBulkLoader executed for the TableName:
{0},FileName: {1}<==========",
TableName, FileName);
Console.WriteLine(Message);
}
else
{
Message
= string.Format("==========>
Failed to execute MySqlBulkLoader for the TableName: {0},FileName:
{1}<==========",
TableName, FileName);
Console.WriteLine(Message);
}
}
}
internal
class
Do_work
{
private
const
int
Row_Split_Count = 500;
public
void
Update_Data(DataTable
dt)
{
if
(dt.Rows.Count > 0)
{
string
MysqlCreatedDate = DateTime.Now.ToString("yyyy-MM-dd
HH:mm:ss");
string
RecordStatus = "1";
StringBuilder
query = new
StringBuilder();
string
TableName = "TEST_PK";
string
FileName = string.Empty;
FileName
= ConfigurationManager.AppSettings["FILEPATH"]
+ TableName + ".txt";
int
No_of_times = dt.Rows.Count / Row_Split_Count;
int
reminder = dt.Rows.Count % Row_Split_Count;
for
(int
KU = 0; KU < No_of_times; KU++)
{
query.Clear();
DataRow
dr = dt.Rows[KU * Row_Split_Count];
query.Append(Form_Query(dr,
MysqlCreatedDate, RecordStatus));
for
(int
PK = (KU * Row_Split_Count) + 1; PK < (KU + 1) * Row_Split_Count;
PK++)
{
dr
= dt.Rows[PK];
query.Append(@"{;:}"
+ Form_Query(dr, MysqlCreatedDate, RecordStatus));
}
MyFunction.Bulk_Update(FileName,
query.ToString(), TableName);
query.Clear();
}
if
(reminder != 0)
{
DataRow
dr = dt.Rows[Row_Split_Count * No_of_times];
query.Clear();
query.Append(Form_Query(dr,
MysqlCreatedDate, RecordStatus));
for
(int
PK = (Row_Split_Count * No_of_times) + 1; PK < dt.Rows.Count;
PK++)
{
dr
= dt.Rows[PK];
query.Append(@"{;:}"
+ Form_Query(dr, MysqlCreatedDate, RecordStatus));
}
MyFunction.Bulk_Update(FileName,
query.ToString(), TableName);
query.Clear();
}
}
}
private
string
Form_Query(DataRow
dr, string
MysqlCreatedDate, string
RecordStatus)
{
string
Query = string.Empty;
Query
= dr["ID_MAIN"].ToString()
+ "(:;)"
+ dr["ID_SUB"].ToString()
+ "(:;)"
+ dr["T_ID"].ToString()
+ "(:;)"
+ dr["T_TEXT"].ToString()
+ "(:;)"
+ MysqlCreatedDate + "(:;)"
+ RecordStatus + "(:;)";
return
Query;
}
}
}
MySqlCon.cs:
using
System;
using
System.Configuration;
using
System.Data;
using
MySql.Data.MySqlClient;
namespace
MySqlBulkLoader_CSharp
{
public
static
class
MySqlCon
{
private
static
MySqlCommand
cmd;
private
static
MySqlConnection
con;
static
MySqlCon()
{
}
public
static
void
Closeconnection()
{
if
(con.State != ConnectionState.Closed)
{
con.Close();
}
}
public
static
bool
MySqlBULK(string
FileName, string
TableName)
{
bool
retval = false;
string
connStr =
System.Configuration.ConfigurationManager.ConnectionStrings["MySqlConnection"].ToString();
MySqlConnection
conn = new
MySql.Data.MySqlClient.MySqlConnection(connStr);
MySqlBulkLoader
bl = new
MySql.Data.MySqlClient.MySqlBulkLoader(conn);
bl.TableName
= TableName;
bl.FieldTerminator
= "(:;)";
bl.LineTerminator
= "{;:}";
bl.FileName
= FileName;
bl.NumberOfLinesToSkip
= 0;
bl.ConflictOption
= MySql.Data.MySqlClient.MySqlBulkLoaderConflictOption.Replace;
//Replace:
use this option if you want to update the record if already
existing.This will check only primary key,if exists updates all
columns of the row.
//Ignore:
use this option if you want to leave the record if already
existing(This will check only primary key).Other wise updated all
columns of the row.
//None:
use this option if you wnat to do nothing if already existing.This
will check only primary key.This will not update anything.
try
{
conn.Open();
bl.Load();
conn.Close();
retval
= true;
}
catch
(Exception
ex)
{
retval
= false;
}
return
retval;
}
private
static
void
CreateConnection()
{
try
{
con
= new
MySqlConnection(ConfigurationManager.ConnectionStrings["MySqlConnection"].ConnectionString);
cmd
= new
MySqlCommand();
cmd.Connection
= con;
}
catch
(Exception
ex)
{
}
}
}