Skip to content

Commit

Permalink
test: add a soda worker for majordomo
Browse files Browse the repository at this point in the history
  • Loading branch information
aminya committed Jan 6, 2025
1 parent 586fe8d commit 133135f
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 23 deletions.
30 changes: 17 additions & 13 deletions examples/majordomo/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ The example will start a broker and some workers, then do some requests. The
output will be similar to this:

```
starting broker on tcp://127.0.0.1:5555
starting worker on tcp://127.0.0.1:5555
starting worker on tcp://127.0.0.1:5555
starting worker on tcp://127.0.0.1:5555
starting broker on tcp://127.0.0.1:5555
starting worker soda on tcp://127.0.0.1:5555
starting worker tea on tcp://127.0.0.1:5555
starting worker coffee on tcp://127.0.0.1:5555
starting worker tea on tcp://127.0.0.1:5555
---------- Started -----------
requesting 'cola' from 'soda'
requesting 'oolong' from 'tea'
Expand All @@ -41,27 +42,30 @@ registered worker 00800041a9 for 'tea'
dispatching 'tea' 00800041ab req -> 00800041a7
dispatching 'tea' 00800041ac req -> 00800041a9
dispatching 'coffee' 00800041af req -> 00800041a8
dispatching 'tea' 00800041ac <- rep 00800041a9
dispatching 'tea' 00800041ad req -> 00800041a9
received 'sencha' from 'tea'
dispatching 'tea' 00800041ad <- rep 00800041a9
dispatching 'tea' 00800041ae req -> 00800041a9
received 'earl grey, with milk' from 'tea'
dispatching 'coffee' 00800041af <- rep 00800041a8
dispatching 'coffee' 00800041b0 req -> 00800041a8
received 'cappuccino' from 'coffee'
dispatching 'tea' 00800041ab <- rep 00800041a7
dispatching 'tea' 00800041ad req -> 00800041a7
received 'oolong' from 'tea'
dispatching 'coffee' 00800041b0 <- rep 00800041a8
dispatching 'coffee' 00800041b1 req -> 00800041a8
received 'latte, with soy milk' from 'coffee'
dispatching 'coffee' 00800041b1 <- rep 00800041a8
dispatching 'coffee' 00800041b2 req -> 00800041a8
received 'espresso' from 'coffee'
registered worker 00800041b3 for 'soda'
dispatching 'soda' 00800041aa req -> 00800041b3
dispatching 'soda' 00800041aa <- rep 00800041b3
received 'cola' from 'soda'
dispatching 'tea' 00800041ac <- rep 00800041a9
dispatching 'tea' 00800041ae req -> 00800041a9
received 'sencha' from 'tea'
dispatching 'tea' 00800041ae <- rep 00800041a9
received 'jasmine' from 'tea'
dispatching 'coffee' 00800041b2 <- rep 00800041a8
received 'irish coffee' from 'coffee'
dispatching 'tea' 00800041ab <- rep 00800041a7
received 'oolong' from 'tea'
timeout expired waiting for 'soda'
dispatching 'tea' 00800041ad <- rep 00800041a7
received 'earl grey, with milk' from 'tea'
---------- Stopping -----------
```
29 changes: 24 additions & 5 deletions examples/majordomo/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,15 @@ async function sleep(msec: number) {
})
}

class SodaWorker extends Worker {
service = "soda"

override async process(...msgs: Buffer[]): Promise<Buffer[]> {
await sleep(Math.random() * 300)
return msgs
}
}

class TeaWorker extends Worker {
service = "tea"

Expand All @@ -29,7 +38,12 @@ class CoffeeWorker extends Worker {

const broker = new Broker()

const workers = [new TeaWorker(), new CoffeeWorker(), new TeaWorker()]
const workers = [
new SodaWorker(),
new TeaWorker(),
new CoffeeWorker(),
new TeaWorker(),
]

async function request(
service: string,
Expand All @@ -46,12 +60,12 @@ async function request(
console.log(`received '${res.join(", ")}' from '${service}'`)
return res
} catch (err) {
console.log(`timeout expired waiting for '${service}'`)
console.log(`timeout expired waiting for '${service}'`, err)
}
}

async function main() {
const started = Promise.all([
const _started = Promise.all([
// start the broker
broker.start(),
// start the workers
Expand Down Expand Up @@ -81,11 +95,16 @@ async function main() {
// stop the workers
...workers.map(worker => worker.stop()),
])
// await outstanding promises
await started
}

main().catch(err => {
console.error(err)
process.exit(1)
})

if (process.env.CI) {
// exit after 1 second in CI environment
setTimeout(() => {
process.exit(0)
}, 2000)
}
2 changes: 1 addition & 1 deletion examples/majordomo/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export class Worker {
}

async start() {
console.log(`starting worker on ${this.address}`)
console.log(`starting worker ${this.service} on ${this.address}`)
await this.socket.send([null, Header.Worker, Message.Ready, this.service])

for await (const [_blank1, _header, _type, client, _blank2, ...req] of this
Expand Down
8 changes: 4 additions & 4 deletions examples/queue/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import {Queue} from "./queue"

async function main() {
const sender = new Dealer()
await sender.bind("tcp://127.0.0.1:5555")
console.log("sender bound to port 5555")
await sender.bind("tcp://127.0.0.1:4444")
console.log("sender bound to port 4444")

const queue = new Queue(sender)

Expand All @@ -16,8 +16,8 @@ async function main() {
])

const receiver = new Dealer()
receiver.connect("tcp://127.0.0.1:5555")
console.log("receiver connected to port 5555")
receiver.connect("tcp://127.0.0.1:4444")
console.log("receiver connected to port 4444")

for await (const [msg] of receiver) {
if (msg.length === 0) {
Expand Down

0 comments on commit 133135f

Please sign in to comment.