Skip to content

Zeze��̸�ݸ�


���

Zeze��һ��Ƕ�뿪�����Եķֲ�ʽ�����ܡ�
��
ֱ��
����
��ϣ��ۺ�������γ�����һ�㣬��⿪��ϲ���û�д��ⵥһ�Ĺ���
�պ�ԭ��
����δ��������������������Ҫ��ʱ���������ӡ�
�ֲ�˼·������ħ��
����ij���ֲ�ϵͳʱ��һ��ֻ���Ƕ�����������ϵͳ�����ջ��Ƕ༶�ġ�
Record����ֻ��Cache�е�һ�����ݣ�����������ع�����Ҫ��ʱ�򣬶�����Record��������Լ���״̬�������Ѿ���ú��Ӵ�
1. class Record
a) �ֹ����ͻ���
����������尴Դ��˳�����У�û�з��ࡣ
ReentrantLock fairLock; // ��¼����
boolean dirty; // ��¼�Ƿ���ı�־��
volatile Bean strongDirtyValue; // ���¼�������������Bean�����ã���ֹ���¼�������û��յ���
volatile long timestamp; // ʱ�����ֹ�������״̬��
volatile SoftReference<Bean> softValue; // ��¼�����ã�����һ���������Ա����գ������յ�����Bean������Ҫ��ʱ�������ر���Ӳ����װ�ؽ�����
volatile RelativeRecordSet relativeRecordSet; // ��¼�����Ĺ�����¼���ϡ���¼��������һ��������¼���ϡ�
volatile int state; // ��¼״̬��Ҳ�����Ƿ������ı�־��
SoftReference ��������
��¼��������İ汾���������ڴ��еġ�ͨ��ÿ�����Ļ����������ã�CacheCapacity�������ڴ��ʹ�á�
���ڱ��ļ�¼��С��һ��ʹ��Ƶ�ʲ�һ��ͨ�����ÿ����ڴ�ʹ����һ���ȽϷ��صĹ��������ԣ��������
�����õĶ������棬�ڴ�����SoftReference<Bean>��ͬʱ�ڱ���Ӳ�̱���һ�ݡ���������ʧЧʱ�ӱ���Ӳ��װ�ء�
�����������Ժ�JVM����������ڴ棬�Ѳ��ּ�¼���յ�������ÿ����¼���ڴ���ֻ��¼һ�����ã�
û�����ݣ����������Ϳ������õıȽϴ��ڴ�Ҳ�������ܵ���¼��С��Ӱ�졣�����ĺô��ǣ�I) ʹ�ñ���
���������������߻��������ʣ�II) �����ã���ͳһ�����б������������ó�һ���ģ����ұȽϴ����ֵ��
�ֹ����͹�����¼������صĺ�����˵��
b) ���ʵ��Ż�
boolean fresh;
long acquireTime;
������ʾ��¼�Ƿ����ʵġ��ոջ�ÿ���Ȩ�ļ�¼�������ֹ������п���û�������ɹ�ִ���������ֻᱻ�������������ߣ�
�����Դ�˷ѣ�����һֱ���������������ʧ�ܣ���Ȼ�����ԱȽϵ͡�
final boolean isFresh() {
// �����־��������ʱ�ᱻ���͸�Global����Ӱ��Global�����ķ��䡣
return fresh;
}
final boolean isFreshAcquire() {
// ���ʵģ����һ��ʱ��С��1�룬��ʱ��ķ�����������ᱻ�ܾ���
// fresh ������ʱ�����ó�false��
// acquireTime ��һ��������ʩ����ֹ�����������û��ʵ��ʹ�õ����������һ��Ҳ�ᱻ��Ϊ�������ʣ�
return fresh && System.currentTimeMillis() - acquireTime < 1000;
}
final void setNotFresh() {
// �������������������ķ������ˡ��������ʡ�
fresh = false;
}
final void setFreshAcquire() {
// ����ɹ�װ�ؼ�¼ʱ���á�
acquireTime = System.currentTimeMillis();
fresh = true;
}
c) Checkpoint����
Database.Transaction databaseTransactionTmp;
Database.Transaction databaseTransactionOldTmp;
Database.Transaction ��Zeze�ĺ�����ݿ⣬��MySql������
Zeze֧�ֶ��������ݿ⣬�������ǿ���һ��������ʹ�á�
Checkpointʱ���ᴴ��ÿ��������ݿ�����������ύ���ݡ�
Checkpoint�ǰ���¼�����ģ��Ѻ�����ݿ�����񱣴浽��¼����һ���Ż���
���������¼����ʱ������һ����ȥ���Һ�����ݿ�������ˡ�
databaseTransactionOldTmp
Zeze֧�ֵ�һ�����ԣ����԰�һ���ܴ�ĺ�����ݿⱸ�ݷŵ�һ�߱��"ֻ��"��
Zeze����ִ���У������ݴӱ��ݿ��и���ʵ��ʹ��װ�ص���ǰ�Ĺ������ݿ��С�
����ijЩ��Ϸ���ϺϷ�����ɺ�����ݿ�ܴ󣬵��ǻ�Ծ�����ֲ���ʱ�����
���Կ��Դ��ļ�С�������ݿ⣬���������ܺͼ���ȫ����ʱ�䡣
ʹ��������ԣ���Ҫ���޸����ã�����������ܱȽ���֣����ﲻ��ϸ˵�������ˡ�
ʵ����һ����Ҳ˵����������Ҫȥ�����롣"ֻ��"������¼�ӹ�����¼��ɾ��ʱ��
��Ҫɾ�����ݿ⣬������һ�β�ѯ��װ��һ�Σ��߼��Ͳ����ˡ�
2. class Record1<K, V> extends Record
�����������Record�����࣬�������ͣ��Լ�ϵ�л���صķ�����
a) ����
TableX<K, V> table; // ��¼�����ı���
final K key; // ��¼��Key��
b) ϵ�л�����
ByteBuffer snapshotKey; // ��¼��Keyϵ�л������ʱ���á�
ByteBuffer snapshotValue; // ��¼��Valueϵ�л������ʱ���á�
c) ��ȫ���Dirty
long savedTimestampForCheckpointPeriod; // ��������Checkpoint�ؼ���ʱ��ʱ����
// ��¼���޸ķ��ʲ����Ǻ�Checkpoint���̲������̲����ģ���������������
// ����¼�������պ��ֻᱻ�޸�ʱ��Checkpoint���ܰѼ�¼���óɲ��ࡣ
// Checkpoint���ڱ����ʱ���Ϳ���ʱ��ʱ��һ��ʱ���޸ļ�¼�����־��
d) ɾ�������ڵļ�¼�Ż�
boolean existInBackDatabase; // ��¼�Ƿ��ں�����ݿ��д��ڡ�
boolean existInBackDatabaseSavedForFlushRemove; // ����������ڱ�־��������߲�������������
// ��¼�Ƿ��ں�����ݿ��д��ڣ���Ϊ���Ż���¼������ʱ�������ɾ���������͸�������ݿ⡣
// һ�������ݿ�ɾ�������ڵļ�¼�IJ������ۺ�ɾ��������һ���ġ���������İ�����
// ���Ӧ�ô����˺ܶ���ʱ��¼�����ǻ�û���ύ��������ݿ�ǰ��ɾ���ˡ����û�������־��
// Zeze����������µİ�ɾ����������������ݿ⣬��������˷ѡ�
e) ConcurrentLruLike �ĸ�������
volatile ConcurrentHashMap<K, Record1<K, V>> lruNode;
Zeze.Util.ConcurrentLruLike ��һ��ͨ�õİ�װ��
Zeze.Transaction.TableCache Ҳ����ConcurrentLruLike������ר�õģ�����ʡһ���ڴ�����һ�����ܡ�
java Ĭ�ϵ�lru�������ܲ����ã�����д�����ConcurrentLruLike����ʵ�ʵIJ���������Դ��ConcurrentHashMap��
��˵��һ�£�
ConcurrentHashMap<K, Record1<K, V>> dataMap; // Lru�������еļ�¼ӳ�䡣
ConcurrentLinkedQueue<ConcurrentHashMap<K, Record1<K, V>>> lruQueue; // ��¼��Ծʱ������ӳ��Ķ��У���ʱ������
ConcurrentHashMap<K, Record1<K, V>> lruHot; // �ȵ�ӳ�䣬�϶����ᱻ���ա�
java��lru��LinkedList�ṹ��¼��ȫ����˳����У���������IJ������ܲ��ߡ�
ConcurrentLruLike����ConcurrentHashMap���ṩ�޸ĵIJ������ʣ�����˳����б�ò���ȷ����lruQueue�ľ����ƽ���
�㷨Ҫ��
1�����������ʵ���������ʵļ�¼�洢��SortedDictionary�С�
2�����������ͻ���������Ƿ�ı䡣��ͻ����������
3����ͻ����ʱ�����Ѿ��õ������������ڳ�ͻ�dz�����ʱ���ڶ���ִ������һ�㶼�ܳɹ���
����������һֱ��ͻ��������Զû����ɵ������
4�������������������������������ʵļ�¼���ܷ����仯������������Ȼ��Ҫ�ٴ�ִ��lockAndCheck�߼���
���Ҵ��������ʵļ�¼������������⡣
5. �߼��������ش���������쳣ʱҲ��Ҫ���lockAndCheck����Ϊ�ֹ�����ʵ�ʴ����߼�ʱû�м�����
���ܴ��ڲ���ԭ���±�����Ӧ�÷����߼����󣬴�ʱ����Ȼ��Ҫ��������ɳ�ͻ��飬�����ͻ�ˣ�
Ҳ��Ҫ������
�ϴ��롣
public long perform(Procedure procedure) {
try { // ��7.�� try 1. �ͷż�¼��
var checkpoint = procedure.getZeze().getCheckpoint();
if (checkpoint == null)
return Procedure.Closed;
for (int tryCount = 0; tryCount < 256; ++tryCount) { // ��6.�� ��ೢ�Դ���
// Ĭ���������ظ����ԣ�����CheckResult.RedoAndReleaseLock����������CheckResult.Redo�������ᵼ��������
checkpoint.enterFlushReadLock(); // ��8.�� ����������Global�ķֲ�ʽ������������ǿյġ�
try { // ��7.�� try 2. �ͷ�checkpoint������������RedoAndReleaseLockʱ����Ҫȫ���ͷ����������������
for (; tryCount < 256; ++tryCount) { // ��6.�� ��ೢ�Դ���
CheckResult checkResult = CheckResult.Redo; // ���������Ƿ��ͷ��������� _lock_and_check_ ��ȷ������Ҫ�ͷ��������򶼲��ͷš�
try { // ��7.�� try 3. ��������
var result = procedure.call();
switch (state) { // ��11.��state ���������ִ��״̬������Ҫ��״̬�仯��ʱ�����ϻ��ڴ������޸�state���������������顣
// �����߼����̲�׽����Ҫ�쳣������ʧ��״̬��������ǰxdb���ܲ�׽Error��ԭ�������������״̬����ʹ�߼����̳Ե���һЩ�쳣��
// ִ��������Ȼ���ᶪʧ״̬���ơ�
case Running:
var saveSize = savepoints.size();
if ((result == Procedure.Success && saveSize != 1)
|| (result != Procedure.Success && saveSize > 0)) {
// �������Ӧ������
logger.fatal("Transaction.Perform:{}. savepoints.Count != 1.", procedure);
finalRollback(procedure);
return Procedure.ErrorSavepoint;
}
checkResult = lockAndCheck(procedure.getTransactionLevel()); // ��10.������procedure.call����ʲô����Ҫ��������ͻ���μ�����ĵ�5�㡣
if (checkResult == CheckResult.Success) {
if (result == Procedure.Success) {
finalCommit(procedure); // ��9.������ɹ������ύ������
// ����һ�γɹ��IJ�ͳ�ƣ������۲�redo�಻�ࡣ
// ʧ���� Procedure.cs �е�ͳ�ơ�
if (tryCount > 0) {
if (Macro.enableStatistics) {
ProcedureStatistics.getInstance().getOrAdd("Zeze.Transaction.TryCount").getOrAdd(tryCount).increment();
}
}
return Procedure.Success;
}
finalRollback(procedure, true);
return result;
}
break; // retry
case Abort:
logger.warn("Transaction.Perform: Abort");
finalRollback(procedure);
return Procedure.AbortException;
case Redo:
//checkResult = CheckResult.Redo;
break; // retry
case RedoAndReleaseLock:
checkResult = CheckResult.RedoAndReleaseLock;
break; // retry
}
// retry clear in finally
if (alwaysReleaseLockWhenRedo && checkResult == CheckResult.Redo)
checkResult = CheckResult.RedoAndReleaseLock;
triggerRedoActions(); // ��11.������Redo��
} catch (Throwable e) {
// Procedure.Call �����Ѿ��������쳣��ֻ�� unit test �������������ڲ�����ᵽ�����
// �� unit test �£��쳣��־�ᱻ��¼���Ρ�
switch (state) {
case Running:
logger.error("Transaction.Perform:{} exception. run count:{}", procedure, tryCount, e);
if (!savepoints.isEmpty()) {
// �������Ӧ������
logger.fatal("Transaction.Perform:{}. exception. savepoints.Count != 0.", procedure, e);
finalRollback(procedure);
return Procedure.ErrorSavepoint;
}
// ���� unit test ���쳣���⴦������unit test����ܴ��乤��
if (e instanceof AssertionError) {
finalRollback(procedure);
throw (AssertionError)e;
}
checkResult = lockAndCheck(procedure.getTransactionLevel()); // ��10.���쳣Ҳ��Ҫ����ͻ���μ�����ĵ�5�㡣
if (checkResult == CheckResult.Success) {
finalRollback(procedure, true);
return Procedure.Exception;
}
// retry
break;
case Abort:
if (!"GlobalAgent.Acquire Failed".equals(e.getMessage()) &&
!"GlobalAgent In FastErrorPeriod".equals(e.getMessage()))
logger.warn("Transaction.Perform: Abort", e);
finalRollback(procedure);
return Procedure.AbortException;
case Redo:
checkResult = CheckResult.Redo;
break;
case RedoAndReleaseLock:
checkResult = CheckResult.RedoAndReleaseLock;
break;
default: // case Completed:
if (e instanceof AssertionError)
throw (AssertionError)e;
}
triggerRedoActions();
// retry
} finally {
if (checkResult == CheckResult.RedoAndReleaseLock) {
holdLocks.forEach(Lockey::exitLock);
holdLocks.clear();
}
// retry ���ܱ������е����������¼�ͱ���㡣
accessedRecords.clear();
savepoints.clear();
actions.clear();
redoActions.clear();
state = TransactionState.Running; // prepare to retry
}
if (checkResult == CheckResult.RedoAndReleaseLock) {
// logger.debug("CheckResult.RedoAndReleaseLock break {}", procedure);
break; // ��12.����Ҫ�ͷ���������������һ��ѭ����
}
}
} finally {
checkpoint.exitFlushReadLock();
}
//logger.Debug("Checkpoint.WaitRun {0}", procedure);
// ʵ��Fresh�����Ժ�ɾ��Sleep��// ��13.������æ�ȡ�
try {
Thread.sleep(Zeze.Util.Random.getInstance().nextInt(80) + 20);
} catch (InterruptedException e) {
logger.error("", e);
}
}
logger.error("Transaction.Perform:{}. too many try.", procedure);
finalRollback(procedure);
return Procedure.TooManyTry;
} finally {
holdLocks.forEach(Lockey::exitLock);
holdLocks.clear();
}
}
private CheckResult lockAndCheck(TransactionLevel level) {
boolean allRead = true;
var saveSize = savepoints.size();
if (saveSize > 0) {
// ȫ�� Rollback ʱ Count Ϊ 0������ύʱ Count ����Ϊ 1��
// �����������Begin,Commit,Rollback��ƥ�䡣�����顣
// ��ɨ���޸���־�����ü�¼Dirty��־�����ƶ�д����
var it = savepoints.get(saveSize - 1).logIterator();
if (it != null) {
while (it.moveToNext()) {
// ������־������ bean ���޸���־����ȻҲ�����޸� Record��
// ���ڲ��������������������δ����չ��Ҫ��
Log log = it.value();
if (log.getBean() == null)
continue;
TableKey tkey = log.getBean().tableKey();
var record = accessedRecords.get(tkey);
if (record != null) {
record.dirty = true;
allRead = false;
} else {
// ֻ�в��Դ����ѷ� Managed �� Bean ����־�ӽ�����
logger.fatal("impossible! record not found.");
}
}
}
}
if (allRead && level == TransactionLevel.AllowDirtyWhenAllRead)
return CheckResult.Success; // ʹ��һ���µ�enum��ʾһ�£�
boolean conflict = false; // ��ͻ�ˣ�Ҳ����������Ϊ������׼��������
if (holdLocks.isEmpty()) {
// ���Ż�����һ�μ�������˳�򼴿ɡ�
for (var e : accessedRecords.entrySet()) {
var r = lockAndCheck(e);
switch (r) {
case Success:
break;
case Redo:
conflict = true;
break; // continue lock
default:
return r;
}
}
return conflict ? CheckResult.Redo : CheckResult.Success;
}
// ����������õĶ��У�A. ���ʵļ�¼��B. �Ѿ����е�������
int index = 0;
int n = holdLocks.size();
final var ite = accessedRecords.entrySet().iterator();
var e = ite.hasNext() ? ite.next() : null;
while (null != e) {
// ��� holdLocks ȫ�����Ա���ϣ�ֱ��������
if (index >= n) {
var r = lockAndCheck(e);
switch (r) {
case Success:
break;
case Redo:
conflict = true;
break; // continue lock
default:
return r;
}
e = ite.hasNext() ? ite.next() : null;
continue;
}
Lockey curLock = holdLocks.get(index);
int c = curLock.getTableKey().compareTo(e.getKey());
// holdLocks a b ...
// needLocks a b ...
if (c == 0) {
// ������ܷ�����д������
if (e.getValue().dirty && !curLock.isWriteLockHeld()) {
// ������ȫ���ͷţ���������ǰ��¼������������ļ�¼��
// ֱ�� unlockRead��lockWrite��������
n = _unlock_start_(index, n);
// �ӵ�ǰindex֮�����¼���������index��n�������ٷ����仯��
// ���´ӵ�ǰ e ��������
continue;
}
// BUG ��ʹ���ڡ�Record.Global.State ����û����������Ҫˮƽ����Ҫ����_check_��
var r = _check_(e.getValue().dirty, e.getValue());
switch (r) {
case Success:
// �Ѿ����ڣ����Կ϶������ͻ����������������
break;
case Redo:
// Impossible!
conflict = true;
break; // continue lock
default:
// _check_������Ҫ��Global����״̬��������ܷ���GLOBAL-DEAD-LOCK��
return r;
}
++index;
e = ite.hasNext() ? ite.next() : null;
continue;
}
// holdLocks a b ...
// needLocks a c ...
if (c < 0) {
// �ͷŵ� �ȵ�ǰ����С��������Ϊ��ǰ�����в�����Ҫ��Щ��
int unlockEndIndex = index;
for (; unlockEndIndex < n && holdLocks.get(unlockEndIndex).getTableKey().compareTo(e.getKey()) < 0; ++unlockEndIndex)
holdLocks.get(unlockEndIndex).exitLock();
holdLocks.subList(index, unlockEndIndex).clear();
n = holdLocks.size();
// ���´ӵ�ǰ e ��������
continue;
}
// holdLocks a c ...
// needLocks a b ...
// Ϊ�˲�Υ�������ͷŴӵ�ǰ����ʼ��������
n = _unlock_start_(index, n);
// ���´ӵ�ǰ e ��������
}
return conflict ? CheckResult.Redo : CheckResult.Success;
}
private CheckResult lockAndCheck(Map.Entry<TableKey, RecordAccessed> e) {
Lockey lockey = locks.get(e.getKey());
boolean writeLock = e.getValue().dirty;
lockey.enterLock(writeLock);
holdLocks.add(lockey);
return _check_(writeLock, e.getValue());
}
private static CheckResult _check_(boolean writeLock, RecordAccessed e) {
e.atomicTupleRecord.record.enterFairLock();
try {
if (writeLock) {
switch (e.atomicTupleRecord.record.getState()) {
case GlobalCacheManagerConst.StateRemoved:
// ����cache������������иü�¼��Global�������������ɡ�
return CheckResult.Redo;
case GlobalCacheManagerConst.StateInvalid:
return CheckResult.RedoAndReleaseLock; // �����Invalid��������Reduce����
case GlobalCacheManagerConst.StateModify:
return e.atomicTupleRecord.timestamp != e.atomicTupleRecord.record.getTimestamp() ? CheckResult.Redo : CheckResult.Success;
case GlobalCacheManagerConst.StateShare:
// ���������������һ���Ȼ������������Ҫ�󱾻�Reduce�����DZ���Checkpoint�޷�������ȥ������ǰ����ס�ˡ�
// ͨ�� GlobalCacheManager �������������ʧ��;��Ҫ�������ͷ�����
var acquire = e.atomicTupleRecord.record.acquire(GlobalCacheManagerConst.StateModify,
e.atomicTupleRecord.record.isFresh(), false);
if (acquire.resultState != GlobalCacheManagerConst.StateModify) {
e.atomicTupleRecord.record.setNotFresh(); // ��ʧ�ܲ������ʡ�
logger.debug("Acquire Failed. Maybe DeadLock Found {}", e.atomicTupleRecord);
e.atomicTupleRecord.record.setState(GlobalCacheManagerConst.StateInvalid); // ���ﱣ��StateShare������
return CheckResult.RedoAndReleaseLock;
}
e.atomicTupleRecord.record.setState(GlobalCacheManagerConst.StateModify);
return e.atomicTupleRecord.timestamp != e.atomicTupleRecord.record.getTimestamp() ? CheckResult.Redo : CheckResult.Success;
}
return e.atomicTupleRecord.timestamp != e.atomicTupleRecord.record.getTimestamp() ? CheckResult.Redo : CheckResult.Success; // impossible
}
switch (e.atomicTupleRecord.record.getState()) {
case GlobalCacheManagerConst.StateRemoved:
// ����cache������������иü�¼��Global�������������ɡ�
return CheckResult.Redo;
case GlobalCacheManagerConst.StateInvalid:
return CheckResult.RedoAndReleaseLock; // ����Invalid��������Reduce������߱�Cache��������ʱ��������ͷ�����
}
return e.atomicTupleRecord.timestamp != e.atomicTupleRecord.record.getTimestamp() ? CheckResult.Redo : CheckResult.Success;
} finally {
e.atomicTupleRecord.record.exitFairLock();
}
}
1. ����
��̨������������̨���ݿ⡣ÿ̨������ӵ���Լ��Ļ��档һ���Ի������ά����̨������֮�仺���һ���ԡ�
zezeһ���Ի����CPU-Cache-Memory�Ľṹ�������Բο���CPU��MESIЭ���Լ�ʵ����һ����������ơ�
���һ���Ի���˼·�Ƿdz������ģ����ij��������Global��ȫ�ܵģ�֪�����ж����������㷨Ҳ�dz�ֱ�Ӽ򵥡�
ʹ��������MESI��״̬������¼�ֳɶ�д����������״̬�������ñ�ʾ��¼���ػ�û��Ȩ�ޣ���״̬����ͬʱ������
��̨�����������У�д״ֻ̬������һ̨�������С���һ���Ի���֮�ϣ�ÿһ̨������������������Լ���ռ����
����һ������ɱ������񼴿ɡ�����ǻ���һ���Ի���ķֲ�ʽ����
2. ��������
��¼��������״̬��Modify,Share,Invalid��
�����߼���������Ҫ���ʻ��޸�����ʱ����ȫ����������������������Global�ƺ�������Modify��Share����
Global֪�����м�¼�����ķֲ�״̬���������������������ӵ���߷�����Ӧ�Ľ�������ӵ�����ͷ�����
��������ˢ�µ���˷������󣬲Ÿ�Global���ؽ����Global�Ǽ������ߵ���״̬���������߷��ؽ����
3. ����ԭ��
����ͬһ����¼�����е������������ŶӴ����ģ�һ��ʱ�������һ�����󡣸��������Ƕ���д������д����
��������������������
���ڲ�ͬ�ļ�¼������ִ���Dz����ġ����಻��Ӱ�졣
4. ����
����ÿ������������ȡ������������ֹ����������Զ����¼���Ѿ����������ˡ�
��Global��ͬһ����¼�ķ��ʣ���ʹֻ��һ����¼Ҳ���п��������ġ����磺
ServerA ӵ�м�¼1�Ķ�����Ȼ��ȥ����д����
ServerB Ҳӵ�м�¼1�Ķ�����Ȼ��ȥ����д����
��Global�ڴ���ServerA������ʱ��Ҫ֪ͨServerB��������ServerB��ʱҲ�ڵȴ�д�����룬
��ʱServerB�ȴ���¼1��Globalд��ʱ���Ѿ��ڱ�����ռ���˼�¼1��������������ò���ִ�С�
���û��һ�������ƴ������ѭ�����������ˡ�
�����һ����¼��������
5. ����ʵ��α��
int acquireShare(Acquire rpc) {
while (true) {
// ��ѯ�õ���¼��
CacheState cs = global.computeIfAbsent(rpc.Argument.globalKey, CacheState::new);
// ��ס�����¼��
synchronized (cs) {
// ��¼״̬�DZ�ɾ���ģ�����ѭ������ȡ�µļ�¼��
// ������ΪcomputeIfAbsent��synchronized (cs)֮�����ʱ�䴰�ڣ��п�����������ڼ�¼�պñ�ɾ���ˡ�
// �������ѭ����������һ�����������¼�������ɡ�
// ����Ĺؼ��ǣ����е�ͬһ��globalKey�����󣬶��������ͬһ��CacheState��
if (cs.acquireStatePending == StateRemoved)
continue;
while (cs.acquireStatePending != StateInvalid && cs.acquireStatePending != StateRemoved) {
if (��⵽����()) {
// ��������������ʧ�ܽ��
rpc.Result.state = StateInvalid;
rpc.SendResultCode(AcquireShareDeadLockFound);
return 0;
}
cs.wait(); //await ��֪ͨ�����CacheState�н����е����󡣵ȴ���ɡ�������Dz����Ŷӡ�
// Ϊʲô����IJ������ʲ��ܼ򵥵�ͨ��synchronized (cs)�����Ȼ���͵ȴ�����Ҫ��cs.wait()�أ�
// ����Ϊ�������������⣬�������ͨ�������⣬��ô�ȴ���������������������ܵ��������ڵȴ�
// ��������ģ����޷����������飬��ô����������ʱ�򣬾�û�������ˡ�ʹ��cs.wait()������
// �����е������ڵȴ�������ʱ��Ҳ�����cs.wait()���������ܿ��ŷŹ����ѵ�ǰ���е������߶�
// �Ž�������������⣬���������ſ��ܱ���ϡ�
}
if (cs.acquireStatePending == StateRemoved)
continue; // concurrent release��ͬ��һ��StateRemoved�жϡ�
// ���ִ��Ȩ�ޣ�������������״̬��
cs.acquireStatePending = StateShare;
if (cs.modify != null) {
// �����ǰcs����д״̬����Ҫ�Ե�ǰ��ӵ���߽��н���������
cs.modify.reduce(() -> {
// �Ե�ǰдӵ���߷����첽���������������ɹ��ص����lambda��
...
// �õ����������notifyAll��
synchronized (cs) {
cs.notifyAll(); //notify ֪ͨ�����Ŷӵ������߽��� (line a)
}
});
cs.wait(); // �ȴ�reduce�����
// �������wait����������Ŷӵ������ߣ��Ϳ��Ի�����������������ѭ�����
// �������(line a)�еõ������������֪ͨʱ�����еĵȴ��Ķ��ỽ�ѣ��������
// û�л��ִ��Ȩ�Ļ��ٴεȴ����������ѭ���ĸ�cs.wait()���
// ��������Ժ���������Լ����������̡�
// �������������״̬�޸ģ�
sender.acquired.put(gKey, StateShare);
cs.modify = null;
cs.share.add(sender);
// �ͷ�ִ��״̬��
cs.acquireStatePending = StateInvalid;
cs.notifyAll(); //notify ֪ͨ�����Ŷӵ������߽�����
// ���ͽ����
rpc.SendResultCode(0);
return 0;
}
// ��ǰӵ����Ҳ��ӵ�ж���������������ȡ��ֱ��˳���������������ɡ�
// �������������״̬�޸ģ�
sender.acquired.put(gKey, StateShare);
cs.share.add(sender);
// �ͷ�ִ��״̬��
cs.acquireStatePending = StateInvalid;
cs.notifyAll(); //notify ֪ͨ�����Ŷӵ������߽�����
// ���ͽ����
rpc.SendResultCode(0);
return 0;
}
}
Ϊʲô�� cs.notifyAll() ?
��Ϊ�������Ҫ�����еȴ��������߶��������һ�飬���������һ����������ô����Ž�������ᷢ��������
���������ȴ��������������Ŷӣ���ô�������ͻ�ʧ�ܡ�����ÿ�ζ���Ҫȫ�����ѡ�
5. Global��������Ľ������
Global�ӽṹ���㷨���ǵ���ġ�ϵͳ��ģ�ܴ��ʱ�򣬵����������ܻ������ޡ��������Ľ���������ж��Global��
GlobalIndex = hash(GlobalTableKey) % GlobalInstanceCount; �Ѽ�¼������ֲ������Globalʵ���У����������3���
������������������������ȷ�ģ����ҿ��԰Ѹ��طֲ�����
6. Global����ɿ��ԵĽ������
Global�ǵ���ģ�������ֽ��̻��߻����쳣���ᵼ��ϵͳ�����á��������Ľ������������Raft�㷨������ÿ��Globalʵ����
���ж��Raft���ݣ�ֻҪRaftϵͳ�еĽ����ڵ㳬��Raft�ڵ���/2����ϵͳ���ܳ����ṩ����
Raftϵͳ�����������ܻ��½��ܶࡣ����Ӧ����Ҫ�����Լ������󣬾����Ƿ����Global Raft�汾��
һ���������޸ĵļ�¼���Ͼ���һ��������¼���ϡ�ֻҪ����������¼����һ������ķ�ʽ���浽������ݿ⣬����ͱ���֤�ˡ�
���Ǽ������⣺1. ������Ҫ�ܲ�����ִ�е����⣻2. ��������Ҫϵ�л��ģ���ʱ������⣻3. ���񱣴浽������ݿ�IJ������⣻
1. ��򵥵ķ�����ֻ��һ��Ψһ�ı���������У������ύ��ʱ��ͼ�����У��������������1��2�ͽ���ˣ�����3�������ˡ�
2. ������ǰ���г����뷨����ʱÿ������һ���������ϣ��������Ϻϲ�������Ч�ʲ���������������Ĺ��������п����н���������
���ղ��������ʱ����Ҫ��ʱ������һ��������˼·��ÿ����¼һ�����ߣ�ÿ����������ļ�¼��������һ�����ӣ�������������
�����������䣬ij����¼����������ڶ������ʱ�������Ⱥ��ϵ�Զ��γ�˳�򣬲����������������ȡ��������ӣ�����ͬʱ
��ȡ�����Ŀ��Բ������棬�����ϵ������İ�ʱ�򱣴档���˼·�IJ������淽�����Ǻܺõ�ʵ�֡�ʵ�����һ�û����Ƿ����һ��
���ķ���ʵ�����˼·��˵��������ǣ�������е���˼��
| | | | | |
| +-+-+ | |
| | | +-+-+
1 2 3 4 5 6
3. ���¿����������ʱ�������µı���ԭ���������ύ��ʱ�����ϰ��н����Ĺ�����¼���Ϻϲ��������������ڵ�
������¼���϶���û�н����ģ����������÷dz��򵥣��ṹҲ�����������Zeze�ĵ�ǰ������������������ύ����һЩ��
�
void tryUpdateAndCheckpoint(Transaction trans) {
var transAccessRecords = new HashSet<Record>(); // ��ǰ��������з��ʵļ�¼
var relativeRecordSets = new TreeMap<Long, RelativeRecordSet>(); // ��Ҫ��������¼���ϵı������
for (var ar : trans.getAccessedRecords().values()) {
var record = ar.atomicTupleRecord.record;
transAccessRecords.add(record);
relativeRecordSets.put(volatileRrs.id, volatileRrs); // �Ѽ���ǰ�����м�¼�Ĺ�����¼���ϡ�
}
var lockedRelativeRecordSets = new ArrayList<RelativeRecordSet>();
try {
// ��ס���е�ǰ�Ĺ�����¼���ϡ���������㷨���ֹ����㷨����ԭ����һ���ġ�
_lock_(lockedRelativeRecordSets, relativeRecordSets, transAccessRecords);
if (!lockedRelativeRecordSets.isEmpty()) {
var mergedSet = _merge_(lockedRelativeRecordSets, trans, allRead);
// �������ã����ϱ��棬���߼ӵ��ܵĹ�����¼���������еȴ���ʱ����
flush(mergedSet) or relativeRecordSetMap.add(mergedSet);
}
} finally {
lockedRelativeRecordSets.forEach(RelativeRecordSet::unLock);
}
}
�������ݱ����
ʵ����ʷ
1. Xdbʱ���������¼ͨ���������ͬʱ�ռ������Ϣ�����Listener��ʵ�ִ���ͺ�����������������dz��������⡣
2. Zeze��Listener�ڶ��棬��ȡ��ȫ�������㷨�͹��򣬸��������߼��ֿ�����ø������ˡ�����Ч�ʲ��
3. Zeze��Lisnter��ǰ�棬�㷨�͹����Ƕ����ģ�����ʵ�ֱ���ɢ����ͬ���޸���־�ࡣ����汾ʵ����û�еڶ���������
��ʵ�����Ժ󣬴���һ��ͨ�õ�����ͬ�����ݵ�����������汾һ��ʼ��ΪRocksRaftд�ģ�RocksRaft��Ҫ����ͬ�����ݵ�������
���������������ͦ�����ģ��Ͱ�Zeze��ListenerҲ�ij������ʽ�ˡ�
����Java RocksDbʱ��ʱ����ȣ�����Ϥ��װ��������c#��FamilyColumn�Ļ�������һ��Db��ʵ�ֶ��ű���
�����ĺô�������Ӧ��һ��RocksDbʵ����ʹ��Batch��û��ʹ��������Ϊ����Ҫ��չ��ϵͳ���պ��ˡ