応答を解析する
ストリーミングAPI応答の body 部分は、改行で区切られたメッセージの連なりになっています。ここでの “改行” とは\r\n
(16進数では、 0x0D 0x0A
)のことで、 “メッセージ” とは JSON でエンコードされたデータか空行のことです。
ツイートコンテンツには時々改行の¥n
文字が含まれることがありますが、キャリッジ・リターンの ¥r
が含まれることはないので覚えておいてください。 Therefore, to make sure you get whole message payloads, break out each message on
boundaries, as
may occur in the middle of a message. その代わりに、区切りメッセージの項目で説明されている delimited パラメータを使用してください。
ストリーミングAPIから受信するメッセージについての情報は、ストリーミングメッセージのタイプを参照してください。
このAPIで流れる個々のメッセージはJSON 形式になっています。 JSONエンコードされたオブジェクトの属性は、順番通りに並んでいないことを覚えておいてください - 属性の並び順に依存する処理はしないでください。 また、JSONを解析する場合は、予期せぬフィールドがあったり逆にフィールドなかったりする場合を想定して処理を行うことを忘れないでください。
非常に稀ですが、ストリーミングAPIは取得に非常に時間がかかるデータを待たず、属性が不完全な状態でツイート情報を送信してくることがあります。 count系の数値型属性でこうしたデータ欠落が発生した場合、そのcount属性の値は-1になります。 値が-1になっているcountを見つけた場合は、必要に応じて REST API を使い正しい値で埋めてください。
"user":{ "followers_count":-1, "friends_count":-1, "listed_count":null, "created_at":"Wed Sep 23 17:35:05 +0000 2009", "favourites_count":-1, "utc_offset":null, "time_zone":null, "geo_enabled":false, "verified":false, "statuses_count":-1, "lang":"en", ... }
大抵のストリーム接続では、応答内のTransfer-Encoding: chunked
HTTP ヘッダが表すように chunked transfer encodingを使ってエンコードされています。大抵のHTTPライブラリはchunked transfer encodingを自動的に処理してくれるので、このドキュメントではあなたのプログラムではHTTP層のみアクセスして
このエンコードについては処理をしないという想定で説明します。
万が一あなたのプログラムでが生のTCPストリームを解析している場合は、手動でHTTP chunksを再結合する必要があります。 Be aware that Tweets and other streamed messages will not necessarily fall on HTTP chunk boundaries. delimited パラメータを使用している場合は、transfer encoding lengths (HTTP level) と delimited Tweet lengths (Application level)の両方を受け取ります。
ストリームへの接続時にdelimited=length
を渡すことで (この値は番号ではなく文字列の長さ
なので注意してください)、
各メッセージの前に、メッセージの長さをバイト単位で表すbase-10 数値を文字列にした値が付けられます。
これはchunked transfer encodingには一切関係なく、影響を与えないので覚えておいてください。
Clients may use these length delimiters to more efficiently copy chunks of text off of the incoming stream, rather than having to parse message text for
¥r¥n
tokens.
この実装方法を説明するため、以下にストリームからlength-delimitedメッセージを読み込む擬似コードを記載します。:
while (true) { do { lengthBytes = readline() } while (lengthBytes.length < 1) messageLength = parseInt(lengthBytes); messageBytes = read(messageLength); enqueueForMarkupProcessor(messageBytes); }
メッセージを素早く処理することができないクライアントは切断されます。あなたのクライアントの処理が遅くないかを調べるには、受信したツイートのタイムスタンプと現在の時刻を比較します。 時間がたつにつれてタイムスタンプの時差が増えていく場合、クライアントがツイートを処理する速度がその配信速度に追いついていません。 クライアントの遅延通知を受信するのためのもう一つの方法は、ストリーミング接続を確立する時に stall_warnings パラメータを渡すことです。
Twitter のストリーミング量は一定ではありません。24時間を通して、1秒間に配信されるTweetの数には波があります。 また、月を追うごとにデータ量は着実に増えており、 大規模な世界的・文化的イベントの際には通常のピーク時の3倍以上トラフィックが急増することもあります。 クライアントのコネクションを維持するには、そうしたケースに備えてテストをしておかなければなりません。
ツイートとその他ストリーミングメッセージを取得するためのベストプラクティスは、大量のストリームに対しての収集と処理を分離することです。 例えば、一つ目のプロセスでメッセージテキストを収集し、各メッセージをメッセージキューやファイルやデータベースに渡します。 二つ目のプロセスでメッセージを解析し、保存する必要があったり追加の処理が必要なフィールドを抽出します。
ストリーミングAPIで取得するメッセージは順番通りには配信されません。 通所は、本来の表示タイミングから数秒以内の範囲内でメッセージが届きます。 割合としては少ないですが、場合によっては本来の表示タイミングよりも数十秒から数秒ほど遅れて届くこともあります。
タイムラインを全て順番どおりに表示すると、配信が遅れてしまったメッセージがタイムラインの枠外に挿入された場合、ユーザーがそのメッセージを見落とす可能性があります。 メッセージが枠外に挿入されてしまいそうな場合のみタイムラインの並び替えを行い、枠内に挿入される場合タイムラインの上部へ追加するようにしてください。
Delete messages may be delivered before the original Tweet so implementations should be able to replay a local cache of unrecognized deletes.
重複メッセージが配信されて来ることもあるので、ツイートを2回以上受信しても問題なく動作するような実装をしてください。
メッセージの重複は、ストリーミングAPIへの再接続や欠落したメッセージの埋め戻しを行った際に良く起こります。
REST APIを使った埋め戻しでもメッセージ重複はよく発生します。重複メッセージを処理し、 since_id
REST パラメータを使って重複、待ち時間、サーバ側での読み込みを減らすようにしてください。
Gzip 圧縮を使用することで、ストリームの処理に必要は帯域幅を、圧縮していない場合のの1/5のサイズにすることができます。 以下のHTTPヘッダを付けて接続して、Gzip 圧縮されたストリームをリクエストしてください:
Accept-Encoding: deflate, gzip
Twitter はGzip 圧縮されたストリームで応答を返します。
注意:上記リクエストをしても、 Twitter が圧縮されたストリームを返さない場合があります。常に Content-Encoding
ヘッダを確認し、ストリームが実際に圧縮されているかを調べてください。ストリームを圧縮した状態で取得するためには、以下のことが必要です:
User-Agent
ヘッダをインクルードします。値は何でもいいです。
Host
ヘッダをインクルードします。.
Connection: close
ヘッダは送信しないでください。
Ruby のEventMachine ライブラリ は既定でConnection: close
ヘッダを送信するので、gzipエンコードができません。
これを防ぐために、ストリーミングエンドポイントに接続する時に:keepalive => true
を渡します。
EventMachine では現時点でdeflate 圧縮されたストリームのみサポートしているので、Accept-Encoding: deflate
ヘッダを送信します。
以下の例をEventMachineに組み入れるには、改行を入れる必要があります:
@ http = EventMachine::HttpRequest.new('STREAMING URL').post(:body=>BODY, :head => {"Content-Type" => "application/x-www-form-urlencoded", "Accept-Encoding" => "deflate", "User-Agent" => "USER AGENT"}, :timeout => 90, :keepalive => true) do |client|
java.util.zip.GZIPInputStream()
を使い、それをjava.io.BufferedReader()
に引数として渡してストリーミングAPIデータを読み込んでいるJavaクライアントでは、
低容量によるバッファリングが発生することがあります。これは GZIPInputStream
’s available()
メソッドがストリーミング目的には適していないためです。
これに対処するには、GZIPInputStream()
で available()
メソッドをオーバーライドしたサブクラスを作成します。例えば:
import java.io.IOException; import java.io.InputStream; import java.util.zip.GZIPInputStream; final class StreamingGZIPInputStream extends GZIPInputStream { private final InputStream wrapped; public StreamingGZIPInputStream(InputStream is) throws IOException { super(is); wrapped = is; } /** * Overrides behavior of GZIPInputStream which assumes we have all the data available * which is not true for streaming. We instead rely on the underlying stream to tell us * how much data is available. * * Programs should not count on this method to return the actual number * of bytes that could be read without blocking. * * @return - whatever the wrapped InputStream returns * @exception IOException if an I/O error occurs. */ public int available() throws IOException { return wrapped.available(); } }
このクラスを使用するには、以下のようにGZIPInputStream
を使っている箇所を:
Reader reader = new BufferedReader(new InputStreamReader(new GZIPInputStream(http.getInputStream());
StreamingGZipInputStream
に置き換えます:
Reader reader = new BufferedReader(new InputStreamReader(new StreamingGZIPInputStream(http.getInputStream());