Skip to content

Commit

Permalink
Schopp/fix async (#378)
Browse files Browse the repository at this point in the history
* Fixed async handling

* fixed tests

* Fixed version

* Fixing ci

* Fixed eslint

* Testing new ci

* Trying with old version

* Fixing

* Added release

* Fixed example

* Testing ci stuff

* Fixed version

* Finalized tests

* Deleted dry-run

* Deleted comment

---------

Co-authored-by: Florian Schopp <schopp@hbkworld.com>
  • Loading branch information
Florian-Schopp and Florian Schopp authored Oct 31, 2023
1 parent 4be6d3f commit 8a71aab
Show file tree
Hide file tree
Showing 15 changed files with 74 additions and 99 deletions.
7 changes: 7 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ env:
on:
release:
types: [created]
pull_request:
branches:
- master

jobs:
build:
Expand All @@ -20,6 +23,7 @@ jobs:
- run: npm test

publish-npm:
if: startsWith(github.ref, 'refs/tags/v')
needs: build
runs-on: ubuntu-latest
steps:
Expand All @@ -29,6 +33,9 @@ jobs:
node-version: ${{ env.NODE_VERSION }}
registry-url: https://registry.npmjs.org/
- run: npm ci
- run: git config --global user.email "schopp@hbkworld.com"
- run: git config --global user.name "Florian Schopp"
- run: npm version ${{ env.RELEASE_VERSION }}
- run: npm run build
- run: npm publish
env:
Expand Down
2 changes: 1 addition & 1 deletion examples/balls/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Jet client-server communications:
*/
import { select, selectAll, pointer } from 'd3-selection'
import { Fetcher, Peer } from '../../../src'
import { Fetcher, Peer } from '../../../lib'
import { canvasSize } from '../defs'
import { ballType } from '../server'

Expand Down
2 changes: 1 addition & 1 deletion examples/balls/server/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env node

import { Daemon, Method, Peer, State } from '../../../src'
import { Daemon, Method, Peer, State } from '../../../lib'
import { canvasSize } from '../defs'

var port = parseInt(process.argv[2]) || 8081
Expand Down
9 changes: 7 additions & 2 deletions examples/chat/client/index.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
/*
* Jet client-server communications:
*/
import { Fetcher, Peer } from '../../../src'
import { Peer } from '../../../lib/3_jet/peer/index.js'
import { Fetcher } from '../../../lib/3_jet/peer/fetcher.js'
import './base.css'
import { LogLevel } from '../../../lib/jet.js'

const peer = new Peer({ url: 'ws://localhost:8081/' })
const peer = new Peer({
url: 'ws://localhost:8081/',
log: { logName: '', logCallbacks: [console.log], logLevel: LogLevel.socket }
})

const renderMessages = (messages: { value: string[] }) => {
const messageContainer = document.getElementById('messages')!
Expand Down
16 changes: 10 additions & 6 deletions examples/chat/server/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env node

import { Daemon, Method, Peer, State } from '../../../src'
import { Daemon, LogLevel, Method, Peer, State } from '../../../lib/jet.js'

const wsPort = parseInt(process.argv[2]) || 8081
const internalPort = 10222
Expand All @@ -21,7 +21,8 @@ console.log('listening on port', wsPort)

// Create Jet Peer
const peer = new Peer({
port: internalPort
port: internalPort,
log: { logName: '', logCallbacks: [console.log], logLevel: LogLevel.socket }
})

// the messages state is simply an array
Expand All @@ -44,10 +45,13 @@ const clear = new Method('chat/clear')
clear.on('call', () => {
messages.value([])
})

peer
.connect()
.then(() =>
Promise.all([peer.add(messages), peer.add(append), peer.add(clear)])
)
.then(() => {
peer.batch(() => {
peer.add(messages)
peer.add(append)
peer.add(clear)
})
})
.then(() => {})
2 changes: 1 addition & 1 deletion examples/chat/webpack.config.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,4 @@ const clientConfig = {
]
}

module.exports = [clientConfig, serverConfig]
module.exports = [serverConfig,clientConfig]
2 changes: 1 addition & 1 deletion examples/todo/client/client.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* Jet client-server communications:
*/
import { Fetcher, Peer, PublishMessage, ValueType } from '../../../src'
import { Fetcher, Peer, PublishMessage, ValueType } from '../../../lib'
import { Todo } from '../server/Todo'
import './base.css'

Expand Down
2 changes: 1 addition & 1 deletion examples/todo/server/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Daemon, Method, Peer, State, ValueType } from '../../../src'
import { Daemon, Method, Peer, State, ValueType } from '../../../lib'
import { Todo } from './Todo'

