• 利用 Python 分析 Juniper SRX 防火墙策略
    01
    背景描述

    某客户部署在 DMZ 区的 SRX 345 防火墙之前是透明模式部署在网络内,现需要对其增加 4 层的安全策略,由于涉及到大量客户端与服务器通信的需求,以及服务器之间通信的需求,且网段规划不合理,网关在核心上,导致策略比较混乱。

    整体配置策略调试花了三周左右,期间多次更改策略,需要和客户核对,并且需要对接多个部门负责应用的人确认业务的流量。

    SRX 的命令网络工程师看还可以,但是客户对防火墙不是很了解,所以需要自动化的方式将策略整理为其他形式,例如 excel 的方式更直观,同时准确性也得到了保证,这一点非常重要。

    02
    Juniper 策略的基本知识

     

    一条策略包含如下内容

    • policy-name

    • from-zone

    • to-zone

    • source-address

      不是直接写地址,而是 address-book 或 address-set

    • destination-address

      同 source-address 一样

    • application

    • action

    • address-set

      包含一个或多个address-book

    • address-book

      包含一个地址或域名

    03
    实现流程
      1. 获取 SRX 策略的配置

        Juniper 的 Junos 可以输出多种配置方式,例如 set 类型的配置命令、xml、json(12版本不支持),为使代码有更强的适应性,使用了 xml 格式的源配置文件

        如下代码定义一个函数将 xml 转为 json 格式,方便分析

        def xml_to_json(xml_str):   xmlparse = xmltodict.parse(xml_str)   jsonstr = json.dumps(xmlparse, indent=1)   return jsonstr
      2. 获取 address_book 与 address_set

    def get_address_book():   finnal_address_list = []   # os.chdir('/Users/houmingming/cache')   with open('SRX345config.xml') as file:     j = json.loads(xml_to_json(file.read()))   address_book_raw = j['rpc-reply']['configuration']['security']['address-book']   for x in address_book_raw['address']:     try:       finnal_address_list.append([address_book_raw['name'], x['name'], x['ip-prefix'], 'ip-prefix'])     except KeyError:       finnal_address_list.append([address_book_raw['name'], x['name'], x['dns-name']['name'], 'dns-name'])   address_book_set = address_book_raw['address-set']   for x in address_book_set:     for i in x['address']:       for y in finnal_address_list:         if i['name'] == y[1]:           y.insert(1, x['name'])   for x in finnal_address_list:     if len(x) == 4:       x.insert(1, 'None')   return finnal_address_list
     
     def get_address_set():   # os.chdir('/Users/houmingming/cache')   with open('SRX345config.xml') as file:     j = json.loads(xml_to_json(file.read()))   address_book_raw = j['rpc-reply']['configuration']['security']['address-book']['address-set']   finally_address_set = []   for x in address_book_raw:     for i in x['address']:       # print(i['name'])       finally_address_set.append([x['name'], i['name']])   return finally_address_set
      1. 分析策略

        从字典中获取策略包含的元素,并返回一个列表

        def policy_analysis():   with open('SRX345config.xml') as file:     j = json.loads(xml_to_json(file.read()))   finally_policy = []   for value in j['rpc-reply']['configuration']['security']['policies']['policy']:     for x in value['policy']:       try:         for i in x['then'].keys():           finally_policy.append([x['name'], value['from-zone-name'], value['to-zone-name'],  x['match']['source-address'], x['match']['destination-address'], x['match']['application'], i])       except TypeError:         print(value)   return finally_policy
      2. 写入到数据库内

        写入数据库的优点是可以很多种方式调用,例如放到网站上,也可以导出为 excel

        def write_to_db():   conn = psycopg2.connect(dbname="houm01db", user="houm01dbuser", password="packet@123", host="docker.houm01.com")   cursor = conn.cursor()   cursor.execute("truncate table app01_example_srx_address_book")   for x in get_address_book():     cursor.execute("INSERT INTO app01_example_srx_address_book (name, address_book,address,type )"           "VALUES (%s, %s, %s, %s)", (x[0], x[2], x[3], x[4]))   cursor.execute("truncate table app01_example_srx_address_set")   for y in get_address_set():     cursor.execute("INSERT INTO app01_example_srx_address_set (address_set, address_book_name )"           "VALUES (%s, %s)", (y[0], y[1]))   cursor.execute("truncate table app01_example_srx_policy")   for i in policy_analysis():     cursor.execute("INSERT INTO app01_example_srx_policy "            "(policy_name, source_zone, destination_zone, source_address, destination_address, application, action) "            "VALUES (%s, %s, %s, %s, %s, %s, %s)", (i[0], i[1], i[2], str(i[3]), str(i[4]),  str(i[5]), i[6]))   conn.commit()   cursor.close()   conn.close()
      3. 创建 excel 并写入

        每个策略方向一个工作表,address-book 和 set 各一个工作表

        同时做了一些调整字体、间距的操作

        def policy_to_excel():   conn = psycopg2.connect(dbname="houm01db", user="houm01dbuser", password="packet@123", host="docker.houm01.com")   cursor = conn.cursor()   cursor.execute("select policy_name, source_zone, destination_zone, source_address, destination_address, application, action from app01_example_srx_policy where action != 'log'")   data = cursor.fetchall()   cursor.execute("select * from app01_example_srx_address_set")   address_book_set_data = cursor.fetchall()   cursor.execute("select * from app01_example_srx_address_book")   address_book_data = cursor.fetchall()   cursor.close()   conn.close()   book = xlwt.Workbook(encoding='utf-8')   worksheet = book.add_sheet('SRX 345 policy')   address_book_set_sheet = book.add_sheet('address_book_set')   address_book_sheet = book.add_sheet('address_book')   # 初始化自定义的style   tittle_style = xlwt.XFStyle()   mystyle = xlwt.XFStyle()   # 定义标题字体   tittle_font = xlwt.Font()   tittle_font.name = 'Times New Roman'   tittle_font.height = 20 * 14   tittle_font.bold = True   # 定义正文字体   font = xlwt.Font()   font.name = 'Times New Roman'   font.height = 20 * 12   font.bold = False  # 加粗   # 定义居中方式   al = xlwt.Alignment()   al.horz = 0x02   al.vert = 0x01   # 定义背景颜色   pa = xlwt.Pattern()   pa.pattern = xlwt.Pattern.SOLID_PATTERN   pa.pattern_fore_colour = 47   tittle_style.font = tittle_font   tittle_style.alignment = al   tittle_style.pattern = pa   mystyle.font = font   mystyle.alignment = al   # 定义表头   worksheet.write(0, 0, label='policy-name', style=tittle_style)   worksheet.write(0, 1, label='from-zone', style=tittle_style)   worksheet.write(0, 2, label='to-zone', style=tittle_style)   worksheet.write(0, 3, label='source-address', style=tittle_style)   worksheet.write(0, 4, label='destination-address', style=tittle_style)   worksheet.write(0, 5, label='application', style=tittle_style)   worksheet.write(0, 6, label='action', style=tittle_style)   address_book_set_sheet.write(0, 0, label='address-set', style=tittle_style)   address_book_set_sheet.write(0, 1, label='address-book-name', style=tittle_style)   address_book_sheet.write(0, 0, label='name', style=tittle_style)   address_book_sheet.write(0, 1, label='address-book', style=tittle_style)   address_book_sheet.write(0, 2, label='address', style=tittle_style)   address_book_sheet.write(0, 3, label='type', style=tittle_style)   # 定义内容   val = 1   for x in data:     worksheet.write(val, 0, str(x[0]), style=mystyle)     worksheet.write(val, 1, str(x[1]), style=mystyle)     worksheet.write(val, 2, str(x[2]), style=mystyle)     worksheet.write(val, 3, str(x[3]), style=mystyle)     worksheet.write(val, 4, str(x[4]), style=mystyle)     worksheet.write(val, 5, str(x[5]), style=mystyle)     worksheet.write(val, 6, str(x[6]), style=mystyle)     val = val + 1   val = 1  # 重置 val 值   for y in address_book_set_data:     # print(y)     address_book_set_sheet.write(val, 0, str(y[1]), style=mystyle)     address_book_set_sheet.write(val, 1, str(y[2]), style=mystyle)     val = val + 1   val = 1  # 重置 val 值   for z in address_book_data:     # print(z)     address_book_sheet.write(val, 0, str(z[1]), style=mystyle)     address_book_sheet.write(val, 1, str(z[2]), style=mystyle)     address_book_sheet.write(val, 2, str(z[3]), style=mystyle)     address_book_sheet.write(val, 3, str(z[4]), style=mystyle)     val = val + 1   # 设置宽度   # 宽的基本单位是256,所以用乘法的方式,容易查看   worksheet.col(0).width = 256 * 30   worksheet.col(1).width = 256 * 12   worksheet.col(2).width = 256 * 10   worksheet.col(3).width = 256 * 50   worksheet.col(4).width = 256 * 50   worksheet.col(5).width = 256 * 30   address_book_set_sheet.col(0).width = 256 * 15   address_book_set_sheet.col(1).width = 256 * 30   address_book_sheet.col(0).width = 256 * 10   address_book_sheet.col(1).width = 256 * 30   address_book_sheet.col(2).width = 256 * 30   address_book_sheet.col(3).width = 256 * 15   # 行高的基本单位为20   serial = 0   while serial < len(address_book_data) + 1:     worksheet.row(serial).height_mismatch = True  # 设置行高可以修改     address_book_set_sheet.row(serial).height_mismatch = True     address_book_sheet.row(serial).height_mismatch = True     worksheet.row(serial).height = 20 * 20     address_book_set_sheet.row(serial).height = 20 * 20     address_book_sheet.row(serial).height = 20 * 20     serial = serial + 1   book.save('example_srx345_policy.xls')
        def session_analysis():   with open('/Users/houmingming/cache/srx/SRX345log.txt') as file:     raw_txt = file.read()   raw_txt.index('nnode0:n---')  # 查找node0的所在位置   raw_txt.index('nnode1:n---')  # 查找node1的所在位置   node0_session = raw_txt[raw_txt.index('nnode0:n---'):raw_txt.index('nnode1:n---')]   node1_session = raw_txt[raw_txt.index('nnode1:n---'):]   split_node0_session = re.split('Session ', node0_session)  # 利用 re.split 切分字符串,以 "Session" 这个字符来切   # 该正则将所有内容都匹配上了,但只使用其中一部分   re_session = re.findall('.*ID: (.*), Policy name: (.*)/(.*), State: (.*), Timeout: (.*), (.*)n  In: (.*)/(.*) --> (.*)/(.*);(.*), Conn.*If: (.*), Pkts: (.*), Bytes: (.*), n  Out: (.*) --> (.*)/(.*);(.*), Conn.*If: (.*), Pkts: (.*), Bytes: (.*), ', node0_session)   return re_session
        def session_into_db():   conn = psycopg2.connect(dbname="houm01db", user="houm01dbuser", password="packet@123", host="docker.houm01.com")   cursor = conn.cursor()   insert_time = input("请输入获取session的时间,格式示例'2019-11-10 10:00':n")   for i in session_analysis():     cursor.execute("INSERT INTO app01_example_srx_session "            "(insert_time, session_id, policy_name, policy_seq, state, source_ip, source_port, destination_ip, destination_port, protocol, source_interface, destination_interface)"            "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)", (insert_time, i[0], i[1], i[2], i[3], i[6], i[7], i[8], i[9], i[10], i[11], i[18]))   conn.commit()   cursor.close()   conn.close()
      4. 排列方法,做调用

         if __name__ == '__main__':   code = input('请选择要执行的功能n'          '选择"1":分析SRX的xml文件,并插入到数据库n'          '选择"2":生成策略Excel文件n'          '选择"3": 分析Sessionn')   if code == "1":     write_to_db()   elif code == "2":     policy_to_excel()   elif code == "3":     session_into_db()   else:     print("输入错误,已退出")

     

     

    04
    效果展示

    输出后的 excel 部分内容如下

    利用 Python 分析 Juniper SRX 防火墙策略

     

    利用 Python 分析 Juniper SRX 防火墙策略

    利用 Python 分析 Juniper SRX 防火墙策略

     

     

    «
    »
以专业成就每一位客户,让企业IT只为效果和安全买单

以专业成就每一位客户,让企业IT只为效果和安全买单