Monday, November 7, 2016

Converting an ADT HL7 message to JSON

If anyone has ever had the pleasure of working with HL7 ADT messages to exchange information between healthcare systems, you'll know the frustration of trying to actually use that information in a meaningful way (i.e. trying to work with it without having to read 200 pages of documentation to understand what the different segments of an HL7 message are).

Although the HL7 standard keeps messages compact on the wire, I prefer working with JSON objects over pipe-delimited strings. If you are in the same boat, then you can use the following two python functions to convert an HL7 message into something easier to digest and understand. (Note: these two functions depend on the hl7apy python library.)

Yes, these functions return a python dictionary and not a JSON object, but you can trivially convert a dictionary to a JSON string.
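A minimal sketch of the two functions looks like this (they walk hl7apy's element tree via its children, name, long_name, and to_er7() attributes, collecting repeated segments and fields into lists):
from hl7apy.parser import parse_message

def hl7_str_to_dict(s, use_long_name=True):
    """Convert an HL7 string to a dictionary

    :param s: The input HL7 string
    :param use_long_name: Whether to use the long element names
                          (e.g. "patient_name" instead of "pid_5")
    :returns: A dictionary representation of the HL7 message
    """
    # hl7apy expects segments to be separated by \r, not \n
    s = s.replace("\n", "\r")
    m = parse_message(s)
    return hl7_message_to_dict(m, use_long_name=use_long_name)

def hl7_message_to_dict(m, use_long_name=True):
    """Convert an HL7 message (as parsed by hl7apy) to a dictionary"""
    if m.children:
        d = {}
        for c in m.children:
            name = str(c.name).lower()
            if use_long_name:
                name = str(c.long_name).lower() if c.long_name else name
            dictified = hl7_message_to_dict(c, use_long_name=use_long_name)
            # Repeated segments/fields are collected into a list
            if name in d:
                if not isinstance(d[name], list):
                    d[name] = [d[name]]
                d[name].append(dictified)
            else:
                d[name] = dictified
        return d
    else:
        # Leaf elements are rendered back to their ER7 (pipe-delimited) value
        return m.to_er7()
With those defined, usage looks like this: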
import json

from hl7apy.parser import parse_message

# Taken from http://hl7apy.org/tutorial/index.html#elements-manipulation
s = """MSH|^~\&|GHH_ADT||||20080115153000||ADT^A01^ADT_A01|0123456789|P|2.5||||AL
EVN||20080115153000||AAA|AAA|20080114003000
PID|1||566-554-3423^^^GHH^MR||EVERYMAN^ADAM^A|||M|||2222 HOME STREET^^ANN ARBOR^MI^^USA||555-555-2004~444-333-222|||M
NK1|1|NUCLEAR^NELDA^W|SPO|2222 HOME STREET^^ANN ARBOR^MI^^USA"""

# Convert it
d = hl7_str_to_dict(s)

# Dump it as a JSON string
print json.dumps(d)
Hope this helps someone who appreciates new data representations more than old data representations ;)

Thursday, September 22, 2016

Simple open_sftp() context manager for sftp read and writing of files

I had to do some reading and writing of files from an SFTP source/destination, and I ended up putting together a simple context manager for this so that it follows the same general interface as open() does for local files. Usage is as simple as:
from open_sftp import open_sftp

path = "sftp://user:p@ssw0rd@test.com/path/to/file.txt"

# Read a file
with open_sftp(path) as f:
    s = f.read() 
print s

# Write to a file
with open_sftp(path, mode='w') as f:
    f.write("Some content.") 
It's as simple as that. The full code can be found as a gist on GitHub. Note: this assumes that the remote directory already exists, but one could trivially modify the code to create the path automatically by adding the details from this StackOverflow thread.
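For the curious, a minimal sketch of such a context manager, built on top of paramiko, might look something like this (the gist is the authoritative version; the URL parsing below assumes credentials are embedded in the path as in the example above):
from contextlib import contextmanager
from urlparse import urlparse

import paramiko

@contextmanager
def open_sftp(uri, mode='r'):
    """Open a file over SFTP, following the interface of the built-in open()"""
    parsed = urlparse(uri)
    transport = paramiko.Transport((parsed.hostname, parsed.port or 22))
    transport.connect(username=parsed.username, password=parsed.password)
    sftp = paramiko.SFTPClient.from_transport(transport)
    try:
        f = sftp.open(parsed.path, mode)
        try:
            yield f
        finally:
            f.close()
    finally:
        sftp.close()
        transport.close()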

Wednesday, August 17, 2016

Configuring the Python Elasticsearch Client to use TLSv1.1

We spent an hour trying to configure the python Elasticsearch client to work over SSL. In the end, the solution is very easy (and it's even partially documented!), but in case anyone else runs into the issue, here are the symptom and the solution.

Basic Setup

First, here is the snippet of code that we were using to connect to our Elasticsearch instance. (Note, obviously, that the IP address isn't the one we are actually using.)
from elasticsearch import Elasticsearch

es = Elasticsearch(
    hosts=[
        {
            "host": "123.45.67.890",
            "use_ssl": True
        }
    ]
)

print es.info()
If you were to look at the docs, you'd think that this is all you would have to do. Unfortunately, this (most likely) won't work. And if you are reading this, then it probably didn't work for you either.

Symptom & Diagnosis

Running the above snippet yielded the following error message:
ConnectionError: ConnectionError(HTTPSConnectionPool(host=u'123.45.67.890', port=9200): 
Max retries exceeded with url: / (Caused by : )) 
caused by: MaxRetryError(HTTPSConnectionPool(host=u'123.45.67.890', port=9200):
Max retries exceeded with url: / (Caused by : ))

We then checked in a regular browser to make sure that we could actually reach the Elasticsearch server (i.e. we visited https://123.45.67.890:9200), and we were indeed able to connect, receiving a nice response with some basic config details.

Following this, we ran tcpdump to make sure that we actually were able to connect to the Elasticsearch server and, as you might expect, according to the dump a TCP connection was being made. More specifically, we ran:

sudo tcpdump -n host 123.45.67.890
With a result that included valid connections and responses from the server:
...
16:50:49.060077 IP 10.1.248.172.49322 > 123.45.67.890.9200: Flags [S], seq 4274669687, 
    win 65535, options [mss 1366,nop,wscale 5,nop,nop,TS val 1362130305 ecr 0,
    sackOK,eol], length 0
16:50:49.125589 IP 10.1.248.172.49322 > 123.45.67.890.9200: Flags [.], ack 1, 
    win 8192, length 0
16:50:49.127457 IP 10.1.248.172.49322 > 123.45.67.890.9200: Flags [P.], seq 1:96, 
    ack 1, win 8192, length 95
...

So, by the looks of it, we were able to connect to the server with a browser, AND our python snippet was correctly sending data to our server, but things were still not working. After some head scratching we looked at the logs on the Elasticsearch server (in our case in /var/log/messages) and discovered the following interesting error:

javax.net.ssl.SSLHandshakeException: Client requested protocol TLSv1 
    not enabled or not supported
  at sun.security.ssl.Handshaker.checkThrown(Handshaker.java:1431)
  at sun.security.ssl.SSLEngineImpl.checkTaskThrown(SSLEngineImpl.java:535)
  at sun.security.ssl.SSLEngineImpl.readNetRecord(SSLEngineImpl.java:813)
  at sun.security.ssl.SSLEngineImpl.unwrap(SSLEngineImpl.java:781)
  at javax.net.ssl.SSLEngine.unwrap(SSLEngine.java:624)
  at org.jboss.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1218)
  at org.jboss.netty.handler.ssl.SslHandler.decode(SslHandler.java:852)
  at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(
    FrameDecoder.java:425)
  at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
  at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(
    SimpleChannelUpstreamHandler.java:70)
  at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(
    DefaultChannelPipeline.java:564)
  at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(
    DefaultChannelPipeline.java:559)
  at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
  at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
  at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
  at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(
    AbstractNioWorker.java:108)
  at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(
    AbstractNioSelector.java:337)
  at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
  at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
  at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
  at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)

It looks like by default the Elasticsearch client uses TLSv1. Now, most machines (correctly) have TLSv1 disabled due to known vulnerabilities. But don't worry: before getting upset about having to downgrade your server to the insecure TLSv1, know that there is a very easy solution to this problem.

Solution

The only thing that you have to change when you set up the client is to make it use the RequestsHttpConnection class. It's really as simple as that.
from elasticsearch import Elasticsearch, RequestsHttpConnection

es = Elasticsearch(
    hosts=[
        {
            "host": "123.45.67.890",
            "use_ssl": True
        }
    ],
    connection_class=RequestsHttpConnection
)

print es.info()

Note that this will require you to install the requests library.

In the documentation this functionality is described as the way to connect to AWS with IAM, but not as the way to set the client up to use TLSv1.1. Well, I guess now we know.
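As an aside, if you would rather avoid the extra dependency, it may also be possible to stay on the default urllib3-based connection class and pin the TLS version explicitly. We haven't tested this as thoroughly, and it depends on your client version supporting the ssl_version argument, so treat it as a sketch:
import ssl

from elasticsearch import Elasticsearch, Urllib3HttpConnection

es = Elasticsearch(
    hosts=[{"host": "123.45.67.890", "use_ssl": True}],
    connection_class=Urllib3HttpConnection,
    # Handed through to urllib3; assumes the ssl_version kwarg is supported
    ssl_version=ssl.PROTOCOL_TLSv1_1
)

print es.info()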

Hopefully this saves someone some pain and frustration!

Monday, July 25, 2016

Start of day in UTC timezone

Since all of our times are stored in UTC in our database (hopefully yours are as well!), getting all items created "yesterday" is not as straightforward as one would like. Since I always end up having to look up how to get the correct UTC time, I figured I'd simply write it down.
from datetime import datetime, date, time, timedelta

# The offset between local time and UTC, as a timedelta
utc_offset = datetime.utcnow() - datetime.now()

# Local midnight, then shifted so that it is expressed in UTC
today_start = datetime.combine(date.today(), time())
today_start += utc_offset
today_end = today_start + timedelta(hours=24)
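And since the original goal was "yesterday", the same values give the query bounds. Assuming a created_at column stored in UTC (the column name is just for illustration):
yesterday_start = today_start - timedelta(hours=24)
yesterday_end = today_start

# Items created "yesterday" (local time) are those where
#     yesterday_start <= created_at < yesterday_end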
Hopefully this saves someone a little time.

Thursday, January 14, 2016

Web Application Data Input Validation and Easy Documenting (Flask) Routes in one Fell Swoop

One of the things that has always bugged me about input validation for web projects (in particular the one we are building at work) is that it is done inside the endpoint. Furthermore, when one does the validation programmatically inside the endpoint function, it is hard to get a good "definition" of the method -- i.e. there is no easy way to know exactly what the valid inputs are. The usual way people solve this problem is through documentation. The trouble with normal function documentation is that it can get stale very easily (it's much too easy to forget to update the documentation when updating part of the code). As such I wanted input validation to achieve two goals simultaneously:
  1. Input validation for HTTP parameters and JSON data
  2. API Documentation (that doesn't go stale)
For the sake of simplicity I will be using the python micro-framework Flask for all of the examples below, but this can be applied to pretty much any framework.

First, let's see two examples of things that are commonly done: validating HTTP parameters and validating JSON data
@app.route('/foo')
def foo():
    try:
        param1 = request.args['param1']
    except:
        return "param1 missing", 400
    try:
        param2 = int(request.args.get('param2'))
    except:
        return "param2 needs to be an int", 400
    # Do stuff

@app.route('/bar')
def bar():
    try:
        param1 = request.json['param1']
    except:
        return "param1 missing", 400
    try:
        param2 = int(request.json.get('param2'))
    except:
        return "param2 needs to be an int", 400
    # Do stuff
So, when one looks at those functions, the only way to know that param1 is required and that param2 needs to be an integer is to actually read the code. Not only is this ugly and hard to maintain, it is also hard to understand.

One potential way to validate input JSON is to use JSON Schema (there is even a flask-jsonschema project for exactly this). The pitfall is that it depends on the JSON Schema standard, which has a couple of limitations (e.g. it is hard to do validation across attributes). Instead I turned to the object serialization library marshmallow, which has fantastic object validation methods. So, continuing with the above examples, we can create a marshmallow schema to define the valid input parameters.

from marshmallow import Schema, fields

class MySchema(Schema):
    param1 = fields.Str(required=True)
    param2 = fields.Int()
Now that we have a schema to help us validate the input, we need a way to actually apply it to the input. We can accomplish this with a decorator that applies the given schema to the input parameters and/or the JSON data -- I happened to call this decorator ensure. The function is a bit long so I won't include it in full here (it is available in full here), but a simplified sketch follows the example below. Since both routes happen to have the same validation needs, we simply need to say where to apply the schema.
@app.route('/foo')
@ensure(params=MySchema)
def foo():
    pass

@app.route('/bar')
@ensure(input=MySchema)
def bar():
    pass
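For the curious, a heavily simplified sketch of ensure could look like the following. It assumes marshmallow 2.x load() semantics (a result with .data and .errors) and uses a hypothetical _as_schema() helper so that a plain dict of fields can be passed as well (see build_schema_from_dict below):
import json
from functools import wraps

from flask import request

def ensure(params=None, input=None):
    """Validate request args and/or the JSON body against a marshmallow schema"""
    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            if params is not None:
                result = _as_schema(params)().load(request.args)
                if result.errors:
                    return json.dumps(result.errors), 400
                # Attach the validated values to the request object
                request.params = result.data
            if input is not None:
                result = _as_schema(input)().load(request.json or {})
                if result.errors:
                    return json.dumps(result.errors), 400
                request.input = result.data
            return f(*args, **kwargs)
        return wrapper
    return decorator

def _as_schema(s):
    # Accept either a Schema subclass or a dict describing one
    return s if isinstance(s, type) else build_schema_from_dict(dict(s))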
While defining explicit marshmallow schemas in advance is great if the same schema is reused (as in the above example), it is a painstaking process to write explicit schemas for routes that are all different. As such, with a little bit of coding, we can create Schemas on the fly from a dictionary definition.
def build_schema_from_dict(d, allow_nested=True):
    """Build a Marshmallow schema based on a dictionary of parameters

    :param d: The dict of parameters to use to build the Schema
    :param allow_nested: Whether or not nested schemas are allowed. If
                         ``True`` then a fields.Nested() will be created
                         when there is a nested value.
    :return: A Marshmallow schema based on the dictionary
    """
    for k, v in d.iteritems():
        if isinstance(v, tuple):
            schema = v[0]
            # Default to no extra options if none were given
            opts = v[1] if len(v) > 1 else {}
        elif isinstance(v, dict):
            schema = v
            opts = {}
        else:
            continue

        if not allow_nested:
            raise ValueError("Nested attributes not allowed.")

        # Recursively generate the nested schema(s)
        schema = build_schema_from_dict(schema)

        # Update the current dict with the Nested schema
        d[k] = fields.Nested(schema, **opts)

    return type('Schema', (Schema, ), d)
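To see the nested case in action, here is a quick example (the field names are made up):
from marshmallow import Schema, fields

schema_cls = build_schema_from_dict({
    'name': fields.Str(required=True),
    # A nested dict becomes a fields.Nested(); the tuple form lets you
    # pass options for the Nested field itself
    'address': ({
        'street': fields.Str(),
        'city': fields.Str(required=True)
    }, {'required': True})
})

result = schema_cls().load({'name': 'Adam', 'address': {'city': 'Ann Arbor'}})
print result.errors  # {} when everything validates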
Combining the above build_schema_from_dict function with our ensure decorator, we can achieve the following:
@app.route('/foo')
@ensure(
    params={
        'param1': fields.Str(required=True),
        'param2': fields.Int()
    })
def foo():
    # Do stuff. Oh, and request.params is a dict that has all of the 
    # validated values from the request. yay!
    pass

@app.route('/bar')
@ensure(
    input={
        'param1': fields.Str(required=True),
        'param2': fields.Int()
    })
def bar():
    # Do stuff. Oh, and request.input is a dict that has all of the 
    # validated values from the request. yay!
    pass
Hurray! Now we have an easy-to-use way to both validate and document input by just decorating routes with the ensure decorator. Win, win!

If you look at the details of the ensure function (available in full here), you will see that I add the validated data to the request object, so that it is easy to get the correctly typed data inside the endpoint function.

Happy coding!