Source: node/sink/SocketSend.js

  1. import BaseLfo from '../../core/BaseLfo';
  2. import { opcodes, encoders, decoders } from '../../common/utils/wsUtils';
  3. import { WebSocket, wsServerFactory } from '../utils/wsServerFactory';
  4. const parameters = {
  5. port: {
  6. type: 'integer',
  7. default: 8000,
  8. constant: true,
  9. nullable: true,
  10. },
  11. server: {
  12. type: 'any',
  13. default: null,
  14. constant: true,
  15. nullable: true,
  16. },
  17. };
  18. /**
  19. * Send an lfo frame as a socket message to a `client.source.SocketReceive`
  20. * instance.
  21. *
  22. * <p class="warning">Experimental</p>
  23. *
  24. * @memberof module:node.sink
  25. *
  26. * @params {Object} options
  27. */
  28. class SocketSend extends BaseLfo {
  29. constructor(options = {}) {
  30. super(parameters, options);
  31. this.wss = wsServerFactory({
  32. port: this.params.get('port'),
  33. server: this.params.get('server'),
  34. });
  35. // this.wss.onconnection = (socket) => {
  36. // if (this.initialized) {
  37. // // socket.send(initModule)
  38. // // then
  39. // // socket.procesStreamParams
  40. // // socket.
  41. // }
  42. // }
  43. }
  44. _broadcast(buffer) {
  45. this.wss.clients.forEach((client) => {
  46. if (client.readyState === WebSocket.OPEN)
  47. client.send(buffer);
  48. });
  49. }
  50. initModule() {
  51. // send a INIT_MODULE_REQ to each client and wait for INIT_MODULE_ACK
  52. // no need to get children promises as we are in a leef
  53. const promises = [];
  54. this.wss.clients.forEach((client) => {
  55. const promise = new Promise((resolve, reject) => {
  56. client.onmessage = (e) => {
  57. const opcode = decoders.opcode(e.data);
  58. if (opcode === opcodes.INIT_MODULE_ACK)
  59. resolve();
  60. }
  61. });
  62. promises.push(promise);
  63. });
  64. const buffer = encoders.initModuleReq();
  65. this._broadcast(buffer);
  66. return Promise.all(promises);
  67. }
  68. processStreamParams(prevStreamParams) {
  69. super.processStreamParams(prevStreamParams);
  70. const buffer = encoders.streamParams(this.streamParams);
  71. this._broadcast(buffer);
  72. }
  73. resetStream() {
  74. super.resetStream();
  75. const buffer = encoders.resetStream();
  76. this._broadcast(buffer);
  77. }
  78. /** @private */
  79. finalizeStream(endTime) {
  80. super.finalizeStream(endTime);
  81. const buffer = encoders.finalizeStream(endTime);
  82. this._broadcast(buffer);
  83. }
  84. // process any type
  85. /** @private */
  86. processScalar() {}
  87. /** @private */
  88. processVector() {}
  89. /** @private */
  90. processSignal() {}
  91. processFrame(frame) {
  92. const frameSize = this.streamParams.frameSize;
  93. this.frame.time = frame.time;
  94. this.frame.data.set(frame.data, 0);
  95. this.frame.metadata = frame.metadata;
  96. const buffer = encoders.processFrame(this.frame, frameSize);
  97. this._broadcast(buffer);
  98. }
  99. }
  100. export default SocketSend;