Skip to content

Commit

Permalink
Merge pull request psf#1770 from kmadac/master
Browse files Browse the repository at this point in the history
Implemetation of IP address ranges for no_proxy environment variable
  • Loading branch information
kennethreitz committed Dec 5, 2013
2 parents 1cac72b + 1d42d9d commit ba3dab5
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 5 deletions.
1 change: 1 addition & 0 deletions AUTHORS.rst
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,4 @@ Patches and Suggestions
- Thomas Weißschuh <[email protected]> @t-8ch
- Jayson Vantuyl <[email protected]> @kagato
- Pengfei.X <[email protected]>
- Kamil Madac <[email protected]>
69 changes: 64 additions & 5 deletions requests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import platform
import re
import sys
import socket
import struct

from . import __version__
from . import certs
Expand Down Expand Up @@ -405,6 +407,56 @@ def requote_uri(uri):
return quote(unquote_unreserved(uri), safe="!#$%&'()*+,/:;=?@[]~")


def address_in_network(ip, net):
"""
This function allows you to check if on IP belongs to a network subnet
Example: returns True if ip = 192.168.1.1 and net = 192.168.1.0/24
returns False if ip = 192.168.1.1 and net = 192.168.100.0/24
"""
ipaddr = struct.unpack('=L', socket.inet_aton(ip))[0]
netaddr, bits = net.split('/')
netmask = struct.unpack('=L', socket.inet_aton(dotted_netmask(int(bits))))[0]
network = struct.unpack('=L', socket.inet_aton(netaddr))[0] & netmask
return (ipaddr & netmask) == (network & netmask)


def dotted_netmask(mask):
"""
Converts mask from /xx format to xxx.xxx.xxx.xxx
Example: if mask is 24 function returns 255.255.255.0
"""
bits = 0xffffffff ^ (1 << 32 - mask) - 1
return socket.inet_ntoa(struct.pack('>I', bits))


def is_ipv4_address(string_ip):
try:
socket.inet_aton(string_ip)
except socket.error:
return False
return True


def is_valid_cidr(string_network):
"""Very simple check of the cidr format in no_proxy variable"""
if string_network.count('/') == 1:
try:
mask = int(string_network.split('/')[1])
except ValueError:
return False

if mask < 1 or mask > 32:
return False

try:
socket.inet_aton(string_network.split('/')[0])
except socket.error:
return False
else:
return False
return True


def get_environ_proxies(url):
"""Return a dict of environment proxies."""

Expand All @@ -420,11 +472,18 @@ def get_environ_proxies(url):
# the end of the netloc, both with and without the port.
no_proxy = no_proxy.replace(' ', '').split(',')

for host in no_proxy:
if netloc.endswith(host) or netloc.split(':')[0].endswith(host):
# The URL does match something in no_proxy, so we don't want
# to apply the proxies on this URL.
return {}
ip = netloc.split(':')[0]
if is_ipv4_address(ip):
for proxy_ip in no_proxy:
if is_valid_cidr(proxy_ip):
if address_in_network(ip, proxy_ip):
return {}
else:
for host in no_proxy:
if netloc.endswith(host) or netloc.split(':')[0].endswith(host):
# The URL does match something in no_proxy, so we don't want
# to apply the proxies on this URL.
return {}

# If the system proxy settings indicate that this URL should be bypassed,
# don't proxy.
Expand Down
41 changes: 41 additions & 0 deletions test_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -963,5 +963,46 @@ def test_super_len_io_streams(self):
else:
assert super_len(cStringIO.StringIO('but some how, some way...')) == 25

def test_get_environ_proxies_ip_ranges(self):
""" Ensures that IP addresses are correctly matches with ranges in no_proxy variable """
from requests.utils import get_environ_proxies
os.environ['no_proxy'] = "192.168.0.0/24,127.0.0.1,localhost.localdomain,172.16.1.1"
assert get_environ_proxies('http://192.168.0.1:5000/') == {}
assert get_environ_proxies('http://192.168.0.1/') == {}
assert get_environ_proxies('http://172.16.1.1/') == {}
assert get_environ_proxies('http://172.16.1.1:5000/') == {}
assert get_environ_proxies('http://192.168.1.1:5000/') != {}
assert get_environ_proxies('http://192.168.1.1/') != {}

def test_get_environ_proxies(self):
""" Ensures that IP addresses are correctly matches with ranges in no_proxy variable """
from requests.utils import get_environ_proxies
os.environ['no_proxy'] = "127.0.0.1,localhost.localdomain,192.168.0.0/24,172.16.1.1"
assert get_environ_proxies('http://localhost.localdomain:5000/v1.0/') == {}
assert get_environ_proxies('http://www.requests.com/') != {}

def test_is_ipv4_address(self):
from requests.utils import is_ipv4_address
assert is_ipv4_address('8.8.8.8')
assert not is_ipv4_address('8.8.8.8.8')
assert not is_ipv4_address('localhost.localdomain')

def test_is_valid_cidr(self):
from requests.utils import is_valid_cidr
assert not is_valid_cidr('8.8.8.8')
assert is_valid_cidr('192.168.1.0/24')

def test_dotted_netmask(self):
from requests.utils import dotted_netmask
assert dotted_netmask(8) == '255.0.0.0'
assert dotted_netmask(24) == '255.255.255.0'
assert dotted_netmask(25) == '255.255.255.128'

def test_address_in_network(self):
from requests.utils import address_in_network
assert address_in_network('192.168.1.1', '192.168.1.0/24')
assert not address_in_network('172.16.0.1', '192.168.1.0/24')


if __name__ == '__main__':
unittest.main()

0 comments on commit ba3dab5

Please sign in to comment.