Responder/servers/QUIC.py
2025-04-03 15:16:00 -04:00

169 lines
6.1 KiB
Python

import asyncio
import logging
import ssl
import argparse
import netifaces
from utils import *
from aioquic.asyncio import serve
from aioquic.asyncio.protocol import QuicConnectionProtocol
from aioquic.quic.configuration import QuicConfiguration
from aioquic.quic.events import QuicEvent, StreamDataReceived, StreamReset, ConnectionTerminated
BUFFER_SIZE = 11000
def get_interface_ip(interface_name):
"""Get the IP address of a network interface."""
try:
# Get address info for the specified interface
addresses = netifaces.ifaddresses(interface_name)
# Get IPv4 address (AF_INET = IPv4)
if netifaces.AF_INET in addresses:
return addresses[netifaces.AF_INET][0]['addr']
# If no IPv4 address, try IPv6 (AF_INET6 = IPv6)
if netifaces.AF_INET6 in addresses:
return addresses[netifaces.AF_INET6][0]['addr']
logging.error(f"[!] No IP address found for interface {interface_name}")
return None
except ValueError:
logging.error(f"[!] Interface {interface_name} not found")
return None
class QUIC(QuicConnectionProtocol):
def __init__(self, *args, target_address=None, **kwargs):
super().__init__(*args, **kwargs)
self.tcp_connections = {} # stream_id -> (reader, writer)
self.target_address = target_address or "localhost"
def quic_event_received(self, event):
if isinstance(event, StreamDataReceived):
asyncio.create_task(self.handle_stream_data(event.stream_id, event.data))
elif isinstance(event, StreamReset) or isinstance(event, ConnectionTerminated):
# Only try to close connections if we have any
if self.tcp_connections:
asyncio.create_task(self.close_all_tcp_connections())
async def handle_stream_data(self, stream_id, data):
if stream_id not in self.tcp_connections:
# Create a new TCP connection to the target interface:445
try:
reader, writer = await asyncio.open_connection(self.target_address, 445)
self.tcp_connections[stream_id] = (reader, writer)
# Start task to read from TCP and write to QUIC
asyncio.create_task(self.tcp_to_quic(stream_id, reader))
logging.info(f"[*] Connected to {self.target_address}:445\n[*] Starting relaying process...")
print(text("[QUIC] Forwarding QUIC connection to SMB server"))
except Exception as e:
logging.error(f"[!] Error connecting to {self.target_address}:445: {e}")
return
# Forward data from QUIC to TCP
try:
_, writer = self.tcp_connections[stream_id]
writer.write(data)
await writer.drain()
except Exception as e:
logging.error(f"[!] Error writing to TCP: {e}")
await self.close_tcp_connection(stream_id)
async def tcp_to_quic(self, stream_id, reader):
try:
while True:
data = await reader.read(BUFFER_SIZE)
if not data:
break
self._quic.send_stream_data(stream_id, data)
self.transmit()
except Exception as e:
logging.error(f"[!] Error reading from TCP: {e}")
finally:
await self.close_tcp_connection(stream_id)
async def close_tcp_connection(self, stream_id):
if stream_id in self.tcp_connections:
_, writer = self.tcp_connections[stream_id]
writer.close()
await writer.wait_closed()
del self.tcp_connections[stream_id]
async def close_all_tcp_connections(self):
try:
# Make a copy of the keys to avoid modification during iteration
stream_ids = list(self.tcp_connections.keys())
for stream_id in stream_ids:
try:
await self.close_tcp_connection(stream_id)
except KeyError:
# Silently ignore if the stream ID no longer exists
pass
except Exception as e:
# Catch any other exceptions that might occur
logging.debug(f"[!] Error closing TCP connections: {e}")
async def start_quic_server(listen_interface, cert_path, key_path):
# Configure QUIC
configuration = QuicConfiguration(
alpn_protocols=["smb"],
is_client=False,
)
# Load certificate and private key
try:
configuration.load_cert_chain(cert_path, key_path)
except Exception as e:
logging.error(f"[!] Could not load {cert_path} and {key_path}: {e}")
return
# Resolve interfaces to IP addresses
listen_ip = listen_interface
if not is_ip_address(listen_interface):
listen_ip = get_interface_ip(listen_interface)
if not listen_ip:
logging.error(f"[!] Could not resolve IP address for interface {listen_interface}")
return
target_ip = listen_interface
if not is_ip_address(listen_interface):
target_ip = get_interface_ip(listen_interface)
if not target_ip:
logging.error(f"[!] Could not resolve IP address for interface {listen_interface}")
return
# Start QUIC server with correct protocol factory
server = await serve(
host=listen_ip,
port=443,
configuration=configuration,
create_protocol=lambda *args, **kwargs: QUIC(
*args,
target_address=target_ip,
**kwargs
)
)
logging.info(f"[*] Started listening on {listen_ip}:443 (UDP)")
logging.info(f"[*] Forwarding connections to {target_ip}:445 (TCP)")
# Keep the server running forever
await asyncio.Future()
def is_ip_address(address):
"""Check if a string is a valid IP address."""
import socket
try:
socket.inet_pton(socket.AF_INET, address)
return True
except socket.error:
try:
socket.inet_pton(socket.AF_INET6, address)
return True
except socket.error:
return False