forked from nedbat/coveragepy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
osinfo.py
88 lines (75 loc) · 3.24 KB
/
osinfo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0
# For details: https://github.com/nedbat/coveragepy/blob/master/NOTICE.txt
"""OS information for testing."""
from __future__ import annotations
import sys
if sys.platform == "win32":
# Windows implementation
def process_ram() -> int:
    """Report the memory used by the current process. (Windows)

    Asks psapi's ``GetProcessMemoryInfo`` about the handle returned by
    ``GetCurrentProcess`` and returns the ``PrivateUsage`` counter it fills in.
    Returns 0 if the call reports failure.
    """
    import ctypes
    from ctypes import wintypes

    # Every counter after the two leading DWORDs is a size_t, in this order.
    size_fields = (
        'PeakWorkingSetSize',
        'WorkingSetSize',
        'QuotaPeakPagedPoolUsage',
        'QuotaPagedPoolUsage',
        'QuotaPeakNonPagedPoolUsage',
        'QuotaNonPagedPoolUsage',
        'PagefileUsage',
        'PeakPagefileUsage',
        'PrivateUsage',
    )

    # From: http://lists.ubuntu.com/archives/bazaar-commits/2009-February/011990.html
    # Updated from: https://stackoverflow.com/a/16204942/14343
    class PROCESS_MEMORY_COUNTERS_EX(ctypes.Structure):
        """Structure that GetProcessMemoryInfo fills in."""
        _fields_ = [
            ('cb', wintypes.DWORD),
            ('PageFaultCount', wintypes.DWORD),
            *((field_name, ctypes.c_size_t) for field_name in size_fields),
        ]

    # Declare the signatures so ctypes marshals handles and pointers correctly.
    query_memory = ctypes.windll.psapi.GetProcessMemoryInfo
    query_memory.restype = wintypes.BOOL
    query_memory.argtypes = [
        wintypes.HANDLE,
        ctypes.POINTER(PROCESS_MEMORY_COUNTERS_EX),
        wintypes.DWORD,
    ]

    current_process = ctypes.windll.kernel32.GetCurrentProcess
    current_process.restype = wintypes.HANDLE
    current_process.argtypes = []

    info = PROCESS_MEMORY_COUNTERS_EX()
    succeeded = query_memory(current_process(), ctypes.byref(info), ctypes.sizeof(info))
    if not succeeded:   # pragma: part covered
        return 0        # pragma: cant happen
    return info.PrivateUsage
elif sys.platform.startswith("linux"):
# Linux implementation
import os

# Multipliers for the unit suffixes that follow a value in /proc/<pid>/status.
_scale = {'kb': 1024, 'mb': 1024*1024}


def _VmB(key: str) -> int:
    """Read the /proc/PID/status file to find memory use.

    `key` is the text to look for, e.g. 'VmRSS'.  The first occurrence of
    `key` in the file is used; the number and unit that follow it are
    converted to bytes with `_scale`.

    Returns 0 whenever the value can't be determined: the status file can't
    be read, `key` isn't present, or the entry isn't of the form
    "<number> <unit>" with a unit listed in `_scale`.
    """
    try:
        # Get pseudo file /proc/<pid>/status.  Decode leniently: the process
        # name in the file is not guaranteed to be valid in the locale encoding.
        status_path = f"/proc/{os.getpid()}/status"
        with open(status_path, encoding="utf-8", errors="replace") as t:
            v = t.read()
    except OSError:     # pragma: cant happen
        return 0        # non-Linux?

    # Get VmKey line e.g. 'VmRSS:  9999  kB\n ...'
    i = v.find(key)
    if i < 0:
        # The kernel didn't report this field for the process.
        return 0
    vp = v[i:].split(None, 3)
    if len(vp) < 3:     # pragma: part covered
        return 0        # pragma: cant happen

    # Convert Vm value to bytes.  A non-numeric value or an unknown unit means
    # this wasn't a memory entry after all, so report "unknown" like the other
    # failure paths instead of raising.
    try:
        return int(float(vp[1]) * _scale[vp[2].lower()])
    except (KeyError, ValueError):
        return 0
def process_ram() -> int:
    """Report the memory used by the current process. (Linux implementation)

    The figure is whatever `_VmB` returns for the 'VmRSS' entry.
    """
    resident = _VmB('VmRSS')
    return resident
else:
# Generic implementation.
def process_ram() -> int:
    """Report the memory used by the current process. (stdlib implementation)

    Returns the `ru_maxrss` field of `getrusage(RUSAGE_SELF)`.

    NOTE(review): the units of `ru_maxrss` are defined by the platform, so this
    value may not be directly comparable with the other implementations in
    this file -- confirm for each OS that reaches this branch.
    """
    from resource import RUSAGE_SELF, getrusage

    usage = getrusage(RUSAGE_SELF)
    return usage.ru_maxrss