Routerfleet integration

This commit is contained in:
Eduardo Silva 2024-04-03 15:19:01 -03:00
parent 6ffa1abb15
commit 31fc3a1098
6 changed files with 127 additions and 5 deletions

View file

@ -1,9 +1,14 @@
from django.contrib.auth.models import User
from django.contrib import auth
from django.http import JsonResponse
from django.shortcuts import get_object_or_404, redirect
from django.views.decorators.http import require_http_methods
from django.http import HttpResponseForbidden
from django.conf import settings
from django.utils import timezone
from user_manager.models import UserAcl, AuthenticationToken
from wireguard.models import WebadminSettings, Peer, PeerStatus
import requests
import subprocess
@ -13,9 +18,14 @@ import os
import uuid
def get_api_key():
api_file_path = '/etc/wireguard/api_key'
def get_api_key(api_name):
api_key = None
if api_name == 'api':
api_file_path = '/etc/wireguard/api_key'
elif api_name == 'routerfleet':
api_file_path = '/etc/wireguard/routerfleet_key'
else:
return api_key
if os.path.exists(api_file_path) and os.path.isfile(api_file_path):
with open(api_file_path, 'r') as api_file:
@ -31,12 +41,80 @@ def get_api_key():
return api_key
def routerfleet_authenticate_session(request):
AuthenticationToken.objects.filter(created__lt=timezone.now() - timezone.timedelta(minutes=1)).delete()
authentication_token = get_object_or_404(AuthenticationToken, uuid=request.GET.get('token'))
auth.login(request, authentication_token.user)
authentication_token.delete()
return redirect('/')
@require_http_methods(["GET"])
def routerfleet_get_user_token(request):
data = {'status': '', 'message': '', 'authentication_token': ''}
if request.GET.get('key'):
api_key = get_api_key('routerfleet')
if api_key and api_key == request.GET.get('key'):
pass
else:
return HttpResponseForbidden()
else:
return HttpResponseForbidden()
try:
default_user_level = int(request.GET.get('default_user_level'))
if default_user_level not in [10, 20, 30, 40, 50]:
default_user_level = 0
except:
default_user_level = 0
if request.GET.get('username'):
user = User.objects.filter(username=request.GET.get('username')).first()
if request.GET.get('action') == 'test':
if UserAcl.objects.filter(user=user, user_level__gte=50).exists():
data['status'] = 'success'
data['message'] = 'User exists and is an administrator'
else:
data['status'] = 'error'
data['message'] = f'Administrator with username {request.GET.get("username")} not found at wireguard_webadmin.'
elif request.GET.get('action') == 'login':
if user:
user_acl = UserAcl.objects.filter(user=user).first()
else:
if default_user_level == 0:
data['status'] = 'error'
data['message'] = 'User not found'
else:
user = User.objects.create_user(username=request.GET.get('username'), password=str(uuid.uuid4()))
user_acl = UserAcl.objects.create(user=user, user_level=default_user_level)
if user and user_acl:
authentication_token = AuthenticationToken.objects.create(user=user)
data['status'] = 'success'
data['message'] = 'User authenticated successfully'
data['authentication_token'] = str(authentication_token.uuid)
else:
data['status'] = 'error'
data['message'] = 'Invalid action'
else:
data['status'] = 'error'
data['message'] = 'No username provided'
if data['status'] == 'error':
return JsonResponse(data, status=400)
else:
return JsonResponse(data)
@require_http_methods(["GET"])
def wireguard_status(request):
if request.user.is_authenticated:
pass
elif request.GET.get('key'):
api_key = get_api_key()
api_key = get_api_key('api')
if api_key and api_key == request.GET.get('key'):
pass
else: