Commit 30cb28e4 authored by Linshizhi's avatar Linshizhi

update

parent 2802c91c
......@@ -4,6 +4,7 @@ set -euo pipefail
EM_VERSION=2.0.24
BUILD_EXTERNAL=${EXTERNAL:-false}
BUILD_FFMPEG=${BUILD_FFMPEG:-true}
if [[ "$(docker images -q emscripten/emsdk:2.0.24 2> /dev/null)" == "" ]]; then
docker pull emscripten/emsdk:$EM_VERSION
......@@ -15,5 +16,6 @@ docker run \
-v /etc/passwd:/etc/passwd:ro \
-v /etc/group:/etc/group:ro \
-e EXTERNAL="$BUILD_EXTERNAL" \
-e BUILD_FFMPEG="$BUILD_FFMPEG" \
emscripten/emsdk:$EM_VERSION \
bash ./wasm-build.sh "$(id -u)" "$(id -g)" "$@"
......@@ -50,33 +50,25 @@ class Channel {
assert(size >= 2, `Channel require its data area has at least 2 Bytes.`)
this.#size = size;
this.#size = size + 1;
// Init shared memory
this.#metaSize = this.#rFieldPosLen + this.#wFieldPosLen + this.#priFieldLen;
this.#shMem = shMem == null ? new bufferType(size + this.#metaSize) : shMem;
this.#shMem = shMem == null ? new bufferType(this.#size + this.#metaSize) : shMem;
this.#view = new DataView(this.#shMem);
this.#buffer = new Uint8Array(this.#shMem);
if (shMem == null) {
this.#writePointerCache = this.#metaSize;
this.#readPointerCache = this.#metaSize;
this.#writePointerCache = this.#metaSize;
this.#readPointerCache = this.#metaSize;
// Init readPointer and writePointer to
// the first bytes of data area.
this.#view.setUint32(0, this.#writePointerCache);
this.#view.setUint32(4, this.#readPointerCache);
// Init readPointer and writePointer to
// the first bytes of data area.
this.#view.setUint32(0, this.#writePointerCache);
this.#view.setUint32(4, this.#readPointerCache);
} else {
this.#writePointerCache = this.#getWritePointer();
this.#readPointerCache = this.#getReadPointer();
}
this.#size = size;
this.#totalSize = this.#metaSize + this.#size;
this.#endPos = this.#metaSize + this.#size;
}
#getReadPointer() {
......@@ -392,6 +384,7 @@ function isDataMsg(msg) {
* */
let PRIV_FLAGS = Object.freeze({
EXECUTING : 0x00000001,
EOF : 0x00000002,
});
......
......@@ -166,8 +166,6 @@ async function EOFProcessing() {
}
}
let i = 0;
async function RGBProcessing(frame) {
let ret = 0, size_;
......@@ -184,8 +182,7 @@ async function RGBProcessing(frame) {
throw new Error("Unknown error");
}
++i;
console.log("Encoder Send size: " + size_);
sendToMuxer(size_);
return true;
}
......
......@@ -11,7 +11,7 @@ let channels = [];
// Constants
const SLEEP_INTERVAL = 100;
const SPIN_TIMEOUT = 500;
const SPIN_TIMEOUT = 100000;
// WASM Objects
let MUX_BUFFER_SIZE = Math.pow(2, 22);
......@@ -41,7 +41,7 @@ async function main(msg) {
if (!isInited) await init(msg);
break;
case MESSAGE_TYPE.BRIDGE:
if (!isBridged) await bridging();
if (!isBridged) await bridging(msg);
break;
case MESSAGE_TYPE.BRIDGE_DESTROY:
if (isBridged) await deBridging();
......@@ -65,8 +65,6 @@ async function init(msg) {
if ('numofEncs' in info)
numOfEncs = info.numofEncs;
muxBuffer = muxer._malloc(MUX_BUFFER_SIZE);
/* Init Muxer */
muxer._muxInit(numOfEncs);
......@@ -75,10 +73,6 @@ async function init(msg) {
async function destroy() {
/* Free all wasm memories */
muxer._free(muxBuffer);
muxBuffer = null
/* Destroy MUXER_WASM */
muxer.muxDestruct();
}
......@@ -102,39 +96,45 @@ async function deBridging() {
// Return value:
// True : Success
// False : Fail
// True : AGAIN
// False : EOF
async function step() {
let chn = undefined;
console.log("Step");
// Read Datas
for (let i = 0; i < numOfEncs; ++i) {
let data = channels[i].readData(MUX_BUFFER_SIZE);
chn = channels[i];
let data = chn.readData(MUX_BUFFER_SIZE);
console.log("Mux Receive (" + data.byteLength + " Bytes ) " + data)
if (data.byteLength == 0)
if (data.byteLength == 0) {
// Check EOF
let flag = chn.readPriv();
if (flag & PRIV_FLAGS.EOF) {
muxer._eof(i);
}
continue;
}
/* Write data into wasm */
/* Then handle data from wasm */
muxBuffer = muxer._malloc(MUX_BUFFER_SIZE);
muxer.HEAP8.set(data, muxBuffer);
muxer._muxPush(i, muxBuffer, data.byteLength);
}
// Handle Datas
muxer._muxStep();
console.log("Mux step done")
return true;
}
async function steps() {
let timeout = SPIN_TIMEOUT;
src.setPriv(MSG.PRIV_FLAGS.EXECUTING);
while (timeout > 0) {
if (await step() === false) {
await sleep(SLEEP_INTERVAL);
timeout -= SLEEP_INTERVAL;
} else {
timeout = SPIN_TIMEOUT;
}
while (await step()) {
await sleep(SLEEP_INTERVAL);
}
src.unsetPriv(MSG.PRIV_FLAGS.EXECUTING);
postMessage(makeMsg(MESSAGE_TYPE.DATA_REQUIRE, {}));
}
......@@ -49,12 +49,12 @@ export class Channel {
assert(size >= 2, `Channel require its data area has at least 2 Bytes.`)
this.#size = size;
this.#size = size+1;
// Init shared memory
this.#metaSize = this.#rFieldPosLen + this.#wFieldPosLen + this.#priFieldLen;
this.#shMem = shMem == null ? new bufferType(size + this.#metaSize) : shMem;
this.#shMem = shMem == null ? new bufferType(this.#size + this.#metaSize) : shMem;
this.#view = new DataView(this.#shMem);
this.#buffer = new Uint8Array(this.#shMem);
......@@ -66,7 +66,6 @@ export class Channel {
this.#view.setUint32(0, this.#writePointerCache);
this.#view.setUint32(4, this.#readPointerCache);
this.#size = size;
this.#totalSize = this.#metaSize + this.#size;
this.#endPos = this.#metaSize + this.#size;
......@@ -127,14 +126,14 @@ export class Channel {
}
#remain() {
let readPos = this.#getReadPointer();
this.#readPointerCache = this.#getReadPointer();
if (this.#writePointerCache == readPos) {
if (this.#writePointerCache == this.#readPointerCache) {
return this.#size - 1;
} else if (this.#writePointerCache > readPos) {
return this.#size - (this.#writePointerCache - readPos) - 1;
} else if (this.#writePointerCache > this.#readPointerCache) {
return this.#size - (this.#writePointerCache - this.#readPointerCache) - 1;
} else {
return readPos - this.#writePointerCache - 1;
return this.#readPointerCache - this.#writePointerCache - 1;
}
}
......@@ -218,6 +217,7 @@ export class Channel {
push(data /* Uint8Array */) {
let writePos = this.#writePointerCache;
this.#readPointerCache = this.#getReadPointer();
if (!this.#isAbleToWrite(data.byteLength)) {
return false;
......
......@@ -120,4 +120,5 @@ export function isDataMsg(msg) {
* */
export let PRIV_FLAGS = Object.freeze({
EXECUTING : 0x00000001,
EOF : 0x00000002,
});
......@@ -95,15 +95,22 @@ EM_PORT_API(uint8_t) encodeInit(int width, int height, int fps) {
// Init bitstream filter
bsFilter = av_bsf_get_by_name("h264_mp4toannexb");
if (bsFilter == NULL) {
printf("Fail to get h264_mp4toannexb\n");
return 5;
}
av_bsf_alloc(bsFilter, &bsCtx);
if (bsCtx == NULL) {
printf("Fail to alloc bitstream filter context\n");
return 6;
}
avcodec_parameters_from_context(bsCtx->par_in, cc);
avcodec_parameters_from_context(bsCtx->par_out, cc);
if (av_bsf_init(bsCtx) < 0) {
printf("failed to init bitstream filter context\n");
return 7;
}
return 0;
}
......@@ -196,10 +203,6 @@ EM_PORT_API(int) getPackets(uint8_t *buffer, uint32_t size, uint32_t *osize) {
goto DONE;
}
// Do bitstream modification to let packet can be
// streaming.
// For video frame avcodec_receive_packet should return
// only once.
if (remainSize > packet->size) {
......@@ -227,6 +230,7 @@ EM_PORT_API(int) getPackets(uint8_t *buffer, uint32_t size, uint32_t *osize) {
#include "ioctx.h"
#include "proto/movMemProto.h"
#include <vector>
#include <array>
#include <queue>
#include <memory>
......@@ -267,11 +271,20 @@ private:
bool ioCtxInited = false;
int numOfStreams = 0;
PacketBuffer pktBuffer;
int finishedCount = 0;
std::shared_ptr<bool[]> finished;
std::shared_ptr<bool[]> inited;
bool oCtxInited = false;
std::vector<IOEndpoint> protos;
std::vector<IOCtx::InCtx> ctxs;
IOCtx::InCtx **ctxs = nullptr;
constexpr size_t PKT_BUF_UPPER_BOUND = 300;
int currentChannel = 0;
const char *outputPath = "/tmp/output.mp4";
IOCtx::OutCtx oCtx { outputPath };
bool done = false;
};
EM_PORT_API(int) muxInit(int numOfStreams) {
......@@ -279,7 +292,9 @@ EM_PORT_API(int) muxInit(int numOfStreams) {
// Setup MuxEnv Status
MuxEnv::numOfStreams = numOfStreams;
MuxEnv::pktBuffer = MuxEnv::PacketBuffer(numOfStreams);
MuxEnv::finished = std::shared_ptr<bool[]>(new bool[numOfStreams]);
MuxEnv::finished = std::shared_ptr<bool[]>(new bool[numOfStreams]{false});
MuxEnv::inited = std::shared_ptr<bool[]>(new bool[numOfStreams]{false});
MuxEnv::ctxs = (IOCtx::InCtx**)malloc(sizeof(IOCtx::InCtx*));
for (int i = 0; i < numOfStreams; ++i) {
......@@ -308,22 +323,39 @@ int bufferPackets() {
// then buffer such packets.
for (int i = 0; i < MuxEnv::numOfStreams; ++i) {
if (MuxEnv::finished[i])
if (MuxEnv::finished[i] || !MuxEnv::inited[i])
continue;
IOCtx::InCtx &c = MuxEnv::ctxs[i];
IOCtx::InCtx *c = MuxEnv::ctxs[i];
pkt = pkt == nullptr ? av_packet_alloc() : pkt;
ret = c.readFrame_(pkt);
ret = c->readFrame_(pkt);
if (ret < 0) {
if (ret == AVERROR_EOF) {
MuxEnv::finishedCount++;
MuxEnv::finished[i] = true;
if (MuxEnv::finishedCount == MuxEnv::numOfStreams) {
AVPacket *nullpkt = av_packet_alloc();
nullpkt->data = nullptr;
nullpkt->size = 0;
MuxEnv::pktBuffer.push(i, nullpkt);
}
goto END_LOOP;
}
if (ret == AVERROR(EAGAIN)) {
printf("READFRAME AGAIN\n");
}
printf("Failed to read frame\n");
av_packet_unref(pkt);
continue;
}
printf("PKT SIZE %d\n", pkt->size);
MuxEnv::pktBuffer.push(i, pkt);
pkt = nullptr;
......@@ -337,24 +369,88 @@ int bufferPackets() {
// Packets reside in PKT Buffer is more than
// 'PKT_BUF_UPPER_BOUND', exit the loop
total >= MuxEnv::PKT_BUF_UPPER_BOUND) {
break;
}
END_LOOP:;
}
return total;
}
void ioCtxInitialize() {
printf("Size of Buffer: %ld\n", MuxEnv::protos[0].size());
for (int i = 0; i < MuxEnv::numOfStreams; ++i) {
// IOProto require 20KB to probe stream informations.
if (!MuxEnv::inited[i] && MuxEnv::protos[i].size() > 60000) {
MuxEnv::ctxs[i] = new IOCtx::InCtx{"", &MuxEnv::protos[0]};
MuxEnv::inited[i] = true;
if (!MuxEnv::oCtxInited) {
AVStream *s = MuxEnv::ctxs[i]->getStream([](AVStream *s) {
return s->codecpar->codec_type == AVMEDIA_TYPE_VIDEO;
});
MuxEnv::oCtx.newStream(s->codecpar);
MuxEnv::oCtx.writeHeader();
}
printf("INIT DONE\n");
}
}
}
EM_PORT_API(int) muxStep() {
int i;
// Buffer Encoded frames from
// Encoder into 'MuxEnv::pktBuffer'
bufferPackets();
// Initialize IOCtx
ioCtxInitialize();
// Write out to file
while (true)
for (i = MuxEnv::currentChannel; i < MuxEnv::numOfStreams; ++i) {
AVPacket *p = MuxEnv::pktBuffer.pop(i);
if (p == nullptr)
goto END_LOOP;
if (p->data == nullptr && p->size == 0) {
av_packet_free(&p);
goto MUX_DONE;
}
// Write to output context
//MuxEnv::oCtx.writeFrame(p);
printf("Write Done\n");
// Cleaning
av_packet_free(&p);
}
END_LOOP:
MuxEnv::currentChannel = i;
return 0;
MUX_DONE:
MuxEnv::done = true;
MuxEnv::oCtx.writeTrailer();
// Cleaning
return 0;
}
EM_PORT_API(int) isDone() {
return MuxEnv::done;
}
EM_PORT_API(void) eof(int i) {
MuxEnv::protos[i].eof();
}
int main(int argc, char *argv[]) {}
......@@ -176,7 +176,7 @@ describe("EncWW Specifications", () => {
let memB = new SharedArrayBuffer(1024);
enc.postMessage(makeMsg(MESSAGE_TYPE.BRIDGE, { shm: memB, size: 1024 }));
await sleep(500);
await sleep(1000);
expect(enc.getState() == WW_STATE.READY).toBe(true);
await sleep(1000);
......@@ -190,4 +190,61 @@ describe("EncWW Specifications", () => {
await sleep(1000);
});
fit("Encoder and Muxer", async () => {
const RGBAFrameSize = 1920*1080*4;
const BRIDGE_SIZE = Math.pow(2, 26);
const INPUT_SIZE = RGBAFrameSize*30;
// Create Encoder
let enc = new WW("ENC", "../resources/workers/encWW.js");
let mem = new SharedArrayBuffer(INPUT_SIZE+1+12);
enc.postMessage(makeMsg(
MESSAGE_TYPE.INIT, { shm:mem, size: INPUT_SIZE, ident: "ENC" }));
// Create Muxer
let mux = new WW("Mux", "../resources/workers/muxWW.js");
mux.postMessage(makeMsg(
MESSAGE_TYPE.INIT, { numOfEncs: 1 }
))
// Bridge
let bridgeMem = new SharedArrayBuffer(BRIDGE_SIZE);
enc.postMessage(makeMsg(
MESSAGE_TYPE.BRIDGE, { shm:bridgeMem, size: BRIDGE_SIZE }
))
mux.postMessage(makeMsg(
MESSAGE_TYPE.BRIDGE, { shm:bridgeMem, size: BRIDGE_SIZE }
))
let input = new Channel(INPUT_SIZE, SharedArrayBuffer, mem);
const data = new Uint8Array([...Array(RGBAFrameSize).keys()]);
await new Promise(r => {
let intvler = setInterval(() => {
if (enc.getState() == WW_STATE.READY) {
clearInterval(intvler);
r();
}
}, 100);
});
/* Send Data */
enc.postMessage(makeMsg(
MESSAGE_TYPE.DATA, {}
));
mux.postMessage(makeMsg(
MESSAGE_TYPE.DATA, {}
))
for (let i = 0; i < 60; ++i) {
while (input.push(data) == false) {
await sleep(100);
}
}
await sleep(100000);
}, 100000);
});
import { sleep } from '../src/utils.js';
import { H264EncWWGroup } from '../src/encGroup.js';
import { Obervable, Observable } from 'rxjs';
import { Observable } from 'rxjs';
import { Channel } from '../src/channel.js';
const areEqual = (first, second) =>
......@@ -184,15 +183,3 @@ describe("Channel Spec", () => {
}, 10000);
});
describe("H264EncWWGroup Spec", () => {
it("Instantiation", async () => {
let wg = new H264EncWWGroup("h264enc", { numOfWW: 2 });
await wg.start();
await sleep(1000);
expect(wg.numOfWorker()).toBe(2);
});
})
......@@ -37,16 +37,20 @@ if [ ! -d "${BUILD_DIR}/lib" ]; then
mkdir ${BUILD_DIR}/lib
fi
if [ "$BUILD_FFMPEG" == "true" ]; then
# Build ffmpeg.wasm-core
cd ${LIB_DIR}/ffmpeg.wasm-core
bash ./build.sh
cp lib*/*.a ${BUILD_DIR}/lib
cp build/include/* ${BUILD_DIR}/include
cp build/lib/* ${BUILD_DIR}/lib
cp -r build/include/* ${BUILD_DIR}/include
cp -r build/lib/* ${BUILD_DIR}/lib
fi
# Build Protocols
cd ${LIB_DIR}/ffmpeg.protos
mkdir build
if [ ! -d "build" ]; then
mkdir build
fi
cd build
emcmake cmake .. -DCMAKE_INSTALL_PREFIX=${BUILD_DIR} -Denable_trmem=OFF -Denable_movmem=ON -DDEBUG=OFF -DIS_EMCC_ENV=ON \
-DCMAKE_CXX_FLAGS=-isystem\ ${LIB_DIR}/ffmpeg.wasm-core
......@@ -80,7 +84,7 @@ fi
FLAGS=(
-I$BUILD_DIR/include -L$BUILD_DIR/lib -I$LIB_DIR/ffmpeg.wasm-core -Wno-deprecated-declarations
-Wno-pointer-sign -Wno-implicit-int-float-conversion -Wno-switch -Wno-parentheses -Qunused-arguments
-lavdevice -lavfilter -lavformat -lavcodec -lswresample -lswscale -lavutil -lpostproc -lm -lharfbuzz -lfribidi -lass -lx264 -lx265 -lvpx -lwavpack -lmp3lame -lfdk-aac -lvorbis -lvorbisenc -lvorbisfile -logg -ltheora -ltheoraenc -ltheoradec -lz -lfreetype -lopus -lwebp
-lavdevice -lavfilter -lavformat -lavcodec -lswresample -lswscale -lavutil -lpostproc -lm -lx264 -lz
-lshmproto
$WASM_DIR/interfaces.cc
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment