Skip to content

CsQueue 使用文档

CsQueue(Concurrent Server Queue)是 Zeze 框架提供的分布式服务器队列实现。每个服务器拥有自己私有的队列,只能操作自己的队列。当服务器宕机时,其他服务器会自动接管它的队列数据,实现高可用性。

Zeze.Collections.CsQueue<V extends Bean>
特性说明
服务器隔离每个服务器操作自己的私有队列
故障转移服务器宕机时自动被其他服务器接管
高可用队列数据不会因单点故障丢失
双模式支持 FIFO(队列)和 LIFO(栈)两种模式
持久化数据自动同步到数据库
事务安全所有操作在事务中执行

// 在应用启动时创建 Module
Zeze.Application zeze = new Zeze.Application(config);
Queue.Module queueModule = new Queue.Module(zeze);
// 打开一个名为 "taskQueue" 的 CsQueue
// 自动使用当前服务器的 serverId
CsQueue<Task> taskQueue = queueModule.openCsQueue("taskQueue", Task.class);
// 指定节点大小
CsQueue<Task> taskQueue = queueModule.openCsQueue("taskQueue", Task.class, 50);
// 开启事务
zeze.newProcedure(() -> {
// ===== FIFO 队列模式 =====
// 添加元素到队尾
taskQueue.add(new Task());
// 查看队首元素(不删除)
Task head = taskQueue.peek();
// 取出队首元素(删除)
Task task = taskQueue.poll();
// ===== LIFO 栈模式 =====
// 压入元素到栈顶
taskQueue.push(new Task());
// 弹出栈顶元素(删除)
Task top = taskQueue.pop();
// ===== 通用操作 =====
// 获取元素数量
long count = taskQueue.size();
// 判断是否为空
boolean empty = taskQueue.isEmpty();
// 清空队列
taskQueue.clear();
return 0;
}, "example").call();

方法说明
openCsQueue(String name, Class<T> valueClass)打开 CsQueue,默认节点大小30
openCsQueue(String name, Class<T> valueClass, int nodeSize)打开 CsQueue,指定节点大小
方法返回值说明
add(V value)void添加元素到队尾
poll()V取出并返回队首元素,队列为空返回 null
peek()V查看队首元素(不删除),队列为空返回 null
方法返回值说明
push(V value)void压入元素到栈顶(队首)
pop()V弹出栈顶元素,栈为空返回 null
方法返回值说明
pollNode()BQueueNode删除并返回整个头节点
peekNode()BQueueNode查看头节点(不删除)
方法返回值说明
size()long获取元素总数
isEmpty()boolean判断是否为空
getName()String获取队列名称
getInnerName()String获取内部队列名称(包含 serverId)
getLoadSerialNo()long获取加载序列号
方法说明
walk(TableWalkHandle<BQueueNodeKey, V> func)遍历所有元素
clear()清空所有元素
方法说明
splice(int serverId, long loadSerialNo)接管指定服务器的队列数据

// 定义任务Bean
public class Task extends Bean {
public String taskId;
public String taskType;
public long createTime;
public Map<String, String> params;
}
// 使用 CsQueue 作为分布式任务队列
CsQueue<Task> taskQueue = queueModule.openCsQueue("distributed_tasks", Task.class);
// 生产者:添加任务
zeze.newProcedure(() -> {
Task task = new Task();
task.taskId = UUID.randomUUID().toString();
task.taskType = "send_email";
task.createTime = System.currentTimeMillis();
taskQueue.add(task);
return 0;
}, "add_task").call();
// 消费者:处理任务
zeze.newProcedure(() -> {
Task task = taskQueue.poll();
if (task != null) {
processTask(task);
}
return 0;
}, "process_task").call();
// 定义邮件Bean
public class EmailTask extends Bean {
public String to;
public String subject;
public String content;
public int retryCount;
}
// 使用 CsQueue 作为邮件发送队列
CsQueue<EmailTask> emailQueue = queueModule.openCsQueue("email_queue", EmailTask.class);
// 添加邮件任务
zeze.newProcedure(() -> {
EmailTask email = new EmailTask();
email.to = "user@example.com";
email.subject = "Welcome";
email.content = "Hello!";
email.retryCount = 0;
emailQueue.add(email);
return 0;
}, "send_email").call();
// 批量处理邮件
zeze.newProcedure(() -> {
BQueueNode node = emailQueue.pollNode();
if (node != null) {
for (var value : node.getValues()) {
EmailTask email = (EmailTask)value.getValue().getBean();
sendEmail(email);
}
}
return 0;
}, "batch_send").call();
// 定义操作日志Bean
public class OperationLog extends Bean {
public String operationType;
public String targetId;
public long timestamp;
public String operatorId;
}
// 使用栈模式记录操作历史
CsQueue<OperationLog> logStack = queueModule.openCsQueue("operation_logs", OperationLog.class);
// 记录操作
zeze.newProcedure(() -> {
OperationLog log = new OperationLog();
log.operationType = "update_config";
log.targetId = "config_001";
log.timestamp = System.currentTimeMillis();
log.operatorId = "admin";
logStack.push(log); // 压入栈顶
return 0;
}, "log_operation").call();
// 撤销最近的操作
zeze.newProcedure(() -> {
OperationLog lastLog = logStack.pop(); // 弹出最近的操作
if (lastLog != null) {
undoOperation(lastLog);
}
return 0;
}, "undo_operation").call();
// 遍历当前服务器的队列
long processed = taskQueue.walk((nodeKey, value) -> {
System.out.println("NodeId: " + nodeKey.getNodeId() + ", Task: " + value.taskId);
return true; // 返回true继续遍历,false停止
});
System.out.println("Total processed: " + processed);

