Advertisement
Sweetening

Untitled

Aug 5th, 2023
53
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 7.03 KB | None | 0 0
  1. from binascii import hexlify, unhexlify
  2. from cStringIO import StringIO
  3. from collections import namedtuple
  4. import sys
  5. from scapy.all import *
  6. from struct import pack, unpack
  7.  
  8. SccpMsg = namedtuple('SccpMsg', 'type klass handling header mandatory optional')
  9.  
  10. class Reader:
  11. def __init__(self, data):
  12. self.roff = 0
  13. self.data = data
  14.  
  15. def read(self, n):
  16. avail = self.data[self.roff:self.roff+n]
  17. self.advance(len(avail))
  18. return avail
  19.  
  20. def seek(self, off):
  21. self.roff = off
  22.  
  23. def advance(self, n):
  24. self.roff += n
  25.  
  26.  
  27. class ShortRead(Exception): pass
  28. class UnknownType(Exception): pass
  29.  
  30. def pop_u8(f):
  31. data = f.read(1)
  32. if len(data) != 1: raise ShortRead()
  33. return ord(data)
  34.  
  35. def pop_u16(f):
  36. data = f.read(2)
  37. if len(data) != 2: raise ShortRead()
  38. return unpack('!H', data)[0]
  39.  
  40. def pop_u32(f):
  41. data = f.read(4)
  42. if len(data) != 4: raise ShortRead()
  43. return unpack('!I', data)[0]
  44.  
  45.  
  46. '''
  47. header, num of mandatory parameters, allows optional
  48. '''
  49. SCCP_TYPES = {
  50. 0x11: (1, 3, True),
  51. 0x09: (0, 3, False),
  52. }
  53.  
  54. def decode_sccp(data):
  55. f = Reader(data)
  56.  
  57. type = pop_u8(f)
  58. u = pop_u8(f)
  59. message_handling = (u>>4)
  60. klass = (u & 0x0f)
  61.  
  62. hdr = ''
  63. if type in SCCP_TYPES:
  64. (header, n_mandatory, use_optional) = SCCP_TYPES[type]
  65.  
  66. hdr = f.read(header)
  67.  
  68. mandatory_offsets = []
  69. for i in range(n_mandatory):
  70. u = pop_u8(f)
  71. mandatory_offsets.append(f.roff + u - 1)
  72.  
  73. if use_optional:
  74. start_of_optional = f.roff + pop_u8(f)
  75. else:
  76. raise UnknownType(type)
  77.  
  78. mandatory_parameters = []
  79. for mp in mandatory_offsets:
  80. f.seek(mp)
  81.  
  82. size = pop_u8(f)
  83. value = f.read(size)
  84. assert(len(value) == size)
  85.  
  86. mandatory_parameters.append(value)
  87.  
  88. optional_parameters = []
  89. if use_optional:
  90. f.seek(start_of_optional)
  91. while True:
  92. parameter_name = pop_u8(f)
  93. if parameter_name == 0: break
  94. size = pop_u8(f)
  95. value = f.read(size)
  96. assert(len(value) == size)
  97. optional_parameters.append(value)
  98.  
  99. return SccpMsg(type, klass, message_handling, hdr, mandatory_parameters, optional_parameters)
  100.  
  101.  
  102.  
  103. def fragment_sccp(called, calling,
  104. data, fragsize=12):
  105. chunks = []
  106. for o in range(0, len(data), fragsize):
  107. chunks.append(data[o:o+fragsize])
  108.  
  109. for i in range(len(chunks)):
  110. chunk = chunks[i]
  111. f = StringIO()
  112. f.write(pack('!BBBBBBB',
  113. 0x11, # XUDT
  114. 0x01, # handling / class
  115. 0x0c, # hop counter
  116. 4, # first mandatory variable parameter is always at 4
  117. 4+len(called), # second mandatory variable parameter
  118. 4+len(called)+len(calling), # third mandatory variable parameter
  119. 4+len(called)+len(calling)+len(chunk)
  120. ))
  121.  
  122. if i == 0: first_segment = 1
  123. else: first_segment = 0
  124. remaining = len(chunks)-1-i
  125.  
  126. segmentation = pack('!B', ((first_segment<<7) + (1<<6) + remaining)) + '\xfa\xca\xde'
  127.  
  128. f.write(pack('!B', len(called)) + called)
  129. f.write(pack('!B', len(calling)) + calling)
  130. f.write(pack('!B', len(chunk)) + chunk)
  131. f.write(pack('!BB', 0x10, len(segmentation)) + segmentation)
  132. f.write('\x00')
  133.  
  134. yield f.getvalue()
  135.  
  136.  
  137. def decode_segment(chunk):
  138. segmentation = chunk.optional[0]
  139. u = ord(segmentation[0])
  140. first = u >> 7
  141. klass = (u >> 6) & 0x01
  142. remaining = u & 0x0f
  143. local_ref = reduce(lambda x,y: x + (y<<8), map(ord, segmentation[1:]))
  144. return (first, klass, remaining, local_ref)
  145.  
  146. def reassemble(chunks):
  147. assert(all(len(c.optional) == 1 for c in chunks))
  148. segments = [(decode_segment(c), c) for c in chunks]
  149.  
  150. # ensure the same local reference is spread across all fragments
  151. local_ref = None
  152. for s in segments:
  153. if local_ref is None: local_ref = s[0][3]
  154. assert(s[0][3] == local_ref)
  155.  
  156. segments = sorted(segments, key=lambda x: x[0][2], reverse=True)
  157.  
  158. reassembled = ''
  159. for s in segments:
  160. reassembled += s[1].mandatory[2]
  161. return reassembled
  162.  
  163.  
  164. def hexdump(data):
  165. for i in range(0, len(data), 8):
  166. sys.stdout.write('%04x ' % i)
  167. for j in range(i, min([i+8, len(data)])):
  168. sys.stdout.write('%02x ' % ord(data[j]))
  169. print('')
  170.  
