import socket
import struct


class DNSServer:
    """Minimal UDP DNS server that answers every query with an empty response.

    Each incoming query is parsed and answered with a header carrying the same
    transaction ID, the QR and AA bits set, RCODE 0, and the question section
    echoed back. No answer, authority, or additional records are produced.
    """

    # Fixed size of the DNS message header, in bytes.
    _HEADER_SIZE = 12
    # Largest datagram read per query (same limit the server always used).
    _MAX_PACKET_SIZE = 512
    # A single label may be at most 63 bytes: the two top bits of the length
    # byte are reserved to mark compression pointers.
    _MAX_LABEL_LENGTH = 63
    # Upper bound on compression pointers followed while parsing one name;
    # protects against packets whose pointers form a cycle.
    _MAX_POINTER_JUMPS = 128

    def __init__(self, host='0.0.0.0', port=53):
        """Create the UDP socket and bind it to ``(host, port)``.

        Args:
            host: Local address to bind to.
            port: Local UDP port to bind to.
        """
        self.host = host
        self.port = port
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.socket.bind((self.host, self.port))
        print(f"DNS Server started on {self.host}:{self.port}")

    def close(self):
        """Close the listening socket."""
        self.socket.close()

    def run(self):
        """Serve queries forever, one datagram at a time.

        Errors raised while handling a single datagram are printed and the
        loop continues, so one malformed packet cannot stop the server.
        """
        while True:
            try:
                data, addr = self.socket.recvfrom(self._MAX_PACKET_SIZE)
                # Never reply to a message that is itself a response (QR bit
                # set); doing so lets two servers bounce packets forever.
                if len(data) > 2 and data[2] & 0x80:
                    continue
                print(f"Received query from {addr}")
                response = self.handle_query(data)
                self.socket.sendto(response, addr)
            except Exception as e:
                print(f"Error: {e}")

    def handle_query(self, data):
        """Build the response packet for one raw DNS query.

        Args:
            data: The raw query datagram (bytes).

        Returns:
            The raw response: the query ID, response flags, the original
            question count with zero answer/authority/additional counts,
            followed by the re-encoded question section.

        Raises:
            struct.error: If the header or a question's type/class is truncated.
            IndexError: If a question name runs past the end of ``data``.
            ValueError: If a question name is malformed.
        """
        # Parse the fixed-size header.
        header = data[:self._HEADER_SIZE]
        query_id, query_flags, qdcount, ancount, nscount, arcount = struct.unpack(
            '!HHHHHH', header
        )

        # QR=1 (response) and AA=1 with RCODE 0. OPCODE (0x7800) and RD
        # (0x0100) are copied from the query, as RFC 1035 requires.
        flags = 0x8400 | (query_flags & 0x7900)

        # Parse the question section.
        offset = self._HEADER_SIZE
        queries = []
        for _ in range(qdcount):
            qname, offset = self._parse_name(data, offset)
            qtype, qclass = struct.unpack('!HH', data[offset:offset + 4])
            offset += 4
            queries.append((qname, qtype, qclass))

        # Assemble the response: header first, then the echoed questions.
        parts = [
            header[:2],  # keep the query ID
            struct.pack('!H', flags),
            # No answer, authority or additional records for now.
            struct.pack('!HHHH', qdcount, 0, 0, 0),
        ]
        for qname, qtype, qclass in queries:
            parts.append(self._encode_name(qname))
            parts.append(struct.pack('!HH', qtype, qclass))
        return b''.join(parts)

    def _parse_name(self, data, offset):
        """Decode a domain name in DNS wire format starting at ``offset``.

        Compression pointers are followed. The root name decodes to ``''``.

        Args:
            data: The full DNS message (pointers are relative to its start).
            offset: Index of the first length byte of the name.

        Returns:
            A ``(name, next_offset)`` tuple, where ``name`` is the
            dot-separated name and ``next_offset`` is the index just past the
            name in the original byte stream.

        Raises:
            IndexError: If the name runs past the end of ``data``.
            ValueError: If a reserved label type is used or the compression
                pointers loop.
        """
        labels = []
        # Where parsing resumes in the original stream; fixed at the first
        # pointer, because everything after a jump lives elsewhere.
        resume_offset = None
        jumps = 0
        while True:
            length = data[offset]
            marker = length & 0xC0
            if marker == 0xC0:
                # Compression pointer: 14-bit offset from the message start.
                target = ((length & 0x3F) << 8) | data[offset + 1]
                if resume_offset is None:
                    resume_offset = offset + 2
                jumps += 1
                if jumps > self._MAX_POINTER_JUMPS:
                    raise ValueError("too many compression pointers in name")
                offset = target
                continue
            if marker:
                # 0x40 and 0x80 prefixes are reserved label types.
                raise ValueError(f"unsupported label type 0x{length:02x}")
            offset += 1
            if length == 0:
                break
            labels.append(data[offset:offset + length].decode('utf-8'))
            offset += length
        if resume_offset is not None:
            offset = resume_offset
        return '.'.join(labels), offset

    def _encode_name(self, name):
        """Encode a dot-separated domain name into DNS wire format.

        Empty labels are skipped, so the root name (``''`` or ``'.'``) and a
        trailing dot both encode correctly.

        Args:
            name: The dot-separated domain name.

        Returns:
            The length-prefixed labels followed by the terminating zero byte.

        Raises:
            ValueError: If a label is longer than 63 bytes once UTF-8 encoded.
        """
        chunks = []
        for part in name.split('.'):
            if not part:
                # An empty label would emit a premature terminator byte.
                continue
            raw = part.encode('utf-8')
            # The prefix is the encoded byte count, not the character count.
            if len(raw) > self._MAX_LABEL_LENGTH:
                raise ValueError(f"label too long ({len(raw)} bytes): {part!r}")
            chunks.append(struct.pack('!B', len(raw)))
            chunks.append(raw)
        chunks.append(b'\x00')  # end-of-name marker
        return b''.join(chunks)


if __name__ == '__main__':
    server = DNSServer()
    try:
        server.run()
    finally:
        # Release the port even when the loop is interrupted (e.g. Ctrl-C).
        server.close()