Bifrost Document

Version 1.6.x

Bifrost ---- 面向生产环境的 MySQL 同步到 Redis,MongoDB,ClickHouse 等服务的异构中间件

插件开发教程

Bifrost 本身提供了相关的接口进行插件注入,也同样提供了相关的测试接口,方便插件开过程中进行调试

版本

v1.7.x

创建插件目录

  • 在 github.com/brokercap/Bifrost/plugin 目录下创建和插件名称一致的目录名称 (当然这个你也可以在自己的新建一个项目开发插件,但这个目录名必须和插件名保持一致)

拿redis插件作为案例来讲,创建redis 目录

github.com/brokercap/Bifrost/plugin/redis

在插件目录下子目录及文件

-rwxr-xr-x 1 root root    90 Mar 18 18:38 redis.go
-rwxr-xr-x 1 root root 14732 Mar 27 10:30 redisTest.go
drwxr-xr-x 0 root root     0 Mar 26 18:41 src
drwxr-xr-x 0 root root     0 Mar 26 08:58 www

src 目录

-rwxr-xr-x 1 root root 5783 Mar 26 18:41 redis.go

www 目录

-rwxr-xr-x 1 root root 1325 Mar 26 08:58 doc.html
-rwxr-xr-x 1 root root 3864 Mar 18 10:57 redis.html
-rwxr-xr-x 1 root root 1839 Mar 20 08:50 redis.js

src 是插件go源码处理逻辑所放目录,当然这个你可以根据自己的意愿改,只要在编译的时候,指定这个目录就行了

www 是插件存放html及js等静态文件的目录,这个目录名必须为www, 当编译完运行的时候,需要前 当前插件目录及www拷贝到 Bifrost 运行目录下的 plugin 下

redis.html 及 redis.js 两个文件名必须和插件名保持一致,在Bifrsot 管理界面进行加载插件界面的时候,是通过插件名 + .html ,+ .js 进行加载的, 先加载html 再加载 js

doc.html 为插件使用手册界面

插件接口

github.com/brokercap/Bifrost/plugin/driver
  • 往bifrost注册插件

    func init(){
    driver.Register("redis",&MyConn{},VERSION,BIFROST_VERION)
    }
    
    

redis : 插件名称,这个名称必须和插件目录名保持一致,以及www目录下的html及js文件名 &MyConn{} : 插件提供bifrost调用的 对象 VERSION : 当前自己开发的插件版本 BIFROST_VERION : 当前开发时候bifrost的版本

实现接口

SetOption
func (This *Conn) SetOption(uri *string,param map[string]interface{}) {
	This.Uri = uri
	return
}

在调用 Open 方法之前,会先调用 SetOption 函数, uri 是界面上配置的 连接 uri,param是个预留接口,在后续的版本中,可能会使用

Open
func (This *Conn) Open() error {
	This.Connect()
	return nil
}

在实例化边的时候,会调用 Open 函数

Close
func (c *PluginDriverInterface) Close() bool {
	return true
}

Bifrost server 在某些情况下,可能会对插件的连接进行Close方法调用执行

比如当在线修改了 目标库 Uri 连接地址的时候,会进行一次Close 操作,这个操作在从线程池中拿出来的时候,被操作的,并不会有线程安全的问题,不用担心

GetUriExample
func (This *Conn) GetUriExample() string{
	return "pwd@tcp(127.0.0.1:6379)/0 or 127.0.0.1:6379 or pwd@tcp(127.0.0.1:6379,127.0.0.1:6380)/0 or 127.0.0.1:6379,127.0.0.1:6380"
}

在界面,用显示 uri 边接案例的时候,调用的

CheckUri
func (This *Conn) CheckUri() error{
	This.Connect()
	if This.err != nil{
		return This.err
	}
	This.Close()
	return nil
}

界面点击 CheckUri 的时候,进行调用验证 uri 是否能连接正常的,调用之前,会优先调用 SetOption 函数

SetParam
func (This *Conn) SetParam(p interface{}) (interface{},error){
	if p == nil{
		return nil,fmt.Errorf("param is nil")
	}
	switch p.(type) {
	case *PluginParam:
		This.p = p.(*PluginParam)
		return p,nil
	default:
		return This.GetParam(p)
	}
}

