anthropic[patch]: Fix issues with duplicate streaming tokens (#6265)
* anthropic[patch]: add vertex and bedrock support, optimize streamResponseChunks

* yarn.lock and format/lint

* drop bedrock/vertex

* anthropic[patch]: fix stream usage calculation

* add back stream usage param

* fix issues with merge

---------

Co-authored-by: tofuliang <[email protected]>
bracesproul and tofuliang authored Jul 29, 2024
1 parent a1f4114 commit edb8766
Showing 2 changed files with 17 additions and 62 deletions.
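
For context on the symptom this patch addresses: with usage reporting enabled, token counts could be double-counted because usage arrived both on per-event chunks and on a final synthetic chunk. Below is a minimal sketch of how a caller aggregates a stream and inspects the merged usage; the model name and prompt are illustrative, and `concat` is the generic chunk-merging helper from @langchain/core.

```typescript
import { ChatAnthropic } from "@langchain/anthropic";
import { AIMessageChunk } from "@langchain/core/messages";
import { concat } from "@langchain/core/utils/stream";

// Sketch: aggregate a token stream and inspect the merged usage numbers.
const model = new ChatAnthropic({
  model: "claude-3-haiku-20240307", // illustrative model name
  streamUsage: true,
});

let merged: AIMessageChunk | undefined;
for await (const chunk of await model.stream("Say hello")) {
  merged = merged === undefined ? chunk : concat(merged, chunk);
}

// After this fix, each token is reflected once; previously the final
// synthetic chunk could duplicate counts already streamed.
console.log(merged?.usage_metadata);
```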
35 changes: 5 additions & 30 deletions libs/langchain-anthropic/src/chat_models.ts
@@ -2,11 +2,7 @@
 import { Anthropic, type ClientOptions } from "@anthropic-ai/sdk";
 import type { Stream } from "@anthropic-ai/sdk/streaming";
 
 import { CallbackManagerForLLMRun } from "@langchain/core/callbacks/manager";
-import {
-  AIMessageChunk,
-  type BaseMessage,
-  UsageMetadata,
-} from "@langchain/core/messages";
+import { AIMessageChunk, type BaseMessage } from "@langchain/core/messages";
 import { ChatGenerationChunk, type ChatResult } from "@langchain/core/outputs";
 import { getEnvironmentVariable } from "@langchain/core/utils/env";
 import {
@@ -431,24 +427,20 @@ export class ChatAnthropicMessages<
       ...formattedMessages,
       stream: true,
     });
-    let usageData = { input_tokens: 0, output_tokens: 0 };
 
     for await (const data of stream) {
       if (options.signal?.aborted) {
         stream.controller.abort();
         throw new Error("AbortError: User aborted the request.");
       }
 
+      const shouldStreamUsage = this.streamUsage ?? options.streamUsage;
       const result = _makeMessageChunkFromAnthropicEvent(data, {
-        streamUsage: !!(this.streamUsage || options.streamUsage),
+        streamUsage: shouldStreamUsage,
         coerceContentToString,
-        usageData,
       });
       if (!result) continue;
 
-      const { chunk, usageData: updatedUsageData } = result;
-
-      usageData = updatedUsageData;
+      const { chunk } = result;
 
       const newToolCallChunk = extractToolCallChunk(chunk);
@@ -460,7 +452,7 @@
           content: chunk.content,
           additional_kwargs: chunk.additional_kwargs,
           tool_call_chunks: newToolCallChunk ? [newToolCallChunk] : undefined,
-          usage_metadata: chunk.usage_metadata,
+          usage_metadata: shouldStreamUsage ? chunk.usage_metadata : undefined,
           response_metadata: chunk.response_metadata,
           id: chunk.id,
         }),
@@ -471,23 +463,6 @@
         await runManager?.handleLLMNewToken(token);
       }
     }
-
-    let usageMetadata: UsageMetadata | undefined;
-    if (this.streamUsage || options.streamUsage) {
-      usageMetadata = {
-        input_tokens: usageData.input_tokens,
-        output_tokens: usageData.output_tokens,
-        total_tokens: usageData.input_tokens + usageData.output_tokens,
-      };
-    }
-    yield new ChatGenerationChunk({
-      message: new AIMessageChunk({
-        content: coerceContentToString ? "" : [],
-        additional_kwargs: { usage: usageData },
-        usage_metadata: usageMetadata,
-      }),
-      text: "",
-    });
   }
 
   /** @ignore */
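
A note on the new gate in the hunk above: `this.streamUsage ?? options.streamUsage` uses nullish coalescing rather than the old boolean OR, so an explicit instance-level setting (including `false`) now takes precedence over the per-call option. A standalone TypeScript sketch of the difference; the two helper functions are illustrative, not part of the commit:

```typescript
// Old gate: boolean OR — a per-call `streamUsage: true` could override
// an instance explicitly configured with `streamUsage: false`.
const orGate = (instance?: boolean, call?: boolean): boolean =>
  !!(instance || call);

// New gate: nullish coalescing — the per-call option is consulted only
// when the instance-level flag was never set.
const nullishGate = (instance?: boolean, call?: boolean): boolean | undefined =>
  instance ?? call;

console.log(orGate(false, true)); // true  — the call option wins
console.log(nullishGate(false, true)); // false — explicit instance `false` wins
```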
44 changes: 12 additions & 32 deletions libs/langchain-anthropic/src/utils/message_outputs.ts
@@ -16,14 +16,10 @@ export function _makeMessageChunkFromAnthropicEvent(
   fields: {
     streamUsage: boolean;
     coerceContentToString: boolean;
-    usageData: { input_tokens: number; output_tokens: number };
   }
 ): {
   chunk: AIMessageChunk;
-  usageData: { input_tokens: number; output_tokens: number };
 } | null {
-  let usageDataCopy = { ...fields.usageData };
-
   if (data.type === "message_start") {
     // eslint-disable-next-line @typescript-eslint/no-unused-vars
     const { content, usage, ...additionalKwargs } = data.message;
@@ -34,43 +30,31 @@
         filteredAdditionalKwargs[key] = value;
       }
     }
-    usageDataCopy = usage;
-    let usageMetadata: UsageMetadata | undefined;
-    if (fields.streamUsage) {
-      usageMetadata = {
-        input_tokens: usage.input_tokens,
-        output_tokens: usage.output_tokens,
-        total_tokens: usage.input_tokens + usage.output_tokens,
-      };
-    }
+    const usageMetadata: UsageMetadata = {
+      input_tokens: usage.input_tokens,
+      output_tokens: usage.output_tokens,
+      total_tokens: usage.input_tokens + usage.output_tokens,
+    };
     return {
       chunk: new AIMessageChunk({
         content: fields.coerceContentToString ? "" : [],
         additional_kwargs: filteredAdditionalKwargs,
-        usage_metadata: usageMetadata,
+        usage_metadata: fields.streamUsage ? usageMetadata : undefined,
         id: data.message.id,
       }),
-      usageData: usageDataCopy,
     };
   } else if (data.type === "message_delta") {
-    let usageMetadata: UsageMetadata | undefined;
-    if (fields.streamUsage) {
-      usageMetadata = {
-        input_tokens: data.usage.output_tokens,
-        output_tokens: 0,
-        total_tokens: data.usage.output_tokens,
-      };
-    }
-    if (data?.usage !== undefined) {
-      usageDataCopy.output_tokens += data.usage.output_tokens;
-    }
+    const usageMetadata: UsageMetadata = {
+      input_tokens: 0,
+      output_tokens: data.usage.output_tokens,
+      total_tokens: data.usage.output_tokens,
+    };
     return {
       chunk: new AIMessageChunk({
         content: fields.coerceContentToString ? "" : [],
         additional_kwargs: { ...data.delta },
-        usage_metadata: usageMetadata,
+        usage_metadata: fields.streamUsage ? usageMetadata : undefined,
       }),
-      usageData: usageDataCopy,
     };
   } else if (
     data.type === "content_block_start" &&
@@ -89,7 +73,6 @@
         ],
         additional_kwargs: {},
       }),
-      usageData: usageDataCopy,
     };
   } else if (
     data.type === "content_block_delta" &&
@@ -109,7 +92,6 @@
           ],
           additional_kwargs: {},
         }),
-        usageData: usageDataCopy,
       };
     }
   } else if (
@@ -129,7 +111,6 @@
         ],
         additional_kwargs: {},
       }),
-      usageData: usageDataCopy,
     };
   } else if (
     data.type === "content_block_start" &&
@@ -149,7 +130,6 @@
         ],
         additional_kwargs: {},
       }),
-      usageData: usageDataCopy,
     };
   }
 }
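
With the `usageData` threading removed, each Anthropic event now maps to at most one `usage_metadata` payload: `message_start` carries the input tokens, `message_delta` carries only output tokens, so summing fields across chunks yields correct totals with no double counting. A sketch under the assumption that chunk concatenation in @langchain/core sums `usage_metadata` field-wise; the literal token numbers are invented:

```typescript
import { AIMessageChunk } from "@langchain/core/messages";
import { concat } from "@langchain/core/utils/stream";

// Hypothetical chunks mirroring what the helper now returns.
const start = new AIMessageChunk({
  content: "",
  usage_metadata: { input_tokens: 9, output_tokens: 1, total_tokens: 10 },
});
const delta = new AIMessageChunk({
  content: "",
  usage_metadata: { input_tokens: 0, output_tokens: 6, total_tokens: 6 },
});

// Field-wise summation across the two chunks reports 9 input + 7 output
// tokens, counting each token exactly once.
console.log(concat(start, delta).usage_metadata);
// -> { input_tokens: 9, output_tokens: 7, total_tokens: 16 }
```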
