Source: node/source/SocketReceive.js

import BaseLfo from '../../core/BaseLfo';
import { opcodes, decoders, encoders } from '../../common/utils/wsUtils';
import { wsServerFactory } from '../utils/wsServerFactory';


const parameters = {
  port: {
    type: 'integer',
    default: 8000,
    constant: true,
    nullable: true,
  },
  server: {
    type: 'any',
    default: null,
    constant: true,
    nullable: true,
  },
};

/**
 * Receive an lfo frame as a socket message from a `client.sink.SocketSend`
 * instance.
 *
 * <p class="warning">Experimental</p>

 * @memberof module:node.source
 *
 * @params {Object} options
 *
 * @example
 * const socket = new lfo.source.SocketReceive({ port: 8000 });
 * const logger = new lfo.sink.Logger({
 *   time: true,
 *   data: true,
 * });
 *
 * socket.connect(logger);
 */
class SocketReceive extends BaseLfo {
  constructor(options = {}) {
    super(parameters, options);

    this._onConnection = this._onConnection.bind(this);
    this._dispatch = this._dispatch.bind(this);

    this.wss = wsServerFactory({
      port: this.params.get('port'),
      server: this.params.get('server'),
    });

    this.wss.on('connection', this._onConnection);
  }

  /** @private */
  initModule(socket) {
    const promises = this.nextModules.map((mod) => mod.initModule());
    // wait for children promises and send INIT_MODULE_ACK
    Promise.all(promises).then(() => {
      const buffer = encoders.initModuleAck();
      socket.send(buffer);
    });
  }

  // process any type
  /** @private */
  processScalar() {}
  /** @private */
  processVector() {}
  /** @private */
  processSignal() {}

  /** @private */
  processFrame(frame) {
    this.prepareFrame();
    this.frame = frame;
    this.propagateFrame();
  }

  /** @private */
  _onConnection(socket) {
    socket.on('message', this._dispatch(socket));
  }

  /**
   * Decode and dispatch incomming frame according to opcode
   * @private
   */
  _dispatch(socket) {
    return (arrayBuffer) => {
      const opcode = decoders.opcode(arrayBuffer);

      switch (opcode) {
        case opcodes.INIT_MODULE_REQ:
          this.initModule(socket);
          break;
        case opcodes.PROCESS_STREAM_PARAMS:
          const prevStreamParams = decoders.streamParams(arrayBuffer);
          this.processStreamParams(prevStreamParams);
          break;
        case opcodes.RESET_STREAM:
          this.resetStream();
          break;
        case opcodes.FINALIZE_STREAM:
          const endTime = decoders.finalizeStream(arrayBuffer);
          this.finalizeStream(endTime);
          break;
        case opcodes.PROCESS_FRAME:
          const frameSize = this.streamParams.frameSize;
          const frame = decoders.processFrame(arrayBuffer, frameSize);
          this.processFrame(frame);
          break;
      }
    }
  }
}

export default SocketReceive;