Skip to content

MQ Draft

##MQ Message Queue。参考Kafka,TaskOneByOne。

##MQ Api

class MQ {
public static void createMQ(String topic, int partition, Options options);
// 只能增加partitionCount,增加的时候,队列中最好没有消息,否则onebyone特性可能被破坏。
public static void alterMQ(String topic, int partition);
}
class MQProducer {
public MQProducer(String topic);
// 普通消息发送接口
public boolean sendMessage(Message message);
// 事务消息接口
private void prepareMessage(String transactionId, Message msg ...); // 半消息API,可以重复调用,暂不准备公开。
public void prepareMessage(Message msg ...); // 用于zeze,内部自动获取transactionId。
private void commitMessage(String transactionId, Message msg ...); // 半消息API,可以重复调用,暂不准备公开。
public void commitMessage(Message msg ...); // 用于zeze,内部自动获取transactionId。
private void rollbackMessage(String transactionId); // 半消息API,调用一次,忽略重复的调用,暂不准备公开。zeze系统不需要用户主动调用。
// 其他辅助接口
public String getName();
public Options getOptions();
public int getPartition();
public void close();
}
class MQConsumer {
public MQConsumer(String topic,MQListener listener);
public String getName(); // 订阅的名字,可能包含多个队列。
public Options getOptions();
public int getPartition();
public MQListener getListener();
public void close();
}
interface MQListener {
void onMessage(String mqName, Message message);
}
interface TransactionListener { // 回查接口,zeze不需实现,内部完成,暂不准备公开。
LocalTransactionState checkLocalTransaction(String transactionId);
}
class Options {
// MQ 类型
public static int Single = 1;
public static int DoubleWrite = 2;
public static int Raft3 = 4;
}
class Message {
String topic;
int partitionIndex;
long timestamp;
Properties properties; // 定义这个?
Binary body;
}
  1. MQType-Single 单一MQ Server实现,没有任何备份。

  2. MQType-DoubleWrite 定义:写入端为Writer;读取端为Reader;服务器的两个备份成为Node, 其中主要服务Node为Leader,另一个为Follower;主控服务器为Master。 运行Node的进程为manager。 a) WriterOpenMQ或ReaderOpenMQ第一次调用时,Master根据负载均衡选择两个manager,创建两个node。 随机选择其中一个node为leader。返回leader给writer或者reader。 b) writer或reader都和同一个leader交互。 c) 写入leader的消息会被复制给follower,并且复制完成以后,才返回消息成功写入。 d) 当reader或者writer联系不上leader,向master报告错误。 I. master探测leader,如果存在,不理错误报告(估计局部错误),否则提升follower为leader(关键点,此后旧leader即使存活也不再提供send消息的能力), 并且从manager中再挑选一个新的follower组成新的double write。follower为leader以后,允许旧leader继续提供read能力一会儿,此时新leader需要接收新的 消息已经消费的offset。 II. 提升要不要等待新follower完成复制,就是要不要引入“风险阶段”。有风险阶段可以更加快速的提供连续服务。

  3. MQType-Raft a) 基于zeze-raft实现。队列存储直接使用raft-queue实现,不支持WriteBatch,这个选项被当作WriteFileSystem处理。 b) MQ-raft的消息不需要apply,可能需要对zeze-raft做出点加强性修改。 c) 读取采用snapshot定时丢弃队列头部消息,多个raft节点的snapshot同步不通过raft保护,而是自定义额外的协议通告。 这会导致Reader可能读取重复的消息,这点表现和RocksMQ一样。 d) 参考dbh2管理模式,比如:manager n(>3)个,打开队列时,负载均衡方式选取manager并建立queue。

  1. partitionCount 的数量决定并发。