Torrent Support

Following our recent introduction to Scan Providers, here’s a first implementation example. In this post we’ll see how to add support for Torrent files in Profiler. Of course, the implementation shown in this post will be available in the upcoming 2.5.0 release.

Let’s start by creating an entry in the configuration file:

[Torrent]
label = BitTorrent File
group = db
file = Torrent.py
allocator = torrentAllocator

For automatic format recognition we can rely on a simple signature:

rule torrent
{
    strings:
        $sig = "d8:announce"

    condition:
        $sig at 0
}

Torrent files are encoded dictionaries and they usually start with the announce item. There’s no guarantee of that, but for now this simple matching should be good enough.
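
To see what the rule accepts and what it misses, the condition can be mimicked in plain Python. This is only a sketch of the `$sig at 0` check, not how Profiler evaluates YARA rules; `looks_like_torrent` and the sample byte strings are made up for illustration.

```python
SIGNATURE = b"d8:announce"

def looks_like_torrent(data):
    """Mimic the rule's condition: the signature must sit at offset 0."""
    return data.startswith(SIGNATURE)

# Typical layout: the dictionary opens with the announce key.
typical = b"d8:announce20:http://tracker.test/4:infod4:name1:aee"
# Still a well-formed dictionary, but without an announce key it
# starts with a different item and the rule does not match.
trackerless = b"d4:infod4:name1:aee"

print(looks_like_torrent(typical))      # True
print(looks_like_torrent(trackerless))  # False
```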

The encoded dictionary is in the Bencode format. Fortunately, someone already wrote the Python code to decode it:

#
# BEGIN OF 3RD PARTY CODE (adapted to work with Python 3)
#
# The contents of this file are subject to the BitTorrent Open Source License
# Version 1.1 (the License).  You may not copy or use this file, in either
# source code or executable form, except in compliance with the License.  You
# may obtain a copy of the License at http://www.bittorrent.com/license/.
#
# Software distributed under the License is distributed on an AS IS basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied.  See the License
# for the specific language governing rights and limitations under the
# License.

# Written by Petru Paler

def decode_int(x, f):
    f += 1
    newf = x.index(0x65, f)
    n = int(x[f:newf])
    if x[f] == 0x2D: # -
        if x[f + 1] == 0x30:
            raise ValueError
    elif x[f] == 0x30 and newf != f+1:
        raise ValueError
    return (n, newf+1)

def decode_string(x, f):
    colon = x.index(0x3A, f) # :
    n = int(x[f:colon])
    if x[f] == 0x30 and colon != f+1:
        raise ValueError
    colon += 1
    return (x[colon:colon+n], colon+n)

def decode_list(x, f):
    r, f = [], f+1
    while x[f] != 0x65: # e
        v, f = decode_func[x[f]](x, f)
        r.append(v)
    return (r, f + 1)

def decode_dict(x, f):
    r, f = {}, f+1
    while x[f] != 0x65: # e
        k, f = decode_string(x, f)
        r[k], f = decode_func[x[f]](x, f)
    return (r, f + 1)

decode_func = {}
decode_func[0x6C] = decode_list # l
decode_func[0x64] = decode_dict # d
decode_func[0x69] = decode_int  # i
decode_func[0x30] = decode_string
decode_func[0x31] = decode_string
decode_func[0x32] = decode_string
decode_func[0x33] = decode_string
decode_func[0x34] = decode_string
decode_func[0x35] = decode_string
decode_func[0x36] = decode_string
decode_func[0x37] = decode_string
decode_func[0x38] = decode_string
decode_func[0x39] = decode_string

def bdecode(x):
    try:
        r, l = decode_func[x[0]](x, 0)
    except (IndexError, KeyError, ValueError):
        return {}
    if l != len(x):
        return {}
    return r
    
#
# END OF 3RD PARTY CODE
#
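
The decoder is easier to follow with the encoding direction in mind. The following is a minimal encoder sketch written for illustration only; it is not part of the third-party code above, and `bencode` is a name chosen here. It covers the four Bencode types: integers, byte strings, lists and dictionaries (with keys emitted in sorted order).

```python
def bencode(value):
    """Encode ints, bytes, lists and dicts (with bytes keys) as Bencode."""
    if isinstance(value, int):
        return b"i" + str(value).encode("ascii") + b"e"
    if isinstance(value, bytes):
        # byte strings are prefixed with their decimal length and a colon
        return str(len(value)).encode("ascii") + b":" + value
    if isinstance(value, list):
        return b"l" + b"".join(bencode(v) for v in value) + b"e"
    if isinstance(value, dict):
        out = b"d"
        for key in sorted(value):
            out += bencode(key) + bencode(value[key])
        return out + b"e"
    raise TypeError("unsupported type: %r" % type(value))

print(bencode(42))                          # b'i42e'
print(bencode(b"spam"))                     # b'4:spam'
print(bencode([b"a", 1]))                   # b'l1:ai1ee'
print(bencode({b"announce": b"http://t"}))  # b'd8:announce8:http://te'
```

The last line shows why the signature `d8:announce` works for most torrents: `d` opens the dictionary and `8:announce` is the first key.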

We can now load the file and decode its dictionary:

class TorrentObject(CFFObject):

    def __init__(self):
        super(TorrentObject, self).__init__()
        self.SetObjectFormatName("TORRENT")
        self.SetDefaultEndianness(ENDIANNESS_LITTLE)
        self.tdict = None
        
    def GetDictionary(self):
        if self.tdict == None:
            size = min(self.GetSize(), MAX_TORRENT_SIZE)
            data = self.Read(0, size)
            self.tdict = bdecode(bytes(data))
        return self.tdict

class TorrentScanProvider(ScanProvider):

    def __init__(self):
        super(TorrentScanProvider, self).__init__()
        self.obj = None
        
        # ....

    def _clear(self):
        self.obj = None

    def _getObject(self):
        return self.obj

    def _initObject(self):
        self.obj = TorrentObject()
        self.obj.Load(self.getStream())
        d = self.obj.GetDictionary()
        return self.SCAN_RESULT_OK if len(d) != 0 else self.SCAN_RESULT_ERROR

We call the GetDictionary method for the first time in the _initObject method, so that the parsing occurs while we’re in another thread and we don’t stall the UI.

Let’s display the parsed dictionary to the user:

    def _getFormat(self):
        ft = FormatTree()
        ft.enableIDs(True)
        fi = ft.appendChild(None, self.FormatItem_Dictionary)
        return ft
        
    def _formatViewInfo(self, finfo):
        if finfo.fid >= 1 and finfo.fid - 1 < len(self.fi_names):
            finfo.text = self.fi_names[finfo.fid - 1]
            return True
        return False

    def _formatViewData(self, sdata):
        if sdata.fid == self.FormatItem_Dictionary:
            sdata.setViews(SCANVIEW_TEXT)
            txt = pprint.pformat(self.obj.GetDictionary())
            sdata.data.setData(txt)
            return True
        return False

Dictionary

This is the description extracted from Wikipedia of some of the keys:

  • announce: the URL of the tracker
  • info: this maps to a dictionary whose keys are dependent on whether one or more files are being shared:
    • name: suggested filename where the file is to be saved (if one file)/suggested directory name where the files are to be saved (if multiple files)
    • piece length: number of bytes per piece. This is commonly 2^18 B = 256 KiB = 262,144 B.
    • pieces: a hash list, i.e., a concatenation of each piece's SHA-1 hash. As SHA-1 returns a 160-bit hash, pieces will be a string whose length is a multiple of 160 bits (20 bytes).
    • length: size of the file in bytes (only when one file is being shared)
    • files: a list of dictionaries each corresponding to a file (only when multiple files are being shared). Each dictionary has the following keys:
      • path: a list of strings corresponding to subdirectory names, the last of which is the actual file name
      • length: size of the file in bytes.
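
The relationship between the payload size, the piece length and the pieces string can be checked with a few lines of Python. This is a sketch on made-up data; `split_pieces` and `expected_piece_count` are hypothetical helper names, not part of the Profiler SDK.

```python
import hashlib

SHA1_SIZE = 20  # a SHA-1 digest is 160 bits = 20 bytes

def split_pieces(pieces):
    """Split the concatenated 'pieces' string into individual SHA-1 hashes."""
    if len(pieces) % SHA1_SIZE != 0:
        raise ValueError("pieces length is not a multiple of 20 bytes")
    return [pieces[i:i + SHA1_SIZE] for i in range(0, len(pieces), SHA1_SIZE)]

def expected_piece_count(total_length, piece_length):
    """Number of pieces needed to cover the payload (ceiling division)."""
    return (total_length + piece_length - 1) // piece_length

# Build a 'pieces' value for a made-up 600,000 byte payload.
payload = b"x" * 600000
piece_length = 262144
pieces = b"".join(
    hashlib.sha1(payload[i:i + piece_length]).digest()
    for i in range(0, len(payload), piece_length)
)

hashes = split_pieces(pieces)
print(len(hashes), expected_piece_count(len(payload), piece_length))  # 3 3
```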

While the dictionary alone could suffice to extract all the information the user needs, we may want to present parts of it in a form that is easier to read.

First, we'd like to show the user some meta-data which may be contained in the dictionary. To do that, we add a meta-data scan entry:

    def _startScan(self):
        d = self.obj.GetDictionary()
        if any(mk in d for mk in self.meta_keys):
            e = ScanEntryData()
            e.category = SEC_Privacy
            e.type = CT_MetaData
            self.addEntry(e)
        if self.obj.GetSize() > MAX_TORRENT_SIZE:
            e = ScanEntryData()
            e.category = SEC_Warn
            e.type = CT_UnaccountedSpace
            self.addEntry(e)
        return self.SCAN_RESULT_FINISHED

We also warn the user if the file exceeds the allowed maximum. We perform the whole scan logic in the UI thread, since we're not doing any CPU-intensive operation, and thus we return SCAN_RESULT_FINISHED, which causes the _threadScan method not to be called.

Here we return the meta-data to the UI:

    def _scanViewData(self, xml, dnode, sdata):
        if sdata.type == CT_MetaData:
            d = self.obj.GetDictionary()
            out = proTextStream()
            for mk in self.meta_keys:
                if mk in d:
                    tmk = mk.decode("utf-8", errors="ignore")
                    if tmk == "creation date":
                        dt = self.obj.CreationDate()
                        tmv = dt.toString() if dt.isValid() else "?"
                    else:
                        tmv = d[mk].decode("utf-8", errors="ignore")
                    out._print(tmk)
                    out._print(": ")
                    out._print(tmv)
                    out.nl()
            sdata.setViews(SCANVIEW_TEXT)
            sdata.data.setData(out.buffer)
            return True

MetaData
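
The CreationDate method used above appears in the full listing further down; it treats the value as seconds since the Unix epoch. Outside Profiler, the same conversion can be sketched with the standard library. `format_creation_date` is a hypothetical helper, and printing the time in UTC is a choice made here for reproducible output.

```python
from datetime import datetime, timezone

def format_creation_date(value):
    """Format a 'creation date' value (seconds since the epoch) or return '?'."""
    if type(value) is not int:
        return "?"  # the decoded dictionary may hold an unexpected type
    try:
        dt = datetime.fromtimestamp(value, tz=timezone.utc)
    except (OverflowError, OSError, ValueError):
        return "?"  # out-of-range timestamps
    return dt.strftime("%Y-%m-%d %H:%M:%S UTC")

print(format_creation_date(0))          # 1970-01-01 00:00:00 UTC
print(format_creation_date(b"garbage")) # ?
```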

Also it would be convenient to see the list of trackers and files. Let's start with the trackers:

class TorrentObject(CFFObject):

    # ...

    def GetTrackers(self):
        d = self.GetDictionary()
        trackers = []
        dup = set()
        if b"announce" in d and type(d[b"announce"]) is bytes:
            trackers.append(d[b"announce"])
            dup.add(trackers[0])
        if b"announce-list" in d:
            al = d[b"announce-list"]
            if type(al) is list:
                for a in al:
                    # check the type before the set lookup: an unhashable value would raise
                    if type(a) is list and len(a) > 0 and type(a[0]) is bytes and a[0] not in dup:
                        trackers.append(a[0])
                        dup.add(a[0])
        return trackers

def trackersViewCb(cv, trackers, code, view, data):
    if code == pvnInit:
        tv = cv.getView(1)
        tv.setColumnCount(1)
        labels = NTStringList()
        labels.append("Tracker")
        tv.setColumnLabels(labels)
        tv.setColumnCWidth(0, 70)
        tv.setRowCount(len(trackers))
        return 1
    elif code == pvnGetTableRow:
        if view.id() == 1:
            data.setText(0, trackers[data.row].decode("utf-8", errors="ignore"))
    return 0

class TorrentScanProvider(ScanProvider):

    # ...

    def _formatViewData(self, sdata):
        # ...
        elif sdata.fid == self.FormatItem_Trackers:
            sdata.setViews(SCANVIEW_CUSTOM)
            sdata.data.setData("")
            sdata.setCallback(trackersViewCb, self.obj.GetTrackers())
            return True
        return False

Trackers

When retrieving data from the dictionary, we also make sure that it is of the correct type, so that the code which handles this data won't raise an exception when it encounters an unexpected type.
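
The repeated "is the key there and is the value of the right type" checks could be factored into a small helper. This is just one possible sketch; `get_typed` is a hypothetical name and is not used by the code in this post.

```python
def get_typed(d, key, expected_type, default=None):
    """Return d[key] only if it exists and has exactly the expected type."""
    if type(d) is not dict:
        return default
    value = d.get(key, default)
    return value if type(value) is expected_type else default

# A decoded dictionary with one well-formed and one malformed item.
decoded = {b"announce": b"http://tracker.test/", b"creation date": b"not a number"}

print(get_typed(decoded, b"announce", bytes))        # b'http://tracker.test/'
print(get_typed(decoded, b"creation date", int, 0))  # 0
print(get_typed(decoded, b"announce-list", list, []))  # []
```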

And now the files:

class TorrentObject(CFFObject):

    # ...

    def GetFiles(self):
        d = self.GetDictionary()
        if not b"info" in d:
            return []
        d = d[b"info"]
        if not type(d) is dict:
            return []
        files = []
        if not b"files" in d: 
            if b"name" in d and type(d[b"name"]) is bytes:
                sz = d.get(b"length", 0)
                files.append((d[b"name"], sz if type(sz) is int else 0))
        else:
            flist = d[b"files"]
            if not type(flist) is list:
                return []
            for fd in flist:
                if type(fd) is dict:
                    if b"path" in fd:
                        pt = fd[b"path"]
                        if type(pt) is list and len(pt) > 0 and type(pt[0]) is bytes:
                            sz = fd.get(b"length", 0)
                            files.append((pt[0], sz if type(sz) is int else 0))
        return files

def filesViewCb(cv, files, code, view, data):
    if code == pvnInit:
        tv = cv.getView(1)
        tv.setColumnCount(2)
        labels = NTStringList()
        labels.append("Name")
        labels.append("Size")
        tv.setColumnLabels(labels)
        tv.setColumnCWidth(0, 70)
        tv.setColumnCWidth(1, 35)
        tv.setRowCount(len(files))
        return 1
    elif code == pvnGetTableRow:
        if view.id() == 1:
            data.setText(0, files[data.row][0].decode("utf-8", errors="ignore"))
            sz = files[data.row][1]
            data.setText(1, "%.02f MBs (%d bytes)" % (sz / 0x100000, sz))
    return 0

class TorrentScanProvider(ScanProvider):

    # ...

    def _formatViewData(self, sdata):
        # ...
        elif sdata.fid == self.FormatItem_Files:
            sdata.setViews(SCANVIEW_CUSTOM)
            sdata.data.setData("")
            sdata.setCallback(filesViewCb, self.obj.GetFiles())
            return True
        return False

Files
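
Note that GetFiles keeps only the first element of each path list. Since the last element is the actual file name, a file stored in a subdirectory would be listed under its top-level directory name. A possible variation, sketched here with a hypothetical helper `join_torrent_path`, is to validate and join all the components:

```python
def join_torrent_path(components):
    """Join the components of a 'path' list, or return None if malformed."""
    if type(components) is not list or len(components) == 0:
        return None
    if not all(type(c) is bytes for c in components):
        return None
    return b"/".join(components).decode("utf-8", errors="ignore")

print(join_torrent_path([b"docs", b"manual", b"readme.txt"]))  # docs/manual/readme.txt
print(join_torrent_path([b"readme.txt"]))                      # readme.txt
print(join_torrent_path([b"docs", 5]))                         # None
```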

And that's it. Now again the whole code for a better overview:

from Pro.Core import *
from Pro.UI import pvnInit, pvnGetTableRow
import pprint

MAX_TORRENT_SIZE = 10485760 # 10 MBs

#
# BEGIN OF 3RD PARTY CODE (adapted to work with Python 3)
#
# The contents of this file are subject to the BitTorrent Open Source License
# Version 1.1 (the License).  You may not copy or use this file, in either
# source code or executable form, except in compliance with the License.  You
# may obtain a copy of the License at http://www.bittorrent.com/license/.
#
# Software distributed under the License is distributed on an AS IS basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied.  See the License
# for the specific language governing rights and limitations under the
# License.

# Written by Petru Paler

def decode_int(x, f):
    f += 1
    newf = x.index(0x65, f)
    n = int(x[f:newf])
    if x[f] == 0x2D: # -
        if x[f + 1] == 0x30:
            raise ValueError
    elif x[f] == 0x30 and newf != f+1:
        raise ValueError
    return (n, newf+1)

def decode_string(x, f):
    colon = x.index(0x3A, f) # :
    n = int(x[f:colon])
    if x[f] == 0x30 and colon != f+1:
        raise ValueError
    colon += 1
    return (x[colon:colon+n], colon+n)

def decode_list(x, f):
    r, f = [], f+1
    while x[f] != 0x65: # e
        v, f = decode_func[x[f]](x, f)
        r.append(v)
    return (r, f + 1)

def decode_dict(x, f):
    r, f = {}, f+1
    while x[f] != 0x65: # e
        k, f = decode_string(x, f)
        r[k], f = decode_func[x[f]](x, f)
    return (r, f + 1)

decode_func = {}
decode_func[0x6C] = decode_list # l
decode_func[0x64] = decode_dict # d
decode_func[0x69] = decode_int  # i
decode_func[0x30] = decode_string
decode_func[0x31] = decode_string
decode_func[0x32] = decode_string
decode_func[0x33] = decode_string
decode_func[0x34] = decode_string
decode_func[0x35] = decode_string
decode_func[0x36] = decode_string
decode_func[0x37] = decode_string
decode_func[0x38] = decode_string
decode_func[0x39] = decode_string

def bdecode(x):
    try:
        r, l = decode_func[x[0]](x, 0)
    except (IndexError, KeyError, ValueError):
        return {}
    if l != len(x):
        return {}
    return r
    
#
# END OF 3RD PARTY CODE
#

class TorrentObject(CFFObject):

    def __init__(self):
        super(TorrentObject, self).__init__()
        self.SetObjectFormatName("TORRENT")
        self.SetDefaultEndianness(ENDIANNESS_LITTLE)
        self.tdict = None
        
    def GetDictionary(self):
        if self.tdict == None:
            size = min(self.GetSize(), MAX_TORRENT_SIZE)
            data = self.Read(0, size)
            self.tdict = bdecode(bytes(data))
        return self.tdict
        
    def CreationDate(self):
        d = self.GetDictionary()
        cd = d.get(b"creation date", None)
        if cd == None or not type(cd) is int:
            return NTDateTime()
        return NTDateTime.fromMSecsSinceEpoch(cd * 1000)
        
    def GetTrackers(self):
        d = self.GetDictionary()
        trackers = []
        dup = set()
        if b"announce" in d and type(d[b"announce"]) is bytes:
            trackers.append(d[b"announce"])
            dup.add(trackers[0])
        if b"announce-list" in d:
            al = d[b"announce-list"]
            if type(al) is list:
                for a in al:
                    # check the type before the set lookup: an unhashable value would raise
                    if type(a) is list and len(a) > 0 and type(a[0]) is bytes and a[0] not in dup:
                        trackers.append(a[0])
                        dup.add(a[0])
        return trackers
        
    def GetFiles(self):
        d = self.GetDictionary()
        if not b"info" in d:
            return []
        d = d[b"info"]
        if not type(d) is dict:
            return []
        files = []
        if not b"files" in d: 
            if b"name" in d and type(d[b"name"]) is bytes:
                sz = d.get(b"length", 0)
                files.append((d[b"name"], sz if type(sz) is int else 0))
        else:
            flist = d[b"files"]
            if not type(flist) is list:
                return []
            for fd in flist:
                if type(fd) is dict:
                    if b"path" in fd:
                        pt = fd[b"path"]
                        if type(pt) is list and len(pt) > 0 and type(pt[0]) is bytes:
                            sz = fd.get(b"length", 0)
                            files.append((pt[0], sz if type(sz) is int else 0))
        return files

def trackersViewCb(cv, trackers, code, view, data):
    if code == pvnInit:
        tv = cv.getView(1)
        tv.setColumnCount(1)
        labels = NTStringList()
        labels.append("Tracker")
        tv.setColumnLabels(labels)
        tv.setColumnCWidth(0, 70)
        tv.setRowCount(len(trackers))
        return 1
    elif code == pvnGetTableRow:
        if view.id() == 1:
            data.setText(0, trackers[data.row].decode("utf-8", errors="ignore"))
    return 0
    
def filesViewCb(cv, files, code, view, data):
    if code == pvnInit:
        tv = cv.getView(1)
        tv.setColumnCount(2)
        labels = NTStringList()
        labels.append("Name")
        labels.append("Size")
        tv.setColumnLabels(labels)
        tv.setColumnCWidth(0, 70)
        tv.setColumnCWidth(1, 35)
        tv.setRowCount(len(files))
        return 1
    elif code == pvnGetTableRow:
        if view.id() == 1:
            data.setText(0, files[data.row][0].decode("utf-8", errors="ignore"))
            sz = files[data.row][1]
            data.setText(1, "%.02f MBs (%d bytes)" % (sz / 0x100000, sz))
    return 0

class TorrentScanProvider(ScanProvider):

    def __init__(self):
        super(TorrentScanProvider, self).__init__()
        self.obj = None
        self.meta_keys = [b"created by", b"creation date", b"comment"]
        
        # format item IDs
        self.FormatItem_Dictionary = 1
        self.FormatItem_Trackers = 2
        self.FormatItem_Files = 3
        # format item names
        self.fi_names = ["Dictionary", "Trackers", "Files"]

    def _clear(self):
        self.obj = None

    def _getObject(self):
        return self.obj

    def _initObject(self):
        self.obj = TorrentObject()
        self.obj.Load(self.getStream())
        d = self.obj.GetDictionary()
        return self.SCAN_RESULT_OK if len(d) != 0 else self.SCAN_RESULT_ERROR

    def _startScan(self):
        d = self.obj.GetDictionary()
        if any(mk in d for mk in self.meta_keys):
            e = ScanEntryData()
            e.category = SEC_Privacy
            e.type = CT_MetaData
            self.addEntry(e)
        if self.obj.GetSize() > MAX_TORRENT_SIZE:
            e = ScanEntryData()
            e.category = SEC_Warn
            e.type = CT_UnaccountedSpace
            self.addEntry(e)
        return self.SCAN_RESULT_FINISHED

    def _scanViewData(self, xml, dnode, sdata):
        if sdata.type == CT_MetaData:
            d = self.obj.GetDictionary()
            out = proTextStream()
            for mk in self.meta_keys:
                if mk in d:
                    tmk = mk.decode("utf-8", errors="ignore")
                    if tmk == "creation date":
                        dt = self.obj.CreationDate()
                        tmv = dt.toString() if dt.isValid() else "?"
                    else:
                        tmv = d[mk].decode("utf-8", errors="ignore")
                    out._print(tmk)
                    out._print(": ")
                    out._print(tmv)
                    out.nl()
            sdata.setViews(SCANVIEW_TEXT)
            sdata.data.setData(out.buffer)
            return True
        elif sdata.type == CT_UnaccountedSpace:
            sdata.setViews(SCANVIEW_TEXT)
            sdata.data.setData("The file size exceeds the maximum allowed one of %d bytes!" % (MAX_TORRENT_SIZE,))
            return True
        return False
        
    def _getFormat(self):
        ft = FormatTree()
        ft.enableIDs(True)
        fi = ft.appendChild(None, self.FormatItem_Dictionary)
        ft.appendChild(fi, self.FormatItem_Trackers)
        ft.appendChild(fi, self.FormatItem_Files)
        return ft
        
    def _formatViewInfo(self, finfo):
        if finfo.fid >= 1 and finfo.fid - 1 < len(self.fi_names):
            finfo.text = self.fi_names[finfo.fid - 1]
            return True
        return False
        
    def _formatViewData(self, sdata):
        if sdata.fid == self.FormatItem_Dictionary:
            sdata.setViews(SCANVIEW_TEXT)
            txt = pprint.pformat(self.obj.GetDictionary())
            sdata.data.setData(txt)
            return True
        elif sdata.fid == self.FormatItem_Trackers:
            sdata.setViews(SCANVIEW_CUSTOM)
            sdata.data.setData("")
            sdata.setCallback(trackersViewCb, self.obj.GetTrackers())
            return True
        elif sdata.fid == self.FormatItem_Files:
            sdata.setViews(SCANVIEW_CUSTOM)
            sdata.data.setData("")
            sdata.setCallback(filesViewCb, self.obj.GetFiles())
            return True
        return False

def torrentAllocator():
    return TorrentScanProvider()

We could still extract more information from the torrent file. For instance, we could show the list of hashes and which portion of which file each of them covers. If that's interesting for forensic purposes, we can easily add this view in the future.
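
As a rough idea of what such a view would compute: in a multi-file torrent the pieces are taken over the concatenation of the files in list order, so each piece can be mapped back to one or more file ranges. The following is a standalone sketch on made-up sizes; `map_pieces_to_files` is a hypothetical function, not something available in the SDK.

```python
def map_pieces_to_files(files, piece_length):
    """For each piece, list the (name, offset_in_file, byte_count) spans it covers.

    files is a list of (name, size) tuples in torrent order.
    """
    total = sum(size for _, size in files)
    pieces = []
    for start in range(0, total, piece_length):
        end = min(start + piece_length, total)
        spans = []
        file_start = 0
        for name, size in files:
            file_end = file_start + size
            # intersection of the piece range with this file's range
            lo, hi = max(start, file_start), min(end, file_end)
            if lo < hi:
                spans.append((name, lo - file_start, hi - lo))
            file_start = file_end
        pieces.append(spans)
    return pieces

files = [("a.bin", 10), ("b.bin", 6)]
for index, spans in enumerate(map_pieces_to_files(files, 8)):
    print(index, spans)
# 0 [('a.bin', 0, 8)]
# 1 [('a.bin', 8, 2), ('b.bin', 0, 6)]
```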

Scan Providers

Version 2.5.0 is close to being released and comes with the last type of extension exposed to Python: scan providers. Scan provider extensions are not only the most complex type of extension, but also the most powerful, as they make it possible to add support for new file formats entirely from Python!

This feature required exposing a lot more of the SDK to Python and can’t be completely discussed in one post. This post is going to introduce the topic, while future posts will show real life examples.

Let’s start from the list of Python scan providers under Extensions -> Scan providers:

Scan provider extensions

This list is retrieved from the configuration file ‘scanp.cfg’. Here’s an example entry:

[TEST]
label = Test scan provider
ext = test2,test3
group = db
file = Test.py
allocator = allocator

The name of the section has two purposes: it specifies the name of the format being supported (in this case ‘TEST’) and also the extension that is automatically associated with that format (in this case ‘.test’, case insensitive). The hard limit for format names is 9 characters for now; this may change in the future if more are needed. The label is the description. The ext parameter is optional and specifies additional extensions to be associated with the format. group specifies the type of file being supported; available groups are: img, video, audio, doc, font, exe, manexe, arch, db, sys, cert, script. file specifies the Python source file and allocator the function which returns a new instance of the scan provider class.
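
The rules above can be turned into a quick sanity check for an entry before dropping it into the configuration file. This sketch assumes the file can be read as a standard INI file with Python's configparser, which may not match how Profiler parses it; `read_entries` is a hypothetical helper.

```python
import configparser

GROUPS = {"img", "video", "audio", "doc", "font", "exe",
          "manexe", "arch", "db", "sys", "cert", "script"}
MAX_FORMAT_NAME = 9  # hard limit on format names mentioned in the post

SAMPLE = """
[TEST]
label = Test scan provider
ext = test2,test3
group = db
file = Test.py
allocator = allocator
"""

def read_entries(text):
    """Parse scan provider entries and apply the documented constraints."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    entries = {}
    for name in parser.sections():
        section = parser[name]
        if len(name) > MAX_FORMAT_NAME:
            raise ValueError("format name too long: " + name)
        if section["group"] not in GROUPS:
            raise ValueError("unknown group: " + section["group"])
        # the section name doubles as the first associated extension
        extra = section.get("ext", fallback="")
        extensions = [name.lower()]
        extensions += [e.strip().lower() for e in extra.split(",") if e.strip()]
        entries[name] = {
            "label": section["label"],
            "extensions": extensions,
            "file": section["file"],
            "allocator": section["allocator"],
        }
    return entries

print(read_entries(SAMPLE)["TEST"]["extensions"])  # ['test', 'test2', 'test3']
```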

Let’s start with the allocator:

def allocator():
    return TestScanProvider()

It just returns a new instance of TestScanProvider, which is a class derived from ScanProvider:

class TestScanProvider(ScanProvider):

    def __init__(self):
        super(TestScanProvider, self).__init__()
        self.obj = None

Every scan provider has some mandatory methods it must override. Let’s begin with the first ones:

    def _clear(self):
        self.obj = None

    def _getObject(self):
        return self.obj

    def _initObject(self):
        self.obj = TestObject()
        self.obj.Load(self.getStream())
        return self.SCAN_RESULT_OK

_clear gives a chance to free internal resources when they’re no longer used. In Python this is not usually important as member objects will automatically be freed when their reference count reaches zero.

_getObject must return the internal instance of the object being parsed. This must return an instance of a CFFObject derived class.

_initObject creates the object instance and loads the data stream into it. In the sample above we assume it being successful. Otherwise, we would have to return SCAN_RESULT_ERROR. This method is not called by the main thread, so that it doesn’t block the UI during long parse operations.

Let’s take a look at the TestObject class:

class TestObject(CFFObject):

    def __init__(self):
        super(TestObject, self).__init__()
        self.SetObjectFormatName("TEST")
        self.SetDefaultEndianness(ENDIANNESS_LITTLE)

This is a minimalistic implementation of a CFFObject derived class. Usually it should contain at least an override of the CustomLoad method, which gives the opportunity to fail when the data stream is first loaded through the Load method. SetDefaultEndianness wouldn’t even be necessary, as every object is little endian by default. SetObjectFormatName, on the other hand, is very important, as it sets the internal format name of the object.

Let’s now take a look at how we scan a file:

    def _startScan(self):
        return self.SCAN_RESULT_OK
        
    def _threadScan(self):
        e = ScanEntryData()
        e.category = SEC_Warn
        e.type = CT_NativeCode
        self.addEntry(e)

The code above will issue a single warning concerning native code. When _startScan returns SCAN_RESULT_OK, _threadScan will be called from a thread other than the main UI one. The logic behind this is that _startScan is actually called from the main thread and if the scan of the file doesn’t require complex operations, like in the case above, then the method could return SCAN_RESULT_FINISHED and then _threadScan won’t be called at all. During a threaded scan, an abort by the user can be detected via the isAborted method.
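
The control flow just described can be modelled with a toy driver. Everything in this sketch is a stand-in written for illustration: the constants, `ToyProvider` and `run_scan` are hypothetical and say nothing about how Profiler schedules scans internally.

```python
import threading

# Hypothetical stand-ins for the real result codes.
SCAN_RESULT_OK = 0
SCAN_RESULT_FINISHED = 1

class ToyProvider:
    def __init__(self, quick):
        self.quick = quick
        self.entries = []
        self.aborted = threading.Event()  # plays the role of isAborted

    def start_scan(self):
        # Runs in the caller's (UI) thread: only do cheap work here.
        if self.quick:
            self.entries.append("found in start_scan")
            return SCAN_RESULT_FINISHED
        return SCAN_RESULT_OK

    def thread_scan(self):
        # Runs in a worker thread: check for an abort between steps.
        for step in range(3):
            if self.aborted.is_set():
                return
            self.entries.append("step %d" % step)

def run_scan(provider):
    """Call thread_scan in a worker only if start_scan asked for it."""
    if provider.start_scan() == SCAN_RESULT_FINISHED:
        return
    worker = threading.Thread(target=provider.thread_scan)
    worker.start()
    worker.join()

quick, slow = ToyProvider(quick=True), ToyProvider(quick=False)
run_scan(quick)
run_scan(slow)
print(quick.entries)  # ['found in start_scan']
print(slow.entries)   # ['step 0', 'step 1', 'step 2']
```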

From the UI point of view, when a scan entry is clicked in the summary, the scan provider is supposed to return UI information:

    def _scanViewData(self, xml, dnode, sdata):
        if sdata.type == CT_NativeCode:
            sdata.setViews(SCANVIEW_TEXT)
            sdata.data.setData("Hello, world!")
            return True
        return False

This will display a text field with a predefined content when the user clicks the scan entry in the summary. This is fairly easy, but what happens when we have several entries of the same type and need to differentiate between them? That’s where the data member of ScanEntryData plays a role: this is a string which will be included in the report XML and passed back to _scanViewData as an XML node.

For instance:

e.data = "<o>1234</o>"

Becomes this in the final XML report:

<d>
    <o>1234</o>
</d>

The dnode argument of _scanViewData points to the ‘d’ node and its first child will be the ‘o’ node we passed. The xml argument represents an instance of the NTXml class, which can be used to retrieve the children of the dnode.
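
The idea of reading the entry's payload back from its node can be tried outside Profiler with the standard library. This sketch uses xml.etree.ElementTree as a stand-in for NTXml, whose API is not shown in this post, and assumes a ‘d’ node wrapping a single ‘o’ child as described above; `entry_payload` is a hypothetical helper.

```python
import xml.etree.ElementTree as ET

def entry_payload(dnode_xml):
    """Return (tag, text) of the first child of the 'd' node, or None."""
    dnode = ET.fromstring(dnode_xml)
    if dnode.tag != "d" or len(dnode) == 0:
        return None
    first = dnode[0]
    return first.tag, first.text

print(entry_payload("<d><o>1234</o></d>"))  # ('o', '1234')
print(entry_payload("<d/>"))                # None
```

With a payload like this, entries of the same type can be told apart by comparing the text of the child node.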

But this is only half of the story: some of the scan entries may represent embedded files (category SEC_File), in which case the _scanViewData method must return the data representing the file.

Apart from scan entries, we may also want the user to explore the format of the file. To do that we must return a tree representing the structure of our file:

    def _getFormat(self):
        ft = FormatTree()
        ft.enableIDs(True)
        fi = ft.appendChild(None, 1)
        ft.appendChild(fi, 2)
        return ft

The enableIDs method must be called right after creating a new FormatTree class. The code above creates a format item with id 1 with a child item with id 2, which results in the following:

Format tree

But of course, we have specified neither labels nor icons in the function above. This information is retrieved for each item when required through the following method:

    def _formatViewInfo(self, finfo):
        if finfo.fid == 1:
            finfo.text = "directory"
            finfo.icon = PubIcon_Dir
            return True
        elif finfo.fid == 2:
            finfo.text = "entry"
            return True
        return False

The various items are identified by their id, which was specified during the creation of the tree.

The UI data for each item is retrieved through the _formatViewData method:

    def _formatViewData(self, sdata):
        if sdata.fid == 1:
            sdata.setViews(SCANVIEW_CUSTOM)
            sdata.data.setData("")
            sdata.setCallback(cb, None)
            return True
        return False

This will display a custom view with a table and a hex view separated by a splitter:

Custom view

Of course, we also have to specify the callback for our custom view:

def cb(cv, ud, code, view, data):
    if code == pvnInit:
        return 1
    return 0

It is good to remember that format item IDs and IDs used in custom views are used to encode bookmark jumps. So if they change, saved bookmark jumps become invalid.

And here again the whole code for a better overview:

from Pro.Core import *
from Pro.UI import pvnInit, PubIcon_Dir

class TestObject(CFFObject):

    def __init__(self):
        super(TestObject, self).__init__()
        self.SetObjectFormatName("TEST")
        self.SetDefaultEndianness(ENDIANNESS_LITTLE)

def cb(cv, ud, code, view, data):
    if code == pvnInit:
        return 1
    return 0

class TestScanProvider(ScanProvider):

    def __init__(self):
        super(TestScanProvider, self).__init__()
        self.obj = None

    def _clear(self):
        self.obj = None

    def _getObject(self):
        return self.obj

    def _initObject(self):
        self.obj = TestObject()
        self.obj.Load(self.getStream())
        return self.SCAN_RESULT_OK

    def _startScan(self):
        return self.SCAN_RESULT_OK
        
    def _threadScan(self):
        print("thread msg")
        e = ScanEntryData()
        e.category = SEC_Warn
        e.type = CT_NativeCode
        self.addEntry(e)

    def _scanViewData(self, xml, dnode, sdata):
        if sdata.type == CT_NativeCode:
            sdata.setViews(SCANVIEW_TEXT)
            sdata.data.setData("Hello, world!")
            return True
        return False
        
    def _getFormat(self):
        ft = FormatTree()
        ft.enableIDs(True)
        fi = ft.appendChild(None, 1)
        ft.appendChild(fi, 2)
        return ft
        
    def _formatViewInfo(self, finfo):
        if finfo.fid == 1:
            finfo.text = "directory"
            finfo.icon = PubIcon_Dir
            return True
        elif finfo.fid == 2:
            finfo.text = "entry"
            return True
        return False
        
    def _formatViewData(self, sdata):
        if sdata.fid == 1:
            sdata.setViews(SCANVIEW_CUSTOM)
            sdata.data.setData("")
            sdata.setCallback(cb, None)
            return True
        return False

def allocator():
    return TestScanProvider()

As you may have noticed in the screen-shot above, the analysed file is called ‘a.t’ and as such isn’t automatically associated with our ‘test’ format. So how does it get associated anyway?

Clearly Profiler doesn’t rely on extensions alone to identify the format of a file. For external scan providers a signature mechanism based on YARA has been introduced. In the config directory of the user, you can create a file named ‘yara.plain’ and insert your identification rules in it, e.g.:

rule test
{
    strings:
        $sig = "test"

    condition:
        $sig at 0
}

This rule will identify the format as ‘test’ if the first 4 bytes of the file match the string ‘test’: the name of the rule identifies the format.

The file ‘yara.plain’ will be compiled to the binary ‘yara.rules’ file at the first run. In order to refresh ‘yara.rules’, you must delete it.

One important thing to remember is that a rule isn’t matched against an entire file, but only against the first 512 bytes.
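
The practical consequence of the 512-byte window is that a signature located further into the file can never match. A plain Python imitation of the two kinds of condition makes this visible; it is only a sketch, with hypothetical helper names, and does not use YARA itself.

```python
HEADER_SIZE = 512  # only this many leading bytes are matched against the rules

def matches_at(data, signature, offset=0):
    """Imitate '$sig at offset', restricted to the header window."""
    header = data[:HEADER_SIZE]
    return header[offset:offset + len(signature)] == signature

def matches_anywhere(data, signature):
    """Imitate a bare '$sig' condition, restricted to the header window."""
    return signature in data[:HEADER_SIZE]

early = b"test" + b"\x00" * 1000   # signature at offset 0
late = b"\x00" * 600 + b"test"     # signature beyond the first 512 bytes

print(matches_at(early, b"test"))       # True
print(matches_anywhere(late, b"test"))  # False
```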

Of course, our provider behaves 100% like all other providers and can be used to load embedded files:

Embedded files

Our new provider is used automatically when an embedded file is identified as matching our format.

Profiler 2.4

Profiler 2.4 is out with the following news:

– added initial support for PDB files (including export of types)
– added support for Windows Encoded Scripts (VBE, JSE)
– introduced fixed xml structures
– added automatic string decoding in struct tables
– added Python string command line execution
– remember the last selected logic group
– fixed missing support for wchar_t in C types
– updated Qt to 5.4.1
– various bug fixes

While the most important newly introduced feature is the support for PDB files, here are some interesting new features:

Support for Windows Encoded Scripts (VBE, JSE)

Windows encoded scripts like VBE and JSE files (the encoded variants of VBS and JS script files) are now supported and automatically decoded.

In the screen-shot you can see the decoded output of an encoded file (shown at the bottom).

Automatic string decoding in struct tables

A very basic feature: byte-arrays in structures are automatically checked for strings and, when one is found, displayed as decoded text.

(notice the section name automatically displayed as ascii string)
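
The exact heuristic used by Profiler isn’t described here, so the following is only a rough sketch of how such a check could work: a byte-array is treated as a string if, once the trailing zero padding is removed, it contains only printable ASCII characters. The function name and the minimum length are assumptions made for the example.

```python
# Hypothetical heuristic, not Profiler's actual implementation.
import string

# Printable ASCII bytes, excluding vertical tab and form feed.
PRINTABLE = set(string.printable.encode("ascii")) - set(b"\x0b\x0c")


def decode_if_string(raw: bytes, min_len: int = 2):
    """Return the ASCII text held in a byte-array, or None if it isn't text.

    Trailing NUL padding is ignored, as is common in fixed-size name fields.
    """
    text = raw.rstrip(b"\x00")
    if len(text) < min_len:
        return None
    if any(b not in PRINTABLE for b in text):
        return None
    return text.decode("ascii")


if __name__ == "__main__":
    # An 8-byte section name field, padded with zeroes.
    print(decode_if_string(b".text\x00\x00\x00"))
    print(decode_if_string(b"\x01\x02\x03\x04"))
```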

Python string command line execution

Apart from executing script files passed as command line arguments, it is now also possible to execute Python statements passed directly as an argument.

For instance:

cerpro -c -e "from Pro.Core import *;proCoreContext().msgBox(0, \"Hello world!\")"

The optional argument ‘-c’ tells Profiler not to display the UI.
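
When launching such a command from another Python script, the quote escaping can be avoided by building an argument list instead of a single string. The sketch below only assembles the arguments: the helper name is made up, and it assumes that the executable can be found as ‘cerpro’ on the system path. Only the ‘-c’ and ‘-e’ arguments come from the example above.

```python
import shlex


def build_cerpro_command(statements, show_ui=False, executable="cerpro"):
    """Build the argument list for running Python statements with -e.

    statements: iterable of Python statements, joined with ';'
    show_ui:    when False, '-c' is added so that no UI is displayed
    executable: assumed name of the Profiler executable (hypothetical default)
    """
    args = [executable]
    if not show_ui:
        args.append("-c")
    args += ["-e", ";".join(statements)]
    return args


cmd = build_cerpro_command([
    "from Pro.Core import *",
    'proCoreContext().msgBox(0, "Hello world!")',
])

# The list can be handed to subprocess.run(cmd) as it is;
# shlex.join shows an equivalent POSIX shell command line.
print(shlex.join(cmd))
```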

Enjoy!

PDB support (including export of types)

The main feature of the upcoming 2.4 version of Profiler is the initial support for the PDB format. Our code doesn’t rely on the Microsoft DIA SDK and thus works also on OS X and Linux.

Since the PDB format is undocumented, this task would’ve been extremely difficult without the fantastic work on PDBs of the never too much revered Sven B. Schreiber.

Let’s open a PDB file.

As you can see the streams in the PDB can be explored. The TPI stream (the one describing types) offers further inspection.

All the types contained in the PDB can be exported to a Profiler header by pressing Ctrl+R and executing the ‘Dump types to header’ action.

Now the types can be used from both the hex editor and the Python SDK.

We can explore the dumped header by using, as usual, the Header Manager tool.

The type shown above in the hex editor is simple. So let’s look at what a more complex PDB type may look like.


The PDB code is also exposed to the SDK. This is a small snippet of code, which dumps all the types to a text buffer and then displays them in a text view.

from Pro.Core import *
from Pro.UI import *
from Pro.PDB import *

def showPDBTypes():
    ctx = proContext()
    out = proTextStream()
    out.setIndentSize(4)

    obj = ctx.currentScanProvider().getObject()
    tpi = obj.GetStreamObject(PDB_STREAM_ID_TPI)
    tpihdr = obj.TPIHeader(tpi)
    tiMin = tpihdr.Num("tiMin")
    tiMax = tpihdr.Num("tiMax")
    tctx = obj.CreateTypeContext(tpi)
    for ti in range(tiMin, tiMax):
        tctx.DumpType(out, ti)

    view = ctx.createView(ProView.Type_Text, "PDB Test")
    view.setLanguage("XML")
    view.setText(out.buffer)
    ctx.addView(view)

showPDBTypes()

In order to dump all types to a single header, you can use the DumpAllToHeader method.

Profiler 2.3

Profiler 2.3 is out with the following news:

– introduced YARA 3.2 support
– added groups for logic providers
– added Python action to encode/decode text
– added Python action to strip XML down to text
– added the possibility to choose the fixed font
– added color randomization for structs and intervals
– added close report and quit APIs
– exposed more methods of the Report class (including save)
– improved indentation handling in the script editor
– synchronized main and workspace output views
– improved output view
– updated libmagic to 5.21
– updated Capstone to 3.0
– many small improvements
– fixed libmagic on Linux
– removed the tray icon
– minor bug fixes

Logic provider groups

Logic providers can now be grouped in order to avoid clutter in the main window. Adding the following line to an existing logic provider will result in a new group being created:

group = Extra
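
To illustrate the idea, here is a small sketch which reads an INI-style configuration and collects the providers by their ‘group’ value. It uses Python’s standard configparser and is not how Profiler itself parses its configuration; the section names and labels are invented for the example.

```python
import configparser
from collections import defaultdict

# Invented sample entries; only the 'group' key comes from the text above.
SAMPLE = """
[ProviderA]
label = First provider
group = Extra

[ProviderB]
label = Second provider
group = Extra

[ProviderC]
label = Ungrouped provider
"""


def providers_by_group(text):
    """Map every group name to the list of sections which declare it.

    Sections without a 'group' key end up under the None key.
    """
    parser = configparser.ConfigParser()
    parser.read_string(text)
    groups = defaultdict(list)
    for section in parser.sections():
        group = parser.get(section, "group", fallback=None)
        groups[group].append(section)
    return dict(groups)


if __name__ == "__main__":
    for group, names in providers_by_group(SAMPLE).items():
        print(group, names)
```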

Encode/decode text action

A handy Python action to convert from hex to text and vice-versa using all of Python’s supported encodings. Place yourself in a hex or text view and run the encoding/decoding action ‘Bytes to text’ or ‘Text to bytes’.

The operation will open a new text or hex view, depending on whether it was a decoding or an encoding.
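
Outside of Profiler, the two conversions boil down to Python’s own decode and encode methods. The following lines are a minimal sketch with made-up helper names, not the code of the action itself.

```python
def bytes_to_text(data: bytes, encoding: str = "utf-8") -> str:
    """Decode raw bytes using any of Python's supported encodings."""
    return data.decode(encoding)


def text_to_bytes(text: str, encoding: str = "utf-8") -> bytes:
    """Encode text back into raw bytes."""
    return text.encode(encoding)


if __name__ == "__main__":
    raw = bytes.fromhex("48 00 69 00")  # bytes as they appear in a hex view
    print(bytes_to_text(raw, "utf-16-le"))
    print(text_to_bytes("Hi", "utf-16-le").hex(" "))
```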

XML to text action

Strips the tags from an XML document and displays only the text. The action can be performed both on a hex and on a text view.

And it will open a new text view. This is useful to view the text of a DOCX or ODT document. In the future the preview for these documents will be made available automatically, but in the meantime this action is helpful.
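
The same kind of stripping can be sketched with Python’s standard library. This is not the code of the action, just an illustration of the idea: it assumes well-formed XML, and the function name is made up.

```python
import xml.etree.ElementTree as ET


def xml_to_text(xml_data):
    """Return only the text of an XML document, without the tags.

    Text fragments are stripped and joined with single spaces.
    """
    root = ET.fromstring(xml_data)
    pieces = (t.strip() for t in root.itertext())
    return " ".join(p for p in pieces if p)


if __name__ == "__main__":
    print(xml_to_text("<doc><p>Hello <b>world</b></p><p>Bye</p></doc>"))
```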

Fixed font preferences

The fixed font used in most views can now be chosen from the ‘General’ settings.

Struct/intervals color randomization

When adding a structure or interval to the hex view, the proposed color is now randomized every time the dialog shows up. This behaviour can be disabled from the dialog itself, and it’s also possible to randomize the color again by clicking on the dedicated refresh button.

Manually picking a different color for every interval is time consuming and so this feature should speed up raw data analysis.
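
How Profiler picks its colors isn’t specified here. One simple way to obtain distinct but readable colors, shown only as a sketch, is to randomize the hue while keeping saturation and brightness fixed; the chosen values and the function name are arbitrary.

```python
import colorsys
import random


def random_color(rng=random):
    """Return a random '#rrggbb' color with fixed saturation and brightness."""
    hue = rng.random()
    # 0.5 and 0.95 are arbitrary values chosen for this example.
    r, g, b = colorsys.hsv_to_rgb(hue, 0.5, 0.95)
    return "#%02x%02x%02x" % (round(r * 255), round(g * 255), round(b * 255))


if __name__ == "__main__":
    print(random_color())
```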

Report APIs

Most of the report APIs have been exposed (check out the SDK documentation). This combined with the newly introduced ‘quit’ SDK method can be used to perform custom scans programmatically and save the resulting report.

Here’s a small example which can be launched from the command line:

from Pro.Core import *
import sys
 
ctx = proCoreContext()
 
def init():
    ctx.getSystem().addFile(sys.argv[1])
    return True
 
def rload():
    ctx.unregisterLogicProvider("test_logic")
    ctx.getReport().saveAs("auto.cpro")
    ctx.quit()
 
ctx.registerLogicProvider("test_logic", init, None, None, None, rload)
ctx.startScan("test_logic")

The command line syntax to run this script would be:

cerpro -r scan.py [file to scan]

The UI will show up and close automatically once the ‘quit’ method is called. Running this script in console mode using the ‘-c’ parameter is not yet possible, because of the differences in message handling on different platforms, but it will be in the future.

Synchronized output views

The output views of the main window and of the workspace are now synchronized, so that important log messages printed in one context are no longer missed in the other.

Enjoy!

YARA 3.2.0 support

The upcoming 2.3 version of Profiler includes support for the latest YARA engine. This new release is scheduled for the first week of January and it will include YARA on all supported platforms.

One inherent technical advantage of having YARA support in Profiler is that it will be possible to scan for YARA rules inside embedded files/objects, like files in a Zip archive, in a CHM file, in an OLEStream, streams in a PDF, etc.

The YARA engine itself has been compiled with all standard modules (except for cuckoo). Even the magic module is available, since libmagic is also supported by Profiler.

The initial YARA integration comes as a hook extension, an action and Python SDK support. The YARA Python module is the official one: only the import statement differs. You can run existing YARA Python code without further modification by using the following import syntax:

import Pro.yara as yara

So let’s start a YARA scan. To do that, we need to enable the YARA hook extension. On Windows remember to configure Python in case you haven’t yet, since all extensions have been written in it.

When a scan is started, a YARA settings dialog will show up.

This dialog lets us choose various settings including the type of rules to load.

There are four possibilities: a simple text field containing YARA rules, a plain text rules file, a compiled rules file or a custom expression which must evaluate to a valid Rules object.
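
In terms of the YARA Python module, these four sources could map roughly to the calls in the sketch below. This is not the code of the extension: the function and the ‘kind’ labels are invented, and the YARA module is passed in as a parameter, so that Pro.yara can be used inside Profiler and the official module elsewhere.

```python
def load_rules(yara, kind, value):
    """Return a Rules object for one of the four rule sources.

    yara:  the YARA Python module (for instance Pro.yara inside Profiler)
    kind:  'text', 'plain', 'compiled' or 'expression' (hypothetical labels)
    value: the rules text, a file path or a Python expression
    """
    if kind == "text":
        # rules typed into a text field
        return yara.compile(source=value)
    if kind == "plain":
        # plain text rules file
        return yara.compile(filepath=value)
    if kind == "compiled":
        # rules previously saved in compiled form
        return yara.load(value)
    if kind == "expression":
        # custom expression: must evaluate to a Rules object
        return eval(value, {"yara": yara})
    raise ValueError("unknown rule source: %r" % (kind,))
```

Inside Profiler the first argument would be the module imported with ‘import Pro.yara as yara’. Since the last case evaluates arbitrary Python code, it should only be used with trusted input.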

The report settings specify how we will be alerted of matches. The ‘only matches’ option makes sure that only files (or their sub-files) with a match will be included in the final report. The ‘add to meta-data’ option causes the matches to be visible as meta-data strings of a file. The ‘as threats’ option reports every match as a 100% risk threat. The ‘print to output’ option prints the matches to the output view.

Since we had the ‘only matches’ option enabled, we will find only matching files in our final report.

And since we also had the ‘add to meta-data’ option enabled, we will see the matches when opening a file in the workspace.

The YARA scan functionality comes also as an action when we find ourselves in a hex view. You can either scan the whole hex data or select a range. Then press Ctrl+R to run an action and select ‘YARA scan’.

In this case we won’t be given report options, since the only thing which can be performed is to print out matches in the output view.

Like this:

Of course, all supported platforms come also with the official YARA command line utility.

Since this has been a customer request for quite some time, I think it will be appreciated by some of our users.

Stripping symbols from an ELF

Just like the previous post about stripping symbols from a Mach-O binary, here’s one about stripping them from an ELF binary.

The syntax to execute the script is the same as in the previous post, only the called function changes:

cerpro -c -r path/to/strip.py:stripELF source destination

Here’s the code:

from Pro.Core import *
from Pro.ELF import *

def stripELF(srcname, dstname):
    oldc = createContainerFromFile(srcname)
    if oldc.isNull():
        print("error: couldn't open '%s'" % (srcname,))
        return
    obj = ELFObject()
    if not obj.Load(oldc):
        print("error: couldn't load ELF")
        return
    newc = oldc.copyToNewContainer()

    sects = obj.Sections()

    count = 0
    it = obj.SymbolTableSections().iterator()
    it.next()
    while it.hasNext():
        s = sects.At(it.next())
        # only the .symtab section
        if s.Num("sh_type") != 2:
            continue
        strtaboffs = sects.At(s.Num("sh_link")).Num("sh_offset")
        sit = obj.Symbols(s).iterator()
        while sit.hasNext():
            sym = sit.next()
            # only local symbols
            if sym.Num("st_shndx") == 0:
                continue
            nameoffs = sym.Num("st_name") + strtaboffs
            name, ret = obj.ReadUInt8String(nameoffs, 0x10000)
            newc.fill(nameoffs, 0, len(name))
            count += 1

    if newc.save(dstname):
        print("successfully stripped all %d local symbols!" % (count,))
    else:
        print("error: couldn't save stripped binary to '%s'" % (dstname,))

That’s it!
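
To show which entries the inner loop skips without relying on the SDK, here is a small standalone sketch which walks a raw symbol table. It assumes 64-bit little-endian Elf64_Sym entries, and the function name is made up. Just like the script above, it ignores the symbols whose st_shndx is 0, meaning those not defined in any section.

```python
import struct

# Elf64_Sym: st_name, st_info, st_other, st_shndx, st_value, st_size
ELF64_SYM = struct.Struct("<IBBHQQ")  # assumes a little-endian 64-bit ELF
SHN_UNDEF = 0


def names_to_strip(symtab: bytes, strtab_offset: int):
    """Yield the file offsets of the symbol names which would be zeroed."""
    last = len(symtab) - ELF64_SYM.size
    for pos in range(0, last + 1, ELF64_SYM.size):
        st_name, _info, _other, st_shndx, _value, _size = \
            ELF64_SYM.unpack_from(symtab, pos)
        # symbols with st_shndx == 0 are undefined here and left untouched
        if st_shndx == SHN_UNDEF:
            continue
        yield strtab_offset + st_name


if __name__ == "__main__":
    table = (ELF64_SYM.pack(0, 0, 0, 0, 0, 0) +          # null symbol
             ELF64_SYM.pack(1, 0x12, 0, 5, 0x1000, 8) +  # defined in section 5
             ELF64_SYM.pack(6, 0x12, 0, 0, 0, 0))        # undefined symbol
    print([hex(o) for o in names_to_strip(table, 0x200)])
```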

And here again the complete script which you can use to strip both a Mach-O and an ELF binary.

# strip.py

from Pro.Core import *
from Pro.MachO import *
from Pro.ELF import *

def stripMachO(srcname, dstname):
    oldc = createContainerFromFile(srcname)
    if oldc.isNull():
        print("error: couldn't open '%s'" % (srcname,))
        return
    obj = MachObject()
    if not obj.Load(oldc):
        print("error: couldn't load Mach-O")
        return
    obj.ProcessLoadCommands()
    newc = oldc.copyToNewContainer()

    symlc = obj.SymTableLC()
    stroffs = symlc.Num("stroff")

    count = 0
    it = obj.SymbolNList(symlc).iterator()
    while it.hasNext():
        syms = it.next()
        # only local symbols
        if syms.Num("sect") == 0:
            continue
        nameoffs = obj.AddressToOffset(syms.Num("strx") + stroffs)
        name, ret = obj.ReadUInt8String(nameoffs, 0x10000)
        newc.fill(nameoffs, 0, len(name))
        count += 1

    if newc.save(dstname):
        print("successfully stripped all %d local symbols!" % (count,))
    else:
        print("error: couldn't save stripped binary to '%s'" % (dstname,))

def stripELF(srcname, dstname):
    oldc = createContainerFromFile(srcname)
    if oldc.isNull():
        print("error: couldn't open '%s'" % (srcname,))
        return
    obj = ELFObject()
    if not obj.Load(oldc):
        print("error: couldn't load ELF")
        return
    newc = oldc.copyToNewContainer()

    sects = obj.Sections()

    count = 0
    it = obj.SymbolTableSections().iterator()
    it.next()
    while it.hasNext():
        s = sects.At(it.next())
        # only the .symtab section
        if s.Num("sh_type") != 2:
            continue
        strtaboffs = sects.At(s.Num("sh_link")).Num("sh_offset")
        sit = obj.Symbols(s).iterator()
        while sit.hasNext():
            sym = sit.next()
            # only local symbols
            if sym.Num("st_shndx") == 0:
                continue
            nameoffs = sym.Num("st_name") + strtaboffs
            name, ret = obj.ReadUInt8String(nameoffs, 0x10000)
            newc.fill(nameoffs, 0, len(name))
            count += 1

    if newc.save(dstname):
        print("successfully stripped all %d local symbols!" % (count,))
    else:
        print("error: couldn't save stripped binary to '%s'" % (dstname,))

The Linux x64 version of Profiler should be released soon. So stay tuned!

Stripping symbols from a Mach-O

A common mistake many developers make is to leave the names of local symbols inside applications built on OS X. Using the strip utility combined with the compiler visibility flags is, unfortunately, not enough.

So I wrote a small script for Profiler to be run from the command line and I integrated it in the build process.

The syntax to execute the script is:

cerpro -c -r path/to/strip.py:stripMachO source destination

Here’s the whole code:

# name: strip.py

from Pro.Core import *
from Pro.MachO import *

def stripMachO(srcname, dstname):
    oldc = createContainerFromFile(srcname)
    if oldc.isNull():
        print("error: couldn't open '%s'" % (srcname,))
        return
    obj = MachObject()
    if not obj.Load(oldc):
        print("error: couldn't load Mach-O")
        return
    obj.ProcessLoadCommands()
    newc = oldc.copyToNewContainer()

    symlc = obj.SymTableLC()
    stroffs = symlc.Num("stroff")

    it = obj.SymbolNList(symlc).iterator()
    while it.hasNext():
        syms = it.next()
        # only local symbols
        if syms.Num("sect") == 0:
            continue
        nameoffs = obj.AddressToOffset(syms.Num("strx") + stroffs)
        name, ret = obj.ReadUInt8String(nameoffs, 0x10000)
        newc.fill(nameoffs, 0, len(name))

    if newc.save(dstname):
        print("successfully stripped all local symbols!")
    else:
        print("error: couldn't save stripped binary to '%s'" % (dstname,))

The code zeroes the names of the symbols which are associated with a section in the binary; symbols with no section (those whose ‘sect’ field is 0) are skipped.
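
The zeroing step itself can be illustrated on a plain bytearray, without the SDK. The helper below is only a sketch with a made-up name: it overwrites a zero-terminated name in place, which is what the call to fill does on the copied container, so that the size of the file and all offsets stay unchanged.

```python
def zero_name(buf: bytearray, offset: int, max_len: int = 0x10000) -> int:
    """Overwrite the NUL-terminated string at offset with zero bytes.

    Returns the number of bytes which were cleared.
    """
    if not 0 <= offset < len(buf):
        raise IndexError("offset outside of the buffer")
    end = buf.find(b"\x00", offset, offset + max_len)
    if end == -1:
        # no terminator found: clear up to the limit or the end of the data
        end = min(len(buf), offset + max_len)
    length = end - offset
    buf[offset:end] = bytes(length)
    return length


if __name__ == "__main__":
    strtab = bytearray(b"\x00_main\x00_helper\x00")
    print(zero_name(strtab, 1), strtab)
```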

Easy. 🙂

Profiler 2.1 (Mac OS X support)

The new version of Profiler is out and it includes support for Mac OS X!

Here’s the change-list for the current version:

– ported Cerbero Profiler to Mac OS X
– added command-line scripting support
– updated Header Manager to Clang 3.5
– improved parsing of PE Delay Import Descriptors
– fixed some small issues

Enjoy!