-
ActiveMQ and ETL
Doug Bell
-
Extract, Transform, Load
- Databases and Data Warehousing
- Extract from a Source
- Transform
- Normalize
- Perform calculations
- Load into Database
-
Data Requirements
- Extract financial data from news feeds
- Validate and normalize raw data
- Correct date
- Data exists
- Fractions to decimal
- Reformat dates
- Transform to add necessary basic data
- Calculate Mid between Bid and Ask (sketch follows)
- Load into a database
- Market Information Machine (MIM)
- Sybase
- Distribute to customers
- E-mail
- CSV or text files
- Simple notifications
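-
Sketch: Fractions and Mid
A hypothetical illustration of the normalize/transform rules above; the quote format and sub names are assumptions, not the production ETL code.
# Fake code! Do not use!
use strict; use warnings;

sub fraction_to_decimal {
    my ( $quote ) = @_;
    # assume quotes arrive as "101 3/8" or plain decimals
    if ( $quote =~ m{^(\d+)\s+(\d+)/(\d+)$} ) {
        return $1 + $2 / $3;
    }
    return $quote;
}

sub mid {
    # Mid is the midpoint between Bid and Ask
    my ( $bid, $ask ) = @_;
    return ( $bid + $ask ) / 2;
}

my $bid = fraction_to_decimal( '101 3/8' ); # 101.375
my $ask = fraction_to_decimal( '101 1/2' ); # 101.5
printf "Mid: %.4f\n", mid( $bid, $ask );    # 101.4375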
-
A Single Script
- Easy to follow
- Configuration files for different jobs
- Doesn't scale automatically
- Jobs are assigned to specific boxes
- Synchronous
- Each step runs in the same process, one after another
-
New Requirement: Async Load
- MIM Package
- Self-extracting shell script
- Loads more effectively
- Same method the vendor uses to provide data
- Repeatable in case of failure
-
Data Requirements
- Extract financial data from news feeds
- Validate and normalize raw data
- Correct date
- Data exists
- Fractions to decimal
- Reformat dates
- Transform to add necessary basic data
- Calculate Mid between Bid and Ask
- Load into a database
- Market Information Machine (MIM)
- Sybase
- Distribute to customers
- E-mail
- CSV or text files
- Simple notifications
-
Asynchronous Loading
- Distribute step must wait
- For an unpredictable length of time
- How to notify when data is loaded?
- Use a message queue
-
Message Queues
- Message Broker
- Accepts messages from publishers
- Distributes messages to subscribers
- Event Dispatching
- Request/Response
- Many Implementations
- System V IPC Queues
- ActiveMQ
- ZeroMQ (0MQ)
- RabbitMQ
-
ActiveMQ
- The Apache Software Foundation
- Java
- Highly Configurable
- Queues
- Topics
- Multiplexing
- Clustering
- Web interface
- Administration
- Monitoring
-
STOMP
- Simple Text Oriented Messaging Protocol
- HTTP-like interface to message queues (example frame follows)
- Request/Response
- Publish/Subscribe (server push)
- Message acknowledgment
- Two Perl modules on CPAN (e.g. Net::Stomp, Net::STOMP::Client)
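-
STOMP on the Wire
For illustration only: a STOMP SEND frame is a command line, headers, a blank line, and a body, terminated by a NUL byte (shown here as ^@); the destination and body are made up.
SEND
destination:/queue/extract

{"job":"futures","data":["..."]}
^@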
-
The Original Code
# Fake code! Do not use!
# bin/etl.pl
use strict; use warnings;
use ETL;
use Reuters;
use MIM;
sub main {
    my ( $job_name ) = @_;
    my $job    = ETL->init( $job_name );
    my $e_data = Reuters->extract( $job->{feeds} );
    my $t_data = ETL->transform( $job, $e_data );
    MIM->load( $t_data );
}
main(@ARGV); # bin/etl.pl futures
-
Broken Into Steps - Extract
# Fake code! Do not use!
# bin/extract.pl
use strict; use warnings;
use ETL;
use Reuters;
use Net::STOMP::Client;
use JSON;
sub main {
    my ( $job_name ) = @_;
    my $job    = ETL->init( $job_name );
    my $e_data = Reuters->extract( $job->{feeds} );
    send_message( 'extract', $job, $e_data );
}
main(@ARGV); # bin/extract.pl futures
-
Broken Into Steps - send_message
sub send_message {
    my ( $queue, $job, $data ) = @_;
    my $client = Net::STOMP::Client->new(
        host => 'localhost',
        port => 61613,
    );
    $client->connect( login => 'dev',
        passcode => '********' );
    $client->send(
        # STOMP queues start with /queue/
        destination => "/queue/$queue",
        body => JSON->new->encode( {
            job  => $job,
            data => $data,
        } ),
    );
}
-
Broken Into Steps - Transform
# Fake code! Do not use!
# bin/transform.pl
use strict; use warnings;
use ETL;
use Net::STOMP::Client;
use JSON;
sub main {
    my $client = subscribe( 'extract' );
    $client->wait_for_frames(
        callback => \&transform,
        timeout  => 60,
    );
}
main(@ARGV); # bin/transform.pl
-
Broken Into Steps - Subscribe
sub subscribe {
    my ( $queue ) = @_;
    my $client = Net::STOMP::Client->new(
        host => 'localhost',
        port => 61613,
    );
    $client->connect( login => 'dev',
        passcode => '********' );
    $client->subscribe(
        destination => '/queue/' . $queue,
        id => 'transform',
    );
    return $client;
}
-
Broken Into Steps - Transform (2)
sub transform {
    my ( $frame ) = @_;
    my $message = JSON->new->decode( $frame->body );
    my $job     = $message->{job};
    my $e_data  = $message->{data};
    my $t_data  = ETL->transform( $job, $e_data );
    send_message( 'transform', $job, $t_data );
}
-
Broken Into Steps - Load
# Fake code! Do not use!
# bin/load.pl
use strict; use warnings;
use Net::STOMP::Client;
use JSON;
use MIM;
sub main {
    my $client = subscribe( 'transform' );
    $client->wait_for_frames(
        callback => \&load,
        timeout  => 60,
    );
}
main(@ARGV); # bin/load.pl
-
Broken Into Steps - Load (2)
sub load {
    my ( $frame ) = @_;
    my $message = JSON->new->decode( $frame->body );
    my $job     = $message->{job};
    my $t_data  = $message->{data};
    MIM->load( $t_data );
    send_message( 'complete', $job );
}
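-
Sketch: Load with Client Ack
A hedged sketch, not from the original scripts: subscribing with ack => 'client' keeps the message on the broker until we acknowledge it, so a crash during MIM->load means re-delivery instead of data loss. Reuses JSON, MIM, and send_message from the earlier slides; exact callback arguments depend on the client module.
# Fake code! Do not use!
my $client = Net::STOMP::Client->new(
    host => 'localhost',
    port => 61613,
);
$client->connect( login => 'dev', passcode => '********' );
$client->subscribe(
    destination => '/queue/transform',
    id  => 'load',
    ack => 'client',               # manual acknowledgment
);
$client->wait_for_frames(
    callback => sub {
        my ( $frame ) = @_;        # schematic, as on the other slides
        my $message = JSON->new->decode( $frame->body );
        MIM->load( $message->{data} );
        send_message( 'complete', $message->{job} );
        $client->ack( frame => $frame );   # safe to discard it now
        return;
    },
    timeout => 60,
);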
-
The Result
- etl.pl -> extract.pl && transform.pl && load.pl
- Different processes, different boxes
- Scale with load
- Add more processes during peak times
- Add more boxes easily
- Refactor individual steps
- Replace old ETL->transform with new EpicTL->transform
- Have both running at the same time
-
Usefulness
- Asynchronous Jobs
- Monitoring
- Advisory Messages
- Virtual Destinations (multiplexing)
- Auditing
- Dump $data out to file
- Re-run by putting $data back on queue (sketch follows)
- Request/Response
- Load Balancing
- REST Interface
- Scaling
- Multiple request/job processors
- Multiple ActiveMQ instances
- Coordinate a network of job-running nodes
- Message Queue becomes the public API
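-
Sketch: Re-running a Job
A hypothetical replay script (name and dump layout are assumptions): auditing dumps the JSON message to a file, and re-running the job is just putting that file back on the queue.
# Fake code! Do not use!
# bin/requeue.pl
use strict; use warnings;
use Net::STOMP::Client;
sub main {
    my ( $queue, $file ) = @_;
    open my $fh, '<', $file or die "Can't read $file: $!";
    my $json = do { local $/; <$fh> };   # slurp the dumped message
    my $client = Net::STOMP::Client->new(
        host => 'localhost',
        port => 61613,
    );
    $client->connect( login => 'dev', passcode => '********' );
    $client->send(
        destination => "/queue/$queue",
        body        => $json,            # already JSON-encoded
    );
    $client->disconnect;
}
main(@ARGV); # bin/requeue.pl extract dump.json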
-
The End!
Slides are licensed under a CC-BY-SA 3.0 license.
Code is licensed under the Artistic License or GNU GPL v1.0 or later (the same terms as Perl itself).