Manage AllowedIPs for client config file

This commit is contained in:
Eduardo Silva 2024-03-09 16:02:48 -03:00
parent 7561156235
commit 32931dfd16
11 changed files with 188 additions and 106 deletions

View file

@ -18,8 +18,10 @@ class PeerForm(forms.ModelForm):
class PeerAllowedIPForm(forms.ModelForm):
def __init__(self, *args, **kwargs):
current_peer = kwargs.pop('current_peer', None)
config_file = kwargs.pop('config_file', None)
super().__init__(*args, **kwargs)
self.current_peer = current_peer
self.config_file = config_file
allowed_ip = forms.GenericIPAddressField(label='Allowed IP or Network', required=True)
netmask = forms.ChoiceField(choices=NETMASK_CHOICES, label='Netmask', initial=24, required=True)
@ -33,29 +35,34 @@ class PeerAllowedIPForm(forms.ModelForm):
if allowed_ip is None:
raise forms.ValidationError("Please provide a valid IP address.")
wireguard_network = ipaddress.ip_network(f"{self.current_peer.wireguard_instance.address}/{self.current_peer.wireguard_instance.netmask}", strict=False)
if self.config_file == 'server':
wireguard_network = ipaddress.ip_network(f"{self.current_peer.wireguard_instance.address}/{self.current_peer.wireguard_instance.netmask}", strict=False)
if priority == 0:
zero_priority_ips_query = PeerAllowedIP.objects.filter(peer=self.current_peer, config_file='server', priority=0)
if self.instance:
zero_priority_ips_query = zero_priority_ips_query.exclude(uuid=self.instance.uuid)
if zero_priority_ips_query.exists():
raise forms.ValidationError("A peer can have only one IP with priority zero.")
if priority == 0:
zero_priority_ips_query = PeerAllowedIP.objects.filter(peer=self.current_peer, priority=0)
if self.instance:
zero_priority_ips_query = zero_priority_ips_query.exclude(uuid=self.instance.uuid)
if zero_priority_ips_query.exists():
raise forms.ValidationError("A peer can have only one IP with priority zero.")
duplicated_ip = PeerAllowedIP.objects.filter(allowed_ip=allowed_ip)
if self.instance:
duplicated_ip = duplicated_ip.exclude(uuid=self.instance.uuid)
if duplicated_ip.exists():
raise forms.ValidationError("This IP is already in use by another peer.")
if ipaddress.ip_address(allowed_ip) not in wireguard_network:
raise forms.ValidationError("The IP address does not belong to the Peer's WireGuard instance network range. Please check the IP address or change the priority.")
if str(netmask) != str(32):
raise forms.ValidationError("The netmask for priority 0 IP must be 32.")
if self.current_peer.wireguard_instance.address == allowed_ip:
raise forms.ValidationError("The IP address is the same as the Peer's WireGuard instance address.")
duplicated_ip = PeerAllowedIP.objects.filter(config_file='server', allowed_ip=allowed_ip)
if self.instance:
duplicated_ip = duplicated_ip.exclude(uuid=self.instance.uuid)
if duplicated_ip.exists():
raise forms.ValidationError("This IP is already in use by another peer.")
if ipaddress.ip_address(allowed_ip) not in wireguard_network:
raise forms.ValidationError("The IP address does not belong to the Peer's WireGuard instance network range. Please check the IP address or change the priority.")
if str(netmask) != str(32):
raise forms.ValidationError("The netmask for priority 0 IP must be 32.")
if self.current_peer.wireguard_instance.address == allowed_ip:
raise forms.ValidationError("The IP address is the same as the Peer's WireGuard instance address.")
else:
if ipaddress.ip_address(allowed_ip) in wireguard_network:
raise forms.ValidationError("The IP address belongs to the Peer's WireGuard instance network range. Please check the IP address or change use priority 0 instead.")
elif self.config_file == 'client':
if priority < 1:
raise forms.ValidationError("Priority must be greater than or equal to 1")
else:
if ipaddress.ip_address(allowed_ip) in wireguard_network:
raise forms.ValidationError("The IP address belongs to the Peer's WireGuard instance network range. Please check the IP address or change use priority 0 instead.")
raise forms.ValidationError('Invalid config file')
class Meta:
model = PeerAllowedIP

