Skip to content

Commit

Permalink
Twitter publishing successful
Browse files Browse the repository at this point in the history
- Fixed typos on environment variables
- Using Twitter library asynchronously
- Updating rules for notifying, tweets must be 30 minutes apart.
- Fixing minor bugs in feed processing, adding null streams check

TODOS: tests, strong validation, error handling. Automation
  • Loading branch information
Jorge A Villalobos G (aka Latincoder) committed Apr 21, 2020
1 parent 67f1f8a commit 9f2daa9
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,15 @@ public async Task<string> FunctionHandler(DynamoDBEvent dynamoDBEvent, ILambdaCo
foreach(var record in dynamoDBEvent.Records) {
var dynamoDocument = Document.FromAttributeMap(record.Dynamodb.NewImage);
CityFeed feed = CityFeedDocument.ToDTO(dynamoDocument);
if (feed == null) {
System.Console.WriteLine("Streamed had no new data");
continue;
}
System.Console.WriteLine($"Feed Object:{feed}");
System.Console.WriteLine($"Accessed stream: {feed.CityName}");
var msg = NotificationFormatter.GetTwitterMessageSpanish(feed);
await publishToTwitter(msg);
await publishToGeneral(NotificationFormatter.GetSimpleMessage(feed));
await publishToTwitter(msg);

}
return "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
<ItemGroup>
<PackageReference Include="Amazon.Lambda.Core" Version="1.1.0" />
<PackageReference Include="Amazon.Lambda.Serialization.SystemTextJson" Version="1.0.0" />
<PackageReference Include="Amazon.Lambda.SNSEvents" Version="1.1.0" />
<PackageReference Include="AWSSDK.Lambda" Version="3.3.109.9" />
<PackageReference Include="TweetinviAPI" Version="4.0.3" />
</ItemGroup>
<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,46 +1,72 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Amazon.Lambda.SNSEvents;
using Tweetinvi;
using Tweetinvi.Models;

using Amazon.Lambda.Core;


// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
//[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.LambdaJsonSerializer))]
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.LambdaJsonSerializer))]

namespace AirQualityTwitterPublisherLambda
{
public class Function
{

private bool hasMessage(SNSEvent.SNSRecord snsRecord) {
var msg = string.Empty;
if (snsRecord == null) return false;
try {
msg = snsRecord.Sns.Message;
} catch(NullReferenceException e) {
return false;
}
return !string.IsNullOrEmpty(msg);
}
/// <summary>
/// A simple function that takes a string and does a ToUpper
/// </summary>
/// <param name="input"></param>
/// <param name="context"></param>
/// <returns></returns>
public void FunctionHandler(System.IO.Stream inputStream, ILambdaContext context)
public System.Threading.Tasks.Task<string> FunctionHandler(SNSEvent snsEvent, ILambdaContext context)
{
System.IO.StreamReader reader = new System.IO.StreamReader(inputStream);
var receivedMessage = reader.ReadToEnd();
System.Console.WriteLine("This is where twitter API integration will happen");
System.Console.WriteLine(receivedMessage);
var apiKey = Environment.GetEnvironmentVariable("TWITTER_API_KEY");
var apiSecret = Environment.GetEnvironmentVariable("TWITTER_API_SECRET");
var apiKey = Environment.GetEnvironmentVariable("TWITTER_APIK");
var apiSecret = Environment.GetEnvironmentVariable("TWITTER_APIS");
var token = Environment.GetEnvironmentVariable("TWITTER_ACCESS_TOKEN");
var tokenSecret = Environment.GetEnvironmentVariable("TWITTER_TOKEN_SECRET");
var credentials = new TwitterCredentials(apiKey, apiSecret, token, tokenSecret);
Auth.SetCredentials(credentials);
var tokenSecret = Environment.GetEnvironmentVariable("TWITTER_SECRET_TOKEN");
var credentials = Auth.CreateCredentials(apiKey, apiSecret, token, tokenSecret);
Auth.SetUserCredentials(apiKey, apiSecret, token, tokenSecret);
// TweetInvi lacks returning asynchronous types that can be awaited
var publisher = User.GetAuthenticatedUser(credentials);

try {
var tweet = Auth.ExecuteOperationWithCredentials(credentials, () => Tweet.PublishTweet(receivedMessage));
System.Console.WriteLine($"Published tweet by {tweet.TweetDTO.CreatedBy.Name}");
} catch(Exception error) {
// TODO: if tweet fails publish to DLQ (SQS)
System.Console.WriteLine(error);
}
var t = new System.Threading.Tasks.Task<string>(() => {
System.Console.WriteLine($"Authenticated User {publisher.Name}");
var records = from r in snsEvent.Records
where hasMessage(r)
select r;
foreach(var snsRecord in records) {
var receivedMessage = snsRecord.Sns.Message;
System.Console.WriteLine("This is where twitter API integration will happen");
System.Console.WriteLine(receivedMessage);
try {
System.Console.WriteLine($"Start to publish Tweet.{receivedMessage}");
publisher.PublishTweet(receivedMessage);
System.Threading.Thread.Sleep(5000);
System.Console.WriteLine($"Published tweet: {publisher.Name}");
} catch(Exception error) {
// TODO: if tweet fails publish to DLQ (SQS)
System.Console.WriteLine($"Error detected:{error}");
}
}
return "{\"msg\":\"execution completed\"}";
});
t.RunSynchronously();
return t;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class Function
/// <param name="input"></param>
/// <param name="context"></param>
/// <returns></returns>
public async Task FunctionHandler(string input, ILambdaContext context)
public async Task FunctionHandler(System.IO.Stream input, ILambdaContext context)
{
_citiesRaw = Environment.GetEnvironmentVariable(EnvCitiesWaqiGetCityFeedLambda);
_token = Environment.GetEnvironmentVariable(EnvWaqiTokenKey);
Expand All @@ -53,7 +53,7 @@ public async Task FunctionHandler(string input, ILambdaContext context)
await AsyncExecute(input, context);
}

public async Task<string> AsyncExecute(string input, ILambdaContext context) {
public async Task<string> AsyncExecute(System.IO.Stream input, ILambdaContext context) {
_ = input; // discard
var waqiProxy = Waqi.create(_token);
var cities = JsonSerializer.Deserialize<List<Latincoder.AirQuality.Model.Config.City>>(_citiesRaw); // default C# serialization
Expand Down Expand Up @@ -112,7 +112,7 @@ group feed by $"{city.Country}-{city.Name}" into stationsByCity
var notificationIsNotSpam = NotificationValidator.CreateyValidator();
// spam means max aqi changed at least by 5 points, and is at least 20 minutes apart frm last notification
notificationIsNotSpam.AddRules(
Rules.MinutesApartFrom(prevFeed, 20),
Rules.MinutesApartFrom(prevFeed, 30),
Rules.AbsoluteAqiChangedBy(prevFeed, 5));
// urgent or meets criteria
if (notificationIsUrgent.MeetsGlobalCriteria(feed) && notificationIsNotSpam.MeetsGlobalCriteria(feed)) {
Expand Down

0 comments on commit 9f2daa9

Please sign in to comment.