| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124 |
- import _overlapped
- import _thread
- import _winapi
- import math
- import struct
- import winreg
- # Seconds per measurement
- SAMPLING_INTERVAL = 1
- # Exponential damping factor to compute exponentially weighted moving average
- # on 1 minute (60 seconds)
- LOAD_FACTOR_1 = 1 / math.exp(SAMPLING_INTERVAL / 60)
- # Initialize the load using the arithmetic mean of the first NVALUE values
- # of the Processor Queue Length
- NVALUE = 5
- class WindowsLoadTracker():
- """
- This class asynchronously reads the performance counters to calculate
- the system load on Windows. A "raw" thread is used here to prevent
- interference with the test suite's cases for the threading module.
- """
- def __init__(self):
- # Pre-flight test for access to the performance data;
- # `PermissionError` will be raised if not allowed
- winreg.QueryInfoKey(winreg.HKEY_PERFORMANCE_DATA)
- self._values = []
- self._load = None
- self._running = _overlapped.CreateEvent(None, True, False, None)
- self._stopped = _overlapped.CreateEvent(None, True, False, None)
- _thread.start_new_thread(self._update_load, (), {})
- def _update_load(self,
- # localize module access to prevent shutdown errors
- _wait=_winapi.WaitForSingleObject,
- _signal=_overlapped.SetEvent):
- # run until signaled to stop
- while _wait(self._running, 1000):
- self._calculate_load()
- # notify stopped
- _signal(self._stopped)
- def _calculate_load(self,
- # localize module access to prevent shutdown errors
- _query=winreg.QueryValueEx,
- _hkey=winreg.HKEY_PERFORMANCE_DATA,
- _unpack=struct.unpack_from):
- # get the 'System' object
- data, _ = _query(_hkey, '2')
- # PERF_DATA_BLOCK {
- # WCHAR Signature[4] 8 +
- # DWOWD LittleEndian 4 +
- # DWORD Version 4 +
- # DWORD Revision 4 +
- # DWORD TotalByteLength 4 +
- # DWORD HeaderLength = 24 byte offset
- # ...
- # }
- obj_start, = _unpack('L', data, 24)
- # PERF_OBJECT_TYPE {
- # DWORD TotalByteLength
- # DWORD DefinitionLength
- # DWORD HeaderLength
- # ...
- # }
- data_start, defn_start = _unpack('4xLL', data, obj_start)
- data_base = obj_start + data_start
- defn_base = obj_start + defn_start
- # find the 'Processor Queue Length' counter (index=44)
- while defn_base < data_base:
- # PERF_COUNTER_DEFINITION {
- # DWORD ByteLength
- # DWORD CounterNameTitleIndex
- # ... [7 DWORDs/28 bytes]
- # DWORD CounterOffset
- # }
- size, idx, offset = _unpack('LL28xL', data, defn_base)
- defn_base += size
- if idx == 44:
- counter_offset = data_base + offset
- # the counter is known to be PERF_COUNTER_RAWCOUNT (DWORD)
- processor_queue_length, = _unpack('L', data, counter_offset)
- break
- else:
- return
- # We use an exponentially weighted moving average, imitating the
- # load calculation on Unix systems.
- # https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation
- # https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
- if self._load is not None:
- self._load = (self._load * LOAD_FACTOR_1
- + processor_queue_length * (1.0 - LOAD_FACTOR_1))
- elif len(self._values) < NVALUE:
- self._values.append(processor_queue_length)
- else:
- self._load = sum(self._values) / len(self._values)
- def close(self, kill=True):
- self.__del__()
- return
- def __del__(self,
- # localize module access to prevent shutdown errors
- _wait=_winapi.WaitForSingleObject,
- _close=_winapi.CloseHandle,
- _signal=_overlapped.SetEvent):
- if self._running is not None:
- # tell the update thread to quit
- _signal(self._running)
- # wait for the update thread to signal done
- _wait(self._stopped, -1)
- # cleanup events
- _close(self._running)
- _close(self._stopped)
- self._running = self._stopped = None
- def getloadavg(self):
- return self._load
|