Skip to content

Commit 6dfa793

Browse files
authored
Add CAE Support. (#18785)
1 parent e5fd329 commit 6dfa793

File tree

9 files changed

+667
-26
lines changed

9 files changed

+667
-26
lines changed

sdk/core/azure-core-experimental/CHANGELOG.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
# Release History
22

3-
## 1.0.0-beta.10 (Unreleased)
3+
## 1.0.0-beta.10 (2021-02-05)
44

5+
### New Features
6+
7+
- Added challenge based authentication support via `BearerTokenAuthenticationChallengePolicy` and `AccessTokenCache` classes.
58

69
## 1.0.0-beta.9 (2021-01-11)
710

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.core.experimental.credential;
5+
6+
import com.azure.core.credential.AccessToken;
7+
import com.azure.core.util.logging.ClientLogger;
8+
import reactor.core.publisher.Flux;
9+
import reactor.core.publisher.Mono;
10+
import reactor.core.publisher.MonoProcessor;
11+
import reactor.core.publisher.Signal;
12+
13+
import java.time.Duration;
14+
import java.time.OffsetDateTime;
15+
import java.util.Objects;
16+
import java.util.concurrent.atomic.AtomicReference;
17+
import java.util.function.Function;
18+
import java.util.function.Predicate;
19+
import java.util.function.Supplier;
20+
21+
/**
22+
* A token cache that supports caching a token and refreshing it.
23+
*/
24+
public class AccessTokenCache {
25+
// The delay after a refresh to attempt another token refresh
26+
private static final Duration REFRESH_DELAY = Duration.ofSeconds(30);
27+
// the offset before token expiry to attempt proactive token refresh
28+
private static final Duration REFRESH_OFFSET = Duration.ofMinutes(5);
29+
private volatile AccessToken cache;
30+
private volatile OffsetDateTime nextTokenRefresh = OffsetDateTime.now();
31+
private final AtomicReference<MonoProcessor<AccessToken>> wip;
32+
private final Supplier<Mono<AccessToken>> tokenSupplier;
33+
private final Predicate<AccessToken> shouldRefresh;
34+
private final ClientLogger logger = new ClientLogger(AccessTokenCache.class);
35+
36+
/**
37+
* Creates an instance of AccessTokenCache with default scheme "Bearer".
38+
*
39+
* @param tokenSupplier a method to get a new token
40+
*/
41+
public AccessTokenCache(Supplier<Mono<AccessToken>> tokenSupplier) {
42+
Objects.requireNonNull(tokenSupplier, "The token supplier cannot be null");
43+
this.wip = new AtomicReference<>();
44+
this.tokenSupplier = tokenSupplier;
45+
this.shouldRefresh = accessToken -> OffsetDateTime.now()
46+
.isAfter(accessToken.getExpiresAt().minus(REFRESH_OFFSET));
47+
}
48+
49+
/**
50+
* Asynchronously get a token from either the cache or replenish the cache with a new token.
51+
* @return a Publisher that emits an AccessToken
52+
*/
53+
public Mono<AccessToken> getToken() {
54+
return getToken(this.tokenSupplier, false);
55+
}
56+
57+
/**
58+
* Asynchronously get a token from either the cache or replenish the cache with a new token.
59+
*
60+
* @param tokenSupplier The method to get a new token
61+
* @param forceRefresh The flag indicating if the cache needs to be skipped and a token needs to be fetched via the
62+
* credential.
63+
* @return The Publisher that emits an AccessToken
64+
*/
65+
public Mono<AccessToken> getToken(Supplier<Mono<AccessToken>> tokenSupplier, boolean forceRefresh) {
66+
return Mono.defer(retrieveToken(tokenSupplier, forceRefresh))
67+
// Keep resubscribing as long as Mono.defer [token acquisition] emits empty().
68+
.repeatWhenEmpty((Flux<Long> longFlux) -> longFlux.concatMap(ignored -> Flux.just(true)));
69+
}
70+
71+
private Supplier<Mono<? extends AccessToken>> retrieveToken(Supplier<Mono<AccessToken>> tokenSupplier,
72+
boolean forceRefresh) {
73+
return () -> {
74+
try {
75+
if (wip.compareAndSet(null, MonoProcessor.create())) {
76+
final MonoProcessor<AccessToken> monoProcessor = wip.get();
77+
OffsetDateTime now = OffsetDateTime.now();
78+
Mono<AccessToken> tokenRefresh;
79+
Mono<AccessToken> fallback;
80+
if (forceRefresh) {
81+
tokenRefresh = Mono.defer(tokenSupplier);
82+
fallback = Mono.empty();
83+
} else if (cache != null && !shouldRefresh.test(cache)) {
84+
// fresh cache & no need to refresh
85+
tokenRefresh = Mono.empty();
86+
fallback = Mono.just(cache);
87+
} else if (cache == null || cache.isExpired()) {
88+
// no token to use
89+
if (now.isAfter(nextTokenRefresh)) {
90+
// refresh immediately
91+
tokenRefresh = Mono.defer(tokenSupplier);
92+
} else {
93+
// wait for timeout, then refresh
94+
tokenRefresh = Mono.defer(tokenSupplier)
95+
.delaySubscription(Duration.between(now, nextTokenRefresh));
96+
}
97+
// cache doesn't exist or expired, no fallback
98+
fallback = Mono.empty();
99+
} else {
100+
// token available, but close to expiry
101+
if (now.isAfter(nextTokenRefresh)) {
102+
// refresh immediately
103+
tokenRefresh = Mono.defer(tokenSupplier);
104+
} else {
105+
// still in timeout, do not refresh
106+
tokenRefresh = Mono.empty();
107+
}
108+
// cache hasn't expired, ignore refresh error this time
109+
fallback = Mono.just(cache);
110+
}
111+
return tokenRefresh
112+
.materialize()
113+
.flatMap(processTokenRefreshResult(monoProcessor, now, fallback))
114+
.doOnError(monoProcessor::onError)
115+
.doFinally(ignored -> wip.set(null));
116+
} else if (cache != null && !cache.isExpired() && !forceRefresh) {
117+
// another thread might be refreshing the token proactively, but the current token is still valid
118+
return Mono.just(cache);
119+
} else {
120+
// another thread is definitely refreshing the expired token
121+
//If this thread, needs to force refresh, then it needs to resubscribe.
122+
if (forceRefresh) {
123+
return Mono.empty();
124+
}
125+
MonoProcessor<AccessToken> monoProcessor = wip.get();
126+
if (monoProcessor == null) {
127+
// the refreshing thread has finished
128+
return Mono.just(cache);
129+
} else {
130+
// wait for refreshing thread to finish but defer to updated cache in case just missed onNext()
131+
return monoProcessor.switchIfEmpty(Mono.defer(() -> Mono.just(cache)));
132+
}
133+
}
134+
} catch (Throwable t) {
135+
return Mono.error(t);
136+
}
137+
};
138+
}
139+
140+
private Function<Signal<AccessToken>, Mono<? extends AccessToken>> processTokenRefreshResult(
141+
MonoProcessor<AccessToken> monoProcessor, OffsetDateTime now, Mono<AccessToken> fallback) {
142+
return signal -> {
143+
AccessToken accessToken = signal.get();
144+
Throwable error = signal.getThrowable();
145+
if (signal.isOnNext() && accessToken != null) { // SUCCESS
146+
logger.info(refreshLog(cache, now, "Acquired a new access token"));
147+
cache = accessToken;
148+
monoProcessor.onNext(accessToken);
149+
monoProcessor.onComplete();
150+
nextTokenRefresh = OffsetDateTime.now().plus(REFRESH_DELAY);
151+
return Mono.just(accessToken);
152+
} else if (signal.isOnError() && error != null) { // ERROR
153+
logger.error(refreshLog(cache, now, "Failed to acquire a new access token"));
154+
nextTokenRefresh = OffsetDateTime.now().plus(REFRESH_DELAY);
155+
return fallback.switchIfEmpty(Mono.error(error));
156+
} else { // NO REFRESH
157+
monoProcessor.onComplete();
158+
return fallback;
159+
}
160+
};
161+
}
162+
163+
private static String refreshLog(AccessToken cache, OffsetDateTime now, String log) {
164+
StringBuilder info = new StringBuilder(log);
165+
if (cache == null) {
166+
info.append(".");
167+
} else {
168+
Duration tte = Duration.between(now, cache.getExpiresAt());
169+
info.append(" at ").append(tte.abs().getSeconds()).append(" seconds ")
170+
.append(tte.isNegative() ? "after" : "before").append(" expiry. ")
171+
.append("Retry may be attempted after ").append(REFRESH_DELAY.getSeconds()).append(" seconds.");
172+
if (!tte.isNegative()) {
173+
info.append(" The token currently cached will be used.");
174+
}
175+
}
176+
return info.toString();
177+
}
178+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
/**
5+
* Package containing experimental credential classes for authentication purposes.
6+
*/
7+
package com.azure.core.experimental.credential;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.core.experimental.http.policy;
5+
6+
import com.azure.core.credential.AccessToken;
7+
import com.azure.core.credential.TokenCredential;
8+
import com.azure.core.credential.TokenRequestContext;
9+
import com.azure.core.experimental.credential.AccessTokenCache;
10+
import com.azure.core.experimental.implementation.AuthenticationChallenge;
11+
import com.azure.core.http.HttpPipelineCallContext;
12+
import com.azure.core.http.HttpPipelineNextPolicy;
13+
import com.azure.core.http.HttpResponse;
14+
import com.azure.core.http.policy.HttpPipelinePolicy;
15+
import reactor.core.publisher.Mono;
16+
17+
import java.nio.charset.StandardCharsets;
18+
import java.util.ArrayList;
19+
import java.util.Base64;
20+
import java.util.HashMap;
21+
import java.util.List;
22+
import java.util.Map;
23+
import java.util.Objects;
24+
import java.util.function.Supplier;
25+
import java.util.regex.Matcher;
26+
import java.util.regex.Pattern;
27+
28+
/**
29+
* The pipeline policy that applies a token credential to an HTTP request
30+
* with "Bearer" scheme.
31+
*/
32+
public class BearerTokenAuthenticationChallengePolicy implements HttpPipelinePolicy {
33+
private static final String AUTHORIZATION_HEADER = "Authorization";
34+
private static final String BEARER = "Bearer";
35+
public static final Pattern AUTHENTICATION_CHALLENGE_PATTERN =
36+
Pattern.compile("(\\w+) ((?:\\w+=\".*?\"(?:, )?)+)(?:, )?");
37+
public static final Pattern AUTHENTICATION_CHALLENGE_PARAMS_PATTERN =
38+
Pattern.compile("(?:(\\w+)=\"([^\"\"]*)\")+");
39+
public static final String WWW_AUTHENTICATE = "WWW-Authenticate";
40+
public static final String CLAIMS_PARAMETER = "claims";
41+
42+
private final TokenCredential credential;
43+
private final String[] scopes;
44+
private final Supplier<Mono<AccessToken>> defaultTokenSupplier;
45+
private final AccessTokenCache cache;
46+
47+
/**
48+
* Creates BearerTokenAuthenticationChallengePolicy.
49+
*
50+
* @param credential the token credential to authenticate the request
51+
* @param scopes the scopes of authentication the credential should get token for
52+
*/
53+
public BearerTokenAuthenticationChallengePolicy(TokenCredential credential, String... scopes) {
54+
Objects.requireNonNull(credential);
55+
this.credential = credential;
56+
this.scopes = scopes;
57+
this.defaultTokenSupplier = () -> credential.getToken(new TokenRequestContext().addScopes(scopes));
58+
this.cache = new AccessTokenCache(defaultTokenSupplier);
59+
}
60+
61+
/**
62+
*
63+
* Executed before sending the initial request and authenticates the request.
64+
*
65+
* @param context The request context.
66+
* @return A {@link Mono} containing {@link Void}
67+
*/
68+
public Mono<Void> onBeforeRequest(HttpPipelineCallContext context) {
69+
return authenticateRequest(context, defaultTokenSupplier, false);
70+
}
71+
72+
/**
73+
* Handles the authentication challenge in the event a 401 response with a WWW-Authenticate authentication
74+
* challenge header is received after the initial request.
75+
*
76+
* @param context The request context.
77+
* @param response The Http Response containing the authentication challenge header.
78+
* @return A {@link Mono} containing the status, whether the challenge was successfully extracted and handled.
79+
* if true then a follow up request needs to be sent authorized with the challenge based bearer token.
80+
*/
81+
public Mono<Boolean> onChallenge(HttpPipelineCallContext context, HttpResponse response) {
82+
String authHeader = response.getHeaderValue(WWW_AUTHENTICATE);
83+
if (response.getStatusCode() == 401 && authHeader != null) {
84+
List<AuthenticationChallenge> challenges = parseChallenges(authHeader);
85+
for (AuthenticationChallenge authenticationChallenge : challenges) {
86+
Map<String, String> extractedChallengeParams =
87+
parseChallengeParams(authenticationChallenge.getChallengeParameters());
88+
if (extractedChallengeParams.containsKey(CLAIMS_PARAMETER)) {
89+
String claims = new String(Base64.getUrlDecoder()
90+
.decode(extractedChallengeParams.get(CLAIMS_PARAMETER)), StandardCharsets.UTF_8);
91+
return authenticateRequest(context,
92+
() -> credential.getToken(new TokenRequestContext()
93+
.addScopes(scopes).setClaims(claims)), true)
94+
.flatMap(b -> Mono.just(true));
95+
}
96+
}
97+
}
98+
return Mono.just(false);
99+
}
100+
101+
@Override
102+
public Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineNextPolicy next) {
103+
if ("http".equals(context.getHttpRequest().getUrl().getProtocol())) {
104+
return Mono.error(new RuntimeException("token credentials require a URL using the HTTPS protocol scheme"));
105+
}
106+
HttpPipelineNextPolicy nextPolicy = next.clone();
107+
108+
return onBeforeRequest(context)
109+
.then(next.process())
110+
.flatMap(httpResponse -> {
111+
String authHeader = httpResponse.getHeaderValue(WWW_AUTHENTICATE);
112+
if (httpResponse.getStatusCode() == 401 && authHeader != null) {
113+
return onChallenge(context, httpResponse).flatMap(retry -> {
114+
if (retry) {
115+
return nextPolicy.process();
116+
} else {
117+
return Mono.just(httpResponse);
118+
}
119+
});
120+
}
121+
return Mono.just(httpResponse);
122+
});
123+
}
124+
125+
/**
126+
* Get the {@link AccessTokenCache} holding the cached access tokens and the logic to retrieve and refresh
127+
* access tokens.
128+
*
129+
* @return the {@link AccessTokenCache}
130+
*/
131+
public AccessTokenCache getTokenCache() {
132+
return cache;
133+
}
134+
135+
private Mono<Void> authenticateRequest(HttpPipelineCallContext context, Supplier<Mono<AccessToken>> tokenSupplier,
136+
boolean forceTokenRefresh) {
137+
return cache.getToken(tokenSupplier, forceTokenRefresh)
138+
.flatMap(token -> {
139+
context.getHttpRequest().getHeaders().set(AUTHORIZATION_HEADER, BEARER + " " + token.getToken());
140+
return Mono.empty();
141+
});
142+
}
143+
144+
List<AuthenticationChallenge> parseChallenges(String header) {
145+
Matcher matcher = AUTHENTICATION_CHALLENGE_PATTERN.matcher(header);
146+
147+
List<AuthenticationChallenge> challenges = new ArrayList<>();
148+
while (matcher.find()) {
149+
challenges.add(new AuthenticationChallenge(matcher.group(1), matcher.group(2)));
150+
}
151+
152+
return challenges;
153+
}
154+
155+
Map<String, String> parseChallengeParams(String challengeParams) {
156+
Matcher matcher = AUTHENTICATION_CHALLENGE_PARAMS_PATTERN.matcher(challengeParams);
157+
158+
Map<String, String> challengeParameters = new HashMap<>();
159+
while (matcher.find()) {
160+
challengeParameters.put(matcher.group(1), matcher.group(2));
161+
}
162+
return challengeParameters;
163+
}
164+
}
165+
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
/**
5+
* Package containing experimental http policies.
6+
*/
7+
package com.azure.core.experimental.http.policy;

0 commit comments

Comments
 (0)