Server A (serverId=1) Server B (serverId=2)
┌─────────────────────┐ ┌─────────────────────┐
│ CsQueue "tasks@1" │ │ CsQueue "tasks@2" │
│ [task1][task2]... │ │ [task5][task6]... │
└─────────────────────┘ └─────────────────────┘
│ │
│ Server A 宕机 │
▼ │
离线通知 ─────────────────────────────▶│
│ splice(1, loadSerialNo)
┌─────────────────────┐
│ CsQueue "tasks@2" │
│ [task1][task2]... │ ← 接管的数据
│ [task5][task6]... │
└─────────────────────┘
  1. loadSerialNo(加载序列号)

    • 用于标识队列的版本
    • 每次创建 CsQueue 时递增
    • 用于判断是否需要接管
  2. OfflineNotify(离线通知)

    • 通过 ServiceManager 注册
    • 服务器离线时触发
    • 自动调用 splice 方法
  3. splice(拼接)

    • 将离线服务器的队列数据拼接到当前服务器队列头部
    • 保证数据不丢失
// 1. 创建 CsQueue 时自动注册离线通知
var offlineNotify = new BOfflineNotify();
offlineNotify.serverId = module.zeze.getConfig().getServerId();
offlineNotify.notifySerialId = loadSerialNo;
offlineNotify.notifyId = "Zeze.Collections.CsQueue.OfflineNotify";
module.zeze.getServiceManager().offlineRegister(offlineNotify,
(notify) -> splice(notify.serverId, notify.notifySerialId));
// 2. 服务器离线时,其他服务器收到通知并调用 splice
// 3. splice 将离线服务器的队列数据拼接到自己的队列

CsQueue (name="tasks")
├── Server 1 → Queue "tasks@1" (私有)
├── Server 2 → Queue "tasks@2" (私有)
├── Server 3 → Queue "tasks@3" (私有)
└── ...
每个服务器只能操作自己的队列(tasks@当前serverId)
  • 外部名称tasks(用户指定的名称)
  • 内部名称tasks@1(名称@serverId)
_tQueues 表
├── "tasks@1" → BQueue (Server 1 的队列根节点)
├── "tasks@2" → BQueue (Server 2 的队列根节点)
└── ...
_tQueueNodes 表
├── ("tasks@1", nodeId) → BQueueNode
├── ("tasks@2", nodeId) → BQueueNode
└── ...

特性CsQueueQueue
服务器隔离是(每个服务器私有)否(共享队列)
故障转移支持不支持
适用场景分布式任务处理单服务器队列
并发安全
数据隔离按 serverId 隔离共享数据

  • 分布式任务队列(每台服务器处理自己的任务)
  • 邮件/消息发送队列
  • 需要高可用的队列场景
  • 多服务器协同消费的场景
  • 需要全局共享的队列
  • 简单的单机应用
  • 不需要故障转移的场景

  1. 事务要求:所有操作必须在 Procedure 中执行
  2. 服务器标识:依赖 zeze.getConfig().getServerId() 获取当前服务器 ID
  3. ServiceManager 依赖:需要正确配置 ServiceManager 才能实现故障转移
  4. 名称限制:名称不能包含 @ 字符(保留用于 serverId 分隔)
  5. 值类型:值类型必须继承自 Bean
  6. splice 调用:一般由系统自动调用,手动调用需谨慎

ZezeJava/ZezeJava/src/main/java/Zeze/Collections/CsQueue.java