Add Distributed Tracing to Application
Some checks failed
continuous-integration/drone Build is failing

* add distributed tracing to application
   * add jaeger helm chart
   * Create Interceptor to propagate trace context to gRPC server
   * Create Tower Layer to extract trace context and add to request extensions
   * Retrieve trace context from request extensions and set inside request handlers
* add registration to frontend
* use one image for all core services

Reviewed-on: #1
Co-authored-by: Steve Sampson <mail@stephensampson.dev>
Co-committed-by: Steve Sampson <mail@stephensampson.dev>
This commit was merged in pull request #1.
This commit is contained in:
2022-09-23 18:33:00 -03:00
parent b4e1342879
commit 30a5a01384
41 changed files with 820 additions and 190 deletions

View File

@@ -1,5 +1,11 @@
[workspace]
members = [
"crates/*",
"crates/api-gateway",
"crates/common",
"crates/consumer",
"crates/grpc",
"crates/producer",
"crates/types",
"crates/user-service",
]
edition = "2018"

View File

@@ -7,8 +7,12 @@ RUN apt update && apt install -y \
# && wget http://ftp.us.debian.org/debian/pool/main/g/glibc/libc6_2.36-0experimental4_amd64.deb \
# && dpkg -i libc6_2.34-0experimental4_amd64.deb \
# && rm libc6_2.34-0experimental4_amd64.deb
ARG binary_location
ARG binary_name
COPY $binary_location /bin/$binary_name
ENV binary_name $binary_name
CMD /bin/$binary_name
#ARG binary_location
#ARG binary_name
#ADD $binary_location /bin/$binary_name
#ENV binary_name $binary_name
#CMD /bin/$binary_name
ADD target/docker/debug/api-gateway /bin
ADD target/docker/debug/user-service /bin
ADD target/docker/debug/producer /bin
ADD target/docker/debug/consumer /bin

View File

@@ -13,8 +13,14 @@ debug:
cargo build
# ----------------------------------------
# # Set Git Config for Git Hooks
# Set Git Config for Git Hooks
# ----------------------------------------
hooks:
$(shell git config --local core.hooksPath .githooks)# ----------------------------------------
$(shell git config --local core.hooksPath .githooks)
# ----------------------------------------
# Make docker builder
# ----------------------------------------
builder:
docker build -f dockerfiles/Build . -t rustbuilder

View File

