Plug-in Development Tutorial
Bifrost itself provides an interface for plug-in injection, as well as a test interface for debugging while the plug-in is open
版本
v1.7.x
创建插件目录
in github.com/brokercap/Bifrost/plugin directory to create and the plug-in name consistent directory name
Taking the Redis plugin as an example, create a Redis directory
github.com/brokercap/Bifrost/plugin/redis
path and files
-rwxr-xr-x 1 root root 90 Mar 18 18:38 redis.go
-rwxr-xr-x 1 root root 14732 Mar 27 10:30 redisTest.go
drwxr-xr-x 0 root root 0 Mar 26 18:41 src
drwxr-xr-x 0 root root 0 Mar 26 08:58 www
src path
-rwxr-xr-x 1 root root 5783 Mar 26 18:41 redis.go
www path
-rwxr-xr-x 1 root root 1325 Mar 26 08:58 doc.html
-rwxr-xr-x 1 root root 3864 Mar 18 10:57 redis.html
-rwxr-xr-x 1 root root 1839 Mar 20 08:50 redis.js
** src ** is the directory in which the plugin’s Go source code processing logic is placed. You can change this directory if you wish, as long as you specify this directory at compile time
** www ** is the plugin directory where static files such as HTML and JS are stored. The directory name must be WWW. When the plugin is compiled and run, the previous and current plugin directory and WWW need to be copied to the plugin under the Bifrost running directory
HTML and redis.js must be the same as the name of the plugin. When loading the plugin in the Bifrsot admin interface, it is loaded by the plugin name +.html,+.js.
Load HTML first and then load JS
doc.html is the plug-in manual interface
插件接口
github.com/brokercap/Bifrost/plugin/driver
Register
func init(){
driver.Register("redis",&MyConn{},VERSION,BIFROST_VERION)
}
redis : The name of the plugin, which must be the same as the plugin directory name, and the HTML and JS file names in the WWW directory
VERSION :The current self-developed plug-in version
BIFROST_VERION : The current version of Bifrost is in development
实现接口
SetOption
func (This *Conn) SetOption(uri *string,param map[string]interface{}) {
This.Uri = uri
return
}
Before calling the Open method, the setOption function is called. The URI is the connection URI configured on the interface. Param is a reserved interface that may be used in a later version
Open
func (This *Conn) Open() error {
This.Connect()
return nil
}
When an edge is instantiated, the Open function is called
Close
func (c *PluginDriverInterface) Close() bool {
return true
}
In some cases, Bifrost Server may perform a Close method call on a plug-in’s connection
For example, when the target library URI connection address is changed online, a Close operation will be performed. This operation will not be thread safe when it is taken out of the thread pool
GetUriExample
func (This *Conn) GetUriExample() string{
return "pwd@tcp(127.0.0.1:6379)/0 or 127.0.0.1:6379 or pwd@tcp(127.0.0.1:6379,127.0.0.1:6380)/0 or 127.0.0.1:6379,127.0.0.1:6380"
}
Called when the case is bordered by the display URI in the interface
CheckUri
func (This *Conn) CheckUri() error{
This.Connect()
if This.err != nil{
return This.err
}
This.Close()
return nil
}
When the checkURI is clicked, a call is made to verify that the URI can be connected properly. The setOption function is called first before the call
SetParam
func (This *Conn) SetParam(p interface{}) (interface{},error){
if p == nil{
return nil,fmt.Errorf("param is nil")
}
switch p.(type) {
case *PluginParam:
This.p = p.(*PluginParam)
return p,nil
default:
return This.GetParam(p)
}
}
SetParam function, in every time before submit the data to the plugin, will be called once, such as Insert, Update, Delete, Query, Commit, TimeOutCommit function, before a SetParam function will be invoked
The first time it is executed, the content of the parameter p is:
- Parameter information that the user configures in the plug-in interface. The parameters passed in are the data *.. returned from Result.Data by the plugin js method doGetPluginParam(). Map [string]interface{}
The second time it is executed, the content of the parameter p is:
The last call returned a pointer to the body of the body, so that we can continue to use this pointer, so that every time the plug-in processing data, is specified to a certain synchronized data, the data will not be deranged
Insert
func (This *Conn) Insert(data *driver.PluginDataType,retry bool) (*driver.PluginDataType, *driver.PluginDataType,error) {
return This.Update(data,retry)
}
The INSERT function method is called when the INSERT event is invoked
The data parameter has the site information, the event information, and the Row field is the corresponding value information of the field
The retry parameter code is not submitted more than twice
Return:
LastSuccessCommitData : The Bifrost Server layer will save the site for the data information that is currently synchronized and last successfully processed, if it is a COMMIT event
ErrData : When synchronizing data, error data information is returned to Bifrost Server and will be shown to the user. What error information and which line of data can be conveniently checked
err : error info
Update
func (This *Conn) Update(data *driver.PluginDataType,retry bool) (*driver.PluginDataType, *driver.PluginDataType,error) {
if This.err != nil {
This.ReConnect()
}
index := len(data.Rows)-1
Key := This.getKeyVal(data,index)
var err error
switch This.p.Type {
case "set":
if This.p.ValConfig != ""{
err =This.conn.Set(Key, This.getVal(data,index), time.Duration(This.p.Expir) * time.Second).Err()
}else {
vbyte, _ := json.Marshal(data.Rows[index])
err =This.conn.Set(Key, string(vbyte), time.Duration(This.p.Expir) * time.Second).Err()
}
break
case "list":
return This.SendToList(Key,data)
break
default:
err = fmt.Errorf(This.p.Type+ " not in(set,list)")
break
}
if err != nil {
This.err = err
return nil,data,err
}
return nil,nil,nil
}
The Update function method is called when the Update event is invoked
Update and Insert, Delete events submitted in the Row field information, there is a difference between, the Update event in the Row of the array is at least 2 subscript, even the subscript is on behalf of the old data, is an odd number of new data
Each plug-in can set its own cache to improve performance, plug-in development is very open, as long as the implementation of only the interface, but how the interface processing data, it is up to the plug-in to decide
Del
The Del function method is called when the Delete event is invoked
Query
When an SQL statement event occurs, the Query function method is called
In Binlog, all DDL statements are stored as query events, regardless of whether your format is row,mix, or statement
ClickHouse and MySQL plugins trigger DDL synchronization in the Query function
Commit
When a Commit event or a Query event is committed, the SQL content is also committed to the Commit function method
Some versions of MySQL do not have a Commit event, but they do have a Query event, which records the Commit field
TimeOutCommit
In the synchronization configuration, TimeOutCommit is called once if data was delivered to the plug-in 5 seconds ago and no data was delivered for 5 consecutive seconds
If the TimeoutCommit function returns ERR information to the Bifrost Server layer, the next commit will be made. TimeoutCommit directly returns nil to the Bifrost Server, and the Server determines that the plug-in layer has no more data to process and does not commit again
插件Driver接口数据结构
type PluginDataType struct {
Timestamp uint32
EventType string
Rows []map[string]interface{}
Query string
SchemaName string
TableName string
BinlogFileNum int
BinlogPosition uint32
Gtid string
Pri []string
EventID uint64
ColumnMapping map[string]string
}
TimeStamp : sync timestamp
EventType :insert,uupdate,delete,sql
Rows :In the event type as an insert, update, delete, are in the array data, insert, delete, only have Rows [0], but at the time of the update type, Rows [0] to update the previous data, Rows [1] for updated data
Query : sql
SchemaName: 数据名名称
TableName : 表名
BinlogFileNum : Mysql > delete ‘000’ from ‘mysql-bin.000070’ from ‘mysql-bin.000070’
BinlogPosition : position
Gtid : GTID
EventID : A self-incrementing ID maintained by the Bifrost Server layer
Pri : []string
ColumnMapping : Type parameters for each field https://github.com/brokercap/Bifrost/blob/v1.7.x/plugin/driver/json.go(https://github.com/brokercap/Bifrost/blob/v1.7.x/plugin/driver/json.go)
Plug-in Driver other functions
func TransfeResult(val string, data *PluginDataType,rowIndex int) interface{} {
val : src string
data : data
rowIndex : which index of data.Rows
{$TableName} : data.TableName的值
{$SchemaName} : data.SchemaName
{$EventType} : data.EventType
{$FieldName} : Rows[rowIndex][FieldName]
Unit testing
github.com/brokercap/Bifrost/sdk/pluginTestData
The Bifrost PlugInNavigation Data package provides simulation data for adding, updating, deleting, and SQL events
pluginTestData Funtions
pluginTestData init
event = pluginTestData.NewEvent()
event.SetSchema(SchemaName)
event.SetTable(TableName)
GetTestInsertData
event.GetTestInsertData()
GetTestUpdateData
event.GetTestUpdateData()
GetTestDeleteData
event.GetTestDeleteData()
GetTestQueryData
event.GetTestQueryData()
GetTestCommitData
event.GetTestCommitData()
Verify that the data is all right
func (This *Event) CheckData(src map[string]interface{},destJsonString string) (map[string][]string,error){
Use:
checkResult,err = e.CheckData(insertData.Rows[0],getFromClickHouseData)
Simulation formal environment brush data test plug-in
func TestSyncLikeProduct(t *testing.T) {
p := pluginTestData.NewPlugin("redis",url)
err0 := p.SetParam(getParam())
if err0 != nil{
t.Fatal(err0)
}
var n uint = 10
err := p.DoTestStart(n)
if err != nil{
t.Fatal(err)
}else{
t.Log("test success")
}
}
Simulate formal environment performance tests
Only one piece of data is generated randomly, and it is committed in a loop
func TestSyncLikeProductForSpeed(t *testing.T) {
p := pluginTestData.NewPlugin("redis",url)
err0 := p.SetParam(getParam())
p.SetEventType(pluginTestData.INSERT)
if err0 != nil{
t.Fatal(err0)
}
var n uint = 100
err := p.DoTestStartForSpeed(n)
if err != nil{
t.Fatal(err)
}else{
t.Log("test success")
}
}
Build
modify Bifrost/plugin/load/import_toserver.go
github.com/brokercap/Bifrost/plugin/import_toserver.go
build.sh