Python3 UDP

Notes on socket programming in Python 3.

1 socket

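  • A TCP socket works a little differently from a UDP socket, but it is the common, reliable (connection-oriented) choice.
  • On Linux, run Python with sudo; raw sockets (used by the scanner in section 2) require root privileges.
  • Python 3 sockets only deal with binary data (bytes), so user data must be encoded before it is sent.
  • The host IP must be the machine's actual address, not 127.0.0.1.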

1.1 Python3 Coding Style

Add this header at the top of every .py file.

#!/usr/bin/env python
# -*-coding:utf-8 -*-

'''
| author: demo@demo.com
| date: 08/24/2018
| desc: Learn Socket
| version: 0.1
'''

1.2 TCP Client

import socket

# Remote web server to query over plain HTTP.
target_host = "www.baidu.com"
target_port = 80

# The context manager closes the socket even if connect/send/recv raises.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
    client.connect((target_host, target_port))
    # Sockets carry bytes, so the request text is encoded before sending.
    # sendall() keeps writing until the whole request has been handed to the
    # kernel, whereas send() may transmit only part of it.
    client.sendall("GET / HTTP/1.1\r\nHost: baidu.com\r\n\r\n".encode('utf-8'))
    # A single recv() returns at most 4096 bytes; a longer reply is truncated.
    response = client.recv(4096)

print(response)

1.3 UDP Client

import socket

# Address of the UDP echo server to talk to.
host = "192.168.11.113"
port = 32000
# The destination never changes, so the address tuple is built once.
addr = (host, port)

# The context manager closes the socket even when input() or recvfrom()
# raises (for example on Ctrl-C or end of input).
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as udp_client:
    while True:
        # For a non-interactive test, a fixed payload such as 'hello' can be
        # used here instead of prompting.
        data = input('>>>')
        if not data:
            # An empty line ends the session.
            break
        # Python 3 sockets only deal with binary data, so encode user data.
        udp_client.sendto(data.encode(encoding='utf-8'), addr)
        print('Sending: ', addr, data)
        # Blocks until a datagram arrives; no timeout is configured.
        reply, peer = udp_client.recvfrom(1024)
        print('Recv from', peer, reply)

1.4 test

### The script must be run with sudo.
sudo python udp.py

1.5 UDP Server

import socket

# An empty host string binds to every local interface. Alternatively use the
# machine's actual IP address; the notes warn that 127.0.0.1 causes trouble.
ip_port = ('', 32000)
# Largest datagram payload read per recvfrom() call.
BUFSIZE = 1024

# The loop below never ends on its own, so a close() placed after it would be
# unreachable; the context manager closes the socket when the loop is left
# through an exception such as KeyboardInterrupt.
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as udp_server:
    udp_server.bind(ip_port)
    print('Waiting for connection...')

    while True:
        # Echo every datagram back to whoever sent it.
        msg, addr = udp_server.recvfrom(BUFSIZE)
        print('Recv from', addr, msg)
        udp_server.sendto(msg, addr)
        print('Send back', addr, msg)

2 Scan

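  • TCP, UDP and ICMP messages are all carried inside IP packets.
  • Use ctypes structures (or struct.unpack) to decode the raw network byte stream into C-style header fields.
  • Send UDP packets, then decode the ICMP packets that come back.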

2.1 class IP

class IP(ctypes.Structure):
    """IPv4 header decoded from the first 20 bytes of a raw packet buffer.

    Construct with ``IP(raw_bytes)``. ``raw_bytes`` must hold at least
    ``ctypes.sizeof(IP)`` bytes, otherwise ``from_buffer_copy`` raises
    ``ValueError``.

    Besides the raw fields, each instance gets these derived attributes:
        src_addr / dst_addr: dotted-quad strings for ``src`` / ``dst``.
        ihlen: header length in bytes (``ihl`` counts 32-bit words).
        protocol: "ICMP", "TCP" or "UDP", or the protocol number as a string
            when it is not in ``protocol_map``.
    """

    # Fields are read in host byte order, so the 16-bit values (len, id,
    # offset, sum) are not converted from network order here.
    # 'ihl' is declared before 'ver' so that, with the usual little-endian
    # bit-field layout, it maps to the low nibble of the first byte.
    _fields_ = [
        ('ihl', ctypes.c_ubyte, 4),
        ('ver', ctypes.c_ubyte, 4),
        ('tos', ctypes.c_ubyte),
        ('len', ctypes.c_ushort),
        ('id', ctypes.c_ushort),
        ('offset', ctypes.c_ushort),
        ('ttl', ctypes.c_ubyte),
        ('pro', ctypes.c_ubyte),
        ('sum', ctypes.c_ushort),
        ('src', ctypes.c_uint),
        ('dst', ctypes.c_uint),
    ]

    # IP protocol number -> readable name; shared by all instances.
    protocol_map = {1: "ICMP", 6: "TCP", 17: "UDP"}

    def __new__(cls, socket_buffer):
        # Copy the leading bytes of the buffer straight into the structure.
        return cls.from_buffer_copy(socket_buffer)

    def __init__(self, socket_buffer):
        # Overrides Structure.__init__, which would otherwise try to store
        # socket_buffer into the first field. The fields are already filled
        # in by __new__, so only the derived attributes are computed here.
        # "=I" re-packs each address in the same native byte order it was
        # read with, which restores the original wire bytes on any host.
        self.src_addr = socket.inet_ntoa(struct.pack("=I", self.src))
        self.dst_addr = socket.inet_ntoa(struct.pack("=I", self.dst))
        self.ihlen = self.ihl * 4
        # Unknown protocols fall back to their number as a string.
        self.protocol = self.protocol_map.get(self.pro, str(self.pro))

2.2 class ICMP

class ICMP(ctypes.Structure):
    """First 8 bytes of an ICMP message decoded from a raw buffer.

    Construct with ``ICMP(raw_bytes)``. ``raw_bytes`` must hold at least
    ``ctypes.sizeof(ICMP)`` bytes, otherwise ``from_buffer_copy`` raises
    ``ValueError``. The 16-bit fields are read in host byte order.
    """

    _fields_ = [
        ('type', ctypes.c_ubyte),
        ('code', ctypes.c_ubyte),
        ('sum', ctypes.c_ushort),
        ('id', ctypes.c_ushort),
        ('seq', ctypes.c_ushort),
    ]

    def __new__(cls, socket_buffer):
        # Copy the leading bytes of the buffer straight into the structure.
        return cls.from_buffer_copy(socket_buffer)

    def __init__(self, socket_buffer):
        # Deliberately empty, but required: without this override
        # Structure.__init__ would try to assign socket_buffer to the first
        # field. __new__ has already populated every field.
        pass

2.3 scan.py

#!/usr/bin/env python
# -*-coding:utf-8 -*-

'''
| author: demo@echo.com
| date: 08/24/2018
| desc: UDP tools
| version: 0.1
'''

import socket
import os
import ctypes
import struct
import netaddr
import time
import threading
import fcntl

class ColorTxt:
    """ANSI-coloured message prefixes for terminal output.

    Each constant is a bold colour escape, a bracketed marker, a reset
    sequence and one trailing space, ready to be put in front of a message.
    """

    # Listed in ANSI colour-code order (31 .. 36).
    ski = '\033[1;31m[-]\033[1;m '  # red     [-]
    tar = '\033[1;32m[+]\033[1;m '  # green   [+]
    imp = '\033[1;33m[!]\033[1;m '  # yellow  [!]
    que = '\033[1;34m[?]\033[1;m '  # blue    [?]
    err = '\033[1;35m[*]\033[1;m '  # magenta [*]
    run = '\033[1;36m[~]\033[1;m '  # cyan    [~]

class IP(ctypes.Structure):
    """IPv4 header decoded from the first 20 bytes of a raw packet buffer.

    Construct with ``IP(raw_bytes)``. ``raw_bytes`` must hold at least
    ``ctypes.sizeof(IP)`` bytes, otherwise ``from_buffer_copy`` raises
    ``ValueError``.

    Besides the raw fields, each instance gets these derived attributes:
        src_addr / dst_addr: dotted-quad strings for ``src`` / ``dst``.
        ihlen: header length in bytes (``ihl`` counts 32-bit words).
        protocol: "ICMP", "TCP" or "UDP", or the protocol number as a string
            when it is not in ``protocol_map``.
    """

    # Fields are read in host byte order, so the 16-bit values (len, id,
    # offset, sum) are not converted from network order here.
    # 'ihl' is declared before 'ver' so that, with the usual little-endian
    # bit-field layout, it maps to the low nibble of the first byte.
    _fields_ = [
        ('ihl', ctypes.c_ubyte, 4),
        ('ver', ctypes.c_ubyte, 4),
        ('tos', ctypes.c_ubyte),
        ('len', ctypes.c_ushort),
        ('id', ctypes.c_ushort),
        ('offset', ctypes.c_ushort),
        ('ttl', ctypes.c_ubyte),
        ('pro', ctypes.c_ubyte),
        ('sum', ctypes.c_ushort),
        ('src', ctypes.c_uint),
        ('dst', ctypes.c_uint),
    ]

    # IP protocol number -> readable name; shared by all instances.
    protocol_map = {1: "ICMP", 6: "TCP", 17: "UDP"}

    def __new__(cls, socket_buffer):
        # Copy the leading bytes of the buffer straight into the structure.
        return cls.from_buffer_copy(socket_buffer)

    def __init__(self, socket_buffer):
        # Overrides Structure.__init__, which would otherwise try to store
        # socket_buffer into the first field. The fields are already filled
        # in by __new__, so only the derived attributes are computed here.
        # "=I" re-packs each address in the same native byte order it was
        # read with, which restores the original wire bytes on any host.
        self.src_addr = socket.inet_ntoa(struct.pack("=I", self.src))
        self.dst_addr = socket.inet_ntoa(struct.pack("=I", self.dst))
        self.ihlen = self.ihl * 4
        # Unknown protocols fall back to their number as a string.
        self.protocol = self.protocol_map.get(self.pro, str(self.pro))

class ICMP(ctypes.Structure):
    """First 8 bytes of an ICMP message decoded from a raw buffer.

    Construct with ``ICMP(raw_bytes)``. ``raw_bytes`` must hold at least
    ``ctypes.sizeof(ICMP)`` bytes, otherwise ``from_buffer_copy`` raises
    ``ValueError``. The 16-bit fields are read in host byte order.
    """

    _fields_ = [
        ('type', ctypes.c_ubyte),
        ('code', ctypes.c_ubyte),
        ('sum', ctypes.c_ushort),
        ('id', ctypes.c_ushort),
        ('seq', ctypes.c_ushort),
    ]

    def __new__(cls, socket_buffer):
        # Copy the leading bytes of the buffer straight into the structure.
        return cls.from_buffer_copy(socket_buffer)

    def __init__(self, socket_buffer):
        # Deliberately empty, but required: without this override
        # Structure.__init__ would try to assign socket_buffer to the first
        # field. __new__ has already populated every field.
        pass

# Local address the raw sniffer socket binds to.
HOST = '192.168.11.113'
# Network to sweep, in CIDR notation.
SUBNET = "192.168.11.0/24"
# Probe payload sent in every UDP datagram.
MESSAGE = b'hello'

def udp_sender(SUBNET, MESSAGE, port=32000, delay=2):
    """Send one UDP datagram carrying MESSAGE to every address in SUBNET.

    Args:
        SUBNET: network in CIDR notation, expanded with netaddr.IPNetwork.
        MESSAGE: bytes payload of each datagram.
        port: destination UDP port (defaults to the original 32000).
        delay: seconds to wait before sending (defaults to the original 2),
            giving the caller time to start listening for replies.
    """
    time.sleep(delay)
    # The context manager closes the socket once the sweep is finished.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sender:
        for ip in netaddr.IPNetwork(SUBNET):
            try:
                sender.sendto(MESSAGE, (str(ip), port))
            except OSError:
                # Best-effort sweep: an address that cannot be sent to is
                # skipped, but only socket errors are swallowed now rather
                # than every exception.
                continue

def main():
    """Sweep SUBNET with UDP probes and report hosts that answer with ICMP.

    Opens a raw socket bound to HOST, starts udp_sender in a background
    thread, then reads packets until interrupted with Ctrl-C. A host is
    reported when it sends an ICMP message with type 3 and code 3 from an
    address inside SUBNET.
    """
    is_windows = os.name == 'nt'
    # Windows uses IPPROTO_IP plus SIO_RCVALL; elsewhere only ICMP is sniffed.
    socket_protocol = socket.IPPROTO_IP if is_windows else socket.IPPROTO_ICMP

    # Loop invariants, computed once instead of per packet.
    subnet = netaddr.IPNetwork(SUBNET)
    ip_size = ctypes.sizeof(IP)
    icmp_size = ctypes.sizeof(ICMP)

    # The context manager closes the raw socket on every exit path.
    with socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol) as sniffer:
        sniffer.bind((HOST, 0))
        sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)

        if is_windows:
            sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)

        try:
            # Daemon thread, so Ctrl-C ends the program without waiting for
            # the sweep to finish.
            t = threading.Thread(target=udp_sender, args=(SUBNET, MESSAGE),
                                 daemon=True)
            t.start()

            while True:
                # 65535 is the largest possible IPv4 packet.
                raw_buffer = sniffer.recvfrom(65535)[0]
                if len(raw_buffer) < ip_size:
                    # Too short to hold an IP header; decoding would raise.
                    continue
                ip_header = IP(raw_buffer)
                if ip_header.pro != socket.IPPROTO_ICMP:
                    # With IPPROTO_IP the socket can deliver non-ICMP packets.
                    continue

                start = ip_header.ihlen
                buf = raw_buffer[start:start + icmp_size]
                if len(buf) < icmp_size:
                    # Truncated ICMP header; decoding would raise.
                    continue
                icmp_header = ICMP(buf)

                if icmp_header.type != 3 or icmp_header.code != 3:
                    continue
                if netaddr.IPAddress(ip_header.src_addr) not in subnet:
                    continue

                # NOTE(review): the source lost its indentation; the `else`
                # is assumed to pair with this payload comparison.
                if raw_buffer.endswith(MESSAGE):
                    print('%s%s is up' % (ColorTxt.tar, ip_header.src_addr))
                else:
                    print('%s%s appears to be up' % (ColorTxt.tar, ip_header.src_addr))
        except KeyboardInterrupt:
            print(ColorTxt.err, 'Quit')
        finally:
            # Switch promiscuous mode off on every exit path, not only Ctrl-C.
            if is_windows:
                sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)


if __name__ == '__main__':
    main()