你的位置:首页 > 信息动态 > 新闻中心
信息动态
联系我们

从MySQL数据库迁移到AWS DynamoDB

2021/11/8 15:40:12

GO语言——实现从MySQL数据库迁移到Amazon DynamoDB

目录

  • GO语言——实现从MySQL数据库迁移到Amazon DynamoDB
    • 一、前言
      • 1.1 迁移背景
      • 1.2 迁移问题
      • 1.3 AWS官方迁移方法
    • 二、思路与函数
      • 2.1 思路
      • 2.2 主函数
    • 三、MySQL函数
      • 3.1 查询所有非系统的数据库
      • 3.2 查询数据表信息
      • 3.3 查询数据表字段信息
      • 3.4 查询表所有信息
    • 四、Amazon DynamoDB
      • 4.1 创建DynamoDB表
      • 4.2 插入数据
    • 附录:
      • 参考
      • 代码地址

一、前言

博主在学习Go语言,碰巧Leader要求研究一下能不能用脚本方式实现MySQL数据库到DynamoDB的迁移

这篇博文是博主使用Go语言实现的一种简单迁移,因为初学Go语言,代码不是很优美,也没有算法优化。

功能上, 只是简单的把数据一比一的迁移到DynamoDB。并且原MySQL数据库类型,在DynamoDB中都为String类型。

后续有时间会继续优化该程序,也欢迎Go语言的大佬们提出宝贵的修改和优化方案。

1.1 迁移背景

许多公司考虑从MySQL等关系数据库迁移到Amazon DynamoDB

Amazon DynamoDB是一项完全托管、快速、高度可扩展且灵活的NoSQL数据库。DynamoDB可以根据业务需求根据流量增加或减少容量。与典型的基于媒体的RDBMS相比,可以更轻松地优化服务的总成本

1.2 迁移问题

  • 由于停机造成的服务中断,尤其是当客户服务必须24/7/365无缝可用时
  • RDBMS和DynamoDB的不同键设计

1.3 AWS官方迁移方法

两种基于AWS托管服务的迁移:https://aws.amazon.com/cn/blogs/big-data/near-zero-downtime-migration-from-mysql-to-dynamodb/

学习视频:https://www.youtube.com/watch?v=j88icq7JArI

二、思路与函数

2.1 思路

  1. 初始数据库连接和DynamoDB client

  2. 读取MySQL数据库

  3. 获取每个数据库中MySQL数据表

  4. 将数据表结构转为DynamoDB结构(字段,类型)

  • 获取表字段并判断表字段是否为主键
  1. 创建DynamoDB表:

    • 定义DynamoDB表名称:mysql数据库名_数据表

    • 创建DynamoDB表

  2. 循环获取MySQL数据表的数据,加载到DynamoDB

  • 获取所有列数据信息,以及行数

  • 读取数据表的数据

  • 把数据写入DynamoDB

2.2 主函数

main函数功能为调用其他函数

初始化数据库连接,和初始化Amazon DynamoDB客户端

// 初始化数据库连接
	db := Mysql.ConnectDB()
	if db == nil {
		fmt.Printf("init db failed!\n")
		return
	}
	// 初始DynamoDB client
	cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion("ap-southeast-1"))
	if err != nil {
		log.Fatalf("unable to load SDK config, %v", err)
	}
	// Using the Config value, create the DynamoDB client
	svc := dynamodb.NewFromConfig(cfg)

函数调用

// 1 读取数据库
Mysql.DatabaseInfo(db)
// 2 读取数据表
Mysql.TableInfo(db, database[i])
// 3 获取表字段
Mysql.TableFiledInfo(db, database[i], table[j])
// 4 创建DynamoDB表
DynamoDB.CreateDynamoDB(svc, field, tableName)
// 5.1 获取所有列数据信息,以及行数
Mysql.TableData(db, field, database[i], table[j])
// 5.3 把数据写入DynamoDB
DynamoDB.PutItemDynamoDB(svc , itemMap, tableName)

三、MySQL函数

3.1 查询所有非系统的数据库

在MySQL数据库中,INFORMATION_SCHEMA.TABLES表存储了MySQL数据库的元数据。

元数据信息主要包括数据库中表信息以及表字段信息,可以从INFORMATION_SCHEMA.TABLES表中查询数据库信息:

SELECT table_schema databaseName
FROM INFORMATION_SCHEMA.TABLES 
WHERE UPPER(table_type)='BASE TABLE'
AND table_schema NOT IN ('mysql','performance_schema','sys')
GROUP BY table_schema
ORDER BY table_schema asc

UPPER(table_type)='BASE TABLE'只选择基础数据库,在MySQL数据库中,人为创建的数据库都为BASE TABLE类型。

在MySQL数据库中,数据库mysql,performance_schema,sys都属于BASE TABLE类型。但是这三个数据库也是MySQL自带的数据库,不是用户数据库,需要排除。

