Commit 50ffd1cc authored by Linshizhi's avatar Linshizhi

Update Channel.

parent fac579ca
...@@ -82,7 +82,7 @@ module.exports = function(config) { ...@@ -82,7 +82,7 @@ module.exports = function(config) {
// start these browsers // start these browsers
// available browser launchers: https://www.npmjs.com/search?q=keywords:karma-launcher // available browser launchers: https://www.npmjs.com/search?q=keywords:karma-launcher
browsers: ['ChromeHeadless'], //browsers: ['ChromeHeadless'],
// Continuous Integration mode // Continuous Integration mode
// if true, Karma captures browsers, runs the tests and exits // if true, Karma captures browsers, runs the tests and exits
......
...@@ -41,8 +41,8 @@ export class Channel { ...@@ -41,8 +41,8 @@ export class Channel {
#view = undefined; #view = undefined;
#buffer = undefined; #buffer = undefined;
#writeValCache = 0; #writePointerCache = 0;
#readValCache = 0; #readPointerCache = 0;
#endPos = 0; #endPos = 0;
// size's unit is byte // size's unit is byte
...@@ -59,16 +59,16 @@ export class Channel { ...@@ -59,16 +59,16 @@ export class Channel {
this.#shMem = new bufferType(size + this.#metaSize); this.#shMem = new bufferType(size + this.#metaSize);
this.#view = new DataView(this.#shMem); this.#view = new DataView(this.#shMem);
this.#buffer = new Uint8Array(this.#shMem); this.#buffer = new Uint8Array(this.#shMem);
this.#writeValCache = this.#metaSize; this.#writePointerCache = this.#metaSize;
this.#readValCache = this.#metaSize; this.#readPointerCache = this.#metaSize;
this.#size = size; this.#size = size;
this.#totalSize = this.#metaSize + this.#size; this.#totalSize = this.#metaSize + this.#size;
this.#endPos = this.#metaSize + this.#size; this.#endPos = this.#metaSize + this.#size;
// Init readPointer and writePointer to // Init readPointer and writePointer to
// the first bytes of data area. // the first bytes of data area.
this.#view.setUint32(0, this.#writeValCache); this.#view.setUint32(0, this.#writePointerCache);
this.#view.setUint32(4, this.#readValCache); this.#view.setUint32(4, this.#readPointerCache);
} }
#getReadPointer() { #getReadPointer() {
...@@ -97,7 +97,6 @@ export class Channel { ...@@ -97,7 +97,6 @@ export class Channel {
for (let i = 8; i < 12; ++i) { for (let i = 8; i < 12; ++i) {
this.#view.setUint8(i); this.#view.setUint8(i);
} }
} }
/* Semantic: Is able to write 'size' of datas /* Semantic: Is able to write 'size' of datas
...@@ -111,14 +110,14 @@ export class Channel { ...@@ -111,14 +110,14 @@ export class Channel {
} }
#remain() { #remain() {
let readPos = this.#readPointer(); let readPos = this.#getReadPointer();
if (this.#writePos == readPos) { if (this.#writePointerCache == readPos) {
return this.#size - 1; return this.#size - 1;
} else if (this.#writePos > readPos) { } else if (this.#writePointerCache > readPos) {
return this.#size - (this.#writePos - readPos) - 1; return this.#size - (this.#writePointerCache - readPos) - 1;
} else { } else {
return readPos - this.#writePos - 1; return readPos - this.#writePointerCache - 1;
} }
} }
...@@ -133,22 +132,21 @@ export class Channel { ...@@ -133,22 +132,21 @@ export class Channel {
#cpySchedule(size) { #cpySchedule(size) {
let firstCpySize = 0, secondCpySize = 0, spaceToTail = 0; let firstCpySize = 0, secondCpySize = 0, spaceToTail = 0;
let schedule = new CpySchedule(); let schedule = new CpySchedule();
let readPos = this.#readPos(); let readPos = this.#getReadPointer();
if (this.#writePos >= readPos) { if (this.#writePointerCache >= readPos) {
spaceToTail = this.#endPos - this.#writePos; spaceToTail = this.#endPos - this.#writePointerCache;
firstCpySize = Math.min(size, spaceToTail); firstCpySize = Math.min(size, spaceToTail);
secondCpySize = firstCpySize < size ? size - firstCpySize : 0; secondCpySize = firstCpySize < size ? size - firstCpySize : 0;
secondCpySize = Math.min(secondCpySize, readPos - this.#metaSize); secondCpySize = Math.min(secondCpySize, readPos - this.#metaSize);
schedule.first.pos = this.#writePos; schedule.first.pos = this.#writePointerCache;
schedule.first.size = firstCpySize; schedule.first.size = firstCpySize;
schedule.second.pos = secondCpySize > 0 ? this.#metaSize : 0; schedule.second.pos = secondCpySize > 0 ? this.#metaSize : 0;
schedule.second.size = secondCpySize; schedule.second.size = secondCpySize;
} else { } else {
schedule.first.pos = this.#writePos; schedule.first.pos = this.#writePointerCache;
schedule.first.size = Math.min(readPos - this.#writePos - 1, size); schedule.first.size = Math.min(readPos - this.#writePointerCache - 1, size);
} }
return schedule; return schedule;
...@@ -159,24 +157,23 @@ export class Channel { ...@@ -159,24 +157,23 @@ export class Channel {
} }
isEmpty() { isEmpty() {
return this.#writePos == this.#readPos(); return this.#writePointerCache == this.#readPointerCache;
} }
// This method is for testing purposes. // This method is for testing purposes.
readData(size) { readData(size) {
let writePos = this.#writePos(); let writePos = this.#getWritePointer();
let readTo = 0, readBuffer = null; let readTo = 0, readBuffer = null;
if (this.#readPointerCache == writePos) {
if (this.#readPos == writePos) {
return new Uint8Array(0); return new Uint8Array(0);
} else if (this.#readPos < writePos) { } else if (this.#readPointerCache < writePos) {
readTo = this.#readPos + Math.min(size, writePos - this.#readPos); readTo = this.#readPointerCache + Math.min(size, writePos - this.#readPointerCache);
readBuffer = this.#buffer.slice(this.#readPos, readTo); readBuffer = this.#buffer.slice(this.#readPointerCache, readTo);
this.#readPosUpdate(readTo); this.#readPointerUpdate(readTo);
} else { } else {
// Read two times // Read two times
let firstRSize = Math.min(size, this.#buffer.byteLength-this.#readPos); let firstRSize = Math.min(size, this.#buffer.byteLength-this.#readPointerCache);
let secondRSize = firstRSize < size ? size - firstRSize : 0; let secondRSize = firstRSize < size ? size - firstRSize : 0;
secondRSize = Math.min(secondRSize, writePos-this.#metaSize); secondRSize = Math.min(secondRSize, writePos-this.#metaSize);
...@@ -184,18 +181,18 @@ export class Channel { ...@@ -184,18 +181,18 @@ export class Channel {
// First read // First read
readBuffer.set(this.#buffer.slice( readBuffer.set(this.#buffer.slice(
this.#readPos, this.#readPos+firstRSize), 0); this.#readPointerCache, this.#readPointerCache+firstRSize), 0);
// Second Read // Second Read
if (secondRSize > 0) { if (secondRSize > 0) {
readBuffer.set( readBuffer.set(
this.#buffer.slice(this.#metaSize, this.#metaSize+secondRSize), this.#buffer.slice(this.#metaSize, this.#metaSize+secondRSize),
firstRSize); firstRSize);
this.#readPosUpdate(this.#metaSize+secondRSize); this.#readPointerUpdate(this.#metaSize+secondRSize);
} else { } else {
let newPos = this.#readPos+firstRSize; let newPos = this.#readPointerCache+firstRSize;
newPos = newPos == this.#buffer.byteLength ? this.#metaSize : newPos; newPos = newPos == this.#buffer.byteLength ? this.#metaSize : newPos;
this.#readPosUpdate(newPos); this.#readPointerUpdate(newPos);
} }
} }
...@@ -204,7 +201,7 @@ export class Channel { ...@@ -204,7 +201,7 @@ export class Channel {
push(data /* Uint8Array */) { push(data /* Uint8Array */) {
let writePos = this.#writePos; let writePos = this.#writePointerCache;
if (!this.#isAbleToWrite(data.byteLength)) { if (!this.#isAbleToWrite(data.byteLength)) {
return false; return false;
...@@ -235,18 +232,18 @@ export class Channel { ...@@ -235,18 +232,18 @@ export class Channel {
// all datas are writed but not before or // all datas are writed but not before or
// at intermediate of some writes otherwise // at intermediate of some writes otherwise
// oppsite side may read invalid datas. // oppsite side may read invalid datas.
this.#writePosUpdate(writePos); this.#writePointerUpdate(writePos);
return true; return true;
} }
#writePointerUpdate(pos) { #writePointerUpdate(pos) {
this.#writePos = pos; this.#writePointerCache = pos;
this.#view.setUint32(4, pos); this.#view.setUint32(4, pos);
} }
#readPointerUpdate(pos) { #readPointerUpdate(pos) {
this.#readPos = pos; this.#readPointerCache = pos;
this.#view.setUint32(0, pos); this.#view.setUint32(0, pos);
} }
} }
...@@ -131,7 +131,7 @@ describe("Channel Spec", () => { ...@@ -131,7 +131,7 @@ describe("Channel Spec", () => {
it("Transfer 64 MB to WebWorker", async () => { it("Transfer 64 MB to WebWorker", async () => {
let url = new URL('./workers/channelWW.js', import.meta.url), let url = new URL('./workers/channelWW.js', import.meta.url),
ret = true; ret = true;
let channel = new Channel(Math.pow(2,20)); let channel = new Channel(Math.pow(2,10));
let worker = new Worker(url, { type: 'module' }); let worker = new Worker(url, { type: 'module' });
let sended = 0; let sended = 0;
...@@ -139,9 +139,9 @@ describe("Channel Spec", () => { ...@@ -139,9 +139,9 @@ describe("Channel Spec", () => {
[...Array(getRandomInt(Math.pow(2,10))).keys()]); [...Array(getRandomInt(Math.pow(2,10))).keys()]);
let cur = 0; let cur = 0;
let size = Math.pow(2, 26); let size = Math.pow(2, 15);
let rBuffer = new Uint8Array(Math.pow(2, 27)); let rBuffer = new Uint8Array(Math.pow(2, 16));
let sBuffer = new Uint8Array(Math.pow(2, 27)); let sBuffer = new Uint8Array(Math.pow(2, 16));
worker.postMessage(channel.getShMem()); worker.postMessage(channel.getShMem());
...@@ -159,7 +159,7 @@ describe("Channel Spec", () => { ...@@ -159,7 +159,7 @@ describe("Channel Spec", () => {
ret = channel.push(dataToWrite); ret = channel.push(dataToWrite);
if (ret == false) { if (ret == false) {
await sleep(100); await sleep(10);
continue; continue;
} else { } else {
sBuffer.set(dataToWrite, sended); sBuffer.set(dataToWrite, sended);
...@@ -174,16 +174,17 @@ describe("Channel Spec", () => { ...@@ -174,16 +174,17 @@ describe("Channel Spec", () => {
// Wait for Web Worker // Wait for Web Worker
await new Promise(r => { await new Promise(r => {
setInterval(() => { setInterval(() => {
console.log(cur, sended);
if (cur == sended) { if (cur == sended) {
r(); r();
} }
}, 10); }, 10);
}, 1000000); }, 100000);
worker.terminate(); worker.terminate();
expect(areEqual(rBuffer, sBuffer)).toBe(true); expect(areEqual(rBuffer, sBuffer)).toBe(true);
}, 10000); }, 100000);
}); });
......
...@@ -49,7 +49,6 @@ class ChannelReader { ...@@ -49,7 +49,6 @@ class ChannelReader {
let writePos = this.#writePos(); let writePos = this.#writePos();
let readTo = 0, readBuffer = null; let readTo = 0, readBuffer = null;
if (this.#readPos == writePos) { if (this.#readPos == writePos) {
return new Uint8Array(0); return new Uint8Array(0);
} else if (this.#readPos < writePos) { } else if (this.#readPos < writePos) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment