Viaduct Engines View Source

Viaduct is built for speed and asynchronous processing with clear separations of responsibilities. The code can be divided into 5 engines that each perform their own work independent of the others. The engines are

  • Engine 1: The ECLRS extractor that takes the ECLRS export files from S3, weeds out the lines Viaduct has seen before, and inserts all new lines into the database.
  • Engine 2: The ECLRS->CommCare Transformer that discovers as-yet unprocessed lines from ECLRS, finds or creates a "person" (Viaduct's own, internal concept) and then creates or updates the records that will later be sent to CommCare: an "index case" and a "lab result".
  • Engine 3: The CommCare Enqueuer that discovers index cases that haven't been synched to CommCare since their last change, and puts them on the Oban queue for controlled processing.
  • Engine 4: The CommCare Case Loader that takes one index case at a time and posts it and its lab results to CommCare after figuring out which county's project space it belongs to - either the one that ECLRS said, or one to which it has been transferred within CommCare.
  • Engine 5: The CommCare Extractor which runs constantly, synching down new and updated information about patients that we recognize.

The "independent" aspect bears elaborating: Rather than one engine passing work to the next one in the conceptual pipeline, each engine looks for data (in the database, etc) that is ready to be processed by that engine. Thus, the API between the engines is not a callable API, but data at rest in a certain format.

In the following sections we are going to look at each of the engines in more detail. The code for each engine can be found in lib/nys_etl/engines/e1 for engine 1, etc.

E1: ECLRS extractor

NYSETL.Engines.E1.*: A Broadway pipeline started on demand by a Task that listens for SQS notifications.

The NYS DOH delivers a 14-day export of positive Covid-19 test results to an Amazon S3 bucket roughly every 30 minutes. S3 notifies Viaduct (via Amazon SNS and Amazon SQS) when a new file is received. This notification is received by NYSETL.Engines.E1.SQSListener in collaboration with NYSETL.Engines.E1.SQSTask, which then kick off NYSETL.Engines.E1.ECLRSFileExtractor, which in turn starts the engine's NYSETL.Engines.E1.Supervisor and actively waits for it to finish before letting NYSETL.Engines.E1.SQSTask listen for the next notification.

NYSETL.Engines.E1.Supervisor starts two children: NYSETL.Engines.E1.State, which keeps track of the main process's progress, and NYSETL.Engines.E1.Broadway which orchestrates a two-step Broadway pipeline to process all rows in the input file. Rows are read from the ECLRS file by NYSETL.Engines.E1.FileReader, and then converted from plain strings to NYSETL.Engines.E1.Messages which contain the raw row, its checksum, and a Map with the data, prepared for the database. These messages are then processed by NYSETL.Engines.E1.Processor which stores them in the test_results table along with some meta-data in the abouts table (if the information is new) or just updates the corresponding record in the abouts table (if the ECLRS row has been seen before).

At the end of the import round, some summary statistics is stored in the files table.

Notes:

  • The files table has a unique index on the file path to make sure we don't import the same file twice. This worked well when the files were stored orderly on the server, but since we switched to on-the-fly downloading to a random tmp path, the uniquiness constraint is moot.

E2: ECLRS->CommCare Transformer

NYSETL.Engines.E2.*: A Broadway pipeline that runs continuously.

NYSETL.Engines.E2.TestResultProducer queries the database for test_results records that have no finished-state test_result_events record and makes them available to the pipeline. NYSETL.Engines.E2.Processor.process/2 processes the records one by one in parallel like so:

  • Find a person in the people table that matches patient key, or exact combination of date of birth, last name and first name. If none found, create one.
  • Look up index cases in the same county of the test result, creating a new one if necessary.
  • For the index case(s) in the previous step, update lab result records with the "accession number" in the test result. If an index case does not have a lab result with that accession number, create one.

Notes:

  • "Update" means add-if-missing. Viaduct never overwrites existing data.
  • While idling, the producer polls the database every 5 seconds for unprocessed test result records. While processing, it polls every 100 ms or when requested by Broadway.
  • A single ECLRS row, which contains information about one test result, may result in several lab result records being created if the "person" in Viaduct has more than one index case in the county of the test result.
  • For historical reasons, Engine 2 borrows the NYSETL.Engines.E1.Cache library from Engine 1.
  • First names are split on spaces, and the first part gets used. e.g. a first name of "Mary Ann" would match on "Mary".

E3: CommCare Enqueuer

NYSETL.Engines.E3.*: A Broadway pipeline that runs continuously.

NYSETL.Engines.E3.IndexCaseProducer queries the database for index cases that have not been enqueued since they were last updated. All such index cases are enqueued in Oban for processing by NYSETL.Engines.E4.CommcareCaseLoader.

Notes:

  • While idling, the producer polls the database every 5 seconds for enqueueable index cases. While processing, it polls every second or when requested by Broadway.

E4: CommCare Case Loader

NYSETL.Engines.E4.*: An Oban Worker that is retried up to 20 times over a number of days.

NYSETL.Engines.E4.CommcareCaseLoader creates or updates an index case with all its lab results to the county domain in CommCare where it belongs. Before posting, NYSETL.Engines.E4.CaseTransferChain is used to determine if the index case has been transferred to another county domain within CommCare. If so, NYSETL.Engines.E4.CaseTransferChain tries to find the index case in the transfer destination county domain and update that one instead.

Notes:

  • A single ECLRS row, which contains information about one test result, may result in several lab result records being created if the "person" in Viaduct has more than one index case in the county of the test result. The index cases, and thus the lab results, are enqueued separately and posted separately to CommCare.
  • A transfer chain can be long and winding and might even be circular. Sometimes it is not possible to find the final index case in the chain. Sometimes it would be possible, but it's not sure that implementing a solution would be an overall gain: it might solve some case but make the whole system too complicated to reason about.
  • If a final-destination index case cannot be found, it is always the case that the same lab result that Viaduct is trying to publish can also be found on another index case, often in the right county. For that reason, broken transfer chains are rarely a problem.

E5: CommCare Extractor

NYSETL.Engines.E5.*: A Broadway pipeline that runs continuously.

NYSETL.Engines.E5.Producer wakes up every five minutes and polls CommCare for all changes to index cases since midnight. It works its way through the counties list (in reverse alphabetical order), downloading index cases and their lab results. NYSETL.Engines.E5.Processor processes the cases according to the first rule that matches:

  • An index case with this case_id exists in our DB:
    • Update the case with changes from CommCare, but ignore any new lab results.
  • A person exists that can be matched by dob + last_name + first_name:
    • Create an index case linked to the person and lab results linked to the index case.
  • If no matching case or person exists:
    • Create a person, index case, and lab result record(s).

Notes:

  • We're polling for changes since midnight since that is what we thought the API allowed. We've since found how to express a timestamp that would allow us to fetch changes since any time, and we should fix that at some point.
  • The data we sync down is used when merging records in Engine 3. It is not (yet) used to figure out transfer chains for Engine 4.