func DatabaseInfo(db *sql.DB) []string {
	sqlStr := `SELECT table_schema databaseName
			FROM INFORMATION_SCHEMA.TABLES 
			WHERE UPPER(table_type)='BASE TABLE'
			AND table_schema NOT IN ('mysql','performance_schema','sys')
			GROUP BY table_schema
			ORDER BY table_schema asc`
	
	rows, err := db.Query(sqlStr)
	// 关闭查询
	defer rows.Close()
	if err != nil {
		fmt.Printf("query table name error!err:%v\n", err)
		return nil
		//panic(err)
	}
	var result []string
	for rows.Next() {
		var tableName string
		err = rows.Scan(&tableName)
		if err != nil {
			fmt.Printf("scan table name error!err:%v\n", err)
			return nil
		}
		result = append(result, tableName)
	}
	return result

}

遍历查询结果,数据库名称以数组形式存储。

3.2 查询数据表信息

数据库中的所有表结构,从INFORMATION_SCHEMA.TABLES表中查询。代码逻辑与查询数据库信息一致

sqlStr查询语句,?传入数据库名称

sqlStr := `SELECT table_name tableName
			FROM INFORMATION_SCHEMA.TABLES 
			WHERE UPPER(table_type)='BASE TABLE'
			AND LOWER(table_schema) = ? 
			GROUP BY table_name
			ORDER BY table_name asc`

遍历查询结果,数据表名称以数组形式存储。

3.3 查询数据表字段信息

数据表中的所有字段结构,从INFORMATION_SCHEMA.TABLES表中查询。代码逻辑与查询数据库信息一致

数据表字段结构体:

  • Fname:表字段名
  • ColumnKey:表字段属性是否为主键(PRI)
  • dataType:表字段类型
type Field struct {
	Fname     string
	ColumnKey string
	dataType  string
}

sqlStr查询语句,?传入数据库和数据表名称

sqlStr := `SELECT COLUMN_NAME fName,COLUMN_KEY columnKey,DATA_TYPE dataType
			FROM information_schema.columns 
			WHERE table_schema = ? AND table_name = ?`

函数返回一个表字段结构体

3.4 查询表所有信息

在go语言原生的github.com/go-sql-driver/mysql中的查询,需要指定与查询结果同样数量的变量才能把查询结果输出。

官方文档说明:https://pkg.go.dev/database/sql#Row.Scan

因此博主通过上个函数获取到的表字段,一个个字段查询,汇总为一个按列查询结构的map类型数据。

  1. 遍历字段数组,通过Query查询出,数据表中该列的值
  2. 利用rows.Next遍历查询结果,rows.Scan获取列值,追加到数组中
  3. 使用map类型,以key(字段名):value[值数组]的方式存储一个表的所有数据
func TableData(db *sql.DB,field []Field,database, table string) (map[string][]string, int) {
	result := make(map[string][]string)
	var rowsLength int
	for i := 0; i < len(field); i++ {
		sqlStr := "SELECT " + field[i].Fname + " from " + database + "." + table
		//fmt.Println(sqlStr)
		rows, err := db.Query(sqlStr)
		if err != nil {
			fmt.Printf("Failed to query table! error: %v\n", err)
			return nil, 0
		}
		defer rows.Close()
		var columnValue string
		var oneResult []string
		for rows.Next() {
			err = rows.Scan(&columnValue)
			if err != nil {
				fmt.Printf("Failed to scan a field in a table!err:%v\n", err)
				return nil, 0
			}
			oneResult = append(oneResult, columnValue)
		}
		if len(oneResult) == 0 {
			fmt.Printf("%v.%v not data!\n", database, table)
			return nil, 0
		}
		result[field[i].Fname] = oneResult
		rowsLength = len(oneResult)
	}
	return result, rowsLength
}
  • db: *sql.DB 数据库连接

  • field: []Field 数据表字段

  • database: string 数据库

  • table: string 数据表

  • return: map[string][]string 返回每一列的数据

四、Amazon DynamoDB

4.1 创建DynamoDB表

在DynamoDB的设计中,只有一个分区键和一个排序键。当然Amazon DynamoDB中,还可以添加全局索引和本地索引,这个方式复杂,在这里只是使用了分区键和排序键

因为DynamoDB只有两个键,并且必须指定一个分区键。

而在MySQL数据库中可能会存在两个以上或无主键的情况,面对这两种情况,博主通过判断前面获取的字段属性值。

如果存在两个主键,就以查询结果前面的两个主键分别作为分区键和排序键;若不存在主键,以查询结果第一个列为分区键。

在这里最好的方式应该是写成一个接口,按实际生产来修改每个数据表转换为DynamoDB后的格式

另外创建表,默认都是String类型,并没有判断原字段的格式。博主理解最好的方法应该是使用Go语言的反射机制来判断转换的DynamoDB字段类型

