Agent-Spice/agent_spice.py
chaosnet fb2ffcc82c adding agent_spice.py
Initial commit - Alpha version
2025-03-10 22:45:12 -04:00

import logging
import random
import subprocess
import threading
import time
import webbrowser

import requests
import yaml
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
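
# Third-party dependencies: pip install requests pyyaml beautifulsoup4 fake-useragent
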
# Configure logging
logging.basicConfig(
    filename="agent_spice.log",
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)

def load_config(config_path="config.yaml"):
    """Loads configuration from a YAML file."""
    with open(config_path, "r") as file:
        return yaml.safe_load(file)
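
# For reference, a minimal config.yaml covering the keys read below. The key
# names come from this file; the values shown are illustrative assumptions.
#
#     websites:
#       - https://example.com
#       - https://example.org
#     use_browser: [true, false]    # one option is picked at random per visit
#     link_follow_depth: 2
#     enable_dns_queries: true
#     dns_servers: ["8.8.8.8", "1.1.1.1"]
#     randomize: true
#     interval_min: 30              # seconds
#     interval_max: 120             # seconds
#     user_agent_platform_1: windows
#     user_agent_browser_1: chrome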

def get_random_user_agent(config):
    """Selects a user-agent profile from the available options in the config."""
    config = config or {}
    # Profiles are expected to be numbered contiguously:
    # user_agent_platform_1 / user_agent_browser_1, user_agent_platform_2, ...
    profiles = [key for key in config if key.startswith("user_agent_platform_")]
    if not profiles:
        return UserAgent().random  # Default to a fully random UA if no profiles exist
    profile_id = random.randint(1, len(profiles))
    platform = config.get(f"user_agent_platform_{profile_id}", "random")
    browser = config.get(f"user_agent_browser_{profile_id}", "random")
    ua = UserAgent()
    if platform == "random" and browser == "random":
        return ua.random
    if platform == "random":
        return getattr(ua, browser, ua.random)
    if browser == "random":
        return getattr(ua, platform, ua.random)
    # fake_useragent exposes per-browser attributes (ua.chrome, ua.firefox, ...)
    # but no combined "<browser>_<platform>" attribute, so the old lookup always
    # fell through to ua.random (and getattr with a default never raises
    # AttributeError). Use the browser attribute; the platform is advisory.
    return getattr(ua, browser, ua.random)

def fetch_top_sites(source_url):
    """Fetches a list of top websites from an external source specified in the config."""
    try:
        response = requests.get(source_url, timeout=10)
        if response.status_code == 200:
            # Skip blank lines so they don't become bare "https://" entries
            return [f"https://{site}" for site in response.text.splitlines() if site.strip()]
    except requests.exceptions.RequestException as e:
        logging.error(f"Error fetching top sites: {e}")
    return []
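
# Note: fetch_top_sites is not wired into the main loop yet. It expects a
# plain-text "one domain per line" list; it could seed the rotation like so
# (top_sites_url is a hypothetical config key, not one read elsewhere):
#
#     site_list.extend(fetch_top_sites(config.get("top_sites_url", "")))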

def scrape_links(site, config):
    """Scrapes relevant links from a visited page."""
    headers = {"User-Agent": get_random_user_agent(config)}
    try:
        response = requests.get(site, headers=headers, timeout=10)
        if response.status_code != 200:
            return []
        soup = BeautifulSoup(response.text, "html.parser")
        all_links = [a["href"] for a in soup.find_all("a", href=True)]
        # Filter out unwanted links: keep only absolute http(s) URLs, and drop
        # anything containing an excluded term (substring match, so "gov" also
        # catches e.g. "governance").
        excluded_terms = [
            "privacy", "terms", "login", "logout", "account",
            "settings", "gov", "compliance", "policy",
        ]
        filtered_links = [
            link for link in all_links
            if link.startswith("http")
            and not any(excluded in link.lower() for excluded in excluded_terms)
        ]
        return filtered_links
    except requests.exceptions.RequestException as e:
        logging.error(f"Error scraping links from {site}: {e}")
        return []

def visit_site(site, use_browser=True, depth=2, config=None):
    """Visits a site and follows relevant links up to the specified depth."""
    headers = {"User-Agent": get_random_user_agent(config)}
    try:
        logging.info(f"Visiting site: {site}")
        if use_browser:
            webbrowser.open(site)
        else:
            requests.get(site, headers=headers, timeout=10)
        for _ in range(depth):
            links = scrape_links(site, config)
            if not links:
                break
            site = random.choice(links)
            logging.info(f"Following link: {site}")
            if use_browser:
                webbrowser.open(site)
            else:
                requests.get(site, headers=headers, timeout=10)
    except requests.exceptions.RequestException as e:
        logging.error(f"Error visiting {site}: {e}")

def perform_dns_query(config):
    """Performs randomized DNS queries to generate additional noise."""
    dns_servers = config.get("dns_servers", [])
    if not dns_servers:
        logging.warning("No DNS servers configured in YAML file. Skipping DNS query.")
        return
    random_domain = f"random{random.randint(1000, 9999)}.com"
    server = random.choice(dns_servers)
    try:
        # Relies on the nslookup binary being on PATH; the broad except also
        # covers a missing binary (FileNotFoundError).
        subprocess.run(
            ["nslookup", random_domain, server],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        logging.info(f"Performed DNS query for {random_domain} using {server}")
    except Exception as e:
        logging.error(f"Error performing DNS query: {e}")

def load_random_sites(config):
    """Main loop to open random websites at specified intervals."""
    site_list = []
    if isinstance(config.get("websites"), list):
        site_list.extend(config["websites"])
    if not site_list:
        logging.error("No websites to visit. Exiting...")
        return
    # "use_browser" may be a single boolean or a list of options to pick from.
    use_browser_options = config.get("use_browser", [True])
    if not isinstance(use_browser_options, list):
        use_browser_options = [use_browser_options]
    while True:
        site = random.choice(site_list)
        use_browser = random.choice(use_browser_options)
        depth = config.get("link_follow_depth", 2)
        visit_site(site, use_browser, depth, config)
        if config.get("enable_dns_queries", False):
            perform_dns_query(config)
        if config.get("randomize", False):
            wait_time = random.randint(config.get("interval_min", 30), config.get("interval_max", 120))
        else:
            wait_time = config.get("interval_min", 30)
        logging.info(f"Sleeping for {wait_time} seconds")
        time.sleep(wait_time)

def start_traffic_generation(config_path="config.yaml"):
    config = load_config(config_path)
    thread = threading.Thread(target=load_random_sites, args=(config,))
    thread.daemon = True
    thread.start()
    logging.info("Agent Spice started...")
    return thread


if __name__ == "__main__":
    # The worker is a daemon thread, so keep the main thread alive; otherwise
    # the process exits as soon as start_traffic_generation returns.
    worker = start_traffic_generation()
    try:
        worker.join()
    except KeyboardInterrupt:
        logging.info("Agent Spice stopped.")