Bifrost Document

Version 1.6.x

Bifrost ---- A heterogeneous middleware which can synchronize MySQL binlog data To Redis,MongoDB,ClickHouse...

Plug-in Development Tutorial

Bifrost itself provides an interface for plug-in injection, as well as a test interface for debugging while the plug-in is open

版本

v1.7.x

创建插件目录

in github.com/brokercap/Bifrost/plugin directory to create and the plug-in name consistent directory name

Taking the Redis plugin as an example, create a Redis directory

github.com/brokercap/Bifrost/plugin/redis

path and files

-rwxr-xr-x 1 root root    90 Mar 18 18:38 redis.go
-rwxr-xr-x 1 root root 14732 Mar 27 10:30 redisTest.go
drwxr-xr-x 0 root root     0 Mar 26 18:41 src
drwxr-xr-x 0 root root     0 Mar 26 08:58 www

src path

-rwxr-xr-x 1 root root 5783 Mar 26 18:41 redis.go

www path

-rwxr-xr-x 1 root root 1325 Mar 26 08:58 doc.html
-rwxr-xr-x 1 root root 3864 Mar 18 10:57 redis.html
-rwxr-xr-x 1 root root 1839 Mar 20 08:50 redis.js

** src ** is the directory in which the plugin’s Go source code processing logic is placed. You can change this directory if you wish, as long as you specify this directory at compile time

** www ** is the plugin directory where static files such as HTML and JS are stored. The directory name must be WWW. When the plugin is compiled and run, the previous and current plugin directory and WWW need to be copied to the plugin under the Bifrost running directory

HTML and redis.js must be the same as the name of the plugin. When loading the plugin in the Bifrsot admin interface, it is loaded by the plugin name +.html,+.js.

Load HTML first and then load JS

doc.html is the plug-in manual interface

插件接口

github.com/brokercap/Bifrost/plugin/driver

Register

func init(){
    driver.Register("redis",&MyConn{},VERSION,BIFROST_VERION)
}

redis : The name of the plugin, which must be the same as the plugin directory name, and the HTML and JS file names in the WWW directory

VERSION :The current self-developed plug-in version

BIFROST_VERION : The current version of Bifrost is in development

实现接口

SetOption
func (This *Conn) SetOption(uri *string,param map[string]interface{}) {
	This.Uri = uri
	return
}

Before calling the Open method, the setOption function is called. The URI is the connection URI configured on the interface. Param is a reserved interface that may be used in a later version

Open
func (This *Conn) Open() error {
	This.Connect()
	return nil
}

When an edge is instantiated, the Open function is called

Close
func (c *PluginDriverInterface) Close() bool {
	return true
}

In some cases, Bifrost Server may perform a Close method call on a plug-in’s connection

For example, when the target library URI connection address is changed online, a Close operation will be performed. This operation will not be thread safe when it is taken out of the thread pool

GetUriExample
func (This *Conn) GetUriExample() string{
	return "pwd@tcp(127.0.0.1:6379)/0 or 127.0.0.1:6379 or pwd@tcp(127.0.0.1:6379,127.0.0.1:6380)/0 or 127.0.0.1:6379,127.0.0.1:6380"
}

Called when the case is bordered by the display URI in the interface

CheckUri
func (This *Conn) CheckUri() error{
	This.Connect()
	if This.err != nil{
		return This.err
	}
	This.Close()
	return nil
}

When the checkURI is clicked, a call is made to verify that the URI can be connected properly. The setOption function is called first before the call

SetParam
func (This *Conn) SetParam(p interface{}) (interface{},error){
	if p == nil{
		return nil,fmt.Errorf("param is nil")
	}
	switch p.(type) {
	case *PluginParam:
		This.p = p.(*PluginParam)
		return p,nil
	default:
		return This.GetParam(p)
	}
}

SetParam function, in every time before submit the data to the plugin, will be called once, such as Insert, Update, Delete, Query, Commit, TimeOutCommit function, before a SetParam function will be invoked

The first time it is executed, the content of the parameter p is:

  • Parameter information that the user configures in the plug-in interface. The parameters passed in are the data *.. returned from Result.Data by the plugin js method doGetPluginParam(). Map [string]interface{}

The second time it is executed, the content of the parameter p is:

The last call returned a pointer to the body of the body, so that we can continue to use this pointer, so that every time the plug-in processing data, is specified to a certain synchronized data, the data will not be deranged

Insert
func (This *Conn) Insert(data *driver.PluginDataType,retry bool) (*driver.PluginDataType, *driver.PluginDataType,error) {
	return This.Update(data,retry)
}

The INSERT function method is called when the INSERT event is invoked

The data parameter has the site information, the event information, and the Row field is the corresponding value information of the field

The retry parameter code is not submitted more than twice

Return:

LastSuccessCommitData : The Bifrost Server layer will save the site for the data information that is currently synchronized and last successfully processed, if it is a COMMIT event

ErrData : When synchronizing data, error data information is returned to Bifrost Server and will be shown to the user. What error information and which line of data can be conveniently checked

err : error info

Update
func (This *Conn) Update(data *driver.PluginDataType,retry bool) (*driver.PluginDataType, *driver.PluginDataType,error) {
	if This.err != nil {
		This.ReConnect()
	}
	index := len(data.Rows)-1
	Key := This.getKeyVal(data,index)
	var err error
	switch This.p.Type {
	case "set":
		if This.p.ValConfig != ""{
			err =This.conn.Set(Key, This.getVal(data,index), time.Duration(This.p.Expir) * time.Second).Err()
		}else {
			vbyte, _ := json.Marshal(data.Rows[index])
			err =This.conn.Set(Key, string(vbyte), time.Duration(This.p.Expir) * time.Second).Err()
		}
		break
	case "list":
		return This.SendToList(Key,data)
		break
	default:
		err = fmt.Errorf(This.p.Type+ " not in(set,list)")
		break
	}

	if err != nil {
		This.err = err
		return nil,data,err
	}
	return nil,nil,nil
}

The Update function method is called when the Update event is invoked

Update and Insert, Delete events submitted in the Row field information, there is a difference between, the Update event in the Row of the array is at least 2 subscript, even the subscript is on behalf of the old data, is an odd number of new data

Each plug-in can set its own cache to improve performance, plug-in development is very open, as long as the implementation of only the interface, but how the interface processing data, it is up to the plug-in to decide

Del

The Del function method is called when the Delete event is invoked

Query

When an SQL statement event occurs, the Query function method is called

In Binlog, all DDL statements are stored as query events, regardless of whether your format is row,mix, or statement

ClickHouse and MySQL plugins trigger DDL synchronization in the Query function

Commit

When a Commit event or a Query event is committed, the SQL content is also committed to the Commit function method

Some versions of MySQL do not have a Commit event, but they do have a Query event, which records the Commit field

TimeOutCommit

In the synchronization configuration, TimeOutCommit is called once if data was delivered to the plug-in 5 seconds ago and no data was delivered for 5 consecutive seconds

If the TimeoutCommit function returns ERR information to the Bifrost Server layer, the next commit will be made. TimeoutCommit directly returns nil to the Bifrost Server, and the Server determines that the plug-in layer has no more data to process and does not commit again

插件Driver接口数据结构

type PluginDataType struct {
	Timestamp 		uint32
	EventType 		string
	Rows            []map[string]interface{}
	Query          	string
	SchemaName     	string
	TableName      	string
	BinlogFileNum 	int
	BinlogPosition 	uint32
	Gtid			string
	Pri				[]string
	EventID			uint64
	ColumnMapping   map[string]string
}

TimeStamp : sync timestamp

EventType :insert,uupdate,delete,sql

Rows :In the event type as an insert, update, delete, are in the array data, insert, delete, only have Rows [0], but at the time of the update type, Rows [0] to update the previous data, Rows [1] for updated data

Query : sql

SchemaName: 数据名名称

TableName : 表名

BinlogFileNum : Mysql > delete ‘000’ from ‘mysql-bin.000070’ from ‘mysql-bin.000070’

BinlogPosition : position

Gtid : GTID

EventID : A self-incrementing ID maintained by the Bifrost Server layer

Pri : []string

ColumnMapping : Type parameters for each field https://github.com/brokercap/Bifrost/blob/v1.7.x/plugin/driver/json.go(https://github.com/brokercap/Bifrost/blob/v1.7.x/plugin/driver/json.go)

Plug-in Driver other functions

func TransfeResult(val string, data *PluginDataType,rowIndex int) interface{} {

val : src string

data : data

rowIndex : which index of data.Rows

{$TableName} : data.TableName的值

{$SchemaName} : data.SchemaName

{$EventType} : data.EventType

{$FieldName} : Rows[rowIndex][FieldName]

Unit testing

github.com/brokercap/Bifrost/sdk/pluginTestData

The Bifrost PlugInNavigation Data package provides simulation data for adding, updating, deleting, and SQL events

pluginTestData Funtions

pluginTestData init
	event = pluginTestData.NewEvent()
	event.SetSchema(SchemaName)
	event.SetTable(TableName)
GetTestInsertData
event.GetTestInsertData()

GetTestUpdateData
event.GetTestUpdateData()

GetTestDeleteData
event.GetTestDeleteData()

GetTestQueryData
event.GetTestQueryData()

GetTestCommitData
event.GetTestCommitData()

Verify that the data is all right
func (This *Event) CheckData(src map[string]interface{},destJsonString string) (map[string][]string,error){

Use:

checkResult,err = e.CheckData(insertData.Rows[0],getFromClickHouseData)

Simulation formal environment brush data test plug-in

func TestSyncLikeProduct(t *testing.T)  {
	p := pluginTestData.NewPlugin("redis",url)
	err0 := p.SetParam(getParam())
	if err0 != nil{
		t.Fatal(err0)
	}

	var n uint = 10
	err := p.DoTestStart(n)

	if err != nil{
		t.Fatal(err)
	}else{
		t.Log("test success")
	}
}

Simulate formal environment performance tests

Only one piece of data is generated randomly, and it is committed in a loop


func TestSyncLikeProductForSpeed(t *testing.T)  {
	p := pluginTestData.NewPlugin("redis",url)
	err0 := p.SetParam(getParam())
	p.SetEventType(pluginTestData.INSERT)
	if err0 != nil{
		t.Fatal(err0)
	}

	var n uint = 100
	err := p.DoTestStartForSpeed(n)

	if err != nil{
		t.Fatal(err)
	}else{
		t.Log("test success")
	}

}

Build

  1. modify Bifrost/plugin/load/import_toserver.go

    github.com/brokercap/Bifrost/plugin/import_toserver.go
    
  2. build.sh

Last updated on 22 Jan 2021
Edit on GitHub