func CreateDynamoDB(svc *dynamodb.Client, field [] Mysql.Field, tableName string) *dynamodb.CreateTableOutput {
	var attributeDefinitions []types.AttributeDefinition
	var keySchema []types.KeySchemaElement
	for i :=0; i < len(field); i++ {
		if (field[i].ColumnKey == "PRI") && (len(attributeDefinitions) < 1)  {
			// 第一个主键作为分区键
			Attribute := []types.AttributeDefinition{
				{
					AttributeName: aws.String(field[i].Fname),
					AttributeType: types.ScalarAttributeTypeS,
				},
			}
			schemaElement := []types.KeySchemaElement{
				{
					AttributeName: aws.String(field[i].Fname),
					KeyType:       types.KeyTypeHash,
				},
			}
			attributeDefinitions = append(attributeDefinitions, Attribute...)
			keySchema = append(keySchema, schemaElement...)
		} else if (field[i].ColumnKey == "PRI") && (len(attributeDefinitions) >= 1) {
			// 第二个主键作为排序键
			Attribute := []types.AttributeDefinition{
				{
					AttributeName: aws.String(field[i].Fname),
					AttributeType: types.ScalarAttributeTypeS,
				},
			}
			schemaElement := []types.KeySchemaElement{
				{
					AttributeName: aws.String(field[i].Fname),
					KeyType:       types.KeyTypeRange,
				},
			}
			attributeDefinitions = append(attributeDefinitions, Attribute...)
			keySchema = append(keySchema, schemaElement...)
		}
		// 当存在多于两个主键时,只选择前两个主键
		if len(attributeDefinitions) >= 2 {
			fmt.Printf("The database primary key is greater than or equal to 2!tableName:%v\n", tableName)
			break
		}
	}
	// 如果不存在主键,以第一个表字段为DynamoDB的分区键
	if len(attributeDefinitions) == 0 {
		attributeDefinitions = []types.AttributeDefinition{
			{
				AttributeName: aws.String(field[0].Fname),
				AttributeType: types.ScalarAttributeTypeS,
			},
		}
		keySchema = []types.KeySchemaElement{
			{
				AttributeName: aws.String(field[0].Fname),
				KeyType:       types.KeyTypeHash,
			},
		}
		fmt.Printf("No primary key exists in the database!tableName:%v\n", tableName)
	}
	//fmt.Println(attributeDefinitions[1].AttributeName)
	input := &dynamodb.CreateTableInput{
		AttributeDefinitions: attributeDefinitions,
		KeySchema: keySchema,
		ProvisionedThroughput: &types.ProvisionedThroughput{
			ReadCapacityUnits:  aws.Int64(5),
			WriteCapacityUnits: aws.Int64(5),
		},
		TableName: aws.String(tableName),
	}

	result, err := svc.CreateTable(context.TODO(),input)
	if err != nil {
		fmt.Printf("Failed to create DynamoDB! error: %v\n", err)
		return nil
	}
	// CreateTable为异步操作,需要等待一定时间,继续下一步
	time.Sleep(time.Second * 5)

	return result
}

4.2 插入数据

通过遍历获取到MySQL数据表的所有数据,将数据添加到符合DynamoDB格式的map中,调用PutItemInput接口添加数据

添加的数据类型都是String类型,并没有判断原字段的格式。博主理解最好的方法应该是使用Go语言的反射机制来判断转换的DynamoDB字段类型

     for k := 0; k < rowLength; k++ {
				itemMap := make(map[string]types.AttributeValue)
				for itemName, item := range tableData {
					itemMap[itemName] =  &types.AttributeValueMemberS{Value: item[k]}
				}
				// 5.3 把数据写入DynamoDB
				putItemReuslt := DynamoDB.PutItemDynamoDB(svc , itemMap, tableName)
				if putItemReuslt != nil {
					fmt.Println("put Item succeed!")
				} else {
					panic(putItemReuslt)
				}
			}
func PutItemDynamoDB(svc *dynamodb.Client, itemMap map[string]types.AttributeValue, tableName string) *dynamodb.PutItemOutput{

	input := &dynamodb.PutItemInput{
		Item: itemMap,
		ReturnConsumedCapacity: types.ReturnConsumedCapacityTotal,
		TableName:              aws.String(tableName),
	}
	result, err := svc.PutItem(context.TODO(),input)
	if err != nil {
		fmt.Printf("Failed to put Item! error: %v\n", err)
		return nil
	}

	return result
}

附录:

参考

MySQL Driver:https://github.com/Go-SQL-Driver/MySQL/

AWS Go DynamoDB SDKv2:https://pkg.go.dev/github.com/aws/aws-sdk-go-v2/service/dynamodb#Client.PutItem

代码地址

GitHub:https://github.com/MoGD2018/Go-mysql-convert-to-dynamodb

Gitee:https://gitee.com/MoGD/Go-mysql-convert-to-dynamodb