Source code for nodes.adc_lib_node.server_expose

import sys
import os
import zmq
from zmq.utils.monitor import recv_monitor_message
from zmq.utils.monitor import parse_monitor_message
import pickle
import time
from DistributedOscilloscope.utilities.publisher import Publisher
from DistributedOscilloscope.utilities.ipaddr import get_ip
from .devices_access import DevicesAccess
import logging
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Reference to this module's own entry in sys.modules.
thismodule = sys.modules[__name__]


class ServerExpose():
    """Expose an ADC node to the Server.

    Requests arrive as pickled ``[function_name, *args]`` lists on a ZeroMQ
    ROUTER socket and are dispatched to methods of this object (or, through
    ``__getattr__``, to the wrapped ``DevicesAccess`` object). Notifications
    and acquisition data are sent to the Server through a ``Publisher``.
    """

    def __init__(self, port, port_server, pci_addr, trtl, unique_ADC_name):
        """
        :param port: port on which the ROUTER socket of this node is bound
        :param port_server: port of the Server, used when creating the
            Publisher in :meth:`set_server_address`
        :param pci_addr: forwarded to ``DevicesAccess``
        :param trtl: forwarded to ``DevicesAccess``
        :param unique_ADC_name: forwarded to ``DevicesAccess``
        """
        self.__port = port
        self.__port_server = port_server
        # Stays None until set_server_address() is called.
        self.server_publisher = None
        self.__devices_access = DevicesAccess(pci_addr, trtl, unique_ADC_name)

    def __getattr__(self, function_name):
        """
        If the required function is not defined here, look for it in the
        devices_access object

        :param function_name: name of the attribute that was not found
        :raises AttributeError: if the devices_access object is not set yet
            or does not provide the attribute either
        """
        # Read the (name-mangled) private attribute straight from the
        # instance dict. Going through ``self.__devices_access`` would call
        # __getattr__ again when the attribute is missing (bare instance,
        # copy/unpickle probing, failed __init__) and recurse until
        # RecursionError.
        try:
            devices_access = self.__dict__["_ServerExpose__devices_access"]
        except KeyError:
            raise AttributeError(
                "{!r} object has no attribute {!r}".format(
                    type(self).__name__, function_name)) from None
        return getattr(devices_access, function_name)

    def set_server_address(self, addr):
        """
        If address of the Server is provided by the user, this function is
        called from main.
        If address of the Server is discovered by zeroconf, this function is
        called by RPC call from the Server.
        It creates the Publisher object used for sending the notifications
        and acquisition data to the Server.

        :param addr: address of the Server
        """
        self.server_publisher = Publisher(addr, self.__port_server)

    def exit(self):
        """
        Unregister this ADC from the Server and terminate the process.

        This function is just for testing and will be removed after adding
        ZeroMQ. It doesn't work with zeroconf.
        """
        publisher = self.server_publisher
        # Without a known Server there is nobody to notify; still terminate
        # instead of failing on ``None.send_message``.
        if publisher is not None:
            data = {'function_name': 'unregister_ADC',
                    'args': [self.__devices_access.unique_ADC_name]}
            publisher.send_message(data)
            time.sleep(0.1)  # otherwise the message is lost
        os._exit(1)

    def run(self):
        """
        Serve RPC requests and forward acquisition data; never returns.

        Binds a ROUTER socket on this machine's IP and the configured port,
        then polls three sources: the ROUTER socket (RPC requests), its
        monitor socket (connection events) and, once it has been registered
        in the poller, the ADC device (acquisition data ready).
        """
        context = zmq.Context()
        socket = context.socket(zmq.ROUTER)
        monitor = socket.get_monitor_socket()
        ip = get_ip()
        port_zmq = str(self.__port)
        socket.bind("tcp://" + ip + ":" + port_zmq)

        poller = zmq.Poller()
        poller.register(monitor, zmq.POLLIN | zmq.POLLERR)
        poller.register(socket, zmq.POLLIN | zmq.POLLERR)

        # Reverse lookup: numeric monitor event -> 'EVENT_*' constant name.
        EVENT_MAP = {getattr(zmq, name): name
                     for name in dir(zmq) if name.startswith('EVENT_')}

        # Hand the poller to the devices_access object.
        self.__devices_access.selector = poller

        while True:
            socks = dict(poller.poll())

            if socket in socks:
                [identity, message] = socket.recv_multipart()
                # NOTE(security): pickle.loads on data received from the
                # network can execute arbitrary code; only expose this port
                # to trusted peers.
                message = pickle.loads(message)
                try:
                    func = getattr(self, message[0])
                    ret = func(*message[1:])
                    ret = pickle.dumps(ret)
                    socket.send_multipart([identity, ret])
                except AttributeError:
                    # Unknown function name, or an AttributeError raised
                    # while serving the call: report it to the caller and
                    # keep a trace in the log.
                    logger.exception("RPC request could not be served")
                    socket.send_multipart([identity, b"Error"])

            if monitor in socks:
                # The event has to be read to drain the monitor socket.
                evt = recv_monitor_message(monitor)
                # .get(): an event without a matching EVENT_* constant must
                # not stop the serving loop.
                evt.update({'description':
                            EVENT_MAP.get(evt['event'], 'UNKNOWN')})
                logger.debug("Event: %s", evt)

            if self.__devices_access.fileno() in socks:
                dev_ac = self.__devices_access
                try:
                    poller.unregister(dev_ac)
                    logger.debug("The ADC device unregistered from the poller "
                                 "selector")
                except KeyError:
                    logger.warning("The ADC device not available to unregister"
                                   " from the poller selector, expose")
                [timestamp, pre_post, data] = dev_ac.retrieve_ADC_data()
                if self.server_publisher is None:
                    # No Server address set yet: there is no destination for
                    # the data, so drop it instead of crashing the loop.
                    logger.warning("Acquisition data dropped: the Server "
                                   "address is not set")
                    continue
                data = {'function_name': 'update_data',
                        'args': [timestamp, pre_post, data,
                                 dev_ac.unique_ADC_name]}
                self.server_publisher.send_message(data)