SetParam 函数,在每次提交数据到插件之前,都会调用一次,比如 Insert,Update,Delete,Query,Commit,TimeOutCommit 函数之前,都会被调用一次SetParam函数

参数 P 是动态的内容

第一次被执行的时候,参数p 的内容为:

用户在插件界面配置的参数信息。传进来的参数是由插件js ,方法doGetPluginParam() 里返回的result.data 的数据.. 数据结构为 map[string]interface{}

将 map 参数转成插件代码里的结构体后,再将结构提指针返回回去,待下一次再提交进来

第二次被被执行的时候,参数p 的内容为:

上一次调用返回的结体体指针,这样就能继续用这个指针的数据,这样保存每一次插件处理的数据,都是指定某一个同步的数据了,数据不会错乱

Insert
func (This *Conn) Insert(data *driver.PluginDataType,retry bool) (*driver.PluginDataType, *driver.PluginDataType,error) {
	return This.Update(data,retry)
}

当 Insert 事件的时候,会调用 Insert 函数方法

data 参数中有位点信息,事件信息,Row 字段是字段对应的值 信息

retry 参数代码是是不是第二次以上重复提交进来的

返回值:

LastSuccessCommitData : 当前同步,最后成功处理的数据信息,假如是 commit事件的话,bifrost server 层将会保存位点

ErrData : 当前同步,同步数据的时候,出错的数据信息,返回给bifrost server, 将会展示给用户,什么错误信息,哪一行数据,可以方便进行排查

err : 同步出错的错误内容

Update
func (This *Conn) Update(data *driver.PluginDataType,retry bool) (*driver.PluginDataType, *driver.PluginDataType,error) {
	if This.err != nil {
		This.ReConnect()
	}
	index := len(data.Rows)-1
	Key := This.getKeyVal(data,index)
	var err error
	switch This.p.Type {
	case "set":
		if This.p.ValConfig != ""{
			err =This.conn.Set(Key, This.getVal(data,index), time.Duration(This.p.Expir) * time.Second).Err()
		}else {
			vbyte, _ := json.Marshal(data.Rows[index])
			err =This.conn.Set(Key, string(vbyte), time.Duration(This.p.Expir) * time.Second).Err()
		}
		break
	case "list":
		return This.SendToList(Key,data)
		break
	default:
		err = fmt.Errorf(This.p.Type+ " not in(set,list)")
		break
	}

	if err != nil {
		This.err = err
		return nil,data,err
	}
	return nil,nil,nil
}

当 Update 事件的时候,会调用 Update 函数方法

Update 事件和Insert,Delete 事件提交进来的 Row 字段信息,是有区别的, Update 事件的Row这个数组里是最少有2个下标的,偶数下标代表是旧数据,奇数是新数据

每个插件,都可以自行设置缓存等提升性能,插件开发很开放,只要实现只个接口,但是接口怎么处理数据,由插件自行决定

Del

当 Delete 事件的时候,会调用 Del 函数方法

Query

当是 sql 语句事件的时候,会调用 Query 函数方法

在Binlog里,不管你 format 是row,mix 还是 statement,所有 DDl 语句都是以 query事件保存

所以像 ClickHouse,MySQL 插件处理 DDL 同步的时候,都是在 Query函数里进行触发处理的

Commit

当 Commit事件或者 是Query事件,但是SQL内容是 COMMIT 字样的时候,也会提交到 Commit函数方法里来

因为有部分 MySQL 版本中没有 Commit的单独事件,但是有 Query事件,但是Query里记录的是 COMMIT字段

TimeOutCommit

默认配置中在 同步配置中,假如5秒前有数据提交到插件里来,并紧接着 连续 5 秒 没有数据传的时候,会调用一次 TimeOutCommit 函数

假如 TimeOutCommit 函数返回了 Err 信息给 bifrost server 层,那将下一次会继续提交进来,直接 TimeOutCommit 返回 nil 给 bifrost server,server 判定 插件层没有需要继续处理的数据了,就不会再次提交了

插件Driver接口数据结构

type PluginDataType struct {
	Timestamp 		uint32
	EventType 		string
	Rows            []map[string]interface{}
	Query          	string
	SchemaName     	string
	TableName      	string
	BinlogFileNum 	int
	BinlogPosition 	uint32
	Gtid			string
	Pri				[]string
	EventID			uint64
	ColumnMapping   map[string]string
}

