Source code for datasource

"""
A command line utility for monitoring a Jenkins instance and reporting
the current state to a blink1 LED.

:author: Martin Norbury (martin.norbury@gmail.com)
"""
import logging

import webcolors

import outputsource
import pattern


_LOGGER = logging.getLogger(__name__)

from urllib.request import urlopen
import json

import itertools


[docs]class JenkinsDataSource(object): def __init__(self, build_host, loop_frequency, decay_period, hardware_source=None, pattern_factory=None): self._build_host = build_host self._loop_frequency = loop_frequency self._decay_period = decay_period self._hardware_source = hardware_source if hardware_source else outputsource.Blink1Indicator() self._pattern_factory = pattern_factory if pattern_factory else pattern.pulse_rgb self._last_state = None self._pattern = itertools.cycle([(0, 0, 0)])
[docs] def update_data(self): jobs = self._read_jobs() activity = len([x for x in jobs if 'anime' in x['color']]) color = self._aggregate(jobs) current_state = dict(color=color, activity=activity) if current_state != self._last_state: target_rgb = webcolors.name_to_rgb(color) self._pattern = self._pattern_factory(target_rgb, self._loop_frequency, activity, self._decay_period) self._last_state = current_state return current_state
[docs] def update_hardware(self): red, green, blue = next(self._pattern) self._hardware_source.update_hardware(red, green, blue) return
[docs] def close(self): self._hardware_source.close() return
def _read_jobs(self): """ Read jobs list for Jenkins. returns the Jenkins job list. """ build_url = 'http://{0}/api/json'.format(self._build_host) connection = urlopen(build_url) data = connection.read() connection.close() return json.loads(data.decode('utf-8'))['jobs'] @staticmethod def _aggregate(jobs): """ Aggregate the Jenkins jobs into a single build state. """ states = [x['color'] for x in jobs] cleaned_states = [x.replace('_anime', '') for x in states] result = 'grey' for state in ['red', 'yellow', 'blue']: if state in cleaned_states: result = state break return result