concepts
FIXME: metadata handling
FIXME: `inputs`? `input`? `source`? // `outputs`? `output`? `sink`?
TODO: How to cleanly do a 'read off header lines, pass through the rest'?
TODO: Can I replace myself with an identity streamer?
FIXED: `process` not `call`
FIXED: `Processor`, not Transform or Transformer?
FIXED: stick with `.receive` not `.make`
    File.open('xxes.tsv').each do |rec1|
      rec2 = db.get()
      o1, o2 = act(rec1, rec2)
    end
A wukong record responds to:

- `to_wire` -- conversion of the record and all its children to wireable types (Integer/Float/String/Hash/Array/true/false/nil). When stringified, corresponds to the body of a flume event, the body of an HTTP request/response, or the value of a mapreduce key-value pair. `foo.class.receive(foo.to_wire)` should behave identically to the original object.
- `_metadata` -- bucket of key-value pairs shipped around with this record. Non-nested; use dot-addressed keys. Corresponds to flume metadata; to HTTP headers (where `.` becomes `-`, prefixed with `Wu-`); serialized in-record for mapreduces. Keys must be lowercased dot-separated identifiers (dash `-` is not OK). Values must be primitive types (Integer/Float/String/true/false/nil -- no arrays or hashes) and should be robust against stringification mid-flight (that is, be prepared for `true => "true"`, `false => "false"`, and `nil => ""`). The following are reserved item names within the metadata bucket:
  - `id` -- arbitrary stringlike value identifying this record among all records. Records with the same `[partition, group, ID]` are considered interchangeable.
  - `partition` -- named dataflow path. All records in a partition should meet the uniform contract of its consumers. Lowercase identifier string.
  - `channel` -- named subgrouping of records. Lowercase identifier string.
  - `group` -- alias for `channel`.
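As a rough illustration of the `_metadata` rules above, the sketch below maps a metadata bucket onto HTTP headers (`.` becomes `-`, prefixed with `Wu-`) and stringifies values the way they may be stringified mid-flight. The helper names `metadata_to_headers` and `stringify_metadata_value` are hypothetical and not part of Wukong.

```ruby
# Hypothetical helpers; the names are illustrative, not Wukong's API.
METADATA_KEY_RE = /\A([a-z][a-z0-9_]*)(\.[a-z][a-z0-9_]*)*\z/

# Stringify a primitive value: true => "true", false => "false", nil => "".
def stringify_metadata_value(val)
  val.nil? ? '' : val.to_s
end

# Convert a flat metadata bucket into HTTP-style headers.
def metadata_to_headers(metadata)
  metadata.each_with_object({}) do |(key, val), headers|
    key = key.to_s
    raise ArgumentError, "bad metadata key: #{key.inspect}" unless key =~ METADATA_KEY_RE
    raise ArgumentError, "non-primitive value for #{key}" if val.is_a?(Array) || val.is_a?(Hash)
    headers["Wu-#{key.tr('.', '-')}"] = stringify_metadata_value(val)
  end
end

headers = metadata_to_headers(
  'id' => 'abc123', 'partition' => 'tweets', 'geo.valid' => true, 'note' => nil
)
# headers['Wu-geo-valid'] is "true"; headers['Wu-note'] is ""
```

Because a receiver only ever sees the stringified form, a consumer reading these headers back cannot distinguish `nil` from an empty string, which is why the notes ask values to be robust against stringification.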
An event is a model that additionally responds to:

- `timestamp` -- time the event originated, assigned by the origin (as anything they like) and unchanged afterwards. A UTC ruby time, serialized as a unix timestamp.
- `origin` -- name for the source of this record; in flume, the dispatching host. This influences delivery guarantees. A downcased, dasherized, dot-separated identifier.
- `nano_ctr` -- nanosecond timestamp, monotonically-increasing within each origin. The `[origin, nano_ctr]` pair may be considered globally unique. Serialized as whatever flume uses.
- Universe -- mapred: hdfs ; flume: master ; mongoid: install ; mysql: install
- Flow -- mapred: partition ; flume: stream ; mongoid: collection ; mysql: db
- Channel -- mapred: group key ; flume: channel ; mongoid: _type ; mysql: table
- Id -- mapred: record id ; flume: ev id ; mongoid: _id ; mysql: id
A stage is a node in a directed graph. It has a

- `name`
- `inputs` -- (note the 's') collection of stages that are inputs to the node. If the node is a source, this will be empty.
- `output` -- an item that responds to `process`.

and responds to

- `initialize(options{name, flow})`
- `setup(hsh{input, output})`
- `stop`
- `report` -- returns a bucket summarizing the state of the stage itself.
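A minimal sketch of the stage contract described above, assuming options are passed as a plain Hash. Only the method and attribute names come from these notes; the `@running` flag and the contents of the `report` bucket are assumptions made for illustration.

```ruby
# Hypothetical Stage sketch; method names follow the notes, the rest is assumed.
class Stage
  attr_reader :name, :flow, :inputs, :output

  def initialize(options = {})
    @name    = options[:name]
    @flow    = options[:flow]
    @inputs  = []     # stays empty when the stage is a source
    @output  = nil
    @running = false
  end

  # Wire the stage to its neighbours; a given input is appended to `inputs`.
  def setup(hsh = {})
    @inputs << hsh[:input] if hsh[:input]
    @output  = hsh[:output]
    @running = true
    self
  end

  def stop
    @running = false
  end

  # Bucket summarizing the state of the stage itself.
  def report
    { name: name, flow: flow, running: @running, inputs: inputs.length }
  end
end

parser = Stage.new(name: 'parser', flow: 'tweets')
parser.setup(output: Object.new)
parser.report
```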
An event-driven ('push') source.

- `each` -- gives source a block; does `block.process` on each record. The block in this case serves as a callback.
- `empty?`
- `output`
An iterable ('pull') source of data.

- `pop` (?get?) (?get_next?) (next is a ruby keyword)
- `empty?`
At some point, one might implement a BufferedDriven wrapper that turns any driver (push) into an iterable (pull). Similarly, we can turn an iterable into a driver by polling it in a loop; in flume, this is done by source runners, and in vayacondios, by biographers.
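The polling direction can be sketched as follows. The class names `ArrayIterable`, `PollingDriver` and `Collector` are hypothetical; a real runner would presumably keep polling and sleep between attempts rather than stop once the iterable is drained.

```ruby
# Hypothetical names throughout; only `pop`, `empty?`, `each` and `process`
# come from the notes above.
class ArrayIterable
  def initialize(items)
    @items = items.dup
  end

  def pop
    @items.shift
  end

  def empty?
    @items.empty?
  end
end

# Wraps an iterable (pull) and pushes each record to a callback.
class PollingDriver
  def initialize(iterable)
    @iterable = iterable
  end

  # Polls until the iterable is drained, calling block.process on each record.
  def each(block)
    block.process(@iterable.pop) until @iterable.empty?
  end
end

# Callback object: anything that responds to `process`.
class Collector
  attr_reader :seen

  def initialize
    @seen = []
  end

  def process(rec)
    @seen << rec
  end
end

sink = Collector.new
PollingDriver.new(ArrayIterable.new(%w[a b c])).each(sink)
sink.seen # => ["a", "b", "c"]
```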
- `input` --
- `output`
- `process(rec)` -- handle a record. Calls `output.process(rec)` to emit a record downstream; the return value of `process` is discarded.
- ?? `error`

- `input`
- `process(rec)` -- handle a record

- `call(rec)` -- given one record, returns one record
  - invoking `to_transform(output)` decorates with a process method `def process(rec) output.process(self.call(rec)) ; end`
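One way the `call` / `to_transform` relationship could look, as a sketch: a module supplies the `process` method quoted above, and `to_transform` extends the object with it. The `Transform` module, `Upcaser` and `ArraySink` are hypothetical names chosen for the example.

```ruby
# Hypothetical sketch: only `call`, `process` and `to_transform` come from the notes.
module Transform
  # Forward the result of `call` downstream; the return value is not meaningful.
  def process(rec)
    output.process(call(rec))
  end
end

class Upcaser
  attr_reader :output

  # One record in, one record out.
  def call(rec)
    rec.upcase
  end

  # Attach an output, then decorate this object with `process`.
  def to_transform(output)
    @output = output
    extend(Transform)
  end
end

# Minimal sink: anything that responds to `process` can be an output.
ArraySink = Struct.new(:records) do
  def process(rec)
    records << rec
  end
end

sink  = ArraySink.new([])
stage = Upcaser.new.to_transform(sink)
stage.process('hello')
stage.process('world')
sink.records # => ["HELLO", "WORLD"]
```

Keeping `call` free of any reference to `output` means the same object can be tested as a pure function and only wired into a flow when needed.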
- Integer (`int`, `long`)
- Float (`float`, `double`)
- String (`string`, `bytes`, `fixed`, `enum`)
- true/false (`boolean`)
- nil

The primitive types plus

- Array (`array`)
- Hash (`map`)

Transported as a key-value bucket, with `_type` naming the factory capable of reconstituting it.
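A small sketch of that round trip, assuming a lookup table from `_type` names to classes. The `Place` model, the `FACTORIES` table and the `reconstitute` helper are hypothetical; only `to_wire`, `receive` and `_type` appear in these notes.

```ruby
require 'json'

# Hypothetical model used only for illustration.
class Place
  attr_reader :name, :coords

  def initialize(name, coords)
    @name   = name
    @coords = coords
  end

  # Reduce the record to wireable types, tagged with its factory name.
  def to_wire
    { '_type' => 'place', 'name' => name, 'coords' => coords.map(&:to_f) }
  end

  # Rebuild a record from its wire form.
  def self.receive(hsh)
    new(hsh['name'], hsh['coords'])
  end

  def ==(other)
    other.is_a?(Place) && to_wire == other.to_wire
  end
end

# Assumed registry mapping `_type` names to factories.
FACTORIES = { 'place' => Place }.freeze

def reconstitute(wire)
  FACTORIES.fetch(wire['_type']).receive(wire)
end

austin   = Place.new('Austin', [30.27, -97.74])
copy     = reconstitute(austin.to_wire)
via_json = reconstitute(JSON.parse(JSON.generate(austin.to_wire)))
```

Both `copy` and `via_json` compare equal to `austin` here, which is the property the notes ask of `foo.class.receive(foo.to_wire)`.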
An identifier starts with a lowercase letter, and contains only lowercased letters, numbers and underscore characters. In some cases, we transport a predictable tuple of identifiers, typically separated with `.` dots. Lastly, we use the dasherized form for internet hostnames.

- downcased identifier: `/\A([a-z][a-z0-9\_]*)\z/`
- downcased, dot-separated identifier: `/\A([a-z][a-z0-9\_]*)(\.[a-z][a-z0-9\_]*)*\z/`
- downcased, dasherized, dot-separated identifier: `/\A([a-z][a-z0-9\-]*)(\.[a-z][a-z0-9\-]*)*\z/`
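To make the three patterns concrete, the sketch below binds them to constants (the constant names are hypothetical) and checks a few sample strings. Note that every dot-separated segment must start with a letter, so a dotted IP address would not match the hostname form.

```ruby
# Constant names are illustrative; the patterns are the ones listed above.
DOWNCASED_ID         = /\A([a-z][a-z0-9\_]*)\z/
DOTTED_ID            = /\A([a-z][a-z0-9\_]*)(\.[a-z][a-z0-9\_]*)*\z/
DASHERIZED_DOTTED_ID = /\A([a-z][a-z0-9\-]*)(\.[a-z][a-z0-9\-]*)*\z/

DOWNCASED_ID.match?('nano_ctr')                   # => true
DOWNCASED_ID.match?('Nano')                       # => false (uppercase)
DOTTED_ID.match?('geo.place_name')                # => true
DOTTED_ID.match?('geo-place')                     # => false (dash not allowed)
DASHERIZED_DOTTED_ID.match?('web-01.example.com') # => true
DASHERIZED_DOTTED_ID.match?('web_01.example.com') # => false (underscore not allowed)
```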
There are "known" errors -- think of the difference between a `4xx` error (`NotFoundError`, `BadRequestError`) and a `5xx` error (`InternalServerError`). In the former case, everything was processed correctly but the request was invalid (i.e. it's the sender's fault). In the latter case, the record was not processed correctly.
Exceptions have the following `class_attribute`s:

- `description`, a string
- `status_code`, an integer

- `ProcessingError`
- `BadRecordError`
  - `BadPayloadError` -- can't generate raw structure from blob; un-JSONize, bad UTF, etc
  - `TypeMismatchError` -- factory can't generate object from raw structure
-
  - `OversizeBodyError` -- too large
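A sketch of part of this hierarchy in plain Ruby. It stands in for ActiveSupport's `class_attribute` with a hand-rolled inheritable class-level accessor so the example has no dependencies. The `WukongError` base class, the description strings and the numeric status codes are assumptions; the notes do not specify them, and `OversizeBodyError` is left out because its parent class is not stated.

```ruby
# Hypothetical base class; subclasses inherit a value unless they set their own.
class WukongError < StandardError
  class << self
    attr_writer :description, :status_code

    def description
      @description || (superclass.respond_to?(:description) ? superclass.description : nil)
    end

    def status_code
      @status_code || (superclass.respond_to?(:status_code) ? superclass.status_code : nil)
    end
  end
end

class ProcessingError < WukongError
  self.description = 'record was not processed correctly'
  self.status_code = 500 # assumed value, by analogy with 5xx
end

class BadRecordError < WukongError
  self.description = 'record was invalid'
  self.status_code = 400 # assumed value, by analogy with 4xx
end

class BadPayloadError < BadRecordError
  self.description = "can't generate raw structure from blob"
end

class TypeMismatchError < BadRecordError
  self.description = "factory can't generate object from raw structure"
end

begin
  raise BadPayloadError, 'invalid UTF-8 in body'
rescue BadRecordError => e
  status = e.class.status_code # inherited from BadRecordError
end
```

Rescuing on `BadRecordError` catches both of its subclasses, so a caller can treat every sender-side fault uniformly while still reading the more specific `description`.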
-
Process hosting this graph segment.
- Fileish (path, dir),
- Serverish (capability, address (port+addr), realm (cluster))
- Queueish (size)
Cube: Cube event
- body -- `data`; structured.
- metadata -- no special support, so serialized in as `meta`.
- timestamp -- `time` (ISO 8601 UTC).
- channel -- `type`.
- key -- `id`.
Flume: Flume user guide
- body -- text blob, maximum of 32KB per event (flume.event.max.size.bytes)
- metadata:
  - table with an arbitrary number of attribute-value pairs.
  - priority (trace, debug, info, warn, error, or fatal)
  - source host
- stream -- none natively, use extended metadata
- channel -- none natively, use extended metadata
- time:
  - unix timestamp
  - nanosecond timestamp (considered monotonic per machine)