Skip to content

Commit e8bccbd

Browse files
committed
Merge pull request #2 from picwelltimjones/master
Fix 'Error: write after end at' seen in Lambda
2 parents 16f3022 + 1e4a686 commit e8bccbd

1 file changed

Lines changed: 29 additions & 30 deletions

File tree

src/s3_lambda_es.js

Lines changed: 29 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -42,40 +42,11 @@ var numDocsAdded = 0; // Number of log lines added to ES so far
4242
*/
4343
var creds = new AWS.EnvironmentCredentials('AWS');
4444

45-
46-
/* == Streams ==
 * Rather than buffering a whole (often large) log file in memory, the
 * data is piped through a chain of filter streams from S3 to ES.
 * Flow: S3 file stream -> Log Line stream -> Log Record stream -> ES
 */
var lineStream = new LineStream();

// Transform stream: turns each raw log line into a serialized JSON record.
var recordStream = new stream.Transform({objectMode: true})
recordStream._transform = function(line, encoding, done) {
    // parse() is a file-local helper; presumably it maps a raw log line
    // to a record object — confirm against the rest of the file.
    var serializedRecord = JSON.stringify(parse(line.toString()));
    totLogLines ++;
    this.push(serializedRecord);
    done();
}
62-
63-
64-
/* Lambda "main": Execution starts here */
65-
exports.handler = function(event, context) {
66-
console.log('Received event: ', JSON.stringify(event, null, 2));
67-
event.Records.forEach(function(record) {
68-
var bucket = record.s3.bucket.name;
69-
var objKey = decodeURIComponent(record.s3.object.key.replace(/\+/g, ' '));
70-
s3LogsToES(bucket, objKey, context);
71-
});
72-
}
73-
7445
/*
7546
* Get the log file from the given S3 bucket and key. Parse it and add
7647
* each log record to the ES domain.
7748
*/
78-
function s3LogsToES(bucket, key, context) {
49+
function s3LogsToES(bucket, key, context, lineStream, recordStream) {
7950
// Note: The Lambda function should be configured to filter for .log files
8051
// (as part of the Event Source "suffix" setting).
8152

@@ -137,3 +108,31 @@ function postDocumentToES(doc, context) {
137108
context.fail();
138109
});
139110
}
111+
112+
/* Lambda "main": Execution starts here */
113+
exports.handler = function(event, context) {
114+
console.log('Received event: ', JSON.stringify(event, null, 2));
115+
116+
/* == Streams ==
117+
* To avoid loading an entire (typically large) log file into memory,
118+
* this is implemented as a pipeline of filters, streaming log data
119+
* from S3 to ES.
120+
* Flow: S3 file stream -> Log Line stream -> Log Record stream -> ES
121+
*/
122+
var lineStream = new LineStream();
123+
// A stream of log records, from parsing each log line
124+
var recordStream = new stream.Transform({objectMode: true})
125+
recordStream._transform = function(line, encoding, done) {
126+
var logRecord = parse(line.toString());
127+
var serializedRecord = JSON.stringify(logRecord);
128+
this.push(serializedRecord);
129+
totLogLines ++;
130+
done();
131+
}
132+
133+
event.Records.forEach(function(record) {
134+
var bucket = record.s3.bucket.name;
135+
var objKey = decodeURIComponent(record.s3.object.key.replace(/\+/g, ' '));
136+
s3LogsToES(bucket, objKey, context, lineStream, recordStream);
137+
});
138+
}

0 commit comments

Comments (0)