Commit a939bbfb authored by Linshizhi's avatar Linshizhi

update

parent b13a7d7e
......@@ -4,7 +4,7 @@ async function sleep(ms) {
}
async function main(mem) {
let chnlReader = new Channel(Math.pow(2, 10), SharedArrayBuffer, mem);
let chnlReader = new Channel(Math.pow(2, 10), SharedArrayBuffer, mem);
let ret = true;
let rSize = Math.pow(2, 10);
......@@ -214,6 +214,7 @@ class Channel {
let writePos = this.#getWritePointer();
let readTo = 0, readBuffer = null;
if (this.#readPointerCache == writePos) {
return new Uint8Array(0);
} else if (this.#readPointerCache < writePos) {
......
......@@ -2,10 +2,31 @@
import { assert } from './utils.js';
class CpySchedule {
first = { pos: 0, size: 0 }
second = { pos: 0, size: 0 }
first = { pos : 0, size: 0 };
second = { pos : 0, size: 0 };
isEmptySchedule() {
return first.size == 0 && second.size == 0;
}
reset() {
first.pos = 0;
first.size = 0;
second.pos = 0;
second.size = 0;
}
}
const CHANNEL_MODE = Object.freeze({
BYTE : 0,
BLOCK : 1,
});
const WRITE_PHASE = Object.freeze({
NO_STEP_WRITE : 0,
WAIT_CALLER_WRITE : 1,
});
export function createChnlBuffer(size, bufferType=SharedArrayBuffer) {
return new bufferType(size+12+1);
}
......@@ -43,10 +64,16 @@ export class Channel {
#view = undefined;
#buffer = undefined;
#mode = CHANNEL_MODE.BYTE;
#blockSize = 0;
#writePointerCache = 0;
#readPointerCache = 0;
#endPos = 0;
#writePhase = WRITE_PHASE.NO_STEP_WRITE;
#writePhaseEndPos = 0;
// size's unit is byte
// bufferType parameter is mainly for testability.
constructor(size, bufferType = SharedArrayBuffer, shMem = null) {
......@@ -102,6 +129,26 @@ export class Channel {
return true;
}
enableBlockMode(size) {
let capacity = this.#size - 1;
if (capacity % size) {
return false;
}
this.#readPointerUpdate(this.#metaSize);
this.#writePointerUpdate(this.#metaSize);
this.#mode = CHANNEL_MODE.BLOCK;
this.#blockSize = size;
}
enableByteMode() {
this.#readPointerUpdate(this.#metaSize);
this.#writePointerUpdate(this.#metaSize);
this.#mode = CHANNEL_MODE.BYTE;
}
isSetPriv(flag) {
let flags = this.readPriv();
return flags & flag;
......@@ -155,6 +202,59 @@ export class Channel {
* Caution: Before making a copy schedule you must
* make sure there has enough spaces.*/
#cpySchedule(size) {
switch (this.#mode) {
case CHANNEL_MODE.BYTE:
return this.#cpyScheduleBytes(size);
case CHANNEL_MODE.BLOCK:
return this.#cpyScheduleBlocks(size);
}
}
#cpyScheduleBlocks(size) {
let retSchedule = new CpySchedule();
let blocksRequired = Math.floor(size / this.#blockSize);
let endPos = this.#endPos - 1;
if (blocksRequired == 0) {
// Schedules with all zero size
return retSchedule;
}
if (this.#writePointerCache > this.#readPointerCache) {
/* Make sure block has consecutive bytes */
let spaceBetweenEnd = endPos - this.#writePointerCache;
let numOfBlksFirstAblePlace = Math.floor(spaceBetweenEnd / this.#blockSize);
if (numOfBlksFirstAblePlace == 0 || (spaceBetweenEnd % this.#blockSize) > 0) {
/* Must not happen */
return retSchedule();
}
/* Do first copy schedule */
retSchedule.first.pos = this.#writePointerCache;
if (numOfBlksFirstAblePlace >= blocksRequired) {
retSchedule.first.size = blocksRequired * this.#blockSize;
return retSchedule;
} else {
retSchedule.first.size = numOfBlksFirstAblePlace * this.#blockSize;
blocksRequired -= numOfBlksFirstAblePlace;
}
/* Second one is required */
retSchedule.second.pos = this.#metaSize;
let spaceBetweenRWPtr = this.#readPointerCache - this.#writePointerCache;
/* No enough space */
if (Math.floor(spaceBetweenRWPtr/this.#blockSize) < blocksRequired) {
retSchedule.reset();
return retSchedule;
}
retSchedule.second.size = blocksRequired * this.#blockSize;
return retSchedule;
}
#cpyScheduleBytes(size) {
let firstCpySize = 0, secondCpySize = 0, spaceToTail = 0;
let schedule = new CpySchedule();
let readPos = this.#getReadPointer();
......@@ -224,7 +324,64 @@ export class Channel {
return readBuffer;
}
/* Return array of buffers to caller
* so caller able to directly write to
* channel. */
writeStep1(size) {
this.#writePointerCache = this.#getWritePointer();
this.#readPointerCache = this.#getReadPointer();
console.log(this.#readPointerCache, this.#writePointerCache);
if (!this.#isAbleToWrite(size)) {
return [];
}
if (this.#writePhase != WRITE_PHASE.NO_STEP_WRITE)
throw new Error("Unable to call multiple writeStep1() consecutively")
this.#writePhase = WRITE_PHASE.WAIT_CALLER_WRITE;
let schedule = this.#cpySchedule(size);
let plan_1 = schedule['first'];
let plan_2 = schedule['second'];
console.log(plan_1);
console.log(plan_2);
let endPos = undefined;
let buffers = [];
if (plan_1.size != 0) {
endPos = plan_1.pos + plan_1.size;
buffers.push(this.#buffer.subarray(
plan_1.pos, endPos));
}
if (plan_2.size != 0) {
endPos = plan_2.pos + plan_2.size;
buffers.push(this.#buffer.subarray(
plan_2.pos, endPos));
}
if (endPos != undefined)
this.#writePhaseEndPos = endPos;
return buffers;
}
writeStep2() {
if (this.#writePhase != WRITE_PHASE.WAIT_CALLER_WRITE)
return;
this.#writePointerUpdate(this.#writePhaseEndPos);
this.#writePhase = WRITE_PHASE.NO_STEP_WRITE;
}
push(data /* Uint8Array */) {
if (this.#writePhase != WRITE_PHASE.NO_STEP_WRITE) {
throw new Error("Unable to push into Channel during Step Write");
}
let writePos = this.#writePointerCache;
this.#readPointerCache = this.#getReadPointer();
......
......@@ -79,6 +79,38 @@ describe("Channel Spec", () => {
expect(channel.isEmpty()).toBe(true);
});
it("32 KB Datas Write and Read with StepWrite", () => {
let ret = false;
let remainToWrite = Math.pow(2, 15), readData = null;
let channel = new Channel(Math.pow(2, 10), ArrayBuffer);
let pos = 0, endPos = 0;
while (remainToWrite > 0) {
let dataToWrite = new Uint8Array(
[...Array(getRandomInt(Math.pow(2,10))).keys()]);
pos = 0;
endPos = 0;
let arrays = channel.writeStep1(dataToWrite.byteLength);
for (let i = 0; i < arrays.length; ++i) {
endPos = pos + arrays[i].length;
arrays[i].set(dataToWrite.subarray(pos, endPos), 0);
pos = endPos;
}
channel.writeStep2();
readData = channel.readData(dataToWrite.byteLength);
ret = areEqual(dataToWrite, readData);
expect(ret).withContext("Data Mismatch").toBe(true);
remainToWrite -= readData.byteLength;
}
expect(channel.isEmpty()).toBe(true);
})
it("32 KB Datas Write and Read", () => {
let ret = false;
let remainToWrite = Math.pow(2, 15), readData = null;
......@@ -126,6 +158,39 @@ describe("Channel Spec", () => {
expect(channel.isEmpty()).toBe(true);
});
fit("Two step transfer", async () => {
let unitSize = 10;
let channel = new Channel(unitSize * 10);
let datas;
let arrays;
let wData = [ 0,1,2,3,4,5,6,7,8,9 ];
for (let i = 0; i < 10; ++i) {
arrays = channel.writeStep1(unitSize);
arrays[0].set(wData, 0);
channel.writeStep2();
}
expect(channel.dataSize()).toBe(unitSize*10);
arrays = channel.writeStep1(unitSize);
expect(arrays.length).toBe(0);
for (let i = 0; i < 10; ++i) {
datas = channel.readData(unitSize);
expect(areEqual(datas, wData)).toBe(true);
}
arrays = channel.writeStep1(unitSize);
console.log(arrays);
expect(arrays.length).toBe(1);
arrays[0].set(wData, 0);
channel.writeStep2();
datas = channel.readData(unitSize);
expect(areEqual(datas, wData)).toBe(true);
});
it("Transfer 64 MB to WebWorker", async () => {
let ret = true;
let channel = new Channel(Math.pow(2,16));
......@@ -182,4 +247,73 @@ describe("Channel Spec", () => {
}, 10000);
it("Transfer 64 MB to WebWorker with StepWrite", async () => {
let ret = true;
let channel = new Channel(Math.pow(2,16));
let worker = new Worker('../resources/tests/channelWW.js');
let sended = 0;
let dataToWrite = new Uint8Array(
[...Array(getRandomInt(Math.pow(2,10))).keys()]);
let cur = 0;
let size = Math.pow(2, 16);
let rBuffer = new Uint8Array(Math.pow(2, 17));
let sBuffer = new Uint8Array(Math.pow(2, 17));
worker.postMessage(channel.getShMem());
let stream = new Observable(sub => {
worker.onmessage = e => { sub.next(e.data); }
});
stream.subscribe(data => {
rBuffer.set(data, cur);
cur += data.byteLength;
});
let pos , endPos;
while (size > 0) {
// Write to remote
pos = 0;
endPos = 0;
let arrays = channel.writeStep1(dataToWrite.byteLength);
if (arrays.length > 0) {
for (let i = 0; i < arrays.length; ++i) {
endPos = pos + arrays[i].length;
arrays[i].set(dataToWrite.subarray(pos, endPos), 0);
pos = endPos;
}
channel.writeStep2();
}
if (arrays.length == 0) {
await sleep(10);
continue;
} else {
sBuffer.set(dataToWrite, sended);
sended += dataToWrite.byteLength;
}
size -= dataToWrite.byteLength;
dataToWrite = new Uint8Array(
[...Array(getRandomInt(Math.pow(2,10))).keys()]);
}
// Wait for Web Worker
await new Promise(r => {
setInterval(() => {
if (cur == sended) {
r();
}
}, 100);
}, 100000);
worker.terminate();
expect(areEqual(rBuffer, sBuffer)).toBe(true);
}, 10000);
});
......@@ -18,7 +18,7 @@ beforeEach(async () => {
describe("ParaEncoder", () => {
fit("Encode With ParaEncoder", async () => {
it("Encode With ParaEncoder", async () => {
const data = new Uint8Array([...Array(RGBAFrameSize).keys()]);
let st = new Date();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment