diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 2c61fb15..834cec28 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -158,10 +158,9 @@ jobs: uses: nick-fields/retry@v3 with: timeout_minutes: 5 - max_attempts: 1 + max_attempts: 2 command: | pnpm run test.unit - continue-on-error: true - name: Clean Tmp run: rm -rf ./tmp @@ -172,7 +171,7 @@ jobs: uses: nick-fields/retry@v3 with: timeout_minutes: 5 - max_attempts: 1 + max_attempts: 2 command: | pnpm run test.unit.compat continue-on-error: true @@ -182,11 +181,11 @@ jobs: shell: bash - name: Test Electron Windows/MacOS - if: "${{ !matrix.dockerfile }}" + if: "${{ !contains(matrix.os, 'ubuntu') && !matrix.dockerfile }}" uses: nick-fields/retry@v3 with: timeout_minutes: 5 - max_attempts: 1 + max_attempts: 2 command: | pnpm run test.electron.main continue-on-error: true @@ -196,7 +195,7 @@ jobs: uses: nick-fields/retry@v3 with: timeout_minutes: 5 - max_attempts: 1 + max_attempts: 2 command: | sudo apt-get install xvfb xvfb-run --auto-servernum pnpm run test.electron.main diff --git a/.mocharc.js b/.mocharc.js index 3cc6d952..8bb67cfa 100644 --- a/.mocharc.js +++ b/.mocharc.js @@ -6,9 +6,9 @@ const config = { "expose-gc": true, "v8-expose-gc": true, exit: true, - parallel: true, - timeout: 5000, - retries: 1, + parallel: false, + timeout: 6000, + retries: 3, fullTrace: true, bail: false, } diff --git a/.vscode/settings.json b/.vscode/settings.json index c0a09f4b..2887daba 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -20,6 +20,7 @@ }, "mochaExplorer.files": "test/unit/**/*-test.ts", "mochaExplorer.mochaPath": "./node_modules/mocha", + "mochaExplorer.timeout": 6000, "files.exclude": { "**/.DS_Store": true, "**/Thumbs.db": true, diff --git a/package.json b/package.json index f8ad0303..785bbeb8 100644 --- a/package.json +++ b/package.json @@ -109,7 +109,7 @@ "test.unit.debug": "run-s clean.temp build.debug && mocha ./test/unit/*-test.ts", "test.unit.compat": "run-s clean.temp build && cross-env INCLUDE_COMPAT_TESTS=true mocha ./test/unit/compat/*-test.ts", "test.unit.nogc": "run-s clean.temp build && cross-env SKIP_GC_TESTS=true mocha", - "test.electron.main": "run-s clean.temp build && electron-mocha ./test/unit/*-test.ts", + "test.electron.main": "run-s clean.temp build && cross-env SKIP_GC_TESTS=true electron-mocha ./test/unit/*-test.ts", "test.electron.renderer": "run-s build && electron-mocha --renderer ./test/unit/*-test.ts", "test.smoke": "bash ./script/smoke-test.bash", "format": "run-s format.prettier format.clang-format", diff --git a/test/unit/helpers.ts b/test/unit/helpers.ts index 005f50ec..c449d98b 100644 --- a/test/unit/helpers.ts +++ b/test/unit/helpers.ts @@ -17,7 +17,7 @@ if (semver.satisfies(zmq.version, ">= 4.2")) { * Get a unique id to be used as a port number or IPC path. * This function is thread-safe and will use a lock file to ensure that the id is unique. */ -let idFallback = 5000 +let idFallback = 6000 async function getUniqueId() { const idPath = path.resolve(__dirname, "../../tmp/port-id.lock") await fs.promises.mkdir(path.dirname(idPath), {recursive: true}) @@ -25,10 +25,10 @@ async function getUniqueId() { try { // Create the file if it doesn't exist if (!fs.existsSync(idPath)) { - await fs.promises.writeFile(idPath, "5000", "utf8") + await fs.promises.writeFile(idPath, "6000", "utf8") /* Windows cannot bind on a ports just above 1014; start higher to be safe. */ - return 5000 + return 6000 } await lockfile.lock(idPath, {retries: 10}) @@ -63,7 +63,7 @@ async function getUniqueId() { } } -type Proto = "ipc" | "tcp" | "udp" | "inproc" +export type Proto = "ipc" | "tcp" | "udp" | "inproc" export async function uniqAddress(proto: Proto) { const id = await getUniqueId() @@ -84,6 +84,19 @@ export async function uniqAddress(proto: Proto) { } } +export async function cleanSocket(address: string) { + const [proto, path] = address.split("://")[1] + if (proto !== "ipc" || !path) { + return + } + const exists = await fs.promises + .access(path, fs.constants.F_OK) + .catch(() => false) + if (exists) { + await fs.promises.rm(path) + } +} + export function testProtos(...requested: Proto[]) { const set = new Set(requested) diff --git a/test/unit/proxy-router-dealer-test.ts b/test/unit/proxy-router-dealer-test.ts index 51e3d945..1164747e 100644 --- a/test/unit/proxy-router-dealer-test.ts +++ b/test/unit/proxy-router-dealer-test.ts @@ -1,104 +1,16 @@ -import * as semver from "semver" -import * as zmq from "../../src" - -import {assert} from "chai" -import {testProtos, uniqAddress} from "./helpers" +import {Worker} from "worker_threads" +import {testProtos} from "./helpers" for (const proto of testProtos("tcp", "ipc", "inproc")) { - describe(`proxy with ${proto} router/dealer`, function () { - /* ZMQ < 4.0.5 has no steerable proxy support. */ - if (semver.satisfies(zmq.version, "< 4.0.5")) { - return - } - - let proxy: zmq.Proxy - - let frontAddress: string - let backAddress: string - - let req: zmq.Request - let rep: zmq.Reply - - beforeEach(async function () { - proxy = new zmq.Proxy(new zmq.Router(), new zmq.Dealer()) - - frontAddress = await uniqAddress(proto) - backAddress = await uniqAddress(proto) - - req = new zmq.Request() - rep = new zmq.Reply() - }) - - afterEach(function () { - /* Closing proxy sockets is only necessary if run() fails. */ - proxy.frontEnd.close() - proxy.backEnd.close() - - req.close() - rep.close() - global.gc?.() - }) - - describe("run", function () { - it("should proxy messages", async function () { - /* REQ -> foo -> ROUTER <-> DEALER -> foo -> REP - <- foo <- <- foo <- - -> bar -> -> bar -> - <- bar <- <- bar <- - pause - resume - -> baz -> -> baz -> - <- baz <- <- baz <- - -> qux -> -> qux -> - <- qux <- <- qux <- - */ - - await proxy.frontEnd.bind(frontAddress) - await proxy.backEnd.bind(backAddress) - - const done = proxy.run() - - const messages = ["foo", "bar", "baz", "qux"] - const received: string[] = [] - - await req.connect(frontAddress) - await rep.connect(backAddress) - - const echo = async () => { - for await (const msg of rep) { - await rep.send(msg) - } - } - - const send = async () => { - for (const msg of messages) { - if (received.length === 2) { - proxy.pause() - proxy.resume() - } - - await req.send(Buffer.from(msg)) - - const [res] = await req.receive() - received.push(res.toString()) - if (received.length === messages.length) { - break - } - } - - rep.close() - } - - console.log( - `waiting for messages for proxy with ${proto} router/dealer...`, - ) - - await Promise.all([echo(), send()]) - assert.deepEqual(received, messages) - - proxy.terminate() - await done - console.log(`Done proxying with ${proto} router/dealer`) + describe(`proxy with ${proto} router/dealer`, () => { + describe("run", () => { + it("should proxy messages", async () => { + const worker = new Worker(__filename, { + workerData: { + proto, + }, + }) + await worker.terminate() }) }) }) diff --git a/test/unit/proxy-router-dealer-worker.ts b/test/unit/proxy-router-dealer-worker.ts new file mode 100644 index 00000000..c1e08e00 --- /dev/null +++ b/test/unit/proxy-router-dealer-worker.ts @@ -0,0 +1,98 @@ +import {assert} from "chai" +import * as semver from "semver" +import * as zmq from "../../src" +import type {Proto} from "./helpers" +import {cleanSocket, uniqAddress} from "./helpers" +import {workerData} from "worker_threads" + +async function testProxyRouterDealer(proto: Proto) { + /* ZMQ < 4.0.5 has no steerable proxy support. */ + if (semver.satisfies(zmq.version, "< 4.0.5")) { + return + } + + const proxy = new zmq.Proxy(new zmq.Router(), new zmq.Dealer()) + + const frontAddress = await uniqAddress(proto) + const backAddress = await uniqAddress(proto) + + const req = new zmq.Request() + const rep = new zmq.Reply() + + try { + /* REQ -> foo -> ROUTER <-> DEALER -> foo -> REP + <- foo <- <- foo <- + -> bar -> -> bar -> + <- bar <- <- bar <- + pause + resume + -> baz -> -> baz -> + <- baz <- <- baz <- + -> qux -> -> qux -> + <- qux <- <- qux <- + */ + await proxy.frontEnd.bind(frontAddress) + await proxy.backEnd.bind(backAddress) + + const done = proxy.run() + + const messages = ["foo", "bar", "baz", "qux"] + const received: string[] = [] + + await req.connect(frontAddress) + await rep.connect(backAddress) + + const echo = async () => { + for await (const msg of rep) { + await rep.send(msg) + } + } + + const send = async () => { + for (const msg of messages) { + if (received.length === 2) { + proxy.pause() + proxy.resume() + } + + await req.send(Buffer.from(msg)) + + const [res] = await req.receive() + received.push(res.toString()) + if (received.length === messages.length) { + break + } + } + + rep.close() + } + + console.log(`waiting for messages for proxy with ${proto} router/dealer...`) + + await Promise.all([echo(), send()]) + assert.deepEqual(received, messages) + + proxy.terminate() + await done + console.log(`Done proxying with ${proto} router/dealer`) + } catch (err) { + /* Closing proxy sockets is only necessary if run() fails. */ + proxy.frontEnd.close() + proxy.backEnd.close() + throw err + } finally { + req.close() + rep.close() + global.gc?.() + await Promise.all([cleanSocket(frontAddress), cleanSocket(backAddress)]) + } +} + +// Receive the proto from the main thread +testProxyRouterDealer(workerData.proto as Proto).catch(err => { + console.error( + `Error testing proxy with ${workerData.proto} router/dealer:`, + err, + ) + process.exit(1) +}) diff --git a/test/unit/proxy-run-test.ts b/test/unit/proxy-run-test.ts index 7f2955e3..3aff5164 100644 --- a/test/unit/proxy-run-test.ts +++ b/test/unit/proxy-run-test.ts @@ -2,7 +2,7 @@ import * as semver from "semver" import * as zmq from "../../src" import {assert} from "chai" -import {testProtos, uniqAddress} from "./helpers" +import {cleanSocket, testProtos, uniqAddress} from "./helpers" import {isFullError} from "../../src/errors" for (const proto of testProtos("tcp", "ipc", "inproc")) { diff --git a/test/unit/typings-compatibility-test.ts b/test/unit/typings-compatibility-test.ts index 1c2ee070..b83abbc0 100644 --- a/test/unit/typings-compatibility-test.ts +++ b/test/unit/typings-compatibility-test.ts @@ -129,7 +129,11 @@ describe("compatibility of typings for typescript versions", async function () { }) afterEach(async () => { - await remove(tscTargetPath) + try { + await remove(tscTargetPath) + } catch (err) { + console.error(`Failed to remove ${tscTargetPath}:`, err) + } }) } })