Databus 首章 - mysql主从实例

2017/08/13 Blog

Databus

  • 来源独立:Databus支持多种数据来源的变更抓取,包括Oracle和MySQL。
  • 可扩展、高度可用:Databus能扩展到支持数千消费者和事务数据来源,同时保持高度可用性。
  • 事务按序提交:Databus能保持来源数据库中的事务完整性,并按照事务分组和来源的提交顺寻交付变更事件。
  • 低延迟、支持多种订阅机制:数据源变更完成后,Databus能在毫秒级内将事务提交给消费者。同时,消费者使用Databus中的服务器端过滤功能,可以只获取自己需要的特定数据。
  • 无限回溯:对消费者支持无限回溯能力,例如当消费者需要产生数据的完整拷贝时,它不会对数据库产生任何额外负担。当消费者的数据大大落后于来源数据库时,也可以使用该功能。

数据一致性方案:

  • 应用驱动双向写:同时向数据库及MQ写入,一致性不可靠,较灵活
  • 数据库日志挖掘:基于数据库binlog,一致性可靠,不灵活

Databus架构图

类似于Search Index及Read Replica这些作为Databus的Consumer(类节点)使用的将是Client Library(客户端库)。当对一个主OLTP数据库做写操作时,连接了这个数据库的Relay们将会把改变存入Relay中;Databus这些被嵌入内存或者索引的Consumer将会把它从Relay或Bootstrap(引导程序)中取出,并且根据情况修改索引或者缓存,这就做到了根据源数据库的状态实时的更新索引。

Databus Demo

数据库配置

  • 数据库版本5.6,支持checknum特性,需要关闭
log-bin=mysqlbin-log	# 开启binlog,前缀是mysqlbin-log
server_id=1001			# master服务器ID
binlog_format=row       # binlog的format是row,当然还有Statement,MiXED
binlog-row-image=full	
binlog_checksum=none	# 关闭checksum
  • 创建mysql数据库及表
CREATE DATABASE or_test;

DROP TABLE IF EXISTS `person`;

CREATE TABLE `person` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `first_name` varchar(120) DEFAULT NULL,
  `last_name` varchar(120) DEFAULT NULL,
  `birth_date` date DEFAULT NULL,
  `deleted` varchar(5) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

编译Databus

  • git clone https://github.com/linkedin/databus
cd ~/databus/databus2-example
  • 编辑 databus2-example-relay-pkg

需要修改databus2-example-relay-pkg/conf下的sources-or-person.json, 对应relay_or_person.properties. 其中sources-person.json是oracle下使用的

{
    "name" : "person",
    "id"  : 1,
    "uri" : "mysql://username%2Fpassword@localhost:3306/1/mysqlbin-log",
    "slowSourceQueryThreshold" : 2000,
    "sources" :
    [
        {
        "id" : 40,
        "name" : "com.linkedin.events.example.or_test.Person",
        "uri": "or_test.person",
        "partitionFunction" : "constant:1"
         }
    ]
}

databus2-example-relay-pkg/schemas_registry下的 com.linkedin.events.example.or_test.Person.1.avsc:mysql使用的avsc com.linkedin.events.example.person.Person.1.avsc:oracle使用的avsc

  • 编辑 databus2-example-client-pkg

databus2-example-client-pkg/conf/client_person.properties默认不需要修改 修改databus2-example-client下的PersonClientMain.java

public class PersonClientMain
{
  static final String PERSON_SOURCE = "com.linkedin.events.example.or_test.Person"; //修改记录

  public static void main(String[] args) throws Exception
  {
    DatabusHttpClientImpl.Config configBuilder = new DatabusHttpClientImpl.Config();

    //Try to connect to a relay on localhost
    configBuilder.getRuntime().getRelay("1").setHost("localhost");
    configBuilder.getRuntime().getRelay("1").setPort(11115);
    configBuilder.getRuntime().getRelay("1").setSources(PERSON_SOURCE);

    //Instantiate a client using command-line parameters if any
    DatabusHttpClientImpl client = DatabusHttpClientImpl.createFromCli(args, configBuilder);

    //register callbacks
    PersonConsumer personConsumer = new PersonConsumer();
    client.registerDatabusStreamListener(personConsumer, null, PERSON_SOURCE);
    client.registerDatabusBootstrapListener(personConsumer, null, PERSON_SOURCE);

    //fire off the Databus client
    client.startAndBlock();
  }

}

  • 编译源代码 download ojdbc6.jar with version at 11.2.0.2.0 here 把jar包放入到databus/sandbox-repo/com/oracle/ojdbc6/11.2.0.2.0/ojdbc6-11.2.0.2.0.jar
gradle -Dopen_source=true assemble

启动relay、client

编译后databus根目录产生build文件夹

  • 启动relay
cd build/databus2-example-relay-pkg/distributions
tar -xvf databus2-example-relay-pkg.tar.gz
./bin/start-example-relay.sh or_person -Y ./conf/sources-or-person.json
  • 启动client
cd build/databus2-example-client-pkg/distributions
tar -xvf databus2-example-client-pkg.tar.gz
./bin/start-example-client.sh person

验证结果

  • dml 数据库
  • 查看event
curl -s http://localhost:11115/containerStats/inbound/events/total?pretty | grep -m1 numDataEvents
  • 发现错误可以查看日志:
    • client.log、databus2-client-person.out
    • relay.log、databus2-relay-or_person.out

Search

    Post Directory