150 lines
5.4 KiB
Python
150 lines
5.4 KiB
Python
import random
|
|
import time
|
|
import webbrowser
|
|
import threading
|
|
import yaml
|
|
import requests
|
|
import subprocess
|
|
import logging
|
|
from bs4 import BeautifulSoup
|
|
from fake_useragent import UserAgent
|
|
|
|
# Route log records (INFO and above) to a file; the relative path resolves
# against the current working directory.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
    filename="agent_spice.log",
)
|
|
|
|
def load_config(config_path="config.yaml"):
    """Load the configuration from a YAML file.

    Args:
        config_path: Path of the YAML file to read.

    Returns:
        Whatever ``yaml.safe_load`` parses from the file. An empty (or
        comment-only) file yields ``{}`` rather than ``None`` so that callers
        can always use ``config.get(...)``.

    Raises:
        OSError: If the file cannot be opened.
        yaml.YAMLError: If the content is not valid YAML.
    """
    # Decode explicitly as UTF-8 instead of relying on the platform locale.
    with open(config_path, "r", encoding="utf-8") as file:
        config = yaml.safe_load(file)
    # safe_load returns None for an empty document.
    return {} if config is None else config
|
|
|
|
def get_random_user_agent(config):
    """Pick a user-agent string based on the profiles declared in *config*.

    A profile is a pair of keys ``user_agent_platform_<id>`` and
    ``user_agent_browser_<id>``. One declared profile is chosen at random and
    its platform/browser values are used as the attribute name looked up on a
    ``fake_useragent.UserAgent`` instance:

    * both ``"random"`` (or missing)  -> ``ua.random``
    * only one is ``"random"``        -> attribute named after the other value
    * neither is ``"random"``         -> attribute ``"<browser>_<platform>"``

    Args:
        config: Configuration mapping; ``None`` is treated as empty.

    Returns:
        The value produced by the ``UserAgent`` lookup, falling back to
        ``ua.random`` when there are no profiles or the attribute lookup
        raises ``AttributeError``.
    """
    prefix = "user_agent_platform_"
    config = config or {}
    # YAML keys are not guaranteed to be strings, so filter on type first.
    platform_keys = [
        key for key in config
        if isinstance(key, str) and key.startswith(prefix)
    ]

    ua = UserAgent()
    if not platform_keys:
        return ua.random  # Default to a fully random UA if no profiles exist

    # Take the id from a key that actually exists instead of assuming the
    # ids are numbered contiguously from 1.
    profile_id = random.choice(platform_keys)[len(prefix):]
    # "or" also covers keys that are present but empty (parsed as None).
    platform = config.get(f"{prefix}{profile_id}") or "random"
    browser = config.get(f"user_agent_browser_{profile_id}") or "random"

    if platform == "random" and browser == "random":
        return ua.random
    if platform == "random":
        attribute = browser
    elif browser == "random":
        attribute = platform
    else:
        attribute = f"{browser}_{platform}"

    # Only compute the random fallback when the lookup really fails.
    try:
        return getattr(ua, str(attribute))
    except AttributeError:
        return ua.random
|
|
|
|
def fetch_top_sites(source_url):
    """Download a newline-separated list of sites and return them as URLs.

    Each non-blank line of the response body is treated as one site. Lines
    without an ``http://`` / ``https://`` scheme get ``https://`` prepended.

    Args:
        source_url: URL of the plain-text list to download.

    Returns:
        A list of URL strings. The list is empty when the request fails or
        the server answers with a status other than 200.
    """
    try:
        response = requests.get(source_url, timeout=10)
    except requests.exceptions.RequestException as e:
        logging.error(f"Error fetching top sites: {e}")
        return []

    if response.status_code != 200:
        logging.warning(
            f"Unexpected status {response.status_code} fetching top sites"
        )
        return []

    sites = []
    for line in response.text.splitlines():
        site = line.strip()
        if not site:
            # A blank line would otherwise turn into the bogus URL "https://".
            continue
        if not site.startswith(("http://", "https://")):
            site = f"https://{site}"
        sites.append(site)
    return sites
|
|
|
|
def scrape_links(site, config):
    """Download *site* and collect the outbound links worth following.

    Only ``href`` values that start with ``http`` are kept, and any link
    whose lowercase form contains one of the blocked words is dropped.

    Args:
        site: URL of the page to fetch.
        config: Configuration passed on to ``get_random_user_agent``.

    Returns:
        A list of link strings; empty when the request fails or the response
        status is not 200.
    """
    blocked_words = ["privacy", "terms", "login", "logout", "account", "settings", "gov", "compliance", "policy"]
    request_headers = {"User-Agent": get_random_user_agent(config)}
    try:
        page = requests.get(site, headers=request_headers, timeout=10)
        if page.status_code != 200:
            return []

        document = BeautifulSoup(page.text, "html.parser")
        kept = []
        for anchor in document.find_all("a", href=True):
            href = anchor["href"]
            # Skip relative, mailto:, javascript: and similar targets.
            if not href.startswith("http"):
                continue
            lowered = href.lower()
            if any(word in lowered for word in blocked_words):
                continue
            kept.append(href)
        return kept
    except requests.exceptions.RequestException as e:
        logging.error(f"Error scraping links from {site}: {e}")
        return []
|
|
|
|
def _open_site(site, use_browser, headers):
    """Open *site* in the default web browser, or fetch it once via requests."""
    if use_browser:
        webbrowser.open(site)
    else:
        requests.get(site, headers=headers, timeout=10)