View file

@ -22,7 +22,7 @@ def generate_peer_default(wireguard_instance):
# the code below can be an issue for larger networks, for now it's fine, but it should be optimized in the future
used_ips = set(WireGuardInstance.objects.all().values_list('address', flat=True)) | \
set(PeerAllowedIP.objects.filter(priority=0).values_list('allowed_ip', flat=True))
set(PeerAllowedIP.objects.filter(config_file='server', priority=0).values_list('allowed_ip', flat=True))
free_ip_address = None
for ip in network.hosts():
@ -88,6 +88,7 @@ def view_wireguard_peer_manage(request):
wireguard_instance=current_instance,
)
PeerAllowedIP.objects.create(
config_file='server',
peer=new_peer,
allowed_ip=new_peer_data['allowed_ip'],
priority=0,
@ -115,7 +116,8 @@ def view_wireguard_peer_manage(request):
messages.warning(request, 'Error deleting peer|Invalid confirmation message. Type "delete" to confirm.')
return redirect('/peer/manage/?peer=' + str(current_peer.uuid))
page_title = 'Update Peer '
peer_ip_list = current_peer.peerallowedip_set.all().order_by('priority')
peer_ip_list = current_peer.peerallowedip_set.filter(config_file='server').order_by('priority')
peer_client_ip_list = current_peer.peerallowedip_set.filter(config_file='client').order_by('priority')
if current_peer.name:
page_title += current_peer.name
else:
@ -133,31 +135,28 @@ def view_wireguard_peer_manage(request):
else:
return redirect('/peer/list/')
context = {
'page_title': page_title, 'current_instance': current_instance, 'current_peer': current_peer, 'form': form, 'peer_ip_list': peer_ip_list
'page_title': page_title, 'current_instance': current_instance, 'current_peer': current_peer, 'form': form,
'peer_ip_list': peer_ip_list, 'peer_client_ip_list': peer_client_ip_list
}
return render(request, 'wireguard/wireguard_manage_peer.html', context)
def view_manage_ip_address(request):
if not UserAcl.objects.filter(user=request.user).filter(user_level__gte=30).exists():
return render(request, 'access_denied.html', {'page_title': 'Access Denied'})
config_file = request.GET.get('config', 'server')
if request.GET.get('peer'):
current_peer = get_object_or_404(Peer, uuid=request.GET.get('peer'))
page_title = 'Add new IP address for Peer '
#page_title = 'Add new IP address for Peer ' + str(current_peer)
current_ip = None
if current_peer.name:
page_title += current_peer.name
else:
page_title += current_peer.public_key
elif request.GET.get('ip'):
current_ip = get_object_or_404(PeerAllowedIP, uuid=request.GET.get('ip'))
current_peer = current_ip.peer
page_title = 'Update IP address for Peer '
if current_peer.name:
page_title += current_peer.name
else:
page_title += current_peer.public_key[:10] + ("..." if len(current_peer.public_key) > 16 else "")
config_file = current_ip.config_file
#page_title = 'Update IP address for Peer ' + str(current_peer)
if request.GET.get('action') == 'delete':
if request.GET.get('confirmation') == 'delete':
current_ip.delete()
@ -168,13 +167,20 @@ def view_manage_ip_address(request):
else:
messages.warning(request, 'Error deleting IP address|Invalid confirmation message. Type "delete" to confirm.')
return redirect('/peer/ip/?ip=' + str(current_ip.uuid))
if config_file not in ['client', 'server']:
config_file = 'server'
if config_file == 'client':
page_title = 'Manage client route'
else:
page_title = 'Manage IP address or Network'
if request.method == 'POST':
form = PeerAllowedIPForm(request.POST or None, instance=current_ip, current_peer=current_peer)
form = PeerAllowedIPForm(request.POST or None, instance=current_ip, current_peer=current_peer, config_file=config_file)
if form.is_valid():
this_form = form.save(commit=False)
if not current_ip:
this_form.peer = current_peer
this_form.config_file = config_file
this_form.save()
current_peer.wireguard_instance.pending_changes = True
current_peer.wireguard_instance.save()