1. ActiveMQ and ETL

    Doug Bell

    Bank of America

  2. Extract, Transform, Load

  3. Data Requirements

  4. A Single Script

  5. New Requirement: Async Load

  6. Data Requirements

  7. Asynchronous Loading

  8. Message Queues

  9. ActiveMQ

  10. STOMP

  11. The Original Code

    # Fake code! Do not use!
    # bin/etl.pl
    use strict; use warnings;
    use ETL;
    use Reuters;
    use MIM;
    # Run a whole ETL job in a single process: initialise the job, extract
    # its feeds from Reuters, transform the extracted data, and load the
    # result into MIM.
    #
    # Arguments:
    #   $job_name - name of the job to run (e.g. 'futures'), passed to ETL->init
    sub main {
        my ( $job_name ) = @_;
        # Fail fast with a usage message instead of handing undef to ETL->init.
        die "usage: $0 <job_name>\n"
            unless defined $job_name && length $job_name;
        my $job     = ETL->init( $job_name );
        my $e_data  = Reuters->extract( $job->{feeds} );
        my $t_data  = ETL->transform( $job, $e_data );
        MIM->load( $t_data );
    }
    main(@ARGV); # bin/etl.pl futures
    
  12. Broken Into Steps - Extract

    # Fake code! Do not use!
    # bin/extract.pl
    use strict; use warnings;
    use ETL;
    use Reuters;
    use Net::Stomp::Client;
    use JSON;
    # First stage of the split pipeline: extract the job's feeds from
    # Reuters and publish them, together with the job, to the 'extract'
    # queue via send_message().
    #
    # Arguments:
    #   $job_name - name of the job to run (e.g. 'futures'), passed to ETL->init
    sub main {
        my ( $job_name ) = @_;
        # Fail fast with a usage message instead of handing undef to ETL->init.
        die "usage: $0 <job_name>\n"
            unless defined $job_name && length $job_name;
        my $job     = ETL->init( $job_name );
        my $e_data  = Reuters->extract( $job->{feeds} );
        send_message( 'extract', $job, $e_data );
    }
    main(@ARGV); # bin/extract.pl futures
    
  13. Broken Into Steps - send_message

    # Publish one JSON-encoded message to a STOMP queue on the local broker.
    #
    # Arguments:
    #   $queue - queue name without the "/queue/" prefix (e.g. 'extract')
    #   $job   - job description, serialized under the "job" key
    #   $data  - payload, serialized under the "data" key; may be omitted,
    #            in which case it is encoded as JSON null
    #
    # A fresh connection is opened for every message and closed again
    # afterwards, so repeated calls do not leak broker connections.
    #
    # Returns whatever $client->send returned, unchanged, because callers
    # pass this value back to wait_for_frames() as their callback result.
    sub send_message {
        my ( $queue, $job, $data ) = @_;
        my $client  = Net::Stomp::Client->new(
            host => 'localhost',
            port => 61613,
        );
        $client->connect( login => 'dev',
            passcode => '********', );
        my $sent = $client->send(
            # STOMP queues start with /queue/
            destination => "/queue/$queue",
            body        => JSON->new->encode( {
                job         => $job,
                # Was $e_data, an undeclared variable (compile error under strict).
                data        => $data,
            } ),
        );
        # Release the connection; the original left it open on every call.
        $client->disconnect;
        return $sent;
    }
    
  14. Broken Into Steps - Transform

    # Fake code! Do not use!
    # bin/transform.pl
    use strict; use warnings;
    use ETL;
    use Net::Stomp::Client;
    use JSON;
    # Second stage of the split pipeline: subscribe to the 'extract' queue
    # and hand every received frame to transform(), waiting with a
    # 60-second timeout. Command-line arguments are accepted but unused.
    sub main {
        my $consumer  = subscribe( 'extract' );
        my %wait_args = ( timeout => 60, callback => \&transform );
        return $consumer->wait_for_frames( %wait_args );
    }
    main(@ARGV); # bin/transform.pl
    
  15. Broken Into Steps - Subscribe

    # Connect to the local STOMP broker and subscribe to one queue.
    #
    # Arguments:
    #   $queue - queue name without the "/queue/" prefix (e.g. 'extract')
    #   $id    - optional subscription id; defaults to 'transform', the
    #            previously hard-coded value, so existing callers are unchanged.
    #            Pass a descriptive id (e.g. 'load') when reusing this helper
    #            from another consumer.
    #
    # Returns the connected, subscribed client so the caller can wait for
    # frames on it.
    sub subscribe {
        my ( $queue, $id ) = @_;
        $id = 'transform' unless defined $id;
        my $client = Net::Stomp::Client->new(
            host => 'localhost',
            port => 61613,
        );
        $client->connect( login => 'dev',
            passcode => '********' );
        $client->subscribe(
            destination => '/queue/' . $queue,
            id          => $id,
        );
        return $client;
    }
    
  16. Broken Into Steps - Transform (2)

    # wait_for_frames() callback: decode one extracted-data message, run the
    # transform step on it, and publish the result to the 'transform' queue.
    #
    # The frame is read from the LAST argument. Net::STOMP::Client invokes
    # wait_for_frames callbacks as ($client, $frame), so `my ($frame) = @_`
    # would bind the client object instead. Taking $_[-1] also keeps direct
    # calls of the form transform($frame) working.
    #
    # Returns send_message()'s result. NOTE(review): per the Net::STOMP::Client
    # docs, a true callback result makes wait_for_frames() stop waiting;
    # confirm whether the consumer is meant to stop after the first message.
    sub transform {
        my $frame   = $_[-1];
        my $message = JSON->new->decode( $frame->body );
        my $job     = $message->{job};
        my $e_data  = $message->{data};
        my $t_data  = ETL->transform( $job, $e_data );
        return send_message( 'transform', $job, $t_data );
    }
    
  17. Broken Into Steps - Load

    # Fake code! Do not use!
    # bin/load.pl
    use strict; use warnings;
    use Net::Stomp::Client;
    use JSON;
    use MIM;
    # Final stage of the split pipeline: subscribe to the 'transform' queue
    # and hand every received frame to load(), waiting with a 60-second
    # timeout. Command-line arguments are accepted but unused.
    sub main {
        my $consumer  = subscribe( 'transform' );
        my %wait_args = ( timeout => 60, callback => \&load );
        return $consumer->wait_for_frames( %wait_args );
    }
    main(@ARGV); # bin/load.pl
    
  18. Broken Into Steps - Load (2)

    # wait_for_frames() callback: decode one transformed-data message, load
    # its data into MIM, then post a message carrying only the job to the
    # 'complete' queue.
    #
    # The frame is read from the LAST argument. Net::STOMP::Client invokes
    # wait_for_frames callbacks as ($client, $frame), so `my ($frame) = @_`
    # would bind the client object instead. Taking $_[-1] also keeps direct
    # calls of the form load($frame) working.
    #
    # Returns send_message()'s result. NOTE(review): per the Net::STOMP::Client
    # docs, a true callback result makes wait_for_frames() stop waiting;
    # confirm whether the consumer is meant to stop after the first message.
    sub load {
        my $frame   = $_[-1];
        my $message = JSON->new->decode( $frame->body );
        my $job     = $message->{job};
        my $t_data  = $message->{data};
        MIM->load( $t_data );
        return send_message( 'complete', $job );
    }
    
  19. The Result

  20. Usefulness

  21. The End!

    Slides are licensed under a CC-BY-SA 3.0 license.

    Code is licensed under the Artistic License or GNU GPL v1.0 or later (the same terms as Perl itself).