Prepare a new broker. Partitions in this cluster are laid out in groups of three brokers, 50 partitions per group, so every 50 partitions live on 3 brokers. Now one of those brokers needs to be replaced; suppose you want to swap broker 9 in for broker 3.

There are two cases. If you have already shut the old broker down, just start a new broker with the same broker id as the one you removed, and it will begin syncing automatically. If instead you are replacing it with a different broker id, proceed as follows:

1. Use a Python script to generate an assign.json file that reassigns the Kafka partitions, replacing the retiring broker inside each replicas list.
Python script:
# Generate assign.json for kafka-reassign-partitions.sh.
import json

partitions = []

# The commented-out loop below generates the full 150-partition layout
# (three groups of three brokers, 50 partitions per group).
'''
for partition in range(0, 150):
    if partition in range(0, 16):
        replicas = [0, 1, 2]
    if partition in range(16, 32):
        replicas = [1, 2, 0]
    if partition in range(32, 48):
        replicas = [2, 0, 1]
    if partition in range(48, 50):
        replicas = [0, 1, 2]
    if partition in range(50, 66):
        replicas = [6, 7, 8]
    if partition in range(66, 82):
        replicas = [7, 8, 6]
    if partition in range(82, 98):
        replicas = [8, 6, 7]
    if partition in range(98, 100):
        replicas = [6, 7, 8]
    if partition in range(100, 116):
        replicas = [3, 4, 5]
    if partition in range(116, 132):
        replicas = [4, 5, 3]
    if partition in range(132, 148):
        replicas = [5, 3, 4]
    if partition in range(148, 150):
        replicas = [3, 4, 5]
    p = {
        "topic": "fusion-rtlog-std-prod-new",
        "partition": partition,
        "replicas": replicas,
        "log_dirs": ["any", "any", "any"]
    }
    partitions.append(p)
'''

# Reassign a single partition: broker 9 takes broker 3's slot in partition 118.
p = {
    "topic": "fusion-rtlog-std-prod-new",
    "partition": 118,
    "replicas": [5, 9, 4],
    "log_dirs": ["any", "any", "any"]
}
partitions.append(p)

reassign_json = {
    "version": 1,
    "partitions": partitions
}
print(json.dumps(reassign_json))
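For the single-partition case left uncommented above, the script should print JSON equivalent to the following (pretty-printed here; the script emits it on one line):

{
  "version": 1,
  "partitions": [
    {
      "topic": "fusion-rtlog-std-prod-new",
      "partition": 118,
      "replicas": [5, 9, 4],
      "log_dirs": ["any", "any", "any"]
    }
  ]
}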
Scala script. It uses the same rack-unaware assignment strategy that Kafka applies by default:
package com.qiniu.deftones.test

import java.util.Random

import scala.collection.{Map, Seq, mutable}

/**
  * Created by sunbiaobiao on 2018/9/12.
  *
  * Mirrors Kafka's default rack-unaware replica assignment
  * (AdminUtils.assignReplicasToBrokersRackUnaware in Kafka core).
  */
object kafka_reassign extends App {
  val rand = new Random

  def assignReplicasToBrokersRackUnaware(nPartitions: Int,
                                         replicationFactor: Int,
                                         brokerList: Seq[Int],
                                         fixedStartIndex: Int,
                                         startPartitionId: Int): Map[Int, Seq[Int]] = {
    val ret = mutable.Map[Int, Seq[Int]]()
    val brokerArray = brokerList.toArray
    val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
    var currentPartitionId = math.max(0, startPartitionId)
    var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
    for (_ <- 0 until nPartitions) {
      // Bump the shift each time we wrap around the broker list so that
      // follower replicas spread evenly instead of always trailing the leader.
      if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
        nextReplicaShift += 1
      val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
      val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
      for (j <- 0 until replicationFactor - 1)
        replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
      ret.put(currentPartitionId, replicaBuffer)
      currentPartitionId += 1
    }
    ret
  }

  def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
    val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
    (firstReplicaIndex + shift) % nBrokers
  }

  // 50 partitions, replication factor 3, across brokers 0,1,2,16,17,18.
  val ret = assignReplicasToBrokersRackUnaware(50, 3, scala.collection.immutable.List(0, 1, 2, 16, 17, 18), 0, 0)
  for (i <- 0 to 49) {
    println(ret(i))
  }

  import org.json4s._
  import org.json4s.JsonDSL._
  import org.json4s.jackson.JsonMethods._

  // Render the assignment in the format kafka-reassign-partitions.sh expects.
  val json =
    ("version" -> 1) ~
      ("partitions" -> ret.map { kv =>
        val partition = kv._1
        val replicas = kv._2
        ("topic" -> "fusion-rtlog-std-prod-new") ~
          ("partition" -> partition) ~
          ("log_dirs" -> List("any", "any", "any")) ~
          ("replicas" -> replicas)
      })

  println(compact(render(json)))
}
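With fixedStartIndex = 0, both the start index and the replica shift are pinned, so the output is deterministic. Tracing the shift logic by hand (so treat this as illustrative), the first few println lines should look like this; the shift bumps after every 6 partitions, one full pass over the broker list:

ArrayBuffer(0, 1, 2)
ArrayBuffer(1, 2, 16)
ArrayBuffer(2, 16, 17)
ArrayBuffer(16, 17, 18)
ArrayBuffer(17, 18, 0)
ArrayBuffer(18, 0, 1)
ArrayBuffer(0, 2, 16)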
2. Run the kafka-reassign-partitions command, where assign.json is the file generated in the previous step:
JMX_PORT=9855 ./kafka-reassign-partitions.sh --zookeeper host:2181/kafka-jjh-cluster1 --reassignment-json-file assign.json --execute
Then verify whether the reassignment succeeded with:
JMX_PORT=9855 ./kafka-reassign-partitions.sh --zookeeper host:2181/kafka-jjh-cluster1 --reassignment-json-file assign.json --verify
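On success the verify pass prints a status line per partition, roughly like the following (the exact wording differs across Kafka versions):

Status of partition reassignment:
Reassignment of partition [fusion-rtlog-std-prod-new,118] completed successfully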
3. If a large lag has to be synced, the catch-up traffic can max out the CPU. Shorten the log retention first so the lag can be caught up quickly, e.g. set retention.ms=300000 (5 minutes). Do not make it so small that Spark Streaming crashes with an offset OutOfRange error. Watch the monitoring; once the lag has caught up and the broker is back in the ISR, restore the retention, e.g. to 86400000 (24 hours).
Use the following command to make the adjustment (the value shown, 345600000 ms = 96 hours, is only an example; substitute your target retention):
JMX_PORT=8988 ./kafka-topics.sh --zookeeper host:2181/kafka-jjh-cluster1 --alter --topic fusion-rtlog-std-prod-new --config "retention.ms=345600000"
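To read the override back and confirm it took effect, kafka-configs.sh can describe the topic's config overrides (a sketch using the same ZooKeeper-era tooling as the rest of this post):

JMX_PORT=8988 ./kafka-configs.sh --zookeeper host:2181/kafka-jjh-cluster1 --entity-type topics --entity-name fusion-rtlog-std-prod-new --describe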
4. Watch the Kafka cluster's monitoring portal to see whether the lag has caught up, and use a describe command to check that the partition replicas have been switched to the new broker (see the sketch just below). Even after the reassignment completes, some partition leaders may still not be on the preferred broker; in that case, run the preferred-replica-election command that follows to rebalance them.
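The describe check might look like this (standard kafka-topics.sh flags; adjust the host and chroot for your cluster). The Replicas and Isr columns should now list broker 9 instead of broker 3:

JMX_PORT=9855 ./kafka-topics.sh --zookeeper host:2181/kafka-jjh-cluster1 --describe --topic fusion-rtlog-std-prod-new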
JMX_PORT=9855 ./kafka-preferred-replica-election.sh --zookeeper host:2181/kafka-jjh-cluster1 --path-to-json-file ./preferred.json
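The preferred.json passed via --path-to-json-file names the partitions to run the election for; if the flag is omitted, the election runs over all partitions. A minimal sketch, reusing the partition from earlier in this post:

{"partitions": [{"topic": "fusion-rtlog-std-prod-new", "partition": 118}]}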