http-service
The http-service source receives POST requests via HTTP and HTTPS protocols in format such as text and JSON and sends responses via its corresponding http-service-response sink correlated through a unique source.id.
For request and response correlation, it generates a messageId upon each incoming request and expose it via transport
properties in the format trp:messageId to correlate them with the responses at the http-service-response sink.
The request headers and properties can be accessed via transport properties in the format trp:<header>.
It also supports basic authentication to ensure events are received from authorized users/systems.
Syntax
CREATE SOURCE <NAME> WITH (type="http-service", map.type="<STRING>", receiver.url="<STRING>", source.id="<STRING>", connection.timeout="<INT>", basic.auth.enabled="<STRING>", worker.count="<INT>", socket.idle.timeout="<INT>", ssl.verify.client="<STRING>", ssl.protocol="<STRING>", tls.store.type="<STRING>", ssl.configurations="<STRING>", request.size.validation.configurations="<STRING>", header.validation.configurations="<STRING>", server.bootstrap.configurations="<STRING>", trace.log.enabled="<BOOL>")
Query Parameters
| Name | Description | Default Value | Possible Data Types | Optional | Dynamic | 
|---|---|---|---|---|---|
| receiver.url | The URL on which events should be received. To enable SSL, use https protocol in the URL. | http://0.0.0.0:9763// | STRING | Yes | No | 
| source.id | Identifier to correlate the http-service source to its corresponding http-service-response sinks to send responses. | STRING | No | No | |
| connection.timeout | Connection timeout in millis. The system will send a timeout, if a corresponding response is not sent by an associated http-service-response sink within the given time. | 120000 | INT | Yes | No | 
| basic.auth.enabled | This only works in VM, Docker and Kubernetes. Where when enabled it authenticates each request using the Authorization:'Basic encodeBase64(username:Password)' header. | false | STRING | Yes | No | 
| worker.count | The number of active worker threads to serve the incoming events. By default the value is set to 1 to ensure events are processed in the same order they arrived. By increasing this value, higher performance can be achieved in the expense of loosing event ordering. | 1 | INT | Yes | No | 
| socket.idle.timeout | Idle timeout for HTTP connection in millis. | 120000 | INT | Yes | No | 
| ssl.verify.client | The type of client certificate verification. Supported values are require, optional. | - | STRING | Yes | No | 
| ssl.protocol | SSL/TLS protocol. | TLS | STRING | Yes | No | 
| tls.store.type | TLS store type. | JKS | STRING | Yes | No | 
| ssl.configurations | SSL/TSL configurations in format "'<key>:<value>','<key>:<value>'". Some supported parameters:  - SSL/TLS protocols: 'sslEnabledProtocols:TLSv1.1,TLSv1.2'  - List of ciphers: 'ciphers:TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256'  - Enable session creation: 'client.enable.session.creation:true'  - Supported server names: 'server.suported.server.names:server'  - Add HTTP SNIMatcher: 'server.supported.snimatchers:SNIMatcher' | - | STRING | Yes | No | 
| request.size.validation.configurations | Configurations to validate the HTTP request size. Expected format "'<key>:<value>','<key>:<value>'". Some supported configurations :  - Enable request size validation: 'request.size.validation:true'  If request size is validated  - Maximum request size: 'request.size.validation.maximum.value:2048'  - Response status code when request size validation fails: 'request.size.validation.reject.status.code:401'  - Response message when request size validation fails: 'request.size.validation.reject.message:Message is bigger than the valid size'  - Response Content-Type when request size validation fails: 'request.size.validation.reject.message.content.type:plain/text' | - | STRING | Yes | No | 
| header.validation.configurations | Configurations to validate HTTP headers. Expected format "'<key>:<value>','<key>:<value>'". Some supported configurations :  - Enable header size validation: 'header.size.validation:true'  If header size is validated  - Maximum length of initial line: 'header.validation.maximum.request.line:4096'  - Maximum length of all headers: 'header.validation.maximum.size:8192'  - Maximum length of the content or each chunk: 'header.validation.maximum.chunk.size:8192'  - Response status code when header validation fails: 'header.validation.reject.status.code:401'  - Response message when header validation fails: 'header.validation.reject.message:Message header is bigger than the valid size'  - Response Content-Type when header validation fails: 'header.validation.reject.message.content.type:plain/text' | - | STRING | Yes | No | 
| server.bootstrap.configurations | Server bootstrap configurations in format "'<key>:<value>','<key>:<value>'". Some supported configurations :  - Server connect timeout in millis: 'server.bootstrap.connect.timeout:15000'  - Server socket timeout in seconds: 'server.bootstrap.socket.timeout:15'  - Enable TCP no delay: 'server.bootstrap.nodelay:true'  - Enable server keep alive: 'server.bootstrap.keepalive:true'  - Send buffer size: 'server.bootstrap.sendbuffersize:1048576'  - Receive buffer size: 'server.bootstrap.recievebuffersize:1048576'  - Number of connections queued: 'server.bootstrap.socket.backlog:100' | - | STRING | Yes | No | 
| trace.log.enabled | Enable trace log for traffic monitoring. | false | BOOL | Yes | No | 
System Parameters
| Name | Description | Default Value | Possible Parameters | 
|---|---|---|---|
| serverBootstrapBossGroupSize | Number of boss threads to accept incoming connections. | Number of available processors | Any positive integer | 
| serverBootstrapWorkerGroupSize | Number of worker threads to accept the connections from boss threads and perform non-blocking read and write from one or more channels. | (Number of available processors) * 2 | Any positive integer | 
| serverBootstrapClientGroupSize | Number of client threads to perform non-blocking read and write to one or more channels. | (Number of available processors) * 2 | Any positive integer | 
| defaultHost | The default host of the transport. | 0.0.0.0 | Any valid host | 
| defaultScheme | The default protocol. | http | http https | 
| defaultHttpPort | The default HTTP port when default scheme is http. | 8280 | Any valid port | 
| defaultHttpsPort | The default HTTPS port when default scheme is https. | 8243 | Any valid port | 
| keyStoreLocation | The default keystore file path. | `\${carbon.home}/resources/security/gdncarbon.jks` | Path to `.jks` file | 
| keyStorePassword | The default keystore password. | gdncarbon | Keystore password as string | 
Example
@App:name('Sample-HTTP-Source')
@App:description("This application shows how to receive POST requests via Stream Workers API.")
@App:qlVersion('2')
CREATE SOURCE AddStream WITH (type='http-service', source.id='adder', map.type='json', map.attributes.messageId='trp:messageId', map.attributes.value1='$.event.value1', map.attributes.value2='$.event.value2') (messageId string, value1 long, value2 long);
CREATE SINK ResultStream WITH (type='http-service-response', source.id='adder', message.id='{{messageId}}', map.type = 'json') (messageId string, results long);
@info(name = 'query1')
insert into ResultStream
select messageId, value1 + value2 as results
from AddStream;
Above sample listens events for JSON messages on the format:
{
  "event": {
    "value1": 3,
    "value2": 4
  }
}
Map the vents into AddStream, process the events through query query1, and sends the results produced on ResultStream via http-service-response sink on the message format:
{
  "event": {
    "results": 7
  }
}