'''
Example (assumes ``chunks`` is a list of raw SCCP XUDT messages):

reassembled = reassemble([decode_sccp(c) for c in chunks[0:2]])
hexdump(reassembled)

hexdump(decode_sccp(chunks[2]).mandatory[2])
'''
  177.  
  178. M3UA = namedtuple('M3UA', 'version klass type')
  179. SS7 = namedtuple('SS7', 'opc dpc si ni mp sls')
  180.  
  181. class UnhandledVariant(Exception): pass
  182.  
  183. def decode_m3ua(f):
  184. version = pop_u8(f)
  185. if version != 1: raise UnhandledVariant(version)
  186. reserved = pop_u8(f)
  187. if reserved != 0: raise UnhandledVariant(reserved)
  188. klass = pop_u8(f)
  189. if klass != 1: raise UnhandledVariant(klass)
  190. type = pop_u8(f)
  191. if type != 1: raise UnhandledVariant(type)
  192.  
  193. length = pop_u32(f)
  194.  
  195. left = length - 8
  196. data = f.read(left)
  197. assert(len(data) == left)
  198.  
  199. g = Reader(data)
  200.  
  201. while True:
  202. tag = pop_u16(g)
  203. if tag == 0x210: break
  204. length = pop_u16(g)
  205. g.read(length-4)
  206.  
  207. if tag != 0x210: return
  208.  
  209. length = pop_u16(g)
  210.  
  211. opc = pop_u32(g)
  212. dpc = pop_u32(g)
  213. si = pop_u8(g)
  214. ni = pop_u8(g)
  215. mp = pop_u8(g)
  216. sls = pop_u8(g)
  217.  
  218. ss7 = SS7(opc, dpc, si, ni, mp, sls)
  219.  
  220. data = g.read(length-16)
  221. assert(len(data) == length-16)
  222.  
  223. sccp = decode_sccp(data)
  224.  
  225. return (M3UA(version, klass, type), ss7, sccp)
  226.  
  227.  
  228. def encode_m3ua(m3ua, ss7, sccp):
  229. f = StringIO()
  230.  
  231. f.write(pack('!BBBBIHH', m3ua.version, 0, m3ua.klass, m3ua.type, len(sccp)+16+8,
  232. 0x210, len(sccp)+16))
  233. f.write(pack('!IIBBBB', ss7.opc, ss7.dpc, ss7.si, ss7.ni, ss7.mp, ss7.sls))
  234. f.write(sccp)
  235.  
  236. return f.getvalue()
  237.  
  238.  
  239.  
  240. def sccp_segment(pkt, fragsize=12):
  241. # scapy does not support M3UA / SCCP
  242. data = pkt[SCTPChunkData].data
  243.  
  244. f = Reader(data)
  245. (m3ua, ss7, sccp) = decode_m3ua(f)
  246. # we require an SCCP UDT containing an upper TCAP to segment
  247. assert(sccp.type == 0x09) # fragment UDT only
  248. called = sccp.mandatory[0]
  249. calling = sccp.mandatory[1]
  250. tcap = sccp.mandatory[2]
  251.  
  252. tsn = pkt[SCTPChunkData].tsn
  253. stream_seq = pkt[SCTPChunkData].stream_seq
  254.  
  255. for xudt in fragment_sccp(called, calling, tcap):
  256. data = encode_m3ua(m3ua, ss7, xudt)
  257.  
  258. new_pkt = pkt.copy()
  259.  
  260. new_pkt[IP].len = None
  261. new_pkt[IP].chksum = None
  262.  
  263. new_pkt[SCTP].chksum = None
  264.  
  265. new_pkt[SCTPChunkData].tsn = tsn
  266. tsn += 1
  267. tsn &= 0xffffffff
  268.  
  269. new_pkt[SCTPChunkData].stream_seq = stream_seq
  270. stream_seq += 1
  271. stream_seq &= 0xffff
  272.  
  273. new_pkt[SCTPChunkData].data = data
  274. new_pkt[SCTPChunkData].len = None
  275.  
  276. yield new_pkt
  277.  
  278.  
  279. if __name__ == '__main__':
  280. import argparse
  281.  
  282. parser = argparse.ArgumentParser()
  283. parser.add_argument('input', help='input filename, expected to a pcap file, containing a single packet containing IP/SCTP/M3UA/SCCP UDT message')
  284. parser.add_argument('output', help='output filename')
  285. args = parser.parse_args()
  286.  
  287. pkts = rdpcap(args.input)
  288. assert(len(pkts) == 1)
  289. pkt = pkts[0]
  290. assert(IP in pkt)
  291. assert(SCTP in pkt)
  292. assert(SCTPChunkData in pkt)
  293.  
  294. pkts = sccp_segment(pkt)
  295.  
  296. wrpcap(args.output, pkts)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement