插件开发教程
Bifrost 本身提供了相关的接口进行插件注入,也同样提供了相关的测试接口,方便插件开过程中进行调试
版本
v1.7.x
创建插件目录
- 在 github.com/brokercap/Bifrost/plugin 目录下创建和插件名称一致的目录名称 (当然这个你也可以在自己的新建一个项目开发插件,但这个目录名必须和插件名保持一致)
拿redis插件作为案例来讲,创建redis 目录
github.com/brokercap/Bifrost/plugin/redis
在插件目录下子目录及文件
-rwxr-xr-x 1 root root 90 Mar 18 18:38 redis.go
-rwxr-xr-x 1 root root 14732 Mar 27 10:30 redisTest.go
drwxr-xr-x 0 root root 0 Mar 26 18:41 src
drwxr-xr-x 0 root root 0 Mar 26 08:58 www
src 目录
-rwxr-xr-x 1 root root 5783 Mar 26 18:41 redis.go
www 目录
-rwxr-xr-x 1 root root 1325 Mar 26 08:58 doc.html
-rwxr-xr-x 1 root root 3864 Mar 18 10:57 redis.html
-rwxr-xr-x 1 root root 1839 Mar 20 08:50 redis.js
src 是插件go源码处理逻辑所放目录,当然这个你可以根据自己的意愿改,只要在编译的时候,指定这个目录就行了
www 是插件存放html及js等静态文件的目录,这个目录名必须为www, 当编译完运行的时候,需要前 当前插件目录及www拷贝到 Bifrost 运行目录下的 plugin 下
redis.html 及 redis.js 两个文件名必须和插件名保持一致,在Bifrsot 管理界面进行加载插件界面的时候,是通过插件名 + .html ,+ .js 进行加载的, 先加载html 再加载 js
doc.html 为插件使用手册界面
插件接口
github.com/brokercap/Bifrost/plugin/driver
往bifrost注册插件
func init(){ driver.Register("redis",&MyConn{},VERSION,BIFROST_VERION) }
redis : 插件名称,这个名称必须和插件目录名保持一致,以及www目录下的html及js文件名 &MyConn{} : 插件提供bifrost调用的 对象 VERSION : 当前自己开发的插件版本 BIFROST_VERION : 当前开发时候bifrost的版本
实现接口
SetOption
func (This *Conn) SetOption(uri *string,param map[string]interface{}) {
This.Uri = uri
return
}
在调用 Open 方法之前,会先调用 SetOption 函数, uri 是界面上配置的 连接 uri,param是个预留接口,在后续的版本中,可能会使用
Open
func (This *Conn) Open() error {
This.Connect()
return nil
}
在实例化边的时候,会调用 Open 函数
Close
func (c *PluginDriverInterface) Close() bool {
return true
}
Bifrost server 在某些情况下,可能会对插件的连接进行Close方法调用执行
比如当在线修改了 目标库 Uri 连接地址的时候,会进行一次Close 操作,这个操作在从线程池中拿出来的时候,被操作的,并不会有线程安全的问题,不用担心
GetUriExample
func (This *Conn) GetUriExample() string{
return "pwd@tcp(127.0.0.1:6379)/0 or 127.0.0.1:6379 or pwd@tcp(127.0.0.1:6379,127.0.0.1:6380)/0 or 127.0.0.1:6379,127.0.0.1:6380"
}
在界面,用显示 uri 边接案例的时候,调用的
CheckUri
func (This *Conn) CheckUri() error{
This.Connect()
if This.err != nil{
return This.err
}
This.Close()
return nil
}
界面点击 CheckUri 的时候,进行调用验证 uri 是否能连接正常的,调用之前,会优先调用 SetOption 函数
SetParam
func (This *Conn) SetParam(p interface{}) (interface{},error){
if p == nil{
return nil,fmt.Errorf("param is nil")
}
switch p.(type) {
case *PluginParam:
This.p = p.(*PluginParam)
return p,nil
default:
return This.GetParam(p)
}
}
SetParam 函数,在每次提交数据到插件之前,都会调用一次,比如 Insert,Update,Delete,Query,Commit,TimeOutCommit 函数之前,都会被调用一次SetParam函数
参数 P 是动态的内容
第一次被执行的时候,参数p 的内容为:
用户在插件界面配置的参数信息。传进来的参数是由插件js ,方法doGetPluginParam() 里返回的result.data 的数据.. 数据结构为 map[string]interface{}
将 map 参数转成插件代码里的结构体后,再将结构提指针返回回去,待下一次再提交进来
第二次被被执行的时候,参数p 的内容为:
上一次调用返回的结体体指针,这样就能继续用这个指针的数据,这样保存每一次插件处理的数据,都是指定某一个同步的数据了,数据不会错乱
Insert
func (This *Conn) Insert(data *driver.PluginDataType,retry bool) (*driver.PluginDataType, *driver.PluginDataType,error) {
return This.Update(data,retry)
}
当 Insert 事件的时候,会调用 Insert 函数方法
data 参数中有位点信息,事件信息,Row 字段是字段对应的值 信息
retry 参数代码是是不是第二次以上重复提交进来的
返回值:
LastSuccessCommitData : 当前同步,最后成功处理的数据信息,假如是 commit事件的话,bifrost server 层将会保存位点
ErrData : 当前同步,同步数据的时候,出错的数据信息,返回给bifrost server, 将会展示给用户,什么错误信息,哪一行数据,可以方便进行排查
err : 同步出错的错误内容
Update
func (This *Conn) Update(data *driver.PluginDataType,retry bool) (*driver.PluginDataType, *driver.PluginDataType,error) {
if This.err != nil {
This.ReConnect()
}
index := len(data.Rows)-1
Key := This.getKeyVal(data,index)
var err error
switch This.p.Type {
case "set":
if This.p.ValConfig != ""{
err =This.conn.Set(Key, This.getVal(data,index), time.Duration(This.p.Expir) * time.Second).Err()
}else {
vbyte, _ := json.Marshal(data.Rows[index])
err =This.conn.Set(Key, string(vbyte), time.Duration(This.p.Expir) * time.Second).Err()
}
break
case "list":
return This.SendToList(Key,data)
break
default:
err = fmt.Errorf(This.p.Type+ " not in(set,list)")
break
}
if err != nil {
This.err = err
return nil,data,err
}
return nil,nil,nil
}
当 Update 事件的时候,会调用 Update 函数方法
Update 事件和Insert,Delete 事件提交进来的 Row 字段信息,是有区别的, Update 事件的Row这个数组里是最少有2个下标的,偶数下标代表是旧数据,奇数是新数据
每个插件,都可以自行设置缓存等提升性能,插件开发很开放,只要实现只个接口,但是接口怎么处理数据,由插件自行决定
Del
当 Delete 事件的时候,会调用 Del 函数方法
Query
当是 sql 语句事件的时候,会调用 Query 函数方法
在Binlog里,不管你 format 是row,mix 还是 statement,所有 DDl 语句都是以 query事件保存
所以像 ClickHouse,MySQL 插件处理 DDL 同步的时候,都是在 Query函数里进行触发处理的
Commit
当 Commit事件或者 是Query事件,但是SQL内容是 COMMIT 字样的时候,也会提交到 Commit函数方法里来
因为有部分 MySQL 版本中没有 Commit的单独事件,但是有 Query事件,但是Query里记录的是 COMMIT字段
TimeOutCommit
默认配置中在 同步配置中,假如5秒前有数据提交到插件里来,并紧接着 连续 5 秒 没有数据传的时候,会调用一次 TimeOutCommit 函数
假如 TimeOutCommit 函数返回了 Err 信息给 bifrost server 层,那将下一次会继续提交进来,直接 TimeOutCommit 返回 nil 给 bifrost server,server 判定 插件层没有需要继续处理的数据了,就不会再次提交了
插件Driver接口数据结构
type PluginDataType struct {
Timestamp uint32
EventType string
Rows []map[string]interface{}
Query string
SchemaName string
TableName string
BinlogFileNum int
BinlogPosition uint32
Gtid string
Pri []string
EventID uint64
ColumnMapping map[string]string
}
TimeStamp : 事件发生的时间戳
EventType :事件类型,insert,uupdate,delete,sql
Rows :在事件类型为 insert ,update,delete 的时候,这个数组里会有数据,insert , delete ,只有有Rows[0],但是在update类型的时候,Rows[0]为更新之前的数据,Rows[1] 为更新之后的数据。
Query : 当EventType == sql 的时候,这个字段不为空,存储的是执行的sql语句
SchemaName: 数据名名称
TableName : 表名
BinlogFileNum : 二进制日志文件后缀名,去掉了多余了000,比如二进制文件为 mysql-bin.000070,而这个字段存的是70
BinlogPosition : 二进制位点
Gtid : GTID 信息
EventID : 由 Bifrost server 层维护的一个 自增ID
Pri : 数据源表的主键字段名列表
ColumnMapping : 每个字段对应的类型参数 参考 https://github.com/brokercap/Bifrost/blob/v1.7.x/plugin/driver/json.go(https://github.com/brokercap/Bifrost/blob/v1.7.x/plugin/driver/json.go)
插件Driver其他函数
func TransfeResult(val string, data *PluginDataType,rowIndex int) interface{} {
这个函数是由插件调用,将用户input输入的参数里的{$SchemaName} 等标签进行替换的方法
val : 拥有标签的字符串
data : 事件数据
rowIndex : 这个为Rows[] 里的下标值,代表用哪一个Rows[rowIndex]里的数据进行替换指定的标签
{$TableName} : 会被替换成 data.TableName的值
{$SchemaName} : 会被替换成 data.SchemaName
{$EventType} : 会被替换成 data.EventType
{$字段名} : 会被替换成 个Rows[rowIndex][字段名] 的值
插件单元测试
github.com/brokercap/Bifrost/sdk/pluginTestData
Bifrost pluginTestData 包提供了 新增,更新,删除,及 SQL 事件的模拟数据
pluginTestData方法
pluginTestData初始化
event = pluginTestData.NewEvent()
event.SetSchema(SchemaName)
event.SetTable(TableName)
获取Insert事件数据
event.GetTestInsertData()
获取Update事件数据
event.GetTestUpdateData()
获取Delete事件数据
event.GetTestDeleteData()
获取Query事件数据
event.GetTestQueryData()
获取Commit事件数据
event.GetTestCommitData()
验证数据是否准确
func (This *Event) CheckData(src map[string]interface{},destJsonString string) (map[string][]string,error){
使用:
checkResult,err = e.CheckData(insertData.Rows[0],getFromClickHouseData)
模拟正式环境刷数据测试插件
func TestSyncLikeProduct(t *testing.T) {
p := pluginTestData.NewPlugin("redis",url)
err0 := p.SetParam(getParam())
if err0 != nil{
t.Fatal(err0)
}
var n uint = 10
err := p.DoTestStart(n)
if err != nil{
t.Fatal(err)
}else{
t.Log("test success")
}
}
模拟正式环境性能测试
只随机生成一条数据。循环提交
func TestSyncLikeProductForSpeed(t *testing.T) {
p := pluginTestData.NewPlugin("redis",url)
err0 := p.SetParam(getParam())
p.SetEventType(pluginTestData.INSERT)
if err0 != nil{
t.Fatal(err0)
}
var n uint = 100
err := p.DoTestStartForSpeed(n)
if err != nil{
t.Fatal(err)
}else{
t.Log("test success")
}
}
编译
和 Bifrost 源码一起静态编译
修改Bifrost/plugin/load/import_toserver.go文件
github.com/brokercap/Bifrost/plugin/import_toserver.go
在import_toserver.go 文件里 import 插件源码
go build ./Bifrost.go
将插目录下的 www 目录拷贝到 github.com/brokercap/Bifrost/plugin/插件名/www
假如采用 ./build.sh 或者 make install 方式编译,不用自行拷 www 目录,全由于脚本打包生成