mirror of
https://github.com/lgandx/Responder.git
synced 2025-06-04 20:35:00 -07:00
169 lines
6.1 KiB
Python
169 lines
6.1 KiB
Python
import asyncio
|
|
import logging
|
|
import ssl
|
|
import argparse
|
|
import netifaces
|
|
from utils import *
|
|
from aioquic.asyncio import serve
|
|
from aioquic.asyncio.protocol import QuicConnectionProtocol
|
|
from aioquic.quic.configuration import QuicConfiguration
|
|
from aioquic.quic.events import QuicEvent, StreamDataReceived, StreamReset, ConnectionTerminated
|
|
|
|
BUFFER_SIZE = 11000
|
|
|
|
def get_interface_ip(interface_name):
|
|
"""Get the IP address of a network interface."""
|
|
try:
|
|
# Get address info for the specified interface
|
|
addresses = netifaces.ifaddresses(interface_name)
|
|
|
|
# Get IPv4 address (AF_INET = IPv4)
|
|
if netifaces.AF_INET in addresses:
|
|
return addresses[netifaces.AF_INET][0]['addr']
|
|
|
|
# If no IPv4 address, try IPv6 (AF_INET6 = IPv6)
|
|
if netifaces.AF_INET6 in addresses:
|
|
return addresses[netifaces.AF_INET6][0]['addr']
|
|
|
|
logging.error(f"[!] No IP address found for interface {interface_name}")
|
|
return None
|
|
except ValueError:
|
|
logging.error(f"[!] Interface {interface_name} not found")
|
|
return None
|
|
|
|
|
|
class QUIC(QuicConnectionProtocol):
|
|
def __init__(self, *args, target_address=None, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self.tcp_connections = {} # stream_id -> (reader, writer)
|
|
self.target_address = target_address or "localhost"
|
|
|
|
def quic_event_received(self, event):
|
|
if isinstance(event, StreamDataReceived):
|
|
asyncio.create_task(self.handle_stream_data(event.stream_id, event.data))
|
|
elif isinstance(event, StreamReset) or isinstance(event, ConnectionTerminated):
|
|
# Only try to close connections if we have any
|
|
if self.tcp_connections:
|
|
asyncio.create_task(self.close_all_tcp_connections())
|
|
|
|
async def handle_stream_data(self, stream_id, data):
|
|
if stream_id not in self.tcp_connections:
|
|
# Create a new TCP connection to the target interface:445
|
|
try:
|
|
reader, writer = await asyncio.open_connection(self.target_address, 445)
|
|
self.tcp_connections[stream_id] = (reader, writer)
|
|
|
|
# Start task to read from TCP and write to QUIC
|
|
asyncio.create_task(self.tcp_to_quic(stream_id, reader))
|
|
|
|
logging.info(f"[*] Connected to {self.target_address}:445\n[*] Starting relaying process...")
|
|
print(text("[QUIC] Forwarding QUIC connection to SMB server"))
|
|
except Exception as e:
|
|
logging.error(f"[!] Error connecting to {self.target_address}:445: {e}")
|
|
return
|
|
|
|
# Forward data from QUIC to TCP
|
|
try:
|
|
_, writer = self.tcp_connections[stream_id]
|
|
writer.write(data)
|
|
await writer.drain()
|
|
except Exception as e:
|
|
logging.error(f"[!] Error writing to TCP: {e}")
|
|
await self.close_tcp_connection(stream_id)
|
|
|
|
async def tcp_to_quic(self, stream_id, reader):
|
|
try:
|
|
while True:
|
|
data = await reader.read(BUFFER_SIZE)
|
|
if not data:
|
|
break
|
|
|
|
self._quic.send_stream_data(stream_id, data)
|
|
self.transmit()
|
|
except Exception as e:
|
|
logging.error(f"[!] Error reading from TCP: {e}")
|
|
finally:
|
|
await self.close_tcp_connection(stream_id)
|
|
|
|
async def close_tcp_connection(self, stream_id):
|
|
if stream_id in self.tcp_connections:
|
|
_, writer = self.tcp_connections[stream_id]
|
|
writer.close()
|
|
await writer.wait_closed()
|
|
del self.tcp_connections[stream_id]
|
|
|
|
async def close_all_tcp_connections(self):
|
|
try:
|
|
# Make a copy of the keys to avoid modification during iteration
|
|
stream_ids = list(self.tcp_connections.keys())
|
|
for stream_id in stream_ids:
|
|
try:
|
|
await self.close_tcp_connection(stream_id)
|
|
except KeyError:
|
|
# Silently ignore if the stream ID no longer exists
|
|
pass
|
|
except Exception as e:
|
|
# Catch any other exceptions that might occur
|
|
logging.debug(f"[!] Error closing TCP connections: {e}")
|
|
|
|
async def start_quic_server(listen_interface, cert_path, key_path):
|
|
# Configure QUIC
|
|
configuration = QuicConfiguration(
|
|
alpn_protocols=["smb"],
|
|
is_client=False,
|
|
)
|
|
|
|
# Load certificate and private key
|
|
try:
|
|
configuration.load_cert_chain(cert_path, key_path)
|
|
except Exception as e:
|
|
logging.error(f"[!] Could not load {cert_path} and {key_path}: {e}")
|
|
return
|
|
|
|
# Resolve interfaces to IP addresses
|
|
listen_ip = listen_interface
|
|
if not is_ip_address(listen_interface):
|
|
listen_ip = get_interface_ip(listen_interface)
|
|
if not listen_ip:
|
|
logging.error(f"[!] Could not resolve IP address for interface {listen_interface}")
|
|
return
|
|
|
|
target_ip = listen_interface
|
|
if not is_ip_address(listen_interface):
|
|
target_ip = get_interface_ip(listen_interface)
|
|
if not target_ip:
|
|
logging.error(f"[!] Could not resolve IP address for interface {listen_interface}")
|
|
return
|
|
|
|
# Start QUIC server with correct protocol factory
|
|
server = await serve(
|
|
host=listen_ip,
|
|
port=443,
|
|
configuration=configuration,
|
|
create_protocol=lambda *args, **kwargs: QUIC(
|
|
*args,
|
|
target_address=target_ip,
|
|
**kwargs
|
|
)
|
|
)
|
|
|
|
logging.info(f"[*] Started listening on {listen_ip}:443 (UDP)")
|
|
logging.info(f"[*] Forwarding connections to {target_ip}:445 (TCP)")
|
|
|
|
# Keep the server running forever
|
|
await asyncio.Future()
|
|
|
|
|
|
def is_ip_address(address):
|
|
"""Check if a string is a valid IP address."""
|
|
import socket
|
|
try:
|
|
socket.inet_pton(socket.AF_INET, address)
|
|
return True
|
|
except socket.error:
|
|
try:
|
|
socket.inet_pton(socket.AF_INET6, address)
|
|
return True
|
|
except socket.error:
|
|
return False
|