Nov-15-2024, 04:36 PM
(This post was last modified: Nov-15-2024, 05:48 PM by snippsat.
Edit Reason: Added code tag
)
Hi all,
last resort for me trying to resolve an issue. been using ossec, and auto_ossec on centos 8 for ages, with python 3.6.8. centos8 is EOL, so built a new centos9 server.
on the server, im running (manually so i can see console output) the following...
/bin/python3 /bin/auto_server.py
i will paste the auto_server.py contents at the bottom of this post, and highlight the lines from the error.
when i run the client, i get the following on the server output...
last resort for me trying to resolve an issue. been using ossec, and auto_ossec on centos 8 for ages, with python 3.6.8. centos8 is EOL, so built a new centos9 server.
on the server, im running (manually so i can see console output) the following...
/bin/python3 /bin/auto_server.py
i will paste the auto_server.py contents at the bottom of this post, and highlight the lines from the error.
when i run the client, i get the following on the server output...
Error:[*] The auto enrollment OSSEC Server is now listening on 9654
Client connected with ('*.*.*.*', 64241)
new() missing 1 required positional argument: 'mode'
Traceback (most recent call last):
File "/bin/auto_server.py", line 173, in handle
data = aescall(secret, data, "decrypt")
File "/bin/auto_server.py", line 153, in aescall
cipher = AES.new(secret)
TypeError: new() missing 1 required positional argument: 'mode'
Pairing complete. Terminating connection to client.as far as i had read (and i have read a LOT about this), the old server used pycrypto, and i now need to use pycryptodome. im sure this is wher the issues lie.#!/usr/bin/python
#
# Auto-OSSEC Server
#
# This is the server piece to the client/server pair (ossec_client.py). Auto-OSSEC will create a protocol
# and allow automatic deployment of OSSEC keys through an enterprise. One of the biggest challenges with
# OSSEC is the key management pieces which auto-ossec tries to solve. When run, this will listen for comms
# with auto_ossec.py which is the OSSEC client and pass a key request to the client through an AES
# encrypted tunnel. Once the exchange completes, auto_ossec will integrate the key and rewrite the conf
# file for you to incorporate the server IP address. View the README.md for usage and how to effecitvely
# use auto-ossec. This also works with AlienVault pairing.
#
# Written by: Dave Kennedy and the Binary Defense Systems (BDS) Team
# Twitter: @HackingDave, @Binary_Defense
# Website: https://www.binarydefense.com
#
# Recommended: Place this python file under supervisor to ensure health, stability, and service starts.
#
# Usage: python auto_sever.py - This will spawn a port listening and wait for connections from auto_ossec.py
#
# Will listen on port 9654 for an incoming challege
#
# Python Crypto and Python Pexpect is required - apt-get install python-crypto python-pexpect
#
# needed for python2/3 compatibility
try: import SocketServer as socketserver
except ImportError: import socketserver
from threading import Thread
import subprocess
import sys
import traceback
import base64
import time
import socket
import os
# python2/3 compatibility
try: import _thread as thread
except ImportError: import thread
# check python crypto library
try:
from Crypto.Cipher import AES
except ImportError:
print("[!] ERROR: pycryptodome not installed. Run 'python3 -m install pycrypto' to fix.")
sys.exit()
# check pexpect library
try:
import pexpect
except ImportError:
print("[!] ERROR: pexpect not installed. Run 'python3 -m install pexpect' to fix.")
sys.exit()
# global lock to restart ossec service
global counter
counter = 0
# global lock for queue
global queue_lock
queue_lock = 0
# main service handler for auto_server
class service(socketserver.BaseRequestHandler):
def handle(self):
# parse OSSEC hids client certificate
def parse_client(hostname, ipaddr):
child = pexpect.spawn("/var/ossec/bin/manage_agents")
child.timeout=300
child.expect("Choose your action")
child.sendline("a")
child.expect("for the new agent")
child.sendline(hostname)
i = child.expect(['IP Address of the new agent', 'already present'])
# if we haven't already added the hostname
if i == 0:
child.sendline(ipaddr)
child.expect("for the new agent")
child.sendline("")
for line in child:
try: line = str(line, 'UTF-8')
except TypeError: line = str(line) # python2 compatibility
# pull id
if "[" in line:
id = line.replace("[", "").replace("]", "").replace(":", "").rstrip()
break
child.expect("Confirm adding it?")
child.sendline("y")
child.sendline("q")
child.close()
child = pexpect.spawn("/var/ossec/bin/manage_agents -e %s" % (id))
child.timeout=300
for line in child: key = line.rstrip() # actual key export
# when no agents are there and one is removed - the agent wont be added properly right away - need to go through the addition again - appears to be an ossec manage bug - going through everything again appears to solve this
time.sleep(0.5)
if "Invalid ID" in str(key): return 0
return key
# if we have a duplicate hostname
else:
child.close()
child = pexpect.spawn("/var/ossec/bin/manage_agents -l")
child.timeout=300
for line in child:
try: line = str(line, 'UTF-8').rstrip()
except TypeError: line = str(line).rstrip() # python 2 and 3 compatibility
if hostname in line:
remove_id = line.split(",")[0].replace("ID: ", "").replace(" ", "").rstrip()
break
child.close()
time.sleep(0.5)
child = pexpect.spawn("/var/ossec/bin/manage_agents -r %s" % (remove_id))
child.timeout=300
child.expect("manage_agents: Exiting.")
time.sleep(2)
child.close()
time.sleep(1)
return 0
def decryptaes(cipher, data, padding):
result = str(cipher.decrypt(base64.b64decode(data)), 'UTF-8').rstrip(padding)
return result
def decryptaes_py2(cipher, data, padding):
result = cipher.decrypt(base64.b64decode(data)).rstrip(padding)
return result
def encryptaes(cipher, data, padding, blocksize):
# one-liner to sufficiently pad the text to be encrypted
pad = lambda s: s + (blocksize - len(s) % blocksize) * padding
try: data1 = str(data, 'UTF-8') #print('d1', data1)
except TypeError: data1 = str(data)
data2 = pad(data1) #; print('d2', data2)
data3 = cipher.encrypt(data2) #; print('d3', data3, type(data3))
result = base64.b64encode(data3)
return result
# main AES encrypt and decrypt function with 32 block size padding
def aescall(secret, data, format)
# padding and block size
PADDING = '{'
BLOCK_SIZE = 32
# random value here to randomize builds
a = 50 * 5
# generate the cipher
cipher = AES.new(secret)
if format == "encrypt":
aes = encryptaes(cipher, data, PADDING, BLOCK_SIZE)
return aes
if format == "decrypt":
try: aes = decryptaes(cipher, data, PADDING)
except TypeError: aes = decryptaes_py2(cipher, data, PADDING)
return str(aes)
# recommend changing this - if you do, change auto_ossec.py as well - -
# would recommend this is the default published to git
secret = "(3j+-sa!333hNA2u3h@*!~h~2&^lk<!B"
print("Client connected with ", self.client_address)
try:
data = self.request.recv(1024)
if data != "":
try:
data = aescall(secret, data, "decrypt")
# if this section clears -we know that it is a legit
# request, has been decrypted and we're ready to rock
if "BDSOSSEC" in data:
# if we are using star IP addresses
if "BDSOSSEC*" in data: star = 1
else: star = 0
# process to restart OSSEC if needed every 10 minutes -
# if lock variable is 1 is present then it will trigger a
# restart of OSSEC server
global counter
counter = 1
# strip identifier
data = data.replace("BDSOSSEC*", "").replace("BDSOSSEC", "")
hostname = data
# pull the true IP, not the NATed one if they are using VMWare
if star == 0: ipaddr = self.client_address[0]
else: ipaddr = "0.0.0.0/0"
# this will provision the key
def provision_key(hostname, ipaddr):
ossec_key = parse_client(hostname, ipaddr)
if ossec_key == 0:
ossec_key = parse_client(hostname, ipaddr)
# run through again for trouble ones - ossec bug looks like - but this is a decent workaround
if ossec_key == 0:
ossec_key = parse_client(hostname, ipaddr)
print("[*] Provisioned new key for hostname: %s with IP of: %s" % (hostname, ipaddr))
try: ossec_key = ossec_key.decode('UTF-8')
except: ossec_key = str(ossec_key) # python2 compatibility
ossec_key_crypt = aescall(secret, ossec_key, "encrypt")
try: ossec_key_crypt = str(ossec_key_crypt, 'UTF-8')
except TypeError: ossec_key_crypt = str(ossec_key_crypt)
print("[*] Sending new key to %s: " % (hostname) + str(ossec_key))
# if client disconnected dont crash everything
try: self.request.send(ossec_key_crypt.encode('UTF-8'))
except: pass
time.sleep(1)
# here if the hostname was already used, we need to
# remove it and call it again
global queue_lock
# if our queue lock is 0
if queue_lock == 0:
# locking up the queue to process
queue_lock = 1
provision_key(hostname, ipaddr)
time.sleep(0.2)
queue_lock = 0
# if we aren't ready to provision wait
else:
# we need to wait until its finished
while 1:
# sleep 5 seconds and wait for lock
time.sleep(5)
if queue_lock == 0:
print("Queue is now free, proceeding with current agent additions..")
queue_lock = 1
provision_key(hostname, ipaddr)
time.sleep(0.2)
queue_lock = 0
break
except Exception as e:
print(e)
traceback.print_exc(file=sys.stdout)
pass
except Exception as e:
print(e)
pass
print("Pairing complete. Terminating connection to client.")
self.request.close()
# this waits 5 minutes to check if new ossec agents have been deployed, if
# so it restarts the server
def ossec_monitor():
while 1:
time.sleep(300)
global counter
if counter == 1:
# if we dont have any new agents being added at the time
if queue_lock == 0:
print("[*] New OSSEC agent added - triggering restart of service to add..")
subprocess.Popen("service ossec restart", stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True).wait()
counter = 0
class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): pass
print("[*] The auto enrollment OSSEC Server is now listening on 9654")
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# set is so that when we cancel out we can reuse port
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# bind to all interfaces on port 10900
ThreadedTCPServer.allow_reuse_address = True
t = ThreadedTCPServer(('', 9654), service)
# start the server and listen forever
try:
# start a threaded counter
thread.start_new_thread(ossec_monitor, ())
t.serve_forever()
except KeyboardInterrupt: print("[*] Exiting the automatic enrollment OSSEC daemon")
snippsat write Nov-15-2024, 05:48 PM:
Added code tag
Added code tag