def visit_site(site, use_browser=True, depth=2, config=None):
    """Visit a site and then follow up to *depth* randomly chosen links.

    After the initial visit, each step scrapes the links of the current page
    with ``scrape_links``, picks one at random and visits it. The walk stops
    early when a page yields no links.

    Args:
        site: URL to start from.
        use_browser: If true, open pages with ``webbrowser.open``; otherwise
            fetch them with ``requests.get``.
        depth: Maximum number of links to follow after the first page.
        config: Configuration mapping used for user-agent selection and
            scraping; ``None`` is treated as an empty configuration.

    Returns:
        None. ``requests`` errors are logged rather than raised.
    """
    # The declared default is None, which the user-agent helper cannot
    # iterate over; normalise it once here.
    if config is None:
        config = {}
    headers = {"User-Agent": get_random_user_agent(config)}
    try:
        logging.info(f"Visiting site: {site}")
        _open_site(site, use_browser, headers)

        for _ in range(depth):
            links = scrape_links(site, config)
            if not links:
                break
            site = random.choice(links)
            logging.info(f"Following link: {site}")
            _open_site(site, use_browser, headers)
    except requests.exceptions.RequestException as e:
        logging.error(f"Error visiting {site}: {e}")
|
|
|
|
def perform_dns_query(config):
    """Run one ``nslookup`` for a random domain against a configured server.

    The domain has the form ``random<NNNN>.com`` with a four-digit number, and
    the server is picked at random from ``config["dns_servers"]``.

    Args:
        config: Configuration mapping. ``dns_servers`` may be a list of
            servers or a single server string. The optional ``dns_timeout``
            (seconds, default 10) bounds how long ``nslookup`` may run.

    Returns:
        None. A missing server list is logged as a warning; failures to run
        the command (including a timeout) are logged as errors.
    """
    config = config or {}
    dns_servers = config.get("dns_servers") or []
    if isinstance(dns_servers, str):
        # A bare string would make random.choice return a single character.
        dns_servers = [dns_servers]
    if not dns_servers:
        logging.warning("No DNS servers configured in YAML file. Skipping DNS query.")
        return

    random_domain = f"random{random.randint(1000, 9999)}.com"
    server = str(random.choice(dns_servers))
    try:
        subprocess.run(
            ["nslookup", random_domain, server],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            # Without a timeout an unreachable server can stall the caller.
            timeout=config.get("dns_timeout", 10),
            check=False,
        )
    except (OSError, subprocess.SubprocessError) as e:
        # OSError: nslookup missing / not executable; SubprocessError: timeout.
        logging.error(f"Error performing DNS query: {e}")
        return
    logging.info(f"Performed DNS query for {random_domain} using {server}")
|
|
|
|
def load_random_sites(config):
    """Visit random configured websites forever, sleeping between visits.

    Configuration keys read:
        websites: List of URLs to choose from (required, must be a list).
        use_browser: Either a list of booleans to pick from at random on each
            visit, or a single boolean. Defaults to ``True``.
        link_follow_depth: Depth passed to ``visit_site`` (default 2).
        enable_dns_queries: If true, call ``perform_dns_query`` per visit.
        interval_min: Seconds to sleep between visits (required).
        interval_max: Upper bound of the sleep when ``randomize`` is true
            (defaults to ``interval_min``).
        randomize: If true, sleep a random whole number of seconds between
            the two bounds; otherwise always sleep ``interval_min``.

    Returns:
        None, and only when the configuration is unusable (no websites or no
        ``interval_min``); otherwise the loop never returns.
    """
    websites = config.get("websites")
    site_list = list(websites) if isinstance(websites, list) else []
    if not site_list:
        logging.error("No websites to visit. Exiting...")
        return

    interval_min = config.get("interval_min")
    if interval_min is None:
        logging.error("No interval_min configured. Exiting...")
        return
    # random.randint raises ValueError when the bounds are reversed.
    low, high = sorted((interval_min, config.get("interval_max", interval_min)))
    randomize = config.get("randomize", False)

    # Accept both a list of choices and a plain boolean for use_browser.
    browser_choices = config.get("use_browser", True)
    if not isinstance(browser_choices, (list, tuple)):
        browser_choices = [bool(browser_choices)]
    elif not browser_choices:
        browser_choices = [True]

    depth = config.get("link_follow_depth", 2)
    dns_enabled = config.get("enable_dns_queries", False)

    while True:
        site = random.choice(site_list)
        use_browser = random.choice(browser_choices)
        try:
            visit_site(site, use_browser, depth, config)
            if dns_enabled:
                perform_dns_query(config)
        except Exception:
            # Boundary of a long-running loop: one failing visit must not
            # end the whole run, so record the traceback and carry on.
            logging.exception(f"Unexpected error while visiting {site}")

        wait_time = random.randint(low, high) if randomize else interval_min
        logging.info(f"Sleeping for {wait_time} seconds")
        time.sleep(wait_time)
|
|
|
|
def start_traffic_generation(config_path="config.yaml"):
    """Load the configuration and start the traffic loop in the background.

    Args:
        config_path: Path of the YAML configuration file.

    Returns:
        The started daemon ``threading.Thread`` running ``load_random_sites``.
        Callers that need the loop to keep running must keep the process
        alive themselves (for example by joining the returned thread),
        because daemon threads are stopped when the main thread exits.
    """
    config = load_config(config_path)
    thread = threading.Thread(
        target=load_random_sites, args=(config,), daemon=True
    )
    thread.start()
    logging.info("Agent Spice started...")
    return thread


if __name__ == "__main__":
    worker = start_traffic_generation()
    # The worker is a daemon thread, so returning here would end the process
    # before any site is visited. Join in short slices so Ctrl-C still works.
    try:
        while worker.is_alive():
            worker.join(timeout=1)
    except KeyboardInterrupt:
        logging.info("Agent Spice stopped by user.")
|