Working with WHIP and Stream Forwarding
If you're building intelligent audio/video solutions - think real-time transcription, vision analytics, or smart moderation - you'll need an efficient way to deliver your audio/video streams to the AI processing stage. Our platform makes this simple using WHIP (WebRTC-HTTP Ingestion Protocol), a protocol designed for low-latency media ingestion.
What Is WHIP?
WHIP is a straightforward signaling protocol that uses standard HTTP methods (like POST and DELETE) to establish and manage a WebRTC media session between a client and a media server.
By standardizing the signaling process, WHIP simplifies media ingestion—making it easier to integrate into workflows where audio or video streams are forwarded for processing, such as AI pipelines.
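Conceptually, a WHIP client needs just two HTTP calls. Here's an illustrative Python sketch (the endpoint URL is a placeholder, and sdp_offer stands in for an offer produced by your WebRTC stack):

import requests

# 1. POST the SDP offer; the server replies 201 Created with an SDP answer
resp = requests.post(
    "https://whip.example.com/whip",  # placeholder endpoint
    data=sdp_offer,  # SDP offer string from your WebRTC stack
    headers={"Content-Type": "application/sdp"},
)
sdp_answer = resp.text
session_url = resp.headers["Location"]  # per-session resource URL

# ...media flows over the negotiated WebRTC peer connection...

# 2. DELETE the session resource to end the stream
requests.delete(session_url)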
Benefits for AI Streaming
- Low latency: WHIP leverages WebRTC for real-time media transport, ensuring minimal delay.
- Scalability: Efficient HTTP-based signaling makes it easy to manage thousands of sessions.
- Compatibility: WebRTC peer connections integrate cleanly with media frameworks like GStreamer and FFmpeg, and with custom ML models.
Forwarding Streams with Eyeson
Eyeson offers a powerful API Forward Stream feature that enables you to forward video and audio streams from a room to a specified destination.
You can forward streams from individual participants, the full MCU One View stream, or playback streams to external endpoints — all through the API.
Streams are delivered in their native format without any transcoding. This preserves the original quality and ensures seamless integration with various processing pipelines, minimizing latency and maintaining high fidelity.
Forwarding is automatically stopped when a source disconnects, simplifying stream management. Developers can also leverage Eyeson's Observer API to monitor participant and playback updates, enabling quick and efficient responses to connection changes.
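As a sketch of what starting a forward might look like, here's a Python example that forwards one participant's audio and video to a WHIP endpoint. The route and field names follow the Eyeson API reference for Forward Stream; treat them as assumptions and verify against the current docs:

import requests

API_KEY = "YOUR_API_KEY"  # placeholder
ROOM_ID = "your-room-id"  # placeholder
USER_ID = "participant-user-id"  # participant whose stream to forward

# Start forwarding the participant's stream to a WHIP endpoint
resp = requests.post(
    f"https://api.eyeson.team/rooms/{ROOM_ID}/forward/source",
    headers={"Authorization": API_KEY},
    json={
        "forward_id": "my-forward-1",  # your own identifier for this forward
        "user_id": USER_ID,
        "type": ["audio", "video"],
        "url": "https://my-whip-server.example.com:8889/whip",
    },
)
resp.raise_for_status()

# Stop the forward later by its forward_id
requests.delete(
    f"https://api.eyeson.team/rooms/{ROOM_ID}/forward/my-forward-1",
    headers={"Authorization": API_KEY},
)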
WHIP + AI Processing: System Overview
Here's the high-level flow for using WHIP to forward audio/video streams to an AI pipeline:
1. The Eyeson API forward stream feature sends a POST request with an SDP offer to the WHIP server.
2. The WHIP server creates a WebRTC peer connection from this offer and returns an SDP answer.
3. Once WebRTC negotiation completes, the media stream flows directly into a PeerConnection.
4. The AI pipeline manages and transforms the streams as needed for your models.
5. When the session is done, a DELETE request tears down the connection gracefully.
Code Examples: WHIP Server Setup
Here are some starter snippets for creating a WHIP-compatible server.
Python

See the on_track handler below for an example of how to read audio and video data for further processing.
#!/usr/bin/env python3
import asyncio
import logging
import traceback
import uuid
from aiohttp import web
from aiortc import RTCPeerConnection, RTCSessionDescription, RTCConfiguration, RTCIceServer
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
HOST = '0.0.0.0'
PORT = 8889
# Simplified WHIP server
class SimpleWhipServer:
"""A simplified WHIP server"""
def __init__(self):
self.sessions = {}
async def handle_whip_options(self, request):
"""Handle OPTIONS request for CORS preflight."""
headers = {
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "OPTIONS, POST, GET, DELETE",
"Access-Control-Allow-Headers": "Content-Type",
}
return web.Response(status=204, headers=headers)
async def handle_whip_request(self, request):
"""Handle WHIP POST request with SDP offer."""
try:
logger.debug(f"Received request: {request.method} {request.url}")
logger.debug(f"Headers: {dict(request.headers)}")
session_id = str(uuid.uuid4())
headers = {
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "OPTIONS, POST, GET, DELETE",
"Access-Control-Allow-Headers": "Content-Type",
"Access-Control-Expose-Headers": "Location",
"Content-Type": "application/sdp",
"Location": f"{request.url}/{session_id}"
}
content_type = request.headers.get("Content-Type", "")
if "application/sdp" not in content_type:
return web.Response(
status=400,
text="Content-Type must be application/sdp",
headers=headers
)
sdp = await request.text()
logger.warning(f"Received SDP: {sdp}")
config = RTCConfiguration(iceServers=[RTCIceServer(urls="stun:stun.l.google.com:19302")])
pc = RTCPeerConnection(configuration=config)
self.sessions[session_id] = pc
@pc.on("track")
async def on_track(track):
logger.warning(f"Received {track.kind} track")
if track.kind == "video":
while track.readyState == "live":
try:
frame = await track.recv()
logger.warning(f"Video frame: {frame.width}x{frame.height} @ {frame.time}")
# You can add additional processing here
except Exception as e:
logger.error(f"Error receiving video frame: {e}")
break
elif track.kind == "audio":
while track.readyState == "live":
try:
frame = await track.recv()
logger.warning(f"Audio frame: {frame.samples} samples @ {frame.sample_rate} Hz")
# You can add audio processing here
except Exception as e:
logger.error(f"Error receiving audio frame: {e}")
break
@pc.on("connectionstatechange")
async def on_connectionstatechange():
logger.warning(f"Connection state changed: {pc.connectionState}")
if pc.connectionState in ("failed", "closed", "disconnected"):
logger.warning(f"Peer connection {session_id} ended with state: {pc.connectionState}")
await self._cleanup_session(session_id)
offer = RTCSessionDescription(sdp=sdp, type="offer")
await pc.setRemoteDescription(offer)
answer = await pc.createAnswer()
await pc.setLocalDescription(answer)
return web.Response(
text=pc.localDescription.sdp,
headers=headers,
status=201
)
except Exception as e:
logger.error(f"Error processing WHIP request: {e}")
logger.error(traceback.format_exc())
headers = {
"Access-Control-Allow-Origin": "*",
"Content-Type": "text/plain"
}
return web.Response(
status=500,
text=f"Server error: {str(e)}",
headers=headers
)
async def handle_whip_delete(self, request):
"""Handle DELETE /whip/{session_id} to stop the peer connection."""
headers = {
"Access-Control-Allow-Origin": "*",
}
session_id = request.match_info.get('session_id')
if session_id not in self.sessions:
return web.Response(status=404, text=f"No active session with ID {session_id}", headers=headers)
try:
logger.info(f"Stopping session {session_id} via DELETE")
await self._cleanup_session(session_id)
return web.Response(status=200, text=f"Session {session_id} terminated", headers=headers)
except Exception as e:
logger.error(f"Error stopping session {session_id}: {e}")
return web.Response(status=500, text=f"Error stopping session: {str(e)}", headers=headers)
async def _cleanup_session(self, session_id):
"""Helper function to close peer connection for a session."""
pc = self.sessions.get(session_id)
if not pc:
logger.warning(f"No session found with ID {session_id} for cleanup")
return
try:
if pc and pc.connectionState != "closed":
await pc.close()
logger.info(f"PeerConnection closed for session {session_id}")
except Exception as e:
logger.error(f"Error closing PeerConnection for session {session_id}: {e}")
self.sessions.pop(session_id, None)
async def init_app():
"""Initialize the application."""
whip_server = SimpleWhipServer()
@web.middleware
async def error_middleware(request, handler):
try:
return await handler(request)
except web.HTTPException as ex:
logger.warning(f"HTTP Exception: {ex.status} - {ex.reason}")
raise
except Exception as e:
logger.error(f"Unhandled exception: {e}")
logger.error(traceback.format_exc())
return web.Response(status=500, text=str(e))
app = web.Application(middlewares=[error_middleware])
app.router.add_post('/whip', whip_server.handle_whip_request)
app.router.add_options('/whip', whip_server.handle_whip_options)
app.router.add_delete('/whip/{session_id}', whip_server.handle_whip_delete)
app.router.add_options('/whip/{session_id}', whip_server.handle_whip_options)
return app
async def main():
"""Main application entry point."""
app = await init_app()
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, HOST, PORT)
logger.info(f"Starting WHIP server on {HOST}:{PORT}")
await site.start()
print(f"WHIP server listening on :8889\n")
try:
while True:
await asyncio.sleep(3600)
except asyncio.CancelledError:
logger.info("Server shutdown initiated")
finally:
await runner.cleanup()
logger.info("Server stopped")
if __name__ == '__main__':
try:
asyncio.run(main())
except KeyboardInterrupt:
logger.info("Server stopped by user")
As of now, the aiortc library does not support the VP9 video codec. If you're planning to work with video streams, make sure to use the H.264 or VP8 codec for compatibility.

This makes aiortc an excellent match for Ghost by Eyeson, which streams video over RTMP or RTSP using the H.264 codec. With this setup, you'll have a smooth and fully supported video streaming pipeline.
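When you're ready to hand frames to a model, note that aiortc frames wrap PyAV objects that convert directly to NumPy arrays. Here's a minimal sketch of what the receive loops above could do instead of logging; process_video and process_audio are hypothetical stand-ins for your own inference calls:

from aiortc.mediastreams import MediaStreamError

async def consume_video(track):
    """Pull video frames and convert them for a (hypothetical) model."""
    while True:
        try:
            frame = await track.recv()
        except MediaStreamError:
            break  # track ended
        img = frame.to_ndarray(format="bgr24")  # HxWx3 array, OpenCV-ready
        # process_video(img)  # your inference call goes here

async def consume_audio(track):
    """Pull audio frames and convert PCM samples for a (hypothetical) model."""
    while True:
        try:
            frame = await track.recv()
        except MediaStreamError:
            break  # track ended
        pcm = frame.to_ndarray()  # int16 PCM samples
        # Many ASR models expect 16 kHz mono; resample before inference
        # process_audio(pcm, frame.sample_rate)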
Go

See the OnTrack handler below for an example of how to read audio and video data for further processing.
package main
import (
"crypto/rand"
"encoding/hex"
"fmt"
"io"
"log"
"net/http"
"strings"
"sync"
"errors"
"github.com/pion/webrtc/v4"
)
var (
sessionStore = sync.Map{} // map[string]*webrtc.PeerConnection
)
// Generates a random session ID
func generateSessionID() string {
b := make([]byte, 16)
if _, err := rand.Read(b); err != nil {
panic(err)
}
return hex.EncodeToString(b)
}
// Ends a WHIP session: closes PeerConnection and removes from session store
func endSession(sessionID string) {
val, ok := sessionStore.Load(sessionID)
if !ok {
return
}
pc := val.(*webrtc.PeerConnection)
// Close PeerConnection
if err := pc.Close(); err != nil {
log.Printf("Error closing PeerConnection for session %s: %v", sessionID, err)
}
sessionStore.Delete(sessionID)
log.Printf("Session %s ended", sessionID)
}
// Handles POST /whip
func whipPostHandler(w http.ResponseWriter, r *http.Request) {
// CORS
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "POST, DELETE, OPTIONS, GET")
w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
w.Header().Set("Access-Control-Expose-Headers", "Location")
if r.Method == http.MethodOptions {
w.WriteHeader(http.StatusOK)
return
}
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
if r.Header.Get("Content-Type") != "application/sdp" {
http.Error(w, "Unsupported Media Type", http.StatusUnsupportedMediaType)
return
}
body, err := io.ReadAll(r.Body)
if err != nil {
http.Error(w, "Failed to read SDP body", http.StatusBadRequest)
return
}
log.Printf("SDP Offer: %s", body)
// Create PeerConnection with STUN server
config := webrtc.Configuration{
ICEServers: []webrtc.ICEServer{
{
URLs: []string{"stun:stun.l.google.com:19302"},
},
},
}
peerConnection, err := webrtc.NewPeerConnection(config)
if err != nil {
http.Error(w, "Failed to create PeerConnection", http.StatusInternalServerError)
return
}
sessionID := generateSessionID()
// Connection state cleanup handler
peerConnection.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
log.Printf("Session %s connection state: %s", sessionID, state.String())
if state == webrtc.PeerConnectionStateDisconnected ||
state == webrtc.PeerConnectionStateFailed ||
state == webrtc.PeerConnectionStateClosed {
endSession(sessionID)
}
})
// Track handling
peerConnection.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
log.Printf("Incoming track: %s", track.Codec().MimeType)
go func() {
for {
pkt, _, err := track.ReadRTP()
if err != nil {
if errors.Is(err, io.EOF) || strings.Contains(err.Error(), "closed") {
log.Printf("Track %s ended: %v", track.ID(), err)
} else {
log.Printf("Error reading RTP packet from track %s: %v", track.ID(), err)
}
return
}
switch {
case strings.HasPrefix(track.Codec().MimeType, "video/"):
log.Printf("Video packet received: SSRC=%d, Timestamp=%d, PayloadType=%d, PayloadSize=%d",
pkt.SSRC, pkt.Timestamp, pkt.PayloadType, len(pkt.Payload))
// You can add additional video processing here
case strings.HasPrefix(track.Codec().MimeType, "audio/"):
log.Printf("Audio packet received: SSRC=%d, Timestamp=%d, PayloadType=%d, PayloadSize=%d",
pkt.SSRC, pkt.Timestamp, pkt.PayloadType, len(pkt.Payload))
// You can add additional audio processing here
}
}
}()
})
// Set remote offer
offer := webrtc.SessionDescription{
Type: webrtc.SDPTypeOffer,
SDP: string(body),
}
if err := peerConnection.SetRemoteDescription(offer); err != nil {
http.Error(w, "SetRemoteDescription failed", http.StatusInternalServerError)
return
}
// Create and set local answer
answer, err := peerConnection.CreateAnswer(nil)
if err != nil {
http.Error(w, "CreateAnswer failed", http.StatusInternalServerError)
return
}
if err := peerConnection.SetLocalDescription(answer); err != nil {
http.Error(w, "SetLocalDescription failed", http.StatusInternalServerError)
return
}
// Wait for ICE gathering
<-webrtc.GatheringCompletePromise(peerConnection)
// Store session
sessionStore.Store(sessionID, peerConnection)
scheme := "http"
if r.TLS != nil {
scheme = "https"
}
// Respond with SDP answer and session location
location := fmt.Sprintf("%s://%s%s/%s", scheme, r.Host, r.RequestURI, sessionID)
w.Header().Set("Location", location)
w.Header().Set("Content-Type", "application/sdp")
w.WriteHeader(http.StatusCreated)
w.Write([]byte(peerConnection.LocalDescription().SDP))
}
// Handles DELETE /whip/{session-id}
func whipDeleteHandler(w http.ResponseWriter, r *http.Request) {
// CORS
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
w.Header().Set("Access-Control-Allow-Methods", "POST, DELETE, OPTIONS, GET")
if r.Method == http.MethodOptions {
w.WriteHeader(http.StatusOK)
return
}
if r.Method != http.MethodDelete {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
pathParts := strings.Split(r.URL.Path, "/")
if len(pathParts) < 3 || pathParts[1] != "whip" {
http.Error(w, "Invalid session path", http.StatusBadRequest)
return
}
sessionID := pathParts[2]
_, ok := sessionStore.Load(sessionID)
if !ok {
http.NotFound(w, r)
return
}
endSession(sessionID)
w.WriteHeader(http.StatusOK)
}
func main() {
http.HandleFunc("/whip", whipPostHandler) // POST /whip
http.HandleFunc("/whip/", whipDeleteHandler) // DELETE /whip/<id>
fmt.Println("WHIP server listening on :8889")
if err := http.ListenAndServe(":8889", nil); err != nil {
log.Fatal(err)
}
}
Node.js

See the pc.ontrack handler below for an example of how to read audio and video data for further processing.
import express from 'express';
import wrtc from '@roamhq/wrtc';
const app = express();
const port = 8889;
// Store active peer connections keyed by session ID
const sessions = {};
app.use(express.text({ type: '*/*' }));
// Enable CORS for all routes
app.use((_req, res, next) => {
res.header('Access-Control-Allow-Origin', '*'); // Or restrict to specific origin
res.header('Access-Control-Allow-Methods', 'GET, POST, DELETE, OPTIONS');
res.header('Access-Control-Allow-Headers', 'Content-Type');
res.header('Access-Control-Expose-Headers', 'Location');
next();
});
// Handle preflight OPTIONS requests
app.options('/{*splat}', (_req, res) => {
res.sendStatus(204); // No Content
});
app.post('/whip', async (req, res) => {
if (!req.is('application/sdp')) {
return res.status(415).send('Unsupported Media Type: expected application/sdp');
}
const sessionId = generateSessionId();
const offerSDP = req.body;
const pc = new wrtc.RTCPeerConnection({
iceServers: [
{ urls: 'stun:stun.l.google.com:19302' }
]
});
pc.ontrack = ({ track }) => {
console.log('Received track:', track.kind);
if (track.kind === 'audio') {
const audioSink = new wrtc.nonstandard.RTCAudioSink(track);
audioSink.ondata = ({ bitsPerSample, sampleRate, samples }) => {
// process audio data
console.log(bitsPerSample, sampleRate, samples.length);
};
}
else if (track.kind === 'video') {
const videoSink = new wrtc.nonstandard.RTCVideoSink(track);
videoSink.onframe = ({ frame: { width, height, data } }) => {
// process video frames
console.log(`${width}x${height}, ${data.length} bytes`);
};
}
};
pc.onconnectionstatechange = () => {
console.log('Connection state changed to:', pc.connectionState);
if (pc.connectionState === 'disconnected' || pc.connectionState === 'failed') {
if (sessions[sessionId]) {
terminateSession(sessionId);
}
}
};
try {
await pc.setRemoteDescription({ type: 'offer', sdp: offerSDP });
const answer = await pc.createAnswer();
await pc.setLocalDescription(answer);
await waitForIceGathering(pc, 1000);
const answerSDP = pc.localDescription.sdp;
// Save the session
sessions[sessionId] = pc;
const location = `${req.protocol}://${req.get('host')}${req.originalUrl}`;
res.setHeader('Location', `${location}/${sessionId}`);
res.setHeader('Content-Type', 'application/sdp');
res.status(201).send(answerSDP);
} catch (error) {
return res.status(500).send(error.toString());
}
});
// DELETE route to end session
app.delete('/whip/:id', (req, res) => {
const sessionId = req.params.id;
try {
terminateSession(sessionId);
res.status(200).send('Session terminated');
} catch (error) {
res.status(404).send(error.message);
}
});
function terminateSession(sessionId) {
console.log('DELETE', sessionId);
if (!sessions[sessionId]) {
throw new Error('Session not found');
}
const pc = sessions[sessionId];
// Close peer connection and clean up
pc.close();
Reflect.deleteProperty(sessions, sessionId);
console.log(`Session ${sessionId} terminated.`);
}
// Helper: wait for ICE gathering to finish
function waitForIceGathering(pc, maxTime) {
return new Promise((resolve) => {
const timer = setTimeout(() => resolve(), maxTime);
pc.onicecandidate = ({ candidate }) => {
if (!candidate) {
clearTimeout(timer);
resolve();
}
};
});
}
function generateSessionId() {
return Math.random().toString(36).substring(2, 10);
}
app.listen(port, () => {
  console.log(`WHIP server is listening on http://localhost:${port}/whip`);
});
The @roamhq/wrtc Node.js package lacks native H.264 support. This can be problematic when using tools like Ghost by Eyeson, where RTMP/RTSP streams typically require H.264. Consider alternative WebRTC implementations or transcoding solutions if H.264 is needed.
The WHIP URL for all of the examples above is:
http://<ip-of-whip-server>:8889/whip
(The sample servers speak plain HTTP; use https:// if you place them behind TLS.)
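For an end-to-end smoke test without an Eyeson room, you can publish to any of these servers yourself. Here's a minimal, illustrative WHIP publisher using aiortc; the file name test.mp4 and the localhost URL are assumptions to replace with your own:

#!/usr/bin/env python3
import asyncio
import requests
from aiortc import RTCPeerConnection, RTCSessionDescription
from aiortc.contrib.media import MediaPlayer

WHIP_URL = "http://localhost:8889/whip"  # adjust to your server

async def publish():
    pc = RTCPeerConnection()
    player = MediaPlayer("test.mp4")  # any local media file
    if player.audio:
        pc.addTrack(player.audio)
    if player.video:
        pc.addTrack(player.video)

    # aiortc finishes ICE gathering inside setLocalDescription,
    # so the resulting SDP already contains candidates
    await pc.setLocalDescription(await pc.createOffer())

    # WHIP: POST the offer, receive the answer plus a session Location
    # (blocking requests is fine for a simple test script)
    resp = requests.post(
        WHIP_URL,
        data=pc.localDescription.sdp,
        headers={"Content-Type": "application/sdp"},
    )
    resp.raise_for_status()
    await pc.setRemoteDescription(
        RTCSessionDescription(sdp=resp.text, type="answer")
    )

    await asyncio.sleep(30)  # stream for 30 seconds

    # WHIP: DELETE the session resource to tear down
    requests.delete(resp.headers["Location"])
    await pc.close()

asyncio.run(publish())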
WHIP-Compatible Tools as Alternatives and for Prototyping
If you're exploring alternatives to building your own WHIP server or want to prototype quickly without diving into low-level implementation, there are several open-source tools that support WHIP and can feed WebRTC streams directly into your pipeline. Here are two solid options:
GStreamer: the whipserversrc Element

GStreamer is a powerful multimedia framework used in applications ranging from media playback to real-time AI inference. It now features a whipserversrc element that acts as a WHIP endpoint, making it easy to receive and process WebRTC media.

https://gstreamer.freedesktop.org/documentation/rswebrtc/whipserversrc.html
Here are simple GStreamer pipelines to receive and play a WHIP stream:

Video:

gst-launch-1.0 whipserversrc \
signaller::host-addr=http://0.0.0.0:8889 \
stun-server="stun://stun.l.google.com:19302" \
! decodebin ! videoconvert ! autovideosink

Audio:

gst-launch-1.0 whipserversrc \
signaller::host-addr=http://0.0.0.0:8889 \
stun-server="stun://stun.l.google.com:19302" \
! decodebin ! audioconvert ! autoaudiosink
The WHIP URL will look like this:
http://<ip-of-gstreamer-server>:8889/whip/endpoint
You can also connect it directly to AI pipelines:
gst-launch-1.0 whipserversrc name=whip \
whip. ! decodebin ! videoconvert ! videoscale ! \
your_ai_inference_element_here
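If your inference code lives in Python rather than in a GStreamer element, an appsink makes a convenient bridge. Here's a sketch using PyGObject that pulls decoded RGB frames out of the WHIP pipeline; the frame handling is illustrative and assumes you supply your own model:

import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst, GLib

Gst.init(None)

# whipserversrc feeds decoded RGB video into an appsink we can read from Python
pipeline = Gst.parse_launch(
    "whipserversrc signaller::host-addr=http://0.0.0.0:8889 "
    "! decodebin ! videoconvert ! video/x-raw,format=RGB "
    "! appsink name=sink emit-signals=true max-buffers=1 drop=true"
)
sink = pipeline.get_by_name("sink")

def on_sample(appsink):
    sample = appsink.emit("pull-sample")
    caps = sample.get_caps().get_structure(0)
    width, height = caps.get_value("width"), caps.get_value("height")
    buf = sample.get_buffer()
    ok, info = buf.map(Gst.MapFlags.READ)
    if ok:
        # info.data holds raw RGB bytes; hand them to your model here
        print(f"Frame {width}x{height}, {len(info.data)} bytes")
        buf.unmap(info)
    return Gst.FlowReturn.OK

sink.connect("new-sample", on_sample)
pipeline.set_state(Gst.State.PLAYING)
GLib.MainLoop().run()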
Pros:
- A solid WHIP-compatible alternative for building prototypes.
- Easy integration with AI tools like TensorFlow or OpenCV.
- Ideal for developers already using GStreamer.
MediaMTX: WHIP-Ingest Server with RTSP/RTMP/UDP Output
MediaMTX (formerly rtsp-simple-server) is a robust media server that supports WHIP as a native ingestion method. It can convert incoming WebRTC streams into RTSP, RTMP, HLS, SRT, and more — making it extremely flexible for AI and media applications.
https://github.com/bluenviron/mediamtx
For a basic setup, download a precompiled binary from the GitHub releases page and run it, or clone and build from source:

git clone https://github.com/bluenviron/mediamtx
cd mediamtx
go generate ./...
go build .
./mediamtx
It has the following default WHIP URL:
http://<ip-of-mediamtx-server>:8889/mystream/whip
Pros:
- Converts WebRTC to RTSP, HLS, SRT, MPEG-TS, etc.
- WHIP server built in — great for rapid prototyping or format bridging.
- Perfect for AI systems that prefer non-WebRTC formats.
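Because MediaMTX republishes the WHIP ingest over those other protocols, your AI code can consume a plain RTSP stream instead of speaking WebRTC. Here's a minimal sketch using OpenCV, assuming MediaMTX's default RTSP port (8554) and the mystream path:

import cv2  # pip install opencv-python

# MediaMTX republishes the WHIP ingest over RTSP on port 8554 by default
cap = cv2.VideoCapture("rtsp://<ip-of-mediamtx-server>:8554/mystream")

while cap.isOpened():
    ok, frame = cap.read()
    if not ok:
        break
    # frame is a BGR ndarray; run your model here, e.g.
    # detections = model(frame)  # hypothetical inference call
    print(f"Frame: {frame.shape[1]}x{frame.shape[0]}")

cap.release()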
Contact
We'd love to hear from you!
If you're building real-time AI video pipelines and need help setting up WHIP or peer connection forwarding, feel free to reach out. We're happy to collaborate or support your development.