jebu.net
thoughts scribbles images from silicon plateau
-
Erlang tap to the Twitter stream
Posted on September 18th, 2009 · 8 comments
The Erlang http module works really well for plugging into the Twitter stream. The async option of the http module hands each chunk of a chunk-encoded HTTP response to the async request handler as it arrives. And guess what: the Twitter stream API sends each tweet as a chunk, and in the JSON version each chunk is a self-contained JSON document. Combine this with Erlang's lightweight processes and Twitter stream processing just flows along.
Here is the code which taps into the Twitter data stream. I use the Erlang JSON parser from LShift for processing the tweets.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
-module(twitter_stream).
-author('jebu@jebu.net').
%%
%% Copyright (c) 2009, Jebu Ittiachen
%% All rights reserved.
%%
%% Redistribution and use in source and binary forms, with or without modification, are
%% permitted provided that the following conditions are met:
%%
%% 1. Redistributions of source code must retain the above copyright notice, this list of
%% conditions and the following disclaimer.
%%
%% 2. Redistributions in binary form must reproduce the above copyright notice, this list
%% of conditions and the following disclaimer in the documentation and/or other materials
%% provided with the distribution.
%%
%% THIS SOFTWARE IS PROVIDED BY JEBU ITTIACHEN ``AS IS'' AND ANY EXPRESS OR IMPLIED
%% WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
%% FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL JEBU ITTIACHEN OR
%% CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
%% CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
%% SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
%% ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
%% NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
%% ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
%%
%% The views and conclusions contained in the software and documentation are those of the
%% authors and should not be interpreted as representing official policies, either expressed
%% or implied, of Jebu Ittiachen.
%%

%% API
-export([fetch/1, fetch/3, process_data/1]).

%% @doc Start tapping the stream at URL in a background process.
%%
%% URL is expected in the form
%% http://user:password@stream.twitter.com/1/statuses/sample.json
%%
%% The work is handed to fetch/3 in a freshly spawned process so the calling
%% shell stays free. Defaults used: 5 reconnect attempts, 30 seconds of sleep
%% between attempts. Returns the pid of the spawned process.
-spec fetch(string()) -> pid().
fetch(URL) ->
    spawn(?MODULE, fetch, [URL, 5, 30]).
%% @doc Connect to the stream at URL and keep reading until retries run out.
%%
%% URL   - http://user:password@stream.twitter.com/1/statuses/sample.json
%% Retry - number of times the stream is (re)connected
%% Sleep - seconds to sleep between retries
%%
%% Returns {error, Result, unauthorized} as soon as the server answers 401
%% (no retry is attempted), or {error, no_more_retry} once Retry reaches 0.
%% The request is asynchronous and streams its chunks to the calling process,
%% so this function must run in the process that will consume the stream.
%% NOTE(review): httpc needs the inets application to be running; this module
%% does not start it -- confirm the caller does.
fetch(URL, Retry, Sleep) when Retry > 0 ->
    case open_stream(URL) of
        {ok, RequestId} ->
            handle_stream_result(receive_chunk(RequestId), URL, Retry, Sleep);
        {error, Reason} ->
            %% Only the reason is logged: URL embeds the credentials.
            error_logger:warning_msg("Could not open stream: ~p~n", [Reason]),
            retry(URL, Retry, Sleep)
    end;
fetch(_, Retry, _) when Retry =< 0 ->
    error_logger:info_msg("No more retries done with processing fetch thread~n"),
    {error, no_more_retry}.

%% Issue the asynchronous GET and have the chunks streamed to this process.
%% Any failure to start the request (error return or exception) is folded
%% into {error, Reason} so the caller can log it and retry.
open_stream(URL) ->
    try httpc:request(get, {URL, []}, [], [{sync, false}, {stream, self}]) of
        {ok, RequestId} -> {ok, RequestId};
        Other -> {error, {request_failed, Other}}
    catch
        Class:Reason -> {error, {Class, Reason}}
    end.

%% Decide what to do once receive_chunk/1 has returned for one connection.
handle_stream_result({ok, _}, URL, Retry, Sleep) ->
    %% stream ended normally, reconnect
    retry(URL, Retry, Sleep);
handle_stream_result({error, unauthorized, Result}, _URL, _Retry, _Sleep) ->
    %% wrong credentials will not get better by retrying
    {error, Result, unauthorized};
handle_stream_result({error, timeout}, URL, Retry, Sleep) ->
    retry(URL, Retry, Sleep);
handle_stream_result({_, Reason}, URL, Retry, Sleep) ->
    error_logger:info_msg("Got some Reason ~p ~n", [Reason]),
    retry(URL, Retry, Sleep).

%% Sleep for Sleep seconds, then reconnect with one retry fewer.
retry(URL, Retry, Sleep) ->
    timer:sleep(Sleep * 1000),
    fetch(URL, Retry - 1, Sleep).

%% @doc Tweet handler; presumably you could do something useful here.
%% Data is one chunk of the streamed response body. This default
%% implementation only logs it and returns ok.
process_data(Data) ->
    error_logger:info_msg("Received tweet ~p ~n", [Data]),
    ok.
%%====================================================================
%% Internal functions
%%====================================================================

%% Receive loop for one asynchronous streaming request.
%%
%% Consumes the {http, ...} messages tagged with RequestId and returns:
%%   {ok, RequestId}                         - the stream ended (stream_end)
%%   {error, timeout}                        - the request timed out, or no
%%                                             message arrived for 60 seconds
%%   {error, unauthorized, {Status, Headers}} - the server answered 401
%%   {error, Result}                         - any other non-streamed result
%% Every streamed chunk is handed to twitter_stream:process_data/1 in its own
%% process so this loop can go straight back to receiving.
receive_chunk(RequestId) ->
    receive
        {http, {RequestId, {error, Reason}}}
          when (Reason =:= etimedout) orelse (Reason =:= timeout) ->
            {error, timeout};
        {http, {RequestId, {{_, 401, _} = Status, Headers, _}}} ->
            {error, unauthorized, {Status, Headers}};
        {http, {RequestId, Result}} ->
            {error, Result};

        %% start of streaming data
        {http, {RequestId, stream_start, Headers}} ->
            error_logger:info_msg("Streaming data start ~p ~n", [Headers]),
            receive_chunk(RequestId);

        %% streaming chunk of data
        %% this is where we will be looping around,
        %% we spawn this off to a separate process as soon as we get the chunk
        %% and go back to receiving the tweets
        {http, {RequestId, stream, Data}} ->
            spawn(twitter_stream, process_data, [Data]),
            receive_chunk(RequestId);

        %% end of streaming data
        {http, {RequestId, stream_end, Headers}} ->
            error_logger:info_msg("Streaming data end ~p ~n", [Headers]),
            {ok, RequestId}

    %% timeout
    after 60 * 1000 ->
        %% Nothing arrived for a minute. Cancel the request before giving up;
        %% otherwise it could keep sending messages into this mailbox after
        %% the caller has moved on to a new request.
        httpc:cancel_request(RequestId),
        {error, timeout}
    end.
-
http://www.gucci-outlet-store.com gucci handbags
-
http://damjan.softver.org.mk/ gdamjan
-
http://blog.jebu.net jebu
-
http://www.moncler-down-jackets.com ugg boots for sale
-
http://twitter.com/MiCHiLU Takanao Endoh
-
http://twitter.com/MiCHiLU Takanao Endoh
-
gdamjan
-
gdamjan
-


