# test_package/scripts/utils.py
import base64
import csv
import json
import re
from itertools import islice

import pandas as pd
import requests
from ldap3 import ALL, SUBTREE, Connection, Server

from config import *


def replace_in_list(lst, translation_map):
    """Replaces values in a list based on a given mapping dictionary.

    Items without an entry in the mapping are dropped from the result.
    """
    return [translation_map[item] for item in lst if item in translation_map]
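
# Illustrative behaviour (hypothetical values): unmapped items are filtered out,
# not passed through unchanged:
#   replace_in_list(["a", "b", "c"], {"a": "alpha", "c": "gamma"})  ->  ["alpha", "gamma"]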


def generate_json(value_array, objecttypeid, objectTypeAttributeId):
    """Generates the JSON payload for updating a single attribute on an existing object."""
    return {
        "objectId": objecttypeid,
        "objectTypeAttributeId": objectTypeAttributeId,
        "objectAttributeValues": [{"value": v} for v in value_array]
    }


def generate_create_zone_json(value_array, objecttypeid, objectTypeAttributeId):
    """Generates the JSON payload for creating a new zone object with a single attribute."""
    return {
        "objectTypeId": objecttypeid,
        "attributes": [{
            "objectTypeAttributeId": objectTypeAttributeId,
            "objectAttributeValues": [{"value": v} for v in value_array]
        }]
    }
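
# Illustrative output (hypothetical IDs): generate_create_zone_json(["zone-a"], "42", "7")
# evaluates to
#   {"objectTypeId": "42",
#    "attributes": [{"objectTypeAttributeId": "7",
#                    "objectAttributeValues": [{"value": "zone-a"}]}]}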


def split_csv(file_path, max_lines, uuid):
    """Splits a CSV file into multiple chunks, ensuring no chunk exceeds the specified number of lines."""
    chunks = []
    with open(file_path, newline='', encoding='utf-8') as csvfile:
        reader = csv.reader(csvfile)
        next(reader, None)  # Skip header row (no-op on an empty file)
        while True:
            chunk = list(islice(reader, max_lines))
            if not chunk:
                break
            ad_groups = [{"name": row[0]} for row in chunk]  # Assuming first column contains group names
            output_data = {
                "data": {
                    "adGroups": ad_groups
                },
                "clientGeneratedId": uuid
            }
            chunks.append(output_data)
    return chunks
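
# Illustrative usage (file name, UUID, data_url and token are placeholders): each
# chunk returned by split_csv is a complete payload for the import data endpoint
# and can be posted with send_data defined below.
#
#   chunks = split_csv("ad_groups.csv", max_lines=1000, uuid="11111111-2222-3333-4444-555555555555")
#   for chunk in chunks:
#       send_data(data_url, token, chunk)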


def get_import_info(token):
    """Fetches import information from the Atlassian Assets API and returns its 'links' branch."""
    url = "https://api.atlassian.com/jsm/assets/v1/imports/info"
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }
    response = requests.get(url, headers=headers)
    if response.status_code == 200:
        data = response.json()
        return data['links']
    else:
        response.raise_for_status()


def initiate_schema_map(url, token):
    """Submits the object schema and mapping definition for the AD Groups import."""
    data_structure = {
        "schema": {
            "objectSchema": {
                "name": "Active Directory Groups",
                "description": "Data imported from AD",
                "objectTypes": [
                    {
                        "externalId": "object-type/ad-group",
                        "name": "AD Groups",
                        "description": "AD Group found during scanning",
                        "attributes": [
                            {
                                "externalId": "object-type-attribute/adgroup-name",
                                "name": "Name",
                                "description": "Ad Group Name",
                                "type": "text",
                                "label": True
                            }
                        ]
                    }
                ]
            }
        },
        "mapping": {
            "objectTypeMappings": [
                {
                    "objectTypeExternalId": "object-type/ad-group",
                    "objectTypeName": "AD Groups",
                    "selector": "adGroups",
                    "description": "Mapping for AD Groups",
                    "attributesMapping": [
                        {
                            "attributeExternalId": "object-type-attribute/adgroup-name",
                            "attributeName": "Name",
                            "attributeLocators": ["name"]
                        }
                    ]
                }
            ]
        }
    }
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }
    response = requests.put(url, headers=headers, json=data_structure)
    if response.status_code in (200, 201):
        print("Request successful:", response.json())
    else:
        print(f"Error {response.status_code}: {response.text}")
    return response


def start_import(url, token):
    """Starts the import execution and returns the 'links' branch of the response."""
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }
    data_structure = {}
    response = requests.post(url, headers=headers, json=data_structure)
    if response.status_code == 200:
        data = response.json()
        return data['links']
    else:
        response.raise_for_status()


def send_data(url, token, payload):
    """Posts a single data payload (e.g. one chunk from split_csv) to the import data endpoint."""
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }
    response = requests.post(url, headers=headers, json=payload)
    if response.status_code in (200, 201):
        print("Request successful:", response.json())
    else:
        print(f"Error {response.status_code}: {response.text}")


def get_ad_groups_entries():
    """Queries AD via LDAP for all group sAMAccountNames and returns them as a one-column DataFrame ('name')."""
    server = Server(ldap_server, get_info=ALL)
    conn = Connection(server, ldap_user, ldap_password, auto_bind=True)
    page_size = 1000
    cookie = None
    filtered_sAMAccountNames = []
    # Skip names that start with punctuation or whitespace; pattern is built once, outside the paging loop.
    bad_chars = ['!', '@', '#', '$', '%', '^', '&', '*', '(', ')', '_', '-', '=', '+', '[', ']', '{', '}', ';', ':', '"', "'", '<', '>', ',', '.', '/', '?', '|', '\\', ' ']
    bad_chars_pattern = f"^[{re.escape(''.join(bad_chars))}]"
    try:
        while True:
            conn.search(
                search_base=base_dn,
                search_filter='(objectClass=group)',
                search_scope=SUBTREE,
                attributes=['sAMAccountName'],
                paged_size=page_size,
                paged_cookie=cookie
            )
            for entry in conn.entries:
                if 'sAMAccountName' in entry:
                    sAMAccountName = entry.sAMAccountName.value
                    if re.match(bad_chars_pattern, sAMAccountName):
                        continue
                    if ' ' not in sAMAccountName and '$' not in sAMAccountName:
                        filtered_sAMAccountNames.append(sAMAccountName)
            cookie = conn.result['controls']['1.2.840.113556.1.4.319']['value']['cookie']
            # Break the loop if no more pages
            if not cookie:
                break
    except Exception as e:
        print(f"Error during LDAP search: {e}")
    finally:
        conn.unbind()
    # Remove duplicates and sort the list
    unique_sorted_names = sorted(set(filtered_sAMAccountNames))
    df = pd.DataFrame(unique_sorted_names, columns=["name"])
    return df


def rows_not_in_df2(df1: pd.DataFrame, df2: pd.DataFrame, compare_cols=None) -> pd.DataFrame:
    """
    Returns all rows in df1 that do not exist in df2, based on specified columns.

    :param df1: First DataFrame (source)
    :param df2: Second DataFrame (reference)
    :param compare_cols: List of column names to compare (default: all common columns)
    :return: DataFrame containing rows from df1 that are not in df2
    """
    # If no specific columns are given, use all common columns
    if compare_cols is None:
        compare_cols = df1.columns.intersection(df2.columns).tolist()
    # Ensure specified columns exist in both DataFrames
    compare_cols = [col for col in compare_cols if col in df1.columns and col in df2.columns]
    # Convert column types to match in both DataFrames (avoid dtype mismatches)
    df1 = df1.copy()
    df2 = df2.copy()
    for col in compare_cols:
        df1[col] = df1[col].astype(str)
        df2[col] = df2[col].astype(str)
    # Perform an anti-join using merge with an indicator column
    df_merged = df1.merge(df2, on=compare_cols, how='left', indicator=True)
    # Keep rows that exist only in df1
    df1_not_in_df2 = df_merged[df_merged['_merge'] == 'left_only'].drop(columns=['_merge'])
    return df1_not_in_df2.reset_index(drop=True)  # Reset index for clean output


def rows_new_in_df2(df1: pd.DataFrame, df2: pd.DataFrame, compare_cols=None) -> pd.DataFrame:
    """
    Returns all rows in df2 that do not exist in df1, based on specified columns.

    :param df1: First DataFrame (previous dataset)
    :param df2: Second DataFrame (new dataset)
    :param compare_cols: List of column names to compare (default: all common columns)
    :return: DataFrame containing rows from df2 that are new (not in df1)
    """
    # If no specific columns are given, use all common columns
    if compare_cols is None:
        compare_cols = df1.columns.intersection(df2.columns).tolist()
    # Ensure specified columns exist in both DataFrames
    compare_cols = [col for col in compare_cols if col in df1.columns and col in df2.columns]
    # Convert column types to match in both DataFrames (to avoid dtype mismatches)
    df1 = df1.copy()
    df2 = df2.copy()
    for col in compare_cols:
        df1[col] = df1[col].astype(str)
        df2[col] = df2[col].astype(str)
    # Perform an anti-join using merge with an indicator column
    df_merged = df2.merge(df1, on=compare_cols, how='left', indicator=True)
    # Keep rows that exist only in df2 (newly added rows)
    df2_new_rows = df_merged[df_merged['_merge'] == 'left_only'].drop(columns=['_merge'])
    return df2_new_rows.reset_index(drop=True)  # Reset index for clean output
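
# Illustrative anti-join behaviour (hypothetical data):
#   df_old = pd.DataFrame({"name": ["grp-a", "grp-b"]})
#   df_new = pd.DataFrame({"name": ["grp-b", "grp-c"]})
# rows_not_in_df2(df_old, df_new) keeps "grp-a" (only in the old set), while
# rows_new_in_df2(df_old, df_new) keeps "grp-c" (only in the new set).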


def send_request(url, credentials, payload=None, method="POST", headers=None):
    """
    Sends an HTTP request with basic-auth credentials and an optional data payload.

    Parameters:
        url (str): The endpoint URL.
        credentials (str): The username:api_token pair.
        payload (str|dict): The data payload to send (ignored for GET).
        method (str): The HTTP method (GET, POST, PUT, DELETE). Default is POST.
        headers (dict, optional): Additional headers for the request.

    Returns:
        dict: Parsed JSON on success, or a dict describing the error.
    """
    encoded_credentials = base64.b64encode(credentials.encode()).decode()
    # Default headers if none are provided
    if headers is None:
        headers = {
            "Accept": "application/json",
            "Content-Type": "application/json",
            "Authorization": f"Basic {encoded_credentials}"
        }
    else:
        headers["Authorization"] = f"Basic {encoded_credentials}"
    request_params = {
        "method": method,
        "url": url,
        "headers": headers
    }
    if method.upper() != "GET" and payload:
        request_params["data"] = payload  # Add payload only for non-GET requests
    # Send the request with basic authentication
    try:
        response = requests.request(**request_params)
        # Check response status codes and return appropriate messages
        if response.status_code == 201:
            return {"message": "Created Successfully", "data": response.json()}
        elif response.status_code == 400:
            return {"error": "Bad Request", "details": response.text}
        elif response.status_code == 401:
            return {"error": "Unauthorized Access", "details": response.text}
        elif response.status_code == 500:
            return {"error": "Internal Server Error", "details": response.text}
        else:
            response.raise_for_status()  # Raise an error for other HTTP errors
            return response.json()  # Return successful response data
    except requests.exceptions.RequestException as e:
        return {"error": str(e)}


def fetch_all_records(url, credentials, object_count=None, page_size=100):
    """
    Fetch all objects from the JSM Assets API in paginated requests.

    :param url: Endpoint to post the paginated query to.
    :param credentials: The username:api_token pair.
    :param object_count: Total number of objects to retrieve.
    :param page_size: Number of objects per request (default: 100).
    :return: DataFrame with the 'name' and 'id' of every retrieved object.
    """
    if not object_count:
        # Nothing to fetch (or count unknown); return an empty frame with the expected columns.
        return pd.DataFrame({"name": [], "id": []})
    encoded_credentials = base64.b64encode(credentials.encode()).decode()
    # Basic-auth headers for every page request
    headers = {
        "Accept": "application/json",
        "Content-Type": "application/json",
        "Authorization": f"Basic {encoded_credentials}"
    }
    all_objects = []
    total_pages = (object_count // page_size) + (1 if object_count % page_size else 0)
    for page in range(total_pages):
        params = {
            "objectTypeId": objectTypeId,
            "objectSchemaId": objectSchemaId,
            "page": page + 1,
            "attributesToDisplay": {
                "attributesToDisplayIds": attributesToDisplayIds
            },
            "asc": 1,
            "resultsPerPage": page_size,
            "qlQuery": ""
        }
        # print(json.dumps(params, indent=2))
        response = requests.post(url, headers=headers, json=params)
        if response.status_code == 200:
            data = response.json()
            all_objects.extend(data.get("objectEntries", []))  # Extract objects from response
        else:
            print(f"Error on page {page + 1}: {response.status_code} - {response.text}")
            break  # Stop fetching if an error occurs
    columns = {"name": [], "id": []}
    for entry in all_objects:
        columns["name"].append(entry["name"])
        columns["id"].append(entry["id"])
    df = pd.DataFrame(columns)
    return df


def get_cmdb_zones():
    """Placeholder: not yet implemented."""
    return


def get_centrify_zones():
    """Placeholder: not yet implemented."""
    return


def set_inactive_in_cmdb(df1: pd.DataFrame):
    """Marks every object listed in df1 (columns 'id' and 'name') as inactive in the CMDB."""
    response = None  # Returned as-is if df1 is empty; otherwise holds the last response.
    for index, row in df1.iterrows():
        objectid = row['id']
        name = row['name']
        attributes_id = ad_groups_attributes_id["Active"]
        url = f"{BASE_URL}/{WORKSPACE_ID}/v1/objectattribute/{objectid}/attribute/{attributes_id}"
        payload = {
            "objectAttributeValues": [{"value": "false"}]
        }
        response = send_request(url, credentials, payload=json.dumps(payload), method="PUT", headers=None)
    return response


def create_new_in_cmdb(df1: pd.DataFrame):
    """Creates a new active AD-group object in the CMDB for every row (column 'name') in df1."""
    url = f"{BASE_URL}/{WORKSPACE_ID}/v1/object/create"
    response = None  # Returned as-is if df1 is empty; otherwise holds the last response.
    for index, row in df1.iterrows():
        name = row['name']
        payload = {
            "objectTypeId": objectTypeId,
            "attributes": [
                {
                    "objectTypeAttributeId": ad_groups_attributes_id["Name"],
                    "objectAttributeValues": [
                        {
                            "value": name
                        }
                    ]
                },
                {
                    "objectTypeAttributeId": ad_groups_attributes_id["Active"],
                    "objectAttributeValues": [
                        {
                            "value": "true"
                        }
                    ]
                }
            ],
            "hasAvatar": False,
            "avatarUUID": ""
        }
        response = send_request(url, credentials, payload=json.dumps(payload), method="POST", headers=None)
    return response
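

# Minimal end-to-end sketch of how these helpers could be combined into a sync run.
# This block is an illustration of assumed usage, not part of the original module;
# AQL_URL is a hypothetical placeholder for whatever endpoint fetch_all_records
# should post to, and 10000 is an arbitrary example object count.
if __name__ == "__main__":
    AQL_URL = f"{BASE_URL}/{WORKSPACE_ID}/..."  # placeholder; set to the real query endpoint

    ad_df = get_ad_groups_entries()                                        # groups currently in AD
    cmdb_df = fetch_all_records(AQL_URL, credentials, object_count=10000)  # groups already in the CMDB

    stale = rows_not_in_df2(cmdb_df, ad_df, compare_cols=["name"])         # in CMDB but gone from AD
    new = rows_new_in_df2(cmdb_df, ad_df, compare_cols=["name"])           # in AD but missing from CMDB

    set_inactive_in_cmdb(stale)
    create_new_in_cmdb(new)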