thoughts scribbles images from silicon plateau
RSS icon Home icon
  • TweetALoc – Sync your geocoded tweets to FireEagle

    Posted on November 29th, 2009 jebu Comments

    [TweetALoc logo] This was always there at the back of my mind, ever since Twitter announced they were adding geotagging to status updates. So here it is: TweetALoc.

    Once you authorize it by linking your Twitter account and your FireEagle account, it listens to your tweet stream; if geo information is present in any of your tweets, it sends it along to FireEagle. Updates usually happen within 20 seconds of your tweeting. No polling is involved here. Once authorized, your Twitter id is added to TweetALoc’s follow list within 5 minutes, so the first update will happen only after that, and updates are continuous from then on. I guess right now there are only a handful of Twitter clients which add location to your tweets; the Twitter blog post on the geotagging release has a list. I use one of my own, which is why you see my tweets signed from nowwhat.in.

    Just in case you feel that you need to stop updating FireEagle, I have also provided a de-authorizing facility. The FireEagle tokens associated with your Twitter id are removed and your id is removed from the follow list. You can of course also go into FireEagle and remove the permissions granted to TweetALoc.

    Behind the scenes this extends the Erlang Twitter stream boilerplate that I had talked about in a previous post. I have used the Erlang OAuth library for talking OAuth to FireEagle and the PHP OAuth libraries for the web interface portions. That logo was GIMPed by me :)

  • Erlang tap to the Twitter stream

    Posted on September 18th, 2009 jebu Comments

    The Erlang http module works really well for plugging into the Twitter stream. The async option of the http module gives each chunk of a chunk-encoded HTTP response as a callback to the async request handler. And guess what: the Twitter stream API delivers each tweet as a chunk, and in the JSON version each chunk is a self-contained JSON document. Combine this with Erlang's lightweight processes and Twitter stream processing just flows along.

    Here is the code which taps into the Twitter data stream. I use the erlang json parser from Lshift for processing the tweets.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    
    -module(twitter_stream).
    -author('jebu@jebu.net').
    %% 
    %% Copyright (c) 2009, Jebu Ittiachen
    %% All rights reserved.
    %% 
    %% Redistribution and use in source and binary forms, with or without modification, are
    %% permitted provided that the following conditions are met:
    %% 
    %%    1. Redistributions of source code must retain the above copyright notice, this list of
    %%       conditions and the following disclaimer.
    %% 
    %%    2. Redistributions in binary form must reproduce the above copyright notice, this list
    %%       of conditions and the following disclaimer in the documentation and/or other materials
    %%       provided with the distribution.
    %% 
    %% THIS SOFTWARE IS PROVIDED BY JEBU ITTIACHEN ``AS IS'' AND ANY EXPRESS OR IMPLIED
    %% WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
    %% FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL JEBU ITTIACHEN OR
    %% CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
    %% CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
    %% SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
    %% ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
    %% NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
    %% ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
    %% 
    %% The views and conclusions contained in the software and documentation are those of the
    %% authors and should not be interpreted as representing official policies, either expressed
    %% or implied, of Jebu Ittiachen.
    %%
    %% API
    -export([fetch/1, fetch/3, process_data/1]).
     
    % Convenience entry point: takes a url of the form
    % http://user:password@stream.twitter.com/1/statuses/sample.json
    % and runs the 3 arg version in a freshly spawned process, so the calling
    % shell is not blocked. Uses 5 retries with 30 secs between them.
    % Returns the pid of the spawned process.
    fetch(URL) ->
      Retries = 5,
      SleepSecs = 30,
      spawn(twitter_stream, fetch, [URL, Retries, SleepSecs]).
     
    % 3 arg version expects url of the form http://user:password@stream.twitter.com/1/statuses/sample.json
    % URL   - stream endpoint, handed unchanged to httpc:request/4
    % Retry - number of connection attempts left; every attempt that ends for
    %         any reason other than a 401 uses one up
    % Sleep - secs to sleep between attempts
    %
    % Returns {error, Result, unauthorized} as soon as the server answers 401,
    % or {error, no_more_retry} once the attempts are used up.
    %
    % NOTE: httpc is the current name of the inets http client (the old `http`
    % module is gone from OTP); it needs the inets application to be running.
    % The URL is deliberately never logged because it carries the password.
    fetch(URL, Retry, Sleep) when Retry > 0 ->
      % async request: each chunk of the response is delivered to this
      % process as a message, which receive_chunk/1 consumes
      try httpc:request(get,
                        {URL, []},
                        [],
                        [{sync, false},
                         {stream, self}]) of
        {ok, RequestId} ->
          case receive_chunk(RequestId) of
            {ok, _} ->
              % stream ended normally, reconnect
              retry_fetch(URL, Retry, Sleep);
            {error, unauthorized, Result} ->
              % retrying cannot fix bad credentials, give up right away
              {error, Result, unauthorized};
            {error, timeout} ->
              % the request may still be open (receive_chunk/1 also reports
              % its own 60 sec silence as a timeout); cancel it so the old
              % connection does not linger and keep sending us messages
              httpc:cancel_request(RequestId),
              retry_fetch(URL, Retry, Sleep);
            {_, Reason} ->
              error_logger:info_msg("Got some Reason ~p ~n", [Reason]),
              retry_fetch(URL, Retry, Sleep)
          end;
        Other ->
          % request could not be started; say why instead of retrying silently
          error_logger:error_msg("Stream request not started ~p ~n", [Other]),
          retry_fetch(URL, Retry, Sleep)
      catch
        ExClass:ExReason ->
          % only the httpc:request/4 call is protected by this try
          error_logger:error_msg("Stream request crashed ~p:~p ~n", [ExClass, ExReason]),
          retry_fetch(URL, Retry, Sleep)
      end;
    %
    fetch(_, Retry, _) when Retry =< 0 ->
      error_logger:info_msg("No more retries done with processing fetch thread~n"),
      {error, no_more_retry}.
    %
    % sleep Sleep secs, then reconnect with one attempt fewer
    retry_fetch(URL, Retry, Sleep) ->
      timer:sleep(Sleep * 1000),
      fetch(URL, Retry - 1, Sleep).
    %
    % Tweet handler: receives one streamed chunk, logs it and returns ok.
    % Presumably you could do something useful with the tweet here.
    %
    process_data(Tweet) ->
      LogArgs = [Tweet],
      error_logger:info_msg("Received tweet ~p ~n", LogArgs),
      ok.
     
    %%====================================================================
    %% Internal functions
    %%====================================================================
    % Consumes the messages belonging to the async request RequestId until
    % the stream ends, a complete (non streamed) result arrives, or nothing
    % arrives for 60 secs.
    % Returns {ok, RequestId} on stream end, {error, timeout},
    % {error, unauthorized, {Status, Headers}} on a 401, or {error, Result}
    % for any other complete result.
    receive_chunk(RequestId) ->
      receive
        %% the stream has started, log the headers and keep listening
        {http, {RequestId, stream_start, StartHeaders}} ->
          error_logger:info_msg("Streaming data start ~p ~n",[StartHeaders]),
          receive_chunk(RequestId);

        %% one chunk of streamed data; this is the clause we loop on.
        %% The chunk is handed to a separate process straight away so that
        %% this process can go back to receiving
        {http, {RequestId, stream, Chunk}} ->
          spawn(twitter_stream, process_data, [Chunk]),
          receive_chunk(RequestId);

        %% the stream has ended
        {http, {RequestId, stream_end, EndHeaders}} ->
          error_logger:info_msg("Streaming data end ~p ~n", [EndHeaders]),
          {ok, RequestId};

        %% a complete result instead of streamed data: always a failure here
        {http, {RequestId, Result}} ->
          classify_result(Result)

      %% nothing at all for a minute
      after 60000 ->
        {error, timeout}

      end.

    % Maps a complete (non streamed) request result to the error term
    % receive_chunk/1 returns; clause order matters, most specific first.
    classify_result({error, Reason}) when Reason =:= etimedout; Reason =:= timeout ->
      {error, timeout};
    classify_result({{_, 401, _} = Status, Headers, _}) ->
      {error, unauthorized, {Status, Headers}};
    classify_result(Result) ->
      {error, Result}.