Hi,
From the client side, I'd like to be notified when jobs have completed on a
remote LAVA server. I started with the example provided at https://validation.linaro.org/static/docs/v2/data-export.html#write-your-own...
1. In a 1st try I've changed the example - because I'm using port 10080 - and it works without the <lookup_publisher> . For that, I've hard coded the url returned by lookup_publisher but this only prints out one status at a time, ie :I needed to restart the script each time to have updates : "Submited"->*Ctrl-C*->Running->*Ctrl-C*->complete 2. In a 2nd time I've tried to implement lookup_publisher thinking the status would be updated automatically. In that case it wants to connect to 5500 and obviously fails after timeout
import xmlrpclib
import argparse import yaml import signal import zmq import xmlrpclib from urlparse import urlsplit
# Job states that mean the job will produce no further events.
FINISHED_JOB_STATUS = ["Complete", "Incomplete", "Canceled"]

# XML-RPC credentials; only needed if the commented-out ServerProxy
# call in the main block is restored.
token = "mytoken"
username = "username"
# hostname = "lava-server:10080"
class JobEndTimeoutError(Exception):
    """Raised when the watched job has not finished within the allotted time."""
class Timeout(object):
    """Context manager that raises Timeout.TimeoutError after *sec* seconds.

    Implemented with SIGALRM, so it only works on Unix and in the main
    thread. A ``sec`` of None or 0 disables the alarm entirely (the
    original code crashed with TypeError on ``signal.alarm(None)`` when
    ``wait_for_job_end`` was called with its default ``timeout=None``).
    """

    class TimeoutError(Exception):
        """Raised from the SIGALRM handler when the deadline expires."""
        pass

    def __init__(self, sec):
        # Seconds before SIGALRM fires; None/0 means "wait forever".
        self.sec = sec

    def __enter__(self):
        # Only arm the alarm for a positive timeout; signal.alarm(None)
        # raises TypeError and alarm(0) would be a no-op anyway.
        if self.sec:
            signal.signal(signal.SIGALRM, self.timeout_raise)
            signal.alarm(self.sec)
        return self

    def __exit__(self, *args):
        # Cancel any pending alarm so it cannot fire after the block.
        signal.alarm(0)

    def timeout_raise(self, *args):
        # SIGALRM handler: surface the signal as an exception.
        raise Timeout.TimeoutError()
class JobListener(object):
    """Subscribes to a LAVA event-publisher ZMQ socket and waits for job events."""

    def __init__(self, url):
        # SUB socket subscribed to every topic published on the event socket.
        self.context = zmq.Context.instance()
        self.sock = self.context.socket(zmq.SUB)
        self.sock.setsockopt(zmq.SUBSCRIBE, b"")
        self.sock.connect(url)

    def wait_for_job_end(self, job_id, timeout=None):
        """Block until *job_id* reaches a finished state.

        :param job_id: integer job id to watch for.
        :param timeout: seconds to wait, or None to wait forever.
        :return: the decoded event data dict of the finishing event.
        :raises JobEndTimeoutError: if *timeout* elapses first.
        """
        try:
            with Timeout(timeout):
                while True:
                    msg = self.sock.recv_multipart()
                    try:
                        # Events are 5-part messages:
                        # (topic, uuid, datetime, username, payload).
                        (topic, uuid, dt, username, data) = msg[:]
                    except (ValueError, IndexError):
                        # BUG FIX: unpacking a wrong-length sequence raises
                        # ValueError, not IndexError — the original only
                        # caught IndexError, so malformed messages crashed
                        # the loop. Drop the invalid message instead.
                        continue
                    data = yaml.safe_load(data)
                    if "job" in data and data["job"] == job_id:
                        if data["status"] in FINISHED_JOB_STATUS:
                            return data
        except Timeout.TimeoutError:
            raise JobEndTimeoutError(
                "JobListener timed out after %s seconds." % timeout)
def lookup_publisher(hostname):
    """Ask the LAVA server, over XML-RPC, where its event publisher lives.

    :param hostname: hostname of the LAVA instance (HTTP on port 10080).
    :return: a ``tcp://host:port`` URL suitable for a ZMQ SUB socket.
    """
    rpc_endpoint = "http://%s:10080/RPC2" % hostname
    proxy = xmlrpclib.ServerProxy(rpc_endpoint)
    event_socket = proxy.scheduler.get_publisher_event_socket()
    publisher_port = urlsplit(event_socket).port
    listener_url = "tcp://%s:%s" % (hostname, publisher_port)
    print("Using %s" % listener_url)
    return listener_url
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    # BUG FIX: the help string was literally broken across two source
    # lines, which is a SyntaxError; it is now a single literal.
    parser.add_argument("-j", "--job-id", type=int,
                        help="Job ID to wait for")
    parser.add_argument("-t", "--timeout", type=int,
                        help="Timeout in seconds")
    parser.add_argument("--hostname", help="hostname of the instance")
    options = parser.parse_args()

    # Ask the server for its event-publisher socket, then block until
    # the job reaches a finished state.
    publisher = lookup_publisher(options.hostname)
    listener = JobListener(publisher)
    # Print the finishing event so the caller actually sees the final
    # status (the original discarded the return value).
    print(listener.wait_for_job_end(options.job_id, options.timeout))
