Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adds streaming query #48

Merged
merged 6 commits into from
Jan 29, 2025
Merged

feat: adds streaming query #48

merged 6 commits into from
Jan 29, 2025

Conversation

nathanielc
Copy link
Collaborator

No description provided.

@nathanielc
Copy link
Collaborator Author

Here is an example using the new SDK streamQuery function.

import { CeramicClient } from '@ceramic-sdk/http-client'
import type { ModelDefinition } from '@ceramic-sdk/model-protocol'
import { ModelClient } from '@ceramic-sdk/model-client'
import { ModelInstanceClient } from '@ceramic-sdk/model-instance-client'
import { getAuthenticatedDID } from '@didtools/key-did'
import { ClientOptions, createFlightSqlClient } from '@ceramic-sdk/flight-sql-client';
import { tableFromIPC } from 'apache-arrow';



async function main() {
  await Promise.all([read(), write()])
}

async function write() {
  const client = new CeramicClient({ url: "http://localhost:5101" })

  // Create an authenticated DID
  const authenticatedDID = await getAuthenticatedDID(new Uint8Array(32))

  // Create a model client
  const modelClient = new ModelClient({
    ceramic: client,
    did: authenticatedDID,
  })

  const model: ModelDefinition = {
    version: '2.0',
    name: 'Post',
    description: 'test model',
    accountRelation: { type: 'list' },
    interface: false,
    implements: [],
    schema: {
      type: 'object',
      properties: {
        title: { type: 'string', maxLength: 12 },
        body: { type: 'string', maxLength: 1024 },
        nonce: { type: 'number' },
      },
      required: ['userName'],
      additionalProperties: false,
    },
  };
  const modelStream = await modelClient.postDefinition(model);
  // Create a model instance client
  const modelInstanceClient = new ModelInstanceClient({
    ceramic: client,
    did: authenticatedDID,
  })
  while (true) {
    // creating a MID using a `list` model
    await modelInstanceClient.postSignedInit({
      model: modelStream,
      content: { title: 'This is a new post', body: 'This is the body for a new post', nonce: Math.random() },
      shouldIndex: true,
    })
    await sleep(1000)
  }
}

function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

async function read() {
  const options: ClientOptions = {
    username: undefined,
    password: undefined,
    tls: false,
    host: '127.0.0.1',
    port: 5102,
    headers: [],
  };
  const client = await createFlightSqlClient(options);
  const stream = await client.streamQuery(`
    SELECT
      "index",
      cid_string(event_cid) as event_cid,
      data::varchar as data
    FROM conclusion_events_stream
    WHERE
    "index" > 43
  `);
  while (true) {
    const batch = await stream.next();
    if (batch == null) {
      break;
    }
    const table = tableFromIPC(batch);
    for (let i = 0; i < table.numRows; i++) {
      const row = table.get(i);
      console.log(row);
    }
  }
  console.log('read finished');
}

main()

Copy link
Collaborator

@dav1do dav1do left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other than fixing/understanding the package.json changes and CI, this looks good to me.

packages/http-client/package.json Outdated Show resolved Hide resolved
Three things change in this commit:
1. Logging from within fligh-sql-client Rust code works. Recommend using
   CI=true when running tests otherwise jest stomps on the Rust logs.
2. Existing tests that just had a sleep now use flight sql to wait for
   the specific event ids to be processed.
3. prepared queries work with stream queries as well
Copy link
Collaborator Author

@nathanielc nathanielc left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dav1do This is ready for another review. Summary of recent changes.

  1. Tests for feed queries
  2. Support for prepared feed queries with tests
  3. Fixed logging from flight-sql-client rust code
  4. Changed tests to not sleep and instead use flight-sql-client to wait till the specific event has been aggregated
  5. Added support for binary parameters my multibase encoding them.
  6. Update tests to be less interdependent. Tests still fail if run in parallel but they no longer make assertions about actions previous tests made
  7. Changed default image to one with debug symbols

@@ -29,7 +29,7 @@
"scripts": {
"artifacts": "napi artifacts",
"build:debug": "napi build --platform",
"build": "napi build --platform --release",
"build": "napi build --platform --release --target $(rustc -vV | sed -n 's|host: ||p')",
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Explicitly pass the target so that we can use external tooling that does the same.

@nathanielc nathanielc requested a review from dav1do January 28, 2025 17:50
@@ -26,7 +26,6 @@
"build:js": "swc src -d ./dist --config-file ../../.swcrc --strip-leading-paths",
"build:types": "tsc --project tsconfig.json --emitDeclarationOnly --skipLibCheck",
"build": "pnpm build:clean && pnpm build:types && pnpm build:js",
"lint": "eslint src --fix",
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All of these are removed as eslint is not configured as a dependency and the workspace is configured to use biome which lints the whole repo so individual package linting is not needed.

Copy link
Collaborator

@dav1do dav1do left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small comments/questions but this looks great.


#[module_exports]
fn init(_exports: JsObject) -> napi::Result<()> {
init_logging();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this panic if you call it twice? wondering if we should ignore the result of tracing::subscriber::set_global_default or if this is fine

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setting it a second time will fail but we ignore the failure, its a returned result not a panic. Therefore we should be fine.

@@ -42,7 +44,7 @@ export default class CeramicOneContainer {

static async healthFn(port: number): Promise<boolean> {
try {
const res = await fetch(`http://localhost:${port}/ceramic/version`)
const res = await fetch(`http://localhost:${port}/ceramic/liveness`)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better 👍

@@ -40,6 +48,16 @@ const CONTAINER_OPTS: EnvironmentOptions = {
testPort: 5223,
}

const FLIGHT_OPTIONS: ClientOptions = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it looks like all these tests use the same options. you could define this somewhere shared (index.ts in this case) and import them but it's also kind of nice to keep config close to tests

@nathanielc nathanielc merged commit e8acbe9 into main Jan 29, 2025
10 checks passed
@nathanielc nathanielc deleted the feat/stream-queries branch January 29, 2025 16:10
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants