Zeze��̸�ݸ�
Zeze��̸
Section titled “Zeze��̸”���
Zeze��һ��Ƕ�뿪�����Եķֲ�ʽ�����ܡ�˼��仯����
Section titled “˼��仯����”��ֱ��������ϣ��ۺ�������γ�����һ�㣬����ϲ���û�д��ⵥһ�Ĺ����պ�ԭ�� ����δ��������������������Ҫ��ʱ���������ӡ��ֲ�˼·������ħ�� ����ij���ֲ�ϵͳʱ��һ��ֻ���Ƕ�����������ϵͳ�����ջ��Ƕ༶�ġ��Ӵ�ļ�¼(Record)
Section titled “�Ӵ�ļ�¼(Record)”Record����ֻ��Cache�е�һ�����ݣ�����������ع�����Ҫ��ʱ������Record��������Լ���״̬�������Ѿ���ú��Ӵ�
1. class Record
a) �ֹ����ͻ�������������尴Դ��˳�����У�û�з��ࡣReentrantLock fairLock; // ��¼����boolean dirty; // ��¼�Ƿ���ı�־��volatile Bean strongDirtyValue; // ���¼�������������Bean�����ã���ֹ���¼�������û��յ���volatile long timestamp; // ʱ�����ֹ�������״̬��volatile SoftReference<Bean> softValue; // ��¼�����ã�����һ���������Ա����գ������յ�����Bean������Ҫ��ʱ�������ر���Ӳ����װ�ؽ�����volatile RelativeRecordSet relativeRecordSet; // ��¼�����Ĺ�����¼���ϡ���¼��������һ��������¼���ϡ�volatile int state; // ��¼״̬��Ҳ�����Ƿ������ı�־��
SoftReference ����������¼��������İ汾���������ڴ��еġ�ͨ��ÿ�����Ļ����������ã�CacheCapacity�������ڴ��ʹ�á����ڱ��ļ�¼��С��һ��ʹ��Ƶ�ʲ�һ��ͨ�����ÿ����ڴ�ʹ����һ���ȽϷ��صĹ��������ԣ�������������õĶ������棬�ڴ�����SoftReference<Bean>��ͬʱ�ڱ���Ӳ�̱���һ�ݡ���������ʧЧʱ�ӱ���Ӳ��װ�ء������������Ժ�JVM����������ڴ棬�Ѳ��ּ�¼���յ�������ÿ����¼���ڴ���ֻ��¼һ�����ã�û�����ݣ����������Ϳ������õıȽϴ��ڴ�Ҳ�������ܵ���¼��С��Ӱ�졣�����ĺô��ǣ�I) ʹ�ñ��������������������������ʣ�II) �����ã���ͳһ�����б������������ó�һ���ģ����ұȽϴ����ֵ��
�ֹ���������¼������صĺ�����˵��
b) ���ʵ��Ż�boolean fresh;long acquireTime;������ʾ��¼�Ƿ����ʵġ��ոջ�ÿ���Ȩ�ļ�¼�������ֹ������п���û�������ɹ�ִ���������ֻᱻ�������������ߣ������Դ�˷ѣ�����һֱ���������������ʧ�ܣ���Ȼ�����ԱȽϵ͡� final boolean isFresh() { // �����־��������ʱ�ᱻ����Global����Ӱ��Global�����ķ��䡣 return fresh; }
final boolean isFreshAcquire() { // ���ʵģ����һ��ʱ��С��1�룬��ʱ��ķ�����������ᱻ�ܾ��� // fresh ������ʱ�����ó�false�� // acquireTime ��һ��������ʩ����ֹ�����������û��ʵ��ʹ�õ����������һ��Ҳ�ᱻ��Ϊ�������ʣ� return fresh && System.currentTimeMillis() - acquireTime < 1000; }
final void setNotFresh() { // �������������������ķ������ˡ��������ʡ� fresh = false; }
final void setFreshAcquire() { // ����ɹ�װ�ؼ�¼ʱ���á� acquireTime = System.currentTimeMillis(); fresh = true; }c) Checkpoint���Ż�Database.Transaction databaseTransactionTmp;Database.Transaction databaseTransactionOldTmp;
Database.Transaction ��Zeze�ĺ�����ݿ⣬��MySql������Zeze֧�ֶ��������ݿ⣬�������ǿ���һ��������ʹ�á�Checkpointʱ���ᴴ��ÿ��������ݿ�����������ύ���ݡ�Checkpoint�ǰ���¼�����ģ��Ѻ�����ݿ�����浽��¼����һ���Ż������������¼����ʱ������һ����ȥ���Һ�����ݿ�������ˡ�
databaseTransactionOldTmpZeze֧�ֵ�һ�����ԣ�����һ���ܴ�ĺ�����ݿⱸ�ݷŵ�һ�߱��"ֻ��"��Zeze����ִ���У������ݴӱ��ݿ��и���ʵ��ʹ��װ�ص���ǰ�Ĺ������ݿ��С�����ijЩ��Ϸ���ϺϷ�����ɺ�����ݿ�ܴ��ǻ�Ծ�����ֲ���ʱ��������Կ��Դ��ļ�С�������ݿ⣬���������ܺͼ���ȫ����ʱ�䡣ʹ��������ԣ���Ҫ�������ã�����������ܱȽ���֣����ﲻ��ϸ˵�������ˡ�ʵ����һ����Ҳ˵����������Ҫȥ�����롣"ֻ��"������¼�ӹ�����¼��ɾ��ʱ����Ҫɾ�����ݿ⣬������һ�β�ѯ��װ��һ�Σ����Ͳ����ˡ�
2. class Record1<K, V> extends Record�����������Record�����࣬�������ͣ��Լ�ϵ�л���صķ�����
a) ����TableX<K, V> table; // ��¼�����ı���final K key; // ��¼��Key��
b) ϵ�л�����ByteBuffer snapshotKey; // ��¼��Keyϵ�л������ʱ���á�ByteBuffer snapshotValue; // ��¼��Valueϵ�л������ʱ���á�
c) ��ȫ���Dirtylong savedTimestampForCheckpointPeriod; // ��������Checkpoint�ؼ���ʱ��ʱ����// ��¼���ķ��ʲ����Ǻ�Checkpoint���̲������̲����ģ���������������// ����¼�������պ��ֻᱻ��ʱ��Checkpoint���ܰѼ�¼���óɲ��ࡣ// Checkpoint���ڱ����ʱ���Ϳ���ʱ��ʱ��һ��ʱ���ļ�¼�����־��
d) ɾ�������ڵļ�¼�Ż�boolean existInBackDatabase; // ��¼�Ƿ��ں�����ݿ��д��ڡ�boolean existInBackDatabaseSavedForFlushRemove; // ����������ڱ�־��������߲�������������// ��¼�Ƿ��ں�����ݿ��д��ڣ���Ϊ���Ż���¼������ʱ�������ɾ����������������ݿ⡣// һ�������ݿ�ɾ�������ڵļ�¼�IJ������ۺ�ɾ��������һ���ġ���������İ�����// ���Ӧ�ô����˺ܶ���ʱ��¼�����ǻ�û���ύ��������ݿ�ǰ��ɾ���ˡ����û�������־��// Zeze����������µİ�ɾ����������������ݿ⣬��������˷ѡ�
e) ConcurrentLruLike �ĸ�������volatile ConcurrentHashMap<K, Record1<K, V>> lruNode;
Zeze.Util.ConcurrentLruLike ��һ��ͨ�õİ�װ��Zeze.Transaction.TableCache Ҳ����ConcurrentLruLike������ר�õģ�����ʡһ���ڴ�����һ�����ܡ�
java Ĭ�ϵ�lru�������ܲ����ã�����д�����ConcurrentLruLike����ʵ�ʵIJ���������Դ��ConcurrentHashMap����˵��һ�£�ConcurrentHashMap<K, Record1<K, V>> dataMap; // Lru�������еļ�¼ӳ�䡣ConcurrentLinkedQueue<ConcurrentHashMap<K, Record1<K, V>>> lruQueue; // ��¼��Ծʱ������ӳ��Ķ��У���ʱ������ConcurrentHashMap<K, Record1<K, V>> lruHot; // �ȵ�ӳ�䣬�϶����ᱻ���ա�java��lru��LinkedList�ṹ��¼��ȫ����˳����У���������IJ������ܲ��ߡ�ConcurrentLruLike����ConcurrentHashMap���ṩ�ĵIJ������ʣ�����˳����б�ò���ȷ����lruQueue�ľ����ƽ����ֹ������
Section titled “�ֹ������”�㷨Ҫ��1�����������ʵ���������ʵļ�¼�洢��SortedDictionary�С�2�����������ͻ���������Ƿ�ı䡣��ͻ����������3����ͻ����ʱ�����Ѿ��õ������������ڳ�ͻ�dz�����ʱ���ڶ���ִ������һ�㶼�ܳɹ��� ����������һֱ��ͻ��������Զû����ɵ������4�������������������������������ʵļ�¼���ܷ����仯������������Ȼ��Ҫ�ٴ�ִ��lockAndCheck���� ���Ҵ��������ʵļ�¼������������⡣5. ���������ش���������쳣ʱҲ��Ҫ���lockAndCheck����Ϊ�ֹ�����ʵ�ʴ�����ʱû�м����� ���ܴ��ڲ���ԭ���±�����Ӧ�÷���������ʱ����Ȼ��Ҫ��������ɳ�ͻ��飬�����ͻ�ˣ� Ҳ��Ҫ������
�ϴ��롣 public long perform(Procedure procedure) { try { // ��7.�� try 1. �ͷż�¼�� var checkpoint = procedure.getZeze().getCheckpoint(); if (checkpoint == null) return Procedure.Closed; for (int tryCount = 0; tryCount < 256; ++tryCount) { // ��6.�� ��ೢ�Դ��� // Ĭ���������ظ����ԣ�����CheckResult.RedoAndReleaseLock����������CheckResult.Redo�������ᵼ�������� checkpoint.enterFlushReadLock(); // ��8.�� ����������Global�ķֲ�ʽ������������ǿյġ� try { // ��7.�� try 2. �ͷ�checkpoint������������RedoAndReleaseLockʱ����Ҫȫ���ͷ���������������� for (; tryCount < 256; ++tryCount) { // ��6.�� ��ೢ�Դ��� CheckResult checkResult = CheckResult.Redo; // ���������Ƿ��ͷ��������� _lock_and_check_ ��ȷ������Ҫ�ͷ����������ͷš� try { // ��7.�� try 3. �������� var result = procedure.call(); switch (state) { // ��11.��state ���������ִ��״̬������Ҫ��״̬�仯��ʱ�����ϻ��ڴ�������state���������������顣 // ���������̲�����Ҫ�쳣������ʧ��״̬��������ǰxdb���ܲ�Error��ԭ�������������״̬����ʹ�����̳Ե���һЩ�쳣�� // ִ��������Ȼ���ᶪʧ״̬���ơ� case Running: var saveSize = savepoints.size(); if ((result == Procedure.Success && saveSize != 1) || (result != Procedure.Success && saveSize > 0)) { // �������Ӧ������ logger.fatal("Transaction.Perform:{}. savepoints.Count != 1.", procedure); finalRollback(procedure); return Procedure.ErrorSavepoint; } checkResult = lockAndCheck(procedure.getTransactionLevel()); // ��10.������procedure.call����ʲô����Ҫ��������ͻ���μ�����ĵ�5�㡣 if (checkResult == CheckResult.Success) { if (result == Procedure.Success) { finalCommit(procedure); // ��9.������ɹ������ύ������ // ����һ�γɹ��IJ�ͳ�ƣ������۲�redo��ࡣ // ʧ���� Procedure.cs �е�ͳ�ơ� if (tryCount > 0) { if (Macro.enableStatistics) { ProcedureStatistics.getInstance().getOrAdd("Zeze.Transaction.TryCount").getOrAdd(tryCount).increment(); } } return Procedure.Success; } finalRollback(procedure, true); return result; } break; // retry
case Abort: logger.warn("Transaction.Perform: Abort"); finalRollback(procedure); return Procedure.AbortException;
case Redo: //checkResult = CheckResult.Redo; break; // retry
case RedoAndReleaseLock: checkResult = CheckResult.RedoAndReleaseLock; break; // retry } // retry clear in finally if (alwaysReleaseLockWhenRedo && checkResult == CheckResult.Redo) checkResult = CheckResult.RedoAndReleaseLock; triggerRedoActions(); // ��11.������Redo�� } catch (Throwable e) { // Procedure.Call �����Ѿ��������쳣��ֻ�� unit test �������������ڲ�����ᵽ����� // �� unit test �£��쳣��־�ᱻ��¼���Ρ� switch (state) { case Running: logger.error("Transaction.Perform:{} exception. run count:{}", procedure, tryCount, e); if (!savepoints.isEmpty()) { // �������Ӧ������ logger.fatal("Transaction.Perform:{}. exception. savepoints.Count != 0.", procedure, e); finalRollback(procedure); return Procedure.ErrorSavepoint; } // ���� unit test ���쳣���������unit test����ܴ��乤�� if (e instanceof AssertionError) { finalRollback(procedure); throw (AssertionError)e; } checkResult = lockAndCheck(procedure.getTransactionLevel()); // ��10.���쳣Ҳ��Ҫ����ͻ���μ�����ĵ�5�㡣 if (checkResult == CheckResult.Success) { finalRollback(procedure, true); return Procedure.Exception; } // retry break;
case Abort: if (!"GlobalAgent.Acquire Failed".equals(e.getMessage()) && !"GlobalAgent In FastErrorPeriod".equals(e.getMessage())) logger.warn("Transaction.Perform: Abort", e); finalRollback(procedure); return Procedure.AbortException;
case Redo: checkResult = CheckResult.Redo; break;
case RedoAndReleaseLock: checkResult = CheckResult.RedoAndReleaseLock; break;
default: // case Completed: if (e instanceof AssertionError) throw (AssertionError)e; } triggerRedoActions(); // retry } finally { if (checkResult == CheckResult.RedoAndReleaseLock) { holdLocks.forEach(Lockey::exitLock); holdLocks.clear(); } // retry ���ܱ������е����������¼�ͱ���㡣 accessedRecords.clear(); savepoints.clear(); actions.clear(); redoActions.clear();
state = TransactionState.Running; // prepare to retry }
if (checkResult == CheckResult.RedoAndReleaseLock) { // logger.debug("CheckResult.RedoAndReleaseLock break {}", procedure); break; // ��12.����Ҫ�ͷ���������������һ��ѭ���� } } } finally { checkpoint.exitFlushReadLock(); } //logger.Debug("Checkpoint.WaitRun {0}", procedure); // ʵ��Fresh�����Ժ�ɾ��Sleep��// ��13.������æ�ȡ� try { Thread.sleep(Zeze.Util.Random.getInstance().nextInt(80) + 20); } catch (InterruptedException e) { logger.error("", e); } } logger.error("Transaction.Perform:{}. too many try.", procedure); finalRollback(procedure); return Procedure.TooManyTry; } finally { holdLocks.forEach(Lockey::exitLock); holdLocks.clear(); } }
private CheckResult lockAndCheck(TransactionLevel level) { boolean allRead = true; var saveSize = savepoints.size(); if (saveSize > 0) { // ȫ�� Rollback ʱ Count Ϊ 0������ύʱ Count ����Ϊ 1�� // �����������Begin,Commit,Rollback��ƥ�䡣�����顣 // ��ɨ������־�����ü�¼Dirty��־�����ƶ�д���� var it = savepoints.get(saveSize - 1).logIterator(); if (it != null) { while (it.moveToNext()) { // ������־������ bean ������־����ȻҲ������ Record�� // ���ڲ��������������������δ����չ��Ҫ�� Log log = it.value(); if (log.getBean() == null) continue;
TableKey tkey = log.getBean().tableKey(); var record = accessedRecords.get(tkey); if (record != null) { record.dirty = true; allRead = false; } else { // ֻ�в��Դ����ѷ� Managed �� Bean ����־�ӽ����� logger.fatal("impossible! record not found."); } } } }
if (allRead && level == TransactionLevel.AllowDirtyWhenAllRead) return CheckResult.Success; // ʹ��һ���µ�enum��ʾһ�£�
boolean conflict = false; // ��ͻ�ˣ�Ҳ����������Ϊ�������������� if (holdLocks.isEmpty()) { // ���Ż�����һ�μ�������˳�ɡ� for (var e : accessedRecords.entrySet()) { var r = lockAndCheck(e); switch (r) { case Success: break; case Redo: conflict = true; break; // continue lock default: return r; } } return conflict ? CheckResult.Redo : CheckResult.Success; }
// ����������õĶ��У�A. ���ʵļ�¼��B. �Ѿ����е������� int index = 0; int n = holdLocks.size(); final var ite = accessedRecords.entrySet().iterator(); var e = ite.hasNext() ? ite.next() : null; while (null != e) { // ��� holdLocks ȫ�����Ա���ϣ�ֱ�������� if (index >= n) { var r = lockAndCheck(e); switch (r) { case Success: break; case Redo: conflict = true; break; // continue lock default: return r; } e = ite.hasNext() ? ite.next() : null; continue; }
Lockey curLock = holdLocks.get(index); int c = curLock.getTableKey().compareTo(e.getKey());
// holdLocks a b ... // needLocks a b ... if (c == 0) { // ������ܷ�����д������ if (e.getValue().dirty && !curLock.isWriteLockHeld()) { // ������ȫ���ͷţ���������ǰ��¼������������ļ�¼�� // ֱ�� unlockRead��lockWrite�������� n = _unlock_start_(index, n); // �ӵ�ǰindex֮�����¼���������index��n�������ٷ����仯�� // ���´ӵ�ǰ e �������� continue; } // BUG ��ʹ���ڡ�Record.Global.State ����û����������Ҫˮƽ����Ҫ����_check_�� var r = _check_(e.getValue().dirty, e.getValue()); switch (r) { case Success: // �Ѿ����ڣ����Կ϶������ͻ���������������� break; case Redo: // Impossible! conflict = true; break; // continue lock default: // _check_������Ҫ��Global����״̬��������ܷ���GLOBAL-DEAD-LOCK�� return r; } ++index; e = ite.hasNext() ? ite.next() : null; continue; } // holdLocks a b ... // needLocks a c ... if (c < 0) { // �ͷŵ� �ȵ�ǰ����С��������Ϊ��ǰ�����в�����Ҫ��Щ�� int unlockEndIndex = index; for (; unlockEndIndex < n && holdLocks.get(unlockEndIndex).getTableKey().compareTo(e.getKey()) < 0; ++unlockEndIndex) holdLocks.get(unlockEndIndex).exitLock(); holdLocks.subList(index, unlockEndIndex).clear(); n = holdLocks.size(); // ���´ӵ�ǰ e �������� continue; }
// holdLocks a c ... // needLocks a b ... // Ϊ�˲�Υ�������ͷŴӵ�ǰ����ʼ�������� n = _unlock_start_(index, n); // ���´ӵ�ǰ e �������� } return conflict ? CheckResult.Redo : CheckResult.Success; }
private CheckResult lockAndCheck(Map.Entry<TableKey, RecordAccessed> e) { Lockey lockey = locks.get(e.getKey()); boolean writeLock = e.getValue().dirty; lockey.enterLock(writeLock); holdLocks.add(lockey); return _check_(writeLock, e.getValue()); }
private static CheckResult _check_(boolean writeLock, RecordAccessed e) { e.atomicTupleRecord.record.enterFairLock(); try { if (writeLock) { switch (e.atomicTupleRecord.record.getState()) { case GlobalCacheManagerConst.StateRemoved: // ����cache������������иü�¼��Global�������������ɡ� return CheckResult.Redo;
case GlobalCacheManagerConst.StateInvalid: return CheckResult.RedoAndReleaseLock; // �����Invalid��������Reduce����
case GlobalCacheManagerConst.StateModify: return e.atomicTupleRecord.timestamp != e.atomicTupleRecord.record.getTimestamp() ? CheckResult.Redo : CheckResult.Success;
case GlobalCacheManagerConst.StateShare: // ���������������һ���Ȼ������������Ҫ��Reduce�����DZ���Checkpoint��������ȥ������ǰ����ס�ˡ� // ͨ�� GlobalCacheManager �������������ʧ��;��Ҫ�������ͷ����� var acquire = e.atomicTupleRecord.record.acquire(GlobalCacheManagerConst.StateModify, e.atomicTupleRecord.record.isFresh(), false); if (acquire.resultState != GlobalCacheManagerConst.StateModify) { e.atomicTupleRecord.record.setNotFresh(); // ��ʧ�ܲ������ʡ� logger.debug("Acquire Failed. Maybe DeadLock Found {}", e.atomicTupleRecord); e.atomicTupleRecord.record.setState(GlobalCacheManagerConst.StateInvalid); // ���ﱣ��StateShare������ return CheckResult.RedoAndReleaseLock; } e.atomicTupleRecord.record.setState(GlobalCacheManagerConst.StateModify); return e.atomicTupleRecord.timestamp != e.atomicTupleRecord.record.getTimestamp() ? CheckResult.Redo : CheckResult.Success; } return e.atomicTupleRecord.timestamp != e.atomicTupleRecord.record.getTimestamp() ? CheckResult.Redo : CheckResult.Success; // impossible } switch (e.atomicTupleRecord.record.getState()) { case GlobalCacheManagerConst.StateRemoved: // ����cache������������иü�¼��Global�������������ɡ� return CheckResult.Redo;
case GlobalCacheManagerConst.StateInvalid: return CheckResult.RedoAndReleaseLock; // ����Invalid��������Reduce������߱�Cache��������ʱ��������ͷ����� } return e.atomicTupleRecord.timestamp != e.atomicTupleRecord.record.getTimestamp() ? CheckResult.Redo : CheckResult.Success; } finally { e.atomicTupleRecord.record.exitFairLock(); } }�����Global
Section titled “�����Global”1. ������̨������������̨���ݿ⡣ÿ̨������ӵ���Լ��Ļ��档һ���Ի������ά����̨������֮�仺���һ���ԡ�zezeһ���Ի����CPU-Cache-Memory�Ľṹ�������Բο���CPU��MESIЭ���Լ�ʵ����һ����������ơ����һ���Ի���˼·�Ƿdz������ģ����ij��������Global��ȫ�ܵģ�֪�����ж����������㷨Ҳ�dz�ֱ�Ӽ�ʹ��������MESI��״̬������¼�ֳɶ�д����������״̬�������ñ�ʾ��¼���ػ�û��Ȩ�ޣ���״̬����ͬʱ��������̨�����������У�д״ֻ̬������һ̨�������С���һ���Ի���֮�ϣ�ÿһ̨������������������Լ���ռ��������һ������ɱ������ɡ�����ǻ���һ���Ի���ķֲ�ʽ����
2. ����������¼��������״̬��Modify,Share,Invalid����������������Ҫ���ʻ�������ʱ����ȫ����������������������Global�ƺ�������Modify��Share����Global֪�����м�¼�����ķֲ�״̬���������������������ӵ���߷�����Ӧ�Ľ�������ӵ�����ͷ�������������ˢ�µ���˷������Ÿ�Global���ؽ����Global�Ǽ������ߵ���״̬���������߷��ؽ����
3. ����ԭ������ͬһ����¼�����е������������ŶӴ����ģ�һ��ʱ�������һ�������������Ƕ���д������д�����������������ķ�����������ڲ�ͬ�ļ�¼������ִ���Dz����ġ������Ӱ�졣
4. ��������ÿ������������ȡ������������ֹ����������Զ����¼���Ѿ����������ˡ���Global��ͬһ����¼�ķ��ʣ���ʹֻ��һ����¼Ҳ���п��������ġ����磺ServerA ӵ�м�¼1�Ķ�����Ȼ��ȥ����д����ServerB Ҳӵ�м�¼1�Ķ�����Ȼ��ȥ����д������Global�ڴ���ServerA������ʱ��Ҫ֪ͨServerB��������ServerB��ʱҲ�ڵȴ�д�����룬��ʱServerB�ȴ���¼1��Globalд��ʱ���Ѿ��ڱ�����ռ���˼�¼1��������������ò���ִ�С����û��һ�������ƴ������ѭ�����������ˡ������һ����¼��������
5. ����ʵ��α��int acquireShare(Acquire rpc) { while (true) { // ��ѯ�õ���¼�� CacheState cs = global.computeIfAbsent(rpc.Argument.globalKey, CacheState::new);
// ��ס�����¼�� synchronized (cs) { // ��¼״̬�DZ�ɾ���ģ�����ѭ������ȡ�µļ�¼�� // ������ΪcomputeIfAbsent��synchronized (cs)֮�����ʱ�䴰�ڣ��п�����������ڼ�¼�պñ�ɾ���ˡ� // �������ѭ����������һ�����������¼�������ɡ� // ����Ĺؼ��ǣ����е�ͬһ��globalKey�������������ͬһ��CacheState�� if (cs.acquireStatePending == StateRemoved) continue;
while (cs.acquireStatePending != StateInvalid && cs.acquireStatePending != StateRemoved) { if (������()) { // ��������������ʧ�ܽ�� rpc.Result.state = StateInvalid; rpc.SendResultCode(AcquireShareDeadLockFound); return 0; } cs.wait(); //await ��֪ͨ�����CacheState�н����е����ȴ���ɡ�������Dz����Ŷӡ� // Ϊʲô����IJ������ʲ��ܼ�ͨ��synchronized (cs)�����Ȼ���͵ȴ�����Ҫ��cs.wait()�أ� // ����Ϊ�������������⣬�������ͨ�������⣬��ô�ȴ���������������������ܵ��������ڵȴ� // ��������ģ��������������飬��ô����������ʱ��û�������ˡ�ʹ��cs.wait()������ // �����е������ڵȴ�������ʱ��Ҳ�����cs.wait()���������ܿ��ŷŹ����ѵ�ǰ���е������߶� // �Ž�������������⣬���������ſ��ܱ���ϡ� } if (cs.acquireStatePending == StateRemoved) continue; // concurrent release��ͬ��һ��StateRemoved�жϡ�
// ���ִ��Ȩ�ޣ�������������״̬�� cs.acquireStatePending = StateShare;
if (cs.modify != null) { // �����ǰcs����д״̬����Ҫ�Ե�ǰ��ӵ���߽��н��������� cs.modify.reduce(() -> { // �Ե�ǰдӵ���߷����첽���������������ɹ��ص����lambda�� ... // �õ����������notifyAll�� synchronized (cs) { cs.notifyAll(); //notify ֪ͨ�����Ŷӵ������߽��� (line a) } });
cs.wait(); // �ȴ�reduce����� // �������wait����������Ŷӵ������ߣ��Ϳ��Ի�����������������ѭ����� // �������(line a)�еõ������������֪ͨʱ�����еĵȴ��Ķ��ỽ�ѣ�������� // û�л��ִ��Ȩ�Ļ��ٴεȴ����������ѭ���ĸ�cs.wait()��� // ��������Ժ���������Լ����������̡�
// �������������״̬�ģ� sender.acquired.put(gKey, StateShare); cs.modify = null; cs.share.add(sender);
// �ͷ�ִ��״̬�� cs.acquireStatePending = StateInvalid; cs.notifyAll(); //notify ֪ͨ�����Ŷӵ������߽�����
// ���ͽ���� rpc.SendResultCode(0); return 0; }
// ��ǰӵ����Ҳ��ӵ�ж���������������ȡ��ֱ��˳���������������ɡ�
// �������������״̬�ģ� sender.acquired.put(gKey, StateShare); cs.share.add(sender);
// �ͷ�ִ��״̬�� cs.acquireStatePending = StateInvalid; cs.notifyAll(); //notify ֪ͨ�����Ŷӵ������߽�����
// ���ͽ���� rpc.SendResultCode(0); return 0; } }
Ϊʲô�� cs.notifyAll() ? ��Ϊ�������Ҫ�����еȴ��������߶��������һ�飬���������һ����������ô����Ž�������ᷢ�������� ���������ȴ��������������Ŷӣ���ô�������ͻ�ʧ�ܡ�����ÿ�ζ���Ҫȫ�����ѡ�
5. Global��������Ľ������Global�ӽṹ���㷨���ǵ���ġ�ϵͳ��ģ�ܴ��ʱ�����������ܻ������ޡ��������Ľ���������ж��Global��GlobalIndex = hash(GlobalTableKey) % GlobalInstanceCount; �Ѽ�¼������ֲ������Globalʵ���У����������3���������������������������ȷ�ģ����ҿ��Ѹ��طֲ�����
6. Global����ɿ��ԵĽ������Global�ǵ���ģ�������ֽ��̻������쳣���ᵼ��ϵͳ�����á��������Ľ������������Raft�㷨������ÿ��Globalʵ�������ж��Raft���ݣ�ֻҪRaftϵͳ�еĽ����ڵ㳬��Raft�ڵ���/2����ϵͳ���ܳ����ṩ����Raftϵͳ�����������ܻ��½��ܶࡣ����Ӧ����Ҫ�����Լ����������Ƿ����Global Raft�汾��������¼����
Section titled “������¼����”һ���������ĵļ�¼���Ͼ���һ��������¼���ϡ�ֻҪ����������¼����һ������ķ�ʽ���浽������ݿ⣬����ͱ���֤�ˡ����Ǽ������⣺1. ������Ҫ�ܲ�����ִ�е����⣻2. ��������Ҫϵ�л��ģ���ʱ������⣻3. ���浽������ݿ�IJ������⣻
1. ��ķ�����ֻ��һ��Ψһ�ı���������У������ύ��ʱ��ͼ�����У��������������1��2�ͽ���ˣ�����3�������ˡ�
2. ������ǰ���г����뷨����ʱÿ������һ���������ϣ��������Ϻϲ�������Ч�ʲ���������������Ĺ��������п����н������������ղ��������ʱ����Ҫ��ʱ������һ��������˼·��ÿ����¼һ�����ߣ�ÿ����������ļ�¼��������һ�����ӣ������������������������䣬ij����¼����������ڶ������ʱ�������Ⱥ��ϵ�Զ��γ�˳�����������������ȡ��������ӣ�����ͬʱ��ȡ�����Ŀ��Բ������棬�����ϵ������İ�ʱ�档���˼·�IJ������淽�����Ǻܺõ�ʵ�֡�ʵ�����һ�û����Ƿ����һ�����ķ���ʵ�����˼·��˵��������ǣ�������е���˼��
| | | | | || +-+-+ | || | | +-+-+1 2 3 4 5 6
3. ���¿����������ʱ�������µı���ԭ���������ύ��ʱ�����ϰ��н����Ĺ�����¼���Ϻϲ��������������ڵ�������¼���϶���û�н����ģ����������÷dz����ṹҲ�����������Zeze�ĵ�ǰ������������������ύ����һЩ��
α��void tryUpdateAndCheckpoint(Transaction trans) { var transAccessRecords = new HashSet<Record>(); // ��ǰ��������з��ʵļ�¼ var relativeRecordSets = new TreeMap<Long, RelativeRecordSet>(); // ��Ҫ��������¼���ϵı������ for (var ar : trans.getAccessedRecords().values()) { var record = ar.atomicTupleRecord.record; transAccessRecords.add(record); relativeRecordSets.put(volatileRrs.id, volatileRrs); // �Ѽ���ǰ�����м�¼�Ĺ�����¼���ϡ� }
var lockedRelativeRecordSets = new ArrayList<RelativeRecordSet>(); try { // ��ס���е�ǰ�Ĺ�����¼���ϡ���������㷨���ֹ����㷨����ԭ����һ���ġ� _lock_(lockedRelativeRecordSets, relativeRecordSets, transAccessRecords); if (!lockedRelativeRecordSets.isEmpty()) { var mergedSet = _merge_(lockedRelativeRecordSets, trans, allRead); // �������ã����ϱ��棬���ӵ��ܵĹ�����¼���������еȴ���ʱ���� flush(mergedSet) or relativeRecordSetMap.add(mergedSet); } } finally { lockedRelativeRecordSets.forEach(RelativeRecordSet::unLock); }}Listener
Section titled “Listener”�������ݱ����
ʵ����ʷ1. Xdbʱ���������¼ͨ���������ͬʱ�ռ������Ϣ�����Listener��ʵ�ִ���ͺ�����������������dz��������⡣2. Zeze��Listener�ڶ��棬��ȡ��ȫ�������㷨�������������ֿ�����ø������ˡ�����Ч�ʲ��3. Zeze��Lisnter��ǰ�棬�㷨�����Ƕ����ģ�����ʵ�ֱ���ɢ����ͬ������־�ࡣ����汾ʵ����û�еڶ��������� ��ʵ�����Ժ���һ��ͨ�õ�����ͬ�����ݵ�����������汾һ��ʼ��ΪRocksRaftд�ģ�RocksRaft��Ҫ����ͬ�����ݵ������� ���������������ͦ�����ģ��Ͱ�Zeze��ListenerҲ�ij������ʽ�ˡ�RocksDb �����ײ
Section titled “RocksDb �����ײ”����Java RocksDbʱ��ʱ����ȣ�����Ϥ��װ��������c#��FamilyColumn�Ļ�������һ��Db��ʵ�ֶ��ű��������ĺô�������Ӧ��һ��RocksDbʵ����ʹ��Batch��û��ʹ��������Ϊ����Ҫ��չ��ϵͳ���պ��ˡ