var port = parseInt(process.argv[2]) || 8081
Expand Down
2 changes: 1 addition & 1 deletion examples/todo/webpack.config.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,4 @@ const clientConfig = {
]
}

module.exports = [clientConfig, serverConfig]
module.exports = [serverConfig,clientConfig]
12 changes: 6 additions & 6 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "node-jet",
"version": "3.0.11",
"version": "3.0.12",
"description": "Jet Realtime Message Bus for the Web. Daemon and Peer implementation.",
"url": "https://github.com/hbm/node-jet",
"author": {
Expand Down Expand Up @@ -40,10 +40,10 @@
"ci:prettier": "prettier --check $npm_package_config_files ",
"ci:type-check": "tsc --noEmit --skipLibCheck",
"prettier": "prettier --write $npm_package_config_files",
"eslint": "eslint --max-warnings 0 --ignore-path .prettierignore $npm_package_config_files",
"ex:chat": "concurrently \"webpack serve --config examples/chat/webpack.config.cjs\" \"wait-on examples/chat/dist/server.cjs && node examples/chat/dist/server.cjs\"",
"ex:todo": "concurrently \"webpack serve --config examples/todo/webpack.config.cjs\" \"wait-on examples/todo/dist/server.cjs && node examples/todo/dist/server.cjs\"",
"ex:balls": "concurrently \"webpack serve --config examples/balls/webpack.config.cjs\" \"wait-on examples/balls/dist/server.cjs && node examples/balls/dist/server.cjs\""
"eslint": "eslint --fix --max-warnings 0 --ignore-path .prettierignore $npm_package_config_files",
"ex:chat": "npm run build && concurrently \"webpack serve --config examples/chat/webpack.config.cjs\" \"wait-on examples/chat/dist/server.cjs && node examples/chat/dist/server.cjs\"",
"ex:todo": "npm run build && concurrently \"webpack serve --config examples/todo/webpack.config.cjs\" \"wait-on examples/todo/dist/server.cjs && node examples/todo/dist/server.cjs\"",
"ex:balls": "npm run build && concurrently \"webpack serve --config examples/balls/webpack.config.cjs\" \"wait-on examples/balls/dist/server.cjs && node examples/balls/dist/server.cjs\""
},
"dependencies": {
"css-loader": "^6.8.1",
Expand Down Expand Up @@ -77,7 +77,7 @@
"typescript": "^5.2.2",
"wait-for-expect": "^3.0.2",
"wait-on": "^7.0.1",
"webpack": "^5.88.2",
"webpack": "^5.89.0",
"webpack-cli": "^5.1.4",
"webpack-dev-server": "^4.15.1"
},
Expand Down
80 changes: 21 additions & 59 deletions src/2_jsonrpc/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,9 @@ export class JsonRPC extends EventEmitter {
_dispatchSingleMessage = (
message: MethodRequest | ResultMessage | ErrorMessage
) => {
if (isResultMessage(message) || isErrorMessage(message)) {
if (isResultMessage(message) || isErrorMessage(message))
this._dispatchResponse(message)
} else {
this._dispatchRequest(castMessage<MethodRequest>(message))
}
else this._dispatchRequest(castMessage<MethodRequest>(message))
}

/**
Expand All @@ -219,12 +217,8 @@ export class JsonRPC extends EventEmitter {
*/
_dispatchResponse = (message: ResultMessage | ErrorMessage) => {
const mid = message.id
if (isResultMessage(message)) {
this.successCb(mid, message.result)
}
if (isErrorMessage(message)) {
this.errorCb(mid, message.error)
}
if (isResultMessage(message)) this.successCb(mid, message.result)
if (isErrorMessage(message)) this.errorCb(mid, message.error)
}

/**
Expand All @@ -237,28 +231,16 @@ export class JsonRPC extends EventEmitter {
if (this.listenerCount(message.method) === 0) {
this.logger.error(`Method ${message.method} is unknown`)
this.respond(message.id, new methodNotFoundError(message.method), false)
} else {
this.emit(message.method, this, message.id, message.params)
}
} else this.emit(message.method, this, message.id, message.params)
}

/**
* Queue.
*/
queue = <T extends MessageParams | Message>(message: T, id = '') => {
if (!this._isOpen) {
return Promise.reject(new ConnectionClosed())
}
if (id) {
this.messages.push({ method: id, params: message } as Message)
} else {
this.messages.push(message as Message)
}
if (this.sendImmediate) {
return this.send()
} else {
return Promise.resolve()
}
if (!this._isOpen) return Promise.reject(new ConnectionClosed())
if (id) this.messages.push({ method: id, params: message } as Message)
else this.messages.push(message as Message)
}

/**
Expand All @@ -272,19 +254,7 @@ export class JsonRPC extends EventEmitter {
this.logger.sock(`Sending message: ${encoded}`)
this.sock.send(encoded)
this.messages = []
} else {
return Promise.resolve()
}
return Promise.all(this.batchPromises)
.then((res) => {
this.batchPromises = []
return Promise.resolve(res)
})
.catch((ex) => {
this.batchPromises = []
this.logger.error(JSON.stringify(ex))
return Promise.reject(ex)
})
}

/**
Expand All @@ -295,6 +265,7 @@ export class JsonRPC extends EventEmitter {
*/
respond = (id: string, params: ValueType, success: boolean) => {
this.queue({ id, [success ? 'result' : 'error']: params })
if (this.sendImmediate) this.send()
}

successCb = (id: string, result: ValueType) => {
Expand All @@ -314,36 +285,27 @@ export class JsonRPC extends EventEmitter {
*/
sendRequest = <T extends ValueType>(
method: string,
params: JsonParams,
immediate: boolean | undefined = undefined
): Promise<T> => {
const promise = new Promise<ValueType>((resolve, reject) => {
if (!this._isOpen) {
reject(new ConnectionClosed())
} else {
params: JsonParams
): Promise<T> =>
new Promise<T>((resolve, reject) => {
if (!this._isOpen) reject(new ConnectionClosed())
else {
const rpcId = this.messageId.toString()
this.messageId++
this.openRequests[rpcId] = { resolve, reject }
this.openRequests[rpcId] = {
resolve: resolve as (
value: ValueType | PromiseLike<ValueType>
) => void,
reject
}
this.queue({
id: rpcId.toString(),
method,
params
})
if (immediate) {
this.send()
}
if (this.sendImmediate) this.send()
}
})
this.batchPromises.push(promise)
if (immediate || this.sendImmediate)
return promise.catch((err) => {
this.logger.error(JSON.stringify(err))
return Promise.reject(err)
}) as Promise<T>
else {
return Promise.resolve({} as T)
}
}
}

export default JsonRPC
2 changes: 1 addition & 1 deletion src/3_jet/daemon/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ export class Daemon extends EventEmitter {
) {
return Promise.reject(new NotAuthorized(params.path))
}
return this.routes[params.path].owner.sendRequest(method, params, true)
return this.routes[params.path].owner.sendRequest(method, params)
}

/*
Expand Down
4 changes: 1 addition & 3 deletions src/3_jet/peer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -310,9 +310,7 @@ export class Peer extends EventEmitter {
*
*/
batch = (action: () => void) => {
if (this.#daemonInfo.features?.batches) {
this.#jsonrpc.sendImmediate = false
}
this.#jsonrpc.sendImmediate = false
action()
this.#jsonrpc.sendImmediate = true
return this.#jsonrpc.send()
Expand Down
19 changes: 9 additions & 10 deletions test/jsonrpc/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,11 @@ describe('Testing JsonRpc', () => {
done()
})
const jsonrpc = new JsonRPC(new Logger())
jsonrpc.connect().then(() =>
jsonrpc.connect().then(() => {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
jsonrpc.queue({ event: 'Add', path: 'foo', value: 1 } as any, '_f')
)
jsonrpc.send()
})
sock.emit('open')
})
it('Should test batch notify', (done) => {
Expand Down Expand Up @@ -496,14 +497,12 @@ describe('Testing JsonRpc', () => {
jsonrpc.connect().then(async () => {
jsonrpc.sendImmediate = false
jsonrpc.sendRequest('add', { path: 'foo', value: 3 })
jsonrpc.sendRequest('add', { path: 'foo1', value: 4 })
await waitForExpect(() =>
expect(() => jsonrpc.send()).rejects.toEqual({
code: 0,
name: 'error'
})
)
done()
jsonrpc.sendRequest('add', { path: 'foo1', value: 4 }).catch((ex) => {
expect(ex).toEqual({ code: 0, name: 'error' })
done()
})
jsonrpc.sendImmediate = true
jsonrpc.send()
})

sock.emit('open')
Expand Down

0 comments on commit 8a71aab

Please sign in to comment.