TimeStamp : 事件发生的时间戳

EventType :事件类型,insert,uupdate,delete,sql

Rows :在事件类型为 insert ,update,delete 的时候,这个数组里会有数据,insert , delete ,只有有Rows[0],但是在update类型的时候,Rows[0]为更新之前的数据,Rows[1] 为更新之后的数据。

Query : 当EventType == sql 的时候,这个字段不为空,存储的是执行的sql语句

SchemaName: 数据名名称

TableName : 表名

BinlogFileNum : 二进制日志文件后缀名,去掉了多余了000,比如二进制文件为 mysql-bin.000070,而这个字段存的是70

BinlogPosition : 二进制位点

Gtid : GTID 信息

EventID : 由 Bifrost server 层维护的一个 自增ID

Pri : 数据源表的主键字段名列表

ColumnMapping : 每个字段对应的类型参数 参考 https://github.com/brokercap/Bifrost/blob/v1.7.x/plugin/driver/json.go(https://github.com/brokercap/Bifrost/blob/v1.7.x/plugin/driver/json.go)

插件Driver其他函数

func TransfeResult(val string, data *PluginDataType,rowIndex int) interface{} {

这个函数是由插件调用,将用户input输入的参数里的{$SchemaName} 等标签进行替换的方法

val : 拥有标签的字符串

data : 事件数据

rowIndex : 这个为Rows[] 里的下标值,代表用哪一个Rows[rowIndex]里的数据进行替换指定的标签

{$TableName} : 会被替换成 data.TableName的值

{$SchemaName} : 会被替换成 data.SchemaName

{$EventType} : 会被替换成 data.EventType

{$字段名} : 会被替换成 个Rows[rowIndex][字段名] 的值

插件单元测试

github.com/brokercap/Bifrost/sdk/pluginTestData

Bifrost pluginTestData 包提供了 新增,更新,删除,及 SQL 事件的模拟数据

pluginTestData方法

pluginTestData初始化
	event = pluginTestData.NewEvent()
	event.SetSchema(SchemaName)
	event.SetTable(TableName)
获取Insert事件数据
event.GetTestInsertData()

获取Update事件数据
event.GetTestUpdateData()

获取Delete事件数据
event.GetTestDeleteData()

获取Query事件数据
event.GetTestQueryData()

获取Commit事件数据
event.GetTestCommitData()

验证数据是否准确
func (This *Event) CheckData(src map[string]interface{},destJsonString string) (map[string][]string,error){

使用:

checkResult,err = e.CheckData(insertData.Rows[0],getFromClickHouseData)

模拟正式环境刷数据测试插件

func TestSyncLikeProduct(t *testing.T)  {
	p := pluginTestData.NewPlugin("redis",url)
	err0 := p.SetParam(getParam())
	if err0 != nil{
		t.Fatal(err0)
	}

	var n uint = 10
	err := p.DoTestStart(n)

	if err != nil{
		t.Fatal(err)
	}else{
		t.Log("test success")
	}
}

模拟正式环境性能测试

只随机生成一条数据。循环提交


func TestSyncLikeProductForSpeed(t *testing.T)  {
	p := pluginTestData.NewPlugin("redis",url)
	err0 := p.SetParam(getParam())
	p.SetEventType(pluginTestData.INSERT)
	if err0 != nil{
		t.Fatal(err0)
	}

	var n uint = 100
	err := p.DoTestStartForSpeed(n)

	if err != nil{
		t.Fatal(err)
	}else{
		t.Log("test success")
	}

}

编译

和 Bifrost 源码一起静态编译

  1. 修改Bifrost/plugin/load/import_toserver.go文件

    github.com/brokercap/Bifrost/plugin/import_toserver.go
    

在import_toserver.go 文件里 import 插件源码

  1. go build ./Bifrost.go

  2. 将插目录下的 www 目录拷贝到 github.com/brokercap/Bifrost/plugin/插件名/www

假如采用 ./build.sh 或者 make install 方式编译,不用自行拷 www 目录,全由于脚本打包生成

Last updated on 22 Jan 2021
Edit on GitHub