Hi David,
Yes you can remove the lookup_publisher and hardcode the hostname:port you use, it's not the cause of the problem.
The script is written so it waits until the job is finished and returns the event data. If you want to continue listening to the publisher after the event is received, you should change the script a bit here:
if data["status"] in FINISHED_JOB_STATUS: return data
Instead of returning the data, you should just print it out (or do whatever you do with it) and the script will continue working until the timeout is reached.
HTH, Stevan
On 12/08/2017 10:57 AM, David Lewin wrote:
Hi,
From a client side, I'd like to be notified that jobs are achieved in a distant lava server. I started with the example provided at https://validation.linaro.org/static/docs/v2/data-export.html#write-your-own...
In a 1st tryI've changed the example - because I'm using port 10080 -and it works without the <lookup_publisher> . For that, I've hard coded the url returned by lookup_publisher but this only prints out one status at a time, ie :I needed to restart the script each time to have updates : "Submited"->/Ctrl-C/->Running->/Ctrl-C/->complete
In a 2nd time I've tried to implement lookup_publisher thinking the status would be updated automatically. In that case it wants to connect to 5500 and obviously fails after timeout
import xmlrpclib
import argparse import yaml import signal import zmq import xmlrpclib from urlparse import urlsplit
# Job states that mean the job will produce no further events.
FINISHED_JOB_STATUS = ["Complete", "Incomplete", "Canceled"]

# XML-RPC credentials; only needed if the commented-out ServerProxy
# call in the main block is restored.
token = "mytoken"
username = "username"
# hostname = "lava-server:10080"
class JobEndTimeoutError(Exception):
    """Raised when the watched job has not finished within the allotted time."""
class Timeout(object):
    """Context manager that raises Timeout.TimeoutError after *sec* seconds.

    Implemented with SIGALRM, so it only works on Unix and in the main
    thread. A ``sec`` of None or 0 disables the alarm entirely (the
    original code crashed with TypeError on ``signal.alarm(None)`` when
    ``wait_for_job_end`` was called with its default ``timeout=None``).
    """

    class TimeoutError(Exception):
        """Raised from the SIGALRM handler when the deadline expires."""
        pass

    def __init__(self, sec):
        # Seconds before SIGALRM fires; None/0 means "wait forever".
        self.sec = sec

    def __enter__(self):
        # Only arm the alarm for a positive timeout; signal.alarm(None)
        # raises TypeError and alarm(0) would be a no-op anyway.
        if self.sec:
            signal.signal(signal.SIGALRM, self.timeout_raise)
            signal.alarm(self.sec)
        return self

    def __exit__(self, *args):
        # Cancel any pending alarm so it cannot fire after the block.
        signal.alarm(0)

    def timeout_raise(self, *args):
        # SIGALRM handler: surface the signal as an exception.
        raise Timeout.TimeoutError()
class JobListener(object):
    """Subscribes to a LAVA event-publisher ZMQ socket and waits for job events."""

    def __init__(self, url):
        # SUB socket subscribed to every topic published on the event socket.
        self.context = zmq.Context.instance()
        self.sock = self.context.socket(zmq.SUB)
        self.sock.setsockopt(zmq.SUBSCRIBE, b"")
        self.sock.connect(url)

    def wait_for_job_end(self, job_id, timeout=None):
        """Block until *job_id* reaches a finished state.

        :param job_id: integer job id to watch for.
        :param timeout: seconds to wait, or None to wait forever.
        :return: the decoded event data dict of the finishing event.
        :raises JobEndTimeoutError: if *timeout* elapses first.
        """
        try:
            with Timeout(timeout):
                while True:
                    msg = self.sock.recv_multipart()
                    try:
                        # Events are 5-part messages:
                        # (topic, uuid, datetime, username, payload).
                        (topic, uuid, dt, username, data) = msg[:]
                    except (ValueError, IndexError):
                        # BUG FIX: unpacking a wrong-length sequence raises
                        # ValueError, not IndexError — the original only
                        # caught IndexError, so malformed messages crashed
                        # the loop. Drop the invalid message instead.
                        continue
                    data = yaml.safe_load(data)
                    if "job" in data and data["job"] == job_id:
                        if data["status"] in FINISHED_JOB_STATUS:
                            return data
        except Timeout.TimeoutError:
            raise JobEndTimeoutError(
                "JobListener timed out after %s seconds." % timeout)
def lookup_publisher(hostname):
    """Ask the LAVA server, over XML-RPC, where its event publisher lives.

    :param hostname: hostname of the LAVA instance (HTTP on port 10080).
    :return: a ``tcp://host:port`` URL suitable for a ZMQ SUB socket.
    """
    rpc_endpoint = "http://%s:10080/RPC2" % hostname
    proxy = xmlrpclib.ServerProxy(rpc_endpoint)
    event_socket = proxy.scheduler.get_publisher_event_socket()
    publisher_port = urlsplit(event_socket).port
    listener_url = "tcp://%s:%s" % (hostname, publisher_port)
    print("Using %s" % listener_url)
    return listener_url
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument("-j", "--job-id", type=int,
                        help="Job ID to wait for")
    parser.add_argument("-t", "--timeout", type=int,
                        help="Timeout in seconds")
    parser.add_argument("--hostname", help="hostname of the instance")
    options = parser.parse_args()

    # Ask the server for its event-publisher socket, then block until
    # the job reaches a finished state.
    publisher = lookup_publisher(options.hostname)
    listener = JobListener(publisher)
    # Print the finishing event so the caller actually sees the final
    # status (the original discarded the return value).
    print(listener.wait_for_job_end(options.job_id, options.timeout))
Lava-users mailing list Lava-users@lists.linaro.org https://lists.linaro.org/mailman/listinfo/lava-users