You can try the WebSocket wrapper from rxjs:
https://rxjs.dev/api/webSocket/webSocket#examples
import { webSocket } from 'rxjs/webSocket';
const subject = webSocket('ws://localhost:8081');
const observableA = subject.multiplex(
() => ({ subscribe: 'A' }), // When the server gets this message, it will start sending messages for 'A'...
() => ({ unsubscribe: 'A' }), // ...and when it gets this one, it will stop.
message => message.type === 'A' // If the function returns `true`, the message is passed down the stream. It is skipped if the function returns `false`.
);
const observableB = subject.multiplex( // And the same goes for 'B'.
() => ({ subscribe: 'B' }),
() => ({ unsubscribe: 'B' }),
message => message.type === 'B'
);
const subA = observableA.subscribe(messageForA => console.log(messageForA));
// At this moment WebSocket connection is established. Server gets '{"subscribe": "A"}' message and starts sending messages for 'A',
// which we log here.
const subB = observableB.subscribe(messageForB => console.log(messageForB));
// Since we already have a connection, we just send '{"subscribe": "B"}' message to the server. It starts sending messages for 'B',
// which we log here.
subB.unsubscribe();
// Message '{"unsubscribe": "B"}' is sent to the server, which stops sending 'B' messages.
subA.unsubscribe();
// Message '{"unsubscribe": "A"}' makes the server stop sending messages for 'A'. Since there are no more subscribers to the root Subject,
// socket connection closes.
The mock should be implemented correctly and call `off` when we unsubscribe.
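For local testing, the server side of the subscribe/unsubscribe protocol from the multiplex example can be mocked in memory. The following is only a minimal sketch under stated assumptions, not part of rxjs: `MockTopicServer`, `receive`, `publish`, and `activeTopics` are hypothetical names, and the message shapes are copied from the example above.

```typescript
// Hypothetical in-memory mock of the server side of the protocol above.
// None of these names come from rxjs; they are illustrative only.
type ClientMessage = { subscribe?: string; unsubscribe?: string };
type ServerMessage = { type: string; payload: unknown };

class MockTopicServer {
  private readonly active = new Set<string>();
  private readonly send: (message: ServerMessage) => void;

  constructor(send: (message: ServerMessage) => void) {
    this.send = send;
  }

  // Handle one raw JSON frame, as the client would send it over the socket.
  receive(raw: string): void {
    const message = JSON.parse(raw) as ClientMessage;
    if (message.subscribe !== undefined) {
      this.active.add(message.subscribe);
    }
    if (message.unsubscribe !== undefined) {
      this.active.delete(message.unsubscribe);
    }
  }

  // Deliver a message only while its topic is subscribed.
  // Returns true if the message was sent, false if it was dropped.
  publish(type: string, payload: unknown): boolean {
    if (!this.active.has(type)) {
      return false;
    }
    this.send({ type, payload });
    return true;
  }

  // Currently subscribed topics, sorted for stable comparison in tests.
  get activeTopics(): string[] {
    return Array.from(this.active).sort();
  }
}
```

A test can feed it the same frames that `multiplex` sends on subscribe and unsubscribe, then check that messages for a topic stop being delivered once that topic has been unsubscribed.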