Commit 97ac88b2 authored by Linshizhi's avatar Linshizhi

update channel in defs.js

parent dbd3435c
...@@ -2,6 +2,13 @@ ...@@ -2,6 +2,13 @@
// Definitions // // Definitions //
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
class ChnlStepConflictError extends Error {
constructor() {
super("Channel Step error");
}
}
class CpySchedule { class CpySchedule {
first = { pos : 0, size: 0 }; first = { pos : 0, size: 0 };
second = { pos : 0, size: 0 }; second = { pos : 0, size: 0 };
...@@ -142,39 +149,46 @@ class Channel { ...@@ -142,39 +149,46 @@ class Channel {
return true; return true;
} }
enableBlockMode(size) { enableBlockMode(blksize) {
let capacity = this.#size - 1; let capacity = this.#size - 1;
if (capacity % size) { if (capacity % blksize) {
return false; return false;
} }
this.#readPointerUpdate(this.#metaSize); this.#readPointerUpdate(this.#metaSize);
this.#writePointerUpdate(this.#metaSize); this.#writePointerUpdate(this.#metaSize);
this.#setRBlks(0);
this.#setWBlks(0);
this.#mode = CHANNEL_MODE.BLOCK; this.#mode = CHANNEL_MODE.BLOCK;
this.#blockSize = size; this.#blockSize = blksize;
this.#scheduler = new CopyStrategyInBlock(
this.#size-1, this.#metaSize, this.#blockSize);
} }
enableByteMode() { enableByteMode() {
this.#readPointerUpdate(this.#metaSize); this.#readPointerUpdate(this.#metaSize);
this.#writePointerUpdate(this.#metaSize); this.#writePointerUpdate(this.#metaSize);
this.#mode = CHANNEL_MODE.BYTE; this.#mode = CHANNEL_MODE.BYTE;
thsi.#scheduler = new CopyStrategyInByte(
this.#size ,this.#metaSize);
} }
getRBlks() { #getRBlks() {
return this.#view.getUint8(this.#rBlkShift); return this.#view.getUint8(this.#rBlkShift);
} }
setRBlks(num) { #setRBlks(num) {
this.#view.setUint8(this.#rBlkShift, num); this.#view.setUint8(this.#rBlkShift, num);
} }
getWBlks() { #getWBlks() {
return this.#view.getUint8(this.#wBlkShift); return this.#view.getUint8(this.#wBlkShift);
} }
setWBlks(num) { #setWBlks(num) {
this.#view.setUint8(this.#wBlkShift, num); this.#view.setUint8(this.#wBlkShift, num);
} }
...@@ -207,7 +221,13 @@ class Channel { ...@@ -207,7 +221,13 @@ class Channel {
dataSize() { dataSize() {
this.#readPointerCache = this.#getReadPointer(); this.#readPointerCache = this.#getReadPointer();
this.#writePointerCache = this.#getWritePointer(); this.#writePointerCache = this.#getWritePointer();
return this.#size - this.#remain() - 1;
switch (this.#mode) {
case CHANNEL_MODE.BYTE:
return this.#size - this.#remain() -1;
case CHANNEL_MODE.BLOCK:
return (this.#size - 1) - this.#remain();
}
} }
#remain() { #remain() {
...@@ -220,12 +240,27 @@ class Channel { ...@@ -220,12 +240,27 @@ class Channel {
} }
#remainBlocks() { #remainBlocks() {
if (this.#readPointerCache == this.#writePointerCache) {
// Check RBLKS and WBLKS
let rBlks = this.#getRBlks(),
wBlks = this.#getWBlks();
if (rBlks == wBlks) {
/* Empty */
return this.#size-1;
} else {
/* Full */
return 0;
}
} else if (this.#readPointerCache > this.#writePointerCache) {
return this.#readPointerCache - this.#writePointerCache;
} else {
// this.#readPointerCache < this.#writePointerCache
return (this.#size - 1) - (this.#writePointerCache - this.#readPointerCache);
}
} }
#remainBytes() { #remainBytes() {
this.#readPointerCache = this.#getReadPointer();
if (this.#writePointerCache == this.#readPointerCache) { if (this.#writePointerCache == this.#readPointerCache) {
return this.#size - 1; return this.#size - 1;
} else if (this.#writePointerCache > this.#readPointerCache) { } else if (this.#writePointerCache > this.#readPointerCache) {
...@@ -235,6 +270,17 @@ class Channel { ...@@ -235,6 +270,17 @@ class Channel {
} }
} }
#scheduleArgs(size) {
let args = new CopyArgument();
args.rPos = this.#readPointerCache;
args.wPos = this.#writePointerCache;
args.rBlks = this.#getRBlks();
args.wBlks = this.#getWBlks();
args.size = size;
return args;
}
/* Channel use an array buffer as /* Channel use an array buffer as
* a circular buffer, so some datas * a circular buffer, so some datas
* may unable to be done in one copy. * may unable to be done in one copy.
...@@ -244,8 +290,13 @@ class Channel { ...@@ -244,8 +290,13 @@ class Channel {
* Caution: Before making a copy schedule you must * Caution: Before making a copy schedule you must
* make sure there has enough spaces.*/ * make sure there has enough spaces.*/
#cpySchedule(size) { #cpySchedule(size) {
return this.#scheduler.schedule( let args = this.#scheduleArgs(size);
this.#readPointerCache, this.#writePointerCache , size); let sch = this.#scheduler.schedule(args);
if (this.#mode == CHANNEL_MODE.BLOCK)
this.#setWBlks(args.wBlks);
return sch;
} }
dataView() { dataView() {
...@@ -253,11 +304,11 @@ class Channel { ...@@ -253,11 +304,11 @@ class Channel {
} }
isEmpty() { isEmpty() {
return this.#getWritePointer() == this.#getReadPointer(); return this.#getWritePointer() == this.#getReadPointer() &&
this.#getRBlks() == this.#getWBlks();
} }
// This method is for testing purposes. #readDataBytes(size) {
readData(size) {
let writePos = this.#getWritePointer(); let writePos = this.#getWritePointer();
let readTo = 0, readBuffer = null; let readTo = 0, readBuffer = null;
...@@ -295,6 +346,61 @@ class Channel { ...@@ -295,6 +346,61 @@ class Channel {
return readBuffer; return readBuffer;
} }
#readDataBlks(size) {
let buffer = undefined;
let endPos = this.#endPos - 1;
let rEnd = this.#readPointerCache;
this.#writePointerCache = this.#getWritePointer();
if (this.dataSize() < size)
return new Uint8Array(0);
let blksRequired = Math.floor(size / this.#blockSize);
if (blksRequired == 0) {
return new Uint8Array(0);
}
let rBlks = this.#getRBlks();
buffer = new Uint8Array(blksRequired * this.#blockSize);
if (this.#readPointerCache >= this.#writePointerCache) {
let spaceBetweenEnd = endPos - this.#readPointerCache;
let blks = Math.floor(spaceBetweenEnd / this.#blockSize);
if (blks >= blksRequired) {
rEnd = this.#readPointerCache + (blksRequired * this.#blockSize);
buffer.set(this.#buffer.subarray(this.#readPointerCache, rEnd));
} else {
// Read two times
rEnd = this.#metaSize + (blksRequired - blks) * this.#blockSize;
buffer.set(this.#buffer.subarray(
this.#readPointerCache, blks * this.#blockSize));
buffer.set(this.#buffer.subarray(this.#metaSize, rEnd),
blks * this.#blockSize);
}
} else if (this.#readPointerCache < this.#writePointerCache) {
rEnd = this.#readPointerCache + (blksRequired * this.#blockSize);
buffer.set(this.#buffer.subarray(
this.#readPointerCache, rEnd));
}
this.#setRBlks((rBlks+blksRequired)%Math.pow(2, 8));
// Similar to push() must not change readPointer field in SharedMemory
// until all reads done.
this.#readPointerUpdate(rEnd);
return buffer;
}
readData(size) {
switch (this.#mode) {
case CHANNEL_MODE.BYTE:
return this.#readDataBytes(size);
case CHANNEL_MODE.BLOCK:
return this.#readDataBlks(size);
}
}
/* Return array of buffers to caller /* Return array of buffers to caller
* so caller able to directly write to * so caller able to directly write to
* channel. */ * channel. */
...@@ -303,12 +409,12 @@ class Channel { ...@@ -303,12 +409,12 @@ class Channel {
this.#readPointerCache = this.#getReadPointer(); this.#readPointerCache = this.#getReadPointer();
if (!this.#isAbleToWrite(size)) { if (!this.#isAbleToWrite(size) || size == 0) {
return []; return [];
} }
if (this.#writePhase != WRITE_PHASE.NO_STEP_WRITE) if (this.#writePhase != WRITE_PHASE.NO_STEP_WRITE)
throw new Error("Unable to call multiple writeStep1() consecutively") throw new ChnlStepConflictError();
this.#writePhase = WRITE_PHASE.WAIT_CALLER_WRITE; this.#writePhase = WRITE_PHASE.WAIT_CALLER_WRITE;
let schedule = this.#cpySchedule(size); let schedule = this.#cpySchedule(size);
...@@ -316,9 +422,6 @@ class Channel { ...@@ -316,9 +422,6 @@ class Channel {
let plan_1 = schedule['first']; let plan_1 = schedule['first'];
let plan_2 = schedule['second']; let plan_2 = schedule['second'];
console.log(plan_1);
console.log(plan_2);
let endPos = undefined; let endPos = undefined;
let buffers = []; let buffers = [];
...@@ -336,12 +439,13 @@ class Channel { ...@@ -336,12 +439,13 @@ class Channel {
if (endPos != undefined) if (endPos != undefined)
this.#writePhaseEndPos = endPos; this.#writePhaseEndPos = endPos;
return buffers; return buffers;
} }
writeStep2() { writeStep2() {
if (this.#writePhase != WRITE_PHASE.WAIT_CALLER_WRITE) if (this.#writePhase != WRITE_PHASE.WAIT_CALLER_WRITE)
return; throw new ChnlStepConflictError();
this.#writePointerUpdate(this.#writePhaseEndPos); this.#writePointerUpdate(this.#writePhaseEndPos);
this.#writePhase = WRITE_PHASE.NO_STEP_WRITE; this.#writePhase = WRITE_PHASE.NO_STEP_WRITE;
} }
...@@ -404,8 +508,18 @@ class Channel { ...@@ -404,8 +508,18 @@ class Channel {
// Copy Strategies // // Copy Strategies //
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
class CopyArgument {
rPos = 0;
wPos = 0;
rBlks = 0;
wBlks = 0;
size = 0;
};
class CopyStrategy { class CopyStrategy {
schedule(rPos, wPos) { /* This method assume that Channel has enough space
* to hold datas you want to writes. */
schedule(args) {
return NEED_TO_IMPLEMENT; return NEED_TO_IMPLEMENT;
} }
}; };
...@@ -424,10 +538,14 @@ class CopyStrategyInByte extends CopyStrategy { ...@@ -424,10 +538,14 @@ class CopyStrategyInByte extends CopyStrategy {
this.#endPos = this.#shift + this.#size; this.#endPos = this.#shift + this.#size;
} }
schedule(rPos, wPos, size) { schedule(args) {
let firstCpySize = 0, secondCpySize = 0, spaceToTail = 0; let firstCpySize = 0, secondCpySize = 0, spaceToTail = 0;
let schedule = new CpySchedule(); let schedule = new CpySchedule();
let rPos = args.rPos;
let wPos = args.wPos;
let size = args.size;
if (wPos >= rPos) { if (wPos >= rPos) {
spaceToTail = this.#endPos - wPos; spaceToTail = this.#endPos - wPos;
firstCpySize = Math.min(size, spaceToTail); firstCpySize = Math.min(size, spaceToTail);
...@@ -448,6 +566,74 @@ class CopyStrategyInByte extends CopyStrategy { ...@@ -448,6 +566,74 @@ class CopyStrategyInByte extends CopyStrategy {
}; };
class CopyStrategyInBlock extends CopyStrategy {
#size = 0;
#shift = 0;
#blockSize = 0;
#endPos = 0;
constructor(size, shift, blockSize) {
super();
this.#size = size;
this.#shift = shift;
this.#endPos = this.#shift + this.#size;
this.#blockSize = blockSize;
}
schedule(args) {
let retSchedule = new CpySchedule();
let wPos = args.wPos == this.#endPos ? this.#shift : args.wPos;
let rPos = args.rPos;
let size = args.size;
let blocksRequired = Math.floor(size / this.#blockSize);
if (blocksRequired == 0) {
// Schedules with all zero size
return retSchedule;
}
if (wPos > rPos) {
/* Make sure block has consecutive bytes */
let spaceBetweenEnd = this.#endPos - wPos;
let blksAbleToWrite = Math.floor(spaceBetweenEnd / this.#blockSize);
if (blksAbleToWrite == 0) {
return retSchedule();
}
/* Do first copy schedule */
retSchedule.first.pos = wPos;
if (blksAbleToWrite >= blocksRequired) {
retSchedule.first.size = blocksRequired * this.#blockSize;
} else {
retSchedule.first.size = blksAbleToWrite * this.#blockSize;
blocksRequired -= blksAbleToWrite;
/* Second one is required */
retSchedule.second.pos = this.#shift;
retSchedule.second.size = blocksRequired * this.#blockSize;
}
} else if (wPos == rPos) {
// Under the assumption of schedule() 'wPos == rPos' means
// channel is empty
retSchedule.first.pos = wPos;
retSchedule.first.size = blocksRequired * this.#blockSize;
} else {
// wPos < rPos
retSchedule.first.pos = wPos;
retSchedule.first.size = blocksRequired * this.#blockSize;
}
args.wBlks = (args.wBlks + blocksRequired) % Math.pow(2, 8);
return retSchedule;
}
}
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
// Message Definitions // // Message Definitions //
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment