mirror of
https://github.com/eduardogsilva/routerfleet.git
synced 2025-06-21 01:25:41 +02:00
131 lines
5.8 KiB
Python
131 lines
5.8 KiB
Python
from backup.models import BackupProfile
|
|
from router_manager.models import SUPPORTED_ROUTER_TYPES, SSHKey, RouterGroup
|
|
from django import forms
|
|
from crispy_forms.helper import FormHelper
|
|
from crispy_forms.layout import Layout, Submit, Row, Column, HTML
|
|
from .models import CsvData
|
|
import csv
|
|
import io
|
|
import re
|
|
from django.core.exceptions import ValidationError
|
|
|
|
# Rebind the imported name to a flat list holding only the first element of
# each entry, so membership tests below compare against bare values.
# NOTE(review): presumably the entries are Django-style (value, label) choice
# pairs -- confirm against router_manager.models.
SUPPORTED_ROUTER_TYPES = [choice[0] for choice in SUPPORTED_ROUTER_TYPES]
|
|
|
|
|
|
class CsvDataForm(forms.ModelForm):
    """Form for pasting raw CSV text that describes routers to import.

    ``clean()`` parses ``raw_csv_data``, validates every non-empty record and
    stores the accepted rows (dicts keyed by CSV column, plus a 1-based
    ``import_id``) in ``cleaned_data['import_data']``.
    """

    # Exact header (names and order) the CSV must start with.
    _EXPECTED_FIELDS = [
        'name', 'username', 'password', 'ssh_key', 'address', 'port', 'router_type',
        'backup_profile', 'router_group', 'monitoring',
    ]
    # Columns that must carry a non-blank value on every record.
    _REQUIRED_FIELDS = ('name', 'username', 'address', 'port', 'router_type', 'monitoring')

    # Patterns are used with fullmatch(), so no ^/$ anchors are needed and a
    # trailing newline is not silently accepted.
    _IPV4_RE = re.compile(r"(?:[0-9]{1,3}\.){3}[0-9]{1,3}")
    _HOSTNAME_RE = re.compile(
        r"(?=.{1,253}\Z)(?:(?!-)[A-Za-z0-9-]{1,63}(?<!-)\.)+[A-Za-z]{2,63}"
    )
    # Optional leading zeros followed by at most five ASCII digits; keeps the
    # int() conversion below small and free of non-ASCII "digits".
    _PORT_RE = re.compile(r"0*([0-9]{1,5})")

    class Meta:
        model = CsvData
        fields = ['raw_csv_data']

    def __init__(self, *args, **kwargs):
        """Build the form and attach the crispy-forms layout (textarea, Save, Back)."""
        super().__init__(*args, **kwargs)
        self.helper = FormHelper()
        self.helper.form_method = 'post'
        self.helper.layout = Layout(
            Row(
                Column('raw_csv_data', css_class='form-group col-md-12 mb-0'),
                css_class='form-row'
            ),
            Row(
                Column(
                    Submit('submit', 'Save', css_class='btn btn-success'),
                    HTML(' <a class="btn btn-secondary" href="/router/import_tool/">Back</a> '),
                    css_class='col-md-12'),
                css_class='form-row'
            )
        )

    def clean(self):
        """Validate the pasted CSV and expose the parsed rows as ``import_data``.

        Returns:
            The cleaned data dict. When ``raw_csv_data`` is present it gains an
            ``import_data`` key holding the list of accepted row dicts.

        Raises:
            ValidationError: on a malformed CSV, a wrong header, an invalid
                record, or a reference to an SSH key, router group or backup
                profile name that does not exist.
        """
        cleaned_data = super().clean()
        raw_csv_data = cleaned_data.get('raw_csv_data')
        if not raw_csv_data:
            return cleaned_data

        try:
            import_data = self._parse_rows(raw_csv_data)
        except csv.Error as exc:
            # The csv module signals malformed input with csv.Error; report it
            # as a form error instead of letting it escape as a server error.
            raise ValidationError(f"The input is not a valid CSV file: {exc}") from exc

        # dict.fromkeys() de-duplicates while keeping first-seen order, so the
        # first missing reference reported is the first one used in the CSV.
        referenced = (
            (SSHKey, 'ssh_key', 'SSH Key'),
            (RouterGroup, 'router_group', 'Router Group'),
            (BackupProfile, 'backup_profile', 'Backup Profile'),
        )
        for model, column, label in referenced:
            names = dict.fromkeys(row[column] for row in import_data if row[column])
            self._ensure_all_exist(model, names, label)

        cleaned_data['import_data'] = import_data
        return cleaned_data

    def _parse_rows(self, raw_csv_data):
        """Parse the CSV text and return the validated, numbered row dicts."""
        # newline='' leaves line-ending handling to the csv module, as its
        # documentation recommends.
        reader = csv.DictReader(io.StringIO(raw_csv_data, newline=''))

        if reader.fieldnames is None:
            raise ValidationError("The input is not a valid CSV file or it's empty.")

        if reader.fieldnames != self._EXPECTED_FIELDS:
            raise ValidationError(
                f"The CSV header does not match the expected format. Expected fields: {self._EXPECTED_FIELDS}"
            )

        seen_names = set()
        import_data = []
        for row in reader:
            # DictReader silently skips blank lines, so a running counter would
            # drift; line_num is the source line the record was read from.
            line_number = reader.line_num
            if not any(row.values()):
                continue  # Skip records whose columns are all empty

            self._validate_row(row, line_number, seen_names)
            row['import_id'] = len(import_data) + 1
            import_data.append(row)
        return import_data

    def _validate_row(self, row, line_number, seen_names):
        """Raise ValidationError if a single CSV record is not importable."""
        # DictReader fills columns missing from a short row with None, so the
        # keys are always present; what has to be checked is the values.
        missing_fields = sorted(
            field for field in self._REQUIRED_FIELDS
            if not (row.get(field) or '').strip()
        )
        if missing_fields:
            raise ValidationError(f"Missing required fields: {missing_fields} on line {line_number}")

        # Surplus columns of an over-long row are collected under the key None.
        extra_fields = set(row.keys()) - set(self._EXPECTED_FIELDS)
        if extra_fields:
            raise ValidationError(f"Unexpected fields: {extra_fields} on line {line_number}")

        if not (row.get('password') or row.get('ssh_key')):
            raise ValidationError(f"Either 'password' or 'ssh_key' must be provided on line {line_number}")
        if row.get('password') and row.get('ssh_key'):
            raise ValidationError(f"Both 'password' and 'ssh_key' cannot be provided together on line {line_number}")

        if row['name'] in seen_names:
            raise ValidationError(f"Duplicate name '{row['name']}' found on line {line_number}")
        seen_names.add(row['name'])

        if row['router_type'] not in SUPPORTED_ROUTER_TYPES:
            raise ValidationError(f"Invalid router_type '{row['router_type']}' on line {line_number}")

        if row['monitoring'].lower() not in {'true', 'false'}:
            raise ValidationError(f"Invalid value for 'monitoring' on line {line_number}. Must be 'true' or 'false'.")

        if not self._is_valid_ip_or_hostname(row['address']):
            raise ValidationError(f"Invalid address '{row['address']}' on line {line_number}")

        port_match = self._PORT_RE.fullmatch(row['port'])
        if port_match is None or not (1 <= int(port_match.group(1)) <= 65535):
            raise ValidationError(f"Invalid port '{row['port']}' on line {line_number}")

    @staticmethod
    def _ensure_all_exist(model, names, label):
        """Raise ValidationError for the first name with no matching ``model`` row."""
        for name in names:
            if not model.objects.filter(name=name).exists():
                raise ValidationError(f"{label} '{name}' does not exist")

    def _is_valid_ip_or_hostname(self, value):
        """Return True if ``value`` is a dotted-quad IPv4 address or a dotted hostname.

        IPv4 octets must be in the range 0-255. Hostnames need at least one
        dot, labels of 1-63 characters that neither start nor end with a
        hyphen, an alphabetic final label of 2-63 letters, and a total length
        of at most 253 characters.
        """
        if not isinstance(value, str):
            return False
        if self._IPV4_RE.fullmatch(value):
            return all(int(octet) <= 255 for octet in value.split('.'))
        return self._HOSTNAME_RE.fullmatch(value) is not None
|