@@ -45,3 +45,11 @@ technologies to use in the project.
* [docker](https://www.docker.com/)
* [minikube](https://minikube.sigs.k8s.io/docs/start/)
* [tilt](https://tilt.dev/)
## Getting Started
1. `git clone`
2. `cd connected-home`
3. `make builder`
4. `tilt up`

111
Tiltfile
View File

@@ -5,29 +5,19 @@ update_settings(max_parallel_updates=8)
#
###################################################################################################
#local_resource(
# 'cargo-build',
# 'make cargo-build',
# labels=['Build']
#)
local_resource(
'make-builder',
'docker build -f dockerfiles/Build . -t rustbuilder',
labels=['Build']
)
local_resource(
'docker-build',
'docker run -u $(id -u ${USER}):$(id -g ${USER}) --mount=target=/app,type=bind,source=$PWD rustbuilder',
'docker-build-and-pack',
'docker run -u $(id -u ${USER}):$(id -g ${USER}) --mount=target=/app,type=bind,source=$PWD rustbuilder \
&& eval $(minikube docker-env) \
&& docker build -t core-service . --no-cache \
&& minikube image tag core-service connectedhome/api-gateway:latest \
&& minikube image tag core-service connectedhome/user-service:latest \
&& minikube image tag core-service connectedhome/consumer:latest \
&& minikube image tag core-service connectedhome/producer:latest',
labels=['Build'],
allow_parallel = True
)
###################################################################################################
#
###################################################################################################
local_resource(
'update-helm',
'helm repo add bitnami https://charts.bitnami.com/bitnami \
@@ -45,23 +35,13 @@ local_resource(
# API GATEWAY
###################################################################################################
local_resource(
'pack-api-gateway',
'docker build -t api-gateway --build-arg binary_location=target/docker/debug/api-gateway --build-arg binary_name=api-gateway . \
&& minikube image load api-gateway',
resource_deps=[
'docker-build'
],
deps=['target/docker/debug/api-gateway'],
labels=['Build'],
allow_parallel = True
)
k8s_yaml(helm('./helm/api-gateway'))
k8s_resource(
workload='api-gateway',
resource_deps=[
'pack-api-gateway',
'docker-build-and-pack',
'update-helm',
'jaeger',
],
port_forwards=[
port_forward(8082, 8082, name='API Gateway'),
@@ -73,23 +53,13 @@ k8s_resource(
# User Service
###################################################################################################
local_resource(
'pack-user-service',
'docker build -t user-service --build-arg binary_location=target/docker/debug/user-service --build-arg binary_name=user-service . \
&& minikube image load user-service',
resource_deps=[
'docker-build'
],
deps=['target/docker/debug/user-service'],
labels=['Build'],
allow_parallel = True
)
k8s_yaml(helm('./helm/user-service'))
k8s_resource(
workload='user-service',
resource_deps=[
'pack-user-service',
'docker-build-and-pack',
'update-helm',
'jaeger',
],
labels=['Core_Services']
)
@@ -98,24 +68,14 @@ k8s_resource(
# Consumer
###################################################################################################
local_resource(
'pack-consumer',
'docker build -t consumer --build-arg binary_location=target/docker/debug/consumer --build-arg binary_name=consumer . \
&& minikube image load consumer',
resource_deps=[
'docker-build'
],
deps=['target/docker/debug/consumer'],
labels=['Build'],
allow_parallel = True
)
k8s_yaml(helm('./helm/consumer'))
k8s_resource(
workload='consumer',
resource_deps=[
'pack-consumer',
'docker-build-and-pack',
'update-helm',
'rabbitmq',
'jaeger',
'rabbitmq'
],
labels=['Core_Services']
)
@@ -124,23 +84,11 @@ k8s_resource(
# Producer
###################################################################################################
local_resource(
'pack-producer',
'docker build -t producer --build-arg binary_location=target/docker/debug/producer --build-arg binary_name=producer . \
&& minikube image load producer',
resource_deps=[
'docker-build',
'rabbitmq'
],
deps=['target/docker/debug/producer'],
labels=['Build'],
allow_parallel = True
)
k8s_yaml(helm('./helm/producer'))
k8s_resource(
workload='producer',
resource_deps=[
'pack-producer',
'docker-build-and-pack',
'update-helm',
'rabbitmq',
],
@@ -169,6 +117,13 @@ k8s_resource(
labels=['PostgreSQL']
)
local_resource(
'make-migrate-db',
'cd crates/user-service \
&& DATABASE_URL=postgres://postgres:password@localhost/postgres diesel migration run',
labels=['PostgreSQL']
)
###################################################################################################
# RaabitMQ
###################################################################################################
@@ -295,6 +250,26 @@ k8s_resource(
# labels=['Redpanda']
#)
###################################################################################################
# Jaeger
###################################################################################################
k8s_yaml(helm(
'./helm/jaeger',
name='jaeger',
values=[
'helm/jaeger/values.yaml'
]
))
k8s_resource(
workload='jaeger',
port_forwards=[
port_forward(16686, 16686, name='Dashboard'),
],
labels=['Observability']
)
###################################################################################################
# END
###################################################################################################

View File

@@ -3,28 +3,29 @@ name = "api-gateway"
version = "0.1.0"
authors = ["Steve Sampson <mail@stephensampson.dev>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
grpc = { path = "../grpc" }
types= { path = "../types" }
log = "0.4"
pretty_env_logger = "0.4"
chrono = { version = "0.4.19", features = ["serde"] }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
grpc = { path = "../grpc" }
hyper = "0.14.13"
opentelemetry = { version = "0.18.0", default-features = false, features = ["trace"] }
opentelemetry-jaeger = "0.17.0"
prost = "0.11.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1.0", features = ["fs", "sync", "time", "io-util"] }
tokio-stream = "0.1.1"
tokio-util = { version = "0.6", features = ["io"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
hyper = "0.14.13"
tracing = { version = "0.1.21", default-features = false, features = ["std"] }
tonic = "0.8.1"
tracing = "0.1.35"
tracing-attributes = "0.1.22"
tracing-core = { version = "0.1.28" }
tracing-opentelemetry = "0.18.0"
tracing-subscriber = { version = "0.3.0", features = ["json", "env-filter"] }
types = { path = "../types" }
warp = "0.3"
chrono = { version = "0.4.19", features = ["serde"]}
tonic = "0.6.1"
prost = "0.9.0"
[dev-dependencies]
tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "io-util"] }

View File

@@ -4,12 +4,23 @@ use crate::api::user::profile::profile;
use crate::api::user::register::register;
use hyper::Body;
use std::convert::TryInto;
use std::sync::Arc;
use tracing::instrument;
use types::jwt::Jwt;
use warp::http::{Method, Response, StatusCode};
mod countries;
mod user;
#[derive(Debug)]
pub struct ServiceContext {}
impl ServiceContext {
pub fn new() -> Self {
Self {}
}
}
#[derive(Debug)]
pub enum DeserializationError {
InvalidRequestBody,
@@ -17,16 +28,18 @@ pub enum DeserializationError {
impl warp::reject::Reject for DeserializationError {}
// TODO: skip printing sensitive information like username / password in body
// #[instrument(skip(body))]
#[instrument]
pub async fn api(
method: Method,
path: String,
body: hyper::body::Bytes,
jwt: Option<String>,
context: Arc<ServiceContext>,
) -> Result<Response<Body>, warp::Rejection> {
info!(
"Got {} request for: /api/{} with token: {:?}",
method, path, jwt
);
tracing::info!("handling API request");
let jwt = Jwt::from(jwt.unwrap_or_default());
// todo: find a better way to filter requests, returning appropriate status codes
Ok(match method {

View File

@@ -4,11 +4,15 @@ use crate::api::user::UserRequest;
use grpc::user::user_service_client::UserServiceClient;
use grpc::user::LoginRequest;
use grpc::user::UserCredentials;
use grpc::SendTracingContext;
use hyper::Body;
use tracing::instrument;
use warp::http::{Response, StatusCode};
pub async fn login(new_user: UserRequest) -> Response<Body> {
#[instrument]
pub async fn login(user: UserRequest) -> Response<Body> {
tracing::info!("attempting to login");
// creating a channel ie connection to server
let channel = tonic::transport::Channel::from_static("http://user-service:8083")
.connect()
@@ -16,13 +20,13 @@ pub async fn login(new_user: UserRequest) -> Response<Body> {
.unwrap();
// creating gRPC client from channel
let mut client = UserServiceClient::new(channel);
let mut client = UserServiceClient::with_interceptor(channel, SendTracingContext::default());
// creating a new Request
let request = tonic::Request::new(LoginRequest {
credentials: Some(UserCredentials {
username: new_user.username,
password: new_user.password,
username: user.username,
password: user.password,
}),
});
@@ -31,11 +35,15 @@ pub async fn login(new_user: UserRequest) -> Response<Body> {
match response {
Ok(success_response) => {
let login_response = success_response.into_inner();
tracing::info!("successfully logged in");
make_response(StatusCode::OK, Some(login_response.jwt))
}
Err(err_response) => make_response(
StatusCode::BAD_REQUEST,
Some(err_response.message().to_string()),
),
Err(err_response) => {
tracing::info!("failed to log in");
make_response(
StatusCode::BAD_REQUEST,
Some(err_response.message().to_string()),
)
}
}
}

View File

@@ -1,10 +1,10 @@
use crate::api::make_response;
use grpc::user::user_service_client::UserServiceClient;
use grpc::user::{ProfileRequest, ProfileResponse};
use grpc::SendTracingContext;
use hyper::Body;
use serde::{Deserialize, Serialize};
use std::convert::TryFrom;
use tonic::{metadata::MetadataValue, Request};
use types::jwt::Jwt;
use warp::http::{Response, StatusCode};
@@ -15,13 +15,9 @@ pub async fn profile(jwt: Jwt) -> Response<Body> {
.await
.unwrap();
let token = MetadataValue::from_str(jwt.to_string().as_str()).unwrap();
// creating gRPC client from channel
let mut client = UserServiceClient::with_interceptor(channel, move |mut req: Request<()>| {
req.metadata_mut().insert("authorization", token.clone());
Ok(req)
});
let mut client =
UserServiceClient::with_interceptor(channel, SendTracingContext::with_jwt(jwt));
// creating a new Request
let request = tonic::Request::new(ProfileRequest {});

View File

@@ -1,14 +1,17 @@
use crate::api::make_response;
use crate::api::user::UserRequest;
use grpc::user::user_service_client::UserServiceClient;
use grpc::user::RegisterRequest;
use grpc::user::UserCredentials;
use grpc::SendTracingContext;
use hyper::Body;
use tracing::instrument;
use warp::http::{Response, StatusCode};
#[instrument]
pub async fn register(new_user: UserRequest) -> Response<Body> {
tracing::info!("attempting to register new user");
// creating a channel ie connection to server
let channel = tonic::transport::Channel::from_static("http://user-service:8083")
.connect()
@@ -16,7 +19,7 @@ pub async fn register(new_user: UserRequest) -> Response<Body> {
.unwrap();
// creating gRPC client from channel
let mut client = UserServiceClient::new(channel);
let mut client = UserServiceClient::with_interceptor(channel, SendTracingContext::default());
// creating a new Request
let request = tonic::Request::new(RegisterRequest {
@@ -32,11 +35,15 @@ pub async fn register(new_user: UserRequest) -> Response<Body> {
match response {
Ok(success_response) => {
let register_response = success_response.into_inner();
tracing::info!("successfully registered new user");
make_response(StatusCode::CREATED, Some(register_response.jwt))
}
Err(err_response) => make_response(
StatusCode::BAD_REQUEST,
Some(err_response.message().to_string()),
),
Err(err_response) => {
tracing::info!("failed to register new user");
make_response(
StatusCode::BAD_REQUEST,
Some(err_response.message().to_string()),
)
}
}
}

View File

@@ -1,9 +1,9 @@
#[macro_use]
extern crate log;
use crate::api::api;
use crate::api::{api, ServiceContext};
use serde::{Deserialize, Serialize};
use std::convert::Infallible;
use std::sync::Arc;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::prelude::*;
use warp::http::{HeaderMap, HeaderValue, StatusCode};
use warp::{Filter, Rejection, Reply};
@@ -16,8 +16,27 @@ enum Status {
#[tokio::main]
async fn main() {
pretty_env_logger::init();
info!("API Gateway Starting Up...");
let opentelemetry = tracing_opentelemetry::layer()
.with_tracer(
opentelemetry_jaeger::new_agent_pipeline()
.with_service_name("api-gateway")
.with_endpoint("jaeger:6831")
.install_simple()
.unwrap(),
)
.with_filter(tracing_subscriber::filter::LevelFilter::INFO);
let stdout = tracing_subscriber::fmt::layer()
.pretty()
.with_filter(tracing_subscriber::filter::LevelFilter::INFO);
tracing_subscriber::registry()
.with(opentelemetry)
.with(stdout)
.try_init()
.unwrap();
let service_context = Arc::new(ServiceContext::new());
let mut headers = HeaderMap::new();
headers.insert("Access-Control-Allow-Origin", HeaderValue::from_static("*"));
@@ -35,8 +54,16 @@ async fn main() {
.and(warp::path::param())
.and(warp::body::bytes())
.and(warp::header::optional::<String>("authorization"))
.and(with_context(service_context))
.and_then(api)
.with(warp::reply::with::headers(headers.clone()));
.with(warp::reply::with::headers(headers.clone()))
.with(warp::trace(|info| {
tracing::info_span!(
"request",
method = %info.method(),
path = %info.path(),
)
}));
let ready = warp::path("ready")
.and(warp::get())
@@ -68,3 +95,9 @@ async fn handle_rejection(_: Rejection) -> Result<impl Reply, Infallible> {
let empty: Vec<u8> = Vec::new();
Ok(warp::reply::with_status(empty, StatusCode::BAD_REQUEST))
}
fn with_context(
context: Arc<ServiceContext>,
) -> impl Filter<Extract = (Arc<ServiceContext>,), Error = std::convert::Infallible> + Clone {
warp::any().map(move || context.clone())
}

View File

@@ -3,14 +3,9 @@ name = "common"
version = "0.1.0"
authors = ["Steve Sampson <mail@stephensampson.dev>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
hyper = "0.14"
warp = "0.3"
jwt = "0.15.0"
rand = "0.7.2"
rust-argon2 = "0.6.0"
hmac = "0.11.0"
sha2 = "0.9.8"
warp = "0.3"

View File

@@ -3,15 +3,13 @@ name = "consumer"
version = "0.1.0"
authors = ["Steve Sampson <mail@stephensampson.dev>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
amiquip = { version = "0.4", default-features = false }
common = { path = '../common' }
hyper = "0.14"
log = "0.4"
pretty_env_logger = "0.4"
hyper = "0.14"
routerify = "2.2.0"
tokio = { version = "1.0", features = ["fs", "sync", "time", "io-util"] }
amiquip = { version = "0.4", default-features = false }
warp = "0.3"
common = { path = '../common' }

View File

@@ -3,13 +3,19 @@ name = "grpc"
version = "0.1.0"
authors = ["Steve Sampson <mail@stephensampson.dev>"]
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tonic = "0.6.1"
prost = "0.9.0"
prost-types = "0.9.0"
futures = "0.3.17"
hyper = "0.14.20"
opentelemetry = { version = "0.18.0", default-features = false, features = ["trace"] }
prost = "0.11.0"
prost-types = "0.11.1"
tonic = "0.8.1"
tower = "0.4.13"
tracing = "0.1.35"
tracing-opentelemetry = "0.18.0"
types = { path = "../types" }
[build-dependencies]
tonic-build = "0.6.0"
tonic-build = "0.8.0"

View File

@@ -1,3 +1,151 @@
use hyper::header::ToStrError;
use hyper::{Body, Request};
use opentelemetry::propagation::TextMapPropagator;
use opentelemetry::sdk::propagation::TraceContextPropagator;
use std::collections::HashMap;
use std::convert::TryFrom;
use std::{
task::{Context, Poll},
time::Instant,
};
use tonic::body::BoxBody;
use tonic::codegen::http::header::HeaderName;
use tonic::codegen::http::HeaderValue;
use tonic::metadata::{AsciiMetadataKey, AsciiMetadataValue, MetadataValue};
use tower::{Layer, Service};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use types::jwt::Jwt;
pub mod user {
tonic::include_proto!("user");
}
#[derive(Default)]
pub struct SendTracingContext {
jwt: Option<Jwt>,
}
impl SendTracingContext {
pub fn with_jwt(jwt: Jwt) -> Self {
Self { jwt: Some(jwt) }
}
}
impl tonic::service::Interceptor for SendTracingContext {
fn call(
&mut self,
mut request: tonic::Request<()>,
) -> Result<tonic::Request<()>, tonic::Status> {
let span = tracing::Span::current();
let context = span.context();
let propagator = TraceContextPropagator::new();
let mut context_map = HashMap::new();
propagator.inject_context(&context, &mut context_map);
let meta = request.metadata_mut();
for (k, v) in context_map.into_iter() {
let metadata_key =
AsciiMetadataKey::from_bytes(HeaderName::try_from(k).unwrap().as_str().as_bytes())
.unwrap();
let metadata_value =
AsciiMetadataValue::try_from(HeaderValue::try_from(v).unwrap().as_bytes()).unwrap();
meta.insert(metadata_key, metadata_value);
}
if let Some(jwt) = &self.jwt {
request.metadata_mut().insert(
"authorization",
MetadataValue::try_from(jwt.to_string().as_str()).unwrap(),
);
}
Ok(request)
}
}
#[derive(Clone)]
pub struct RestoreTracingContextLayer {}
impl<S> Layer<S> for RestoreTracingContextLayer {
type Service = Tracing<S>;
fn layer(&self, service: S) -> Self::Service {
Tracing { inner: service }
}
}
#[derive(Clone)]
pub struct Tracing<S> {
inner: S,
}
#[derive(Clone)]
pub struct RequestContext {
pub tracing_context: opentelemetry::Context,
}
impl TryFrom<&hyper::Request<Body>> for RequestContext {
type Error = ToStrError;
fn try_from(req: &Request<Body>) -> Result<Self, Self::Error> {
let parent_key = "traceparent".to_string();
let state_key = "tracestate".to_string();
let trace_parent = req.headers().get(&parent_key);
let trace_state = req.headers().get(&state_key);
let mut fields: HashMap<String, String> = HashMap::new();
if let Some(trace_parent) = trace_parent {
fields.insert(parent_key, trace_parent.to_str()?.to_string());
}
if let Some(trace_state) = trace_state {
fields.insert(state_key, trace_state.to_str()?.to_string());
}
Ok(Self {
tracing_context: TraceContextPropagator::new().extract(&fields),
})
}
}
impl<S> Service<hyper::Request<Body>> for Tracing<S>
where
S: Service<hyper::Request<Body>, Response = hyper::Response<BoxBody>> + Clone + Send + 'static,
S::Future: Send + 'static,
{
type Response = S::Response;
type Error = S::Error;
type Future = futures::future::BoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}
fn call(&mut self, mut req: hyper::Request<Body>) -> Self::Future {
// See https://github.com/tower-rs/tower/issues/547#issuecomment-767629149
// for details on why this is necessary
let clone = self.inner.clone();
let mut inner = std::mem::replace(&mut self.inner, clone);
match RequestContext::try_from(&req) {
Ok(ctx) => {
req.extensions_mut().insert(ctx.clone());
}
_ => {
tracing::warn!("received request without valid tracing context");
}
};
Box::pin(async move {
let started = Instant::now();
let response: Self::Response = inner.call(req).await?;
let _elapsed = started.elapsed();
Ok(response)
})
}
}

View File

@@ -5,11 +5,10 @@ authors = ["Steve Sampson <mail@stephensampson.dev>"]
edition = "2018"
[dependencies]
amiquip = { version = "0.4", default-features = false }
common = { path = '../common' }
hyper = "0.14"
log = "0.4"
pretty_env_logger = "0.4"
hyper = "0.14"
routerify = "2.2.0"
tokio = { version = "1.0", features = ["fs", "sync", "time", "io-util"] }
amiquip = { version = "0.4", default-features = false }
warp = "0.3"
common = { path = '../common' }

View File

@@ -3,18 +3,14 @@ name = "types"
version = "0.1.0"
authors = ["Steve Sampson <mail@stephensampson.dev>"]
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
log = "0.4"
pretty_env_logger = "0.4"
jwt = "0.15.0"
rand = "0.7.2"
rust-argon2 = "0.6.0"
chrono = { version = "0.4.19", features = ["serde"] }
hmac = "0.11.0"
sha2 = "0.9.8"
jwt = "0.15.0"
log = "0.4"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
chrono = { version = "0.4.19", features = ["serde"]}
sha2 = "0.9.8"

View File

@@ -3,31 +3,34 @@ name = "user-service"
version = "0.1.0"
authors = ["Steve Sampson <mail@stephensampson.dev>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
grpc = { path = "../grpc" }
types = { path = "../types" }
chrono = { version = "0.4.19", features = ["serde"] }
common = { path = '../common' }
log = "0.4"
pretty_env_logger = "0.4"
warp = "0.3"
tokio = "1.0"
tonic = "0.6.1"
prost = "0.9.0"
diesel = { version = "1.4.8", features = ["postgres", "chrono"] }
futures = "0.3.17"
futures-util = "0.3.17"
grpc = { path = "../grpc" }
hmac = "0.11.0"
jwt = "0.15.0"
opentelemetry = { version = "0.18.0", default-features = false, features = ["trace"] }
opentelemetry-jaeger = "0.17.0"
prost = "0.11.0"
rand = "0.7.2"
rust-argon2 = "0.6.0"
hmac = "0.11.0"
sha2 = "0.9.8"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
chrono = { version = "0.4.19", features = ["serde"]}
diesel = { version = "1.4.8", features = ["postgres", "chrono"] }
sha2 = "0.9.8"
tokio = "1.0"
tonic = "0.8.1"
tower = "0.4.13"
tower-http = { version = "0.3.4", features = ["trace"] }
tracing = "0.1.35"
tracing-attributes = "0.1.22"
tracing-core = { version = "0.1.28" }
tracing-opentelemetry = "0.18.0"
tracing-subscriber = { version = "0.3.0", features = ["json", "env-filter"] }
types = { path = "../types" }
warp = "0.3"
futures = "0.3.17"
futures-util = "0.3.17"

View File

@@ -1,6 +1,4 @@
#[macro_use]
extern crate log;
#[macro_use]
extern crate diesel;
use common::{make_healthy_filter, make_ready_filter};
@@ -10,6 +8,8 @@ use std::env;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use tonic::transport::Server;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::prelude::*;
use user::handlers::MyUserService;
use warp::Filter;
@@ -19,8 +19,33 @@ mod user;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
pretty_env_logger::init();
info!("User Service Starting Up...");
let opentelemetry = tracing_opentelemetry::layer()
.with_tracer(
opentelemetry_jaeger::new_agent_pipeline()
.with_service_name("user-service")
.with_endpoint("jaeger:6831")
.install_simple()
.unwrap(),
)
.with_filter(tracing_subscriber::filter::LevelFilter::INFO);
let stdout = tracing_subscriber::fmt::layer()
.pretty()
.with_filter(tracing_subscriber::filter::LevelFilter::INFO);
tracing_subscriber::registry()
.with(opentelemetry)
.with(stdout)
.try_init()
.unwrap();
{
let root = tracing::span!(tracing::Level::INFO, "app_start", work_units = 2);
let _enter = root.enter();
tracing::warn!("About to exit!");
tracing::trace!("status: {}", true);
} // Once this scope is closed, all spans inside are closed as well
let ready_flag = Arc::new(Mutex::new(false));
@@ -35,7 +60,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// todo: manage config & secrets
let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
debug!("Database URL: {}", database_url);
tracing::debug!("Database URL: {}", database_url);
let connection = Arc::new(Mutex::new(
PgConnection::establish(&database_url)
@@ -50,7 +75,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
*r = true;
}
let layer = tower::ServiceBuilder::new()
.layer(grpc::RestoreTracingContextLayer {})
.into_inner();
Server::builder()
.layer(layer)
.add_service(UserServiceServer::new(user_service))
.serve(addr)
.await?;

View File

@@ -12,9 +12,12 @@ use jwt::SignWithKey;
use schema::users::dsl::*;
use sha2::Sha256;
use std::collections::BTreeMap;
use std::fmt::{Debug, Formatter};
use std::ops::Deref;
use std::sync::{Arc, Mutex};
use tonic::{Code, Request, Response, Status};
use tracing::instrument;
use tracing_opentelemetry::OpenTelemetrySpanExt;
use types::jwt::Jwt;
// defining a struct for our service
@@ -22,6 +25,12 @@ pub struct MyUserService {
db: Arc<Mutex<PgConnection>>,
}
impl Debug for MyUserService {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "this is a test")
}
}
impl MyUserService {
pub fn new(db: Arc<Mutex<PgConnection>>) -> Self {
Self { db }
@@ -30,11 +39,15 @@ impl MyUserService {
#[tonic::async_trait]
impl UserService for MyUserService {
#[instrument]
async fn register(
&self,
request: Request<RegisterRequest>,
) -> Result<Response<RegisterResponse>, Status> {
info!("Got Register Request");
if let Some(request_context) = request.extensions().get::<grpc::RequestContext>() {
let span = tracing::Span::current();
span.set_parent(request_context.to_owned().tracing_context);
}
let new_user = request
.into_inner()
@@ -42,11 +55,6 @@ impl UserService for MyUserService {
//.ok_or_else(|| Err(Status::new(Code::InvalidArgument, "")))?;
.unwrap();
println!(
"Registering user: {} with password: {}",
new_user.username, new_user.password
);
// todo: remove unwrap
let db = self.db.lock().unwrap();
@@ -76,11 +84,15 @@ impl UserService for MyUserService {
}))
}
#[instrument]
async fn login(
&self,
request: Request<LoginRequest>,
) -> Result<Response<LoginResponse>, Status> {
info!("Got Login Request");
if let Some(request_context) = request.extensions().get::<grpc::RequestContext>() {
let span = tracing::Span::current();
span.set_parent(request_context.to_owned().tracing_context);
}
// todo: remove unwrap
let credentials = request.into_inner().credentials.unwrap();
@@ -113,14 +125,19 @@ impl UserService for MyUserService {
))
}
#[instrument]
async fn profile(
&self,
request: Request<ProfileRequest>,
) -> Result<Response<ProfileResponse>, Status> {
info!("Got Profile Request");
if let Some(request_context) = request.extensions().get::<grpc::RequestContext>() {
let span = tracing::Span::current();
span.set_parent(request_context.to_owned().tracing_context);
}
match request.metadata().get("authorization") {
Some(token) => {
tracing::info!("got token: {}", token.to_str().unwrap());
let jwt: Jwt = serde_json::from_str(token.to_str().unwrap()).unwrap();
// todo: manage secret
let claims = jwt.verify("SUPERSECRETKEY");
@@ -136,7 +153,7 @@ impl UserService for MyUserService {
}),
})),
Err(e) => {
error!("{:?}", e);
tracing::error!("{:?}", e);
Err(Status::unauthenticated("No valid auth token"))
}
}
@@ -146,6 +163,7 @@ impl UserService for MyUserService {
}
}
#[instrument]
pub fn create_jwt(user: &str) -> Jwt {
// todo: manage secrets
let key: Hmac<Sha256> = Hmac::new_from_slice(b"SUPERSECRETKEY").unwrap();

View File

@@ -18,6 +18,7 @@ pub struct User {
}
impl User {
#[tracing::instrument]
pub fn new(username: String, password: String) -> Self {
let now = Utc::now().naive_utc();
User {
@@ -29,15 +30,18 @@ impl User {
}
}
#[tracing::instrument]
pub fn verify_password(&self, password: String) -> bool {
argon2::verify_encoded(&self.password, password.as_bytes()).unwrap_or(false)
}
#[tracing::instrument]
pub fn username(&self) -> &str {
self.username.as_str()
}
}
#[tracing::instrument]
fn hash(password: &[u8]) -> String {
let salt = rand::thread_rng().gen::<[u8; 32]>();
let config = Config::default();

View File

@@ -3,7 +3,8 @@ ARG DEBIAN_FRONTEND=noninteractive
RUN apt update && apt install -y \
iputils-ping \
libpq-dev \
cmake
cmake \
protobuf-compiler
WORKDIR /app
RUN rustup component add rustfmt
CMD CARGO_TARGET_DIR=target/docker cargo build

View File

@@ -29,6 +29,7 @@ spec:
{{- toYaml .Values.podSecurityContext | nindent 8 }}
containers:
- name: {{ .Chart.Name }}
command: {{ .Values.command }}
securityContext:
{{- toYaml .Values.securityContext | nindent 12 }}
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"

View File

@@ -7,7 +7,7 @@ replicaCount: 1
image:
#name: api-gateway
#repository: 172.17.0.1:5000/api-gateway
repository: api-gateway
repository: connectedhome/api-gateway
#pullPolicy: IfNotPresent
pullPolicy: Never
# Overrides the image tag whose default is the chart appVersion.
@@ -85,6 +85,8 @@ tolerations: []
affinity: {}
command: ["/bin/api-gateway"]
# todo: Get rid of these and handle env vars / secrets better
RUST_LOG: info
DATABASE_URL: "postgres://postgres:password@postgresql/postgres"

View File

@@ -29,6 +29,7 @@ spec:
{{- toYaml .Values.podSecurityContext | nindent 8 }}
containers:
- name: {{ .Chart.Name }}
command: {{ .Values.command }}
securityContext:
{{- toYaml .Values.securityContext | nindent 12 }}
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"

View File

@@ -7,7 +7,7 @@ replicaCount: 1
image:
#name: consumer
#repository: 172.17.0.1:5000/consumer
repository: consumer
repository: connectedhome/consumer
#pullPolicy: IfNotPresent
pullPolicy: Never
# Overrides the image tag whose default is the chart appVersion.
@@ -85,5 +85,7 @@ tolerations: []
affinity: {}
command: ["/bin/consumer"]
RUST_LOG: info

23
helm/jaeger/.helmignore Normal file
View File

@@ -0,0 +1,23 @@
# Patterns to ignore when building packages.
# This supports shell glob matching, relative path matching, and
# negation (prefixed with !). Only one pattern per line.
.DS_Store
# Common VCS dirs
.git/
.gitignore
.bzr/
.bzrignore
.hg/
.hgignore
.svn/
# Common backup files
*.swp
*.bak
*.tmp
*.orig
*~
# Various IDEs
.project
.idea/
*.tmproj
.vscode/

6
helm/jaeger/Chart.yaml Normal file
View File

@@ -0,0 +1,6 @@
apiVersion: v2
name: jaeger
description: Trace Collection
type: application
version: 0.1.0
appVersion: "0.1.0"

View File

@@ -0,0 +1,22 @@
1. Get the application URL by running these commands:
{{- if .Values.ingress.enabled }}
{{- range $host := .Values.ingress.hosts }}
{{- range .paths }}
http{{ if $.Values.ingress.tls }}s{{ end }}://{{ $host.host }}{{ .path }}
{{- end }}
{{- end }}
{{- else if contains "NodePort" .Values.service.type }}
export NODE_PORT=$(kubectl get --namespace {{ .Release.Namespace }} -o jsonpath="{.spec.ports[0].nodePort}" services {{ include "jaeger.fullname" . }})
export NODE_IP=$(kubectl get nodes --namespace {{ .Release.Namespace }} -o jsonpath="{.items[0].status.addresses[0].address}")
echo http://$NODE_IP:$NODE_PORT
{{- else if contains "LoadBalancer" .Values.service.type }}
NOTE: It may take a few minutes for the LoadBalancer IP to be available.
You can watch the status of by running 'kubectl get --namespace {{ .Release.Namespace }} svc -w {{ include "jaeger.fullname" . }}'
export SERVICE_IP=$(kubectl get svc --namespace {{ .Release.Namespace }} {{ include "jaeger.fullname" . }} --template "{{"{{ range (index .status.loadBalancer.ingress 0) }}{{.}}{{ end }}"}}")
echo http://$SERVICE_IP:{{ .Values.service.port }}
{{- else if contains "ClusterIP" .Values.service.type }}
export POD_NAME=$(kubectl get pods --namespace {{ .Release.Namespace }} -l "app.kubernetes.io/name={{ include "jaeger.name" . }},app.kubernetes.io/instance={{ .Release.Name }}" -o jsonpath="{.items[0].metadata.name}")
export CONTAINER_PORT=$(kubectl get pod --namespace {{ .Release.Namespace }} $POD_NAME -o jsonpath="{.spec.containers[0].ports[0].containerPort}")
echo "Visit http://127.0.0.1:8080 to use your application"
kubectl --namespace {{ .Release.Namespace }} port-forward $POD_NAME 8080:$CONTAINER_PORT
{{- end }}

View File

@@ -0,0 +1,62 @@
{{/*
Expand the name of the chart.
*/}}
{{- define "jaeger.name" -}}
{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" }}
{{- end }}
{{/*
Create a default fully qualified app name.
We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec).
If release name contains chart name it will be used as a full name.
*/}}
{{- define "jaeger.fullname" -}}
{{- if .Values.fullnameOverride }}
{{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" }}
{{- else }}
{{- $name := default .Chart.Name .Values.nameOverride }}
{{- if contains $name .Release.Name }}
{{- .Release.Name | trunc 63 | trimSuffix "-" }}
{{- else }}
{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" }}
{{- end }}
{{- end }}
{{- end }}
{{/*
Create chart name and version as used by the chart label.
*/}}
{{- define "jaeger.chart" -}}
{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" }}
{{- end }}
{{/*
Common labels
*/}}
{{- define "jaeger.labels" -}}
helm.sh/chart: {{ include "jaeger.chart" . }}
{{ include "jaeger.selectorLabels" . }}
{{- if .Chart.AppVersion }}
app.kubernetes.io/version: {{ .Chart.AppVersion | quote }}
{{- end }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
{{- end }}
{{/*
Selector labels
*/}}
{{- define "jaeger.selectorLabels" -}}
app.kubernetes.io/name: {{ include "jaeger.name" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
{{- end }}
{{/*
Create the name of the service account to use
*/}}
{{- define "jaeger.serviceAccountName" -}}
{{- if .Values.serviceAccount.create }}
{{- default (include "jaeger.fullname" .) .Values.serviceAccount.name }}
{{- else }}
{{- default "default" .Values.serviceAccount.name }}
{{- end }}
{{- end }}

View File

@@ -0,0 +1,66 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ include "jaeger.fullname" . }}
labels:
{{- include "jaeger.labels" . | nindent 4 }}
spec:
{{- if not .Values.autoscaling.enabled }}
replicas: {{ .Values.replicaCount }}
{{- end }}
selector:
matchLabels:
{{- include "jaeger.selectorLabels" . | nindent 6 }}
template:
metadata:
{{- with .Values.podAnnotations }}
annotations:
{{- toYaml . | nindent 8 }}
{{- end }}
labels:
{{- include "jaeger.selectorLabels" . | nindent 8 }}
spec:
{{- with .Values.imagePullSecrets }}
imagePullSecrets:
{{- toYaml . | nindent 8 }}
{{- end }}
serviceAccountName: {{ include "jaeger.serviceAccountName" . }}
securityContext:
{{- toYaml .Values.podSecurityContext | nindent 8 }}
containers:
- name: {{ .Chart.Name }}
securityContext:
{{- toYaml .Values.securityContext | nindent 12 }}
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
ports:
- name: thrift
containerPort: 6831
protocol: UDP
- name: dashboard
containerPort: 16686
protocol: TCP
livenessProbe:
initialDelaySeconds: 5
httpGet:
path: /
port: 16686
readinessProbe:
initialDelaySeconds: 5
httpGet:
path: /
port: 16686
resources:
{{- toYaml .Values.resources | nindent 12 }}
{{- with .Values.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.affinity }}
affinity:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.tolerations }}
tolerations:
{{- toYaml . | nindent 8 }}
{{- end }}

View File

@@ -0,0 +1,28 @@
{{- if .Values.autoscaling.enabled }}
apiVersion: autoscaling/v2beta1
kind: HorizontalPodAutoscaler
metadata:
name: {{ include "jaeger.fullname" . }}
labels:
{{- include "jaeger.labels" . | nindent 4 }}
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: {{ include "jaeger.fullname" . }}
minReplicas: {{ .Values.autoscaling.minReplicas }}
maxReplicas: {{ .Values.autoscaling.maxReplicas }}
metrics:
{{- if .Values.autoscaling.targetCPUUtilizationPercentage }}
- type: Resource
resource:
name: cpu
targetAverageUtilization: {{ .Values.autoscaling.targetCPUUtilizationPercentage }}
{{- end }}
{{- if .Values.autoscaling.targetMemoryUtilizationPercentage }}
- type: Resource
resource:
name: memory
targetAverageUtilization: {{ .Values.autoscaling.targetMemoryUtilizationPercentage }}
{{- end }}
{{- end }}

View File

@@ -0,0 +1,61 @@
{{- if .Values.ingress.enabled -}}
{{- $fullName := include "jaeger.fullname" . -}}
{{- $svcPort := .Values.service.port -}}
{{- if and .Values.ingress.className (not (semverCompare ">=1.18-0" .Capabilities.KubeVersion.GitVersion)) }}
{{- if not (hasKey .Values.ingress.annotations "kubernetes.io/ingress.class") }}
{{- $_ := set .Values.ingress.annotations "kubernetes.io/ingress.class" .Values.ingress.className}}
{{- end }}
{{- end }}
{{- if semverCompare ">=1.19-0" .Capabilities.KubeVersion.GitVersion -}}
apiVersion: networking.k8s.io/v1
{{- else if semverCompare ">=1.14-0" .Capabilities.KubeVersion.GitVersion -}}
apiVersion: networking.k8s.io/v1beta1
{{- else -}}
apiVersion: extensions/v1beta1
{{- end }}
kind: Ingress
metadata:
name: {{ $fullName }}
labels:
{{- include "jaeger.labels" . | nindent 4 }}
{{- with .Values.ingress.annotations }}
annotations:
{{- toYaml . | nindent 4 }}
{{- end }}
spec:
{{- if and .Values.ingress.className (semverCompare ">=1.18-0" .Capabilities.KubeVersion.GitVersion) }}
ingressClassName: {{ .Values.ingress.className }}
{{- end }}
{{- if .Values.ingress.tls }}
tls:
{{- range .Values.ingress.tls }}
- hosts:
{{- range .hosts }}
- {{ . | quote }}
{{- end }}
secretName: {{ .secretName }}
{{- end }}
{{- end }}
rules:
{{- range .Values.ingress.hosts }}
- host: {{ .host | quote }}
http:
paths:
{{- range .paths }}
- path: {{ .path }}
{{- if and .pathType (semverCompare ">=1.18-0" $.Capabilities.KubeVersion.GitVersion) }}
pathType: {{ .pathType }}
{{- end }}
backend:
{{- if semverCompare ">=1.19-0" $.Capabilities.KubeVersion.GitVersion }}
service:
name: {{ $fullName }}
port:
number: {{ $svcPort }}
{{- else }}
serviceName: {{ $fullName }}
servicePort: {{ $svcPort }}
{{- end }}
{{- end }}
{{- end }}
{{- end }}

View File

@@ -0,0 +1,15 @@
apiVersion: v1
kind: Service
metadata:
name: {{ include "jaeger.fullname" . }}
labels:
{{- include "jaeger.labels" . | nindent 4 }}
spec:
type: {{ .Values.service.type }}
ports:
- port: {{ .Values.service.port }}
targetPort: {{ .Values.service.port }}
protocol: {{ .Values.service.protocol }}
name: {{ .Values.service.name }}
selector:
{{- include "jaeger.selectorLabels" . | nindent 4 }}

View File

@@ -0,0 +1,12 @@
{{- if .Values.serviceAccount.create -}}
apiVersion: v1
kind: ServiceAccount
metadata:
name: {{ include "jaeger.serviceAccountName" . }}
labels:
{{- include "jaeger.labels" . | nindent 4 }}
{{- with .Values.serviceAccount.annotations }}
annotations:
{{- toYaml . | nindent 4 }}
{{- end }}
{{- end }}

View File

@@ -0,0 +1,15 @@
apiVersion: v1
kind: Pod
metadata:
name: "{{ include "jaeger.fullname" . }}-test-connection"
labels:
{{- include "jaeger.labels" . | nindent 4 }}
annotations:
"helm.sh/hook": test
spec:
containers:
- name: wget
image: busybox
command: ['wget']
args: ['{{ include "jaeger.fullname" . }}:{{ .Values.service.port }}']
restartPolicy: Never

53
helm/jaeger/values.yaml Normal file
View File

@@ -0,0 +1,53 @@
replicaCount: 1
image:
name: jaeger
repository: jaegertracing/all-in-one
pullPolicy: IfNotPresent
tag: "latest"
imagePullSecrets: []
nameOverride: ""
fullnameOverride: "jaeger"
serviceAccount:
create: true
annotations: {}
name: ""
podAnnotations: {}
podSecurityContext: {}
securityContext: {}
service:
type: ClusterIP
port: 6831
protocol: UDP
name: thrift
ingress:
enabled: false
className: ""
annotations: {}
hosts:
- host: chart-example.local
paths:
- path: /
pathType: ImplementationSpecific
tls: []
resources: {}
autoscaling:
enabled: false
minReplicas: 1
maxReplicas: 100
targetCPUUtilizationPercentage: 80
nodeSelector: {}
tolerations: []
affinity: {}

View File

@@ -29,6 +29,7 @@ spec:
{{- toYaml .Values.podSecurityContext | nindent 8 }}
containers:
- name: {{ .Chart.Name }}
command: {{ .Values.command }}
securityContext:
{{- toYaml .Values.securityContext | nindent 12 }}
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"

View File

@@ -7,7 +7,7 @@ replicaCount: 1
image:
#name: producer
#repository: 172.17.0.1:5000/producer
repository: producer
repository: connectedhome/producer
#pullPolicy: IfNotPresent
pullPolicy: Never
# Overrides the image tag whose default is the chart appVersion.
@@ -85,4 +85,6 @@ tolerations: []
affinity: {}
command: ["/bin/producer"]
RUST_LOG: info

View File

@@ -29,6 +29,7 @@ spec:
{{- toYaml .Values.podSecurityContext | nindent 8 }}
containers:
- name: {{ .Chart.Name }}
command: {{ .Values.command }}
securityContext:
{{- toYaml .Values.securityContext | nindent 12 }}
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"

View File

@@ -7,7 +7,7 @@ replicaCount: 1
image:
#name: api-gateway
#repository: 172.17.0.1:5000/api-gateway
repository: user-service
repository: connectedhome/user-service
#pullPolicy: IfNotPresent
pullPolicy: Never
# Overrides the image tag whose default is the chart appVersion.
@@ -88,6 +88,8 @@ tolerations: []
affinity: {}
command: ["/bin/user-service"]
# todo: Get rid of these and handle env vars / secrets better
RUST_LOG: info
DATABASE_URL: "postgres://postgres:password@postgresql/postgres"