infoblox-experts-program.jpg

bulk .csv upload: DHCP > IPv4 Filter > Mac Filter automation

My goal is to automate our mac filter (allow list) for DHCP IPv4. We (the enterprise) use several systems to enforce computer compliance (encryption, AV, removable media protection). And when someone is not protected, we don't want them on our network. After our compliance systems gives me a list of compliant devices, I add them to this allow list. 

With lots of reading, trial and error, and help from SBaksh (many thanks!)

First I clear the existing list (all of these imports are not needed, but eventually I will merge the two code snips so I've left them there):

 

import requests
import json
import csv
import getpass
import sys
import os

url = 'https://infobloxtesting.com/wapi/v2.3.1/macfilteraddress?filter=SN_Test'
urld = 'https://infobloxtesting.com/wapi/v2.3.1/'
id = 'admin'
pw = 'Infoblox'

r = requests.get(url , auth=(id, pw), verify=False)

json_input = r.text
decoded = json.loads(json_input)
for j in decoded:
    response = requests.request("DELETE", urld + j['_ref'], auth=(id, pw), verify=False)

 

 

Second, I import the new list. This list comes out of my reporting system in the format as needed by infoblox macaddressfilter (credit for this one to 

import requests
import json
import csv
import getpass
import sys
import os

url = 'https://infobloxtest.com/wapi/v2.3.1/'
id = 'admin'
pw = 'infoblox'

valid_cert = False 

# Helper functions.
def sanitize_filename(pathname):

    # Get the base filename without the directory path, convert dashes
    # to underscores, and get rid of other special characters.
    filename = ''
    for c in os.path.basename(pathname):
        if c == '-':
            c = '_'
        if c.isalnum() or c == '_' or c == '.':
            filename += c
    return filename


# Prompt for the API user password.
 # pw = getpass.getpass('Password for user ' + id + ': ')

# If running on Windows avoid error due to a self-signed cert.
 #if sys.platform.startswith('win') and not valid_cert:
 #requests.packages.urllib3.disable_warnings()

# The CSV file we want to import (in the local filesystem).
csv_data ='/upload/Macfilteraddresss.csv'

# Initiate a file upload operation, providing a filename (with
# alphanumeric, underscore, or periods only) for the CSV job manager.
req_params = {'filename': sanitize_filename(csv_data)}
r = requests.post(url + 'fileop?_function=uploadinit',
                  params=req_params,
                  auth=(id, pw),
                  verify=valid_cert)
if r.status_code != requests.codes.ok:
    print (r.text)
    exit_msg = 'Error {} initiating upload: {}'
    sys.exit(exit_msg.format(r.status_code, r.reason))
results = r.json()

# Save the authentication cookie for use in subsequent requests.
ibapauth_cookie = r.cookies['ibapauth']

# Save the returned URL and token for subsequent requests.
upload_url = results['url']
upload_token = results['token']

# Specify a file handle for the file data to be uploaded.
req_files = {'filedata': open(csv_data,'rb')}
req_params = {'name': sanitize_filename(csv_data)}

# Use the ibapauth cookie to authenticate instead of userid/password.
req_cookies = {'ibapauth': ibapauth_cookie}

# Perform the actual upload. (NOTE: It does NOT return JSON results.)
r = requests.post(upload_url,
                  params=req_params,
                  files=req_files,
                  cookies=req_cookies,
                  verify=valid_cert)
if r.status_code != requests.codes.ok:
    print (r.text)
    exit_msg = 'Error {} uploading file: {}'
    sys.exit(exit_msg.format(r.status_code, r.reason))

# Initiate the actual import task.
#valid values are: INSERT, UPDATE, REPLACE, DELETE, CUSTOM
req_params = {'token': upload_token,
              'doimport': True,
              'on_error': 'CONTINUE',
              'operation': 'INSERT',
              'update_method': 'OVERRIDE'}
r = requests.post(url + 'fileop?_function=csv_import',
                  params=req_params,
                  cookies=req_cookies,
                  verify=valid_cert)
if r.status_code != requests.codes.ok:
    print (r.text)
    exit_msg = 'Error {} starting CSV import: {}'
    sys.exit(exit_msg.format(r.status_code, r.reason))
results = r.json()

# Record cvsimporttask object reference for possible future use.
csvimporttask = results['csv_import_task']['_ref']
print (csvimporttask)

I hope this helps...

tibby

 

 

Showing results for 
Search instead for 
Did you mean: